test: Add test that compaction doesn't cross logical group boundary

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2025-06-23 16:31:13 -03:00
committed by Botond Dénes
parent d351b0726b
commit beaaf00fac
4 changed files with 78 additions and 0 deletions

View File

@@ -202,3 +202,70 @@ SEASTAR_TEST_CASE(basic_compaction_group_splitting_test) {
}
});
}
static mutation_reader sstable_reader(shared_sstable sst, schema_ptr s, reader_permit permit) {
return sst->as_mutation_source().make_mutation_reader(s, std::move(permit), query::full_partition_range, s->full_slice());
}
SEASTAR_TEST_CASE(compactions_dont_cross_group_boundary_test) {
return test_env::do_with_async([] (test_env& env) {
auto builder = schema_builder("tests", "compactions_dont_cross_group_boundary")
.with_column("id", utf8_type, column_kind::partition_key)
.with_column("cl", int32_type, column_kind::clustering_key)
.with_column("value", int32_type);
auto s = builder.build();
auto t = env.make_table_for_tests(s);
auto close_table = deferred_stop(t);
t->start();
// Disable auto compaction to allow us to trigger compaction manually later.
t->disable_auto_compaction().get();
auto is_unrepaired = [] (dht::token t) { return t.raw() % 3 == 0; };
auto is_repairing = [] (dht::token t) { return t.raw() % 3 == 1; };
auto is_repaired = [] (dht::token t) { return t.raw() % 3 == 2; };
auto sst_factory = env.make_sst_factory(s);
auto generate_sstables = [&] (std::function<bool(dht::token)> filter) {
for (int i = 0; i < 4; i++) {
t->add_sstable_and_update_cache(generate_sstable(s, sst_factory, filter)).get();
}
};
generate_sstables(is_unrepaired);
generate_sstables(is_repairing);
generate_sstables(is_repaired);
auto repair_token_classifier = [&] (dht::token t) -> replica::repair_sstable_classification {
if (is_unrepaired(t)) {
return replica::repair_sstable_classification::unrepaired;
} else if (is_repairing(t)) {
return replica::repair_sstable_classification::repairing;
}
return replica::repair_sstable_classification::repaired;
};
auto repair_sstable_classifier = [&] (const sstables::shared_sstable& sst) -> replica::repair_sstable_classification {
return repair_token_classifier(sst->get_first_decorated_key().token());
};
t.set_repair_sstable_classifier(repair_sstable_classifier);
for (int i = 0; i < 4; i++) {
t->compact_all_sstables({}).get();
}
auto validate_sstable = [&] (const sstables::shared_sstable& sst) {
auto reader = sstable_reader(sst, s, env.make_reader_permit()); // reader holds sst and s alive.
auto close_reader = deferred_close(reader);
auto expected_classification = repair_sstable_classifier(sst);
while (auto m = read_mutation_from_mutation_reader(reader).get()) {
BOOST_REQUIRE(repair_token_classifier(m->decorated_key().token()) == expected_classification);
}
};
auto all_sstables = t->get_sstables();
for (auto& sst : *all_sstables) {
validate_sstable(sst);
}
});
}

View File

@@ -170,6 +170,12 @@ void table_for_tests::set_tombstone_gc_enabled(bool tombstone_gc_enabled) noexce
_data->cf->set_tombstone_gc_enabled(tombstone_gc_enabled);
}
void table_for_tests::set_repair_sstable_classifier(replica::repair_classifier_func repair_sstable_classifier) {
_data->cf->for_each_compaction_group([&] (replica::compaction_group& cg) {
cg.set_repair_sstable_classifier(repair_sstable_classifier);
});
}
namespace sstables {
std::vector<db::object_storage_endpoint_param> make_storage_options_config(const data_dictionary::storage_options& so) {

View File

@@ -67,4 +67,5 @@ struct table_for_tests {
future<> stop();
void set_tombstone_gc_enabled(bool tombstone_gc_enabled) noexcept;
void set_repair_sstable_classifier(replica::repair_classifier_func repair_sstable_classifier);
};