gossiper: add_saved_endpoint takes the loaded endpoint state

gossiper::add_saved_endpoint() receives a gms::loaded_endpoint_state instead
of a bare inet_address. The address is read from st.endpoint; a
default-constructed address is reported through on_internal_error(), like a
null host_id. When st.opt_dc_rack is set, DC and RACK application states are
added to the endpoint state before it is replicated, each one built with the
versioned_value factory that matches its application state.

storage_service::topology_state_load() looks the node up in the topology
(internal error if it is absent) and builds the loaded_endpoint_state from
the node's endpoint, its tokens and its dc/rack. The tokens are converted
with boost::copy_range to the declared type of st.tokens.

storage_service::join_cluster() replaces a missing dc/rack with
locator::endpoint_dc_rack::default_location and logs a warning, warns when a
saved endpoint carries this node's broadcast address, passes the whole state
to add_saved_endpoint(), and logs only the keys of loaded_endpoints.

Note: in this copy of the patch the hunk header that follows the
topology_state_load() hunks is damaged (it appears to have lost the
join_token_ring hunk body and the join_cluster hunk header); that line is
kept as found. Context-line indentation was reconstructed and should be
checked against the target tree before applying.

diff --git a/gms/gossiper.cc b/gms/gossiper.cc
index dd9f2becb2..70746d0c96 100644
--- a/gms/gossiper.cc
+++ b/gms/gossiper.cc
@@ -2121,14 +2121,18 @@ void gossiper::build_seeds_list() {
     }
 }
 
-future<> gossiper::add_saved_endpoint(locator::host_id host_id, inet_address ep, permit_id pid) {
+future<> gossiper::add_saved_endpoint(locator::host_id host_id, gms::loaded_endpoint_state st, permit_id pid) {
     if (host_id == my_host_id()) {
         logger.debug("Attempt to add self as saved endpoint");
         co_return;
     }
+    const auto& ep = st.endpoint;
     if (!host_id) {
         on_internal_error(logger, format("Attempt to add {} with null host_id as saved endpoint", ep));
     }
+    if (ep == inet_address{}) {
+        on_internal_error(logger, format("Attempt to add {} with null inet_address as saved endpoint", host_id));
+    }
     if (ep == get_broadcast_address()) {
         on_internal_error(logger, format("Attempt to add {} with broadcast_address {} as saved endpoint", host_id, ep));
     }
@@ -2158,6 +2162,10 @@ future<> gossiper::add_saved_endpoint(locator::host_id host_id, inet_address ep,
         std::unordered_set tokens_set(tokens.begin(), tokens.end());
         ep_state.add_application_state(gms::application_state::TOKENS, versioned_value::tokens(tokens_set));
     }
+    if (st.opt_dc_rack) {
+        ep_state.add_application_state(gms::application_state::DC, gms::versioned_value::datacenter(st.opt_dc_rack->dc));
+        ep_state.add_application_state(gms::application_state::RACK, gms::versioned_value::rack(st.opt_dc_rack->rack));
+    }
     auto generation = ep_state.get_heart_beat_state().get_generation();
     co_await replicate(ep, std::move(ep_state), permit.id());
     _unreachable_endpoints[ep] = now();
diff --git a/gms/gossiper.hh b/gms/gossiper.hh
index a583b2ee1d..77518081f0 100644
--- a/gms/gossiper.hh
+++ b/gms/gossiper.hh
@@ -620,7 +620,7 @@ public:
     /**
      * Add an endpoint we knew about previously, but whose state is unknown
      */
-    future<> add_saved_endpoint(locator::host_id host_id, inet_address ep, permit_id);
+    future<> add_saved_endpoint(locator::host_id host_id, loaded_endpoint_state st, permit_id);
 
     future<> add_local_application_state(application_state state, versioned_value value);
 
diff --git a/service/storage_service.cc b/service/storage_service.cc
index 66150bb418..417f9a4421 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -32,6 +32,7 @@
 #include "db/consistency_level.hh"
 #include "seastar/core/when_all.hh"
 #include "service/tablet_allocator.hh"
+#include "locator/types.hh"
 #include "locator/tablets.hh"
 #include "locator/tablet_metadata_guard.hh"
 #include "replica/tablet_mutation_builder.hh"
@@ -696,7 +697,13 @@ future<> storage_service::topology_state_load() {
         if (is_me(e)) {
             continue;
         }
-        const auto ep = tmptr->get_endpoint_for_host_id(e);
+        const auto& topo = tmptr->get_topology();
+        const auto* node = topo.find_node(e);
+        // node must exist in topology if it's in tmptr->get_all_endpoints
+        if (!node) {
+            on_internal_error(slogger, format("Found no node for {} in topology", e));
+        }
+        const auto& ep = node->endpoint();
         if (ep == inet_address{}) {
             continue;
         }
@@ -705,7 +712,11 @@ future<> storage_service::topology_state_load() {
         // since it is not loaded in join_cluster in the
         // raft_topology_change_enabled() case.
         if (!_gossiper.get_endpoint_state_ptr(ep)) {
-            co_await _gossiper.add_saved_endpoint(e, ep, permit.id());
+            gms::loaded_endpoint_state st;
+            st.endpoint = ep;
+            st.tokens = boost::copy_range<decltype(st.tokens)>(tmptr->get_tokens(e));
+            st.opt_dc_rack = node->dc_rack();
+            co_await _gossiper.add_saved_endpoint(e, std::move(st), permit.id());
         }
     }
 
@@ -1469,7 +1480,7 @@ future<> storage_service::join_token_ring(sharded storage_service::join_cluster(sharded&
         slogger.info("Loading persisted ring state");
         loaded_endpoints = co_await _sys_ks.local().load_endpoint_state();
 
-        auto get_dc_rack = [] (const gms::loaded_endpoint_state& st) {
-            return st.opt_dc_rack.value_or(locator::endpoint_dc_rack::default_location);
-        };
-
-        if (slogger.is_enabled(logging::log_level::debug)) {
-            for (const auto& [host_id, st] : loaded_endpoints) {
-                auto dc_rack = get_dc_rack(st);
-                slogger.debug("Loaded tokens: endpoint={}/{} dc={} rack={} tokens={}", host_id, st.endpoint, dc_rack.dc, dc_rack.rack, st.tokens);
-            }
-        }
-
         auto tmlock = co_await get_token_metadata_lock();
         auto tmptr = co_await get_mutable_token_metadata_ptr();
-        for (const auto& [host_id, st] : loaded_endpoints) {
+        for (auto& [host_id, st] : loaded_endpoints) {
             if (st.endpoint == get_broadcast_address()) {
                 // entry has been mistakenly added, delete it
+                slogger.warn("Loaded saved endpoint={}/{} has my broadcast address. Deleting it", host_id, st.endpoint);
                 co_await _sys_ks.local().remove_endpoint(st.endpoint);
             } else if (st.tokens.empty()) {
-                slogger.debug("Not loading endpoint={}/{} since it owns no tokens", host_id, st.endpoint);
+                slogger.debug("Not loading saved endpoint={}/{} since it owns no tokens", host_id, st.endpoint);
             } else {
                 if (host_id == my_host_id()) {
                     on_internal_error(slogger, format("Loaded saved endpoint {} with my host_id={}", st.endpoint, host_id));
                 }
-                const auto dc_rack = get_dc_rack(st);
+                if (!st.opt_dc_rack) {
+                    st.opt_dc_rack = locator::endpoint_dc_rack::default_location;
+                    slogger.warn("Loaded no dc/rack for saved endpoint={}/{}. Set to default={}/{}", host_id, st.endpoint, st.opt_dc_rack->dc, st.opt_dc_rack->rack);
+                }
+                const auto& dc_rack = *st.opt_dc_rack;
+                slogger.debug("Loaded tokens: endpoint={}/{} dc={} rack={} tokens={}", host_id, st.endpoint, dc_rack.dc, dc_rack.rack, st.tokens);
                 tmptr->update_topology(host_id, dc_rack, locator::node::state::normal);
                 co_await tmptr->update_normal_tokens(st.tokens, host_id);
                 tmptr->update_host_id(host_id, st.endpoint);
                 // gossiping hasn't started yet
                 // so no need to lock the endpoint
-                co_await _gossiper.add_saved_endpoint(host_id, st.endpoint, gms::null_permit_id);
+                co_await _gossiper.add_saved_endpoint(host_id, st, gms::null_permit_id);
             }
         }
         co_await replicate_to_all_cores(std::move(tmptr));
@@ -2800,7 +2806,7 @@ future<> storage_service::join_cluster(sharded&
     }));
     auto loaded_peer_features = co_await _sys_ks.local().load_peer_features();
     slogger.info("initial_contact_nodes={}, loaded_endpoints={}, loaded_peer_features={}",
-            initial_contact_nodes, loaded_endpoints, loaded_peer_features.size());
+            initial_contact_nodes, loaded_endpoints | boost::adaptors::map_keys, loaded_peer_features.size());
     for (auto& x : loaded_peer_features) {
         slogger.info("peer={}, supported_features={}", x.first, x.second);
     }