Commit 1a129849 authored by Frans Kaashoek

One gc thread moves a delay list to a tofree list

Each core's gc thread frees elements on the tofree list
Parent cdaa456b
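The commit message describes two phases of reclamation. The following minimal sketch is an editorial illustration, not the committed code: NCPU and NEPOCH values, struct node, move_to_tofree(), free_tofree(), and free_elem() are stand-ins for the kernel's gc_state, gc_move_to_tofree_cpu(), and gc_free_tofreelist() in the diff below. It shows the intended division of labor: one thread moves each core's (global_epoch-2) delayed list onto that core's tofree list and advances global_epoch, and every core's gc thread then frees its own tofree list in parallel.

// Sketch of the two GC phases, assuming per-core delayed/tofree lists of
// singly-linked nodes; all names here are illustrative.
#include <stddef.h>

#define NCPU    4
#define NEPOCH  4

struct node { struct node *next; };

struct core_gc {
  struct node *delayed[NEPOCH];   // filled by delayed frees on this core
  struct node *tofree[NEPOCH];    // drained by this core's gc thread
};

static struct core_gc cores[NCPU];
static unsigned long global_epoch = NEPOCH - 2;

// Phase 1 (one thread): move every core's (global_epoch-2) delayed list to
// its tofree list, then advance global_epoch.
static void move_to_tofree(unsigned long epoch)
{
  unsigned fe = (epoch - (NEPOCH - 2)) % NEPOCH;
  for (int c = 0; c < NCPU; c++) {
    cores[c].tofree[fe] = cores[c].delayed[fe];
    cores[c].delayed[fe] = NULL;
  }
  global_epoch = epoch + 1;
}

// Stand-in for the real per-type free routine (kmfree etc. in the kernel).
static void free_elem(struct node *n) { (void) n; }

// Phase 2 (every core, in parallel): free the elements on this core's
// tofree list for the given epoch.
static void free_tofree(int c, unsigned long epoch)
{
  unsigned fe = (epoch - (NEPOCH - 2)) % NEPOCH;
  for (struct node *n = cores[c].tofree[fe], *nn; n != NULL; n = nn) {
    nn = n->next;
    free_elem(n);
  }
  cores[c].tofree[fe] = NULL;
}

int main(void)
{
  unsigned long e = global_epoch;
  move_to_tofree(e);                // phase 1: single coordinating thread
  for (int c = 0; c < NCPU; c++)    // phase 2: normally one gc thread per core
    free_tofree(c, e);
  return 0;
}

Splitting the work this way keeps the expensive part, walking and freeing the deferred objects, per core and parallel, while the single coordinating thread only scans process epochs and swaps list heads.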
@@ -9,20 +9,25 @@
 #include "cpu.h"
 #include "kmtrace.h"
 
-#define NGC 1
-
-// 1 worker in total. more workers makes sense, if we have per-core process
-// lists.
-#define NWORKER 1
-
-static struct { struct condvar cv __mpalign__; } rcu_cv[NWORKER];
-static struct { struct spinlock l __mpalign__; } gc_lock;
+// GC scheme based on Fraser's:
+// a machine has a global_epoch
+// a process maintains an epoch (>= global_epoch)
+// one gc thread and state (e.g., NEPOCH delay lists and NEPOCH tofree lists) per core
+// a process adds to its core's delayed freelist for its epoch on delayed_free
+// a gc thread performs two jobs:
+// 1. one gc thread performs step 1:
+//      updates a thread's epoch, when it is not in an epoch
+//      computes the min over all processes' epochs, and sets global_epoch to min
+//      moves each core's (global_epoch-2) delayed list to that core's tofree list
+//      (costs linear in the number of processes)
+// 2. in parallel, gc threads free the elements on their tofree lists (up to global_epoch)
+//      (costs linear in the number of elements to be freed)
 
 enum { gc_debug = 0 };
 
 struct gc {
   u64 epoch;
   struct gc *next;
-  struct gc *free;
   union {
     struct {
       void (*dofree)(void *);
@@ -38,16 +43,23 @@ struct gc {
   int type;
 } __mpalign__;
 
-u64 global_epoch __mpalign__;
-
-enum { rcu_debug = 0 };
+static struct gc_state {
+  struct condvar cv;
+  struct gc delayed[NEPOCH];
+  struct gc tofree[NEPOCH];
+  int ndelayed;
+  int min_epoch;
+} __mpalign__ gc_state[NCPU] __mpalign__;
+
+static struct { struct spinlock l __mpalign__; } gc_lock;
+u64 global_epoch __mpalign__;
 
 struct gc *
 gc_alloc()
 {
   struct gc *r = kmalloc(sizeof(struct gc));
   assert(r);
-  myproc()->ndelayed++;
+  gc_state[mycpu()->id].ndelayed++;
   return r;
 }
@@ -55,6 +67,11 @@ static void *
 gc_min(void *vkey, void *v, void *arg){
   u64 *min_epoch_p = arg;
   struct proc *p = (struct proc *) v;
+  acquire(&p->gc_epoch_lock);
+  if (p->epoch_depth == 0) {
+    p->epoch = global_epoch;
+  }
+  release(&p->gc_epoch_lock);
   if (*min_epoch_p > p->epoch) {
     *min_epoch_p = p->epoch;
   }
@@ -78,62 +95,70 @@ gc_free_elem(struct gc *r)
 }
 
 static int
-gc_free_list(struct gc *head, u64 epoch)
+gc_free_tofreelist(struct gc **head, u64 epoch)
 {
   int nfree = 0;
   struct gc *r, *nr;
-  for (r = head; r != NULL; r = nr) {
+  for (r = *head; r != NULL; r = nr) {
     if (r->epoch > epoch) {
-      cprintf("%lu %lu\n", r->epoch, epoch);
+      cprintf("gc_free_tofreelist: r->epoch %ld > epoch %ld\n", r->epoch, epoch);
       assert(0);
     }
     nr = r->next;
     gc_free_elem(r);
     nfree++;
   }
+  *head = r;
   return nfree;
 }
 
-// move to free delayed list to free list so that a process can do its own freeing
+// move a delayed list to the tofree list so that each core can do its own freeing
 void *
-gc_move_to_free_proc(void *vkey, void *v, void *arg){
-  u64 *epoch = arg;
-  struct proc *p = (struct proc *) v;
+gc_move_to_tofree_cpu(int c, u64 epoch)
+{
   struct gc *head;
-  uint32 fe = (*epoch - (NEPOCH-2)) % NEPOCH;
+  uint32 fe = (epoch - (NEPOCH-2)) % NEPOCH;
   int cas;
-  assert(p->gc_epoch[fe].epoch == *epoch-(NEPOCH-2));   // XXX race with setting epoch = 0
+  assert(gc_state[c].delayed[fe].epoch == epoch-(NEPOCH-2));   // XXX race with setting epoch = 0
 
-  // unhook list for fe epoch atomically
-  head = p->gc_epoch[fe].next;
-  // this shouldn't fail, because no core is modifying it.
-  cas = __sync_bool_compare_and_swap(&(p->gc_epoch[fe].next), head, 0);
+  // unhook list for fe epoch atomically; this shouldn't fail
+  head = gc_state[c].delayed[fe].next;
+  cas = __sync_bool_compare_and_swap(&(gc_state[c].delayed[fe].next), head, 0);
   assert(cas);
 
-  // insert list into local free list so that each core can do its own frees
-  assert (p->gc_epoch[fe].free == 0);
-  cas = __sync_bool_compare_and_swap(&(p->gc_epoch[fe].free), 0, head);
+  // insert list into tofree list so that each core can free its elements in parallel
+  if (gc_state[c].tofree[fe].epoch != gc_state[c].delayed[fe].epoch) {
+    cprintf("%d: tofree epoch %lu delayed epoch %lu\n", c, gc_state[c].tofree[fe].epoch,
+            gc_state[c].delayed[fe].epoch);
+    assert(0);
+  }
+  cas = __sync_bool_compare_and_swap(&(gc_state[c].tofree[fe].next), 0, head);
   assert(cas);
-  assert(p->gc_epoch[fe].next == 0);
+
+  // move delayed NEPOCH ahead
+  gc_state[c].delayed[fe].epoch += NEPOCH;
+  assert(gc_state[c].delayed[fe].next == 0);
   return 0;
 }
 
+// Fraser's reclamation scheme: free all delayed-free items in global_epoch-2
 // only one thread should call this function
 static void
-gc_move_to_free(u64 epoch)
+gc_move_to_tofree(u64 epoch)
 {
   if (gc_debug)
-    cprintf("%d: free epoch %ld\n", myproc()->pid, epoch);
-  myproc()->rcu_read_depth++; // ensure ns_enumate's call to gc_begin_epoch doesn't call gc()
-  ns_enumerate(nspid, gc_move_to_free_proc, &epoch);
-  myproc()->rcu_read_depth--;
+    cprintf("%d: free epoch %ld\n", mycpu()->id, epoch);
+  for (int c = 0; c < NCPU; c++) {
+    gc_move_to_tofree_cpu(c, epoch);
+  }
   int ok = __sync_bool_compare_and_swap(&global_epoch, epoch, epoch+1);
   assert(ok);
 }
 
-// If all threads have seen global_epoch, we can free elements in global_epoch-2
+// If all threads have seen global_epoch, we can move elements in global_epoch-2 to the tofree list
 static void
-gc(void)
+gc_delayfreelist(void)
 {
   int r = tryacquire(&gc_lock.l);
   if (r == 0) return;
@@ -141,11 +166,18 @@ gc(void)
 
   u64 global = global_epoch;
   u64 min = global;
-  myproc()->rcu_read_depth++; // ensure ns_enumate's call to gc_begin_epoch doesn't call gc()
+  // make sure that global_epoch doesn't run into a core's min_epoch
+  for (int c = 0; c < NCPU; c++) {
+    int w = gc_state[c].min_epoch + NEPOCH-1;
+    if (w < min) {
+      min = w;
+    }
+  }
+  myproc()->epoch_depth++;  // ensure ns_enumerate's call to gc_begin_epoch doesn't have side effects
   ns_enumerate(nspid, gc_min, &min);
-  myproc()->rcu_read_depth--;
+  myproc()->epoch_depth--;
   if (min >= global) {
-    gc_move_to_free(min);
+    gc_move_to_tofree(min);
   }
   release(&gc_lock.l);
 }
@@ -154,18 +186,19 @@ static void
 gc_delayed_int(struct gc *r)
 {
   pushcli();
+  int c = mycpu()->id;
   u64 myepoch = myproc()->epoch;
-  u64 minepoch = myproc()->gc_epoch[myepoch % NEPOCH].epoch;
+  u64 minepoch = gc_state[c].delayed[myepoch % NEPOCH].epoch;
   if (gc_debug)
-    cprintf("%d: gc_delayed: %lu ndelayed %d\n", myproc()->pid, global_epoch, myproc()->ndelayed);
+    cprintf("(%d, %d): gc_delayed: %lu ndelayed %d\n", c, myproc()->pid, global_epoch, gc_state[c].ndelayed);
   if (myepoch != minepoch) {
     cprintf("%d: myepoch %lu minepoch %lu\n", myproc()->pid, myepoch, minepoch);
     panic("gc_delayed_int");
   }
   r->epoch = myepoch;
   do {
-    r->next = myproc()->gc_epoch[myepoch % NEPOCH].next;
-  } while (!__sync_bool_compare_and_swap(&(myproc()->gc_epoch[myepoch % NEPOCH].next), r->next, r));
+    r->next = gc_state[c].delayed[myepoch % NEPOCH].next;
+  } while (!__sync_bool_compare_and_swap(&(gc_state[c].delayed[myepoch % NEPOCH].next), r->next, r));
   popcli();
 }
@@ -194,65 +227,32 @@ gc_delayed2(int a1, u64 a2, void (*dofree)(int,u64))
   gc_delayed_int(r);
 }
 
-static void*
-gc_free(void *vkey, void *v, void *arg)
-{
-  struct proc *p = (struct proc *) v;
-  acquire(&p->gc_lock);
-  u64 global = global_epoch;
-  for (u64 epoch = p->epoch; epoch < global; epoch++) {
-    int j = (epoch - (NEPOCH - 2)) % NEPOCH;
-    assert(p->gc_epoch[j].epoch == epoch-2);
-    struct gc *free = p->gc_epoch[j].free;
-    int ok = __sync_bool_compare_and_swap(&(p->gc_epoch[j].free), free, NULL);
-    assert(ok);
-    int nfree = gc_free_list(free, epoch - 2);
-    p->ndelayed -= nfree;
-    if (gc_debug && nfree > 0)
-      cprintf("%d: epoch %d freed %d\n", p->pid, epoch - 2, nfree);
-    p->gc_epoch[j].epoch = p->gc_epoch[j].epoch + NEPOCH;
-  }
-  p->epoch = global;  // not atomic, but it never goes backwards
-  __sync_synchronize();
-  release(&p->gc_lock);
-  return NULL;
-}
-
 void
 gc_start(void)
 {
-  cv_wakeup(&rcu_cv[0].cv);   // NWORKER = 1
-  // cv_wakeup(&rcu_cv[mycpu()->id].cv);
+  cv_wakeup(&gc_state[mycpu()->id].cv);
 }
 
 void
 gc_begin_epoch(void)
 {
   if (myproc() == NULL) return;
-  if (myproc()->rcu_read_depth++ > 0)
-    return;
-  gc_free(NULL, (void *) myproc(), NULL);
+  acquire(&myproc()->gc_epoch_lock);
+  if (myproc()->epoch_depth++ > 0)
+    goto done;
+  myproc()->epoch = global_epoch;  // not atomic, but it never goes backwards
+  // __sync_synchronize();
+ done:
+  release(&myproc()->gc_epoch_lock);
 }
 
 void
 gc_end_epoch(void)
 {
   if (myproc() == NULL) return;
-  if (--myproc()->rcu_read_depth > 0)
-    return;
-
-#if 0
-  // kick gcc early if under memory pressure
-  int free = 0;
-  for (int j = 0; j < NEPOCH; j++) {
-    if (myproc()->gc_epoch[j].free)
-      free = 1;
-  }
-  u64 nd = myproc()->ndelayed;
-  if (!free && nd > NGC) {
-    gc_start();
-  }
-#endif
+  acquire(&myproc()->gc_epoch_lock);
+  --myproc()->epoch_depth;
+  release(&myproc()->gc_epoch_lock);
 }
 
 static void
@@ -260,18 +260,25 @@ gc_worker(void *x)
 {
   struct spinlock wl;
 
+  cprintf("gc_worker: %d\n", mycpu()->id);
   initlock(&wl, "rcu_gc_worker dummy");  // dummy lock
   for (;;) {
+    u64 i;
     acquire(&wl);
-    myproc()->rcu_read_depth++; // call gc_free once for gc_worker
-    ns_enumerate(nspid, gc_free, NULL);
-    myproc()->rcu_read_depth--;
-    gc();
-    cv_sleep(&rcu_cv[0].cv, &wl);   // NWORKER = 1
+    cv_sleep(&gc_state[mycpu()->id].cv, &wl);
     release(&wl);
+
+    u64 global = global_epoch;
+    for (i = gc_state[mycpu()->id].min_epoch; i < global-2; i++) {
+      int nfree = gc_free_tofreelist(&(gc_state[mycpu()->id].tofree[i%NEPOCH].next), i);
+      gc_state[mycpu()->id].tofree[i%NEPOCH].epoch += NEPOCH;
+      if (gc_debug && nfree > 0) {
+        cprintf("%d: epoch %d freed %d\n", mycpu()->id, i, nfree);
+      }
+    }
+    gc_state[mycpu()->id].min_epoch = i;
+    gc_delayfreelist();
   }
 }
@@ -279,13 +286,7 @@ void
 initprocgc(struct proc *p)
 {
   p->epoch = global_epoch;
-  p->gc_epoch = kmalloc(sizeof(struct gc) * NEPOCH);
-  initlock(&p->gc_lock, "per process gc_lock");
-  for (u64 i = global_epoch-2; i < global_epoch+2; i++) {
-    p->gc_epoch[i % NEPOCH].epoch = i;
-    p->gc_epoch[i % NEPOCH].free = NULL;
-    p->gc_epoch[i % NEPOCH].next = NULL;
-  }
+  initlock(&p->gc_epoch_lock, "per process gc_lock");
 }
@@ -295,12 +296,15 @@ initgc(void)
   initlock(&gc_lock.l, "gc");
   global_epoch = NEPOCH-2;
 
-  for (int i = 0; i < NWORKER; i++) {
-    initcondvar(&rcu_cv[i].cv, "rcu_gc_cv");
+  for (int i = 0; i < NCPU; i++) {
+    for (int j = 0; j < NEPOCH; j++) {
+      gc_state[i].delayed[j].epoch = j;
+      gc_state[i].tofree[j].epoch = j;
+    }
+    initcondvar(&gc_state[i].cv, "gc_cv");
   }
-  // one worker for now
-  for (u32 c = 0; c < NWORKER; c++) {
+  for (u32 c = 0; c < NCPU; c++) {
     struct proc *gcp;
 
     gcp = threadalloc(gc_worker, NULL);
...
@@ -23,7 +23,6 @@ struct stat;
 struct proc;
 struct vmap;
 struct pipe;
-struct gc;
 
 // bio.c
 void binit(void);
...
@@ -54,10 +54,8 @@ struct proc {
   SLIST_ENTRY(proc) child_next;
   struct condvar cv;
   u64 epoch;
-  u64 ndelayed;
-  struct gc *gc_epoch;
-  struct spinlock gc_lock;
-  u64 rcu_read_depth;
+  struct spinlock gc_epoch_lock;
+  u64 epoch_depth;
   char lockname[16];
   int on_runq;
   int cpu_pin;
...
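A usage note on the new proc fields (gc_epoch_lock, epoch_depth): readers bracket access to GC-managed data with gc_begin_epoch()/gc_end_epoch(). The sketch below is a hypothetical illustration, not part of this commit; it assumes only the zero-argument begin/end calls shown in the diff, and struct nsobj, reader_lookup(), and the list traversal are illustrative stand-ins.

// Hypothetical read-side critical section around GC-managed data.
#include <stddef.h>

void gc_begin_epoch(void);   // kernel interface from the diff above
void gc_end_epoch(void);

struct nsobj { int key; struct nsobj *next; };

int
reader_lookup(struct nsobj *head, int key)
{
  int found = 0;
  gc_begin_epoch();   // epoch_depth++; at depth 0, myproc()->epoch = global_epoch
  for (struct nsobj *o = head; o != NULL; o = o->next) {
    if (o->key == key) { found = 1; break; }   // safe to dereference inside the epoch
  }
  gc_end_epoch();     // epoch_depth--; deferred frees stay on per-core delayed lists
  return found;
}

An updater would unlink the object from the shared structure and hand it to the delayed-free path (gc_delayed*/gc_delayed2) instead of freeing it immediately; the gc threads reclaim it only after global_epoch has advanced past every process's epoch, which is what the min computation in gc_delayfreelist enforces.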