Commit a9659913 authored by David Benjamin

Allow placing nodes at a higher level in the tree

First version that works. There are some inefficiencies that need to go. Also it doesn't yet un-collapse nodes when there's no need to do so. Although, experimentally, it doesn't appear that the collapsing feature gets used much. If I bring bits_per_level down to 4, it seems to end up a bit more useful.
Parent 72cdef8d
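The trade-off the message describes is governed by how many keys an entry at a given level spans. A quick sketch of the arithmetic, with illustrative constants (bits_per_level and radix_levels here are assumptions for illustration, not the values in this tree's headers):

#include <cstdint>

// An entry stored at |level| covers 2^(bits_per_level * level)
// consecutive keys, so leaving an entry high in the tree ("collapsed")
// stands in for a whole subtree of identical leaves. A smaller
// bits_per_level gives more levels at which an entry can stop, which is
// why dropping it to 4 makes the collapsing feature fire more often.
constexpr uint64_t bits_per_level = 9;   // assumed for illustration
constexpr uint64_t radix_levels   = 4;   // assumed for illustration
constexpr uint64_t key_bits       = bits_per_level * radix_levels;

constexpr uint64_t keys_covered(uint64_t level) {
  return uint64_t(1) << (bits_per_level * level);
}
static_assert(keys_covered(0) == 1,   "a level-0 entry covers one key");
static_assert(keys_covered(1) == 512, "a level-1 entry covers 512 keys");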
@@ -146,8 +146,13 @@ class radix_elem : public rcu_freed {
  public:
   radix_elem() : rcu_freed("radix_elem"), deleted_(false), ref_(0) {}
   bool deleted() { return deleted_; }
-  void decref() { if (--ref_ == 0) { deleted_ = true; gc_delayed(this); } }
-  void incref() { ref_++; }
+  void decref(u64 delta = 1) {
+    if ((ref_ -= delta) == 0) {
+      deleted_ = true;
+      gc_delayed(this);
+    }
+  }
+  void incref(u64 delta = 1) { ref_ += delta; }
 };

 struct radix_node : public rcu_freed {
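Why decref() and incref() grow a delta argument: push_down() below duplicates one element pointer into all 2^bits_per_level children of a freshly allocated node, so the element's count has to move by a whole fan-out at once. A minimal user-space sketch of that batched counting, with std::atomic standing in for the kernel's atomic u64 and retire() standing in for gc_delayed():

#include <atomic>
#include <cstdint>

struct refcounted {
  std::atomic<uint64_t> ref_{1};   // creator holds one reference
  bool deleted_ = false;

  void incref(uint64_t delta = 1) { ref_ += delta; }
  void decref(uint64_t delta = 1) {
    // fetch_sub returns the prior value; prior == delta means these were
    // the last references, and exactly one caller observes that.
    if (ref_.fetch_sub(delta) == delta) {
      deleted_ = true;
      retire();                    // stand-in for gc_delayed(this)
    }
  }
  void retire() { /* defer the free until concurrent readers are done */ }
};

// One atomic add then covers a node's worth of copies:
//   e->incref(1 << bits_per_level);
// instead of bouncing the count's cache line 2^bits_per_level times.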
@@ -5,6 +5,9 @@ enum { radix_debug = 0 };

 #define dprintf(...) do { if (radix_debug) cprintf(__VA_ARGS__); } while(0)

+static_assert(key_bits == bits_per_level * radix_levels,
+              "for now, we only support exact multiples of bits_per_level");
+
 // Returns the index needed to reach |level| from |level+1|.
 static u32
 index(u64 key, u32 level)
@@ -14,43 +17,94 @@ index(u64 key, u32 level)
   return idx;
 }

-// Returns the level we stopped at.
-template<class CB>
-u32
-descend(u64 key, radix_ptr *n, u32 level, CB cb, bool create)
+static radix_entry
+push_down(radix_entry cur, radix_ptr *ptr)
 {
-  static_assert(key_bits == bits_per_level * radix_levels,
-                "for now, we only support exact multiples of bits_per_level");
-  assert(n);
-
-  radix_entry v = n->load();
-  if (v.is_null() && create) {
-    assert(v.state() == entry_unlocked);
+  while (cur.state() != entry_dead && cur.state() != entry_node) {
+    // If we're locked, just spin and try again.
+    if (cur.state() == entry_locked) {
+      cur = ptr->load();
+      continue;
+    }
+
+    // Make a new node.
+    assert(cur.state() == entry_unlocked);
+    radix_elem *elem = cur.elem();
     radix_node *new_rn = new radix_node();
-    radix_entry cur = v;
-    v = radix_entry(new_rn);
-    do {
-      if (!cur.is_null()) {
-        assert(cur.is_node());
-        v = cur;
-        delete new_rn;
-        break;
+    if (elem != nullptr) {
+      for (int i = 0; i < (1<<bits_per_level); i++) {
+        new_rn->child[i].store(radix_entry(elem));
       }
-    } while (!n->compare_exchange_weak(cur, v));
-  }
-  // Node isn't there. Just return.
-  if (v.is_null()) {
-    return level+1;
+      elem->incref(1<<bits_per_level);
+    }
+    if (ptr->compare_exchange_weak(cur, radix_entry(new_rn))) {
+      // Release the ref from the pointer we replaced. FIXME: Bouncing
+      // on the reference count here is annoying. Maybe the reference
+      // count should be dependent on the height of the leaf?
+      if (elem != nullptr)
+        elem->decref();
+    } else {
+      // Someone else beat us to it. Back out. FIXME: docs say
+      // compare_exchange_weak can return spuriously. Should avoid
+      // reallocating new_rn if elem doesn't change.
+      // Avoid bouncing on the refcount 1<<bits_per_level times.
+      if (elem != nullptr) {
+        for (int i = 0; i < (1<<bits_per_level); i++) {
+          new_rn->child[i].store(radix_entry(nullptr));
+        }
+        elem->decref(1<<bits_per_level);
+      }
+      new_rn->do_gc();
+    }
   }
+  return cur;
+}

-  radix_node *rn = v.node();
+// Returns the next node to be processed, whether or not it falls in
+// the range. Success is to return cur_end. Otherwise we stopped early
+// and bubble up the error.
+template <class CB>
+u64
+update_range(radix_entry cur, radix_ptr *ptr, CB cb,
+             u64 cur_start, u64 cur_end,
+             u64 start, u64 end)
+{
+  // If ranges are disjoint, do nothing. We manage to process everyone
+  // for free.
+  if (cur_start >= end || start >= cur_end)
+    return cur_end;

-  radix_ptr *vptr = &rn->child[index(key, level)];
-  if (level == 0) {
-    cb(vptr);
-    return level;
+  // If our range is not strictly contained in the target, ensure we
+  // are at a node.
+  if (start > cur_start || end < cur_end) {
+    cur = push_down(cur, ptr);
+    // Failed. Next time resume at cur_start.
+    if (cur.state() == entry_dead)
+      return cur_start;
+  }
+
+  if (cur.is_node()) {
+    // Descend.
+    u64 child_size = (cur_end - cur_start) >> bits_per_level;
+    u64 child_start = cur_start;
+    for (int i = 0; i < (1<<bits_per_level); i++, child_start += child_size) {
+      radix_ptr *child = &cur.node()->child[i];
+      // FIXME: This results in loading every child. We shouldn't even
+      // touch pointers with no intersection with ours.
+      u64 ret = update_range(child->load(), child, cb,
+                             child_start, child_start + child_size,
+                             start, end);
+      if (ret != child_start + child_size) return ret;
+    }
+    return cur_end;
   } else {
-    return descend(key, vptr, level-1, cb, create);
+    // If we're here, the target range must completely contain this
+    // element.
+    assert(start <= cur_start && cur_end <= end);
+    // Callback returns how far it processed.
+    return cb(cur, ptr, cur_start, cur_end);
   }
 }
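The heart of the change is the interval recursion above. A self-contained user-space sketch of the same arithmetic, stripped of locking, refcounts, and the tree itself (FANOUT is a toy stand-in for 1 << bits_per_level, and the callback just prints): a range fully contained in the target is handled by one callback at its own level, while a partially covered range is split into equal children.

#include <cstdint>
#include <cstdio>

constexpr uint64_t FANOUT = 4;

template <class CB>
uint64_t visit(CB cb, uint64_t cur_start, uint64_t cur_end,
               uint64_t start, uint64_t end) {
  if (cur_start >= end || start >= cur_end)
    return cur_end;                 // disjoint: processed "for free"
  if (start <= cur_start && cur_end <= end)
    return cb(cur_start, cur_end);  // fully contained: act at this level
  // Partially covered: split into FANOUT equal children and recurse.
  uint64_t child_size = (cur_end - cur_start) / FANOUT;
  for (uint64_t cs = cur_start; cs != cur_end; cs += child_size) {
    uint64_t ret = visit(cb, cs, cs + child_size, start, end);
    if (ret != cs + child_size)
      return ret;                   // an early stop bubbles up, like entry_dead
  }
  return cur_end;
}

int main() {
  // Target [4, 36) over key space [0, 64): the callback fires on [4,8),
  // [8,12), [12,16), then once on all of [16,32), then on [32,36).
  visit([](uint64_t s, uint64_t e) -> uint64_t {
          printf("[%llu, %llu)\n", (unsigned long long)s, (unsigned long long)e);
          return e;
        },
        0, 64, 4, 36);
}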
@@ -87,7 +141,7 @@ radix::search(u64 key)
   for (u32 level = radix_levels-1; level >= 0 && !cur.is_elem(); level--) {
     cur = cur.node()->child[index(key >> shift_, level)].load();
   }
-  dprintf("%p: search(%lu) -> %p\n", this, key >> shift_, cur.elem());
+  dprintf("%p: search(%lx) -> %p\n", this, key >> shift_, cur.elem());
   return cur.elem();
 }
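For contrast with the locking below, note that the search path above takes no locks: it is a chain of atomic loads. That stays safe because writers publish children with atomic stores and free elements only through gc_delayed(), so a reader that loaded a just-replaced pointer still dereferences live memory until a grace period passes. Reduced to a single slot, the pattern is (std::atomic standing in for radix_ptr):

#include <atomic>

struct elem { int value; };
std::atomic<elem*> slot{nullptr};

// Reader: one atomic load, no lock. Even if a writer swaps the slot
// concurrently, |e| remains valid memory, because frees are deferred
// until all readers of this generation are done.
int read_slot() {
  elem *e = slot.load(std::memory_order_acquire);
  return e ? e->value : -1;
}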
@@ -97,18 +151,50 @@ radix::search_lock(u64 start, u64 size)
   return radix_range(this, start >> shift_, size >> shift_);
 }

+// This should be a lambda, but it's awkward for a lambda to call
+// itself.
+struct entry_locker {
+  u64 start_;
+  u64 end_;
+  entry_locker(u64 start, u64 end) : start_(start), end_(end) { }
+  u64 operator()(radix_entry cur, radix_ptr *ptr, u64 cur_start, u64 cur_end) const {
+    while (cur.state() != entry_dead && cur.state() != entry_node) {
+      // Locked -> spin and try again.
+      if (cur.state() == entry_locked) {
+        cur = ptr->load();
+        continue;
+      }
+      // Otherwise it's unlocked. Try to lock it.
+      if (ptr->compare_exchange_weak(cur, cur.with_state(entry_locked))) {
+        // Success. Remember the current value and break out.
+        cur = cur.with_state(entry_locked);
+        break;
+      }
+    }
+    // Someone deleted this leaf. Abort this iteration.
+    if (cur.state() == entry_dead)
+      return cur_start;
+    // Someone pushed down. Recurse some more.
+    if (cur.state() == entry_node)
+      return update_range(cur, ptr, *this, cur_start, cur_end, start_, end_);
+    // We managed to lock!
+    assert(cur.state() == entry_locked);
+    return cur_end;
+  }
+};
+
 radix_range::radix_range(radix *r, u64 start, u64 size)
   : r_(r), start_(start), size_(size)
 {
-  for (u64 k = start_; k != start_ + size_; k++) {
-    if (descend(k, &r_->root_, radix_levels-1, [](radix_ptr *v) {
-          radix_entry cur = v->load();
-          while (cur.state() == entry_locked ||
-                 !v->compare_exchange_weak(cur, cur.with_state(entry_locked)))
-            ; // spin
-        }, true) != 0) {
-      panic("radix_range");
-    }
+  u64 next_start = start_;
+  u64 end = start_ + size_;
+  // Lock the range from left to right. If we hit a dead element,
+  // re-load the root.
+  while (next_start < end) {
+    const entry_locker& cb = entry_locker(next_start, end);
+    next_start = update_range(r_->root_.load(), &r_->root_, cb,
+                              0, 1L << key_bits, next_start, end);
+    assert(next_start >= start_);
   }
 }
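entry_locker's spin/CAS loop is a three-state machine on the entry's tag bits. Reduced to a bare atomic word (an assumption for illustration; the real states are packed into radix_entry), the acquire path looks like:

#include <atomic>
#include <cstdint>

enum state : uint32_t { unlocked, locked, dead };

// Returns false if the entry died, in which case the caller must re-load
// from the root and retry, exactly as radix_range's constructor loops on
// next_start above.
bool try_lock(std::atomic<uint32_t> &s) {
  uint32_t cur = s.load();
  for (;;) {
    if (cur == dead)
      return false;
    if (cur == locked) {            // spin until the holder releases
      cur = s.load();
      continue;
    }
    // Unlocked: CAS to locked. On failure, compare_exchange_weak has
    // already refreshed |cur|, so just go around again.
    if (s.compare_exchange_weak(cur, locked))
      return true;
  }
}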
@@ -117,16 +203,16 @@ radix_range::~radix_range()
   if (!r_)
     return;

-  for (u64 k = start_; k != start_ + size_; k++) {
-    if (descend(k, &r_->root_, radix_levels-1, [](radix_ptr *v) {
-          radix_entry cur = v->load();
-          do {
-            assert(cur.state() == entry_locked);
-          } while (!v->compare_exchange_weak(cur, cur.with_state(entry_unlocked)));
-        }, true) != 0) {
-      panic("~radix_range");
-    }
-  }
+  u64 ret = update_range(r_->root_.load(), &r_->root_, [](radix_entry cur, radix_ptr *ptr, u64 cur_start, u64 cur_end) -> u64 {
+      do {
+        // It had better still be locked.
+        assert(cur.state() == entry_locked);
+      } while (!ptr->compare_exchange_weak(cur, cur.with_state(entry_unlocked)));
+      return cur_end;
+    }, 0, 1L << key_bits, start_, start_ + size_);
+  // Impossible to hit entry_dead. We own the lock.
+  if (ret != 1L << key_bits)
+    panic("~radix_range");
 }

 void
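The next hunk converts replace() itself. For orientation, a hypothetical caller using only the signatures visible in this diff (map_fixed and tree are illustrative names; the range lock is dropped when the radix_range goes out of scope, per the destructor above):

// Hypothetical usage sketch, not code from this tree.
void map_fixed(radix *tree, u64 start, u64 size, radix_elem *e) {
  radix_range rr = tree->search_lock(start, size);  // locks [start, start+size)
  rr.replace(start, size, e);                       // swap e in everywhere
}                                                   // ~radix_range unlocks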
@@ -134,30 +220,33 @@ radix_range::replace(u64 start, u64 size, radix_elem *val)
 {
   start = start >> r_->shift_;
   size = size >> r_->shift_;

-  dprintf("%p: replace: [%lu, %lu) with %p\n", r_, start, start + size, val);
+  dprintf("%p: replace: [%lx, %lx) with %p\n", r_, start, start + size, val);

   assert(start >= start_);
   assert(start + size <= start_ + size_);

-  for (u64 k = start; k != start + size; k++) {
-    if (descend(k, &r_->root_, radix_levels-1, [val](radix_ptr *v) {
-          radix_entry cur = v->load();
-          do {
-            assert(cur.state() == entry_locked);
-          } while (!v->compare_exchange_weak(cur, radix_entry(val, entry_locked)));
-          if (val)
-            val->incref();
-          if (!cur.is_null())
-            cur.elem()->decref();
-        }, true)) {
-      panic("radix_range::replace");
-    }
-  }
+  u64 ret = update_range(r_->root_.load(), &r_->root_, [val](radix_entry cur, radix_ptr *ptr, u64 cur_start, u64 cur_end) -> u64 {
+      dprintf("  -> [%lx, %lx); size = %lx\n", cur_start, cur_end, cur_end - cur_start);
+      do {
+        assert(cur.state() == entry_locked);
+      } while (!ptr->compare_exchange_weak(cur, radix_entry(val, entry_locked)));
+      if (val)
+        val->incref();
+      if (!cur.is_null())
+        cur.elem()->decref();
+      return cur_end;
+    }, 0, 1L << key_bits, start, start + size);
+  // Impossible to hit entry_dead. We own the lock.
+  if (ret != 1L << key_bits)
+    panic("radix_range::replace");
+
+  // TODO: If we can, collapse some intermediate nodes, RCU-freeing
+  // them.
 }

 radix_iterator::radix_iterator(const radix* r, u64 k)
   : r_(r), k_(k) {
-  dprintf("%p: Made iterator with k = %lu\n", r_, k_);
+  dprintf("%p: Made iterator with k = %lx\n", r_, k_);
   if (k_ != ~0ULL) {
     path_[radix_levels] = r_->root_.load();
     if (path_[radix_levels].is_elem())
@@ -165,7 +254,7 @@ radix_iterator::radix_iterator(const radix* r, u64 k)
     else if (!find_first_leaf(radix_levels - 1))
       k_ = ~0ULL;
   }
-  dprintf("%p: Adjusted: k = %lu\n", r_, k_);
+  dprintf("%p: Adjusted: k = %lx\n", r_, k_);
 }

 bool