Files
scylladb/gms/gossiper.cc
Gleb Natapov 4e56ca3c76 gossiper: do not gossip TOKENS and CDC_GENERATION_ID any more
They were used by legacy topology and cdc code only.
2026-03-10 10:39:58 +02:00

2516 lines
111 KiB
C++

/*
*
* Modified by ScyllaDB
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#include "gms/inet_address.hh"
#include "gms/endpoint_state.hh"
#include "gms/gossip_digest.hh"
#include "gms/gossip_digest_syn.hh"
#include "gms/gossip_digest_ack.hh"
#include "gms/gossip_digest_ack2.hh"
#include "gms/versioned_value.hh"
#include "gms/gossiper.hh"
#include "gms/feature_service.hh"
#include "gms/application_state.hh"
#include "gms/i_endpoint_state_change_subscriber.hh"
#include "message/messaging_service.hh"
#include "utils/log.hh"
#include "db/system_keyspace.hh"
#include <algorithm>
#include <fmt/chrono.h>
#include <fmt/ranges.h>
#include <ranges>
#include <seastar/core/sleep.hh>
#include <seastar/core/thread.hh>
#include <seastar/core/metrics.hh>
#include <seastar/core/on_internal_error.hh>
#include <seastar/util/defer.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/coroutine/exception.hh>
#include <seastar/coroutine/switch_to.hh>
#include <chrono>
#include "locator/host_id.hh"
#include <boost/range/algorithm/set_algorithm.hpp>
#include <boost/range/algorithm/partition.hpp>
#include <utility>
#include "gms/generation-number.hh"
#include "locator/token_metadata.hh"
#include <seastar/core/shard_id.hh>
#include <seastar/rpc/rpc_types.hh>
#include "utils/assert.hh"
#include "utils/exceptions.hh"
#include "utils/error_injection.hh"
#include "idl/gossip.dist.hh"
#include <csignal>
#include "build_mode.hh"
#include "utils/labels.hh"
namespace gms {
using clk = gossiper::clk;
static logging::logger logger("gossip");
constexpr std::chrono::milliseconds gossiper::INTERVAL;
constexpr std::chrono::hours gossiper::A_VERY_LONG_TIME;
constexpr generation_type::value_type gossiper::MAX_GENERATION_DIFFERENCE;
const sstring& gossiper::get_cluster_name() const noexcept {
return _gcfg.cluster_name;
}
void gossiper::set_group0_id(utils::UUID group0_id) {
if (_gcfg.group0_id) {
on_internal_error(logger, "group0_id is already set");
}
_gcfg.group0_id = std::move(group0_id);
}
const utils::UUID& gossiper::get_group0_id() const noexcept {
return _gcfg.group0_id;
}
const utils::UUID& gossiper::get_recovery_leader() const noexcept {
return _gcfg.recovery_leader;
}
const std::set<inet_address>& gossiper::get_seeds() const noexcept {
return _gcfg.seeds;
}
gossiper::gossiper(abort_source& as, const locator::shared_token_metadata& stm, netw::messaging_service& ms, gossip_config gcfg, gossip_address_map& address_map,
service::topology_state_machine& tsm)
: _topo_sm(tsm)
, _abort_source(as)
, _shared_token_metadata(stm)
, _messaging(ms)
, _address_map(address_map)
, _gcfg(std::move(gcfg)) {
// Gossiper's stuff below runs only on CPU0
if (this_shard_id() != 0) {
return;
}
_scheduled_gossip_task.set_callback(_gcfg.gossip_scheduling_group, [this] { run(); });
// Register this instance with JMX
namespace sm = seastar::metrics;
auto ep = my_host_id();
_metrics.add_group("gossip", {
sm::make_counter("heart_beat",
[ep, this] {
auto es = get_endpoint_state_ptr(ep);
if (es) {
return es->get_heart_beat_state().get_heart_beat_version().value();
} else {
return 0;
}
}, sm::description("Heartbeat of the current Node."))(basic_level),
sm::make_gauge("live",
[this] {
return _live_endpoints.size();
}, sm::description("How many live nodes the current node sees"))(basic_level),
sm::make_gauge("unreachable",
[this] {
return _unreachable_endpoints.size();
}, sm::description("How many unreachable nodes the current node sees"))(basic_level),
});
// Add myself to the map on start
_address_map.add_or_update_entry(_gcfg.host_id, get_broadcast_address());
_address_map.set_nonexpiring(_gcfg.host_id);
}
/*
* First construct a map whose key is the endpoint in the GossipDigest and the value is the
* GossipDigest itself. Then build a list of version differences i.e difference between the
* version in the GossipDigest and the version in the local state for a given InetAddress.
* Sort this list. Now loop through the sorted list and retrieve the GossipDigest corresponding
* to the endpoint from the map that was initially constructed.
*/
void gossiper::do_sort(utils::chunked_vector<gossip_digest>& g_digest_list) const {
/* Construct a map of endpoint to GossipDigest. */
std::map<inet_address, gossip_digest> ep_to_digest_map;
for (auto g_digest : g_digest_list) {
ep_to_digest_map.emplace(g_digest.get_endpoint(), g_digest);
}
/*
* These digests have their maxVersion set to the difference of the version
* of the local EndpointState and the version found in the GossipDigest.
*/
utils::chunked_vector<gossip_digest> diff_digests;
for (auto g_digest : g_digest_list) {
auto ep = g_digest.get_endpoint();
locator::host_id id = try_get_host_id(ep).value_or(locator::host_id{});
auto ep_state = get_endpoint_state_ptr(id);
version_type version = ep_state ? get_max_endpoint_state_version(*ep_state) : version_type();
int32_t diff_version = ::abs((version - g_digest.get_max_version()).value());
diff_digests.emplace_back(gossip_digest(ep, g_digest.get_generation(), version_type(diff_version)));
}
g_digest_list.clear();
std::sort(diff_digests.begin(), diff_digests.end());
int size = diff_digests.size();
/*
* Report the digests in descending order. This takes care of the endpoints
* that are far behind w.r.t this local endpoint
*/
for (int i = size - 1; i >= 0; --i) {
g_digest_list.emplace_back(ep_to_digest_map[diff_digests[i].get_endpoint()]);
}
}
// Depends on
// - no external dependency
future<> gossiper::handle_syn_msg(locator::host_id from, gossip_digest_syn syn_msg) {
logger.trace(
"handle_syn_msg():from={},cluster_name:peer={},local={},group0_id:peer={},local={},"
"recovery_leader:peer={},local={},partitioner_name:peer={},local={}",
from, syn_msg.cluster_id(), get_cluster_name(), syn_msg.group0_id(), get_group0_id(),
syn_msg.recovery_leader(), get_recovery_leader(), syn_msg.partioner(), get_partitioner_name());
if (!is_enabled()) {
co_return;
}
/* If the message is from a different cluster throw it away. */
if (syn_msg.cluster_id() != get_cluster_name()) {
logger.warn("ClusterName mismatch from {} {}!={}", from, syn_msg.cluster_id(), get_cluster_name());
co_return;
}
// Recovery leader mismatch implies an administrator's mistake during the Raft-based recovery procedure.
// Throw away the message and signal that something is wrong.
bool both_nodes_in_recovery = syn_msg.recovery_leader() && get_recovery_leader();
if (both_nodes_in_recovery && syn_msg.recovery_leader() != get_recovery_leader()) {
logger.warn("Recovery leader mismatch from {} {} != {},",
from, syn_msg.recovery_leader(), get_recovery_leader());
co_return;
}
// If the message is from a node with a different group0 id throw it away.
// A group0 id mismatch is expected during a rolling restart in the Raft-based recovery procedure.
bool no_recovery = !syn_msg.recovery_leader() && !get_recovery_leader();
bool group0_ids_mismatch = syn_msg.group0_id() && get_group0_id() && syn_msg.group0_id() != get_group0_id();
if (no_recovery && group0_ids_mismatch) {
logger.warn("Group0Id mismatch from {} {} != {}", from, syn_msg.group0_id(), get_group0_id());
co_return;
}
if (syn_msg.partioner() != "" && syn_msg.partioner() != get_partitioner_name()) {
logger.warn("Partitioner mismatch from {} {}!={}", from, syn_msg.partioner(), get_partitioner_name());
co_return;
}
syn_msg_pending& p = _syn_handlers[from];
if (p.pending) {
// The latest syn message from peer has the latest information, so
// it is safe to drop the previous syn message and keep the latest
// one only.
logger.debug("Queue gossip syn msg from node {}, syn_msg={}", from, syn_msg);
p.syn_msg = std::move(syn_msg);
co_return;
}
// Process the syn message immediately
logger.debug("Process gossip syn msg from node {}, syn_msg={}", from, syn_msg);
p.pending = true;
for (;;) {
try {
co_await do_send_ack_msg(from, std::move(syn_msg));
if (!_syn_handlers.contains(from)) {
co_return;
}
syn_msg_pending& p = _syn_handlers[from];
if (p.syn_msg) {
// Process pending gossip syn msg and send ack msg back
logger.debug("Handle queued gossip syn msg from node {}, syn_msg={}, pending={}",
from, p.syn_msg, p.pending);
syn_msg = std::move(p.syn_msg.value());
p.syn_msg = {};
continue;
} else {
// No more pending syn msg to process
p.pending = false;
logger.debug("No more queued gossip syn msg from node {}, syn_msg={}, pending={}",
from, p.syn_msg, p.pending);
co_return;
}
} catch (...) {
auto ep = std::current_exception();
if (_syn_handlers.contains(from)) {
syn_msg_pending& p = _syn_handlers[from];
p.pending = false;
p.syn_msg = {};
}
logger.warn("Failed to process gossip syn msg from node {}: {}", from, ep);
throw;
}
}
}
future<> gossiper::do_send_ack_msg(locator::host_id from, gossip_digest_syn syn_msg) {
auto g_digest_list = syn_msg.get_gossip_digests();
do_sort(g_digest_list);
utils::chunked_vector<gossip_digest> delta_gossip_digest_list;
std::map<inet_address, endpoint_state> delta_ep_state_map;
examine_gossiper(g_digest_list, delta_gossip_digest_list, delta_ep_state_map);
gms::gossip_digest_ack ack_msg(std::move(delta_gossip_digest_list), std::move(delta_ep_state_map));
logger.debug("Calling do_send_ack_msg to node {}, syn_msg={}, ack_msg={}", from, syn_msg, ack_msg);
co_await ser::gossip_rpc_verbs::send_gossip_digest_ack(&_messaging, from, std::move(ack_msg));
}
static bool should_count_as_msg_processing(const std::map<inet_address, endpoint_state>& map) {
bool count_as_msg_processing = false;
for (const auto& x : map) {
const auto& state = x.second;
for (const auto& entry : state.get_application_state_map()) {
const auto& app_state = entry.first;
auto is_critical_state = [](application_state state) {
switch (state) {
case application_state::LOAD:
case application_state::VIEW_BACKLOG:
case application_state::CACHE_HITRATES:
case application_state::GROUP0_STATE_ID:
return false;
default:
return true;
}
};
if (is_critical_state(app_state)) {
count_as_msg_processing = true;
logger.debug("node={}, app_state={}, count_as_msg_processing={}",
x.first, app_state, count_as_msg_processing);
return count_as_msg_processing;
}
}
}
return count_as_msg_processing;
}
// Depends on
// - failure_detector
// - on_change callbacks, e.g., storage_service -> access db system_table
// - on_restart callbacks
// - on_join callbacks
// - on_alive
future<> gossiper::handle_ack_msg(locator::host_id id, gossip_digest_ack ack_msg) {
logger.trace("handle_ack_msg():from={},msg={}", id, ack_msg);
if (!is_enabled()) {
co_return;
}
auto g_digest_list = ack_msg.get_gossip_digest_list();
auto& ep_state_map = ack_msg.get_endpoint_state_map();
bool count_as_msg_processing = should_count_as_msg_processing(ep_state_map);
if (count_as_msg_processing) {
_msg_processing++;
}
auto mp = defer([count_as_msg_processing, this] {
if (count_as_msg_processing) {
_msg_processing--;
}
});
if (ep_state_map.size() > 0) {
update_timestamp_for_nodes(ep_state_map);
co_await apply_state_locally(std::move(ep_state_map));
}
auto from = id;
auto ack_msg_digest = std::move(g_digest_list);
ack_msg_pending& p = _ack_handlers[from];
if (p.pending) {
// The latest ack message digests from peer has the latest information, so
// it is safe to drop the previous ack message digests and keep the latest
// one only.
logger.debug("Queue gossip ack msg digests from node {}, ack_msg_digest={}", from, ack_msg_digest);
p.ack_msg_digest = std::move(ack_msg_digest);
co_return;
}
// Process the ack message immediately
logger.debug("Process gossip ack msg digests from node {}, ack_msg_digest={}", from, ack_msg_digest);
p.pending = true;
for (;;) {
try {
co_await do_send_ack2_msg(from, std::move(ack_msg_digest));
if (!_ack_handlers.contains(from)) {
co_return;
}
ack_msg_pending& p = _ack_handlers[from];
if (p.ack_msg_digest) {
// Process pending gossip ack msg digests and send ack2 msg back
logger.debug("Handle queued gossip ack msg digests from node {}, ack_msg_digest={}, pending={}",
from, p.ack_msg_digest, p.pending);
ack_msg_digest = std::move(p.ack_msg_digest.value());
p.ack_msg_digest= {};
continue;
} else {
// No more pending ack msg digests to process
p.pending = false;
logger.debug("No more queued gossip ack msg digests from node {}, ack_msg_digest={}, pending={}",
from, p.ack_msg_digest, p.pending);
co_return;
}
} catch (...) {
auto ep = std::current_exception();
if (_ack_handlers.contains(from)) {
ack_msg_pending& p = _ack_handlers[from];
p.pending = false;
p.ack_msg_digest = {};
}
logger.warn("Failed to process gossip ack msg digests from node {}: {}", from, ep);
throw;
}
}
}
future<> gossiper::do_send_ack2_msg(locator::host_id from, utils::chunked_vector<gossip_digest> ack_msg_digest) {
/* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
std::map<inet_address, endpoint_state> delta_ep_state_map;
for (auto g_digest : ack_msg_digest) {
inet_address addr = g_digest.get_endpoint();
locator::host_id id = try_get_host_id(addr).value_or(locator::host_id{});
const auto es = get_endpoint_state_ptr(id);
if (!es || es->get_heart_beat_state().get_generation() < g_digest.get_generation()) {
continue;
}
// Local generation for addr may have been increased since the
// current node sent an initial SYN. Comparing versions across
// different generations in get_state_for_version_bigger_than
// could result in losing some app states with smaller versions.
const auto version = es->get_heart_beat_state().get_generation() > g_digest.get_generation()
? version_type(0)
: g_digest.get_max_version();
auto local_ep_state_ptr = get_state_for_version_bigger_than(id, version);
if (local_ep_state_ptr) {
delta_ep_state_map.emplace(addr, *local_ep_state_ptr);
}
}
gms::gossip_digest_ack2 ack2_msg(std::move(delta_ep_state_map));
logger.debug("Calling do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg);
co_await ser::gossip_rpc_verbs::send_gossip_digest_ack2(&_messaging, from, std::move(ack2_msg));
logger.debug("finished do_send_ack2_msg to node {}, ack_msg_digest={}, ack2_msg={}", from, ack_msg_digest, ack2_msg);
}
// Depends on
// - failure_detector
// - on_change callbacks, e.g., storage_service -> access db system_table
// - on_restart callbacks
// - on_join callbacks
// - on_alive callbacks
future<> gossiper::handle_ack2_msg(locator::host_id from, gossip_digest_ack2 msg) {
logger.trace("handle_ack2_msg():msg={}", msg);
if (!is_enabled()) {
co_return;
}
auto& remote_ep_state_map = msg.get_endpoint_state_map();
update_timestamp_for_nodes(remote_ep_state_map);
bool count_as_msg_processing = should_count_as_msg_processing(remote_ep_state_map);
if (count_as_msg_processing) {
_msg_processing++;
}
auto mp = defer([count_as_msg_processing, this] {
if (count_as_msg_processing) {
_msg_processing--;
}
});
co_await apply_state_locally(std::move(remote_ep_state_map));
}
future<> gossiper::handle_echo_msg(locator::host_id from_hid, seastar::rpc::opt_time_point timeout, std::optional<int64_t> generation_number_opt, bool notify_up) {
bool respond = true;
if (!_advertise_to_nodes.empty()) {
auto it = _advertise_to_nodes.find(from_hid);
if (it == _advertise_to_nodes.end()) {
respond = false;
} else {
auto es = get_endpoint_state_ptr(from_hid);
if (es) {
auto saved_generation_number = it->second;
auto current_generation_number = generation_number_opt ?
generation_type(generation_number_opt.value()) : es->get_heart_beat_state().get_generation();
respond = saved_generation_number == current_generation_number;
logger.debug("handle_echo_msg: from={}, saved_generation_number={}, current_generation_number={}",
from_hid, saved_generation_number, current_generation_number);
} else {
respond = false;
}
}
}
if (!respond) {
throw std::runtime_error("Not ready to respond gossip echo message");
}
if (notify_up) {
if (!timeout) {
on_internal_error(logger, "UP notification should have a timeout");
}
auto normal = [] (gossiper& g, locator::host_id hid) {
const auto& topo = g.get_token_metadata_ptr()->get_topology();
return topo.has_node(hid) && topo.find_node(hid)->is_normal();
};
co_await container().invoke_on(0, [from_hid, timeout, &normal] (gossiper& g) -> future<> {
try {
// Wait to see the node as normal. It may node be the case if the node bootstraps
while (rpc::rpc_clock_type::now() < *timeout && !(normal(g, from_hid) && g.is_alive(from_hid))) {
co_await sleep_abortable(std::chrono::milliseconds(100), g._abort_source);
}
} catch(...) {
logger.warn("handle_echo_msg: UP notification from {} failed with {}", from_hid, std::current_exception());
}
});
}
}
future<> gossiper::handle_shutdown_msg(locator::host_id from, std::optional<int64_t> generation_number_opt) {
if (!is_enabled()) {
logger.debug("Ignoring shutdown message from {} because gossip is disabled", from);
co_return;
}
auto permit = co_await lock_endpoint(from, null_permit_id);
if (generation_number_opt) {
debug_validate_gossip_generation(*generation_number_opt);
auto es = get_endpoint_state_ptr(from);
if (es) {
auto local_generation = es->get_heart_beat_state().get_generation();
logger.info("Got shutdown message from {}, received_generation={}, local_generation={}",
from, generation_number_opt.value(), local_generation);
if (local_generation.value() != generation_number_opt.value()) {
logger.warn("Ignoring shutdown message from {} because generation number does not match, received_generation={}, local_generation={}",
from, generation_number_opt.value(), local_generation);
co_return;
}
} else {
logger.warn("Ignoring shutdown message from {} because generation number does not match, received_generation={}, local_generation=not found",
from, generation_number_opt.value());
co_return;
}
}
co_await mark_as_shutdown(from, permit.id());
}
future<gossip_get_endpoint_states_response>
gossiper::handle_get_endpoint_states_msg(gossip_get_endpoint_states_request request) {
std::unordered_map<gms::inet_address, gms::endpoint_state> map;
const auto& application_states_wanted = request.application_states;
for (const auto& [node, state] : _endpoint_state_map) {
const heart_beat_state& hbs = state->get_heart_beat_state();
auto it = map.find(state->get_ip());
if (it != map.end()) {
if (it->second.get_heart_beat_state().get_generation() < hbs.get_generation()) {
map.erase(it);
}
}
auto state_wanted = endpoint_state(hbs, state->get_ip());
auto& apps = state->get_application_state_map();
for (const auto& app : apps) {
if (application_states_wanted.count(app.first) > 0) {
state_wanted.get_application_state_map().emplace(app);
}
}
map.emplace(state->get_ip(), std::move(state_wanted));
}
return make_ready_future<gossip_get_endpoint_states_response>(gossip_get_endpoint_states_response{std::move(map)});
}
future<rpc::no_wait_type> gossiper::background_msg(sstring type, noncopyable_function<future<>(gossiper&)> fn) {
(void)with_gate(_background_msg, [this, type = std::move(type), fn = std::move(fn)] () mutable {
return container().invoke_on(0, std::move(fn)).handle_exception([type = std::move(type)] (auto ep) {
logger.warn("Failed to handle {}: {}", type, ep);
});
});
return make_ready_future<rpc::no_wait_type>(netw::messaging_service::no_wait());
}
void gossiper::init_messaging_service_handler() {
ser::gossip_rpc_verbs::register_gossip_digest_syn(&_messaging, [this] (const rpc::client_info& cinfo, gossip_digest_syn syn_msg) {
auto from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
return background_msg("GOSSIP_DIGEST_SYN", [from, syn_msg = std::move(syn_msg)] (gms::gossiper& gossiper) mutable {
return gossiper.handle_syn_msg(from, std::move(syn_msg));
});
});
ser::gossip_rpc_verbs::register_gossip_digest_ack(&_messaging, [this] (const rpc::client_info& cinfo, gossip_digest_ack msg) {
auto from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
return background_msg("GOSSIP_DIGEST_ACK", [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
return gossiper.handle_ack_msg(from, std::move(msg));
});
});
ser::gossip_rpc_verbs::register_gossip_digest_ack2(&_messaging, [this] (const rpc::client_info& cinfo, gossip_digest_ack2 msg) {
auto from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
return background_msg("GOSSIP_DIGEST_ACK2", [from, msg = std::move(msg)] (gms::gossiper& gossiper) mutable {
return gossiper.handle_ack2_msg(from, std::move(msg));
});
});
ser::gossip_rpc_verbs::register_gossip_echo(&_messaging, [this] (const rpc::client_info& cinfo, seastar::rpc::opt_time_point timeout, rpc::optional<int64_t> generation_number_opt, rpc::optional<bool> notify_up_opt) {
auto from_hid = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
return handle_echo_msg(from_hid, timeout, generation_number_opt, notify_up_opt.value_or(false));
});
ser::gossip_rpc_verbs::register_gossip_shutdown(&_messaging, [this] (const rpc::client_info& cinfo, inet_address from, rpc::optional<int64_t> generation_number_opt) {
auto from_hid = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
return background_msg("GOSSIP_SHUTDOWN", [from_hid, generation_number_opt] (gms::gossiper& gossiper) {
return gossiper.handle_shutdown_msg(from_hid, generation_number_opt);
});
});
ser::gossip_rpc_verbs::register_gossip_get_endpoint_states(&_messaging, [this] (const rpc::client_info& cinfo, rpc::opt_time_point, gossip_get_endpoint_states_request request) {
return container().invoke_on(0, [request = std::move(request)] (gms::gossiper& gossiper) mutable {
return gossiper.handle_get_endpoint_states_msg(std::move(request));
});
});
}
future<> gossiper::uninit_messaging_service_handler() {
auto& ms = _messaging;
return ser::gossip_rpc_verbs::unregister(&ms);
}
template<typename T>
future<> gossiper::send_gossip(gossip_digest_syn message, std::set<T> epset) {
utils::chunked_vector<T> __live_endpoints(epset.begin(), epset.end());
size_t size = __live_endpoints.size();
if (size < 1) {
return make_ready_future<>();
}
/* Generate a random number from 0 -> size */
std::uniform_int_distribution<int> dist(0, size - 1);
int index = dist(_random_engine);
std::conditional_t<std::is_same_v<T, gms::inet_address>, netw::msg_addr, T> id{__live_endpoints[index]};
logger.trace("Sending a GossipDigestSyn to {} ...", id);
return ser::gossip_rpc_verbs::send_gossip_digest_syn(&_messaging, id, std::move(message)).handle_exception([id] (auto ep) {
// It is normal to reach here because it is normal that a node
// tries to send a SYN message to a peer node which is down before
// failure_detector thinks that peer node is down.
logger.trace("Fail to send GossipDigestSyn to {}: {}", id, ep);
});
}
future<> gossiper::do_apply_state_locally(locator::host_id node, endpoint_state remote_state, bool shadow_round) {
co_await utils::get_local_injector().inject("delay_gossiper_apply", [&node, &remote_state](auto& handler) -> future<> {
const auto gossip_delay_node = handler.template get<std::string_view>("delay_node");
if (gossip_delay_node && !remote_state.get_host_id() && inet_address(sstring(gossip_delay_node.value())) == remote_state.get_ip()) {
logger.debug("delay_gossiper_apply: suspend for node {}", node);
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::minutes{5});
logger.debug("delay_gossiper_apply: resume for node {}", node);
}
});
// If state does not exist just add it. If it does then add it if the remote generation is greater.
// If there is a generation tie, attempt to break it by heartbeat version.
auto permit = co_await lock_endpoint(node, null_permit_id);
auto es = get_endpoint_state_ptr(node);
// If remote state update does not contain a host id, check whether the endpoint still
// exists in the `_endpoint_state_map` since after a preemption point it could have been deleted.
if (!remote_state.get_host_id() && !es) {
throw std::runtime_error(format("Entry for host id {} does not exist in the endpoint state map.", node));
}
if (es) {
endpoint_state local_state = *es;
auto local_generation = local_state.get_heart_beat_state().get_generation();
auto remote_generation = remote_state.get_heart_beat_state().get_generation();
logger.trace("{} local generation {}, remote generation {}", node, local_generation, remote_generation);
if (remote_generation > generation_type(get_generation_number().value() + MAX_GENERATION_DIFFERENCE)) {
// assume some peer has corrupted memory and is broadcasting an unbelievable generation about another peer (or itself)
logger.warn("received an invalid gossip generation for peer {}; local generation = {}, received generation = {}",
node, local_generation, remote_generation);
} else if (remote_generation > local_generation) {
logger.trace("Updating heartbeat state generation to {} from {} for {} (notify={})", remote_generation, local_generation, node, !shadow_round);
// major state change will handle the update by inserting the remote state directly
co_await handle_major_state_change(std::move(remote_state), permit.id(), shadow_round);
} else if (remote_generation == local_generation) {
// find maximum state
auto local_max_version = get_max_endpoint_state_version(local_state);
auto remote_max_version = get_max_endpoint_state_version(remote_state);
if (remote_max_version > local_max_version) {
// apply states, but do not notify since there is no major change
co_await apply_new_states(std::move(local_state), remote_state, permit.id(), shadow_round);
} else {
logger.debug("Ignoring remote version {} <= {} for {}", remote_max_version, local_max_version, node);
}
// Re-rake after apply_new_states
es = get_endpoint_state_ptr(node);
if (!is_alive(es->get_host_id()) && !is_dead_state(*es) && !shadow_round) { // unless of course, it was dead
mark_alive(es);
}
} else {
logger.debug("Ignoring remote generation {} < {}", remote_generation, local_generation);
}
} else {
logger.debug("Applying remote_state for node {} ({} node)", node, !shadow_round ? "old" : "new");
co_await handle_major_state_change(std::move(remote_state), permit.id(), shadow_round);
}
}
future<> gossiper::apply_state_locally_in_shadow_round(std::unordered_map<inet_address, endpoint_state> map) {
for (auto& [node, remote_state] : map) {
remote_state.set_ip(node);
auto id = remote_state.get_host_id();
co_await do_apply_state_locally(id, std::move(remote_state), true);
}
}
future<> gossiper::apply_state_locally(std::map<inet_address, endpoint_state> map) {
auto start = std::chrono::steady_clock::now();
auto endpoints = map | std::views::keys | std::ranges::to<utils::chunked_vector<inet_address>>();
std::shuffle(endpoints.begin(), endpoints.end(), _random_engine);
auto node_is_seed = [this] (gms::inet_address ip) { return is_seed(ip); };
boost::partition(endpoints, node_is_seed);
logger.debug("apply_state_locally_endpoints={}", endpoints);
co_await coroutine::parallel_for_each(endpoints, [this, &map] (auto&& ep) -> future<> {
if (ep == get_broadcast_address()) {
return make_ready_future<>();
}
auto it = map.find(ep);
it->second.set_ip(ep);
locator::host_id hid = it->second.get_host_id();
if (hid == locator::host_id::create_null_id()) {
// If there is no host id in the new state there should be one locally
hid = get_host_id(ep);
}
if (hid == my_host_id()) {
logger.trace("Ignoring gossip for {} because it maps to local id, but is not local address", ep);
return make_ready_future<>();
}
if (_topo_sm._topology.left_nodes.contains(raft::server_id(hid.uuid()))) {
logger.trace("Ignoring gossip for {} because it left", ep);
return make_ready_future<>();
}
return seastar::with_semaphore(_apply_state_locally_semaphore, 1, [this, hid, state = std::move(it->second)] () mutable {
return do_apply_state_locally(hid, std::move(state), false);
});
});
logger.debug("apply_state_locally() took {} ms", std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start).count());
}
future<> gossiper::force_remove_endpoint(locator::host_id id, permit_id pid) {
co_await coroutine::switch_to(_gcfg.gossip_scheduling_group);
co_await container().invoke_on(0, [pid, id] (auto& gossiper) mutable -> future<> {
auto permit = co_await gossiper.lock_endpoint(id, pid);
pid = permit.id();
try {
if (id == gossiper.my_host_id()) {
throw std::runtime_error(format("Can not force remove node {} itself", id));
}
if (!gossiper._endpoint_state_map.contains(id)) {
logger.debug("Force remove node is called on non exiting endpoint {}", id);
co_return;
}
co_await gossiper.remove_endpoint(id, pid);
co_await gossiper.evict_from_membership(id, pid);
logger.info("Finished to force remove node {}", id);
} catch (...) {
logger.warn("Failed to force remove node {}: {}", id, std::current_exception());
}
});
}
future<> gossiper::remove_endpoint(locator::host_id endpoint, permit_id pid) {
auto permit = co_await lock_endpoint(endpoint, pid);
pid = permit.id();
auto state = get_endpoint_state_ptr(endpoint);
auto ip = state ? state->get_ip() : inet_address{};
// do subscribers first so anything in the subscriber that depends on gossiper state won't get confused
try {
co_await _subscribers.for_each([endpoint, ip, pid] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
return subscriber->on_remove(ip, endpoint, pid);
});
} catch (...) {
logger.warn("Fail to call on_remove callback: {}", std::current_exception());
}
if (!state) {
logger.warn("There is no state for the removed IP {}", endpoint);
co_return;
}
if(_seeds.contains(ip)) {
build_seeds_list();
_seeds.erase(ip);
logger.info("removed {} from _seeds, updated _seeds list = {}", endpoint, _seeds);
}
auto host_id = state->get_host_id();
bool was_alive = false;
co_await mutate_live_and_unreachable_endpoints([host_id, &was_alive] (live_and_unreachable_endpoints& data) {
was_alive = data.live.erase(host_id);
data.unreachable.erase(host_id);
});
_syn_handlers.erase(host_id);
_ack_handlers.erase(host_id);
logger.info("Removed endpoint {}", endpoint);
if (was_alive) {
try {
logger.info("InetAddress {}/{} is now DOWN, status = {}", state->get_host_id(), ip, get_gossip_status(*state));
co_await do_on_dead_notifications(ip, std::move(state), pid);
} catch (...) {
logger.warn("Fail to call on_dead callback: {}", std::current_exception());
}
}
}
gossiper::endpoint_permit::endpoint_permit(endpoint_locks_map::entry_ptr&& ptr, locator::host_id addr, std::source_location caller) noexcept
: _ptr(std::move(ptr))
, _permit_id(_ptr->pid)
, _addr(std::move(addr))
, _caller(std::move(caller))
{
++_ptr->holders;
if (!_ptr->first_holder) {
_ptr->first_holder = _caller;
}
_ptr->last_holder = _caller;
logger.debug("{}: lock_endpoint {}: acquired: permit_id={} holders={}", _caller.function_name(), _addr, _permit_id, _ptr->holders);
}
gossiper::endpoint_permit::endpoint_permit(endpoint_permit&& o) noexcept
: _ptr(std::exchange(o._ptr, nullptr))
, _permit_id(std::exchange(o._permit_id, null_permit_id))
, _addr(std::exchange(o._addr, locator::host_id{}))
, _caller(std::move(o._caller))
{}
gossiper::endpoint_permit::~endpoint_permit() {
release();
}
bool gossiper::endpoint_permit::release() noexcept {
if (auto ptr = std::exchange(_ptr, nullptr)) {
SCYLLA_ASSERT(ptr->pid == _permit_id);
logger.debug("{}: lock_endpoint {}: released: permit_id={} holders={}", _caller.function_name(), _addr, _permit_id, ptr->holders);
if (!--ptr->holders) {
logger.debug("{}: lock_endpoint {}: released: permit_id={}", _caller.function_name(), _addr, _permit_id);
ptr->units.return_all();
ptr->pid = null_permit_id;
ptr->first_holder = ptr->last_holder = std::nullopt;
_permit_id = null_permit_id;
return true;
}
}
return false;
}
gossiper::endpoint_lock_entry::endpoint_lock_entry() noexcept
: sem(1)
, pid(permit_id::create_null_id())
{}
future<gossiper::endpoint_permit> gossiper::lock_endpoint(locator::host_id ep, permit_id pid, std::source_location l) {
if (current_scheduling_group() != _gcfg.gossip_scheduling_group) {
logger.warn("Incorrect scheduling group used for gossiper::lock_endpoint: {}, should be {}, backtrace {}", current_scheduling_group().name(), _gcfg.gossip_scheduling_group.name(), current_backtrace());
}
if (this_shard_id() != 0) {
on_internal_error(logger, "lock_endpoint must be called on shard 0");
}
auto eptr = co_await _endpoint_locks.get_or_load(ep, [] (const locator::host_id& ep) { return endpoint_lock_entry(); });
if (pid) {
if (eptr->pid == pid) {
// Already locked with the same permit
co_return endpoint_permit(std::move(eptr), std::move(ep), std::move(l));
} else {
// permit_id mismatch means either that the endpoint lock was released,
// or maybe we're passed a permit_id that was acquired for a different endpoint.
on_internal_error_noexcept(logger, fmt::format("{}: lock_endpoint {}: permit_id={}: endpoint_lock_entry has mismatching permit_id={}", l.function_name(), ep, pid, eptr->pid));
}
}
pid = permit_id::create_random_id();
logger.debug("{}: lock_endpoint {}: waiting: permit_id={}", l.function_name(), ep, pid);
while (true) {
_abort_source.check();
static constexpr auto duration = std::chrono::minutes{1};
abort_on_expiry aoe(lowres_clock::now() + duration);
auto sub = _abort_source.subscribe([&aoe] () noexcept {
aoe.abort_source().request_abort();
});
SCYLLA_ASSERT(sub); // due to check() above
try {
eptr->units = co_await get_units(eptr->sem, 1, aoe.abort_source());
break;
} catch (const abort_requested_exception&) {
if (_abort_source.abort_requested()) {
throw;
}
// If we didn't rethrow above, the abort had to come from `abort_on_expiry`'s timer.
static constexpr auto fmt_loc = [] (const std::source_location& l) {
return fmt::format("{}({}:{}) `{}`", l.file_name(), l.line(), l.column(), l.function_name());
};
static constexpr auto fmt_loc_opt = [] (const std::optional<std::source_location>& l) {
if (!l) {
return "null"s;
}
return fmt_loc(*l);
};
logger.error(
"{}: waiting for endpoint lock (ep={}) took more than {}, signifying possible deadlock;"
" holders: {}, first holder: {}, last holder (might not be current): {}",
fmt_loc(l), ep, duration, eptr->holders, fmt_loc_opt(eptr->first_holder), fmt_loc_opt(eptr->last_holder));
}
}
eptr->pid = pid;
if (eptr->holders) {
on_internal_error_noexcept(logger, fmt::format("{}: lock_endpoint {}: newly held endpoint_lock_entry has {} holders", l.function_name(), ep, eptr->holders));
}
_abort_source.check();
co_return endpoint_permit(std::move(eptr), std::move(ep), std::move(l));
}
void gossiper::permit_internal_error(const locator::host_id& addr, permit_id pid) {
on_internal_error(logger, fmt::format("Must be called under lock_endpoint for node {}", addr));
}
future<semaphore_units<>> gossiper::lock_endpoint_update_semaphore() {
if (this_shard_id() != 0) {
on_internal_error(logger, "must be called on shard 0");
}
return get_units(_endpoint_update_semaphore, 1, _abort_source);
}
future<> gossiper::mutate_live_and_unreachable_endpoints(std::function<void(live_and_unreachable_endpoints&)> func) {
auto lock = co_await lock_endpoint_update_semaphore();
auto cloned = std::make_unique<live_and_unreachable_endpoints>(_live_endpoints, _unreachable_endpoints);
func(*cloned);
// Bump the _live_endpoints_version unconditionally,
// even if only _unreachable_endpoints changed.
// It will trigger a new round in the failure_detector_loop
// but that's not too bad as changing _unreachable_endpoints
// is rare enough.
co_await replicate_live_endpoints_on_change(make_foreign(std::move(cloned)), _live_endpoints_version + 1);
}
future<std::set<inet_address>> gossiper::get_live_members_synchronized() {
return container().invoke_on(0, [] (gms::gossiper& g) -> future<std::set<inet_address>> {
// Make sure the value we return is synchronized on all shards
auto lock = co_await g.lock_endpoint_update_semaphore();
co_return g.get_live_members() | std::views::transform([&g] (auto id) { return g._address_map.get(id); }) | std::ranges::to<std::set>();
});
}
future<std::set<inet_address>> gossiper::get_unreachable_members_synchronized() {
return container().invoke_on(0, [] (gms::gossiper& g) -> future<std::set<inet_address>> {
// Make sure the value we return is synchronized on all shards
auto lock = co_await g.lock_endpoint_update_semaphore();
co_return g.get_unreachable_members() | std::views::transform([&g] (auto id) { return g._address_map.get(id); }) | std::ranges::to<std::set>();
});
}
future<> gossiper::send_echo(locator::host_id host_id, std::chrono::milliseconds timeout_ms, int64_t generation_number, bool notify_up) {
return ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, host_id, netw::messaging_service::clock_type::now() + timeout_ms, _abort_source, generation_number, notify_up);
}
future<> gossiper::failure_detector_loop_sleep(std::chrono::seconds duration) {
bool sleep_expired = false;
bool abort_requested = false;
timer<> sleep_timer([&] {
sleep_expired = true;
_failure_detector_loop_cv.signal();
});
auto as_sub = _abort_source.subscribe([&] () noexcept {
abort_requested = true;
sleep_timer.cancel();
_failure_detector_loop_cv.signal();
});
sleep_timer.arm(duration);
while (is_enabled() && !sleep_expired && !abort_requested) {
co_await _failure_detector_loop_cv.when();
}
}
future<> gossiper::failure_detector_loop_for_node(locator::host_id host_id, generation_type gossip_generation, uint64_t live_endpoints_version) {
auto last = gossiper::clk::now();
auto diff = gossiper::clk::duration(0);
auto echo_interval = std::chrono::seconds(2);
auto max_duration = echo_interval + std::chrono::milliseconds(_gcfg.failure_detector_timeout_ms());
while (is_enabled() && !_abort_source.abort_requested()) {
auto node = _address_map.find(host_id);
bool failed = false;
try {
logger.debug("failure_detector_loop: Send echo to node {}/{}, status = started", host_id, node);
co_await send_echo(host_id, max_duration, gossip_generation.value(), false);
logger.debug("failure_detector_loop: Send echo to node {}/{}, status = ok", host_id, node);
} catch (...) {
failed = true;
logger.warn("failure_detector_loop: Send echo to node {}/{}, status = failed: {}", host_id, node, std::current_exception());
}
auto now = gossiper::clk::now();
diff = now - last;
if (!failed) {
last = now;
}
if (diff > max_duration) {
logger.info("failure_detector_loop: Mark node {}/{} as DOWN", host_id, node);
co_await container().invoke_on(0, [host_id] (gms::gossiper& g) {
return g.convict(host_id);
});
co_return;
}
// When live_endpoints changes, live_endpoints_version changes. When
// live_endpoints changes, it is the time to re-distribute live nodes
// to different shards. We return from the per node loop here. The
// failure_detector_loop main loop will restart the per node loop.
if (_live_endpoints_version != live_endpoints_version) {
logger.debug("failure_detector_loop: Finished loop for node {}/{}, live_endpoints={}, current_live_endpoints_version={}, live_endpoints_version={}",
host_id, node, _live_endpoints, _live_endpoints_version, live_endpoints_version);
co_return;
} else {
co_await failure_detector_loop_sleep(echo_interval);
}
}
co_return;
}
future<> gossiper::failure_detector_loop() {
auto shard = this_shard_id();
if (shard != 0) {
co_return;
}
logger.info("failure_detector_loop: Started main loop");
while (is_enabled()) {
try {
if (_live_endpoints.empty()) {
logger.debug("failure_detector_loop: Wait until live_nodes={} is not empty", _live_endpoints);
co_await failure_detector_loop_sleep(std::chrono::seconds(1));
continue;
}
auto nodes = _live_endpoints | std::ranges::to<std::vector>();
auto live_endpoints_version = _live_endpoints_version;
auto generation_number = my_endpoint_state().get_heart_beat_state().get_generation();
co_await coroutine::parallel_for_each(std::views::iota(0u, nodes.size()), [this, generation_number, live_endpoints_version, &nodes] (size_t idx) {
const auto& node = nodes[idx];
auto shard = idx % smp::count;
logger.debug("failure_detector_loop: Started new round for node={} on shard={}, live_nodes={}, live_endpoints_version={}",
node, shard, nodes, live_endpoints_version);
return container().invoke_on(shard, [node, generation_number, live_endpoints_version] (gms::gossiper& g) {
return g.failure_detector_loop_for_node(node, generation_number, live_endpoints_version);
});
});
} catch (...) {
logger.warn("failure_detector_loop: Got error in the loop, live_nodes={}: {}",
_live_endpoints, std::current_exception());
}
}
logger.info("failure_detector_loop: Finished main loop");
}
// This needs to be run with a lock
future<> gossiper::replicate_live_endpoints_on_change(foreign_ptr<std::unique_ptr<live_and_unreachable_endpoints>> data0, uint64_t new_version) {
auto coordinator = this_shard_id();
SCYLLA_ASSERT(coordinator == 0);
//
// Gossiper task runs only on CPU0:
//
// - replicate _live_endpoints and _unreachable_endpoints
// across all other shards.
// - use _live_endpoints_version on each shard
// to determine if it has the latest copy, and replicate the respective
// member from shard 0, if the shard is outdated.
//
logger.debug("replicating live and unreachable endpoints to other shards");
std::vector<foreign_ptr<std::unique_ptr<live_and_unreachable_endpoints>>> per_shard_data;
per_shard_data.resize(smp::count);
per_shard_data[coordinator] = std::move(data0);
// Prepare copies on each other shard
co_await coroutine::parallel_for_each(std::views::iota(0u, smp::count), [&per_shard_data, coordinator] (auto shard) -> future<> {
if (shard != this_shard_id()) {
const auto& src = *per_shard_data[coordinator];
per_shard_data[shard] = co_await smp::submit_to(shard, [&] {
return make_foreign(std::make_unique<live_and_unreachable_endpoints>(src));
});
}
});
// Apply copies on each other shard
co_await container().invoke_on_all([&] (gossiper& local_gossiper) noexcept {
if (local_gossiper._live_endpoints_version >= new_version) {
on_fatal_internal_error(logger, fmt::format("shard already has unexpected live_endpoints_version {} > {}",
local_gossiper._live_endpoints_version, new_version));
}
auto data = per_shard_data[this_shard_id()].release();
local_gossiper._live_endpoints = std::move(data->live);
local_gossiper._unreachable_endpoints = std::move(data->unreachable);
local_gossiper._live_endpoints_version = new_version;
});
}
// Depends on:
// - failure_detector
// - on_remove callbacks, e.g, storage_service -> access token_metadata
void gossiper::run() {
// Run it in the background.
(void)seastar::with_semaphore(_callback_running, 1, [this] {
return seastar::async([this, g = shared_from_this()] {
logger.trace("=== Gossip round START");
//wait on messaging service to start listening
// MessagingService.instance().waitUntilListening();
{
auto permit = lock_endpoint(my_host_id(), null_permit_id).get();
/* Update the local heartbeat counter. */
heart_beat_state& hbs = my_endpoint_state().get_heart_beat_state();
hbs.update_heart_beat();
logger.trace("My heartbeat is now {}", hbs.get_heart_beat_version());
}
utils::chunked_vector<gossip_digest> g_digests = make_random_gossip_digest();
if (g_digests.size() > 0) {
gossip_digest_syn message(
get_cluster_name(), get_partitioner_name(), g_digests, get_group0_id(), get_recovery_leader());
if (_endpoints_to_talk_with.empty() && !_live_endpoints.empty()) {
auto live_endpoints = _live_endpoints | std::ranges::to<std::vector>();
std::shuffle(live_endpoints.begin(), live_endpoints.end(), _random_engine);
// This guarantees the local node will talk with all nodes
// in live_endpoints at least once within nr_rounds gossip rounds.
// Other gossip implementation like SWIM uses similar approach.
// https://www.cs.cornell.edu/projects/Quicksilver/public_pdfs/SWIM.pdf
constexpr size_t nr_rounds = 10;
size_t nodes_per_round = (live_endpoints.size() + nr_rounds - 1) / nr_rounds;
_endpoints_to_talk_with = live_endpoints | std::views::chunk(nodes_per_round) | std::ranges::to<std::list<std::vector<locator::host_id>>>();
logger.debug("Set live nodes to talk: endpoint_state_map={}, all_live_nodes={}, endpoints_to_talk_with={}",
_endpoint_state_map.size(), live_endpoints, _endpoints_to_talk_with);
}
if (!_endpoints_to_talk_with.empty()) {
auto live_nodes = std::move(_endpoints_to_talk_with.front());
_endpoints_to_talk_with.pop_front();
logger.debug("Talk to live nodes: {}", live_nodes);
for (auto& ep: live_nodes) {
(void)with_gate(_background_msg, [this, message, ep] () mutable {
return do_gossip_to_live_member(message, ep).handle_exception([] (auto ep) {
logger.trace("Failed to send gossip to live members: {}", ep);
});
});
}
} else if (!_seeds.empty()) {
logger.debug("No live nodes yet: try initial contact point nodes={}", _seeds);
for (auto& ep: _seeds) {
(void)with_gate(_background_msg, [this, message, ep] () mutable {
return do_gossip_to_live_member(message, ep).handle_exception([] (auto ep) {
logger.trace("Failed to send gossip to live members: {}", ep);
});
});
}
} else {
logger.debug("No one to talk with");
}
/* Gossip to some unreachable member with some probability to check if he is back up */
(void)with_gate(_background_msg, [this, message = std::move(message)] () mutable {
return do_gossip_to_unreachable_member(std::move(message)).handle_exception([] (auto ep) {
logger.trace("Failed to send gossip to unreachable members: {}", ep);
});
});
}
}).then_wrapped([this] (auto&& f) {
try {
f.get();
_nr_run++;
logger.trace("=== Gossip round OK");
} catch (...) {
logger.warn("=== Gossip round FAIL: {}", std::current_exception());
}
if (logger.is_enabled(logging::log_level::trace)) {
for (auto& x : _endpoint_state_map) {
logger.trace("ep={}, eps={}", x.first, *x.second);
}
}
if (is_enabled()) {
_scheduled_gossip_task.arm(INTERVAL);
} else {
logger.info("Gossip loop is not scheduled because it is disabled");
}
});
});
}
bool gossiper::is_seed(const gms::inet_address& endpoint) const {
return _seeds.contains(endpoint);
}
void gossiper::register_(shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
_subscribers.add(subscriber);
}
future<> gossiper::unregister_(shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
return _subscribers.remove(subscriber);
}
std::set<locator::host_id> gossiper::get_live_members() const {
std::set<locator::host_id> live_members(_live_endpoints.begin(), _live_endpoints.end());
auto myip = get_broadcast_address();
logger.debug("live_members before={}", live_members);
if (!is_shutdown(myip)) {
live_members.insert(my_host_id());
}
logger.debug("live_members after={}", live_members);
return live_members;
}
std::set<locator::host_id> gossiper::get_live_token_owners() const {
std::set<locator::host_id> token_owners;
auto normal_token_owners = get_token_metadata_ptr()->get_normal_token_owners();
for (auto& node: normal_token_owners) {
if (is_alive(node)) {
token_owners.insert(node);
}
}
return token_owners;
}
// Return downtime in microseconds
int64_t gossiper::get_endpoint_downtime(locator::host_id ep) const noexcept {
auto it = _unreachable_endpoints.find(ep);
if (it != _unreachable_endpoints.end()) {
auto& downtime = it->second;
return std::chrono::duration_cast<std::chrono::microseconds>(now() - downtime).count();
} else {
return 0L;
}
}
// Depends on
// - on_dead callbacks
// It is called from failure_detector
future<> gossiper::convict(locator::host_id endpoint) {
auto permit = co_await lock_endpoint(endpoint, null_permit_id);
auto state = get_endpoint_state_ptr(endpoint);
if (!state || !is_alive(state->get_host_id())) {
co_return;
}
if (is_shutdown(endpoint)) {
co_await mark_as_shutdown(endpoint, permit.id());
} else {
co_await mark_dead(endpoint, state, permit.id());
}
}
std::set<locator::host_id> gossiper::get_unreachable_members() const {
return _unreachable_endpoints | std::views::keys | std::ranges::to<std::set>();
}
version_type gossiper::get_max_endpoint_state_version(const endpoint_state& state) const noexcept {
auto max_version = state.get_heart_beat_state().get_heart_beat_version();
for (auto& entry : state.get_application_state_map()) {
auto& value = entry.second;
max_version = std::max(max_version, value.version());
}
return max_version;
}
future<> gossiper::evict_from_membership(locator::host_id hid, permit_id pid) {
verify_permit(hid, pid);
co_await mutate_live_and_unreachable_endpoints([hid] (live_and_unreachable_endpoints& data) {
data.unreachable.erase(hid);
data.live.erase(hid);
});
co_await container().invoke_on_all([hid] (auto& g) {
if (this_shard_id() == 0) {
g._address_map.set_expiring(hid);
}
g._endpoint_state_map.erase(hid);
});
_expire_time_endpoint_map.erase(hid);
logger.debug("evicting {} from gossip", hid);
}
utils::chunked_vector<gossip_digest> gossiper::make_random_gossip_digest() const {
std::unordered_map<inet_address, gossip_digest> g_digests;
generation_type generation;
version_type max_version;
// local epstate will be part of _endpoint_state_map
utils::chunked_vector<locator::host_id> endpoints;
for (auto&& x : _endpoint_state_map) {
endpoints.push_back(x.first);
}
std::shuffle(endpoints.begin(), endpoints.end(), _random_engine);
for (auto& endpoint : endpoints) {
auto es = get_endpoint_state_ptr(endpoint);
if (es) {
auto& eps = *es;
generation = eps.get_heart_beat_state().get_generation();
max_version = get_max_endpoint_state_version(eps);
}
gossip_digest d{es->get_ip(), generation, max_version};
auto [it, inserted] = g_digests.emplace(es->get_ip(), d);
if (!inserted && it->second.get_generation() < generation) {
// If there are multiple hosts with the same IP send out the one with newest generation
it->second = d;
}
}
return g_digests | std::views::values | std::ranges::to<utils::chunked_vector<gossip_digest>>();
}
future<> gossiper::replicate(endpoint_state es, permit_id pid) {
if (!es.get_host_id()) {
// TODO (#25818): re-introduce the on_internal_error() call once all the code paths leading to this are fixed
logger.warn("attempting to add a state with empty host id for ip: {}", es.get_ip());
co_return;
}
verify_permit(es.get_host_id(), pid);
// First pass: replicate the new endpoint_state on all shards.
// Use foreign_ptr<std::unique_ptr> to ensure destroy on remote shards on exception
std::vector<foreign_ptr<endpoint_state_ptr>> ep_states;
ep_states.resize(smp::count);
auto p = make_foreign(make_endpoint_state_ptr(std::move(es)));
const auto *eps = p.get();
ep_states[this_shard_id()] = std::move(p);
co_await coroutine::parallel_for_each(std::views::iota(0u, smp::count), [&, orig = this_shard_id()] (auto shard) -> future<> {
if (shard != orig) {
ep_states[shard] = co_await smp::submit_to(shard, [eps] {
return make_foreign(make_endpoint_state_ptr(*eps));
});
}
});
// Second pass: set replicated endpoint_state on all shards
// Must not throw
try {
co_return co_await container().invoke_on_all([&] (gossiper& g) {
auto eps = ep_states[this_shard_id()].release();
auto hid = eps->get_host_id();
if (this_shard_id() == 0) {
g._address_map.add_or_update_entry(hid, eps->get_ip(), eps->get_heart_beat_state().get_generation());
g._address_map.set_nonexpiring(hid);
}
g._endpoint_state_map[hid] = std::move(eps);
});
} catch (...) {
on_fatal_internal_error(logger, fmt::format("Failed to replicate endpoint_state: {}", std::current_exception()));
}
}
future<> gossiper::advertise_token_removed(locator::host_id host_id, permit_id pid) {
auto permit = co_await lock_endpoint(host_id, pid);
pid = permit.id();
auto eps = get_endpoint_state(host_id);
eps.update_timestamp(); // make sure we don't evict it too soon
eps.get_heart_beat_state().force_newer_generation_unsafe();
auto expire_time = compute_expire_time();
eps.add_application_state(application_state::STATUS, versioned_value::removed_nonlocal(host_id, expire_time.time_since_epoch().count()));
logger.info("Completing removal of {}", host_id);
add_expire_time_for_endpoint(host_id, expire_time);
co_await replicate(std::move(eps), pid);
// ensure at least one gossip round occurs before returning
co_await sleep_abortable(INTERVAL * 2, _abort_source);
}
future<> gossiper::assassinate_endpoint(sstring address) {
throw std::runtime_error("Assassinating endpoint is not supported in topology over raft mode");
}
future<generation_type> gossiper::get_current_generation_number(locator::host_id endpoint) const {
// FIXME: const container() has no const invoke_on variant
return const_cast<gossiper*>(this)->container().invoke_on(0, [endpoint] (const gossiper& gossiper) {
return gossiper.get_endpoint_state(endpoint).get_heart_beat_state().get_generation();
});
}
future<version_type> gossiper::get_current_heart_beat_version(locator::host_id endpoint) const {
// FIXME: const container() has no const invoke_on variant
return const_cast<gossiper*>(this)->container().invoke_on(0, [endpoint] (const gossiper& gossiper) {
return gossiper.get_endpoint_state(endpoint).get_heart_beat_state().get_heart_beat_version();
});
}
template<typename T>
future<> gossiper::do_gossip_to_live_member(gossip_digest_syn message, T ep) {
return send_gossip<T>(message, {ep});
}
future<> gossiper::do_gossip_to_unreachable_member(gossip_digest_syn message) {
double live_endpoint_count = _live_endpoints.size();
double unreachable_endpoint_count = _unreachable_endpoints.size();
if (unreachable_endpoint_count > 0) {
/* based on some probability */
double prob = unreachable_endpoint_count / (live_endpoint_count + 1);
std::uniform_real_distribution<double> dist(0, 1);
double rand_dbl = dist(_random_engine);
if (rand_dbl < prob) {
std::set<locator::host_id> addrs;
for (auto&& x : _unreachable_endpoints) {
// Ignore the node which is decommissioned
if (get_gossip_status(_address_map.get(x.first)) != sstring(versioned_value::STATUS_LEFT)) {
addrs.insert(x.first);
}
}
logger.trace("do_gossip_to_unreachable_member: live_endpoint nr={} unreachable_endpoints nr={}",
live_endpoint_count, unreachable_endpoint_count);
return send_gossip(message, addrs);
}
}
return make_ready_future<>();
}
clk::time_point gossiper::get_expire_time_for_endpoint(locator::host_id id) const noexcept {
/* default expire_time is A_VERY_LONG_TIME */
auto it = _expire_time_endpoint_map.find(id);
if (it == _expire_time_endpoint_map.end()) {
return compute_expire_time();
} else {
auto stored_time = it->second;
return stored_time;
}
}
endpoint_state_ptr gossiper::get_endpoint_state_ptr(locator::host_id ep) const noexcept {
auto it = _endpoint_state_map.find(ep);
if (it == _endpoint_state_map.end()) {
return nullptr;
} else {
return it->second;
}
}
void gossiper::update_timestamp(const endpoint_state_ptr& eps) noexcept {
const_cast<endpoint_state&>(*eps).update_timestamp();
}
const endpoint_state& gossiper::get_endpoint_state(locator::host_id ep) const {
auto it = _endpoint_state_map.find(ep);
if (it == _endpoint_state_map.end()) {
throw std::out_of_range(format("ep={}", ep));
}
return *it->second;
}
endpoint_state& gossiper::my_endpoint_state() {
auto id = my_host_id();
auto ep = get_broadcast_address();
auto it = _endpoint_state_map.find(id);
if (it == _endpoint_state_map.end()) {
it = _endpoint_state_map.emplace(id, make_endpoint_state_ptr({ep})).first;
}
return const_cast<endpoint_state&>(*it->second);
}
future<> gossiper::reset_endpoint_state_map() {
logger.debug("Resetting endpoint state map");
auto lock = co_await lock_endpoint_update_semaphore();
auto version = _live_endpoints_version + 1;
co_await container().invoke_on_all([version] (gossiper& g) {
if (this_shard_id() == 0) {
for (auto&& [_, es_ptr] : g._endpoint_state_map) {
g._address_map.set_expiring(es_ptr->get_host_id());
}
}
g._unreachable_endpoints.clear();
g._live_endpoints.clear();
g._live_endpoints_version = version;
g._endpoint_state_map.clear();
});
}
stop_iteration gossiper::for_each_endpoint_state_until(std::function<stop_iteration(const endpoint_state&)> func) const {
for (const auto& [node, eps] : _endpoint_state_map) {
if (func(*eps) == stop_iteration::yes) {
return stop_iteration::yes;
}
}
return stop_iteration::no;
}
bool gossiper::is_cql_ready(const locator::host_id& endpoint) const {
// Note:
// - New scylla node always send application_state::RPC_READY = false when
// the node boots and send application_state::RPC_READY = true when cql
// server is up
// - Old scylla node that does not support the application_state::RPC_READY
// never has application_state::RPC_READY in the endpoint_state, we can
// only think their cql server is up, so we return true here if
// application_state::RPC_READY is not present
auto eps = get_endpoint_state_ptr(endpoint);
if (!eps) {
logger.debug("Node {} does not have RPC_READY application_state, return is_cql_ready=true", endpoint);
return true;
}
auto ready = eps->is_cql_ready();
logger.debug("Node {}: is_cql_ready={}", endpoint, ready);
return ready;
}
locator::host_id gossiper::get_host_id(inet_address endpoint) const {
auto ids = _endpoint_state_map | std::views::values | std::views::filter([endpoint] (const auto& es) { return es->get_ip() == endpoint; });
if (std::ranges::distance(ids) == 0) {
throw std::runtime_error(format("Could not get host_id for endpoint {}: endpoint state not found", endpoint));
}
// Find an entry with largest generation
const auto& es = std::ranges::max(ids, [](const auto& ep1, const auto& ep2) { return ep1->get_heart_beat_state().get_generation() < ep2->get_heart_beat_state().get_generation(); });
auto host_id = es->get_host_id();
if (!host_id) {
throw std::runtime_error(format("Host {} does not have HOST_ID application_state", endpoint));
}
return host_id;
}
std::optional<locator::host_id> gossiper::try_get_host_id(inet_address endpoint) const {
std::optional<locator::host_id> host_id;
try {
host_id = get_host_id(endpoint);
} catch (std::runtime_error&) {}
return host_id;
}
std::optional<gms::inet_address> gossiper::get_node_ip(locator::host_id host_id) const {
if (auto it = _endpoint_state_map.find(host_id); it != _endpoint_state_map.end()) {
return {it->second->get_ip()};
} else {
return {};
}
}
std::optional<endpoint_state> gossiper::get_state_for_version_bigger_than(locator::host_id for_endpoint, version_type version) const {
std::optional<endpoint_state> reqd_endpoint_state;
auto es = get_endpoint_state_ptr(for_endpoint);
if (es) {
auto& eps = *es;
/*
* Here we try to include the Heart Beat state only if it is
* greater than the version passed in. It might happen that
* the heart beat version maybe lesser than the version passed
* in and some application state has a version that is greater
* than the version passed in. In this case we also send the old
* heart beat and throw it away on the receiver if it is redundant.
*/
auto local_hb_version = eps.get_heart_beat_state().get_heart_beat_version();
if (local_hb_version > version) {
reqd_endpoint_state.emplace(eps.get_heart_beat_state(), eps.get_ip());
logger.trace("local heartbeat version {} greater than {} for {}", local_hb_version, version, for_endpoint);
}
/* Accumulate all application states whose versions are greater than "version" variable */
for (auto& entry : eps.get_application_state_map()) {
auto& value = entry.second;
if (value.version() > version) {
if (!reqd_endpoint_state) {
reqd_endpoint_state.emplace(eps.get_heart_beat_state(), eps.get_ip());
}
auto& key = entry.first;
logger.trace("Adding state of {}, {}: {}" , for_endpoint, key, value.value());
reqd_endpoint_state->add_application_state(key, value);
}
}
}
return reqd_endpoint_state;
}
sstring gossiper::get_rpc_address(const locator::host_id& endpoint) const {
auto* v = get_application_state_ptr(endpoint, gms::application_state::RPC_ADDRESS);
if (v) {
return v->value();
}
return fmt::to_string(endpoint);
}
void gossiper::update_timestamp_for_nodes(const std::map<inet_address, endpoint_state>& map) {
for (const auto& x : map) {
const locator::host_id& endpoint = try_get_host_id(x.first).value_or(locator::host_id{});
const endpoint_state& remote_endpoint_state = x.second;
auto local_endpoint_state = get_endpoint_state_ptr(endpoint);
if (local_endpoint_state) {
bool update = false;
auto local_generation = local_endpoint_state->get_heart_beat_state().get_generation();
auto remote_generation = remote_endpoint_state.get_heart_beat_state().get_generation();
if (remote_generation > local_generation) {
update = true;
} else if (remote_generation == local_generation) {
auto local_version = get_max_endpoint_state_version(*local_endpoint_state);
auto remote_version = remote_endpoint_state.get_heart_beat_state().get_heart_beat_version();
if (remote_version > local_version) {
update = true;
}
}
if (update) {
logger.trace("Updated timestamp for node {}", endpoint);
update_timestamp(local_endpoint_state);
}
}
}
}
future<> gossiper::notify_nodes_on_up(std::unordered_set<locator::host_id> dsts) {
co_await coroutine::parallel_for_each(dsts, [this] (locator::host_id dst) -> future<> {
if (dst != _gcfg.host_id) {
try {
auto generation = my_endpoint_state().get_heart_beat_state().get_generation();
co_await send_echo(dst, std::chrono::seconds(10), generation.value(), true);
} catch (...) {
logger.warn("Failed to notify node {} that I am UP: {}", dst, std::current_exception());
}
}
});
}
void gossiper::mark_alive(endpoint_state_ptr node) {
auto id = node->get_host_id();
auto addr = node->get_ip();
// Enter the _background_msg gate so stop() would wait on it
auto inserted = _pending_mark_alive_endpoints.insert(id).second;
if (inserted) {
// The node is not in the _pending_mark_alive_endpoints
logger.debug("Mark Node {}/{} alive with EchoMessage", id, addr);
} else {
// We are in the progress of marking this node alive
logger.debug("Node {}/{} is being marked as up, ignoring duplicated mark alive operation", id, addr);
return;
}
// unmark addr as pending on exception or after background continuation completes
auto unmark_pending = deferred_action([this, id, g = shared_from_this()] () noexcept {
_pending_mark_alive_endpoints.erase(id);
});
if (_address_map.find(id) != addr) {
// We are here because id has now different ip but we
// try to ping the old one
return;
}
auto generation = my_endpoint_state().get_heart_beat_state().get_generation();
// Enter the _background_msg gate so stop() would wait on it
auto gh = _background_msg.hold();
logger.debug("Sending a EchoMessage to {}/{}, with generation_number={}", id, addr, generation);
(void) send_echo(id, std::chrono::seconds(15), generation.value(), false).then([this, id] {
logger.trace("Got EchoMessage Reply");
return real_mark_alive(id);
}).handle_exception([addr, gh = std::move(gh), unmark_pending = std::move(unmark_pending), id] (auto ep) {
logger.warn("Fail to send EchoMessage to {}/{}: {}", id, addr, ep);
});
}
future<> gossiper::real_mark_alive(locator::host_id host_id) {
auto permit = co_await lock_endpoint(host_id, null_permit_id);
// After sending echo message, the Node might not be in the
// _endpoint_state_map anymore, use the reference of local_state
// might cause user-after-free
auto es = get_endpoint_state_ptr(host_id);
if (!es) {
logger.info("Node {} is not in endpoint_state_map anymore", host_id);
co_return;
}
// Do not mark a node with status shutdown as UP.
auto status = sstring(get_gossip_status(*es));
if (status == sstring(versioned_value::SHUTDOWN)) {
logger.warn("Skip marking node {} with status = {} as UP", host_id, status);
co_return;
}
logger.debug("Mark Node {} alive after EchoMessage", host_id);
logger.debug("removing expire time for endpoint : {}", host_id);
bool was_live = false;
co_await mutate_live_and_unreachable_endpoints([addr = host_id, &was_live] (live_and_unreachable_endpoints& data) {
data.unreachable.erase(addr);
auto [it_, inserted] = data.live.insert(addr);
was_live = !inserted;
});
_expire_time_endpoint_map.erase(host_id);
if (was_live) {
co_return;
}
if (_endpoints_to_talk_with.empty()) {
_endpoints_to_talk_with.push_back({host_id});
} else {
_endpoints_to_talk_with.front().push_back(host_id);
}
auto addr = es->get_ip();
logger.info("InetAddress {}/{} is now UP, status = {}", host_id, addr, status);
co_await _subscribers.for_each([addr, host_id, es, pid = permit.id()] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) -> future<> {
co_await subscriber->on_alive(addr, host_id, es, pid);
logger.trace("Notified {}", fmt::ptr(subscriber.get()));
});
}
future<> gossiper::mark_dead(locator::host_id addr, endpoint_state_ptr state, permit_id pid) {
logger.trace("marking as down {}", addr);
verify_permit(addr, pid);
co_await mutate_live_and_unreachable_endpoints([addr = state->get_host_id()] (live_and_unreachable_endpoints& data) {
data.live.erase(addr);
data.unreachable[addr] = now();
});
logger.info("InetAddress {} is now DOWN, status = {}", addr, get_gossip_status(*state));
co_await do_on_dead_notifications(state->get_ip(), std::move(state), pid);
}
future<> gossiper::handle_major_state_change(endpoint_state eps, permit_id pid, bool shadow_round) {
auto ep = eps.get_host_id();
verify_permit(ep, pid);
endpoint_state_ptr eps_old = get_endpoint_state_ptr(ep);
if (!is_dead_state(eps) && !shadow_round) {
if (_endpoint_state_map.contains(ep)) {
logger.info("Node {} has restarted, now UP, status = {}", ep, get_gossip_status(eps));
} else {
logger.debug("Node {} is now part of the cluster, status = {}", ep, get_gossip_status(eps));
}
}
logger.trace("Adding endpoint state for {}, status = {}", ep, get_gossip_status(eps));
co_await replicate(eps, pid);
if (shadow_round) {
co_return;
}
if (eps_old) {
// the node restarted: it is up to the subscriber to take whatever action is necessary
co_await _subscribers.for_each([ep, eps_old, pid] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
return subscriber->on_restart(eps_old->get_ip(), ep, eps_old, pid);
});
}
auto ep_state = get_endpoint_state_ptr(ep);
if (!ep_state) {
throw std::out_of_range(format("ep={}", ep));
}
if (!is_dead_state(*ep_state)) {
mark_alive(ep_state);
} else {
logger.debug("Not marking {} alive due to dead state {}", ep, get_gossip_status(eps));
co_await mark_dead(ep, ep_state, pid);
}
co_await _subscribers.for_each([ep, ep_state, pid] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
return subscriber->on_join(ep_state->get_ip(), ep, ep_state, pid);
});
// check this at the end so nodes will learn about the endpoint
if (is_shutdown(*ep_state)) {
co_await mark_as_shutdown(ep, pid);
}
}
bool gossiper::is_dead_state(const endpoint_state& eps) const {
return std::ranges::any_of(DEAD_STATES, [state = get_gossip_status(eps)](const auto& deadstate) { return state == deadstate; });
}
bool gossiper::is_shutdown(const locator::host_id& endpoint) const {
return get_gossip_status(endpoint) == versioned_value::SHUTDOWN;
}
bool gossiper::is_shutdown(const endpoint_state& eps) const {
return get_gossip_status(eps) == versioned_value::SHUTDOWN;
}
bool gossiper::is_normal(const locator::host_id& endpoint) const {
return get_gossip_status(endpoint) == versioned_value::STATUS_NORMAL;
}
bool gossiper::is_silent_shutdown_state(const endpoint_state& ep_state) const{
return std::ranges::any_of(SILENT_SHUTDOWN_STATES, [state = get_gossip_status(ep_state)](const auto& deadstate) { return state == deadstate; });
}
future<> gossiper::apply_new_states(endpoint_state local_state, const endpoint_state& remote_state, permit_id pid, bool shadow_round) {
// don't SCYLLA_ASSERT here, since if the node restarts the version will go back to zero
//int oldVersion = local_state.get_heart_beat_state().get_heart_beat_version();
auto host_id = local_state.get_host_id();
verify_permit(host_id, pid);
if (!shadow_round) {
local_state.set_heart_beat_state_and_update_timestamp(remote_state.get_heart_beat_state());
}
// if (logger.isTraceEnabled()) {
// logger.trace("Updating heartbeat state version to {} from {} for {} ...",
// local_state.get_heart_beat_state().get_heart_beat_version(), oldVersion, addr);
// }
application_state_map changed;
auto&& remote_map = remote_state.get_application_state_map();
std::exception_ptr ep;
try {
// we need to make two loops here, one to apply, then another to notify,
// this way all states in an update are present and current when the notifications are received
for (const auto& remote_entry : remote_map) {
const auto& remote_key = remote_entry.first;
const auto& remote_value = remote_entry.second;
auto remote_gen = remote_state.get_heart_beat_state().get_generation();
auto local_gen = local_state.get_heart_beat_state().get_generation();
if(remote_gen != local_gen) {
auto err = format("Remote generation {} != local generation {}", remote_gen, local_gen);
logger.warn("{}", err);
throw std::runtime_error(err);
}
const versioned_value* local_val = local_state.get_application_state_ptr(remote_key);
if (!local_val || remote_value.version() > local_val->version()) {
changed.emplace(remote_key, remote_value);
local_state.add_application_state(remote_key, remote_value);
}
}
} catch (...) {
ep = std::current_exception();
}
auto addr = local_state.get_ip();
// We must replicate endpoint states before listeners run.
// Exceptions during replication will cause abort because node's state
// would be inconsistent across shards. Changes listeners depend on state
// being replicated to all shards.
co_await replicate(std::move(local_state), pid);
if (shadow_round) {
co_return;
}
// Exceptions thrown from listeners will result in abort because that could leave the node in a bad
// state indefinitely. Unless the value changes again, we wouldn't retry notifications.
// Some values are set only once, so listeners would never be re-run.
// Listeners should decide which failures are non-fatal and swallow them.
try {
co_await do_on_change_notifications(addr, host_id, changed, pid);
} catch (...) {
auto msg = format("Gossip change listener failed: {}", std::current_exception());
if (_abort_source.abort_requested()) {
logger.warn("{}. Ignored", msg);
} else {
on_fatal_internal_error(logger, msg);
}
}
maybe_rethrow_exception(std::move(ep));
}
future<> gossiper::do_on_change_notifications(inet_address addr, locator::host_id id, const gms::application_state_map& states, permit_id pid) const {
co_await _subscribers.for_each([&] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
// Once _abort_source is aborted, don't attempt to process any further notifications
// because that would violate monotonicity due to partially failed notification.
_abort_source.check();
return subscriber->on_change(addr, id, states, pid);
});
}
future<> gossiper::do_on_dead_notifications(inet_address addr, endpoint_state_ptr state, permit_id pid) const {
co_await _subscribers.for_each([addr, state = std::move(state), pid] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
return subscriber->on_dead(addr, state->get_host_id(), state, pid);
});
}
void gossiper::request_all(gossip_digest& g_digest,
utils::chunked_vector<gossip_digest>& delta_gossip_digest_list, generation_type remote_generation) const {
/* We are here since we have no data for this endpoint locally so request everything. */
delta_gossip_digest_list.emplace_back(g_digest.get_endpoint(), remote_generation);
logger.trace("request_all for {}", g_digest.get_endpoint());
}
void gossiper::send_all(gossip_digest& g_digest,
std::map<inet_address, endpoint_state>& delta_ep_state_map,
version_type max_remote_version) const {
auto ep = g_digest.get_endpoint();
auto id = try_get_host_id(ep);
logger.trace("send_all(): ep={}/{}, version > {}", id, ep, max_remote_version);
if (id) {
auto local_ep_state_ptr = get_state_for_version_bigger_than(*id, max_remote_version);
if (local_ep_state_ptr) {
delta_ep_state_map.emplace(ep, *local_ep_state_ptr);
}
}
}
void gossiper::examine_gossiper(utils::chunked_vector<gossip_digest>& g_digest_list,
utils::chunked_vector<gossip_digest>& delta_gossip_digest_list,
std::map<inet_address, endpoint_state>& delta_ep_state_map) const {
for (gossip_digest& g_digest : g_digest_list) {
auto remote_generation = g_digest.get_generation();
auto max_remote_version = g_digest.get_max_version();
/* Get state associated with the end point in digest */
auto&& ep = g_digest.get_endpoint();
auto id = try_get_host_id(ep);
auto es = get_endpoint_state_ptr(id.value_or(locator::host_id{}));
/* Here we need to fire a GossipDigestAckMessage. If we have some
* data associated with this endpoint locally then we follow the
* "if" path of the logic. If we have absolutely nothing for this
* endpoint we need to request all the data for this endpoint.
*/
if (es) {
const endpoint_state& ep_state_ptr = *es;
auto local_generation = ep_state_ptr.get_heart_beat_state().get_generation();
/* get the max version of all keys in the state associated with this endpoint */
auto max_local_version = get_max_endpoint_state_version(ep_state_ptr);
logger.trace("examine_gossiper(): ep={}, remote={}.{}, local={}.{}", ep,
remote_generation, max_remote_version, local_generation, max_local_version);
if (remote_generation == local_generation && max_remote_version == max_local_version) {
continue;
}
if (remote_generation > local_generation) {
/* we request everything from the gossiper */
request_all(g_digest, delta_gossip_digest_list, remote_generation);
} else if (remote_generation < local_generation) {
/* send all data with generation = localgeneration and version > 0 */
send_all(g_digest, delta_ep_state_map, version_type());
} else if (remote_generation == local_generation) {
/*
* If the max remote version is greater then we request the
* remote endpoint send us all the data for this endpoint
* with version greater than the max version number we have
* locally for this endpoint.
*
* If the max remote version is lesser, then we send all
* the data we have locally for this endpoint with version
* greater than the max remote version.
*/
if (max_remote_version > max_local_version) {
logger.trace("examine_gossiper(): requesting version > {} from {}", max_local_version, g_digest.get_endpoint());
delta_gossip_digest_list.emplace_back(g_digest.get_endpoint(), remote_generation, max_local_version);
} else if (max_remote_version < max_local_version) {
/* send all data with generation = localgeneration and version > max_remote_version */
send_all(g_digest, delta_ep_state_map, max_remote_version);
}
}
} else {
/* We are here since we have no data for this endpoint locally so request everything. */
request_all(g_digest, delta_gossip_digest_list, remote_generation);
}
}
}
future<> gossiper::start_gossiping(gms::generation_type generation_nbr, application_state_map preload_local_states) {
co_await coroutine::switch_to(_gcfg.gossip_scheduling_group);
auto permit = co_await lock_endpoint(my_host_id(), null_permit_id);
build_seeds_list();
if (_gcfg.force_gossip_generation() > 0) {
generation_nbr = gms::generation_type(_gcfg.force_gossip_generation());
logger.warn("Use the generation number provided by user: generation = {}", generation_nbr);
}
// Create a new local state.
endpoint_state local_state{get_broadcast_address()};
local_state.set_heart_beat_state_and_update_timestamp(heart_beat_state(generation_nbr));
for (auto& entry : preload_local_states) {
local_state.add_application_state(entry.first, entry.second);
}
co_await utils::get_local_injector().inject("gossiper_publish_local_state_pause", utils::wait_for_message(5min));
co_await replicate(local_state, permit.id());
logger.info("Gossip started with local state: {}", my_endpoint_state());
_enabled = true;
_nr_run = 0;
_scheduled_gossip_task.arm(INTERVAL);
if (!_background_msg.is_closed()) {
co_await _background_msg.close();
}
_background_msg = seastar::named_gate("gossiper");
/* Ensure all shards have enabled gossip before starting the failure detector loop */
co_await container().invoke_on_all([] (gms::gossiper& g) {
g._enabled = true;
});
co_await container().invoke_on(0, [] (gms::gossiper& g) {
g._failure_detector_loop_done = g.failure_detector_loop();
});
}
future<gossiper::generation_for_nodes>
gossiper::get_generation_for_nodes(std::unordered_set<locator::host_id> nodes) const {
generation_for_nodes ret;
for (const auto& node : nodes) {
auto es = get_endpoint_state_ptr(node);
if (es) {
auto current_generation_number = es->get_heart_beat_state().get_generation();
ret.emplace(node, current_generation_number);
} else {
return make_exception_future<generation_for_nodes>(
std::runtime_error(format("Can not find generation number for node={}", node)));
}
}
return make_ready_future<generation_for_nodes>(std::move(ret));
}
future<> gossiper::advertise_to_nodes(generation_for_nodes advertise_to_nodes) {
return container().invoke_on_all([advertise_to_nodes = std::move(advertise_to_nodes)] (auto& g) {
g._advertise_to_nodes = advertise_to_nodes;
});
}
future<> gossiper::do_shadow_round(std::unordered_set<gms::inet_address> nodes, mandatory is_mandatory) {
co_await coroutine::switch_to(_gcfg.gossip_scheduling_group);
nodes.erase(get_broadcast_address());
gossip_get_endpoint_states_request request{{
gms::application_state::STATUS,
gms::application_state::HOST_ID,
gms::application_state::DC,
gms::application_state::RACK,
gms::application_state::SUPPORTED_FEATURES,
gms::application_state::SNITCH_NAME}};
logger.info("Gossip shadow round started with nodes={}", nodes);
std::unordered_set<gms::inet_address> nodes_talked;
auto start_time = clk::now();
std::list<gms::gossip_get_endpoint_states_response> responses;
for (;;) {
size_t nodes_down = 0;
co_await coroutine::parallel_for_each(nodes, [this, &request, &responses, &nodes_talked, &nodes_down] (gms::inet_address node) -> future<> {
logger.debug("Sent get_endpoint_states request to {}, request={}", node, request.application_states);
try {
auto response = co_await ser::gossip_rpc_verbs::send_gossip_get_endpoint_states(&_messaging, msg_addr(node), netw::messaging_service::clock_type::now() + std::chrono::seconds(5), request);
logger.debug("Got get_endpoint_states response from {}, response={}", node, response.endpoint_state_map);
responses.push_back(std::move(response));
nodes_talked.insert(node);
utils::get_local_injector().inject("stop_during_gossip_shadow_round", [] { std::raise(SIGSTOP); });
} catch (seastar::rpc::unknown_verb_error&) {
auto err = format("Node {} does not support get_endpoint_states verb", node);
logger.error("{}", err);
throw std::runtime_error{err};
} catch (seastar::rpc::timeout_error&) {
nodes_down++;
logger.warn("The get_endpoint_states verb to node {} timed out", node);
} catch (seastar::rpc::closed_error&) {
nodes_down++;
logger.warn("Node {} is down for get_endpoint_states verb", node);
}
});
for (auto& response : responses) {
try {
co_await apply_state_locally_in_shadow_round(std::move(response.endpoint_state_map));
} catch (const std::exception& exception) {
logger.warn("Error while applying node state {}", exception.what());
}
}
if (!nodes_talked.empty()) {
break;
}
if (nodes_down == nodes.size() && !is_mandatory) {
logger.warn("All nodes={} are down for get_endpoint_states verb. Skip ShadowRound.", nodes);
break;
}
if (clk::now() > start_time + std::chrono::milliseconds(_gcfg.shadow_round_ms)) {
throw std::runtime_error(fmt::format("Unable to gossip with any nodes={} (ShadowRound).", nodes));
}
sleep_abortable(std::chrono::seconds(1), _abort_source).get();
logger.info("Connect nodes={} again ... ({} seconds passed)",
nodes, std::chrono::duration_cast<std::chrono::seconds>(clk::now() - start_time).count());
}
logger.info("Gossip shadow round finished with nodes_talked={}", nodes_talked);
}
void gossiper::build_seeds_list() {
for (inet_address seed : get_seeds() ) {
if (seed == get_broadcast_address()) {
continue;
}
_seeds.emplace(seed);
}
}
future<> gossiper::add_saved_endpoint(locator::host_id host_id, gms::loaded_endpoint_state st, permit_id pid) {
co_await coroutine::switch_to(_gcfg.gossip_scheduling_group);
if (host_id == my_host_id()) {
logger.debug("Attempt to add self as saved endpoint");
co_return;
}
const auto& ep = st.endpoint;
if (!host_id) {
on_internal_error(logger, format("Attempt to add {} with null host_id as saved endpoint", ep));
}
if (ep == inet_address{}) {
on_internal_error(logger, format("Attempt to add {} with null inet_address as saved endpoint", host_id));
}
if (ep == get_broadcast_address()) {
on_internal_error(logger, format("Attempt to add {} with broadcast_address {} as saved endpoint", host_id, ep));
}
auto permit = co_await lock_endpoint(host_id, pid);
const auto tmptr = get_token_metadata_ptr();
// Make the generation of an excluded (and banned) node negative. This is needed on restart during an ongoing
// replace with the same IP in the Raft-based topology. We ensure that gossiper::get_host_id(inet_address endpoint)
// always returns the host ID of the replacing node, just like before restarting.
const auto* node = tmptr->get_topology().find_node(host_id);
generation_type generation(node && node->is_excluded() ? -1 : 0);
//preserve any previously known, in-memory data about the endpoint (such as DC, RACK, and so on)
auto ep_state = endpoint_state(heart_beat_state(generation), ep);
auto es = get_endpoint_state_ptr(host_id);
if (es) {
if (es->get_heart_beat_state().get_generation()) {
auto msg = fmt::format("Attempted to add saved endpoint {} after endpoint_state was already established with gossip: {}, at {}", ep, es->get_heart_beat_state(), current_backtrace());
on_internal_error(logger, msg);
}
ep_state = *es;
logger.debug("not replacing a previous ep_state for {}, but reusing it: {}", ep, ep_state);
ep_state.update_timestamp();
}
// It's okay to use the local version generator for the loaded application state values
// As long as the endpoint_state has zero generation.
// It will get updated as a whole by handle_major_state_change
// via do_apply_state_locally when (remote_generation > local_generation)
ep_state.add_application_state(gms::application_state::HOST_ID, versioned_value::host_id(host_id));
if (st.opt_dc_rack) {
ep_state.add_application_state(gms::application_state::DC, gms::versioned_value::datacenter(st.opt_dc_rack->dc));
ep_state.add_application_state(gms::application_state::RACK, gms::versioned_value::datacenter(st.opt_dc_rack->rack));
}
co_await replicate(std::move(ep_state), permit.id());
_unreachable_endpoints[host_id] = now();
logger.trace("Adding saved endpoint {} {}", ep, generation);
}
future<> gossiper::add_local_application_state(application_state state, versioned_value value) {
application_state_map tmp;
tmp.emplace(std::pair(std::move(state), std::move(value)));
return add_local_application_state(std::move(tmp));
}
// Depends on:
// - on_change callbacks
// #2894. Similar to origin fix, but relies on non-interruptability to ensure we apply
// values "in order".
//
// NOTE: having the values being actual versioned values here is sort of pointless, because
// we overwrite the version to ensure the set is monotonic. However, it does not break anything,
// and changing this tends to spread widely (see versioned_value::factory), so that can be its own
// change later, if needed.
// Retaining the slightly broken signature is also consistent with origin. Hooray.
//
future<> gossiper::add_local_application_state(application_state_map states) {
if (states.empty()) {
co_return;
}
try {
co_await coroutine::switch_to(_gcfg.gossip_scheduling_group);
co_await container().invoke_on(0, [&](gossiper& gossiper) mutable -> future<> {
inet_address ep_addr = gossiper.get_broadcast_address();
auto ep_id = gossiper.my_host_id();
// for symmetry with other apply, use endpoint lock for our own address.
auto permit = co_await gossiper.lock_endpoint(ep_id, null_permit_id);
auto ep_state_before = gossiper.get_endpoint_state_ptr(ep_id);
if (!ep_state_before) {
auto err = fmt::format("endpoint_state_map does not contain endpoint = {}, application_states = {}",
ep_addr, states);
co_await coroutine::return_exception(std::runtime_error(err));
}
auto local_state = *ep_state_before;
for (auto& p : states) {
auto& state = p.first;
auto& value = p.second;
// Notifications may have taken some time, so preventively raise the version
// of the new value, otherwise it could be ignored by the remote node
// if another value with a newer version was received in the meantime:
value = versioned_value::clone_with_higher_version(value);
// Add to local application state
local_state.add_application_state(state, value);
}
// It is OK to replicate the new endpoint_state
// after all application states were modified as a batch.
// We guarantee that the on_change notifications
// will be called in the order given by `states` anyhow.
co_await gossiper.replicate(std::move(local_state), permit.id());
// fire "on change" notifications:
// now we might defer again, so this could be reordered. But we've
// ensured the whole set of values are monotonically versioned and
// applied to endpoint state.
co_await gossiper.do_on_change_notifications(ep_addr, gossiper.my_host_id(), states, permit.id());
});
} catch (...) {
logger.warn("Fail to apply application_state: {}", std::current_exception());
}
}
future<> gossiper::do_stop_gossiping() {
// Don't rely on is_enabled() since it
// also considers _abort_source and return false
// before _enabled is set to false down below.
if (!_enabled) {
logger.info("gossip is already stopped");
co_return;
}
auto my_ep_state = get_this_endpoint_state_ptr();
if (my_ep_state) {
logger.info("My status = {}", get_gossip_status(*my_ep_state));
}
if (my_ep_state && !is_silent_shutdown_state(*my_ep_state)) {
auto local_generation = my_ep_state->get_heart_beat_state().get_generation();
logger.info("Announcing shutdown");
co_await add_local_application_state(application_state::STATUS, versioned_value::shutdown(true));
auto live_endpoints = _live_endpoints;
for (locator::host_id id : live_endpoints) {
logger.info("Sending a GossipShutdown to {} with generation {}", id, local_generation);
try {
co_await ser::gossip_rpc_verbs::send_gossip_shutdown(&_messaging, id, get_broadcast_address(), local_generation.value());
logger.trace("Got GossipShutdown Reply");
} catch (...) {
logger.warn("Fail to send GossipShutdown to {}: {}", id, std::current_exception());
}
}
co_await sleep(std::chrono::milliseconds(_gcfg.shutdown_announce_ms));
} else {
logger.warn("No local state or state is in silent shutdown, not announcing shutdown");
}
logger.info("Disable and wait for gossip loop started");
// Set disable flag and cancel the timer makes sure gossip loop will not be scheduled
co_await container().invoke_on_all([] (gms::gossiper& g) {
g._enabled = false;
g._failure_detector_loop_cv.broadcast();
});
_scheduled_gossip_task.cancel();
// Take the semaphore makes sure existing gossip loop is finished
auto units = co_await get_units(_callback_running, 1);
co_await container().invoke_on(0, [] (auto& g) {
// #21159
// gossiper::shutdown can be called from more than once place - both
// storage_service::isolate and normal gossip service stop. The former is
// waited for in storage_service::stop, but if we, as was done in cql_test_env,
// call shutdown independently, we could still end up here twite, and not hit
// the _enabled guard (because we do waiting things before setting it, and setting it
// is also waiting). However, making sure we don't leave an invalid future
// here should ensure even if we reenter this method in such as way, we don't crash.
return std::exchange(g._failure_detector_loop_done, make_ready_future<>());
});
logger.info("Gossip is now stopped");
}
future<> gossiper::start() {
init_messaging_service_handler();
return make_ready_future();
}
future<> gossiper::shutdown() {
if (!_background_msg.is_closed()) {
co_await _background_msg.close();
}
if (this_shard_id() == 0) {
co_await do_stop_gossiping();
}
}
future<> gossiper::stop() {
co_await shutdown();
co_await uninit_messaging_service_handler();
}
bool gossiper::is_enabled() const {
return _enabled && !_abort_source.abort_requested();
}
void gossiper::add_expire_time_for_endpoint(locator::host_id endpoint, clk::time_point expire_time) {
auto now_ = now();
auto diff = std::chrono::duration_cast<std::chrono::seconds>(expire_time - now_).count();
logger.info("Node {} will be removed from gossip at [{:%Y-%m-%d %T %z}]: (expire = {}, now = {}, diff = {} seconds)",
endpoint, fmt::gmtime(clk::to_time_t(expire_time)), expire_time.time_since_epoch().count(),
now_.time_since_epoch().count(), diff);
_expire_time_endpoint_map[endpoint] = expire_time;
}
clk::time_point gossiper::compute_expire_time() {
return now() + A_VERY_LONG_TIME;
}
bool gossiper::is_alive(locator::host_id id) const {
if (id == my_host_id()) {
return true;
}
bool is_alive = _live_endpoints.contains(id);
#ifndef SCYLLA_BUILD_MODE_RELEASE
// Live endpoints must always have a valid endpoint_state.
// Verify that in testing mode to reduce the overhead in production.
if (is_alive && !get_endpoint_state_ptr(id)) {
on_internal_error(logger, fmt::format("Node {} is alive but has no endpoint state", id));
}
#endif
return is_alive;
}
future<> gossiper::wait_alive_helper(noncopyable_function<std::vector<locator::host_id>()> get_nodes, std::chrono::milliseconds timeout) {
auto start_time = std::chrono::steady_clock::now();
for (;;) {
auto nodes = get_nodes();
std::vector<locator::host_id> live_nodes;
for (const auto& node: nodes) {
auto es = get_endpoint_state_ptr(node);
if (es) {
size_t nr_alive = co_await container().map_reduce0([node = es->get_host_id()] (gossiper& g) -> size_t {
return g.is_alive(node) ? 1 : 0;
}, 0, std::plus<size_t>());
logger.debug("Marked node={} as alive on {} out of {} shards", node, nr_alive, smp::count);
if (nr_alive == smp::count) {
live_nodes.push_back(node);
}
}
}
logger.debug("Waited for marking node as up, replace_nodes={}, live_nodes={}", nodes, live_nodes);
if (live_nodes.size() == nodes.size()) {
break;
}
if (std::chrono::steady_clock::now() > timeout + start_time) {
throw std::runtime_error(fmt::format("Failed to mark node as alive in {} ms, nodes={}, live_nodes={}",
timeout.count(), nodes, live_nodes));
}
co_await sleep_abortable(std::chrono::milliseconds(100), _abort_source);
}
}
// Needed for legacy (node_ops) mode only)
future<> gossiper::wait_alive(std::vector<gms::inet_address> nodes, std::chrono::milliseconds timeout) {
auto ids = nodes | std::views::transform([this] (auto ip) { return get_host_id(ip); }) | std::ranges::to<std::vector>();
return wait_alive(std::move(ids), timeout);
}
future<> gossiper::wait_alive(std::vector<locator::host_id> nodes, std::chrono::milliseconds timeout) {
return wait_alive_helper([nodes = std::move(nodes)] { return nodes; }, timeout);
}
future<> gossiper::wait_alive(noncopyable_function<std::vector<locator::host_id>()> get_nodes, std::chrono::milliseconds timeout) {
return wait_alive_helper(std::move(get_nodes), timeout);
}
const versioned_value* gossiper::get_application_state_ptr(locator::host_id endpoint, application_state appstate) const noexcept {
auto eps = get_endpoint_state_ptr(std::move(endpoint));
if (!eps) {
return nullptr;
}
return eps->get_application_state_ptr(appstate);
}
sstring gossiper::get_application_state_value(locator::host_id endpoint, application_state appstate) const {
auto v = get_application_state_ptr(endpoint, appstate);
if (!v) {
return {};
}
return v->value();
}
/**
* This method is used to mark a node as shutdown; that is it gracefully exited on its own and told us about it
* @param endpoint endpoint that has shut itself down
*/
future<> gossiper::mark_as_shutdown(const locator::host_id& endpoint, permit_id pid) {
verify_permit(endpoint, pid);
auto es = get_endpoint_state_ptr(endpoint);
if (es) {
auto ep_state = *es;
ep_state.add_application_state(application_state::STATUS, versioned_value::shutdown(true));
ep_state.get_heart_beat_state().force_highest_possible_version_unsafe();
co_await replicate(std::move(ep_state), pid);
co_await mark_dead(endpoint, get_endpoint_state_ptr(endpoint), pid);
}
}
void gossiper::force_newer_generation() {
auto& eps = my_endpoint_state();
eps.get_heart_beat_state().force_newer_generation_unsafe();
}
static std::string_view do_get_gossip_status(const gms::versioned_value* app_state) noexcept {
if (!app_state) {
return gms::versioned_value::STATUS_UNKNOWN;
}
const std::string_view value = app_state->value();
const auto pos = value.find(',');
if (value.empty() || !pos) {
return gms::versioned_value::STATUS_UNKNOWN;
}
// npos allowed (full value)
return value.substr(0, pos);
}
std::string_view gossiper::get_gossip_status(const endpoint_state& ep_state) const noexcept {
return do_get_gossip_status(ep_state.get_application_state_ptr(application_state::STATUS));
}
std::string_view gossiper::get_gossip_status(const locator::host_id& endpoint) const noexcept {
return do_get_gossip_status(get_application_state_ptr(endpoint, application_state::STATUS));
}
bool gossiper::is_safe_for_bootstrap(inet_address endpoint) const {
// We allow to bootstrap a new node in only two cases:
// 1) The node is a completely new node and no state in gossip at all
// 2) The node has state in gossip and it is already removed from the
// cluster either by nodetool decommission or nodetool removenode
bool allowed = true;
auto host_id = try_get_host_id(endpoint);
if (!host_id) {
logger.debug("is_safe_for_bootstrap: node={}, status=no state in gossip, allowed_to_bootstrap={}", endpoint, allowed);
return allowed;
}
auto eps = get_endpoint_state_ptr(*host_id);
if (!eps) {
logger.debug("is_safe_for_bootstrap: node={}, status=no state in gossip, allowed_to_bootstrap={}", endpoint, allowed);
return allowed;
}
auto status = get_gossip_status(*eps);
std::unordered_set<std::string_view> allowed_statuses{
versioned_value::STATUS_LEFT,
versioned_value::REMOVED_TOKEN,
};
allowed = allowed_statuses.contains(status);
logger.debug("is_safe_for_bootstrap: node={}, status={}, allowed_to_bootstrap={}", endpoint, status, allowed);
return allowed;
}
std::set<sstring> gossiper::get_supported_features(locator::host_id endpoint) const {
auto app_state = get_application_state_ptr(endpoint, application_state::SUPPORTED_FEATURES);
if (!app_state) {
return {};
}
return feature_service::to_feature_set(app_state->value());
}
std::set<sstring> gossiper::get_supported_features(const std::unordered_map<locator::host_id, sstring>& loaded_peer_features, ignore_features_of_local_node ignore_local_node) const {
std::unordered_map<locator::host_id, std::set<sstring>> features_map;
std::set<sstring> common_features;
for (auto& x : loaded_peer_features) {
auto features = feature_service::to_feature_set(x.second);
if (features.empty()) {
logger.warn("Loaded empty features for peer node {}", x.first);
} else {
features_map.emplace(x.first, std::move(features));
}
}
for (auto& x : _endpoint_state_map) {
auto host_id = x.second->get_host_id();
auto features = get_supported_features(host_id);
if (ignore_local_node && host_id == my_host_id()) {
logger.debug("Ignore SUPPORTED_FEATURES of local node: features={}", features);
continue;
}
if (features.empty()) {
auto it = loaded_peer_features.find(host_id);
if (it != loaded_peer_features.end()) {
logger.info("Node {} does not contain SUPPORTED_FEATURES in gossip, using features saved in system table, features={}", host_id, feature_service::to_feature_set(it->second));
} else {
logger.warn("Node {} does not contain SUPPORTED_FEATURES in gossip or system table", host_id);
}
} else {
// Replace the features with live info
features_map[host_id] = std::move(features);
}
}
if (ignore_local_node) {
features_map.erase(my_host_id());
}
if (!features_map.empty()) {
common_features = features_map.begin()->second;
}
for (auto& x : features_map) {
auto& features = x.second;
std::set<sstring> result;
std::set_intersection(features.begin(), features.end(),
common_features.begin(), common_features.end(),
std::inserter(result, result.end()));
common_features = std::move(result);
}
common_features.erase("");
return common_features;
}
void gossiper::check_snitch_name_matches(sstring local_snitch_name) const {
for (const auto& [address, state] : _endpoint_state_map) {
const auto remote_snitch_name = state->get_application_state_ptr(application_state::SNITCH_NAME);
if (!remote_snitch_name) {
continue;
}
if (remote_snitch_name->value() != local_snitch_name) {
throw std::runtime_error(format("Snitch check failed. This node cannot join the cluster because it uses {} and not {}", local_snitch_name, remote_snitch_name->value()));
}
}
}
int gossiper::get_down_endpoint_count() const noexcept {
return _endpoint_state_map.size() - get_up_endpoint_count();
}
int gossiper::get_up_endpoint_count() const noexcept {
return std::ranges::count_if(_endpoint_state_map | std::views::values, [this] (const endpoint_state_ptr& es) {
return is_alive(es->get_host_id());
});
}
void gossiper::append_endpoint_state(std::stringstream& ss, const endpoint_state& state) {
ss << " generation:" << state.get_heart_beat_state().get_generation() << "\n";
ss << " heartbeat:" << state.get_heart_beat_state().get_heart_beat_version() << "\n";
for (const auto& entry : state.get_application_state_map()) {
auto& app_state = entry.first;
auto& versioned_val = entry.second;
if (app_state == application_state::TOKENS) {
continue;
}
fmt::print(ss, " {}:{}:{}\n", app_state, versioned_val.version(), versioned_val.value());
}
const auto& app_state_map = state.get_application_state_map();
if (app_state_map.contains(application_state::TOKENS)) {
ss << " TOKENS:" << app_state_map.at(application_state::TOKENS).version() << ":<hidden>\n";
} else {
ss << " TOKENS: not present" << "\n";
}
}
locator::token_metadata_ptr gossiper::get_token_metadata_ptr() const noexcept {
return _shared_token_metadata.get();
}
} // namespace gms
auto fmt::formatter<gms::loaded_endpoint_state>::format(const gms::loaded_endpoint_state& st, fmt::format_context& ctx) const -> decltype(ctx.out()) {
return fmt::format_to(ctx.out(), "{{ endpoint={} dc={} rack={} }}", st.endpoint,
st.opt_dc_rack ? st.opt_dc_rack->dc : "",
st.opt_dc_rack ? st.opt_dc_rack->rack : "");
}