diff --git a/compaction/compaction_backlog_manager.hh b/compaction/compaction_backlog_manager.hh index 82c368e264..b996f7bc28 100644 --- a/compaction/compaction_backlog_manager.hh +++ b/compaction/compaction_backlog_manager.hh @@ -60,8 +60,7 @@ public: using ongoing_compactions = std::unordered_map<sstables::shared_sstable, backlog_read_progress_manager*>; struct impl { - virtual void add_sstable(sstables::shared_sstable sst) = 0; - virtual void remove_sstable(sstables::shared_sstable sst) = 0; + virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) = 0; virtual double backlog(const ongoing_writes& ow, const ongoing_compactions& oc) const = 0; virtual ~impl() { } }; @@ -72,22 +71,21 @@ public: ~compaction_backlog_tracker(); double backlog() const; - void add_sstable(sstables::shared_sstable sst); - void remove_sstable(sstables::shared_sstable sst); + void replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts); void register_partially_written_sstable(sstables::shared_sstable sst, backlog_write_progress_manager& wp); void register_compacting_sstable(sstables::shared_sstable sst, backlog_read_progress_manager& rp); void transfer_ongoing_charges(compaction_backlog_tracker& new_bt, bool move_read_charges = true); void revert_charges(sstables::shared_sstable sst); -private: - // Returns true if this SSTable can be added or removed from the tracker. - bool sstable_belongs_to_tracker(const sstables::shared_sstable& sst); void disable() { - _disabled = true; + _impl = {}; _ongoing_writes = {}; _ongoing_compactions = {}; } - bool _disabled = false; +private: + // Returns true if this SSTable can be added or removed from the tracker. + bool sstable_belongs_to_tracker(const sstables::shared_sstable& sst); + bool disabled() const noexcept { return !_impl; } std::unique_ptr<impl> _impl; // We keep track of this so that we can transfer to a new tracker if the compaction strategy is // changed in the middle of a compaction. 
diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 03d5bf8487..1d7d5fda5b 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -237,8 +237,7 @@ private: virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override { return _added_backlog * _available_memory; } - virtual void add_sstable(sstables::shared_sstable sst) override { } - virtual void remove_sstable(sstables::shared_sstable sst) override { } + virtual void replace_sstables(std::vector<sstables::shared_sstable> old_ssts, std::vector<sstables::shared_sstable> new_ssts) override {} }; compaction_manager::compaction_state& compaction_manager::get_compaction_state(replica::table* t) { @@ -480,6 +479,8 @@ void compaction_manager::register_metrics() { sm::description("Holds the number of compaction tasks waiting for an opportunity to run.")), sm::make_gauge("backlog", [this] { return _last_backlog; }, sm::description("Holds the sum of compaction backlog for all tables in the system.")), + sm::make_gauge("normalized_backlog", [this] { return _last_backlog / _available_memory; }, + sm::description("Holds the sum of normalized compaction backlog for all tables in the system. Backlog is normalized by dividing backlog by shard's available memory.")), }); } @@ -1145,30 +1146,26 @@ compaction::strategy_control& compaction_manager::get_strategy_control() const n } double compaction_backlog_tracker::backlog() const { - return _disabled ? compaction_controller::disable_backlog : _impl->backlog(_ongoing_writes, _ongoing_compactions); + return disabled() ? 
compaction_controller::disable_backlog : _impl->backlog(_ongoing_writes, _ongoing_compactions); } -void compaction_backlog_tracker::add_sstable(sstables::shared_sstable sst) { - if (_disabled || !sstable_belongs_to_tracker(sst)) { +void compaction_backlog_tracker::replace_sstables(const std::vector<sstables::shared_sstable>& old_ssts, const std::vector<sstables::shared_sstable>& new_ssts) { + if (disabled()) { return; } - _ongoing_writes.erase(sst); - try { - _impl->add_sstable(std::move(sst)); - } catch (...) { - cmlog.warn("Disabling backlog tracker due to exception {}", std::current_exception()); - disable(); - } -} + auto filter_and_revert_charges = [this] (const std::vector<sstables::shared_sstable>& ssts) { + std::vector<sstables::shared_sstable> ret; + for (auto& sst : ssts) { + if (sstable_belongs_to_tracker(sst)) { + revert_charges(sst); + ret.push_back(sst); + } + } + return ret; + }; -void compaction_backlog_tracker::remove_sstable(sstables::shared_sstable sst) { - if (_disabled || !sstable_belongs_to_tracker(sst)) { - return; - } - - _ongoing_compactions.erase(sst); try { - _impl->remove_sstable(std::move(sst)); + _impl->replace_sstables(filter_and_revert_charges(old_ssts), filter_and_revert_charges(new_ssts)); } catch (...) 
{ cmlog.warn("Disabling backlog tracker due to exception {}", std::current_exception()); disable(); @@ -1180,7 +1177,7 @@ bool compaction_backlog_tracker::sstable_belongs_to_tracker(const sstables::shar } void compaction_backlog_tracker::register_partially_written_sstable(sstables::shared_sstable sst, backlog_write_progress_manager& wp) { - if (_disabled) { + if (disabled()) { return; } try { @@ -1195,7 +1192,7 @@ void compaction_backlog_tracker::register_partially_written_sstable(sstables::sh } void compaction_backlog_tracker::register_compacting_sstable(sstables::shared_sstable sst, backlog_read_progress_manager& rp) { - if (_disabled) { + if (disabled()) { return; } diff --git a/compaction/compaction_strategy.cc b/compaction/compaction_strategy.cc index 764082f996..cfafa4bcdf 100644 --- a/compaction/compaction_strategy.cc +++ b/compaction/compaction_strategy.cc @@ -91,23 +91,16 @@ compaction_strategy_impl::compaction_strategy_impl(const std::mapwritten(); - if (written > 0) { - in.total_bytes += written; - in.contribution += written * log4(written); - } - } - return in; -} - size_tiered_backlog_tracker::inflight_component size_tiered_backlog_tracker::compacted_backlog(const compaction_backlog_tracker::ongoing_compactions& ongoing_compactions) const { inflight_component in; for (auto const& crp : ongoing_compactions) { + // A SSTable being compacted may not contribute to backlog if compaction strategy decided + // to perform a low-efficiency compaction when system is under little load, or when user + // performs major even though strategy is completely satisfied + if (!_sstables_contributing_backlog.contains(crp.first)) { + continue; + } auto compacted = crp.second->compacted(); in.total_bytes += compacted; in.contribution += compacted * log4(crp.first->data_size()); @@ -115,34 +108,75 @@ size_tiered_backlog_tracker::compacted_backlog(const compaction_backlog_tracker: return in; } +void size_tiered_backlog_tracker::refresh_sstables_backlog_contribution() { + 
_sstables_backlog_contribution = 0.0f; + _sstables_contributing_backlog = {}; + if (_all.empty()) { + return; + } + using namespace sstables; + + // Deduce threshold from the last SSTable added to the set + // Low-efficiency jobs, which fan-in is smaller than min-threshold, will not have backlog accounted. + // That's because they can only run when system is under little load, and accounting them would result + // in efficient jobs acting more aggressive than they really have to. + // TODO: potentially switch to compaction manager's fan-in threshold, so to account for the dynamic + // fan-in threshold behavior. + const auto& newest_sst = std::ranges::max(_all, {}, std::mem_fn(&sstable::generation)); + auto threshold = newest_sst->get_schema()->min_compaction_threshold(); + + for (auto& bucket : size_tiered_compaction_strategy::get_buckets(boost::copy_range>(_all), _stcs_options)) { + if (!size_tiered_compaction_strategy::is_bucket_interesting(bucket, threshold)) { + continue; + } + _sstables_backlog_contribution += boost::accumulate(bucket | boost::adaptors::transformed([this] (const shared_sstable& sst) -> double { + return sst->data_size() * log4(sst->data_size()); + }), double(0.0f)); + // Controller is disabled if exception is caught during add / remove calls, so not making any effort to make this exception safe + _sstables_contributing_backlog.insert(bucket.begin(), bucket.end()); + } +} + double size_tiered_backlog_tracker::backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const { - inflight_component partial = partial_backlog(ow); inflight_component compacted = compacted_backlog(oc); - auto effective_total_size = _total_bytes + partial.total_bytes - compacted.total_bytes; - if ((effective_total_size <= 0)) { + auto total_backlog_bytes = boost::accumulate(_sstables_contributing_backlog | boost::adaptors::transformed(std::mem_fn(&sstables::sstable::data_size)), uint64_t(0)); + + // Bail out 
if effective backlog is zero, which happens in a small window where ongoing compaction exhausted + // input files but is still sealing output files or doing managerial stuff like updating history table + if (total_backlog_bytes <= compacted.total_bytes) { return 0; } - if (_total_bytes == 0) { - return 0; - } - auto sstables_contribution = _sstables_backlog_contribution + partial.contribution - compacted.contribution; - auto b = (effective_total_size * log4(_total_bytes)) - sstables_contribution; + + // Formula for each SSTable is (Si - Ci) * log(T / Si) + // Which can be rewritten as: ((Si - Ci) * log(T)) - ((Si - Ci) * log(Si)) + // + // For the meaning of each variable, please refer to the doc in size_tiered_backlog_tracker.hh + + // Sum of (Si - Ci) for all SSTables contributing backlog + auto effective_backlog_bytes = total_backlog_bytes - compacted.total_bytes; + + // Sum of (Si - Ci) * log (Si) for all SSTables contributing backlog + auto sstables_contribution = _sstables_backlog_contribution - compacted.contribution; + // This is subtracting ((Si - Ci) * log (Si)) from ((Si - Ci) * log(T)), yielding the final backlog + auto b = (effective_backlog_bytes * log4(_total_bytes)) - sstables_contribution; return b > 0 ? 
b : 0; } -void size_tiered_backlog_tracker::add_sstable(sstables::shared_sstable sst) { - if (sst->data_size() > 0) { - _total_bytes += sst->data_size(); - _sstables_backlog_contribution += sst->data_size() * log4(sst->data_size()); +void size_tiered_backlog_tracker::replace_sstables(std::vector old_ssts, std::vector new_ssts) { + for (auto& sst : old_ssts) { + if (sst->data_size() > 0) { + _total_bytes -= sst->data_size(); + _all.erase(sst); + } } -} - -void size_tiered_backlog_tracker::remove_sstable(sstables::shared_sstable sst) { - if (sst->data_size() > 0) { - _total_bytes -= sst->data_size(); - _sstables_backlog_contribution -= sst->data_size() * log4(sst->data_size()); + for (auto& sst : new_ssts) { + if (sst->data_size() > 0) { + _total_bytes += sst->data_size(); + _all.insert(std::move(sst)); + } } + refresh_sstables_backlog_contribution(); } namespace sstables { @@ -159,6 +193,7 @@ extern logging::logger clogger; // a new object for the partial write at this time. class time_window_backlog_tracker final : public compaction_backlog_tracker::impl { time_window_compaction_strategy_options _twcs_options; + size_tiered_compaction_strategy_options _stcs_options; std::unordered_map _windows; api::timestamp_type lower_bound_of(api::timestamp_type timestamp) const { @@ -166,8 +201,9 @@ class time_window_backlog_tracker final : public compaction_backlog_tracker::imp return time_window_compaction_strategy::get_window_lower_bound(_twcs_options.sstable_window_size, ts); } public: - time_window_backlog_tracker(time_window_compaction_strategy_options options) - : _twcs_options(options) + time_window_backlog_tracker(time_window_compaction_strategy_options twcs_options, size_tiered_compaction_strategy_options stcs_options) + : _twcs_options(twcs_options) + , _stcs_options(stcs_options) {} virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override { @@ -213,23 +249,39 @@ public: // 
Partial writes that don't belong to any window are accounted here. for (auto& current : writes_per_window) { - b += size_tiered_backlog_tracker().backlog(current.second, no_oc); + b += size_tiered_backlog_tracker(_stcs_options).backlog(current.second, no_oc); } return b; } - virtual void add_sstable(sstables::shared_sstable sst) override { - auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp); - _windows[bound].add_sstable(sst); - } + virtual void replace_sstables(std::vector old_ssts, std::vector new_ssts) override { + struct replacement { + std::vector old_ssts; + std::vector new_ssts; + }; + std::unordered_map per_window_replacement; - virtual void remove_sstable(sstables::shared_sstable sst) override { - auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp); - auto it = _windows.find(bound); - if (it != _windows.end()) { - it->second.remove_sstable(sst); - if (it->second.total_bytes() <= 0) { - _windows.erase(it); + for (auto& sst : new_ssts) { + auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp); + if (_windows.contains(bound)) { + _windows.emplace(bound, size_tiered_backlog_tracker(_stcs_options)); + } + per_window_replacement[bound].new_ssts.push_back(std::move(sst)); + } + for (auto& sst : old_ssts) { + auto bound = lower_bound_of(sst->get_stats_metadata().max_timestamp); + if (_windows.contains(bound)) { + per_window_replacement[bound].old_ssts.push_back(std::move(sst)); + } + } + + for (auto& [bound, r] : per_window_replacement) { + // All windows must exist here, as windows are created for new files and will + // remain alive as long as there's a single file in them + auto& w = _windows.at(bound); + w.replace_sstables(std::move(r.old_ssts), std::move(r.new_ssts)); + if (w.total_bytes() <= 0) { + _windows.erase(bound); } } } @@ -242,8 +294,9 @@ class leveled_compaction_backlog_tracker final : public compaction_backlog_track std::vector _size_per_level; uint64_t _max_sstable_size; public: - 
leveled_compaction_backlog_tracker(int32_t max_sstable_size_in_mb) - : _size_per_level(leveled_manifest::MAX_LEVELS, uint64_t(0)) + leveled_compaction_backlog_tracker(int32_t max_sstable_size_in_mb, size_tiered_compaction_strategy_options stcs_options) + : _l0_scts(stcs_options) + , _size_per_level(leveled_manifest::MAX_LEVELS, uint64_t(0)) , _max_sstable_size(max_sstable_size_in_mb * 1024 * 1024) {} @@ -288,20 +341,23 @@ public: return b; } - virtual void add_sstable(sstables::shared_sstable sst) override { - auto level = sst->get_sstable_level(); - _size_per_level[level] += sst->data_size(); - if (level == 0) { - _l0_scts.add_sstable(sst); + virtual void replace_sstables(std::vector old_ssts, std::vector new_ssts) override { + std::vector l0_old_ssts, l0_new_ssts; + for (auto& sst : new_ssts) { + auto level = sst->get_sstable_level(); + _size_per_level[level] += sst->data_size(); + if (level == 0) { + l0_new_ssts.push_back(std::move(sst)); + } } - } - - virtual void remove_sstable(sstables::shared_sstable sst) override { - auto level = sst->get_sstable_level(); - _size_per_level[level] -= sst->data_size(); - if (level == 0) { - _l0_scts.remove_sstable(sst); + for (auto& sst : old_ssts) { + auto level = sst->get_sstable_level(); + _size_per_level[level] -= sst->data_size(); + if (level == 0) { + l0_old_ssts.push_back(std::move(sst)); + } } + _l0_scts.replace_sstables(std::move(l0_old_ssts), std::move(l0_new_ssts)); } }; @@ -309,16 +365,14 @@ struct unimplemented_backlog_tracker final : public compaction_backlog_tracker:: virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override { return compaction_controller::disable_backlog; } - virtual void add_sstable(sstables::shared_sstable sst) override { } - virtual void remove_sstable(sstables::shared_sstable sst) override { } + virtual void replace_sstables(std::vector old_ssts, std::vector new_ssts) override {} }; struct 
null_backlog_tracker final : public compaction_backlog_tracker::impl { virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override { return 0; } - virtual void add_sstable(sstables::shared_sstable sst) override { } - virtual void remove_sstable(sstables::shared_sstable sst) override { } + virtual void replace_sstables(std::vector old_ssts, std::vector new_ssts) override {} }; // Just so that if we have more than one CF with NullStrategy, we don't create a lot @@ -356,7 +410,7 @@ leveled_compaction_strategy::leveled_compaction_strategy(const std::map(_max_sstable_size_in_mb)) + , _backlog_tracker(std::make_unique(_max_sstable_size_in_mb, _stcs_options)) { _compaction_counter.resize(leveled_manifest::MAX_LEVELS); } @@ -380,7 +434,7 @@ time_window_compaction_strategy::time_window_compaction_strategy(const std::map< : compaction_strategy_impl(options) , _options(options) , _stcs_options(options) - , _backlog_tracker(std::make_unique(_options)) + , _backlog_tracker(std::make_unique(_options, _stcs_options)) { if (!options.contains(TOMBSTONE_COMPACTION_INTERVAL_OPTION) && !options.contains(TOMBSTONE_THRESHOLD_OPTION)) { _disable_tombstone_compaction = true; @@ -582,12 +636,12 @@ compaction_descriptor date_tiered_compaction_strategy::get_sstables_for_compacti size_tiered_compaction_strategy::size_tiered_compaction_strategy(const std::map& options) : compaction_strategy_impl(options) , _options(options) - , _backlog_tracker(std::make_unique()) + , _backlog_tracker(std::make_unique(_options)) {} size_tiered_compaction_strategy::size_tiered_compaction_strategy(const size_tiered_compaction_strategy_options& options) : _options(options) - , _backlog_tracker(std::make_unique()) + , _backlog_tracker(std::make_unique(_options)) {} compaction_strategy::compaction_strategy(::shared_ptr impl) diff --git a/compaction/size_tiered_backlog_tracker.hh b/compaction/size_tiered_backlog_tracker.hh 
index 67ca3aa005..3165f3bf64 100644 --- a/compaction/size_tiered_backlog_tracker.hh +++ b/compaction/size_tiered_backlog_tracker.hh @@ -7,6 +7,7 @@ #pragma once #include "compaction_backlog_manager.hh" +#include "size_tiered_compaction_strategy.hh" #include #include @@ -63,30 +64,33 @@ // certain point in time, whose size is the amount of bytes currently written. So all we need // to do is keep track of them too, and add the current estimate to the static part of (4). class size_tiered_backlog_tracker final : public compaction_backlog_tracker::impl { + sstables::size_tiered_compaction_strategy_options _stcs_options; int64_t _total_bytes = 0; double _sstables_backlog_contribution = 0.0f; + std::unordered_set _sstables_contributing_backlog; + std::unordered_set _all; struct inflight_component { - int64_t total_bytes = 0; + uint64_t total_bytes = 0; double contribution = 0; }; - inflight_component partial_backlog(const compaction_backlog_tracker::ongoing_writes& ongoing_writes) const; - inflight_component compacted_backlog(const compaction_backlog_tracker::ongoing_compactions& ongoing_compactions) const; double log4(double x) const { double inv_log_4 = 1.0f / std::log(4); return log(x) * inv_log_4; } -public: - virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override; - virtual void add_sstable(sstables::shared_sstable sst) override; + void refresh_sstables_backlog_contribution(); +public: + size_tiered_backlog_tracker(sstables::size_tiered_compaction_strategy_options stcs_options) : _stcs_options(stcs_options) {} + + virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override; // Removing could be the result of a failure of an in progress write, successful finish of a // compaction, or some one-off operation, like drop - virtual void remove_sstable(sstables::shared_sstable sst) 
override; + virtual void replace_sstables(std::vector old_ssts, std::vector new_ssts) override; int64_t total_bytes() const { return _total_bytes; diff --git a/compaction/size_tiered_compaction_strategy.hh b/compaction/size_tiered_compaction_strategy.hh index c0d5e0bc46..84b8571cc4 100644 --- a/compaction/size_tiered_compaction_strategy.hh +++ b/compaction/size_tiered_compaction_strategy.hh @@ -13,6 +13,8 @@ #include "sstables/sstables.hh" #include +class size_tiered_backlog_tracker; + namespace sstables { class size_tiered_compaction_strategy_options { @@ -106,7 +108,7 @@ class size_tiered_compaction_strategy : public compaction_strategy_impl { return n / sstables.size(); } - bool is_bucket_interesting(const std::vector& bucket, int min_threshold) const { + static bool is_bucket_interesting(const std::vector& bucket, int min_threshold) { return bucket.size() >= size_t(min_threshold); } @@ -142,6 +144,7 @@ public: virtual compaction_descriptor get_reshaping_job(std::vector input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) override; + friend class ::size_tiered_backlog_tracker; }; } diff --git a/replica/table.cc b/replica/table.cc index 3fd6e427e7..828f13f052 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -368,21 +368,16 @@ void table::update_stats_for_new_sstable(uint64_t disk_space_used_by_sstable) no } inline void table::add_sstable_to_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable) { - tracker.add_sstable(std::move(sstable)); + tracker.replace_sstables({}, {std::move(sstable)}); } inline void table::remove_sstable_from_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable) { - tracker.remove_sstable(std::move(sstable)); + tracker.replace_sstables({std::move(sstable)}, {}); } void table::backlog_tracker_adjust_charges(const std::vector& old_sstables, const std::vector& new_sstables) { auto& tracker = _compaction_strategy.get_backlog_tracker(); - for (auto& 
sst : new_sstables) { - tracker.add_sstable(sst); - } - for (auto& sst : old_sstables) { - tracker.remove_sstable(sst); - } + tracker.replace_sstables(old_sstables, new_sstables); } lw_shared_ptr @@ -755,6 +750,7 @@ table::stop() { _sstables = make_compound_sstable_set(); _sstables_staging.clear(); })).then([this] { + _compaction_strategy.get_backlog_tracker().disable(); _cache.refresh_snapshot(); }); }); diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index f46b07dc4f..4e42288db6 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -3209,7 +3209,7 @@ SEASTAR_TEST_CASE(backlog_tracker_correctness_after_changing_compaction_strategy }; for (auto& sst : ssts) { - cf->get_compaction_strategy().get_backlog_tracker().add_sstable(sst); + cf->get_compaction_strategy().get_backlog_tracker().replace_sstables({}, {sst}); } // Start compaction, then stop tracking compaction, switch to TWCS, wait for compaction to finish and check for backlog. @@ -3220,7 +3220,7 @@ SEASTAR_TEST_CASE(backlog_tracker_correctness_after_changing_compaction_strategy // set_compaction_strategy() itself is responsible for transferring charges from old to new backlog tracker. 
cf->set_compaction_strategy(sstables::compaction_strategy_type::time_window); for (auto& sst : ssts) { - cf->get_compaction_strategy().get_backlog_tracker().add_sstable(sst); + cf->get_compaction_strategy().get_backlog_tracker().replace_sstables({}, {sst}); } auto ret = fut.get0(); @@ -4811,3 +4811,110 @@ SEASTAR_TEST_CASE(test_major_does_not_miss_data_in_memtable) { .produces_end_of_stream(); }); } + +SEASTAR_TEST_CASE(simple_backlog_controller_test) { + return test_env::do_with_async([] (test_env& env) { + ///////////// + // settings + static constexpr float disk_memory_ratio = 78.125; /* AWS I3en is ~78.125 */ + static constexpr uint64_t available_memory_per_shard = 8'000'000'000; /* AWS I3en */ + static constexpr float target_disk_usage = 0.50; + + const uint64_t available_disk_size_per_shard = disk_memory_ratio * available_memory_per_shard; + const uint64_t available_memory = available_memory_per_shard * 0.92; /* 8% is reserved for the OS */ + const uint64_t estimated_flush_size = double(available_memory) * 0.05; /* flush threshold is 5% of available shard mem */ + const uint64_t all_tables_disk_usage = double(available_disk_size_per_shard) * target_disk_usage; + + auto as = abort_source(); + compaction_manager::compaction_scheduling_group csg = { default_scheduling_group(), default_priority_class() }; + compaction_manager::maintenance_scheduling_group msg = { default_scheduling_group(), default_priority_class() }; + auto manager = compaction_manager(csg, msg, available_memory, as); + + auto add_sstable = [&env, &manager, gen = make_lw_shared(1)] (replica::table& t, uint64_t data_size) { + auto sst = env.make_sstable(t.schema(), "", (*gen)++, la, big); + auto key = make_local_key(t.schema()); + sstables::test(sst).set_values_for_leveled_strategy(data_size, 0 /*level*/, 0 /*max ts*/, key, key); + assert(sst->data_size() == data_size); + auto backlog_before = t.get_compaction_strategy().get_backlog_tracker().backlog(); + 
t.add_sstable_and_update_cache(sst).get(); + testlog.debug("\tNew sstable of size={}; Backlog diff={};", + sstables::pretty_printed_data_size(data_size), + t.get_compaction_strategy().get_backlog_tracker().backlog() - backlog_before); + }; + + auto tracker = make_lw_shared(); + cell_locker_stats cl_stats; + auto create_table = [&] () { + simple_schema ss; + auto s = ss.schema(); + + replica::column_family::config cfg = column_family_test_config(env.manager(), env.semaphore()); + cfg.datadir = ""; + cfg.enable_disk_writes = true; + cfg.enable_cache = false; + auto t = make_lw_shared(s, cfg, replica::table::no_commitlog(), manager, cl_stats, *tracker); + t->mark_ready_for_writes(); + t->start(); + t->set_compaction_strategy(sstables::compaction_strategy_type::size_tiered); + return t; + }; + + auto get_size_for_tier = [&] (int tier) -> uint64_t { + return std::pow(4, tier) * estimated_flush_size; + }; + auto get_total_tiers = [&] (uint64_t target_size) -> unsigned { + double inv_log_4 = 1.0f / std::log(4); + return std::ceil(std::log(double(target_size) / estimated_flush_size) * inv_log_4); + }; + auto normalize_backlog = [&] (double backlog) -> double { + return backlog / available_memory; + }; + + struct result { + unsigned table_count; + uint64_t per_table_max_disk_usage; + double normalized_backlog; + }; + std::vector results; + + std::vector target_table_count_s = { 1, 2, 5, 10, 20 }; + for (auto target_table_count : target_table_count_s) { + const uint64_t per_table_max_disk_usage = std::ceil(all_tables_disk_usage / target_table_count); + + testlog.info("Creating tables, with max size={}", sstables::pretty_printed_data_size(per_table_max_disk_usage)); + + std::vector> tables; + uint64_t tables_total_size = 0; + + for (uint64_t t_idx = 0, available_space = all_tables_disk_usage; available_space >= estimated_flush_size; t_idx++) { + auto target_disk_usage = std::min(available_space, per_table_max_disk_usage); + auto tiers = get_total_tiers(target_disk_usage); + + 
auto t = create_table(); + for (auto tier_idx = 0; tier_idx < tiers; tier_idx++) { + auto tier_size = get_size_for_tier(tier_idx); + if (tier_size > available_space) { + break; + } + add_sstable(*t, tier_size); + available_space -= std::min(available_space, uint64_t(tier_size)); + } + + auto table_size = t->get_stats().live_disk_space_used; + testlog.debug("T{}: {} tiers, with total size={}", t_idx, tiers, sstables::pretty_printed_data_size(table_size)); + tables.push_back(t); + tables_total_size += table_size; + } + testlog.debug("Created {} tables, with total size={}", tables.size(), sstables::pretty_printed_data_size(tables_total_size)); + results.push_back(result{ tables.size(), per_table_max_disk_usage, normalize_backlog(manager.backlog()) }); + for (auto& t : tables) { + t->stop().get(); + } + } + for (auto& r : results) { + testlog.info("Tables={} with max size={} -> NormalizedBacklog={}", r.table_count, sstables::pretty_printed_data_size(r.per_table_max_disk_usage), r.normalized_backlog); + // Expect 0 backlog as tiers are all perfectly compacted + BOOST_REQUIRE(r.normalized_backlog == 0.0f); + } + }); +} diff --git a/test/lib/sstable_utils.hh b/test/lib/sstable_utils.hh index d4cba44b6f..3c0ac76d4a 100644 --- a/test/lib/sstable_utils.hh +++ b/test/lib/sstable_utils.hh @@ -198,6 +198,7 @@ public: // Used to create synthetic sstables for testing leveled compaction strategy. void set_values_for_leveled_strategy(uint64_t fake_data_size, uint32_t sstable_level, int64_t max_timestamp, sstring first_key, sstring last_key) { _sst->_data_file_size = fake_data_size; + _sst->_bytes_on_disk = fake_data_size; // Create a synthetic stats metadata stats_metadata stats = {}; // leveled strategy sorts sstables by age using max_timestamp, let's set it to 0.