diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index b7e33e5f23..dd7d21397f 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -114,7 +114,7 @@ static future compact_sstables(sstables::compaction_descriptor descriptor, table_for_tests t, std::function creator, sstables::compaction_sstable_replacer_fn replacer = sstables::replacer_fn_no_op(), can_purge_tombstones can_purge = can_purge_tombstones::yes) { - return compact_sstables(t.get_compaction_manager(), std::move(descriptor), t.as_table_state(), std::move(creator), std::move(replacer), can_purge); + return compact_sstables(t->get_compaction_manager(), std::move(descriptor), t.as_table_state(), std::move(creator), std::move(replacer), can_purge); } class strategy_control_for_test : public strategy_control { @@ -164,7 +164,7 @@ SEASTAR_TEST_CASE(compaction_manager_basic_test) { {{"p1", utf8_type}}, {{"c1", utf8_type}}, {{"r1", int32_type}}, {}, utf8_type); auto cf = env.make_table_for_tests(s); - auto& cm = cf.get_compaction_manager(); + auto& cm = cf->get_compaction_manager(); auto close_cf = deferred_stop(cf); cf->set_compaction_strategy(sstables::compaction_strategy_type::size_tiered); auto sst_gen = cf.make_sst_factory(); @@ -2123,7 +2123,7 @@ SEASTAR_TEST_CASE(sstable_scrub_validate_mode_test) { sstables::compaction_type_options::scrub opts = { .operation_mode = sstables::compaction_type_options::scrub::mode::validate, }; - table.get_compaction_manager().perform_sstable_scrub(ts, opts).get(); + table->get_compaction_manager().perform_sstable_scrub(ts, opts).get(); BOOST_REQUIRE(sst->is_quarantined()); BOOST_REQUIRE(in_strategy_sstables(ts).empty()); @@ -2284,7 +2284,7 @@ SEASTAR_TEST_CASE(sstable_scrub_skip_mode_test) { auto table = env.make_table_for_tests(schema); auto close_cf = deferred_stop(table); table->start(); - auto& compaction_manager = table.get_compaction_manager(); + auto& compaction_manager = table->get_compaction_manager(); table->add_sstable_and_update_cache(sst).get(); @@ -2375,7 +2375,7 @@ SEASTAR_TEST_CASE(sstable_scrub_segregate_mode_test) { auto table = env.make_table_for_tests(schema); auto close_cf = deferred_stop(table); table->start(); - auto& compaction_manager = table.get_compaction_manager(); + auto& compaction_manager = table->get_compaction_manager(); table->add_sstable_and_update_cache(sst).get(); @@ -2481,7 +2481,7 @@ SEASTAR_TEST_CASE(sstable_scrub_quarantine_mode_test) { auto table = env.make_table_for_tests(schema); auto close_cf = deferred_stop(table); table->start(); - auto& compaction_manager = table.get_compaction_manager(); + auto& compaction_manager = table->get_compaction_manager(); table->add_sstable_and_update_cache(sst).get(); @@ -2812,7 +2812,7 @@ SEASTAR_TEST_CASE(sstable_run_based_compaction_test) { cf->start(); cf->set_compaction_strategy(sstables::compaction_strategy_type::size_tiered); auto compact = [&, s] (std::vector all, auto replacer) -> std::vector { - return compact_sstables(cf.get_compaction_manager(), sstables::compaction_descriptor(std::move(all), 1, 0), cf.as_table_state(), sst_gen, replacer).get0().new_sstables; + return compact_sstables(cf->get_compaction_manager(), sstables::compaction_descriptor(std::move(all), 1, 0), cf.as_table_state(), sst_gen, replacer).get0().new_sstables; }; auto make_insert = [&] (const dht::decorated_key& key) { mutation m(s, key); @@ -2835,7 +2835,7 @@ SEASTAR_TEST_CASE(sstable_run_based_compaction_test) { sstables.insert(new_sst); } column_family_test(cf).rebuild_sstable_list(cf.as_table_state(), new_sstables, old_sstables).get(); - compaction_manager_test(cf.get_compaction_manager()).propagate_replacement(cf.as_table_state(), old_sstables, new_sstables); + compaction_manager_test(cf->get_compaction_manager()).propagate_replacement(cf.as_table_state(), old_sstables, new_sstables); }; auto do_incremental_replace = [&] (auto old_sstables, auto new_sstables, auto& expected_sst, auto& closed_sstables_tracker) { @@ -3008,7 +3008,7 @@ SEASTAR_TEST_CASE(backlog_tracker_correctness_after_changing_compaction_strategy BOOST_REQUIRE(ret.new_sstables.size() == 1); } // triggers code that iterates through registered compactions. - cf._data->cm.backlog(); + cf->get_compaction_manager().backlog(); cf.as_table_state().get_backlog_tracker().backlog(); }); } @@ -3044,7 +3044,7 @@ SEASTAR_TEST_CASE(partial_sstable_run_filtered_out_test) { BOOST_REQUIRE(generation_exists(partial_sstable_run_sst->generation())); // register partial sstable run - auto cm_test = compaction_manager_test(cf.get_compaction_manager()); + auto cm_test = compaction_manager_test(cf->get_compaction_manager()); cm_test.run(partial_sstable_run_identifier, cf.as_table_state(), [&cf] (sstables::compaction_data&) { return cf->compact_all_sstables(); }).get(); @@ -3383,7 +3383,7 @@ SEASTAR_TEST_CASE(autocompaction_control_test) { .build(); auto cf = env.make_table_for_tests(s); - auto& cm = cf.get_compaction_manager(); + auto& cm = cf->get_compaction_manager(); auto close_cf = deferred_stop(cf); cf->set_compaction_strategy(sstables::compaction_strategy_type::size_tiered); @@ -4453,8 +4453,8 @@ SEASTAR_TEST_CASE(test_major_does_not_miss_data_in_memtable) { }); } -SEASTAR_TEST_CASE(simple_backlog_controller_test) { - auto run_controller_test = [] (sstables::compaction_strategy_type compaction_strategy_type, test_env& env) { +future<> run_controller_test(sstables::compaction_strategy_type compaction_strategy_type) { + return test_env::do_with_async([compaction_strategy_type] (test_env& env) { ///////////// // settings static constexpr float disk_memory_ratio = 78.125; /* AWS I3en is ~78.125 */ @@ -4564,15 +4564,21 @@ SEASTAR_TEST_CASE(simple_backlog_controller_test) { auto max_expected = compaction_strategy_type == sstables::compaction_strategy_type::leveled ? 0.4f : 0.0f; BOOST_REQUIRE(r.normalized_backlog <= max_expected); } - }; - - return test_env::do_with_async([run_controller_test] (test_env& env) { - run_controller_test(sstables::compaction_strategy_type::size_tiered, env); - run_controller_test(sstables::compaction_strategy_type::time_window, env); - run_controller_test(sstables::compaction_strategy_type::leveled, env); }); } +SEASTAR_TEST_CASE(simple_backlog_controller_test_size_tiered) { + return run_controller_test(sstables::compaction_strategy_type::size_tiered); +} + +SEASTAR_TEST_CASE(simple_backlog_controller_test_time_window) { + return run_controller_test(sstables::compaction_strategy_type::time_window); +} + +SEASTAR_TEST_CASE(simple_backlog_controller_test_leveled) { + return run_controller_test(sstables::compaction_strategy_type::leveled); +} + SEASTAR_TEST_CASE(test_compaction_strategy_cleanup_method) { return test_env::do_with_async([] (test_env& env) { constexpr size_t all_files = 64; diff --git a/test/lib/sstable_test_env.hh b/test/lib/sstable_test_env.hh index 923144ac76..d2f425356d 100644 --- a/test/lib/sstable_test_env.hh +++ b/test/lib/sstable_test_env.hh @@ -20,6 +20,7 @@ #include "sstables/version.hh" #include "sstables/sstable_directory.hh" #include "replica/database.hh" +#include "compaction/compaction_manager.hh" #include "test/lib/tmpdir.hh" #include "test/lib/test_services.hh" @@ -44,6 +45,18 @@ public: } }; +class test_env_compaction_manager { + tasks::task_manager _tm; + compaction_manager _cm; + +public: + test_env_compaction_manager() + : _cm(_tm, compaction_manager::for_testing_tag{}) + {} + + compaction_manager& get_compaction_manager() { return _cm; } +}; + struct test_env_config { db::large_data_handler* large_data_handler = nullptr; data_dictionary::storage_options storage; // will be local by default @@ -61,6 +74,7 @@ class test_env { gms::feature_service feature_service; db::nop_large_data_handler nop_ld_handler; test_env_sstables_manager mgr; + std::unique_ptr cmgr; reader_concurrency_semaphore semaphore; sstables::sstable_generation_generator gen{0}; sstables::uuid_identifiers use_uuid; @@ -75,15 +89,13 @@ class test_env { } }; std::unique_ptr _impl; + + void maybe_start_compaction_manager(); public: explicit test_env(test_env_config cfg = {}, sstables::storage_manager* sstm = nullptr) : _impl(std::make_unique(std::move(cfg), sstm)) { } - future<> stop() { - return _impl->mgr.close().finally([this] { - return _impl->semaphore.stop(); - }); - } + future<> stop(); sstables::generation_type new_generation() noexcept { return _impl->new_generation(); @@ -213,11 +225,19 @@ public: } table_for_tests make_table_for_tests(schema_ptr s, sstring dir) { - return table_for_tests(manager(), s, std::move(dir), _impl->storage); + maybe_start_compaction_manager(); + auto cfg = make_table_config(); + cfg.datadir = dir; + cfg.enable_commitlog = false; + return table_for_tests(manager(), _impl->cmgr->get_compaction_manager(), s, std::move(cfg), _impl->storage); } table_for_tests make_table_for_tests(schema_ptr s = nullptr) { - return table_for_tests(manager(), s, tempdir().path().native(), _impl->storage); + maybe_start_compaction_manager(); + auto cfg = make_table_config(); + cfg.datadir = _impl->dir.path().native(); + cfg.enable_commitlog = false; + return table_for_tests(manager(), _impl->cmgr->get_compaction_manager(), s, std::move(cfg), _impl->storage); } }; diff --git a/test/lib/test_services.cc b/test/lib/test_services.cc index 9a06cfb91e..2363291cd8 100644 --- a/test/lib/test_services.cc +++ b/test/lib/test_services.cc @@ -26,7 +26,6 @@ static const sstring some_keyspace("ks"); static const sstring some_column_family("cf"); table_for_tests::data::data() - : semaphore(reader_concurrency_semaphore::no_limits{}, "table_for_tests") { } table_for_tests::data::~data() {} @@ -37,13 +36,6 @@ schema_ptr table_for_tests::make_default_schema() { .build(); } -table_for_tests::table_for_tests(sstables::sstables_manager& sstables_manager) - : table_for_tests( - sstables_manager, - make_default_schema() - ) -{ } - class table_for_tests::table_state : public compaction::table_state { table_for_tests::data& _data; sstables::sstables_manager& _sstables_manager; @@ -95,7 +87,7 @@ public: return _compaction_strategy_state; } reader_permit make_compaction_reader_permit() const override { - return _data.semaphore.make_tracking_only_permit(&*schema(), "table_for_tests::table_state", db::no_timeout, {}); + return table().compaction_concurrency_semaphore().make_tracking_only_permit(&*schema(), "table_for_tests::table_state", db::no_timeout, {}); } sstables::sstables_manager& get_sstables_manager() noexcept override { return _sstables_manager; @@ -133,20 +125,15 @@ public: } }; -table_for_tests::table_for_tests(sstables::sstables_manager& sstables_manager, schema_ptr s, std::optional datadir, data_dictionary::storage_options storage) +table_for_tests::table_for_tests(sstables::sstables_manager& sstables_manager, compaction_manager& cm, schema_ptr s, replica::table::config cfg, data_dictionary::storage_options storage) : _data(make_lw_shared()) { + cfg.cf_stats = &_data->cf_stats; _data->s = s ? s : make_default_schema(); - _data->cfg = replica::table::config{.compaction_concurrency_semaphore = &_data->semaphore}; - _data->cfg.enable_disk_writes = bool(datadir); - _data->cfg.datadir = datadir.value_or(sstring()); - _data->cfg.cf_stats = &_data->cf_stats; - _data->cfg.enable_commitlog = false; - _data->cm.enable(); - _data->cf = make_lw_shared(_data->s, _data->cfg, make_lw_shared(), _data->cm, sstables_manager, _data->cl_stats, _data->tracker, nullptr); + _data->cf = make_lw_shared(_data->s, std::move(cfg), make_lw_shared(), cm, sstables_manager, _data->cl_stats, sstables_manager.get_cache_tracker(), nullptr); _data->cf->mark_ready_for_writes(nullptr); _data->table_s = std::make_unique(*_data, sstables_manager); - _data->cm.add(*_data->table_s); + cm.add(*_data->table_s); _data->storage = std::move(storage); } @@ -156,8 +143,8 @@ compaction::table_state& table_for_tests::as_table_state() noexcept { future<> table_for_tests::stop() { auto data = _data; - co_await data->cm.remove(*data->table_s); - co_await when_all_succeed(data->cm.stop(), data->semaphore.stop()).discard_result(); + co_await data->cf->get_compaction_manager().remove(*data->table_s); + co_await data->cf->stop(); } void table_for_tests::set_tombstone_gc_enabled(bool tombstone_gc_enabled) noexcept { @@ -215,6 +202,21 @@ test_env::impl::impl(test_env_config cfg, sstables::storage_manager* sstm) } } +void test_env::maybe_start_compaction_manager() { + if (!_impl->cmgr) { + _impl->cmgr = std::make_unique(); + _impl->cmgr->get_compaction_manager().enable(); + } +} + +future<> test_env::stop() { + if (_impl->cmgr) { + co_await _impl->cmgr->get_compaction_manager().stop(); + } + co_await _impl->mgr.close(); + co_await _impl->semaphore.stop(); +} + future<> test_env::do_with_async(noncopyable_function func, test_env_config cfg) { if (!cfg.storage.is_local_type()) { struct test_env_with_cql { diff --git a/test/lib/test_services.hh b/test/lib/test_services.hh index 3efeea2824..d88bbdbbca 100644 --- a/test/lib/test_services.hh +++ b/test/lib/test_services.hh @@ -38,13 +38,8 @@ struct table_for_tests { class table_state; struct data { schema_ptr s; - reader_concurrency_semaphore semaphore; - cache_tracker tracker; replica::cf_stats cf_stats{0}; - replica::column_family::config cfg; cell_locker_stats cl_stats; - tasks::task_manager tm; - compaction_manager cm{tm, compaction_manager::for_testing_tag{}}; lw_shared_ptr cf; std::unique_ptr table_s; data_dictionary::storage_options storage; @@ -55,9 +50,7 @@ struct table_for_tests { static schema_ptr make_default_schema(); - explicit table_for_tests(sstables::sstables_manager& sstables_manager); - - explicit table_for_tests(sstables::sstables_manager& sstables_manager, schema_ptr s, std::optional datadir = {}, data_dictionary::storage_options storage = {}); + explicit table_for_tests(sstables::sstables_manager& sstables_manager, compaction_manager& cm, schema_ptr s, replica::table::config cfg, data_dictionary::storage_options storage = {}); schema_ptr schema() { return _data->s; } @@ -68,8 +61,6 @@ struct table_for_tests { replica::column_family& operator*() { return *_data->cf; } replica::column_family* operator->() { return _data->cf.get(); } - compaction_manager& get_compaction_manager() noexcept { return _data->cm; } - compaction::table_state& as_table_state() noexcept; future<> stop(); @@ -77,13 +68,13 @@ struct table_for_tests { sstables::shared_sstable make_sstable() { auto& table = *_data->cf; auto& sstables_manager = table.get_sstables_manager(); - return sstables_manager.make_sstable(_data->s, _data->cfg.datadir, _data->storage, table.calculate_generation_for_new_table()); + return sstables_manager.make_sstable(_data->s, table.dir(), _data->storage, table.calculate_generation_for_new_table()); } sstables::shared_sstable make_sstable(sstables::sstable_version_types version) { auto& table = *_data->cf; auto& sstables_manager = table.get_sstables_manager(); - return sstables_manager.make_sstable(_data->s, _data->cfg.datadir, _data->storage, table.calculate_generation_for_new_table(), sstables::sstable_state::normal, version); + return sstables_manager.make_sstable(_data->s, table.dir(), _data->storage, table.calculate_generation_for_new_table(), sstables::sstable_state::normal, version); } std::function make_sst_factory() {