Merge "Multishard combining reader more robust reader recreation" from Botond
Make the reader recreation logic more robust, by moving away from deciding which fragments have to be dropped based on a bunch of special cases, instead replacing this with a general logic which just drops all already seen fragments (based on their position). Special handling is added for the case when the last position is a range tombstone with a non full prefix starting position. Reproducer unit tests are added for both cases. Refs #4695 Fixes #4733
This commit is contained in:
@@ -924,9 +924,10 @@ class shard_reader : public enable_lw_shared_from_this<shard_reader>, public fla
|
||||
bool _reader_created = false;
|
||||
bool _drop_partition_start = false;
|
||||
bool _drop_static_row = false;
|
||||
position_in_partition::tri_compare _tri_cmp;
|
||||
|
||||
std::optional<dht::decorated_key> _last_pkey;
|
||||
std::optional<position_in_partition> _last_position_in_partition;
|
||||
position_in_partition _next_position_in_partition = position_in_partition::for_partition_start();
|
||||
// These are used when the reader has to be recreated (after having been
|
||||
// evicted while paused) and the range and/or slice it is recreated with
|
||||
// differs from the original ones.
|
||||
@@ -934,13 +935,13 @@ class shard_reader : public enable_lw_shared_from_this<shard_reader>, public fla
|
||||
std::optional<query::partition_slice> _slice_override;
|
||||
|
||||
private:
|
||||
void update_last_position(const circular_buffer<mutation_fragment>& buffer);
|
||||
void update_next_position(flat_mutation_reader& reader, circular_buffer<mutation_fragment>& buffer);
|
||||
void adjust_partition_slice();
|
||||
flat_mutation_reader recreate_reader();
|
||||
flat_mutation_reader resume_or_create_reader();
|
||||
bool should_drop_fragment(const mutation_fragment& mf);
|
||||
future<> do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout);
|
||||
future<> ensure_buffer_contains_all_fragments_for_last_pos(flat_mutation_reader& reader, circular_buffer<mutation_fragment>& buffer,
|
||||
db::timeout_clock::time_point timeout);
|
||||
future<> fill_buffer(flat_mutation_reader& reader, circular_buffer<mutation_fragment>& buffer, db::timeout_clock::time_point timeout);
|
||||
|
||||
public:
|
||||
remote_reader(
|
||||
@@ -1038,7 +1039,7 @@ void shard_reader::stop() noexcept {
|
||||
}).finally([zis = shared_from_this()] {}));
|
||||
}
|
||||
|
||||
void shard_reader::remote_reader::update_last_position(const circular_buffer<mutation_fragment>& buffer) {
|
||||
void shard_reader::remote_reader::update_next_position(flat_mutation_reader& reader, circular_buffer<mutation_fragment>& buffer) {
|
||||
if (buffer.empty()) {
|
||||
return;
|
||||
}
|
||||
@@ -1049,7 +1050,31 @@ void shard_reader::remote_reader::update_last_position(const circular_buffer<mut
|
||||
_last_pkey = pk_it->as_partition_start().key();
|
||||
}
|
||||
|
||||
_last_position_in_partition.emplace(buffer.back().position());
|
||||
const auto last_pos = buffer.back().position();
|
||||
switch (last_pos.region()) {
|
||||
case partition_region::partition_start:
|
||||
_next_position_in_partition = position_in_partition::for_static_row();
|
||||
break;
|
||||
case partition_region::static_row:
|
||||
_next_position_in_partition = position_in_partition::before_all_clustered_rows();
|
||||
break;
|
||||
case partition_region::clustered:
|
||||
if (reader.is_buffer_empty()) {
|
||||
_next_position_in_partition = position_in_partition::after_key(last_pos);
|
||||
} else {
|
||||
const auto& next_frag = reader.peek_buffer();
|
||||
if (next_frag.is_end_of_partition()) {
|
||||
buffer.emplace_back(reader.pop_mutation_fragment());
|
||||
_next_position_in_partition = position_in_partition::for_partition_start();
|
||||
} else {
|
||||
_next_position_in_partition = position_in_partition(next_frag.position());
|
||||
}
|
||||
}
|
||||
break;
|
||||
case partition_region::partition_end:
|
||||
_next_position_in_partition = position_in_partition::for_partition_start();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void shard_reader::remote_reader::adjust_partition_slice() {
|
||||
@@ -1057,9 +1082,8 @@ void shard_reader::remote_reader::adjust_partition_slice() {
|
||||
_slice_override = _ps;
|
||||
}
|
||||
|
||||
auto& last_ckey = _last_position_in_partition->key();
|
||||
auto ranges = _slice_override->default_row_ranges();
|
||||
query::trim_clustering_row_ranges_to(*_schema, ranges, last_ckey);
|
||||
query::trim_clustering_row_ranges_to(*_schema, ranges, _next_position_in_partition);
|
||||
|
||||
_slice_override->clear_ranges();
|
||||
_slice_override->set_range(*_schema, _last_pkey->key(), std::move(ranges));
|
||||
@@ -1072,25 +1096,22 @@ flat_mutation_reader shard_reader::remote_reader::recreate_reader() {
|
||||
if (_last_pkey) {
|
||||
bool partition_range_is_inclusive = true;
|
||||
|
||||
if (_last_position_in_partition) {
|
||||
switch (_last_position_in_partition->region()) {
|
||||
case partition_region::partition_start:
|
||||
_drop_partition_start = true;
|
||||
break;
|
||||
case partition_region::static_row:
|
||||
_drop_partition_start = true;
|
||||
_drop_static_row = true;
|
||||
break;
|
||||
case partition_region::clustered:
|
||||
_drop_partition_start = true;
|
||||
_drop_static_row = true;
|
||||
adjust_partition_slice();
|
||||
slice = &*_slice_override;
|
||||
break;
|
||||
case partition_region::partition_end:
|
||||
partition_range_is_inclusive = false;
|
||||
break;
|
||||
}
|
||||
switch (_next_position_in_partition.region()) {
|
||||
case partition_region::partition_start:
|
||||
partition_range_is_inclusive = false;
|
||||
break;
|
||||
case partition_region::static_row:
|
||||
_drop_partition_start = true;
|
||||
break;
|
||||
case partition_region::clustered:
|
||||
_drop_partition_start = true;
|
||||
_drop_static_row = true;
|
||||
adjust_partition_slice();
|
||||
slice = &*_slice_override;
|
||||
break;
|
||||
case partition_region::partition_end:
|
||||
partition_range_is_inclusive = false;
|
||||
break;
|
||||
}
|
||||
|
||||
// The original range contained a single partition and we've read it
|
||||
@@ -1129,62 +1150,83 @@ flat_mutation_reader shard_reader::remote_reader::resume_or_create_reader() {
|
||||
return recreate_reader();
|
||||
}
|
||||
|
||||
bool shard_reader::remote_reader::should_drop_fragment(const mutation_fragment& mf) {
|
||||
if (_drop_partition_start && mf.is_partition_start()) {
|
||||
_drop_partition_start = false;
|
||||
return true;
|
||||
}
|
||||
if (_drop_static_row && mf.is_static_row()) {
|
||||
_drop_static_row = false;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
future<> shard_reader::remote_reader::do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout) {
|
||||
if (!_drop_partition_start && !_drop_static_row) {
|
||||
return reader.fill_buffer(timeout);
|
||||
}
|
||||
return repeat([this, &reader, timeout] {
|
||||
return reader.fill_buffer(timeout).then([this, &reader] {
|
||||
const auto eos = reader.is_end_of_stream();
|
||||
|
||||
if (reader.is_buffer_empty()) {
|
||||
return stop_iteration(eos);
|
||||
while (!reader.is_buffer_empty() && should_drop_fragment(reader.peek_buffer())) {
|
||||
reader.pop_mutation_fragment();
|
||||
}
|
||||
if (_drop_partition_start) {
|
||||
_drop_partition_start = false;
|
||||
if (reader.peek_buffer().is_partition_start()) {
|
||||
reader.pop_mutation_fragment();
|
||||
}
|
||||
}
|
||||
|
||||
if (reader.is_buffer_empty()) {
|
||||
return stop_iteration(eos);
|
||||
}
|
||||
if (_drop_static_row) {
|
||||
_drop_static_row = false;
|
||||
if (reader.peek_buffer().is_static_row()) {
|
||||
reader.pop_mutation_fragment();
|
||||
}
|
||||
}
|
||||
|
||||
return stop_iteration(reader.is_buffer_full() || eos);
|
||||
return stop_iteration(reader.is_buffer_full() || reader.is_end_of_stream());
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> shard_reader::remote_reader::ensure_buffer_contains_all_fragments_for_last_pos(flat_mutation_reader& reader,
|
||||
circular_buffer<mutation_fragment>& buffer, db::timeout_clock::time_point timeout) {
|
||||
if (buffer.empty() || !buffer.back().is_range_tombstone()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
auto stop = [this, &reader, &buffer] {
|
||||
future<> shard_reader::remote_reader::fill_buffer(flat_mutation_reader& reader, circular_buffer<mutation_fragment>& buffer,
|
||||
db::timeout_clock::time_point timeout) {
|
||||
return do_fill_buffer(reader, timeout).then([this, &reader, &buffer, timeout] {
|
||||
if (reader.is_buffer_empty()) {
|
||||
return reader.is_end_of_stream();
|
||||
return make_ready_future<>();
|
||||
}
|
||||
const auto& next_pos = reader.peek_buffer().position();
|
||||
if (next_pos.region() != partition_region::clustered) {
|
||||
return true;
|
||||
}
|
||||
return !next_pos.key().equal(*_schema, buffer.back().position().key());
|
||||
};
|
||||
|
||||
return do_until(stop, [this, &reader, &buffer, timeout] {
|
||||
if (reader.is_buffer_empty()) {
|
||||
return do_fill_buffer(reader, timeout);
|
||||
}
|
||||
buffer.emplace_back(reader.pop_mutation_fragment());
|
||||
return make_ready_future<>();
|
||||
buffer = reader.detach_buffer();
|
||||
auto stop = [this, &reader, &buffer] {
|
||||
// The only problematic fragment kind is the range tombstone.
|
||||
// All other fragment kinds are safe to end the buffer on, and
|
||||
// are guaranteed to represent progress vs. the last buffer fill.
|
||||
if (!buffer.back().is_range_tombstone()) {
|
||||
return true;
|
||||
}
|
||||
if (reader.is_buffer_empty()) {
|
||||
return reader.is_end_of_stream();
|
||||
}
|
||||
const auto& next_pos = reader.peek_buffer().position();
|
||||
// To ensure safe progress we have to ensure the following:
|
||||
//
|
||||
// _next_position_in_partition < buffer.back().position() < next_pos
|
||||
//
|
||||
// * The first condition is to ensure we made progress since the
|
||||
// last buffer fill. Otherwise we might get into an endless loop if
|
||||
// the reader is recreated after each `fill_buffer()` call.
|
||||
// * The second condition is to ensure we have seen all fragments
|
||||
// with the same position. Otherwise we might jump over those
|
||||
// remaining fragments with the same position as the last
|
||||
// fragment's in the buffer when the reader is recreated.
|
||||
return _tri_cmp(_next_position_in_partition, buffer.back().position()) < 0 && _tri_cmp(buffer.back().position(), next_pos) < 0;
|
||||
};
|
||||
// Read additional fragments until it is safe to stop, if needed.
|
||||
// We have to ensure we stop at a fragment such that if the reader is
|
||||
// evicted and recreated later, we won't be skipping any fragments.
|
||||
// Practically, range tombstones are the only ones that are
|
||||
// problematic to end the buffer on. This is due to the fact range
|
||||
// tombstones can have the same position that multiple following range
|
||||
// tombstones, or a single following clustering row in the stream has.
|
||||
// When a range tombstone is the last in the buffer, we have to continue
|
||||
// to read until we are sure we've read all fragments sharing the same
|
||||
// position, so that we can safely continue reading from after said
|
||||
// position.
|
||||
return do_until(stop, [this, &reader, &buffer, timeout] {
|
||||
if (reader.is_buffer_empty()) {
|
||||
return do_fill_buffer(reader, timeout);
|
||||
}
|
||||
buffer.emplace_back(reader.pop_mutation_fragment());
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).then([this, &reader, &buffer] {
|
||||
update_next_position(reader, buffer);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1202,7 +1244,8 @@ shard_reader::remote_reader::remote_reader(
|
||||
, _ps(ps)
|
||||
, _pc(pc)
|
||||
, _trace_state(std::move(trace_state))
|
||||
, _fwd_mr(fwd_mr) {
|
||||
, _fwd_mr(fwd_mr)
|
||||
, _tri_cmp(*_schema) {
|
||||
}
|
||||
|
||||
future<shard_reader::fill_buffer_result> shard_reader::remote_reader::fill_buffer(const dht::partition_range& pr, bool pending_next_partition,
|
||||
@@ -1210,7 +1253,7 @@ future<shard_reader::fill_buffer_result> shard_reader::remote_reader::fill_buffe
|
||||
// We could have missed a `fast_forward_to()` if the reader wasn't created yet.
|
||||
_pr = ≺
|
||||
if (pending_next_partition) {
|
||||
_last_position_in_partition = position_in_partition(position_in_partition::end_of_partition_tag_t{});
|
||||
_next_position_in_partition = position_in_partition::for_partition_start();
|
||||
}
|
||||
return do_with(resume_or_create_reader(), circular_buffer<mutation_fragment>{},
|
||||
[this, pending_next_partition, timeout] (flat_mutation_reader& reader, circular_buffer<mutation_fragment>& buffer) mutable {
|
||||
@@ -1218,22 +1261,8 @@ future<shard_reader::fill_buffer_result> shard_reader::remote_reader::fill_buffe
|
||||
reader.next_partition();
|
||||
}
|
||||
|
||||
return do_fill_buffer(reader, timeout).then([this, &reader, &buffer, timeout] {
|
||||
buffer = reader.detach_buffer();
|
||||
// When the reader is recreated (after having been evicted) we
|
||||
// recreate it such that it starts reading from *after* the last
|
||||
// seen fragment's position. If the last seen fragment is a range
|
||||
// tombstone it is *not* guaranteed that the next fragments in the
|
||||
// data stream have positions strictly greater than the range
|
||||
// tombstone's. If the reader is evicted and has to be recreated,
|
||||
// these fragments would be then skipped as the read would continue
|
||||
// after their position.
|
||||
// To avoid this ensure that the buffer contains *all* fragments for
|
||||
// the last seen position.
|
||||
return ensure_buffer_contains_all_fragments_for_last_pos(reader, buffer, timeout);
|
||||
}).then([this, &reader, &buffer] {
|
||||
return fill_buffer(reader, buffer, timeout).then([this, &reader, &buffer] {
|
||||
const auto eos = reader.is_end_of_stream() && reader.is_buffer_empty();
|
||||
update_last_position(buffer);
|
||||
_irh = _lifecycle_policy.pause(std::move(reader));
|
||||
return fill_buffer_result(std::move(buffer), eos);
|
||||
});
|
||||
@@ -1243,7 +1272,7 @@ future<shard_reader::fill_buffer_result> shard_reader::remote_reader::fill_buffe
|
||||
future<> shard_reader::remote_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
|
||||
_pr = ≺
|
||||
_last_pkey.reset();
|
||||
_last_position_in_partition.reset();
|
||||
_next_position_in_partition = position_in_partition::for_partition_start();
|
||||
|
||||
if (!_reader_created || !_irh) {
|
||||
return make_ready_future<>();
|
||||
|
||||
@@ -129,6 +129,8 @@ public:
|
||||
: _type(partition_region::clustered), _ck(&ck) { }
|
||||
position_in_partition_view(range_tag_t, bound_view bv)
|
||||
: _type(partition_region::clustered), _bound_weight(position_weight(bv.kind())), _ck(&bv.prefix()) { }
|
||||
position_in_partition_view(const clustering_key_prefix& ck, bound_weight w)
|
||||
: _type(partition_region::clustered), _bound_weight(w), _ck(&ck) { }
|
||||
|
||||
static position_in_partition_view for_range_start(const query::clustering_range& r) {
|
||||
return {position_in_partition_view::range_tag_t(), bound_view::from_range_start(r)};
|
||||
@@ -159,6 +161,7 @@ public:
|
||||
}
|
||||
|
||||
partition_region region() const { return _type; }
|
||||
bound_weight get_bound_weight() const { return _bound_weight; }
|
||||
bool is_partition_start() const { return _type == partition_region::partition_start; }
|
||||
bool is_partition_end() const { return _type == partition_region::partition_end; }
|
||||
bool is_static_row() const { return _type == partition_region::static_row; }
|
||||
@@ -271,6 +274,10 @@ public:
|
||||
return {clustering_row_tag_t(), std::move(ck)};
|
||||
}
|
||||
|
||||
static position_in_partition for_partition_start() {
|
||||
return position_in_partition{partition_start_tag_t()};
|
||||
}
|
||||
|
||||
static position_in_partition for_static_row() {
|
||||
return position_in_partition{static_row_tag_t()};
|
||||
}
|
||||
|
||||
@@ -31,6 +31,8 @@
|
||||
#include "tracing/tracing.hh"
|
||||
#include "utils/small_vector.hh"
|
||||
|
||||
class position_in_partition_view;
|
||||
|
||||
namespace query {
|
||||
|
||||
using column_id_vector = utils::small_vector<column_id, 8>;
|
||||
@@ -58,10 +60,20 @@ typedef std::vector<clustering_range> clustering_row_ranges;
|
||||
|
||||
/// Trim the clustering ranges.
|
||||
///
|
||||
/// Equivalent of intersecting each range with [key, +inf), or (-inf, key] if
|
||||
/// Equivalent of intersecting each clustering range with [pos, +inf) position
|
||||
/// in partition range, or (-inf, pos] position in partition range if
|
||||
/// reversed == true. Ranges that do not intersect are dropped. Ranges that
|
||||
/// partially overlap are trimmed.
|
||||
/// Result: each range will overlap fully with [key, +inf), or (-int, key] if
|
||||
/// Result: each range will overlap fully with [pos, +inf), or (-int, pos] if
|
||||
/// reversed is true.
|
||||
void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& ranges, position_in_partition_view pos, bool reversed = false);
|
||||
|
||||
/// Trim the clustering ranges.
|
||||
///
|
||||
/// Equivalent of intersecting each clustering range with (key, +inf) clustering
|
||||
/// range, or (-inf, key) clustering range if reversed == true. Ranges that do
|
||||
/// not intersect are dropped. Ranges that partially overlap are trimmed.
|
||||
/// Result: each range will overlap fully with (key, +inf), or (-int, key) if
|
||||
/// reversed is true.
|
||||
void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& ranges, const clustering_key& key, bool reversed = false);
|
||||
|
||||
|
||||
32
query.cc
32
query.cc
@@ -71,34 +71,38 @@ std::ostream& operator<<(std::ostream& out, const specific_ranges& s) {
|
||||
return out << "{" << s._pk << " : " << join(", ", s._ranges) << "}";
|
||||
}
|
||||
|
||||
void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& ranges, const clustering_key& key, bool reversed) {
|
||||
auto cmp = [reversed, bv_cmp = bound_view::compare(s)] (const auto& a, const auto& b) {
|
||||
return reversed ? bv_cmp(b, a) : bv_cmp(a, b);
|
||||
void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& ranges, position_in_partition_view pos, bool reversed) {
|
||||
auto cmp = [reversed, cmp = position_in_partition::composite_tri_compare(s)] (const auto& a, const auto& b) {
|
||||
return reversed ? cmp(b, a) : cmp(a, b);
|
||||
};
|
||||
auto start_bound = [reversed] (const auto& range) -> const bound_view& {
|
||||
return reversed ? range.second : range.first;
|
||||
auto start_bound = [reversed] (const auto& range) -> position_in_partition_view {
|
||||
return reversed ? position_in_partition_view::for_range_end(range) : position_in_partition_view::for_range_start(range);
|
||||
};
|
||||
auto end_bound = [reversed] (const auto& range) -> const bound_view& {
|
||||
return reversed ? range.first : range.second;
|
||||
auto end_bound = [reversed] (const auto& range) -> position_in_partition_view {
|
||||
return reversed ? position_in_partition_view::for_range_start(range) : position_in_partition_view::for_range_end(range);
|
||||
};
|
||||
clustering_key_prefix::equality eq(s);
|
||||
|
||||
auto it = ranges.begin();
|
||||
while (it != ranges.end()) {
|
||||
auto range = bound_view::from_range(*it);
|
||||
if (cmp(end_bound(range), key) || eq(end_bound(range).prefix(), key)) {
|
||||
if (cmp(end_bound(*it), pos) <= 0) {
|
||||
it = ranges.erase(it);
|
||||
continue;
|
||||
} else if (cmp(start_bound(range), key)) {
|
||||
assert(cmp(key, end_bound(range)));
|
||||
auto r = reversed ? clustering_range(it->start(), clustering_range::bound { key, false })
|
||||
: clustering_range(clustering_range::bound { key, false }, it->end());
|
||||
} else if (cmp(start_bound(*it), pos) <= 0) {
|
||||
assert(cmp(pos, end_bound(*it)) < 0);
|
||||
auto r = reversed ?
|
||||
clustering_range(it->start(), clustering_range::bound(pos.key(), pos.get_bound_weight() != bound_weight::before_all_prefixed)) :
|
||||
clustering_range(clustering_range::bound(pos.key(), pos.get_bound_weight() != bound_weight::after_all_prefixed), it->end());
|
||||
*it = std::move(r);
|
||||
}
|
||||
++it;
|
||||
}
|
||||
}
|
||||
|
||||
void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& ranges, const clustering_key& key, bool reversed) {
|
||||
return trim_clustering_row_ranges_to(s, ranges,
|
||||
position_in_partition_view(key, reversed ? bound_weight::before_all_prefixed : bound_weight::after_all_prefixed), reversed);
|
||||
}
|
||||
|
||||
partition_slice::partition_slice(clustering_row_ranges row_ranges,
|
||||
query::column_id_vector static_columns,
|
||||
query::column_id_vector regular_columns,
|
||||
|
||||
@@ -36,6 +36,15 @@ private:
|
||||
mutation_fragment_opt read_next() {
|
||||
return _reader(db::no_timeout).get0();
|
||||
}
|
||||
|
||||
static bool are_tombstones_mergeable(const schema& s, const range_tombstone& a, const range_tombstone& b) {
|
||||
const auto range_a = position_range(position_in_partition(a.position()), position_in_partition(a.end_position()));
|
||||
const auto tri_cmp = position_in_partition::tri_compare(s);
|
||||
return a.tomb.compare(b.tomb) == 0 && (range_a.overlaps(s, b.position(), b.end_position()) ||
|
||||
tri_cmp(a.end_position(), b.position()) == 0 ||
|
||||
tri_cmp(b.end_position(), a.position()) == 0);
|
||||
}
|
||||
|
||||
public:
|
||||
flat_reader_assertions(flat_mutation_reader reader)
|
||||
: _reader(std::move(reader))
|
||||
@@ -238,18 +247,19 @@ public:
|
||||
BOOST_FAIL(format("Expected range tombstone {}, but got {}", rt, mutation_fragment::printer(*_reader.schema(), *mfo)));
|
||||
}
|
||||
const schema& s = *_reader.schema();
|
||||
_tombstones.apply(s, mfo->as_range_tombstone());
|
||||
range_tombstone_list actual_list(s);
|
||||
actual_list.apply(s, mfo->as_range_tombstone());
|
||||
_tombstones.apply(s, mfo->as_range_tombstone());
|
||||
position_in_partition::equal_compare eq(s);
|
||||
while (mutation_fragment* next = _reader.peek(db::no_timeout).get0()) {
|
||||
if (!next->is_range_tombstone() || !eq(next->position(), mfo->position())) {
|
||||
if (!next->is_range_tombstone() || !are_tombstones_mergeable(s, *actual_list.begin(), next->as_range_tombstone())) {
|
||||
break;
|
||||
}
|
||||
auto rt = _reader(db::no_timeout).get0()->as_range_tombstone();
|
||||
actual_list.apply(s, rt);
|
||||
assert(actual_list.size() == 1);
|
||||
_tombstones.apply(s, rt);
|
||||
}
|
||||
actual_list.apply(s, mfo->as_range_tombstone());
|
||||
{
|
||||
range_tombstone_list expected_list(s);
|
||||
expected_list.apply(s, rt);
|
||||
|
||||
@@ -21,6 +21,7 @@
|
||||
|
||||
|
||||
#include <random>
|
||||
#include <experimental/source_location>
|
||||
|
||||
#include <boost/range/irange.hpp>
|
||||
#include <boost/range/adaptor/uniqued.hpp>
|
||||
@@ -1455,6 +1456,279 @@ SEASTAR_THREAD_TEST_CASE(test_foreign_reader_as_mutation_source) {
|
||||
}).get();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_trim_clustering_row_ranges_to) {
|
||||
struct key {
|
||||
int c0;
|
||||
std::optional<int> c1;
|
||||
|
||||
key(int c0, std::optional<int> c1 = {}) : c0(c0), c1(c1) { }
|
||||
|
||||
clustering_key to_clustering_key(const schema& s) const {
|
||||
std::vector<bytes> v;
|
||||
v.push_back(int32_type->decompose(data_value(c0)));
|
||||
if (c1) {
|
||||
v.push_back(int32_type->decompose(data_value(*c1)));
|
||||
}
|
||||
return clustering_key::from_exploded(s, std::move(v));
|
||||
}
|
||||
};
|
||||
struct incl {
|
||||
key value;
|
||||
|
||||
incl(int c0, int c1) : value(c0, c1) { }
|
||||
incl(int c0) : value(c0) { }
|
||||
};
|
||||
struct excl {
|
||||
key value;
|
||||
|
||||
excl(int c0, int c1) : value(c0, c1) { }
|
||||
excl(int c0) : value(c0) { }
|
||||
};
|
||||
struct bound {
|
||||
key value;
|
||||
bool inclusive;
|
||||
|
||||
bound(incl b) : value(b.value), inclusive(true) { }
|
||||
bound(excl b) : value(b.value), inclusive(false) { }
|
||||
};
|
||||
struct inf {
|
||||
};
|
||||
struct range {
|
||||
std::optional<bound> start;
|
||||
std::optional<bound> end;
|
||||
bool singular = false;
|
||||
|
||||
range(bound s, bound e) : start(s), end(e) { }
|
||||
range(inf, bound e) : end(e) { }
|
||||
range(bound s, inf) : start(s) { }
|
||||
range(inf, inf) { }
|
||||
range(bound b) : start(b), end(b), singular(true) { }
|
||||
|
||||
static std::optional<range_bound<clustering_key>> to_bound(const schema& s, std::optional<bound> b) {
|
||||
if (b) {
|
||||
return range_bound<clustering_key>(b->value.to_clustering_key(s), b->inclusive);
|
||||
}
|
||||
return {};
|
||||
}
|
||||
query::clustering_range to_clustering_range(const schema& s) const {
|
||||
return query::clustering_range(to_bound(s, start), to_bound(s, end), singular);
|
||||
}
|
||||
};
|
||||
|
||||
const auto schema = schema_builder("ks", get_name())
|
||||
.with_column("p0", int32_type, column_kind::partition_key)
|
||||
.with_column("c0", int32_type, column_kind::clustering_key)
|
||||
.with_column("c1", int32_type, column_kind::clustering_key)
|
||||
.with_column("v1", int32_type, column_kind::regular_column)
|
||||
.build();
|
||||
|
||||
const auto check = [&schema] (std::vector<range> ranges, key key, std::vector<range> output_ranges, bool reversed = false,
|
||||
std::experimental::source_location sl = std::experimental::source_location::current()) {
|
||||
auto actual_ranges = boost::copy_range<query::clustering_row_ranges>(ranges | boost::adaptors::transformed(
|
||||
[&] (const range& r) { return r.to_clustering_range(*schema); }));
|
||||
|
||||
query::trim_clustering_row_ranges_to(*schema, actual_ranges, key.to_clustering_key(*schema), reversed);
|
||||
|
||||
const auto expected_ranges = boost::copy_range<query::clustering_row_ranges>(output_ranges | boost::adaptors::transformed(
|
||||
[&] (const range& r) { return r.to_clustering_range(*schema); }));
|
||||
|
||||
if (!std::equal(actual_ranges.begin(), actual_ranges.end(), expected_ranges.begin(), expected_ranges.end(),
|
||||
[tri_cmp = clustering_key::tri_compare(*schema)] (const query::clustering_range& a, const query::clustering_range& b) {
|
||||
return a.equal(b, tri_cmp);
|
||||
})) {
|
||||
BOOST_FAIL(fmt::format("Unexpected result\nexpected {}\ngot {}\ncalled from {}:{}", expected_ranges, actual_ranges, sl.file_name(), sl.line()));
|
||||
}
|
||||
};
|
||||
const auto check_reversed = [&schema, &check] (std::vector<range> ranges, key key, std::vector<range> output_ranges, bool reversed = false,
|
||||
std::experimental::source_location sl = std::experimental::source_location::current()) {
|
||||
return check(std::move(ranges), std::move(key), std::move(output_ranges), true, sl);
|
||||
};
|
||||
|
||||
// We want to check the following cases:
|
||||
// 1) Before range
|
||||
// 2) Equal to begin(range with incl begin)
|
||||
// 3) Equal to begin(range with excl begin)
|
||||
// 4) Intersect with range (excl end)
|
||||
// 5) Intersect with range (incl end)
|
||||
// 6) Intersect with range (inf end)
|
||||
// 7) Equal to end(range with incl end)
|
||||
// 8) Equal to end(range with excl end)
|
||||
// 9) After range
|
||||
// 10) Full range
|
||||
|
||||
// (1)
|
||||
check(
|
||||
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
|
||||
{1, 0},
|
||||
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} });
|
||||
|
||||
// (2)
|
||||
check(
|
||||
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
|
||||
{1, 6},
|
||||
{ {excl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} });
|
||||
|
||||
// (2) - prefix
|
||||
check(
|
||||
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
|
||||
{3, 6},
|
||||
{ {excl{3, 6}, excl{4}}, {incl{7, 9}, incl{999, 0}} });
|
||||
|
||||
// (3)
|
||||
check(
|
||||
{ {incl{1, 6}, excl{2, 3}}, {excl{2, 3}, incl{2, 4}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
|
||||
{2, 3},
|
||||
{ {excl{2, 3}, incl{2, 4}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} });
|
||||
|
||||
// (3) - prefix
|
||||
check(
|
||||
{ {incl{1, 6}, excl{2, 3}}, {excl{2, 3}, incl{2, 4}}, {excl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
|
||||
{3, 7},
|
||||
{ {excl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} });
|
||||
|
||||
// (4)
|
||||
check(
|
||||
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
|
||||
{2, 0},
|
||||
{ {excl{2, 0}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} });
|
||||
|
||||
// (5)
|
||||
check(
|
||||
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
|
||||
{90, 90},
|
||||
{ {excl{90, 90}, incl{999, 0}} });
|
||||
|
||||
// (6)
|
||||
check(
|
||||
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, inf{}} },
|
||||
{90, 90},
|
||||
{ {excl{90, 90}, inf{}} });
|
||||
|
||||
// (7)
|
||||
check(
|
||||
{ {incl{1, 6}, incl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} },
|
||||
{2, 3},
|
||||
{ {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} });
|
||||
|
||||
// (7) - prefix
|
||||
check(
|
||||
{ {incl{1, 6}, incl{2, 3}}, {incl{3}, incl{4}}, {incl{7, 9}, excl{999, 0}} },
|
||||
{4, 39},
|
||||
{ {excl{4, 39}, incl{4}}, {incl{7, 9}, excl{999, 0}} });
|
||||
|
||||
// (8)
|
||||
check(
|
||||
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} },
|
||||
{2, 3},
|
||||
{ {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} });
|
||||
|
||||
// (8) - prefix
|
||||
check(
|
||||
{ {incl{1, 6}, incl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} },
|
||||
{4, 11},
|
||||
{ {incl{7, 9}, excl{999, 0}} });
|
||||
|
||||
// (9)
|
||||
check(
|
||||
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} },
|
||||
{2, 4},
|
||||
{ {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} });
|
||||
|
||||
// (10)
|
||||
check(
|
||||
{ {inf{}, inf{}} },
|
||||
{7, 9},
|
||||
{ {excl{7, 9}, inf{}} });
|
||||
|
||||
// In reversed now
|
||||
|
||||
// (1)
|
||||
check_reversed(
|
||||
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
|
||||
{999, 1},
|
||||
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} });
|
||||
|
||||
// (2)
|
||||
check_reversed(
|
||||
{ {incl{1, 6}, excl{2, 3}}, {excl{2, 3}, incl{2, 4}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
|
||||
{2, 4},
|
||||
{ {incl{1, 6}, excl{2, 3}}, {excl{2, 3}, excl{2, 4}} });
|
||||
|
||||
// (2) - prefix
|
||||
check_reversed(
|
||||
{ {incl{1, 6}, excl{2, 3}}, {excl{2, 3}, incl{2, 4}}, {incl{3}, incl{4}}, {incl{7, 9}, incl{999, 0}} },
|
||||
{4, 43453},
|
||||
{ {incl{1, 6}, excl{2, 3}}, {excl{2, 3}, incl{2, 4}}, {incl{3}, excl{4, 43453}} });
|
||||
|
||||
// (3)
|
||||
check_reversed(
|
||||
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
|
||||
{2, 3},
|
||||
{ {incl{1, 6}, excl{2, 3}} });
|
||||
|
||||
// (3) - prefix
|
||||
check_reversed(
|
||||
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
|
||||
{4, 3},
|
||||
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}} });
|
||||
|
||||
// (4)
|
||||
check_reversed(
|
||||
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} },
|
||||
{8, 0},
|
||||
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, excl{8, 0}} });
|
||||
|
||||
// (5)
|
||||
check_reversed(
|
||||
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
|
||||
{90, 90},
|
||||
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, excl{90, 90}} });
|
||||
|
||||
// (6)
|
||||
check_reversed(
|
||||
{ {inf{}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, inf{}} },
|
||||
{1, 90},
|
||||
{ {inf{}, excl{1, 90}} });
|
||||
|
||||
// (7)
|
||||
check_reversed(
|
||||
{ {incl{1, 6}, incl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} },
|
||||
{7, 9},
|
||||
{ {incl{1, 6}, incl{2, 3}}, {incl{3}, excl{4}} });
|
||||
|
||||
// (7) - prefix
|
||||
check_reversed(
|
||||
{ {incl{1, 6}, incl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} },
|
||||
{3, 673},
|
||||
{ {incl{1, 6}, incl{2, 3}}, {incl{3}, excl{3, 673}} });
|
||||
|
||||
// (8)
|
||||
check_reversed(
|
||||
{ {excl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} },
|
||||
{1, 6},
|
||||
{ });
|
||||
|
||||
// (8) - prefix
|
||||
check_reversed(
|
||||
{ {incl{1, 6}, incl{2, 3}}, {excl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} },
|
||||
{3, 673},
|
||||
{ {incl{1, 6}, incl{2, 3}} });
|
||||
|
||||
// (9)
|
||||
check_reversed(
|
||||
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} },
|
||||
{0, 4},
|
||||
{});
|
||||
|
||||
// (10)
|
||||
check_reversed(
|
||||
{ {inf{}, inf{}} },
|
||||
{7, 9},
|
||||
{ {inf{}, excl{7, 9}} });
|
||||
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
// Shards tokens such that tokens are owned by shards in a round-robin manner.
|
||||
class dummy_partitioner : public dht::i_partitioner {
|
||||
dht::i_partitioner& _partitioner;
|
||||
@@ -1591,27 +1865,28 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) {
|
||||
}
|
||||
|
||||
do_with_cql_env([] (cql_test_env& env) -> future<> {
|
||||
auto make_populate = [] (bool evict_paused_readers) {
|
||||
return [evict_paused_readers] (schema_ptr s, const std::vector<mutation>& mutations) mutable {
|
||||
auto make_populate = [] (bool evict_paused_readers, bool single_fragment_buffer) {
|
||||
return [evict_paused_readers, single_fragment_buffer] (schema_ptr s, const std::vector<mutation>& mutations) mutable {
|
||||
// We need to group mutations that have the same token so they land on the same shard.
|
||||
std::map<dht::token, std::vector<mutation>> mutations_by_token;
|
||||
std::map<dht::token, std::vector<frozen_mutation>> mutations_by_token;
|
||||
|
||||
for (const auto& mut : mutations) {
|
||||
mutations_by_token[mut.token()].push_back(mut);
|
||||
mutations_by_token[mut.token()].push_back(freeze(mut));
|
||||
}
|
||||
|
||||
auto partitioner = make_lw_shared<dummy_partitioner>(dht::global_partitioner(), mutations_by_token);
|
||||
|
||||
auto merged_mutations = boost::copy_range<std::vector<std::vector<mutation>>>(mutations_by_token | boost::adaptors::map_values);
|
||||
auto merged_mutations = boost::copy_range<std::vector<std::vector<frozen_mutation>>>(mutations_by_token | boost::adaptors::map_values);
|
||||
|
||||
auto remote_memtables = make_lw_shared<std::vector<foreign_ptr<lw_shared_ptr<memtable>>>>();
|
||||
for (unsigned shard = 0; shard < partitioner->shard_count(); ++shard) {
|
||||
auto remote_mt = smp::submit_to(shard, [shard, s = global_schema_ptr(s), &merged_mutations, partitioner = *partitioner] {
|
||||
auto mt = make_lw_shared<memtable>(s.get());
|
||||
auto remote_mt = smp::submit_to(shard, [shard, gs = global_schema_ptr(s), &merged_mutations, partitioner = *partitioner] {
|
||||
auto s = gs.get();
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
|
||||
for (unsigned i = shard; i < merged_mutations.size(); i += partitioner.shard_count()) {
|
||||
for (auto& mut : merged_mutations[i]) {
|
||||
mt->apply(mut);
|
||||
mt->apply(mut.unfreeze(s));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1620,22 +1895,26 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) {
|
||||
remote_memtables->emplace_back(std::move(remote_mt));
|
||||
}
|
||||
|
||||
return mutation_source([partitioner, remote_memtables, evict_paused_readers] (schema_ptr s,
|
||||
return mutation_source([partitioner, remote_memtables, evict_paused_readers, single_fragment_buffer] (schema_ptr s,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd_sm,
|
||||
mutation_reader::forwarding fwd_mr) mutable {
|
||||
auto factory = [remote_memtables] (
|
||||
auto factory = [remote_memtables, single_fragment_buffer] (
|
||||
schema_ptr s,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return remote_memtables->at(engine().cpu_id())->make_flat_reader(s, range, slice, pc, std::move(trace_state),
|
||||
auto reader = remote_memtables->at(engine().cpu_id())->make_flat_reader(s, range, slice, pc, std::move(trace_state),
|
||||
streamed_mutation::forwarding::no, fwd_mr);
|
||||
if (single_fragment_buffer) {
|
||||
reader.set_max_buffer_size(1);
|
||||
}
|
||||
return reader;
|
||||
};
|
||||
|
||||
auto lifecycle_policy = seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), evict_paused_readers);
|
||||
@@ -1648,11 +1927,14 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) {
|
||||
};
|
||||
};
|
||||
|
||||
BOOST_TEST_MESSAGE("run_mutation_source_tests(evict_readers=false)");
|
||||
run_mutation_source_tests(make_populate(false));
|
||||
BOOST_TEST_MESSAGE("run_mutation_source_tests(evict_readers=false, single_fragment_buffer=false)");
|
||||
run_mutation_source_tests(make_populate(false, false));
|
||||
|
||||
BOOST_TEST_MESSAGE("run_mutation_source_tests(evict_readers=true)");
|
||||
run_mutation_source_tests(make_populate(true));
|
||||
BOOST_TEST_MESSAGE("run_mutation_source_tests(evict_readers=true, single_fragment_buffer=false)");
|
||||
run_mutation_source_tests(make_populate(true, false));
|
||||
|
||||
BOOST_TEST_MESSAGE("run_mutation_source_tests(evict_readers=true, single_fragment_buffer=true)");
|
||||
run_mutation_source_tests(make_populate(true, true));
|
||||
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
|
||||
@@ -1514,6 +1514,14 @@ static mutation_sets generate_mutation_sets() {
|
||||
result.unequal.emplace_back(mutations{m1, m2});
|
||||
m2.partition().apply_row_tombstone(*s1, key, tomb);
|
||||
result.equal.emplace_back(mutations{m1, m2});
|
||||
|
||||
// Add a row which falls under the tombstone prefix.
|
||||
auto ts = new_timestamp();
|
||||
auto key_full = clustering_key_prefix::from_deeply_exploded(*s1, {data_value(bytes("ck2_0")), data_value(bytes("ck1_1")), });
|
||||
m1.set_clustered_cell(key_full, "regular_col_2", data_value(bytes("regular_col_value")), ts, ttl);
|
||||
result.unequal.emplace_back(mutations{m1, m2});
|
||||
m2.set_clustered_cell(key_full, "regular_col_2", data_value(bytes("regular_col_value")), ts, ttl);
|
||||
result.equal.emplace_back(mutations{m1, m2});
|
||||
}
|
||||
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user