diff --git a/compaction/compaction.cc b/compaction/compaction.cc index e3efbbe08d..bcf177c34d 100644 --- a/compaction/compaction.cc +++ b/compaction/compaction.cc @@ -344,10 +344,8 @@ struct compaction_read_monitor_generator final : public read_monitor_generator { return _last_position_seen; } - void remove_sstable(bool is_tracking) { - if (is_tracking && _sst) { - _cf.get_compaction_strategy().get_backlog_tracker().remove_sstable(_sst); - } else if (_sst) { + void remove_sstable() { + if (_sst) { _cf.get_compaction_strategy().get_backlog_tracker().revert_charges(_sst); } _sst = {}; @@ -368,30 +366,24 @@ struct compaction_read_monitor_generator final : public read_monitor_generator { }; virtual sstables::read_monitor& operator()(sstables::shared_sstable sst) override { - _generated_monitors.emplace_back(std::move(sst), _cf); - return _generated_monitors.back(); + auto p = _generated_monitors.emplace(sst->generation(), compaction_read_monitor(sst, _cf)); + return p.first->second; } explicit compaction_read_monitor_generator(column_family& cf) : _cf(cf) {} - void remove_sstables(bool is_tracking) { - for (auto& rm : _generated_monitors) { - rm.remove_sstable(is_tracking); - } - } - - void remove_sstable(bool is_tracking, sstables::shared_sstable& sst) { - for (auto& rm : _generated_monitors) { - if (rm._sst == sst) { - rm.remove_sstable(is_tracking); - break; + void remove_exhausted_sstables(const std::vector& exhausted_sstables) { + for (auto& sst : exhausted_sstables) { + auto it = _generated_monitors.find(sst->generation()); + if (it != _generated_monitors.end()) { + it->second.remove_sstable(); } } } private: column_family& _cf; - std::deque _generated_monitors; + std::unordered_map _generated_monitors; }; // Writes a temporary sstable run containing only garbage collected data. @@ -751,8 +743,6 @@ private: std::chrono::duration_cast(duration).count(), pretty_printed_throughput(_info->end_size, duration), _info->total_partitions, _info->total_keys_written); - backlog_tracker_adjust_charges(); - auto info = std::move(_info); _cf.get_compaction_manager().deregister_compaction(info); return std::move(*info); @@ -760,7 +750,6 @@ private: virtual std::string_view report_start_desc() const = 0; virtual std::string_view report_finish_desc() const = 0; - virtual void backlog_tracker_adjust_charges() { }; std::function max_purgeable_func() { if (!tombstone_expiration_enabled()) { @@ -962,7 +951,6 @@ public: class regular_compaction : public compaction { // sstable being currently written. mutable compaction_read_monitor_generator _monitor_generator; - std::vector _unused_sstables = {}; public: regular_compaction(column_family& cf, compaction_descriptor descriptor) : compaction(cf, std::move(descriptor)) @@ -990,19 +978,9 @@ public: return "Compacted"; } - void backlog_tracker_adjust_charges() override { - _monitor_generator.remove_sstables(_info->tracking); - auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker(); - for (auto& sst : _unused_sstables) { - tracker.add_sstable(sst); - } - _unused_sstables.clear(); - } - virtual compaction_writer create_compaction_writer(const dht::decorated_key& dk) override { auto sst = _sstable_creator(this_shard_id()); setup_new_sstable(sst); - _unused_sstables.push_back(sst); auto monitor = std::make_unique(sst, _cf, maximum_timestamp(), _sstable_level); sstable_writer_config cfg = make_sstable_writer_config(_info->type); @@ -1031,26 +1009,6 @@ public: _monitor_generator(std::move(sstable)); } private: - void backlog_tracker_incrementally_adjust_charges(std::vector exhausted_sstables) { - // - // Notify backlog tracker of an early sstable replacement triggered by incremental compaction approach. - // Backlog tracker will be told that the exhausted sstables aren't being compacted anymore, and the - // new sstables, which replaced the exhausted ones, are not partially written sstables and they can - // be added to tracker like any other regular sstable in the table's set. - // This way we prevent bogus calculation of backlog due to lack of charge adjustment whenever there's - // an early sstable replacement. - // - - for (auto& sst : exhausted_sstables) { - _monitor_generator.remove_sstable(_info->tracking, sst); - } - auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker(); - for (auto& sst : _unused_sstables) { - tracker.add_sstable(sst); - } - _unused_sstables.clear(); - } - void maybe_replace_exhausted_sstables_by_sst(shared_sstable sst) { // Skip earlier replacement of exhausted sstables if compaction works with only single-fragment runs, // meaning incremental compaction is disabled for this compaction. @@ -1083,7 +1041,7 @@ private: auto exhausted_ssts = std::vector(exhausted, _sstables.end()); _replacer(get_compaction_completion_desc(exhausted_ssts, std::move(_new_unused_sstables))); _sstables.erase(exhausted, _sstables.end()); - backlog_tracker_incrementally_adjust_charges(std::move(exhausted_ssts)); + _monitor_generator.remove_exhausted_sstables(exhausted_ssts); } } @@ -1564,8 +1522,6 @@ public: return "Resharded"; } - void backlog_tracker_adjust_charges() override { } - compaction_writer create_compaction_writer(const dht::decorated_key& dk) override { auto shard = dht::shard_of(*_schema, dk.token()); auto sst = _sstable_creator(shard); diff --git a/compaction/compaction.hh b/compaction/compaction.hh index d6fabcd8da..5241cd6740 100644 --- a/compaction/compaction.hh +++ b/compaction/compaction.hh @@ -66,7 +66,6 @@ namespace sstables { int64_t ended_at; std::vector new_sstables; sstring stop_requested; - bool tracking = true; utils::UUID run_identifier; utils::UUID compaction_uuid; struct replacement { @@ -82,10 +81,6 @@ namespace sstables { void stop(sstring reason) { stop_requested = std::move(reason); } - - void stop_tracking() { - tracking = false; - } }; // Compact a list of N sstables into M sstables. diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index f50e339b62..9e7d1e3690 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -940,14 +940,6 @@ future<> compaction_manager::remove(column_family* cf) { }); } -void compaction_manager::stop_tracking_ongoing_compactions(column_family* cf) { - for (auto& info : _compactions) { - if (info->cf == cf) { - info->stop_tracking(); - } - } -} - void compaction_manager::stop_compaction(sstring type) { sstables::compaction_type target_type; try { diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index aab63a7746..ed58ecb95d 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -241,10 +241,6 @@ public: // Cancel requests on cf and wait for a possible ongoing compaction on cf. future<> remove(column_family* cf); - // No longer interested in tracking backlog for compactions in this column - // family. For instance, we could be ALTERing TABLE to a different strategy. - void stop_tracking_ongoing_compactions(column_family* cf); - const stats& get_stats() const { return _stats; } diff --git a/database.hh b/database.hh index f45d65392e..fc1bb14668 100644 --- a/database.hh +++ b/database.hh @@ -546,6 +546,8 @@ private: void add_maintenance_sstable(sstables::shared_sstable sst); static void add_sstable_to_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable); static void remove_sstable_from_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable); + // Update compaction backlog tracker with the same changes applied to the underlying sstable set. + void backlog_tracker_adjust_charges(const std::vector& old_sstables, const std::vector& new_sstables); lw_shared_ptr new_memtable(); future try_flush_memtable_to_sstable(lw_shared_ptr memt, sstable_write_permit&& permit); // Caller must keep m alive. diff --git a/table.cc b/table.cc index a45b649ae1..6c76748cd0 100644 --- a/table.cc +++ b/table.cc @@ -337,6 +337,16 @@ inline void table::remove_sstable_from_backlog_tracker(compaction_backlog_tracke tracker.remove_sstable(std::move(sstable)); } +void table::backlog_tracker_adjust_charges(const std::vector& old_sstables, const std::vector& new_sstables) { + auto& tracker = _compaction_strategy.get_backlog_tracker(); + for (auto& sst : new_sstables) { + tracker.add_sstable(sst); + } + for (auto& sst : old_sstables) { + tracker.remove_sstable(sst); + } +} + lw_shared_ptr table::do_add_sstable(lw_shared_ptr sstables, sstables::shared_sstable sstable, enable_backlog_tracker backlog_tracker) { @@ -784,6 +794,8 @@ table::update_sstable_lists_on_off_strategy_completion(const std::vector make(table& t, sstable_list_builder::permit_t permit, const sstables_t& old_maintenance, const sstables_t& new_main) { return std::make_unique(t, std::move(permit), old_maintenance, new_main); @@ -855,6 +867,7 @@ table::on_compaction_completion(sstables::compaction_completion_desc& desc) { virtual void execute() override { _t._main_sstables = std::move(_new_sstables); _t.refresh_compound_sstable_set(); + _t.backlog_tracker_adjust_charges(_desc.old_sstables, _desc.new_sstables); } static std::unique_ptr make(table& t, sstable_list_builder::permit_t permit, sstables::compaction_completion_desc& d) { return std::make_unique(t, std::move(permit), d); @@ -1037,10 +1050,6 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy) new_sstables.insert(s); }); - if (!move_read_charges) { - _compaction_manager.stop_tracking_ongoing_compactions(this); - } - // now exception safe: _compaction_strategy = std::move(new_cs); _main_sstables = std::move(new_sstables); diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index ed941ca2db..50b4d35d38 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -2945,11 +2945,11 @@ SEASTAR_TEST_CASE(compaction_strategy_aware_major_compaction_test) { }); } -SEASTAR_TEST_CASE(backlog_tracker_correctness_after_stop_tracking_compaction) { +SEASTAR_TEST_CASE(backlog_tracker_correctness_after_changing_compaction_strategy) { return test_env::do_with_async([] (test_env& env) { cell_locker_stats cl_stats; - auto builder = schema_builder("tests", "backlog_correctness_after_stop_tracking_compaction") + auto builder = schema_builder("tests", "backlog_tracker_correctness_after_changing_compaction_strategy") .with_column("id", utf8_type, column_kind::partition_key) .with_column("value", int32_type); auto s = builder.build(); @@ -2991,15 +2991,7 @@ SEASTAR_TEST_CASE(backlog_tracker_correctness_after_stop_tracking_compaction) { auto fut = compact_sstables(sstables::compaction_descriptor(ssts, cf->get_sstable_set(), default_priority_class()), *cf, sst_gen); - bool stopped_tracking = false; - for (auto& info : cf._data->cm.get_compactions()) { - if (info->cf == &*cf) { - info->stop_tracking(); - stopped_tracking = true; - } - } - BOOST_REQUIRE(stopped_tracking); - + // set_compaction_strategy() itself is responsible for transferring charges from old to new backlog tracker. cf->set_compaction_strategy(sstables::compaction_strategy_type::time_window); for (auto& sst : ssts) { cf->get_compaction_strategy().get_backlog_tracker().add_sstable(sst); @@ -3007,7 +2999,6 @@ SEASTAR_TEST_CASE(backlog_tracker_correctness_after_stop_tracking_compaction) { auto ret = fut.get0(); BOOST_REQUIRE(ret.new_sstables.size() == 1); - BOOST_REQUIRE(ret.tracking == false); } // triggers code that iterates through registered compactions. cf._data->cm.backlog();