Merge "Multishard combining reader more robust reader recreation" from Botond

Make the reader recreation logic more robust by replacing the collection
of special cases that decided which fragments have to be dropped with
general logic that simply drops all already-seen fragments, based on
their position.  Special handling is added for the case where the last
position is a range tombstone whose start position is a non-full
prefix.  Reproducer unit tests are added for both cases.

Refs #4695
Fixes #4733
Tomasz Grabiec
2019-08-13 11:52:52 +02:00
7 changed files with 474 additions and 122 deletions
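Before the diff, the essence of the new approach as a hedged toy sketch (int positions and drop_seen_fragments are illustrative stand-ins, not the patch's API): instead of per-kind drop flags for every special case, drop everything that precedes the next position to be emitted.

// Toy model: positions are ints, the fragment stream is a deque<int>.
// Everything before the next expected position was already emitted
// before the reader was evicted, so it can be dropped wholesale.
#include <deque>

inline void drop_seen_fragments(std::deque<int>& stream, int next_pos) {
    while (!stream.empty() && stream.front() < next_pos) {
        stream.pop_front();
    }
}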

View File

@@ -924,9 +924,10 @@ class shard_reader : public enable_lw_shared_from_this<shard_reader>, public fla
bool _reader_created = false;
bool _drop_partition_start = false;
bool _drop_static_row = false;
position_in_partition::tri_compare _tri_cmp;
std::optional<dht::decorated_key> _last_pkey;
std::optional<position_in_partition> _last_position_in_partition;
position_in_partition _next_position_in_partition = position_in_partition::for_partition_start();
// These are used when the reader has to be recreated (after having been
// evicted while paused) and the range and/or slice it is recreated with
// differ from the original ones.
@@ -934,13 +935,13 @@ class shard_reader : public enable_lw_shared_from_this<shard_reader>, public fla
std::optional<query::partition_slice> _slice_override;
private:
void update_last_position(const circular_buffer<mutation_fragment>& buffer);
void update_next_position(flat_mutation_reader& reader, circular_buffer<mutation_fragment>& buffer);
void adjust_partition_slice();
flat_mutation_reader recreate_reader();
flat_mutation_reader resume_or_create_reader();
bool should_drop_fragment(const mutation_fragment& mf);
future<> do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout);
future<> ensure_buffer_contains_all_fragments_for_last_pos(flat_mutation_reader& reader, circular_buffer<mutation_fragment>& buffer,
db::timeout_clock::time_point timeout);
future<> fill_buffer(flat_mutation_reader& reader, circular_buffer<mutation_fragment>& buffer, db::timeout_clock::time_point timeout);
public:
remote_reader(
@@ -1038,7 +1039,7 @@ void shard_reader::stop() noexcept {
}).finally([zis = shared_from_this()] {}));
}
void shard_reader::remote_reader::update_last_position(const circular_buffer<mutation_fragment>& buffer) {
void shard_reader::remote_reader::update_next_position(flat_mutation_reader& reader, circular_buffer<mutation_fragment>& buffer) {
if (buffer.empty()) {
return;
}
@@ -1049,7 +1050,31 @@ void shard_reader::remote_reader::update_last_position(const circular_buffer<mut
_last_pkey = pk_it->as_partition_start().key();
}
_last_position_in_partition.emplace(buffer.back().position());
const auto last_pos = buffer.back().position();
switch (last_pos.region()) {
case partition_region::partition_start:
_next_position_in_partition = position_in_partition::for_static_row();
break;
case partition_region::static_row:
_next_position_in_partition = position_in_partition::before_all_clustered_rows();
break;
case partition_region::clustered:
if (reader.is_buffer_empty()) {
_next_position_in_partition = position_in_partition::after_key(last_pos);
} else {
const auto& next_frag = reader.peek_buffer();
if (next_frag.is_end_of_partition()) {
buffer.emplace_back(reader.pop_mutation_fragment());
_next_position_in_partition = position_in_partition::for_partition_start();
} else {
_next_position_in_partition = position_in_partition(next_frag.position());
}
}
break;
case partition_region::partition_end:
_next_position_in_partition = position_in_partition::for_partition_start();
break;
}
}
void shard_reader::remote_reader::adjust_partition_slice() {
@@ -1057,9 +1082,8 @@ void shard_reader::remote_reader::adjust_partition_slice() {
_slice_override = _ps;
}
auto& last_ckey = _last_position_in_partition->key();
auto ranges = _slice_override->default_row_ranges();
query::trim_clustering_row_ranges_to(*_schema, ranges, last_ckey);
query::trim_clustering_row_ranges_to(*_schema, ranges, _next_position_in_partition);
_slice_override->clear_ranges();
_slice_override->set_range(*_schema, _last_pkey->key(), std::move(ranges));
@@ -1072,25 +1096,22 @@ flat_mutation_reader shard_reader::remote_reader::recreate_reader() {
if (_last_pkey) {
bool partition_range_is_inclusive = true;
if (_last_position_in_partition) {
switch (_last_position_in_partition->region()) {
case partition_region::partition_start:
_drop_partition_start = true;
break;
case partition_region::static_row:
_drop_partition_start = true;
_drop_static_row = true;
break;
case partition_region::clustered:
_drop_partition_start = true;
_drop_static_row = true;
adjust_partition_slice();
slice = &*_slice_override;
break;
case partition_region::partition_end:
partition_range_is_inclusive = false;
break;
}
switch (_next_position_in_partition.region()) {
case partition_region::partition_start:
partition_range_is_inclusive = false;
break;
case partition_region::static_row:
_drop_partition_start = true;
break;
case partition_region::clustered:
_drop_partition_start = true;
_drop_static_row = true;
adjust_partition_slice();
slice = &*_slice_override;
break;
case partition_region::partition_end:
partition_range_is_inclusive = false;
break;
}
// The original range contained a single partition and we've read it
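To summarize the switch above: a hedged toy mapping from the next position's region to the recreation decisions (recreate_plan and plan_for are illustrative, not the patch's API).

// Toy summary of recreate_reader(): what the next position's region
// implies for how the reader is recreated.
enum class region { partition_start, static_row, clustered, partition_end };

struct recreate_plan {
    bool include_last_partition; // reopen the range at the last partition key?
    bool drop_partition_start;   // partition_start was already emitted
    bool drop_static_row;        // the static row was already emitted
    bool trim_slice;             // trim clustering ranges to the next position
};

inline recreate_plan plan_for(region next) {
    switch (next) {
    case region::partition_start: return {false, false, false, false};
    case region::static_row:      return {true,  true,  false, false};
    case region::clustered:       return {true,  true,  true,  true};
    case region::partition_end:   return {false, false, false, false};
    }
    return {};
}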
@@ -1129,62 +1150,83 @@ flat_mutation_reader shard_reader::remote_reader::resume_or_create_reader() {
return recreate_reader();
}
bool shard_reader::remote_reader::should_drop_fragment(const mutation_fragment& mf) {
if (_drop_partition_start && mf.is_partition_start()) {
_drop_partition_start = false;
return true;
}
if (_drop_static_row && mf.is_static_row()) {
_drop_static_row = false;
return true;
}
return false;
}
future<> shard_reader::remote_reader::do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout) {
if (!_drop_partition_start && !_drop_static_row) {
return reader.fill_buffer(timeout);
}
return repeat([this, &reader, timeout] {
return reader.fill_buffer(timeout).then([this, &reader] {
const auto eos = reader.is_end_of_stream();
if (reader.is_buffer_empty()) {
return stop_iteration(eos);
while (!reader.is_buffer_empty() && should_drop_fragment(reader.peek_buffer())) {
reader.pop_mutation_fragment();
}
if (_drop_partition_start) {
_drop_partition_start = false;
if (reader.peek_buffer().is_partition_start()) {
reader.pop_mutation_fragment();
}
}
if (reader.is_buffer_empty()) {
return stop_iteration(eos);
}
if (_drop_static_row) {
_drop_static_row = false;
if (reader.peek_buffer().is_static_row()) {
reader.pop_mutation_fragment();
}
}
return stop_iteration(reader.is_buffer_full() || eos);
return stop_iteration(reader.is_buffer_full() || reader.is_end_of_stream());
});
});
}
future<> shard_reader::remote_reader::ensure_buffer_contains_all_fragments_for_last_pos(flat_mutation_reader& reader,
circular_buffer<mutation_fragment>& buffer, db::timeout_clock::time_point timeout) {
if (buffer.empty() || !buffer.back().is_range_tombstone()) {
return make_ready_future<>();
}
auto stop = [this, &reader, &buffer] {
future<> shard_reader::remote_reader::fill_buffer(flat_mutation_reader& reader, circular_buffer<mutation_fragment>& buffer,
db::timeout_clock::time_point timeout) {
return do_fill_buffer(reader, timeout).then([this, &reader, &buffer, timeout] {
if (reader.is_buffer_empty()) {
return reader.is_end_of_stream();
return make_ready_future<>();
}
const auto& next_pos = reader.peek_buffer().position();
if (next_pos.region() != partition_region::clustered) {
return true;
}
return !next_pos.key().equal(*_schema, buffer.back().position().key());
};
return do_until(stop, [this, &reader, &buffer, timeout] {
if (reader.is_buffer_empty()) {
return do_fill_buffer(reader, timeout);
}
buffer.emplace_back(reader.pop_mutation_fragment());
return make_ready_future<>();
buffer = reader.detach_buffer();
auto stop = [this, &reader, &buffer] {
// The only problematic fragment kind is the range tombstone.
// All other fragment kinds are safe to end the buffer on, and
// are guaranteed to represent progress vs. the last buffer fill.
if (!buffer.back().is_range_tombstone()) {
return true;
}
if (reader.is_buffer_empty()) {
return reader.is_end_of_stream();
}
const auto& next_pos = reader.peek_buffer().position();
// To ensure safe progress we have to ensure the following:
//
// _next_position_in_partition < buffer.back().position() < next_pos
//
// * The first condition is to ensure we made progress since the
// last buffer fill. Otherwise we might get into an endless loop if
// the reader is recreated after each `fill_buffer()` call.
// * The second condition is to ensure we have seen all fragments
// with the same position. Otherwise, when the reader is recreated,
// we might jump over remaining fragments that share the position of
// the buffer's last fragment.
return _tri_cmp(_next_position_in_partition, buffer.back().position()) < 0 && _tri_cmp(buffer.back().position(), next_pos) < 0;
};
// Read additional fragments until it is safe to stop, if needed.
// We have to ensure we stop at a fragment such that if the reader is
// evicted and recreated later, we won't be skipping any fragments.
// Practically, range tombstones are the only fragments that are
// problematic to end the buffer on, because a range tombstone can have
// the same position as multiple following range tombstones, or as a
// single following clustering row in the stream.
// When a range tombstone is the last in the buffer, we have to continue
// to read until we are sure we've read all fragments sharing the same
// position, so that we can safely continue reading from after said
// position.
return do_until(stop, [this, &reader, &buffer, timeout] {
if (reader.is_buffer_empty()) {
return do_fill_buffer(reader, timeout);
}
buffer.emplace_back(reader.pop_mutation_fragment());
return make_ready_future<>();
});
}).then([this, &reader, &buffer] {
update_next_position(reader, buffer);
});
}
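A toy restatement of the safe-stop invariant enforced above, with ints standing in for positions (safe_to_stop is illustrative, not part of the patch):

// Safe-stop invariant: next_resume < last_in_buffer < peeked.
inline bool safe_to_stop(int next_resume_pos, int last_buffered_pos, int peeked_pos) {
    // First inequality: progress was made since the previous fill, so a
    // recreate-after-evict cycle cannot loop forever on one position.
    // Second inequality: no upcoming fragment shares the last buffered
    // position, so resuming after it cannot skip fragments.
    return next_resume_pos < last_buffered_pos && last_buffered_pos < peeked_pos;
}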
@@ -1202,7 +1244,8 @@ shard_reader::remote_reader::remote_reader(
, _ps(ps)
, _pc(pc)
, _trace_state(std::move(trace_state))
, _fwd_mr(fwd_mr) {
, _fwd_mr(fwd_mr)
, _tri_cmp(*_schema) {
}
future<shard_reader::fill_buffer_result> shard_reader::remote_reader::fill_buffer(const dht::partition_range& pr, bool pending_next_partition,
@@ -1210,7 +1253,7 @@ future<shard_reader::fill_buffer_result> shard_reader::remote_reader::fill_buffe
// We could have missed a `fast_forward_to()` if the reader wasn't created yet.
_pr = &pr;
if (pending_next_partition) {
_last_position_in_partition = position_in_partition(position_in_partition::end_of_partition_tag_t{});
_next_position_in_partition = position_in_partition::for_partition_start();
}
return do_with(resume_or_create_reader(), circular_buffer<mutation_fragment>{},
[this, pending_next_partition, timeout] (flat_mutation_reader& reader, circular_buffer<mutation_fragment>& buffer) mutable {
@@ -1218,22 +1261,8 @@ future<shard_reader::fill_buffer_result> shard_reader::remote_reader::fill_buffe
reader.next_partition();
}
return do_fill_buffer(reader, timeout).then([this, &reader, &buffer, timeout] {
buffer = reader.detach_buffer();
// When the reader is recreated (after having been evicted) we
// recreate it such that it starts reading from *after* the last
// seen fragment's position. If the last seen fragment is a range
// tombstone it is *not* guaranteed that the next fragments in the
// data stream have positions strictly greater than the range
// tombstone's. If the reader is evicted and has to be recreated,
// these fragments would then be skipped, as the read would continue
// after their position.
// To avoid this ensure that the buffer contains *all* fragments for
// the last seen position.
return ensure_buffer_contains_all_fragments_for_last_pos(reader, buffer, timeout);
}).then([this, &reader, &buffer] {
return fill_buffer(reader, buffer, timeout).then([this, &reader, &buffer] {
const auto eos = reader.is_end_of_stream() && reader.is_buffer_empty();
update_last_position(buffer);
_irh = _lifecycle_policy.pause(std::move(reader));
return fill_buffer_result(std::move(buffer), eos);
});
@@ -1243,7 +1272,7 @@ future<shard_reader::fill_buffer_result> shard_reader::remote_reader::fill_buffe
future<> shard_reader::remote_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
_pr = &pr;
_last_pkey.reset();
_last_position_in_partition.reset();
_next_position_in_partition = position_in_partition::for_partition_start();
if (!_reader_created || !_irh) {
return make_ready_future<>();

View File

@@ -129,6 +129,8 @@ public:
: _type(partition_region::clustered), _ck(&ck) { }
position_in_partition_view(range_tag_t, bound_view bv)
: _type(partition_region::clustered), _bound_weight(position_weight(bv.kind())), _ck(&bv.prefix()) { }
position_in_partition_view(const clustering_key_prefix& ck, bound_weight w)
: _type(partition_region::clustered), _bound_weight(w), _ck(&ck) { }
static position_in_partition_view for_range_start(const query::clustering_range& r) {
return {position_in_partition_view::range_tag_t(), bound_view::from_range_start(r)};
@@ -159,6 +161,7 @@ public:
}
partition_region region() const { return _type; }
bound_weight get_bound_weight() const { return _bound_weight; }
bool is_partition_start() const { return _type == partition_region::partition_start; }
bool is_partition_end() const { return _type == partition_region::partition_end; }
bool is_static_row() const { return _type == partition_region::static_row; }
@@ -271,6 +274,10 @@ public:
return {clustering_row_tag_t(), std::move(ck)};
}
static position_in_partition for_partition_start() {
return position_in_partition{partition_start_tag_t()};
}
static position_in_partition for_static_row() {
return position_in_partition{static_row_tag_t()};
}
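For intuition about the new (clustering_key_prefix, bound_weight) constructor and the factories above, a hedged toy model of how a bound weight orders a prefix relative to the rows it covers (toy_pos and int keys are illustrative stand-ins):

#include <tuple>

// Toy model: a position is a key prefix plus a weight. Weight -1 sorts
// before every row sharing the prefix, +1 after all of them, 0 at the
// row itself -- mirroring bound_weight::{before_all_prefixed, equal,
// after_all_prefixed}.
struct toy_pos {
    int key;    // stand-in for the clustering key prefix
    int weight; // -1 = before_all_prefixed, 0 = equal, +1 = after_all_prefixed
};

inline bool operator<(const toy_pos& a, const toy_pos& b) {
    return std::tie(a.key, a.weight) < std::tie(b.key, b.weight);
}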

View File

@@ -31,6 +31,8 @@
#include "tracing/tracing.hh"
#include "utils/small_vector.hh"
class position_in_partition_view;
namespace query {
using column_id_vector = utils::small_vector<column_id, 8>;
@@ -58,10 +60,20 @@ typedef std::vector<clustering_range> clustering_row_ranges;
/// Trim the clustering ranges.
///
/// Equivalent of intersecting each range with [key, +inf), or (-inf, key] if
/// Equivalent of intersecting each clustering range with [pos, +inf) position
/// in partition range, or (-inf, pos] position in partition range if
/// reversed == true. Ranges that do not intersect are dropped. Ranges that
/// partially overlap are trimmed.
/// Result: each range will overlap fully with [key, +inf), or (-inf, key] if
/// Result: each range will overlap fully with [pos, +inf), or (-inf, pos] if
/// reversed is true.
void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& ranges, position_in_partition_view pos, bool reversed = false);
/// Trim the clustering ranges.
///
/// Equivalent of intersecting each clustering range with (key, +inf) clustering
/// range, or (-inf, key) clustering range if reversed == true. Ranges that do
/// not intersect are dropped. Ranges that partially overlap are trimmed.
/// Result: each range will overlap fully with (key, +inf), or (-inf, key) if
/// reversed is true.
void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& ranges, const clustering_key& key, bool reversed = false);

View File

@@ -71,34 +71,38 @@ std::ostream& operator<<(std::ostream& out, const specific_ranges& s) {
return out << "{" << s._pk << " : " << join(", ", s._ranges) << "}";
}
void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& ranges, const clustering_key& key, bool reversed) {
auto cmp = [reversed, bv_cmp = bound_view::compare(s)] (const auto& a, const auto& b) {
return reversed ? bv_cmp(b, a) : bv_cmp(a, b);
void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& ranges, position_in_partition_view pos, bool reversed) {
auto cmp = [reversed, cmp = position_in_partition::composite_tri_compare(s)] (const auto& a, const auto& b) {
return reversed ? cmp(b, a) : cmp(a, b);
};
auto start_bound = [reversed] (const auto& range) -> const bound_view& {
return reversed ? range.second : range.first;
auto start_bound = [reversed] (const auto& range) -> position_in_partition_view {
return reversed ? position_in_partition_view::for_range_end(range) : position_in_partition_view::for_range_start(range);
};
auto end_bound = [reversed] (const auto& range) -> const bound_view& {
return reversed ? range.first : range.second;
auto end_bound = [reversed] (const auto& range) -> position_in_partition_view {
return reversed ? position_in_partition_view::for_range_start(range) : position_in_partition_view::for_range_end(range);
};
clustering_key_prefix::equality eq(s);
auto it = ranges.begin();
while (it != ranges.end()) {
auto range = bound_view::from_range(*it);
if (cmp(end_bound(range), key) || eq(end_bound(range).prefix(), key)) {
if (cmp(end_bound(*it), pos) <= 0) {
it = ranges.erase(it);
continue;
} else if (cmp(start_bound(range), key)) {
assert(cmp(key, end_bound(range)));
auto r = reversed ? clustering_range(it->start(), clustering_range::bound { key, false })
: clustering_range(clustering_range::bound { key, false }, it->end());
} else if (cmp(start_bound(*it), pos) <= 0) {
assert(cmp(pos, end_bound(*it)) < 0);
auto r = reversed ?
clustering_range(it->start(), clustering_range::bound(pos.key(), pos.get_bound_weight() != bound_weight::before_all_prefixed)) :
clustering_range(clustering_range::bound(pos.key(), pos.get_bound_weight() != bound_weight::after_all_prefixed), it->end());
*it = std::move(r);
}
++it;
}
}
void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& ranges, const clustering_key& key, bool reversed) {
return trim_clustering_row_ranges_to(s, ranges,
position_in_partition_view(key, reversed ? bound_weight::before_all_prefixed : bound_weight::after_all_prefixed), reversed);
}
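A hedged toy model of the trimming semantics implemented above, over int keys (trim_pos, toy_range and trim_to are illustrative stand-ins, not the real API):

#include <utility>
#include <vector>

// Toy model of trim_clustering_row_ranges_to over int keys. A position is
// (key, weight) with weight -1 = before_all_prefixed and +1 =
// after_all_prefixed; a bound is (key, inclusive).
using trim_pos = std::pair<int, int>;
struct toy_range { std::pair<int, bool> start, end; };

inline trim_pos start_pos(const toy_range& r) { return {r.start.first, r.start.second ? -1 : +1}; }
inline trim_pos end_pos(const toy_range& r)   { return {r.end.first,   r.end.second   ? +1 : -1}; }

// Intersect each range with [pos, +inf): drop ranges ending at or before
// pos, and move the start bound of partially overlapping ranges up to pos.
inline std::vector<toy_range> trim_to(std::vector<toy_range> ranges, trim_pos pos) {
    std::vector<toy_range> out;
    for (auto r : ranges) {
        if (end_pos(r) <= pos) {
            continue; // entirely before the position: drop
        }
        if (start_pos(r) <= pos) {
            // after_all_prefixed (+1) yields an exclusive new start bound,
            // mirroring the bound_weight check in the real implementation.
            r.start = {pos.first, pos.second != +1};
        }
        out.push_back(r);
    }
    return out;
}

For example, trimming [1, 10] to after_all_prefixed(5) yields (5, 10], while trimming to before_all_prefixed(5) yields [5, 10].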
partition_slice::partition_slice(clustering_row_ranges row_ranges,
query::column_id_vector static_columns,
query::column_id_vector regular_columns,

View File

@@ -36,6 +36,15 @@ private:
mutation_fragment_opt read_next() {
return _reader(db::no_timeout).get0();
}
static bool are_tombstones_mergeable(const schema& s, const range_tombstone& a, const range_tombstone& b) {
const auto range_a = position_range(position_in_partition(a.position()), position_in_partition(a.end_position()));
const auto tri_cmp = position_in_partition::tri_compare(s);
return a.tomb.compare(b.tomb) == 0 && (range_a.overlaps(s, b.position(), b.end_position()) ||
tri_cmp(a.end_position(), b.position()) == 0 ||
tri_cmp(b.end_position(), a.position()) == 0);
}
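A toy version of the mergeability check above, over closed int intervals, with a single timestamp standing in for the full tombstone (an illustrative simplification of range_tombstone):

// Toy: two deletions [a.start, a.end] and [b.start, b.end] with equal
// timestamps merge when the intervals overlap or touch.
struct toy_tombstone { long timestamp; int start, end; };

inline bool toy_mergeable(const toy_tombstone& a, const toy_tombstone& b) {
    return a.timestamp == b.timestamp && a.start <= b.end && b.start <= a.end;
}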
public:
flat_reader_assertions(flat_mutation_reader reader)
: _reader(std::move(reader))
@@ -238,18 +247,19 @@ public:
BOOST_FAIL(format("Expected range tombstone {}, but got {}", rt, mutation_fragment::printer(*_reader.schema(), *mfo)));
}
const schema& s = *_reader.schema();
_tombstones.apply(s, mfo->as_range_tombstone());
range_tombstone_list actual_list(s);
actual_list.apply(s, mfo->as_range_tombstone());
_tombstones.apply(s, mfo->as_range_tombstone());
position_in_partition::equal_compare eq(s);
while (mutation_fragment* next = _reader.peek(db::no_timeout).get0()) {
if (!next->is_range_tombstone() || !eq(next->position(), mfo->position())) {
if (!next->is_range_tombstone() || !are_tombstones_mergeable(s, *actual_list.begin(), next->as_range_tombstone())) {
break;
}
auto rt = _reader(db::no_timeout).get0()->as_range_tombstone();
actual_list.apply(s, rt);
assert(actual_list.size() == 1);
_tombstones.apply(s, rt);
}
actual_list.apply(s, mfo->as_range_tombstone());
{
range_tombstone_list expected_list(s);
expected_list.apply(s, rt);

View File

@@ -21,6 +21,7 @@
#include <random>
#include <experimental/source_location>
#include <boost/range/irange.hpp>
#include <boost/range/adaptor/uniqued.hpp>
@@ -1455,6 +1456,279 @@ SEASTAR_THREAD_TEST_CASE(test_foreign_reader_as_mutation_source) {
}).get();
}
SEASTAR_TEST_CASE(test_trim_clustering_row_ranges_to) {
struct key {
int c0;
std::optional<int> c1;
key(int c0, std::optional<int> c1 = {}) : c0(c0), c1(c1) { }
clustering_key to_clustering_key(const schema& s) const {
std::vector<bytes> v;
v.push_back(int32_type->decompose(data_value(c0)));
if (c1) {
v.push_back(int32_type->decompose(data_value(*c1)));
}
return clustering_key::from_exploded(s, std::move(v));
}
};
struct incl {
key value;
incl(int c0, int c1) : value(c0, c1) { }
incl(int c0) : value(c0) { }
};
struct excl {
key value;
excl(int c0, int c1) : value(c0, c1) { }
excl(int c0) : value(c0) { }
};
struct bound {
key value;
bool inclusive;
bound(incl b) : value(b.value), inclusive(true) { }
bound(excl b) : value(b.value), inclusive(false) { }
};
struct inf {
};
struct range {
std::optional<bound> start;
std::optional<bound> end;
bool singular = false;
range(bound s, bound e) : start(s), end(e) { }
range(inf, bound e) : end(e) { }
range(bound s, inf) : start(s) { }
range(inf, inf) { }
range(bound b) : start(b), end(b), singular(true) { }
static std::optional<range_bound<clustering_key>> to_bound(const schema& s, std::optional<bound> b) {
if (b) {
return range_bound<clustering_key>(b->value.to_clustering_key(s), b->inclusive);
}
return {};
}
query::clustering_range to_clustering_range(const schema& s) const {
return query::clustering_range(to_bound(s, start), to_bound(s, end), singular);
}
};
const auto schema = schema_builder("ks", get_name())
.with_column("p0", int32_type, column_kind::partition_key)
.with_column("c0", int32_type, column_kind::clustering_key)
.with_column("c1", int32_type, column_kind::clustering_key)
.with_column("v1", int32_type, column_kind::regular_column)
.build();
const auto check = [&schema] (std::vector<range> ranges, key key, std::vector<range> output_ranges, bool reversed = false,
std::experimental::source_location sl = std::experimental::source_location::current()) {
auto actual_ranges = boost::copy_range<query::clustering_row_ranges>(ranges | boost::adaptors::transformed(
[&] (const range& r) { return r.to_clustering_range(*schema); }));
query::trim_clustering_row_ranges_to(*schema, actual_ranges, key.to_clustering_key(*schema), reversed);
const auto expected_ranges = boost::copy_range<query::clustering_row_ranges>(output_ranges | boost::adaptors::transformed(
[&] (const range& r) { return r.to_clustering_range(*schema); }));
if (!std::equal(actual_ranges.begin(), actual_ranges.end(), expected_ranges.begin(), expected_ranges.end(),
[tri_cmp = clustering_key::tri_compare(*schema)] (const query::clustering_range& a, const query::clustering_range& b) {
return a.equal(b, tri_cmp);
})) {
BOOST_FAIL(fmt::format("Unexpected result\nexpected {}\ngot {}\ncalled from {}:{}", expected_ranges, actual_ranges, sl.file_name(), sl.line()));
}
};
const auto check_reversed = [&schema, &check] (std::vector<range> ranges, key key, std::vector<range> output_ranges, bool reversed = false,
std::experimental::source_location sl = std::experimental::source_location::current()) {
return check(std::move(ranges), std::move(key), std::move(output_ranges), true, sl);
};
// We want to check the following cases:
// 1) Before range
// 2) Equal to begin(range with incl begin)
// 3) Equal to begin(range with excl begin)
// 4) Intersect with range (excl end)
// 5) Intersect with range (incl end)
// 6) Intersect with range (inf end)
// 7) Equal to end(range with incl end)
// 8) Equal to end(range with excl end)
// 9) After range
// 10) Full range
// (1)
check(
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
{1, 0},
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} });
// (2)
check(
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
{1, 6},
{ {excl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} });
// (2) - prefix
check(
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
{3, 6},
{ {excl{3, 6}, excl{4}}, {incl{7, 9}, incl{999, 0}} });
// (3)
check(
{ {incl{1, 6}, excl{2, 3}}, {excl{2, 3}, incl{2, 4}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
{2, 3},
{ {excl{2, 3}, incl{2, 4}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} });
// (3) - prefix
check(
{ {incl{1, 6}, excl{2, 3}}, {excl{2, 3}, incl{2, 4}}, {excl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
{3, 7},
{ {excl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} });
// (4)
check(
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
{2, 0},
{ {excl{2, 0}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} });
// (5)
check(
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
{90, 90},
{ {excl{90, 90}, incl{999, 0}} });
// (6)
check(
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, inf{}} },
{90, 90},
{ {excl{90, 90}, inf{}} });
// (7)
check(
{ {incl{1, 6}, incl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} },
{2, 3},
{ {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} });
// (7) - prefix
check(
{ {incl{1, 6}, incl{2, 3}}, {incl{3}, incl{4}}, {incl{7, 9}, excl{999, 0}} },
{4, 39},
{ {excl{4, 39}, incl{4}}, {incl{7, 9}, excl{999, 0}} });
// (8)
check(
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} },
{2, 3},
{ {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} });
// (8) - prefix
check(
{ {incl{1, 6}, incl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} },
{4, 11},
{ {incl{7, 9}, excl{999, 0}} });
// (9)
check(
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} },
{2, 4},
{ {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} });
// (10)
check(
{ {inf{}, inf{}} },
{7, 9},
{ {excl{7, 9}, inf{}} });
// In reversed now
// (1)
check_reversed(
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
{999, 1},
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} });
// (2)
check_reversed(
{ {incl{1, 6}, excl{2, 3}}, {excl{2, 3}, incl{2, 4}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
{2, 4},
{ {incl{1, 6}, excl{2, 3}}, {excl{2, 3}, excl{2, 4}} });
// (2) - prefix
check_reversed(
{ {incl{1, 6}, excl{2, 3}}, {excl{2, 3}, incl{2, 4}}, {incl{3}, incl{4}}, {incl{7, 9}, incl{999, 0}} },
{4, 43453},
{ {incl{1, 6}, excl{2, 3}}, {excl{2, 3}, incl{2, 4}}, {incl{3}, excl{4, 43453}} });
// (3)
check_reversed(
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
{2, 3},
{ {incl{1, 6}, excl{2, 3}} });
// (3) - prefix
check_reversed(
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
{4, 3},
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}} });
// (4)
check_reversed(
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} },
{8, 0},
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, excl{8, 0}} });
// (5)
check_reversed(
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
{90, 90},
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, excl{90, 90}} });
// (6)
check_reversed(
{ {inf{}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, inf{}} },
{1, 90},
{ {inf{}, excl{1, 90}} });
// (7)
check_reversed(
{ {incl{1, 6}, incl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} },
{7, 9},
{ {incl{1, 6}, incl{2, 3}}, {incl{3}, excl{4}} });
// (7) - prefix
check_reversed(
{ {incl{1, 6}, incl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} },
{3, 673},
{ {incl{1, 6}, incl{2, 3}}, {incl{3}, excl{3, 673}} });
// (8)
check_reversed(
{ {excl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} },
{1, 6},
{ });
// (8) - prefix
check_reversed(
{ {incl{1, 6}, incl{2, 3}}, {excl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} },
{3, 673},
{ {incl{1, 6}, incl{2, 3}} });
// (9)
check_reversed(
{ {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} },
{0, 4},
{});
// (10)
check_reversed(
{ {inf{}, inf{}} },
{7, 9},
{ {inf{}, excl{7, 9}} });
return make_ready_future<>();
}
// Distributes tokens among shards in a round-robin manner.
class dummy_partitioner : public dht::i_partitioner {
dht::i_partitioner& _partitioner;
@@ -1591,27 +1865,28 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) {
}
do_with_cql_env([] (cql_test_env& env) -> future<> {
auto make_populate = [] (bool evict_paused_readers) {
return [evict_paused_readers] (schema_ptr s, const std::vector<mutation>& mutations) mutable {
auto make_populate = [] (bool evict_paused_readers, bool single_fragment_buffer) {
return [evict_paused_readers, single_fragment_buffer] (schema_ptr s, const std::vector<mutation>& mutations) mutable {
// We need to group mutations that have the same token so they land on the same shard.
std::map<dht::token, std::vector<mutation>> mutations_by_token;
std::map<dht::token, std::vector<frozen_mutation>> mutations_by_token;
for (const auto& mut : mutations) {
mutations_by_token[mut.token()].push_back(mut);
mutations_by_token[mut.token()].push_back(freeze(mut));
}
auto partitioner = make_lw_shared<dummy_partitioner>(dht::global_partitioner(), mutations_by_token);
auto merged_mutations = boost::copy_range<std::vector<std::vector<mutation>>>(mutations_by_token | boost::adaptors::map_values);
auto merged_mutations = boost::copy_range<std::vector<std::vector<frozen_mutation>>>(mutations_by_token | boost::adaptors::map_values);
auto remote_memtables = make_lw_shared<std::vector<foreign_ptr<lw_shared_ptr<memtable>>>>();
for (unsigned shard = 0; shard < partitioner->shard_count(); ++shard) {
auto remote_mt = smp::submit_to(shard, [shard, s = global_schema_ptr(s), &merged_mutations, partitioner = *partitioner] {
auto mt = make_lw_shared<memtable>(s.get());
auto remote_mt = smp::submit_to(shard, [shard, gs = global_schema_ptr(s), &merged_mutations, partitioner = *partitioner] {
auto s = gs.get();
auto mt = make_lw_shared<memtable>(s);
for (unsigned i = shard; i < merged_mutations.size(); i += partitioner.shard_count()) {
for (auto& mut : merged_mutations[i]) {
mt->apply(mut);
mt->apply(mut.unfreeze(s));
}
}
@@ -1620,22 +1895,26 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) {
remote_memtables->emplace_back(std::move(remote_mt));
}
return mutation_source([partitioner, remote_memtables, evict_paused_readers] (schema_ptr s,
return mutation_source([partitioner, remote_memtables, evict_paused_readers, single_fragment_buffer] (schema_ptr s,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd_sm,
mutation_reader::forwarding fwd_mr) mutable {
auto factory = [remote_memtables] (
auto factory = [remote_memtables, single_fragment_buffer] (
schema_ptr s,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
mutation_reader::forwarding fwd_mr) {
return remote_memtables->at(engine().cpu_id())->make_flat_reader(s, range, slice, pc, std::move(trace_state),
auto reader = remote_memtables->at(engine().cpu_id())->make_flat_reader(s, range, slice, pc, std::move(trace_state),
streamed_mutation::forwarding::no, fwd_mr);
if (single_fragment_buffer) {
reader.set_max_buffer_size(1);
}
return reader;
};
auto lifecycle_policy = seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory), evict_paused_readers);
@@ -1648,11 +1927,14 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_as_mutation_source) {
};
};
BOOST_TEST_MESSAGE("run_mutation_source_tests(evict_readers=false)");
run_mutation_source_tests(make_populate(false));
BOOST_TEST_MESSAGE("run_mutation_source_tests(evict_readers=false, single_fragment_buffer=false)");
run_mutation_source_tests(make_populate(false, false));
BOOST_TEST_MESSAGE("run_mutation_source_tests(evict_readers=true)");
run_mutation_source_tests(make_populate(true));
BOOST_TEST_MESSAGE("run_mutation_source_tests(evict_readers=true, single_fragment_buffer=false)");
run_mutation_source_tests(make_populate(true, false));
BOOST_TEST_MESSAGE("run_mutation_source_tests(evict_readers=true, single_fragment_buffer=true)");
run_mutation_source_tests(make_populate(true, true));
return make_ready_future<>();
}).get();
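For context on the single_fragment_buffer knob used above: capping the buffer at one fragment forces a pause (and, with eviction enabled, a reader recreation) between every fragment, the worst case for the recreation logic. A hedged toy of the effect (fills is illustrative, not the test's code):

#include <cstddef>
#include <deque>
#include <vector>

// Toy: with max_buffer == 1 every fill hands back a single fragment,
// so one pause/resume boundary occurs per fragment.
template <typename T>
std::vector<std::deque<T>> fills(std::deque<T> stream, std::size_t max_buffer) {
    std::vector<std::deque<T>> result;
    while (!stream.empty()) {
        std::deque<T> buf;
        while (!stream.empty() && buf.size() < max_buffer) {
            buf.push_back(stream.front());
            stream.pop_front();
        }
        result.push_back(std::move(buf));
    }
    return result;
}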

View File

@@ -1514,6 +1514,14 @@ static mutation_sets generate_mutation_sets() {
result.unequal.emplace_back(mutations{m1, m2});
m2.partition().apply_row_tombstone(*s1, key, tomb);
result.equal.emplace_back(mutations{m1, m2});
// Add a row which falls under the tombstone prefix.
auto ts = new_timestamp();
auto key_full = clustering_key_prefix::from_deeply_exploded(*s1, {data_value(bytes("ck2_0")), data_value(bytes("ck1_1")), });
m1.set_clustered_cell(key_full, "regular_col_2", data_value(bytes("regular_col_value")), ts, ttl);
result.unequal.emplace_back(mutations{m1, m2});
m2.set_clustered_cell(key_full, "regular_col_2", data_value(bytes("regular_col_value")), ts, ttl);
result.equal.emplace_back(mutations{m1, m2});
}
{