diff --git a/compaction/compaction.cc b/compaction/compaction.cc index ab74a19372..ce20e94db7 100644 --- a/compaction/compaction.cc +++ b/compaction/compaction.cc @@ -44,6 +44,7 @@ #include "dht/partition_filter.hh" #include "mutation_writer/shard_based_splitting_writer.hh" #include "mutation_writer/partition_based_splitting_writer.hh" +#include "mutation_writer/token_group_based_splitting_writer.hh" #include "mutation/mutation_source_metadata.hh" #include "mutation/mutation_fragment_stream_validator.hh" #include "utils/assert.hh" @@ -1933,6 +1934,7 @@ class resharding_compaction final : public compaction { }; std::vector _estimation_per_shard; std::vector _run_identifiers; + bool _reshard_vnodes; private: // return estimated partitions per sstable for a given shard uint64_t partitions_per_sstable(shard_id s) const { @@ -1945,7 +1947,11 @@ public: : compaction(table_s, std::move(descriptor), cdata, progress_monitor, use_backlog_tracker::no) , _estimation_per_shard(smp::count) , _run_identifiers(smp::count) + , _reshard_vnodes(descriptor.options.as().vnodes_resharding) { + if (_reshard_vnodes && !_owned_ranges) { + on_internal_error(clogger, "Resharding vnodes requires owned_ranges"); + } for (auto& sst : _sstables) { const auto& shards = sst->get_shards_for_this_sstable(); auto size = sst->bytes_on_disk(); @@ -1983,8 +1989,25 @@ public: } mutation_reader_consumer make_interposer_consumer(mutation_reader_consumer end_consumer) override { - return [end_consumer = std::move(end_consumer)] (mutation_reader reader) mutable -> future<> { - return mutation_writer::segregate_by_shard(std::move(reader), std::move(end_consumer)); + auto owned_ranges = _reshard_vnodes ? _owned_ranges : nullptr; + return [end_consumer = std::move(end_consumer), owned_ranges = std::move(owned_ranges)] (mutation_reader reader) mutable -> future<> { + if (owned_ranges) { + auto classify = [owned_ranges, it = owned_ranges->begin(), idx = mutation_writer::token_group_id(0)] (dht::token t) mutable -> mutation_writer::token_group_id { + dht::token_comparator cmp; + while (it != owned_ranges->end() && it->after(t, cmp)) { + clogger.debug("Token {} is after current range {}: advancing to the next range", t, *it); + ++it; + ++idx; + } + if (it == owned_ranges->end() || !it->contains(t, cmp)) { + on_internal_error(clogger, fmt::format("Token {} is outside of owned ranges", t)); + } + return idx; + }; + return mutation_writer::segregate_by_token_group(std::move(reader), std::move(classify), std::move(end_consumer)); + } else { + return mutation_writer::segregate_by_shard(std::move(reader), std::move(end_consumer)); + } }; } diff --git a/compaction/compaction_descriptor.hh b/compaction/compaction_descriptor.hh index 9c30b90f29..3f0a3988f9 100644 --- a/compaction/compaction_descriptor.hh +++ b/compaction/compaction_descriptor.hh @@ -87,6 +87,8 @@ public: drop_unfixable_sstables drop_unfixable = drop_unfixable_sstables::no; }; struct reshard { + // If set, resharding compaction will apply the owned_ranges to segregate sstables in vnode boundaries. + bool vnodes_resharding = false; }; struct reshape { }; @@ -115,8 +117,8 @@ public: return compaction_type_options(reshape{}); } - static compaction_type_options make_reshard() { - return compaction_type_options(reshard{}); + static compaction_type_options make_reshard(bool vnodes_resharding = false) { + return compaction_type_options(reshard{.vnodes_resharding = vnodes_resharding}); } static compaction_type_options make_regular() { diff --git a/compaction/task_manager_module.cc b/compaction/task_manager_module.cc index 0a8e3d30c5..e41c170ab0 100644 --- a/compaction/task_manager_module.cc +++ b/compaction/task_manager_module.cc @@ -132,7 +132,7 @@ distribute_reshard_jobs(sstables::sstable_directory::sstable_open_info_vector so // A creator function must be passed that will create an SSTable object in the correct shard, // and an I/O priority must be specified. future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory::sstable_open_info_vector shared_info, replica::table& table, - compaction::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr, tasks::task_info parent_info) + compaction::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr, bool vnodes_resharding, tasks::task_info parent_info) { // Resharding doesn't like empty sstable sets, so bail early. There is nothing // to reshard in this shard. @@ -166,7 +166,7 @@ future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory:: auto erm = table.get_effective_replication_map(); // keep alive around compaction. compaction_descriptor desc(sstlist); - desc.options = compaction_type_options::make_reshard(); + desc.options = compaction_type_options::make_reshard(vnodes_resharding); desc.creator = creator; desc.sharder = &erm->get_sharder(*table.schema()); desc.owned_ranges = owned_ranges_ptr; @@ -906,7 +906,7 @@ future<> table_resharding_compaction_task_impl::run() { if (_owned_ranges_ptr) { local_owned_ranges_ptr = make_lw_shared(*_owned_ranges_ptr); } - auto task = co_await compaction_module.make_and_start_task(parent_info, _status.keyspace, _status.table, _status.id, _dir, db, _creator, std::move(local_owned_ranges_ptr), destinations); + auto task = co_await compaction_module.make_and_start_task(parent_info, _status.keyspace, _status.table, _status.id, _dir, db, _creator, std::move(local_owned_ranges_ptr), _vnodes_resharding, destinations); co_await task->done(); })); @@ -926,12 +926,14 @@ shard_resharding_compaction_task_impl::shard_resharding_compaction_task_impl(tas replica::database& db, compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr local_owned_ranges_ptr, + bool vnodes_resharding, std::vector& destinations) noexcept : resharding_compaction_task_impl(module, tasks::task_id::create_random_id(), 0, "shard", std::move(keyspace), std::move(table), "", parent_id) , _dir(dir) , _db(db) , _creator(std::move(creator)) , _local_owned_ranges_ptr(std::move(local_owned_ranges_ptr)) + , _vnodes_resharding(vnodes_resharding) , _destinations(destinations) { _expected_workload = _destinations[this_shard_id()].size(); @@ -941,7 +943,7 @@ future<> shard_resharding_compaction_task_impl::run() { auto& table = _db.find_column_family(_status.keyspace, _status.table); auto info_vec = std::move(_destinations[this_shard_id()].info_vec); tasks::task_info info{_status.id, _status.shard}; - co_await reshard(_dir.local(), std::move(info_vec), table, _creator, std::move(_local_owned_ranges_ptr), info); + co_await reshard(_dir.local(), std::move(info_vec), table, _creator, std::move(_local_owned_ranges_ptr), _vnodes_resharding, info); co_await _dir.local().move_foreign_sstables(_dir); } diff --git a/compaction/task_manager_module.hh b/compaction/task_manager_module.hh index 06212a0b01..9c88a2b5dd 100644 --- a/compaction/task_manager_module.hh +++ b/compaction/task_manager_module.hh @@ -693,6 +693,7 @@ private: sharded& _db; compaction_sstable_creator_fn _creator; compaction::owned_ranges_ptr _owned_ranges_ptr; + bool _vnodes_resharding; public: table_resharding_compaction_task_impl(tasks::task_manager::module_ptr module, std::string keyspace, @@ -700,12 +701,14 @@ public: sharded& dir, sharded& db, compaction_sstable_creator_fn creator, - compaction::owned_ranges_ptr owned_ranges_ptr) noexcept + compaction::owned_ranges_ptr owned_ranges_ptr, + bool vnodes_resharding) noexcept : resharding_compaction_task_impl(module, tasks::task_id::create_random_id(), module->new_sequence_number(), "table", std::move(keyspace), std::move(table), "", tasks::task_id::create_null_id()) , _dir(dir) , _db(db) , _creator(std::move(creator)) , _owned_ranges_ptr(std::move(owned_ranges_ptr)) + , _vnodes_resharding(vnodes_resharding) {} protected: virtual future<> run() override; @@ -718,6 +721,7 @@ private: replica::database& _db; compaction_sstable_creator_fn _creator; compaction::owned_ranges_ptr _local_owned_ranges_ptr; + bool _vnodes_resharding; std::vector& _destinations; public: shard_resharding_compaction_task_impl(tasks::task_manager::module_ptr module, @@ -728,6 +732,7 @@ public: replica::database& db, compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr local_owned_ranges_ptr, + bool vnodes_resharding, std::vector& destinations) noexcept; protected: virtual future<> run() override; diff --git a/readers/mutation_reader.hh b/readers/mutation_reader.hh index 6e3fb79e6e..8c94945c1d 100644 --- a/readers/mutation_reader.hh +++ b/readers/mutation_reader.hh @@ -137,6 +137,9 @@ public: // unless the reader is fast-forwarded to a new range. bool _end_of_stream = false; + // Set by fill buffer for segregating output by partition range. + std::optional _next_range; + schema_ptr _schema; reader_permit _permit; friend class mutation_reader; diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 2b12742022..749762de8e 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -101,9 +101,9 @@ distributed_loader::lock_table(global_table_ptr& table, sharded -distributed_loader::reshard(sharded& dir, sharded& db, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr) { +distributed_loader::reshard(sharded& dir, sharded& db, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr, bool vnodes_resharding) { auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module(); - auto task = co_await compaction_module.make_and_start_task({}, std::move(ks_name), std::move(table_name), dir, db, std::move(creator), std::move(owned_ranges_ptr)); + auto task = co_await compaction_module.make_and_start_task({}, std::move(ks_name), std::move(table_name), dir, db, std::move(creator), std::move(owned_ranges_ptr), vnodes_resharding); co_await task->done(); } diff --git a/replica/distributed_loader.hh b/replica/distributed_loader.hh index b8664302fb..6b0a63c667 100644 --- a/replica/distributed_loader.hh +++ b/replica/distributed_loader.hh @@ -70,7 +70,7 @@ class distributed_loader { static future<> reshape(sharded& dir, sharded& db, compaction::reshape_mode mode, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, std::function filter); static future<> reshard(sharded& dir, sharded& db, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, - compaction::owned_ranges_ptr owned_ranges_ptr = nullptr); + compaction::owned_ranges_ptr owned_ranges_ptr = nullptr, bool vnodes_resharding = false); static future<> process_sstable_dir(sharded& dir, sstables::sstable_directory::process_flags flags); static future<> lock_table(global_table_ptr&, sharded& dir); static future make_sstables_available(sstables::sstable_directory& dir, diff --git a/test/boost/sstable_directory_test.cc b/test/boost/sstable_directory_test.cc index 135f7acca2..430b7acedd 100644 --- a/test/boost/sstable_directory_test.cc +++ b/test/boost/sstable_directory_test.cc @@ -8,9 +8,11 @@ #include +#include #include #include #include +#include "dht/token.hh" #include "sstables/generation_type.hh" #undef SEASTAR_TESTING_MAIN #include @@ -41,8 +43,8 @@ public: auto gtable = co_await replica::get_table_on_all_shards(db, ks_name, cf_name); co_await replica::distributed_loader::lock_table(gtable, dir); } - static future<> reshard(sharded& dir, sharded& db, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr = nullptr) { - return replica::distributed_loader::reshard(dir, db, std::move(ks_name), std::move(table_name), std::move(creator), std::move(owned_ranges_ptr)); + static future<> reshard(sharded& dir, sharded& db, sstring ks_name, sstring table_name, compaction::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr = nullptr, bool vnodes_resharding = false) { + return replica::distributed_loader::reshard(dir, db, std::move(ks_name), std::move(table_name), std::move(creator), std::move(owned_ranges_ptr), vnodes_resharding); } }; @@ -73,13 +75,13 @@ make_sstable_for_this_shard(std::function sst_factor /// Arguments passed to the function are passed to table::make_sstable template sstables::shared_sstable -make_sstable_for_all_shards(replica::table& table, sstables::sstable_state state, sstables::generation_type generation) { +make_sstable_for_all_shards(replica::table& table, sstables::sstable_state state, sstables::generation_type generation, unsigned num_shards = smp::count) { // Unlike the previous helper, we'll assume we're in a thread here. It's less flexible // but the users are usually in a thread, and rewrite_toc_without_component requires // a thread. We could fix that, but deferring that for now. auto s = table.schema(); auto mt = make_lw_shared(s); - for (shard_id shard = 0; shard < smp::count; ++shard) { + for (shard_id shard = 0; shard < num_shards; ++shard) { auto key = tests::generate_partition_key(s, shard); mutation m(s, key); m.set_clustered_cell(clustering_key::make_empty(), bytes("c"), data_value(int32_t(0)), api::timestamp_type(0)); @@ -329,6 +331,26 @@ future<> verify_that_all_sstables_are_local(sharded& sstdir, THREADSAFE_BOOST_REQUIRE_EQUAL(count, expected_sstables); } +future<> verify_that_all_sstables_are_contained_in_ranges(sharded& sstdir, compaction::owned_ranges_ptr owned_ranges) { + auto all_ok = co_await sstdir.map_reduce0([owned_ranges] (sstable_directory& d) -> future { + bool ret = true; + co_await d.do_for_each_sstable([&ret, &owned_ranges] (sstables::shared_sstable sst) { + dht::token_range sst_range(sst->get_first_decorated_key().token(), sst->get_last_decorated_key().token()); + bool found = false; + for (const auto& r : *owned_ranges) { + if (r.contains(sst_range, dht::token_comparator{})) { + found = true; + break; + } + } + ret &= found; + return make_ready_future<>(); + }); + co_return ret; + }, true, std::logical_and<>{}); + THREADSAFE_BOOST_REQUIRE(all_ok); +} + // Test that all SSTables are seen as unshared, if the generation numbers match what their // shard-assignments expect SEASTAR_THREAD_TEST_CASE(sstable_directory_unshared_sstables_sanity_matched_generations) { @@ -863,4 +885,51 @@ SEASTAR_TEST_CASE(test_pending_log_garbage_collection) { }); } +SEASTAR_TEST_CASE(sstable_directory_test_reshard_vnodes) { + return do_with_cql_env_thread([] (cql_test_env& e) { + e.execute_cql("create table cf (p text PRIMARY KEY, c int)").get(); + auto& cf = e.local_db().find_column_family("ks", "cf"); + + e.db().invoke_on_all([] (replica::database& db) { + auto& cf = db.find_column_family("ks", "cf"); + return cf.disable_auto_compaction(); + }).get(); + + unsigned num_sstables = 10 * smp::count; + + sharded sharded_gen; + sharded_gen.start().get(); + auto stop_generator = deferred_stop(sharded_gen); + + for (unsigned nr = 0; nr < num_sstables; ++nr) { + auto generation = sharded_gen.invoke_on(nr % smp::count, [] (auto& gen) { + return gen(); + }).get(); + make_sstable_for_all_shards(cf, sstables::sstable_state::upload, generation, smp::count); + } + + with_sstable_directory(e.db(), "ks", "cf", sstables::sstable_state::upload, [&] (sharded& sstdir) { + distributed_loader_for_tests::process_sstable_dir(sstdir, { .throw_on_missing_toc = true }).get(); + + sharded sharded_gen; + sharded_gen.start().get(); + auto stop_generator = deferred_stop(sharded_gen); + + auto make_sstable = [&e, &sharded_gen] (shard_id shard) { + auto generation = sharded_gen.invoke_on(shard, [] (auto& gen) { + return gen(); + }).get(); + auto& cf = e.local_db().find_column_family("ks", "cf"); + return cf.get_sstables_manager().make_sstable(cf.schema(), cf.get_storage_options(), generation, sstables::sstable_state::upload); + }; + + const auto& erm = e.local_db().find_keyspace("ks").get_static_effective_replication_map(); + auto owned_ranges_ptr = compaction::make_owned_ranges_ptr(e.local_db().get_keyspace_local_ranges(erm).get()); + bool vnodes_resharding = true; + distributed_loader_for_tests::reshard(sstdir, e.db(), "ks", "cf", std::move(make_sstable), owned_ranges_ptr, vnodes_resharding).get(); + verify_that_all_sstables_are_contained_in_ranges(sstdir, owned_ranges_ptr).get(); + }); + }); +} + BOOST_AUTO_TEST_SUITE_END()