db: Use mutation_partition_v2 in mvcc

This patch switches memtable and cache to use mutation_partition_v2,
and all affected algorithms accordingly.

The memtable reader was changed to use the same cursor implementation
that the cache uses, improving code reuse and reducing the risk of bugs
caused by discrepancies between the algorithms which deal with MVCC.

Range tombstone eviction in cache now has fine granularity, just like
row eviction.

Fixes #2578
Fixes #3288
Fixes #10587
This commit is contained in:
Tomasz Grabiec
2022-06-07 12:52:12 +02:00
parent ccf3a13648
commit 026f8cc1e7
14 changed files with 696 additions and 986 deletions

View File

@@ -41,7 +41,7 @@ class cache_flat_mutation_reader final : public flat_mutation_reader_v2::impl {
move_to_underlying,
// Invariants:
// - Upper bound of the read is min(_next_row.position(), _upper_bound)
// - Upper bound of the read is *_underlying_upper_bound
// - _next_row_in_range = _next.position() < _upper_bound
// - _last_row points at a direct predecessor of the next row which is going to be read.
// Used for populating continuity.
@@ -51,46 +51,6 @@ class cache_flat_mutation_reader final : public flat_mutation_reader_v2::impl {
end_of_stream
};
enum class source {
cache = 0,
underlying = 1,
};
// Merges range tombstone change streams coming from underlying and the cache.
// Ensures no range tombstone change fragment is emitted when there is no
// actual change in the effective tombstone.
class range_tombstone_change_merger {
const schema& _schema;
position_in_partition _pos;
tombstone _current_tombstone;
std::array<tombstone, 2> _tombstones;
private:
std::optional<range_tombstone_change> do_flush(position_in_partition pos, bool end_of_range) {
std::optional<range_tombstone_change> ret;
position_in_partition::tri_compare cmp(_schema);
const auto res = cmp(_pos, pos);
const auto should_flush = end_of_range ? res <= 0 : res < 0;
if (should_flush) {
auto merged_tomb = std::max(_tombstones.front(), _tombstones.back());
if (merged_tomb != _current_tombstone) {
_current_tombstone = merged_tomb;
ret.emplace(_pos, _current_tombstone);
}
_pos = std::move(pos);
}
return ret;
}
public:
range_tombstone_change_merger(const schema& s) : _schema(s), _pos(position_in_partition::before_all_clustered_rows()), _tombstones{}
{ }
std::optional<range_tombstone_change> apply(source src, range_tombstone_change&& rtc) {
auto ret = do_flush(rtc.position(), false);
_tombstones[static_cast<size_t>(src)] = rtc.tombstone();
return ret;
}
std::optional<range_tombstone_change> flush(position_in_partition_view pos, bool end_of_range) {
return do_flush(position_in_partition(pos), end_of_range);
}
};
partition_snapshot_ptr _snp;
query::clustering_key_filter_ranges _ck_ranges; // Query schema domain, reversed reads use native order
@@ -103,8 +63,11 @@ class cache_flat_mutation_reader final : public flat_mutation_reader_v2::impl {
// Holds the lower bound of a position range which hasn't been processed yet.
// Only rows with positions < _lower_bound have been emitted, and only
// range_tombstones with positions <= _lower_bound.
// range_tombstone_changes with positions <= _lower_bound.
//
// Invariant: !_lower_bound.is_clustering_row()
position_in_partition _lower_bound; // Query schema domain
// Invariant: !_upper_bound.is_clustering_row()
position_in_partition_view _upper_bound; // Query schema domain
std::optional<position_in_partition> _underlying_upper_bound; // Query schema domain
@@ -121,22 +84,19 @@ class cache_flat_mutation_reader final : public flat_mutation_reader_v2::impl {
read_context& _read_context;
partition_snapshot_row_cursor _next_row;
range_tombstone_change_generator _rt_gen; // cache -> reader
range_tombstone_assembler _rt_assembler; // underlying -> cache
range_tombstone_change_merger _rt_merger; // {cache, underlying} -> reader
// When the read moves to the underlying, the read range will be
// (_lower_bound, x], where x is either _next_row.position() or _upper_bound.
// In the former case (x is _next_row.position()), underlying can emit
// a range tombstone change for after_key(x), which is outside the range.
// We can't push this fragment into the buffer straight away, the cache may
// have fragments with smaller position. So we save it here and flush it when
// a fragment with a larger position is seen.
std::optional<mutation_fragment_v2> _queued_underlying_fragment;
// Holds the currently active range tombstone of the output mutation fragment stream.
// While producing the stream, at any given time, _current_tombstone applies to the
// key range which extends at least to _lower_bound. When consuming subsequent interval,
// which will advance _lower_bound further, be it from underlying or from cache,
// a decision is made whether the range tombstone in the next interval is the same as
// the current one or not. If it is different, then range_tombstone_change is emitted
// with the old _lower_bound value (start of the next interval).
tombstone _current_tombstone;
state _state = state::before_static_row;
bool _next_row_in_range = false;
bool _has_rt = false;
// True iff current population interval, since the previous clustering row, starts before all clustered rows.
// We cannot just look at _lower_bound, because emission of range tombstones changes _lower_bound and
@@ -145,11 +105,6 @@ class cache_flat_mutation_reader final : public flat_mutation_reader_v2::impl {
// Valid when _state == reading_from_underlying.
bool _population_range_starts_before_all_rows;
// Whether _lower_bound was changed within current fill_buffer().
// If it did not then we cannot break out of it (e.g. on preemption) because
// forward progress is not guaranteed in case iterators are getting constantly invalidated.
bool _lower_bound_changed = false;
// Points to the underlying reader conforming to _schema,
// either to *_underlying_holder or _read_context.underlying().underlying().
flat_mutation_reader_v2* _underlying = nullptr;
@@ -163,14 +118,11 @@ class cache_flat_mutation_reader final : public flat_mutation_reader_v2::impl {
void move_to_next_range();
void move_to_range(query::clustering_row_ranges::const_iterator);
void move_to_next_entry();
void maybe_drop_last_entry() noexcept;
void flush_tombstones(position_in_partition_view, bool end_of_range = false);
void maybe_drop_last_entry(tombstone) noexcept;
void add_to_buffer(const partition_snapshot_row_cursor&);
void add_clustering_row_to_buffer(mutation_fragment_v2&&);
void add_to_buffer(range_tombstone_change&&, source);
void do_add_to_buffer(range_tombstone_change&&);
void add_range_tombstone_to_buffer(range_tombstone&&);
void add_to_buffer(mutation_fragment_v2&&);
void add_to_buffer(range_tombstone_change&&);
void offer_from_underlying(mutation_fragment_v2&&);
future<> read_from_underlying();
void start_reading_from_underlying();
bool after_current_range(position_in_partition_view position);
@@ -189,7 +141,7 @@ class cache_flat_mutation_reader final : public flat_mutation_reader_v2::impl {
bool ensure_population_lower_bound();
void maybe_add_to_cache(const mutation_fragment_v2& mf);
void maybe_add_to_cache(const clustering_row& cr);
void maybe_add_to_cache(const range_tombstone_change& rtc);
bool maybe_add_to_cache(const range_tombstone_change& rtc);
void maybe_add_to_cache(const static_row& sr);
void maybe_set_static_row_continuous();
void finish_reader() {
@@ -244,8 +196,6 @@ public:
, _read_context_holder()
, _read_context(ctx) // ctx is owned by the caller, who's responsible for closing it.
, _next_row(*_schema, *_snp, false, _read_context.is_reversed())
, _rt_gen(*_schema)
, _rt_merger(*_schema)
{
clogger.trace("csm {}: table={}.{}, reversed={}, snap={}", fmt::ptr(this), _schema->ks_name(), _schema->cf_name(), _read_context.is_reversed(),
fmt::ptr(&*_snp));
@@ -373,13 +323,31 @@ future<> cache_flat_mutation_reader::do_fill_buffer() {
}
_state = state::reading_from_underlying;
_population_range_starts_before_all_rows = _lower_bound.is_before_all_clustered_rows(*_schema) && !_read_context.is_reversed();
_underlying_upper_bound = _next_row_in_range ? position_in_partition::before_key(_next_row.position())
: position_in_partition(_upper_bound);
if (!_read_context.partition_exists()) {
clogger.trace("csm {}: partition does not exist", fmt::ptr(this));
if (_current_tombstone) {
clogger.trace("csm {}: move_to_underlying: emit rtc({}, null)", fmt::ptr(this), _lower_bound);
push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, range_tombstone_change(_lower_bound, {})));
_current_tombstone = {};
}
return read_from_underlying();
}
_underlying_upper_bound = _next_row_in_range ? position_in_partition(_next_row.position())
: position_in_partition(_upper_bound);
return _underlying->fast_forward_to(position_range{_lower_bound, *_underlying_upper_bound}).then([this] {
return read_from_underlying();
if (!_current_tombstone) {
return read_from_underlying();
}
return _underlying->peek().then([this] (mutation_fragment_v2* mf) {
position_in_partition::equal_compare eq(*_schema);
if (!mf || !mf->is_range_tombstone_change()
|| !eq(mf->as_range_tombstone_change().position(), _lower_bound)) {
clogger.trace("csm {}: move_to_underlying: emit rtc({}, null)", fmt::ptr(this), _lower_bound);
push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, range_tombstone_change(_lower_bound, {})));
_current_tombstone = {};
}
return read_from_underlying();
});
});
}
if (_state == state::reading_from_underlying) {
@@ -388,8 +356,8 @@ future<> cache_flat_mutation_reader::do_fill_buffer() {
// assert(_state == state::reading_from_cache)
return _lsa_manager.run_in_read_section([this] {
auto next_valid = _next_row.iterators_valid();
clogger.trace("csm {}: reading_from_cache, range=[{}, {}), next={}, valid={}", fmt::ptr(this), _lower_bound,
_upper_bound, _next_row.position(), next_valid);
clogger.trace("csm {}: reading_from_cache, range=[{}, {}), next={}, valid={}, rt={}", fmt::ptr(this), _lower_bound,
_upper_bound, _next_row.position(), next_valid, _current_tombstone);
// We assume that if there was eviction, and thus the range may
// no longer be continuous, the cursor was invalidated.
if (!next_valid) {
@@ -403,13 +371,9 @@ future<> cache_flat_mutation_reader::do_fill_buffer() {
}
_next_row.maybe_refresh();
clogger.trace("csm {}: next={}", fmt::ptr(this), _next_row);
_lower_bound_changed = false;
while (_state == state::reading_from_cache) {
copy_from_cache_to_buffer();
// We need to check _lower_bound_changed even if is_buffer_full() because
// we may have emitted only a range tombstone which overlapped with _lower_bound
// and thus didn't cause _lower_bound to change.
if ((need_preempt() || is_buffer_full()) && _lower_bound_changed) {
if (need_preempt() || is_buffer_full()) {
break;
}
}
@@ -423,37 +387,38 @@ future<> cache_flat_mutation_reader::read_from_underlying() {
[this] { return _state != state::reading_from_underlying || is_buffer_full(); },
[this] (mutation_fragment_v2 mf) {
_read_context.cache().on_row_miss();
maybe_add_to_cache(mf);
add_to_buffer(std::move(mf));
offer_from_underlying(std::move(mf));
},
[this] {
_lower_bound = std::move(*_underlying_upper_bound);
_underlying_upper_bound.reset();
_state = state::reading_from_cache;
_lsa_manager.run_in_update_section([this] {
auto same_pos = _next_row.maybe_refresh();
clogger.trace("csm {}: underlying done, in_range={}, same={}, next={}", fmt::ptr(this), _next_row_in_range, same_pos, _next_row);
if (!same_pos) {
_read_context.cache().on_mispopulate(); // FIXME: Insert dummy entry at _upper_bound.
_read_context.cache().on_mispopulate(); // FIXME: Insert dummy entry at _lower_bound.
_next_row_in_range = !after_current_range(_next_row.position());
if (!_next_row.continuous()) {
_last_row = nullptr; // We did not populate the full range up to _lower_bound, break continuity
start_reading_from_underlying();
}
return;
}
if (_next_row_in_range) {
maybe_update_continuity();
if (!_next_row.dummy()) {
_lower_bound = position_in_partition::before_key(_next_row.key());
} else {
_lower_bound = _next_row.position();
}
} else {
if (no_clustering_row_between(*_schema, _upper_bound, _next_row.position())) {
this->maybe_update_continuity();
} else if (can_populate()) {
if (can_populate()) {
const schema& table_s = table_schema();
rows_entry::tri_compare cmp(table_s);
auto& rows = _snp->version()->partition().mutable_clustered_rows();
if (query::is_single_row(*_schema, *_ck_ranges_curr)) {
// If there are range tombstones which apply to the row then
// we cannot insert an empty entry here because if those range
// tombstones got evicted by now, we will insert an entry
// with missing range tombstone information.
// FIXME: try to set the range tombstone when possible.
if (!_has_rt) {
with_allocator(_snp->region().allocator(), [&] {
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(_ck_ranges_curr->start()->value()));
@@ -466,9 +431,10 @@ future<> cache_flat_mutation_reader::read_from_underlying() {
// Also works in reverse read mode.
// It preserves the continuity of the range the entry falls into.
it->set_continuous(next->continuous());
clogger.trace("csm {}: inserted empty row at {}, cont={}", fmt::ptr(this), it->position(), it->continuous());
clogger.trace("csm {}: inserted empty row at {}, cont={}, rt={}", fmt::ptr(this), it->position(), it->continuous(), it->range_tombstone());
}
});
}
} else if (ensure_population_lower_bound()) {
with_allocator(_snp->region().allocator(), [&] {
auto e = alloc_strategy_unique_ptr<rows_entry>(
@@ -480,13 +446,15 @@ future<> cache_flat_mutation_reader::read_from_underlying() {
_snp->tracker()->insert(*insert_result.first);
}
if (_read_context.is_reversed()) [[unlikely]] {
clogger.trace("csm {}: set_continuous({})", fmt::ptr(this), _last_row.position());
clogger.trace("csm {}: set_continuous({}), prev={}, rt={}", fmt::ptr(this), _last_row.position(), insert_result.first->position(), _current_tombstone);
_last_row->set_continuous(true);
_last_row->set_range_tombstone(_current_tombstone);
} else {
clogger.trace("csm {}: set_continuous({})", fmt::ptr(this), insert_result.first->position());
clogger.trace("csm {}: set_continuous({}), prev={}, rt={}", fmt::ptr(this), insert_result.first->position(), _last_row.position(), _current_tombstone);
insert_result.first->set_continuous(true);
insert_result.first->set_range_tombstone(_current_tombstone);
}
maybe_drop_last_entry();
maybe_drop_last_entry(_current_tombstone);
});
}
} else {
@@ -515,55 +483,103 @@ bool cache_flat_mutation_reader::ensure_population_lower_bound() {
// Continuity flag we will later set for the upper bound extends to the previous row in the same version,
// so we need to ensure we have an entry in the latest version.
if (!_last_row.is_in_latest_version()) {
with_allocator(_snp->region().allocator(), [&] {
auto& rows = _snp->version()->partition().mutable_clustered_rows();
rows_entry::tri_compare cmp(table_schema());
// FIXME: Avoid the copy by inserting an incomplete clustering row
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(table_schema(), *_last_row));
e->set_continuous(false);
auto insert_result = rows.insert_before_hint(rows.end(), std::move(e), cmp);
if (insert_result.second) {
auto it = insert_result.first;
clogger.trace("csm {}: inserted lower bound dummy at {}", fmt::ptr(this), it->position());
_snp->tracker()->insert(*it);
}
_last_row.set_latest(insert_result.first);
rows_entry::tri_compare cmp(*_schema);
partition_snapshot_row_cursor cur(*_schema, *_snp, false, _read_context.is_reversed());
if (!cur.advance_to(_last_row.position())) {
return false;
}
if (cmp(cur.position(), _last_row.position()) != 0) {
return false;
}
auto res = with_allocator(_snp->region().allocator(), [&] {
return cur.ensure_entry_in_latest();
});
_last_row.set_latest(res.it);
if (res.inserted) {
clogger.trace("csm {}: inserted lower bound dummy at {}", fmt::ptr(this), _last_row.position());
}
}
return true;
}
inline
void cache_flat_mutation_reader::maybe_update_continuity() {
if (can_populate() && ensure_population_lower_bound()) {
position_in_partition::equal_compare eq(*_schema);
if (can_populate()
&& ensure_population_lower_bound()
&& !eq(_last_row.position(), _next_row.position())) {
with_allocator(_snp->region().allocator(), [&] {
rows_entry& e = _next_row.ensure_entry_in_latest().row;
auto& rows = _snp->version()->partition().mutable_clustered_rows();
const schema& table_s = table_schema();
rows_entry::tri_compare table_cmp(table_s);
if (_read_context.is_reversed()) [[unlikely]] {
clogger.trace("csm {}: set_continuous({})", fmt::ptr(this), _last_row.position());
_last_row->set_continuous(true);
if (_current_tombstone != _last_row->range_tombstone() && !_last_row->dummy()) {
with_allocator(_snp->region().allocator(), [&] {
auto e2 = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(table_s,
position_in_partition_view::before_key(_last_row->position()),
is_dummy::yes,
is_continuous::yes));
auto insert_result = rows.insert(std::move(e2), table_cmp);
if (insert_result.second) {
clogger.trace("csm {}: inserted dummy at {}", fmt::ptr(this), insert_result.first->position());
_snp->tracker()->insert(*insert_result.first);
}
clogger.trace("csm {}: set_continuous({}), prev={}, rt={}", fmt::ptr(this), insert_result.first->position(),
_last_row.position(), _current_tombstone);
insert_result.first->set_continuous(true);
insert_result.first->set_range_tombstone(_current_tombstone);
clogger.trace("csm {}: set_continuous({})", fmt::ptr(this), _last_row.position());
_last_row->set_continuous(true);
});
} else {
clogger.trace("csm {}: set_continuous({}), rt={}", fmt::ptr(this), _last_row.position(), _current_tombstone);
_last_row->set_continuous(true);
_last_row->set_range_tombstone(_current_tombstone);
}
} else {
clogger.trace("csm {}: set_continuous({})", fmt::ptr(this), e.position());
e.set_continuous(true);
if (_current_tombstone != e.range_tombstone() && !e.dummy()) {
with_allocator(_snp->region().allocator(), [&] {
auto e2 = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(table_s,
position_in_partition_view::before_key(e.position()),
is_dummy::yes,
is_continuous::yes));
// Use _next_row iterator only as a hint because there could be insertions before
// _next_row.get_iterator_in_latest_version(), either from concurrent reads,
// from _next_row.ensure_entry_in_latest().
auto insert_result = rows.insert_before_hint(_next_row.get_iterator_in_latest_version(), std::move(e2), table_cmp);
if (insert_result.second) {
clogger.trace("csm {}: inserted dummy at {}", fmt::ptr(this), insert_result.first->position());
_snp->tracker()->insert(*insert_result.first);
}
clogger.trace("csm {}: set_continuous({}), prev={}, rt={}", fmt::ptr(this), insert_result.first->position(),
_last_row.position(), _current_tombstone);
insert_result.first->set_continuous(true);
insert_result.first->set_range_tombstone(_current_tombstone);
clogger.trace("csm {}: set_continuous({})", fmt::ptr(this), e.position());
e.set_continuous(true);
});
} else {
clogger.trace("csm {}: set_continuous({}), rt={}", fmt::ptr(this), e.position(), _current_tombstone);
e.set_range_tombstone(_current_tombstone);
e.set_continuous(true);
}
}
maybe_drop_last_entry();
maybe_drop_last_entry(_current_tombstone);
});
} else {
_read_context.cache().on_mispopulate();
}
}
inline
void cache_flat_mutation_reader::maybe_add_to_cache(const mutation_fragment_v2& mf) {
if (mf.is_range_tombstone_change()) {
maybe_add_to_cache(mf.as_range_tombstone_change());
} else {
assert(mf.is_clustering_row());
const clustering_row& cr = mf.as_clustering_row();
maybe_add_to_cache(cr);
}
}
inline
void cache_flat_mutation_reader::maybe_add_to_cache(const clustering_row& cr) {
if (!can_populate()) {
@@ -572,16 +588,9 @@ void cache_flat_mutation_reader::maybe_add_to_cache(const clustering_row& cr) {
_read_context.cache().on_mispopulate();
return;
}
auto rt_opt = _rt_assembler.flush(*_schema, position_in_partition::after_key(*_schema, cr.key()));
clogger.trace("csm {}: populate({})", fmt::ptr(this), clustering_row::printer(*_schema, cr));
_lsa_manager.run_in_update_section_with_allocator([this, &cr, &rt_opt] {
mutation_partition& mp = _snp->version()->partition();
if (rt_opt) {
clogger.trace("csm {}: populate flushed rt({})", fmt::ptr(this), *rt_opt);
mp.mutable_row_tombstones().apply_monotonically(table_schema(), to_table_domain(range_tombstone(*rt_opt)));
}
clogger.trace("csm {}: populate({}), rt={}", fmt::ptr(this), clustering_row::printer(*_schema, cr), _current_tombstone);
_lsa_manager.run_in_update_section_with_allocator([this, &cr] {
mutation_partition_v2& mp = _snp->version()->partition();
rows_entry::tri_compare cmp(table_schema());
if (_read_context.digest_requested()) {
@@ -590,6 +599,7 @@ void cache_flat_mutation_reader::maybe_add_to_cache(const clustering_row& cr) {
auto new_entry = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(table_schema(), cr.key(), cr.as_deletable_row()));
new_entry->set_continuous(false);
new_entry->set_range_tombstone(_current_tombstone);
auto it = _next_row.iterators_valid() ? _next_row.get_iterator_in_latest_version()
: mp.clustered_rows().lower_bound(cr.key(), cmp);
auto insert_result = mp.mutable_clustered_rows().insert_before_hint(it, std::move(new_entry), cmp);
@@ -603,9 +613,14 @@ void cache_flat_mutation_reader::maybe_add_to_cache(const clustering_row& cr) {
if (_read_context.is_reversed()) [[unlikely]] {
clogger.trace("csm {}: set_continuous({})", fmt::ptr(this), _last_row.position());
_last_row->set_continuous(true);
// _current_tombstone must also apply to _last_row itself (if it's non-dummy)
// because otherwise there would be a rtc after it, either creating a different entry,
// or clearing _last_row if population did not happen.
_last_row->set_range_tombstone(_current_tombstone);
} else {
clogger.trace("csm {}: set_continuous({})", fmt::ptr(this), e.position());
e.set_continuous(true);
e.set_range_tombstone(_current_tombstone);
}
} else {
_read_context.cache().on_mispopulate();
@@ -617,6 +632,72 @@ void cache_flat_mutation_reader::maybe_add_to_cache(const clustering_row& cr) {
});
}
inline
// Handles a range_tombstone_change received from underlying during cache
// population. Applies the tombstone of the interval which just ended (the
// previous _current_tombstone) to cache by inserting a dummy entry at the
// change's position and marking the preceding interval continuous.
// Returns true iff the change is significant and should be emitted to the
// reader's output stream; false when it is redundant (no change in the
// effective tombstone) or when it is the closing change positioned exactly
// at *_underlying_upper_bound.
bool cache_flat_mutation_reader::maybe_add_to_cache(const range_tombstone_change& rtc) {
rows_entry::tri_compare q_cmp(*_schema);
clogger.trace("csm {}: maybe_add_to_cache({})", fmt::ptr(this), rtc);
// Don't emit the closing range tombstone change, we may continue from cache with the same tombstone.
// The following relies on !_underlying_upper_bound->is_clustering_row()
if (q_cmp(rtc.position(), *_underlying_upper_bound) == 0) {
_lower_bound = rtc.position();
return false;
}
// prev is the tombstone which was in effect up to rtc.position().
auto prev = std::exchange(_current_tombstone, rtc.tombstone());
if (_current_tombstone == prev) {
// No effective change; nothing to emit and nothing new to populate.
return false;
}
if (!can_populate()) {
// _current_tombstone is now invalid and remains so for this reader. No need to change it.
_last_row = nullptr;
_population_range_starts_before_all_rows = false;
_read_context.cache().on_mispopulate();
return true;
}
_lsa_manager.run_in_update_section_with_allocator([&] {
mutation_partition_v2& mp = _snp->version()->partition();
rows_entry::tri_compare cmp(table_schema());
// Insert a dummy entry at the change position; it marks the boundary
// between the old and the new range tombstone in cache.
auto new_entry = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(table_schema(), to_table_domain(rtc.position()), is_dummy::yes, is_continuous::no));
auto it = _next_row.iterators_valid() ? _next_row.get_iterator_in_latest_version()
: mp.clustered_rows().lower_bound(to_table_domain(rtc.position()), cmp);
auto insert_result = mp.mutable_clustered_rows().insert_before_hint(it, std::move(new_entry), cmp);
it = insert_result.first;
if (insert_result.second) {
_snp->tracker()->insert(*it);
}
rows_entry& e = *it;
if (ensure_population_lower_bound()) {
// underlying may emit range_tombstone_change fragments with the same position.
// In such case, the range to which the tombstone from the first fragment applies is empty and should be ignored.
if (q_cmp(_last_row.position(), it->position()) < 0) {
if (_read_context.is_reversed()) [[unlikely]] {
clogger.trace("csm {}: set_continuous({}), rt={}", fmt::ptr(this), _last_row.position(), prev);
_last_row->set_continuous(true);
_last_row->set_range_tombstone(prev);
} else {
clogger.trace("csm {}: set_continuous({}), rt={}", fmt::ptr(this), e.position(), prev);
e.set_continuous(true);
e.set_range_tombstone(prev);
}
}
} else {
_read_context.cache().on_mispopulate();
}
// The inserted (or pre-existing) entry becomes the new population lower bound.
with_allocator(standard_allocator(), [&] {
_last_row = partition_snapshot_row_weakref(*_snp, it, true);
});
_population_range_starts_before_all_rows = false;
});
return true;
}
inline
bool cache_flat_mutation_reader::after_current_range(position_in_partition_view p) {
position_in_partition::tri_compare cmp(*_schema);
@@ -632,19 +713,35 @@ void cache_flat_mutation_reader::start_reading_from_underlying() {
inline
void cache_flat_mutation_reader::copy_from_cache_to_buffer() {
clogger.trace("csm {}: copy_from_cache, next={}, next_row_in_range={}", fmt::ptr(this), _next_row.position(), _next_row_in_range);
clogger.trace("csm {}: copy_from_cache, next_row_in_range={}, next={}", fmt::ptr(this), _next_row_in_range, _next_row);
_next_row.touch();
auto next_lower_bound = position_in_partition_view::after_key(table_schema(), _next_row.position());
auto upper_bound = _next_row_in_range ? next_lower_bound.view : _upper_bound;
if (_snp->range_tombstones(_lower_bound, upper_bound, [&] (range_tombstone rts) {
add_range_tombstone_to_buffer(std::move(rts));
return stop_iteration(_lower_bound_changed && is_buffer_full());
}, _read_context.is_reversed()) == stop_iteration::no) {
return;
if (_next_row.range_tombstone() != _current_tombstone) {
position_in_partition::equal_compare eq(*_schema);
auto upper_bound = _next_row_in_range ? position_in_partition_view::before_key(_next_row.position()) : _upper_bound;
if (!eq(_lower_bound, upper_bound)) {
position_in_partition new_lower_bound(upper_bound);
auto tomb = _next_row.range_tombstone();
clogger.trace("csm {}: rtc({}, {}) ...{}", fmt::ptr(this), _lower_bound, tomb, new_lower_bound);
push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, range_tombstone_change(_lower_bound, tomb)));
_current_tombstone = tomb;
_lower_bound = std::move(new_lower_bound);
_read_context.cache()._tracker.on_range_tombstone_read();
}
}
// We add the row to the buffer even when it's full.
// This simplifies the code. For more info see #3139.
if (_next_row_in_range) {
if (_next_row.range_tombstone_for_row() != _current_tombstone) [[unlikely]] {
auto tomb = _next_row.range_tombstone_for_row();
auto new_lower_bound = position_in_partition::before_key(_next_row.position());
clogger.trace("csm {}: rtc({}, {})", fmt::ptr(this), new_lower_bound, tomb);
push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, range_tombstone_change(new_lower_bound, tomb)));
_lower_bound = std::move(new_lower_bound);
_current_tombstone = tomb;
_read_context.cache()._tracker.on_range_tombstone_read();
}
add_to_buffer(_next_row);
move_to_next_entry();
} else {
@@ -660,10 +757,11 @@ void cache_flat_mutation_reader::move_to_end() {
inline
void cache_flat_mutation_reader::move_to_next_range() {
if (_queued_underlying_fragment) {
add_to_buffer(*std::exchange(_queued_underlying_fragment, {}));
if (_current_tombstone) {
clogger.trace("csm {}: move_to_next_range: emit rtc({}, null)", fmt::ptr(this), _upper_bound);
push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, range_tombstone_change(_upper_bound, {})));
_current_tombstone = {};
}
flush_tombstones(position_in_partition::for_range_end(*_ck_ranges_curr), true);
auto next_it = std::next(_ck_ranges_curr);
if (next_it == _ck_ranges_end) {
move_to_end();
@@ -680,8 +778,6 @@ void cache_flat_mutation_reader::move_to_range(query::clustering_row_ranges::con
_last_row = nullptr;
_lower_bound = std::move(lb);
_upper_bound = std::move(ub);
_rt_gen.trim(_lower_bound);
_lower_bound_changed = true;
_ck_ranges_curr = next_it;
auto adjacent = _next_row.advance_to(_lower_bound);
_next_row_in_range = !after_current_range(_next_row.position());
@@ -722,7 +818,7 @@ void cache_flat_mutation_reader::move_to_range(query::clustering_row_ranges::con
// _next_row must have a greater position than _last_row.
// Invalidates references but keeps the _next_row valid.
inline
void cache_flat_mutation_reader::maybe_drop_last_entry() noexcept {
void cache_flat_mutation_reader::maybe_drop_last_entry(tombstone rt) noexcept {
// Drop dummy entry if it falls inside a continuous range.
// This prevents unnecessary dummy entries from accumulating in cache and slowing down scans.
//
@@ -733,9 +829,12 @@ void cache_flat_mutation_reader::maybe_drop_last_entry() noexcept {
&& !_read_context.is_reversed() // FIXME
&& _last_row->dummy()
&& _last_row->continuous()
&& _last_row->range_tombstone() == rt
&& _snp->at_latest_version()
&& _snp->at_oldest_version()) {
clogger.trace("csm {}: dropping unnecessary dummy at {}", fmt::ptr(this), _last_row->position());
with_allocator(_snp->region().allocator(), [&] {
cache_tracker& tracker = _read_context.cache()._tracker;
tracker.get_lru().remove(*_last_row);
@@ -769,57 +868,38 @@ void cache_flat_mutation_reader::move_to_next_entry() {
if (!_next_row.continuous()) {
start_reading_from_underlying();
} else {
maybe_drop_last_entry();
maybe_drop_last_entry(_next_row.range_tombstone());
}
}
}
// Flushes pending range tombstone state up to pos_ into the output buffer.
// pos_ is first converted to an after-key position so it is a valid range
// tombstone bound. Both the cache-side change generator (_rt_gen) and the
// two-stream merger (_rt_merger) are flushed; changes produced by _rt_gen
// are routed through add_to_buffer() tagged as source::cache, while a
// change produced directly by the merger goes straight to the buffer.
// When end_of_range is true, changes positioned exactly at pos are flushed too.
void cache_flat_mutation_reader::flush_tombstones(position_in_partition_view pos_, bool end_of_range) {
// Ensure position is appropriate for range tombstone bound
auto pos = position_in_partition_view::after_key(*_schema, pos_);
clogger.trace("csm {}: flush_tombstones({}) end_of_range: {}", fmt::ptr(this), pos.view, end_of_range);
_rt_gen.flush(pos.view, [this] (range_tombstone_change&& rtc) {
add_to_buffer(std::move(rtc), source::cache);
}, end_of_range);
if (auto rtc_opt = _rt_merger.flush(pos.view, end_of_range)) {
do_add_to_buffer(std::move(*rtc_opt));
}
}
inline
void cache_flat_mutation_reader::add_to_buffer(mutation_fragment_v2&& mf) {
clogger.trace("csm {}: add_to_buffer({})", fmt::ptr(this), mutation_fragment_v2::printer(*_schema, mf));
position_in_partition::less_compare less(*_schema);
if (_underlying_upper_bound && less(*_underlying_upper_bound, mf.position())) {
_queued_underlying_fragment = std::move(mf);
return;
}
flush_tombstones(mf.position());
void cache_flat_mutation_reader::offer_from_underlying(mutation_fragment_v2&& mf) {
clogger.trace("csm {}: offer_from_underlying({})", fmt::ptr(this), mutation_fragment_v2::printer(*_schema, mf));
if (mf.is_clustering_row()) {
maybe_add_to_cache(mf.as_clustering_row());
add_clustering_row_to_buffer(std::move(mf));
} else {
assert(mf.is_range_tombstone_change());
add_to_buffer(std::move(mf).as_range_tombstone_change(), source::underlying);
auto& chg = mf.as_range_tombstone_change();
if (maybe_add_to_cache(chg)) {
add_to_buffer(std::move(mf).as_range_tombstone_change());
}
}
}
inline
void cache_flat_mutation_reader::add_to_buffer(const partition_snapshot_row_cursor& row) {
position_in_partition::less_compare less(*_schema);
if (_queued_underlying_fragment && less(_queued_underlying_fragment->position(), row.position())) {
add_to_buffer(*std::exchange(_queued_underlying_fragment, {}));
}
if (!row.dummy()) {
_read_context.cache().on_row_hit();
if (_read_context.digest_requested()) {
row.latest_row().cells().prepare_hash(table_schema(), column_kind::regular_column);
}
flush_tombstones(position_in_partition_view::for_key(row.key()));
add_clustering_row_to_buffer(mutation_fragment_v2(*_schema, _permit, row.row()));
} else {
if (less(_lower_bound, row.position())) {
_lower_bound = row.position();
_lower_bound_changed = true;
}
_read_context.cache()._tracker.on_dummy_row_hit();
}
@@ -835,64 +915,21 @@ void cache_flat_mutation_reader::add_clustering_row_to_buffer(mutation_fragment_
auto new_lower_bound = position_in_partition::after_key(*_schema, row.key());
push_mutation_fragment(std::move(mf));
_lower_bound = std::move(new_lower_bound);
_lower_bound_changed = true;
if (row.tomb()) {
_read_context.cache()._tracker.on_row_tombstone_read();
}
}
inline
void cache_flat_mutation_reader::add_to_buffer(range_tombstone_change&& rtc, source src) {
void cache_flat_mutation_reader::add_to_buffer(range_tombstone_change&& rtc) {
clogger.trace("csm {}: add_to_buffer({})", fmt::ptr(this), rtc);
if (auto rtc_opt = _rt_merger.apply(src, std::move(rtc))) {
do_add_to_buffer(std::move(*rtc_opt));
}
}
inline
void cache_flat_mutation_reader::do_add_to_buffer(range_tombstone_change&& rtc) {
clogger.trace("csm {}: push({})", fmt::ptr(this), rtc);
_has_rt = true;
position_in_partition::less_compare less(*_schema);
auto lower_bound_changed = less(_lower_bound, rtc.position());
_lower_bound = position_in_partition(rtc.position());
_lower_bound_changed = lower_bound_changed;
push_mutation_fragment(*_schema, _permit, std::move(rtc));
_read_context.cache()._tracker.on_range_tombstone_read();
}
inline
void cache_flat_mutation_reader::add_range_tombstone_to_buffer(range_tombstone&& rt) {
position_in_partition::less_compare less(*_schema);
if (_queued_underlying_fragment && less(_queued_underlying_fragment->position(), rt.position())) {
add_to_buffer(*std::exchange(_queued_underlying_fragment, {}));
}
clogger.trace("csm {}: add_to_buffer({})", fmt::ptr(this), rt);
if (!less(_lower_bound, rt.position())) {
rt.set_start(_lower_bound);
}
flush_tombstones(rt.position());
_rt_gen.consume(std::move(rt));
}
inline
void cache_flat_mutation_reader::maybe_add_to_cache(const range_tombstone_change& rtc) {
clogger.trace("csm {}: maybe_add_to_cache({})", fmt::ptr(this), rtc);
auto rt_opt = _rt_assembler.consume(*_schema, range_tombstone_change(rtc));
if (!rt_opt) {
return;
}
const auto& rt = *rt_opt;
if (can_populate()) {
clogger.trace("csm {}: maybe_add_to_cache({})", fmt::ptr(this), rt);
_lsa_manager.run_in_update_section_with_allocator([&] {
_snp->version()->partition().mutable_row_tombstones().apply_monotonically(
table_schema(), to_table_domain(rt));
});
} else {
_read_context.cache().on_mispopulate();
}
}
inline
void cache_flat_mutation_reader::maybe_add_to_cache(const static_row& sr) {
if (can_populate()) {

View File

@@ -50,6 +50,8 @@ public:
uint64_t rows_processed_from_memtable;
uint64_t rows_dropped_from_memtable;
uint64_t rows_merged_from_memtable;
uint64_t dummy_processed_from_memtable;
uint64_t rows_covered_by_range_tombstones_from_memtable;
uint64_t partition_evictions;
uint64_t partition_removals;
uint64_t row_evictions;
@@ -120,6 +122,7 @@ public:
mutation_cleaner& memtable_cleaner() noexcept { return _memtable_cleaner; }
uint64_t partitions() const noexcept { return _stats.partitions; }
const stats& get_stats() const noexcept { return _stats; }
stats& get_stats() noexcept { return _stats; }
void set_compaction_scheduling_group(seastar::scheduling_group);
lru& get_lru() { return _lru; }
};

View File

@@ -961,6 +961,14 @@ mutation_partition_v2::upgrade(const schema& old_schema, const schema& new_schem
*this = mutation_partition_v2(new_schema, std::move(tmp));
}
mutation_partition mutation_partition_v2::as_mutation_partition(const schema& s) const {
mutation_partition tmp(s.shared_from_this());
tmp.set_static_row_continuous(_static_row_continuous);
partition_builder v(s, tmp);
accept(s, v);
return tmp;
}
mutation_partition_v2::mutation_partition_v2(mutation_partition_v2::incomplete_tag, const schema& s, tombstone t)
: _tombstone(t)
, _static_row_continuous(!s.has_static_columns())

View File

@@ -221,6 +221,8 @@ public:
//
// Strong exception guarantees.
void upgrade(const schema& old_schema, const schema& new_schema);
mutation_partition as_mutation_partition(const schema&) const;
private:
// Erases the entry if it's safe to do so without changing the logical state of the partition.
rows_type::iterator maybe_drop(const schema&, cache_tracker*, rows_type::iterator, mutation_application_stats&);

View File

@@ -11,48 +11,39 @@
#include "partition_version.hh"
#include "readers/flat_mutation_reader_fwd.hh"
#include "readers/flat_mutation_reader_v2.hh"
#include "readers/range_tombstone_change_merger.hh"
#include "clustering_key_filter.hh"
#include "query-request.hh"
#include "partition_snapshot_row_cursor.hh"
#include <boost/range/algorithm/heap_algorithm.hpp>
#include <any>
extern seastar::logger mplog;
template <bool Reversing, typename Accounter>
class partition_snapshot_flat_reader : public flat_mutation_reader_v2::impl, public Accounter {
using rows_iter_type = std::conditional_t<Reversing,
mutation_partition::rows_type::const_reverse_iterator,
mutation_partition::rows_type::const_iterator>;
struct rows_position {
rows_iter_type _position, _end;
struct row_info {
mutation_fragment_v2 row;
tombstone rt_for_row;
};
static rows_iter_type make_iterator(mutation_partition::rows_type::const_iterator it) {
if constexpr (Reversing) {
return std::make_reverse_iterator(it);
} else {
return it;
}
}
// Represents a subset of mutations for some clustering key range.
//
// The range of the interval starts at the upper bound of the previous
// interval and its end depends on the contents of info:
// - position_in_partition: holds the upper bound of the interval
// - row_info: after_key(row_info::row.as_clustering_row().key())
// - monostate: upper bound is the end of the current clustering key range
//
// All positions in query schema domain.
struct interval_info {
// Applies to the whole range of the interval.
tombstone range_tombstone;
class heap_compare {
position_in_partition::less_compare _less;
public:
// `s` shall be native to the query clustering order.
explicit heap_compare(const schema& s) : _less(s) { }
bool operator()(const rows_position& a, const rows_position& b) {
if constexpr (Reversing) {
// Here `reversed()` doesn't change anything, but keep it for consistency.
return _less(b._position->position().reversed(), a._position->position().reversed());
} else {
return _less(b._position->position(), a._position->position());
}
}
bool operator()(const range_tombstone_list::iterator_range& a, const range_tombstone_list::iterator_range& b) {
if constexpr (Reversing) {
return _less(b.back().end_position().reversed(), a.back().end_position().reversed());
} else {
return _less(b.front().position(), a.front().position());
}
}
// monostate means no more rows (end of range).
// position_in_partition means there is no row, it is the upper bound of the interval.
// if row_info, the upper bound is after_key(row_info::row.as_clustering_row().key()).
std::variant<row_info, position_in_partition, std::monostate> info;
};
// The part of the reader that accesses LSA memory directly and works
@@ -69,20 +60,12 @@ class partition_snapshot_flat_reader : public flat_mutation_reader_v2::impl, pub
// instance, if a query used newer version of the schema.
const schema_ptr _snapshot_schema;
reader_permit _permit;
heap_compare _heap_cmp;
partition_snapshot_ptr _snapshot;
logalloc::region& _region;
logalloc::allocating_section& _read_section;
partition_snapshot::change_mark _change_mark;
std::vector<rows_position> _clustering_rows;
std::vector<range_tombstone_list::iterator_range> _range_tombstones;
range_tombstone_stream _rt_stream;
partition_snapshot_row_cursor _cursor;
bool _digest_requested;
bool _done = false;
private:
template<typename Function>
decltype(auto) in_alloc_section(Function&& fn) {
@@ -90,131 +73,6 @@ class partition_snapshot_flat_reader : public flat_mutation_reader_v2::impl, pub
return fn();
});
}
void maybe_refresh_state(const query::clustering_range& ck_range_snapshot,
const std::optional<position_in_partition>& last_row,
const std::optional<position_in_partition>& last_rts) {
auto mark = _snapshot->get_change_mark();
if (mark != _change_mark) {
do_refresh_state(ck_range_snapshot, last_row, last_rts);
_change_mark = mark;
}
}
// In reversing mode, upper and lower bounds still need to be executed against
// snapshot schema and ck_range, however we need them to search from "opposite" direction.
template<typename T, typename... Args>
static rows_iter_type lower_bound(const T& t, Args&&... args) {
if constexpr (Reversing) {
return make_iterator(t.upper_bound(std::forward<Args>(args)...));
} else {
return make_iterator(t.lower_bound(std::forward<Args>(args)...));
}
}
template<typename T, typename... Args>
static rows_iter_type upper_bound(const T& t, Args&&... args) {
if constexpr (Reversing) {
return make_iterator(t.lower_bound(std::forward<Args>(args)...));
} else {
return make_iterator(t.upper_bound(std::forward<Args>(args)...));
}
}
void do_refresh_state(const query::clustering_range& ck_range_snapshot,
const std::optional<position_in_partition>& last_row,
const std::optional<position_in_partition>& last_rts) {
_clustering_rows.clear();
_range_tombstones.clear();
rows_entry::tri_compare rows_cmp(*_snapshot_schema);
for (auto&& v : _snapshot->versions()) {
auto cr = [&] () {
if (last_row) {
return upper_bound(v.partition().clustered_rows(), *last_row, rows_cmp);
} else {
return lower_bound(v.partition(), *_snapshot_schema, ck_range_snapshot);
}
}();
auto cr_end = upper_bound(v.partition(), *_snapshot_schema, ck_range_snapshot);
if (cr != cr_end) {
_clustering_rows.emplace_back(rows_position { cr, cr_end });
}
range_tombstone_list::iterator_range rt_slice = [&] () {
const auto& tombstones = v.partition().row_tombstones();
if (last_rts) {
if constexpr (Reversing) {
return tombstones.lower_slice(*_snapshot_schema, bound_view::from_range_start(ck_range_snapshot), *last_rts);
} else {
return tombstones.upper_slice(*_snapshot_schema, *last_rts, bound_view::from_range_end(ck_range_snapshot));
}
} else {
return tombstones.slice(*_snapshot_schema, ck_range_snapshot);
}
}();
if (rt_slice.begin() != rt_slice.end()) {
_range_tombstones.emplace_back(std::move(rt_slice));
}
}
boost::range::make_heap(_clustering_rows, _heap_cmp);
boost::range::make_heap(_range_tombstones, _heap_cmp);
}
// Valid if has_more_rows()
const rows_entry& pop_clustering_row() {
boost::range::pop_heap(_clustering_rows, _heap_cmp);
auto& current = _clustering_rows.back();
const rows_entry& e = *current._position;
current._position = std::next(current._position);
if (current._position == current._end) {
_clustering_rows.pop_back();
} else {
boost::range::push_heap(_clustering_rows, _heap_cmp);
}
return e;
}
range_tombstone pop_range_tombstone() {
boost::range::pop_heap(_range_tombstones, _heap_cmp);
auto& current = _range_tombstones.back();
range_tombstone rt = (Reversing ? std::prev(current.end()) : current.begin())->tombstone();
if constexpr (Reversing) {
current.advance_end(-1);
rt.reverse();
} else {
current.advance_begin(1);
}
if (current.begin() == current.end()) {
_range_tombstones.pop_back();
} else {
boost::range::push_heap(_range_tombstones, _heap_cmp);
}
return rt;
}
// Valid if has_more_rows()
const rows_entry& peek_row() const {
return *_clustering_rows.front()._position;
}
bool has_more_rows() const {
return !_clustering_rows.empty();
}
// Let's not lose performance when not Reversing.
using peeked_range_tombstone = std::conditional_t<Reversing, range_tombstone, const range_tombstone&>;
peeked_range_tombstone peek_range_tombstone() const {
if constexpr (Reversing) {
range_tombstone rt = std::prev(_range_tombstones.front().end())->tombstone();
rt.reverse();
return rt;
} else {
return _range_tombstones.front().begin()->tombstone();
}
}
bool has_more_range_tombstones() const {
return !_range_tombstones.empty();
}
public:
explicit lsa_partition_reader(const schema& s, reader_permit permit, partition_snapshot_ptr snp,
logalloc::region& region, logalloc::allocating_section& read_section,
@@ -222,16 +80,19 @@ class partition_snapshot_flat_reader : public flat_mutation_reader_v2::impl, pub
: _query_schema(s)
, _snapshot_schema(Reversing ? s.make_reversed() : s.shared_from_this())
, _permit(permit)
, _heap_cmp(s)
, _snapshot(std::move(snp))
, _region(region)
, _read_section(read_section)
, _rt_stream(s, permit)
, _cursor(s, *_snapshot, false, Reversing, digest_requested)
, _digest_requested(digest_requested)
{ }
void reset_state(const query::clustering_range& ck_range_snapshot) {
do_refresh_state(ck_range_snapshot, {}, {});
void on_new_range(position_in_partition_view lower_bound) {
in_alloc_section([&] {
_done = false;
_cursor.advance_to(lower_bound);
mplog.trace("on_new_range({}): {}", lower_bound, _cursor);
});
}
template<typename Function>
@@ -250,79 +111,35 @@ class partition_snapshot_flat_reader : public flat_mutation_reader_v2::impl, pub
});
}
// Returns next clustered row in the range.
// If the ck_range_snapshot is the same as the one used previously last_row needs
// to be engaged and equal the position of the row returned last time.
// If the ck_range_snapshot is different or this is the first call to this
// function last_row has to be disengaged. Additionally, when entering
// new range _rt_stream will be populated with all relevant
// tombstones.
mutation_fragment_opt next_row(const query::clustering_range& ck_range_snapshot,
const std::optional<position_in_partition>& last_row,
const std::optional<position_in_partition>& last_rts) {
return in_alloc_section([&] () -> mutation_fragment_opt {
maybe_refresh_state(ck_range_snapshot, last_row, last_rts);
// Returns mutations for the next interval in the range.
interval_info next_interval(const query::clustering_range& ck_range_query) {
return in_alloc_section([&]() -> interval_info {
position_in_partition::tri_compare cmp(_query_schema);
position_in_partition::equal_compare rows_eq(_query_schema);
while (has_more_rows()) {
const rows_entry& e = pop_clustering_row();
if (e.dummy()) {
continue;
}
if (_digest_requested) {
e.row().cells().prepare_hash(_query_schema, column_kind::regular_column);
}
auto result = mutation_fragment(mutation_fragment::clustering_row_tag_t(), _query_schema, _permit, _query_schema, e);
// TODO: Ideally this should be position() or position().reversed(), depending on Reversing.
while (has_more_rows() && rows_eq(peek_row().position(), result.as_clustering_row().position())) {
const rows_entry& e = pop_clustering_row();
if (_digest_requested) {
e.row().cells().prepare_hash(_query_schema, column_kind::regular_column);
}
result.mutate_as_clustering_row(_query_schema, [&] (clustering_row& cr) mutable {
cr.apply(_query_schema, e);
});
}
return result;
// Result is ignored because memtables don't lose information. If the entry is missing,
// it must have been redundant, and we can as well look at the next entry.
_cursor.maybe_refresh();
auto rt_before_row = _cursor.range_tombstone();
mplog.trace("next_interval(): range={}, rt={}, cursor={}", ck_range_query, rt_before_row, _cursor);
if (_done || cmp(_cursor.position(), position_in_partition::for_range_end(ck_range_query)) >= 0) {
mplog.trace("next_interval(): done");
return interval_info{rt_before_row, std::monostate{}};
}
return { };
});
}
mutation_fragment_opt next_range_tombstone(const query::clustering_range& ck_range_snapshot,
const query::clustering_range& ck_range_query,
const std::optional<position_in_partition>& last_row,
const std::optional<position_in_partition>& last_rts,
position_in_partition_view pos) {
return in_alloc_section([&] () -> mutation_fragment_opt {
maybe_refresh_state(ck_range_snapshot, last_row, last_rts);
position_in_partition::less_compare rt_less(_query_schema);
// The while below moves range tombstones from partition versions
// into _rt_stream, just enough to produce the next range tombstone
// The main goal behind moving to _rt_stream is to deoverlap range tombstones
// which have the same starting position. This is not in order to satisfy
// flat_mutation_reader stream requirements, the reader can emit range tombstones
// which have the same position incrementally. This is to guarantee forward
// progress in the case iterators get invalidated and maybe_refresh_state()
// above needs to restore them. It does so using last_rts, which tracks
// the position of the last emitted range tombstone. All range tombstones
// with positions <= than last_rts are skipped on refresh. To make progress,
// we need to make sure that all range tombstones with duplicated positions
// are emitted before maybe_refresh_state().
while (has_more_range_tombstones()
&& !rt_less(pos, peek_range_tombstone().position())
&& (_rt_stream.empty() || !rt_less(_rt_stream.peek_next().position(), peek_range_tombstone().position()))) {
range_tombstone rt = pop_range_tombstone();
if (rt.trim(_query_schema,
position_in_partition_view::for_range_start(ck_range_query),
position_in_partition_view::for_range_end(ck_range_query))) {
_rt_stream.apply(std::move(rt));
}
if (_cursor.dummy()) {
mplog.trace("next_interval(): pos={}, rt={}", _cursor.position(), rt_before_row);
auto res = interval_info{rt_before_row, position_in_partition(_cursor.position())};
_done = !_cursor.next();
return res;
}
return _rt_stream.get_next(std::move(pos));
tombstone rt_for_row = _cursor.range_tombstone_for_row();
mplog.trace("next_interval(): row, pos={}, rt={}, rt_for_row={}", _cursor.position(), rt_before_row, rt_for_row);
auto result = mutation_fragment_v2(_query_schema, _permit, _cursor.row());
_done = !_cursor.next();
return interval_info{rt_before_row, row_info{std::move(result), rt_for_row}};
});
}
};
@@ -341,18 +158,13 @@ private:
// Holds reversed current clustering key range, if Reversing was needed.
std::optional<query::clustering_range> opt_reversed_range;
std::optional<position_in_partition> _last_entry;
// When not Reversing, it's .position() of last emitted range tombstone.
// When Reversing, it's .position().reversed() of last emitted range tombstone,
// so that it is usable from functions expecting position in snapshot domain.
std::optional<position_in_partition> _last_rts;
mutation_fragment_opt _next_row;
std::optional<position_in_partition> _lower_bound;
range_tombstone_change_generator _rtc_gen;
// Last emitted range_tombstone_change.
tombstone _current_tombstone;
lsa_partition_reader _reader;
bool _static_row_done = false;
bool _no_more_rows_in_current_range = false;
Accounter& accounter() {
return *this;
@@ -365,59 +177,43 @@ private:
}
}
// If `Reversing`, when we pop_range_tombstone(), a reversed rt is returned (the correct
// one in query clustering order). In order to save progress of reading from range_tombstone_list,
// we need to save the end position of rt (as it was stored in the list). This corresponds to
// the start position, with reversed bound weigth.
static position_in_partition rt_position_in_snapshot_order(const range_tombstone& rt) {
position_in_partition pos(rt.position());
if constexpr (Reversing) {
pos = pos.reversed();
}
return pos;
// We use the names ck_range_snapshot and ck_range_query to denote clustering order.
// ck_range_snapshot uses the snapshot order, while ck_range_query uses the
// query order. These two differ if the query was reversed (`Reversing==true`).
const query::clustering_range& current_ck_range_query() {
return opt_reversed_range ? *opt_reversed_range : *_current_ck_range;
}
mutation_fragment_opt read_next() {
// We use the names ck_range_snapshot and ck_range_query to denote clustering order.
// ck_range_snapshot uses the snapshot order, while ck_range_query uses the
// query order. These two differ if the query was reversed (`Reversing==true`).
const auto& ck_range_snapshot = *_current_ck_range;
const auto& ck_range_query = opt_reversed_range ? *opt_reversed_range : ck_range_snapshot;
void emit_next_interval() {
interval_info next = _reader.next_interval(current_ck_range_query());
if (!_next_row && !_no_more_rows_in_current_range) {
_next_row = _reader.next_row(ck_range_snapshot, _last_entry, _last_rts);
if (next.range_tombstone != _current_tombstone) {
_current_tombstone = next.range_tombstone;
emplace_mutation_fragment(mutation_fragment_v2(*_schema, _permit,
range_tombstone_change(*_lower_bound, _current_tombstone)));
}
if (_next_row) {
auto pos_view = _next_row->as_clustering_row().position();
_last_entry = position_in_partition(pos_view);
auto mf = _reader.next_range_tombstone(ck_range_snapshot, ck_range_query, _last_entry, _last_rts, pos_view);
if (mf) {
_last_rts = rt_position_in_snapshot_order(mf->as_range_tombstone());
return mf;
std::visit(make_visitor([&] (row_info&& info) {
auto pos_view = info.row.as_clustering_row().position();
_lower_bound = position_in_partition::after_key(*_schema, pos_view);
if (info.rt_for_row != _current_tombstone) {
_current_tombstone = info.rt_for_row;
emplace_mutation_fragment(mutation_fragment_v2(*_schema, _permit,
range_tombstone_change(
position_in_partition::before_key(info.row.as_clustering_row().key()), _current_tombstone)));
}
return std::exchange(_next_row, {});
} else {
_no_more_rows_in_current_range = true;
auto mf = _reader.next_range_tombstone(ck_range_snapshot, ck_range_query, _last_entry, _last_rts, position_in_partition_view::for_range_end(ck_range_query));
if (mf) {
_last_rts = rt_position_in_snapshot_order(mf->as_range_tombstone());
emplace_mutation_fragment(std::move(info.row));
}, [&] (position_in_partition&& pos) {
_lower_bound = std::move(pos);
}, [&] (std::monostate) {
if (_current_tombstone) {
_current_tombstone = {};
emplace_mutation_fragment(mutation_fragment_v2(*_schema, _permit,
range_tombstone_change(position_in_partition_view::for_range_end(current_ck_range_query()), _current_tombstone)));
}
return mf;
}
}
void emplace_mutation_fragment(mutation_fragment&& mf) {
_rtc_gen.flush(mf.position(), [this] (range_tombstone_change&& rtc) {
emplace_mutation_fragment(mutation_fragment_v2(*_schema, _permit, std::move(rtc)));
});
if (mf.is_clustering_row()) {
emplace_mutation_fragment(mutation_fragment_v2(*_schema, _permit, std::move(mf).as_clustering_row()));
} else {
assert(mf.is_range_tombstone());
_rtc_gen.consume(std::move(mf).as_range_tombstone());
}
_current_ck_range = std::next(_current_ck_range);
on_new_range();
}), std::move(next.info));
}
void emplace_mutation_fragment(mutation_fragment_v2&& mfopt) {
@@ -427,38 +223,21 @@ private:
void on_new_range() {
if (_current_ck_range == _ck_range_end) {
opt_reversed_range = std::nullopt;
_end_of_stream = true;
_rtc_gen.flush(position_in_partition::after_all_clustered_rows(), [this] (range_tombstone_change&& rtc) {
emplace_mutation_fragment(mutation_fragment_v2(*_schema, _permit, std::move(rtc)));
});
push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, partition_end()));
} else {
_reader.reset_state(*_current_ck_range);
}
_no_more_rows_in_current_range = false;
}
void fill_opt_reversed_range() {
opt_reversed_range = std::nullopt;
if (_current_ck_range != _ck_range_end) {
if constexpr (Reversing) {
opt_reversed_range = query::reverse(*_current_ck_range);
}
_lower_bound = position_in_partition_view::for_range_start(current_ck_range_query());
_reader.on_new_range(*_lower_bound);
}
}
void do_fill_buffer() {
while (!is_end_of_stream() && !is_buffer_full()) {
auto mfopt = read_next();
if (mfopt) {
emplace_mutation_fragment(std::move(*mfopt));
} else {
_last_entry = std::nullopt;
_last_rts = std::nullopt;
_current_ck_range = std::next(_current_ck_range);
fill_opt_reversed_range();
on_new_range();
}
emit_next_interval();
if (need_preempt()) {
break;
}
@@ -476,10 +255,8 @@ public:
, _ck_ranges(std::move(crr))
, _current_ck_range(_ck_ranges.begin())
, _ck_range_end(_ck_ranges.end())
, _rtc_gen(*_schema)
, _reader(*_schema, _permit, std::move(snp), region, read_section, digest_requested)
{
fill_opt_reversed_range();
_reader.with_reserve([&] {
push_mutation_fragment(*_schema, _permit, partition_start(std::move(dk), _reader.partition_tombstone()));
});

View File

@@ -100,6 +100,11 @@ public:
// Insertion of row entries after cursor's position invalidates the cursor.
// Exceptions thrown from mutators invalidate the cursor.
//
// Range tombstone information is accessible via range_tombstone() and range_tombstone_for_row()
// functions. range_tombstone() returns the tombstone for the interval which strictly precedes
// the current row, and range_tombstone_for_row() returns the information for the row itself.
// If the interval which precedes the row is not continuous, then range_tombstone() is empty.
// If range_tombstone() is not empty then the interval is continuous.
class partition_snapshot_row_cursor final {
friend class partition_snapshot_row_weakref;
struct position_in_version {
@@ -108,6 +113,10 @@ class partition_snapshot_row_cursor final {
int version_no;
bool unique_owner = false;
is_continuous continuous = is_continuous::no; // Range continuity in the direction of lower keys (in cursor schema domain).
// Range tombstone in the direction of lower keys (in cursor schema domain).
// Excludes the row. In the reverse mode, the row may have a different range tombstone.
tombstone rt;
};
const schema& _schema; // query domain
@@ -126,15 +135,18 @@ class partition_snapshot_row_cursor final {
// also in _reversed mode.
std::optional<mutation_partition::rows_type::iterator> _latest_it;
// Continuity corresponding to ranges which are not represented in _heap because the cursor
// Continuity and range tombstone corresponding to ranges which are not represented in _heap because the cursor
// went pass all the entries in those versions.
bool _background_continuity = false;
tombstone _background_rt;
bool _continuous{};
bool _dummy{};
const bool _unique_owner;
const bool _reversed;
const bool _digest_requested;
tombstone _range_tombstone;
tombstone _range_tombstone_for_row;
position_in_partition _position; // table domain
partition_snapshot::change_mark _change_mark;
@@ -171,6 +183,8 @@ class partition_snapshot_row_cursor final {
bool recreate_current_row() {
_current_row.clear();
_continuous = _background_continuity;
_range_tombstone = _background_rt;
_range_tombstone_for_row = _background_rt;
_dummy = true;
if (_heap.empty()) {
if (_reversed) {
@@ -185,19 +199,28 @@ class partition_snapshot_row_cursor final {
do {
boost::range::pop_heap(_heap, heap_less);
memory::on_alloc_point();
rows_entry& e = *_heap.back().it;
position_in_version& v = _heap.back();
rows_entry& e = *v.it;
if (_digest_requested) {
e.row().cells().prepare_hash(_schema, column_kind::regular_column);
}
_dummy &= bool(e.dummy());
_continuous |= bool(_heap.back().continuous);
_current_row.push_back(_heap.back());
_continuous |= bool(v.continuous);
_range_tombstone_for_row.apply(e.range_tombstone());
if (v.continuous) {
_range_tombstone.apply(v.rt);
}
_current_row.push_back(v);
_heap.pop_back();
} while (!_heap.empty() && eq(_current_row[0].it->position(), _heap[0].it->position()));
if (boost::algorithm::any_of(_heap, [] (auto&& v) { return v.continuous; })) {
// FIXME: Optimize by dropping dummy() entries.
_continuous = true;
// FIXME: Optimize by dropping dummy() entries.
for (position_in_version& v : _heap) {
_continuous |= bool(v.continuous);
if (v.continuous) {
_range_tombstone.apply(v.rt);
_range_tombstone_for_row.apply(v.rt);
}
}
_position = position_in_partition(_current_row[0].it->position());
@@ -213,6 +236,7 @@ class partition_snapshot_row_cursor final {
_heap.clear();
_latest_it.reset();
_background_continuity = false;
_background_rt = {};
int version_no = 0;
bool unique_owner = _unique_owner;
bool first = true;
@@ -225,13 +249,18 @@ class partition_snapshot_row_cursor final {
}
if (pos) {
is_continuous cont;
tombstone rt;
if (_reversed) [[unlikely]] {
if (cmp(pos->position(), lower_bound) != 0) {
cont = pos->continuous();
rt = pos->range_tombstone();
if (pos != rows.begin()) {
--pos;
} else {
_background_continuity |= bool(cont);
if (cont) {
_background_rt = rt;
}
pos = {};
}
} else {
@@ -240,15 +269,18 @@ class partition_snapshot_row_cursor final {
// Positions past last dummy are complete since mutation sources
// can't contain any keys which are larger.
cont = is_continuous::yes;
rt = {};
} else {
cont = next_entry->continuous();
rt = next_entry->range_tombstone();
}
}
} else {
cont = pos->continuous();
rt = pos->range_tombstone();
}
if (pos) [[likely]] {
_heap.emplace_back(position_in_version{pos, std::move(rows), version_no, unique_owner, cont});
_heap.emplace_back(position_in_version{pos, std::move(rows), version_no, unique_owner, cont, rt});
}
} else {
if (_reversed) [[unlikely]] {
@@ -287,9 +319,13 @@ class partition_snapshot_row_cursor final {
if (_reversed && curr.it) [[unlikely]] {
if (curr.rows.begin() == curr.it) {
_background_continuity |= bool(curr.it->continuous());
if (curr.it->continuous()) {
_background_rt.apply(curr.it->range_tombstone());
}
curr.it = {};
} else {
curr.continuous = curr.it->continuous();
curr.rt = curr.it->range_tombstone();
--curr.it;
}
}
@@ -297,15 +333,20 @@ class partition_snapshot_row_cursor final {
if (_reversed) [[unlikely]] {
if (curr.rows.begin() == curr.it) {
_background_continuity |= bool(curr.it->continuous());
if (curr.it->continuous()) {
_background_rt.apply(curr.it->range_tombstone());
}
curr.it = {};
} else {
curr.continuous = curr.it->continuous();
curr.rt = curr.it->range_tombstone();
--curr.it;
}
} else {
++curr.it;
if (curr.it) {
curr.continuous = curr.it->continuous();
curr.rt = curr.it->range_tombstone();
}
}
}
@@ -394,27 +435,31 @@ public:
auto heap_i = boost::find_if(_heap, [](auto&& v) { return v.version_no == 0; });
is_continuous cont;
tombstone rt;
if (it) {
cont = it->continuous();
rt = it->range_tombstone();
if (_reversed) [[unlikely]] {
if (!match) {
// lower_bound() in reverse order points to predecessor of it unless the keys are equal.
if (it == rows.begin()) {
_background_continuity |= bool(it->continuous());
if (it->continuous()) {
_background_continuity = true;
_background_rt.apply(it->range_tombstone());
}
it = {};
} else {
cont = it->continuous();
--it;
}
} else {
// We can put anything in the match case since this continuity will not be used
// when advancing the cursor.
// when advancing the cursor. Same applies to rt.
cont = is_continuous::no;
rt = {};
}
} else {
cont = it->continuous();
}
} else {
_background_continuity = true; // Default continuity past the last entry.
_background_continuity = true; // Default continuity
}
if (!it) {
@@ -424,7 +469,7 @@ public:
}
} else if (match) {
_current_row.insert(_current_row.begin(), position_in_version{
it, std::move(rows), 0, _unique_owner, cont});
it, std::move(rows), 0, _unique_owner, cont, rt});
if (heap_i != _heap.end()) {
_heap.erase(heap_i);
boost::range::make_heap(_heap, heap_less);
@@ -433,10 +478,11 @@ public:
if (heap_i != _heap.end()) {
heap_i->it = it;
heap_i->continuous = cont;
heap_i->rt = rt;
boost::range::make_heap(_heap, heap_less);
} else {
_heap.push_back(position_in_version{
it, std::move(rows), 0, _unique_owner, cont});
it, std::move(rows), 0, _unique_owner, cont, rt});
boost::range::push_heap(_heap, heap_less);
}
}
@@ -478,7 +524,8 @@ public:
// Advances to the next row, if any.
// If there is no next row, advances to the extreme position in the direction of the cursor
// (position_in_partition::before_all_clustering_rows() or position_in_partition::after_all_clustering_rows)
// and does not point at a row. continuous() is still valid in this case.
// and does not point at a row.
// Information about the range, continuous() and range_tombstone(), is still valid in this case.
// Call only when valid, not necessarily pointing at a row.
bool next() { return advance(true); }
@@ -489,6 +536,16 @@ public:
// is marked as continuous.
bool continuous() const { return _continuous; }
// Can be called when cursor is valid, not necessarily pointing at a row.
// Returns the range tombstone for the key range adjacent to the cursor's position from the side of smaller keys.
// Excludes the range for the row itself. That information is returned by range_tombstone_for_row().
// It's possible that range_tombstone() is empty and range_tombstone_for_row() is not empty.
tombstone range_tombstone() const { return _range_tombstone; }
// Can be called when cursor is pointing at a row.
// Returns the range tombstone covering the row under the cursor.
tombstone range_tombstone_for_row() const { return _range_tombstone_for_row; }
// Can be called when cursor is pointing at a row.
bool dummy() const { return _dummy; }
@@ -546,6 +603,7 @@ public:
struct ensure_result {
rows_entry& row;
mutation_partition_v2::rows_type::iterator it;
bool inserted = false;
};
@@ -553,6 +611,7 @@ public:
// Doesn't change logical value or continuity of the snapshot.
// Can be called only when cursor is valid and pointing at a row.
// The cursor remains valid after the call and points at the same row as before.
// Use only with evictable snapshots.
ensure_result ensure_entry_in_latest() {
auto&& rows = _snp.version()->partition().mutable_clustered_rows();
if (is_in_latest_version()) {
@@ -561,7 +620,7 @@ public:
if (_snp.at_latest_version()) {
_snp.tracker()->touch(latest);
}
return {latest, false};
return {latest, latest_i, false};
} else {
// Copy row from older version because rows in evictable versions must
// hold values which are independently complete to be consistent on eviction.
@@ -579,18 +638,26 @@ public:
if (_reversed) { // latest_i is not reliably a successor
// FIXME: set continuity when possible. Not that important since cache sets it anyway when populating.
re.set_continuous(false);
e->set_range_tombstone(range_tombstone_for_row());
rows_entry::tri_compare cmp(*_snp.schema());
auto res = rows.insert(std::move(e), cmp);
if (res.second) {
_snp.tracker()->insert(re);
}
return {*res.first, res.second};
return {*res.first, res.first, res.second};
} else {
auto latest_i = get_iterator_in_latest_version();
e->set_continuous(latest_i && latest_i->continuous());
rows.insert_before(latest_i, std::move(e));
if (latest_i && latest_i->continuous()) {
e->set_continuous(true);
// See the "information monotonicity" rule.
e->set_range_tombstone(latest_i->range_tombstone());
} else {
e->set_continuous(false);
e->set_range_tombstone(range_tombstone_for_row());
}
auto i = rows.insert_before(latest_i, std::move(e));
_snp.tracker()->insert(re);
return {re, true};
return {re, i, true};
}
}
}
@@ -601,10 +668,13 @@ public:
// Doesn't change logical value of mutation_partition or continuity of the snapshot.
// The cursor doesn't have to be valid.
// The cursor is invalid after the call.
// When returns an engaged optional, the attributes of the cursor: continuous() and range_tombstone()
// are valid, as if the cursor was advanced to the requested position.
// Assumes the snapshot is evictable and not populated by means other than ensure_entry_if_complete().
// Subsequent calls to ensure_entry_if_complete() must be given strictly monotonically increasing
// Subsequent calls to ensure_entry_if_complete() or advance_to() must be given weakly monotonically increasing
// positions unless iterators are invalidated across the calls.
// The cursor must not be a reversed-order cursor.
// Use only with evictable snapshots.
std::optional<ensure_result> ensure_entry_if_complete(position_in_partition_view pos) {
if (_reversed) { // latest_i is unreliable
throw_with_backtrace<std::logic_error>("ensure_entry_if_complete() called on reverse cursor");
@@ -614,28 +684,48 @@ public:
auto has_entry = maybe_advance_to(pos);
assert(has_entry); // evictable snapshots must have a dummy after all rows.
}
{
position_in_partition::equal_compare eq(_schema);
if (eq(position(), pos)) {
if (dummy()) {
return std::nullopt;
}
return ensure_entry_in_latest();
} else if (!continuous()) {
return std::nullopt;
}
}
auto&& rows = _snp.version()->partition().mutable_clustered_rows();
auto latest_i = get_iterator_in_latest_version();
auto e = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(_schema, pos, is_dummy(!pos.is_clustering_row()),
is_continuous(latest_i && latest_i->continuous())));
position_in_partition::equal_compare eq(_schema);
if (eq(position(), pos)) {
// Check if entry was already inserted by previous call to ensure_entry_if_complete()
if (latest_i != rows.begin()) {
auto prev_i = std::prev(latest_i);
if (eq(prev_i->position(), pos)) {
return ensure_result{*prev_i, prev_i, false};
}
}
return ensure_entry_in_latest();
} else if (!continuous()) {
return std::nullopt;
}
// Check if entry was already inserted by previous call to ensure_entry_if_complete()
if (latest_i != rows.begin()) {
auto prev_i = std::prev(latest_i);
if (eq(prev_i->position(), pos)) {
return ensure_result{*prev_i, prev_i, false};
}
}
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(_schema, pos,
is_dummy(!pos.is_clustering_row()),
is_continuous::no));
if (latest_i && latest_i->continuous()) {
e->set_continuous(true);
e->set_range_tombstone(latest_i->range_tombstone()); // See the "information monotonicity" rule.
} else {
// Even if the range in the latest version is not continuous, the row itself is assumed to be complete,
// so it must inherit the current range tombstone.
e->set_range_tombstone(range_tombstone());
}
auto e_i = rows.insert_before(latest_i, std::move(e));
_snp.tracker()->insert(*e_i);
return ensure_result{*e_i, true};
return ensure_result{*e_i, e_i, true};
}
// Brings the entry pointed to by the cursor to the front of the LRU
// Cursor must be valid and pointing at a row.
// Use only with evictable snapshots.
void touch() {
// We cannot bring entries from non-latest versions to the front because that
// could violate the ordering invariant of the LRU, which states that older versions
@@ -658,21 +748,32 @@ public:
}
friend std::ostream& operator<<(std::ostream& out, const partition_snapshot_row_cursor& cur) {
out << "{cursor: position=" << cur._position << ", cont=" << cur.continuous() << ", ";
out << "{cursor: position=" << cur._position
<< ", cont=" << cur.continuous()
<< ", rt=" << cur.range_tombstone();
if (cur.range_tombstone() != cur.range_tombstone_for_row()) {
out << ", row_rt=" << cur.range_tombstone_for_row();
}
out << ", ";
if (cur._reversed) {
out << "reversed, ";
}
if (!cur.iterators_valid()) {
return out << " iterators invalid}";
}
out << "current=[";
out << "snp=" << &cur._snp << ", current=[";
bool first = true;
for (auto&& v : cur._current_row) {
if (!first) {
out << ", ";
}
first = false;
out << "{v=" << v.version_no << ", pos=" << v.it->position() << ", cont=" << v.continuous << "}";
out << "{v=" << v.version_no
<< ", pos=" << v.it->position()
<< ", cont=" << v.continuous
<< ", rt=" << v.rt
<< ", row_rt=" << v.it->range_tombstone()
<< "}";
}
out << "], heap=[\n ";
first = true;
@@ -681,7 +782,12 @@ public:
out << ",\n ";
}
first = false;
out << "{v=" << v.version_no << ", pos=" << v.it->position() << ", cont=" << v.continuous << "}";
out << "{v=" << v.version_no
<< ", pos=" << v.it->position()
<< ", cont=" << v.continuous
<< ", rt=" << v.rt
<< ", row_rt=" << v.it->range_tombstone()
<< "}";
}
out << "], latest_iterator=[";
if (cur._latest_it) {

View File

@@ -88,7 +88,7 @@ concept Reducer =
// |map| extracts the part from each version.
// |reduce| Combines parts from the two versions.
template <typename Result, typename Map, typename Initial, typename Reduce>
requires Mapper<Map, mutation_partition, Result> && Reducer<Reduce, Result>
requires Mapper<Map, mutation_partition_v2, Result> && Reducer<Reduce, Result>
inline Result squashed(const partition_version_ref& v, Map&& map, Initial&& initial, Reduce&& reduce) {
const partition_version* this_v = &*v;
partition_version* it = v->last();
@@ -101,7 +101,7 @@ inline Result squashed(const partition_version_ref& v, Map&& map, Initial&& init
}
template <typename Result, typename Map, typename Reduce>
requires Mapper<Map, mutation_partition, Result> && Reducer<Reduce, Result>
requires Mapper<Map, mutation_partition_v2, Result> && Reducer<Reduce, Result>
inline Result squashed(const partition_version_ref& v, Map&& map, Reduce&& reduce) {
return squashed<Result>(v, map,
[] (auto&& o) -> decltype(auto) { return std::forward<decltype(o)>(o); },
@@ -112,7 +112,7 @@ inline Result squashed(const partition_version_ref& v, Map&& map, Reduce&& reduc
::static_row partition_snapshot::static_row(bool digest_requested) const {
return ::static_row(::squashed<row>(version(),
[&] (const mutation_partition& mp) -> const row& {
[&] (const mutation_partition_v2& mp) -> const row& {
if (digest_requested) {
mp.static_row().prepare_hash(*_schema, column_kind::static_column);
}
@@ -128,14 +128,16 @@ bool partition_snapshot::static_row_continuous() const {
tombstone partition_snapshot::partition_tombstone() const {
return ::squashed<tombstone>(version(),
[] (const mutation_partition& mp) { return mp.partition_tombstone(); },
[] (const mutation_partition_v2& mp) { return mp.partition_tombstone(); },
[] (tombstone& a, tombstone b) { a.apply(b); });
}
mutation_partition partition_snapshot::squashed() const {
return ::squashed<mutation_partition>(version(),
[] (const mutation_partition& mp) -> const mutation_partition& { return mp; },
[this] (const mutation_partition& mp) { return mutation_partition(*_schema, mp); },
[this] (const mutation_partition_v2& mp) -> mutation_partition {
return mp.as_mutation_partition(*_schema);
},
[] (mutation_partition&& mp) { return std::move(mp); },
[this] (mutation_partition& a, const mutation_partition& b) {
mutation_application_stats app_stats;
a.apply(*_schema, b, *_schema, app_stats);
@@ -144,7 +146,7 @@ mutation_partition partition_snapshot::squashed() const {
tombstone partition_entry::partition_tombstone() const {
return ::squashed<tombstone>(_version,
[] (const mutation_partition& mp) { return mp.partition_tombstone(); },
[] (const mutation_partition_v2& mp) { return mp.partition_tombstone(); },
[] (tombstone& a, tombstone b) { a.apply(b); });
}
@@ -163,7 +165,7 @@ partition_snapshot::~partition_snapshot() {
});
}
void merge_versions(const schema& s, mutation_partition& newer, mutation_partition&& older, cache_tracker* tracker) {
void merge_versions(const schema& s, mutation_partition_v2& newer, mutation_partition_v2&& older, cache_tracker* tracker) {
mutation_application_stats app_stats;
older.apply_monotonically(s, std::move(newer), tracker, app_stats);
newer = std::move(older);
@@ -234,16 +236,20 @@ unsigned partition_snapshot::version_count()
return count;
}
partition_entry::partition_entry(mutation_partition mp)
partition_entry::partition_entry(mutation_partition_v2 mp)
{
auto new_version = current_allocator().construct<partition_version>(std::move(mp));
_version = partition_version_ref(*new_version);
}
partition_entry::partition_entry(const schema& s, mutation_partition mp)
: partition_entry(mutation_partition_v2(s, std::move(mp)))
{ }
partition_entry::partition_entry(partition_entry::evictable_tag, const schema& s, mutation_partition&& mp)
: partition_entry([&] {
mp.ensure_last_dummy(s);
return std::move(mp);
return mutation_partition_v2(s, std::move(mp));
}())
{ }
@@ -320,8 +326,8 @@ partition_version& partition_entry::add_version(const schema& s, cache_tracker*
// to stay around (with tombstones and static rows) after fully evicted.
// Such versions must be fully discontinuous, and thus have a dummy at the end.
auto new_version = tracker
? current_allocator().construct<partition_version>(mutation_partition::make_incomplete(s))
: current_allocator().construct<partition_version>(mutation_partition(s.shared_from_this()));
? current_allocator().construct<partition_version>(mutation_partition_v2::make_incomplete(s))
: current_allocator().construct<partition_version>(mutation_partition_v2(s.shared_from_this()));
new_version->partition().set_static_row_continuous(_version->partition().static_row_continuous());
new_version->insert_before(*_version);
set_version(new_version);
@@ -331,12 +337,23 @@ partition_version& partition_entry::add_version(const schema& s, cache_tracker*
return *new_version;
}
void partition_entry::apply(logalloc::region& r, mutation_cleaner& cleaner, const schema& s, const mutation_partition& mp, const schema& mp_schema,
void partition_entry::apply(logalloc::region& r, mutation_cleaner& cleaner, const schema& s, const mutation_partition_v2& mp, const schema& mp_schema,
mutation_application_stats& app_stats) {
apply(r, cleaner, s, mutation_partition(mp_schema, mp), mp_schema, app_stats);
apply(r, cleaner, s, mutation_partition_v2(mp_schema, mp), mp_schema, app_stats);
}
void partition_entry::apply(logalloc::region& r, mutation_cleaner& cleaner, const schema& s, mutation_partition&& mp, const schema& mp_schema,
void partition_entry::apply(logalloc::region& r,
mutation_cleaner& c,
const schema& s,
const mutation_partition& mp,
const schema& mp_schema,
mutation_application_stats& app_stats) {
auto mp_v1 = mutation_partition(mp_schema, mp);
mp_v1.make_fully_continuous();
apply(r, c, s, mutation_partition_v2(mp_schema, std::move(mp_v1)), mp_schema, app_stats);
}
void partition_entry::apply(logalloc::region& r, mutation_cleaner& cleaner, const schema& s, mutation_partition_v2&& mp, const schema& mp_schema,
mutation_application_stats& app_stats) {
// A note about app_stats: it may happen that mp has rows that overwrite other rows
// in older partition_version. Those overwrites will be counted when their versions get merged.
@@ -348,6 +365,7 @@ void partition_entry::apply(logalloc::region& r, mutation_cleaner& cleaner, cons
if (!_snapshot) {
try {
apply_resume res;
auto notify = cleaner.make_region_space_guard();
if (_version->partition().apply_monotonically(s,
std::move(new_version->partition()),
no_cache_tracker,
@@ -411,12 +429,13 @@ utils::coroutine partition_entry::apply_to_incomplete(const schema& s,
// of allocating sections, so we return here to get out of the current allocating section and
// give the caller a chance to store the coroutine object. The code inside coroutine below
// runs outside allocating section.
return utils::coroutine([&tracker, &s, &alloc, &reg, &acc, can_move, preemptible,
return utils::coroutine([self = this, &tracker, &s, &alloc, &reg, &acc, can_move, preemptible,
cur = partition_snapshot_row_cursor(s, *dst_snp),
src_cur = partition_snapshot_row_cursor(s, *src_snp, can_move),
dst_snp = std::move(dst_snp),
prev_snp = std::move(prev_snp),
src_snp = std::move(src_snp),
lb = position_in_partition::before_all_clustered_rows(),
static_done = false] () mutable {
auto&& allocator = reg.allocator();
return alloc(reg, [&] {
@@ -439,14 +458,6 @@ utils::coroutine partition_entry::apply_to_incomplete(const schema& s,
static_row.apply(s, column_kind::static_column, current->partition().static_row());
}
}
dirty_size += current->partition().row_tombstones().external_memory_usage(s);
range_tombstone_list& tombstones = dst.partition().mutable_row_tombstones();
// FIXME: defer while applying range tombstones
if (can_move) {
tombstones.apply_monotonically(s, std::move(current->partition().mutable_row_tombstones()));
} else {
tombstones.apply_monotonically(s, const_cast<const mutation_partition&>(current->partition()).row_tombstones());
}
current = current->next();
can_move &= current && !current->is_referenced();
}
@@ -460,21 +471,66 @@ utils::coroutine partition_entry::apply_to_incomplete(const schema& s,
do {
auto size = src_cur.memory_usage();
if (!src_cur.dummy()) {
tracker.on_row_processed_from_memtable();
// Range tombstones in memtables are bounded by dummy entries on both sides.
assert(src_cur.range_tombstone_for_row() == src_cur.range_tombstone());
if (src_cur.range_tombstone()) {
// Apply the tombstone to (lb, src_cur.position())
// FIXME: Avoid if before all rows
auto ropt = cur.ensure_entry_if_complete(lb);
cur.advance_to(lb); // ensure_entry_if_complete() leaves the cursor invalid. Bring back to valid.
// If !ropt, it means there is no entry at lb, so cur is guaranteed to be at a position
// greater than lb. No need to advance it.
if (ropt) {
cur.next();
}
position_in_partition::less_compare less(s);
assert(less(lb, cur.position()));
while (less(cur.position(), src_cur.position())) {
auto res = cur.ensure_entry_in_latest();
if (cur.continuous()) {
assert(cur.dummy() || cur.range_tombstone_for_row() == cur.range_tombstone());
res.row.set_continuous(is_continuous::yes);
}
res.row.set_range_tombstone(cur.range_tombstone_for_row() + src_cur.range_tombstone());
// FIXME: Compact the row
++tracker.get_stats().rows_covered_by_range_tombstones_from_memtable;
cur.next();
// FIXME: preempt
}
}
{
if (src_cur.dummy()) {
++tracker.get_stats().dummy_processed_from_memtable;
} else {
tracker.on_row_processed_from_memtable();
}
auto ropt = cur.ensure_entry_if_complete(src_cur.position());
if (ropt) {
if (!ropt->inserted) {
tracker.on_row_merged_from_memtable();
}
rows_entry& e = ropt->row;
src_cur.consume_row([&](deletable_row&& row) {
e.row().apply_monotonically(s, std::move(row));
});
if (!src_cur.dummy()) {
src_cur.consume_row([&](deletable_row&& row) {
e.row().apply_monotonically(s, std::move(row));
});
}
// We can set cont=1 only if there is a range tombstone because
// only then the lower bound of the range is ensured in the latest version earlier.
if (src_cur.range_tombstone()) {
if (cur.continuous()) {
assert(cur.dummy() || cur.range_tombstone_for_row() == cur.range_tombstone());
e.set_continuous(is_continuous::yes);
}
e.set_range_tombstone(cur.range_tombstone_for_row() + src_cur.range_tombstone());
}
} else {
tracker.on_row_dropped_from_memtable();
}
}
// FIXME: Avoid storing lb if no range tombstones
lb = position_in_partition(src_cur.position());
auto has_next = src_cur.erase_and_advance();
acc.unpin_memory(size);
if (!has_next) {
@@ -487,12 +543,12 @@ utils::coroutine partition_entry::apply_to_incomplete(const schema& s,
});
}
mutation_partition partition_entry::squashed(schema_ptr from, schema_ptr to)
mutation_partition_v2 partition_entry::squashed(schema_ptr from, schema_ptr to)
{
mutation_partition mp(to);
mutation_partition_v2 mp(to);
mp.set_static_row_continuous(_version->partition().static_row_continuous());
for (auto&& v : _version->all_elements()) {
auto older = mutation_partition(*from, v.partition());
auto older = mutation_partition_v2(*from, v.partition());
if (from->version() != to->version()) {
older.upgrade(*from, *to);
}
@@ -503,7 +559,8 @@ mutation_partition partition_entry::squashed(schema_ptr from, schema_ptr to)
mutation_partition partition_entry::squashed(const schema& s)
{
return squashed(s.shared_from_this(), s.shared_from_this());
return squashed(s.shared_from_this(), s.shared_from_this())
.as_mutation_partition(s);
}
void partition_entry::upgrade(schema_ptr from, schema_ptr to, mutation_cleaner& cleaner, cache_tracker* tracker)
@@ -543,99 +600,6 @@ partition_snapshot_ptr partition_entry::read(logalloc::region& r,
return partition_snapshot_ptr(std::move(snp));
}
// Collects all range tombstones overlapping with [start, end) into a
// range_tombstone_result (utils::chunked_vector<range_tombstone>).
// Thin wrapper over the callback-based overload below; the callback never
// requests a stop, so the whole slice is materialized.
partition_snapshot::range_tombstone_result
partition_snapshot::range_tombstones(position_in_partition_view start, position_in_partition_view end) {
range_tombstone_result rts;
// Accumulate every tombstone emitted by the streaming overload.
range_tombstones(start, end, [&] (range_tombstone rt) {
rts.emplace_back(std::move(rt));
return stop_iteration::no;
});
return rts;
}
// Invokes |callback| for every range tombstone overlapping with [start, end),
// merged across all versions of this snapshot, in position order, until the
// callback returns stop_iteration::yes or the tombstones are exhausted.
// Returns stop_iteration::yes iff all range tombstones in the range were consumed
// (i.e. the callback never asked to stop).
// When |reverse| is true, start and end are assumed to belong to the domain of the
// reverse clustering order schema, and tombstones are produced in reverse order,
// conforming to the reverse schema.
stop_iteration
partition_snapshot::range_tombstones(position_in_partition_view start, position_in_partition_view end,
std::function<stop_iteration(range_tombstone)> callback,
bool reverse)
{
partition_version* v = &*version();
if (reverse) [[unlikely]] {
// Translate the reversed-domain bounds into native-order bounds so the
// per-version slice() below can be used unchanged.
std::swap(start, end);
start = start.reversed();
end = end.reversed();
}
// Pops the next tombstone (in output order) off a single version's stream:
// the front in forward mode, the back (then reversed) in reverse mode.
auto pop_stream = [&] (range_tombstone_list::iterator_range& range) -> range_tombstone {
auto rt = reverse ? std::prev(range.end())->tombstone()
: range.begin()->tombstone();
if (reverse) [[unlikely]] {
rt.reverse();
range.advance_end(-1);
} else {
range.advance_begin(1);
}
return rt;
};
if (!v->next()) { // Optimization for single-version snapshots
auto range = v->partition().row_tombstones().slice(*_schema, start, end);
while (!range.empty()) {
if (callback(pop_stream(range)) == stop_iteration::yes) {
return stop_iteration::no;
}
}
return stop_iteration::yes;
}
// Multi-version case: k-way merge of the per-version tombstone streams
// using a max-heap keyed on each stream's next output position.
std::vector<range_tombstone_list::iterator_range> streams; // contains only non-empty ranges
position_in_partition::less_compare less(*_schema);
// Sorts ranges by first range_tombstone's starting position
// in descending order (because the heap is a max-heap).
// In reverse mode, sorts by range_tombstone's end position
// in ascending order.
auto stream_less = [&] (range_tombstone_list::iterator_range left, range_tombstone_list::iterator_range right) {
if (reverse) [[unlikely]] {
return less(std::prev(left.end())->end_position(), std::prev(right.end())->end_position());
}
return less(right.begin()->position(), left.begin()->position());
};
// Gather one stream per version, skipping versions with nothing in range.
while (v) {
auto&& range = v->partition().row_tombstones().slice(*_schema, start, end);
if (!range.empty()) {
streams.emplace_back(std::move(range));
}
v = v->next();
}
std::make_heap(streams.begin(), streams.end(), stream_less);
while (!streams.empty()) {
// The heap's top stream holds the tombstone which comes next in output order.
std::pop_heap(streams.begin(), streams.end(), stream_less);
range_tombstone_list::iterator_range& stream = streams.back();
if (callback(pop_stream(stream)) == stop_iteration::yes) {
return stop_iteration::no;
}
if (!stream.empty()) {
std::push_heap(streams.begin(), streams.end(), stream_less);
} else {
streams.pop_back();
}
}
return stop_iteration::yes;
}
// Returns all range tombstones in the snapshot, i.e. those overlapping with
// the full clustering range.
partition_snapshot::range_tombstone_result
partition_snapshot::range_tombstones()
{
return range_tombstones(
position_in_partition_view::before_all_clustered_rows(),
position_in_partition_view::after_all_clustered_rows());
}
void partition_snapshot::touch() noexcept {
// Eviction assumes that older versions are evicted before newer so only the latest snapshot
// can be touched.
@@ -671,7 +635,7 @@ std::ostream& operator<<(std::ostream& out, const partition_entry::printer& p) {
}
out << ") ";
}
out << fmt::ptr(v) << ": " << mutation_partition::printer(p._schema, v->partition());
out << fmt::ptr(v) << ": " << mutation_partition_v2::printer(p._schema, v->partition());
v = v->next();
first = false;
}

View File

@@ -105,45 +105,17 @@ class static_row;
// snapshot on the list is marked as unique owner so that on its destruction
// it continues removal of the partition versions.
//
// Continuity merging rules.
//
// Non-evictable snapshots contain fully continuous partitions in all versions at all times.
// For evictable snapshots, that's not the case.
//
// Each version has its own continuity, fully specified in that version,
// independent of continuity of other versions. Continuity of the snapshot is a
// union of continuities of each version. This rule follows from the fact that we
// want eviction from older versions to not have to touch newer versions.
//
// It is assumed that continuous intervals in different versions are non-
// overlapping, with exceptions for points corresponding to complete rows.
// A row may overlap with another row, in which case it completely overrides
// it. A later version may have a row which falls into a continuous interval
// in the older version. A newer version cannot have a continuous interval
// which is not a row and covers a row in the older version. We make use of
// this assumption to make calculation of the union of intervals on merging
// easier.
//
// versions of evictable entries always have a dummy entry at position_in_partition::after_all_clustered_rows().
// This is needed so that they can be always made fully discontinuous by eviction, and because
// we need a way to link partitions with no rows into the LRU.
//
// Snapshots of evictable entries always have a row entry at
// position_in_partition::after_all_clustered_rows().
//
class partition_version_ref;
class partition_version : public anchorless_list_base_hook<partition_version> {
partition_version_ref* _backref = nullptr;
mutation_partition _partition;
mutation_partition_v2 _partition;
friend class partition_version_ref;
friend class partition_entry;
friend class partition_snapshot;
public:
static partition_version& container_of(mutation_partition& mp) {
static partition_version& container_of(mutation_partition_v2& mp) {
return *boost::intrusive::get_parent_from_member(&mp, &partition_version::_partition);
}
@@ -151,7 +123,7 @@ public:
explicit partition_version(schema_ptr s) noexcept
: _partition(std::move(s)) { }
explicit partition_version(mutation_partition mp) noexcept
explicit partition_version(mutation_partition_v2 mp) noexcept
: _partition(std::move(mp)) { }
partition_version(partition_version&& pv) noexcept;
partition_version& operator=(partition_version&& pv) noexcept;
@@ -160,8 +132,8 @@ public:
// Returns stop_iteration::yes iff there are no more elements to free.
stop_iteration clear_gently(cache_tracker* tracker) noexcept;
mutation_partition& partition() { return _partition; }
const mutation_partition& partition() const { return _partition; }
mutation_partition_v2& partition() { return _partition; }
const mutation_partition_v2& partition() const { return _partition; }
bool is_referenced() const { return _backref; }
// Returns true iff this version is directly referenced from a partition_entry (is its newest version).
@@ -403,19 +375,6 @@ public:
using range_tombstone_result = utils::chunked_vector<range_tombstone>;
// Returns range tombstones overlapping with [start, end)
range_tombstone_result range_tombstones(position_in_partition_view start, position_in_partition_view end);
// Invokes the callback for every range tombstones overlapping with [start, end) until
// the callback returns stop_iteration::yes or all tombstones are exhausted.
// Returns stop_iteration::yes if all range tombstones in the range were consumed.
// When reversed is true, start and end are assumed to belong to the domain of reverse clustering order schema
// and the method produces range_tombstones in reverse order, conforming to reverse schema.
stop_iteration range_tombstones(position_in_partition_view start, position_in_partition_view end,
std::function<stop_iteration(range_tombstone)> callback,
bool reversed = false);
// Returns all range tombstones
range_tombstone_result range_tombstones();
phase_type phase() const { return _phase; }
};
@@ -485,7 +444,8 @@ public:
// Constructs a non-evictable entry holding empty partition
partition_entry() = default;
// Constructs a non-evictable entry
explicit partition_entry(mutation_partition mp);
explicit partition_entry(mutation_partition_v2);
partition_entry(const schema&, mutation_partition);
// Returns a reference to partition_entry containing given pv,
// assuming pv.is_referenced_from_entry().
static partition_entry& container_of(partition_version& pv) {
@@ -550,14 +510,21 @@ public:
void apply(logalloc::region&,
mutation_cleaner&,
const schema& s,
const mutation_partition& mp,
const mutation_partition_v2& mp,
const schema& mp_schema,
mutation_application_stats& app_stats);
void apply(logalloc::region&,
mutation_cleaner&,
const schema& s,
mutation_partition&& mp,
mutation_partition_v2&& mp,
const schema& mp_schema,
mutation_application_stats& app_stats);
void apply(logalloc::region&,
mutation_cleaner&,
const schema& s,
const mutation_partition& mp,
const schema& mp_schema,
mutation_application_stats& app_stats);
@@ -619,7 +586,7 @@ public:
return *_version;
}
mutation_partition squashed(schema_ptr from, schema_ptr to);
mutation_partition_v2 squashed(schema_ptr from, schema_ptr to);
mutation_partition squashed(const schema&);
tombstone partition_tombstone() const;

View File

@@ -54,7 +54,7 @@ public:
memtable_entry(schema_ptr s, dht::decorated_key key, mutation_partition p)
: _schema(std::move(s))
, _key(std::move(key))
, _pe(std::move(p))
, _pe(*_schema, std::move(p))
{ }
memtable_entry(memtable_entry&& o) noexcept;

View File

@@ -1221,7 +1221,7 @@ void cache_entry::on_evicted(cache_tracker& tracker) noexcept {
}
void rows_entry::on_evicted(cache_tracker& tracker) noexcept {
mutation_partition::rows_type::iterator it(this);
mutation_partition_v2::rows_type::iterator it(this);
if (is_last_dummy()) {
// Every evictable partition entry must have a dummy entry at the end,
@@ -1236,18 +1236,19 @@ void rows_entry::on_evicted(cache_tracker& tracker) noexcept {
// When evicting a dummy with both sides continuous we don't need to break continuity.
//
auto still_continuous = continuous() && dummy();
mutation_partition::rows_type::key_grabber kg(it);
auto old_rt = range_tombstone();
mutation_partition_v2::rows_type::key_grabber kg(it);
kg.release(current_deleter<rows_entry>());
if (!still_continuous) {
if (!still_continuous || old_rt != it->range_tombstone()) {
it->set_continuous(false);
}
tracker.on_row_eviction();
}
mutation_partition::rows_type* rows = it.tree_if_singular();
mutation_partition_v2::rows_type* rows = it.tree_if_singular();
if (rows != nullptr) {
assert(it->is_last_dummy());
partition_version& pv = partition_version::container_of(mutation_partition::container_of(*rows));
partition_version& pv = partition_version::container_of(mutation_partition_v2::container_of(*rows));
if (pv.is_referenced_from_entry()) {
partition_entry& pe = partition_entry::container_of(pv);
if (!pe.is_locked()) {

View File

@@ -172,7 +172,7 @@ struct expected_tombstone {
};
static void assert_cached_tombstones(partition_snapshot_ptr snp, std::deque<range_tombstone> expected, const query::clustering_row_ranges& ck_ranges) {
range_tombstone_list rts = snp->version()->partition().mutable_row_tombstones();
range_tombstone_list rts = snp->version()->partition().as_mutation_partition(*SCHEMA).mutable_row_tombstones();
rts.trim(*SCHEMA, ck_ranges);
range_tombstone_list expected_list(*SCHEMA);
@@ -498,7 +498,8 @@ SEASTAR_TEST_CASE(test_single_row_cached_as_noncontinuous_range_from_row_to_end)
SEASTAR_TEST_CASE(test_single_row_cached_as_noncontinuous_exclusive_range_on_the_left) {
return seastar::async([] {
test_single_row(1, true, is_continuous::no, make_slice({ query::clustering_range::make_ending_with({make_ck(1), false}) }), { }, {
expected_row(1, is_continuous::yes),
before_cont(1),
expected_row(1, is_continuous::no),
expected_row(expected_row::dummy_tag_t{}, is_continuous::no)
});
});
@@ -966,7 +967,8 @@ SEASTAR_TEST_CASE(test_two_rows_cached_non_continuous_exclusive_range_on_the_lef
return seastar::async([] {
test_two_rows(1, true, is_continuous::no, 3, true, is_continuous::no, make_slice({ query::clustering_range::make_ending_with({make_ck(3), false}) }), { 1 }, {
expected_row(1, is_continuous::yes),
expected_row(3, is_continuous::yes),
before_cont(3),
expected_row(3, is_continuous::no),
expected_row(expected_row::dummy_tag_t{}, is_continuous::no)
});
});
@@ -988,7 +990,8 @@ SEASTAR_TEST_CASE(test_two_rows_cached_non_continuous_exclusive_range_between_ro
test_two_rows(1, true, is_continuous::no, 3, true, is_continuous::no, make_slice({ query::clustering_range::make({make_ck(1), false}, {make_ck(3), false}) }), { }, {
expected_row(1, is_continuous::no),
after_notc(1),
expected_row(3, is_continuous::yes),
before_cont(3),
expected_row(3, is_continuous::no),
expected_row(expected_row::dummy_tag_t{}, is_continuous::no)
});
});
@@ -1009,7 +1012,8 @@ SEASTAR_TEST_CASE(test_two_rows_cached_non_continuous_exclusive_range_between_ro
return seastar::async([] {
test_two_rows(1, true, is_continuous::no, 3, true, is_continuous::no, make_slice({ query::clustering_range::make(make_ck(1), {make_ck(3), false}) }), { 1 }, {
expected_row(1, is_continuous::no),
expected_row(3, is_continuous::yes),
before_cont(3),
expected_row(3, is_continuous::no),
expected_row(expected_row::dummy_tag_t{}, is_continuous::no)
});
});
@@ -1298,7 +1302,8 @@ SEASTAR_TEST_CASE(test_single_row_and_tombstone_not_cached_single_row_range1) {
expected_fragment(1),
expected_fragment(position_in_partition_view::after_all_prefixed(make_ck(1)), {}),
}, {
expected_row(1, is_continuous::no),
before_notc(1),
expected_row(1, is_continuous::yes),
expected_row(expected_row::dummy_tag_t{}, is_continuous::no)
}, { rt });
});
@@ -1347,6 +1352,7 @@ SEASTAR_TEST_CASE(test_single_row_and_tombstone_not_cached_single_row_range3) {
expected_fragment(4)
}, {
before_notc(0),
after_cont(2), // upper bound of rt
expected_row(4, is_continuous::yes),
after_cont(5),
expected_row(expected_row::dummy_tag_t{}, is_continuous::no)

View File

@@ -35,218 +35,33 @@ using namespace std::chrono_literals;
static thread_local mutation_application_stats app_stats_for_tests;
// Verifies that tombstones in "list" are monotonic, overlap with the requested range,
// and have information equivalent with "expected" in that range.
static
void check_tombstone_slice(const schema& s, const utils::chunked_vector<range_tombstone>& list,
    const query::clustering_range& range,
    const range_tombstone_list& expected)
{
    position_in_partition::less_compare less(s);
    const auto range_start = position_in_partition::for_range_start(range);
    const auto range_end = position_in_partition::for_range_end(range);
    range_tombstone_list actual(s);
    // Start positions must be non-decreasing; track the previous one.
    position_in_partition last_start = position_in_partition::before_all_clustered_rows();
    for (const range_tombstone& rt : list) {
        // Every tombstone has to overlap with [range_start, range_end).
        if (!less(rt.position(), range_end) || !less(range_start, rt.end_position())) {
            BOOST_FAIL(format("Range tombstone out of range: {}, range: {}", rt, range));
        }
        if (less(rt.position(), last_start)) {
            BOOST_FAIL(format("Range tombstone breaks position monotonicity: {}, list: {}", rt, list));
        }
        last_start = position_in_partition(rt.position());
        actual.apply(s, rt);
    }
    // Only the slices of both lists which fall into "range" are compared.
    actual.trim(s, query::clustering_row_ranges{range});
    range_tombstone_list trimmed_expected(expected);
    trimmed_expected.trim(s, query::clustering_row_ranges{range});
    assert_that(s, actual).is_equal_to(trimmed_expected);
}
// Convenience overload: folds the literal tombstones into a
// range_tombstone_list and delegates to the main checker above.
static
void check_tombstone_slice(const schema& s, const utils::chunked_vector<range_tombstone>& list,
    const query::clustering_range& range,
    std::initializer_list<range_tombstone> expected)
{
    range_tombstone_list merged(s);
    for (const range_tombstone& rt : expected) {
        merged.apply(s, rt);
    }
    check_tombstone_slice(s, list, range, merged);
}
// Reads the rest of the partition into a mutation_partition object.
// There must be at least one entry ahead of the cursor.
// The cursor must be pointing at a row and valid.
// The cursor will not be pointing at a row after this.
static mutation_partition read_partition_from(const schema& schema, partition_snapshot_row_cursor& cur) {
    mutation_partition p(schema.shared_from_this());
    // Upper bound of the previously consumed row; used as the start position
    // when materializing the range tombstone active before the current row.
    position_in_partition prev = position_in_partition::before_all_clustered_rows();
    do {
        testlog.trace("cur: {}", cur);
        // Copy the row together with its dummy/continuity flags.
        p.clustered_row(schema, cur.position(), is_dummy(cur.dummy()), is_continuous(cur.continuous()))
            .apply(schema, cur.row().as_deletable_row());
        auto after_pos = position_in_partition::after_key(schema, cur.position());
        auto before_pos = position_in_partition::before_key(cur.position());
        // Tombstone covering the gap between the previous row and this row.
        if (cur.range_tombstone()) {
            p.apply_row_tombstone(schema, range_tombstone(prev, before_pos, cur.range_tombstone()));
        }
        // Tombstone covering the row's own position, (before_pos, after_pos).
        if (cur.range_tombstone_for_row()) {
            p.apply_row_tombstone(schema, range_tombstone(before_pos, after_pos, cur.range_tombstone_for_row()));
        }
        prev = std::move(after_pos);
    } while (cur.next());
    return p;
}
// Verifies that partition_snapshot::range_tombstones() returns exactly the
// tombstones overlapping the requested slice, for a single-version entry and
// then for a two-version MVCC entry (tombstones merged across versions).
SEASTAR_TEST_CASE(test_range_tombstone_slicing) {
    return seastar::async([] {
        logalloc::region r;
        mutation_cleaner cleaner(r, no_cache_tracker, app_stats_for_tests);
        simple_schema table;
        auto s = table.schema();
        with_allocator(r.allocator(), [&] {
            mutation_application_stats app_stats;
            logalloc::reclaim_lock l(r);
            // Tombstones over ckey ranges [1,2], [4,7], [6,9]; rt2 and rt3 overlap.
            auto rt1 = table.make_range_tombstone(table.make_ckey_range(1, 2));
            auto rt2 = table.make_range_tombstone(table.make_ckey_range(4, 7));
            auto rt3 = table.make_range_tombstone(table.make_ckey_range(6, 9));
            mutation_partition m1(s);
            m1.apply_delete(*s, rt1);
            m1.apply_delete(*s, rt2);
            m1.apply_delete(*s, rt3);
            partition_entry e(mutation_partition(*s, m1));
            auto snap = e.read(r, cleaner, s, no_cache_tracker);
            // Asserts that the snapshot's tombstones restricted to "range"
            // carry the same information as "expected".
            auto check_range = [&s] (partition_snapshot& snap, const query::clustering_range& range,
                std::initializer_list<range_tombstone> expected) {
                auto tombstones = snap.range_tombstones(
                    position_in_partition::for_range_start(range),
                    position_in_partition::for_range_end(range));
                check_tombstone_slice(*s, tombstones, range, expected);
            };
            // Single version: slices before, inside, across and after the tombstones.
            check_range(*snap, table.make_ckey_range(0, 0), {});
            check_range(*snap, table.make_ckey_range(1, 1), {rt1});
            check_range(*snap, table.make_ckey_range(3, 4), {rt2});
            check_range(*snap, table.make_ckey_range(3, 5), {rt2});
            check_range(*snap, table.make_ckey_range(3, 6), {rt2, rt3});
            check_range(*snap, table.make_ckey_range(6, 6), {rt2, rt3});
            check_range(*snap, table.make_ckey_range(7, 10), {rt2, rt3});
            check_range(*snap, table.make_ckey_range(8, 10), {rt3});
            check_range(*snap, table.make_ckey_range(10, 10), {});
            check_range(*snap, table.make_ckey_range(0, 10), {rt1, rt2, rt3});
            // Add a second version holding rt4 [1,2] (shadows rt1) and rt5 [5,8];
            // a new snapshot must see tombstones merged from both versions.
            auto rt4 = table.make_range_tombstone(table.make_ckey_range(1, 2));
            auto rt5 = table.make_range_tombstone(table.make_ckey_range(5, 8));
            mutation_partition m2(s);
            m2.apply_delete(*s, rt4);
            m2.apply_delete(*s, rt5);
            auto&& v2 = e.add_version(*s, no_cache_tracker);
            v2.partition().apply_weak(*s, m2, *s, app_stats);
            auto snap2 = e.read(r, cleaner, s, no_cache_tracker);
            check_range(*snap2, table.make_ckey_range(0, 0), {});
            check_range(*snap2, table.make_ckey_range(1, 1), {rt4});
            check_range(*snap2, table.make_ckey_range(3, 4), {rt2});
            check_range(*snap2, table.make_ckey_range(3, 5), {rt2, rt5});
            check_range(*snap2, table.make_ckey_range(3, 6), {rt2, rt3, rt5});
            check_range(*snap2, table.make_ckey_range(4, 4), {rt2});
            check_range(*snap2, table.make_ckey_range(5, 5), {rt2, rt5});
            check_range(*snap2, table.make_ckey_range(6, 6), {rt2, rt3, rt5});
            check_range(*snap2, table.make_ckey_range(7, 10), {rt2, rt3, rt5});
            check_range(*snap2, table.make_ckey_range(8, 8), {rt3, rt5});
            check_range(*snap2, table.make_ckey_range(9, 9), {rt3});
            check_range(*snap2, table.make_ckey_range(8, 10), {rt3, rt5});
            check_range(*snap2, table.make_ckey_range(10, 10), {});
            check_range(*snap2, table.make_ckey_range(0, 10), {rt4, rt2, rt3, rt5});
        });
    });
}
static query::clustering_range reversed(const query::clustering_range& range) {
if (!range.is_singular()) {
return query::clustering_range(range.end(), range.start());
if (cur.range_tombstone()) {
p.apply_row_tombstone(schema, range_tombstone(prev, position_in_partition::after_all_clustered_rows(), cur.range_tombstone()));
}
return range;
}
SEASTAR_THREAD_TEST_CASE(test_range_tombstone_reverse_slicing) {
logalloc::region r;
mutation_cleaner cleaner(r, no_cache_tracker, app_stats_for_tests);
with_allocator(r.allocator(), [&] {
mutation_application_stats app_stats;
logalloc::reclaim_lock l(r);
random_mutation_generator gen(random_mutation_generator::generate_counters::no);
auto s = gen.schema();
auto rev_s = s->make_reversed();
mutation_partition m1(s);
mutation_partition m2(s); // m2 and m3 will have effectively m1 split among each other randomly
mutation_partition m3(s);
range_tombstone_list rts(*s);
range_tombstone_list rev_rts(*rev_s);
for (int i = 0; i < 12; ++i) {
auto rt = gen.make_random_range_tombstone();
rts.apply(*s, rt);
{
auto rev_rt = rt;
rev_rt.reverse();
rev_rts.apply(*rev_s, rev_rt);
}
m1.apply_delete(*s, rt);
if (i % 2 == 0) {
m2.apply_delete(*s, rt);
} else {
m3.apply_delete(*s, rt);
}
}
auto check_range = [&] (partition_snapshot& snap, query::clustering_range range, bool reverse = false) {
utils::chunked_vector<range_tombstone> result;
if (reverse) {
range = reversed(range);
}
auto start = position_in_partition::for_range_start(range);
auto end = position_in_partition::for_range_end(range);
snap.range_tombstones(start, end, [&] (range_tombstone rt) {
result.emplace_back(std::move(rt));
return stop_iteration::no;
}, reverse);
if (reverse) {
check_tombstone_slice(*rev_s, result, range, rev_rts);
} else {
check_tombstone_slice(*s, result, range, rts);
}
};
// Single version
{
partition_entry e(mutation_partition(*s, m1));
auto snap = e.read(r, cleaner, s, no_cache_tracker);
check_range(*snap, query::clustering_range::make_open_ended_both_sides());
check_range(*snap, query::clustering_range::make_open_ended_both_sides(), true);
}
// Two versions
{
partition_entry e(mutation_partition(*s, m2));
auto snap = e.read(r, cleaner, s, no_cache_tracker);
auto&& v2 = e.add_version(*s, no_cache_tracker);
v2.partition().apply_weak(*s, m3, *s, app_stats);
auto snap2 = e.read(r, cleaner, s, no_cache_tracker);
check_range(*snap2, query::clustering_range::make_open_ended_both_sides());
check_range(*snap2, query::clustering_range::make_open_ended_both_sides(), true);
}
});
return p;
}
class mvcc_partition;
@@ -398,7 +213,7 @@ mvcc_partition& mvcc_partition::operator+=(const mutation& m) {
void mvcc_partition::apply(const mutation_partition& mp, schema_ptr mp_s) {
with_allocator(region().allocator(), [&] {
if (_evictable) {
apply_to_evictable(partition_entry(mutation_partition(*mp_s, mp)), mp_s);
apply_to_evictable(partition_entry(mutation_partition_v2(*mp_s, mp)), mp_s);
} else {
logalloc::allocating_section as;
as(region(), [&] {
@@ -426,7 +241,7 @@ mvcc_partition mvcc_container::make_not_evictable(const mutation_partition& mp)
return with_allocator(region().allocator(), [&] {
logalloc::allocating_section as;
return as(region(), [&] {
return mvcc_partition(_schema, partition_entry(mutation_partition(*_schema, mp)), *this, false);
return mvcc_partition(_schema, partition_entry(mutation_partition_v2(*_schema, mp)), *this, false);
});
});
}
@@ -630,9 +445,20 @@ SEASTAR_TEST_CASE(test_apply_to_incomplete_respects_continuity) {
expected.apply_weak(*s, std::move(expected_to_apply_slice), app_stats);
e += to_apply;
assert_that(s, e.squashed())
.is_equal_to_compacted(expected, e_continuity.to_clustering_row_ranges())
.has_same_continuity(before);
auto sq = e.squashed();
// After applying to_apply the continuity can be more narrow due to compaction with tombstones
// present in to_apply.
auto continuity_after = sq.get_continuity(*s);
if (!continuity_after.contained_in(e_continuity)) {
BOOST_FAIL(format("Expected later continuity to be contained in earlier, later={}\n, earlier={}",
continuity_after, e_continuity));
}
assert_that(s, std::move(sq))
.is_equal_to_compacted(expected, e_continuity.to_clustering_row_ranges());
};
test(false);
@@ -642,15 +468,16 @@ SEASTAR_TEST_CASE(test_apply_to_incomplete_respects_continuity) {
}
// Call with region locked.
static mutation_partition read_using_cursor(partition_snapshot& snap) {
static mutation_partition read_using_cursor(partition_snapshot& snap, bool reversed = false) {
tests::reader_concurrency_semaphore_wrapper semaphore;
partition_snapshot_row_cursor cur(*snap.schema(), snap);
cur.advance_to(position_in_partition::before_all_clustered_rows());
auto mp = read_partition_from(*snap.schema(), cur);
for (auto&& rt : snap.range_tombstones()) {
mp.apply_delete(*snap.schema(), rt);
auto s = snap.schema();
if (reversed) {
s = s->make_reversed();
}
mp.apply(*snap.schema(), mutation_fragment(*snap.schema(), semaphore.make_permit(), static_row(snap.static_row(false))));
partition_snapshot_row_cursor cur(*s, snap, false, reversed);
cur.advance_to(position_in_partition::before_all_clustered_rows());
auto mp = read_partition_from(*s, cur);
mp.apply(*s, mutation_fragment(*s, semaphore.make_permit(), static_row(snap.static_row(false))));
mp.set_static_row_continuous(snap.static_row_continuous());
mp.apply(snap.partition_tombstone());
return mp;
@@ -709,7 +536,7 @@ SEASTAR_TEST_CASE(test_snapshot_cursor_is_consistent_with_merging_for_nonevictab
{
mutation_application_stats app_stats;
logalloc::reclaim_lock rl(r);
auto e = partition_entry(mutation_partition(*s, m3.partition()));
auto e = partition_entry(mutation_partition_v2(*s, m3.partition()));
auto snap1 = e.read(r, cleaner, s, no_cache_tracker);
e.apply(r, cleaner, *s, m2.partition(), *s, app_stats);
auto snap2 = e.read(r, cleaner, s, no_cache_tracker);
@@ -1371,7 +1198,7 @@ SEASTAR_TEST_CASE(test_apply_is_atomic) {
size_t fail_offset = 0;
while (true) {
mutation_partition m2 = mutation_partition(*second.schema(), second.partition());
auto e = partition_entry(mutation_partition(*target.schema(), target.partition()));
auto e = partition_entry(mutation_partition_v2(*target.schema(), target.partition()));
//auto snap1 = e.read(r, gen.schema());
alloc.fail_after(fail_offset++);
@@ -1419,7 +1246,7 @@ SEASTAR_TEST_CASE(test_versions_are_merged_when_snapshots_go_away) {
m3.partition().make_fully_continuous();
{
auto e = partition_entry(mutation_partition(*s, m1.partition()));
auto e = partition_entry(mutation_partition_v2(*s, m1.partition()));
auto snap1 = e.read(r, cleaner, s, nullptr);
{
@@ -1440,7 +1267,7 @@ SEASTAR_TEST_CASE(test_versions_are_merged_when_snapshots_go_away) {
}
{
auto e = partition_entry(mutation_partition(*s, m1.partition()));
auto e = partition_entry(mutation_partition_v2(*s, m1.partition()));
auto snap1 = e.read(r, cleaner, s, nullptr);
{

View File

@@ -15,6 +15,7 @@
#include <seastar/util/closeable.hh>
#include <seastar/testing/test_case.hh>
#include <seastar/testing/thread_test_case.hh>
#include "test/lib/mutation_assertions.hh"
#include "test/lib/flat_mutation_reader_assertions.hh"
#include "test/lib/mutation_source_test.hh"
@@ -2603,7 +2604,7 @@ SEASTAR_TEST_CASE(test_exception_safety_of_reads) {
BOOST_REQUIRE(got_opt);
BOOST_REQUIRE(!read_mutation_from_flat_mutation_reader(rd).get0());
assert_that(*got_opt).is_equal_to(mut, ranges);
assert_that(*got_opt).is_equal_to_compacted(mut, ranges);
assert_that(cache.make_reader(s, semaphore.make_permit(), query::full_partition_range, slice))
.produces(mut, ranges);
});
@@ -4269,7 +4270,7 @@ SEASTAR_TEST_CASE(test_populating_cache_with_expired_and_nonexpired_tombstones)
cache_entry& entry = t.get_row_cache().lookup(dk);
auto& cp = entry.partition().version()->partition();
BOOST_REQUIRE_EQUAL(cp.tombstone_for_row(*s, ck1), row_tombstone(tombstone(1, dt_noexp))); // non-expired tombstone is in cache
BOOST_REQUIRE_EQUAL(cp.clustered_row(*s, ck1).deleted_at(), row_tombstone(tombstone(1, dt_noexp))); // non-expired tombstone is in cache
BOOST_REQUIRE(cp.find_row(*s, ck2) == nullptr); // expired tombstone isn't in cache
const auto rows = cp.non_dummy_rows();

View File

@@ -14,12 +14,13 @@
#include "mutation_fragment_stream_validator.hh"
#include "log.hh"
#include "clustering_interval_set.hh"
#include "mutation_partition_v2.hh"
extern logging::logger testlog;
class mutation_partition_assertion {
schema_ptr _schema;
const mutation_partition& _m;
mutation_partition _m;
private:
static mutation_partition compacted(const schema& s, const mutation_partition& m) {
mutation_partition res(s, m);
@@ -28,9 +29,14 @@ private:
return res;
}
public:
mutation_partition_assertion(schema_ptr s, mutation_partition&& m)
: _schema(s)
, _m(std::move(m))
{ }
mutation_partition_assertion(schema_ptr s, const mutation_partition& m)
: _schema(s)
, _m(m)
, _m(*s, m)
{ }
// If ck_ranges is passed, verifies only that information relevant for ck_ranges matches.
@@ -106,6 +112,11 @@ mutation_partition_assertion assert_that(schema_ptr s, const mutation_partition&
return {std::move(s), mp};
}
static inline
mutation_partition_assertion assert_that(schema_ptr s, const mutation_partition_v2& mp) {
return {s, mp.as_mutation_partition(*s)};
}
class mutation_assertion {
mutation _m;
public: