From 8505389963a349b085ab9459265182f99afcd1c4 Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Fri, 8 Mar 2024 11:58:24 +0100 Subject: [PATCH 01/10] replica: drop single_compaction_group_if_available Drop single_compaction_group_if_available as it's unused. --- replica/compaction_group.hh | 4 ---- replica/database.hh | 2 -- replica/table.cc | 4 ---- 3 files changed, 10 deletions(-) diff --git a/replica/compaction_group.hh b/replica/compaction_group.hh index ad328f65d1..913bb1b9bc 100644 --- a/replica/compaction_group.hh +++ b/replica/compaction_group.hh @@ -253,10 +253,6 @@ public: // FIXME: Cannot return nullptr, signature can be changed to return storage_group&. storage_group* storage_group_for_id(const schema_ptr&, size_t i) const; - compaction_group* single_compaction_group_if_available() noexcept { - return _compaction_groups.size() == 1 ? &_compaction_groups.front() : nullptr; - } - // Caller must keep the current effective_replication_map_ptr valid // until the storage_group_manager finishes update_effective_replication_map virtual future<> update_effective_replication_map(const locator::effective_replication_map& erm) = 0; diff --git a/replica/database.hh b/replica/database.hh index cd4c72bd42..86165f8556 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -591,8 +591,6 @@ private: storage_group* storage_group_for_id(size_t i) const; std::unique_ptr make_storage_group_manager(); - // Return compaction group if table owns a single one. Otherwise, null is returned. - compaction_group* single_compaction_group_if_available() const noexcept; compaction_group* get_compaction_group(size_t id) const noexcept; // Select a compaction group from a given token. 
compaction_group& compaction_group_for_token(dht::token token) const noexcept; diff --git a/replica/table.cc b/replica/table.cc index 94fe11d43a..3c8f34b002 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -873,10 +873,6 @@ std::unique_ptr table::make_storage_group_manager() { return ret; } -compaction_group* table::single_compaction_group_if_available() const noexcept { - return _sg_manager->single_compaction_group_if_available(); -} - compaction_group* table::get_compaction_group(size_t id) const noexcept { return storage_group_for_id(id)->main_compaction_group().get(); } From 90d618d8c9d40ade97cfd3b0070807da04e8cc50 Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Fri, 8 Mar 2024 12:31:54 +0100 Subject: [PATCH 02/10] replica: open code get_compaction_group in perform_cleanup_compaction Open code get_compaction_group in table::perform_cleanup_compaction as its definition won't be relevant once storage groups are allocated dynamically. --- replica/table.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/replica/table.cc b/replica/table.cc index 3c8f34b002..cf5ae37b2b 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1785,7 +1785,7 @@ future<> table::perform_cleanup_compaction(compaction::owned_ranges_ptr sorted_o co_await flush(); } - auto& cg = *get_compaction_group(0); + auto& cg = *storage_group_for_id(0)->main_compaction_group().get(); co_return co_await get_compaction_manager().perform_cleanup(std::move(sorted_owned_ranges), cg.as_table_state(), info); } From cf9913b0b759f21160e2ef2736a4d46982b72583 Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Fri, 8 Mar 2024 13:02:18 +0100 Subject: [PATCH 03/10] compaction: pass compaction group id to reshape_compaction_group Pass compaction group id to shard_reshaping_compaction_task_impl::reshape_compaction_group. Modify table::as_table_state to return table_state of the given compaction group. 
--- compaction/task_manager_module.cc | 10 +++++----- compaction/task_manager_module.hh | 2 +- replica/database.hh | 2 +- replica/table.cc | 4 ++-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/compaction/task_manager_module.cc b/compaction/task_manager_module.cc index eb3de57351..baf93b4edd 100644 --- a/compaction/task_manager_module.cc +++ b/compaction/task_manager_module.cc @@ -604,12 +604,12 @@ future<> shard_reshaping_compaction_task_impl::run() { } // reshape sstables individually within the compaction groups - for (auto& sstables_in_cg : sstables_grouped_by_compaction_group | boost::adaptors::map_values) { - co_await reshape_compaction_group(sstables_in_cg, table, info); + for (auto& sstables_in_cg : sstables_grouped_by_compaction_group) { + co_await reshape_compaction_group(sstables_in_cg.first, sstables_in_cg.second, table, info); } } -future<> shard_reshaping_compaction_task_impl::reshape_compaction_group(std::unordered_set& sstables_in_cg, replica::column_family& table, const tasks::task_info& info) { +future<> shard_reshaping_compaction_task_impl::reshape_compaction_group(size_t compaction_group_id, std::unordered_set& sstables_in_cg, replica::column_family& table, const tasks::task_info& info) { while (true) { auto reshape_candidates = boost::copy_range>(sstables_in_cg @@ -635,8 +635,8 @@ future<> shard_reshaping_compaction_task_impl::reshape_compaction_group(std::uno desc.creator = _creator; try { - co_await table.get_compaction_manager().run_custom_job(table.as_table_state(), sstables::compaction_type::Reshape, "Reshape compaction", [&dir = _dir, &table, sstlist = std::move(sstlist), desc = std::move(desc), &sstables_in_cg] (sstables::compaction_data& info, sstables::compaction_progress_monitor& progress_monitor) mutable -> future<> { - sstables::compaction_result result = co_await sstables::compact_sstables(std::move(desc), info, table.as_table_state(), progress_monitor); + co_await 
table.get_compaction_manager().run_custom_job(table.as_table_state(compaction_group_id), sstables::compaction_type::Reshape, "Reshape compaction", [&dir = _dir, &table, sstlist = std::move(sstlist), desc = std::move(desc), &sstables_in_cg, compaction_group_id] (sstables::compaction_data& info, sstables::compaction_progress_monitor& progress_monitor) mutable -> future<> { + sstables::compaction_result result = co_await sstables::compact_sstables(std::move(desc), info, table.as_table_state(compaction_group_id), progress_monitor); // update the sstables_in_cg set with new sstables and remove the reshaped ones for (auto& sst : sstlist) { sstables_in_cg.erase(sst); diff --git a/compaction/task_manager_module.hh b/compaction/task_manager_module.hh index c88c6130cb..b1c0764209 100644 --- a/compaction/task_manager_module.hh +++ b/compaction/task_manager_module.hh @@ -606,7 +606,7 @@ private: std::function _filter; uint64_t& _total_shard_size; - future<> reshape_compaction_group(std::unordered_set& sstables_in_cg, replica::column_family& table, const tasks::task_info& info); + future<> reshape_compaction_group(size_t compaction_group_id, std::unordered_set& sstables_in_cg, replica::column_family& table, const tasks::task_info& info); public: shard_reshaping_compaction_task_impl(tasks::task_manager::module_ptr module, std::string keyspace, diff --git a/replica/database.hh b/replica/database.hh index 86165f8556..4796b256fa 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1206,7 +1206,7 @@ public: void enable_off_strategy_trigger(); // FIXME: get rid of it once no users. - compaction::table_state& as_table_state() const noexcept; + compaction::table_state& as_table_state(size_t id = 0) const noexcept; // Safely iterate through table states, while performing async operations on them. 
future<> parallel_foreach_table_state(std::function(compaction::table_state&)> action); diff --git a/replica/table.cc b/replica/table.cc index cf5ae37b2b..7a5776155c 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -3326,9 +3326,9 @@ compaction::table_state& compaction_group::as_table_state() const noexcept { return *_table_state; } -compaction::table_state& table::as_table_state() const noexcept { +compaction::table_state& table::as_table_state(size_t id) const noexcept { // FIXME: kill it once we're done with all remaining users. - return get_compaction_group(0)->as_table_state(); + return get_compaction_group(id)->as_table_state(); } future<> table::parallel_foreach_table_state(std::function(table_state&)> action) { From 532653f118de3e5c833959e6c62d2743769e2974 Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Fri, 8 Mar 2024 17:12:52 +0100 Subject: [PATCH 04/10] replica: replace table::as_table_state Replace table::as_table_state with table::try_get_table_state_with_static_sharding which throws if a table does not use static sharding. --- compaction/task_manager_module.cc | 9 +++++---- replica/database.hh | 8 ++++++-- replica/table.cc | 8 +++++--- test/boost/cql_query_large_test.cc | 2 +- test/boost/sstable_3_x_test.cc | 2 +- test/boost/sstable_compaction_test.cc | 4 ++-- test/boost/sstable_test.hh | 2 +- test/boost/view_complex_test.cc | 4 ++-- test/lib/test_services.cc | 6 +++--- test/perf/perf_fast_forward.cc | 4 ++-- test/perf/perf_sstable.hh | 2 +- 11 files changed, 29 insertions(+), 22 deletions(-) diff --git a/compaction/task_manager_module.cc b/compaction/task_manager_module.cc index baf93b4edd..de45f16354 100644 --- a/compaction/task_manager_module.cc +++ b/compaction/task_manager_module.cc @@ -159,9 +159,9 @@ future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory:: // There is a semaphore inside the compaction manager in run_resharding_jobs. 
So we // parallel_for_each so the statistics about pending jobs are updated to reflect all // jobs. But only one will run in parallel at a time - auto& t = table.as_table_state(); + auto& t = table.try_get_table_state_with_static_sharding(); co_await coroutine::parallel_for_each(buckets, [&] (std::vector& sstlist) mutable { - return table.get_compaction_manager().run_custom_job(table.as_table_state(), sstables::compaction_type::Reshard, "Reshard compaction", [&] (sstables::compaction_data& info, sstables::compaction_progress_monitor& progress_monitor) -> future<> { + return table.get_compaction_manager().run_custom_job(t, sstables::compaction_type::Reshard, "Reshard compaction", [&] (sstables::compaction_data& info, sstables::compaction_progress_monitor& progress_monitor) -> future<> { auto erm = table.get_effective_replication_map(); // keep alive around compaction. sstables::compaction_descriptor desc(sstlist); @@ -635,8 +635,9 @@ future<> shard_reshaping_compaction_task_impl::reshape_compaction_group(size_t c desc.creator = _creator; try { - co_await table.get_compaction_manager().run_custom_job(table.as_table_state(compaction_group_id), sstables::compaction_type::Reshape, "Reshape compaction", [&dir = _dir, &table, sstlist = std::move(sstlist), desc = std::move(desc), &sstables_in_cg, compaction_group_id] (sstables::compaction_data& info, sstables::compaction_progress_monitor& progress_monitor) mutable -> future<> { - sstables::compaction_result result = co_await sstables::compact_sstables(std::move(desc), info, table.as_table_state(compaction_group_id), progress_monitor); + auto& t = table.get_compaction_group(compaction_group_id)->as_table_state(); + co_await table.get_compaction_manager().run_custom_job(t, sstables::compaction_type::Reshape, "Reshape compaction", [&dir = _dir, sstlist = std::move(sstlist), desc = std::move(desc), &sstables_in_cg, &t] (sstables::compaction_data& info, sstables::compaction_progress_monitor& progress_monitor) mutable -> 
future<> { + sstables::compaction_result result = co_await sstables::compact_sstables(std::move(desc), info, t, progress_monitor); // update the sstables_in_cg set with new sstables and remove the reshaped ones for (auto& sst : sstlist) { sstables_in_cg.erase(sst); diff --git a/replica/database.hh b/replica/database.hh index 4796b256fa..0033f60b94 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -114,6 +114,10 @@ namespace gms { class gossiper; } +namespace compaction { +class shard_reshaping_compaction_task_impl; +} + namespace db { class commitlog; class config; @@ -1196,6 +1200,7 @@ public: friend class distributed_loader; friend class table_populator; + friend class compaction::shard_reshaping_compaction_task_impl; private: timer<> _off_strategy_trigger; @@ -1205,8 +1210,7 @@ public: void update_off_strategy_trigger(); void enable_off_strategy_trigger(); - // FIXME: get rid of it once no users. - compaction::table_state& as_table_state(size_t id = 0) const noexcept; + compaction::table_state& try_get_table_state_with_static_sharding() const; // Safely iterate through table states, while performing async operations on them. future<> parallel_foreach_table_state(std::function(compaction::table_state&)> action); diff --git a/replica/table.cc b/replica/table.cc index 7a5776155c..5055db1274 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -3326,9 +3326,11 @@ compaction::table_state& compaction_group::as_table_state() const noexcept { return *_table_state; } -compaction::table_state& table::as_table_state(size_t id) const noexcept { - // FIXME: kill it once we're done with all remaining users. 
- return get_compaction_group(id)->as_table_state(); +compaction::table_state& table::try_get_table_state_with_static_sharding() const { + if (!uses_static_sharding()) { + throw std::runtime_error("Getting table state is allowed only with static sharding"); + } + return get_compaction_group(0)->as_table_state(); } future<> table::parallel_foreach_table_state(std::function(table_state&)> action) { diff --git a/test/boost/cql_query_large_test.cc b/test/boost/cql_query_large_test.cc index 8060e7c682..b1397b7d90 100644 --- a/test/boost/cql_query_large_test.cc +++ b/test/boost/cql_query_large_test.cc @@ -124,7 +124,7 @@ SEASTAR_THREAD_TEST_CASE(test_large_data) { flush(e); e.db().invoke_on_all([] (replica::database& dbi) { return dbi.get_tables_metadata().parallel_for_each_table([&dbi] (table_id, lw_shared_ptr t) { - return dbi.get_compaction_manager().perform_major_compaction(t->as_table_state()); + return dbi.get_compaction_manager().perform_major_compaction(t->try_get_table_state_with_static_sharding()); }); }).get(); diff --git a/test/boost/sstable_3_x_test.cc b/test/boost/sstable_3_x_test.cc index 43aad33be0..7f598f4371 100644 --- a/test/boost/sstable_3_x_test.cc +++ b/test/boost/sstable_3_x_test.cc @@ -3027,7 +3027,7 @@ static flat_mutation_reader_v2 compacted_sstable_reader(test_env& env, schema_pt desc.replacer = replacer_fn_no_op(); auto cdata = compaction_manager::create_compaction_data(); compaction_progress_monitor progress_monitor; - sstables::compact_sstables(std::move(desc), cdata, cf->as_table_state(), progress_monitor).get(); + sstables::compact_sstables(std::move(desc), cdata, cf->try_get_table_state_with_static_sharding(), progress_monitor).get(); return compacted_sst->as_mutation_source().make_reader_v2(s, env.make_reader_permit(), query::full_partition_range, s->full_slice()); } diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index edfb00e09f..11403ec402 100644 --- a/test/boost/sstable_compaction_test.cc 
+++ b/test/boost/sstable_compaction_test.cc @@ -4991,7 +4991,7 @@ static future<> run_incremental_compaction_test(sstables::offstrategy offstrateg ssts = {}; // releases references auto owned_ranges_ptr = make_lw_shared(std::move(owned_token_ranges)); run_compaction(t, std::move(owned_ranges_ptr)).get(); - BOOST_REQUIRE(cm.sstables_requiring_cleanup(t->as_table_state()).empty()); + BOOST_REQUIRE(cm.sstables_requiring_cleanup(t->try_get_table_state_with_static_sharding()).empty()); testlog.info("Cleanup has finished"); } @@ -5099,7 +5099,7 @@ SEASTAR_TEST_CASE(cleanup_during_offstrategy_incremental_compaction_test) { ssts = {}; // releases references auto owned_ranges_ptr = make_lw_shared(std::move(owned_token_ranges)); t->perform_cleanup_compaction(std::move(owned_ranges_ptr)).get(); - BOOST_REQUIRE(cm.sstables_requiring_cleanup(t->as_table_state()).empty()); + BOOST_REQUIRE(cm.sstables_requiring_cleanup(t->try_get_table_state_with_static_sharding()).empty()); testlog.info("Cleanup has finished"); } diff --git a/test/boost/sstable_test.hh b/test/boost/sstable_test.hh index fc12c53cfb..92e7b83c88 100644 --- a/test/boost/sstable_test.hh +++ b/test/boost/sstable_test.hh @@ -38,7 +38,7 @@ public: return _cf->add_sstable_and_update_cache(sstable, offstrategy); } auto new_sstables = { sstable }; - return _cf->as_table_state().on_compaction_completion(sstables::compaction_completion_desc{ .new_sstables = new_sstables }, sstables::offstrategy::no); + return _cf->try_get_table_state_with_static_sharding().on_compaction_completion(sstables::compaction_completion_desc{ .new_sstables = new_sstables }, sstables::offstrategy::no); } future<> rebuild_sstable_list(compaction::table_state& table_s, const std::vector& new_sstables, diff --git a/test/boost/view_complex_test.cc b/test/boost/view_complex_test.cc index 4e1337bfbf..408b42955b 100644 --- a/test/boost/view_complex_test.cc +++ b/test/boost/view_complex_test.cc @@ -842,7 +842,7 @@ void 
test_commutative_row_deletion(cql_test_env& e, std::function&& mayb }}); }); - e.local_db().get_compaction_manager().perform_major_compaction(e.local_db().find_column_family("ks", "vcf").as_table_state()).get(); + e.local_db().get_compaction_manager().perform_major_compaction(e.local_db().find_column_family("ks", "vcf").try_get_table_state_with_static_sharding()).get(); } SEASTAR_TEST_CASE(test_commutative_row_deletion_without_flush) { @@ -1078,7 +1078,7 @@ void test_update_with_column_timestamp_bigger_than_pk(cql_test_env& e, std::func }}); }); - e.local_db().get_compaction_manager().perform_major_compaction(e.local_db().find_column_family("ks", "vcf").as_table_state()).get(); + e.local_db().get_compaction_manager().perform_major_compaction(e.local_db().find_column_family("ks", "vcf").try_get_table_state_with_static_sharding()).get(); eventually([&] { auto msg = e.execute_cql("select * from vcf limit 1").get(); assert_that(msg).is_rows().with_rows({{ diff --git a/test/lib/test_services.cc b/test/lib/test_services.cc index e0e0590a4e..73bfdbdb4a 100644 --- a/test/lib/test_services.cc +++ b/test/lib/test_services.cc @@ -70,10 +70,10 @@ public: return true; } const sstables::sstable_set& main_sstable_set() const override { - return table().as_table_state().main_sstable_set(); + return table().try_get_table_state_with_static_sharding().main_sstable_set(); } const sstables::sstable_set& maintenance_sstable_set() const override { - return table().as_table_state().maintenance_sstable_set(); + return table().try_get_table_state_with_static_sharding().maintenance_sstable_set(); } std::unordered_set fully_expired_sstables(const std::vector& sstables, gc_clock::time_point query_time) const override { return sstables::get_fully_expired_sstables(*this, sstables, query_time); @@ -105,7 +105,7 @@ public: } bool memtable_has_key(const dht::decorated_key& key) const override { return false; } future<> on_compaction_completion(sstables::compaction_completion_desc desc, 
sstables::offstrategy offstrategy) override { - return table().as_table_state().on_compaction_completion(std::move(desc), offstrategy); + return table().try_get_table_state_with_static_sharding().on_compaction_completion(std::move(desc), offstrategy); } bool is_auto_compaction_disabled_by_user() const noexcept override { return table().is_auto_compaction_disabled_by_user(); diff --git a/test/perf/perf_fast_forward.cc b/test/perf/perf_fast_forward.cc index 219f1e7b4f..b9f8422e60 100644 --- a/test/perf/perf_fast_forward.cc +++ b/test/perf/perf_fast_forward.cc @@ -1762,7 +1762,7 @@ void populate(const std::vector& datasets, cql_test_env& env, const ta output_mgr->set_test_param_names({{"flush@ (MiB)", "{:<12}"}}, test_result::stats_names()); - db.get_compaction_manager().run_with_compaction_disabled(cf.as_table_state(), [&] { + db.get_compaction_manager().run_with_compaction_disabled(cf.try_get_table_state_with_static_sharding(), [&] { return seastar::async([&] { auto gen = ds.make_generator(s, cfg); while (auto mopt = gen()) { @@ -1869,7 +1869,7 @@ auto make_compaction_disabling_guard(replica::database& db, std::vector pr; for (auto&& t : tables) { // FIXME: discarded future. 
- (void)db.get_compaction_manager().run_with_compaction_disabled(t->as_table_state(), [f = shared_future<>(pr.get_shared_future())] { + (void)db.get_compaction_manager().run_with_compaction_disabled(t->try_get_table_state_with_static_sharding(), [f = shared_future<>(pr.get_shared_future())] { return f.get_future(); }); } diff --git a/test/perf/perf_sstable.hh b/test/perf/perf_sstable.hh index 50cf521547..3acfd00c51 100644 --- a/test/perf/perf_sstable.hh +++ b/test/perf/perf_sstable.hh @@ -240,7 +240,7 @@ public: descriptor.replacer = sstables::replacer_fn_no_op(); auto cdata = compaction_manager::create_compaction_data(); compaction_progress_monitor progress_monitor; - auto ret = sstables::compact_sstables(std::move(descriptor), cdata, cf->as_table_state(), progress_monitor).get(); + auto ret = sstables::compact_sstables(std::move(descriptor), cdata, cf->try_get_table_state_with_static_sharding(), progress_monitor).get(); auto end = perf_sstable_test_env::now(); auto partitions_per_sstable = _cfg.partitions / _cfg.sstables; From 561fb1dd09e80d4ebdff67e7b5ae15c3de00a734 Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Tue, 2 Apr 2024 19:01:50 +0200 Subject: [PATCH 05/10] service: move to cleanup stage if allow_write_both_read_old fails If allow_write_both_read_old tablet transition stage fails, move to cleanup_target stage before reverting migration. It's a preparation for further patches which deallocate storage group of a tablet during cleanup. 
--- docs/dev/topology-over-raft.md | 2 +- service/topology_coordinator.cc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/dev/topology-over-raft.md b/docs/dev/topology-over-raft.md index 057aea43dd..40b77f7fc2 100644 --- a/docs/dev/topology-over-raft.md +++ b/docs/dev/topology-over-raft.md @@ -292,7 +292,7 @@ stateDiagram-v2 use_new --> cleanup cleanup --> end_migration end_migration --> [*] - allow_write_both_read_old --> revert_migration: error + allow_write_both_read_old --> cleanup_target: error write_both_read_old --> cleanup_target: error streaming --> cleanup_target: error write_both_read_new --> if_state: error diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index e1121f7ee8..f9f82f4c29 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -1046,7 +1046,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { case locator::tablet_transition_stage::allow_write_both_read_old: if (action_failed(tablet_state.barriers[trinfo.stage])) { if (check_excluded_replicas()) { - transition_to_with_barrier(locator::tablet_transition_stage::revert_migration); + transition_to_with_barrier(locator::tablet_transition_stage::cleanup_target); break; } } From 54fcb7be53eef06af5a369966915c543e9ae3640 Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Wed, 3 Apr 2024 17:25:20 +0200 Subject: [PATCH 06/10] replica: handle reads of non-existing tablets gracefully In the following patches, storage groups (and so also sstables sets) will be allocated only for tablets that are located on this shard. Some layers may try to read non-existing sstable sets. Handle this case as if the sstables set was empty instead of calling on_internal_error. 
--- replica/tablets.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/replica/tablets.cc b/replica/tablets.cc index 4ec702e1da..35a52ec03e 100644 --- a/replica/tablets.cc +++ b/replica/tablets.cc @@ -568,7 +568,7 @@ public: auto token = pos.token(); if (!_cur_set || pos.token() >= _lowest_next_token) { auto idx = _tset.group_of(token); - if (!token.is_maximum()) { + if (!token.is_maximum() && _tset._sstable_set_ids.contains(idx)) { _cur_set = _tset.find_sstable_set(idx); } // Set the next token to point to the next engaged storage group. From c283746b32790702dfcca62de2a99292ec1bcfcd Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Mon, 15 Apr 2024 13:38:03 +0200 Subject: [PATCH 07/10] replica: add rwlock to storage_group_manager Add rwlock which prevents storage groups from being added/deleted while some other layer iterates over them (or their compaction groups). Add methods to iterate over storage groups with the lock held. --- replica/compaction_group.hh | 17 +++++++----- replica/database.hh | 2 -- replica/table.cc | 55 +++++++++++++++++++++++-------------- replica/tablets.cc | 16 +++++------ 4 files changed, 51 insertions(+), 39 deletions(-) diff --git a/replica/compaction_group.hh b/replica/compaction_group.hh index 913bb1b9bc..cef22e718e 100644 --- a/replica/compaction_group.hh +++ b/replica/compaction_group.hh @@ -8,6 +8,7 @@ #include #include +#include #include "database_fwd.hh" #include "compaction/compaction_descriptor.hh" @@ -233,10 +234,16 @@ protected: // The list entries are unlinked automatically when the storage group, they belong to, is removed. compaction_group_list _compaction_groups; storage_group_map _storage_groups; - + // Prevents _storage_groups from having its elements inserted or deleted while other layer iterates + // over them (or over _compaction_groups). 
+ seastar::rwlock _lock; public: virtual ~storage_group_manager(); + seastar::rwlock& get_rwlock() noexcept { + return _lock; + } + const compaction_group_list& compaction_groups() const noexcept { return _compaction_groups; } @@ -244,12 +251,8 @@ public: return _compaction_groups; } - const storage_group_map& storage_groups() const noexcept { - return _storage_groups; - } - storage_group_map& storage_groups() noexcept { - return _storage_groups; - } + future<> for_each_storage_group_gently(std::function(size_t, storage_group&)> f); + void for_each_storage_group(std::function f) const; // FIXME: Cannot return nullptr, signature can be changed to return storage_group&. storage_group* storage_group_for_id(const schema_ptr&, size_t i) const; diff --git a/replica/database.hh b/replica/database.hh index 0033f60b94..59eead3681 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -606,8 +606,6 @@ private: compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) const noexcept; // Returns a list of all compaction groups. compaction_group_list& compaction_groups() const noexcept; - // Returns a list of all storage groups. - const storage_group_map& storage_groups() const noexcept; // Safely iterate through compaction groups, while performing async operations on them. 
future<> parallel_foreach_compaction_group(std::function(compaction_group&)> action); diff --git a/replica/table.cc b/replica/table.cc index 5055db1274..630394a506 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -548,9 +548,22 @@ void table::enable_off_strategy_trigger() { storage_group_manager::~storage_group_manager() = default; +future<> storage_group_manager::for_each_storage_group_gently(std::function(size_t, storage_group&)> f) { + rwlock::holder shared_lock = co_await get_rwlock().hold_read_lock(); + for (auto& [id, sg]: _storage_groups) { + co_await f(id, *sg); + } +} + +void storage_group_manager::for_each_storage_group(std::function f) const { + for (auto& [id, sg]: _storage_groups) { + f(id, *sg); + } +} + storage_group* storage_group_manager::storage_group_for_id(const schema_ptr& s, size_t i) const { - auto it = storage_groups().find(i); - if (it == storage_groups().end()) [[unlikely]] { + auto it = _storage_groups.find(i); + if (it == _storage_groups.end()) [[unlikely]] { on_internal_error(tlogger, format("Storage wasn't found for tablet {} of table {}.{}", i, s->ks_name(), s->cf_name())); } return it->second.get(); @@ -805,7 +818,7 @@ bool tablet_storage_group_manager::all_storage_groups_split() { return true; } - auto split_ready = std::ranges::all_of(storage_groups() | boost::adaptors::map_values, + auto split_ready = std::ranges::all_of(_storage_groups | boost::adaptors::map_values, std::bind(&storage_group::set_split_mode, std::placeholders::_1, std::ref(compaction_groups()))); // The table replica will say to coordinator that its split status is ready by @@ -834,9 +847,9 @@ sstables::compaction_type_options::split tablet_storage_group_manager::split_com future<> tablet_storage_group_manager::split_all_storage_groups() { sstables::compaction_type_options::split opt = split_compaction_options(); - for (auto& storage_group : storage_groups() | boost::adaptors::map_values) { - co_await storage_group->split(compaction_groups(), opt); - } + 
co_await for_each_storage_group_gently([this, opt] (size_t i, storage_group& storage_group) { + return storage_group.split(compaction_groups(), opt); + }); } future<> table::split_all_storage_groups() { @@ -849,7 +862,7 @@ future<> tablet_storage_group_manager::maybe_split_compaction_group_of(size_t id return make_ready_future<>(); } - auto& sg = storage_groups()[idx]; + auto& sg = _storage_groups[idx]; if (!sg) { on_internal_error(tlogger, format("Tablet {} of table {}.{} is not allocated in this shard", idx, schema()->ks_name(), schema()->cf_name())); @@ -908,8 +921,8 @@ utils::chunked_vector tablet_storage_group_manager::compactio size_t candidate_end = tr.end() ? storage_group_id_for_token(tr.end()->value()) : (tablet_count() - 1); while (candidate_start <= candidate_end) { - auto it = storage_groups().find(candidate_start++); - if (it == storage_groups().end()) { + auto it = _storage_groups.find(candidate_start++); + if (it == _storage_groups.end()) { continue; } auto& sg = it->second; @@ -961,12 +974,8 @@ compaction_group_list& table::compaction_groups() const noexcept { return _sg_manager->compaction_groups(); } -const storage_group_map& table::storage_groups() const noexcept { - return _sg_manager->storage_groups(); -} - future<> table::parallel_foreach_compaction_group(std::function(compaction_group&)> action) { - // TODO: place a barrier here when we allow dynamic groups. 
+ rwlock::holder shared_lock = co_await _sg_manager->get_rwlock().hold_read_lock(); co_await coroutine::parallel_for_each(compaction_groups(), [&] (compaction_group& cg) { return action(cg); }); @@ -2020,13 +2029,13 @@ locator::table_load_stats table::table_load_stats(std::functionsplit_ready_seq_number(); - for (auto& [id, sg] : storage_groups()) { + _sg_manager->for_each_storage_group([&] (size_t id, storage_group& sg) { locator::global_tablet_id gid { _schema->id(), locator::tablet_id(id) }; if (!tablet_filter(gid)) { - continue; + return; } - stats.size_in_bytes += sg->live_disk_space_used(); - } + stats.size_in_bytes += sg.live_disk_space_used(); + }); return stats; } @@ -2054,7 +2063,7 @@ future<> tablet_storage_group_manager::handle_tablet_split_completion(const loca // Stop the released main compaction groups asynchronously future<> stop_fut = make_ready_future<>(); - for (auto& [id, sg] : storage_groups()) { + for (auto& [id, sg] : _storage_groups) { if (!sg->main_compaction_group()->empty()) { on_internal_error(tlogger, format("Found that storage of group {} for table {} wasn't split correctly, " \ "therefore groups cannot be remapped with the new tablet count.", @@ -2099,12 +2108,16 @@ future<> tablet_storage_group_manager::update_effective_replication_map(const lo if (new_tablet_count > old_tablet_count) { tlogger.info0("Detected tablet split for table {}.{}, increasing from {} to {} tablets", schema()->ks_name(), schema()->cf_name(), old_tablet_count, new_tablet_count); - return handle_tablet_split_completion(*old_tablet_map, *new_tablet_map); + co_await handle_tablet_split_completion(*old_tablet_map, *new_tablet_map); } - return make_ready_future(); + co_return; } future<> table::update_effective_replication_map(locator::effective_replication_map_ptr erm) { + // Exclusive lock is meant to protect storage groups, but we hold it here to prevent preemption + // between erm and storage groups updates as they need to be consistent. 
+ rwlock::holder exclusive_lock = co_await _sg_manager->get_rwlock().hold_write_lock(); + auto old_erm = std::exchange(_erm, std::move(erm)); if (uses_tablets()) { diff --git a/replica/tablets.cc b/replica/tablets.cc index 35a52ec03e..68dd51c053 100644 --- a/replica/tablets.cc +++ b/replica/tablets.cc @@ -367,15 +367,13 @@ public: : _schema(std::move(s)) , _tablet_map(tmap.tablet_count()) { - for (const auto& [id, sg] : sgm.storage_groups()) { - if (sg) { - auto set = sg->make_sstable_set(); - _size += set->size(); - _bytes_on_disk += set->bytes_on_disk(); - _sstable_sets[id] = std::move(set); - _sstable_set_ids.insert(id); - } - } + sgm.for_each_storage_group([this] (size_t id, storage_group& sg) { + auto set = sg.make_sstable_set(); + _size += set->size(); + _bytes_on_disk += set->bytes_on_disk(); + _sstable_sets[id] = std::move(set); + _sstable_set_ids.insert(id); + }); } tablet_sstable_set(const tablet_sstable_set& o) From 6e1e082e8c2e6444eba6542fb740fb12537000bb Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Thu, 25 Apr 2024 17:30:33 +0200 Subject: [PATCH 08/10] replica: refresh snapshot in compaction_group::cleanup During compaction_group::cleanup sstables set is updated, but row_cache::_underlying still keeps a shared ptr to the old set. Due to that, descriptors of deleted sstables aren't closed. Refresh snapshot in order to store new sstables set in _underlying mutation source. 
--- replica/table.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/replica/table.cc b/replica/table.cc index 630394a506..2f7495548b 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -3423,6 +3423,7 @@ future<> compaction_group::cleanup() { tlogger.debug("Invalidating range {} for compaction group {} of table {} during cleanup.", p_range, group_id(), _t.schema()->ks_name(), _t.schema()->cf_name()); co_await _t._cache.invalidate(std::move(updater), p_range); + _t._cache.refresh_snapshot(); } future<> table::cleanup_tablet(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid) { From b4371a0ea02063ed039e89356d030b82c38aa2ec Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Wed, 6 Mar 2024 15:20:55 +0100 Subject: [PATCH 09/10] replica: allocate storage groups dynamically Currently empty storage_groups are allocated for tablets that are not on this shard. Allocate storage groups dynamically, i.e.: - on table creation allocate only storage groups that are on this shard; - allocate a storage group for tablet that is moved to this shard; - deallocate storage group for tablet that is cleaned up. Stop compaction group before it's deallocated. Add a flag to table::cleanup_tablet deciding whether to deallocate sgs and use it in commitlog tests. --- replica/compaction_group.hh | 1 + replica/database.hh | 8 +++ replica/table.cc | 92 ++++++++++++++++++++++------ test/boost/commitlog_cleanup_test.cc | 4 +- 4 files changed, 84 insertions(+), 21 deletions(-) diff --git a/replica/compaction_group.hh b/replica/compaction_group.hh index cef22e718e..61ffcc80ba 100644 --- a/replica/compaction_group.hh +++ b/replica/compaction_group.hh @@ -253,6 +253,7 @@ public: future<> for_each_storage_group_gently(std::function(size_t, storage_group&)> f); void for_each_storage_group(std::function f) const; + void remove_storage_group(size_t id); // FIXME: Cannot return nullptr, signature can be changed to return storage_group&. 
storage_group* storage_group_for_id(const schema_ptr&, size_t i) const; diff --git a/replica/database.hh b/replica/database.hh index 59eead3681..df01578558 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -838,7 +838,15 @@ public: const locator::effective_replication_map_ptr& get_effective_replication_map() const { return _erm; } future<> update_effective_replication_map(locator::effective_replication_map_ptr); [[gnu::always_inline]] bool uses_tablets() const; +private: + future<> clear_inactive_reads_for_tablet(database& db, storage_group* sg); + future<> stop_compaction_groups(storage_group* sg); + future<> flush_compaction_groups(storage_group* sg); + future<> cleanup_compaction_groups(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid, storage_group* sg); +public: future<> cleanup_tablet(database&, db::system_keyspace&, locator::tablet_id); + // For tests only. + future<> cleanup_tablet_without_deallocation(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid); future find_partition(schema_ptr, reader_permit permit, const dht::decorated_key& key) const; future find_row(schema_ptr, reader_permit permit, const dht::decorated_key& partition_key, clustering_key clustering_key) const; shard_id shard_of(const mutation& m) const { diff --git a/replica/table.cc b/replica/table.cc index 2f7495548b..1fac797a3f 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -561,10 +561,18 @@ void storage_group_manager::for_each_storage_group(std::functionks_name(), s->cf_name())); + throw std::out_of_range(format("Storage wasn't found for tablet {} of table {}.{}", i, s->ks_name(), s->cf_name())); } return it->second.get(); } @@ -674,10 +682,9 @@ public: auto shard = tmap.get_shard(tid, _my_host_id); if (shard && *shard == this_shard_id()) { tlogger.debug("Tablet with id {} and range {} present for {}.{}", tid, range, schema()->ks_name(), schema()->cf_name()); + auto cg = std::make_unique(_t, tid.value(), std::move(range)); + 
ret[tid.value()] = std::make_unique(std::move(cg), &_compaction_groups); } - // FIXME: don't allocate compaction groups for tablets that aren't present in this shard. - auto cg = std::make_unique(_t, tid.value(), std::move(range)); - ret[tid.value()] = std::make_unique(std::move(cg), &_compaction_groups); } _storage_groups = std::move(ret); } @@ -1014,11 +1021,14 @@ void table::update_stats_for_new_sstable(const sstables::shared_sstable& sst) no future<> table::do_add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy, bool trigger_compaction) { + compaction_group& cg = compaction_group_for_sstable(sst); + // Hold gate to make sure compaction group is alive. + auto holder = cg.async_gate().hold(); + auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1); co_return co_await get_row_cache().invalidate(row_cache::external_updater([&] () noexcept { // FIXME: this is not really noexcept, but we need to provide strong exception guarantees. // atomically load all opened sstables into column family. - compaction_group& cg = compaction_group_for_sstable(sst); if (!offstrategy) { add_sstable(cg, sst); } else { @@ -2109,6 +2119,25 @@ future<> tablet_storage_group_manager::update_effective_replication_map(const lo tlogger.info0("Detected tablet split for table {}.{}, increasing from {} to {} tablets", schema()->ks_name(), schema()->cf_name(), old_tablet_count, new_tablet_count); co_await handle_tablet_split_completion(*old_tablet_map, *new_tablet_map); + co_return; + } + + // Allocate storage group if tablet is migrating in. 
+ auto this_replica = locator::tablet_replica{ + .host = erm.get_token_metadata().get_my_id(), + .shard = this_shard_id() + }; + auto tablet_migrates_in = [this_replica] (locator::tablet_transition_info& transition_info) { + return transition_info.stage == locator::tablet_transition_stage::allow_write_both_read_old && transition_info.pending_replica == this_replica; + }; + for (auto& transition : new_tablet_map->transitions()) { + auto tid = transition.first; + auto transition_info = transition.second; + if (!_storage_groups.contains(tid.value()) && tablet_migrates_in(transition_info)) { + auto range = new_tablet_map->get_token_range(tid); + auto cg = std::make_unique(_t, tid.value(), std::move(range)); + _storage_groups[tid.value()] = std::make_unique(std::move(cg), &_compaction_groups); + } } co_return; } @@ -3426,23 +3455,27 @@ future<> compaction_group::cleanup() { _t._cache.refresh_snapshot(); } -future<> table::cleanup_tablet(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid) { - auto holder = async_gate().hold(); - - auto* sg = storage_group_for_id(tid.value()); - +future<> table::clear_inactive_reads_for_tablet(database& db, storage_group* sg) { for (auto& cg_ptr : sg->compaction_groups()) { - if (!cg_ptr) { - throw std::runtime_error(format("Cannot cleanup tablet {} of table {}.{} because it is not allocated in this shard", - tid, _schema->ks_name(), _schema->cf_name())); - } - co_await db.clear_inactive_reads_for_tablet(_schema->id(), cg_ptr->token_range()); + } +} - // Synchronizes with in-flight writes if any, and also takes care of flushing if needed. - // FIXME: to be able to stop group and provide guarantee above, we must first be able to reallocate a new group if tablet is migrated back. - //co_await _cg.stop(); +future<> table::stop_compaction_groups(storage_group* sg) { + // Synchronizes with in-flight writes if any, and also takes care of flushing if needed. 
+ for (auto& cg_ptr : sg->compaction_groups()) { + co_await cg_ptr->stop(); + } +} + +future<> table::flush_compaction_groups(storage_group* sg) { + for (auto& cg_ptr : sg->compaction_groups()) { co_await cg_ptr->flush(); + } +} + +future<> table::cleanup_compaction_groups(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid, storage_group* sg) { + for (auto& cg_ptr : sg->compaction_groups()) { co_await cg_ptr->cleanup(); // FIXME: at this point _highest_rp might be greater than the replay_position of the last cleaned mutation, // and can cover some mutations which weren't cleaned, causing them to be lost during replay. @@ -3460,8 +3493,29 @@ future<> table::cleanup_tablet(database& db, db::system_keyspace& sys_ks, locato } tlogger.info("Cleaned up tablet {} of table {}.{} successfully.", tid, _schema->ks_name(), _schema->cf_name()); +} - // FIXME: Deallocate compaction group in this shard +future<> table::cleanup_tablet(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid) { + auto holder = async_gate().hold(); + rwlock::holder exclusive_lock = co_await _sg_manager->get_rwlock().hold_write_lock(); + auto* sg = storage_group_for_id(tid.value()); + + co_await clear_inactive_reads_for_tablet(db, sg); + // compaction_group::stop takes care of flushing. + co_await stop_compaction_groups(sg); + co_await cleanup_compaction_groups(db, sys_ks, tid, sg); + _sg_manager->remove_storage_group(tid.value()); +} + +future<> table::cleanup_tablet_without_deallocation(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid) { + auto holder = async_gate().hold(); + // Hold shared lock to keep storage group alive. 
+ rwlock::holder shared_lock = co_await _sg_manager->get_rwlock().hold_read_lock(); + auto* sg = storage_group_for_id(tid.value()); + + co_await clear_inactive_reads_for_tablet(db, sg); + co_await flush_compaction_groups(sg); + co_await cleanup_compaction_groups(db, sys_ks, tid, sg); } } // namespace replica diff --git a/test/boost/commitlog_cleanup_test.cc b/test/boost/commitlog_cleanup_test.cc index e156a792bc..942b852b5b 100644 --- a/test/boost/commitlog_cleanup_test.cc +++ b/test/boost/commitlog_cleanup_test.cc @@ -134,7 +134,7 @@ SEASTAR_TEST_CASE(test_commitlog_cleanups) { // Cleanup the tablet. e.db().invoke_on_all([&] (replica::database& db) { - return db.find_column_family("ks", "cf").cleanup_tablet(db, e.get_system_keyspace().local(), locator::tablet_id(0)); + return db.find_column_family("ks", "cf").cleanup_tablet_without_deallocation(db, e.get_system_keyspace().local(), locator::tablet_id(0)); }).get(); BOOST_REQUIRE_EQUAL(get_num_rows(), 0); @@ -179,7 +179,7 @@ SEASTAR_TEST_CASE(test_commitlog_cleanup_record_gc) { }; auto cleanup_tablet = [&] (std::string cf) { auto& db = e.local_db(); - db.find_column_family("ks", cf).cleanup_tablet(db, e.get_system_keyspace().local(), locator::tablet_id(0)).get(); + db.find_column_family("ks", cf).cleanup_tablet_without_deallocation(db, e.get_system_keyspace().local(), locator::tablet_id(0)).get(); }; auto get_num_records = [&] { auto res = e.execute_cql("select * from system.commitlog_cleanups;").get(); From 51fdda4199c5375612364ec37925b00592d792a4 Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Wed, 6 Mar 2024 17:13:25 +0100 Subject: [PATCH 10/10] test: add test for back and forth tablets migration --- .../topology_custom/test_tablets_migration.py | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/test/topology_custom/test_tablets_migration.py b/test/topology_custom/test_tablets_migration.py index 25c2a73a85..e24ed7fc7c 100644 --- a/test/topology_custom/test_tablets_migration.py +++ 
b/test/topology_custom/test_tablets_migration.py @@ -242,3 +242,51 @@ async def test_node_failure_during_tablet_migration(manager: ManagerClient, fail assert len(replicas) == 1 for r in replicas[0].replicas: assert r[0] != host_ids[failer.fail_idx] + +@pytest.mark.asyncio +async def test_tablet_back_and_forth_migration(manager: ManagerClient): + logger.info("Bootstrapping cluster") + cfg = {'enable_user_defined_functions': False, 'experimental_features': ['tablets', 'consistent-topology-changes']} + host_ids = [] + servers = [] + + async def make_server(): + s = await manager.server_add(config=cfg) + servers.append(s) + host_ids.append(await manager.get_host_id(s.server_id)) + await manager.api.disable_tablet_balancing(s.ip_addr) + + async def assert_rows(num): + res = await cql.run_async(f"SELECT * FROM test.test") + assert len(res) == num + + await make_server() + cql = manager.get_cql() + await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") + await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);") + await make_server() + + await cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({1}, {1});") + await assert_rows(1) + + replicas = await get_all_tablet_replicas(manager, servers[0], 'test', 'test') + logger.info(f"Tablet is on [{replicas}]") + assert len(replicas) == 1 and len(replicas[0].replicas) == 1 + + old_replica = replicas[0].replicas[0] + assert old_replica[0] != host_ids[1] + new_replica = (host_ids[1], 0) + + logger.info(f"Moving tablet {old_replica} -> {new_replica}") + manager.api.move_tablet(servers[0].ip_addr, "test", "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0) + + await assert_rows(1) + await cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({2}, {2});") + await assert_rows(2) + + logger.info(f"Moving tablet {new_replica} -> {old_replica}") + manager.api.move_tablet(servers[0].ip_addr, "test", 
"test", new_replica[0], new_replica[1], old_replica[0], old_replica[1], 0) + + await assert_rows(2) + await cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({3}, {3});") + await assert_rows(3)