Merge 'range_tombstone_change_generator: flush: emit closing range_tombstone_change' from Benny Halevy

When the highest tombstone is open-ended, we must
emit a closing range_tombstone_change at
position_in_partition::after_all_clustered_rows().

Since all consumers need to do it, implement the logic
in the range_tombstone_change_generator itself.

It turned out that mutation::consume doesn't do that;
this series, and 5a09e5234ef4e1ee673bc7fca481defbbb2c0384 in particular,
fixes the issue.

Change 028b2a8cdfdc12721b2be23d175cbc756d2507de exposes the issue
by generating a richer set of random range_tombstones that includes open-ended
range tombstones.

Fixes #10316

Test: unit(dev)

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>

Closes #10317

* github.com:scylladb/scylla:
  test: random_mutation_generator: make more interesting range tombstones
  reader: upgrading_consumer: let range_tombstone_change_generator emit last closing change
  range_tombstone_change_generator: flush: emit end_position when upper limit is after all clustered rows
  range_tombstone_change_generator: flush: use tri_compare rather than less
  range_tombstone_change_generator: flush: return early if empty
This commit is contained in:
Tomasz Grabiec
2022-04-04 19:07:45 +02:00
3 changed files with 74 additions and 27 deletions

View File

@@ -68,22 +68,32 @@ public:
// for accumulated range tombstones.
// After this, only range_tombstones with positions >= upper_bound may be added,
// which guarantees that they won't affect the output of this flush.
//
// If upper_bound == position_in_partition::after_all_clustered_rows(),
// emits all remaining range_tombstone_changes.
// No range_tombstones may be added after this.
//
// FIXME: respect preemption
template<RangeTombstoneChangeConsumer C>
void flush(position_in_partition_view upper_bound, C consumer) {
position_in_partition::less_compare less(_schema);
std::optional<range_tombstone> prev;
void flush(const position_in_partition_view upper_bound, C consumer) {
if (_range_tombstones.empty()) {
return;
}
while (!_range_tombstones.empty() && less(_range_tombstones.begin()->end_position(), upper_bound)) {
position_in_partition::tri_compare cmp(_schema);
std::optional<range_tombstone> prev;
bool flush_all = cmp(upper_bound, position_in_partition::after_all_clustered_rows()) == 0;
while (!_range_tombstones.empty() && (flush_all || (cmp(_range_tombstones.begin()->end_position(), upper_bound) < 0))) {
auto rt = _range_tombstones.pop(_range_tombstones.begin());
if (prev && less(prev->end_position(), rt.position())) { // [1]
if (prev && (cmp(prev->end_position(), rt.position()) < 0)) { // [1]
// previous range tombstone not adjacent, emit gap.
consumer(range_tombstone_change(prev->end_position(), tombstone()));
}
// Check if start of rt was already emitted, emit if not.
if (!less(rt.position(), _lower_bound)) {
if (cmp(rt.position(), _lower_bound) >= 0) {
consumer(range_tombstone_change(rt.position(), rt.tomb));
}
@@ -95,15 +105,15 @@ public:
// It cannot get adjacent later because prev->end_position() < upper_bound,
// so nothing == prev->end_position() can be added after this invocation.
if (prev && (_range_tombstones.empty()
|| less(prev->end_position(), _range_tombstones.begin()->position()))) {
|| (cmp(prev->end_position(), _range_tombstones.begin()->position()) < 0))) {
consumer(range_tombstone_change(prev->end_position(), tombstone())); // [2]
}
// Emit the fragment for start bound of a range_tombstone which is overlapping with upper_bound,
// unless no such fragment or already emitted.
if (!_range_tombstones.empty()
&& less(_range_tombstones.begin()->position(), upper_bound)
&& (!less(_range_tombstones.begin()->position(), _lower_bound))) {
&& (cmp(_range_tombstones.begin()->position(), upper_bound) < 0)
&& (cmp(_range_tombstones.begin()->position(), _lower_bound) >= 0)) {
consumer(range_tombstone_change(
_range_tombstones.begin()->position(), _range_tombstones.begin()->tombstone().tomb));
}

View File

@@ -41,11 +41,14 @@ public:
bool discardable() const {
return _rt_gen.discardable() && !_current_rt;
}
// Flushes _rt_gen up to pos. Every range_tombstone_change produced by the
// flush has its tombstone recorded in _current_rt and is then handed to
// do_consume().
// When emit_end is true and _current_rt is still set once the flush is done,
// one more range_tombstone_change with an empty tombstone is consumed at pos.
void flush_tombstones(position_in_partition_view pos) {
void flush_tombstones(position_in_partition_view pos, bool emit_end = false) {
_rt_gen.flush(pos, [&] (range_tombstone_change rt) {
// Remember the tombstone of the most recently emitted change.
_current_rt = rt.tombstone();
do_consume(std::move(rt));
});
// A tombstone is still set after the flush: emit the change with an empty tombstone at pos.
if (emit_end && _current_rt) {
do_consume(range_tombstone_change(pos, {}));
}
}
void consume(partition_start mf) {
_rt_gen.reset();
@@ -72,10 +75,10 @@ public:
_rt_gen.consume(std::move(rt));
}
// Handles the end of a partition: flushes the pending range tombstones and
// then forwards the partition_end fragment via do_consume().
// When _pr is set, the flush goes up to _pr->end() with emit_end == true;
// otherwise it goes up to position_in_partition::after_all_clustered_rows().
void consume(partition_end mf) {
flush_tombstones(position_in_partition::after_all_clustered_rows());
if (_current_rt) {
assert(!_pr);
do_consume(range_tombstone_change(position_in_partition::after_all_clustered_rows(), {}));
if (_pr) {
flush_tombstones(_pr->end(), true);
} else {
flush_tombstones(position_in_partition::after_all_clustered_rows());
}
do_consume(std::move(mf));
}
@@ -84,10 +87,7 @@ public:
}
// Called at end of stream. When _pr is set, flushes tombstones up to
// _pr->end() and emits a change with an empty tombstone there if _current_rt
// is still set. Does nothing when _pr is not set.
void on_end_of_stream() {
if (_pr) {
flush_tombstones(_pr->end());
if (_current_rt) {
do_consume(range_tombstone_change(_pr->end(), {}));
}
flush_tombstones(_pr->end(), true);
}
}
};

View File

@@ -2045,6 +2045,7 @@ private:
std::uniform_int_distribution<size_t> _ck_index_dist{0, n_blobs - 1};
std::uniform_int_distribution<int> _bool_dist{0, 1};
std::uniform_int_distribution<int> _not_dummy_dist{0, 19};
std::uniform_int_distribution<int> _range_tombstone_dist{0, 29};
std::uniform_int_distribution<api::timestamp_type> _timestamp_dist{::api::min_timestamp, ::api::min_timestamp + 2};
template <typename Generator>
@@ -2111,10 +2112,13 @@ public:
return clustering_key::from_exploded(*_schema, { random_blob(), random_blob() });
}
// Builds a random clustering key prefix out of random_blob() components.
// The prefix always has at least one component. Every further slot, up to
// max_components_opt (or _schema->clustering_key_size() when it is not given),
// is filled only if a draw from _bool_dist is non-zero.
clustering_key_prefix make_random_prefix() {
clustering_key_prefix make_random_prefix(std::optional<size_t> max_components_opt = std::nullopt) {
std::vector<bytes> components = { random_blob() };
if (_bool_dist(_gen)) {
components.push_back(random_blob());
auto max_components = max_components_opt.value_or(_schema->clustering_key_size());
// Starts at 1 because the first component was already added above.
for (size_t i = 1; i < max_components; i++) {
if (_bool_dist(_gen)) {
components.push_back(random_blob());
}
}
return clustering_key_prefix::from_exploded(*_schema, std::move(components));
}
@@ -2169,13 +2173,46 @@ public:
}
// Produces a random range tombstone whose tombstone comes from
// random_tombstone(timestamp_level::range_tombstone).
// A draw from _range_tombstone_dist selects the shape of the range:
//   0       - a single prefix used as both an inclusive start and an inclusive end,
//   1       - unbounded start (bound_view::bottom()) with a random end bound,
//   2       - a random start bound with an unbounded end (bound_view::top()),
//   default - both bounds built from two prefixes that compare unequal.
range_tombstone make_random_range_tombstone() {
auto start = make_random_prefix();
auto end = make_random_prefix();
clustering_key_prefix::less_compare less(*_schema);
if (less(end, start)) {
std::swap(start, end);
auto t = random_tombstone(timestamp_level::range_tombstone);
switch (_range_tombstone_dist(_gen)) {
case 0: {
// singular prefix
auto prefix = make_random_prefix(_schema->clustering_key_size()-1); // make sure the prefix is partial
auto start = bound_view(prefix, bound_kind::incl_start);
auto end = bound_view(prefix, bound_kind::incl_end);
return range_tombstone(std::move(start), std::move(end), std::move(t));
}
case 1: {
// unbound start
auto prefix = make_random_prefix();
auto start = bound_view::bottom();
auto end = bound_view(prefix, _bool_dist(_gen) ? bound_kind::incl_end : bound_kind::excl_end);
return range_tombstone(std::move(start), std::move(end), std::move(t));
}
case 2: {
// unbound end
auto prefix = make_random_prefix();
auto start = bound_view(prefix, _bool_dist(_gen) ? bound_kind::incl_start : bound_kind::excl_start);
auto end = bound_view::top();
return range_tombstone(std::move(start), std::move(end), std::move(t));
}
default:
// fully bounded
auto start_prefix = make_random_prefix();
auto end_prefix = make_random_prefix();
clustering_key_prefix::tri_compare cmp(*_schema);
auto d = cmp(end_prefix, start_prefix);
// Re-draw the end prefix until it compares unequal to the start prefix.
while (d == 0) {
end_prefix = make_random_prefix();
d = cmp(end_prefix, start_prefix);
}
// Swap so that start_prefix is the one that compares lower.
if (d < 0) {
std::swap(end_prefix, start_prefix);
}
auto start = bound_view(std::move(start_prefix), _bool_dist(_gen) ? bound_kind::incl_start : bound_kind::excl_start);
auto end = bound_view(std::move(end_prefix), _bool_dist(_gen) ? bound_kind::incl_end : bound_kind::excl_end);
return range_tombstone(std::move(start), std::move(end), std::move(t));
}
return range_tombstone(std::move(start), std::move(end), random_tombstone(timestamp_level::range_tombstone));
}
mutation operator()() {