diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index f456433530..1ab2672d2b 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -271,7 +271,7 @@ future<> compaction_manager::perform_major_compaction(column_family* cf) { // it cannot be the other way around, or minor compaction for this column family would be // prevented while an ongoing major compaction doesn't release the semaphore. task->compaction_done = with_semaphore(_major_compaction_sem, 1, [this, task, cf] { - return with_lock(_compaction_locks[cf].for_write(), [this, task, cf] { + return with_lock(_compaction_state[cf].lock.for_write(), [this, task, cf] { _stats.active_tasks++; if (!can_proceed(task)) { return make_ready_future<>(); @@ -325,7 +325,7 @@ future<> compaction_manager::run_custom_job(column_family* cf, sstables::compact task->compaction_done = with_semaphore(_custom_job_sem, 1, [this, task, cf, &job = *job_ptr] () mutable { // take read lock for cf, so major compaction and resharding can't proceed in parallel. - return with_lock(_compaction_locks[cf].for_read(), [this, task, cf, &job] () mutable { + return with_lock(_compaction_state[cf].lock.for_read(), [this, task, cf, &job] () mutable { _stats.active_tasks++; if (!can_proceed(task)) { return make_ready_future<>(); @@ -353,6 +353,37 @@ future<> compaction_manager::run_custom_job(column_family* cf, sstables::compact return task->compaction_done.get_future().then([task] {}); } +bool compaction_manager::compaction_state::compaction_disabled() const { + return with_compaction_disabled_gate.get_count() > 0; +} + +future<> +compaction_manager::run_with_compaction_disabled(table* t, std::function ()> func) { + auto& c_state = _compaction_state[t]; + auto holder = c_state.with_compaction_disabled_gate.hold(); + co_await stop_ongoing_compactions("user-triggered operation", t); + + std::exception_ptr err; + try { + co_await func(); + } catch (...) { + err = std::current_exception(); + } + +#ifdef DEBUG + assert(_compaction_state.contains(t)); +#endif + // submit compaction request if we're the last holder of the gate which is still opened. + if (!c_state.with_compaction_disabled_gate.is_closed() && c_state.with_compaction_disabled_gate.get_count() == 1) { + holder.release(); + submit(t); + } + if (err) { + std::rethrow_exception(err); + } + co_return; +} + void compaction_manager::task::setup_new_compaction() { compaction_data = create_compaction_data(); compaction_running = true; @@ -451,7 +482,7 @@ void compaction_manager::disable() { std::function compaction_manager::compaction_submission_callback() { return [this] () mutable { - for (auto& e: _compaction_locks) { + for (auto& e: _compaction_state) { submit(e.first); } }; @@ -485,13 +516,13 @@ void compaction_manager::postpone_compaction_for_column_family(column_family* cf _postponed.insert(cf); } -future<> compaction_manager::stop_ongoing_compactions(sstring reason) { - cmlog.info("Stopping {} ongoing compactions due to {}", get_compactions().size(), reason); - - // Wait for each task handler to stop. Copy list because task remove itself - // from the list when done. - auto tasks = _tasks; - return do_with(std::move(tasks), [this, reason] (std::list>& tasks) { +future<> compaction_manager::stop_tasks(std::vector> tasks, sstring reason) { + // To prevent compaction from being postponed while tasks are being stopped, let's set all + // tasks as stopping before the deferring point below. + for (auto& t : tasks) { + t->stopping = true; + } + return do_with(std::move(tasks), [this, reason] (std::vector>& tasks) { return parallel_for_each(tasks, [this, reason] (auto& task) { return this->task_stop(task, reason).then_wrapped([](future <> f) { try { @@ -507,6 +538,23 @@ future<> compaction_manager::stop_ongoing_compactions(sstring reason) { }); } +future<> compaction_manager::stop_ongoing_compactions(sstring reason) { + cmlog.info("Stopping {} ongoing compactions due to {}", get_compactions().size(), reason); + + // Wait for each task handler to stop. Copy list because task remove itself + // from the list when done. + auto tasks = boost::copy_range>>(_tasks); + return stop_tasks(std::move(tasks), std::move(reason)); +} + +future<> compaction_manager::stop_ongoing_compactions(sstring reason, column_family* cf) { + auto tasks = boost::copy_range>>(_tasks | boost::adaptors::filtered([cf] (auto& task) { + return task->compacting_cf == cf; + })); + cmlog.info("Stopping {} ongoing compactions for table {}.{} due to {}", tasks.size(), cf->schema()->ks_name(), cf->schema()->cf_name(), reason); + return stop_tasks(std::move(tasks), std::move(reason)); +} + future<> compaction_manager::drain() { _state = state::disabled; return stop_ongoing_compactions("drain"); @@ -555,7 +603,8 @@ void compaction_manager::do_stop() noexcept { } inline bool compaction_manager::can_proceed(const lw_shared_ptr& task) { - return (_state == state::enabled) && !task->stopping; + return (_state == state::enabled) && !task->stopping && + (task->type != sstables::compaction_type::Compaction || !_compaction_state[task->compacting_cf].compaction_disabled()); } inline future<> compaction_manager::put_task_to_sleep(lw_shared_ptr& task) { @@ -600,7 +649,7 @@ void compaction_manager::submit(column_family* cf) { _stats.pending_tasks--; return make_ready_future(stop_iteration::yes); } - return with_lock(_compaction_locks[cf].for_read(), [this, task] () mutable { + return with_lock(_compaction_state[cf].lock.for_read(), [this, task] () mutable { return with_scheduling_group(_compaction_controller.sg(), [this, task = std::move(task)] () mutable { column_family& cf = *task->compacting_cf; sstables::compaction_strategy cs = cf.get_compaction_strategy(); @@ -668,7 +717,7 @@ void compaction_manager::submit_offstrategy(column_family* cf) { return make_ready_future(stop_iteration::yes); } return with_semaphore(_custom_job_sem, 1, [this, task, cf] () mutable { - return with_lock(_compaction_locks[cf].for_read(), [this, task, cf] () mutable { + return with_lock(_compaction_state[cf].lock.for_read(), [this, task, cf] () mutable { _stats.pending_tasks--; if (!can_proceed(task)) { return make_ready_future(stop_iteration::yes); @@ -748,7 +797,7 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa return with_semaphore(_rewrite_sstables_sem, 1, [this, task, &cf, descriptor = std::move(descriptor), compacting] () mutable { // Take write lock for cf to serialize cleanup/upgrade sstables/scrub with major compaction/reshape/reshard. - return with_lock(_compaction_locks[&cf].for_write(), [this, task, &cf, descriptor = std::move(descriptor), compacting] () mutable { + return with_lock(_compaction_state[&cf].lock.for_write(), [this, task, &cf, descriptor = std::move(descriptor), compacting] () mutable { _stats.pending_tasks--; _stats.active_tasks++; task->setup_new_compaction(); @@ -889,7 +938,7 @@ future<> compaction_manager::perform_sstable_upgrade(database& db, column_family // must ensure that all sstables created before we run are included // in the re-write, we need to barrier out any previously running // compaction. - return cf->run_with_compaction_disabled([this, cf, &tables, exclude_current_version] { + return run_with_compaction_disabled(cf, [this, cf, &tables, exclude_current_version] { auto last_version = cf->get_sstables_manager().get_highest_supported_format(); for (auto& sst : get_candidates(*cf)) { @@ -925,7 +974,7 @@ future<> compaction_manager::perform_sstable_scrub(column_family* cf, sstables:: // since we might potentially have ongoing compactions, and we // must ensure that all sstables created before we run are scrubbed, // we need to barrier out any previously running compaction. - return cf->run_with_compaction_disabled([this, cf, scrub_mode] { + return run_with_compaction_disabled(cf, [this, cf, scrub_mode] { return rewrite_sstables(cf, sstables::compaction_type_options::make_scrub(scrub_mode), [this] (const table& cf) { return get_candidates(cf); }, can_purge_tombstones::no); @@ -935,24 +984,19 @@ future<> compaction_manager::perform_sstable_scrub(column_family* cf, sstables:: future<> compaction_manager::remove(column_family* cf) { // We need to guarantee that a task being stopped will not retry to compact // a column family being removed. - auto tasks_to_stop = make_lw_shared>>(); - for (auto& task : _tasks) { - if (task->compacting_cf == cf) { - tasks_to_stop->push_back(task); - task->stopping = true; - } - } + // The requirement above is provided by stop_ongoing_compactions(). _postponed.erase(cf); + // Wait for all functions running under with_compaction_disabled_gate to terminate. + auto close_gate = _compaction_state[cf].with_compaction_disabled_gate.close(); + // Wait for the termination of an ongoing compaction on cf, if any. - return parallel_for_each(*tasks_to_stop, [this, cf] (auto& task) { - return this->task_stop(task, "column family removal"); - }).then([this, cf, tasks_to_stop] { + co_await when_all_succeed(stop_ongoing_compactions("column family removal", cf), std::move(close_gate)); #ifdef DEBUG - assert(std::find_if(_tasks.begin(), _tasks.end(), [cf] (auto& task) { return task->compacting_cf == cf; }) == _tasks.end()); + assert(std::find_if(_tasks.begin(), _tasks.end(), [cf] (auto& task) { return task->compacting_cf == cf; }) == _tasks.end()); #endif - _compaction_locks.erase(cf); - }); + _compaction_state.erase(cf); + co_return; } const std::vector compaction_manager::get_compactions() const { diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 0c00abe356..9a33e4e251 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -121,8 +121,16 @@ private: // Purpose is to serialize major compaction across all column families, so as to // reduce disk space requirement. semaphore _major_compaction_sem{1}; - // Prevents column family from running major and minor compaction at same time. - std::unordered_map _compaction_locks; + + struct compaction_state { + // Prevents table from running major and minor compaction at the same time. + rwlock lock; + // Used to wait for termination of any function running under run_with_compaction_disabled(). + seastar::gate with_compaction_disabled_gate; + + bool compaction_disabled() const; + }; + std::unordered_map _compaction_state; semaphore _custom_job_sem{1}; seastar::named_semaphore _rewrite_sstables_sem = {1, named_semaphore_exception_factory{"rewrite sstables"}}; @@ -134,6 +142,7 @@ private: static constexpr std::chrono::seconds periodic_compaction_submission_interval() { return std::chrono::seconds(3600); } private: future<> task_stop(lw_shared_ptr task, sstring reason); + future<> stop_tasks(std::vector> tasks, sstring reason); // Return the largest fan-in of currently running compactions unsigned current_compaction_fan_in_threshold() const; @@ -185,6 +194,7 @@ private: future<> rewrite_sstables(column_family* cf, sstables::compaction_type_options options, get_candidates_func, can_purge_tombstones can_purge = can_purge_tombstones::yes); future<> stop_ongoing_compactions(sstring reason); + future<> stop_ongoing_compactions(sstring reason, column_family* cf); optimized_optional _early_abort_subscription; public: compaction_manager(compaction_scheduling_group csg, maintenance_scheduling_group msg, size_t available_memory, abort_source& as); @@ -245,6 +255,9 @@ public: // parameter job is a function that will carry the operation future<> run_custom_job(column_family* cf, sstables::compaction_type type, noncopyable_function(sstables::compaction_data&)> job); + // Run a function with compaction temporarily disabled for a table T. + future<> run_with_compaction_disabled(table* t, std::function ()> func); + // Remove a column family from the compaction manager. // Cancel requests on cf and wait for a possible ongoing compaction on cf. future<> remove(column_family* cf); @@ -262,6 +275,10 @@ public: }); }; + bool compaction_disabled(table* t) const { + return _compaction_state.contains(t) && _compaction_state.at(t).compaction_disabled(); + } + // Stops ongoing compaction of a given type. void stop_compaction(sstring type); diff --git a/database.cc b/database.cc index b38e1d0d19..b84dbc6a0e 100644 --- a/database.cc +++ b/database.cc @@ -2128,7 +2128,7 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun auto low_mark = cf.set_low_replay_position_mark(); - return cf.run_with_compaction_disabled([this, &cf, should_flush, auto_snapshot, tsf = std::move(tsf), low_mark]() mutable { + return _compaction_manager->run_with_compaction_disabled(&cf, [this, &cf, should_flush, auto_snapshot, tsf = std::move(tsf), low_mark]() mutable { future<> f = make_ready_future<>(); bool did_flush = false; if (should_flush && cf.can_flush()) { @@ -2179,7 +2179,7 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun future<> database::truncate_views(const column_family& base, db_clock::time_point truncated_at, bool should_flush) { return parallel_for_each(base.views(), [this, truncated_at, should_flush] (view_ptr v) { auto& vcf = find_column_family(v); - return vcf.run_with_compaction_disabled([&vcf, truncated_at, should_flush] { + return _compaction_manager->run_with_compaction_disabled(&vcf, [&vcf, truncated_at, should_flush] { return (should_flush ? vcf.flush() : vcf.clear()).then([&vcf, truncated_at, should_flush] { return vcf.discard_sstables(truncated_at).then([&vcf, truncated_at, should_flush](db::replay_position rp) { return db::system_keyspace::save_truncation_record(vcf, truncated_at, rp); diff --git a/database.hh b/database.hh index eac13cd780..4f66a4065f 100644 --- a/database.hh +++ b/database.hh @@ -442,7 +442,6 @@ private: bool _durable_writes; compaction_manager& _compaction_manager; secondary_index::secondary_index_manager _index_manager; - int _compaction_disabled = 0; bool _compaction_disabled_by_user = false; utils::phased_barrier _flush_barrier; std::vector _views; @@ -922,8 +921,6 @@ public: cache_hit_rate get_hit_rate(gms::inet_address addr); void drop_hit_rate(gms::inet_address addr); - future<> run_with_compaction_disabled(std::function ()> func); - void enable_auto_compaction(); void disable_auto_compaction(); bool is_auto_compaction_disabled_by_user() const { diff --git a/table.cc b/table.cc index c5878f0d23..6909279c45 100644 --- a/table.cc +++ b/table.cc @@ -981,7 +981,7 @@ void table::try_trigger_compaction() noexcept { void table::do_trigger_compaction() { // But not if we're locked out or stopping - if (!_compaction_disabled && !_async_gate.is_closed()) { + if (!_async_gate.is_closed()) { _compaction_manager.submit(this); } } @@ -1505,7 +1505,7 @@ future<> table::clear() { // NOTE: does not need to be futurized, but might eventually, depending on // if we implement notifications, whatnot. future table::discard_sstables(db_clock::time_point truncated_at) { - assert(_compaction_disabled > 0); + assert(_compaction_manager.compaction_disabled(this)); struct pruner { column_family& cf; @@ -2131,18 +2131,6 @@ std::chrono::milliseconds table::get_coordinator_read_latency_percentile(double return _percentile_cache_value; } -future<> -table::run_with_compaction_disabled(std::function ()> func) { - ++_compaction_disabled; - return _compaction_manager.remove(this).then(std::move(func)).finally([this] { - if (--_compaction_disabled == 0) { - // we're turning if on again, use function that does not increment - // the counter further. - do_trigger_compaction(); - } - }); -} - void table::enable_auto_compaction() { // XXX: unmute backlog. turn table backlog back on. diff --git a/test/perf/perf_fast_forward.cc b/test/perf/perf_fast_forward.cc index e124e33143..fc99649f98 100644 --- a/test/perf/perf_fast_forward.cc +++ b/test/perf/perf_fast_forward.cc @@ -1764,7 +1764,7 @@ void populate(const std::vector& datasets, cql_test_env& env, const ta output_mgr->set_test_param_names({{"flush@ (MiB)", "{:<12}"}}, test_result::stats_names()); - cf.run_with_compaction_disabled([&] { + db.get_compaction_manager().run_with_compaction_disabled(&cf, [&] { return seastar::async([&] { auto gen = ds.make_generator(s, cfg); while (auto mopt = gen()) { @@ -1866,11 +1866,11 @@ static std::initializer_list test_groups = { // Disables compaction for given tables. // Compaction will be resumed when the returned object dies. -auto make_compaction_disabling_guard(std::vector tables) { +auto make_compaction_disabling_guard(database& db, std::vector tables) { shared_promise<> pr; for (auto&& t : tables) { // FIXME: discarded future. - (void)t->run_with_compaction_disabled([f = shared_future<>(pr.get_shared_future())] { + (void)db.get_compaction_manager().run_with_compaction_disabled(t, [f = shared_future<>(pr.get_shared_future())] { return f.get_future(); }); } @@ -2034,7 +2034,7 @@ int main(int argc, char** argv) { return requested_test_groups.contains(tc.name); }); - auto compaction_guard = make_compaction_disabling_guard(boost::copy_range>( + auto compaction_guard = make_compaction_disabling_guard(db, boost::copy_range>( enabled_datasets | boost::adaptors::transformed([&] (auto&& ds) { return &find_table(db, *ds); })));