sstables: crawling mx-reader: make on_out_of_clustering_range() no-op

Said method currently emits a partition-end. This method is only called
when the last fragment in the stream is a range tombstone change with a
position after all clustered rows. The problem is that
consume_partition_end() is also called unconditionally, resulting in two
partition-end fragments being emitted. The fix is simple: make this
method a no-op, there is nothing to do there.

Also add two tests: one targeted to this bug and another one testing the
crawling reader with random mutations generated for random schema.

Fixes: #11421

Closes #11422

(cherry picked from commit be9d1c4df4)
This commit is contained in:
Botond Dénes
2022-08-31 17:40:07 +03:00
committed by Avi Kivity
parent 426d045249
commit 9b1a570f6f
2 changed files with 56 additions and 3 deletions

View File

@@ -1754,9 +1754,7 @@ public:
_monitor.on_read_started(_context->reader_position());
}
public:
void on_out_of_clustering_range() override {
push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, partition_end()));
}
void on_out_of_clustering_range() override { }
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
on_internal_error(sstlog, "mx_crawling_sstable_mutation_reader: doesn't support fast_forward_to(const dht::partition_range&)");
}

View File

@@ -3149,3 +3149,58 @@ SEASTAR_TEST_CASE(test_index_fast_forwarding_after_eof) {
return make_ready_future<>();
});
}
SEASTAR_TEST_CASE(test_crawling_reader_out_of_range_last_range_tombstone_change) {
return test_env::do_with_async([] (test_env& env) {
simple_schema table;
auto mut = table.new_mutation("pk0");
auto ckeys = table.make_ckeys(4);
table.add_row(mut, ckeys[0], "v0");
table.add_row(mut, ckeys[1], "v1");
table.add_row(mut, ckeys[2], "v2");
using bound = query::clustering_range::bound;
table.delete_range(mut, query::clustering_range::make(bound{ckeys[3], true}, bound{clustering_key::make_empty(), true}), tombstone(1, gc_clock::now()));
auto tmp = tmpdir();
auto sst_gen = [&env, &table, &tmp] () {
return env.make_sstable(table.schema(), tmp.path().string(), 1, sstables::get_highest_sstable_version(), big);
};
auto sst = make_sstable_containing(sst_gen, {mut});
assert_that(sst->make_crawling_reader(table.schema(), env.make_reader_permit())).has_monotonic_positions();
});
}
SEASTAR_TEST_CASE(test_crawling_reader_random_schema_random_mutations) {
return test_env::do_with_async([this] (test_env& env) {
auto random_spec = tests::make_random_schema_specification(
get_name(),
std::uniform_int_distribution<size_t>(1, 4),
std::uniform_int_distribution<size_t>(2, 4),
std::uniform_int_distribution<size_t>(2, 8),
std::uniform_int_distribution<size_t>(2, 8));
auto random_schema = tests::random_schema{tests::random::get_int<uint32_t>(), *random_spec};
auto schema = random_schema.schema();
testlog.info("Random schema:\n{}", random_schema.cql());
const auto muts = tests::generate_random_mutations(random_schema, 20).get();
auto tmp = tmpdir();
auto sst_gen = [&env, schema, &tmp] () {
return env.make_sstable(schema, tmp.path().string(), 1, sstables::get_highest_sstable_version(), big);
};
auto sst = make_sstable_containing(sst_gen, muts);
{
auto rd = assert_that(sst->make_crawling_reader(schema, env.make_reader_permit()));
for (const auto& mut : muts) {
rd.produces(mut);
}
}
assert_that(sst->make_crawling_reader(schema, env.make_reader_permit())).has_monotonic_positions();
});
}