From acbc667d3e1ceebeaf6548bcfe82da855b693155 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Mon, 30 Dec 2024 13:50:45 +0200 Subject: [PATCH] storage_service: set raft topology change mode before using it in join_cluster ss::join_cluster calls raft_topology_change_enabled() before the mode is initialized below in the same function. Fix it by changing the order. --- service/storage_service.cc | 77 +++++++++++++++++++------------------- 1 file changed, 39 insertions(+), 38 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index f4fcc6d91e..7468d91b04 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2938,38 +2938,7 @@ future<> storage_service::join_cluster(sharded& set_mode(mode::STARTING); - std::unordered_map loaded_endpoints; - if (_db.local().get_config().load_ring_state() && !raft_topology_change_enabled()) { - slogger.info("Loading persisted ring state"); - loaded_endpoints = co_await _sys_ks.local().load_endpoint_state(); - - auto tmlock = co_await get_token_metadata_lock(); - auto tmptr = co_await get_mutable_token_metadata_ptr(); - for (auto& [host_id, st] : loaded_endpoints) { - if (st.endpoint == get_broadcast_address()) { - // entry has been mistakenly added, delete it - slogger.warn("Loaded saved endpoint={}/{} has my broadcast address. Deleting it", host_id, st.endpoint); - co_await _sys_ks.local().remove_endpoint(st.endpoint); - } else { - if (host_id == my_host_id()) { - on_internal_error(slogger, format("Loaded saved endpoint {} with my host_id={}", st.endpoint, host_id)); - } - if (!st.opt_dc_rack) { - st.opt_dc_rack = locator::endpoint_dc_rack::default_location; - slogger.warn("Loaded no dc/rack for saved endpoint={}/{}. Set to default={}/{}", host_id, st.endpoint, st.opt_dc_rack->dc, st.opt_dc_rack->rack); - } - const auto& dc_rack = *st.opt_dc_rack; - slogger.debug("Loaded tokens: endpoint={}/{} dc={} rack={} tokens={}", host_id, st.endpoint, dc_rack.dc, dc_rack.rack, st.tokens); - tmptr->update_topology(host_id, dc_rack, locator::node::state::normal); - co_await tmptr->update_normal_tokens(st.tokens, host_id); - tmptr->update_host_id(host_id, st.endpoint); - // gossiping hasn't started yet - // so no need to lock the endpoint - co_await _gossiper.add_saved_endpoint(host_id, st, gms::null_permit_id); - } - } - co_await replicate_to_all_cores(std::move(tmptr)); - } + std::unordered_map loaded_endpoints = co_await _sys_ks.local().load_endpoint_state(); // Seeds are now only used as the initial contact point nodes. If the // loaded_endpoints are empty which means this node is a completely new @@ -2981,12 +2950,6 @@ future<> storage_service::join_cluster(sharded& loaded_endpoints | std::views::transform([] (const auto& x) { return x.second.endpoint; }) | std::ranges::to>(); - auto loaded_peer_features = co_await _sys_ks.local().load_peer_features(); - slogger.info("initial_contact_nodes={}, loaded_endpoints={}, loaded_peer_features={}", - initial_contact_nodes, loaded_endpoints | std::views::keys, loaded_peer_features.size()); - for (auto& x : loaded_peer_features) { - slogger.info("peer={}, supported_features={}", x.first, x.second); - } if (_group0->client().in_recovery()) { slogger.info("Raft recovery - starting in legacy topology operations mode"); @@ -3044,6 +3007,44 @@ future<> storage_service::join_cluster(sharded& // We must allow restarts of zero-token nodes in the gossip-based topology due to the recovery mode. } + if (_db.local().get_config().load_ring_state() && !raft_topology_change_enabled()) { + slogger.info("Loading persisted ring state"); + + auto tmlock = co_await get_token_metadata_lock(); + auto tmptr = co_await get_mutable_token_metadata_ptr(); + for (auto& [host_id, st] : loaded_endpoints) { + if (st.endpoint == get_broadcast_address()) { + // entry has been mistakenly added, delete it + slogger.warn("Loaded saved endpoint={}/{} has my broadcast address. Deleting it", host_id, st.endpoint); + co_await _sys_ks.local().remove_endpoint(st.endpoint); + } else { + if (host_id == my_host_id()) { + on_internal_error(slogger, format("Loaded saved endpoint {} with my host_id={}", st.endpoint, host_id)); + } + if (!st.opt_dc_rack) { + st.opt_dc_rack = locator::endpoint_dc_rack::default_location; + slogger.warn("Loaded no dc/rack for saved endpoint={}/{}. Set to default={}/{}", host_id, st.endpoint, st.opt_dc_rack->dc, st.opt_dc_rack->rack); + } + const auto& dc_rack = *st.opt_dc_rack; + slogger.debug("Loaded tokens: endpoint={}/{} dc={} rack={} tokens={}", host_id, st.endpoint, dc_rack.dc, dc_rack.rack, st.tokens); + tmptr->update_topology(host_id, dc_rack, locator::node::state::normal); + co_await tmptr->update_normal_tokens(st.tokens, host_id); + tmptr->update_host_id(host_id, st.endpoint); + // gossiping hasn't started yet + // so no need to lock the endpoint + co_await _gossiper.add_saved_endpoint(host_id, st, gms::null_permit_id); + } + } + co_await replicate_to_all_cores(std::move(tmptr)); + } + + auto loaded_peer_features = co_await _sys_ks.local().load_peer_features(); + slogger.info("initial_contact_nodes={}, loaded_endpoints={}, loaded_peer_features={}", + initial_contact_nodes, loaded_endpoints | std::views::keys, loaded_peer_features.size()); + for (auto& x : loaded_peer_features) { + slogger.info("peer={}, supported_features={}", x.first, x.second); + } + co_return co_await join_topology(sys_dist_ks, proxy, std::move(initial_contact_nodes), std::move(loaded_endpoints), std::move(loaded_peer_features), get_ring_delay(), start_hm, new_generation); }