Merge "Get rid of seed concept in gossip" from Asias
" gossip: Get rid of seed concept The concept of seed and the different behaviour between seed nodes and non seed nodes generate a lot of confusion, complication and error for users. For example, how to add a seed node into into a cluster, how to promote a non seed node to a seed node, how to choose seeds node in multiple DC setup, edit config files for seeds, why seed node does not bootstrap. If we remove the concept of seed, it will get much easier for users. After this series, seed config option is only used once when a new node joins a cluster. Major changes: Seed nodes are only used as the initial contact point nodes. Seed nodes now perform bootstrap. The only exception is the first node in the cluster. The unsafe auto_bootstrap option is now ignored. Gossip shadow round now talks to all nodes instead of just seed nodes. Refs: #6845 Tests: update_cluster_layout_tests.py + manual test " * 'gossip_no_seed_v2' of github.com:asias/scylla: gossip: Get rid of seed concept gossip: Introduce GOSSIP_GET_ENDPOINT_STATES verb gossip: Add do_apply_state_locally helper gossip: Do not talk to seed node explicitly gossip: Talk to live endpoints in a shuffled fashion
This commit is contained in:
378
gms/gossiper.cc
378
gms/gossiper.cc
@@ -433,6 +433,26 @@ future<> gossiper::handle_shutdown_msg(inet_address from) {
|
||||
});
|
||||
}
|
||||
|
||||
future<gossip_get_endpoint_states_response>
|
||||
gossiper::handle_get_endpoint_states_msg(gossip_get_endpoint_states_request request) {
|
||||
std::unordered_map<gms::inet_address, gms::endpoint_state> map;
|
||||
const auto& application_states_wanted = request.application_states;
|
||||
for (auto& item : endpoint_state_map) {
|
||||
const inet_address& node = item.first;
|
||||
const endpoint_state& state = item.second;
|
||||
const heart_beat_state& hbs = state.get_heart_beat_state();
|
||||
auto state_wanted = endpoint_state(hbs);
|
||||
const std::map<application_state, versioned_value>& apps = state.get_application_state_map();
|
||||
for (const auto& app : apps) {
|
||||
if (application_states_wanted.count(app.first) > 0) {
|
||||
state_wanted.get_application_state_map().emplace(app);
|
||||
}
|
||||
}
|
||||
map.emplace(node, std::move(state_wanted));
|
||||
}
|
||||
return make_ready_future<gossip_get_endpoint_states_response>(gossip_get_endpoint_states_response{std::move(map)});
|
||||
}
|
||||
|
||||
future<> gossiper::init_messaging_service_handler(bind_messaging_port do_bind) {
|
||||
if (_ms_registered) {
|
||||
return make_ready_future<>();
|
||||
@@ -483,6 +503,11 @@ future<> gossiper::init_messaging_service_handler(bind_messaging_port do_bind) {
|
||||
});
|
||||
return messaging_service::no_wait();
|
||||
});
|
||||
ms().register_gossip_get_endpoint_states([] (const rpc::client_info& cinfo, gossip_get_endpoint_states_request request) {
|
||||
return smp::submit_to(0, [request = std::move(request)] () mutable {
|
||||
return gms::get_local_gossiper().handle_get_endpoint_states_msg(std::move(request));
|
||||
});
|
||||
});
|
||||
|
||||
// Start listening messaging_service after gossip message handlers are registered
|
||||
if (do_bind) {
|
||||
@@ -498,7 +523,8 @@ future<> gossiper::uninit_messaging_service_handler() {
|
||||
ms.unregister_gossip_shutdown(),
|
||||
ms.unregister_gossip_digest_syn(),
|
||||
ms.unregister_gossip_digest_ack(),
|
||||
ms.unregister_gossip_digest_ack2()
|
||||
ms.unregister_gossip_digest_ack2(),
|
||||
ms.unregister_gossip_get_endpoint_states()
|
||||
).then_unpack([this] {
|
||||
_ms_registered = false;
|
||||
});
|
||||
@@ -516,7 +542,6 @@ future<> gossiper::send_gossip(gossip_digest_syn message, std::set<inet_address>
|
||||
inet_address to = __live_endpoints[index];
|
||||
auto id = get_msg_addr(to);
|
||||
logger.trace("Sending a GossipDigestSyn to {} ...", id);
|
||||
_gossiped_to_seed = _seeds.count(to);
|
||||
return ms().send_gossip_digest_syn(id, std::move(message)).handle_exception([id] (auto ep) {
|
||||
// It is normal to reach here because it is normal that a node
|
||||
// tries to send a SYN message to a peer node which is down before
|
||||
@@ -559,6 +584,85 @@ void gossiper::notify_failure_detector(inet_address endpoint, const endpoint_sta
|
||||
}
|
||||
}
|
||||
|
||||
// Runs inside seastar::async context
|
||||
void gossiper::do_apply_state_locally(gms::inet_address node, const endpoint_state& remote_state, bool listener_notification) {
|
||||
// If state does not exist just add it. If it does then add it if the remote generation is greater.
|
||||
// If there is a generation tie, attempt to break it by heartbeat version.
|
||||
auto permit = this->lock_endpoint(node).get0();
|
||||
auto es = this->get_endpoint_state_for_endpoint_ptr(node);
|
||||
if (es) {
|
||||
endpoint_state& local_state = *es;
|
||||
int local_generation = local_state.get_heart_beat_state().get_generation();
|
||||
int remote_generation = remote_state.get_heart_beat_state().get_generation();
|
||||
logger.trace("{} local generation {}, remote generation {}", node, local_generation, remote_generation);
|
||||
if (remote_generation > utils::get_generation_number() + MAX_GENERATION_DIFFERENCE) {
|
||||
// assume some peer has corrupted memory and is broadcasting an unbelievable generation about another peer (or itself)
|
||||
logger.warn("received an invalid gossip generation for peer {}; local generation = {}, received generation = {}",
|
||||
node, local_generation, remote_generation);
|
||||
} else if (remote_generation > local_generation) {
|
||||
if (listener_notification) {
|
||||
logger.trace("Updating heartbeat state generation to {} from {} for {}", remote_generation, local_generation, node);
|
||||
// major state change will handle the update by inserting the remote state directly
|
||||
this->handle_major_state_change(node, remote_state);
|
||||
} else {
|
||||
logger.debug("Applying remote_state for node {} (remote generation > local generation)", node);
|
||||
endpoint_state_map[node] = remote_state;
|
||||
}
|
||||
} else if (remote_generation == local_generation) {
|
||||
if (listener_notification) {
|
||||
// find maximum state
|
||||
int local_max_version = this->get_max_endpoint_state_version(local_state);
|
||||
int remote_max_version = this->get_max_endpoint_state_version(remote_state);
|
||||
if (remote_max_version > local_max_version) {
|
||||
// apply states, but do not notify since there is no major change
|
||||
this->apply_new_states(node, local_state, remote_state);
|
||||
} else {
|
||||
logger.trace("Ignoring remote version {} <= {} for {}", remote_max_version, local_max_version, node);
|
||||
}
|
||||
if (!local_state.is_alive() && !this->is_dead_state(local_state)) { // unless of course, it was dead
|
||||
this->mark_alive(node, local_state);
|
||||
}
|
||||
} else {
|
||||
for (const auto& item : remote_state.get_application_state_map()) {
|
||||
const auto& remote_key = item.first;
|
||||
const auto& remote_value = item.second;
|
||||
const versioned_value* local_value = local_state.get_application_state_ptr(remote_key);
|
||||
if (!local_value || remote_value.version > local_value->version) {
|
||||
logger.debug("Applying remote_state for node {} (remote generation = local generation), key={}, value={}",
|
||||
node, remote_key, remote_value);
|
||||
local_state.add_application_state(remote_key, remote_value);
|
||||
} else {
|
||||
logger.debug("Ignoring remote_state for node {} (remote generation = local generation), key={}, value={}", node, remote_key, remote_value);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
logger.debug("Ignoring remote generation {} < {}", remote_generation, local_generation);
|
||||
}
|
||||
} else {
|
||||
if (listener_notification) {
|
||||
// this is a new node, report it to the FD in case it is the first time we are seeing it AND it's not alive
|
||||
fd().report(node);
|
||||
this->handle_major_state_change(node, remote_state);
|
||||
} else {
|
||||
logger.debug("Applying remote_state for node {} (new node)", node);
|
||||
endpoint_state_map[node] = remote_state;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Runs inside seastar::async context
|
||||
void gossiper::apply_state_locally_without_listener_notification(std::unordered_map<inet_address, endpoint_state> map) {
|
||||
for (auto& x : map) {
|
||||
const inet_address& node = x.first;
|
||||
const endpoint_state& remote_state = x.second;
|
||||
if (node == this->get_broadcast_address()) {
|
||||
continue;
|
||||
}
|
||||
do_apply_state_locally(node, remote_state, false);
|
||||
}
|
||||
}
|
||||
|
||||
future<> gossiper::apply_state_locally(std::map<inet_address, endpoint_state> map) {
|
||||
auto start = std::chrono::steady_clock::now();
|
||||
auto endpoints = boost::copy_range<utils::chunked_vector<inet_address>>(map | boost::adaptors::map_keys);
|
||||
@@ -578,47 +682,7 @@ future<> gossiper::apply_state_locally(std::map<inet_address, endpoint_state> ma
|
||||
}
|
||||
return seastar::with_semaphore(_apply_state_locally_semaphore, 1, [this, &ep, &map] () mutable {
|
||||
return seastar::async([this, &ep, &map] () mutable {
|
||||
/*
|
||||
If state does not exist just add it. If it does then add it if the remote generation is greater.
|
||||
If there is a generation tie, attempt to break it by heartbeat version.
|
||||
*/
|
||||
auto permit = this->lock_endpoint(ep).get0();
|
||||
const endpoint_state& remote_state = map[ep];
|
||||
auto es = this->get_endpoint_state_for_endpoint_ptr(ep);
|
||||
if (es) {
|
||||
endpoint_state& local_ep_state_ptr = *es;
|
||||
int local_generation = local_ep_state_ptr.get_heart_beat_state().get_generation();
|
||||
int remote_generation = remote_state.get_heart_beat_state().get_generation();
|
||||
logger.trace("{} local generation {}, remote generation {}", ep, local_generation, remote_generation);
|
||||
if (remote_generation > utils::get_generation_number() + MAX_GENERATION_DIFFERENCE) {
|
||||
// assume some peer has corrupted memory and is broadcasting an unbelievable generation about another peer (or itself)
|
||||
logger.warn("received an invalid gossip generation for peer {}; local generation = {}, received generation = {}",
|
||||
ep, local_generation, remote_generation);
|
||||
} else if (remote_generation > local_generation) {
|
||||
logger.trace("Updating heartbeat state generation to {} from {} for {}", remote_generation, local_generation, ep);
|
||||
// major state change will handle the update by inserting the remote state directly
|
||||
this->handle_major_state_change(ep, remote_state);
|
||||
} else if (remote_generation == local_generation) { //generation has not changed, apply new states
|
||||
/* find maximum state */
|
||||
int local_max_version = this->get_max_endpoint_state_version(local_ep_state_ptr);
|
||||
int remote_max_version = this->get_max_endpoint_state_version(remote_state);
|
||||
if (remote_max_version > local_max_version) {
|
||||
// apply states, but do not notify since there is no major change
|
||||
this->apply_new_states(ep, local_ep_state_ptr, remote_state);
|
||||
} else {
|
||||
logger.trace("Ignoring remote version {} <= {} for {}", remote_max_version, local_max_version, ep);
|
||||
}
|
||||
if (!local_ep_state_ptr.is_alive() && !this->is_dead_state(local_ep_state_ptr)) { // unless of course, it was dead
|
||||
this->mark_alive(ep, local_ep_state_ptr);
|
||||
}
|
||||
} else {
|
||||
logger.trace("Ignoring remote generation {} < {}", remote_generation, local_generation);
|
||||
}
|
||||
} else {
|
||||
// this is a new node, report it to the FD in case it is the first time we are seeing it AND it's not alive
|
||||
fd().report(ep);
|
||||
this->handle_major_state_change(ep, remote_state);
|
||||
}
|
||||
do_apply_state_locally(ep, map[ep], true);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -648,7 +712,6 @@ void gossiper::remove_endpoint(inet_address endpoint) {
|
||||
}
|
||||
|
||||
_live_endpoints.resize(std::distance(_live_endpoints.begin(), std::remove(_live_endpoints.begin(), _live_endpoints.end(), endpoint)));
|
||||
_live_endpoints_just_added.remove(endpoint);
|
||||
_unreachable_endpoints.erase(endpoint);
|
||||
_syn_handlers.erase(endpoint);
|
||||
_ack_handlers.erase(endpoint);
|
||||
@@ -736,38 +799,48 @@ void gossiper::run() {
|
||||
if (g_digests.size() > 0) {
|
||||
gossip_digest_syn message(get_cluster_name(), get_partitioner_name(), g_digests);
|
||||
|
||||
_gossiped_to_seed = false;
|
||||
|
||||
auto get_random_node = [this] (const utils::chunked_vector<inet_address>& nodes) {
|
||||
std::uniform_int_distribution<int> dist(0, nodes.size() - 1);
|
||||
int index = dist(this->_random_engine);
|
||||
return nodes[index];
|
||||
};
|
||||
|
||||
/* Gossip to some random live members */
|
||||
// TODO: For now, we choose 10th of all the nodes in the cluster.
|
||||
auto nr_live_nodes = std::max(size_t(1), endpoint_state_map.size() / 10);
|
||||
nr_live_nodes = std::min(nr_live_nodes, _live_endpoints.size());
|
||||
std::unordered_set<gms::inet_address> live_nodes;
|
||||
logger.debug("nr_live_nodes={}, endpoint_state_map.size()={}, live_endpoints.size={}",
|
||||
nr_live_nodes, endpoint_state_map.size(), _live_endpoints.size());
|
||||
while (live_nodes.size() < nr_live_nodes && nr_live_nodes <= _live_endpoints.size()) {
|
||||
if (!_live_endpoints_just_added.empty()) {
|
||||
auto ep = _live_endpoints_just_added.front();
|
||||
_live_endpoints_just_added.pop_front();
|
||||
logger.debug("Favor newly added node {}", ep);
|
||||
live_nodes.insert(ep);
|
||||
} else {
|
||||
// Get a random live node
|
||||
live_nodes.insert(get_random_node(_live_endpoints));
|
||||
if (_endpoints_to_talk_with.empty()) {
|
||||
std::shuffle(_live_endpoints.begin(), _live_endpoints.end(), _random_engine);
|
||||
// This guarantees the local node will talk with all nodes
|
||||
// in live_endpoints at least once within nr_rounds gossip rounds.
|
||||
// Other gossip implementation like SWIM uses similar approach.
|
||||
// https://www.cs.cornell.edu/projects/Quicksilver/public_pdfs/SWIM.pdf
|
||||
size_t nr_rounds = 10;
|
||||
size_t nodes_per_round = (_live_endpoints.size() + nr_rounds - 1) / nr_rounds;
|
||||
std::vector<inet_address> live_nodes;
|
||||
for (const auto& node : _live_endpoints) {
|
||||
if (live_nodes.size() < nodes_per_round) {
|
||||
live_nodes.push_back(node);
|
||||
} else {
|
||||
_endpoints_to_talk_with.push_back(std::move(live_nodes));
|
||||
live_nodes = {node};
|
||||
}
|
||||
}
|
||||
if (!live_nodes.empty()) {
|
||||
_endpoints_to_talk_with.push_back(live_nodes);
|
||||
}
|
||||
logger.debug("Set live nodes to talk: endpoint_state_map={}, all_live_nodes={}, endpoints_to_talk_with={}",
|
||||
endpoint_state_map.size(), _live_endpoints, _endpoints_to_talk_with);
|
||||
}
|
||||
if (_endpoints_to_talk_with.empty()) {
|
||||
auto nodes = std::vector<inet_address>(_seeds.begin(), _seeds.end());
|
||||
logger.debug("No live nodes yet: try initial contact point nodes={}", nodes);
|
||||
if (!nodes.empty()) {
|
||||
_endpoints_to_talk_with.push_back(std::move(nodes));
|
||||
}
|
||||
}
|
||||
logger.debug("Talk to {} live nodes: {}", nr_live_nodes, live_nodes);
|
||||
for (auto& ep: live_nodes) {
|
||||
// Do it in the background.
|
||||
(void)do_gossip_to_live_member(message, ep).handle_exception([] (auto ep) {
|
||||
logger.trace("Failed to do_gossip_to_live_member: {}", ep);
|
||||
});
|
||||
if (!_endpoints_to_talk_with.empty()) {
|
||||
auto live_nodes = _endpoints_to_talk_with.front();
|
||||
_endpoints_to_talk_with.pop_front();
|
||||
logger.debug("Talk to live nodes: {}", live_nodes);
|
||||
for (auto& ep: live_nodes) {
|
||||
// Do it in the background.
|
||||
(void)do_gossip_to_live_member(message, ep).handle_exception([] (auto ep) {
|
||||
logger.trace("Failed to do_gossip_to_live_member: {}", ep);
|
||||
});
|
||||
}
|
||||
} else {
|
||||
logger.debug("No one to talk with");
|
||||
}
|
||||
|
||||
/* Gossip to some unreachable member with some probability to check if he is back up */
|
||||
@@ -776,31 +849,6 @@ void gossiper::run() {
|
||||
logger.trace("Faill to do_gossip_to_unreachable_member: {}", ep);
|
||||
});
|
||||
|
||||
/* Gossip to a seed if we did not do so above, or we have seen less nodes
|
||||
than there are seeds. This prevents partitions where each group of nodes
|
||||
is only gossiping to a subset of the seeds.
|
||||
|
||||
The most straightforward check would be to check that all the seeds have been
|
||||
verified either as live or unreachable. To avoid that computation each round,
|
||||
we reason that:
|
||||
|
||||
either all the live nodes are seeds, in which case non-seeds that come online
|
||||
will introduce themselves to a member of the ring by definition,
|
||||
|
||||
or there is at least one non-seed node in the list, in which case eventually
|
||||
someone will gossip to it, and then do a gossip to a random seed from the
|
||||
gossipedToSeed check.
|
||||
|
||||
See CASSANDRA-150 for more exposition. */
|
||||
logger.trace("gossiped_to_seed={}, _live_endpoints.size={}, _seeds.size={}",
|
||||
_gossiped_to_seed, _live_endpoints.size(), _seeds.size());
|
||||
if (!_gossiped_to_seed || _live_endpoints.size() < _seeds.size()) {
|
||||
// Do it in the background.
|
||||
(void)do_gossip_to_seed(message).handle_exception([] (auto ep) {
|
||||
logger.trace("Faill to do_gossip_to_seed: {}", ep);
|
||||
});
|
||||
}
|
||||
|
||||
do_status_check();
|
||||
}
|
||||
|
||||
@@ -1198,30 +1246,6 @@ future<> gossiper::do_gossip_to_unreachable_member(gossip_digest_syn message) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> gossiper::do_gossip_to_seed(gossip_digest_syn prod) {
|
||||
size_t size = _seeds.size();
|
||||
if (size > 0) {
|
||||
if (size == 1 && _seeds.count(get_broadcast_address())) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
if (_live_endpoints.size() == 0) {
|
||||
logger.trace("do_gossip_to_seed: live_endpoints nr={}, seeds nr={}", 0, _seeds.size());
|
||||
return send_gossip(prod, _seeds);
|
||||
} else {
|
||||
/* Gossip with the seed with some probability. */
|
||||
double probability = _seeds.size() / (double) (_live_endpoints.size() + _unreachable_endpoints.size());
|
||||
std::uniform_real_distribution<double> dist(0, 1);
|
||||
double rand_dbl = dist(_random_engine);
|
||||
if (rand_dbl <= probability) {
|
||||
logger.trace("do_gossip_to_seed: live_endpoints nr={}, seeds nr={}", _live_endpoints.size(), _seeds.size());
|
||||
return send_gossip(prod, _seeds);
|
||||
}
|
||||
}
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
bool gossiper::is_gossip_only_member(inet_address endpoint) {
|
||||
auto es = get_endpoint_state_for_endpoint_ptr(endpoint);
|
||||
if (!es) {
|
||||
@@ -1279,7 +1303,6 @@ std::optional<endpoint_state> gossiper::get_endpoint_state_for_endpoint(inet_add
|
||||
future<> gossiper::reset_endpoint_state_map() {
|
||||
_unreachable_endpoints.clear();
|
||||
_live_endpoints.clear();
|
||||
_live_endpoints_just_added.clear();
|
||||
return container().invoke_on_all([] (gossiper& g) {
|
||||
g.endpoint_state_map.clear();
|
||||
});
|
||||
@@ -1443,10 +1466,10 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
|
||||
}
|
||||
|
||||
_live_endpoints.push_back(addr);
|
||||
|
||||
auto it = std::find(_live_endpoints_just_added.begin(), _live_endpoints_just_added.end(), addr);
|
||||
if (it == _live_endpoints_just_added.end()) {
|
||||
_live_endpoints_just_added.push_back(addr);
|
||||
if (_endpoints_to_talk_with.empty()) {
|
||||
_endpoints_to_talk_with.push_back({addr});
|
||||
} else {
|
||||
_endpoints_to_talk_with.front().push_back(addr);
|
||||
}
|
||||
|
||||
if (!_in_shadow_round) {
|
||||
@@ -1464,7 +1487,6 @@ void gossiper::mark_dead(inet_address addr, endpoint_state& local_state) {
|
||||
logger.trace("marking as down {}", addr);
|
||||
local_state.mark_dead();
|
||||
_live_endpoints.resize(std::distance(_live_endpoints.begin(), std::remove(_live_endpoints.begin(), _live_endpoints.end(), addr)));
|
||||
_live_endpoints_just_added.remove(addr);
|
||||
_unreachable_endpoints[addr] = now();
|
||||
logger.info("InetAddress {} is now DOWN, status = {}", addr, get_gossip_status(local_state));
|
||||
_subscribers.for_each([addr, local_state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
@@ -1742,38 +1764,86 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map<application_stat
|
||||
});
|
||||
}
|
||||
|
||||
future<> gossiper::do_shadow_round() {
|
||||
return seastar::async([this, g = this->shared_from_this()] {
|
||||
build_seeds_list();
|
||||
_in_shadow_round = true;
|
||||
auto t = clk::now();
|
||||
|
||||
// When peer node receives a syn message, it will send back a ack message.
|
||||
// So, we need to register gossip message handlers before sending syn message.
|
||||
get_gossiper().invoke_on_all([] (gossiper& g) {
|
||||
return g.init_messaging_service_handler();
|
||||
future<> gossiper::do_shadow_round(std::unordered_set<gms::inet_address> nodes, bind_messaging_port do_bind) {
|
||||
return seastar::async([this, g = this->shared_from_this(), nodes = std::move(nodes), do_bind] () mutable {
|
||||
get_gossiper().invoke_on_all([do_bind] (gossiper& g) {
|
||||
return g.init_messaging_service_handler(do_bind);
|
||||
}).get();
|
||||
|
||||
while (this->_in_shadow_round) {
|
||||
// send a completely empty syn
|
||||
for (inet_address seed : _seeds) {
|
||||
utils::chunked_vector<gossip_digest> digests;
|
||||
gossip_digest_syn message(get_cluster_name(), get_partitioner_name(), digests);
|
||||
auto id = get_msg_addr(seed);
|
||||
logger.trace("Sending a GossipDigestSyn (ShadowRound) to {} ...", id);
|
||||
// Do it in the background.
|
||||
(void)ms().send_gossip_digest_syn(id, std::move(message)).handle_exception([id] (auto ep) {
|
||||
logger.trace("Fail to send GossipDigestSyn (ShadowRound) to {}: {}", id, ep);
|
||||
nodes.erase(get_broadcast_address());
|
||||
gossip_get_endpoint_states_request request{{
|
||||
gms::application_state::STATUS,
|
||||
gms::application_state::HOST_ID,
|
||||
gms::application_state::TOKENS,
|
||||
gms::application_state::SUPPORTED_FEATURES}};
|
||||
logger.info("Gossip shadow round started with nodes={}", nodes);
|
||||
std::unordered_set<gms::inet_address> nodes_talked;
|
||||
size_t nodes_down = 0;
|
||||
auto start_time = clk::now();
|
||||
bool fall_back_to_syn_msg = false;
|
||||
std::list<gms::gossip_get_endpoint_states_response> responses;
|
||||
for (;;) {
|
||||
parallel_for_each(nodes.begin(), nodes.end(), [this, &request, &responses, &nodes_talked, &nodes_down, &fall_back_to_syn_msg] (gms::inet_address node) {
|
||||
logger.debug("Sent get_endpoint_states request to {}, request={}", node, request.application_states);
|
||||
return ms().send_gossip_get_endpoint_states(msg_addr(node), std::chrono::milliseconds(5000), request).then(
|
||||
[node, &nodes_talked, &responses] (gms::gossip_get_endpoint_states_response response) {
|
||||
logger.debug("Got get_endpoint_states response from {}, response={}", node, response.endpoint_state_map);
|
||||
responses.push_back(std::move(response));
|
||||
nodes_talked.insert(node);
|
||||
}).handle_exception_type([node, &fall_back_to_syn_msg] (seastar::rpc::unknown_verb_error&) {
|
||||
logger.warn("Node {} does not support get_endpoint_states verb", node);
|
||||
fall_back_to_syn_msg = true;
|
||||
}).handle_exception_type([node, &nodes_down] (seastar::rpc::closed_error&) {
|
||||
nodes_down++;
|
||||
logger.warn("Node {} is down for get_endpoint_states verb", node);
|
||||
});
|
||||
}).get();
|
||||
for (auto& response : responses) {
|
||||
apply_state_locally_without_listener_notification(response.endpoint_state_map);
|
||||
}
|
||||
if (!nodes_talked.empty()) {
|
||||
break;
|
||||
}
|
||||
if (nodes_down == nodes.size()) {
|
||||
logger.warn("All nodes={} are down for get_endpoint_states verb. Skip ShadowRound.", nodes);
|
||||
break;
|
||||
}
|
||||
if (fall_back_to_syn_msg) {
|
||||
break;
|
||||
}
|
||||
if (clk::now() > start_time + std::chrono::milliseconds(_cfg.shadow_round_ms())) {
|
||||
throw std::runtime_error(format("Unable to gossip with any nodes={} (ShadowRound).", nodes));
|
||||
}
|
||||
sleep_abortable(std::chrono::seconds(1), _abort_source).get();
|
||||
if (this->_in_shadow_round) {
|
||||
if (clk::now() > t + std::chrono::milliseconds(_cfg.shadow_round_ms())) {
|
||||
throw std::runtime_error(format("Unable to gossip with any seeds (ShadowRound)"));
|
||||
logger.info("Connect nodes={} again ... ({} seconds passed)",
|
||||
nodes, std::chrono::duration_cast<std::chrono::seconds>(clk::now() - start_time).count());
|
||||
}
|
||||
if (fall_back_to_syn_msg) {
|
||||
logger.info("Fallback to old method for ShadowRound");
|
||||
auto t = clk::now();
|
||||
_in_shadow_round = true;
|
||||
while (this->_in_shadow_round) {
|
||||
// send a completely empty syn
|
||||
for (const auto& node : nodes) {
|
||||
utils::chunked_vector<gossip_digest> digests;
|
||||
gossip_digest_syn message(get_cluster_name(), get_partitioner_name(), digests);
|
||||
auto id = get_msg_addr(node);
|
||||
logger.trace("Sending a GossipDigestSyn (ShadowRound) to {} ...", id);
|
||||
// Do it in the background.
|
||||
(void)ms().send_gossip_digest_syn(id, std::move(message)).handle_exception([id] (auto ep) {
|
||||
logger.trace("Fail to send GossipDigestSyn (ShadowRound) to {}: {}", id, ep);
|
||||
});
|
||||
}
|
||||
sleep_abortable(std::chrono::seconds(1), _abort_source).get();
|
||||
if (this->_in_shadow_round) {
|
||||
if (clk::now() > t + std::chrono::milliseconds(_cfg.shadow_round_ms())) {
|
||||
throw std::runtime_error(format("Unable to gossip with any nodes={} (ShadowRound),", nodes));
|
||||
}
|
||||
logger.info("Connect nodes={} again ... ({} seconds passed)",
|
||||
nodes, std::chrono::duration_cast<std::chrono::seconds>(clk::now() - t).count());
|
||||
}
|
||||
logger.info("Connect seeds again ... ({} seconds passed)", std::chrono::duration_cast<std::chrono::seconds>(clk::now() - t).count());
|
||||
}
|
||||
}
|
||||
logger.info("Gossip shadow round finisehd with nodes_talked={}", nodes_talked);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -81,6 +81,8 @@ class gossip_digest_ack2;
|
||||
class gossip_digest;
|
||||
class inet_address;
|
||||
class i_endpoint_state_change_subscriber;
|
||||
class gossip_get_endpoint_states_request;
|
||||
class gossip_get_endpoint_states_response;
|
||||
|
||||
class feature_service;
|
||||
|
||||
@@ -129,6 +131,7 @@ private:
|
||||
future<> handle_shutdown_msg(inet_address from);
|
||||
future<> do_send_ack_msg(msg_addr from, gossip_digest_syn syn_msg);
|
||||
future<> do_send_ack2_msg(msg_addr from, utils::chunked_vector<gossip_digest> ack_msg_digest);
|
||||
future<gossip_get_endpoint_states_response> handle_get_endpoint_states_msg(gossip_get_endpoint_states_request request);
|
||||
static constexpr uint32_t _default_cpuid = 0;
|
||||
msg_addr get_msg_addr(inet_address to);
|
||||
void do_sort(utils::chunked_vector<gossip_digest>& g_digest_list);
|
||||
@@ -195,9 +198,10 @@ private:
|
||||
*/
|
||||
atomic_vector<shared_ptr<i_endpoint_state_change_subscriber>> _subscribers;
|
||||
|
||||
std::list<std::vector<inet_address>> _endpoints_to_talk_with;
|
||||
|
||||
/* live member set */
|
||||
utils::chunked_vector<inet_address> _live_endpoints;
|
||||
std::list<inet_address> _live_endpoints_just_added;
|
||||
|
||||
/* nodes are being marked as alive */
|
||||
std::unordered_set<inet_address> _pending_mark_alive_endpoints;
|
||||
@@ -381,9 +385,6 @@ private:
|
||||
/* Sends a Gossip message to an unreachable member */
|
||||
future<> do_gossip_to_unreachable_member(gossip_digest_syn message);
|
||||
|
||||
/* Gossip to a seed for facilitating partition healing */
|
||||
future<> do_gossip_to_seed(gossip_digest_syn prod);
|
||||
|
||||
void do_status_check();
|
||||
|
||||
public:
|
||||
@@ -443,6 +444,9 @@ public:
|
||||
future<> apply_state_locally(std::map<inet_address, endpoint_state> map);
|
||||
|
||||
private:
|
||||
void do_apply_state_locally(gms::inet_address node, const endpoint_state& remote_state, bool listener_notification);
|
||||
void apply_state_locally_without_listener_notification(std::unordered_map<inet_address, endpoint_state> map);
|
||||
|
||||
void apply_new_states(inet_address addr, endpoint_state& local_state, const endpoint_state& remote_state);
|
||||
|
||||
// notify that a local application state is going to change (doesn't get triggered for remote changes)
|
||||
@@ -481,7 +485,7 @@ public:
|
||||
* Do a single 'shadow' round of gossip, where we do not modify any state
|
||||
* Only used when replacing a node, to get and assume its states
|
||||
*/
|
||||
future<> do_shadow_round();
|
||||
future<> do_shadow_round(std::unordered_set<gms::inet_address> nodes = {}, bind_messaging_port do_bind = bind_messaging_port::yes);
|
||||
|
||||
private:
|
||||
void build_seeds_list();
|
||||
@@ -548,7 +552,6 @@ private:
|
||||
uint64_t _nr_run = 0;
|
||||
uint64_t _msg_processing = 0;
|
||||
bool _ms_registered = false;
|
||||
bool _gossiped_to_seed = false;
|
||||
bool _gossip_settled = false;
|
||||
|
||||
class msg_proc_guard;
|
||||
@@ -647,5 +650,13 @@ inline future<std::map<inet_address, arrival_window>> get_arrival_samples() {
|
||||
});
|
||||
}
|
||||
|
||||
struct gossip_get_endpoint_states_request {
|
||||
// Application states the sender requested
|
||||
std::unordered_set<gms::application_state> application_states;
|
||||
};
|
||||
|
||||
struct gossip_get_endpoint_states_response {
|
||||
std::unordered_map<gms::inet_address, gms::endpoint_state> endpoint_state_map;
|
||||
};
|
||||
|
||||
} // namespace gms
|
||||
|
||||
@@ -74,4 +74,12 @@ class gossip_digest_ack2 {
|
||||
std::map<gms::inet_address, gms::endpoint_state> get_endpoint_state_map();
|
||||
};
|
||||
|
||||
struct gossip_get_endpoint_states_request {
|
||||
std::unordered_set<gms::application_state> application_states;
|
||||
};
|
||||
|
||||
struct gossip_get_endpoint_states_response {
|
||||
std::unordered_map<gms::inet_address, gms::endpoint_state> endpoint_state_map;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -475,6 +475,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
|
||||
case messaging_verb::GOSSIP_DIGEST_ACK2:
|
||||
case messaging_verb::GOSSIP_SHUTDOWN:
|
||||
case messaging_verb::GOSSIP_ECHO:
|
||||
case messaging_verb::GOSSIP_GET_ENDPOINT_STATES:
|
||||
case messaging_verb::GET_SCHEMA_VERSION:
|
||||
return 0;
|
||||
case messaging_verb::PREPARE_MESSAGE:
|
||||
@@ -1065,6 +1066,16 @@ future<> messaging_service::send_gossip_digest_ack2(msg_addr id, gossip_digest_a
|
||||
return send_message_oneway(this, messaging_verb::GOSSIP_DIGEST_ACK2, std::move(id), std::move(msg));
|
||||
}
|
||||
|
||||
void messaging_service::register_gossip_get_endpoint_states(std::function<future<gms::gossip_get_endpoint_states_response> (const rpc::client_info& cinfo, gms::gossip_get_endpoint_states_request request)>&& func) {
|
||||
register_handler(this, messaging_verb::GOSSIP_GET_ENDPOINT_STATES, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_gossip_get_endpoint_states() {
|
||||
return unregister_handler(messaging_verb::GOSSIP_GET_ENDPOINT_STATES);
|
||||
}
|
||||
future<gms::gossip_get_endpoint_states_response> messaging_service::send_gossip_get_endpoint_states(msg_addr id, std::chrono::milliseconds timeout, gms::gossip_get_endpoint_states_request request) {
|
||||
return send_message_timeout<future<gms::gossip_get_endpoint_states_response>>(this, messaging_verb::GOSSIP_GET_ENDPOINT_STATES, std::move(id), std::move(timeout), std::move(request));
|
||||
}
|
||||
|
||||
void messaging_service::register_definitions_update(std::function<rpc::no_wait_type (const rpc::client_info& cinfo, std::vector<frozen_mutation> fm,
|
||||
rpc::optional<std::vector<canonical_mutation>> cm)>&& func) {
|
||||
register_handler(this, netw::messaging_verb::DEFINITIONS_UPDATE, std::move(func));
|
||||
|
||||
@@ -54,6 +54,8 @@ namespace gms {
|
||||
class gossip_digest_syn;
|
||||
class gossip_digest_ack;
|
||||
class gossip_digest_ack2;
|
||||
class gossip_get_endpoint_states_request;
|
||||
class gossip_get_endpoint_states_response;
|
||||
}
|
||||
|
||||
namespace utils {
|
||||
@@ -140,7 +142,8 @@ enum class messaging_verb : int32_t {
|
||||
PAXOS_LEARN = 41,
|
||||
HINT_MUTATION = 42,
|
||||
PAXOS_PRUNE = 43,
|
||||
LAST = 44,
|
||||
GOSSIP_GET_ENDPOINT_STATES = 44,
|
||||
LAST = 45,
|
||||
};
|
||||
|
||||
} // namespace netw
|
||||
@@ -417,6 +420,11 @@ public:
|
||||
future<> unregister_gossip_digest_ack2();
|
||||
future<> send_gossip_digest_ack2(msg_addr id, gms::gossip_digest_ack2 msg);
|
||||
|
||||
// Wrapper for GOSSIP_GET_ENDPOINT_STATES
|
||||
void register_gossip_get_endpoint_states(std::function<future<gms::gossip_get_endpoint_states_response> (const rpc::client_info& cinfo, gms::gossip_get_endpoint_states_request request)>&& func);
|
||||
future<> unregister_gossip_get_endpoint_states();
|
||||
future<gms::gossip_get_endpoint_states_response> send_gossip_get_endpoint_states(msg_addr id, std::chrono::milliseconds timeout, gms::gossip_get_endpoint_states_request request);
|
||||
|
||||
// Wrapper for DEFINITIONS_UPDATE
|
||||
void register_definitions_update(std::function<rpc::no_wait_type (const rpc::client_info& cinfo, std::vector<frozen_mutation> fm,
|
||||
rpc::optional<std::vector<canonical_mutation>> cm)>&& func);
|
||||
|
||||
@@ -206,12 +206,36 @@ bool get_property_load_ring_state() {
|
||||
return get_local_storage_service().db().local().get_config().load_ring_state();
|
||||
}
|
||||
|
||||
bool storage_service::should_bootstrap() const {
|
||||
return is_auto_bootstrap() && !db::system_keyspace::bootstrap_complete() && !_gossiper.get_seeds().count(get_broadcast_address());
|
||||
bool storage_service::is_first_node() {
|
||||
if (db().local().is_replacing()) {
|
||||
return false;
|
||||
}
|
||||
auto seeds = _gossiper.get_seeds();
|
||||
if (seeds.empty()) {
|
||||
return false;
|
||||
}
|
||||
// Node with the smallest IP address is chosen as the very first node
|
||||
// in the cluster. The first node is the only node that does not
|
||||
// bootstrap in the cluser. All other nodes will bootstrap.
|
||||
std::vector<gms::inet_address> sorted_seeds(seeds.begin(), seeds.end());
|
||||
std::sort(sorted_seeds.begin(), sorted_seeds.end());
|
||||
if (sorted_seeds.front() == get_broadcast_address()) {
|
||||
slogger.info("I am the first node in the cluster. Skip bootstrap. Node={}", get_broadcast_address());
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool storage_service::should_bootstrap() {
|
||||
return !db::system_keyspace::bootstrap_complete() && !is_first_node();
|
||||
}
|
||||
|
||||
// Runs inside seastar::async context
|
||||
void storage_service::prepare_to_join(std::vector<inet_address> loaded_endpoints, const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features, bind_messaging_port do_bind) {
|
||||
void storage_service::prepare_to_join(
|
||||
std::unordered_set<gms::inet_address> initial_contact_nodes,
|
||||
std::unordered_set<gms::inet_address> loaded_endpoints,
|
||||
std::unordered_map<gms::inet_address, sstring> loaded_peer_features,
|
||||
bind_messaging_port do_bind) {
|
||||
std::map<gms::application_state, gms::versioned_value> app_states;
|
||||
if (db::system_keyspace::was_decommissioned()) {
|
||||
if (db().local().get_config().override_decommission()) {
|
||||
@@ -233,62 +257,20 @@ void storage_service::prepare_to_join(std::vector<inet_address> loaded_endpoints
|
||||
if (db::system_keyspace::bootstrap_complete()) {
|
||||
throw std::runtime_error("Cannot replace address with a node that is already bootstrapped");
|
||||
}
|
||||
if (!is_auto_bootstrap()) {
|
||||
throw std::runtime_error("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration");
|
||||
}
|
||||
|
||||
std::tie(_bootstrap_tokens, _cdc_streams_ts) = prepare_replacement_info(loaded_peer_features).get0();
|
||||
std::tie(_bootstrap_tokens, _cdc_streams_ts) = prepare_replacement_info(initial_contact_nodes, loaded_peer_features, do_bind).get0();
|
||||
auto replace_address = db().local().get_replace_address();
|
||||
replacing_a_node_with_same_ip = replace_address && *replace_address == get_broadcast_address();
|
||||
replacing_a_node_with_diff_ip = replace_address && *replace_address != get_broadcast_address();
|
||||
} else if (should_bootstrap()) {
|
||||
check_for_endpoint_collision(loaded_peer_features).get();
|
||||
check_for_endpoint_collision(initial_contact_nodes, loaded_peer_features, do_bind).get();
|
||||
} else {
|
||||
auto seeds = _gossiper.get_seeds();
|
||||
auto my_ep = get_broadcast_address();
|
||||
auto local_features = _feature_service.known_feature_set();
|
||||
|
||||
if (seeds.count(my_ep)) {
|
||||
// This node is a seed node
|
||||
if (loaded_peer_features.empty()) {
|
||||
// This is a competely new seed node, skip the check
|
||||
slogger.info("Checking remote features skipped, since this node is a new seed node which knows nothing about the cluster");
|
||||
} else {
|
||||
// This is a existing seed node
|
||||
if (seeds.size() == 1) {
|
||||
// This node is the only seed node, check features with system table
|
||||
slogger.info("Checking remote features with system table, since this node is the only seed node");
|
||||
_gossiper.check_knows_remote_features(local_features, loaded_peer_features);
|
||||
} else {
|
||||
// More than one seed node in the seed list, do shadow round with other seed nodes
|
||||
try {
|
||||
slogger.info("Checking remote features with gossip and system tables");
|
||||
_gossiper.do_shadow_round().get();
|
||||
} catch (...) {
|
||||
slogger.info("Shadow round failed with {}, checking remote features with system tables only",
|
||||
std::current_exception());
|
||||
_gossiper.finish_shadow_round();
|
||||
}
|
||||
|
||||
_gossiper.check_knows_remote_features(local_features, loaded_peer_features);
|
||||
_gossiper.reset_endpoint_state_map().get();
|
||||
for (auto ep : loaded_endpoints) {
|
||||
_gossiper.add_saved_endpoint(ep);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// This node is a non-seed node
|
||||
// Do shadow round to check if this node knows all the features
|
||||
// advertised by all other nodes, otherwise this node is too old
|
||||
// (missing features) to join the cluser.
|
||||
slogger.info("Checking remote features with gossip");
|
||||
_gossiper.do_shadow_round().get();
|
||||
_gossiper.check_knows_remote_features(local_features, loaded_peer_features);
|
||||
_gossiper.reset_endpoint_state_map().get();
|
||||
for (auto ep : loaded_endpoints) {
|
||||
_gossiper.add_saved_endpoint(ep);
|
||||
}
|
||||
slogger.info("Checking remote features with gossip, initial_contact_nodes={}", initial_contact_nodes);
|
||||
_gossiper.do_shadow_round(initial_contact_nodes, gms::bind_messaging_port(bool(do_bind))).get();
|
||||
_gossiper.check_knows_remote_features(local_features, loaded_peer_features);
|
||||
_gossiper.reset_endpoint_state_map().get();
|
||||
for (auto ep : loaded_endpoints) {
|
||||
_gossiper.add_saved_endpoint(ep);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -412,14 +394,6 @@ void storage_service::join_token_ring(int delay) {
|
||||
// We attempted to replace this with a schema-presence check, but you need a meaningful sleep
|
||||
// to get schema info from gossip which defeats the purpose. See CASSANDRA-4427 for the gory details.
|
||||
std::unordered_set<inet_address> current;
|
||||
slogger.debug("Bootstrap variables: {} {} {} {}",
|
||||
is_auto_bootstrap(),
|
||||
db::system_keyspace::bootstrap_in_progress(),
|
||||
db::system_keyspace::bootstrap_complete(),
|
||||
_gossiper.get_seeds().count(get_broadcast_address()));
|
||||
if (is_auto_bootstrap() && !db::system_keyspace::bootstrap_complete() && _gossiper.get_seeds().count(get_broadcast_address())) {
|
||||
slogger.info("This node will not auto bootstrap because it is configured to be a seed node.");
|
||||
}
|
||||
if (should_bootstrap()) {
|
||||
bool resume_bootstrap = db::system_keyspace::bootstrap_in_progress();
|
||||
if (resume_bootstrap) {
|
||||
@@ -547,19 +521,6 @@ void storage_service::join_token_ring(int delay) {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (!is_auto_bootstrap()) {
|
||||
slogger.warn("auto_bootstrap set to \"off\". This causes UNDEFINED BEHAVIOR. YOU MAY LOSE DATA.");
|
||||
}
|
||||
|
||||
if (!db::system_keyspace::bootstrap_complete() && _gossiper.get_seeds().count(get_broadcast_address())) {
|
||||
slogger.warn("Bootstrapping node marked as seed (present in the seed list)."
|
||||
" This can only be done for the very first node in a new cluster."
|
||||
" If this is not the first node, YOU MAY LOSE DATA."
|
||||
" Bootstrapping new nodes into an existing cluster as seeds"
|
||||
" causes UNDEFINED BEHAVIOR. DO NOT EVER do that.");
|
||||
}
|
||||
|
||||
slogger.debug("Setting tokens to {}", _bootstrap_tokens);
|
||||
// This node must know about its chosen tokens before other nodes do
|
||||
// since they may start sending writes to this node after it gossips status = NORMAL.
|
||||
@@ -962,8 +923,6 @@ void storage_service::bootstrap() {
|
||||
}
|
||||
}
|
||||
|
||||
_gossiper.check_seen_seeds();
|
||||
|
||||
_db.invoke_on_all([this] (database& db) {
|
||||
for (auto& cf : db.get_non_system_column_families()) {
|
||||
cf->notify_bootstrap_or_replace_start();
|
||||
@@ -1640,7 +1599,7 @@ future<> storage_service::init_server(bind_messaging_port do_bind) {
|
||||
// pending ranges when keyspace is chagned
|
||||
_mnotifier.local().register_listener(this);
|
||||
|
||||
std::vector<inet_address> loaded_endpoints;
|
||||
std::unordered_set<inet_address> loaded_endpoints;
|
||||
if (get_property_load_ring_state()) {
|
||||
slogger.info("Loading persisted ring state");
|
||||
auto loaded_tokens = db::system_keyspace::load_tokens().get0();
|
||||
@@ -1665,19 +1624,27 @@ future<> storage_service::init_server(bind_messaging_port do_bind) {
|
||||
if (loaded_host_ids.contains(ep)) {
|
||||
_token_metadata.update_host_id(loaded_host_ids.at(ep), ep);
|
||||
}
|
||||
loaded_endpoints.push_back(ep);
|
||||
loaded_endpoints.insert(ep);
|
||||
_gossiper.add_saved_endpoint(ep);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Seeds are now only used as the initial contact point nodes. If the
|
||||
// loaded_endpoints are empty which means this node is a completely new
|
||||
// node, we use the nodes specified in seeds as the initial contact
|
||||
// point nodes, otherwise use the peer nodes persisted in system table.
|
||||
auto seeds = _gossiper.get_seeds();
|
||||
auto initial_contact_nodes = loaded_endpoints.empty() ?
|
||||
std::unordered_set<gms::inet_address>(seeds.begin(), seeds.end()) :
|
||||
loaded_endpoints;
|
||||
auto loaded_peer_features = db::system_keyspace::load_peer_features().get0();
|
||||
slogger.info("loaded_peer_features: peer_features size={}", loaded_peer_features.size());
|
||||
slogger.info("initial_contact_nodes={}, loaded_endpoints={}, loaded_peer_features={}",
|
||||
initial_contact_nodes, loaded_endpoints, loaded_peer_features.size());
|
||||
for (auto& x : loaded_peer_features) {
|
||||
slogger.info("loaded_peer_features: peer={}, supported_features={}", x.first, x.second);
|
||||
slogger.info("peer={}, supported_features={}", x.first, x.second);
|
||||
}
|
||||
|
||||
prepare_to_join(std::move(loaded_endpoints), loaded_peer_features, do_bind);
|
||||
prepare_to_join(std::move(initial_contact_nodes), std::move(loaded_endpoints), std::move(loaded_peer_features), do_bind);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1742,16 +1709,16 @@ future<> storage_service::stop() {
|
||||
});
|
||||
}
|
||||
|
||||
future<> storage_service::check_for_endpoint_collision(const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features) {
|
||||
future<> storage_service::check_for_endpoint_collision(std::unordered_set<gms::inet_address> initial_contact_nodes, const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features, bind_messaging_port do_bind) {
|
||||
slogger.debug("Starting shadow gossip round to check for endpoint collision");
|
||||
|
||||
return seastar::async([this, loaded_peer_features] {
|
||||
return seastar::async([this, initial_contact_nodes, loaded_peer_features, do_bind] {
|
||||
auto t = gms::gossiper::clk::now();
|
||||
bool found_bootstrapping_node = false;
|
||||
auto local_features = _feature_service.known_feature_set();
|
||||
do {
|
||||
slogger.info("Checking remote features with gossip");
|
||||
_gossiper.do_shadow_round().get();
|
||||
_gossiper.do_shadow_round(initial_contact_nodes, gms::bind_messaging_port(bool(do_bind))).get();
|
||||
_gossiper.check_knows_remote_features(local_features, loaded_peer_features);
|
||||
auto addr = get_broadcast_address();
|
||||
if (!_gossiper.is_safe_for_bootstrap(addr)) {
|
||||
@@ -1804,7 +1771,7 @@ void storage_service::remove_endpoint(inet_address endpoint) {
|
||||
}
|
||||
|
||||
future<storage_service::replacement_info>
|
||||
storage_service::prepare_replacement_info(const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features) {
|
||||
storage_service::prepare_replacement_info(std::unordered_set<gms::inet_address> initial_contact_nodes, const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features, bind_messaging_port do_bind) {
|
||||
if (!db().local().get_replace_address()) {
|
||||
throw std::runtime_error(format("replace_address is empty"));
|
||||
}
|
||||
@@ -1820,7 +1787,7 @@ storage_service::prepare_replacement_info(const std::unordered_map<gms::inet_add
|
||||
|
||||
// make magic happen
|
||||
slogger.info("Checking remote features with gossip");
|
||||
return _gossiper.do_shadow_round().then([this, loaded_peer_features, replace_address] {
|
||||
return _gossiper.do_shadow_round(initial_contact_nodes, gms::bind_messaging_port(bool(do_bind))).then([this, loaded_peer_features, replace_address] {
|
||||
auto local_features = _feature_service.known_feature_set();
|
||||
_gossiper.check_knows_remote_features(local_features, loaded_peer_features);
|
||||
|
||||
|
||||
@@ -303,12 +303,14 @@ private:
|
||||
|
||||
// Tokens and the CDC streams timestamp of the replaced node.
|
||||
using replacement_info = std::pair<std::unordered_set<token>, std::optional<db_clock::time_point>>;
|
||||
future<replacement_info> prepare_replacement_info(const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features);
|
||||
future<replacement_info> prepare_replacement_info(std::unordered_set<gms::inet_address> initial_contact_nodes,
|
||||
const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features, bind_messaging_port do_bind = bind_messaging_port::yes);
|
||||
|
||||
public:
|
||||
future<bool> is_initialized();
|
||||
|
||||
future<> check_for_endpoint_collision(const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features);
|
||||
future<> check_for_endpoint_collision(std::unordered_set<gms::inet_address> initial_contact_nodes,
|
||||
const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features, bind_messaging_port do_bind = bind_messaging_port::yes);
|
||||
|
||||
/*!
|
||||
* \brief Init the messaging service part of the service.
|
||||
@@ -345,8 +347,13 @@ public:
|
||||
void flush_column_families();
|
||||
|
||||
private:
|
||||
bool should_bootstrap() const;
|
||||
void prepare_to_join(std::vector<inet_address> loaded_endpoints, const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features, bind_messaging_port do_bind = bind_messaging_port::yes);
|
||||
bool should_bootstrap();
|
||||
bool is_first_node();
|
||||
void prepare_to_join(
|
||||
std::unordered_set<gms::inet_address> initial_contact_nodes,
|
||||
std::unordered_set<gms::inet_address> loaded_endpoints,
|
||||
std::unordered_map<gms::inet_address, sstring> loaded_peer_features,
|
||||
bind_messaging_port do_bind = bind_messaging_port::yes);
|
||||
void join_token_ring(int delay);
|
||||
void maybe_start_sys_dist_ks();
|
||||
public:
|
||||
|
||||
Reference in New Issue
Block a user