From 18a80a98b870c1104295fc48a31e2e4b5089ef3a Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sat, 2 Apr 2022 17:06:54 +0300 Subject: [PATCH 1/5] range_tombstone_change_generator: flush: return early if empty Optimize the common, empty case. Signed-off-by: Benny Halevy --- range_tombstone_change_generator.hh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/range_tombstone_change_generator.hh b/range_tombstone_change_generator.hh index 93ab677eb8..a6444ee908 100644 --- a/range_tombstone_change_generator.hh +++ b/range_tombstone_change_generator.hh @@ -71,6 +71,10 @@ public: // FIXME: respect preemption template void flush(position_in_partition_view upper_bound, C consumer) { + if (_range_tombstones.empty()) { + return; + } + position_in_partition::less_compare less(_schema); std::optional prev; From 2c5a6b389431e10fe630d9b1c6a282c264f0edeb Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sun, 3 Apr 2022 00:24:34 +0300 Subject: [PATCH 2/5] range_tombstone_change_generator: flush: use tri_compare rather than less less is already using tri_compare internally, and we'll use tri_compare for equality in the next patch. Signed-off-by: Benny Halevy --- range_tombstone_change_generator.hh | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/range_tombstone_change_generator.hh b/range_tombstone_change_generator.hh index a6444ee908..901b7c9baf 100644 --- a/range_tombstone_change_generator.hh +++ b/range_tombstone_change_generator.hh @@ -75,19 +75,19 @@ public: return; } - position_in_partition::less_compare less(_schema); + position_in_partition::tri_compare cmp(_schema); std::optional prev; - while (!_range_tombstones.empty() && less(_range_tombstones.begin()->end_position(), upper_bound)) { + while (!_range_tombstones.empty() && (cmp(_range_tombstones.begin()->end_position(), upper_bound) < 0)) { auto rt = _range_tombstones.pop(_range_tombstones.begin()); - if (prev && less(prev->end_position(), rt.position())) { // [1] + if (prev && (cmp(prev->end_position(), rt.position()) < 0)) { // [1] // previous range tombstone not adjacent, emit gap. consumer(range_tombstone_change(prev->end_position(), tombstone())); } // Check if start of rt was already emitted, emit if not. - if (!less(rt.position(), _lower_bound)) { + if (cmp(rt.position(), _lower_bound) >= 0) { consumer(range_tombstone_change(rt.position(), rt.tomb)); } @@ -99,15 +99,15 @@ public: // It cannot get adjacent later because prev->end_position() < upper_bound, // so nothing == prev->end_position() can be added after this invocation. if (prev && (_range_tombstones.empty() - || less(prev->end_position(), _range_tombstones.begin()->position()))) { + || (cmp(prev->end_position(), _range_tombstones.begin()->position()) < 0))) { consumer(range_tombstone_change(prev->end_position(), tombstone())); // [2] } // Emit the fragment for start bound of a range_tombstone which is overlapping with upper_bound, // unless no such fragment or already emitted. if (!_range_tombstones.empty() - && less(_range_tombstones.begin()->position(), upper_bound) - && (!less(_range_tombstones.begin()->position(), _lower_bound))) { + && (cmp(_range_tombstones.begin()->position(), upper_bound) < 0) + && (cmp(_range_tombstones.begin()->position(), _lower_bound) >= 0)) { consumer(range_tombstone_change( _range_tombstones.begin()->position(), _range_tombstones.begin()->tombstone().tomb)); } From cd171f309cc19d9dbfcbf45e2dd9b27ded60c429 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sat, 2 Apr 2022 17:06:54 +0300 Subject: [PATCH 3/5] range_tombstone_change_generator: flush: emit end_position when upper limit is after all clustered rows When the highest tombstone is open ended, we must emit a closing range_tombstone_change at position_in_partition::after_all_clustered_rows(). Since all consumers need to do it, implement the logic int the range_tombstone_change_generator itself. Signed-off-by: Benny Halevy --- range_tombstone_change_generator.hh | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/range_tombstone_change_generator.hh b/range_tombstone_change_generator.hh index 901b7c9baf..c30c2f450d 100644 --- a/range_tombstone_change_generator.hh +++ b/range_tombstone_change_generator.hh @@ -68,17 +68,23 @@ public: // for accumulated range tombstones. // After this, only range_tombstones with positions >= upper_bound may be added, // which guarantees that they won't affect the output of this flush. + // + // If upper_bound == position_in_partition::after_all_clustered_rows(), + // emits all remaining range_tombstone_changes. + // No range_tombstones may be added after this. + // // FIXME: respect preemption template - void flush(position_in_partition_view upper_bound, C consumer) { + void flush(const position_in_partition_view upper_bound, C consumer) { if (_range_tombstones.empty()) { return; } position_in_partition::tri_compare cmp(_schema); std::optional prev; + bool flush_all = cmp(upper_bound, position_in_partition::after_all_clustered_rows()) == 0; - while (!_range_tombstones.empty() && (cmp(_range_tombstones.begin()->end_position(), upper_bound) < 0)) { + while (!_range_tombstones.empty() && (flush_all || (cmp(_range_tombstones.begin()->end_position(), upper_bound) < 0))) { auto rt = _range_tombstones.pop(_range_tombstones.begin()); if (prev && (cmp(prev->end_position(), rt.position()) < 0)) { // [1] From 002be743f661fd2f5370a92936ef0cb627260721 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sun, 3 Apr 2022 00:17:17 +0300 Subject: [PATCH 4/5] reader: upgrading_consumer: let range_tombstone_change_generator emit last closing change When flushing range tombstones up to position_in_partition::after_all_clustered_rows(), the range_tombstone_change_generator now emits the closing range_tombstone_change, so there's no need for the upgrading_consumer to do so too. Signed-off-by: Benny Halevy --- readers/upgrading_consumer.hh | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/readers/upgrading_consumer.hh b/readers/upgrading_consumer.hh index 5dd657e9d1..7497354502 100644 --- a/readers/upgrading_consumer.hh +++ b/readers/upgrading_consumer.hh @@ -41,11 +41,14 @@ public: bool discardable() const { return _rt_gen.discardable() && !_current_rt; } - void flush_tombstones(position_in_partition_view pos) { + void flush_tombstones(position_in_partition_view pos, bool emit_end = false) { _rt_gen.flush(pos, [&] (range_tombstone_change rt) { _current_rt = rt.tombstone(); do_consume(std::move(rt)); }); + if (emit_end && _current_rt) { + do_consume(range_tombstone_change(pos, {})); + } } void consume(partition_start mf) { _rt_gen.reset(); @@ -72,10 +75,10 @@ public: _rt_gen.consume(std::move(rt)); } void consume(partition_end mf) { - flush_tombstones(position_in_partition::after_all_clustered_rows()); - if (_current_rt) { - assert(!_pr); - do_consume(range_tombstone_change(position_in_partition::after_all_clustered_rows(), {})); + if (_pr) { + flush_tombstones(_pr->end(), true); + } else { + flush_tombstones(position_in_partition::after_all_clustered_rows()); } do_consume(std::move(mf)); } @@ -84,10 +87,7 @@ public: } void on_end_of_stream() { if (_pr) { - flush_tombstones(_pr->end()); - if (_current_rt) { - do_consume(range_tombstone_change(_pr->end(), {})); - } + flush_tombstones(_pr->end(), true); } } }; From b3e2bbe5bd16cc5c64b4a8fd5e62123d5b73b128 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sat, 2 Apr 2022 17:30:29 +0300 Subject: [PATCH 5/5] test: random_mutation_generator: make more interesting range tombstones Include also singular prefix and semi-bounded range tombstones. Signed-off-by: Benny Halevy --- test/lib/mutation_source_test.cc | 55 ++++++++++++++++++++++++++------ 1 file changed, 46 insertions(+), 9 deletions(-) diff --git a/test/lib/mutation_source_test.cc b/test/lib/mutation_source_test.cc index c374c20f84..423811c72e 100644 --- a/test/lib/mutation_source_test.cc +++ b/test/lib/mutation_source_test.cc @@ -2045,6 +2045,7 @@ private: std::uniform_int_distribution _ck_index_dist{0, n_blobs - 1}; std::uniform_int_distribution _bool_dist{0, 1}; std::uniform_int_distribution _not_dummy_dist{0, 19}; + std::uniform_int_distribution _range_tombstone_dist{0, 29}; std::uniform_int_distribution _timestamp_dist{::api::min_timestamp, ::api::min_timestamp + 2}; template @@ -2111,10 +2112,13 @@ public: return clustering_key::from_exploded(*_schema, { random_blob(), random_blob() }); } - clustering_key_prefix make_random_prefix() { + clustering_key_prefix make_random_prefix(std::optional max_components_opt = std::nullopt) { std::vector components = { random_blob() }; - if (_bool_dist(_gen)) { - components.push_back(random_blob()); + auto max_components = max_components_opt.value_or(_schema->clustering_key_size()); + for (size_t i = 1; i < max_components; i++) { + if (_bool_dist(_gen)) { + components.push_back(random_blob()); + } } return clustering_key_prefix::from_exploded(*_schema, std::move(components)); } @@ -2169,13 +2173,46 @@ public: } range_tombstone make_random_range_tombstone() { - auto start = make_random_prefix(); - auto end = make_random_prefix(); - clustering_key_prefix::less_compare less(*_schema); - if (less(end, start)) { - std::swap(start, end); + auto t = random_tombstone(timestamp_level::range_tombstone); + switch (_range_tombstone_dist(_gen)) { + case 0: { + // singular prefix + auto prefix = make_random_prefix(_schema->clustering_key_size()-1); // make sure the prefix is partial + auto start = bound_view(prefix, bound_kind::incl_start); + auto end = bound_view(prefix, bound_kind::incl_end); + return range_tombstone(std::move(start), std::move(end), std::move(t)); + } + case 1: { + // unbound start + auto prefix = make_random_prefix(); + auto start = bound_view::bottom(); + auto end = bound_view(prefix, _bool_dist(_gen) ? bound_kind::incl_end : bound_kind::excl_end); + return range_tombstone(std::move(start), std::move(end), std::move(t)); + } + case 2: { + // unbound end + auto prefix = make_random_prefix(); + auto start = bound_view(prefix, _bool_dist(_gen) ? bound_kind::incl_start : bound_kind::excl_start); + auto end = bound_view::top(); + return range_tombstone(std::move(start), std::move(end), std::move(t)); + } + default: + // fully bounded + auto start_prefix = make_random_prefix(); + auto end_prefix = make_random_prefix(); + clustering_key_prefix::tri_compare cmp(*_schema); + auto d = cmp(end_prefix, start_prefix); + while (d == 0) { + end_prefix = make_random_prefix(); + d = cmp(end_prefix, start_prefix); + } + if (d < 0) { + std::swap(end_prefix, start_prefix); + } + auto start = bound_view(std::move(start_prefix), _bool_dist(_gen) ? bound_kind::incl_start : bound_kind::excl_start); + auto end = bound_view(std::move(end_prefix), _bool_dist(_gen) ? bound_kind::incl_end : bound_kind::excl_end); + return range_tombstone(std::move(start), std::move(end), std::move(t)); } - return range_tombstone(std::move(start), std::move(end), random_tombstone(timestamp_level::range_tombstone)); } mutation operator()() {