/* * Copyright (C) 2023-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */
/*
 * Tests for compaction-group handling: splitting sstables by token group, and
 * checking that compaction keeps sstables with different repair
 * classifications apart.
 *
 * NOTE(review): in this copy the code is collapsed onto a few very long
 * physical lines, the angle-bracket include targets are missing, and template
 * argument lists look stripped (for example "std::function sst_gen" and
 * "future> main_sstable_set()"). The line comments further down therefore run
 * to the end of their physical line. Presumably an extraction artifact --
 * confirm against the repository copy before building. The code below is left
 * exactly as found.
 *
 * generate_sstable(s, sst_gen, token_filter)
 *   Requests 100 partition keys from tests::generate_partition_keys and keeps
 *   the ones whose token satisfies token_filter. Each kept key gets one
 *   mutation holding a single clustered cell in column "value", timestamped
 *   with api::timestamp_clock::now(). The clustering key comes from a
 *   static thread_local counter that starts at 1 and is post-incremented, so
 *   the stored cell value is the clustering key plus one and the counter keeps
 *   growing across calls. The mutations are handed to
 *   make_sstable_containing(sst_gen, ...), whose result is returned.
 *
 * sstable_that_needs_split(s, sst_gen)
 *   generate_sstable with a filter that accepts every token. Presumably the
 *   100 keys then cover both token groups; the first test relies on that.
 *
 * class single_compaction_group
 *   Test implementation of compaction::compaction_group_view.
 *   - The constructor builds the main and maintenance sets with
 *     make_partitioned_sstable_set over token_range() (first_token to
 *     last_token), creates the compaction strategy from the schema's strategy
 *     and options, and registers the object with the table's compaction
 *     manager. stop() removes that registration; run_test calls it before the
 *     object goes out of scope.
 *   - rebuild_main_set erases to_remove from the main set, then inserts
 *     to_add. on_compaction_completion logs the counts and applies
 *     desc.new_sstables / desc.old_sstables through it; its offstrategy
 *     argument is unused.
 *   - main_sstable_set, maintenance_sstable_set and
 *     sstable_set_for_tombstone_gc each return make_lw_shared(...) of the
 *     member set -- apparently a fresh copy per call; confirm.
 *   - Both make_sstable overloads ignore their arguments and call the stored
 *     factory.
 *   - Fixed answers: compaction_enforce_min_threshold,
 *     is_auto_compaction_disabled_by_user, tombstone_gc_enabled and
 *     memtable_has_key return false; fully_expired_sstables returns an empty
 *     set; the min_memtable_* getters return api::min_timestamp; get_group_id
 *     returns "0"; get_token_range_after_split returns a default-constructed
 *     dht::token_range; get_sstables_repaired_at returns 0.
 *
 * basic_compaction_group_splitting_test
 *   Schema: id (utf8, partition key), cl (int32, clustering key), value
 *   (int32). classifier maps a token to dht::compaction_group_of(1, t); an
 *   sstable "needs split" when its first and last keys classify differently.
 *   run_test(ssts, expected_output, validate):
 *   - seeds a new single_compaction_group with ssts;
 *   - computes the expected start size as the bytes_on_disk sum of the inputs
 *     that need a split, and runs perform_split_compaction;
 *   - requires ret->start_size to equal that sum and the main set to hold
 *     expected_output sstables;
 *   - requires that no resulting sstable needs a split, and calls validate on
 *     each one.
 *   Scenarios:
 *   1. One sstable that needs a split: 2 outputs whose first-key groups are 0
 *      and 1, each seen exactly once.
 *   2. One sstable holding only group-0 tokens: 1 output with the same
 *      generation as the input, i.e. it was not rewritten.
 *   3. Both inputs together: 3 outputs, one of which has input2's generation.
 *
 * sstable_reader(sst, s, permit)
 *   Returns a mutation reader over the sstable for query::full_partition_range
 *   and the schema's full slice.
 *
 * compactions_dont_cross_group_boundary_test
 *   Same schema shape. Auto compaction is disabled on the table. Three token
 *   predicates select t.raw() % 3 == 0 (unrepaired), == 1 (repairing) and
 *   == 2 (repaired); four sstables are generated and added per predicate.
 *   The classifier installed on the table classifies an sstable by the token
 *   of its first key and ignores sstables_repaired_at. compact_all_sstables
 *   runs four times, then every sstable of the table is read back and each
 *   partition's token must classify like the sstable's first key.
 *   NOTE(review): if raw() is signed (presumably int64_t -- confirm), the
 *   remainder of a negative token is zero or negative. Tokens with remainder
 *   -1 or -2 then pass none of the three filters and are never written, while
 *   repair_token_classifier would fall through to "repaired" for them.
 */
#include #include #include #include "test/lib/scylla_test_case.hh" #include "test/lib/test_services.hh" #include "test/lib/reader_concurrency_semaphore.hh" #include "test/lib/sstable_utils.hh" #include "test/lib/random_utils.hh" #include "test/lib/key_utils.hh" #include "schema/schema.hh" #include "schema/schema_builder.hh" #include "sstables/sstables.hh" #include "sstables/compress.hh" #include "compaction/compaction.hh" #include "compaction/compaction_manager.hh" #include "replica/compaction_group.hh" using namespace sstables; static sstables::shared_sstable generate_sstable(schema_ptr s, std::function sst_gen, noncopyable_function token_filter) { auto make_insert = [&] (const dht::decorated_key& key) { static thread_local int32_t value = 1; mutation m(s, key); auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(value++)}); m.set_clustered_cell(c_key, bytes("value"), data_value(int32_t(value)), api::timestamp_clock::now().time_since_epoch().count()); return m; }; auto keys = tests::generate_partition_keys(100, s); utils::chunked_vector muts; muts.reserve(keys.size()); for (auto& k : keys) { if (token_filter(k.token())) { muts.push_back(make_insert(k)); } } return make_sstable_containing(sst_gen, std::move(muts)); } static sstables::shared_sstable sstable_that_needs_split(schema_ptr s, std::function sst_gen) { return generate_sstable(std::move(s), std::move(sst_gen), [] (dht::token) { return true; }); } class single_compaction_group : public compaction::compaction_group_view { private: schema_ptr _schema; sstables::sstables_manager& _sst_man; sstables::sstable_set _main_set; sstables::sstable_set _maintenance_set; std::vector _compacted_undeleted_sstables; mutable compaction::compaction_strategy _compaction_strategy; compaction::compaction_strategy_state _compaction_strategy_state; tombstone_gc_state _tombstone_gc_state; 
compaction::compaction_backlog_tracker _backlog_tracker; condition_variable _staging_done_condition; std::function _sstable_factory; mutable tests::reader_concurrency_semaphore_wrapper _semaphore; public: single_compaction_group(table_for_tests& t, sstables::sstables_manager& sst_man, std::function sstable_factory) : _schema(t.schema()) , _sst_man(sst_man) , _main_set(sstables::make_partitioned_sstable_set(_schema, token_range())) , _maintenance_set(sstables::make_partitioned_sstable_set(_schema, token_range())) , _compaction_strategy(compaction::make_compaction_strategy(_schema->compaction_strategy(), _schema->compaction_strategy_options())) , _compaction_strategy_state(compaction::compaction_strategy_state::make(_compaction_strategy)) , _tombstone_gc_state(tombstone_gc_state::for_tests()) , _backlog_tracker(_compaction_strategy.make_backlog_tracker()) , _sstable_factory(std::move(sstable_factory)) { t->get_compaction_manager().add(*this); } future<> stop(table_for_tests& t) { return t->get_compaction_manager().remove(*this); } void rebuild_main_set(std::vector to_add, std::vector to_remove) { for (auto& sst : to_remove) { _main_set.erase(sst); } for (auto& sst : to_add) { _main_set.insert(sst); } } virtual dht::token_range token_range() const noexcept override { return dht::token_range::make(dht::first_token(), dht::last_token()); } virtual const schema_ptr& schema() const noexcept override { return _schema; } virtual unsigned min_compaction_threshold() const noexcept override { return _schema->min_compaction_threshold(); } virtual bool compaction_enforce_min_threshold() const noexcept override { return false; } virtual future> main_sstable_set() const override { co_return make_lw_shared(_main_set); } virtual future> maintenance_sstable_set() const override { co_return make_lw_shared(_maintenance_set); } virtual lw_shared_ptr sstable_set_for_tombstone_gc() const override { return make_lw_shared(_main_set); } virtual std::unordered_set fully_expired_sstables(const 
std::vector& sstables, gc_clock::time_point compaction_time) const override { return {}; } virtual const std::vector& compacted_undeleted_sstables() const noexcept override { return _compacted_undeleted_sstables; } virtual compaction::compaction_strategy& get_compaction_strategy() const noexcept override { return _compaction_strategy; } virtual compaction::compaction_strategy_state& get_compaction_strategy_state() noexcept override { return _compaction_strategy_state; } virtual reader_permit make_compaction_reader_permit() const override { return _semaphore.make_permit(); } virtual sstables::sstables_manager& get_sstables_manager() noexcept override { return _sst_man; } virtual sstables::shared_sstable make_sstable(sstables::sstable_state) const override { return _sstable_factory(); } virtual sstables::shared_sstable make_sstable(sstables::sstable_state, sstables::sstable_version_types) const override { return _sstable_factory(); } virtual sstables::sstable_writer_config configure_writer(sstring origin) const override { return _sst_man.configure_writer(std::move(origin)); } virtual api::timestamp_type min_memtable_timestamp() const override { return api::min_timestamp; } virtual api::timestamp_type min_memtable_live_timestamp() const override { return api::min_timestamp; } virtual api::timestamp_type min_memtable_live_row_marker_timestamp() const override { return api::min_timestamp; } virtual bool memtable_has_key(const dht::decorated_key& key) const override { return false; } virtual future<> on_compaction_completion(compaction::compaction_completion_desc desc, sstables::offstrategy offstrategy) override { testlog.info("Adding {} sstable(s), removing {} sstables", desc.new_sstables.size(), desc.old_sstables.size()); rebuild_main_set(desc.new_sstables, desc.old_sstables); return make_ready_future<>(); } virtual bool is_auto_compaction_disabled_by_user() const noexcept override { return false; } virtual bool tombstone_gc_enabled() const noexcept override { return 
false; } virtual tombstone_gc_state get_tombstone_gc_state() const noexcept override { return _tombstone_gc_state; } virtual compaction::compaction_backlog_tracker& get_backlog_tracker() override { return _backlog_tracker; } virtual const std::string get_group_id() const noexcept override { return "0"; } virtual seastar::condition_variable& get_staging_done_condition() noexcept override { return _staging_done_condition; } dht::token_range get_token_range_after_split(const dht::token& t) const noexcept override { return dht::token_range(); } int64_t get_sstables_repaired_at() const noexcept override { return 0; } }; SEASTAR_TEST_CASE(basic_compaction_group_splitting_test) { return test_env::do_with_async([] (test_env& env) { auto builder = schema_builder("tests", "compaction_group_splitting") .with_column("id", utf8_type, column_kind::partition_key) .with_column("cl", int32_type, column_kind::clustering_key) .with_column("value", int32_type); auto s = builder.build(); auto t = env.make_table_for_tests(s); auto close_table = deferred_stop(t); t->start(); auto sst_factory = env.make_sst_factory(s); auto classifier = [] (dht::token t) -> mutation_writer::token_group_id { return dht::compaction_group_of(1, t); }; auto sstable_needs_split = [&] (const sstables::shared_sstable& sst) { return classifier(sst->get_first_decorated_key().token()) != classifier(sst->get_last_decorated_key().token()); }; auto run_test = [&] (std::vector ssts, size_t expected_output, noncopyable_function validate) { auto compaction_group = std::make_unique(t, env.manager(), sst_factory); compaction_group->rebuild_main_set(ssts, {}); auto& cm = t->get_compaction_manager(); auto expected_compaction_size = std::ranges::fold_left(ssts | std::views::transform([&] (auto& sst) { // sstables that doesn't need split will have compaction bypassed. return sstable_needs_split(sst) ? 
sst->bytes_on_disk() : size_t(0); }), int64_t(0), std::plus{}); auto ret = cm.perform_split_compaction(*compaction_group, compaction::compaction_type_options::split{classifier}, tasks::task_info{}).get(); BOOST_REQUIRE_EQUAL(ret->start_size, expected_compaction_size); BOOST_REQUIRE(compaction_group->main_sstable_set().get()->size() == expected_output); compaction_group->main_sstable_set().get()->for_each_sstable([&] (const sstables::shared_sstable& sst) { BOOST_REQUIRE(!sstable_needs_split(sst)); validate(sst); }); compaction_group->stop(t).get(); }; // sstable that needs split case will generate 2 sstables, one for left, another for right. { auto input = sstable_that_needs_split(s, sst_factory); std::unordered_set expected_ids { 0, 1 }; run_test({ input }, 2, [&] (const sstables::shared_sstable& sst) { BOOST_REQUIRE(expected_ids.erase(classifier(sst->get_first_decorated_key().token())) == 1); }); BOOST_REQUIRE(expected_ids.empty()); } // sstable that doesn't need split won't actually be compacted { auto input = generate_sstable(s, sst_factory, [&] (dht::token t) { return classifier(t) == 0; }); run_test({ input }, 1, [&] (const sstables::shared_sstable& sst) { BOOST_REQUIRE(sst->generation() == input->generation()); BOOST_REQUIRE_EQUAL(0, classifier(sst->get_first_decorated_key().token())); }); } // combination of both cases { auto input1 = sstable_that_needs_split(s, sst_factory); auto input2 = generate_sstable(s, sst_factory, [&] (dht::token t) { return classifier(t) == 0; }); bool found_input2 = false; run_test({ input1, input2 }, 3, [&] (const sstables::shared_sstable& sst) { found_input2 |= sst->generation() == input2->generation(); }); BOOST_REQUIRE(found_input2); } }); } static mutation_reader sstable_reader(shared_sstable sst, schema_ptr s, reader_permit permit) { return sst->as_mutation_source().make_mutation_reader(s, std::move(permit), query::full_partition_range, s->full_slice()); } SEASTAR_TEST_CASE(compactions_dont_cross_group_boundary_test) { return 
test_env::do_with_async([] (test_env& env) { auto builder = schema_builder("tests", "compactions_dont_cross_group_boundary") .with_column("id", utf8_type, column_kind::partition_key) .with_column("cl", int32_type, column_kind::clustering_key) .with_column("value", int32_type); auto s = builder.build(); auto t = env.make_table_for_tests(s); auto close_table = deferred_stop(t); t->start(); // Disable auto compaction to allow us to trigger compaction manually later. t->disable_auto_compaction().get(); auto is_unrepaired = [] (dht::token t) { return t.raw() % 3 == 0; }; auto is_repairing = [] (dht::token t) { return t.raw() % 3 == 1; }; auto is_repaired = [] (dht::token t) { return t.raw() % 3 == 2; }; auto sst_factory = env.make_sst_factory(s); auto generate_sstables = [&] (std::function filter) { for (int i = 0; i < 4; i++) { t->add_sstable_and_update_cache(generate_sstable(s, sst_factory, filter)).get(); } }; generate_sstables(is_unrepaired); generate_sstables(is_repairing); generate_sstables(is_repaired); auto repair_token_classifier = [&] (dht::token t) -> replica::repair_sstable_classification { if (is_unrepaired(t)) { return replica::repair_sstable_classification::unrepaired; } else if (is_repairing(t)) { return replica::repair_sstable_classification::repairing; } return replica::repair_sstable_classification::repaired; }; auto repair_sstable_classifier = [&] (const sstables::shared_sstable& sst, int64_t sstables_repaired_at) -> replica::repair_sstable_classification { return repair_token_classifier(sst->get_first_decorated_key().token()); }; t.set_repair_sstable_classifier(repair_sstable_classifier); for (int i = 0; i < 4; i++) { t->compact_all_sstables({}).get(); } auto validate_sstable = [&] (const sstables::shared_sstable& sst) { auto reader = sstable_reader(sst, s, env.make_reader_permit()); // reader holds sst and s alive. 
auto close_reader = deferred_close(reader); auto expected_classification = repair_sstable_classifier(sst, 0); while (auto m = read_mutation_from_mutation_reader(reader).get()) { BOOST_REQUIRE(repair_token_classifier(m->decorated_key().token()) == expected_classification); } }; auto all_sstables = t->get_sstables(); for (auto& sst : *all_sstables) { validate_sstable(sst); } }); }