And switch to std::source_location.
Upcoming seastar update will deprecate its compatibility layer.
The patch is
for f in $(git grep -l 'seastar::compat::source_location'); do
sed -e 's/seastar::compat::source_location/std::source_location/g' -i $f;
done
and the removal of a few header includes.
Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
Closes scylladb/scylladb#27309
730 lines
30 KiB
C++
730 lines
30 KiB
C++
/*
|
|
*
|
|
* Modified by ScyllaDB
|
|
* Copyright (C) 2015-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include <seastar/core/shared_ptr.hh>
|
|
#include <seastar/core/gate.hh>
|
|
#include <seastar/rpc/rpc_types.hh>
|
|
#include "locator/host_id.hh"
|
|
#include "utils/atomic_vector.hh"
|
|
#include "utils/UUID.hh"
|
|
#include "gms/generation-number.hh"
|
|
#include "gms/versioned_value.hh"
|
|
#include "gms/application_state.hh"
|
|
#include "gms/endpoint_state.hh"
|
|
#include "gms/gossip_digest_syn.hh"
|
|
#include "gms/gossip_digest.hh"
|
|
#include "utils/loading_shared_values.hh"
|
|
#include "utils/updateable_value.hh"
|
|
#include "message/messaging_service_fwd.hh"
|
|
#include <optional>
|
|
#include <chrono>
|
|
#include <set>
|
|
#include <seastar/core/metrics_registration.hh>
|
|
#include <seastar/core/abort_source.hh>
|
|
#include <seastar/core/scheduling.hh>
|
|
#include "locator/token_metadata.hh"
|
|
#include "locator/types.hh"
|
|
#include "gms/gossip_address_map.hh"
|
|
|
|
namespace gms {
|
|
|
|
// Forward declarations for types used only by value/reference in the
// declarations below; their full definitions live in other headers.
class gossip_digest_syn;
class gossip_digest_ack;
class gossip_digest_ack2;
class gossip_digest;
class inet_address;
class i_endpoint_state_change_subscriber;
class gossip_get_endpoint_states_request;
class gossip_get_endpoint_states_response;
|
|
|
|
// Per-peer bookkeeping for SYN handling (see gossiper::_syn_handlers).
// NOTE(review): from the field names, `pending` appears to mark that a SYN
// from this peer is being handled, with the latest SYN buffered in `syn_msg`
// until the in-flight one completes — confirm against handle_syn_msg().
struct syn_msg_pending {
    bool pending = false;
    std::optional<gossip_digest_syn> syn_msg;
};
|
|
|
|
// Per-peer bookkeeping for ACK handling (see gossiper::_ack_handlers).
// NOTE(review): analogous to syn_msg_pending — the latest ACK digest list is
// presumably buffered here while a previous one is processed; confirm against
// handle_ack_msg().
struct ack_msg_pending {
    bool pending = false;
    std::optional<utils::chunked_vector<gossip_digest>> ack_msg_digest;
};
|
|
|
|
// Static configuration the gossiper is constructed with
// (passed to the gossiper constructor as `gcfg`).
struct gossip_config {
    // Scheduling group gossip work runs in; defaults to the default group.
    seastar::scheduling_group gossip_scheduling_group = seastar::scheduling_group();
    sstring cluster_name;
    locator::host_id host_id;
    utils::UUID group0_id;
    // Initial contact points used when joining the cluster.
    std::set<inet_address> seeds;
    sstring partitioner;
    // Timeouts/intervals, all in milliseconds.
    uint32_t ring_delay_ms = 30 * 1000;
    uint32_t shadow_round_ms = 300 * 1000;
    uint32_t shutdown_announce_ms = 2 * 1000;
    // NOTE(review): -1 assigned to an unsigned field wraps to UINT32_MAX —
    // presumably a deliberate "not set" sentinel; confirm at the usage site.
    uint32_t skip_wait_for_gossip_to_settle = -1;
    // Settings that can change at runtime (live-updateable config values).
    utils::updateable_value<uint32_t> failure_detector_timeout_ms;
    utils::updateable_value<int32_t> force_gossip_generation;
    utils::updateable_value<utils::UUID> recovery_leader;
};
|
|
|
|
// State of a previously known endpoint, as loaded from persistent storage
// (consumed by gossiper::add_saved_endpoint()).
struct loaded_endpoint_state {
    gms::inet_address endpoint;
    std::unordered_set<dht::token> tokens;
    // DC/rack if known; empty when it was not recorded.
    std::optional<locator::endpoint_dc_rack> opt_dc_rack;
};
|
|
|
|
/**
|
|
* This module is responsible for Gossiping information for the local endpoint. This abstraction
|
|
* maintains the list of live and dead endpoints. Periodically i.e. every 1 second this module
|
|
* chooses a random node and initiates a round of Gossip with it. A round of Gossip involves 3
|
|
* rounds of messaging. For instance if node A wants to initiate a round of Gossip with node B
|
|
* it starts off by sending node B a GossipDigestSynMessage. Node B on receipt of this message
|
|
* sends node A a GossipDigestAckMessage. On receipt of this message node A sends node B a
|
|
* GossipDigestAck2Message which completes a round of Gossip. This module as and when it hears one
|
|
* of the three above mentioned messages updates the Failure Detector with the liveness information.
|
|
* Upon hearing a GossipShutdownMessage, this module will instantly mark the remote node as down in
|
|
* the Failure Detector.
|
|
*/
|
|
class gossiper : public seastar::async_sharded_service<gossiper>, public seastar::peering_sharded_service<gossiper> {
public:
    using clk = seastar::lowres_system_clock;
    using ignore_features_of_local_node = bool_class<class ignore_features_of_local_node_tag>;
    using generation_for_nodes = std::unordered_map<locator::host_id, generation_type>;
private:
    using messaging_verb = netw::messaging_verb;
    using messaging_service = netw::messaging_service;
    using msg_addr = netw::msg_addr;

    // RPC verb handler registration/teardown for the gossip protocol.
    void init_messaging_service_handler();
    future<> uninit_messaging_service_handler();
    // Handlers for the three-message gossip exchange (SYN -> ACK -> ACK2).
    future<> handle_syn_msg(locator::host_id from, gossip_digest_syn syn_msg);
    future<> handle_ack_msg(locator::host_id from, gossip_digest_ack ack_msg);
    future<> handle_ack2_msg(locator::host_id from, gossip_digest_ack2 msg);
    future<> handle_echo_msg(locator::host_id id, seastar::rpc::opt_time_point, std::optional<int64_t> generation_number_opt, bool notify_up);
    future<> handle_shutdown_msg(locator::host_id from, std::optional<int64_t> generation_number_opt);
    future<> do_send_ack_msg(locator::host_id from, gossip_digest_syn syn_msg);
    future<> do_send_ack2_msg(locator::host_id from, utils::chunked_vector<gossip_digest> ack_msg_digest);
    future<> send_echo(locator::host_id host_id, std::chrono::milliseconds timeout_ms, int64_t generation_number, bool notify_up);
    future<gossip_get_endpoint_states_response> handle_get_endpoint_states_msg(gossip_get_endpoint_states_request request);
    static constexpr uint32_t _default_cpuid = 0;
    void do_sort(utils::chunked_vector<gossip_digest>& g_digest_list) const;
    timer<lowres_clock> _scheduled_gossip_task;
    bool _enabled = false;
    semaphore _callback_running{1};
    // Bounds concurrent apply_state_locally work.
    semaphore _apply_state_locally_semaphore{100};
    seastar::named_gate _background_msg;
    std::unordered_map<locator::host_id, syn_msg_pending> _syn_handlers;
    std::unordered_map<locator::host_id, ack_msg_pending> _ack_handlers;
    // Map host id to generation number (see advertise_to_nodes()).
    generation_for_nodes _advertise_to_nodes;
    future<> _failure_detector_loop_done{make_ready_future<>()} ;

    // Runs `fn` in the background under the _background_msg gate; `type` names
    // the operation (presumably for logging — confirm in the implementation).
    future<rpc::no_wait_type> background_msg(sstring type, noncopyable_function<future<>(gossiper&)> fn);

public:
    // Get current generation number for the given nodes
    future<generation_for_nodes>
    get_generation_for_nodes(std::unordered_set<locator::host_id> nodes) const;
    // Only respond to echo messages from nodes listed here with a matching
    // generation number; an empty map clears the restriction.
    future<> advertise_to_nodes(generation_for_nodes advertise_to_nodes = {});
    const sstring& get_cluster_name() const noexcept;
    void set_group0_id(utils::UUID group0_id);
    const utils::UUID& get_group0_id() const noexcept;

    const utils::UUID& get_recovery_leader() const noexcept;

    const sstring& get_partitioner_name() const noexcept {
        return _gcfg.partitioner;
    }

    // This node's host id, as recorded in the local token metadata topology.
    locator::host_id my_host_id() const noexcept {
        return get_token_metadata_ptr()->get_topology().my_host_id();
    }
    // This node's broadcast address, from the local token metadata topology.
    inet_address get_broadcast_address() const noexcept {
        return get_token_metadata_ptr()->get_topology().my_address();
    }
    const std::set<inet_address>& get_seeds() const noexcept;

public:
    static clk::time_point inline now() noexcept { return clk::now(); }
public:
    // Per-endpoint serialization lock plus debugging info about who holds it.
    struct endpoint_lock_entry {
        semaphore sem;
        permit_id pid;
        semaphore_units<> units;
        size_t holders = 0;
        // Caller location of the first current holder (for diagnostics).
        std::optional<std::source_location> first_holder;
        // last_holder is the caller of endpoint_permit who last took this entry,
        // it might not be a current holder (the permit might've been destroyed)
        std::optional<std::source_location> last_holder;

        endpoint_lock_entry() noexcept;
    };
    using endpoint_locks_map = utils::loading_shared_values<locator::host_id, endpoint_lock_entry>;
    // RAII handle for a held endpoint lock; obtained via lock_endpoint().
    class endpoint_permit {
        endpoint_locks_map::entry_ptr _ptr;
        permit_id _permit_id;
        locator::host_id _addr;
        std::source_location _caller;
    public:
        endpoint_permit(endpoint_locks_map::entry_ptr&& ptr, locator::host_id addr, std::source_location caller) noexcept;
        endpoint_permit(endpoint_permit&&) noexcept;
        ~endpoint_permit();
        bool release() noexcept;
        const permit_id& id() const noexcept { return _permit_id; }
    };
    // Must be called on shard 0
    future<endpoint_permit> lock_endpoint(locator::host_id, permit_id pid, std::source_location l = std::source_location::current());

private:
    void permit_internal_error(const locator::host_id& addr, permit_id pid);
    // Reports an internal error when called without a valid permit.
    void verify_permit(const locator::host_id& addr, permit_id pid) {
        if (!pid) {
            permit_internal_error(addr, pid);
        }
    }

    /* map where key is the endpoint and value is the state associated with the endpoint */
    std::unordered_map<locator::host_id, endpoint_state_ptr> _endpoint_state_map;
    // Used for serializing changes to _endpoint_state_map and running of associated change listeners.
    endpoint_locks_map _endpoint_locks;

public:
    // Status values that mean the node has permanently left the ring.
    static constexpr std::array DEAD_STATES{
        versioned_value::REMOVED_TOKEN,
        versioned_value::STATUS_LEFT,
    };
    // Status values for which no shutdown announcement is expected.
    static constexpr std::array SILENT_SHUTDOWN_STATES{
        versioned_value::REMOVED_TOKEN,
        versioned_value::STATUS_LEFT,
        versioned_value::STATUS_BOOTSTRAPPING,
        versioned_value::STATUS_UNKNOWN,
    };
    // Period of the gossip round timer.
    static constexpr std::chrono::milliseconds INTERVAL{1000};
    static constexpr std::chrono::hours A_VERY_LONG_TIME{24 * 3};

    static constexpr std::chrono::milliseconds GOSSIP_SETTLE_MIN_WAIT_MS{5000};

    // Maximum difference between remote generation value and generation
    // value this node would get if this node were restarted that we are
    // willing to accept about a peer.
    static constexpr generation_type::value_type MAX_GENERATION_DIFFERENCE = 86400 * 365;
    std::chrono::milliseconds fat_client_timeout;

    std::chrono::milliseconds quarantine_delay() const noexcept;
private:
    // Randomness source for peer selection and digest shuffling.
    mutable std::default_random_engine _random_engine{std::random_device{}()};

    /**
     * subscribers for interest in EndpointState change
     */
    atomic_vector<shared_ptr<i_endpoint_state_change_subscriber>> _subscribers;

    std::list<std::vector<locator::host_id>> _endpoints_to_talk_with;

    /* live member set */
    std::unordered_set<locator::host_id> _live_endpoints;
    uint64_t _live_endpoints_version = 0;

    /* nodes are being marked as alive */
    std::unordered_set<locator::host_id> _pending_mark_alive_endpoints;

    /* unreachable member set */
    std::unordered_map<locator::host_id, clk::time_point> _unreachable_endpoints;

    // Serializes updates of the live/unreachable sets across shards.
    semaphore _endpoint_update_semaphore = semaphore(1);

    /* initial seeds for joining the cluster */
    std::set<inet_address> _seeds;

    /* map where key is endpoint and value is timestamp when this endpoint was removed from
     * gossip. We will ignore any gossip regarding these endpoints for QUARANTINE_DELAY time
     * after removal to prevent nodes from falsely reincarnating during the time when removal
     * gossip gets propagated to all nodes */
    std::map<locator::host_id, clk::time_point> _just_removed_endpoints;

    std::map<locator::host_id, clk::time_point> _expire_time_endpoint_map;

    bool _in_shadow_round = false;

    // Non-null when running in raft topology mode (see set_topology_state_machine()).
    service::topology_state_machine* _topo_sm = nullptr;

    // Must be called on shard 0.
    future<semaphore_units<>> lock_endpoint_update_semaphore();

    // Snapshot of the live and unreachable member sets, used to apply
    // membership changes atomically across shards.
    struct live_and_unreachable_endpoints {
        std::unordered_set<locator::host_id> live;
        std::unordered_map<locator::host_id, clk::time_point> unreachable;
    };

    // Must be called on shard 0.
    // Update _live_endpoints and/or _unreachable_endpoints
    // on shard 0, after acquiring `lock_endpoint_update_semaphore`.
    //
    // The called function modifies a copy of live_and_unreachable_endpoints
    // which is then copied to temporary copies for all shards
    // and then applied atomically on all shards.
    //
    future<> mutate_live_and_unreachable_endpoints(std::function<void(live_and_unreachable_endpoints&)> func);

    // replicate shard 0 live and unreachable endpoints sets across all other shards.
    // _endpoint_update_semaphore must be held for the whole duration
    future<> replicate_live_endpoints_on_change(foreign_ptr<std::unique_ptr<live_and_unreachable_endpoints>>, uint64_t new_live_endpoints_version);

    // One gossip round; invoked by _scheduled_gossip_task.
    void run();
    // Replicates given endpoint_state to all other shards.
    // The state doesn't have to be kept alive until the replication completes.
    // Must be called under lock_endpoint.
    future<> replicate(endpoint_state, permit_id);
public:
    explicit gossiper(abort_source& as, const locator::shared_token_metadata& stm, netw::messaging_service& ms, gossip_config gcfg, gossip_address_map& address_map);

    /**
     * Register for interesting state changes.
     *
     * @param subscriber module which implements the IEndpointStateChangeSubscriber
     */
    void register_(shared_ptr<i_endpoint_state_change_subscriber> subscriber);

    /**
     * Unregister interest for state changes.
     *
     * @param subscriber module which implements the IEndpointStateChangeSubscriber
     */
    future<> unregister_(shared_ptr<i_endpoint_state_change_subscriber> subscriber);

    std::set<locator::host_id> get_live_members() const;

    std::set<locator::host_id> get_live_token_owners() const;

    /**
     * @return a list of unreachable gossip participants, including fat clients
     */
    std::set<locator::host_id> get_unreachable_members() const;

    /**
     * @return a list of unreachable nodes
     */
    std::set<locator::host_id> get_unreachable_nodes() const;

    int64_t get_endpoint_downtime(locator::host_id ep) const noexcept;

    /**
     * Return either: the greatest heartbeat or application state
     *
     * @param state EndpointState to inspect
     * @return the largest version seen in the state
     */
    version_type get_max_endpoint_state_version(const endpoint_state& state) const noexcept;

    // Installs (or clears, with nullptr) the raft topology state machine.
    void set_topology_state_machine(service::topology_state_machine* m) {
        _topo_sm = m;
        if (m) {
            // In raft topology mode the coordinator maintains banned nodes list
            _just_removed_endpoints.clear();
        }
    }

private:
    /**
     * @param endpoint end point that is convicted.
     */
    future<> convict(locator::host_id endpoint);

    /**
     * Removes the endpoint from gossip completely
     *
     * @param endpoint endpoint to be removed from the current membership.
     *
     * Must be called under lock_endpoint.
     */
    future<> evict_from_membership(locator::host_id endpoint, permit_id);
public:
    /**
     * Removes the endpoint from Gossip but retains endpoint state
     */
    future<> remove_endpoint(locator::host_id endpoint, permit_id);
    // Forcibly removes the endpoint from gossip.
    // NOTE(review): an older comment claimed this "returns true if an endpoint
    // was removed", but the return type is future<> — nothing is returned.
    future<> force_remove_endpoint(locator::host_id id, permit_id);
private:
    /**
     * Quarantines the endpoint for QUARANTINE_DELAY
     *
     * @param id endpoint to quarantine
     */
    void quarantine_endpoint(locator::host_id id);

    /**
     * Quarantines the endpoint until quarantine_start + QUARANTINE_DELAY
     *
     * @param id endpoint to quarantine
     * @param quarantine_start start of the quarantine window
     */
    void quarantine_endpoint(locator::host_id id, clk::time_point quarantine_start);

private:
    /**
     * The gossip digest is built based on randomization
     * rather than just looping through the collection of live endpoints.
     *
     */
    utils::chunked_vector<gossip_digest> make_random_gossip_digest() const;

public:
    /**
     * Handles switching the endpoint's state from REMOVING_TOKEN to REMOVED_TOKEN
     *
     * @param host_id host whose token-removal is being advertised
     */
    future<> advertise_token_removed(locator::host_id host_id, permit_id);

    /**
     * Do not call this method unless you know what you are doing.
     * It will try extremely hard to obliterate any endpoint from the ring,
     * even if it does not know about it.
     *
     * @param address textual address of the endpoint to assassinate
     */
    future<> assassinate_endpoint(sstring address);

public:
    future<generation_type> get_current_generation_number(locator::host_id endpoint) const;
    future<version_type> get_current_heart_beat_version(locator::host_id endpoint) const;

    bool is_gossip_only_member(locator::host_id endpoint) const;
    bool is_safe_for_bootstrap(inet_address endpoint) const;
    bool is_safe_for_restart(locator::host_id host_id) const;
private:
    /**
     * Sends the given SYN message to one random endpoint from epset.
     *
     * @param message SYN message to send
     * @param epset a set of endpoints from which a random endpoint is chosen.
     */
    template<typename T>
    future<> send_gossip(gossip_digest_syn message, std::set<T> epset);

    /* Sends a Gossip message to a live member */
    template<typename T>
    future<> do_gossip_to_live_member(gossip_digest_syn message, T ep);

    /* Sends a Gossip message to an unreachable member */
    future<> do_gossip_to_unreachable_member(gossip_digest_syn message);

    // Periodic liveness/quarantine housekeeping.
    future<> do_status_check();

public:
    clk::time_point get_expire_time_for_endpoint(locator::host_id endpoint) const noexcept;

    // Gets a shared pointer to the endpoint_state, if exists.
    // Otherwise, returns a null ptr.
    // The endpoint_state is immutable (except for its update_timestamp), guaranteed not to change while
    // the endpoint_state_ptr is held.
    endpoint_state_ptr get_endpoint_state_ptr(locator::host_id ep) const noexcept;

    // Return this node's endpoint_state_ptr
    endpoint_state_ptr get_this_endpoint_state_ptr() const noexcept {
        return get_endpoint_state_ptr(my_host_id());
    }

    const versioned_value* get_application_state_ptr(locator::host_id id, application_state appstate) const noexcept;
    sstring get_application_state_value(locator::host_id endpoint, application_state appstate) const;

    // removes ALL endpoint states; should only be called after shadow gossip.
    // Must be called on shard 0
    future<> reset_endpoint_state_map();

    std::vector<locator::host_id> get_endpoints() const;

    size_t num_endpoints() const noexcept {
        return _endpoint_state_map.size();
    }

    // Calls func for each endpoint_state.
    // Called function must not yield
    void for_each_endpoint_state(std::function<void(const endpoint_state&)> func) const {
        for_each_endpoint_state_until([func = std::move(func)] (const endpoint_state& eps) {
            func(eps);
            return stop_iteration::no;
        });
    }

    // Calls func for each endpoint_state until it returns stop_iteration::yes
    // Returns stop_iteration::yes iff `func` returns stop_iteration::yes.
    // Called function must not yield
    stop_iteration for_each_endpoint_state_until(std::function<stop_iteration(const endpoint_state&)>) const;

    locator::host_id get_host_id(inet_address endpoint) const;
    std::optional<locator::host_id> try_get_host_id(inet_address endpoint) const;

    std::optional<gms::inet_address> get_node_ip(locator::host_id host_id) const;

    std::optional<endpoint_state> get_state_for_version_bigger_than(locator::host_id for_endpoint, version_type version) const;

    /**
     * determine which endpoint started up earlier
     */
    std::strong_ordering compare_endpoint_startup(locator::host_id addr1, locator::host_id addr2) const;

    /**
     * Return the rpc address associated with an endpoint as a string.
     * @param endpoint The endpoint to get rpc address for
     * @return the rpc address
     */
    sstring get_rpc_address(const locator::host_id& endpoint) const;

    future<> real_mark_alive(locator::host_id addr);
private:
    endpoint_state& my_endpoint_state();

    // Use with care, as the endpoint_state_ptr in the endpoint_state_map is considered
    // immutable, with one exception - the update_timestamp.
    void update_timestamp(const endpoint_state_ptr& eps) noexcept;
    const endpoint_state& get_endpoint_state(locator::host_id ep) const;

    void update_timestamp_for_nodes(const std::map<inet_address, endpoint_state>& map);

    void mark_alive(endpoint_state_ptr node);

    // Must be called under lock_endpoint.
    future<> mark_dead(locator::host_id addr, endpoint_state_ptr local_state, permit_id);

    // Must be called under lock_endpoint.
    future<> mark_as_shutdown(const locator::host_id& endpoint, permit_id);

    /**
     * This method is called whenever there is a "big" change in ep state (a generation change for a known node).
     *
     * @param eps EndpointState for the endpoint
     *
     * Must be called under lock_endpoint.
     */
    future<> handle_major_state_change(endpoint_state eps, permit_id, bool shadow_round);

    future<> wait_alive_helper(noncopyable_function<std::vector<locator::host_id>()> get_nodes, std::chrono::milliseconds timeout);
public:
    bool is_alive(locator::host_id id) const;

    bool is_dead_state(const endpoint_state& eps) const;
    // Wait for nodes to be alive on all shards
    future<> wait_alive(std::vector<gms::inet_address> nodes, std::chrono::milliseconds timeout);
    future<> wait_alive(std::vector<locator::host_id> nodes, std::chrono::milliseconds timeout);
    future<> wait_alive(noncopyable_function<std::vector<locator::host_id>()> get_nodes, std::chrono::milliseconds timeout);

    // Wait for `n` live nodes to show up in gossip (including ourself).
    future<> wait_for_live_nodes_to_show_up(size_t n);

    // Get live members synchronized to all shards
    future<std::set<inet_address>> get_live_members_synchronized();

    // Get unreachable members synchronized to all shards
    future<std::set<inet_address>> get_unreachable_members_synchronized();

    future<> apply_state_locally(std::map<inet_address, endpoint_state> map);

private:
    future<> do_apply_state_locally(locator::host_id node, endpoint_state remote_state, bool shadow_round);
    future<> apply_state_locally_in_shadow_round(std::unordered_map<inet_address, endpoint_state> map);

    // Must be called under lock_endpoint.
    future<> apply_new_states(endpoint_state local_state, const endpoint_state& remote_state, permit_id, bool shadow_round);

    // notify that an application state has changed
    // Must be called under lock_endpoint.
    future<> do_on_change_notifications(inet_address addr, locator::host_id id, const application_state_map& states, permit_id) const;

    // notify that a node is DOWN (dead)
    // Must be called under lock_endpoint.
    future<> do_on_dead_notifications(inet_address addr, endpoint_state_ptr state, permit_id) const;

    /* Request all the state for the endpoint in the g_digest */
    void request_all(gossip_digest& g_digest, utils::chunked_vector<gossip_digest>& delta_gossip_digest_list, generation_type remote_generation) const;

    /* Send all the data with version greater than max_remote_version */
    void send_all(gossip_digest& g_digest, std::map<inet_address, endpoint_state>& delta_ep_state_map, version_type max_remote_version) const;

    /*
     * This method is used to figure the state that the Gossiper has but Gossipee doesn't. The delta digests
     * and the delta state are built up.
     */
    void examine_gossiper(utils::chunked_vector<gossip_digest>& g_digest_list,
                          utils::chunked_vector<gossip_digest>& delta_gossip_digest_list,
                          std::map<inet_address, endpoint_state>& delta_ep_state_map) const;

public:
    /**
     * Start the gossiper with the generation number, preloading the map of application states before starting
     *
     * If advertise is set to false, gossip will not respond to gossip echo
     * message, so that other nodes will not mark this node as alive.
     *
     * Note 1: In practice, advertise is set to false only when the local node is
     * replacing a dead node using the same ip address of the dead node, i.e.,
     * replacing_a_node_with_same_ip is set to true, because the issue (#7312)
     * that the advertise flag fixes is limited to replacing a node with the
     * same ip address only.
     *
     * Note 2: When a node with a new ip address joins the cluster, e.g.,
     * replacing a dead node using the different ip address, with advertise =
     * false, existing nodes will not mark the node as up. So existing nodes
     * will not send gossip syn messages to the new node because the new node
     * is not in either live node list or unreachable node list.
     *
     * The new node will only include itself in the gossip syn messages, so the
     * syn message from new node to existing node will not exchange gossip
     * application states of existing nodes. Gossip exchanges node information
     * for node listed in SYN messages only.
     *
     * As a result, the new node will not learn other existing nodes in gossip
     * and existing nodes will learn the new node.
     *
     * Note 3: When a node replaces a dead node using the same ip address of
     * the dead node, with advertise = false, existing nodes will send syn
     * messages to the replacing node, because the replacing node is listed
     * in the unreachable node list.
     *
     * As a result, the replacing node will learn other existing nodes in
     * gossip and existing nodes will learn the new replacing node. Yes,
     * unreachable node is contacted with some probability, but all of the
     * existing nodes can talk to the replacing node. So the probability of
     * replacing node being talked to is pretty high.
     *
     * NOTE(review): the `advertise` flag described above is not a parameter of
     * this function — presumably it is controlled via advertise_to_nodes();
     * confirm and update the text accordingly.
     */
    future<> start_gossiping(gms::generation_type generation_nbr, application_state_map preload_local_states = {});

public:
    using mandatory = bool_class<class mandatory_tag>;
    /**
     * Do a single 'shadow' round of gossip, where we do not modify any state
     */
    future<> do_shadow_round(std::unordered_set<gms::inet_address> nodes, mandatory is_mandatory);

private:
    void build_seeds_list();
    // Must be called on shard 0
    future<> do_stop_gossiping();

public:
    /**
     * Add an endpoint we knew about previously, but whose state is unknown
     */
    future<> add_saved_endpoint(locator::host_id host_id, loaded_endpoint_state st, permit_id);

    future<> add_local_application_state(application_state state, versioned_value value);

    /**
     * Applies all states in set "atomically", as in guaranteed monotonic versions and
     * inserted into endpoint state together (and assuming same grouping, overwritten together).
     */
    future<> add_local_application_state(application_state_map states);

    // Add multiple application states
    future<> add_local_application_state(std::convertible_to<std::pair<const application_state, versioned_value>> auto&&... states);

    future<> start();
    future<> shutdown();
    // Needed by seastar::sharded
    future<> stop();

public:
    bool is_enabled() const;

public:
    void add_expire_time_for_endpoint(locator::host_id endpoint, clk::time_point expire_time);

    static clk::time_point compute_expire_time();
public:
    bool is_seed(const inet_address& endpoint) const;
    bool is_shutdown(const locator::host_id& endpoint) const;
    bool is_shutdown(const endpoint_state& eps) const;
    bool is_normal(const locator::host_id& endpoint) const;
    bool is_left(const locator::host_id& endpoint) const;
    // Check if a node is in NORMAL or SHUTDOWN status which means the node is
    // part of the token ring from the gossip point of view and operates in
    // normal status or was in normal status but is shutdown.
    bool is_normal_ring_member(const locator::host_id& endpoint) const;
    bool is_cql_ready(const locator::host_id& endpoint) const;
    bool is_silent_shutdown_state(const endpoint_state& ep_state) const;
    void force_newer_generation();
public:
    std::string_view get_gossip_status(const endpoint_state& ep_state) const noexcept;
    std::string_view get_gossip_status(const locator::host_id& endpoint) const noexcept;
public:
    future<> wait_for_gossip_to_settle() const;
    future<> wait_for_range_setup() const;
private:
    future<> wait_for_gossip(std::chrono::milliseconds, std::optional<int32_t> = {}) const;

    // Counters exported via metrics (see _metrics below).
    uint64_t _nr_run = 0;
    uint64_t _msg_processing = 0;

    class msg_proc_guard;
private:
    abort_source& _abort_source;
    const locator::shared_token_metadata& _shared_token_metadata;
    netw::messaging_service& _messaging;
    gossip_address_map& _address_map;
    gossip_config _gcfg;
    // Get features supported by a particular node
    std::set<sstring> get_supported_features(locator::host_id endpoint) const;
    locator::token_metadata_ptr get_token_metadata_ptr() const noexcept;
public:
    void check_knows_remote_features(std::set<std::string_view>& local_features, const std::unordered_map<locator::host_id, sstring>& loaded_peer_features) const;
    // Get features supported by all the nodes this node knows about
    std::set<sstring> get_supported_features(const std::unordered_map<locator::host_id, sstring>& loaded_peer_features, ignore_features_of_local_node ignore_local_node) const;
private:
    seastar::metrics::metric_groups _metrics;
public:
    void append_endpoint_state(std::stringstream& ss, const endpoint_state& state);
public:
    void check_snitch_name_matches(sstring local_snitch_name) const;
    int get_down_endpoint_count() const noexcept;
    int get_up_endpoint_count() const noexcept;
    // Send UP notification to all nodes in the set
    future<> notify_nodes_on_up(std::unordered_set<locator::host_id>);
    const gossip_address_map& get_address_map() const {
        return _address_map;
    }
    // Const-qualified but returns a mutable reference: _address_map is a
    // reference member, so the map itself is external to this object.
    gossip_address_map& get_mutable_address_map() const {
        return _address_map;
    }
private:
    future<> failure_detector_loop();
    future<> failure_detector_loop_for_node(locator::host_id node, generation_type gossip_generation, uint64_t live_endpoints_version);
};
|
|
|
|
|
|
// Request payload for the get-endpoint-states RPC
// (handled by gossiper::handle_get_endpoint_states_msg()).
struct gossip_get_endpoint_states_request {
    // Application states the sender requested
    std::unordered_set<gms::application_state> application_states;
};
|
|
|
|
// Response payload for the get-endpoint-states RPC: the per-endpoint state
// known to the responder.
struct gossip_get_endpoint_states_response {
    std::unordered_map<gms::inet_address, gms::endpoint_state> endpoint_state_map;
};
|
|
|
|
// Variadic convenience overload: packs any number of
// (application_state, versioned_value) pairs into an application_state_map
// and forwards to the map-taking overload, which applies them atomically.
future<>
gossiper::add_local_application_state(std::convertible_to<std::pair<const application_state, versioned_value>> auto&&... states) {
    application_state_map merged;
    // Left-to-right fold, preserving the argument order of insertion.
    (..., merged.emplace(std::forward<decltype(states)>(states)));
    return add_local_application_state(std::move(merged));
}
|
|
|
|
|
|
} // namespace gms
|
|
|
|
// fmt formatter for gms::loaded_endpoint_state. No format-spec options are
// parsed; format() is defined out of line (in the corresponding .cc file).
template <>
struct fmt::formatter<gms::loaded_endpoint_state> {
    constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
    auto format(const gms::loaded_endpoint_state&, fmt::format_context& ctx) const -> decltype(ctx.out());
};
|