mutation_reader_test: add unit test for evictable reader range tombstone trimming

This commit is contained in:
Botond Dénes
2020-09-23 17:48:42 +03:00
parent 4f2e7a18e2
commit d1b0573e1c

View File

@@ -2984,3 +2984,148 @@ SEASTAR_THREAD_TEST_CASE(test_manual_paused_evictable_reader_is_mutation_source)
run_mutation_source_tests(make_populate);
}
namespace {
std::deque<mutation_fragment> copy_fragments(const schema& s, const std::deque<mutation_fragment>& o) {
std::deque<mutation_fragment> buf;
for (const auto& mf : o) {
buf.emplace_back(s, mf);
}
return buf;
}
flat_mutation_reader create_evictable_reader_and_evict_after_first_buffer(
schema_ptr schema,
reader_permit permit,
const dht::partition_range& prange,
const query::partition_slice& slice,
std::deque<mutation_fragment> first_buffer,
position_in_partition_view last_fragment_position,
std::deque<mutation_fragment> second_buffer,
size_t max_buffer_size) {
class factory {
schema_ptr _schema;
std::optional<std::deque<mutation_fragment>> _first_buffer;
std::optional<std::deque<mutation_fragment>> _second_buffer;
size_t _max_buffer_size;
private:
std::optional<std::deque<mutation_fragment>> copy_buffer(const std::optional<std::deque<mutation_fragment>>& o) {
if (!o) {
return {};
}
return copy_fragments(*_schema, *o);
}
public:
factory(schema_ptr schema, std::deque<mutation_fragment> first_buffer, std::deque<mutation_fragment> second_buffer, size_t max_buffer_size)
: _schema(std::move(schema)), _first_buffer(std::move(first_buffer)), _second_buffer(std::move(second_buffer)), _max_buffer_size(max_buffer_size) {
}
factory(const factory& o)
: _schema(o._schema)
, _first_buffer(copy_buffer(o._first_buffer))
, _second_buffer(copy_buffer(o._second_buffer)) {
}
factory(factory&& o) = default;
flat_mutation_reader operator()(
schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd_sm,
mutation_reader::forwarding fwd_mr) {
BOOST_REQUIRE(s == _schema);
if (_first_buffer) {
auto buf = *std::exchange(_first_buffer, {});
auto rd = make_flat_mutation_reader_from_fragments(_schema, std::move(buf));
rd.set_max_buffer_size(_max_buffer_size);
return rd;
}
if (_second_buffer) {
auto buf = *std::exchange(_second_buffer, {});
auto rd = make_flat_mutation_reader_from_fragments(_schema, std::move(buf));
rd.set_max_buffer_size(_max_buffer_size);
return rd;
}
return make_empty_flat_reader(_schema);
}
};
auto ms = mutation_source(factory(schema, std::move(first_buffer), std::move(second_buffer), max_buffer_size));
auto [rd, handle] = make_manually_paused_evictable_reader(
std::move(ms),
schema,
permit,
prange,
slice,
seastar::default_priority_class(),
nullptr,
mutation_reader::forwarding::yes);
rd.set_max_buffer_size(max_buffer_size);
rd.fill_buffer(db::no_timeout).get0();
const auto eq_cmp = position_in_partition::equal_compare(*schema);
BOOST_REQUIRE(rd.is_buffer_full());
BOOST_REQUIRE(eq_cmp(rd.buffer().back().position(), last_fragment_position));
BOOST_REQUIRE(!rd.is_end_of_stream());
rd.detach_buffer();
handle.pause();
while(permit.semaphore().try_evict_one_inactive_read());
return std::move(rd);
}
}
SEASTAR_THREAD_TEST_CASE(test_evictable_reader_trim_range_tombstones) {
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name());
simple_schema s;
const auto pkey = s.make_pkey();
size_t max_buffer_size = 512;
const int first_ck = 100;
const int second_buffer_ck = first_ck + 100;
size_t mem_usage = 0;
std::deque<mutation_fragment> first_buffer;
first_buffer.emplace_back(partition_start{pkey, {}});
mem_usage = first_buffer.back().memory_usage(*s.schema());
for (int i = 0; i < second_buffer_ck; ++i) {
first_buffer.emplace_back(s.make_row(s.make_ckey(i++), "v"));
mem_usage += first_buffer.back().memory_usage(*s.schema());
}
const auto last_fragment_position = position_in_partition(first_buffer.back().position());
max_buffer_size = mem_usage;
first_buffer.emplace_back(s.make_row(s.make_ckey(second_buffer_ck), "v"));
std::deque<mutation_fragment> second_buffer;
second_buffer.emplace_back(partition_start{pkey, {}});
mem_usage = second_buffer.back().memory_usage(*s.schema());
second_buffer.emplace_back(s.make_range_tombstone(query::clustering_range::make_ending_with(s.make_ckey(second_buffer_ck + 10))));
int ckey = second_buffer_ck;
while (mem_usage <= max_buffer_size) {
second_buffer.emplace_back(s.make_row(s.make_ckey(ckey++), "v"));
mem_usage += second_buffer.back().memory_usage(*s.schema());
}
second_buffer.emplace_back(partition_end{});
auto rd = create_evictable_reader_and_evict_after_first_buffer(s.schema(), semaphore.make_permit(), query::full_partition_range,
s.schema()->full_slice(), std::move(first_buffer), last_fragment_position, std::move(second_buffer), max_buffer_size);
rd.fill_buffer(db::no_timeout).get();
const auto tri_cmp = position_in_partition::tri_compare(*s.schema());
BOOST_REQUIRE(tri_cmp(last_fragment_position, rd.peek_buffer().position()) < 0);
}