From f2ff701489995fdc84ea7ca0addbec2918c35be4 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 6 Feb 2024 19:27:04 +0200 Subject: [PATCH] table: coroutinize update_effective_replication_map It's better to wait on deregistering the old main compaction_groups:s in handle_tablet_split_completion rather than leaving work in the background. Especially since their respective storage_groups are being destroyed by handle_tablet_split_completion. handle_tablet_split_completion keeps a continuation chain for all non-ready compaction_group stop fibers. and returns it so that update_effective_replication_map can await it, leaving no cleanup work in the background. Signed-off-by: Benny Halevy --- replica/database.hh | 4 ++-- replica/table.cc | 25 ++++++++++++++++--------- service/storage_service.cc | 4 ++-- 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/replica/database.hh b/replica/database.hh index c7a46b1aeb..fcd3ce4e81 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -593,7 +593,7 @@ private: // Called when coordinator executes tablet splitting, i.e. commit the new tablet map with // each tablet split into two, so this replica will remap all of its compaction groups // that were previously split. - void handle_tablet_split_completion(size_t old_tablet_count, const locator::tablet_map& new_tmap); + future<> handle_tablet_split_completion(size_t old_tablet_count, const locator::tablet_map& new_tmap); sstables::compaction_type_options::split split_compaction_options() const noexcept; @@ -846,7 +846,7 @@ public: void set_schema(schema_ptr); db::commitlog* commitlog() const; const locator::effective_replication_map_ptr& get_effective_replication_map() const { return _erm; } - void update_effective_replication_map(locator::effective_replication_map_ptr); + future<> update_effective_replication_map(locator::effective_replication_map_ptr); [[gnu::always_inline]] bool uses_tablets() const; future<> cleanup_tablet(database&, db::system_keyspace&, locator::tablet_id); future find_partition(schema_ptr, reader_permit permit, const dht::decorated_key& key) const; diff --git a/replica/table.cc b/replica/table.cc index 5d69d7d366..116187b231 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1903,7 +1903,7 @@ locator::table_load_stats table::table_load_stats(std::function table::handle_tablet_split_completion(size_t old_tablet_count, const locator::tablet_map& new_tmap) { auto table_id = _schema->id(); storage_group_vector new_storage_groups; new_storage_groups.resize(new_tmap.tablet_count()); @@ -1924,6 +1924,8 @@ void table::handle_tablet_split_completion(size_t old_tablet_count, const locato table_id, new_tmap.tablet_count(), old_tablet_count*split_size)); } + // Stop the released main compaction groups asynchronously + future<> stop_fut = make_ready_future<>(); for (unsigned id = 0; id < _storage_groups.size(); id++) { auto& sg = _storage_groups[id]; if (!sg) { @@ -1934,6 +1936,16 @@ void table::handle_tablet_split_completion(size_t old_tablet_count, const locato "therefore groups cannot be remapped with the new tablet count.", id, table_id)); } + // Remove old main groups, they're unused, but they need to be deregistered properly + auto cg_ptr = std::move(sg->main_compaction_group()); + auto f = cg_ptr->stop(); + if (!f.available() || f.failed()) [[unlikely]] { + stop_fut = stop_fut.then([f = std::move(f), cg_ptr = std::move(cg_ptr)] () mutable { + return std::move(f).handle_exception([cg_ptr = std::move(cg_ptr)] (std::exception_ptr ex) { + tlogger.warn("Failed to stop compaction group: {}. Ignored", std::move(ex)); + }); + }); + } unsigned first_new_id = id << growth_factor; auto split_ready_groups = std::move(*sg).split_ready_compaction_groups(); if (split_ready_groups.size() != split_size) { @@ -1951,15 +1963,10 @@ void table::handle_tablet_split_completion(size_t old_tablet_count, const locato auto old_groups = std::exchange(_storage_groups, std::move(new_storage_groups)); - // Remove old main groups in background, they're unused, but they need to be deregistered properly - (void) do_with(std::move(old_groups), _async_gate.hold(), [] (storage_group_vector& groups, gate::holder&) { - return do_for_each(groups, [] (std::unique_ptr& sg) { - return sg->main_compaction_group()->stop(); - }); - }); + return stop_fut; } -void table::update_effective_replication_map(locator::effective_replication_map_ptr erm) { +future<> table::update_effective_replication_map(locator::effective_replication_map_ptr erm) { auto old_erm = std::exchange(_erm, std::move(erm)); if (uses_tablets()) { @@ -1974,7 +1981,7 @@ void table::update_effective_replication_map(locator::effective_replication_map_ if (new_tablet_count > old_tablet_count) { tlogger.info0("Detected tablet split for table {}.{}, increasing from {} to {} tablets", _schema->ks_name(), _schema->cf_name(), old_tablet_count, new_tablet_count); - handle_tablet_split_completion(old_tablet_count, _erm->get_token_metadata().tablets().get_tablet_map(table_id)); + co_await handle_tablet_split_completion(old_tablet_count, _erm->get_token_metadata().tablets().get_tablet_map(table_id)); } } if (old_erm) { diff --git a/service/storage_service.cc b/service/storage_service.cc index f2bfa80ef2..5e20af1034 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2864,7 +2864,7 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt // Apply changes on all shards try { - co_await container().invoke_on_all([&] (storage_service& ss) { + co_await container().invoke_on_all([&] (storage_service& ss) -> future<> { ss._shared_token_metadata.set(std::move(pending_token_metadata_ptr[this_shard_id()])); auto& db = ss._db.local(); @@ -2878,7 +2878,7 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt auto& table_erms = pending_table_erms[this_shard_id()]; for (auto it = table_erms.begin(); it != table_erms.end(); ) { auto& cf = db.find_column_family(it->first); - cf.update_effective_replication_map(std::move(it->second)); + co_await cf.update_effective_replication_map(std::move(it->second)); if (cf.uses_tablets()) { register_tablet_split_candidate(it->first); }