Merge 'Put table_for_tests on a diet' from Pavel Emelyanov

The object in question is used to facilitate creation of table objects for compaction tests. Currently the table_for_test carries a bunch of auxiliary objects that are needed for table creation, such as stats of all sorts and table state. However, there's also some "infrastructure" stuff onboard namely:

- reader concurrency semaphore
- cache tracker
- task manager
- compaction manager

And those four are excessive because all the tests in question run inside the sstables::test_env that has most of it.

This PR removes the mentioned objects from table_for_tests and re-uses those from test_env. Also, while at it, it also removes the table::config object from table_for_tests so that it looks more like core code that creates table does.

Closes scylladb/scylladb#15889

* github.com:scylladb/scylladb:
  table_for_tests: Use test_env's compaction manager
  sstables::test_env: Carry compaction manager on board
  table_for_tests: Stop table on stop
  table_for_tests: Get compaction manager from table
  table_for_tests: Ditch on-board concurrency semaphore
  table_for_tests: Require config argument to make table
  table_for_tests: Create table config locally
  table_for_tests: Get concurrency semaphore from table
  table_for_tests: Get table directory from table itself
  table_for_tests: Reuse cache tracker from sstables manager
  table_for_tests: Remove unused constructor
  tests: Split the compaction backlog test case
  sstable_test_env: Coroutinize and move to .cc test_env::stop()
This commit is contained in:
Avi Kivity
2023-10-31 18:03:07 +02:00
4 changed files with 77 additions and 58 deletions

View File

@@ -114,7 +114,7 @@ static future<compaction_result>
compact_sstables(sstables::compaction_descriptor descriptor, table_for_tests t,
std::function<shared_sstable()> creator, sstables::compaction_sstable_replacer_fn replacer = sstables::replacer_fn_no_op(),
can_purge_tombstones can_purge = can_purge_tombstones::yes) {
return compact_sstables(t.get_compaction_manager(), std::move(descriptor), t.as_table_state(), std::move(creator), std::move(replacer), can_purge);
return compact_sstables(t->get_compaction_manager(), std::move(descriptor), t.as_table_state(), std::move(creator), std::move(replacer), can_purge);
}
class strategy_control_for_test : public strategy_control {
@@ -164,7 +164,7 @@ SEASTAR_TEST_CASE(compaction_manager_basic_test) {
{{"p1", utf8_type}}, {{"c1", utf8_type}}, {{"r1", int32_type}}, {}, utf8_type);
auto cf = env.make_table_for_tests(s);
auto& cm = cf.get_compaction_manager();
auto& cm = cf->get_compaction_manager();
auto close_cf = deferred_stop(cf);
cf->set_compaction_strategy(sstables::compaction_strategy_type::size_tiered);
auto sst_gen = cf.make_sst_factory();
@@ -2123,7 +2123,7 @@ SEASTAR_TEST_CASE(sstable_scrub_validate_mode_test) {
sstables::compaction_type_options::scrub opts = {
.operation_mode = sstables::compaction_type_options::scrub::mode::validate,
};
table.get_compaction_manager().perform_sstable_scrub(ts, opts).get();
table->get_compaction_manager().perform_sstable_scrub(ts, opts).get();
BOOST_REQUIRE(sst->is_quarantined());
BOOST_REQUIRE(in_strategy_sstables(ts).empty());
@@ -2284,7 +2284,7 @@ SEASTAR_TEST_CASE(sstable_scrub_skip_mode_test) {
auto table = env.make_table_for_tests(schema);
auto close_cf = deferred_stop(table);
table->start();
auto& compaction_manager = table.get_compaction_manager();
auto& compaction_manager = table->get_compaction_manager();
table->add_sstable_and_update_cache(sst).get();
@@ -2375,7 +2375,7 @@ SEASTAR_TEST_CASE(sstable_scrub_segregate_mode_test) {
auto table = env.make_table_for_tests(schema);
auto close_cf = deferred_stop(table);
table->start();
auto& compaction_manager = table.get_compaction_manager();
auto& compaction_manager = table->get_compaction_manager();
table->add_sstable_and_update_cache(sst).get();
@@ -2481,7 +2481,7 @@ SEASTAR_TEST_CASE(sstable_scrub_quarantine_mode_test) {
auto table = env.make_table_for_tests(schema);
auto close_cf = deferred_stop(table);
table->start();
auto& compaction_manager = table.get_compaction_manager();
auto& compaction_manager = table->get_compaction_manager();
table->add_sstable_and_update_cache(sst).get();
@@ -2812,7 +2812,7 @@ SEASTAR_TEST_CASE(sstable_run_based_compaction_test) {
cf->start();
cf->set_compaction_strategy(sstables::compaction_strategy_type::size_tiered);
auto compact = [&, s] (std::vector<shared_sstable> all, auto replacer) -> std::vector<shared_sstable> {
return compact_sstables(cf.get_compaction_manager(), sstables::compaction_descriptor(std::move(all), 1, 0), cf.as_table_state(), sst_gen, replacer).get0().new_sstables;
return compact_sstables(cf->get_compaction_manager(), sstables::compaction_descriptor(std::move(all), 1, 0), cf.as_table_state(), sst_gen, replacer).get0().new_sstables;
};
auto make_insert = [&] (const dht::decorated_key& key) {
mutation m(s, key);
@@ -2835,7 +2835,7 @@ SEASTAR_TEST_CASE(sstable_run_based_compaction_test) {
sstables.insert(new_sst);
}
column_family_test(cf).rebuild_sstable_list(cf.as_table_state(), new_sstables, old_sstables).get();
compaction_manager_test(cf.get_compaction_manager()).propagate_replacement(cf.as_table_state(), old_sstables, new_sstables);
compaction_manager_test(cf->get_compaction_manager()).propagate_replacement(cf.as_table_state(), old_sstables, new_sstables);
};
auto do_incremental_replace = [&] (auto old_sstables, auto new_sstables, auto& expected_sst, auto& closed_sstables_tracker) {
@@ -3008,7 +3008,7 @@ SEASTAR_TEST_CASE(backlog_tracker_correctness_after_changing_compaction_strategy
BOOST_REQUIRE(ret.new_sstables.size() == 1);
}
// triggers code that iterates through registered compactions.
cf._data->cm.backlog();
cf->get_compaction_manager().backlog();
cf.as_table_state().get_backlog_tracker().backlog();
});
}
@@ -3044,7 +3044,7 @@ SEASTAR_TEST_CASE(partial_sstable_run_filtered_out_test) {
BOOST_REQUIRE(generation_exists(partial_sstable_run_sst->generation()));
// register partial sstable run
auto cm_test = compaction_manager_test(cf.get_compaction_manager());
auto cm_test = compaction_manager_test(cf->get_compaction_manager());
cm_test.run(partial_sstable_run_identifier, cf.as_table_state(), [&cf] (sstables::compaction_data&) {
return cf->compact_all_sstables();
}).get();
@@ -3383,7 +3383,7 @@ SEASTAR_TEST_CASE(autocompaction_control_test) {
.build();
auto cf = env.make_table_for_tests(s);
auto& cm = cf.get_compaction_manager();
auto& cm = cf->get_compaction_manager();
auto close_cf = deferred_stop(cf);
cf->set_compaction_strategy(sstables::compaction_strategy_type::size_tiered);
@@ -4453,8 +4453,8 @@ SEASTAR_TEST_CASE(test_major_does_not_miss_data_in_memtable) {
});
}
SEASTAR_TEST_CASE(simple_backlog_controller_test) {
auto run_controller_test = [] (sstables::compaction_strategy_type compaction_strategy_type, test_env& env) {
future<> run_controller_test(sstables::compaction_strategy_type compaction_strategy_type) {
return test_env::do_with_async([compaction_strategy_type] (test_env& env) {
/////////////
// settings
static constexpr float disk_memory_ratio = 78.125; /* AWS I3en is ~78.125 */
@@ -4564,15 +4564,21 @@ SEASTAR_TEST_CASE(simple_backlog_controller_test) {
auto max_expected = compaction_strategy_type == sstables::compaction_strategy_type::leveled ? 0.4f : 0.0f;
BOOST_REQUIRE(r.normalized_backlog <= max_expected);
}
};
return test_env::do_with_async([run_controller_test] (test_env& env) {
run_controller_test(sstables::compaction_strategy_type::size_tiered, env);
run_controller_test(sstables::compaction_strategy_type::time_window, env);
run_controller_test(sstables::compaction_strategy_type::leveled, env);
});
}
SEASTAR_TEST_CASE(simple_backlog_controller_test_size_tiered) {
return run_controller_test(sstables::compaction_strategy_type::size_tiered);
}
SEASTAR_TEST_CASE(simple_backlog_controller_test_time_window) {
return run_controller_test(sstables::compaction_strategy_type::time_window);
}
SEASTAR_TEST_CASE(simple_backlog_controller_test_leveled) {
return run_controller_test(sstables::compaction_strategy_type::leveled);
}
SEASTAR_TEST_CASE(test_compaction_strategy_cleanup_method) {
return test_env::do_with_async([] (test_env& env) {
constexpr size_t all_files = 64;

View File

@@ -20,6 +20,7 @@
#include "sstables/version.hh"
#include "sstables/sstable_directory.hh"
#include "replica/database.hh"
#include "compaction/compaction_manager.hh"
#include "test/lib/tmpdir.hh"
#include "test/lib/test_services.hh"
@@ -44,6 +45,18 @@ public:
}
};
class test_env_compaction_manager {
tasks::task_manager _tm;
compaction_manager _cm;
public:
test_env_compaction_manager()
: _cm(_tm, compaction_manager::for_testing_tag{})
{}
compaction_manager& get_compaction_manager() { return _cm; }
};
struct test_env_config {
db::large_data_handler* large_data_handler = nullptr;
data_dictionary::storage_options storage; // will be local by default
@@ -61,6 +74,7 @@ class test_env {
gms::feature_service feature_service;
db::nop_large_data_handler nop_ld_handler;
test_env_sstables_manager mgr;
std::unique_ptr<test_env_compaction_manager> cmgr;
reader_concurrency_semaphore semaphore;
sstables::sstable_generation_generator gen{0};
sstables::uuid_identifiers use_uuid;
@@ -75,15 +89,13 @@ class test_env {
}
};
std::unique_ptr<impl> _impl;
void maybe_start_compaction_manager();
public:
explicit test_env(test_env_config cfg = {}, sstables::storage_manager* sstm = nullptr) : _impl(std::make_unique<impl>(std::move(cfg), sstm)) { }
future<> stop() {
return _impl->mgr.close().finally([this] {
return _impl->semaphore.stop();
});
}
future<> stop();
sstables::generation_type new_generation() noexcept {
return _impl->new_generation();
@@ -213,11 +225,19 @@ public:
}
table_for_tests make_table_for_tests(schema_ptr s, sstring dir) {
return table_for_tests(manager(), s, std::move(dir), _impl->storage);
maybe_start_compaction_manager();
auto cfg = make_table_config();
cfg.datadir = dir;
cfg.enable_commitlog = false;
return table_for_tests(manager(), _impl->cmgr->get_compaction_manager(), s, std::move(cfg), _impl->storage);
}
table_for_tests make_table_for_tests(schema_ptr s = nullptr) {
return table_for_tests(manager(), s, tempdir().path().native(), _impl->storage);
maybe_start_compaction_manager();
auto cfg = make_table_config();
cfg.datadir = _impl->dir.path().native();
cfg.enable_commitlog = false;
return table_for_tests(manager(), _impl->cmgr->get_compaction_manager(), s, std::move(cfg), _impl->storage);
}
};

View File

@@ -26,7 +26,6 @@ static const sstring some_keyspace("ks");
static const sstring some_column_family("cf");
table_for_tests::data::data()
: semaphore(reader_concurrency_semaphore::no_limits{}, "table_for_tests")
{ }
table_for_tests::data::~data() {}
@@ -37,13 +36,6 @@ schema_ptr table_for_tests::make_default_schema() {
.build();
}
table_for_tests::table_for_tests(sstables::sstables_manager& sstables_manager)
: table_for_tests(
sstables_manager,
make_default_schema()
)
{ }
class table_for_tests::table_state : public compaction::table_state {
table_for_tests::data& _data;
sstables::sstables_manager& _sstables_manager;
@@ -95,7 +87,7 @@ public:
return _compaction_strategy_state;
}
reader_permit make_compaction_reader_permit() const override {
return _data.semaphore.make_tracking_only_permit(&*schema(), "table_for_tests::table_state", db::no_timeout, {});
return table().compaction_concurrency_semaphore().make_tracking_only_permit(&*schema(), "table_for_tests::table_state", db::no_timeout, {});
}
sstables::sstables_manager& get_sstables_manager() noexcept override {
return _sstables_manager;
@@ -133,20 +125,15 @@ public:
}
};
table_for_tests::table_for_tests(sstables::sstables_manager& sstables_manager, schema_ptr s, std::optional<sstring> datadir, data_dictionary::storage_options storage)
table_for_tests::table_for_tests(sstables::sstables_manager& sstables_manager, compaction_manager& cm, schema_ptr s, replica::table::config cfg, data_dictionary::storage_options storage)
: _data(make_lw_shared<data>())
{
cfg.cf_stats = &_data->cf_stats;
_data->s = s ? s : make_default_schema();
_data->cfg = replica::table::config{.compaction_concurrency_semaphore = &_data->semaphore};
_data->cfg.enable_disk_writes = bool(datadir);
_data->cfg.datadir = datadir.value_or(sstring());
_data->cfg.cf_stats = &_data->cf_stats;
_data->cfg.enable_commitlog = false;
_data->cm.enable();
_data->cf = make_lw_shared<replica::column_family>(_data->s, _data->cfg, make_lw_shared<replica::storage_options>(), _data->cm, sstables_manager, _data->cl_stats, _data->tracker, nullptr);
_data->cf = make_lw_shared<replica::column_family>(_data->s, std::move(cfg), make_lw_shared<replica::storage_options>(), cm, sstables_manager, _data->cl_stats, sstables_manager.get_cache_tracker(), nullptr);
_data->cf->mark_ready_for_writes(nullptr);
_data->table_s = std::make_unique<table_state>(*_data, sstables_manager);
_data->cm.add(*_data->table_s);
cm.add(*_data->table_s);
_data->storage = std::move(storage);
}
@@ -156,8 +143,8 @@ compaction::table_state& table_for_tests::as_table_state() noexcept {
future<> table_for_tests::stop() {
auto data = _data;
co_await data->cm.remove(*data->table_s);
co_await when_all_succeed(data->cm.stop(), data->semaphore.stop()).discard_result();
co_await data->cf->get_compaction_manager().remove(*data->table_s);
co_await data->cf->stop();
}
void table_for_tests::set_tombstone_gc_enabled(bool tombstone_gc_enabled) noexcept {
@@ -215,6 +202,21 @@ test_env::impl::impl(test_env_config cfg, sstables::storage_manager* sstm)
}
}
void test_env::maybe_start_compaction_manager() {
if (!_impl->cmgr) {
_impl->cmgr = std::make_unique<test_env_compaction_manager>();
_impl->cmgr->get_compaction_manager().enable();
}
}
future<> test_env::stop() {
if (_impl->cmgr) {
co_await _impl->cmgr->get_compaction_manager().stop();
}
co_await _impl->mgr.close();
co_await _impl->semaphore.stop();
}
future<> test_env::do_with_async(noncopyable_function<void (test_env&)> func, test_env_config cfg) {
if (!cfg.storage.is_local_type()) {
struct test_env_with_cql {

View File

@@ -38,13 +38,8 @@ struct table_for_tests {
class table_state;
struct data {
schema_ptr s;
reader_concurrency_semaphore semaphore;
cache_tracker tracker;
replica::cf_stats cf_stats{0};
replica::column_family::config cfg;
cell_locker_stats cl_stats;
tasks::task_manager tm;
compaction_manager cm{tm, compaction_manager::for_testing_tag{}};
lw_shared_ptr<replica::column_family> cf;
std::unique_ptr<table_state> table_s;
data_dictionary::storage_options storage;
@@ -55,9 +50,7 @@ struct table_for_tests {
static schema_ptr make_default_schema();
explicit table_for_tests(sstables::sstables_manager& sstables_manager);
explicit table_for_tests(sstables::sstables_manager& sstables_manager, schema_ptr s, std::optional<sstring> datadir = {}, data_dictionary::storage_options storage = {});
explicit table_for_tests(sstables::sstables_manager& sstables_manager, compaction_manager& cm, schema_ptr s, replica::table::config cfg, data_dictionary::storage_options storage = {});
schema_ptr schema() { return _data->s; }
@@ -68,8 +61,6 @@ struct table_for_tests {
replica::column_family& operator*() { return *_data->cf; }
replica::column_family* operator->() { return _data->cf.get(); }
compaction_manager& get_compaction_manager() noexcept { return _data->cm; }
compaction::table_state& as_table_state() noexcept;
future<> stop();
@@ -77,13 +68,13 @@ struct table_for_tests {
sstables::shared_sstable make_sstable() {
auto& table = *_data->cf;
auto& sstables_manager = table.get_sstables_manager();
return sstables_manager.make_sstable(_data->s, _data->cfg.datadir, _data->storage, table.calculate_generation_for_new_table());
return sstables_manager.make_sstable(_data->s, table.dir(), _data->storage, table.calculate_generation_for_new_table());
}
sstables::shared_sstable make_sstable(sstables::sstable_version_types version) {
auto& table = *_data->cf;
auto& sstables_manager = table.get_sstables_manager();
return sstables_manager.make_sstable(_data->s, _data->cfg.datadir, _data->storage, table.calculate_generation_for_new_table(), sstables::sstable_state::normal, version);
return sstables_manager.make_sstable(_data->s, table.dir(), _data->storage, table.calculate_generation_for_new_table(), sstables::sstable_state::normal, version);
}
std::function<sstables::shared_sstable()> make_sst_factory() {