diff --git a/database.cc b/database.cc index 823106de52..874c4a1437 100644 --- a/database.cc +++ b/database.cc @@ -1324,7 +1324,7 @@ column_family::compact_sstables(sstables::compaction_descriptor descriptor, bool return make_ready_future<>(); } - return with_lock(_sstables_lock.for_read(), [this, descriptor = std::move(descriptor), cleanup] { + return with_lock(_sstables_lock.for_read(), [this, descriptor = std::move(descriptor), cleanup] () mutable { auto create_sstable = [this] { auto gen = this->calculate_generation_for_new_table(); auto sst = sstables::make_sstable(_schema, _config.datadir, gen, diff --git a/sstables/compaction.cc b/sstables/compaction.cc index f839b867be..51298d8e2d 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -356,6 +356,7 @@ class regular_compaction : public compaction { // sstable being currently written. shared_sstable _sst; stdx::optional _writer; + stdx::optional _weight_registration; public: regular_compaction(column_family& cf, compaction_descriptor descriptor, std::function creator, seastar::thread_scheduling_group* tsg) @@ -363,6 +364,7 @@ public: , _creator(std::move(creator)) , _set(cf.get_sstable_set()) , _selector(_set.make_incremental_selector()) + , _weight_registration(std::move(descriptor.weight_registration)) { } @@ -405,10 +407,17 @@ public: } virtual void finish_sstable_writer() override { + on_end_of_stream(); if (_writer) { stop_sstable_writer(); } } +private: + void on_end_of_stream() { + if (_weight_registration) { + _cf.get_compaction_manager().on_compaction_complete(*_weight_registration); + } + } }; class cleanup_compaction final : public regular_compaction { diff --git a/sstables/compaction.hh b/sstables/compaction.hh index 2bb6b43fef..62a7439e41 100644 --- a/sstables/compaction.hh +++ b/sstables/compaction.hh @@ -25,6 +25,7 @@ #include "database_fwd.hh" #include "shared_sstable.hh" #include "gc_clock.hh" +#include "compaction_weight_registration.hh" #include #include @@ -37,6 +38,8 @@ 
namespace sstables { int level; // Threshold size for sstable(s) to be created. uint64_t max_sstable_bytes; + // Holds ownership of a weight assigned to this compaction iff it's a regular one. + stdx::optional weight_registration; compaction_descriptor() = default; diff --git a/sstables/compaction_manager.cc b/sstables/compaction_manager.cc index 5c88876801..cb1356c0c1 100644 --- a/sstables/compaction_manager.cc +++ b/sstables/compaction_manager.cc @@ -25,6 +25,7 @@ #include #include "exceptions.hh" #include +#include static logging::logger cmlog("compaction_manager"); @@ -69,7 +70,7 @@ compaction_weight_registration::compaction_weight_registration(compaction_manage , _cf(cf) , _weight(weight) { - _cm->register_weight(_cf, _weight); + _cm->register_weight(_weight); } compaction_weight_registration& compaction_weight_registration::operator=(compaction_weight_registration&& other) noexcept { @@ -92,12 +93,12 @@ compaction_weight_registration::compaction_weight_registration(compaction_weight compaction_weight_registration::~compaction_weight_registration() { if (_cm) { - _cm->deregister_weight(_cf, _weight); + _cm->deregister_weight(_weight); } } void compaction_weight_registration::deregister() { - _cm->deregister_weight(_cf, _weight); + _cm->deregister_weight(_weight); _cm = nullptr; } @@ -140,17 +141,12 @@ int compaction_manager::trim_to_compact(column_family* cf, sstables::compaction_ if (descriptor.level != 0 || descriptor.sstables.empty()) { return weight; } - auto it = _weight_tracker.find(cf); - if (it == _weight_tracker.end()) { - return weight; - } - std::unordered_set& s = it->second; uint64_t total_size = get_total_size(descriptor.sstables); int min_threshold = cf->schema()->min_compaction_threshold(); while (descriptor.sstables.size() > size_t(min_threshold)) { - if (s.count(weight)) { + if (_weight_tracker.count(weight)) { total_size -= descriptor.sstables.back()->data_size(); descriptor.sstables.pop_back(); weight = calculate_weight(total_size); @@ 
-161,20 +157,25 @@ int compaction_manager::trim_to_compact(column_family* cf, sstables::compaction_ return weight; } -bool compaction_manager::can_register_weight(column_family* cf, int weight, bool parallel_compaction) { - auto it = _weight_tracker.find(cf); - if (it == _weight_tracker.end()) { +bool compaction_manager::can_register_weight(column_family* cf, int weight) { + if (_weight_tracker.empty()) { return true; } - std::unordered_set& s = it->second; + + auto has_cf_ongoing_compaction = [&] { + return boost::algorithm::any_of(_tasks, [&] (const lw_shared_ptr& task) { + return task->compacting_cf == cf; + }); + }; + // Only one weight is allowed if parallel compaction is disabled. - if (!parallel_compaction && !s.empty()) { + if (!cf->get_compaction_strategy().parallel_compaction() && has_cf_ongoing_compaction()) { return false; } // TODO: Maybe allow only *smaller* compactions to start? That can be done // by returning true only if weight is not in the set and is lower than any // entry in the set. - if (s.count(weight)) { + if (_weight_tracker.count(weight)) { // If reached this point, it means that there is an ongoing compaction // with the weight of the compaction job. 
return false; @@ -182,19 +183,13 @@ bool compaction_manager::can_register_weight(column_family* cf, int weight, bool return true; } -void compaction_manager::register_weight(column_family* cf, int weight) { - auto it = _weight_tracker.find(cf); - if (it == _weight_tracker.end()) { - _weight_tracker.insert({cf, {weight}}); - } else { - it->second.insert(weight); - } +void compaction_manager::register_weight(int weight) { + _weight_tracker.insert(weight); } -void compaction_manager::deregister_weight(column_family* cf, int weight) { - auto it = _weight_tracker.find(cf); - assert(it != _weight_tracker.end()); - it->second.erase(weight); +void compaction_manager::deregister_weight(int weight) { + _weight_tracker.erase(weight); + reevalute_postponed_compactions(); } std::vector compaction_manager::get_candidates(const column_family& cf) { @@ -381,6 +376,7 @@ void compaction_manager::start() { _stopped = false; register_metrics(); _compaction_submission_timer.arm(periodic_compaction_submission_interval()); + postponed_compactions_reevaluation(); } std::function compaction_manager::compaction_submission_callback() { @@ -391,6 +387,34 @@ std::function compaction_manager::compaction_submission_callback() { }; } +void compaction_manager::postponed_compactions_reevaluation() { + _waiting_reevalution = repeat([this] { + return _postponed_reevaluation.wait().then([this] { + if (_stopped) { + _postponed.clear(); + return stop_iteration::yes; + } + auto postponed = std::move(_postponed); + try { + for (auto& cf : postponed) { + submit(cf); + } + } catch (...) 
{ + _postponed = std::move(postponed); + } + return stop_iteration::no; + }); + }); +} + +void compaction_manager::reevalute_postponed_compactions() { + _postponed_reevaluation.signal(); +} + +void compaction_manager::postpone_compaction_for_column_family(column_family* cf) { + _postponed.push_back(cf); +} + future<> compaction_manager::stop() { cmlog.info("Asked to stop"); if (_stopped) { @@ -410,6 +434,9 @@ future<> compaction_manager::stop() { return parallel_for_each(tasks, [this] (auto& task) { return this->task_stop(task); }); + }).then([this] () mutable { + reevalute_postponed_compactions(); + return std::move(_waiting_reevalution); }).then([this] { _weight_tracker.clear(); _compaction_submission_timer.cancel(); @@ -466,22 +493,25 @@ void compaction_manager::submit(column_family* cf) { sstables::compaction_descriptor descriptor = cs.get_sstables_for_compaction(cf, get_candidates(cf)); int weight = trim_to_compact(&cf, descriptor); - // Stop compaction task immediately if strategy is satisfied or job cannot run in parallel. 
- if (descriptor.sstables.empty() || !can_register_weight(&cf, weight, cs.parallel_compaction())) { + if (descriptor.sstables.empty() || !can_proceed(task)) { _stats.pending_tasks--; - cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}", + return make_ready_future(stop_iteration::yes); + } + if (!can_register_weight(&cf, weight)) { + _stats.pending_tasks--; + cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}, postponing it...", descriptor.sstables.size(), weight, cf.schema()->ks_name(), cf.schema()->cf_name()); + postpone_compaction_for_column_family(&cf); return make_ready_future(stop_iteration::yes); } auto compacting = compacting_sstable_registration(this, descriptor.sstables); - auto c_weight = compaction_weight_registration(this, &cf, weight); + descriptor.weight_registration = compaction_weight_registration(this, &cf, weight); cmlog.debug("Accepted compaction job ({} sstable(s)) of weight {} for {}.{}", descriptor.sstables.size(), weight, cf.schema()->ks_name(), cf.schema()->cf_name()); _stats.pending_tasks--; _stats.active_tasks++; - return cf.run_compaction(std::move(descriptor)) - .then_wrapped([this, task, compacting = std::move(compacting), c_weight = std::move(c_weight)] (future<> f) mutable { + return cf.run_compaction(std::move(descriptor)).then_wrapped([this, task, compacting = std::move(compacting)] (future<> f) mutable { _stats.active_tasks--; if (!can_proceed(task)) { @@ -579,11 +609,12 @@ future<> compaction_manager::remove(column_family* cf) { task->stopping = true; } } + _postponed.erase(boost::remove(_postponed, cf), _postponed.end()); + // Wait for the termination of an ongoing compaction on cf, if any. 
return do_for_each(*tasks_to_stop, [this, cf] (auto& task) { return this->task_stop(task); }).then([this, cf, tasks_to_stop] { - _weight_tracker.erase(cf); _compaction_locks.erase(cf); }); } @@ -605,3 +636,8 @@ void compaction_manager::stop_compaction(sstring type) { } } } + +void compaction_manager::on_compaction_complete(compaction_weight_registration& weight_registration) { + weight_registration.deregister(); + reevalute_postponed_compactions(); +} diff --git a/sstables/compaction_manager.hh b/sstables/compaction_manager.hh index 2889972c3e..9bf9802463 100644 --- a/sstables/compaction_manager.hh +++ b/sstables/compaction_manager.hh @@ -73,9 +73,13 @@ private: // a sstable from being compacted twice. std::unordered_set _compacting_sstables; - // Keep track of weight of ongoing compaction for each column family. - // That's used to allow parallel compaction on the same column family. - std::unordered_map> _weight_tracker; + future<> _waiting_reevalution = make_ready_future<>(); + condition_variable _postponed_reevaluation; + // column families that wait for compaction but had their submission postponed due to an ongoing compaction. + std::vector _postponed; + // tracks taken weights of ongoing compactions; only one compaction per weight is allowed. + // a weight is the value assigned to a compaction job: log base N of the total size of all its input sstables. + std::unordered_set _weight_tracker; // Purpose is to serialize major compaction across all column families, so as to // reduce disk space requirement. @@ -93,14 +97,13 @@ private: private: future<> task_stop(lw_shared_ptr task); - // Return true if weight is not registered. If parallel_compaction is not - // true, only one weight is allowed to be registered. - bool can_register_weight(column_family* cf, int weight, bool parallel_compaction); + // Return true if weight is not registered. + bool can_register_weight(column_family* cf, int weight); // Register weight for a column family.
Do that only if can_register_weight() // returned true. - void register_weight(column_family* cf, int weight); + void register_weight(int weight); // Deregister weight for a column family. - void deregister_weight(column_family* cf, int weight); + void deregister_weight(int weight); // If weight of compaction job is taken, it will be trimmed until its new // weight is not taken or its size is equal to minimum threshold. @@ -125,6 +128,12 @@ private: // stop of transportation services. It cannot make progress anyway. // Returns true if error is judged not fatal, and compaction can be retried. inline bool maybe_stop_on_error(future<> f); + + void postponed_compactions_reevaluation(); + void reevalute_postponed_compactions(); + // Postpone compaction for a column family that couldn't be executed due to an ongoing + // similar-sized compaction. + void postpone_compaction_for_column_family(column_family* cf); public: compaction_manager(); ~compaction_manager(); @@ -188,6 +197,11 @@ public: // Stops ongoing compaction of a given type. void stop_compaction(sstring type); + // Called by the compaction procedure to release the weight lock assigned to it, so that + // another compaction waiting on the same weight can start as soon as possible. It is usually + // called after all compaction work is done but before the compaction seals sstables and such. + void on_compaction_complete(compaction_weight_registration& weight_registration); + friend class compacting_sstable_registration; friend class compaction_weight_registration; };