Some rough kernel code to start WQ workers on demand.

Missing code to clean up worker state.
上级 f91ddbe4
......@@ -43,7 +43,6 @@ int
main(int ac, char **av)
{
initwq();
sleep(5);
test();
exitwq();
printf("all done!\n");
......
......@@ -10,7 +10,7 @@ extern pgmap kpml4;
void freevm(pgmap *pml4);
pgmap* setupkvm(void);
int setupuvm(pgmap *pml4, char *kshared, char *uwq);
int mapkva(pgmap *pml4, char* kva, uptr uva, size_t size);
std::atomic<pme_t>* walkpgdir(pgmap *pml4, u64, int);
void tlbflush(void);
......
......@@ -51,6 +51,7 @@ long sys_pread(int fd, void *ubuf, size_t count, off_t offset);
long sys_async(int, size_t, off_t, u32, u32);
long sys_script(void *addr, u64 len, u64 chunk);
long sys_setfs(u64 base);
long sys_wqwait(void);
extern long (*syscalls[])(u64, u64, u64, u64, u64);
// other exported/imported functions
......
......@@ -3,3 +3,4 @@
#define KSHARED 0xFFFFF00000000000ull
#define USERWQ 0xFFFFF00100000000ull
#define USERTOP 0x0000800000000000ull
#define UWQSTACK 0x0000700000000000ull
......@@ -7,6 +7,8 @@
#include "file.hh"
#include "filetable.hh"
class uwq;
// Saved registers for kernel context switches.
// (also implicitly defined in swtch.S)
struct context {
......@@ -43,6 +45,7 @@ enum procstate { EMBRYO, SLEEPING, RUNNABLE, RUNNING, ZOMBIE };
// Per-process state
struct proc : public rcu_freed {
struct vmap *vmap; // va -> vma
uwq* uwq;
char *kstack; // Bottom of kernel stack for this process
volatile int pid; // Process ID
struct proc *parent; // Parent process
......
......@@ -31,4 +31,5 @@
#define SYS_async 30
#define SYS_script 31
#define SYS_setfs 32
#define SYS_ncount 33 /* total number of system calls */
#define SYS_wqwait 33
#define SYS_ncount 34 /* total number of system calls */
......@@ -6,27 +6,40 @@ struct padded_length {
};
#if defined (XV6_KERNEL)
struct uwq {
uwq(vmap* vmap, padded_length *len);
~uwq();
struct uwq : public referenced, public rcu_freed {
static uwq* alloc(vmap* vmap, filetable *ftable);
bool haswork();
int trywork();
void* buffer();
void setuentry(uptr uentry);
virtual void do_gc(void) { delete this; }
protected:
virtual void onzero() const { gc_delayed((uwq*)this); }
private:
uwq(vmap* vmap, filetable* ftable, padded_length *len);
~uwq();
uwq& operator=(const uwq&);
uwq(const uwq& x);
proc* getworker();
proc* allocworker();
void finishworkers();
NEW_DELETE_OPS(uwq);
struct spinlock lock_;
vmap* vmap_;
filetable* ftable_;
padded_length* len_;
uptr uentry_;
uptr ustack_;
struct worker {
int running;
bool running;
proc *proc;
};
worker worker_[NCPU];
};
......
......@@ -81,7 +81,7 @@ struct vmap : public rcu_freed {
bool replace_vma(vma *a, vma *b);
void decref();
bool tryinc();
void incref();
vmap* copy(int share);
vma* lookup(uptr start, uptr len);
......@@ -94,7 +94,6 @@ struct vmap : public rcu_freed {
virtual void do_gc(void) { delete this; }
uptr brk_; // Top of heap
uwq uwq_;
private:
vmap();
......
......@@ -5,17 +5,17 @@
#include "wq.hh"
#include "pthread.h"
#include "memlayout.h"
#include "uwq.hh"
#include "atomic.hh"
#include "lib.h"
#include "elf.hh"
typedef struct uspinlock wqlock_t;
static pthread_key_t idkey;
static std::atomic<int> nextid;
static volatile int exiting;
struct padded_length {
volatile u64 v_ __mpalign__;;
__padout__;
};
int
mycpuid(void)
{
......@@ -64,42 +64,29 @@ wqlock_init(wqlock_t *lock)
initlock(lock);
}
static void
setaffinity(int c)
{
// XXX(sbw)
}
extern "C" long wqwait(void);
static void*
workerth(void *x)
static void __attribute__((used))
initworker(void)
{
u64 c = (u64)x;
setaffinity(c);
pthread_setspecific(idkey, (void*)c);
while (!exiting)
wq_trywork();
return 0;
int id;
forkt_setup(0);
id = nextid++;
pthread_setspecific(idkey, (void*)(u64)id);
while (1) {
if (!wq_trywork())
wqwait();
}
}
DEFINE_XV6_ADDRNOTE(xnote, XV6_ADDR_ID_WQ, &initworker);
static inline void
wqarch_init(void)
{
pthread_t th;
int r;
if (pthread_key_create(&idkey, 0))
die("wqarch_init: pthread_key_create");
pthread_setspecific(idkey, 0);
setaffinity(0);
for (int i = 1; i < NCPU; i++) {
r = pthread_create(&th, 0, workerth, (void*)(u64)i);
if (r < 0)
die("wqarch_init: pthread_create");
}
}
static inline void
......
......@@ -61,5 +61,5 @@ long (*syscalls[])(u64, u64, u64, u64, u64) = {
SYSCALL(async),
SYSCALL(script),
SYSCALL(setfs),
SYSCALL(wqwait),
};
......@@ -27,7 +27,7 @@ struct eargs {
};
static int
donotes(struct inode *ip, vmap *vmap, u64 off)
donotes(struct inode *ip, uwq *uwq, u64 off)
{
struct proghdr ph;
struct elfnote note;
......@@ -49,7 +49,7 @@ donotes(struct inode *ip, vmap *vmap, u64 off)
return -1;
if (desc.id == XV6_ADDR_ID_WQ) {
vmap->uwq_.setuentry(desc.vaddr);
uwq->setuentry(desc.vaddr);
return 0;
}
}
......@@ -179,6 +179,7 @@ exec(const char *path, char **argv)
{
struct inode *ip = nullptr;
struct vmap *vmp = nullptr;
uwq* uwq = nullptr;
struct elfhdr elf;
struct proghdr ph;
u64 off;
......@@ -201,6 +202,9 @@ exec(const char *path, char **argv)
if((vmp = vmap::alloc()) == 0)
goto bad;
if((uwq = uwq::alloc(vmp, myproc()->ftable)) == 0)
goto bad;
// Arguments for work queue
struct eargs args;
args.proc = myproc();
......@@ -217,7 +221,7 @@ exec(const char *path, char **argv)
sizeof(type)) != sizeof(type))
goto bad;
if (type == ELF_PROG_NOTE) {
if (donotes(ip, vmp, off) < 0) {
if (donotes(ip, uwq, off) < 0) {
cilk_abort(-1);
break;
}
......@@ -238,7 +242,10 @@ exec(const char *path, char **argv)
// Commit to the user image.
oldvmap = myproc()->vmap;
myproc()->vmap = vmp;
myproc()->tf->rip = elf.entry; // main
if (myproc()->uwq != nullptr)
myproc()->uwq->dec();
myproc()->uwq = uwq;
myproc()->tf->rip = elf.entry;
switchvm(myproc());
oldvmap->decref();
......@@ -250,7 +257,8 @@ exec(const char *path, char **argv)
cprintf("exec failed\n");
if(vmp)
vmp->decref();
if(uwq)
uwq->dec();
gc_end_epoch();
return 0;
}
......@@ -93,6 +93,18 @@ setupkvm(void)
}
// Map the kernel buffer kva into the user address space at uva for
// size bytes, page by page, with user-writable permissions
// (PTE_P|PTE_U|PTE_W).
// Returns 0 on success, -1 if walkpgdir cannot allocate a
// page-table page.
int
mapkva(pgmap *pml4, char* kva, uptr uva, size_t size)
{
  u64 off = 0;
  while (off < size) {
    atomic<pme_t> *pte = walkpgdir(pml4, (u64)(uva + off), 1);
    if (pte == nullptr)
      return -1;
    *pte = v2p(kva + off) | PTE_P | PTE_U | PTE_W;
    off += 4096;
  }
  return 0;
}
int
setupuvm(pgmap *pml4, char *kshared, char *uwq)
{
struct todo {
......
......@@ -120,6 +120,10 @@ idleloop(void)
// XXX(sbw)
worked = uwq_trywork();
if (worked == 1) {
cprintf("did some work..\n");
break;
}
worked = wq_trywork();
// If we are no longer the idle thread, exit
......
......@@ -36,7 +36,7 @@ struct kstack_tag kstack_tag[NCPU];
enum { sched_debug = 0 };
proc::proc(int npid) :
rcu_freed("proc"), vmap(0), kstack(0),
rcu_freed("proc"), vmap(0), uwq(0), kstack(0),
pid(npid), parent(0), tf(0), context(0), killed(0),
ftable(0), cwd(0), tsc(0), curcycles(0), cpuid(0), epoch(0),
on_runq(-1), cpu_pin(0), runq(0), oncv(0), cv_wakeup(0),
......@@ -376,6 +376,8 @@ finishproc(struct proc *p)
{
if (p->vmap != nullptr)
p->vmap->decref();
if (p->uwq != nullptr)
p->uwq->dec();
ksfree(slab_stack, p->kstack);
p->kstack = 0;
if (!xnspid->remove(p->pid, &p))
......
......@@ -158,6 +158,7 @@ trap(struct trapframe *tf)
#endif
return;
}
cprintf("pagefault: failed\n");
cli();
}
......
//
// XXX
// - vmap doesn't need to be rcu_freed anymore
// - workers should have a uwq
// - pin workers
#include "types.h"
#include "amd64.h"
#include "kernel.hh"
......@@ -6,10 +12,14 @@
#include "percpu.hh"
#include "spinlock.h"
#include "condvar.h"
#include "uwq.hh"
#include "proc.hh"
#include "uwq.hh"
#include "vm.hh"
#include "kalloc.hh"
#include "bits.hh"
extern "C" {
#include "kern_c.h"
}
int
uwq_trywork(void)
......@@ -28,12 +38,13 @@ uwq_trywork(void)
scoped_gc_epoch xgc();
barrier();
struct proc *p = c->proc;
if (p == nullptr || p->vmap == nullptr)
if (p == nullptr || p->uwq == nullptr)
continue;
uwq* uwq = &p->vmap->uwq_;
uwq* uwq = p->uwq;
if (uwq->haswork()) {
uwq->trywork();
if (uwq->trywork() == 1)
return 1;
// XXX(sbw) start a worker thread..
break;
}
......@@ -42,18 +53,55 @@ uwq_trywork(void)
return 0;
}
// Syscall: called by a user-level work-queue worker when
// wq_trywork() finds nothing to do (see the initworker loop).
// Currently a stub that returns 0 immediately; blocking the worker
// until new work arrives is still TODO.
long
sys_wqwait(void)
{
  return 0;
}
//
// uwq
//
uwq::uwq(vmap* vmap, padded_length *len)
: vmap_(vmap), len_(len)
// Allocate a uwq bound to vmap/ftable: take a slab buffer for the
// per-CPU length array and map it into the user address space at
// USERWQ.  On success returns the uwq with one reference held by
// the caller; on failure returns nullptr with all resources
// released exactly once.
uwq*
uwq::alloc(vmap* vmap, filetable *ftable)
{
  padded_length* len;
  uwq* u;

  len = (padded_length*) ksalloc(slab_userwq);
  if (len == nullptr)
    return nullptr;

  ftable->incref();
  vmap->incref();
  u = new uwq(vmap, ftable, len);
  if (u == nullptr) {
    // Construction never happened: undo the refs and free the
    // buffer ourselves.
    ftable->decref();
    vmap->decref();
    ksfree(slab_userwq, len);
    return nullptr;
  }
  u->inc();

  if (mapkva(vmap->pml4, (char*)len, USERWQ, USERWQSIZE)) {
    // The uwq now owns len and the vmap/ftable references: ~uwq
    // frees len_ and drops both refs.  Only drop our reference here;
    // freeing or decref'ing in addition (as the old code did) would
    // double-free len and over-release vmap/ftable.
    u->dec();
    return nullptr;
  }

  return u;
}
uwq::uwq(vmap* vmap, filetable *ftable, padded_length *len)
: rcu_freed("uwq"),
vmap_(vmap), ftable_(ftable), len_(len),
uentry_(0), ustack_(UWQSTACK)
{
if (len_ != nullptr) {
for (int i = 0; i < NCPU; i++)
len_[i].v_ = 0;
} else {
cprintf("uwq::uwq: nullptr len\n");
}
initlock(&lock_, "uwq_lock", 0);
memset(worker_, 0, sizeof(worker_));
......@@ -63,7 +111,9 @@ uwq::~uwq(void)
{
if (len_ != nullptr)
ksfree(slab_userwq, len_);
// XXX(sbw) clean up worker procs
vmap_->decref();
ftable_->decref();
finishworkers();
}
bool
......@@ -89,13 +139,23 @@ uwq::trywork(void)
if (p == nullptr)
return -1;
if (!vmap_->tryinc())
return -1;
// XXX(sbw) filetable, etc
p->cpuid = mycpuid();
// XXX(sbw)
vmap_->decref();
panic("XXX");
return 0;
acquire(&p->lock);
addrun(p);
release(&p->lock);
cprintf("trying to run..\n");
return 1;
}
// Teardown sanity check: every worker slot must already be drained;
// a lingering worker proc would outlive the uwq it belongs to.
void
uwq::finishworkers(void)
{
  int slot = 0;
  while (slot < NCPU) {
    if (worker_[slot].proc != nullptr)
      panic("uwq::finishworkers");
    ++slot;
  }
}
void*
......@@ -104,6 +164,57 @@ uwq::buffer(void)
return (void*)len_;
}
// Record the user-space entry point (taken from the ELF
// XV6_ADDR_ID_WQ note in donotes()) at which newly allocated
// workers begin execution; allocworker() refuses to create workers
// while this is still 0.
void
uwq::setuentry(uptr uentry)
{
  uentry_ = uentry;
}
// Create a kernel proc that will run user-level work-queue code at
// uentry_ on a freshly mapped user stack.
// Returns nullptr if no entry point has been registered yet or if
// any allocation fails.  On success the proc holds references on
// vmap_ and ftable_, which finishproc() releases.
// NOTE(review): ustack_ is bumped without internal locking — the
// only visible caller, getworker(), holds lock_; confirm before
// adding other callers.
proc*
uwq::allocworker(void)
{
  uptr uentry = uentry_;

  if (uentry == 0)
    return nullptr;

  proc* p = allocproc();
  if (p == nullptr)
    return nullptr;
  safestrcpy(p->name, "uwq_worker", sizeof(p->name));

  vmap_->incref();
  ftable_->incref();
  // finishproc will drop these refs
  p->vmap = vmap_;
  p->ftable = ftable_;

  struct vmnode *vmn;
  if ((vmn = new vmnode(UWQSTACKPAGES)) == nullptr) {
    finishproc(p);
    return nullptr;
  }

  // Carve out a distinct stack region per worker; ustack_ only ever
  // moves forward.
  uptr stacktop = ustack_ + (UWQSTACKPAGES*PGSIZE);
  if (vmap_->insert(vmn, ustack_, 1) < 0) {
    delete vmn;
    finishproc(p);
    return nullptr;
  }
  ustack_ += (UWQSTACKPAGES*PGSIZE);

  // Leftover debug cprintf("stacktop %lx\n", ...) removed.
  p->tf->rsp = stacktop - 8;
  p->tf->rip = uentry;
  p->tf->cs = UCSEG | 0x3;
  p->tf->ds = UDSEG | 0x3;
  p->tf->ss = p->tf->ds;
  p->tf->rflags = FL_IF;

  return p;
}
proc*
uwq::getworker(void)
{
......@@ -112,8 +223,10 @@ uwq::getworker(void)
scoped_acquire lockx(&lock_);
for (int i = 0; i < NCPU; i++) {
if (worker_[i].running)
continue;
if (worker_[i].proc != nullptr) {
worker_[i].running = 1;
worker_[i].running = true;
return worker_[i].proc;
} else if (slot == -1) {
slot = i;
......@@ -121,10 +234,10 @@ uwq::getworker(void)
}
if (slot != -1) {
proc* p = allocproc();
proc* p = allocworker();
if (p != nullptr) {
worker_[slot].proc = p;
worker_[slot].running = 1;
worker_[slot].running = true;
return worker_[slot].proc;
}
}
......
......@@ -154,7 +154,7 @@ vmap::vmap() :
rx(PGSHIFT),
#endif
ref(1), pml4(setupkvm()), kshared((char*) ksalloc(slab_kshared)),
brk_(0), uwq_(this, (padded_length*) ksalloc(slab_userwq))
brk_(0)
{
initlock(&brklock_, "brk_lock", LOCKSTAT_VM);
if (pml4 == 0) {
......@@ -167,13 +167,8 @@ vmap::vmap() :
goto err;
}
if (uwq_.buffer() == nullptr) {
cprintf("vmap::vmap: userwq out of memory\n");
goto err;
}
if (setupuvm(pml4, kshared, (char*)uwq_.buffer())) {
cprintf("vmap::vmap: setupkshared out of memory\n");
if (mapkva(pml4, kshared, KSHARED, KSHAREDSIZE)) {
cprintf("vmap::vmap: mapkva out of memory\n");
goto err;
}
......@@ -202,18 +197,10 @@ vmap::decref()
gc_delayed(this);
}
bool
vmap::tryinc()
void
vmap::incref()
{
u64 o;
do {
o = ref.load();
if (o == 0)
return false;
} while (!cmpxch(&ref, o, o+1));
return true;
++ref;
}
bool
......
......@@ -48,3 +48,4 @@ SYSCALL(pread)
SYSCALL(async)
SYSCALL(script)
SYSCALL(setfs)
SYSCALL(wqwait)
......@@ -24,6 +24,7 @@
#define ALLOC_MEMSET DEBUG
#define KSHAREDSIZE (32 << 10)
#define USERWQSIZE (1 << 14)
#define UWQSTACKPAGES 2
#define WQSHIFT 7
#define CILKENABLE 0
#if defined(HW_josmp)
......
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论