Clean up wq workers

上级 a797cb71
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
#include "filetable.hh" #include "filetable.hh"
class uwq; class uwq;
class uwq_worker;
// Saved registers for kernel context switches. // Saved registers for kernel context switches.
// (also implicitly defined in swtch.S) // (also implicitly defined in swtch.S)
...@@ -46,6 +47,7 @@ enum procstate { EMBRYO, SLEEPING, RUNNABLE, RUNNING, ZOMBIE }; ...@@ -46,6 +47,7 @@ enum procstate { EMBRYO, SLEEPING, RUNNABLE, RUNNING, ZOMBIE };
struct proc : public rcu_freed { struct proc : public rcu_freed {
struct vmap *vmap; // va -> vma struct vmap *vmap; // va -> vma
uwq* uwq; uwq* uwq;
uwq_worker* worker;
char *kstack; // Bottom of kernel stack for this process char *kstack; // Bottom of kernel stack for this process
volatile int pid; // Process ID volatile int pid; // Process ID
struct proc *parent; // Parent process struct proc *parent; // Parent process
......
...@@ -6,6 +6,18 @@ struct padded_length { ...@@ -6,6 +6,18 @@ struct padded_length {
}; };
#if defined (XV6_KERNEL) #if defined (XV6_KERNEL)
struct uwq;
struct uwq_worker {
long wait();
uwq* uwq;
bool running;
proc *proc;
struct spinlock lock;
struct condvar cv;
};
struct uwq : public referenced, public rcu_freed { struct uwq : public referenced, public rcu_freed {
static uwq* alloc(vmap* vmap, filetable *ftable); static uwq* alloc(vmap* vmap, filetable *ftable);
bool haswork(); bool haswork();
...@@ -13,11 +25,13 @@ struct uwq : public referenced, public rcu_freed { ...@@ -13,11 +25,13 @@ struct uwq : public referenced, public rcu_freed {
void* buffer(); void* buffer();
void setuentry(uptr uentry); void setuentry(uptr uentry);
// XXX(sbw) should be part of struct worker
void tryexit(uwq_worker *w);
virtual void do_gc(void) { delete this; } virtual void do_gc(void) { delete this; }
protected: protected:
virtual void onzero() const { gc_delayed((uwq*)this); } virtual void onzero() const;
private: private:
uwq(vmap* vmap, filetable* ftable, padded_length *len); uwq(vmap* vmap, filetable* ftable, padded_length *len);
...@@ -27,6 +41,7 @@ private: ...@@ -27,6 +41,7 @@ private:
proc* getworker(); proc* getworker();
proc* allocworker(); proc* allocworker();
void finishworkers(); void finishworkers();
void finish();
NEW_DELETE_OPS(uwq); NEW_DELETE_OPS(uwq);
struct spinlock lock_; struct spinlock lock_;
...@@ -35,12 +50,9 @@ private: ...@@ -35,12 +50,9 @@ private:
padded_length* len_; padded_length* len_;
uptr uentry_; uptr uentry_;
uptr ustack_; uptr ustack_;
std::atomic<u64> uref_;
struct worker { uwq_worker worker_[NCPU];
bool running;
proc *proc;
};
worker worker_[NCPU];
}; };
int uwq_trywork(void); int uwq_trywork(void);
......
...@@ -75,7 +75,7 @@ initworker(void) ...@@ -75,7 +75,7 @@ initworker(void)
pthread_setspecific(idkey, (void*)(u64)id); pthread_setspecific(idkey, (void*)(u64)id);
while (1) { while (1) {
if (!wq_trywork()) if (!wq_trywork())
wqwait(); assert(wqwait() == 0);
} }
} }
DEFINE_XV6_ADDRNOTE(xnote, XV6_ADDR_ID_WQ, &initworker); DEFINE_XV6_ADDRNOTE(xnote, XV6_ADDR_ID_WQ, &initworker);
......
...@@ -36,7 +36,7 @@ struct kstack_tag kstack_tag[NCPU]; ...@@ -36,7 +36,7 @@ struct kstack_tag kstack_tag[NCPU];
enum { sched_debug = 0 }; enum { sched_debug = 0 };
proc::proc(int npid) : proc::proc(int npid) :
rcu_freed("proc"), vmap(0), uwq(0), kstack(0), rcu_freed("proc"), vmap(0), uwq(0), worker(0), kstack(0),
pid(npid), parent(0), tf(0), context(0), killed(0), pid(npid), parent(0), tf(0), context(0), killed(0),
ftable(0), cwd(0), tsc(0), curcycles(0), cpuid(0), epoch(0), ftable(0), cwd(0), tsc(0), curcycles(0), cpuid(0), epoch(0),
on_runq(-1), cpu_pin(0), runq(0), oncv(0), cv_wakeup(0), on_runq(-1), cpu_pin(0), runq(0), oncv(0), cv_wakeup(0),
......
...@@ -55,6 +55,24 @@ uwq_trywork(void) ...@@ -55,6 +55,24 @@ uwq_trywork(void)
long long
sys_wqwait(void) sys_wqwait(void)
{ {
uwq_worker* w = myproc()->worker;
if (w == nullptr)
return -1;
return w->wait();
}
//
// uwq_worker
//
long
uwq_worker::wait(void)
{
acquire(&lock);
uwq->tryexit(this);
cv_sleep(&cv, &lock);
uwq->tryexit(this);
release(&lock);
return 0; return 0;
} }
...@@ -97,13 +115,21 @@ uwq::alloc(vmap* vmap, filetable *ftable) ...@@ -97,13 +115,21 @@ uwq::alloc(vmap* vmap, filetable *ftable)
uwq::uwq(vmap* vmap, filetable *ftable, padded_length *len) uwq::uwq(vmap* vmap, filetable *ftable, padded_length *len)
: rcu_freed("uwq"), : rcu_freed("uwq"),
vmap_(vmap), ftable_(ftable), len_(len), vmap_(vmap), ftable_(ftable), len_(len),
uentry_(0), ustack_(UWQSTACK) uentry_(0), ustack_(UWQSTACK), uref_(0)
{ {
for (int i = 0; i < NCPU; i++) for (int i = 0; i < NCPU; i++)
len_[i].v_ = 0; len_[i].v_ = 0;
initlock(&lock_, "uwq_lock", 0); initlock(&lock_, "uwq_lock", 0);
memset(worker_, 0, sizeof(worker_));
// XXX(sbw) move to uwq_worker constructor
for (int i = 0; i < NCPU; i++) {
initlock(&worker_[i].lock, "worker_lock", 0);
initcondvar(&worker_[i].cv, "worker_cv");
worker_[i].uwq = this;
worker_[i].running = false;
worker_[i].proc = nullptr;
}
} }
uwq::~uwq(void) uwq::~uwq(void)
...@@ -112,7 +138,18 @@ uwq::~uwq(void) ...@@ -112,7 +138,18 @@ uwq::~uwq(void)
ksfree(slab_userwq, len_); ksfree(slab_userwq, len_);
vmap_->decref(); vmap_->decref();
ftable_->decref(); ftable_->decref();
finishworkers(); }
void
uwq::tryexit(uwq_worker *w)
{
if (ref() == 0) {
if (--uref_ == 0)
gc_delayed(this);
release(&w->lock);
w->proc = nullptr;
exit();
}
} }
bool bool
...@@ -147,11 +184,29 @@ uwq::trywork(void) ...@@ -147,11 +184,29 @@ uwq::trywork(void)
} }
void void
uwq::finishworkers(void) uwq::finish(void)
{ {
for (int i = 0; i < NCPU; i++) bool gcnow = true;
if (worker_[i].proc != nullptr)
panic("uwq::finishworkers"); scoped_acquire lock0(&lock_);
for (int i = 0; i < NCPU; i++) {
if (worker_[i].proc != nullptr) {
gcnow = false;
acquire(&worker_[i].lock);
cv_wakeup(&worker_[i].cv);
release(&worker_[i].lock);
}
}
if (gcnow)
gc_delayed(this);
}
void
uwq::onzero() const
{
uwq *u = (uwq*)this;
u->finish();
} }
void* void*
...@@ -217,10 +272,14 @@ uwq::getworker(void) ...@@ -217,10 +272,14 @@ uwq::getworker(void)
scoped_acquire lockx(&lock_); scoped_acquire lockx(&lock_);
if (ref() == 0)
return nullptr;
for (int i = 0; i < NCPU; i++) { for (int i = 0; i < NCPU; i++) {
if (worker_[i].running) if (worker_[i].running)
continue; continue;
if (worker_[i].proc != nullptr) { if (worker_[i].proc != nullptr) {
panic("uwq::getworker: oops");
worker_[i].running = true; worker_[i].running = true;
return worker_[i].proc; return worker_[i].proc;
} else if (slot == -1) { } else if (slot == -1) {
...@@ -231,6 +290,8 @@ uwq::getworker(void) ...@@ -231,6 +290,8 @@ uwq::getworker(void)
if (slot != -1) { if (slot != -1) {
proc* p = allocworker(); proc* p = allocworker();
if (p != nullptr) { if (p != nullptr) {
++uref_;
p->worker = &worker_[slot];
worker_[slot].proc = p; worker_[slot].proc = p;
worker_[slot].running = true; worker_[slot].running = true;
return worker_[slot].proc; return worker_[slot].proc;
......
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论