diff --git a/database.hh b/database.hh index f3f16dfe32..0bf251ea86 100644 --- a/database.hh +++ b/database.hh @@ -120,6 +120,7 @@ namespace sstables { class sstable; class entry_descriptor; class compaction_descriptor; +class compaction_completion_desc; class foreign_sstable_open_info; class sstables_manager; @@ -608,8 +609,7 @@ private: const std::vector& old_sstables); // Rebuilds the sstable set right away and schedule deletion of old sstables. - void on_compaction_completion(const std::vector& new_sstables, - const std::vector& sstables_to_remove); + void on_compaction_completion(sstables::compaction_completion_desc& desc); void rebuild_statistics(); diff --git a/sstables/compaction.cc b/sstables/compaction.cc index 43d565a93e..a9a78a994a 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -829,7 +829,7 @@ private: _compacting->erase(sst); }); auto exhausted_ssts = std::vector(exhausted, _sstables.end()); - _replacer(exhausted_ssts, std::move(_new_unused_sstables)); + _replacer(compaction_completion_desc{exhausted_ssts, std::move(_new_unused_sstables)}); _sstables.erase(exhausted, _sstables.end()); backlog_tracker_incrementally_adjust_charges(std::move(exhausted_ssts)); } @@ -839,7 +839,7 @@ private: if (!_sstables.empty()) { std::vector sstables_compacted; std::move(_sstables.begin(), _sstables.end(), std::back_inserter(sstables_compacted)); - _replacer(std::move(sstables_compacted), std::move(_new_unused_sstables)); + _replacer(compaction_completion_desc{std::move(sstables_compacted), std::move(_new_unused_sstables)}); } } diff --git a/sstables/compaction.hh b/sstables/compaction.hh index a8627474a3..5df2dc59be 100644 --- a/sstables/compaction.hh +++ b/sstables/compaction.hh @@ -189,8 +189,13 @@ namespace sstables { } }; + struct compaction_completion_desc { + std::vector input_sstables; + std::vector output_sstables; + }; + // Replaces old sstable(s) by new one(s) which contain all non-expired data. - using replacer_fn = std::function removed, std::vector added)>; + using replacer_fn = std::function; // Compact a list of N sstables into M sstables. // Returns info about the finished compaction, which includes vector to new sstables. diff --git a/table.cc b/table.cc index bb735b955a..e982006935 100644 --- a/table.cc +++ b/table.cc @@ -1174,8 +1174,7 @@ table::rebuild_sstable_list(const std::vector& new_sst // Note: must run in a seastar thread void -table::on_compaction_completion(const std::vector& new_sstables, - const std::vector& sstables_to_remove) { +table::on_compaction_completion(sstables::compaction_completion_desc& desc) { // Build a new list of _sstables: We remove from the existing list the // tables we compacted (by now, there might be more sstables flushed // later), and we add the new tables generated by the compaction. @@ -1188,7 +1187,7 @@ table::on_compaction_completion(const std::vector& new // unbounded time, because all shards must agree on the deletion). // make sure all old sstables belong *ONLY* to current shard before we proceed to their deletion. - for (auto& sst : sstables_to_remove) { + for (auto& sst : desc.input_sstables) { auto shards = sst->get_shards_for_this_sstable(); if (shards.size() > 1) { throw std::runtime_error(format("A regular compaction for {}.{} INCORRECTLY used shared sstable {}. Only resharding work with those!", @@ -1202,9 +1201,9 @@ table::on_compaction_completion(const std::vector& new auto new_compacted_but_not_deleted = _sstables_compacted_but_not_deleted; // rebuilding _sstables_compacted_but_not_deleted first to make the entire rebuild operation exception safe. - new_compacted_but_not_deleted.insert(new_compacted_but_not_deleted.end(), sstables_to_remove.begin(), sstables_to_remove.end()); + new_compacted_but_not_deleted.insert(new_compacted_but_not_deleted.end(), desc.input_sstables.begin(), desc.input_sstables.end()); - rebuild_sstable_list(new_sstables, sstables_to_remove); + rebuild_sstable_list(desc.output_sstables, desc.input_sstables); // refresh underlying data source in row cache to prevent it from holding reference // to sstables files that are about to be deleted. @@ -1214,7 +1213,7 @@ table::on_compaction_completion(const std::vector& new rebuild_statistics(); - auto f = seastar::with_gate(_sstable_deletion_gate, [this, sstables_to_remove] { + auto f = seastar::with_gate(_sstable_deletion_gate, [this, sstables_to_remove = desc.input_sstables] { return with_semaphore(_sstable_deletion_sem, 1, [sstables_to_remove = std::move(sstables_to_remove)] { return sstables::delete_atomically(std::move(sstables_to_remove)); }); @@ -1232,7 +1231,7 @@ table::on_compaction_completion(const std::vector& new // or they could stay forever in the set, resulting in deleted files remaining // opened and disk space not being released until shutdown. std::unordered_set s( - sstables_to_remove.begin(), sstables_to_remove.end()); + desc.input_sstables.begin(), desc.input_sstables.end()); auto e = boost::range::remove_if(_sstables_compacted_but_not_deleted, [&] (sstables::shared_sstable sst) -> bool { return s.count(sst); }); @@ -1289,13 +1288,12 @@ table::compact_sstables(sstables::compaction_descriptor descriptor) { sst->set_unshared(); return sst; }; - auto replace_sstables = [this, release_exhausted = descriptor.release_exhausted] (std::vector old_ssts, - std::vector new_ssts) { - _compaction_strategy.notify_completion(old_ssts, new_ssts); - _compaction_manager.propagate_replacement(this, old_ssts, new_ssts); - this->on_compaction_completion(new_ssts, old_ssts); + auto replace_sstables = [this, release_exhausted = descriptor.release_exhausted] (sstables::compaction_completion_desc desc) { + _compaction_strategy.notify_completion(desc.input_sstables, desc.output_sstables); + _compaction_manager.propagate_replacement(this, desc.input_sstables, desc.output_sstables); + this->on_compaction_completion(desc); if (release_exhausted) { - release_exhausted(old_ssts); + release_exhausted(desc.input_sstables); } }; diff --git a/test/boost/sstable_datafile_test.cc b/test/boost/sstable_datafile_test.cc index 0eac2ff07c..4223c2e35f 100644 --- a/test/boost/sstable_datafile_test.cc +++ b/test/boost/sstable_datafile_test.cc @@ -5094,7 +5094,9 @@ SEASTAR_TEST_CASE(sstable_run_based_compaction_test) { | boost::adaptors::transformed([] (auto& sst) { return sst->generation(); })); auto expected_sst = sstable_run.begin(); auto closed_sstables_tracker = sstable_run.begin(); - auto replacer = [&] (auto old_sstables, auto new_sstables) { + auto replacer = [&] (sstables::compaction_completion_desc desc) { + auto old_sstables = std::move(desc.input_sstables); + auto new_sstables = std::move(desc.output_sstables); BOOST_REQUIRE(expected_sst != sstable_run.end()); if (incremental_enabled) { do_incremental_replace(std::move(old_sstables), std::move(new_sstables), expected_sst, closed_sstables_tracker); @@ -5627,8 +5629,9 @@ SEASTAR_TEST_CASE(incremental_compaction_data_resurrection_test) { cf->add_sstable_and_update_cache(expired_sst).get(); BOOST_REQUIRE(is_partition_dead(alpha)); - auto replacer = [&] (std::vector old_sstables, std::vector new_sstables) { - + auto replacer = [&] (sstables::compaction_completion_desc desc) { + auto old_sstables = std::move(desc.input_sstables); + auto new_sstables = std::move(desc.output_sstables); // expired_sst is exhausted, and new sstable is written with mut 2. BOOST_REQUIRE(old_sstables.size() == 1); BOOST_REQUIRE(old_sstables.front() == expired_sst); diff --git a/test/boost/sstable_test.hh b/test/boost/sstable_test.hh index 7c8a002b95..3d60c7a3b3 100644 --- a/test/boost/sstable_test.hh +++ b/test/boost/sstable_test.hh @@ -226,7 +226,7 @@ public: }; inline auto replacer_fn_no_op() { - return [](std::vector removed, std::vector added) -> void {}; + return [](sstables::compaction_completion_desc desc) -> void {}; } inline sstring get_test_dir(const sstring& name, const sstring& ks, const sstring& cf)