Merge branch 'move_disable_compaction_to_manager/v6' from Raphael S. Carvalho

Move run_with_compaction_disabled() into compaction manager

Having run_with_compaction_disabled() live in table is a layering violation: the
logic for disabling compaction for a table T clearly belongs to the compaction
manager, and table shouldn't be aware of such implementation details.
Moving it also makes things less error-prone, as there is no longer a need for
coordination between table and manager.
The manager now takes full responsibility.

* 'move_disable_compaction_to_manager/v6' of https://github.com/raphaelsc/scylla:
  compaction: move run_with_compaction_disabled() from table into compaction_manager
  compaction_manager: switch to coroutine in compaction_manager::remove()
  compaction_manager: add struct for per table compaction state
  compaction_manager: wire stop_ongoing_compactions() into remove()
  compaction_manager: introduce stop_ongoing_compactions() for a table
  compaction_manager: prevent compaction from being postponed when stopping tasks
  compaction_manager: extract "stop tasks" from stop_ongoing_compactions() into new function
This commit is contained in:
Pavel Emelyanov
2021-11-09 17:00:52 +03:00
committed by Avi Kivity
6 changed files with 100 additions and 54 deletions

View File

@@ -271,7 +271,7 @@ future<> compaction_manager::perform_major_compaction(column_family* cf) {
// it cannot be the other way around, or minor compaction for this column family would be
// prevented while an ongoing major compaction doesn't release the semaphore.
task->compaction_done = with_semaphore(_major_compaction_sem, 1, [this, task, cf] {
return with_lock(_compaction_locks[cf].for_write(), [this, task, cf] {
return with_lock(_compaction_state[cf].lock.for_write(), [this, task, cf] {
_stats.active_tasks++;
if (!can_proceed(task)) {
return make_ready_future<>();
@@ -325,7 +325,7 @@ future<> compaction_manager::run_custom_job(column_family* cf, sstables::compact
task->compaction_done = with_semaphore(_custom_job_sem, 1, [this, task, cf, &job = *job_ptr] () mutable {
// take read lock for cf, so major compaction and resharding can't proceed in parallel.
return with_lock(_compaction_locks[cf].for_read(), [this, task, cf, &job] () mutable {
return with_lock(_compaction_state[cf].lock.for_read(), [this, task, cf, &job] () mutable {
_stats.active_tasks++;
if (!can_proceed(task)) {
return make_ready_future<>();
@@ -353,6 +353,37 @@ future<> compaction_manager::run_custom_job(column_family* cf, sstables::compact
return task->compaction_done.get_future().then([task] {});
}
// Reports whether compaction is currently disabled for the table owning this
// state, i.e. whether with_compaction_disabled_gate has a non-zero count.
bool compaction_manager::compaction_state::compaction_disabled() const {
    const auto outstanding = with_compaction_disabled_gate.get_count();
    return outstanding > 0;
}
// Runs func for table t while compaction is disabled for that table.
//
// Sequence, in order:
//   1. take a hold on the table's with_compaction_disabled_gate;
//   2. stop the compactions currently running on t;
//   3. run func, capturing (not yet propagating) any exception it throws;
//   4. if this call is the only remaining holder of a still-open gate, release
//      the hold and submit t for compaction again;
//   5. rethrow the exception captured in step 3, if any.
//
// Parameters:
//   t    - table for which compaction is disabled while func runs.
//   func - operation to run; its returned future is awaited.
future<>
compaction_manager::run_with_compaction_disabled(table* t, std::function<future<> ()> func) {
// operator[] on the map: the per-table state is created here if it does not exist yet.
auto& c_state = _compaction_state[t];
// The hold bumps the gate count, which compaction_state::compaction_disabled() reads.
auto holder = c_state.with_compaction_disabled_gate.hold();
co_await stop_ongoing_compactions("user-triggered operation", t);
// Captured rather than propagated right away so the resubmission check below always runs.
std::exception_ptr err;
try {
co_await func();
} catch (...) {
err = std::current_exception();
}
#ifdef DEBUG
// c_state is a reference into _compaction_state; debug builds check the entry is still there.
assert(_compaction_state.contains(t));
#endif
// submit compaction request if we're the last holder of the gate which is still opened.
if (!c_state.with_compaction_disabled_gate.is_closed() && c_state.with_compaction_disabled_gate.get_count() == 1) {
// NOTE(review): the hold is dropped before submit(), presumably so that the
// submitted compaction does not see compaction as still disabled - confirm.
holder.release();
submit(t);
}
if (err) {
std::rethrow_exception(err);
}
co_return;
}
void compaction_manager::task::setup_new_compaction() {
compaction_data = create_compaction_data();
compaction_running = true;
@@ -451,7 +482,7 @@ void compaction_manager::disable() {
std::function<void()> compaction_manager::compaction_submission_callback() {
return [this] () mutable {
for (auto& e: _compaction_locks) {
for (auto& e: _compaction_state) {
submit(e.first);
}
};
@@ -485,13 +516,13 @@ void compaction_manager::postpone_compaction_for_column_family(column_family* cf
_postponed.insert(cf);
}
future<> compaction_manager::stop_ongoing_compactions(sstring reason) {
cmlog.info("Stopping {} ongoing compactions due to {}", get_compactions().size(), reason);
// Wait for each task handler to stop. Copy list because task remove itself
// from the list when done.
auto tasks = _tasks;
return do_with(std::move(tasks), [this, reason] (std::list<lw_shared_ptr<task>>& tasks) {
future<> compaction_manager::stop_tasks(std::vector<lw_shared_ptr<task>> tasks, sstring reason) {
// To prevent compaction from being postponed while tasks are being stopped, let's set all
// tasks as stopping before the deferring point below.
for (auto& t : tasks) {
t->stopping = true;
}
return do_with(std::move(tasks), [this, reason] (std::vector<lw_shared_ptr<task>>& tasks) {
return parallel_for_each(tasks, [this, reason] (auto& task) {
return this->task_stop(task, reason).then_wrapped([](future <> f) {
try {
@@ -507,6 +538,23 @@ future<> compaction_manager::stop_ongoing_compactions(sstring reason) {
});
}
// Stops all tasks currently present in _tasks; `reason` is logged and passed
// on to stop_tasks().
future<> compaction_manager::stop_ongoing_compactions(sstring reason) {
    cmlog.info("Stopping {} ongoing compactions due to {}", get_compactions().size(), reason);
    // stop_tasks() waits for each task handler to stop. It is handed a private
    // copy of _tasks because each task removes itself from that list when done.
    using task_vector = std::vector<lw_shared_ptr<task>>;
    return stop_tasks(boost::copy_range<task_vector>(_tasks), std::move(reason));
}
// Stops only the tasks whose compacting_cf is `cf`; `reason` is logged and
// passed on to stop_tasks().
future<> compaction_manager::stop_ongoing_compactions(sstring reason, column_family* cf) {
    auto targets_cf = [cf] (auto& candidate) {
        return candidate->compacting_cf == cf;
    };
    // Snapshot the matching tasks into a vector before logging and stopping them.
    auto selected = boost::copy_range<std::vector<lw_shared_ptr<task>>>(_tasks | boost::adaptors::filtered(targets_cf));
    cmlog.info("Stopping {} ongoing compactions for table {}.{} due to {}", selected.size(), cf->schema()->ks_name(), cf->schema()->cf_name(), reason);
    return stop_tasks(std::move(selected), std::move(reason));
}
future<> compaction_manager::drain() {
_state = state::disabled;
return stop_ongoing_compactions("drain");
@@ -555,7 +603,8 @@ void compaction_manager::do_stop() noexcept {
}
inline bool compaction_manager::can_proceed(const lw_shared_ptr<task>& task) {
return (_state == state::enabled) && !task->stopping;
return (_state == state::enabled) && !task->stopping &&
(task->type != sstables::compaction_type::Compaction || !_compaction_state[task->compacting_cf].compaction_disabled());
}
inline future<> compaction_manager::put_task_to_sleep(lw_shared_ptr<task>& task) {
@@ -600,7 +649,7 @@ void compaction_manager::submit(column_family* cf) {
_stats.pending_tasks--;
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
return with_lock(_compaction_locks[cf].for_read(), [this, task] () mutable {
return with_lock(_compaction_state[cf].lock.for_read(), [this, task] () mutable {
return with_scheduling_group(_compaction_controller.sg(), [this, task = std::move(task)] () mutable {
column_family& cf = *task->compacting_cf;
sstables::compaction_strategy cs = cf.get_compaction_strategy();
@@ -668,7 +717,7 @@ void compaction_manager::submit_offstrategy(column_family* cf) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
return with_semaphore(_custom_job_sem, 1, [this, task, cf] () mutable {
return with_lock(_compaction_locks[cf].for_read(), [this, task, cf] () mutable {
return with_lock(_compaction_state[cf].lock.for_read(), [this, task, cf] () mutable {
_stats.pending_tasks--;
if (!can_proceed(task)) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
@@ -748,7 +797,7 @@ future<> compaction_manager::rewrite_sstables(column_family* cf, sstables::compa
return with_semaphore(_rewrite_sstables_sem, 1, [this, task, &cf, descriptor = std::move(descriptor), compacting] () mutable {
// Take write lock for cf to serialize cleanup/upgrade sstables/scrub with major compaction/reshape/reshard.
return with_lock(_compaction_locks[&cf].for_write(), [this, task, &cf, descriptor = std::move(descriptor), compacting] () mutable {
return with_lock(_compaction_state[&cf].lock.for_write(), [this, task, &cf, descriptor = std::move(descriptor), compacting] () mutable {
_stats.pending_tasks--;
_stats.active_tasks++;
task->setup_new_compaction();
@@ -889,7 +938,7 @@ future<> compaction_manager::perform_sstable_upgrade(database& db, column_family
// must ensure that all sstables created before we run are included
// in the re-write, we need to barrier out any previously running
// compaction.
return cf->run_with_compaction_disabled([this, cf, &tables, exclude_current_version] {
return run_with_compaction_disabled(cf, [this, cf, &tables, exclude_current_version] {
auto last_version = cf->get_sstables_manager().get_highest_supported_format();
for (auto& sst : get_candidates(*cf)) {
@@ -925,7 +974,7 @@ future<> compaction_manager::perform_sstable_scrub(column_family* cf, sstables::
// since we might potentially have ongoing compactions, and we
// must ensure that all sstables created before we run are scrubbed,
// we need to barrier out any previously running compaction.
return cf->run_with_compaction_disabled([this, cf, scrub_mode] {
return run_with_compaction_disabled(cf, [this, cf, scrub_mode] {
return rewrite_sstables(cf, sstables::compaction_type_options::make_scrub(scrub_mode), [this] (const table& cf) {
return get_candidates(cf);
}, can_purge_tombstones::no);
@@ -935,24 +984,19 @@ future<> compaction_manager::perform_sstable_scrub(column_family* cf, sstables::
future<> compaction_manager::remove(column_family* cf) {
// We need to guarantee that a task being stopped will not retry to compact
// a column family being removed.
auto tasks_to_stop = make_lw_shared<std::vector<lw_shared_ptr<task>>>();
for (auto& task : _tasks) {
if (task->compacting_cf == cf) {
tasks_to_stop->push_back(task);
task->stopping = true;
}
}
// The requirement above is provided by stop_ongoing_compactions().
_postponed.erase(cf);
// Wait for all functions running under with_compaction_disabled_gate to terminate.
auto close_gate = _compaction_state[cf].with_compaction_disabled_gate.close();
// Wait for the termination of an ongoing compaction on cf, if any.
return parallel_for_each(*tasks_to_stop, [this, cf] (auto& task) {
return this->task_stop(task, "column family removal");
}).then([this, cf, tasks_to_stop] {
co_await when_all_succeed(stop_ongoing_compactions("column family removal", cf), std::move(close_gate));
#ifdef DEBUG
assert(std::find_if(_tasks.begin(), _tasks.end(), [cf] (auto& task) { return task->compacting_cf == cf; }) == _tasks.end());
assert(std::find_if(_tasks.begin(), _tasks.end(), [cf] (auto& task) { return task->compacting_cf == cf; }) == _tasks.end());
#endif
_compaction_locks.erase(cf);
});
_compaction_state.erase(cf);
co_return;
}
const std::vector<sstables::compaction_info> compaction_manager::get_compactions() const {

View File

@@ -121,8 +121,16 @@ private:
// Purpose is to serialize major compaction across all column families, so as to
// reduce disk space requirement.
semaphore _major_compaction_sem{1};
// Prevents column family from running major and minor compaction at same time.
std::unordered_map<column_family*, rwlock> _compaction_locks;
// Per-table state kept by the compaction manager; instances live in the
// _compaction_state map, keyed by table pointer.
struct compaction_state {
// Prevents table from running major and minor compaction at the same time.
// Taken for write by major compaction and sstable rewrites, for read by
// regular, off-strategy and custom-job compaction.
rwlock lock;
// Used to wait for termination of any function running under run_with_compaction_disabled().
// Each such function holds the gate while it runs; remove() closes it.
seastar::gate with_compaction_disabled_gate;
// True while with_compaction_disabled_gate has a non-zero count.
bool compaction_disabled() const;
};
std::unordered_map<table*, compaction_state> _compaction_state;
semaphore _custom_job_sem{1};
seastar::named_semaphore _rewrite_sstables_sem = {1, named_semaphore_exception_factory{"rewrite sstables"}};
@@ -134,6 +142,7 @@ private:
static constexpr std::chrono::seconds periodic_compaction_submission_interval() { return std::chrono::seconds(3600); }
private:
future<> task_stop(lw_shared_ptr<task> task, sstring reason);
future<> stop_tasks(std::vector<lw_shared_ptr<task>> tasks, sstring reason);
// Return the largest fan-in of currently running compactions
unsigned current_compaction_fan_in_threshold() const;
@@ -185,6 +194,7 @@ private:
future<> rewrite_sstables(column_family* cf, sstables::compaction_type_options options, get_candidates_func, can_purge_tombstones can_purge = can_purge_tombstones::yes);
future<> stop_ongoing_compactions(sstring reason);
future<> stop_ongoing_compactions(sstring reason, column_family* cf);
optimized_optional<abort_source::subscription> _early_abort_subscription;
public:
compaction_manager(compaction_scheduling_group csg, maintenance_scheduling_group msg, size_t available_memory, abort_source& as);
@@ -245,6 +255,9 @@ public:
// parameter job is a function that will carry the operation
future<> run_custom_job(column_family* cf, sstables::compaction_type type, noncopyable_function<future<>(sstables::compaction_data&)> job);
// Run a function with compaction temporarily disabled for a table T.
future<> run_with_compaction_disabled(table* t, std::function<future<> ()> func);
// Remove a column family from the compaction manager.
// Cancel requests on cf and wait for a possible ongoing compaction on cf.
future<> remove(column_family* cf);
@@ -262,6 +275,10 @@ public:
});
};
// Reports whether compaction is currently disabled for table t.
// A table that has no entry in _compaction_state is reported as not disabled.
bool compaction_disabled(table* t) const {
    // One hash lookup via find() instead of contains() followed by at().
    const auto it = _compaction_state.find(t);
    return it != _compaction_state.end() && it->second.compaction_disabled();
}
// Stops ongoing compaction of a given type.
void stop_compaction(sstring type);

View File

@@ -2128,7 +2128,7 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun
auto low_mark = cf.set_low_replay_position_mark();
return cf.run_with_compaction_disabled([this, &cf, should_flush, auto_snapshot, tsf = std::move(tsf), low_mark]() mutable {
return _compaction_manager->run_with_compaction_disabled(&cf, [this, &cf, should_flush, auto_snapshot, tsf = std::move(tsf), low_mark]() mutable {
future<> f = make_ready_future<>();
bool did_flush = false;
if (should_flush && cf.can_flush()) {
@@ -2179,7 +2179,7 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun
future<> database::truncate_views(const column_family& base, db_clock::time_point truncated_at, bool should_flush) {
return parallel_for_each(base.views(), [this, truncated_at, should_flush] (view_ptr v) {
auto& vcf = find_column_family(v);
return vcf.run_with_compaction_disabled([&vcf, truncated_at, should_flush] {
return _compaction_manager->run_with_compaction_disabled(&vcf, [&vcf, truncated_at, should_flush] {
return (should_flush ? vcf.flush() : vcf.clear()).then([&vcf, truncated_at, should_flush] {
return vcf.discard_sstables(truncated_at).then([&vcf, truncated_at, should_flush](db::replay_position rp) {
return db::system_keyspace::save_truncation_record(vcf, truncated_at, rp);

View File

@@ -442,7 +442,6 @@ private:
bool _durable_writes;
compaction_manager& _compaction_manager;
secondary_index::secondary_index_manager _index_manager;
int _compaction_disabled = 0;
bool _compaction_disabled_by_user = false;
utils::phased_barrier _flush_barrier;
std::vector<view_ptr> _views;
@@ -922,8 +921,6 @@ public:
cache_hit_rate get_hit_rate(gms::inet_address addr);
void drop_hit_rate(gms::inet_address addr);
future<> run_with_compaction_disabled(std::function<future<> ()> func);
void enable_auto_compaction();
void disable_auto_compaction();
bool is_auto_compaction_disabled_by_user() const {

View File

@@ -981,7 +981,7 @@ void table::try_trigger_compaction() noexcept {
void table::do_trigger_compaction() {
// But not if we're locked out or stopping
if (!_compaction_disabled && !_async_gate.is_closed()) {
if (!_async_gate.is_closed()) {
_compaction_manager.submit(this);
}
}
@@ -1505,7 +1505,7 @@ future<> table::clear() {
// NOTE: does not need to be futurized, but might eventually, depending on
// if we implement notifications, whatnot.
future<db::replay_position> table::discard_sstables(db_clock::time_point truncated_at) {
assert(_compaction_disabled > 0);
assert(_compaction_manager.compaction_disabled(this));
struct pruner {
column_family& cf;
@@ -2131,18 +2131,6 @@ std::chrono::milliseconds table::get_coordinator_read_latency_percentile(double
return _percentile_cache_value;
}
// Runs func with compaction disabled for this table.
// The _compaction_disabled counter is incremented, the table is removed from
// the compaction manager, func is run, and the counter is decremented again in
// a finally() continuation, so the decrement happens whether or not func fails.
future<>
table::run_with_compaction_disabled(std::function<future<> ()> func) {
++_compaction_disabled;
return _compaction_manager.remove(this).then(std::move(func)).finally([this] {
if (--_compaction_disabled == 0) {
// we're turning it on again, use function that does not increment
// the counter further.
do_trigger_compaction();
}
});
}
void
table::enable_auto_compaction() {
// XXX: unmute backlog. turn table backlog back on.

View File

@@ -1764,7 +1764,7 @@ void populate(const std::vector<dataset*>& datasets, cql_test_env& env, const ta
output_mgr->set_test_param_names({{"flush@ (MiB)", "{:<12}"}}, test_result::stats_names());
cf.run_with_compaction_disabled([&] {
db.get_compaction_manager().run_with_compaction_disabled(&cf, [&] {
return seastar::async([&] {
auto gen = ds.make_generator(s, cfg);
while (auto mopt = gen()) {
@@ -1866,11 +1866,11 @@ static std::initializer_list<test_group> test_groups = {
// Disables compaction for given tables.
// Compaction will be resumed when the returned object dies.
auto make_compaction_disabling_guard(std::vector<table*> tables) {
auto make_compaction_disabling_guard(database& db, std::vector<table*> tables) {
shared_promise<> pr;
for (auto&& t : tables) {
// FIXME: discarded future.
(void)t->run_with_compaction_disabled([f = shared_future<>(pr.get_shared_future())] {
(void)db.get_compaction_manager().run_with_compaction_disabled(t, [f = shared_future<>(pr.get_shared_future())] {
return f.get_future();
});
}
@@ -2034,7 +2034,7 @@ int main(int argc, char** argv) {
return requested_test_groups.contains(tc.name);
});
auto compaction_guard = make_compaction_disabling_guard(boost::copy_range<std::vector<table*>>(
auto compaction_guard = make_compaction_disabling_guard(db, boost::copy_range<std::vector<table*>>(
enabled_datasets | boost::adaptors::transformed([&] (auto&& ds) {
return &find_table(db, *ds);
})));