From ef7207592013e1985b8287ded6baeaf0ccd4eda8 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 28 May 2024 15:40:47 -0300 Subject: [PATCH] compaction: wire storage free space into reshape procedure After this, TWCS reshape procedure can be changed to limit job to 10% of available space. Signed-off-by: Raphael S. Carvalho (cherry picked from commit 0ce8ee03f188ee52ae82b502d15bc06c8a76c0e8) --- compaction/compaction_manager.cc | 15 ++++++--- compaction/compaction_strategy.cc | 13 ++++++-- compaction/compaction_strategy.hh | 5 ++- compaction/compaction_strategy_impl.hh | 2 +- compaction/compaction_strategy_type.hh | 8 +++++ compaction/leveled_compaction_strategy.cc | 5 +-- compaction/leveled_compaction_strategy.hh | 2 +- compaction/size_tiered_compaction_strategy.cc | 3 +- compaction/size_tiered_compaction_strategy.hh | 2 +- compaction/task_manager_module.cc | 8 ++++- compaction/time_window_compaction_strategy.cc | 5 +-- compaction/time_window_compaction_strategy.hh | 2 +- test/boost/sstable_compaction_test.cc | 33 ++++++++++++------- 13 files changed, 73 insertions(+), 30 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 568b9c8f9e..74037058d9 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -1379,13 +1379,20 @@ private: })); }; - auto get_next_job = [&] () -> std::optional { - auto desc = t.get_compaction_strategy().get_reshaping_job(get_reshape_candidates(), t.schema(), sstables::reshape_mode::strict); - return desc.sstables.size() ? std::make_optional(std::move(desc)) : std::nullopt; + auto get_next_job = [&] () -> future> { + auto candidates = get_reshape_candidates(); + if (candidates.empty()) { + co_return std::nullopt; + } + // all sstables added to maintenance set share the same underlying storage. + auto& storage = candidates.front()->get_storage(); + sstables::reshape_config cfg = co_await sstables::make_reshape_config(storage, sstables::reshape_mode::strict); + auto desc = t.get_compaction_strategy().get_reshaping_job(get_reshape_candidates(), t.schema(), cfg); + co_return desc.sstables.size() ? std::make_optional(std::move(desc)) : std::nullopt; }; std::exception_ptr err; - while (auto desc = get_next_job()) { + while (auto desc = co_await get_next_job()) { auto compacting = compacting_sstable_registration(_cm, _cm.get_compaction_state(&t), desc->sstables); auto on_replace = compacting.update_on_sstable_replacement(); diff --git a/compaction/compaction_strategy.cc b/compaction/compaction_strategy.cc index ea9a603894..2b26b66404 100644 --- a/compaction/compaction_strategy.cc +++ b/compaction/compaction_strategy.cc @@ -83,7 +83,7 @@ reader_consumer_v2 compaction_strategy_impl::make_interposer_consumer(const muta } compaction_descriptor -compaction_strategy_impl::get_reshaping_job(std::vector input, schema_ptr schema, reshape_mode mode) const { +compaction_strategy_impl::get_reshaping_job(std::vector input, schema_ptr schema, reshape_config cfg) const { return compaction_descriptor(); } @@ -728,8 +728,8 @@ compaction_backlog_tracker compaction_strategy::make_backlog_tracker() const { } sstables::compaction_descriptor -compaction_strategy::get_reshaping_job(std::vector input, schema_ptr schema, reshape_mode mode) const { - return _compaction_strategy_impl->get_reshaping_job(std::move(input), schema, mode); +compaction_strategy::get_reshaping_job(std::vector input, schema_ptr schema, reshape_config cfg) const { + return _compaction_strategy_impl->get_reshaping_job(std::move(input), schema, cfg); } uint64_t compaction_strategy::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr schema) const { @@ -767,6 +767,13 @@ compaction_strategy make_compaction_strategy(compaction_strategy_type strategy, return compaction_strategy(std::move(impl)); } +future make_reshape_config(const sstables::storage& storage, reshape_mode mode) { + co_return sstables::reshape_config{ + .mode = mode, + .free_storage_space = co_await storage.free_space() / smp::count, + }; +} + } namespace compaction { diff --git a/compaction/compaction_strategy.hh b/compaction/compaction_strategy.hh index ee7bff882b..4aebb0b8d5 100644 --- a/compaction/compaction_strategy.hh +++ b/compaction/compaction_strategy.hh @@ -30,6 +30,7 @@ class compaction_strategy_impl; class sstable; class sstable_set; struct compaction_descriptor; +class storage; class compaction_strategy { ::shared_ptr _compaction_strategy_impl; @@ -121,11 +122,13 @@ public: // // The caller should also pass a maximum number of SSTables which is the maximum amount of // SSTables that can be added into a single job. - compaction_descriptor get_reshaping_job(std::vector input, schema_ptr schema, reshape_mode mode) const; + compaction_descriptor get_reshaping_job(std::vector input, schema_ptr schema, reshape_config cfg) const; }; // Creates a compaction_strategy object from one of the strategies available. compaction_strategy make_compaction_strategy(compaction_strategy_type strategy, const std::map& options); +future make_reshape_config(const sstables::storage& storage, reshape_mode mode); + } diff --git a/compaction/compaction_strategy_impl.hh b/compaction/compaction_strategy_impl.hh index 7be2eb31e3..99dfc32a8d 100644 --- a/compaction/compaction_strategy_impl.hh +++ b/compaction/compaction_strategy_impl.hh @@ -76,6 +76,6 @@ public: return false; } - virtual compaction_descriptor get_reshaping_job(std::vector input, schema_ptr schema, reshape_mode mode) const; + virtual compaction_descriptor get_reshaping_job(std::vector input, schema_ptr schema, reshape_config cfg) const; }; } diff --git a/compaction/compaction_strategy_type.hh b/compaction/compaction_strategy_type.hh index 6da2e75953..462d586f9f 100644 --- a/compaction/compaction_strategy_type.hh +++ b/compaction/compaction_strategy_type.hh @@ -8,6 +8,8 @@ #pragma once +#include + namespace sstables { enum class compaction_strategy_type { @@ -18,4 +20,10 @@ enum class compaction_strategy_type { }; enum class reshape_mode { strict, relaxed }; + +struct reshape_config { + reshape_mode mode; + const uint64_t free_storage_space; +}; + } diff --git a/compaction/leveled_compaction_strategy.cc b/compaction/leveled_compaction_strategy.cc index 3b64ff8596..6c25210dc2 100644 --- a/compaction/leveled_compaction_strategy.cc +++ b/compaction/leveled_compaction_strategy.cc @@ -146,7 +146,8 @@ int64_t leveled_compaction_strategy::estimated_pending_compactions(table_state& } compaction_descriptor -leveled_compaction_strategy::get_reshaping_job(std::vector input, schema_ptr schema, reshape_mode mode) const { +leveled_compaction_strategy::get_reshaping_job(std::vector input, schema_ptr schema, reshape_config cfg) const { + auto mode = cfg.mode; std::array, leveled_manifest::MAX_LEVELS> level_info; auto is_disjoint = [schema] (const std::vector& sstables, unsigned tolerance) -> std::tuple { @@ -203,7 +204,7 @@ leveled_compaction_strategy::get_reshaping_job(std::vector input if (level_info[0].size() > offstrategy_threshold) { size_tiered_compaction_strategy stcs(_stcs_options); - return stcs.get_reshaping_job(std::move(level_info[0]), schema, mode); + return stcs.get_reshaping_job(std::move(level_info[0]), schema, cfg); } for (unsigned level = leveled_manifest::MAX_LEVELS - 1; level > 0; --level) { diff --git a/compaction/leveled_compaction_strategy.hh b/compaction/leveled_compaction_strategy.hh index d38ed07e0b..3d38286b54 100644 --- a/compaction/leveled_compaction_strategy.hh +++ b/compaction/leveled_compaction_strategy.hh @@ -74,7 +74,7 @@ public: virtual std::unique_ptr make_backlog_tracker() const override; - virtual compaction_descriptor get_reshaping_job(std::vector input, schema_ptr schema, reshape_mode mode) const override; + virtual compaction_descriptor get_reshaping_job(std::vector input, schema_ptr schema, reshape_config cfg) const override; }; } diff --git a/compaction/size_tiered_compaction_strategy.cc b/compaction/size_tiered_compaction_strategy.cc index 0939d50933..3fbc189f77 100644 --- a/compaction/size_tiered_compaction_strategy.cc +++ b/compaction/size_tiered_compaction_strategy.cc @@ -298,8 +298,9 @@ size_tiered_compaction_strategy::most_interesting_bucket(const std::vector input, schema_ptr schema, reshape_mode mode) const +size_tiered_compaction_strategy::get_reshaping_job(std::vector input, schema_ptr schema, reshape_config cfg) const { + auto mode = cfg.mode; size_t offstrategy_threshold = std::max(schema->min_compaction_threshold(), 4); size_t max_sstables = std::max(schema->max_compaction_threshold(), int(offstrategy_threshold)); diff --git a/compaction/size_tiered_compaction_strategy.hh b/compaction/size_tiered_compaction_strategy.hh index a2bc7f3d28..0e9f3dcb79 100644 --- a/compaction/size_tiered_compaction_strategy.hh +++ b/compaction/size_tiered_compaction_strategy.hh @@ -96,7 +96,7 @@ public: virtual std::unique_ptr make_backlog_tracker() const override; - virtual compaction_descriptor get_reshaping_job(std::vector input, schema_ptr schema, reshape_mode mode) const override; + virtual compaction_descriptor get_reshaping_job(std::vector input, schema_ptr schema, reshape_config cfg) const override; friend class ::size_tiered_backlog_tracker; }; diff --git a/compaction/task_manager_module.cc b/compaction/task_manager_module.cc index a0be0bbf07..aaf69622d5 100644 --- a/compaction/task_manager_module.cc +++ b/compaction/task_manager_module.cc @@ -617,7 +617,13 @@ future<> shard_reshaping_compaction_task_impl::reshape_compaction_group(compacti | boost::adaptors::filtered([&filter = _filter] (const auto& sst) { return filter(sst); })); - auto desc = table.get_compaction_strategy().get_reshaping_job(std::move(reshape_candidates), table.schema(), _mode); + if (reshape_candidates.empty()) { + break; + } + // all sstables were found in the same sstable_directory instance, so they share the same underlying storage. + auto& storage = reshape_candidates.front()->get_storage(); + auto cfg = co_await sstables::make_reshape_config(storage, _mode); + auto desc = table.get_compaction_strategy().get_reshaping_job(std::move(reshape_candidates), table.schema(), cfg); if (desc.sstables.empty()) { break; } diff --git a/compaction/time_window_compaction_strategy.cc b/compaction/time_window_compaction_strategy.cc index 009ee54a31..3a8bee7c87 100644 --- a/compaction/time_window_compaction_strategy.cc +++ b/compaction/time_window_compaction_strategy.cc @@ -226,7 +226,8 @@ reader_consumer_v2 time_window_compaction_strategy::make_interposer_consumer(con } compaction_descriptor -time_window_compaction_strategy::get_reshaping_job(std::vector input, schema_ptr schema, reshape_mode mode) const { +time_window_compaction_strategy::get_reshaping_job(std::vector input, schema_ptr schema, reshape_config cfg) const { + auto mode = cfg.mode; std::vector single_window; std::vector multi_window; @@ -299,7 +300,7 @@ time_window_compaction_strategy::get_reshaping_job(std::vector i } // reuse STCS reshape logic which will only compact similar-sized files, to increase overall efficiency // when reshaping time buckets containing a huge amount of files - auto desc = size_tiered_compaction_strategy(_stcs_options).get_reshaping_job(std::move(ssts), schema, mode); + auto desc = size_tiered_compaction_strategy(_stcs_options).get_reshaping_job(std::move(ssts), schema, cfg); if (!desc.sstables.empty()) { return desc; } diff --git a/compaction/time_window_compaction_strategy.hh b/compaction/time_window_compaction_strategy.hh index 3689af0c72..8defc8616e 100644 --- a/compaction/time_window_compaction_strategy.hh +++ b/compaction/time_window_compaction_strategy.hh @@ -168,7 +168,7 @@ public: return true; } - virtual compaction_descriptor get_reshaping_job(std::vector input, schema_ptr schema, reshape_mode mode) const override; + virtual compaction_descriptor get_reshaping_job(std::vector input, schema_ptr schema, reshape_config cfg) const override; }; } diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index f28b102511..49427c36b7 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -3499,6 +3499,15 @@ SEASTAR_TEST_CASE(test_twcs_partition_estimate) { }); } +static compaction_descriptor get_reshaping_job(sstables::compaction_strategy& cs, const std::vector& input, + const schema_ptr& s, reshape_mode mode) { + reshape_config cfg { + .mode = mode, + .free_storage_space = std::numeric_limits::max(), + }; + return cs.get_reshaping_job(input, s, cfg); +} + SEASTAR_TEST_CASE(stcs_reshape_test) { return test_env::do_with_async([] (test_env& env) { simple_schema ss; @@ -3516,8 +3525,8 @@ SEASTAR_TEST_CASE(stcs_reshape_test) { auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::size_tiered, s->compaction_strategy_options()); - BOOST_REQUIRE(cs.get_reshaping_job(sstables, s, reshape_mode::strict).sstables.size()); - BOOST_REQUIRE(cs.get_reshaping_job(sstables, s, reshape_mode::relaxed).sstables.size()); + BOOST_REQUIRE(get_reshaping_job(cs, sstables, s, reshape_mode::strict).sstables.size()); + BOOST_REQUIRE(get_reshaping_job(cs, sstables, s, reshape_mode::relaxed).sstables.size()); }); } @@ -3539,7 +3548,7 @@ SEASTAR_TEST_CASE(lcs_reshape_test) { sstables.push_back(std::move(sst)); } - BOOST_REQUIRE(cs.get_reshaping_job(sstables, s, reshape_mode::strict).sstables.size() == 256); + BOOST_REQUIRE(get_reshaping_job(cs, sstables, s, reshape_mode::strict).sstables.size() == 256); } // all overlapping { @@ -3551,7 +3560,7 @@ SEASTAR_TEST_CASE(lcs_reshape_test) { sstables.push_back(std::move(sst)); } - BOOST_REQUIRE(cs.get_reshaping_job(sstables, s, reshape_mode::strict).sstables.size() == uint64_t(s->max_compaction_threshold())); + BOOST_REQUIRE(get_reshaping_job(cs, sstables, s, reshape_mode::strict).sstables.size() == uint64_t(s->max_compaction_threshold())); } // single sstable { @@ -3559,7 +3568,7 @@ SEASTAR_TEST_CASE(lcs_reshape_test) { auto key = keys[0].key(); sstables::test(sst).set_values_for_leveled_strategy(1 /* size */, 0 /* level */, 0 /* max ts */, key, key); - BOOST_REQUIRE(cs.get_reshaping_job({ sst }, s, reshape_mode::strict).sstables.size() == 0); + BOOST_REQUIRE(get_reshaping_job(cs, { sst }, s, reshape_mode::strict).sstables.size() == 0); } }); } @@ -3776,7 +3785,7 @@ SEASTAR_TEST_CASE(twcs_reshape_with_disjoint_set_test) { sstables.push_back(std::move(sst)); } - BOOST_REQUIRE_EQUAL(cs.get_reshaping_job(sstables, s, reshape_mode::strict).sstables.size(), disjoint_sstable_count); + BOOST_REQUIRE_EQUAL(get_reshaping_job(cs, sstables, s, reshape_mode::strict).sstables.size(), disjoint_sstable_count); } { @@ -3789,7 +3798,7 @@ SEASTAR_TEST_CASE(twcs_reshape_with_disjoint_set_test) { sstables.push_back(std::move(sst)); } - auto reshaping_count = cs.get_reshaping_job(sstables, s, reshape_mode::strict).sstables.size(); + auto reshaping_count = get_reshaping_job(cs, sstables, s, reshape_mode::strict).sstables.size(); BOOST_REQUIRE_GE(reshaping_count, disjoint_sstable_count - min_threshold + 1); BOOST_REQUIRE_LE(reshaping_count, disjoint_sstable_count); } @@ -3807,7 +3816,7 @@ SEASTAR_TEST_CASE(twcs_reshape_with_disjoint_set_test) { sstables.push_back(std::move(sst)); } - BOOST_REQUIRE_EQUAL(cs.get_reshaping_job(sstables, s, reshape_mode::strict).sstables.size(), 0); + BOOST_REQUIRE_EQUAL(get_reshaping_job(cs, sstables, s, reshape_mode::strict).sstables.size(), 0); } { @@ -3820,7 +3829,7 @@ SEASTAR_TEST_CASE(twcs_reshape_with_disjoint_set_test) { sstables.push_back(std::move(sst)); } - BOOST_REQUIRE_EQUAL(cs.get_reshaping_job(sstables, s, reshape_mode::strict).sstables.size(), uint64_t(s->max_compaction_threshold())); + BOOST_REQUIRE_EQUAL(get_reshaping_job(cs, sstables, s, reshape_mode::strict).sstables.size(), uint64_t(s->max_compaction_threshold())); } { @@ -3855,7 +3864,7 @@ SEASTAR_TEST_CASE(twcs_reshape_with_disjoint_set_test) { } auto check_mode_correctness = [&] (reshape_mode mode) { - auto ret = cs.get_reshaping_job(sstables, s, mode); + auto ret = get_reshaping_job(cs, sstables, s, mode); BOOST_REQUIRE_EQUAL(ret.sstables.size(), uint64_t(s->max_compaction_threshold())); // fail if any file doesn't belong to set of small files bool has_big_sized_files = boost::algorithm::any_of(ret.sstables, [&] (const sstables::shared_sstable& sst) { @@ -3909,7 +3918,7 @@ SEASTAR_TEST_CASE(stcs_reshape_overlapping_test) { sstables.push_back(std::move(sst)); } - BOOST_REQUIRE(cs.get_reshaping_job(sstables, s, reshape_mode::strict).sstables.size() == disjoint_sstable_count); + BOOST_REQUIRE(get_reshaping_job(cs, sstables, s, reshape_mode::strict).sstables.size() == disjoint_sstable_count); } { @@ -3922,7 +3931,7 @@ SEASTAR_TEST_CASE(stcs_reshape_overlapping_test) { sstables.push_back(std::move(sst)); } - BOOST_REQUIRE(cs.get_reshaping_job(sstables, s, reshape_mode::strict).sstables.size() == uint64_t(s->max_compaction_threshold())); + BOOST_REQUIRE(get_reshaping_job(cs, sstables, s, reshape_mode::strict).sstables.size() == uint64_t(s->max_compaction_threshold())); } }); }