From 3acca0aa63c0f2ba4a91522468b5a5b2ee870ceb Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Mon, 23 Jun 2025 19:39:42 +0300 Subject: [PATCH 1/7] locator: tablets: tablet_metadata: clear_gently: optimize foreign ptr destruction Sort all tablet_map_ptr:s by shard_id and then destroy them on each shard to prevent long cross-shard task queues for foreign_ptr destructions. Signed-off-by: Benny Halevy --- locator/tablets.cc | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/locator/tablets.cc b/locator/tablets.cc index adb45b06cf..8e88146679 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -409,19 +409,25 @@ void tablet_metadata::drop_tablet_map(table_id id) { } future<> tablet_metadata::clear_gently() { - for (auto&& [id, map] : _tablets) { - const auto shard = map.get_owner_shard(); - co_await smp::submit_to(shard, [map = std::move(map)] () mutable { - auto map_ptr = map.release(); - // Others copies exist, we simply drop ours, no need to clear anything. - if (map_ptr.use_count() > 1) { - return make_ready_future<>(); - } - return const_cast(*map_ptr).clear_gently().finally([map_ptr = std::move(map_ptr)] { }); - }); + tablet_logger.debug("tablet_metadata::clear_gently {}", fmt::ptr(this)); + // First, Sort the tablet maps per shard to avoid destruction of all foreign tablet map ptrs + // on this shard. We don't use sharded<> here since it will require a similar + // submit_to to each shard owner per tablet-map. + std::vector> tablet_maps_per_shard; + tablet_maps_per_shard.resize(smp::count); + for (auto& [_, map_ptr] : _tablets) { + tablet_maps_per_shard[map_ptr.get_owner_shard()].emplace_back(std::move(map_ptr)); } _tablets.clear(); + // Now destroy the foreign tablet map pointers on each shard. 
+ co_await smp::invoke_on_all([&] -> future<> { + for (auto& map_ptr : tablet_maps_per_shard[this_shard_id()]) { + auto map = map_ptr.release(); + co_await utils::clear_gently(map); + } + }); + co_await utils::clear_gently(_table_groups); co_await utils::clear_gently(_base_table); From 493a2303da86d5d32084dc3e45b569bc34872ec0 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 24 Jun 2025 07:39:55 +0300 Subject: [PATCH 2/7] replica: database: get and expose a mutable locator::shared_token_metadata Prepare for next patch, that will use this shared_token_metadata to make mutable_token_metadata_ptr:s Signed-off-by: Benny Halevy --- replica/database.cc | 2 +- replica/database.hh | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/replica/database.cc b/replica/database.cc index 7568139d07..c7e905792e 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -355,7 +355,7 @@ database::view_update_read_concurrency_sem() { return *sem; } -database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, const locator::shared_token_metadata& stm, +database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::shared_token_metadata& stm, compaction_manager& cm, sstables::storage_manager& sstm, lang::manager& langm, sstables::directory_semaphore& sst_dir_sem, sstable_compressor_factory& scf, const abort_source& abort, utils::cross_shard_barrier barrier) : _stats(make_lw_shared()) , _user_types(std::make_shared(*this)) diff --git a/replica/database.hh b/replica/database.hh index 957f403407..33022739ac 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1599,7 +1599,7 @@ private: service::migration_notifier& _mnotifier; gms::feature_service& _feat; std::vector _listeners; - const locator::shared_token_metadata& _shared_token_metadata; + locator::shared_token_metadata& _shared_token_metadata; lang::manager& 
_lang_manager; reader_concurrency_semaphore_group _reader_concurrency_semaphores_group; @@ -1684,7 +1684,7 @@ public: // (keyspace/table definitions, column mappings etc.) future<> parse_system_tables(distributed&, sharded&); - database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, const locator::shared_token_metadata& stm, + database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::shared_token_metadata& stm, compaction_manager& cm, sstables::storage_manager& sstm, lang::manager& langm, sstables::directory_semaphore& sst_dir_sem, sstable_compressor_factory&, const abort_source& abort, utils::cross_shard_barrier barrier = utils::cross_shard_barrier(utils::cross_shard_barrier::solo{}) /* for single-shard usage */); database(database&&) = delete; @@ -1719,7 +1719,7 @@ public: return _compaction_manager; } - const locator::shared_token_metadata& get_shared_token_metadata() const { return _shared_token_metadata; } + locator::shared_token_metadata& get_shared_token_metadata() const { return _shared_token_metadata; } locator::token_metadata_ptr get_token_metadata_ptr() const { return _shared_token_metadata.get(); } const locator::token_metadata& get_token_metadata() const { return *_shared_token_metadata.get(); } From e0a19b981a14a1c3f6b553f9fbec67f2568944af Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 24 Jun 2025 07:28:54 +0300 Subject: [PATCH 3/7] token_metadata: move make_token_metadata_ptr into shared_token_metadata class So we can use the local shared_token_metadata instance for safe background destroy of token_metadata_impl:s. 
Signed-off-by: Benny Halevy --- locator/token_metadata.cc | 4 ++-- locator/token_metadata.hh | 17 ++++++++++------- repair/repair.cc | 2 +- service/storage_service.cc | 7 +++---- service/storage_service.hh | 4 ++-- test/boost/storage_proxy_test.cc | 6 ++++-- 6 files changed, 22 insertions(+), 18 deletions(-) diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index e48c740db3..e991911e32 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -1216,7 +1216,7 @@ future<> shared_token_metadata::mutate_on_all_shards(sharded pending_token_metadata_ptr; pending_token_metadata_ptr.resize(smp::count); - auto tmptr = make_token_metadata_ptr(co_await stm.local().get()->clone_async()); + auto tmptr = stm.local().make_token_metadata_ptr(co_await stm.local().get()->clone_async()); auto& tm = *tmptr; // bump the token_metadata ring_version // to invalidate cached token/replication mappings @@ -1227,7 +1227,7 @@ future<> shared_token_metadata::mutate_on_all_shards(sharded future<> { - pending_token_metadata_ptr[this_shard_id()] = make_token_metadata_ptr(co_await tm.clone_async()); + pending_token_metadata_ptr[this_shard_id()] = stm.local().make_token_metadata_ptr(co_await tm.clone_async()); }); co_await stm.invoke_on_all([&] (shared_token_metadata& stm) { diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index efeb81c1e7..e19cfa0245 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -371,12 +371,7 @@ struct topology_change_info { using token_metadata_lock = semaphore_units<>; using token_metadata_lock_func = noncopyable_function() noexcept>; -template -mutable_token_metadata_ptr make_token_metadata_ptr(Args... 
args) { - return make_lw_shared(std::forward(args)...); -} - -class shared_token_metadata { +class shared_token_metadata : public peering_sharded_service { mutable_token_metadata_ptr _shared; token_metadata_lock_func _lock_func; std::chrono::steady_clock::duration _stall_detector_threshold = std::chrono::seconds(2); @@ -408,7 +403,7 @@ public: // used to construct the shared object as a sharded<> instance // lock_func returns semaphore_units<> explicit shared_token_metadata(token_metadata_lock_func lock_func, token_metadata::config cfg) - : _shared(make_token_metadata_ptr(std::move(cfg))) + : _shared(make_lw_shared(cfg)) , _lock_func(std::move(lock_func)) , _versions_barrier("shared_token_metadata::versions_barrier") { @@ -418,6 +413,14 @@ public: shared_token_metadata(const shared_token_metadata& x) = delete; shared_token_metadata(shared_token_metadata&& x) = default; + mutable_token_metadata_ptr make_token_metadata_ptr() { + return make_lw_shared(token_metadata::config{_shared->get_topology().get_config()}); + } + + mutable_token_metadata_ptr make_token_metadata_ptr(token_metadata&& tm) { + return make_lw_shared(std::move(tm)); + } + token_metadata_ptr get() const noexcept { return _shared; } diff --git a/repair/repair.cc b/repair/repair.cc index 6ac894d206..af1381ba97 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -2244,7 +2244,7 @@ future<> repair_service::replace_with_repair(std::unordered_mapupdate_topology(tmptr->get_my_id(), myloc, locator::node::state::replacing); co_await cloned_tmptr->update_normal_tokens(replacing_tokens, tmptr->get_my_id()); auto source_dc = utils::optional_param(myloc.dc); diff --git a/service/storage_service.cc b/service/storage_service.cc index 319e7d69df..3932463169 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -740,9 +740,7 @@ future<> storage_service::topology_state_load(state_change_hint hint) { auto saved_tmpr = get_token_metadata_ptr(); { auto tmlock = co_await get_token_metadata_lock(); 
- auto tmptr = make_token_metadata_ptr(token_metadata::config { - get_token_metadata().get_topology().get_config() - }); + auto tmptr = _shared_token_metadata.make_token_metadata_ptr(); tmptr->invalidate_cached_rings(); tmptr->set_version(_topology_state_machine._topology.version); @@ -3147,9 +3145,10 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt try { auto base_shard = this_shard_id(); pending_token_metadata_ptr[base_shard] = tmptr; + auto& sharded_token_metadata = _shared_token_metadata.container(); // clone a local copy of updated token_metadata on all other shards co_await smp::invoke_on_others(base_shard, [&, tmptr] () -> future<> { - pending_token_metadata_ptr[this_shard_id()] = make_token_metadata_ptr(co_await tmptr->clone_async()); + pending_token_metadata_ptr[this_shard_id()] = sharded_token_metadata.local().make_token_metadata_ptr(co_await tmptr->clone_async()); }); // Precalculate new effective_replication_map for all keyspaces diff --git a/service/storage_service.hh b/service/storage_service.hh index ee3ccd78da..bced4eeffd 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -287,12 +287,12 @@ private: future<> snitch_reconfigured(); future get_mutable_token_metadata_ptr() noexcept { - return _shared_token_metadata.get()->clone_async().then([] (token_metadata tm) { + return _shared_token_metadata.get()->clone_async().then([this] (token_metadata tm) { // bump the token_metadata ring_version // to invalidate cached token/replication mappings // when the modified token_metadata is committed. 
tm.invalidate_cached_rings(); - return make_ready_future(make_token_metadata_ptr(std::move(tm))); + return _shared_token_metadata.make_token_metadata_ptr(std::move(tm)); }); } diff --git a/test/boost/storage_proxy_test.cc b/test/boost/storage_proxy_test.cc index 9c004c0395..3be36a3108 100644 --- a/test/boost/storage_proxy_test.cc +++ b/test/boost/storage_proxy_test.cc @@ -52,9 +52,11 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) { } }; + auto& stm = e.shared_token_metadata().local(); + { // Ring with minimum token - auto tmptr = locator::make_token_metadata_ptr(locator::token_metadata::config{e.shared_token_metadata().local().get()->get_topology().get_config()}); + auto tmptr = stm.make_token_metadata_ptr(); const auto host_id = locator::host_id{utils::UUID(0, 1)}; tmptr->update_topology(host_id, locator::endpoint_dc_rack{"dc1", "rack1"}, locator::node::state::normal); tmptr->update_normal_tokens(std::unordered_set({dht::minimum_token()}), host_id).get(); @@ -69,7 +71,7 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) { } { - auto tmptr = locator::make_token_metadata_ptr(locator::token_metadata::config{e.shared_token_metadata().local().get()->get_topology().get_config()}); + auto tmptr = stm.make_token_metadata_ptr(); const auto id1 = locator::host_id{utils::UUID(0, 1)}; const auto id2 = locator::host_id{utils::UUID(0, 2)}; tmptr->update_topology(id1, locator::endpoint_dc_rack{"dc1", "rack1"}, locator::node::state::normal); From 2b2cfaba6e3f32b480147fde4226b5606f8e2f56 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 24 Jun 2025 08:04:44 +0300 Subject: [PATCH 4/7] token_metadata: keep a reference to shared_token_metadata To be used by a following patch to gently clean and destroy the token_metadata_impl in the background. 
Signed-off-by: Benny Halevy --- locator/token_metadata.cc | 10 +++++-- locator/token_metadata.hh | 12 +++++--- test/boost/tablets_test.cc | 10 +++++-- test/boost/token_metadata_test.cc | 47 ++++++++++++++++++++++--------- 4 files changed, 57 insertions(+), 22 deletions(-) diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index e991911e32..03af2198bc 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -834,8 +834,9 @@ token_metadata::token_metadata(std::unique_ptr impl) { } -token_metadata::token_metadata(config cfg) - : _impl(std::make_unique(cfg)) +token_metadata::token_metadata(shared_token_metadata& stm, config cfg) + : _shared_token_metadata(&stm) + , _impl(std::make_unique(std::move(cfg))) { } @@ -845,6 +846,10 @@ token_metadata::token_metadata(token_metadata&&) noexcept = default; token_metadata& token_metadata::token_metadata::operator=(token_metadata&&) noexcept = default; +void token_metadata::set_shared_token_metadata(shared_token_metadata& stm) { + _shared_token_metadata = &stm; +} + const std::vector& token_metadata::sorted_tokens() const { return _impl->sorted_tokens(); @@ -1154,6 +1159,7 @@ void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept { _stale_versions_in_use = _versions_barrier.advance_and_await(); } + tmptr->set_shared_token_metadata(*this); _shared = std::move(tmptr); _shared->set_version_tracker(new_tracker(_shared->get_version())); diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index e19cfa0245..11ccabaf3f 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -47,7 +47,7 @@ class abstract_replication_strategy; using token = dht::token; -class token_metadata; +class shared_token_metadata; class tablet_metadata; struct host_id_or_endpoint { @@ -166,6 +166,7 @@ private: }; class token_metadata final { + shared_token_metadata* _shared_token_metadata = nullptr; std::unique_ptr _impl; private: friend class token_metadata_ring_splitter; @@ -178,7 
+179,7 @@ public: using version_t = service::topology::version_t; using version_tracker_t = version_tracker; - token_metadata(config cfg); + token_metadata(shared_token_metadata& stm, config cfg); explicit token_metadata(std::unique_ptr impl); token_metadata(token_metadata&&) noexcept; // Can't use "= default;" - hits some static_assert in unique_ptr token_metadata& operator=(token_metadata&&) noexcept; @@ -355,6 +356,8 @@ public: friend class shared_token_metadata; private: void set_version_tracker(version_tracker_t tracker); + + void set_shared_token_metadata(shared_token_metadata& stm); }; struct topology_change_info { @@ -403,7 +406,7 @@ public: // used to construct the shared object as a sharded<> instance // lock_func returns semaphore_units<> explicit shared_token_metadata(token_metadata_lock_func lock_func, token_metadata::config cfg) - : _shared(make_lw_shared(cfg)) + : _shared(make_lw_shared(*this, cfg)) , _lock_func(std::move(lock_func)) , _versions_barrier("shared_token_metadata::versions_barrier") { @@ -414,10 +417,11 @@ public: shared_token_metadata(shared_token_metadata&& x) = default; mutable_token_metadata_ptr make_token_metadata_ptr() { - return make_lw_shared(token_metadata::config{_shared->get_topology().get_config()}); + return make_lw_shared(*this, token_metadata::config{_shared->get_topology().get_config()}); } mutable_token_metadata_ptr make_token_metadata_ptr(token_metadata&& tm) { + tm.set_shared_token_metadata(*this); return make_lw_shared(std::move(tm)); } diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc index c2cd69ced8..4141940a70 100644 --- a/test/boost/tablets_test.cc +++ b/test/boost/tablets_test.cc @@ -1090,7 +1090,7 @@ SEASTAR_TEST_CASE(test_sharder) { auto table1 = table_id(utils::UUID_gen::get_time_UUID()); - token_metadata tokm(token_metadata::config{ .topo_cfg{ .this_host_id = h1, .local_dc_rack = locator::endpoint_dc_rack::default_location } }); + token_metadata tokm(e.get_shared_token_metadata().local(), 
token_metadata::config{ .topo_cfg{ .this_host_id = h1, .local_dc_rack = locator::endpoint_dc_rack::default_location } }); tokm.get_topology().add_or_update_endpoint(h1); std::vector tablet_ids; @@ -1305,7 +1305,13 @@ SEASTAR_TEST_CASE(test_intranode_sharding) { auto table1 = table_id(utils::UUID_gen::get_time_UUID()); - token_metadata tokm(token_metadata::config{ .topo_cfg{ .this_host_id = h1, .local_dc_rack = locator::endpoint_dc_rack::default_location } }); + locator::token_metadata::config tm_cfg; + tm_cfg.topo_cfg.this_host_id = h1; + tm_cfg.topo_cfg.local_dc_rack = endpoint_dc_rack::default_location; + semaphore sem(1); + shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto tmptr = stm.make_token_metadata_ptr(); + auto& tokm = *tmptr; tokm.get_topology().add_or_update_endpoint(h1); auto leaving_replica = tablet_replica{h1, 5}; diff --git a/test/boost/token_metadata_test.cc b/test/boost/token_metadata_test.cc index ba92fe80e8..215a988f98 100644 --- a/test/boost/token_metadata_test.cc +++ b/test/boost/token_metadata_test.cc @@ -31,13 +31,11 @@ namespace { }; } - mutable_token_metadata_ptr create_token_metadata(host_id this_host_id) { - return make_lw_shared(token_metadata::config { - topology::config { - .this_host_id = this_host_id, - .local_dc_rack = get_dc_rack(this_host_id) - } - }); + token_metadata::config create_token_metadata_config(host_id this_host_id) { + return token_metadata::config{topology::config{ + .this_host_id = this_host_id, + .local_dc_rack = get_dc_rack(this_host_id) + }}; } template @@ -55,7 +53,10 @@ SEASTAR_THREAD_TEST_CASE(test_pending_and_read_endpoints_for_everywhere_strategy const auto t1 = dht::token::from_int64(10); const auto t2 = dht::token::from_int64(20); - auto token_metadata = create_token_metadata(e1_id); + semaphore sem(1); + auto tm_cfg = create_token_metadata_config(e1_id); + shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto token_metadata = 
stm.make_token_metadata_ptr(); token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal); token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal); token_metadata->update_normal_tokens({t1}, e1_id).get(); @@ -75,7 +76,10 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_second_node) { const auto e1_id = gen_id(1); const auto e2_id = gen_id(2); - auto token_metadata = create_token_metadata(e1_id); + semaphore sem(1); + auto tm_cfg = create_token_metadata_config(e1_id); + shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto token_metadata = stm.make_token_metadata_ptr(); token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal); token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal); token_metadata->update_normal_tokens({t1}, e1_id).get(); @@ -103,7 +107,10 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_with_replicas) { const auto e2_id = gen_id(2); const auto e3_id = gen_id(3); - auto token_metadata = create_token_metadata(e1_id); + semaphore sem(1); + auto tm_cfg = create_token_metadata_config(e1_id); + shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto token_metadata = stm.make_token_metadata_ptr(); token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal); token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal); token_metadata->update_topology(e3_id, get_dc_rack(e3_id), node::state::normal); @@ -133,7 +140,10 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_leave_with_replicas) { const auto e2_id = gen_id(2); const auto e3_id = gen_id(3); - auto token_metadata = create_token_metadata(e1_id); + semaphore sem(1); + auto tm_cfg = create_token_metadata_config(e1_id); + shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto token_metadata = stm.make_token_metadata_ptr(); 
token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal); token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal); token_metadata->update_topology(e3_id, get_dc_rack(e3_id), node::state::normal); @@ -165,7 +175,10 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_replace_with_replicas) { const auto e3_id = gen_id(3); const auto e4_id = gen_id(4); - auto token_metadata = create_token_metadata(e1_id); + semaphore sem(1); + auto tm_cfg = create_token_metadata_config(e1_id); + shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto token_metadata = stm.make_token_metadata_ptr(); token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal); token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal); token_metadata->update_topology(e3_id, get_dc_rack(e3_id), node::state::normal); @@ -201,7 +214,10 @@ SEASTAR_THREAD_TEST_CASE(test_endpoints_for_reading_when_bootstrap_with_replicas const auto e2_id = gen_id(2); const auto e3_id = gen_id(3); - auto token_metadata = create_token_metadata(e1_id); + semaphore sem(1); + auto tm_cfg = create_token_metadata_config(e1_id); + shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto token_metadata = stm.make_token_metadata_ptr(); token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal); token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal); token_metadata->update_topology(e3_id, get_dc_rack(e3_id), node::state::normal); @@ -254,7 +270,10 @@ SEASTAR_THREAD_TEST_CASE(test_replace_node_with_same_endpoint) { const auto e1_id1 = gen_id(1); const auto e1_id2 = gen_id(2); - auto token_metadata = create_token_metadata(e1_id2); + semaphore sem(1); + auto tm_cfg = create_token_metadata_config(e1_id2); + shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto token_metadata = stm.make_token_metadata_ptr(); 
token_metadata->update_topology(e1_id1, get_dc_rack(e1_id1), node::state::being_replaced); token_metadata->update_normal_tokens({t1}, e1_id1).get(); From 2c0bafb9348880dc854aeab889e66fc31589f27e Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 24 Jun 2025 09:03:33 +0300 Subject: [PATCH 5/7] token_metadata: clear_and_dispose_impl when destroyed We have a lot of places in the code where a token_metadata_ptr is kept in an automatic variable and destroyed when it leaves the scope. Since it's a reference-counted lw_shared_ptr, the token_metadata object is rarely destroyed in those cases, but when it is, it doesn't go through clear_gently, and in particular its tablet_metadata is not cleared gently, leading to inefficient destruction of potentially many foreign_ptr:s. This patch calls clear_and_dispose_impl that gently clears and destroys the impl object in the background using the shared_token_metadata. Fixes #13381 Signed-off-by: Benny Halevy --- locator/token_metadata.cc | 33 ++++++++++++++++++-- locator/token_metadata.hh | 8 +++++ test/boost/locator_topology_test.cc | 2 ++ test/boost/network_topology_strategy_test.cc | 10 ++++++ test/boost/service_level_controller_test.cc | 6 ++++ test/boost/tablets_test.cc | 3 ++ test/boost/token_metadata_test.cc | 8 +++++ 7 files changed, 68 insertions(+), 2 deletions(-) diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 03af2198bc..7e48af803e 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -840,11 +840,20 @@ token_metadata::token_metadata(shared_token_metadata& stm, config cfg) { } -token_metadata::~token_metadata() = default; +token_metadata::~token_metadata() { + clear_and_dispose_impl(); +} token_metadata::token_metadata(token_metadata&&) noexcept = default; -token_metadata& token_metadata::token_metadata::operator=(token_metadata&& o) noexcept { + if (this != &o) { + 
clear_and_dispose_impl(); + _shared_token_metadata = std::exchange(o._shared_token_metadata, nullptr); + _impl = std::exchange(o._impl, nullptr); + } + return *this; +} void token_metadata::set_shared_token_metadata(shared_token_metadata& stm) { _shared_token_metadata = &stm; @@ -1032,6 +1041,15 @@ token_metadata::clone_after_all_left() const noexcept { co_return token_metadata(co_await _impl->clone_after_all_left()); } +void token_metadata::clear_and_dispose_impl() noexcept { + if (!_shared_token_metadata) { + return; + } + if (auto impl = std::exchange(_impl, nullptr)) { + _shared_token_metadata->clear_and_dispose(std::move(impl)); + } +} + future<> token_metadata::clear_gently() noexcept { return _impl->clear_gently(); } @@ -1148,6 +1166,17 @@ version_tracker shared_token_metadata::new_tracker(token_metadata::version_t ver return tracker; } +future<> shared_token_metadata::stop() noexcept { + co_await _background_dispose_gate.close(); +} + +void shared_token_metadata::clear_and_dispose(std::unique_ptr impl) noexcept { + // Safe to drop the future since the gate is closed in stop() + if (auto gh = _background_dispose_gate.try_hold()) { + (void)impl->clear_gently().finally([i = std::move(impl), gh = std::move(gh)] {}); + } +} + void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept { if (_shared->get_ring_version() >= tmptr->get_ring_version()) { on_internal_error(tlogger, format("shared_token_metadata: must not set non-increasing ring_version: {} -> {}", _shared->get_ring_version(), tmptr->get_ring_version())); diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 11ccabaf3f..29d83dfddf 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -358,6 +358,9 @@ private: void set_version_tracker(version_tracker_t tracker); void set_shared_token_metadata(shared_token_metadata& stm); + + // Clears and disposes the token metadata impl in the background, if present. 
+ void clear_and_dispose_impl() noexcept; }; struct topology_change_info { @@ -375,6 +378,7 @@ using token_metadata_lock = semaphore_units<>; using token_metadata_lock_func = noncopyable_function() noexcept>; class shared_token_metadata : public peering_sharded_service { + named_gate _background_dispose_gate{"shared_token_metadata::background_dispose_gate"}; mutable_token_metadata_ptr _shared; token_metadata_lock_func _lock_func; std::chrono::steady_clock::duration _stall_detector_threshold = std::chrono::seconds(2); @@ -416,6 +420,8 @@ public: shared_token_metadata(const shared_token_metadata& x) = delete; shared_token_metadata(shared_token_metadata&& x) = default; + future<> stop() noexcept; + mutable_token_metadata_ptr make_token_metadata_ptr() { return make_lw_shared(*this, token_metadata::config{_shared->get_topology().get_config()}); } @@ -474,6 +480,8 @@ public: // Must be called on shard 0. static future<> mutate_on_all_shards(sharded& stm, seastar::noncopyable_function (token_metadata&)> func); + void clear_and_dispose(std::unique_ptr impl) noexcept; + private: // for testing only, unsafe to be called without awaiting get_lock() first void mutate_token_metadata_for_test(seastar::noncopyable_function func); diff --git a/test/boost/locator_topology_test.cc b/test/boost/locator_topology_test.cc index 1256a33a49..4cc068a88a 100644 --- a/test/boost/locator_topology_test.cc +++ b/test/boost/locator_topology_test.cc @@ -14,6 +14,7 @@ #include #include #include +#include #include "locator/types.hh" #include "test/lib/scylla_test_case.hh" @@ -213,6 +214,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_sketch) { .local_dc_rack = locator::endpoint_dc_rack::default_location } }); + auto stop_stm = deferred_stop(stm); stm.mutate_token_metadata([&] (token_metadata& tm) { tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, node1_shard_count); diff --git a/test/boost/network_topology_strategy_test.cc 
b/test/boost/network_topology_strategy_test.cc index 7267f22d64..7a7012aa48 100644 --- a/test/boost/network_topology_strategy_test.cc +++ b/test/boost/network_topology_strategy_test.cc @@ -280,6 +280,7 @@ void simple_test() { tm_cfg.topo_cfg.this_endpoint = my_address; tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() }; locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg); + auto stop_stm = deferred_stop(stm); std::vector ring_points = { { 1.0, inet_address("192.100.10.1") }, @@ -363,6 +364,7 @@ void heavy_origin_test() { locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, locator::token_metadata::config{locator::topology::config{ .local_dc_rack = locator::endpoint_dc_rack::default_location }}); + auto stop_stm = deferred_stop(stm); std::vector dc_racks = {2, 4, 8}; std::vector dc_endpoints = {128, 256, 512}; @@ -476,6 +478,7 @@ SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_tablets_test) { // Initialize the token_metadata locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg); + auto stop_stm = deferred_stop(stm); stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { auto& topo = tm.get_topology(); for (const auto& [ring_point, endpoint, id] : ring_points) { @@ -567,6 +570,7 @@ static void test_random_balancing(sharded& snitch, gms::inet_address // Initialize the token_metadata locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg); + auto stop_stm = deferred_stop(stm); stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { auto& topo = tm.get_topology(); for (const auto& [ring_point, endpoint, id] : ring_points) { @@ -897,6 +901,7 @@ SEASTAR_THREAD_TEST_CASE(testCalculateEndpoints) { for (size_t run = 0; run < RUNS; ++run) { semaphore sem(1); shared_token_metadata stm([&sem] () noexcept { 
return get_units(sem, 1); }, tm_cfg); + auto stop_stm = deferred_stop(stm); std::unordered_set random_tokens; while (random_tokens.size() < nodes.size() * VNODES) { @@ -1043,6 +1048,7 @@ SEASTAR_THREAD_TEST_CASE(test_topology_compare_endpoints) { semaphore sem(1); shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto stop_stm = deferred_stop(stm); stm.mutate_token_metadata([&] (token_metadata& tm) { auto& topo = tm.get_topology(); generate_topology(topo, datacenters, nodes); @@ -1087,6 +1093,7 @@ SEASTAR_THREAD_TEST_CASE(test_topology_sort_by_proximity) { tm_cfg.topo_cfg.local_dc_rack = locator::endpoint_dc_rack::default_location; semaphore sem(1); shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto stop_stm = deferred_stop(stm); stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { generate_topology(tm.get_topology(), datacenters, nodes); return make_ready_future(); @@ -1122,6 +1129,7 @@ SEASTAR_THREAD_TEST_CASE(test_topology_tracks_local_node) { .local_dc_rack = ip1_dc_rack, } }); + auto stop_stm = deferred_stop(stm); // get_location() should work before any node is added @@ -1249,6 +1257,7 @@ SEASTAR_THREAD_TEST_CASE(tablets_simple_rack_aware_view_pairing_test) { // Initialize the token_metadata locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg); + auto stop_stm = deferred_stop(stm); stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { auto& topo = tm.get_topology(); for (const auto& [ring_point, endpoint, id] : ring_points) { @@ -1401,6 +1410,7 @@ void test_complex_rack_aware_view_pairing_test(bool more_or_less) { // Initialize the token_metadata locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg); + auto stop_stm = deferred_stop(stm); stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { auto& topo = tm.get_topology(); for (const auto& 
[ring_point, endpoint, id] : ring_points) { diff --git a/test/boost/service_level_controller_test.cc b/test/boost/service_level_controller_test.cc index 5375cedaaf..e1ad9ac6cc 100644 --- a/test/boost/service_level_controller_test.cc +++ b/test/boost/service_level_controller_test.cc @@ -14,7 +14,9 @@ #include #include +#include #include "seastarx.hh" + #include "service/qos/qos_common.hh" #include "test/lib/scylla_test_case.hh" #include "test/lib/test_utils.hh" @@ -107,6 +109,7 @@ SEASTAR_THREAD_TEST_CASE(subscriber_simple) { sl_options.shares.emplace(1000); scheduling_group default_scheduling_group = create_scheduling_group("sl_default_sg", 1.0).get(); locator::shared_token_metadata tm({}, {locator::topology::config{ .local_dc_rack = locator::endpoint_dc_rack::default_location }}); + auto stop_tm = deferred_stop(tm); sharded as; as.start().get(); auto stop_as = defer([&as] { as.stop().get(); }); @@ -180,6 +183,7 @@ SEASTAR_THREAD_TEST_CASE(too_many_service_levels) { sl_options.workload = service_level_options::workload_type::interactive; scheduling_group default_scheduling_group = create_scheduling_group("sl_default_sg1", 1.0).get(); locator::shared_token_metadata tm({}, {locator::topology::config{ .local_dc_rack = locator::endpoint_dc_rack::default_location }}); + auto stop_tm = deferred_stop(tm); sharded as; as.start().get(); auto stop_as = defer([&as] { as.stop().get(); }); @@ -256,6 +260,7 @@ SEASTAR_THREAD_TEST_CASE(add_remove_bad_sequence) { sl_options.shares.emplace(1000); scheduling_group default_scheduling_group = create_scheduling_group("sl_default_sg3", 1.0).get(); locator::shared_token_metadata tm({}, {locator::topology::config{ .local_dc_rack = locator::endpoint_dc_rack::default_location }}); + auto stop_tm = deferred_stop(tm); sharded as; as.start().get(); auto stop_as = defer([&as] { as.stop().get(); }); @@ -282,6 +287,7 @@ SEASTAR_THREAD_TEST_CASE(verify_unset_shares_in_cache_when_service_level_created sl_options.shares.emplace(1000); 
scheduling_group default_scheduling_group = create_scheduling_group("sl_default_sg", 1.0).get(); locator::shared_token_metadata tm({}, {locator::topology::config{ .local_dc_rack = locator::endpoint_dc_rack::default_location }}); + auto stop_tm = deferred_stop(tm); sharded as; as.start().get(); diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc index 4141940a70..a1aec7dde7 100644 --- a/test/boost/tablets_test.cc +++ b/test/boost/tablets_test.cc @@ -841,6 +841,7 @@ SEASTAR_TEST_CASE(test_get_shard) { .local_dc_rack = locator::endpoint_dc_rack::default_location } }); + auto stop_stm = deferred_stop(stm); tablet_id tid(0); tablet_id tid1(0); @@ -1310,6 +1311,7 @@ SEASTAR_TEST_CASE(test_intranode_sharding) { tm_cfg.topo_cfg.local_dc_rack = endpoint_dc_rack::default_location; semaphore sem(1); shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto stop_stm = deferred_stop(stm); auto tmptr = stm.make_token_metadata_ptr(); auto& tokm = *tmptr; tokm.get_topology().add_or_update_endpoint(h1); @@ -3785,6 +3787,7 @@ static void execute_tablet_for_new_rf_test(calculate_tablet_replicas_for_new_rf_ tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() }; tm_cfg.topo_cfg.this_host_id = test_config.ring_points[0].id; locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg); + auto stop_stm = deferred_stop(stm); // Initialize the token_metadata stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { diff --git a/test/boost/token_metadata_test.cc b/test/boost/token_metadata_test.cc index 215a988f98..4567e26146 100644 --- a/test/boost/token_metadata_test.cc +++ b/test/boost/token_metadata_test.cc @@ -8,6 +8,7 @@ #include #include +#include #include "test/lib/scylla_test_case.hh" #include "test/lib/test_utils.hh" #include "locator/token_metadata.hh" @@ -56,6 +57,7 @@ 
SEASTAR_THREAD_TEST_CASE(test_pending_and_read_endpoints_for_everywhere_strategy semaphore sem(1); auto tm_cfg = create_token_metadata_config(e1_id); shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto stop_stm = deferred_stop(stm); auto token_metadata = stm.make_token_metadata_ptr(); token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal); token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal); @@ -79,6 +81,7 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_second_node) { semaphore sem(1); auto tm_cfg = create_token_metadata_config(e1_id); shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto stop_stm = deferred_stop(stm); auto token_metadata = stm.make_token_metadata_ptr(); token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal); token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal); @@ -110,6 +113,7 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_with_replicas) { semaphore sem(1); auto tm_cfg = create_token_metadata_config(e1_id); shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto stop_stm = deferred_stop(stm); auto token_metadata = stm.make_token_metadata_ptr(); token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal); token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal); @@ -143,6 +147,7 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_leave_with_replicas) { semaphore sem(1); auto tm_cfg = create_token_metadata_config(e1_id); shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto stop_stm = deferred_stop(stm); auto token_metadata = stm.make_token_metadata_ptr(); token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal); token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal); @@ -178,6 
+183,7 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_replace_with_replicas) { semaphore sem(1); auto tm_cfg = create_token_metadata_config(e1_id); shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto stop_stm = deferred_stop(stm); auto token_metadata = stm.make_token_metadata_ptr(); token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal); token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal); @@ -217,6 +223,7 @@ SEASTAR_THREAD_TEST_CASE(test_endpoints_for_reading_when_bootstrap_with_replicas semaphore sem(1); auto tm_cfg = create_token_metadata_config(e1_id); shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto stop_stm = deferred_stop(stm); auto token_metadata = stm.make_token_metadata_ptr(); token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal); token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal); @@ -273,6 +280,7 @@ SEASTAR_THREAD_TEST_CASE(test_replace_node_with_same_endpoint) { semaphore sem(1); auto tm_cfg = create_token_metadata_config(e1_id2); shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto stop_stm = deferred_stop(stm); auto token_metadata = stm.make_token_metadata_ptr(); token_metadata->update_topology(e1_id1, get_dc_rack(e1_id1), node::state::being_replaced); token_metadata->update_normal_tokens({t1}, e1_id1).get(); From 4a3d14a031da2124fcb74738b2e45f4fb8ef568a Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Mon, 23 Jun 2025 20:51:22 +0300 Subject: [PATCH 6/7] test: cluster: test_tablets_merge: add test_tablet_split_merge_with_many_tables Reproduces #23284 Currently skipped in release mode since it requires the `short_tablet_stats_refresh_interval` interval. 
Ref #24641 Signed-off-by: Benny Halevy --- test/cluster/test_tablets_merge.py | 65 ++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/test/cluster/test_tablets_merge.py b/test/cluster/test_tablets_merge.py index 8b203eaf6b..8018870b7a 100644 --- a/test/cluster/test_tablets_merge.py +++ b/test/cluster/test_tablets_merge.py @@ -376,6 +376,71 @@ async def test_tablet_merge_cross_rack_migrations(manager: ManagerClient, racks) return tablet_count < old_tablet_count or None await wait_for(finished_merging, time.time() + 120) +# Reproduces #23284 +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_tablet_split_merge_with_many_tables(manager: ManagerClient, racks = 2): + cmdline = ['--smp', '4', '-m', '2G', '--target-tablet-size-in-bytes', '30000', '--max-task-backlog', '200',] + config = {'error_injections_at_startup': ['short_tablet_stats_refresh_interval']} + + servers = [] + rf = racks + for rack_id in range(0, racks): + rack = f'rack{rack_id+1}' + servers.extend(await manager.servers_add(3, config=config, cmdline=cmdline, property_file={'dc': 'mydc', 'rack': rack})) + + cql = manager.get_cql() + ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} AND tablets = {{'initial': 1}}") + await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c blob) WITH compression = {{'sstable_compression': ''}};") + await asyncio.gather(*[cql.run_async(f"CREATE TABLE {ks}.test{i} (pk int PRIMARY KEY, c blob);") for i in range(1, 200)]) + + async def check_logs(when): + for server in servers: + log = await manager.server_open_log(server.server_id) + matches = await log.grep("Too long queue accumulated for gossip") + if matches: + pytest.fail(f"Server {server.server_id} has too long queue accumulated for gossip {when}: {matches=}") + + await check_logs("after creating tables") + + total_keys = 400 + keys = 
range(total_keys) + insert = cql.prepare(f"INSERT INTO {ks}.test(pk, c) VALUES(?, ?)") + for pk in keys: + value = random.randbytes(2000) + cql.execute(insert, [pk, value]) + + for server in servers: + await manager.api.flush_keyspace(server.ip_addr, ks) + + async def finished_splitting(): + # FIXME: fragile since it's expecting on-disk size will be enough to produce a few splits. + # (raw_data=800k / target_size=30k) = ~26, lower power-of-two is 16. Compression was disabled. + # Per-table hints (min_tablet_count) can be used to improve this. + tablet_count = await get_tablet_count(manager, servers[0], ks, 'test') + return tablet_count >= 16 or None + # Give enough time for split to happen in debug mode + await wait_for(finished_splitting, time.time() + 120) + + await check_logs("after split completion") + + delete_keys = range(total_keys - 1) + await asyncio.gather(*[cql.run_async(f"DELETE FROM {ks}.test WHERE pk={k};") for k in delete_keys]) + keys = range(total_keys - 1, total_keys) + + old_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test') + + for server in servers: + await manager.api.flush_keyspace(server.ip_addr, ks) + await manager.api.keyspace_compaction(server.ip_addr, ks) + + async def finished_merging(): + tablet_count = await get_tablet_count(manager, servers[0], ks, 'test') + return tablet_count < old_tablet_count or None + await wait_for(finished_merging, time.time() + 120) + + await check_logs("after merge completion") + # Reproduces use-after-free when migration right after merge, but concurrently to background # merge completion handler. # See: https://github.com/scylladb/scylladb/issues/24045 From 6e4803a750e42ea7504ec7f69f2a4ed02782b3e4 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Wed, 25 Jun 2025 10:13:33 +0300 Subject: [PATCH 7/7] token_metadata_impl: clear_gently: release version tracker early No need to wait for all members to be cleared gently. 
We can release the version earlier since the held version may be awaited in barriers. Signed-off-by: Benny Halevy --- locator/token_metadata.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 7e48af803e..ecd2acaccd 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -357,6 +357,7 @@ future> token_metadata_impl::clone_only_tok } future<> token_metadata_impl::clear_gently() noexcept { + _version_tracker = {}; co_await utils::clear_gently(_token_to_endpoint_map); co_await utils::clear_gently(_normal_token_owners); co_await utils::clear_gently(_bootstrap_tokens);