From 0643faafd773c49ff80fedc8fdd844a927b8da8d Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Thu, 4 Nov 2021 12:36:49 -0300 Subject: [PATCH 1/7] compaction_manager: extract "stop tasks" from stop_ongoing_compactions() into new function Procedure will be reused to stop a list of tasks Signed-off-by: Raphael S. Carvalho --- compaction/compaction_manager.cc | 18 +++++++++++------- compaction/compaction_manager.hh | 1 + 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index f456433530..6f40c3095e 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -485,13 +485,8 @@ void compaction_manager::postpone_compaction_for_column_family(column_family* cf _postponed.insert(cf); } -future<> compaction_manager::stop_ongoing_compactions(sstring reason) { - cmlog.info("Stopping {} ongoing compactions due to {}", get_compactions().size(), reason); - - // Wait for each task handler to stop. Copy list because task remove itself - // from the list when done. - auto tasks = _tasks; - return do_with(std::move(tasks), [this, reason] (std::list>& tasks) { +future<> compaction_manager::stop_tasks(std::vector> tasks, sstring reason) { + return do_with(std::move(tasks), [this, reason] (std::vector>& tasks) { return parallel_for_each(tasks, [this, reason] (auto& task) { return this->task_stop(task, reason).then_wrapped([](future <> f) { try { @@ -507,6 +502,15 @@ future<> compaction_manager::stop_ongoing_compactions(sstring reason) { }); } +future<> compaction_manager::stop_ongoing_compactions(sstring reason) { + cmlog.info("Stopping {} ongoing compactions due to {}", get_compactions().size(), reason); + + // Wait for each task handler to stop. Copy list because task remove itself + // from the list when done. + auto tasks = boost::copy_range>>(_tasks); + return stop_tasks(std::move(tasks), std::move(reason)); +} + future<> compaction_manager::drain() { _state = state::disabled; return stop_ongoing_compactions("drain"); diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 0c00abe356..6054a54b64 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -134,6 +134,7 @@ private: static constexpr std::chrono::seconds periodic_compaction_submission_interval() { return std::chrono::seconds(3600); } private: future<> task_stop(lw_shared_ptr task, sstring reason); + future<> stop_tasks(std::vector> tasks, sstring reason); // Return the largest fan-in of currently running compactions unsigned current_compaction_fan_in_threshold() const; From 2f293fa09c7e7f1736b97733a0204587253fd01a Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Thu, 4 Nov 2021 12:52:09 -0300 Subject: [PATCH 2/7] compaction_manager: prevent compaction from being postponed when stopping tasks stop_tasks() must make sure that no ongoing task will postpone compaction when asked to stop. Therefore, let's set all tasks as stopping before any deferring point, such that no task will postpone compaction for a table which is being stopped. compaction_manager::remove() already handles this race with the same method, and given that remove() will later switch to stop_tasks(), let's do the same in stop_tasks(). Signed-off-by: Raphael S. Carvalho --- compaction/compaction_manager.cc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 6f40c3095e..2aac9951c1 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -486,6 +486,11 @@ void compaction_manager::postpone_compaction_for_column_family(column_family* cf } future<> compaction_manager::stop_tasks(std::vector> tasks, sstring reason) { + // To prevent compaction from being postponed while tasks are being stopped, let's set all + // tasks as stopping before the deferring point below. + for (auto& t : tasks) { + t->stopping = true; + } return do_with(std::move(tasks), [this, reason] (std::vector>& tasks) { return parallel_for_each(tasks, [this, reason] (auto& task) { return this->task_stop(task, reason).then_wrapped([](future <> f) { From c0047bb9c04c4901f464a3235a037b83066f0879 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Thu, 4 Nov 2021 12:59:07 -0300 Subject: [PATCH 3/7] compaction_manager: introduce stop_ongoing_compactions() for a table New variant of stop_ongoing_compactions() which will stop all compactions for a given table. Will be reused in both remove() and by run_with_compaction_disabled() which soon be moved into the compaction_manager. Signed-off-by: Raphael S. Carvalho --- compaction/compaction_manager.cc | 8 ++++++++ compaction/compaction_manager.hh | 1 + 2 files changed, 9 insertions(+) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 2aac9951c1..f8afc910e6 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -516,6 +516,14 @@ future<> compaction_manager::stop_ongoing_compactions(sstring reason) { return stop_tasks(std::move(tasks), std::move(reason)); } +future<> compaction_manager::stop_ongoing_compactions(sstring reason, column_family* cf) { + auto tasks = boost::copy_range>>(_tasks | boost::adaptors::filtered([cf] (auto& task) { + return task->compacting_cf == cf; + })); + cmlog.info("Stopping {} ongoing compactions for table {}.{} due to {}", tasks.size(), cf->schema()->ks_name(), cf->schema()->cf_name(), reason); + return stop_tasks(std::move(tasks), std::move(reason)); +} + future<> compaction_manager::drain() { _state = state::disabled; return stop_ongoing_compactions("drain"); diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 6054a54b64..ffc523358b 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -186,6 +186,7 @@ private: future<> rewrite_sstables(column_family* cf, sstables::compaction_type_options options, get_candidates_func, can_purge_tombstones can_purge = can_purge_tombstones::yes); future<> stop_ongoing_compactions(sstring reason); + future<> stop_ongoing_compactions(sstring reason, column_family* cf); optimized_optional _early_abort_subscription; public: compaction_manager(compaction_scheduling_group csg, maintenance_scheduling_group msg, size_t available_memory, abort_source& as); From 7876bd4331662ae7e39ad83c4ed4ae7f0d3e65e5 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Thu, 4 Nov 2021 13:12:16 -0300 Subject: [PATCH 4/7] compaction_manager: wire stop_ongoing_compactions() into remove() Signed-off-by: Raphael S. Carvalho --- compaction/compaction_manager.cc | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index f8afc910e6..0209764ddb 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -952,19 +952,11 @@ future<> compaction_manager::perform_sstable_scrub(column_family* cf, sstables:: future<> compaction_manager::remove(column_family* cf) { // We need to guarantee that a task being stopped will not retry to compact // a column family being removed. - auto tasks_to_stop = make_lw_shared>>(); - for (auto& task : _tasks) { - if (task->compacting_cf == cf) { - tasks_to_stop->push_back(task); - task->stopping = true; - } - } + // The requirement above is provided by stop_ongoing_compactions(). _postponed.erase(cf); // Wait for the termination of an ongoing compaction on cf, if any. - return parallel_for_each(*tasks_to_stop, [this, cf] (auto& task) { - return this->task_stop(task, "column family removal"); - }).then([this, cf, tasks_to_stop] { + return stop_ongoing_compactions("column family removal", cf).then([this, cf] { #ifdef DEBUG assert(std::find_if(_tasks.begin(), _tasks.end(), [cf] (auto& task) { return task->compacting_cf == cf; }) == _tasks.end()); #endif From aa9b1c1fa357ac8475194b7ab29894f77a29f476 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 18 Oct 2021 14:20:07 -0300 Subject: [PATCH 5/7] compaction_manager: add struct for per table compaction state This will make it easier to pack all state data for a given table T. Signed-off-by: Raphael S. Carvalho --- compaction/compaction_manager.cc | 14 +++++++------- compaction/compaction_manager.hh | 8 ++++++-- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 0209764ddb..f1c585264d 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -271,7 +271,7 @@ future<> compaction_manager::perform_major_compaction(column_family* cf) { // it cannot be the other way around, or minor compaction for this column family would be // prevented while an ongoing major compaction doesn't release the semaphore. task->compaction_done = with_semaphore(_major_compaction_sem, 1, [this, task, cf] { - return with_lock(_compaction_locks[cf].for_write(), [this, task, cf] { + return with_lock(_compaction_state[cf].lock.for_write(), [this, task, cf] { _stats.active_tasks++; if (!can_proceed(task)) { return make_ready_future<>(); @@ -325,7 +325,7 @@ future<> compaction_manager::run_custom_job(column_family* cf, sstables::compact task->compaction_done = with_semaphore(_custom_job_sem, 1, [this, task, cf, &job = *job_ptr] () mutable { // take read lock for cf, so major compaction and resharding can't proceed in parallel. - return with_lock(_compaction_locks[cf].for_read(), [this, task, cf, &job] () mutable { + return with_lock(_compaction_state[cf].lock.for_read(), [this, task, cf, &job] () mutable { _stats.active_tasks++; if (!can_proceed(task)) { return make_ready_future<>(); @@ -451,7 +451,7 @@ void compaction_manager::disable() { std::function compaction_manager::compaction_submission_callback() { return [this] () mutable { - for (auto& e: _compaction_locks) { + for (auto& e: _compaction_state) { submit(e.first); } }; @@ -617,7 +617,7 @@ void compaction_manager::submit(column_family* cf) { _stats.pending_tasks--; return make_ready_future(stop_iteration::yes); } - return with_lock(_compaction_locks[cf].for_read(), [this, task] () mutable { + return with_lock(_compaction_state[cf].lock.for_read(), [this, task] () mutable { return with_scheduling_group(_compaction_controller.sg(), [this, task = std::move(task)] () mutable { column_family& cf = *task->compacting_cf; sstables::compaction_strategy cs = cf.get_compaction_strategy(); @@ -685,7 +685,7 @@ void compaction_manager::submit_offstrategy(column_family* cf) { return make_ready_future(stop_iteration::yes); } return with_semaphore(_custom_job_sem, 1, [this, task, cf] () mutable { - return with_lock(_compaction_locks[cf].for_read(), [this, task, cf] () mutable { + return with_lock(_compaction_state[cf].lock.for_read(), [this, task, cf] () mutable { _stats.pending_tasks--; if (!can_proceed(task)) { return make_ready_future(stop_iteration::yes); @@ -765,7 +765,7 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa return with_semaphore(_rewrite_sstables_sem, 1, [this, task, &cf, descriptor = std::move(descriptor), compacting] () mutable { // Take write lock for cf to serialize cleanup/upgrade sstables/scrub with major compaction/reshape/reshard. - return with_lock(_compaction_locks[&cf].for_write(), [this, task, &cf, descriptor = std::move(descriptor), compacting] () mutable { + return with_lock(_compaction_state[&cf].lock.for_write(), [this, task, &cf, descriptor = std::move(descriptor), compacting] () mutable { _stats.pending_tasks--; _stats.active_tasks++; task->setup_new_compaction(); @@ -960,7 +960,7 @@ future<> compaction_manager::remove(column_family* cf) { #ifdef DEBUG assert(std::find_if(_tasks.begin(), _tasks.end(), [cf] (auto& task) { return task->compacting_cf == cf; }) == _tasks.end()); #endif - _compaction_locks.erase(cf); + _compaction_state.erase(cf); }); } diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index ffc523358b..585510c3b4 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -121,8 +121,12 @@ private: // Purpose is to serialize major compaction across all column families, so as to // reduce disk space requirement. semaphore _major_compaction_sem{1}; - // Prevents column family from running major and minor compaction at same time. - std::unordered_map _compaction_locks; + + struct compaction_state { + // Prevents table from running major and minor compaction at the same time. + rwlock lock; + }; + std::unordered_map _compaction_state; semaphore _custom_job_sem{1}; seastar::named_semaphore _rewrite_sstables_sem = {1, named_semaphore_exception_factory{"rewrite sstables"}}; From 52feb4146839522975451a44f94c8e1cddc8bb38 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Thu, 4 Nov 2021 14:16:42 -0300 Subject: [PATCH 6/7] compaction_manager: switch to coroutine in compaction_manager::remove() Signed-off-by: Raphael S. Carvalho --- compaction/compaction_manager.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index f1c585264d..69909fbc83 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -956,12 +956,12 @@ future<> compaction_manager::remove(column_family* cf) { _postponed.erase(cf); // Wait for the termination of an ongoing compaction on cf, if any. - return stop_ongoing_compactions("column family removal", cf).then([this, cf] { + co_await stop_ongoing_compactions("column family removal", cf); #ifdef DEBUG - assert(std::find_if(_tasks.begin(), _tasks.end(), [cf] (auto& task) { return task->compacting_cf == cf; }) == _tasks.end()); + assert(std::find_if(_tasks.begin(), _tasks.end(), [cf] (auto& task) { return task->compacting_cf == cf; }) == _tasks.end()); #endif - _compaction_state.erase(cf); - }); + _compaction_state.erase(cf); + co_return; } const std::vector compaction_manager::get_compactions() const { From 33b39a2bfc5f7f514de42166a7b90045258eeda7 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Thu, 4 Nov 2021 14:39:39 -0300 Subject: [PATCH 7/7] compaction: move run_with_compaction_disabled() from table into compaction_manager That's intended to fix a bad layer violation as table was given the responsibility of disabling compaction for a given table T, but that logic clearly belongs to compaction_manager instead. Additionally, gate will be used instead of counter, as former provides manager with a way to synchronize with functions running under run_with_compaction_disabled. so remove() can wait for their termination. Signed-off-by: Raphael S. Carvalho --- compaction/compaction_manager.cc | 43 +++++++++++++++++++++++++++++--- compaction/compaction_manager.hh | 11 ++++++++ database.cc | 4 +-- database.hh | 3 --- table.cc | 16 ++---------- test/perf/perf_fast_forward.cc | 8 +++--- 6 files changed, 58 insertions(+), 27 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 69909fbc83..1ab2672d2b 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -353,6 +353,37 @@ future<> compaction_manager::run_custom_job(column_family* cf, sstables::compact return task->compaction_done.get_future().then([task] {}); } +bool compaction_manager::compaction_state::compaction_disabled() const { + return with_compaction_disabled_gate.get_count() > 0; +} + +future<> +compaction_manager::run_with_compaction_disabled(table* t, std::function ()> func) { + auto& c_state = _compaction_state[t]; + auto holder = c_state.with_compaction_disabled_gate.hold(); + co_await stop_ongoing_compactions("user-triggered operation", t); + + std::exception_ptr err; + try { + co_await func(); + } catch (...) { + err = std::current_exception(); + } + +#ifdef DEBUG + assert(_compaction_state.contains(t)); +#endif + // submit compaction request if we're the last holder of the gate which is still opened. + if (!c_state.with_compaction_disabled_gate.is_closed() && c_state.with_compaction_disabled_gate.get_count() == 1) { + holder.release(); + submit(t); + } + if (err) { + std::rethrow_exception(err); + } + co_return; +} + void compaction_manager::task::setup_new_compaction() { compaction_data = create_compaction_data(); compaction_running = true; @@ -572,7 +603,8 @@ void compaction_manager::do_stop() noexcept { } inline bool compaction_manager::can_proceed(const lw_shared_ptr& task) { - return (_state == state::enabled) && !task->stopping; + return (_state == state::enabled) && !task->stopping && + (task->type != sstables::compaction_type::Compaction || !_compaction_state[task->compacting_cf].compaction_disabled()); } inline future<> compaction_manager::put_task_to_sleep(lw_shared_ptr& task) { @@ -906,7 +938,7 @@ future<> compaction_manager::perform_sstable_upgrade(database& db, column_family // must ensure that all sstables created before we run are included // in the re-write, we need to barrier out any previously running // compaction. - return cf->run_with_compaction_disabled([this, cf, &tables, exclude_current_version] { + return run_with_compaction_disabled(cf, [this, cf, &tables, exclude_current_version] { auto last_version = cf->get_sstables_manager().get_highest_supported_format(); for (auto& sst : get_candidates(*cf)) { @@ -942,7 +974,7 @@ future<> compaction_manager::perform_sstable_scrub(column_family* cf, sstables:: // since we might potentially have ongoing compactions, and we // must ensure that all sstables created before we run are scrubbed, // we need to barrier out any previously running compaction. - return cf->run_with_compaction_disabled([this, cf, scrub_mode] { + return run_with_compaction_disabled(cf, [this, cf, scrub_mode] { return rewrite_sstables(cf, sstables::compaction_type_options::make_scrub(scrub_mode), [this] (const table& cf) { return get_candidates(cf); }, can_purge_tombstones::no); @@ -955,8 +987,11 @@ future<> compaction_manager::remove(column_family* cf) { // The requirement above is provided by stop_ongoing_compactions(). _postponed.erase(cf); + // Wait for all functions running under with_compaction_disabled_gate to terminate. + auto close_gate = _compaction_state[cf].with_compaction_disabled_gate.close(); + // Wait for the termination of an ongoing compaction on cf, if any. - co_await stop_ongoing_compactions("column family removal", cf); + co_await when_all_succeed(stop_ongoing_compactions("column family removal", cf), std::move(close_gate)); #ifdef DEBUG assert(std::find_if(_tasks.begin(), _tasks.end(), [cf] (auto& task) { return task->compacting_cf == cf; }) == _tasks.end()); #endif diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 585510c3b4..9a33e4e251 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -125,6 +125,10 @@ private: struct compaction_state { // Prevents table from running major and minor compaction at the same time. rwlock lock; + // Used to wait for termination of any function running under run_with_compaction_disabled(). + seastar::gate with_compaction_disabled_gate; + + bool compaction_disabled() const; }; std::unordered_map _compaction_state; @@ -251,6 +255,9 @@ public: // parameter job is a function that will carry the operation future<> run_custom_job(column_family* cf, sstables::compaction_type type, noncopyable_function(sstables::compaction_data&)> job); + // Run a function with compaction temporarily disabled for a table T. + future<> run_with_compaction_disabled(table* t, std::function ()> func); + // Remove a column family from the compaction manager. // Cancel requests on cf and wait for a possible ongoing compaction on cf. future<> remove(column_family* cf); @@ -268,6 +275,10 @@ public: }); }; + bool compaction_disabled(table* t) const { + return _compaction_state.contains(t) && _compaction_state.at(t).compaction_disabled(); + } + // Stops ongoing compaction of a given type. void stop_compaction(sstring type); diff --git a/database.cc b/database.cc index b38e1d0d19..b84dbc6a0e 100644 --- a/database.cc +++ b/database.cc @@ -2128,7 +2128,7 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun auto low_mark = cf.set_low_replay_position_mark(); - return cf.run_with_compaction_disabled([this, &cf, should_flush, auto_snapshot, tsf = std::move(tsf), low_mark]() mutable { + return _compaction_manager->run_with_compaction_disabled(&cf, [this, &cf, should_flush, auto_snapshot, tsf = std::move(tsf), low_mark]() mutable { future<> f = make_ready_future<>(); bool did_flush = false; if (should_flush && cf.can_flush()) { @@ -2179,7 +2179,7 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun future<> database::truncate_views(const column_family& base, db_clock::time_point truncated_at, bool should_flush) { return parallel_for_each(base.views(), [this, truncated_at, should_flush] (view_ptr v) { auto& vcf = find_column_family(v); - return vcf.run_with_compaction_disabled([&vcf, truncated_at, should_flush] { + return _compaction_manager->run_with_compaction_disabled(&vcf, [&vcf, truncated_at, should_flush] { return (should_flush ? vcf.flush() : vcf.clear()).then([&vcf, truncated_at, should_flush] { return vcf.discard_sstables(truncated_at).then([&vcf, truncated_at, should_flush](db::replay_position rp) { return db::system_keyspace::save_truncation_record(vcf, truncated_at, rp); diff --git a/database.hh b/database.hh index eac13cd780..4f66a4065f 100644 --- a/database.hh +++ b/database.hh @@ -442,7 +442,6 @@ private: bool _durable_writes; compaction_manager& _compaction_manager; secondary_index::secondary_index_manager _index_manager; - int _compaction_disabled = 0; bool _compaction_disabled_by_user = false; utils::phased_barrier _flush_barrier; std::vector _views; @@ -922,8 +921,6 @@ public: cache_hit_rate get_hit_rate(gms::inet_address addr); void drop_hit_rate(gms::inet_address addr); - future<> run_with_compaction_disabled(std::function ()> func); - void enable_auto_compaction(); void disable_auto_compaction(); bool is_auto_compaction_disabled_by_user() const { diff --git a/table.cc b/table.cc index c5878f0d23..6909279c45 100644 --- a/table.cc +++ b/table.cc @@ -981,7 +981,7 @@ void table::try_trigger_compaction() noexcept { void table::do_trigger_compaction() { // But not if we're locked out or stopping - if (!_compaction_disabled && !_async_gate.is_closed()) { + if (!_async_gate.is_closed()) { _compaction_manager.submit(this); } } @@ -1505,7 +1505,7 @@ future<> table::clear() { // NOTE: does not need to be futurized, but might eventually, depending on // if we implement notifications, whatnot. future table::discard_sstables(db_clock::time_point truncated_at) { - assert(_compaction_disabled > 0); + assert(_compaction_manager.compaction_disabled(this)); struct pruner { column_family& cf; @@ -2131,18 +2131,6 @@ std::chrono::milliseconds table::get_coordinator_read_latency_percentile(double return _percentile_cache_value; } -future<> -table::run_with_compaction_disabled(std::function ()> func) { - ++_compaction_disabled; - return _compaction_manager.remove(this).then(std::move(func)).finally([this] { - if (--_compaction_disabled == 0) { - // we're turning if on again, use function that does not increment - // the counter further. - do_trigger_compaction(); - } - }); -} - void table::enable_auto_compaction() { // XXX: unmute backlog. turn table backlog back on. diff --git a/test/perf/perf_fast_forward.cc b/test/perf/perf_fast_forward.cc index e124e33143..fc99649f98 100644 --- a/test/perf/perf_fast_forward.cc +++ b/test/perf/perf_fast_forward.cc @@ -1764,7 +1764,7 @@ void populate(const std::vector& datasets, cql_test_env& env, const ta output_mgr->set_test_param_names({{"flush@ (MiB)", "{:<12}"}}, test_result::stats_names()); - cf.run_with_compaction_disabled([&] { + db.get_compaction_manager().run_with_compaction_disabled(&cf, [&] { return seastar::async([&] { auto gen = ds.make_generator(s, cfg); while (auto mopt = gen()) { @@ -1866,11 +1866,11 @@ static std::initializer_list test_groups = { // Disables compaction for given tables. // Compaction will be resumed when the returned object dies. -auto make_compaction_disabling_guard(std::vector tables) { +auto make_compaction_disabling_guard(database& db, std::vector tables) { shared_promise<> pr; for (auto&& t : tables) { // FIXME: discarded future. - (void)t->run_with_compaction_disabled([f = shared_future<>(pr.get_shared_future())] { + (void)db.get_compaction_manager().run_with_compaction_disabled(t, [f = shared_future<>(pr.get_shared_future())] { return f.get_future(); }); } @@ -2034,7 +2034,7 @@ int main(int argc, char** argv) { return requested_test_groups.contains(tc.name); }); - auto compaction_guard = make_compaction_disabling_guard(boost::copy_range>( + auto compaction_guard = make_compaction_disabling_guard(db, boost::copy_range>( enabled_datasets | boost::adaptors::transformed([&] (auto&& ds) { return &find_table(db, *ds); })));