compaction: resharding_compaction: add vnodes_resharding option

In this mode, the output sstables generated by resharding
compaction are segregated by token range, based on the keyspace
vnode-based owned token ranges vector.

A basic unit test was also added to sstable_directory_test.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2026-02-16 13:10:30 +02:00
committed by Nikos Dragazis
parent d153a95943
commit d1c6141407
8 changed files with 120 additions and 16 deletions

View File

@@ -44,6 +44,7 @@
#include "dht/partition_filter.hh"
#include "mutation_writer/shard_based_splitting_writer.hh"
#include "mutation_writer/partition_based_splitting_writer.hh"
#include "mutation_writer/token_group_based_splitting_writer.hh"
#include "mutation/mutation_source_metadata.hh"
#include "mutation/mutation_fragment_stream_validator.hh"
#include "utils/assert.hh"
@@ -1933,6 +1934,7 @@ class resharding_compaction final : public compaction {
};
std::vector<estimated_values> _estimation_per_shard;
std::vector<sstables::run_id> _run_identifiers;
bool _reshard_vnodes;
private:
// return estimated partitions per sstable for a given shard
uint64_t partitions_per_sstable(shard_id s) const {
@@ -1945,7 +1947,11 @@ public:
: compaction(table_s, std::move(descriptor), cdata, progress_monitor, use_backlog_tracker::no)
, _estimation_per_shard(smp::count)
, _run_identifiers(smp::count)
, _reshard_vnodes(descriptor.options.as<compaction_type_options::reshard>().vnodes_resharding)
{
if (_reshard_vnodes && !_owned_ranges) {
on_internal_error(clogger, "Resharding vnodes requires owned_ranges");
}
for (auto& sst : _sstables) {
const auto& shards = sst->get_shards_for_this_sstable();
auto size = sst->bytes_on_disk();
@@ -1983,8 +1989,25 @@ public:
}
mutation_reader_consumer make_interposer_consumer(mutation_reader_consumer end_consumer) override {
return [end_consumer = std::move(end_consumer)] (mutation_reader reader) mutable -> future<> {
return mutation_writer::segregate_by_shard(std::move(reader), std::move(end_consumer));
auto owned_ranges = _reshard_vnodes ? _owned_ranges : nullptr;
return [end_consumer = std::move(end_consumer), owned_ranges = std::move(owned_ranges)] (mutation_reader reader) mutable -> future<> {
if (owned_ranges) {
auto classify = [owned_ranges, it = owned_ranges->begin(), idx = mutation_writer::token_group_id(0)] (dht::token t) mutable -> mutation_writer::token_group_id {
dht::token_comparator cmp;
while (it != owned_ranges->end() && it->after(t, cmp)) {
clogger.debug("Token {} is after current range {}: advancing to the next range", t, *it);
++it;
++idx;
}
if (it == owned_ranges->end() || !it->contains(t, cmp)) {
on_internal_error(clogger, fmt::format("Token {} is outside of owned ranges", t));
}
return idx;
};
return mutation_writer::segregate_by_token_group(std::move(reader), std::move(classify), std::move(end_consumer));
} else {
return mutation_writer::segregate_by_shard(std::move(reader), std::move(end_consumer));
}
};
}

View File

@@ -87,6 +87,8 @@ public:
drop_unfixable_sstables drop_unfixable = drop_unfixable_sstables::no;
};
struct reshard {
// If set, resharding compaction will apply the owned_ranges to segregate sstables in vnode boundaries.
bool vnodes_resharding = false;
};
struct reshape {
};
@@ -115,8 +117,8 @@ public:
return compaction_type_options(reshape{});
}
static compaction_type_options make_reshard() {
return compaction_type_options(reshard{});
static compaction_type_options make_reshard(bool vnodes_resharding = false) {
return compaction_type_options(reshard{.vnodes_resharding = vnodes_resharding});
}
static compaction_type_options make_regular() {

View File

@@ -132,7 +132,7 @@ distribute_reshard_jobs(sstables::sstable_directory::sstable_open_info_vector so
// A creator function must be passed that will create an SSTable object in the correct shard,
// and an I/O priority must be specified.
future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory::sstable_open_info_vector shared_info, replica::table& table,
compaction::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr, tasks::task_info parent_info)
compaction::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr, bool vnodes_resharding, tasks::task_info parent_info)
{
// Resharding doesn't like empty sstable sets, so bail early. There is nothing
// to reshard in this shard.
@@ -166,7 +166,7 @@ future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory::
auto erm = table.get_effective_replication_map(); // keep alive around compaction.
compaction_descriptor desc(sstlist);
desc.options = compaction_type_options::make_reshard();
desc.options = compaction_type_options::make_reshard(vnodes_resharding);
desc.creator = creator;
desc.sharder = &erm->get_sharder(*table.schema());
desc.owned_ranges = owned_ranges_ptr;
@@ -906,7 +906,7 @@ future<> table_resharding_compaction_task_impl::run() {
if (_owned_ranges_ptr) {
local_owned_ranges_ptr = make_lw_shared<const dht::token_range_vector>(*_owned_ranges_ptr);
}
auto task = co_await compaction_module.make_and_start_task<shard_resharding_compaction_task_impl>(parent_info, _status.keyspace, _status.table, _status.id, _dir, db, _creator, std::move(local_owned_ranges_ptr), destinations);
auto task = co_await compaction_module.make_and_start_task<shard_resharding_compaction_task_impl>(parent_info, _status.keyspace, _status.table, _status.id, _dir, db, _creator, std::move(local_owned_ranges_ptr), _vnodes_resharding, destinations);
co_await task->done();
}));
@@ -926,12 +926,14 @@ shard_resharding_compaction_task_impl::shard_resharding_compaction_task_impl(tas
replica::database& db,
compaction_sstable_creator_fn creator,
compaction::owned_ranges_ptr local_owned_ranges_ptr,
bool vnodes_resharding,
std::vector<replica::reshard_shard_descriptor>& destinations) noexcept
: resharding_compaction_task_impl(module, tasks::task_id::create_random_id(), 0, "shard", std::move(keyspace), std::move(table), "", parent_id)
, _dir(dir)
, _db(db)
, _creator(std::move(creator))
, _local_owned_ranges_ptr(std::move(local_owned_ranges_ptr))
, _vnodes_resharding(vnodes_resharding)
, _destinations(destinations)
{
_expected_workload = _destinations[this_shard_id()].size();
@@ -941,7 +943,7 @@ future<> shard_resharding_compaction_task_impl::run() {
auto& table = _db.find_column_family(_status.keyspace, _status.table);
auto info_vec = std::move(_destinations[this_shard_id()].info_vec);
tasks::task_info info{_status.id, _status.shard};
co_await reshard(_dir.local(), std::move(info_vec), table, _creator, std::move(_local_owned_ranges_ptr), info);
co_await reshard(_dir.local(), std::move(info_vec), table, _creator, std::move(_local_owned_ranges_ptr), _vnodes_resharding, info);
co_await _dir.local().move_foreign_sstables(_dir);
}

View File

@@ -693,6 +693,7 @@ private:
sharded<replica::database>& _db;
compaction_sstable_creator_fn _creator;
compaction::owned_ranges_ptr _owned_ranges_ptr;
bool _vnodes_resharding;
public:
table_resharding_compaction_task_impl(tasks::task_manager::module_ptr module,
std::string keyspace,
@@ -700,12 +701,14 @@ public:
sharded<sstables::sstable_directory>& dir,
sharded<replica::database>& db,
compaction_sstable_creator_fn creator,
compaction::owned_ranges_ptr owned_ranges_ptr) noexcept
compaction::owned_ranges_ptr owned_ranges_ptr,
bool vnodes_resharding) noexcept
: resharding_compaction_task_impl(module, tasks::task_id::create_random_id(), module->new_sequence_number(), "table", std::move(keyspace), std::move(table), "", tasks::task_id::create_null_id())
, _dir(dir)
, _db(db)
, _creator(std::move(creator))
, _owned_ranges_ptr(std::move(owned_ranges_ptr))
, _vnodes_resharding(vnodes_resharding)
{}
protected:
virtual future<> run() override;
@@ -718,6 +721,7 @@ private:
replica::database& _db;
compaction_sstable_creator_fn _creator;
compaction::owned_ranges_ptr _local_owned_ranges_ptr;
bool _vnodes_resharding;
std::vector<replica::reshard_shard_descriptor>& _destinations;
public:
shard_resharding_compaction_task_impl(tasks::task_manager::module_ptr module,
@@ -728,6 +732,7 @@ public:
replica::database& db,
compaction_sstable_creator_fn creator,
compaction::owned_ranges_ptr local_owned_ranges_ptr,
bool vnodes_resharding,
std::vector<replica::reshard_shard_descriptor>& destinations) noexcept;
protected:
virtual future<> run() override;

View File

@@ -137,6 +137,9 @@ public:
// unless the reader is fast-forwarded to a new range.
bool _end_of_stream = false;
// Set by fill buffer for segregating output by partition range.
std::optional<dht::partition_range> _next_range;
schema_ptr _schema;
reader_permit _permit;
friend class mutation_reader;

View File

@@ -101,9 +101,9 @@ distributed_loader::lock_table(global_table_ptr& table, sharded<sstables::sstabl
// - The second part calls each shard's distributed object to reshard the SSTables they were
// assigned.
future<>
distributed_loader::reshard(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr) {
distributed_loader::reshard(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr, bool vnodes_resharding) {
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
auto task = co_await compaction_module.make_and_start_task<compaction::table_resharding_compaction_task_impl>({}, std::move(ks_name), std::move(table_name), dir, db, std::move(creator), std::move(owned_ranges_ptr));
auto task = co_await compaction_module.make_and_start_task<compaction::table_resharding_compaction_task_impl>({}, std::move(ks_name), std::move(table_name), dir, db, std::move(creator), std::move(owned_ranges_ptr), vnodes_resharding);
co_await task->done();
}

View File

@@ -70,7 +70,7 @@ class distributed_loader {
static future<> reshape(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, compaction::reshape_mode mode,
sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, std::function<bool (const sstables::shared_sstable&)> filter);
static future<> reshard(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator,
compaction::owned_ranges_ptr owned_ranges_ptr = nullptr);
compaction::owned_ranges_ptr owned_ranges_ptr = nullptr, bool vnodes_resharding = false);
static future<> process_sstable_dir(sharded<sstables::sstable_directory>& dir, sstables::sstable_directory::process_flags flags);
static future<> lock_table(global_table_ptr&, sharded<sstables::sstable_directory>& dir);
static future<size_t> make_sstables_available(sstables::sstable_directory& dir,

View File

@@ -8,9 +8,11 @@
#include <fmt/format.h>
#include <functional>
#include <seastar/core/smp.hh>
#include <seastar/core/sstring.hh>
#include <seastar/util/file.hh>
#include "dht/token.hh"
#include "sstables/generation_type.hh"
#undef SEASTAR_TESTING_MAIN
#include <seastar/testing/test_case.hh>
@@ -41,8 +43,8 @@ public:
auto gtable = co_await replica::get_table_on_all_shards(db, ks_name, cf_name);
co_await replica::distributed_loader::lock_table(gtable, dir);
}
static future<> reshard(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr = nullptr) {
return replica::distributed_loader::reshard(dir, db, std::move(ks_name), std::move(table_name), std::move(creator), std::move(owned_ranges_ptr));
static future<> reshard(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr = nullptr, bool vnodes_resharding = false) {
return replica::distributed_loader::reshard(dir, db, std::move(ks_name), std::move(table_name), std::move(creator), std::move(owned_ranges_ptr), vnodes_resharding);
}
};
@@ -73,13 +75,13 @@ make_sstable_for_this_shard(std::function<sstables::shared_sstable()> sst_factor
/// Arguments passed to the function are passed to table::make_sstable
template <typename... Args>
sstables::shared_sstable
make_sstable_for_all_shards(replica::table& table, sstables::sstable_state state, sstables::generation_type generation) {
make_sstable_for_all_shards(replica::table& table, sstables::sstable_state state, sstables::generation_type generation, unsigned num_shards = smp::count) {
// Unlike the previous helper, we'll assume we're in a thread here. It's less flexible
// but the users are usually in a thread, and rewrite_toc_without_component requires
// a thread. We could fix that, but deferring that for now.
auto s = table.schema();
auto mt = make_lw_shared<replica::memtable>(s);
for (shard_id shard = 0; shard < smp::count; ++shard) {
for (shard_id shard = 0; shard < num_shards; ++shard) {
auto key = tests::generate_partition_key(s, shard);
mutation m(s, key);
m.set_clustered_cell(clustering_key::make_empty(), bytes("c"), data_value(int32_t(0)), api::timestamp_type(0));
@@ -329,6 +331,26 @@ future<> verify_that_all_sstables_are_local(sharded<sstable_directory>& sstdir,
THREADSAFE_BOOST_REQUIRE_EQUAL(count, expected_sstables);
}
future<> verify_that_all_sstables_are_contained_in_ranges(sharded<sstable_directory>& sstdir, compaction::owned_ranges_ptr owned_ranges) {
auto all_ok = co_await sstdir.map_reduce0([owned_ranges] (sstable_directory& d) -> future<bool> {
bool ret = true;
co_await d.do_for_each_sstable([&ret, &owned_ranges] (sstables::shared_sstable sst) {
dht::token_range sst_range(sst->get_first_decorated_key().token(), sst->get_last_decorated_key().token());
bool found = false;
for (const auto& r : *owned_ranges) {
if (r.contains(sst_range, dht::token_comparator{})) {
found = true;
break;
}
}
ret &= found;
return make_ready_future<>();
});
co_return ret;
}, true, std::logical_and<>{});
THREADSAFE_BOOST_REQUIRE(all_ok);
}
// Test that all SSTables are seen as unshared, if the generation numbers match what their
// shard-assignments expect
SEASTAR_THREAD_TEST_CASE(sstable_directory_unshared_sstables_sanity_matched_generations) {
@@ -863,4 +885,51 @@ SEASTAR_TEST_CASE(test_pending_log_garbage_collection) {
});
}
SEASTAR_TEST_CASE(sstable_directory_test_reshard_vnodes) {
return do_with_cql_env_thread([] (cql_test_env& e) {
e.execute_cql("create table cf (p text PRIMARY KEY, c int)").get();
auto& cf = e.local_db().find_column_family("ks", "cf");
e.db().invoke_on_all([] (replica::database& db) {
auto& cf = db.find_column_family("ks", "cf");
return cf.disable_auto_compaction();
}).get();
unsigned num_sstables = 10 * smp::count;
sharded<sstables::sstable_generation_generator> sharded_gen;
sharded_gen.start().get();
auto stop_generator = deferred_stop(sharded_gen);
for (unsigned nr = 0; nr < num_sstables; ++nr) {
auto generation = sharded_gen.invoke_on(nr % smp::count, [] (auto& gen) {
return gen();
}).get();
make_sstable_for_all_shards(cf, sstables::sstable_state::upload, generation, smp::count);
}
with_sstable_directory(e.db(), "ks", "cf", sstables::sstable_state::upload, [&] (sharded<sstables::sstable_directory>& sstdir) {
distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true }).get();
sharded<sstables::sstable_generation_generator> sharded_gen;
sharded_gen.start().get();
auto stop_generator = deferred_stop(sharded_gen);
auto make_sstable = [&e, &sharded_gen] (shard_id shard) {
auto generation = sharded_gen.invoke_on(shard, [] (auto& gen) {
return gen();
}).get();
auto& cf = e.local_db().find_column_family("ks", "cf");
return cf.get_sstables_manager().make_sstable(cf.schema(), cf.get_storage_options(), generation, sstables::sstable_state::upload);
};
const auto& erm = e.local_db().find_keyspace("ks").get_static_effective_replication_map();
auto owned_ranges_ptr = compaction::make_owned_ranges_ptr(e.local_db().get_keyspace_local_ranges(erm).get());
bool vnodes_resharding = true;
distributed_loader_for_tests::reshard(sstdir, e.db(), "ks", "cf", std::move(make_sstable), owned_ranges_ptr, vnodes_resharding).get();
verify_that_all_sstables_are_contained_in_ranges(sstdir, owned_ranges_ptr).get();
});
});
}
BOOST_AUTO_TEST_SUITE_END()