Merge 'Drop compaction_manager_for_testing' from Pavel Emelyanov
There's such a wrapper class in test_services. After #15889 this class resembles the test_env_compaction_manager and can be replaced with it. However, two users of the former wrapper class need it just to construct table object, and the way they do it is re-implementation of table_for_tests class. This PR patches the test cases to make use of table_for_tests and removes the compaction_manager_for_testing that becomes unused after it. Closes scylladb/scylladb#15909 * github.com:scylladb/scylladb: test_services: Ditch compaction_manager_for_testing test/sstable_compaction_test: Make use of make_table_for_tests() test/sstable_3_x_test: Make use of make_table_for_tests() table_for_tests: Add const operator-> overload sstable_test_env: Add test_env_compaction_manager() getter sstable_test_env: Tune up maybe_start_compaction_manager() method test/sstable_compaction_test: Remove unused tracker allocation
This commit is contained in:
@@ -3013,12 +3013,9 @@ static std::vector<sstables::shared_sstable> open_sstables(test_env& env, schema
|
||||
// Must be called in a seastar thread.
|
||||
static flat_mutation_reader_v2 compacted_sstable_reader(test_env& env, schema_ptr s,
|
||||
sstring table_name, std::vector<sstables::generation_type::int_t> gen_values) {
|
||||
env.maybe_start_compaction_manager(false);
|
||||
auto cf = env.make_table_for_tests(s);
|
||||
auto generations = generations_from_values(gen_values);
|
||||
auto cm = make_lw_shared<compaction_manager_for_testing>(false);
|
||||
auto cl_stats = make_lw_shared<cell_locker_stats>();
|
||||
auto tracker = make_lw_shared<cache_tracker>();
|
||||
auto cf = make_lw_shared<replica::column_family>(s, env.make_table_config(), make_lw_shared<replica::storage_options>(), **cm, env.manager(), *cl_stats, *tracker, nullptr);
|
||||
cf->mark_ready_for_writes(nullptr);
|
||||
lw_shared_ptr<replica::memtable> mt = make_lw_shared<replica::memtable>(s);
|
||||
|
||||
auto sstables = open_sstables(env, s, format("test/resource/sstables/3.x/uncompressed/{}", table_name), generations);
|
||||
|
||||
@@ -2806,7 +2806,6 @@ SEASTAR_TEST_CASE(sstable_run_based_compaction_test) {
|
||||
|
||||
auto sst_gen = env.make_sst_factory(s);
|
||||
|
||||
auto tracker = make_lw_shared<cache_tracker>();
|
||||
auto cf = env.make_table_for_tests(s);
|
||||
auto close_cf = deferred_stop(cf);
|
||||
cf->start();
|
||||
@@ -4112,11 +4111,6 @@ SEASTAR_TEST_CASE(max_ongoing_compaction_test) {
|
||||
return builder.build();
|
||||
};
|
||||
|
||||
auto cm = compaction_manager_for_testing();
|
||||
|
||||
auto cl_stats = make_lw_shared<cell_locker_stats>();
|
||||
auto tracker = make_lw_shared<cache_tracker>();
|
||||
|
||||
auto next_timestamp = [] (auto step) {
|
||||
using namespace std::chrono;
|
||||
return (gc_clock::now().time_since_epoch() - duration_cast<microseconds>(step)).count();
|
||||
@@ -4134,7 +4128,7 @@ SEASTAR_TEST_CASE(max_ongoing_compaction_test) {
|
||||
|
||||
constexpr size_t num_tables = 10;
|
||||
std::vector<schema_ptr> schemas;
|
||||
std::vector<lw_shared_ptr<replica::column_family>> tables;
|
||||
std::vector<table_for_tests> tables;
|
||||
auto stop_tables = defer([&tables] {
|
||||
for (auto& t : tables) {
|
||||
t->stop().get();
|
||||
@@ -4152,9 +4146,7 @@ SEASTAR_TEST_CASE(max_ongoing_compaction_test) {
|
||||
cfg.enable_commitlog = false;
|
||||
cfg.enable_incremental_backups = false;
|
||||
|
||||
auto cf = make_lw_shared<replica::column_family>(s, cfg, make_lw_shared<replica::storage_options>(), *cm, env.manager(), *cl_stats, *tracker, nullptr);
|
||||
cf->start();
|
||||
cf->mark_ready_for_writes(nullptr);
|
||||
auto cf = env.make_table_for_tests(s);
|
||||
tables.push_back(cf);
|
||||
}
|
||||
|
||||
@@ -4186,24 +4178,25 @@ SEASTAR_TEST_CASE(max_ongoing_compaction_test) {
|
||||
}
|
||||
|
||||
size_t max_ongoing_compaction = 0;
|
||||
auto& cm = env.test_compaction_manager().get_compaction_manager();
|
||||
|
||||
// wait for submitted jobs to finish.
|
||||
auto end = [cm, &tables, expected_after] {
|
||||
return cm->get_stats().pending_tasks == 0 && cm->get_stats().active_tasks == 0
|
||||
auto end = [&cm, &tables, expected_after] {
|
||||
return cm.get_stats().pending_tasks == 0 && cm.get_stats().active_tasks == 0
|
||||
&& boost::algorithm::all_of(tables, [expected_after] (auto& t) { return t->sstables_count() == expected_after; });
|
||||
};
|
||||
while (!end()) {
|
||||
if (!cm->get_stats().pending_tasks && !cm->get_stats().active_tasks) {
|
||||
if (!cm.get_stats().pending_tasks && !cm.get_stats().active_tasks) {
|
||||
for (auto& t : tables) {
|
||||
if (t->sstables_count()) {
|
||||
t->trigger_compaction();
|
||||
}
|
||||
}
|
||||
}
|
||||
max_ongoing_compaction = std::max(cm->get_stats().active_tasks, max_ongoing_compaction);
|
||||
max_ongoing_compaction = std::max(cm.get_stats().active_tasks, max_ongoing_compaction);
|
||||
yield().get();
|
||||
}
|
||||
BOOST_REQUIRE(cm->get_stats().errors == 0);
|
||||
BOOST_REQUIRE(cm.get_stats().errors == 0);
|
||||
return max_ongoing_compaction;
|
||||
};
|
||||
|
||||
|
||||
@@ -89,10 +89,10 @@ class test_env {
|
||||
}
|
||||
};
|
||||
std::unique_ptr<impl> _impl;
|
||||
|
||||
void maybe_start_compaction_manager();
|
||||
public:
|
||||
|
||||
void maybe_start_compaction_manager(bool enable = true);
|
||||
|
||||
explicit test_env(test_env_config cfg = {}, sstables::storage_manager* sstm = nullptr) : _impl(std::make_unique<impl>(std::move(cfg), sstm)) { }
|
||||
|
||||
future<> stop();
|
||||
@@ -179,6 +179,7 @@ public:
|
||||
}
|
||||
|
||||
test_env_sstables_manager& manager() { return _impl->mgr; }
|
||||
test_env_compaction_manager& test_compaction_manager() { return *_impl->cmgr; }
|
||||
reader_concurrency_semaphore& semaphore() { return _impl->semaphore; }
|
||||
db::config& db_config() { return *_impl->db_config; }
|
||||
tmpdir& tempdir() noexcept { return _impl->dir; }
|
||||
|
||||
@@ -145,22 +145,6 @@ future<shared_sstable> test_env::reusable_sst(schema_ptr schema, sstring dir, ss
|
||||
throw sst_not_found(dir, generation);
|
||||
}
|
||||
|
||||
compaction_manager_for_testing::wrapped_compaction_manager::wrapped_compaction_manager(bool enabled)
|
||||
: cm(tm, compaction_manager::for_testing_tag{})
|
||||
{
|
||||
if (enabled) {
|
||||
cm.enable();
|
||||
}
|
||||
}
|
||||
|
||||
// Must run in a seastar thread
|
||||
compaction_manager_for_testing::wrapped_compaction_manager::~wrapped_compaction_manager() {
|
||||
if (!tm.abort_source().abort_requested()) {
|
||||
tm.abort_source().request_abort();
|
||||
}
|
||||
cm.stop().get();
|
||||
}
|
||||
|
||||
class compaction_manager_test_task : public compaction::compaction_task_executor {
|
||||
sstables::run_id _run_id;
|
||||
noncopyable_function<future<> (sstables::compaction_data&)> _job;
|
||||
|
||||
@@ -239,35 +239,6 @@ future<> for_each_sstable_version(AsyncAction action) {
|
||||
|
||||
} // namespace sstables
|
||||
|
||||
// Must be used in a seastar thread
|
||||
class compaction_manager_for_testing {
|
||||
struct wrapped_compaction_manager {
|
||||
tasks::task_manager tm;
|
||||
compaction_manager cm;
|
||||
explicit wrapped_compaction_manager(bool enabled);
|
||||
// Must run in a seastar thread
|
||||
~wrapped_compaction_manager();
|
||||
};
|
||||
|
||||
lw_shared_ptr<wrapped_compaction_manager> _wcm;
|
||||
public:
|
||||
explicit compaction_manager_for_testing(bool enabled = true) : _wcm(make_lw_shared<wrapped_compaction_manager>(enabled)) {}
|
||||
|
||||
compaction_manager& operator*() noexcept {
|
||||
return _wcm->cm;
|
||||
}
|
||||
const compaction_manager& operator*() const noexcept {
|
||||
return _wcm->cm;
|
||||
}
|
||||
|
||||
compaction_manager* operator->() noexcept {
|
||||
return &_wcm->cm;
|
||||
}
|
||||
const compaction_manager* operator->() const noexcept {
|
||||
return &_wcm->cm;
|
||||
}
|
||||
};
|
||||
|
||||
class compaction_manager_test {
|
||||
compaction_manager& _cm;
|
||||
public:
|
||||
|
||||
@@ -202,10 +202,12 @@ test_env::impl::impl(test_env_config cfg, sstables::storage_manager* sstm)
|
||||
}
|
||||
}
|
||||
|
||||
void test_env::maybe_start_compaction_manager() {
|
||||
void test_env::maybe_start_compaction_manager(bool enable) {
|
||||
if (!_impl->cmgr) {
|
||||
_impl->cmgr = std::make_unique<test_env_compaction_manager>();
|
||||
_impl->cmgr->get_compaction_manager().enable();
|
||||
if (enable) {
|
||||
_impl->cmgr->get_compaction_manager().enable();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -60,6 +60,7 @@ struct table_for_tests {
|
||||
|
||||
replica::column_family& operator*() { return *_data->cf; }
|
||||
replica::column_family* operator->() { return _data->cf.get(); }
|
||||
const replica::column_family* operator->() const { return _data->cf.get(); }
|
||||
|
||||
compaction::table_state& as_table_state() noexcept;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user