uwq tweaks

上级 8c6d5d22
......@@ -9,13 +9,16 @@ struct padded_length {
struct uwq;
struct uwq_worker {
uwq_worker(uwq*, proc*);
long wait();
uwq* uwq;
bool running;
proc *proc;
struct spinlock lock;
struct condvar cv;
uwq* uwq_;
proc *proc_;
bool running_;
struct spinlock lock_;
struct condvar cv_;
NEW_DELETE_OPS(uwq_worker);
};
struct uwq : public referenced, public rcu_freed {
......@@ -51,7 +54,7 @@ private:
uptr ustack_;
std::atomic<u64> uref_;
uwq_worker worker_[NCPU];
uwq_worker* worker_[NCPU];
};
int uwq_trywork(void);
......
......@@ -65,15 +65,22 @@ sys_wqwait(void)
//
// uwq_worker
//
uwq_worker::uwq_worker(uwq* u, proc* p)
: uwq_(u), proc_(p), running_(false)
{
initlock(&lock_, "worker_lock", 0);
initcondvar(&cv_, "worker_cv");
}
long
uwq_worker::wait(void)
{
acquire(&lock);
uwq->tryexit(this);
running = false;
cv_sleep(&cv, &lock);
uwq->tryexit(this);
release(&lock);
acquire(&lock_);
uwq_->tryexit(this);
running_ = false;
cv_sleep(&cv_, &lock_);
uwq_->tryexit(this);
release(&lock_);
return 0;
}
......@@ -122,15 +129,7 @@ uwq::uwq(vmap* vmap, filetable *ftable, padded_length *len)
len_[i].v_ = 0;
initlock(&lock_, "uwq_lock", 0);
// XXX(sbw) move to uwq_worker constructor
for (int i = 0; i < NCPU; i++) {
initlock(&worker_[i].lock, "worker_lock", 0);
initcondvar(&worker_[i].cv, "worker_cv");
worker_[i].uwq = this;
worker_[i].running = false;
worker_[i].proc = nullptr;
}
memset(worker_, 0, sizeof(worker_));
}
uwq::~uwq(void)
......@@ -147,8 +146,8 @@ uwq::tryexit(uwq_worker *w)
if (ref() == 0) {
if (--uref_ == 0)
gc_delayed(this);
release(&w->lock);
w->proc = nullptr;
release(&w->lock_);
delete w;
exit();
}
}
......@@ -177,38 +176,46 @@ uwq::trywork(void)
int slot = -1;
for (int i = 0; i < NCPU; i++) {
if (worker_[i].running)
if (worker_[i] == nullptr) {
if (slot == -1)
slot = i;
continue;
}
uwq_worker *w = worker_[i];
if (w->running_)
continue;
else if (worker_[i].proc != nullptr) {
scoped_acquire lock1(&worker_[i].lock);
proc* p = worker_[i].proc;
else {
scoped_acquire lock1(&w->lock_);
proc* p = w->proc_;
//cprintf("uwq::trywork: untested\n");
cprintf("uwq::trywork: untested\n");
acquire(&p->lock);
p->cpuid = mycpuid();
release(&p->lock);
worker_[i].running = true;
cv_wakeup(&worker_[i].cv);
w->running_ = true;
cv_wakeup(&w->cv_);
return true;
} else if (slot == -1) {
slot = i;
}
}
if (slot != -1) {
proc* p = allocworker();
if (p != nullptr) {
uwq_worker* w = new uwq_worker(this, p);
assert(w != nullptr);
++uref_;
p->worker = &worker_[slot];
worker_[slot].proc = p;
worker_[slot].running = true;
p->worker = w;
w->running_ = true;
acquire(&p->lock);
p->cpuid = mycpuid();
addrun(p);
release(&p->lock);
worker_[slot] = w;
return true;
}
}
......@@ -223,11 +230,12 @@ uwq::finish(void)
scoped_acquire lock0(&lock_);
for (int i = 0; i < NCPU; i++) {
if (worker_[i].proc != nullptr) {
if (worker_[i] != nullptr) {
uwq_worker* w = worker_[i];
gcnow = false;
acquire(&worker_[i].lock);
cv_wakeup(&worker_[i].cv);
release(&worker_[i].lock);
acquire(&w->lock_);
cv_wakeup(&w->cv_);
release(&w->lock_);
}
}
......
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论