A Linux userspace wq implementation..

..I should be able to merge all the wq code into one file..
上级 64f096c7
......@@ -66,6 +66,9 @@ du(int fd)
int
main(int ac, char **av)
{
extern void initwq(void);
//initwq();
printf("%d\n", du(open(".", 0)));
return 0;
}
NCXXFLAGS = -static -g -MD -m64 -O3 -Wall -Werror -DHW_$(HW) -DXV6 \
-fno-builtin -fno-strict-aliasing -fno-omit-frame-pointer \
-fms-extensions -mno-sse -mcx16 -mno-red-zone -std=c++0x \
-fms-extensions -mcx16 -mno-red-zone -std=c++0x \
-Wno-sign-compare -fno-exceptions -fno-rtti -fcheck-new -I.
$(O)/user/%.o: user/%.cc
@echo " CXX $@"
$(Q)mkdir -p $(@D)
$(Q)$(CXX) -DLINUX $(NCXXFLAGS) -c -o $@ $<
$(O)/user/%.o: bin/%.cc
@echo " CXX $@"
$(Q)mkdir -p $(@D)
$(Q)$(CXX) -DLINUX $(NCXXFLAGS) -c -o $@ $<
$(O)/xdu: $(O)/user/xdu.o
$(O)/xdu: $(O)/user/xdu.o $(O)/user/wq.o
@echo " LD $@"
$(Q)mkdir -p $(@D)
$(Q)$(CXX) -o $@ $^ -lpthread
......
#include <stdio.h>
#include <stdarg.h>
#include <errno.h>
#include <string.h>
#include <sched.h>
#define __noret__ __attribute__((noreturn))
static inline __noret__ void
die(const char* errstr, ...)
{
va_list ap;
va_start(ap, errstr);
vfprintf(stderr, errstr, ap);
va_end(ap);
fprintf(stderr, "\n");
exit(EXIT_FAILURE);
}
static inline __noret__ void
edie(const char* errstr, ...)
{
va_list ap;
va_start(ap, errstr);
vfprintf(stderr, errstr, ap);
va_end(ap);
fprintf(stderr, ": %s\n", strerror(errno));
exit(EXIT_FAILURE);
}
static inline void
setaffinity(int c)
{
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(c, &cpuset);
if (sched_setaffinity(0, sizeof(cpuset), &cpuset) < 0)
edie("setaffinity, sched_setaffinity failed");
}
#define WQSHIFT 7
#define NSLOTS (1 << WQSHIFT)
#define CACHELINE 64
#define NCPU 2
#include "include/compiler.h"
#include "include/wq.hh"
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdint.h>
#include <stdio.h>
#include "util.h"
typedef uint64_t u64;
typedef uint32_t u32;
static inline u64
rdtsc(void)
{
u32 hi, lo;
__asm volatile("rdtsc" : "=a"(lo), "=d"(hi));
return ((u64)lo)|(((u64)hi)<<32);
}
struct wqueue {
struct work *w[NSLOTS];
volatile int head __mpalign__;
volatile int tail;
pthread_spinlock_t lock;
__padout__;
} __mpalign__;;
struct wqstat {
u64 push;
u64 full;
u64 pop;
u64 steal;
__padout__;
} __mpalign__;
struct wqueue queue[NCPU] __mpalign__;
struct wqstat stat[NCPU] __mpalign__;
static __thread int myid_;
#define acquire pthread_spin_lock
#define release pthread_spin_unlock
static inline int
tryacquire(pthread_spinlock_t *l)
{
return (pthread_spin_trylock(l) == 0);
}
static inline int
myid(void)
{
return myid_;
}
static inline struct wqstat *
wq_stat(void)
{
return &stat[myid()];
}
void
freework(struct work *w)
{
free(w);
}
struct work *
allocwork(void)
{
return (struct work *)malloc(4096);
}
int
wq_push(struct work *w)
{
int i;
struct wqueue *wq = &queue[myid()];
i = wq->head;
if ((i - wq->tail) == NSLOTS) {
wq_stat()->full++;
return -1;
}
i = i & (NSLOTS-1);
wq->w[i] = w;
barrier();
wq->head++;
wq_stat()->push++;
return 0;
}
static void
__wq_run(struct work *w)
{
void (*fn)(struct work*, void*, void*, void*, void*, void*) =
(void(*)(work*,void*,void*,void*,void*,void*))w->rip;
fn(w, w->arg0, w->arg1, w->arg2, w->arg3, w->arg4);
freework(w);
}
static inline struct work *
__wq_pop(int c)
{
struct wqueue *wq = &queue[c];
struct work *w;
int i;
i = wq->head;
if ((i - wq->tail) == 0)
return 0;
acquire(&wq->lock);
i = wq->head;
if ((i - wq->tail) == 0) {
release(&wq->lock);
return 0;
}
i = (i-1) & (NSLOTS-1);
w = wq->w[i];
wq->head--;
release(&wq->lock);
wq_stat()->pop++;
return w;
}
static inline struct work *
__wq_steal(int c)
{
struct wqueue *wq = &queue[c];
struct work *w;
int i;
if (tryacquire(&wq->lock) == 0)
return 0;
i = wq->tail;
if ((i - wq->head) == 0) {
release(&wq->lock);
return 0;
}
i = i & (NSLOTS-1);
w = wq->w[i];
wq->tail++;
release(&wq->lock);
wq_stat()->steal++;
return w;
}
int
wq_trywork(void)
{
struct work *w;
u64 i, k;
// A "random" victim CPU
k = rdtsc();
w = __wq_pop(myid());
if (w != nullptr) {
__wq_run(w);
return 1;
}
for (i = 0; i < NCPU; i++) {
u64 j = (i+k) % NCPU;
if (j == myid())
continue;
w = __wq_steal(j);
if (w != nullptr) {
__wq_run(w);
return 1;
}
}
return 0;
}
static void
worker_loop(void)
{
while (1) {
wq_trywork();
}
}
static void*
workerth(void *x)
{
u64 c = (u64)x;
myid_ = c;
setaffinity(c);
worker_loop();
return NULL;
}
void
initwq(void)
{
pthread_t th;
int r;
myid_ = 0;
setaffinity(0);
for (int i = 0; i < NCPU; i++) {
pthread_spin_init(&queue[i].lock, 0);
}
for (int i = 1; i < NCPU; i++) {
r = pthread_create(&th, NULL, workerth, (void*)(u64)i);
if (r < 0)
edie("pthread_create");
}
}
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论