From 35050a82171bea8d26a67ec24641060a6363dbd2 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 20 Sep 2021 15:06:37 -0300 Subject: [PATCH 1/4] compaction: simplify removal of monitors by switching to unordered_map, removal of generated monitors is made easier. this is a preparatory change for patch which will remove monitor for all exhausted sstables Signed-off-by: Raphael S. Carvalho --- compaction/compaction.cc | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/compaction/compaction.cc b/compaction/compaction.cc index e3efbbe08d..3379226fbf 100644 --- a/compaction/compaction.cc +++ b/compaction/compaction.cc @@ -368,30 +368,28 @@ struct compaction_read_monitor_generator final : public read_monitor_generator { }; virtual sstables::read_monitor& operator()(sstables::shared_sstable sst) override { - _generated_monitors.emplace_back(std::move(sst), _cf); - return _generated_monitors.back(); + auto p = _generated_monitors.emplace(sst->generation(), compaction_read_monitor(sst, _cf)); + return p.first->second; } explicit compaction_read_monitor_generator(column_family& cf) : _cf(cf) {} void remove_sstables(bool is_tracking) { - for (auto& rm : _generated_monitors) { + for (auto& rm : _generated_monitors | boost::adaptors::map_values) { rm.remove_sstable(is_tracking); } } void remove_sstable(bool is_tracking, sstables::shared_sstable& sst) { - for (auto& rm : _generated_monitors) { - if (rm._sst == sst) { - rm.remove_sstable(is_tracking); - break; - } + auto it = _generated_monitors.find(sst->generation()); + if (it != _generated_monitors.end()) { + it->second.remove_sstable(is_tracking); } } private: column_family& _cf; - std::deque _generated_monitors; + std::unordered_map _generated_monitors; }; // Writes a temporary sstable run containing only garbage collected data. From 05126cfe29616e354ee03867415377bd19dedbfa Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 27 Sep 2021 14:01:40 -0300 Subject: [PATCH 2/4] compaction: introduce compaction_read_monitor_generator::remove_exhausted_sstables() This new function makes it easier to remove monitor of exhausted sstables. Signed-off-by: Raphael S. Carvalho --- compaction/compaction.cc | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/compaction/compaction.cc b/compaction/compaction.cc index 3379226fbf..374ee9c30a 100644 --- a/compaction/compaction.cc +++ b/compaction/compaction.cc @@ -381,10 +381,12 @@ struct compaction_read_monitor_generator final : public read_monitor_generator { } } - void remove_sstable(bool is_tracking, sstables::shared_sstable& sst) { - auto it = _generated_monitors.find(sst->generation()); - if (it != _generated_monitors.end()) { - it->second.remove_sstable(is_tracking); + void remove_exhausted_sstables(bool is_tracking, const std::vector& exhausted_sstables) { + for (auto& sst : exhausted_sstables) { + auto it = _generated_monitors.find(sst->generation()); + if (it != _generated_monitors.end()) { + it->second.remove_sstable(is_tracking); + } } } private: @@ -1039,9 +1041,7 @@ private: // an early sstable replacement. // - for (auto& sst : exhausted_sstables) { - _monitor_generator.remove_sstable(_info->tracking, sst); - } + _monitor_generator.remove_exhausted_sstables(_info->tracking, exhausted_sstables); auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker(); for (auto& sst : _unused_sstables) { tracker.add_sstable(sst); From afd45b9f4948c30981d201f48abcf2c7a448f938 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 27 Sep 2021 14:02:46 -0300 Subject: [PATCH 3/4] compaction: Don't leak backlog of input sstable when compaction strategy is changed The generic backlog formula is: ALL + PARTIAL - COMPACTING With transfer_ongoing_charges() we already ignore the effect of ongoing compactions on COMPACTING as we judge them to be pointless. But ongoing compactions will run to completion, meaning that output sstables will be added to ALL anyway, in the formula above. With stop_tracking_ongoing_compactions(), input sstables are never removed from the tracker, but output sstables are added, which means we end up with duplicate backlog in the tracker. By removing this tracking mechanism, pointless ongoing compaction will be ignored as expected and the leaks will be fixed. Later, the intention is to force a stop on ongoing compactions if strategy has changed as they're pointless anyway. Signed-off-by: Raphael S. Carvalho --- compaction/compaction.cc | 18 ++++++++---------- compaction/compaction.hh | 5 ----- compaction/compaction_manager.cc | 8 -------- compaction/compaction_manager.hh | 4 ---- table.cc | 4 ---- test/boost/sstable_compaction_test.cc | 15 +++------------ 6 files changed, 11 insertions(+), 43 deletions(-) diff --git a/compaction/compaction.cc b/compaction/compaction.cc index 374ee9c30a..ac2bc688b8 100644 --- a/compaction/compaction.cc +++ b/compaction/compaction.cc @@ -344,11 +344,9 @@ struct compaction_read_monitor_generator final : public read_monitor_generator { return _last_position_seen; } - void remove_sstable(bool is_tracking) { - if (is_tracking && _sst) { + void remove_sstable() { + if (_sst) { _cf.get_compaction_strategy().get_backlog_tracker().remove_sstable(_sst); - } else if (_sst) { - _cf.get_compaction_strategy().get_backlog_tracker().revert_charges(_sst); } _sst = {}; } @@ -375,17 +373,17 @@ struct compaction_read_monitor_generator final : public read_monitor_generator { explicit compaction_read_monitor_generator(column_family& cf) : _cf(cf) {} - void remove_sstables(bool is_tracking) { + void remove_sstables() { for (auto& rm : _generated_monitors | boost::adaptors::map_values) { - rm.remove_sstable(is_tracking); + rm.remove_sstable(); } } - void remove_exhausted_sstables(bool is_tracking, const std::vector& exhausted_sstables) { + void remove_exhausted_sstables(const std::vector& exhausted_sstables) { for (auto& sst : exhausted_sstables) { auto it = _generated_monitors.find(sst->generation()); if (it != _generated_monitors.end()) { - it->second.remove_sstable(is_tracking); + it->second.remove_sstable(); } } } @@ -991,7 +989,7 @@ public: } void backlog_tracker_adjust_charges() override { - _monitor_generator.remove_sstables(_info->tracking); + _monitor_generator.remove_sstables(); auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker(); for (auto& sst : _unused_sstables) { tracker.add_sstable(sst); @@ -1041,7 +1039,7 @@ private: // an early sstable replacement. // - _monitor_generator.remove_exhausted_sstables(_info->tracking, exhausted_sstables); + _monitor_generator.remove_exhausted_sstables(exhausted_sstables); auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker(); for (auto& sst : _unused_sstables) { tracker.add_sstable(sst); diff --git a/compaction/compaction.hh b/compaction/compaction.hh index d6fabcd8da..5241cd6740 100644 --- a/compaction/compaction.hh +++ b/compaction/compaction.hh @@ -66,7 +66,6 @@ namespace sstables { int64_t ended_at; std::vector new_sstables; sstring stop_requested; - bool tracking = true; utils::UUID run_identifier; utils::UUID compaction_uuid; struct replacement { @@ -82,10 +81,6 @@ namespace sstables { void stop(sstring reason) { stop_requested = std::move(reason); } - - void stop_tracking() { - tracking = false; - } }; // Compact a list of N sstables into M sstables. diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index f50e339b62..9e7d1e3690 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -940,14 +940,6 @@ future<> compaction_manager::remove(column_family* cf) { }); } -void compaction_manager::stop_tracking_ongoing_compactions(column_family* cf) { - for (auto& info : _compactions) { - if (info->cf == cf) { - info->stop_tracking(); - } - } -} - void compaction_manager::stop_compaction(sstring type) { sstables::compaction_type target_type; try { diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index aab63a7746..ed58ecb95d 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -241,10 +241,6 @@ public: // Cancel requests on cf and wait for a possible ongoing compaction on cf. future<> remove(column_family* cf); - // No longer interested in tracking backlog for compactions in this column - // family. For instance, we could be ALTERing TABLE to a different strategy. - void stop_tracking_ongoing_compactions(column_family* cf); - const stats& get_stats() const { return _stats; } diff --git a/table.cc b/table.cc index a45b649ae1..ccfd5ac251 100644 --- a/table.cc +++ b/table.cc @@ -1037,10 +1037,6 @@ void table::set_compaction_strategy(sstables::compaction_strategy_type strategy) new_sstables.insert(s); }); - if (!move_read_charges) { - _compaction_manager.stop_tracking_ongoing_compactions(this); - } - // now exception safe: _compaction_strategy = std::move(new_cs); _main_sstables = std::move(new_sstables); diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index ed941ca2db..50b4d35d38 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -2945,11 +2945,11 @@ SEASTAR_TEST_CASE(compaction_strategy_aware_major_compaction_test) { }); } -SEASTAR_TEST_CASE(backlog_tracker_correctness_after_stop_tracking_compaction) { +SEASTAR_TEST_CASE(backlog_tracker_correctness_after_changing_compaction_strategy) { return test_env::do_with_async([] (test_env& env) { cell_locker_stats cl_stats; - auto builder = schema_builder("tests", "backlog_correctness_after_stop_tracking_compaction") + auto builder = schema_builder("tests", "backlog_tracker_correctness_after_changing_compaction_strategy") .with_column("id", utf8_type, column_kind::partition_key) .with_column("value", int32_type); auto s = builder.build(); @@ -2991,15 +2991,7 @@ SEASTAR_TEST_CASE(backlog_tracker_correctness_after_stop_tracking_compaction) { auto fut = compact_sstables(sstables::compaction_descriptor(ssts, cf->get_sstable_set(), default_priority_class()), *cf, sst_gen); - bool stopped_tracking = false; - for (auto& info : cf._data->cm.get_compactions()) { - if (info->cf == &*cf) { - info->stop_tracking(); - stopped_tracking = true; - } - } - BOOST_REQUIRE(stopped_tracking); - + // set_compaction_strategy() itself is responsible for transferring charges from old to new backlog tracker. cf->set_compaction_strategy(sstables::compaction_strategy_type::time_window); for (auto& sst : ssts) { cf->get_compaction_strategy().get_backlog_tracker().add_sstable(sst); @@ -3007,7 +2999,6 @@ SEASTAR_TEST_CASE(backlog_tracker_correctness_after_stop_tracking_compaction) { auto ret = fut.get0(); BOOST_REQUIRE(ret.new_sstables.size() == 1); - BOOST_REQUIRE(ret.tracking == false); } // triggers code that iterates through registered compactions. cf._data->cm.backlog(); From 97181735988934497592b8410381122e90e5ea6b Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 27 Sep 2021 14:04:46 -0300 Subject: [PATCH 4/4] compaction: Update backlog tracker correctly when schema is updated Currently the following can happen: 1) there's ongoing compaction with input sstable A, so sstable set and backlog tracker both contains A. 2) ongoing compaction replaces input sstable A by B, so sstable set contains only B now. 3) schema is updated, so a new backlog tracker is built without A because sstable set now contains only B. 4) ongoing compaction tries to remove A from tracker, but it was excluded in step 3. 5) tracker can now have a negative value if table is decreasing in size, which leads to log() == -NaN This problem happens because backlog tracker updates are decoupled from sstable set updates. Given that the essential content of backlog tracker should be the same as one of sstable set, let's move tracker management to table. Whenever sstable set is updated, backlog tracker will be updated with the same changes, making their management less error prone. Fixes #9157 Signed-off-by: Raphael S. Carvalho --- compaction/compaction.cc | 44 ++-------------------------------------- database.hh | 2 ++ table.cc | 13 ++++++++++++ 3 files changed, 17 insertions(+), 42 deletions(-) diff --git a/compaction/compaction.cc b/compaction/compaction.cc index ac2bc688b8..bcf177c34d 100644 --- a/compaction/compaction.cc +++ b/compaction/compaction.cc @@ -346,7 +346,7 @@ struct compaction_read_monitor_generator final : public read_monitor_generator { void remove_sstable() { if (_sst) { - _cf.get_compaction_strategy().get_backlog_tracker().remove_sstable(_sst); + _cf.get_compaction_strategy().get_backlog_tracker().revert_charges(_sst); } _sst = {}; } @@ -373,12 +373,6 @@ struct compaction_read_monitor_generator final : public read_monitor_generator { explicit compaction_read_monitor_generator(column_family& cf) : _cf(cf) {} - void remove_sstables() { - for (auto& rm : _generated_monitors | boost::adaptors::map_values) { - rm.remove_sstable(); - } - } - void remove_exhausted_sstables(const std::vector& exhausted_sstables) { for (auto& sst : exhausted_sstables) { auto it = _generated_monitors.find(sst->generation()); @@ -749,8 +743,6 @@ private: std::chrono::duration_cast(duration).count(), pretty_printed_throughput(_info->end_size, duration), _info->total_partitions, _info->total_keys_written); - backlog_tracker_adjust_charges(); - auto info = std::move(_info); _cf.get_compaction_manager().deregister_compaction(info); return std::move(*info); @@ -758,7 +750,6 @@ private: virtual std::string_view report_start_desc() const = 0; virtual std::string_view report_finish_desc() const = 0; - virtual void backlog_tracker_adjust_charges() { }; std::function max_purgeable_func() { if (!tombstone_expiration_enabled()) { @@ -960,7 +951,6 @@ public: class regular_compaction : public compaction { // sstable being currently written. mutable compaction_read_monitor_generator _monitor_generator; - std::vector _unused_sstables = {}; public: regular_compaction(column_family& cf, compaction_descriptor descriptor) : compaction(cf, std::move(descriptor)) @@ -988,19 +978,9 @@ public: return "Compacted"; } - void backlog_tracker_adjust_charges() override { - _monitor_generator.remove_sstables(); - auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker(); - for (auto& sst : _unused_sstables) { - tracker.add_sstable(sst); - } - _unused_sstables.clear(); - } - virtual compaction_writer create_compaction_writer(const dht::decorated_key& dk) override { auto sst = _sstable_creator(this_shard_id()); setup_new_sstable(sst); - _unused_sstables.push_back(sst); auto monitor = std::make_unique(sst, _cf, maximum_timestamp(), _sstable_level); sstable_writer_config cfg = make_sstable_writer_config(_info->type); @@ -1029,24 +1009,6 @@ public: _monitor_generator(std::move(sstable)); } private: - void backlog_tracker_incrementally_adjust_charges(std::vector exhausted_sstables) { - // - // Notify backlog tracker of an early sstable replacement triggered by incremental compaction approach. - // Backlog tracker will be told that the exhausted sstables aren't being compacted anymore, and the - // new sstables, which replaced the exhausted ones, are not partially written sstables and they can - // be added to tracker like any other regular sstable in the table's set. - // This way we prevent bogus calculation of backlog due to lack of charge adjustment whenever there's - // an early sstable replacement. - // - - _monitor_generator.remove_exhausted_sstables(exhausted_sstables); - auto& tracker = _cf.get_compaction_strategy().get_backlog_tracker(); - for (auto& sst : _unused_sstables) { - tracker.add_sstable(sst); - } - _unused_sstables.clear(); - } - void maybe_replace_exhausted_sstables_by_sst(shared_sstable sst) { // Skip earlier replacement of exhausted sstables if compaction works with only single-fragment runs, // meaning incremental compaction is disabled for this compaction. @@ -1079,7 +1041,7 @@ private: auto exhausted_ssts = std::vector(exhausted, _sstables.end()); _replacer(get_compaction_completion_desc(exhausted_ssts, std::move(_new_unused_sstables))); _sstables.erase(exhausted, _sstables.end()); - backlog_tracker_incrementally_adjust_charges(std::move(exhausted_ssts)); + _monitor_generator.remove_exhausted_sstables(exhausted_ssts); } } @@ -1560,8 +1522,6 @@ public: return "Resharded"; } - void backlog_tracker_adjust_charges() override { } - compaction_writer create_compaction_writer(const dht::decorated_key& dk) override { auto shard = dht::shard_of(*_schema, dk.token()); auto sst = _sstable_creator(shard); diff --git a/database.hh b/database.hh index f45d65392e..fc1bb14668 100644 --- a/database.hh +++ b/database.hh @@ -546,6 +546,8 @@ private: void add_maintenance_sstable(sstables::shared_sstable sst); static void add_sstable_to_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable); static void remove_sstable_from_backlog_tracker(compaction_backlog_tracker& tracker, sstables::shared_sstable sstable); + // Update compaction backlog tracker with the same changes applied to the underlying sstable set. + void backlog_tracker_adjust_charges(const std::vector& old_sstables, const std::vector& new_sstables); lw_shared_ptr new_memtable(); future try_flush_memtable_to_sstable(lw_shared_ptr memt, sstable_write_permit&& permit); // Caller must keep m alive. diff --git a/table.cc b/table.cc index ccfd5ac251..6c76748cd0 100644 --- a/table.cc +++ b/table.cc @@ -337,6 +337,16 @@ inline void table::remove_sstable_from_backlog_tracker(compaction_backlog_tracke tracker.remove_sstable(std::move(sstable)); } +void table::backlog_tracker_adjust_charges(const std::vector& old_sstables, const std::vector& new_sstables) { + auto& tracker = _compaction_strategy.get_backlog_tracker(); + for (auto& sst : new_sstables) { + tracker.add_sstable(sst); + } + for (auto& sst : old_sstables) { + tracker.remove_sstable(sst); + } +} + lw_shared_ptr table::do_add_sstable(lw_shared_ptr sstables, sstables::shared_sstable sstable, enable_backlog_tracker backlog_tracker) { @@ -784,6 +794,8 @@ table::update_sstable_lists_on_off_strategy_completion(const std::vector make(table& t, sstable_list_builder::permit_t permit, const sstables_t& old_maintenance, const sstables_t& new_main) { return std::make_unique(t, std::move(permit), old_maintenance, new_main); @@ -855,6 +867,7 @@ table::on_compaction_completion(sstables::compaction_completion_desc& desc) { virtual void execute() override { _t._main_sstables = std::move(_new_sstables); _t.refresh_compound_sstable_set(); + _t.backlog_tracker_adjust_charges(_desc.old_sstables, _desc.new_sstables); } static std::unique_ptr make(table& t, sstable_list_builder::permit_t permit, sstables::compaction_completion_desc& d) { return std::make_unique(t, std::move(permit), d);