compaction: wire storage free space into reshape procedure

After this change, the TWCS reshape procedure can be modified to limit a
job to 10% of the available storage space.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
(cherry picked from commit 0ce8ee03f1)
This commit is contained in:
Raphael S. Carvalho
2024-05-28 15:40:47 -03:00
committed by Mergify
parent 37f1af2646
commit ef72075920
13 changed files with 73 additions and 30 deletions

View File

@@ -1379,13 +1379,20 @@ private:
}));
};
auto get_next_job = [&] () -> std::optional<sstables::compaction_descriptor> {
auto desc = t.get_compaction_strategy().get_reshaping_job(get_reshape_candidates(), t.schema(), sstables::reshape_mode::strict);
return desc.sstables.size() ? std::make_optional(std::move(desc)) : std::nullopt;
auto get_next_job = [&] () -> future<std::optional<sstables::compaction_descriptor>> {
auto candidates = get_reshape_candidates();
if (candidates.empty()) {
co_return std::nullopt;
}
// all sstables added to maintenance set share the same underlying storage.
auto& storage = candidates.front()->get_storage();
sstables::reshape_config cfg = co_await sstables::make_reshape_config(storage, sstables::reshape_mode::strict);
auto desc = t.get_compaction_strategy().get_reshaping_job(get_reshape_candidates(), t.schema(), cfg);
co_return desc.sstables.size() ? std::make_optional(std::move(desc)) : std::nullopt;
};
std::exception_ptr err;
while (auto desc = get_next_job()) {
while (auto desc = co_await get_next_job()) {
auto compacting = compacting_sstable_registration(_cm, _cm.get_compaction_state(&t), desc->sstables);
auto on_replace = compacting.update_on_sstable_replacement();

View File

@@ -83,7 +83,7 @@ reader_consumer_v2 compaction_strategy_impl::make_interposer_consumer(const muta
}
compaction_descriptor
compaction_strategy_impl::get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, reshape_mode mode) const {
compaction_strategy_impl::get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, reshape_config cfg) const {
return compaction_descriptor();
}
@@ -728,8 +728,8 @@ compaction_backlog_tracker compaction_strategy::make_backlog_tracker() const {
}
sstables::compaction_descriptor
compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, reshape_mode mode) const {
return _compaction_strategy_impl->get_reshaping_job(std::move(input), schema, mode);
compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, reshape_config cfg) const {
return _compaction_strategy_impl->get_reshaping_job(std::move(input), schema, cfg);
}
uint64_t compaction_strategy::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr schema) const {
@@ -767,6 +767,13 @@ compaction_strategy make_compaction_strategy(compaction_strategy_type strategy,
return compaction_strategy(std::move(impl));
}
future<reshape_config> make_reshape_config(const sstables::storage& storage, reshape_mode mode) {
co_return sstables::reshape_config{
.mode = mode,
.free_storage_space = co_await storage.free_space() / smp::count,
};
}
}
namespace compaction {

View File

@@ -30,6 +30,7 @@ class compaction_strategy_impl;
class sstable;
class sstable_set;
struct compaction_descriptor;
class storage;
class compaction_strategy {
::shared_ptr<compaction_strategy_impl> _compaction_strategy_impl;
@@ -121,11 +122,13 @@ public:
//
// The caller should also pass a maximum number of SSTables which is the maximum amount of
// SSTables that can be added into a single job.
compaction_descriptor get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, reshape_mode mode) const;
compaction_descriptor get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, reshape_config cfg) const;
};
// Creates a compaction_strategy object from one of the strategies available.
compaction_strategy make_compaction_strategy(compaction_strategy_type strategy, const std::map<sstring, sstring>& options);
future<reshape_config> make_reshape_config(const sstables::storage& storage, reshape_mode mode);
}

View File

@@ -76,6 +76,6 @@ public:
return false;
}
virtual compaction_descriptor get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, reshape_mode mode) const;
virtual compaction_descriptor get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, reshape_config cfg) const;
};
}

View File

@@ -8,6 +8,8 @@
#pragma once
#include <cstdint>
namespace sstables {
enum class compaction_strategy_type {
@@ -18,4 +20,10 @@ enum class compaction_strategy_type {
};
enum class reshape_mode { strict, relaxed };
struct reshape_config {
reshape_mode mode;
const uint64_t free_storage_space;
};
}

View File

@@ -146,7 +146,8 @@ int64_t leveled_compaction_strategy::estimated_pending_compactions(table_state&
}
compaction_descriptor
leveled_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, reshape_mode mode) const {
leveled_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, reshape_config cfg) const {
auto mode = cfg.mode;
std::array<std::vector<shared_sstable>, leveled_manifest::MAX_LEVELS> level_info;
auto is_disjoint = [schema] (const std::vector<shared_sstable>& sstables, unsigned tolerance) -> std::tuple<bool, unsigned> {
@@ -203,7 +204,7 @@ leveled_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input
if (level_info[0].size() > offstrategy_threshold) {
size_tiered_compaction_strategy stcs(_stcs_options);
return stcs.get_reshaping_job(std::move(level_info[0]), schema, mode);
return stcs.get_reshaping_job(std::move(level_info[0]), schema, cfg);
}
for (unsigned level = leveled_manifest::MAX_LEVELS - 1; level > 0; --level) {

View File

@@ -74,7 +74,7 @@ public:
virtual std::unique_ptr<compaction_backlog_tracker::impl> make_backlog_tracker() const override;
virtual compaction_descriptor get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, reshape_mode mode) const override;
virtual compaction_descriptor get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, reshape_config cfg) const override;
};
}

View File

@@ -298,8 +298,9 @@ size_tiered_compaction_strategy::most_interesting_bucket(const std::vector<sstab
}
compaction_descriptor
size_tiered_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, reshape_mode mode) const
size_tiered_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, reshape_config cfg) const
{
auto mode = cfg.mode;
size_t offstrategy_threshold = std::max(schema->min_compaction_threshold(), 4);
size_t max_sstables = std::max(schema->max_compaction_threshold(), int(offstrategy_threshold));

View File

@@ -96,7 +96,7 @@ public:
virtual std::unique_ptr<compaction_backlog_tracker::impl> make_backlog_tracker() const override;
virtual compaction_descriptor get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, reshape_mode mode) const override;
virtual compaction_descriptor get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, reshape_config cfg) const override;
friend class ::size_tiered_backlog_tracker;
};

View File

@@ -617,7 +617,13 @@ future<> shard_reshaping_compaction_task_impl::reshape_compaction_group(compacti
| boost::adaptors::filtered([&filter = _filter] (const auto& sst) {
return filter(sst);
}));
auto desc = table.get_compaction_strategy().get_reshaping_job(std::move(reshape_candidates), table.schema(), _mode);
if (reshape_candidates.empty()) {
break;
}
// all sstables were found in the same sstable_directory instance, so they share the same underlying storage.
auto& storage = reshape_candidates.front()->get_storage();
auto cfg = co_await sstables::make_reshape_config(storage, _mode);
auto desc = table.get_compaction_strategy().get_reshaping_job(std::move(reshape_candidates), table.schema(), cfg);
if (desc.sstables.empty()) {
break;
}

View File

@@ -226,7 +226,8 @@ reader_consumer_v2 time_window_compaction_strategy::make_interposer_consumer(con
}
compaction_descriptor
time_window_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, reshape_mode mode) const {
time_window_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, reshape_config cfg) const {
auto mode = cfg.mode;
std::vector<shared_sstable> single_window;
std::vector<shared_sstable> multi_window;
@@ -299,7 +300,7 @@ time_window_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i
}
// reuse STCS reshape logic which will only compact similar-sized files, to increase overall efficiency
// when reshaping time buckets containing a huge amount of files
auto desc = size_tiered_compaction_strategy(_stcs_options).get_reshaping_job(std::move(ssts), schema, mode);
auto desc = size_tiered_compaction_strategy(_stcs_options).get_reshaping_job(std::move(ssts), schema, cfg);
if (!desc.sstables.empty()) {
return desc;
}

View File

@@ -168,7 +168,7 @@ public:
return true;
}
virtual compaction_descriptor get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, reshape_mode mode) const override;
virtual compaction_descriptor get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, reshape_config cfg) const override;
};
}

View File

@@ -3499,6 +3499,15 @@ SEASTAR_TEST_CASE(test_twcs_partition_estimate) {
});
}
static compaction_descriptor get_reshaping_job(sstables::compaction_strategy& cs, const std::vector<shared_sstable>& input,
const schema_ptr& s, reshape_mode mode) {
reshape_config cfg {
.mode = mode,
.free_storage_space = std::numeric_limits<uint64_t>::max(),
};
return cs.get_reshaping_job(input, s, cfg);
}
SEASTAR_TEST_CASE(stcs_reshape_test) {
return test_env::do_with_async([] (test_env& env) {
simple_schema ss;
@@ -3516,8 +3525,8 @@ SEASTAR_TEST_CASE(stcs_reshape_test) {
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::size_tiered,
s->compaction_strategy_options());
BOOST_REQUIRE(cs.get_reshaping_job(sstables, s, reshape_mode::strict).sstables.size());
BOOST_REQUIRE(cs.get_reshaping_job(sstables, s, reshape_mode::relaxed).sstables.size());
BOOST_REQUIRE(get_reshaping_job(cs, sstables, s, reshape_mode::strict).sstables.size());
BOOST_REQUIRE(get_reshaping_job(cs, sstables, s, reshape_mode::relaxed).sstables.size());
});
}
@@ -3539,7 +3548,7 @@ SEASTAR_TEST_CASE(lcs_reshape_test) {
sstables.push_back(std::move(sst));
}
BOOST_REQUIRE(cs.get_reshaping_job(sstables, s, reshape_mode::strict).sstables.size() == 256);
BOOST_REQUIRE(get_reshaping_job(cs, sstables, s, reshape_mode::strict).sstables.size() == 256);
}
// all overlapping
{
@@ -3551,7 +3560,7 @@ SEASTAR_TEST_CASE(lcs_reshape_test) {
sstables.push_back(std::move(sst));
}
BOOST_REQUIRE(cs.get_reshaping_job(sstables, s, reshape_mode::strict).sstables.size() == uint64_t(s->max_compaction_threshold()));
BOOST_REQUIRE(get_reshaping_job(cs, sstables, s, reshape_mode::strict).sstables.size() == uint64_t(s->max_compaction_threshold()));
}
// single sstable
{
@@ -3559,7 +3568,7 @@ SEASTAR_TEST_CASE(lcs_reshape_test) {
auto key = keys[0].key();
sstables::test(sst).set_values_for_leveled_strategy(1 /* size */, 0 /* level */, 0 /* max ts */, key, key);
BOOST_REQUIRE(cs.get_reshaping_job({ sst }, s, reshape_mode::strict).sstables.size() == 0);
BOOST_REQUIRE(get_reshaping_job(cs, { sst }, s, reshape_mode::strict).sstables.size() == 0);
}
});
}
@@ -3776,7 +3785,7 @@ SEASTAR_TEST_CASE(twcs_reshape_with_disjoint_set_test) {
sstables.push_back(std::move(sst));
}
BOOST_REQUIRE_EQUAL(cs.get_reshaping_job(sstables, s, reshape_mode::strict).sstables.size(), disjoint_sstable_count);
BOOST_REQUIRE_EQUAL(get_reshaping_job(cs, sstables, s, reshape_mode::strict).sstables.size(), disjoint_sstable_count);
}
{
@@ -3789,7 +3798,7 @@ SEASTAR_TEST_CASE(twcs_reshape_with_disjoint_set_test) {
sstables.push_back(std::move(sst));
}
auto reshaping_count = cs.get_reshaping_job(sstables, s, reshape_mode::strict).sstables.size();
auto reshaping_count = get_reshaping_job(cs, sstables, s, reshape_mode::strict).sstables.size();
BOOST_REQUIRE_GE(reshaping_count, disjoint_sstable_count - min_threshold + 1);
BOOST_REQUIRE_LE(reshaping_count, disjoint_sstable_count);
}
@@ -3807,7 +3816,7 @@ SEASTAR_TEST_CASE(twcs_reshape_with_disjoint_set_test) {
sstables.push_back(std::move(sst));
}
BOOST_REQUIRE_EQUAL(cs.get_reshaping_job(sstables, s, reshape_mode::strict).sstables.size(), 0);
BOOST_REQUIRE_EQUAL(get_reshaping_job(cs, sstables, s, reshape_mode::strict).sstables.size(), 0);
}
{
@@ -3820,7 +3829,7 @@ SEASTAR_TEST_CASE(twcs_reshape_with_disjoint_set_test) {
sstables.push_back(std::move(sst));
}
BOOST_REQUIRE_EQUAL(cs.get_reshaping_job(sstables, s, reshape_mode::strict).sstables.size(), uint64_t(s->max_compaction_threshold()));
BOOST_REQUIRE_EQUAL(get_reshaping_job(cs, sstables, s, reshape_mode::strict).sstables.size(), uint64_t(s->max_compaction_threshold()));
}
{
@@ -3855,7 +3864,7 @@ SEASTAR_TEST_CASE(twcs_reshape_with_disjoint_set_test) {
}
auto check_mode_correctness = [&] (reshape_mode mode) {
auto ret = cs.get_reshaping_job(sstables, s, mode);
auto ret = get_reshaping_job(cs, sstables, s, mode);
BOOST_REQUIRE_EQUAL(ret.sstables.size(), uint64_t(s->max_compaction_threshold()));
// fail if any file doesn't belong to set of small files
bool has_big_sized_files = boost::algorithm::any_of(ret.sstables, [&] (const sstables::shared_sstable& sst) {
@@ -3909,7 +3918,7 @@ SEASTAR_TEST_CASE(stcs_reshape_overlapping_test) {
sstables.push_back(std::move(sst));
}
BOOST_REQUIRE(cs.get_reshaping_job(sstables, s, reshape_mode::strict).sstables.size() == disjoint_sstable_count);
BOOST_REQUIRE(get_reshaping_job(cs, sstables, s, reshape_mode::strict).sstables.size() == disjoint_sstable_count);
}
{
@@ -3922,7 +3931,7 @@ SEASTAR_TEST_CASE(stcs_reshape_overlapping_test) {
sstables.push_back(std::move(sst));
}
BOOST_REQUIRE(cs.get_reshaping_job(sstables, s, reshape_mode::strict).sstables.size() == uint64_t(s->max_compaction_threshold()));
BOOST_REQUIRE(get_reshaping_job(cs, sstables, s, reshape_mode::strict).sstables.size() == uint64_t(s->max_compaction_threshold()));
}
});
}