compaction_manager: add maybe_wait_for_sstable_count_reduction

Called from try_flush_memtable_to_sstable,
maybe_wait_for_sstable_count_reduction will wait for
compaction to catch up with memtable flush if the
bucket to compact is inflated, having too many
sstables.  In that case we don't want to add fuel
to the fire by creating yet another sstable.

Fixes #4116

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2022-07-03 18:36:50 +03:00
parent 69d4a16908
commit f26e655646
3 changed files with 66 additions and 0 deletions

View File

@@ -513,6 +513,10 @@ inline compaction_controller make_compaction_controller(compaction_manager::sche
return compaction_controller(csg, static_shares, 250ms, std::move(fn));
}
// Destructor: break the condition variable so that any fiber currently
// blocked in compaction_done.wait() (e.g. in
// maybe_wait_for_sstable_count_reduction) is woken up with
// broken_condition_variable instead of waiting forever on a condvar
// that is about to be destroyed.
compaction_manager::compaction_state::~compaction_state() {
compaction_done.broken();
}
std::string compaction_manager::task::describe() const {
auto* t = _compacting_table;
auto s = t->schema();
@@ -611,6 +615,7 @@ void compaction_manager::task::finish_compaction(state finish_state) noexcept {
if (finish_state != state::failed) {
_compaction_retry.reset();
}
_compaction_state.compaction_done.signal();
}
void compaction_manager::task::stop(sstring reason) noexcept {
@@ -981,6 +986,50 @@ void compaction_manager::submit(compaction::table_state& t) {
(void)perform_task(make_shared<regular_compaction_task>(*this, t));
}
// Tells whether regular (automatic) compaction may currently run on the
// given table: the manager must be able to proceed with work for it, and
// the user must not have disabled auto-compaction on the table.
bool compaction_manager::can_perform_regular_compaction(compaction::table_state& t) {
    if (!can_proceed(&t)) {
        return false;
    }
    return !t.is_auto_compaction_disabled_by_user();
}
// Throttles memtable flush when compaction has fallen too far behind on
// this table: if the next compaction job for the table would span more
// sstable runs than a threshold, wait until compaction reduces that count
// (or until regular compaction becomes impossible) before letting the
// caller add yet another sstable.
//
// Called from table::try_flush_memtable_to_sstable. Returns a future that
// resolves when it is reasonable to proceed with the flush.
future<> compaction_manager::maybe_wait_for_sstable_count_reduction(compaction::table_state& t) {
auto schema = t.schema();
// If regular compaction cannot run at all (manager stopping, compaction
// disabled, or disabled by the user), waiting would never help — bail out.
if (!can_perform_regular_compaction(t)) {
cmlog.trace("maybe_wait_for_sstable_count_reduction in {}.{}: cannot perform regular compaction",
schema->ks_name(), schema->cf_name());
co_return;
}
// Counts the number of distinct sstable runs in the job the table's
// compaction strategy would pick next. Re-evaluated on every condvar
// wakeup below, so it must stay cheap and side-effect free.
auto num_runs_for_compaction = [&, this] {
auto& cs = t.get_compaction_strategy();
auto desc = cs.get_sstables_for_compaction(t, get_strategy_control(), get_candidates(t));
return boost::copy_range<std::unordered_set<utils::UUID>>(
desc.sstables
| boost::adaptors::transformed(std::mem_fn(&sstables::sstable::run_identifier))).size();
};
// Floor of 32 so tables with a small max_compaction_threshold don't
// throttle flushes too aggressively.
const auto threshold = std::max(schema->max_compaction_threshold(), 32);
auto count = num_runs_for_compaction();
if (count <= threshold) {
cmlog.trace("No need to wait for sstable count reduction in {}.{}: {} <= {}",
schema->ks_name(), schema->cf_name(), count, threshold);
co_return;
}
// Reduce the chances of falling into an endless wait, if compaction
// wasn't scheduled for the table due to a problem.
submit(t);
using namespace std::chrono_literals;
auto start = db_clock::now();
auto& cstate = get_compaction_state(&t);
try {
// compaction_done is signalled by task::finish_compaction whenever a
// compaction task completes; wake up and re-check the run count then.
co_await cstate.compaction_done.wait([this, &num_runs_for_compaction, threshold, &t] {
return num_runs_for_compaction() <= threshold || !can_perform_regular_compaction(t);
});
} catch (const broken_condition_variable&) {
// The compaction_state was destroyed (see ~compaction_state); stop
// waiting quietly rather than propagating the error to the flush path.
co_return;
}
auto end = db_clock::now();
auto elapsed_ms = (end - start) / 1ms;
// NOTE: 'count' here is the run count sampled before the wait, not the
// current one — it documents how inflated the bucket was when we blocked.
cmlog.warn("Waited {}ms for compaction of {}.{} to catch up on {} sstable runs",
elapsed_ms, schema->ks_name(), schema->cf_name(), count);
}
class compaction_manager::offstrategy_compaction_task : public compaction_manager::task {
bool _performed = false;
public:

View File

@@ -66,6 +66,13 @@ private:
// Raised by any function running under run_with_compaction_disabled();
long compaction_disabled_counter = 0;
// Signaled whenever a compaction task completes.
condition_variable compaction_done;
compaction_state() = default;
compaction_state(compaction_state&&) = default;
~compaction_state();
// True while at least one caller holds compaction disabled for this
// table (the counter is raised by run_with_compaction_disabled()).
bool compaction_disabled() const noexcept {
return compaction_disabled_counter > 0;
}
@@ -379,6 +386,13 @@ public:
// Submit a table to be compacted.
void submit(compaction::table_state& t);
// Can regular compaction be performed on the given table?
bool can_perform_regular_compaction(compaction::table_state& t);
// Maybe wait before adding more sstables
// if there are too many sstables.
future<> maybe_wait_for_sstable_count_reduction(compaction::table_state& t);
// Submit a table to be off-strategy compacted.
// Returns true iff off-strategy compaction was required and performed.
future<bool> perform_offstrategy(compaction::table_state& t);

View File

@@ -761,6 +761,9 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old, sstable_write_
_memtables->erase(old);
co_return;
}
if (!_async_gate.is_closed()) {
co_await _compaction_manager.maybe_wait_for_sstable_count_reduction(as_table_state());
}
} catch (...) {
err = std::current_exception();
}