mutation_partition: Switch cache of rows onto B-tree

The switch is pretty straightforward, and consists of

- change less-compare into tri-compare

- rename insert/insert_check into insert_before_hint

- use tree::key_grabber in mutation_partition::apply_monotonically to
  exception-safely transfer a row from one tree to another

- explicitly erase the row from tree in rows_entry::on_evicted, there's
  a O(1) tree::iterator method for this

- rewrite the rows_entry -> cache_entry transformation in on_evicted to
  fit the B-tree API

- include the B-tree's external memory usage into stats

That's it. The number of keys per node is set to 12 with linear search
and a linear root extension of 20 because

- experimenting with the tree shows that 8 through 10 keys with linear
  search give the best performance on stress tests for insert/find-s of
  keys that are memcmp-able arrays of bytes (which is an approximation of
  the current clustering key compare). More keys work slower, but still better
  than any bigger value with any type of search up to 64 keys per node

- having 12 keys per nodes is the threshold at which the memory footprint
  for B-tree becomes smaller than for boost::intrusive::set for partitions
  with 32+ keys

- 20 keys for linear root eats the first-split peak and still performs
  well in linear search

As a result the footprint for the B-tree is bigger than the one for the BST only for
trees filled with 21...32 keys, by 0.1...0.7 bytes per key.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2020-07-15 13:49:56 +03:00
parent 165255e2bd
commit 5c0f9a8180
7 changed files with 56 additions and 59 deletions

View File

@@ -342,14 +342,14 @@ future<> cache_flat_mutation_reader::read_from_underlying(db::timeout_clock::tim
if (no_clustering_row_between(*_schema, _upper_bound, _next_row.position())) {
this->maybe_update_continuity();
} else if (can_populate()) {
rows_entry::compare less(*_schema);
rows_entry::tri_compare cmp(*_schema);
auto& rows = _snp->version()->partition().clustered_rows();
if (query::is_single_row(*_schema, *_ck_ranges_curr)) {
with_allocator(_snp->region().allocator(), [&] {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(_ck_ranges_curr->start()->value()));
// Use _next_row iterator only as a hint, because there could be insertions after _upper_bound.
auto insert_result = rows.insert_check(_next_row.get_iterator_in_latest_version(), *e, less);
auto insert_result = rows.insert_before_hint(_next_row.get_iterator_in_latest_version(), *e, cmp);
auto inserted = insert_result.second;
auto it = insert_result.first;
if (inserted) {
@@ -365,7 +365,7 @@ future<> cache_flat_mutation_reader::read_from_underlying(db::timeout_clock::tim
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(*_schema, _upper_bound, is_dummy::yes, is_continuous::yes));
// Use _next_row iterator only as a hint, because there could be insertions after _upper_bound.
auto insert_result = rows.insert_check(_next_row.get_iterator_in_latest_version(), *e, less);
auto insert_result = rows.insert_before_hint(_next_row.get_iterator_in_latest_version(), *e, cmp);
auto inserted = insert_result.second;
if (inserted) {
clogger.trace("csm {}: inserted dummy at {}", fmt::ptr(this), _upper_bound);
@@ -405,12 +405,12 @@ bool cache_flat_mutation_reader::ensure_population_lower_bound() {
if (!_last_row.is_in_latest_version()) {
with_allocator(_snp->region().allocator(), [&] {
auto& rows = _snp->version()->partition().clustered_rows();
rows_entry::compare less(*_schema);
rows_entry::tri_compare cmp(*_schema);
// FIXME: Avoid the copy by inserting an incomplete clustering row
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(*_schema, *_last_row));
e->set_continuous(false);
auto insert_result = rows.insert_check(rows.end(), *e, less);
auto insert_result = rows.insert_before_hint(rows.end(), *e, cmp);
auto inserted = insert_result.second;
if (inserted) {
clogger.trace("csm {}: inserted lower bound dummy at {}", fmt::ptr(this), e->position());
@@ -456,7 +456,7 @@ void cache_flat_mutation_reader::maybe_add_to_cache(const clustering_row& cr) {
clogger.trace("csm {}: populate({})", fmt::ptr(this), clustering_row::printer(*_schema, cr));
_lsa_manager.run_in_update_section_with_allocator([this, &cr] {
mutation_partition& mp = _snp->version()->partition();
rows_entry::compare less(*_schema);
rows_entry::tri_compare cmp(*_schema);
if (_read_context->digest_requested()) {
cr.cells().prepare_hash(*_schema, column_kind::regular_column);
@@ -465,8 +465,8 @@ void cache_flat_mutation_reader::maybe_add_to_cache(const clustering_row& cr) {
current_allocator().construct<rows_entry>(*_schema, cr.key(), cr.as_deletable_row()));
new_entry->set_continuous(false);
auto it = _next_row.iterators_valid() ? _next_row.get_iterator_in_latest_version()
: mp.clustered_rows().lower_bound(cr.key(), less);
auto insert_result = mp.clustered_rows().insert_check(it, *new_entry, less);
: mp.clustered_rows().lower_bound(cr.key(), cmp);
auto insert_result = mp.clustered_rows().insert_before_hint(it, *new_entry, cmp);
if (insert_result.second) {
_snp->tracker()->insert(*new_entry);
new_entry.release();

View File

@@ -32,7 +32,6 @@
#include "mutation_query.hh"
#include "service/priority_manager.hh"
#include "mutation_compactor.hh"
#include "intrusive_set_external_comparator.hh"
#include "counters.hh"
#include "row_cache.hh"
#include "view_info.hh"
@@ -157,8 +156,8 @@ mutation_partition::mutation_partition(const schema& s, const mutation_partition
#ifdef SEASTAR_DEBUG
assert(x._schema_version == _schema_version);
#endif
auto cloner = [&s] (const auto& x) {
return current_allocator().construct<rows_entry>(s, x);
auto cloner = [&s] (const rows_entry* x) -> rows_entry* {
return current_allocator().construct<rows_entry>(s, *x);
};
_rows.clone_from(x._rows, cloner, current_deleter<rows_entry>());
}
@@ -181,7 +180,7 @@ mutation_partition::mutation_partition(const mutation_partition& x, const schema
for(auto&& r : ck_ranges) {
for (const rows_entry& e : x.range(schema, r)) {
auto ce = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(schema, e));
_rows.insert(_rows.end(), *ce, rows_entry::compare(schema));
_rows.insert_before_hint(_rows.end(), *ce, rows_entry::tri_compare(schema));
ce.release();
}
for (auto&& rt : x._row_tombstones.slice(schema, r)) {
@@ -322,7 +321,6 @@ stop_iteration mutation_partition::apply_monotonically(const schema& s, mutation
return stop_iteration::no;
}
rows_entry::compare less(s);
rows_entry::tri_compare cmp(s);
auto del = current_deleter<rows_entry>();
auto p_i = p._rows.begin();
@@ -331,17 +329,16 @@ stop_iteration mutation_partition::apply_monotonically(const schema& s, mutation
try {
rows_entry& src_e = *p_i;
if (i != _rows.end() && cmp(*i, src_e) < 0) {
i = _rows.lower_bound(src_e, less);
i = _rows.lower_bound(src_e, cmp);
}
if (i == _rows.end() || cmp(src_e, *i) < 0) {
p_i = p._rows.erase(p_i);
bool insert = true;
if (i != _rows.end() && i->continuous()) {
// When falling into a continuous range, preserve continuity.
src_e.set_continuous(true);
if (src_e.dummy()) {
p_i = p._rows.erase(p_i);
if (tracker) {
tracker->on_remove(src_e);
}
@@ -350,7 +347,8 @@ stop_iteration mutation_partition::apply_monotonically(const schema& s, mutation
}
}
if (insert) {
_rows.insert_before(i, src_e);
rows_type::key_grabber pi_kg(p_i);
_rows.insert_before(i, std::move(pi_kg));
}
} else {
auto continuous = i->continuous() || src_e.continuous();
@@ -444,7 +442,7 @@ mutation_partition::tombstone_for_row(const schema& schema, const clustering_key
check_schema(schema);
row_tombstone t = row_tombstone(range_tombstone_for_row(schema, key));
auto j = _rows.find(key, rows_entry::compare(schema));
auto j = _rows.find(key, rows_entry::tri_compare(schema));
if (j != _rows.end()) {
t.apply(j->row().deleted_at(), j->row().marker());
}
@@ -531,7 +529,7 @@ void mutation_partition::apply_insert(const schema& s, clustering_key_view key,
void mutation_partition::insert_row(const schema& s, const clustering_key& key, deletable_row&& row) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(key, std::move(row)));
_rows.insert(_rows.end(), *e, rows_entry::compare(s));
_rows.insert_before_hint(_rows.end(), *e, rows_entry::tri_compare(s));
e.release();
}
@@ -539,14 +537,14 @@ void mutation_partition::insert_row(const schema& s, const clustering_key& key,
check_schema(s);
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(s, key, row));
_rows.insert(_rows.end(), *e, rows_entry::compare(s));
_rows.insert_before_hint(_rows.end(), *e, rows_entry::tri_compare(s));
e.release();
}
const row*
mutation_partition::find_row(const schema& s, const clustering_key& key) const {
check_schema(s);
auto i = _rows.find(key, rows_entry::compare(s));
auto i = _rows.find(key, rows_entry::tri_compare(s));
if (i == _rows.end()) {
return nullptr;
}
@@ -556,11 +554,11 @@ mutation_partition::find_row(const schema& s, const clustering_key& key) const {
deletable_row&
mutation_partition::clustered_row(const schema& s, clustering_key&& key) {
check_schema(s);
auto i = _rows.find(key, rows_entry::compare(s));
auto i = _rows.find(key, rows_entry::tri_compare(s));
if (i == _rows.end()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(std::move(key)));
i = _rows.insert(i, *e, rows_entry::compare(s));
i = _rows.insert_before_hint(i, *e, rows_entry::tri_compare(s)).first;
e.release();
}
return i->row();
@@ -569,11 +567,11 @@ mutation_partition::clustered_row(const schema& s, clustering_key&& key) {
deletable_row&
mutation_partition::clustered_row(const schema& s, const clustering_key& key) {
check_schema(s);
auto i = _rows.find(key, rows_entry::compare(s));
auto i = _rows.find(key, rows_entry::tri_compare(s));
if (i == _rows.end()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(key));
i = _rows.insert(i, *e, rows_entry::compare(s));
i = _rows.insert_before_hint(i, *e, rows_entry::tri_compare(s)).first;
e.release();
}
return i->row();
@@ -582,11 +580,11 @@ mutation_partition::clustered_row(const schema& s, const clustering_key& key) {
deletable_row&
mutation_partition::clustered_row(const schema& s, clustering_key_view key) {
check_schema(s);
auto i = _rows.find(key, rows_entry::compare(s));
auto i = _rows.find(key, rows_entry::tri_compare(s));
if (i == _rows.end()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(key));
i = _rows.insert(i, *e, rows_entry::compare(s));
i = _rows.insert_before_hint(i, *e, rows_entry::tri_compare(s)).first;
e.release();
}
return i->row();
@@ -595,11 +593,11 @@ mutation_partition::clustered_row(const schema& s, clustering_key_view key) {
deletable_row&
mutation_partition::clustered_row(const schema& s, position_in_partition_view pos, is_dummy dummy, is_continuous continuous) {
check_schema(s);
auto i = _rows.find(pos, rows_entry::compare(s));
auto i = _rows.find(pos, rows_entry::tri_compare(s));
if (i == _rows.end()) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(s, pos, dummy, continuous));
i = _rows.insert(i, *e, rows_entry::compare(s));
i = _rows.insert_before_hint(i, *e, rows_entry::tri_compare(s)).first;
e.release();
}
return i->row();
@@ -608,7 +606,6 @@ mutation_partition::clustered_row(const schema& s, position_in_partition_view po
deletable_row&
mutation_partition::append_clustered_row(const schema& s, position_in_partition_view pos, is_dummy dummy, is_continuous continuous) {
check_schema(s);
const auto less = rows_entry::compare(s);
const auto cmp = rows_entry::tri_compare(s);
auto i = _rows.end();
if (!_rows.empty() && (cmp(*std::prev(i), pos) >= 0)) {
@@ -616,7 +613,7 @@ mutation_partition::append_clustered_row(const schema& s, position_in_partition_
", last clustering row is equal or greater: {}", i->key(), std::prev(i)->key()));
}
auto e = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(s, pos, dummy, continuous));
i = _rows.insert(i, *e, less);
i = _rows.insert_before_hint(i, *e, cmp).first;
e.release();
return i->row();
@@ -628,7 +625,7 @@ mutation_partition::lower_bound(const schema& schema, const query::clustering_ra
if (!r.start()) {
return std::cbegin(_rows);
}
return _rows.lower_bound(position_in_partition_view::for_range_start(r), rows_entry::compare(schema));
return _rows.lower_bound(position_in_partition_view::for_range_start(r), rows_entry::tri_compare(schema));
}
mutation_partition::rows_type::const_iterator
@@ -637,7 +634,7 @@ mutation_partition::upper_bound(const schema& schema, const query::clustering_ra
if (!r.end()) {
return std::cend(_rows);
}
return _rows.lower_bound(position_in_partition_view::for_range_end(r), rows_entry::compare(schema));
return _rows.lower_bound(position_in_partition_view::for_range_end(r), rows_entry::tri_compare(schema));
}
boost::iterator_range<mutation_partition::rows_type::const_iterator>
@@ -1317,6 +1314,7 @@ size_t mutation_partition::external_memory_usage(const schema& s) const {
check_schema(s);
size_t sum = 0;
sum += static_row().external_memory_usage(s, column_kind::static_column);
sum += clustered_rows().external_memory_usage();
for (auto& clr : clustered_rows()) {
sum += clr.memory_usage(s);
}
@@ -2434,14 +2432,13 @@ void mutation_partition::make_fully_continuous() {
}
void mutation_partition::set_continuity(const schema& s, const position_range& pr, is_continuous cont) {
auto less = rows_entry::compare(s);
auto cmp = rows_entry::tri_compare(s);
if (cmp(pr.start(), pr.end()) >= 0) {
return; // empty range
}
auto end = _rows.lower_bound(pr.end(), less);
auto end = _rows.lower_bound(pr.end(), cmp);
if (end == _rows.end() || cmp(pr.end(), end->position()) < 0) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(s, pr.end(), is_dummy::yes,
@@ -2450,7 +2447,7 @@ void mutation_partition::set_continuity(const schema& s, const position_range& p
e.release();
}
auto i = _rows.lower_bound(pr.start(), less);
auto i = _rows.lower_bound(pr.start(), cmp);
if (cmp(pr.start(), i->position()) < 0) {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(s, pr.start(), is_dummy::yes, i->continuous()));
@@ -2526,10 +2523,9 @@ stop_iteration mutation_partition::clear_gently(cache_tracker* tracker) noexcept
bool
mutation_partition::check_continuity(const schema& s, const position_range& r, is_continuous cont) const {
check_schema(s);
auto less = rows_entry::compare(s);
auto cmp = rows_entry::tri_compare(s);
auto i = _rows.lower_bound(r.start(), less);
auto end = _rows.lower_bound(r.end(), less);
auto i = _rows.lower_bound(r.start(), cmp);
auto end = _rows.lower_bound(r.end(), cmp);
if (cmp(r.start(), r.end()) >= 0) {
return bool(cont);
}

View File

@@ -44,7 +44,7 @@
#include "hashing_partition_visitor.hh"
#include "range_tombstone_list.hh"
#include "clustering_key_filter.hh"
#include "intrusive_set_external_comparator.hh"
#include "utils/intrusive_btree.hh"
#include "utils/preempt.hh"
#include "utils/managed_ref.hh"
@@ -1023,7 +1023,7 @@ class cache_tracker;
class rows_entry {
using lru_link_type = bi::list_member_hook<bi::link_mode<bi::auto_unlink>>;
friend class size_calculator;
intrusive_set_external_comparator_member_hook _link;
intrusive_b::member_hook _link;
clustering_key _key;
deletable_row _row;
lru_link_type _lru_link;
@@ -1039,7 +1039,6 @@ class rows_entry {
flags() : _before_ck(0), _after_ck(0), _continuous(true), _dummy(false), _last_dummy(false) { }
} _flags{};
public:
using container_type = intrusive_set_external_comparator<rows_entry, &rows_entry::_link>;
using lru_type = bi::list<rows_entry,
bi::member_hook<rows_entry, rows_entry::lru_link_type, &rows_entry::_lru_link>,
bi::constant_time_size<false>>; // we need this to have bi::auto_unlink on hooks.
@@ -1158,6 +1157,8 @@ public:
friend std::ostream& operator<<(std::ostream& os, const printer& p);
};
friend std::ostream& operator<<(std::ostream& os, const printer& p);
using container_type = intrusive_b::tree<rows_entry, &rows_entry::_link, rows_entry::tri_compare, 12, 20, intrusive_b::key_search::linear>;
};
struct mutation_application_stats {

View File

@@ -56,7 +56,7 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
class lsa_partition_reader {
const schema& _schema;
reader_permit _permit;
rows_entry::compare _less;
rows_entry::tri_compare _cmp;
position_in_partition::equal_compare _eq;
heap_compare _heap_cmp;
@@ -89,10 +89,10 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
}
for (auto&& v : _snapshot->versions()) {
auto cr_end = v.partition().upper_bound(_schema, ck_range);
mutation_partition::rows_type::const_iterator cr_end = v.partition().upper_bound(_schema, ck_range);
auto cr = [&] () -> mutation_partition::rows_type::const_iterator {
if (last_row) {
return v.partition().clustered_rows().upper_bound(*last_row, _less);
return v.partition().clustered_rows().upper_bound(*last_row, _cmp);
} else {
return v.partition().lower_bound(_schema, ck_range);
}
@@ -131,7 +131,7 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
bool digest_requested)
: _schema(s)
, _permit(std::move(permit))
, _less(s)
, _cmp(s)
, _eq(s)
, _heap_cmp(s)
, _snapshot(std::move(snp))

View File

@@ -72,11 +72,11 @@ public:
return false;
}
_change_mark = snp_cm;
rows_entry::compare less(*snp.schema());
rows_entry::tri_compare cmp(*snp.schema());
_in_latest = true;
for (auto&& v : snp.versions()) {
auto& rows = v.partition().clustered_rows();
_it = rows.find(_pos, less);
_it = rows.find(_pos, cmp);
if (_it != rows.end()) {
return true;
}
@@ -163,7 +163,7 @@ class partition_snapshot_row_cursor final {
void prepare_heap(position_in_partition_view lower_bound) {
memory::on_alloc_point();
rows_entry::compare less(_schema);
rows_entry::tri_compare cmp(_schema);
position_in_version::less_compare heap_less(_schema);
_heap.clear();
_current_row.clear();
@@ -174,7 +174,7 @@ class partition_snapshot_row_cursor final {
for (auto&& v : _snp.versions()) {
unique_owner = unique_owner && (first || !v.is_referenced());
auto& rows = v.partition().clustered_rows();
auto pos = rows.lower_bound(lower_bound, less);
auto pos = rows.lower_bound(lower_bound, cmp);
auto end = rows.end();
_iterators.push_back(pos);
if (pos != end) {
@@ -233,11 +233,11 @@ public:
// before it and after cursor's position. There cannot be any
// insertions for non-latest versions, so we don't have to update them.
if (_current_row[0].version_no != 0) {
rows_entry::compare less(_schema);
rows_entry::tri_compare cmp(_schema);
position_in_partition::equal_compare eq(_schema);
position_in_version::less_compare heap_less(_schema);
auto& rows = _snp.version()->partition().clustered_rows();
auto it = _iterators[0] = rows.lower_bound(_position, less);
auto it = _iterators[0] = rows.lower_bound(_position, cmp);
auto heap_i = boost::find_if(_heap, [](auto&& v) { return v.version_no == 0; });
if (it == rows.end()) {
if (heap_i != _heap.end()) {

View File

@@ -1198,7 +1198,8 @@ void cache_entry::on_evicted(cache_tracker& tracker) noexcept {
}
void rows_entry::on_evicted(cache_tracker& tracker) noexcept {
auto it = mutation_partition::rows_type::iterator_to(*this);
mutation_partition::rows_type::iterator it(this);
if (is_last_dummy()) {
// Every evictable partition entry must have a dummy entry at the end,
// so don't remove it, just unlink from the LRU.
@@ -1206,16 +1207,15 @@ void rows_entry::on_evicted(cache_tracker& tracker) noexcept {
// with no regular rows, and we need to track them.
unlink_from_lru();
} else {
++it;
it = it.erase_and_dispose(current_deleter<rows_entry>());
it->set_continuous(false);
current_deleter<rows_entry>()(this);
tracker.on_row_eviction();
}
if (mutation_partition::rows_type::is_only_member(*it)) {
mutation_partition::rows_type* rows = it.tree_if_singular();
if (rows != nullptr) {
assert(it->is_last_dummy());
partition_version& pv = partition_version::container_of(mutation_partition::container_of(
mutation_partition::rows_type::container_of_only_member(*it)));
partition_version& pv = partition_version::container_of(mutation_partition::container_of(*rows));
if (pv.is_referenced_from_entry()) {
partition_entry& pe = partition_entry::container_of(pv);
if (!pe.is_locked()) {

View File

@@ -77,7 +77,7 @@ static void add_tombstone(mutation& m, range_tombstone rt) {
}
static void set_row_continuous(mutation_partition& mp, int ck, is_continuous value) {
auto it = mp.clustered_rows().find(make_ck(ck), rows_entry::compare(*SCHEMA));
auto it = mp.clustered_rows().find(make_ck(ck), rows_entry::tri_compare(*SCHEMA));
assert(it != mp.clustered_rows().end());
it->set_continuous(value);
}