One wq.cc file for xv6 kernel and Linux userspace

上级 a9331f63
......@@ -9,6 +9,7 @@
#include <stdint.h>
#include <stddef.h>
#include <errno.h>
typedef uint64_t u64;
#include "include/wq.hh"
#include "user/dirit.hh"
#define ST_SIZE(st) (st).st_size
......@@ -22,6 +23,7 @@
#include "user.h"
#include "lib.h"
#include "fs.h"
#include "uspinlock.h"
#include "wq.hh"
#include "dirit.hh"
#define ST_SIZE(st) (st).size
......@@ -55,7 +57,7 @@ du(int fd)
int nfd = openat(fd, name, 0);
if (nfd >= 0)
;//size += du(nfd); // should go into work queue
size += du(nfd); // should go into work queue
free((void*)name);
});
}
......@@ -67,18 +69,6 @@ du(int fd)
int
main(int ac, char **av)
{
dirit di(open(".", 0));
wq_for<dirit>(di,
[](dirit &i)->bool { return !i.end(); },
[&](const char *name)->void
{
printf("no..\n");
});
//initwq();
printf("%d\n", du(open(".", 0)));
return 0;
}
#ifdef XV6_KERNEL
#if defined(XV6_KERNEL)
typedef struct spinlock wqlock_t;
#elif defined(LINUX)
typedef pthread_spinlock_t wqlock_t;
#else
typedef int wqlock_t;
typedef struct uspinlock wqlock_t;
#endif
#include "percpu.hh"
......@@ -63,45 +65,12 @@ struct work * allocwork(void);
void freework(struct work *w);
int wq_push(work *w);
// Work item that carries the state of a wq_for loop: the caller's iterator
// and the loop body. run() is still a placeholder that only prints the
// iterator's current element.
template<typename IT, typename BODY>
struct for_work : public work {
  // body is copied into the work item. cond_ is not supplied by this
  // constructor, so it starts out null rather than indeterminate.
  for_work(IT &it, BODY body) : it_(it), cond_(nullptr), body_(body) {}

  // Placeholder: prints *it_ as a string; does not invoke body_ or cond_.
  virtual void run() { printf("hi %s\n", *it_); }

  // Iterator owned by the caller; it must outlive this work item.
  IT &it_;
  // Loop-continuation predicate; null until something assigns it.
  bool (*cond_)(IT &it);
  // Owned copy of the loop body. This used to be a reference bound to the
  // constructor's by-value parameter, which dangled as soon as the
  // constructor returned.
  BODY body_;
};
// Executes the work item right away on the calling thread by invoking its
// run() method; nothing is placed on a queue.
static inline void wq_push_cpp(work *w)
{
  (*w).run();
}
// Stub with an empty body: the work item and the three opaque argument
// pointers are accepted but not used.
template <typename IT, typename BODY>
static void
wq_for_one(struct work *w, void *a0, void *a1, void *a2)
{
}
// Entry point for a work-queue "for" loop over an iterator.
//
//   init - iterator to start from (passed by reference)
//   cond - continuation predicate; only referenced inside the disabled
//          #if 0 region below, so it is currently unused
//   body - callable applied to each element (see the disabled loop)
//
// The live code builds a single for_work on the stack and hands it to
// wq_push_cpp, which runs it synchronously before this function returns.
template <typename IT, typename BODY>
static inline void
wq_for(IT &init, bool (*cond)(IT &it), BODY body)
{
for_work<IT, BODY> goo(init, body);
wq_push_cpp(&goo);
// Disabled sequential form of the loop, kept for reference.
#if 0
BODY foo = body;
// XXX(sbw) should be able to coarsen loop
for (IT &it = init; cond(it); ++it) {
foo(*it);
body(*it);
}
#endif
}
#if defined(LINUX)
#include <stdlib.h>
#include <stdio.h>
#include <assert.h>
#include <pthread.h>
#include "include/types.h"
#include "include/wq.hh"
static __thread int myid_;
// Returns the id stored in the calling thread's thread-local myid_.
int
mycpuid(void)
{
  const int id = myid_;
  return id;
}
// Linux backing allocator for a wq object: sizeof(wq) bytes from malloc.
// The result is returned unchecked, so it may be null.
static inline void*
allocwq(void)
{
  void *mem = malloc(sizeof(wq));
  return mem;
}
// Linux implementation of the wq lock: spins on the pthread spinlock.
// The pthread return code is deliberately discarded.
static inline void wqlock_acquire(wqlock_t *lock)
{
  (void) pthread_spin_lock(lock);
}
// Attempts to take the lock without spinning.
// Returns 1 when pthread_spin_trylock reports success (0), otherwise 0.
static inline int
wqlock_tryacquire(wqlock_t *lock)
{
  const int rc = pthread_spin_trylock(lock);
  return rc == 0;
}
// Linux implementation of the wq unlock; the pthread return code is
// deliberately discarded.
static inline void wqlock_release(wqlock_t *lock)
{
  (void) pthread_spin_unlock(lock);
}
// Initializes a wq lock for use by the threads of this process.
// The pshared argument is spelled with its POSIX name instead of a bare 0
// so the intent (not shared across processes) is visible at the call site.
static inline void
wqlock_init(wqlock_t *lock)
{
  pthread_spin_init(lock, PTHREAD_PROCESS_PRIVATE);
}
// Executes the x86 RDTSC instruction and returns its 64-bit result,
// assembled from the EDX (high) and EAX (low) halves.
static inline u64
rdtsc(void)
{
  u32 low, high;

  __asm volatile("rdtsc" : "=a"(low), "=d"(high));

  u64 tsc = (u64)high;
  tsc <<= 32;
  tsc |= (u64)low;
  return tsc;
}
#define xprintf printf
#define xmalloc(n) malloc(n)
#define xfree(p, sz) free(p)
#define pushcli()
#define popcli()
#elif defined(XV6_KERNEL)
#include "types.h"
#include "kernel.hh"
#include "spinlock.h"
#include "amd64.h"
#include "cpu.hh"
#include "wq.hh"
#include "kalloc.hh"
#include "wq.hh"
// Kernel build: the work-queue id is the id field of the current CPU
// structure returned by mycpu().
static inline int
mywqid(void)
{
  const int id = mycpu()->id;
  return id;
}
// Kernel backing allocator for a wq object: takes it from the slab_wq slab
// via ksalloc.
static inline void*
allocwq(void)
{
  void *mem = ksalloc(slab_wq);
  return mem;
}
// Kernel implementation of the wq lock: forwards to the spinlock acquire().
static inline void wqlock_acquire(wqlock_t *lock)
{
  acquire(lock);
}
// Kernel implementation of the non-blocking lock attempt: returns whatever
// the spinlock tryacquire() reports, converted to int.
static inline int
wqlock_tryacquire(wqlock_t *lock)
{
  const int got = tryacquire(lock);
  return got;
}
// Kernel implementation of the wq unlock: forwards to the spinlock release().
static inline void wqlock_release(wqlock_t *lock)
{
  release(lock);
}
// Kernel implementation of lock setup: registers the spinlock under the
// name "wq lock" with the LOCKSTAT_WQ lockstat class.
static inline void wqlock_init(wqlock_t *lock)
{
  initlock(lock, "wq lock", LOCKSTAT_WQ);
}
#define xprintf cprintf
#define xmalloc(n) kmalloc(n)
#define xfree(p, sz) kmfree(p, sz)
#else
#warning "Unknown wq implementation"
#endif
static wq *wq_;
......@@ -39,7 +144,7 @@ void*
wq::operator new(unsigned long nbytes)
{
assert(nbytes == sizeof(wq));
return ksalloc(slab_wq);
return allocwq();
}
wq::wq(void)
......@@ -47,7 +152,7 @@ wq::wq(void)
int i;
for (i = 0; i < NCPU; i++)
initlock(&q_[i].lock, "wq lock", LOCKSTAT_WQ);
wqlock_init(&q_[i].lock);
}
void
......@@ -55,7 +160,7 @@ wq::dump(void)
{
int i;
for (i = 0; i < NCPU; i++)
cprintf("push %lu full %lu pop %lu steal %lu\n",
xprintf("push %lu full %lu pop %lu steal %lu\n",
stat_[i].push, stat_[i].full,
stat_[i].pop, stat_[i].steal);
}
......@@ -92,16 +197,16 @@ wq::pop(int c)
if ((i - q->tail) == 0)
return 0;
acquire(&q->lock);
wqlock_acquire(&q->lock);
i = q->head;
if ((i - q->tail) == 0) {
release(&q->lock);
wqlock_release(&q->lock);
return 0;
}
i = (i-1) & (NSLOTS-1);
w = q->w[i];
q->head--;
release(&q->lock);
wqlock_release(&q->lock);
stat_->pop++;
return w;
......@@ -114,17 +219,17 @@ wq::steal(int c)
work *w;
int i;
if (tryacquire(&q->lock) == 0)
if (wqlock_tryacquire(&q->lock) == 0)
return 0;
i = q->tail;
if ((i - q->head) == 0) {
release(&q->lock);
wqlock_release(&q->lock);
return 0;
}
i = i & (NSLOTS-1);
w = q->w[i];
q->tail++;
release(&q->lock);
wqlock_release(&q->lock);
stat_->steal++;
return w;
......@@ -177,7 +282,7 @@ void*
cwork::operator new(unsigned long nbytes)
{
assert(nbytes == sizeof(cwork));
return kmalloc(sizeof(cwork));
return xmalloc(sizeof(cwork));
}
void*
......@@ -190,5 +295,5 @@ cwork::operator new(unsigned long nbytes, cwork* buf)
void
cwork::operator delete(void *p)
{
kmfree(p, sizeof(cwork));
xfree(p, sizeof(cwork));
}
......@@ -41,6 +41,10 @@
#define NCPU 256
#define MTRACE 0
#define PERFSIZE (16<<20ull)
#elif defined(HW_wq)
#define NCPU 2
#define MTRACE 0
#define PERFSIZE (16<<20ull)
#else
#error "Unknown HW"
#endif
NCXXFLAGS = -static -g -MD -m64 -O3 -Wall -Werror -DHW_$(HW) -DXV6 \
NCXXFLAGS = -static -g -MD -m64 -O3 -Wall -Werror -DHW_$(HW) \
-fno-builtin -fno-strict-aliasing -fno-omit-frame-pointer \
-fms-extensions -mcx16 -mno-red-zone -std=c++0x \
-Wno-sign-compare -fno-exceptions -fno-rtti -fcheck-new -I.
-Wno-sign-compare -fno-exceptions -fno-rtti -fcheck-new \
-I. -include param.h -include include/compiler.h
$(O)/user/%.o: user/%.cc
$(O)/user/%.o: lib/%.cc
@echo " CXX $@"
$(Q)mkdir -p $(@D)
$(Q)$(CXX) -DLINUX $(NCXXFLAGS) -c -o $@ $<
......
#define WQSHIFT 7
#define NSLOTS (1 << WQSHIFT)
#define CACHELINE 64
#define NCPU 2
#include "include/compiler.h"
#include "include/wq.hh"
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdint.h>
#include <stdio.h>
#include "util.h"
typedef uint64_t u64;
typedef uint32_t u32;
// Reads the x86 time-stamp counter with RDTSC and returns it as one 64-bit
// value: EDX supplies the upper 32 bits, EAX the lower 32 bits.
static inline u64
rdtsc(void)
{
  u32 eax, edx;

  __asm volatile("rdtsc" : "=a"(eax), "=d"(edx));

  const u64 upper = ((u64)edx) << 32;
  const u64 lower = (u64)eax;
  return lower | upper;
}
// One work queue. The global queue[] array holds NCPU of these and is
// indexed by worker id.
struct wqueue {
  // Slot array; users index it with (head or tail) & (NSLOTS-1).
  struct work *w[NSLOTS];
  // Incremented by wq_push, decremented by __wq_pop.
  volatile int head __mpalign__;
  // Incremented by __wq_steal.
  volatile int tail;
  // Taken by __wq_pop and __wq_steal; wq_push does not take it.
  pthread_spinlock_t lock;
  // NOTE(review): presumably pads the struct out to an alignment boundary;
  // confirm against the macro's definition in compiler.h.
  __padout__;
} __mpalign__;
// Event counters for one worker. wq_stat() returns the entry that belongs
// to the calling thread.
struct wqstat {
// Work items successfully enqueued by wq_push.
u64 push;
// wq_push calls rejected because the queue already held NSLOTS items.
u64 full;
// Work items taken by __wq_pop.
u64 pop;
// Work items taken by __wq_steal.
u64 steal;
// NOTE(review): presumably padding to an alignment boundary; confirm
// against the macro's definition in compiler.h.
__padout__;
} __mpalign__;
// One queue per worker id; wq_push uses queue[myid()], pop/steal take an
// explicit index.
struct wqueue queue[NCPU] __mpalign__;
// One counter block per worker id, selected through wq_stat().
struct wqstat stat[NCPU] __mpalign__;
// Worker id of the calling thread; set in initwq (0) and workerth.
static __thread int myid_;
#define acquire pthread_spin_lock
#define release pthread_spin_unlock
// Non-blocking lock attempt on a pthread spinlock.
// Returns 1 if the lock was obtained (pthread_spin_trylock returned 0),
// and 0 for any other return code.
static inline int
tryacquire(pthread_spinlock_t *l)
{
  const int rc = pthread_spin_trylock(l);
  return rc == 0;
}
// Returns the worker id stored in the calling thread's thread-local myid_.
static inline int
myid(void)
{
  const int id = myid_;
  return id;
}
// Returns the counter block of the calling thread, i.e. the stat[] entry
// at index myid().
static inline struct wqstat *
wq_stat(void)
{
  return stat + myid();
}
void
freework(struct work *w)
{
free(w);
}
// Allocates storage for one work item: a 4096-byte malloc block, returned
// unchecked (null if malloc fails).
struct work *
allocwork(void)
{
  void *mem = malloc(4096);
  return (struct work *)mem;
}
// Enqueues w on the calling thread's own queue (queue[myid()]).
// Returns 0 on success, or -1 when the queue already holds NSLOTS items.
// No lock is taken in this function.
int
wq_push(struct work *w)
{
int i;
struct wqueue *wq = &queue[myid()];
i = wq->head;
// head - tail is the number of queued items; NSLOTS means full.
if ((i - wq->tail) == NSLOTS) {
wq_stat()->full++;
return -1;
}
// Map the running head counter onto a slot index.
i = i & (NSLOTS-1);
wq->w[i] = w;
// The slot store is placed before the head increment, separated by
// barrier(). NOTE(review): presumably so a reader that observes the new
// head also observes the slot contents; confirm barrier()'s semantics.
barrier();
wq->head++;
wq_stat()->push++;
return 0;
}
// Runs one work item: treats w->rip as a function taking the work item
// followed by its five stored arguments, calls it, then frees the item.
static void
__wq_run(work *w)
{
  typedef void (*workfn_t)(struct work*, void*, void*, void*, void*, void*);

  workfn_t entry = (workfn_t)w->rip;
  entry(w, w->arg0, w->arg1, w->arg2, w->arg3, w->arg4);
  freework(w);
}
// Removes and returns the most recently pushed work item of queue c
// (the slot just below head), or 0 if the queue is empty.
static inline struct work *
__wq_pop(int c)
{
struct wqueue *wq = &queue[c];
struct work *w;
int i;
// Unlocked emptiness check: skip taking the lock when nothing is queued.
i = wq->head;
if ((i - wq->tail) == 0)
return 0;
acquire(&wq->lock);
// Re-read head and re-check under the lock; tail may have moved since the
// unlocked check above.
i = wq->head;
if ((i - wq->tail) == 0) {
release(&wq->lock);
return 0;
}
// The newest item sits one below head.
i = (i-1) & (NSLOTS-1);
w = wq->w[i];
wq->head--;
release(&wq->lock);
// Counted against the calling thread's stats (wq_stat() uses myid(), not c).
wq_stat()->pop++;
return w;
}
// Tries to take the oldest work item (at tail) from queue c.
// Returns 0 if the queue's lock could not be obtained immediately or the
// queue is empty; otherwise returns the item.
static inline struct work *
__wq_steal(int c)
{
struct wqueue *wq = &queue[c];
struct work *w;
int i;
// Do not wait for a contended lock; give up on this queue instead.
if (tryacquire(&wq->lock) == 0)
return 0;
i = wq->tail;
if ((i - wq->head) == 0) {
release(&wq->lock);
return 0;
}
// Map the running tail counter onto a slot index.
i = i & (NSLOTS-1);
w = wq->w[i];
wq->tail++;
release(&wq->lock);
// Counted against the calling thread's stats (wq_stat() uses myid(), not c).
wq_stat()->steal++;
return w;
}
int
wq_trywork(void)
{
struct work *w;
u64 i, k;
// A "random" victim CPU
k = rdtsc();
w = __wq_pop(myid());
if (w != nullptr) {
__wq_run(w);
return 1;
}
for (i = 0; i < NCPU; i++) {
u64 j = (i+k) % NCPU;
if (j == myid())
continue;
w = __wq_steal(j);
if (w != nullptr) {
__wq_run(w);
return 1;
}
}
return 0;
}
// Body of a worker thread: calls wq_trywork() forever and never returns.
static void
worker_loop(void)
{
  for (;;)
    wq_trywork();
}
static void*
workerth(void *x)
{
u64 c = (u64)x;
myid_ = c;
setaffinity(c);
worker_loop();
return NULL;
}
// Sets up the user-space work queues.
// The calling thread becomes worker 0, every queue lock is initialized,
// and one worker thread is started for each id in 1..NCPU-1.
void
initwq(void)
{
  pthread_t th;

  myid_ = 0;
  setaffinity(0);

  for (int i = 0; i < NCPU; i++)
    pthread_spin_init(&queue[i].lock, PTHREAD_PROCESS_PRIVATE);

  for (int i = 1; i < NCPU; i++) {
    // The worker id travels to the thread encoded in the argument pointer.
    int r = pthread_create(&th, NULL, workerth, (void*)(u64)i);
    // pthread_create reports failure with a positive error number, not a
    // negative value, so test for any non-zero result.
    // NOTE(review): pthread_create does not set errno; if edie() prints
    // errno, its message will not reflect r. Confirm against util.h.
    if (r != 0)
      edie("pthread_create");
  }
}
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论