mutation_partition: Switch cache of rows onto B-tree
The switch is pretty straightforward, and consists of - change less-compare into tri-compare - rename insert/insert_check into insert_before_hint - use tree::key_grabber in mutation_partition::apply_monotonically to exception-safely transfer a row from one tree to another - explicitly erase the row from tree in rows_entry::on_evicted, there's a O(1) tree::iterator method for this - rewrite rows_entry -> cache_entry transofrmation in the on_evicted to fit the B-tree API - include the B-tree's external memory usage into stats That's it. The number of keys per node was is set to 12 with linear search and linear extention of 20 because - experimenting with tree shows that numbers 8 through 10 keys with linear search show the best performance on stress tests for insert/find-s of keys that are memcmp-able arrays of bytes (which is an approximation of current clustring key compare). More keys work slower, but still better than any bigger value with any type of search up to 64 keys per node - having 12 keys per nodes is the threshold at which the memory footprint for B-tree becomes smaller than for boost::intrusive::set for partitions with 32+ keys - 20 keys for linear root eats the first-split peak and still performs well in linear search As a result the footpring for B tree is bigger than the one for BST only for trees filled with 21...32 keys by 0.1...0.7 bytes per key. Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
@@ -342,14 +342,14 @@ future<> cache_flat_mutation_reader::read_from_underlying(db::timeout_clock::tim
|
||||
if (no_clustering_row_between(*_schema, _upper_bound, _next_row.position())) {
|
||||
this->maybe_update_continuity();
|
||||
} else if (can_populate()) {
|
||||
rows_entry::compare less(*_schema);
|
||||
rows_entry::tri_compare cmp(*_schema);
|
||||
auto& rows = _snp->version()->partition().clustered_rows();
|
||||
if (query::is_single_row(*_schema, *_ck_ranges_curr)) {
|
||||
with_allocator(_snp->region().allocator(), [&] {
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(_ck_ranges_curr->start()->value()));
|
||||
// Use _next_row iterator only as a hint, because there could be insertions after _upper_bound.
|
||||
auto insert_result = rows.insert_check(_next_row.get_iterator_in_latest_version(), *e, less);
|
||||
auto insert_result = rows.insert_before_hint(_next_row.get_iterator_in_latest_version(), *e, cmp);
|
||||
auto inserted = insert_result.second;
|
||||
auto it = insert_result.first;
|
||||
if (inserted) {
|
||||
@@ -365,7 +365,7 @@ future<> cache_flat_mutation_reader::read_from_underlying(db::timeout_clock::tim
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(*_schema, _upper_bound, is_dummy::yes, is_continuous::yes));
|
||||
// Use _next_row iterator only as a hint, because there could be insertions after _upper_bound.
|
||||
auto insert_result = rows.insert_check(_next_row.get_iterator_in_latest_version(), *e, less);
|
||||
auto insert_result = rows.insert_before_hint(_next_row.get_iterator_in_latest_version(), *e, cmp);
|
||||
auto inserted = insert_result.second;
|
||||
if (inserted) {
|
||||
clogger.trace("csm {}: inserted dummy at {}", fmt::ptr(this), _upper_bound);
|
||||
@@ -405,12 +405,12 @@ bool cache_flat_mutation_reader::ensure_population_lower_bound() {
|
||||
if (!_last_row.is_in_latest_version()) {
|
||||
with_allocator(_snp->region().allocator(), [&] {
|
||||
auto& rows = _snp->version()->partition().clustered_rows();
|
||||
rows_entry::compare less(*_schema);
|
||||
rows_entry::tri_compare cmp(*_schema);
|
||||
// FIXME: Avoid the copy by inserting an incomplete clustering row
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(*_schema, *_last_row));
|
||||
e->set_continuous(false);
|
||||
auto insert_result = rows.insert_check(rows.end(), *e, less);
|
||||
auto insert_result = rows.insert_before_hint(rows.end(), *e, cmp);
|
||||
auto inserted = insert_result.second;
|
||||
if (inserted) {
|
||||
clogger.trace("csm {}: inserted lower bound dummy at {}", fmt::ptr(this), e->position());
|
||||
@@ -456,7 +456,7 @@ void cache_flat_mutation_reader::maybe_add_to_cache(const clustering_row& cr) {
|
||||
clogger.trace("csm {}: populate({})", fmt::ptr(this), clustering_row::printer(*_schema, cr));
|
||||
_lsa_manager.run_in_update_section_with_allocator([this, &cr] {
|
||||
mutation_partition& mp = _snp->version()->partition();
|
||||
rows_entry::compare less(*_schema);
|
||||
rows_entry::tri_compare cmp(*_schema);
|
||||
|
||||
if (_read_context->digest_requested()) {
|
||||
cr.cells().prepare_hash(*_schema, column_kind::regular_column);
|
||||
@@ -465,8 +465,8 @@ void cache_flat_mutation_reader::maybe_add_to_cache(const clustering_row& cr) {
|
||||
current_allocator().construct<rows_entry>(*_schema, cr.key(), cr.as_deletable_row()));
|
||||
new_entry->set_continuous(false);
|
||||
auto it = _next_row.iterators_valid() ? _next_row.get_iterator_in_latest_version()
|
||||
: mp.clustered_rows().lower_bound(cr.key(), less);
|
||||
auto insert_result = mp.clustered_rows().insert_check(it, *new_entry, less);
|
||||
: mp.clustered_rows().lower_bound(cr.key(), cmp);
|
||||
auto insert_result = mp.clustered_rows().insert_before_hint(it, *new_entry, cmp);
|
||||
if (insert_result.second) {
|
||||
_snp->tracker()->insert(*new_entry);
|
||||
new_entry.release();
|
||||
|
||||
@@ -32,7 +32,6 @@
|
||||
#include "mutation_query.hh"
|
||||
#include "service/priority_manager.hh"
|
||||
#include "mutation_compactor.hh"
|
||||
#include "intrusive_set_external_comparator.hh"
|
||||
#include "counters.hh"
|
||||
#include "row_cache.hh"
|
||||
#include "view_info.hh"
|
||||
@@ -157,8 +156,8 @@ mutation_partition::mutation_partition(const schema& s, const mutation_partition
|
||||
#ifdef SEASTAR_DEBUG
|
||||
assert(x._schema_version == _schema_version);
|
||||
#endif
|
||||
auto cloner = [&s] (const auto& x) {
|
||||
return current_allocator().construct<rows_entry>(s, x);
|
||||
auto cloner = [&s] (const rows_entry* x) -> rows_entry* {
|
||||
return current_allocator().construct<rows_entry>(s, *x);
|
||||
};
|
||||
_rows.clone_from(x._rows, cloner, current_deleter<rows_entry>());
|
||||
}
|
||||
@@ -181,7 +180,7 @@ mutation_partition::mutation_partition(const mutation_partition& x, const schema
|
||||
for(auto&& r : ck_ranges) {
|
||||
for (const rows_entry& e : x.range(schema, r)) {
|
||||
auto ce = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(schema, e));
|
||||
_rows.insert(_rows.end(), *ce, rows_entry::compare(schema));
|
||||
_rows.insert_before_hint(_rows.end(), *ce, rows_entry::tri_compare(schema));
|
||||
ce.release();
|
||||
}
|
||||
for (auto&& rt : x._row_tombstones.slice(schema, r)) {
|
||||
@@ -322,7 +321,6 @@ stop_iteration mutation_partition::apply_monotonically(const schema& s, mutation
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
rows_entry::compare less(s);
|
||||
rows_entry::tri_compare cmp(s);
|
||||
auto del = current_deleter<rows_entry>();
|
||||
auto p_i = p._rows.begin();
|
||||
@@ -331,17 +329,16 @@ stop_iteration mutation_partition::apply_monotonically(const schema& s, mutation
|
||||
try {
|
||||
rows_entry& src_e = *p_i;
|
||||
if (i != _rows.end() && cmp(*i, src_e) < 0) {
|
||||
i = _rows.lower_bound(src_e, less);
|
||||
i = _rows.lower_bound(src_e, cmp);
|
||||
}
|
||||
if (i == _rows.end() || cmp(src_e, *i) < 0) {
|
||||
p_i = p._rows.erase(p_i);
|
||||
|
||||
bool insert = true;
|
||||
if (i != _rows.end() && i->continuous()) {
|
||||
// When falling into a continuous range, preserve continuity.
|
||||
src_e.set_continuous(true);
|
||||
|
||||
if (src_e.dummy()) {
|
||||
p_i = p._rows.erase(p_i);
|
||||
if (tracker) {
|
||||
tracker->on_remove(src_e);
|
||||
}
|
||||
@@ -350,7 +347,8 @@ stop_iteration mutation_partition::apply_monotonically(const schema& s, mutation
|
||||
}
|
||||
}
|
||||
if (insert) {
|
||||
_rows.insert_before(i, src_e);
|
||||
rows_type::key_grabber pi_kg(p_i);
|
||||
_rows.insert_before(i, std::move(pi_kg));
|
||||
}
|
||||
} else {
|
||||
auto continuous = i->continuous() || src_e.continuous();
|
||||
@@ -444,7 +442,7 @@ mutation_partition::tombstone_for_row(const schema& schema, const clustering_key
|
||||
check_schema(schema);
|
||||
row_tombstone t = row_tombstone(range_tombstone_for_row(schema, key));
|
||||
|
||||
auto j = _rows.find(key, rows_entry::compare(schema));
|
||||
auto j = _rows.find(key, rows_entry::tri_compare(schema));
|
||||
if (j != _rows.end()) {
|
||||
t.apply(j->row().deleted_at(), j->row().marker());
|
||||
}
|
||||
@@ -531,7 +529,7 @@ void mutation_partition::apply_insert(const schema& s, clustering_key_view key,
|
||||
void mutation_partition::insert_row(const schema& s, const clustering_key& key, deletable_row&& row) {
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(key, std::move(row)));
|
||||
_rows.insert(_rows.end(), *e, rows_entry::compare(s));
|
||||
_rows.insert_before_hint(_rows.end(), *e, rows_entry::tri_compare(s));
|
||||
e.release();
|
||||
}
|
||||
|
||||
@@ -539,14 +537,14 @@ void mutation_partition::insert_row(const schema& s, const clustering_key& key,
|
||||
check_schema(s);
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(s, key, row));
|
||||
_rows.insert(_rows.end(), *e, rows_entry::compare(s));
|
||||
_rows.insert_before_hint(_rows.end(), *e, rows_entry::tri_compare(s));
|
||||
e.release();
|
||||
}
|
||||
|
||||
const row*
|
||||
mutation_partition::find_row(const schema& s, const clustering_key& key) const {
|
||||
check_schema(s);
|
||||
auto i = _rows.find(key, rows_entry::compare(s));
|
||||
auto i = _rows.find(key, rows_entry::tri_compare(s));
|
||||
if (i == _rows.end()) {
|
||||
return nullptr;
|
||||
}
|
||||
@@ -556,11 +554,11 @@ mutation_partition::find_row(const schema& s, const clustering_key& key) const {
|
||||
deletable_row&
|
||||
mutation_partition::clustered_row(const schema& s, clustering_key&& key) {
|
||||
check_schema(s);
|
||||
auto i = _rows.find(key, rows_entry::compare(s));
|
||||
auto i = _rows.find(key, rows_entry::tri_compare(s));
|
||||
if (i == _rows.end()) {
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(std::move(key)));
|
||||
i = _rows.insert(i, *e, rows_entry::compare(s));
|
||||
i = _rows.insert_before_hint(i, *e, rows_entry::tri_compare(s)).first;
|
||||
e.release();
|
||||
}
|
||||
return i->row();
|
||||
@@ -569,11 +567,11 @@ mutation_partition::clustered_row(const schema& s, clustering_key&& key) {
|
||||
deletable_row&
|
||||
mutation_partition::clustered_row(const schema& s, const clustering_key& key) {
|
||||
check_schema(s);
|
||||
auto i = _rows.find(key, rows_entry::compare(s));
|
||||
auto i = _rows.find(key, rows_entry::tri_compare(s));
|
||||
if (i == _rows.end()) {
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(key));
|
||||
i = _rows.insert(i, *e, rows_entry::compare(s));
|
||||
i = _rows.insert_before_hint(i, *e, rows_entry::tri_compare(s)).first;
|
||||
e.release();
|
||||
}
|
||||
return i->row();
|
||||
@@ -582,11 +580,11 @@ mutation_partition::clustered_row(const schema& s, const clustering_key& key) {
|
||||
deletable_row&
|
||||
mutation_partition::clustered_row(const schema& s, clustering_key_view key) {
|
||||
check_schema(s);
|
||||
auto i = _rows.find(key, rows_entry::compare(s));
|
||||
auto i = _rows.find(key, rows_entry::tri_compare(s));
|
||||
if (i == _rows.end()) {
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(key));
|
||||
i = _rows.insert(i, *e, rows_entry::compare(s));
|
||||
i = _rows.insert_before_hint(i, *e, rows_entry::tri_compare(s)).first;
|
||||
e.release();
|
||||
}
|
||||
return i->row();
|
||||
@@ -595,11 +593,11 @@ mutation_partition::clustered_row(const schema& s, clustering_key_view key) {
|
||||
deletable_row&
|
||||
mutation_partition::clustered_row(const schema& s, position_in_partition_view pos, is_dummy dummy, is_continuous continuous) {
|
||||
check_schema(s);
|
||||
auto i = _rows.find(pos, rows_entry::compare(s));
|
||||
auto i = _rows.find(pos, rows_entry::tri_compare(s));
|
||||
if (i == _rows.end()) {
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(s, pos, dummy, continuous));
|
||||
i = _rows.insert(i, *e, rows_entry::compare(s));
|
||||
i = _rows.insert_before_hint(i, *e, rows_entry::tri_compare(s)).first;
|
||||
e.release();
|
||||
}
|
||||
return i->row();
|
||||
@@ -608,7 +606,6 @@ mutation_partition::clustered_row(const schema& s, position_in_partition_view po
|
||||
deletable_row&
|
||||
mutation_partition::append_clustered_row(const schema& s, position_in_partition_view pos, is_dummy dummy, is_continuous continuous) {
|
||||
check_schema(s);
|
||||
const auto less = rows_entry::compare(s);
|
||||
const auto cmp = rows_entry::tri_compare(s);
|
||||
auto i = _rows.end();
|
||||
if (!_rows.empty() && (cmp(*std::prev(i), pos) >= 0)) {
|
||||
@@ -616,7 +613,7 @@ mutation_partition::append_clustered_row(const schema& s, position_in_partition_
|
||||
", last clustering row is equal or greater: {}", i->key(), std::prev(i)->key()));
|
||||
}
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(s, pos, dummy, continuous));
|
||||
i = _rows.insert(i, *e, less);
|
||||
i = _rows.insert_before_hint(i, *e, cmp).first;
|
||||
e.release();
|
||||
|
||||
return i->row();
|
||||
@@ -628,7 +625,7 @@ mutation_partition::lower_bound(const schema& schema, const query::clustering_ra
|
||||
if (!r.start()) {
|
||||
return std::cbegin(_rows);
|
||||
}
|
||||
return _rows.lower_bound(position_in_partition_view::for_range_start(r), rows_entry::compare(schema));
|
||||
return _rows.lower_bound(position_in_partition_view::for_range_start(r), rows_entry::tri_compare(schema));
|
||||
}
|
||||
|
||||
mutation_partition::rows_type::const_iterator
|
||||
@@ -637,7 +634,7 @@ mutation_partition::upper_bound(const schema& schema, const query::clustering_ra
|
||||
if (!r.end()) {
|
||||
return std::cend(_rows);
|
||||
}
|
||||
return _rows.lower_bound(position_in_partition_view::for_range_end(r), rows_entry::compare(schema));
|
||||
return _rows.lower_bound(position_in_partition_view::for_range_end(r), rows_entry::tri_compare(schema));
|
||||
}
|
||||
|
||||
boost::iterator_range<mutation_partition::rows_type::const_iterator>
|
||||
@@ -1317,6 +1314,7 @@ size_t mutation_partition::external_memory_usage(const schema& s) const {
|
||||
check_schema(s);
|
||||
size_t sum = 0;
|
||||
sum += static_row().external_memory_usage(s, column_kind::static_column);
|
||||
sum += clustered_rows().external_memory_usage();
|
||||
for (auto& clr : clustered_rows()) {
|
||||
sum += clr.memory_usage(s);
|
||||
}
|
||||
@@ -2434,14 +2432,13 @@ void mutation_partition::make_fully_continuous() {
|
||||
}
|
||||
|
||||
void mutation_partition::set_continuity(const schema& s, const position_range& pr, is_continuous cont) {
|
||||
auto less = rows_entry::compare(s);
|
||||
auto cmp = rows_entry::tri_compare(s);
|
||||
|
||||
if (cmp(pr.start(), pr.end()) >= 0) {
|
||||
return; // empty range
|
||||
}
|
||||
|
||||
auto end = _rows.lower_bound(pr.end(), less);
|
||||
auto end = _rows.lower_bound(pr.end(), cmp);
|
||||
if (end == _rows.end() || cmp(pr.end(), end->position()) < 0) {
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(s, pr.end(), is_dummy::yes,
|
||||
@@ -2450,7 +2447,7 @@ void mutation_partition::set_continuity(const schema& s, const position_range& p
|
||||
e.release();
|
||||
}
|
||||
|
||||
auto i = _rows.lower_bound(pr.start(), less);
|
||||
auto i = _rows.lower_bound(pr.start(), cmp);
|
||||
if (cmp(pr.start(), i->position()) < 0) {
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(s, pr.start(), is_dummy::yes, i->continuous()));
|
||||
@@ -2526,10 +2523,9 @@ stop_iteration mutation_partition::clear_gently(cache_tracker* tracker) noexcept
|
||||
bool
|
||||
mutation_partition::check_continuity(const schema& s, const position_range& r, is_continuous cont) const {
|
||||
check_schema(s);
|
||||
auto less = rows_entry::compare(s);
|
||||
auto cmp = rows_entry::tri_compare(s);
|
||||
auto i = _rows.lower_bound(r.start(), less);
|
||||
auto end = _rows.lower_bound(r.end(), less);
|
||||
auto i = _rows.lower_bound(r.start(), cmp);
|
||||
auto end = _rows.lower_bound(r.end(), cmp);
|
||||
if (cmp(r.start(), r.end()) >= 0) {
|
||||
return bool(cont);
|
||||
}
|
||||
|
||||
@@ -44,7 +44,7 @@
|
||||
#include "hashing_partition_visitor.hh"
|
||||
#include "range_tombstone_list.hh"
|
||||
#include "clustering_key_filter.hh"
|
||||
#include "intrusive_set_external_comparator.hh"
|
||||
#include "utils/intrusive_btree.hh"
|
||||
#include "utils/preempt.hh"
|
||||
#include "utils/managed_ref.hh"
|
||||
|
||||
@@ -1023,7 +1023,7 @@ class cache_tracker;
|
||||
class rows_entry {
|
||||
using lru_link_type = bi::list_member_hook<bi::link_mode<bi::auto_unlink>>;
|
||||
friend class size_calculator;
|
||||
intrusive_set_external_comparator_member_hook _link;
|
||||
intrusive_b::member_hook _link;
|
||||
clustering_key _key;
|
||||
deletable_row _row;
|
||||
lru_link_type _lru_link;
|
||||
@@ -1039,7 +1039,6 @@ class rows_entry {
|
||||
flags() : _before_ck(0), _after_ck(0), _continuous(true), _dummy(false), _last_dummy(false) { }
|
||||
} _flags{};
|
||||
public:
|
||||
using container_type = intrusive_set_external_comparator<rows_entry, &rows_entry::_link>;
|
||||
using lru_type = bi::list<rows_entry,
|
||||
bi::member_hook<rows_entry, rows_entry::lru_link_type, &rows_entry::_lru_link>,
|
||||
bi::constant_time_size<false>>; // we need this to have bi::auto_unlink on hooks.
|
||||
@@ -1158,6 +1157,8 @@ public:
|
||||
friend std::ostream& operator<<(std::ostream& os, const printer& p);
|
||||
};
|
||||
friend std::ostream& operator<<(std::ostream& os, const printer& p);
|
||||
|
||||
using container_type = intrusive_b::tree<rows_entry, &rows_entry::_link, rows_entry::tri_compare, 12, 20, intrusive_b::key_search::linear>;
|
||||
};
|
||||
|
||||
struct mutation_application_stats {
|
||||
|
||||
@@ -56,7 +56,7 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
|
||||
class lsa_partition_reader {
|
||||
const schema& _schema;
|
||||
reader_permit _permit;
|
||||
rows_entry::compare _less;
|
||||
rows_entry::tri_compare _cmp;
|
||||
position_in_partition::equal_compare _eq;
|
||||
heap_compare _heap_cmp;
|
||||
|
||||
@@ -89,10 +89,10 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
|
||||
}
|
||||
|
||||
for (auto&& v : _snapshot->versions()) {
|
||||
auto cr_end = v.partition().upper_bound(_schema, ck_range);
|
||||
mutation_partition::rows_type::const_iterator cr_end = v.partition().upper_bound(_schema, ck_range);
|
||||
auto cr = [&] () -> mutation_partition::rows_type::const_iterator {
|
||||
if (last_row) {
|
||||
return v.partition().clustered_rows().upper_bound(*last_row, _less);
|
||||
return v.partition().clustered_rows().upper_bound(*last_row, _cmp);
|
||||
} else {
|
||||
return v.partition().lower_bound(_schema, ck_range);
|
||||
}
|
||||
@@ -131,7 +131,7 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
|
||||
bool digest_requested)
|
||||
: _schema(s)
|
||||
, _permit(std::move(permit))
|
||||
, _less(s)
|
||||
, _cmp(s)
|
||||
, _eq(s)
|
||||
, _heap_cmp(s)
|
||||
, _snapshot(std::move(snp))
|
||||
|
||||
@@ -72,11 +72,11 @@ public:
|
||||
return false;
|
||||
}
|
||||
_change_mark = snp_cm;
|
||||
rows_entry::compare less(*snp.schema());
|
||||
rows_entry::tri_compare cmp(*snp.schema());
|
||||
_in_latest = true;
|
||||
for (auto&& v : snp.versions()) {
|
||||
auto& rows = v.partition().clustered_rows();
|
||||
_it = rows.find(_pos, less);
|
||||
_it = rows.find(_pos, cmp);
|
||||
if (_it != rows.end()) {
|
||||
return true;
|
||||
}
|
||||
@@ -163,7 +163,7 @@ class partition_snapshot_row_cursor final {
|
||||
|
||||
void prepare_heap(position_in_partition_view lower_bound) {
|
||||
memory::on_alloc_point();
|
||||
rows_entry::compare less(_schema);
|
||||
rows_entry::tri_compare cmp(_schema);
|
||||
position_in_version::less_compare heap_less(_schema);
|
||||
_heap.clear();
|
||||
_current_row.clear();
|
||||
@@ -174,7 +174,7 @@ class partition_snapshot_row_cursor final {
|
||||
for (auto&& v : _snp.versions()) {
|
||||
unique_owner = unique_owner && (first || !v.is_referenced());
|
||||
auto& rows = v.partition().clustered_rows();
|
||||
auto pos = rows.lower_bound(lower_bound, less);
|
||||
auto pos = rows.lower_bound(lower_bound, cmp);
|
||||
auto end = rows.end();
|
||||
_iterators.push_back(pos);
|
||||
if (pos != end) {
|
||||
@@ -233,11 +233,11 @@ public:
|
||||
// before it and after cursor's position. There cannot be any
|
||||
// insertions for non-latest versions, so we don't have to update them.
|
||||
if (_current_row[0].version_no != 0) {
|
||||
rows_entry::compare less(_schema);
|
||||
rows_entry::tri_compare cmp(_schema);
|
||||
position_in_partition::equal_compare eq(_schema);
|
||||
position_in_version::less_compare heap_less(_schema);
|
||||
auto& rows = _snp.version()->partition().clustered_rows();
|
||||
auto it = _iterators[0] = rows.lower_bound(_position, less);
|
||||
auto it = _iterators[0] = rows.lower_bound(_position, cmp);
|
||||
auto heap_i = boost::find_if(_heap, [](auto&& v) { return v.version_no == 0; });
|
||||
if (it == rows.end()) {
|
||||
if (heap_i != _heap.end()) {
|
||||
|
||||
12
row_cache.cc
12
row_cache.cc
@@ -1198,7 +1198,8 @@ void cache_entry::on_evicted(cache_tracker& tracker) noexcept {
|
||||
}
|
||||
|
||||
void rows_entry::on_evicted(cache_tracker& tracker) noexcept {
|
||||
auto it = mutation_partition::rows_type::iterator_to(*this);
|
||||
mutation_partition::rows_type::iterator it(this);
|
||||
|
||||
if (is_last_dummy()) {
|
||||
// Every evictable partition entry must have a dummy entry at the end,
|
||||
// so don't remove it, just unlink from the LRU.
|
||||
@@ -1206,16 +1207,15 @@ void rows_entry::on_evicted(cache_tracker& tracker) noexcept {
|
||||
// with no regular rows, and we need to track them.
|
||||
unlink_from_lru();
|
||||
} else {
|
||||
++it;
|
||||
it = it.erase_and_dispose(current_deleter<rows_entry>());
|
||||
it->set_continuous(false);
|
||||
current_deleter<rows_entry>()(this);
|
||||
tracker.on_row_eviction();
|
||||
}
|
||||
|
||||
if (mutation_partition::rows_type::is_only_member(*it)) {
|
||||
mutation_partition::rows_type* rows = it.tree_if_singular();
|
||||
if (rows != nullptr) {
|
||||
assert(it->is_last_dummy());
|
||||
partition_version& pv = partition_version::container_of(mutation_partition::container_of(
|
||||
mutation_partition::rows_type::container_of_only_member(*it)));
|
||||
partition_version& pv = partition_version::container_of(mutation_partition::container_of(*rows));
|
||||
if (pv.is_referenced_from_entry()) {
|
||||
partition_entry& pe = partition_entry::container_of(pv);
|
||||
if (!pe.is_locked()) {
|
||||
|
||||
@@ -77,7 +77,7 @@ static void add_tombstone(mutation& m, range_tombstone rt) {
|
||||
}
|
||||
|
||||
static void set_row_continuous(mutation_partition& mp, int ck, is_continuous value) {
|
||||
auto it = mp.clustered_rows().find(make_ck(ck), rows_entry::compare(*SCHEMA));
|
||||
auto it = mp.clustered_rows().find(make_ck(ck), rows_entry::tri_compare(*SCHEMA));
|
||||
assert(it != mp.clustered_rows().end());
|
||||
it->set_continuous(value);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user