gossiper: add_saved_endpoint: set dc and rack
When loading endpoint_state from system.peers, pass the loaded node's dc/rack info from storage_service::join_token_ring to gossiper::add_saved_endpoint. Load the endpoint DC/RACK information into the endpoint_state, if available, so it can propagate to bootstrapping nodes via gossip, even if those nodes are DOWN after a full cluster restart. Note that this change makes the host_id presence mandatory, following https://github.com/scylladb/scylladb/pull/16376. The reason to do so is that the other states — tokens, dc, and rack — are useless without the host_id. This change is backward compatible since the HOST_ID application state has been written to system.peers since its inception in Scylla, and it would be missing only due to a potential exception in older versions that failed to write it. In that case, manual intervention is needed and the correct HOST_ID needs to be manually updated in system.peers. Refs #15787 Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -2121,14 +2121,18 @@ void gossiper::build_seeds_list() {
|
||||
}
|
||||
}
|
||||
|
||||
future<> gossiper::add_saved_endpoint(locator::host_id host_id, inet_address ep, permit_id pid) {
|
||||
future<> gossiper::add_saved_endpoint(locator::host_id host_id, gms::loaded_endpoint_state st, permit_id pid) {
|
||||
if (host_id == my_host_id()) {
|
||||
logger.debug("Attempt to add self as saved endpoint");
|
||||
co_return;
|
||||
}
|
||||
const auto& ep = st.endpoint;
|
||||
if (!host_id) {
|
||||
on_internal_error(logger, format("Attempt to add {} with null host_id as saved endpoint", ep));
|
||||
}
|
||||
if (ep == inet_address{}) {
|
||||
on_internal_error(logger, format("Attempt to add {} with null inet_address as saved endpoint", host_id));
|
||||
}
|
||||
if (ep == get_broadcast_address()) {
|
||||
on_internal_error(logger, format("Attempt to add {} with broadcast_address {} as saved endpoint", host_id, ep));
|
||||
}
|
||||
@@ -2158,6 +2162,10 @@ future<> gossiper::add_saved_endpoint(locator::host_id host_id, inet_address ep,
|
||||
std::unordered_set<dht::token> tokens_set(tokens.begin(), tokens.end());
|
||||
ep_state.add_application_state(gms::application_state::TOKENS, versioned_value::tokens(tokens_set));
|
||||
}
|
||||
if (st.opt_dc_rack) {
|
||||
ep_state.add_application_state(gms::application_state::DC, gms::versioned_value::datacenter(st.opt_dc_rack->dc));
|
||||
ep_state.add_application_state(gms::application_state::RACK, gms::versioned_value::datacenter(st.opt_dc_rack->rack));
|
||||
}
|
||||
auto generation = ep_state.get_heart_beat_state().get_generation();
|
||||
co_await replicate(ep, std::move(ep_state), permit.id());
|
||||
_unreachable_endpoints[ep] = now();
|
||||
|
||||
@@ -620,7 +620,7 @@ public:
|
||||
/**
|
||||
* Add an endpoint we knew about previously, but whose state is unknown
|
||||
*/
|
||||
future<> add_saved_endpoint(locator::host_id host_id, inet_address ep, permit_id);
|
||||
future<> add_saved_endpoint(locator::host_id host_id, loaded_endpoint_state st, permit_id);
|
||||
|
||||
future<> add_local_application_state(application_state state, versioned_value value);
|
||||
|
||||
|
||||
@@ -32,6 +32,7 @@
|
||||
#include "db/consistency_level.hh"
|
||||
#include "seastar/core/when_all.hh"
|
||||
#include "service/tablet_allocator.hh"
|
||||
#include "locator/types.hh"
|
||||
#include "locator/tablets.hh"
|
||||
#include "locator/tablet_metadata_guard.hh"
|
||||
#include "replica/tablet_mutation_builder.hh"
|
||||
@@ -696,7 +697,13 @@ future<> storage_service::topology_state_load() {
|
||||
if (is_me(e)) {
|
||||
continue;
|
||||
}
|
||||
const auto ep = tmptr->get_endpoint_for_host_id(e);
|
||||
const auto& topo = tmptr->get_topology();
|
||||
const auto* node = topo.find_node(e);
|
||||
// node must exist in topology if it's in tmptr->get_all_endpoints
|
||||
if (!node) {
|
||||
on_internal_error(slogger, format("Found no node for {} in topology", e));
|
||||
}
|
||||
const auto& ep = node->endpoint();
|
||||
if (ep == inet_address{}) {
|
||||
continue;
|
||||
}
|
||||
@@ -705,7 +712,11 @@ future<> storage_service::topology_state_load() {
|
||||
// since it is not loaded in join_cluster in the
|
||||
// raft_topology_change_enabled() case.
|
||||
if (!_gossiper.get_endpoint_state_ptr(ep)) {
|
||||
co_await _gossiper.add_saved_endpoint(e, ep, permit.id());
|
||||
gms::loaded_endpoint_state st;
|
||||
st.endpoint = ep;
|
||||
st.tokens = boost::copy_range<std::unordered_set<dht::token>>(tmptr->get_tokens(e));
|
||||
st.opt_dc_rack = node->dc_rack();
|
||||
co_await _gossiper.add_saved_endpoint(e, std::move(st), permit.id());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1469,7 +1480,7 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
|
||||
for (const auto& [host_id, st] : loaded_endpoints) {
|
||||
// gossiping hasn't started yet
|
||||
// so no need to lock the endpoint
|
||||
co_await _gossiper.add_saved_endpoint(host_id, st.endpoint, gms::null_permit_id);
|
||||
co_await _gossiper.add_saved_endpoint(host_id, st, gms::null_permit_id);
|
||||
}
|
||||
}
|
||||
auto features = _feature_service.supported_feature_set();
|
||||
@@ -2753,36 +2764,31 @@ future<> storage_service::join_cluster(sharded<db::system_distributed_keyspace>&
|
||||
slogger.info("Loading persisted ring state");
|
||||
loaded_endpoints = co_await _sys_ks.local().load_endpoint_state();
|
||||
|
||||
auto get_dc_rack = [] (const gms::loaded_endpoint_state& st) {
|
||||
return st.opt_dc_rack.value_or(locator::endpoint_dc_rack::default_location);
|
||||
};
|
||||
|
||||
if (slogger.is_enabled(logging::log_level::debug)) {
|
||||
for (const auto& [host_id, st] : loaded_endpoints) {
|
||||
auto dc_rack = get_dc_rack(st);
|
||||
slogger.debug("Loaded tokens: endpoint={}/{} dc={} rack={} tokens={}", host_id, st.endpoint, dc_rack.dc, dc_rack.rack, st.tokens);
|
||||
}
|
||||
}
|
||||
|
||||
auto tmlock = co_await get_token_metadata_lock();
|
||||
auto tmptr = co_await get_mutable_token_metadata_ptr();
|
||||
for (const auto& [host_id, st] : loaded_endpoints) {
|
||||
for (auto& [host_id, st] : loaded_endpoints) {
|
||||
if (st.endpoint == get_broadcast_address()) {
|
||||
// entry has been mistakenly added, delete it
|
||||
slogger.warn("Loaded saved endpoint={}/{} has my broadcast address. Deleting it", host_id, st.endpoint);
|
||||
co_await _sys_ks.local().remove_endpoint(st.endpoint);
|
||||
} else if (st.tokens.empty()) {
|
||||
slogger.debug("Not loading endpoint={}/{} since it owns no tokens", host_id, st.endpoint);
|
||||
slogger.debug("Not loading saved endpoint={}/{} since it owns no tokens", host_id, st.endpoint);
|
||||
} else {
|
||||
if (host_id == my_host_id()) {
|
||||
on_internal_error(slogger, format("Loaded saved endpoint {} with my host_id={}", st.endpoint, host_id));
|
||||
}
|
||||
const auto dc_rack = get_dc_rack(st);
|
||||
if (!st.opt_dc_rack) {
|
||||
st.opt_dc_rack = locator::endpoint_dc_rack::default_location;
|
||||
slogger.warn("Loaded no dc/rack for saved endpoint={}/{}. Set to default={}/{}", host_id, st.endpoint, st.opt_dc_rack->dc, st.opt_dc_rack->rack);
|
||||
}
|
||||
const auto& dc_rack = *st.opt_dc_rack;
|
||||
slogger.debug("Loaded tokens: endpoint={}/{} dc={} rack={} tokens={}", host_id, st.endpoint, dc_rack.dc, dc_rack.rack, st.tokens);
|
||||
tmptr->update_topology(host_id, dc_rack, locator::node::state::normal);
|
||||
co_await tmptr->update_normal_tokens(st.tokens, host_id);
|
||||
tmptr->update_host_id(host_id, st.endpoint);
|
||||
// gossiping hasn't started yet
|
||||
// so no need to lock the endpoint
|
||||
co_await _gossiper.add_saved_endpoint(host_id, st.endpoint, gms::null_permit_id);
|
||||
co_await _gossiper.add_saved_endpoint(host_id, st, gms::null_permit_id);
|
||||
}
|
||||
}
|
||||
co_await replicate_to_all_cores(std::move(tmptr));
|
||||
@@ -2800,7 +2806,7 @@ future<> storage_service::join_cluster(sharded<db::system_distributed_keyspace>&
|
||||
}));
|
||||
auto loaded_peer_features = co_await _sys_ks.local().load_peer_features();
|
||||
slogger.info("initial_contact_nodes={}, loaded_endpoints={}, loaded_peer_features={}",
|
||||
initial_contact_nodes, loaded_endpoints, loaded_peer_features.size());
|
||||
initial_contact_nodes, loaded_endpoints | boost::adaptors::map_keys, loaded_peer_features.size());
|
||||
for (auto& x : loaded_peer_features) {
|
||||
slogger.info("peer={}, supported_features={}", x.first, x.second);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user