Merge "exception safety and minimum work for compaction controller" from Glauber
" This was sent before as two separate patchsets. It is now unified because it has a lot of common infrastructure. In this patchset I am aiming at two goals: 1) Provide a minimum amount of shares for user-initiated operations like nodetool compact and nodetool cleanup 2) Be more robust with exceptions in the backlog tracker For the first, the main difference is that I now made the compaction controller a part of the compaction manager. It then becomes easy to consult with the compaction controller for the correct amount of shares those operations should have. In compaction_strategy.cc, the major_compaction_strategy object was actually already unused before. So instead of making use of it, which would require some form of information flow downwards about the backlog we need to export, I am creating a user-initiated backlog type inside the compaction manager. With the two changes described above everything is very well self-contained within the compaction manager and the implementation becomes trivial. For the second, I am now handling exceptions in two places: 1) the backlog computation. Those are const functions so if we just have a transient exception when compacting the backlog, all we need to do is return some fixed amount of shares and try again in the next adjustment window. 2) the process of adding / removing SSTables. Those are harder, since if we fail to manipulate the list we'll be left in an inconsistent state. The best approach is then to disable the backlog tracker and return a fixed amount of shares globally. Tests: unit (release) " * 'backlog-improvements-v3' of github.com:glommer/scylla: compaction_manager: disable backlog tracker if we see an exception backlog tracker: protect against exceptions in backlog calculation. STCS_backlog: protect against negative backlog STCS_backlog: remove unused attribute compaction strategy: move size tiered backlog to a header compaction_strategy: delete major_compaction_strategy class compaction: make sure that user-initiated compactions always have a minimum priority backlog_controller: add constants to represent a globally disabled controller backlog_controller: move compaction controller to the compaction manager backlog_controller: allow users to compute inverse function of shares
This commit is contained in:
@@ -96,6 +96,12 @@ protected:
|
||||
}
|
||||
|
||||
virtual ~backlog_controller() {}
|
||||
public:
|
||||
backlog_controller(backlog_controller&&) = default;
|
||||
float backlog_of_shares(float shares) const;
|
||||
seastar::scheduling_group sg() {
|
||||
return _scheduling_group;
|
||||
}
|
||||
};
|
||||
|
||||
// memtable flush CPU controller.
|
||||
@@ -128,6 +134,8 @@ public:
|
||||
class compaction_controller : public backlog_controller {
|
||||
public:
|
||||
static constexpr unsigned normalization_factor = 10;
|
||||
static constexpr float disable_backlog = std::numeric_limits<double>::infinity();
|
||||
static constexpr float backlog_disabled(float backlog) { return std::isinf(backlog); }
|
||||
compaction_controller(seastar::scheduling_group sg, const ::io_priority_class& iop, float static_shares) : backlog_controller(sg, iop, static_shares) {}
|
||||
compaction_controller(seastar::scheduling_group sg, const ::io_priority_class& iop, std::chrono::milliseconds interval, std::function<float()> current_backlog)
|
||||
: backlog_controller(sg, iop, std::move(interval),
|
||||
|
||||
39
database.cc
39
database.cc
@@ -2085,12 +2085,12 @@ make_flush_controller(db::config& cfg, seastar::scheduling_group sg, const ::io_
|
||||
}
|
||||
|
||||
inline
|
||||
compaction_controller
|
||||
make_compaction_controller(db::config& cfg, seastar::scheduling_group sg, const ::io_priority_class& iop, std::function<double()> fn) {
|
||||
std::unique_ptr<compaction_manager>
|
||||
make_compaction_manager(db::config& cfg, database_config& dbcfg) {
|
||||
if (cfg.compaction_static_shares() > 0) {
|
||||
return compaction_controller(sg, iop, cfg.compaction_static_shares());
|
||||
return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), cfg.compaction_static_shares());
|
||||
}
|
||||
return compaction_controller(sg, iop, 250ms, std::move(fn));
|
||||
return std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group, service::get_local_compaction_priority());
|
||||
}
|
||||
|
||||
utils::UUID database::empty_version = utils::UUID_gen::get_name_UUID(bytes{});
|
||||
@@ -2127,19 +2127,8 @@ database::database(const db::config& cfg, database_config dbcfg)
|
||||
, _data_query_stage("data_query", _dbcfg.statement_scheduling_group, &column_family::query)
|
||||
, _mutation_query_stage(_dbcfg.statement_scheduling_group)
|
||||
, _version(empty_version)
|
||||
, _compaction_manager(std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group))
|
||||
, _compaction_manager(make_compaction_manager(*_cfg, dbcfg))
|
||||
, _enable_incremental_backups(cfg.incremental_backups())
|
||||
, _compaction_controller(make_compaction_controller(*_cfg, dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), [this] () -> float {
|
||||
auto backlog = _compaction_manager->backlog();
|
||||
// This means we are using an unimplemented strategy
|
||||
if (std::isinf(backlog)) {
|
||||
// returning the normalization factor means that we'll return the maximum
|
||||
// output in the _control_points. We can get rid of this when we implement
|
||||
// all strategies.
|
||||
return compaction_controller::normalization_factor;
|
||||
}
|
||||
return _compaction_manager->backlog() / memory::stats().total_memory();
|
||||
}))
|
||||
, _large_partition_handler(std::make_unique<db::cql_table_large_partition_handler>(_cfg->compaction_large_partition_warning_threshold_mb()*1024*1024))
|
||||
{
|
||||
local_schema_registry().init(*this); // TODO: we're never unbound.
|
||||
@@ -2165,6 +2154,22 @@ void backlog_controller::adjust() {
|
||||
update_controller(result);
|
||||
}
|
||||
|
||||
float backlog_controller::backlog_of_shares(float shares) const {
|
||||
size_t idx = 1;
|
||||
while ((idx < _control_points.size() - 1) && (_control_points[idx].output < shares)) {
|
||||
idx++;
|
||||
}
|
||||
const control_point& cp = _control_points[idx];
|
||||
const control_point& last = _control_points[idx - 1];
|
||||
// Compute the inverse function of the backlog in the interpolation interval that we fall
|
||||
// into.
|
||||
//
|
||||
// The formula for the backlog inside an interpolation point is y = a + bx, so the inverse
|
||||
// function is x = (y - a) / b
|
||||
|
||||
return last.input + (shares - last.output) * (cp.input - last.input) / (cp.output - last.output);
|
||||
}
|
||||
|
||||
void backlog_controller::update_controller(float shares) {
|
||||
_scheduling_group.set_shares(shares);
|
||||
if (!_inflight_update.available()) {
|
||||
@@ -3704,8 +3709,6 @@ database::stop() {
|
||||
return _streaming_dirty_memory_manager.shutdown();
|
||||
}).then([this] {
|
||||
return _memtable_controller.shutdown();
|
||||
}).then([this] {
|
||||
return _compaction_controller.shutdown();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -1171,8 +1171,6 @@ private:
|
||||
seastar::metrics::metric_groups _metrics;
|
||||
bool _enable_incremental_backups = false;
|
||||
|
||||
compaction_controller _compaction_controller;
|
||||
|
||||
querier_cache _querier_cache;
|
||||
|
||||
std::unique_ptr<db::large_partition_handler> _large_partition_handler;
|
||||
|
||||
@@ -28,6 +28,7 @@
|
||||
#include "sstables/progress_monitor.hh"
|
||||
|
||||
class compaction_backlog_manager;
|
||||
class compaction_controller;
|
||||
|
||||
// Read and write progress are provided by structures present in progress_manager.hh
|
||||
// However, we don't want to be tied to their lifetimes and for that reason we will not
|
||||
@@ -88,6 +89,12 @@ public:
|
||||
void transfer_ongoing_charges(compaction_backlog_tracker& new_bt, bool move_read_charges = true);
|
||||
void revert_charges(sstables::shared_sstable sst);
|
||||
private:
|
||||
void disable() {
|
||||
_disabled = true;
|
||||
_ongoing_writes = {};
|
||||
_ongoing_compactions = {};
|
||||
}
|
||||
bool _disabled = false;
|
||||
std::unique_ptr<impl> _impl;
|
||||
// We keep track of this so that we can transfer to a new tracker if the compaction strategy is
|
||||
// changed in the middle of a compaction.
|
||||
@@ -112,9 +119,11 @@ private:
|
||||
class compaction_backlog_manager {
|
||||
std::unordered_set<compaction_backlog_tracker*> _backlog_trackers;
|
||||
void remove_backlog_tracker(compaction_backlog_tracker* tracker);
|
||||
compaction_controller* _compaction_controller;
|
||||
friend class compaction_backlog_tracker;
|
||||
public:
|
||||
~compaction_backlog_manager();
|
||||
compaction_backlog_manager(compaction_controller& controller) : _compaction_controller(&controller) {}
|
||||
double backlog() const;
|
||||
void register_backlog_tracker(compaction_backlog_tracker& tracker);
|
||||
};
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
*/
|
||||
|
||||
#include "compaction_manager.hh"
|
||||
#include "compaction_strategy.hh"
|
||||
#include "compaction_backlog_manager.hh"
|
||||
#include "sstables/sstables.hh"
|
||||
#include "database.hh"
|
||||
@@ -29,6 +30,7 @@
|
||||
#include <boost/range/algorithm/count_if.hpp>
|
||||
|
||||
static logging::logger cmlog("compaction_manager");
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
class compacting_sstable_registration {
|
||||
compaction_manager* _cm;
|
||||
@@ -214,6 +216,18 @@ void compaction_manager::deregister_compacting_sstables(const std::vector<sstabl
|
||||
}
|
||||
}
|
||||
|
||||
class user_initiated_backlog_tracker final : public compaction_backlog_tracker::impl {
|
||||
public:
|
||||
explicit user_initiated_backlog_tracker(float added_backlog) : _added_backlog(added_backlog) {}
|
||||
private:
|
||||
float _added_backlog;
|
||||
virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override {
|
||||
return _added_backlog * memory::stats().total_memory();
|
||||
}
|
||||
virtual void add_sstable(sstables::shared_sstable sst) override { }
|
||||
virtual void remove_sstable(sstables::shared_sstable sst) override { }
|
||||
};
|
||||
|
||||
future<> compaction_manager::submit_major_compaction(column_family* cf) {
|
||||
if (_stopped) {
|
||||
return make_ready_future<>();
|
||||
@@ -240,9 +254,12 @@ future<> compaction_manager::submit_major_compaction(column_family* cf) {
|
||||
auto compacting = compacting_sstable_registration(this, sstables);
|
||||
|
||||
cmlog.info0("User initiated compaction started on behalf of {}.{}", cf->schema()->ks_name(), cf->schema()->cf_name());
|
||||
|
||||
return with_scheduling_group(_scheduling_group, [this, cf, sstables = std::move(sstables)] () mutable {
|
||||
return cf->compact_sstables(sstables::compaction_descriptor(std::move(sstables)));
|
||||
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_compaction_controller.backlog_of_shares(200)));
|
||||
return do_with(std::move(user_initiated), [this, cf, sstables = std::move(sstables)] (compaction_backlog_tracker& bt) mutable {
|
||||
register_backlog_tracker(bt);
|
||||
return with_scheduling_group(_scheduling_group, [this, cf, sstables = std::move(sstables)] () mutable {
|
||||
return cf->compact_sstables(sstables::compaction_descriptor(std::move(sstables)));
|
||||
});
|
||||
}).then([compacting = std::move(compacting)] {});
|
||||
});
|
||||
}).then_wrapped([this, task] (future<> f) {
|
||||
@@ -310,8 +327,31 @@ future<> compaction_manager::task_stop(lw_shared_ptr<compaction_manager::task> t
|
||||
});
|
||||
}
|
||||
|
||||
compaction_manager::compaction_manager(seastar::scheduling_group sg)
|
||||
: _scheduling_group(sg) {}
|
||||
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop)
|
||||
: _compaction_controller(sg, iop, 250ms, [this] () -> float {
|
||||
auto b = backlog() / memory::stats().total_memory();
|
||||
// This means we are using an unimplemented strategy
|
||||
if (compaction_controller::backlog_disabled(b)) {
|
||||
// returning the normalization factor means that we'll return the maximum
|
||||
// output in the _control_points. We can get rid of this when we implement
|
||||
// all strategies.
|
||||
return compaction_controller::normalization_factor;
|
||||
}
|
||||
return b;
|
||||
})
|
||||
, _backlog_manager(_compaction_controller)
|
||||
, _scheduling_group(_compaction_controller.sg())
|
||||
{}
|
||||
|
||||
compaction_manager::compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, uint64_t shares)
|
||||
: _compaction_controller(sg, iop, shares)
|
||||
, _backlog_manager(_compaction_controller)
|
||||
, _scheduling_group(_compaction_controller.sg())
|
||||
{}
|
||||
|
||||
compaction_manager::compaction_manager()
|
||||
: compaction_manager(seastar::default_scheduling_group(), default_priority_class(), 1)
|
||||
{}
|
||||
|
||||
compaction_manager::~compaction_manager() {
|
||||
// Assert that compaction manager was explicitly stopped, if started.
|
||||
@@ -397,7 +437,7 @@ future<> compaction_manager::stop() {
|
||||
_weight_tracker.clear();
|
||||
_compaction_submission_timer.cancel();
|
||||
cmlog.info("Stopped");
|
||||
return make_ready_future<>();
|
||||
return _compaction_controller.shutdown();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -526,8 +566,11 @@ future<> compaction_manager::perform_cleanup(column_family* cf) {
|
||||
|
||||
_stats.pending_tasks--;
|
||||
_stats.active_tasks++;
|
||||
return with_scheduling_group(_scheduling_group, [this, &cf, descriptor = std::move(descriptor)] () mutable {
|
||||
return cf.cleanup_sstables(std::move(descriptor));
|
||||
compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_compaction_controller.backlog_of_shares(200)));
|
||||
return do_with(std::move(user_initiated), [this, &cf, descriptor = std::move(descriptor)] (compaction_backlog_tracker& bt) mutable {
|
||||
return with_scheduling_group(_scheduling_group, [this, &cf, descriptor = std::move(descriptor)] () mutable {
|
||||
return cf.cleanup_sstables(std::move(descriptor));
|
||||
});
|
||||
}).then_wrapped([this, task, compacting = std::move(compacting)] (future<> f) mutable {
|
||||
_stats.active_tasks--;
|
||||
if (!can_proceed(task)) {
|
||||
@@ -611,25 +654,61 @@ void compaction_manager::on_compaction_complete(compaction_weight_registration&
|
||||
}
|
||||
|
||||
double compaction_backlog_tracker::backlog() const {
|
||||
return _impl->backlog(_ongoing_writes, _ongoing_compactions);
|
||||
return _disabled ? compaction_controller::disable_backlog : _impl->backlog(_ongoing_writes, _ongoing_compactions);
|
||||
}
|
||||
|
||||
void compaction_backlog_tracker::add_sstable(sstables::shared_sstable sst) {
|
||||
if (_disabled) {
|
||||
return;
|
||||
}
|
||||
_ongoing_writes.erase(sst);
|
||||
_impl->add_sstable(std::move(sst));
|
||||
try {
|
||||
_impl->add_sstable(std::move(sst));
|
||||
} catch (...) {
|
||||
cmlog.warn("Disabling backlog tracker due to exception {}", std::current_exception());
|
||||
disable();
|
||||
}
|
||||
}
|
||||
|
||||
void compaction_backlog_tracker::remove_sstable(sstables::shared_sstable sst) {
|
||||
if (_disabled) {
|
||||
return;
|
||||
}
|
||||
|
||||
_ongoing_compactions.erase(sst);
|
||||
_impl->remove_sstable(std::move(sst));
|
||||
try {
|
||||
_impl->remove_sstable(std::move(sst));
|
||||
} catch (...) {
|
||||
cmlog.warn("Disabling backlog tracker due to exception {}", std::current_exception());
|
||||
disable();
|
||||
}
|
||||
}
|
||||
|
||||
void compaction_backlog_tracker::register_partially_written_sstable(sstables::shared_sstable sst, backlog_write_progress_manager& wp) {
|
||||
_ongoing_writes.emplace(sst, &wp);
|
||||
if (_disabled) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
_ongoing_writes.emplace(sst, &wp);
|
||||
} catch (...) {
|
||||
// We can potentially recover from adding ongoing compactions or writes when the process
|
||||
// ends. The backlog will just be temporarily wrong. If we are are suffering from something
|
||||
// more serious like memory exhaustion we will soon fail again in either add / remove and
|
||||
// then we'll disable the tracker. For now, try our best.
|
||||
cmlog.warn("backlog tracker couldn't register partially written SSTable to exception {}", std::current_exception());
|
||||
}
|
||||
}
|
||||
|
||||
void compaction_backlog_tracker::register_compacting_sstable(sstables::shared_sstable sst, backlog_read_progress_manager& rp) {
|
||||
_ongoing_compactions.emplace(sst, &rp);
|
||||
if (_disabled) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
_ongoing_compactions.emplace(sst, &rp);
|
||||
} catch (...) {
|
||||
cmlog.warn("backlog tracker couldn't register partially compacting SSTable to exception {}", std::current_exception());
|
||||
}
|
||||
}
|
||||
|
||||
void compaction_backlog_tracker::transfer_ongoing_charges(compaction_backlog_tracker& new_bt, bool move_read_charges) {
|
||||
@@ -662,12 +741,20 @@ void compaction_backlog_manager::remove_backlog_tracker(compaction_backlog_track
|
||||
}
|
||||
|
||||
double compaction_backlog_manager::backlog() const {
|
||||
double backlog = 0;
|
||||
try {
|
||||
double backlog = 0;
|
||||
|
||||
for (auto& tracker: _backlog_trackers) {
|
||||
backlog += tracker->backlog();
|
||||
for (auto& tracker: _backlog_trackers) {
|
||||
backlog += tracker->backlog();
|
||||
}
|
||||
if (compaction_controller::backlog_disabled(backlog)) {
|
||||
return compaction_controller::disable_backlog;
|
||||
} else {
|
||||
return backlog;
|
||||
}
|
||||
} catch (...) {
|
||||
return _compaction_controller->backlog_of_shares(1000);
|
||||
}
|
||||
return backlog;
|
||||
}
|
||||
|
||||
void compaction_backlog_manager::register_backlog_tracker(compaction_backlog_tracker& tracker) {
|
||||
|
||||
@@ -37,6 +37,7 @@
|
||||
#include "sstables/compaction.hh"
|
||||
#include "compaction_weight_registration.hh"
|
||||
#include "compaction_backlog_manager.hh"
|
||||
#include "backlog_controller.hh"
|
||||
|
||||
class column_family;
|
||||
class compacting_sstable_registration;
|
||||
@@ -137,10 +138,13 @@ private:
|
||||
// similar-sized compaction.
|
||||
void postpone_compaction_for_column_family(column_family* cf);
|
||||
|
||||
compaction_controller _compaction_controller;
|
||||
compaction_backlog_manager _backlog_manager;
|
||||
seastar::scheduling_group _scheduling_group;
|
||||
public:
|
||||
compaction_manager(seastar::scheduling_group sg = default_scheduling_group());
|
||||
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop);
|
||||
compaction_manager(seastar::scheduling_group sg, const ::io_priority_class& iop, uint64_t shares);
|
||||
compaction_manager();
|
||||
~compaction_manager();
|
||||
|
||||
void register_metrics();
|
||||
|
||||
@@ -39,10 +39,7 @@
|
||||
|
||||
#include <vector>
|
||||
#include <chrono>
|
||||
#include <cmath>
|
||||
#include <ctgmath>
|
||||
#include <seastar/core/shared_ptr.hh>
|
||||
|
||||
#include "sstables.hh"
|
||||
#include "compaction.hh"
|
||||
#include "database.hh"
|
||||
@@ -60,6 +57,7 @@
|
||||
#include "leveled_compaction_strategy.hh"
|
||||
#include "time_window_compaction_strategy.hh"
|
||||
#include "sstables/compaction_backlog_manager.hh"
|
||||
#include "sstables/size_tiered_backlog_tracker.hh"
|
||||
|
||||
logging::logger date_tiered_manifest::logger = logging::logger("DateTieredCompactionStrategy");
|
||||
logging::logger leveled_manifest::logger("LeveledManifest");
|
||||
@@ -360,126 +358,9 @@ compaction_strategy_impl::get_resharding_jobs(column_family& cf, std::vector<sst
|
||||
return jobs;
|
||||
}
|
||||
|
||||
// Backlog for one SSTable under STCS:
|
||||
//
|
||||
// (1) Bi = Ei * log4 (T / Si),
|
||||
//
|
||||
// where Ei is the effective size of the SStable, Si is the Size of this
|
||||
// SSTable, and T is the total size of the Table.
|
||||
//
|
||||
// To calculate the backlog, we can use the logarithm in any base, but we choose
|
||||
// 4 as that is the historical minimum for the number of SSTables being compacted
|
||||
// together. Although now that minimum could be lifted, this is still a good number
|
||||
// of SSTables to aim for in a compaction execution.
|
||||
//
|
||||
// T, the total table size, is defined as
|
||||
//
|
||||
// (2) T = Sum(i = 0...N) { Si }.
|
||||
//
|
||||
// Ei, the effective size, is defined as
|
||||
//
|
||||
// (3) Ei = Si - Ci,
|
||||
//
|
||||
// where Ci is the total amount of bytes already compacted for this table.
|
||||
// For tables that are not under compaction, Ci = 0 and Si = Ei.
|
||||
//
|
||||
// Using the fact that log(a / b) = log(a) - log(b), we rewrite (1) as:
|
||||
//
|
||||
// Bi = Ei log4(T) - Ei log4(Si)
|
||||
//
|
||||
// For the entire Table, the Aggregate Backlog (A) is
|
||||
//
|
||||
// A = Sum(i = 0...N) { Ei * log4(T) - Ei * log4(Si) },
|
||||
//
|
||||
// which can be expressed as a sum of a table component and a SSTable component:
|
||||
//
|
||||
// A = Sum(i = 0...N) { Ei } * log4(T) - Sum(i = 0...N) { Ei * log4(Si) },
|
||||
//
|
||||
// and if we define C = Sum(i = 0...N) { Ci }, then we can write
|
||||
//
|
||||
// A = (T - C) * log4(T) - Sum(i = 0...N) { (Si - Ci)* log4(Si) }.
|
||||
//
|
||||
// Because the SSTable number can be quite big, we'd like to keep iterations to a minimum.
|
||||
// We can do that if we rewrite the expression above one more time, yielding:
|
||||
//
|
||||
// (4) A = T * log4(T) - C * log4(T) - (Sum(i = 0...N) { Si * log4(Si) } - Sum(i = 0...N) { Ci * log4(Si) }
|
||||
//
|
||||
// When SSTables are added or removed, we update the static parts of the equation, and
|
||||
// every time we need to compute the backlog we use the most up-to-date estimate of Ci to
|
||||
// calculate the compacted parts, having to iterate only over the SSTables that are compacting,
|
||||
// instead of all of them.
|
||||
//
|
||||
// For SSTables that are being currently written, we assume that they are a full SSTable in a
|
||||
// certain point in time, whose size is the amount of bytes currently written. So all we need
|
||||
// to do is keep track of them too, and add the current estimate to the static part of (4).
|
||||
class size_tiered_backlog_tracker final : public compaction_backlog_tracker::impl {
|
||||
uint64_t _total_bytes = 0;
|
||||
uint64_t _sstables_compacted_total_bytes = 0;
|
||||
double _sstables_backlog_contribution = 0.0f;
|
||||
|
||||
struct inflight_component {
|
||||
uint64_t total_bytes = 0;
|
||||
double contribution = 0;
|
||||
};
|
||||
|
||||
inflight_component partial_backlog(const compaction_backlog_tracker::ongoing_writes& ongoing_writes) const {
|
||||
inflight_component in;
|
||||
for (auto& swp : ongoing_writes) {
|
||||
auto written = swp.second->written();
|
||||
if (written > 0) {
|
||||
in.total_bytes += written;
|
||||
in.contribution += written * log4(written);
|
||||
}
|
||||
}
|
||||
return in;
|
||||
}
|
||||
|
||||
inflight_component compacted_backlog(const compaction_backlog_tracker::ongoing_compactions& ongoing_compactions) const {
|
||||
inflight_component in;
|
||||
for (auto& crp : ongoing_compactions) {
|
||||
auto compacted = crp.second->compacted();
|
||||
in.total_bytes += compacted;
|
||||
in.contribution += compacted * log4((crp.first->data_size()));
|
||||
}
|
||||
return in;
|
||||
}
|
||||
double log4(double x) const {
|
||||
static constexpr double inv_log_4 = 1.0f / std::log(4);
|
||||
return log(x) * inv_log_4;
|
||||
}
|
||||
public:
|
||||
virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override {
|
||||
inflight_component partial = partial_backlog(ow);
|
||||
inflight_component compacted = compacted_backlog(oc);
|
||||
|
||||
auto total_bytes = _total_bytes + partial.total_bytes - compacted.total_bytes;
|
||||
if ((total_bytes == 0)) {
|
||||
return 0;
|
||||
}
|
||||
auto sstables_contribution = _sstables_backlog_contribution + partial.contribution - compacted.contribution;
|
||||
return (total_bytes * log4(total_bytes)) - sstables_contribution;
|
||||
}
|
||||
|
||||
virtual void add_sstable(sstables::shared_sstable sst) override {
|
||||
if (sst->data_size() > 0) {
|
||||
_total_bytes += sst->data_size();
|
||||
_sstables_backlog_contribution += sst->data_size() * log4(sst->data_size());
|
||||
}
|
||||
}
|
||||
|
||||
// Removing could be the result of a failure of an in progress write, successful finish of a
|
||||
// compaction, or some one-off operation, like drop
|
||||
virtual void remove_sstable(sstables::shared_sstable sst) override {
|
||||
if (sst->data_size() > 0) {
|
||||
_total_bytes -= sst->data_size();
|
||||
_sstables_backlog_contribution -= sst->data_size() * log4(sst->data_size());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
struct unimplemented_backlog_tracker final : public compaction_backlog_tracker::impl {
|
||||
virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override {
|
||||
return std::numeric_limits<double>::infinity();
|
||||
return compaction_controller::disable_backlog;
|
||||
}
|
||||
virtual void add_sstable(sstables::shared_sstable sst) override { }
|
||||
virtual void remove_sstable(sstables::shared_sstable sst) override { }
|
||||
@@ -524,33 +405,6 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
//
|
||||
// Major compaction strategy is about compacting all available sstables into one.
|
||||
//
|
||||
class major_compaction_strategy : public compaction_strategy_impl {
|
||||
static constexpr size_t min_compact_threshold = 2;
|
||||
public:
|
||||
virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) override {
|
||||
// At least, two sstables must be available for compaction to take place.
|
||||
if (cfs.sstables_count() < min_compact_threshold) {
|
||||
return sstables::compaction_descriptor();
|
||||
}
|
||||
return sstables::compaction_descriptor(std::move(candidates));
|
||||
}
|
||||
|
||||
virtual int64_t estimated_pending_compactions(column_family& cf) const override {
|
||||
return (cf.sstables_count() < min_compact_threshold) ? 0 : 1;
|
||||
}
|
||||
|
||||
virtual compaction_strategy_type type() const {
|
||||
return compaction_strategy_type::major;
|
||||
}
|
||||
|
||||
virtual compaction_backlog_tracker& get_backlog_tracker() override {
|
||||
return get_null_backlog_tracker();
|
||||
}
|
||||
};
|
||||
|
||||
leveled_compaction_strategy::leveled_compaction_strategy(const std::map<sstring, sstring>& options)
|
||||
: compaction_strategy_impl(options)
|
||||
, _stcs_options(options)
|
||||
@@ -670,9 +524,6 @@ compaction_strategy make_compaction_strategy(compaction_strategy_type strategy,
|
||||
case compaction_strategy_type::null:
|
||||
impl = make_shared<null_compaction_strategy>(null_compaction_strategy());
|
||||
break;
|
||||
case compaction_strategy_type::major:
|
||||
impl = make_shared<major_compaction_strategy>(major_compaction_strategy());
|
||||
break;
|
||||
case compaction_strategy_type::size_tiered:
|
||||
impl = make_shared<size_tiered_compaction_strategy>(size_tiered_compaction_strategy(options));
|
||||
break;
|
||||
|
||||
141
sstables/size_tiered_backlog_tracker.hh
Normal file
141
sstables/size_tiered_backlog_tracker.hh
Normal file
@@ -0,0 +1,141 @@
|
||||
/*
|
||||
* Copyright (C) 2018 ScyllaDB
|
||||
*/
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
#include "sstables/compaction_backlog_manager.hh"
|
||||
#include <cmath>
|
||||
#include <ctgmath>
|
||||
|
||||
// Backlog for one SSTable under STCS:
|
||||
//
|
||||
// (1) Bi = Ei * log4 (T / Si),
|
||||
//
|
||||
// where Ei is the effective size of the SStable, Si is the Size of this
|
||||
// SSTable, and T is the total size of the Table.
|
||||
//
|
||||
// To calculate the backlog, we can use the logarithm in any base, but we choose
|
||||
// 4 as that is the historical minimum for the number of SSTables being compacted
|
||||
// together. Although now that minimum could be lifted, this is still a good number
|
||||
// of SSTables to aim for in a compaction execution.
|
||||
//
|
||||
// T, the total table size, is defined as
|
||||
//
|
||||
// (2) T = Sum(i = 0...N) { Si }.
|
||||
//
|
||||
// Ei, the effective size, is defined as
|
||||
//
|
||||
// (3) Ei = Si - Ci,
|
||||
//
|
||||
// where Ci is the total amount of bytes already compacted for this table.
|
||||
// For tables that are not under compaction, Ci = 0 and Si = Ei.
|
||||
//
|
||||
// Using the fact that log(a / b) = log(a) - log(b), we rewrite (1) as:
|
||||
//
|
||||
// Bi = Ei log4(T) - Ei log4(Si)
|
||||
//
|
||||
// For the entire Table, the Aggregate Backlog (A) is
|
||||
//
|
||||
// A = Sum(i = 0...N) { Ei * log4(T) - Ei * log4(Si) },
|
||||
//
|
||||
// which can be expressed as a sum of a table component and a SSTable component:
|
||||
//
|
||||
// A = Sum(i = 0...N) { Ei } * log4(T) - Sum(i = 0...N) { Ei * log4(Si) },
|
||||
//
|
||||
// and if we define C = Sum(i = 0...N) { Ci }, then we can write
|
||||
//
|
||||
// A = (T - C) * log4(T) - Sum(i = 0...N) { (Si - Ci)* log4(Si) }.
|
||||
//
|
||||
// Because the SSTable number can be quite big, we'd like to keep iterations to a minimum.
|
||||
// We can do that if we rewrite the expression above one more time, yielding:
|
||||
//
|
||||
// (4) A = T * log4(T) - C * log4(T) - (Sum(i = 0...N) { Si * log4(Si) } - Sum(i = 0...N) { Ci * log4(Si) }
|
||||
//
|
||||
// When SSTables are added or removed, we update the static parts of the equation, and
|
||||
// every time we need to compute the backlog we use the most up-to-date estimate of Ci to
|
||||
// calculate the compacted parts, having to iterate only over the SSTables that are compacting,
|
||||
// instead of all of them.
|
||||
//
|
||||
// For SSTables that are being currently written, we assume that they are a full SSTable in a
|
||||
// certain point in time, whose size is the amount of bytes currently written. So all we need
|
||||
// to do is keep track of them too, and add the current estimate to the static part of (4).
|
||||
class size_tiered_backlog_tracker final : public compaction_backlog_tracker::impl {
|
||||
int64_t _total_bytes = 0;
|
||||
double _sstables_backlog_contribution = 0.0f;
|
||||
|
||||
struct inflight_component {
|
||||
int64_t total_bytes = 0;
|
||||
double contribution = 0;
|
||||
};
|
||||
|
||||
inflight_component partial_backlog(const compaction_backlog_tracker::ongoing_writes& ongoing_writes) const {
|
||||
inflight_component in;
|
||||
for (auto& swp : ongoing_writes) {
|
||||
auto written = swp.second->written();
|
||||
if (written > 0) {
|
||||
in.total_bytes += written;
|
||||
in.contribution += written * log4(written);
|
||||
}
|
||||
}
|
||||
return in;
|
||||
}
|
||||
|
||||
inflight_component compacted_backlog(const compaction_backlog_tracker::ongoing_compactions& ongoing_compactions) const {
|
||||
inflight_component in;
|
||||
for (auto& crp : ongoing_compactions) {
|
||||
auto compacted = crp.second->compacted();
|
||||
in.total_bytes += compacted;
|
||||
in.contribution += compacted * log4((crp.first->data_size()));
|
||||
}
|
||||
return in;
|
||||
}
|
||||
double log4(double x) const {
|
||||
static constexpr double inv_log_4 = 1.0f / std::log(4);
|
||||
return log(x) * inv_log_4;
|
||||
}
|
||||
public:
|
||||
virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override {
|
||||
inflight_component partial = partial_backlog(ow);
|
||||
inflight_component compacted = compacted_backlog(oc);
|
||||
|
||||
auto total_bytes = _total_bytes + partial.total_bytes - compacted.total_bytes;
|
||||
if ((total_bytes <= 0)) {
|
||||
return 0;
|
||||
}
|
||||
auto sstables_contribution = _sstables_backlog_contribution + partial.contribution - compacted.contribution;
|
||||
auto b = (total_bytes * log4(total_bytes)) - sstables_contribution;
|
||||
return b > 0 ? b : 0;
|
||||
}
|
||||
|
||||
virtual void add_sstable(sstables::shared_sstable sst) override {
|
||||
if (sst->data_size() > 0) {
|
||||
_total_bytes += sst->data_size();
|
||||
_sstables_backlog_contribution += sst->data_size() * log4(sst->data_size());
|
||||
}
|
||||
}
|
||||
|
||||
// Removing could be the result of a failure of an in progress write, successful finish of a
|
||||
// compaction, or some one-off operation, like drop
|
||||
virtual void remove_sstable(sstables::shared_sstable sst) override {
|
||||
if (sst->data_size() > 0) {
|
||||
_total_bytes -= sst->data_size();
|
||||
_sstables_backlog_contribution -= sst->data_size() * log4(sst->data_size());
|
||||
}
|
||||
}
|
||||
};
|
||||
Reference in New Issue
Block a user