test/boost/mutation_test: test_compactor_range_tombstone_spanning_many_pages extend to check v2 output too

This commit is contained in:
Botond Dénes
2022-02-03 09:42:57 +02:00
parent 7a37e30310
commit 2057db54ab

View File

@@ -2720,6 +2720,48 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) {
}
};
struct consumer_v2 {
reader_permit permit;
mutation& mut;
const uint64_t row_limit;
uint64_t rows = 0;
mutation_rebuilder_v2 builder;
consumer_v2(reader_permit permit, mutation& mut, uint64_t row_limit, uint64_t rows = 0)
: permit(std::move(permit)), mut(mut), row_limit(row_limit), rows(rows), builder(mut.schema())
{ }
void consume_new_partition(const dht::decorated_key& dk) {
BOOST_REQUIRE(mut.decorated_key().equal(*mut.schema(), dk));
builder.consume_new_partition(dk);
}
void consume(const tombstone& t) {
BOOST_REQUIRE_EQUAL(t, mut.partition().partition_tombstone());
builder.consume(t);
}
stop_iteration consume(static_row&& sr, tombstone, bool) {
builder.consume(std::move(sr));
return stop_iteration(++rows >= row_limit);
}
stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) {
builder.consume(std::move(cr));
return stop_iteration(++rows >= row_limit);
}
stop_iteration consume(range_tombstone_change&& rtc) {
builder.consume(std::move(rtc));
return stop_iteration(++rows >= row_limit);
}
stop_iteration consume_end_of_partition() {
builder.consume_end_of_partition();
return stop_iteration::yes;
}
void consume_end_of_stream() {
if (auto mut_opt = builder.consume_end_of_stream()) {
mut += *mut_opt;
}
}
};
testlog.info("non-paged");
{
mutation res_mut(s, pk);
@@ -2732,6 +2774,18 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) {
BOOST_REQUIRE_EQUAL(res_mut, ref_mut);
}
testlog.info("non-paged v2");
{
mutation res_mut(s, pk);
auto c = compact_for_query_v2<emit_only_live_rows::no, consumer_v2>(*s, query_time, s->full_slice(), max_rows, max_partitions, consumer_v2{permit, res_mut, max_rows});
auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags());
auto close_reader = deferred_close(reader);
reader.consume(std::move(c)).get();
BOOST_REQUIRE_EQUAL(res_mut, ref_mut);
}
testlog.info("limited pages");
{
mutation res_mut(s, pk);
@@ -2748,6 +2802,22 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) {
BOOST_REQUIRE_EQUAL(res_mut, ref_mut);
}
testlog.info("limited pages v2");
{
mutation res_mut(s, pk);
auto compaction_state = make_lw_shared<compact_mutation_state<emit_only_live_rows::no, compact_for_sstables::no, compactor_output_format::v2>>(*s, query_time, s->full_slice(), 1, max_partitions);
auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags());
auto close_reader = deferred_close(reader);
while (!reader.is_buffer_empty() || !reader.is_end_of_stream()) {
auto c = consumer_v2{permit, res_mut, max_rows};
compaction_state->start_new_page(1, max_partitions, query_time, reader.peek().get()->position().region(), c);
reader.consume(compact_for_query_v2<emit_only_live_rows::no, consumer_v2>(compaction_state, std::move(c))).get();
}
BOOST_REQUIRE_EQUAL(res_mut, ref_mut);
}
testlog.info("short pages");
{
mutation res_mut(s, pk);
@@ -2764,6 +2834,22 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) {
BOOST_REQUIRE_EQUAL(res_mut, ref_mut);
}
testlog.info("short pages v2");
{
mutation res_mut(s, pk);
auto compaction_state = make_lw_shared<compact_mutation_state<emit_only_live_rows::no, compact_for_sstables::no, compactor_output_format::v2>>(*s, query_time, s->full_slice(), max_rows, max_partitions);
auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags());
auto close_reader = deferred_close(reader);
while (!reader.is_buffer_empty() || !reader.is_end_of_stream()) {
auto c = consumer_v2{permit, res_mut, 2};
compaction_state->start_new_page(max_rows, max_partitions, query_time, reader.peek().get()->position().region(), c);
reader.consume(compact_for_query_v2<emit_only_live_rows::no, consumer_v2>(compaction_state, std::move(c))).get();
}
BOOST_REQUIRE_EQUAL(res_mut, ref_mut);
}
testlog.info("limited pages - detach state");
{
mutation res_mut(s, pk);
@@ -2785,6 +2871,27 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) {
BOOST_REQUIRE_EQUAL(res_mut, ref_mut);
}
testlog.info("limited pages - detach state v2");
{
mutation res_mut(s, pk);
auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags());
auto close_reader = deferred_close(reader);
std::optional<detached_compaction_state> detached_state;
while (!reader.is_buffer_empty() || !reader.is_end_of_stream()) {
if (detached_state) {
restore_state(reader, std::move(*detached_state));
}
auto compaction_state = make_lw_shared<compact_mutation_state<emit_only_live_rows::no, compact_for_sstables::no, compactor_output_format::v2>>(*s, query_time, s->full_slice(), 1, max_partitions);
auto c = consumer_v2{permit, res_mut, max_rows};
reader.consume(compact_for_query_v2<emit_only_live_rows::no, consumer_v2>(compaction_state, std::move(c))).get();
detached_state = std::move(*compaction_state).detach_state();
}
BOOST_REQUIRE_EQUAL(res_mut, ref_mut);
}
testlog.info("short pages - detach state");
{
mutation res_mut(s, pk);
@@ -2805,4 +2912,25 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) {
BOOST_REQUIRE_EQUAL(res_mut, ref_mut);
}
testlog.info("short pages - detach state v2");
{
mutation res_mut(s, pk);
auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags());
auto close_reader = deferred_close(reader);
std::optional<detached_compaction_state> detached_state;
while (!reader.is_buffer_empty() || !reader.is_end_of_stream()) {
if (detached_state) {
restore_state(reader, std::move(*detached_state));
}
auto compaction_state = make_lw_shared<compact_mutation_state<emit_only_live_rows::no, compact_for_sstables::no, compactor_output_format::v2>>(*s, query_time, s->full_slice(), max_rows, max_partitions);
auto c = consumer_v2{permit, res_mut, 2};
reader.consume(compact_for_query_v2<emit_only_live_rows::no, consumer_v2>(compaction_state, std::move(c))).get();
detached_state = std::move(*compaction_state).detach_state();
}
BOOST_REQUIRE_EQUAL(res_mut, ref_mut);
}
}