diff --git a/locator/tablets.cc b/locator/tablets.cc index adb45b06cf..8e88146679 100644 --- a/locator/tablets.cc +++ b/locator/tablets.cc @@ -409,19 +409,25 @@ void tablet_metadata::drop_tablet_map(table_id id) { } future<> tablet_metadata::clear_gently() { - for (auto&& [id, map] : _tablets) { - const auto shard = map.get_owner_shard(); - co_await smp::submit_to(shard, [map = std::move(map)] () mutable { - auto map_ptr = map.release(); - // Others copies exist, we simply drop ours, no need to clear anything. - if (map_ptr.use_count() > 1) { - return make_ready_future<>(); - } - return const_cast(*map_ptr).clear_gently().finally([map_ptr = std::move(map_ptr)] { }); - }); + tablet_logger.debug("tablet_metadata::clear_gently {}", fmt::ptr(this)); + // First, Sort the tablet maps per shard to avoid destruction of all foreign tablet map ptrs + // on this shard. We don't use sharded<> here since it will require a similar + // submit_to to each shard owner per tablet-map. + std::vector> tablet_maps_per_shard; + tablet_maps_per_shard.resize(smp::count); + for (auto& [_, map_ptr] : _tablets) { + tablet_maps_per_shard[map_ptr.get_owner_shard()].emplace_back(std::move(map_ptr)); } _tablets.clear(); + // Now destroy the foreign tablet map pointers on each shard. + co_await smp::invoke_on_all([&] -> future<> { + for (auto& map_ptr : tablet_maps_per_shard[this_shard_id()]) { + auto map = map_ptr.release(); + co_await utils::clear_gently(map); + } + }); + co_await utils::clear_gently(_table_groups); co_await utils::clear_gently(_base_table); diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index e48c740db3..ecd2acaccd 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -357,6 +357,7 @@ future> token_metadata_impl::clone_only_tok } future<> token_metadata_impl::clear_gently() noexcept { + _version_tracker = {}; co_await utils::clear_gently(_token_to_endpoint_map); co_await utils::clear_gently(_normal_token_owners); co_await utils::clear_gently(_bootstrap_tokens); @@ -834,16 +835,30 @@ token_metadata::token_metadata(std::unique_ptr impl) { } -token_metadata::token_metadata(config cfg) - : _impl(std::make_unique(cfg)) +token_metadata::token_metadata(shared_token_metadata& stm, config cfg) + : _shared_token_metadata(&stm) + , _impl(std::make_unique(std::move(cfg))) { } -token_metadata::~token_metadata() = default; +token_metadata::~token_metadata() { + clear_and_dispose_impl(); +} token_metadata::token_metadata(token_metadata&&) noexcept = default; -token_metadata& token_metadata::token_metadata::operator=(token_metadata&&) noexcept = default; +token_metadata& token_metadata::token_metadata::operator=(token_metadata&& o) noexcept { + if (this != &o) { + clear_and_dispose_impl(); + _shared_token_metadata = std::exchange(o._shared_token_metadata, nullptr); + _impl = std::exchange(o._impl, nullptr); + } + return *this; +} + +void token_metadata::set_shared_token_metadata(shared_token_metadata& stm) { + _shared_token_metadata = &stm; +} const std::vector& token_metadata::sorted_tokens() const { @@ -1027,6 +1042,15 @@ token_metadata::clone_after_all_left() const noexcept { co_return token_metadata(co_await _impl->clone_after_all_left()); } +void token_metadata::clear_and_dispose_impl() noexcept { + if (!_shared_token_metadata) { + return; + } + if (auto impl = std::exchange(_impl, nullptr)) { + _shared_token_metadata->clear_and_dispose(std::move(impl)); + } +} + future<> token_metadata::clear_gently() noexcept { return _impl->clear_gently(); } @@ -1143,6 +1167,17 @@ version_tracker shared_token_metadata::new_tracker(token_metadata::version_t ver return tracker; } +future<> shared_token_metadata::stop() noexcept { + co_await _background_dispose_gate.close(); +} + +void shared_token_metadata::clear_and_dispose(std::unique_ptr impl) noexcept { + // Safe to drop the future since the gate is closed in stop() + if (auto gh = _background_dispose_gate.try_hold()) { + (void)impl->clear_gently().finally([i = std::move(impl), gh = std::move(gh)] {}); + } +} + void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept { if (_shared->get_ring_version() >= tmptr->get_ring_version()) { on_internal_error(tlogger, format("shared_token_metadata: must not set non-increasing ring_version: {} -> {}", _shared->get_ring_version(), tmptr->get_ring_version())); @@ -1154,6 +1189,7 @@ void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept { _stale_versions_in_use = _versions_barrier.advance_and_await(); } + tmptr->set_shared_token_metadata(*this); _shared = std::move(tmptr); _shared->set_version_tracker(new_tracker(_shared->get_version())); @@ -1216,7 +1252,7 @@ future<> shared_token_metadata::mutate_on_all_shards(sharded pending_token_metadata_ptr; pending_token_metadata_ptr.resize(smp::count); - auto tmptr = make_token_metadata_ptr(co_await stm.local().get()->clone_async()); + auto tmptr = stm.local().make_token_metadata_ptr(co_await stm.local().get()->clone_async()); auto& tm = *tmptr; // bump the token_metadata ring_version // to invalidate cached token/replication mappings @@ -1227,7 +1263,7 @@ future<> shared_token_metadata::mutate_on_all_shards(sharded future<> { - pending_token_metadata_ptr[this_shard_id()] = make_token_metadata_ptr(co_await tm.clone_async()); + pending_token_metadata_ptr[this_shard_id()] = stm.local().make_token_metadata_ptr(co_await tm.clone_async()); }); co_await stm.invoke_on_all([&] (shared_token_metadata& stm) { diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index efeb81c1e7..29d83dfddf 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -47,7 +47,7 @@ class abstract_replication_strategy; using token = dht::token; -class token_metadata; +class shared_token_metadata; class tablet_metadata; struct host_id_or_endpoint { @@ -166,6 +166,7 @@ private: }; class token_metadata final { + shared_token_metadata* _shared_token_metadata = nullptr; std::unique_ptr _impl; private: friend class token_metadata_ring_splitter; @@ -178,7 +179,7 @@ public: using version_t = service::topology::version_t; using version_tracker_t = version_tracker; - token_metadata(config cfg); + token_metadata(shared_token_metadata& stm, config cfg); explicit token_metadata(std::unique_ptr impl); token_metadata(token_metadata&&) noexcept; // Can't use "= default;" - hits some static_assert in unique_ptr token_metadata& operator=(token_metadata&&) noexcept; @@ -355,6 +356,11 @@ public: friend class shared_token_metadata; private: void set_version_tracker(version_tracker_t tracker); + + void set_shared_token_metadata(shared_token_metadata& stm); + + // Clears and disposes the token metadata impl in the background, if present. + void clear_and_dispose_impl() noexcept; }; struct topology_change_info { @@ -371,12 +377,8 @@ struct topology_change_info { using token_metadata_lock = semaphore_units<>; using token_metadata_lock_func = noncopyable_function() noexcept>; -template -mutable_token_metadata_ptr make_token_metadata_ptr(Args... args) { - return make_lw_shared(std::forward(args)...); -} - -class shared_token_metadata { +class shared_token_metadata : public peering_sharded_service { + named_gate _background_dispose_gate{"shared_token_metadata::background_dispose_gate"}; mutable_token_metadata_ptr _shared; token_metadata_lock_func _lock_func; std::chrono::steady_clock::duration _stall_detector_threshold = std::chrono::seconds(2); @@ -408,7 +410,7 @@ public: // used to construct the shared object as a sharded<> instance // lock_func returns semaphore_units<> explicit shared_token_metadata(token_metadata_lock_func lock_func, token_metadata::config cfg) - : _shared(make_token_metadata_ptr(std::move(cfg))) + : _shared(make_lw_shared(*this, cfg)) , _lock_func(std::move(lock_func)) , _versions_barrier("shared_token_metadata::versions_barrier") { @@ -418,6 +420,17 @@ public: shared_token_metadata(const shared_token_metadata& x) = delete; shared_token_metadata(shared_token_metadata&& x) = default; + future<> stop() noexcept; + + mutable_token_metadata_ptr make_token_metadata_ptr() { + return make_lw_shared(*this, token_metadata::config{_shared->get_topology().get_config()}); + } + + mutable_token_metadata_ptr make_token_metadata_ptr(token_metadata&& tm) { + tm.set_shared_token_metadata(*this); + return make_lw_shared(std::move(tm)); + } + token_metadata_ptr get() const noexcept { return _shared; } @@ -467,6 +480,8 @@ public: // Must be called on shard 0. static future<> mutate_on_all_shards(sharded& stm, seastar::noncopyable_function (token_metadata&)> func); + void clear_and_dispose(std::unique_ptr impl) noexcept; + private: // for testing only, unsafe to be called without awaiting get_lock() first void mutate_token_metadata_for_test(seastar::noncopyable_function func); diff --git a/repair/repair.cc b/repair/repair.cc index 6ac894d206..af1381ba97 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -2244,7 +2244,7 @@ future<> repair_service::replace_with_repair(std::unordered_mapupdate_topology(tmptr->get_my_id(), myloc, locator::node::state::replacing); co_await cloned_tmptr->update_normal_tokens(replacing_tokens, tmptr->get_my_id()); auto source_dc = utils::optional_param(myloc.dc); diff --git a/replica/database.cc b/replica/database.cc index 7568139d07..c7e905792e 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -355,7 +355,7 @@ database::view_update_read_concurrency_sem() { return *sem; } -database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, const locator::shared_token_metadata& stm, +database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::shared_token_metadata& stm, compaction_manager& cm, sstables::storage_manager& sstm, lang::manager& langm, sstables::directory_semaphore& sst_dir_sem, sstable_compressor_factory& scf, const abort_source& abort, utils::cross_shard_barrier barrier) : _stats(make_lw_shared()) , _user_types(std::make_shared(*this)) diff --git a/replica/database.hh b/replica/database.hh index 957f403407..33022739ac 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1599,7 +1599,7 @@ private: service::migration_notifier& _mnotifier; gms::feature_service& _feat; std::vector _listeners; - const locator::shared_token_metadata& _shared_token_metadata; + locator::shared_token_metadata& _shared_token_metadata; lang::manager& _lang_manager; reader_concurrency_semaphore_group _reader_concurrency_semaphores_group; @@ -1684,7 +1684,7 @@ public: // (keyspace/table definitions, column mappings etc.) future<> parse_system_tables(distributed&, sharded&); - database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, const locator::shared_token_metadata& stm, + database(const db::config&, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, locator::shared_token_metadata& stm, compaction_manager& cm, sstables::storage_manager& sstm, lang::manager& langm, sstables::directory_semaphore& sst_dir_sem, sstable_compressor_factory&, const abort_source& abort, utils::cross_shard_barrier barrier = utils::cross_shard_barrier(utils::cross_shard_barrier::solo{}) /* for single-shard usage */); database(database&&) = delete; @@ -1719,7 +1719,7 @@ public: return _compaction_manager; } - const locator::shared_token_metadata& get_shared_token_metadata() const { return _shared_token_metadata; } + locator::shared_token_metadata& get_shared_token_metadata() const { return _shared_token_metadata; } locator::token_metadata_ptr get_token_metadata_ptr() const { return _shared_token_metadata.get(); } const locator::token_metadata& get_token_metadata() const { return *_shared_token_metadata.get(); } diff --git a/service/storage_service.cc b/service/storage_service.cc index 319e7d69df..3932463169 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -740,9 +740,7 @@ future<> storage_service::topology_state_load(state_change_hint hint) { auto saved_tmpr = get_token_metadata_ptr(); { auto tmlock = co_await get_token_metadata_lock(); - auto tmptr = make_token_metadata_ptr(token_metadata::config { - get_token_metadata().get_topology().get_config() - }); + auto tmptr = _shared_token_metadata.make_token_metadata_ptr(); tmptr->invalidate_cached_rings(); tmptr->set_version(_topology_state_machine._topology.version); @@ -3147,9 +3145,10 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt try { auto base_shard = this_shard_id(); pending_token_metadata_ptr[base_shard] = tmptr; + auto& sharded_token_metadata = _shared_token_metadata.container(); // clone a local copy of updated token_metadata on all other shards co_await smp::invoke_on_others(base_shard, [&, tmptr] () -> future<> { - pending_token_metadata_ptr[this_shard_id()] = make_token_metadata_ptr(co_await tmptr->clone_async()); + pending_token_metadata_ptr[this_shard_id()] = sharded_token_metadata.local().make_token_metadata_ptr(co_await tmptr->clone_async()); }); // Precalculate new effective_replication_map for all keyspaces diff --git a/service/storage_service.hh b/service/storage_service.hh index ee3ccd78da..bced4eeffd 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -287,12 +287,12 @@ private: future<> snitch_reconfigured(); future get_mutable_token_metadata_ptr() noexcept { - return _shared_token_metadata.get()->clone_async().then([] (token_metadata tm) { + return _shared_token_metadata.get()->clone_async().then([this] (token_metadata tm) { // bump the token_metadata ring_version // to invalidate cached token/replication mappings // when the modified token_metadata is committed. tm.invalidate_cached_rings(); - return make_ready_future(make_token_metadata_ptr(std::move(tm))); + return _shared_token_metadata.make_token_metadata_ptr(std::move(tm)); }); } diff --git a/test/boost/locator_topology_test.cc b/test/boost/locator_topology_test.cc index 1256a33a49..4cc068a88a 100644 --- a/test/boost/locator_topology_test.cc +++ b/test/boost/locator_topology_test.cc @@ -14,6 +14,7 @@ #include #include #include +#include #include "locator/types.hh" #include "test/lib/scylla_test_case.hh" @@ -213,6 +214,7 @@ SEASTAR_THREAD_TEST_CASE(test_load_sketch) { .local_dc_rack = locator::endpoint_dc_rack::default_location } }); + auto stop_stm = deferred_stop(stm); stm.mutate_token_metadata([&] (token_metadata& tm) { tm.update_topology(host1, locator::endpoint_dc_rack::default_location, node::state::normal, node1_shard_count); diff --git a/test/boost/network_topology_strategy_test.cc b/test/boost/network_topology_strategy_test.cc index 7267f22d64..7a7012aa48 100644 --- a/test/boost/network_topology_strategy_test.cc +++ b/test/boost/network_topology_strategy_test.cc @@ -280,6 +280,7 @@ void simple_test() { tm_cfg.topo_cfg.this_endpoint = my_address; tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() }; locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg); + auto stop_stm = deferred_stop(stm); std::vector ring_points = { { 1.0, inet_address("192.100.10.1") }, @@ -363,6 +364,7 @@ void heavy_origin_test() { locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, locator::token_metadata::config{locator::topology::config{ .local_dc_rack = locator::endpoint_dc_rack::default_location }}); + auto stop_stm = deferred_stop(stm); std::vector dc_racks = {2, 4, 8}; std::vector dc_endpoints = {128, 256, 512}; @@ -476,6 +478,7 @@ SEASTAR_THREAD_TEST_CASE(NetworkTopologyStrategy_tablets_test) { // Initialize the token_metadata locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg); + auto stop_stm = deferred_stop(stm); stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { auto& topo = tm.get_topology(); for (const auto& [ring_point, endpoint, id] : ring_points) { @@ -567,6 +570,7 @@ static void test_random_balancing(sharded& snitch, gms::inet_address // Initialize the token_metadata locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg); + auto stop_stm = deferred_stop(stm); stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { auto& topo = tm.get_topology(); for (const auto& [ring_point, endpoint, id] : ring_points) { @@ -897,6 +901,7 @@ SEASTAR_THREAD_TEST_CASE(testCalculateEndpoints) { for (size_t run = 0; run < RUNS; ++run) { semaphore sem(1); shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto stop_stm = deferred_stop(stm); std::unordered_set random_tokens; while (random_tokens.size() < nodes.size() * VNODES) { @@ -1043,6 +1048,7 @@ SEASTAR_THREAD_TEST_CASE(test_topology_compare_endpoints) { semaphore sem(1); shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto stop_stm = deferred_stop(stm); stm.mutate_token_metadata([&] (token_metadata& tm) { auto& topo = tm.get_topology(); generate_topology(topo, datacenters, nodes); @@ -1087,6 +1093,7 @@ SEASTAR_THREAD_TEST_CASE(test_topology_sort_by_proximity) { tm_cfg.topo_cfg.local_dc_rack = locator::endpoint_dc_rack::default_location; semaphore sem(1); shared_token_metadata stm([&sem] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto stop_stm = deferred_stop(stm); stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { generate_topology(tm.get_topology(), datacenters, nodes); return make_ready_future(); @@ -1122,6 +1129,7 @@ SEASTAR_THREAD_TEST_CASE(test_topology_tracks_local_node) { .local_dc_rack = ip1_dc_rack, } }); + auto stop_stm = deferred_stop(stm); // get_location() should work before any node is added @@ -1249,6 +1257,7 @@ SEASTAR_THREAD_TEST_CASE(tablets_simple_rack_aware_view_pairing_test) { // Initialize the token_metadata locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg); + auto stop_stm = deferred_stop(stm); stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { auto& topo = tm.get_topology(); for (const auto& [ring_point, endpoint, id] : ring_points) { @@ -1401,6 +1410,7 @@ void test_complex_rack_aware_view_pairing_test(bool more_or_less) { // Initialize the token_metadata locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg); + auto stop_stm = deferred_stop(stm); stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { auto& topo = tm.get_topology(); for (const auto& [ring_point, endpoint, id] : ring_points) { diff --git a/test/boost/service_level_controller_test.cc b/test/boost/service_level_controller_test.cc index 5375cedaaf..e1ad9ac6cc 100644 --- a/test/boost/service_level_controller_test.cc +++ b/test/boost/service_level_controller_test.cc @@ -14,7 +14,9 @@ #include #include +#include #include "seastarx.hh" + #include "service/qos/qos_common.hh" #include "test/lib/scylla_test_case.hh" #include "test/lib/test_utils.hh" @@ -107,6 +109,7 @@ SEASTAR_THREAD_TEST_CASE(subscriber_simple) { sl_options.shares.emplace(1000); scheduling_group default_scheduling_group = create_scheduling_group("sl_default_sg", 1.0).get(); locator::shared_token_metadata tm({}, {locator::topology::config{ .local_dc_rack = locator::endpoint_dc_rack::default_location }}); + auto stop_tm = deferred_stop(tm); sharded as; as.start().get(); auto stop_as = defer([&as] { as.stop().get(); }); @@ -180,6 +183,7 @@ SEASTAR_THREAD_TEST_CASE(too_many_service_levels) { sl_options.workload = service_level_options::workload_type::interactive; scheduling_group default_scheduling_group = create_scheduling_group("sl_default_sg1", 1.0).get(); locator::shared_token_metadata tm({}, {locator::topology::config{ .local_dc_rack = locator::endpoint_dc_rack::default_location }}); + auto stop_tm = deferred_stop(tm); sharded as; as.start().get(); auto stop_as = defer([&as] { as.stop().get(); }); @@ -256,6 +260,7 @@ SEASTAR_THREAD_TEST_CASE(add_remove_bad_sequence) { sl_options.shares.emplace(1000); scheduling_group default_scheduling_group = create_scheduling_group("sl_default_sg3", 1.0).get(); locator::shared_token_metadata tm({}, {locator::topology::config{ .local_dc_rack = locator::endpoint_dc_rack::default_location }}); + auto stop_tm = deferred_stop(tm); sharded as; as.start().get(); auto stop_as = defer([&as] { as.stop().get(); }); @@ -282,6 +287,7 @@ SEASTAR_THREAD_TEST_CASE(verify_unset_shares_in_cache_when_service_level_created sl_options.shares.emplace(1000); scheduling_group default_scheduling_group = create_scheduling_group("sl_default_sg", 1.0).get(); locator::shared_token_metadata tm({}, {locator::topology::config{ .local_dc_rack = locator::endpoint_dc_rack::default_location }}); + auto stop_tm = deferred_stop(tm); sharded as; as.start().get(); diff --git a/test/boost/storage_proxy_test.cc b/test/boost/storage_proxy_test.cc index 9c004c0395..3be36a3108 100644 --- a/test/boost/storage_proxy_test.cc +++ b/test/boost/storage_proxy_test.cc @@ -52,9 +52,11 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) { } }; + auto& stm = e.shared_token_metadata().local(); + { // Ring with minimum token - auto tmptr = locator::make_token_metadata_ptr(locator::token_metadata::config{e.shared_token_metadata().local().get()->get_topology().get_config()}); + auto tmptr = stm.make_token_metadata_ptr(); const auto host_id = locator::host_id{utils::UUID(0, 1)}; tmptr->update_topology(host_id, locator::endpoint_dc_rack{"dc1", "rack1"}, locator::node::state::normal); tmptr->update_normal_tokens(std::unordered_set({dht::minimum_token()}), host_id).get(); @@ -69,7 +71,7 @@ SEASTAR_TEST_CASE(test_get_restricted_ranges) { } { - auto tmptr = locator::make_token_metadata_ptr(locator::token_metadata::config{e.shared_token_metadata().local().get()->get_topology().get_config()}); + auto tmptr = stm.make_token_metadata_ptr(); const auto id1 = locator::host_id{utils::UUID(0, 1)}; const auto id2 = locator::host_id{utils::UUID(0, 2)}; tmptr->update_topology(id1, locator::endpoint_dc_rack{"dc1", "rack1"}, locator::node::state::normal); diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc index c2cd69ced8..a1aec7dde7 100644 --- a/test/boost/tablets_test.cc +++ b/test/boost/tablets_test.cc @@ -841,6 +841,7 @@ SEASTAR_TEST_CASE(test_get_shard) { .local_dc_rack = locator::endpoint_dc_rack::default_location } }); + auto stop_stm = deferred_stop(stm); tablet_id tid(0); tablet_id tid1(0); @@ -1090,7 +1091,7 @@ SEASTAR_TEST_CASE(test_sharder) { auto table1 = table_id(utils::UUID_gen::get_time_UUID()); - token_metadata tokm(token_metadata::config{ .topo_cfg{ .this_host_id = h1, .local_dc_rack = locator::endpoint_dc_rack::default_location } }); + token_metadata tokm(e.get_shared_token_metadata().local(), token_metadata::config{ .topo_cfg{ .this_host_id = h1, .local_dc_rack = locator::endpoint_dc_rack::default_location } }); tokm.get_topology().add_or_update_endpoint(h1); std::vector tablet_ids; @@ -1305,7 +1306,14 @@ SEASTAR_TEST_CASE(test_intranode_sharding) { auto table1 = table_id(utils::UUID_gen::get_time_UUID()); - token_metadata tokm(token_metadata::config{ .topo_cfg{ .this_host_id = h1, .local_dc_rack = locator::endpoint_dc_rack::default_location } }); + locator::token_metadata::config tm_cfg; + tm_cfg.topo_cfg.this_host_id = h1; + tm_cfg.topo_cfg.local_dc_rack = endpoint_dc_rack::default_location; + semaphore sem(1); + shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto stop_stm = deferred_stop(stm); + auto tmptr = stm.make_token_metadata_ptr(); + auto& tokm = *tmptr; tokm.get_topology().add_or_update_endpoint(h1); auto leaving_replica = tablet_replica{h1, 5}; @@ -3779,6 +3787,7 @@ static void execute_tablet_for_new_rf_test(calculate_tablet_replicas_for_new_rf_ tm_cfg.topo_cfg.local_dc_rack = { snitch.local()->get_datacenter(), snitch.local()->get_rack() }; tm_cfg.topo_cfg.this_host_id = test_config.ring_points[0].id; locator::shared_token_metadata stm([] () noexcept { return db::schema_tables::hold_merge_lock(); }, tm_cfg); + auto stop_stm = deferred_stop(stm); // Initialize the token_metadata stm.mutate_token_metadata([&] (token_metadata& tm) -> future<> { diff --git a/test/boost/token_metadata_test.cc b/test/boost/token_metadata_test.cc index ba92fe80e8..4567e26146 100644 --- a/test/boost/token_metadata_test.cc +++ b/test/boost/token_metadata_test.cc @@ -8,6 +8,7 @@ #include #include +#include #include "test/lib/scylla_test_case.hh" #include "test/lib/test_utils.hh" #include "locator/token_metadata.hh" @@ -31,13 +32,11 @@ namespace { }; } - mutable_token_metadata_ptr create_token_metadata(host_id this_host_id) { - return make_lw_shared(token_metadata::config { - topology::config { - .this_host_id = this_host_id, - .local_dc_rack = get_dc_rack(this_host_id) - } - }); + token_metadata::config create_token_metadata_config(host_id this_host_id) { + return token_metadata::config{topology::config{ + .this_host_id = this_host_id, + .local_dc_rack = get_dc_rack(this_host_id) + }}; } template @@ -55,7 +54,11 @@ SEASTAR_THREAD_TEST_CASE(test_pending_and_read_endpoints_for_everywhere_strategy const auto t1 = dht::token::from_int64(10); const auto t2 = dht::token::from_int64(20); - auto token_metadata = create_token_metadata(e1_id); + semaphore sem(1); + auto tm_cfg = create_token_metadata_config(e1_id); + shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto stop_stm = deferred_stop(stm); + auto token_metadata = stm.make_token_metadata_ptr(); token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal); token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal); token_metadata->update_normal_tokens({t1}, e1_id).get(); @@ -75,7 +78,11 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_second_node) { const auto e1_id = gen_id(1); const auto e2_id = gen_id(2); - auto token_metadata = create_token_metadata(e1_id); + semaphore sem(1); + auto tm_cfg = create_token_metadata_config(e1_id); + shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto stop_stm = deferred_stop(stm); + auto token_metadata = stm.make_token_metadata_ptr(); token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal); token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal); token_metadata->update_normal_tokens({t1}, e1_id).get(); @@ -103,7 +110,11 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_with_replicas) { const auto e2_id = gen_id(2); const auto e3_id = gen_id(3); - auto token_metadata = create_token_metadata(e1_id); + semaphore sem(1); + auto tm_cfg = create_token_metadata_config(e1_id); + shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto stop_stm = deferred_stop(stm); + auto token_metadata = stm.make_token_metadata_ptr(); token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal); token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal); token_metadata->update_topology(e3_id, get_dc_rack(e3_id), node::state::normal); @@ -133,7 +144,11 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_leave_with_replicas) { const auto e2_id = gen_id(2); const auto e3_id = gen_id(3); - auto token_metadata = create_token_metadata(e1_id); + semaphore sem(1); + auto tm_cfg = create_token_metadata_config(e1_id); + shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto stop_stm = deferred_stop(stm); + auto token_metadata = stm.make_token_metadata_ptr(); token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal); token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal); token_metadata->update_topology(e3_id, get_dc_rack(e3_id), node::state::normal); @@ -165,7 +180,11 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_replace_with_replicas) { const auto e3_id = gen_id(3); const auto e4_id = gen_id(4); - auto token_metadata = create_token_metadata(e1_id); + semaphore sem(1); + auto tm_cfg = create_token_metadata_config(e1_id); + shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto stop_stm = deferred_stop(stm); + auto token_metadata = stm.make_token_metadata_ptr(); token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal); token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal); token_metadata->update_topology(e3_id, get_dc_rack(e3_id), node::state::normal); @@ -201,7 +220,11 @@ SEASTAR_THREAD_TEST_CASE(test_endpoints_for_reading_when_bootstrap_with_replicas const auto e2_id = gen_id(2); const auto e3_id = gen_id(3); - auto token_metadata = create_token_metadata(e1_id); + semaphore sem(1); + auto tm_cfg = create_token_metadata_config(e1_id); + shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto stop_stm = deferred_stop(stm); + auto token_metadata = stm.make_token_metadata_ptr(); token_metadata->update_topology(e1_id, get_dc_rack(e1_id), node::state::normal); token_metadata->update_topology(e2_id, get_dc_rack(e2_id), node::state::normal); token_metadata->update_topology(e3_id, get_dc_rack(e3_id), node::state::normal); @@ -254,7 +277,11 @@ SEASTAR_THREAD_TEST_CASE(test_replace_node_with_same_endpoint) { const auto e1_id1 = gen_id(1); const auto e1_id2 = gen_id(2); - auto token_metadata = create_token_metadata(e1_id2); + semaphore sem(1); + auto tm_cfg = create_token_metadata_config(e1_id2); + shared_token_metadata stm([&] () noexcept { return get_units(sem, 1); }, tm_cfg); + auto stop_stm = deferred_stop(stm); + auto token_metadata = stm.make_token_metadata_ptr(); token_metadata->update_topology(e1_id1, get_dc_rack(e1_id1), node::state::being_replaced); token_metadata->update_normal_tokens({t1}, e1_id1).get(); diff --git a/test/cluster/test_tablets_merge.py b/test/cluster/test_tablets_merge.py index 8b203eaf6b..8018870b7a 100644 --- a/test/cluster/test_tablets_merge.py +++ b/test/cluster/test_tablets_merge.py @@ -376,6 +376,71 @@ async def test_tablet_merge_cross_rack_migrations(manager: ManagerClient, racks) return tablet_count < old_tablet_count or None await wait_for(finished_merging, time.time() + 120) +# Reproduces #23284 +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_tablet_split_merge_with_many_tables(manager: ManagerClient, racks = 2): + cmdline = ['--smp', '4', '-m', '2G', '--target-tablet-size-in-bytes', '30000', '--max-task-backlog', '200',] + config = {'error_injections_at_startup': ['short_tablet_stats_refresh_interval']} + + servers = [] + rf = racks + for rack_id in range(0, racks): + rack = f'rack{rack_id+1}' + servers.extend(await manager.servers_add(3, config=config, cmdline=cmdline, property_file={'dc': 'mydc', 'rack': rack})) + + cql = manager.get_cql() + ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} AND tablets = {{'initial': 1}}") + await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c blob) WITH compression = {{'sstable_compression': ''}};") + await asyncio.gather(*[cql.run_async(f"CREATE TABLE {ks}.test{i} (pk int PRIMARY KEY, c blob);") for i in range(1, 200)]) + + async def check_logs(when): + for server in servers: + log = await manager.server_open_log(server.server_id) + matches = await log.grep("Too long queue accumulated for gossip") + if matches: + pytest.fail(f"Server {server.server_id} has too long queue accumulated for gossip {when}: {matches=}") + + await check_logs("after creating tables") + + total_keys = 400 + keys = range(total_keys) + insert = cql.prepare(f"INSERT INTO {ks}.test(pk, c) VALUES(?, ?)") + for pk in keys: + value = random.randbytes(2000) + cql.execute(insert, [pk, value]) + + for server in servers: + await manager.api.flush_keyspace(server.ip_addr, ks) + + async def finished_splitting(): + # FIXME: fragile since it's expecting on-disk size will be enough to produce a few splits. + # (raw_data=800k / target_size=30k) = ~26, lower power-of-two is 16. Compression was disabled. + # Per-table hints (min_tablet_count) can be used to improve this. + tablet_count = await get_tablet_count(manager, servers[0], ks, 'test') + return tablet_count >= 16 or None + # Give enough time for split to happen in debug mode + await wait_for(finished_splitting, time.time() + 120) + + await check_logs("after split completion") + + delete_keys = range(total_keys - 1) + await asyncio.gather(*[cql.run_async(f"DELETE FROM {ks}.test WHERE pk={k};") for k in delete_keys]) + keys = range(total_keys - 1, total_keys) + + old_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test') + + for server in servers: + await manager.api.flush_keyspace(server.ip_addr, ks) + await manager.api.keyspace_compaction(server.ip_addr, ks) + + async def finished_merging(): + tablet_count = await get_tablet_count(manager, servers[0], ks, 'test') + return tablet_count < old_tablet_count or None + await wait_for(finished_merging, time.time() + 120) + + await check_logs("after merge completion") + # Reproduces use-after-free when migration right after merge, but concurrently to background # merge completion handler. # See: https://github.com/scylladb/scylladb/issues/24045