提交 c7d9a2ad 创建 作者: Silas Boyd-Wickizer's avatar Silas Boyd-Wickizer

an xv6 version of my languishing work queue implementation..

上级 3e89a345
......@@ -58,6 +58,7 @@ OBJS = \
vm.o \
trap.o \
trapasm.o \
wq.o
ULIB = ulib.o usys.o printf.o umalloc.o uthread.o
......
......@@ -17,10 +17,12 @@ static inline void *p2v(uptr a) { return (void *) a + KBASE; }
#define cmpswap(ptr, old, new) __sync_bool_compare_and_swap(ptr, old, new)
#define subfetch(ptr, val) __sync_sub_and_fetch(ptr, val)
#define fetchadd(ptr, val) __sync_fetch_and_add(ptr, val)
#define __offsetof offsetof
struct spinlock;
struct condvar;
struct wqframe;
struct context;
struct vmnode;
struct inode;
......@@ -270,3 +272,9 @@ int vmn_load(struct vmnode *, struct inode*, u64, u64);
int vmap_remove(struct vmap *, uptr, u64);
void updatepages(pml4e_t*, void*, void*, int);
struct vmap * vmap_copy(struct vmap *, int);
// wq.c
void wq_push(void *rip, u64 arg0, u64 arg1);
void wq_start(void);
void wq_end(void);
void initwqframe(struct wqframe *wq);
......@@ -23,6 +23,7 @@ extern void initinode(void);
extern void initdisk(void);
extern void inituser(void);
extern void inithz(void);
extern void initwq(void);
static volatile int bstate;
......@@ -95,6 +96,7 @@ cmain(void)
initbio(); // buffer cache
initinode(); // inode cache
initdisk(); // disk
initwq(); // work queues
cprintf("ncpu %d %lu MHz\n", ncpu, cpuhz / 1000000);
......
......@@ -17,3 +17,4 @@
#define PHYSTOP 0xE000000 // use phys mem up to here as free pool
#define CPUKSTACKS (NPROC + NCPU)
#define QUANTUN 10 // scheduling time quantum and tick length (in msec)
#define WQSHIFT 4 // 2^WQSHIFT work queue slots
......@@ -196,6 +196,7 @@ allocproc(void)
snprintf(p->lockname, sizeof(p->lockname), "cv:proc:%d", p->pid);
initlock(&p->lock, p->lockname+3);
initcondvar(&p->cv, p->lockname);
initwqframe(&p->wqframe);
if (ns_insert(nspid, KI(p->pid), (void *) p) < 0)
panic("allocproc: ns_insert");
......@@ -314,6 +315,10 @@ scheduler(void)
mycpu()->proc = schedp;
myproc()->cpu_pin = 1;
// Test the work queue
extern void testwq(void);
testwq();
// Enabling mtrace calls in scheduler generates many mtrace_call_entrys.
// mtrace_call_set(1, cpu->id);
mtstart(scheduler, schedp);
......
......@@ -12,6 +12,11 @@ struct context {
u64 rip;
} __attribute__((packed));
// Work queue frame
struct wqframe {
volatile u64 ref;
};
// Per-process, per-stack meta data for mtrace
#if MTRACE
#define MTRACE_NSTACKS 16
......@@ -59,6 +64,7 @@ struct proc {
struct mtrace_stacks mtrace_stacks;
#endif
struct runq *runq;
struct wqframe wqframe;
STAILQ_ENTRY(proc) runqlink;
};
......
#include "types.h"
#include "kernel.h"
#include "param.h"
#include "amd64.h"
#include "cpu.h"
#include "bits.h"
#include "spinlock.h"
#include "condvar.h"
#include "queue.h"
#include "proc.h"
#define NSLOTS (1 << WQSHIFT)
// Per-core work queue: a fixed-size ring of pointers to pending wqthreads.
// head and tail grow monotonically; (head - tail) is the number of queued
// entries and the low bits index the power-of-two slot array.
struct wqueue {
  struct wqthread *thread[NSLOTS];
  volatile int head __mpalign__;  // next push slot; cache-line isolated from tail
  struct spinlock lock;           // serializes __wq_pop and __wq_steal
  volatile int tail;              // next steal slot
  __padout__;                     // pad to a cache line to avoid false sharing
} __mpalign__;
// One unit of deferred work: some core calls rip(arg0, arg1) and then
// decrements *ref to signal the pushing frame.
struct wqthread {
  u64 rip;            // address of the function to run
  u64 arg0;           // first argument passed to rip
  u64 arg1;           // second argument passed to rip
  volatile u64 *ref; // pointer to parent wqframe.ref
  __padout__;        // cache-line pad; entries never share a line
} __mpalign__;
// Per-core work-queue counters, printed by testwq().
struct wqstat {
  u64 push;   // successful __wq_push calls
  u64 full;   // pushes rejected because the queue was full
  u64 pop;    // entries taken from the local queue head
  u64 steal;  // entries taken from a queue tail (usually another core's)
  __padout__; // keep each core's counters on their own cache line
} __mpalign__;
// Per-core queues and statistics, cache-line aligned to avoid false sharing.
struct wqueue queue[NCPU] __mpalign__;
struct wqstat stat[NCPU] __mpalign__;
// Return the work queue owned by the core we are executing on.
static struct wqueue *
wq_cur(void)
{
  int id = mycpu()->id;
  return queue + id;
}
static struct wqframe *
wq_frame(void)
{
return &myproc()->wqframe;
}
// Return the statistics record for the core we are executing on.
static struct wqstat *
wq_stat(void)
{
  int id = mycpu()->id;
  return stat + id;
}
// Append t to queue q.  Returns 0 on success, -1 if q is full.
// Lockless: head is read and advanced without q->lock, so this assumes a
// single producer per queue (the owning core) -- NOTE(review): confirm all
// callers run with interrupts disabled on the owning core.
static int
__wq_push(struct wqueue *q, struct wqthread *t)
{
  int i;

  i = q->head;
  // head/tail are monotonic; their difference is the current entry count.
  if ((i - q->tail) == NSLOTS) {
    wq_stat()->full++;
    return -1;
  }
  i = i & (NSLOTS-1);  // index into the power-of-two slot array
  q->thread[i] = t;
  // Publish the entry.  NOTE(review): no explicit barrier between the slot
  // store and the head increment -- presumably relies on x86 store ordering.
  q->head++;
  wq_stat()->push++;
  return 0;
}
// Take the most recently pushed entry (LIFO) off q's head.
// Returns NULL if q is empty.  Called only on the queue's owning core.
static struct wqthread *
__wq_pop(struct wqueue *q)
{
  int i;

  acquire(&q->lock);
  i = q->head;
  if ((i - q->tail) == 0) {
    release(&q->lock);
    return NULL;
  }
  i = (i-1) & (NSLOTS-1);
  q->head--;
  release(&q->lock);
  wq_stat()->pop++;
  // NOTE(review): the slot is read after dropping the lock; this is safe
  // only because the slot was claimed (head decremented) under the lock and
  // pushes happen on this same core -- verify against the push path.
  return q->thread[i];
}
// Steal the oldest entry (FIFO) off q's tail, typically from another
// core's queue.  Returns NULL if q is empty.
static struct wqthread *
__wq_steal(struct wqueue *q)
{
  int i;

  acquire(&q->lock);
  i = q->tail;
  if ((i - q->head) == 0) {
    release(&q->lock);
    return NULL;
  }
  i = i & (NSLOTS-1);
  q->tail++;
  release(&q->lock);
  wq_stat()->steal++;
  // NOTE(review): slot read after dropping the lock; safe only while the
  // producer cannot reuse the slot before we read it -- confirm the ring
  // can never wrap onto a just-stolen index.
  return q->thread[i];
}
static void
__wq_run(struct wqthread *th)
{
void (*fn)(uptr arg0, uptr arg1) = (void*)th->rip;
fn(th->arg0, th->arg1);
subfetch(th->ref, 1);
}
// Add the (rip, arg0, arg1) work to the local work queue.
// Guarantees some core will at some point execute the work.
// The current core might execute the work immediately.
void
wq_push(void *rip, u64 arg0, u64 arg1)
{
  void (*fn)(uptr, uptr) = rip;
  struct wqthread *th;

  th = (struct wqthread *) kalloc();
  if (th == NULL) {
    // Out of memory: degrade to running the work synchronously.
    fn(arg0, arg1);
    return;
  }

  th->rip = (uptr) rip;
  th->arg0 = arg0;
  th->arg1 = arg1;
  th->ref = &wq_frame()->ref;

  // Take the frame reference BEFORE publishing th.  The original did the
  // fetchadd after a successful __wq_push, so another core could pop/steal
  // the entry and subfetch the reference first, transiently dropping ref
  // past zero and letting wq_end() return while work was still in flight.
  fetchadd(&wq_frame()->ref, 1);
  if (__wq_push(wq_cur(), th)) {
    // Queue full: undo the reference, reclaim the allocation (the original
    // leaked th on this path), and run the work synchronously.
    subfetch(&wq_frame()->ref, 1);
    kfree((void *) th);
    fn(arg0, arg1);
  }
}
// Attempt to run exactly one queued work item: first from this core's own
// queue, then by stealing from the other cores' queues.
// Returns 1 if an item was executed, 0 if every queue was empty.
int
wq_trywork(void)
{
  struct wqthread *w;
  int c;

  w = __wq_pop(wq_cur());
  if (w == NULL) {
    // XXX(sbw) should be random
    for (c = 0; c < NCPU; c++) {
      if (c == mycpu()->id)
        continue;
      w = __wq_steal(&queue[c]);
      if (w != NULL)
        break;
    }
  }

  if (w == NULL)
    return 0;
  __wq_run(w);
  return 1;
}
void
wq_start(void)
{
pushcli();
if (myproc()->wqframe.ref != 0)
panic("wq_start");
}
// Close a work-queue frame: wait until every item pushed since wq_start()
// has completed (wqframe.ref back to zero), executing queued work while
// waiting so the frame cannot deadlock.  Re-enables interrupts via popcli().
void
wq_end(void)
{
  while (myproc()->wqframe.ref != 0) {
    struct wqthread *th;
    int i;

    // Drain our own queue first (cheap LIFO pops).
    while ((th = __wq_pop(wq_cur())) != NULL)
      __wq_run(th);
    // Then try to steal one item from any queue (including our own, since
    // this loop does not skip mycpu()->id) before re-checking ref.
    for (i = 0; i < NCPU; i++) {
      th = __wq_steal(&queue[i]);
      if (th != NULL) {
        __wq_run(th);
        break;
      }
    }
  }
  popcli();
}
// No-op work item used by testwq(); both arguments are ignored.
static void
__test_stub(uptr a0, uptr a1)
{
  (void) a0;
  (void) a1;
}
// Work-queue microbenchmark, run on every CPU from scheduler().
// Core 0 pushes `iters` no-op items inside one wq_start/wq_end frame and
// prints the average cycles per item plus per-core counters; all other
// cores spin in wq_trywork() consuming items until core 0 clears `running`.
void
testwq(void)
{
  enum { iters = 1000 };
  static volatile int running = 1;  // shared flag: core 0 -> other cores
  u64 e, s;
  int i;

  pushcli();
  if (mycpu()->id == 0) {
    // Give the other cores a moment to reach the consumer loop below.
    microdelay(1);
    s = rdtsc();
    wq_start();
    for (i = 0; i < iters; i++)
      wq_push(__test_stub, i, i);
    wq_end();
    e = rdtsc();
    cprintf("testwq: %lu\n", (e-s)/iters);
    for (i = 0; i < NCPU; i++)
      cprintf("push %lu full %lu pop %lu steal %lu\n",
              stat[i].push, stat[i].full, stat[i].pop, stat[i].steal);
    running = 0;  // release the consumer cores
  } else {
    while (running)
      wq_trywork();
  }
  popcli();
}
void
initwqframe(struct wqframe *wq)
{
memset(wq, 0, sizeof(*wq));
}
// One-time work-queue initialization: set up each per-core queue's lock.
// (queue[] and stat[] are file-scope and therefore already zeroed.)
void
initwq(void)
{
  int c;

  for (c = 0; c < NCPU; c++)
    initlock(&queue[c].lock, "queue lock");
}
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论