gossip: Get rid of seed concept

The concept of seed and the different behaviour between seed nodes and
non seed nodes generate a lot of confusion, complication and error for
users. For example, how to add a seed node into a cluster, how to
promote a non seed node to a seed node, how to choose seed nodes in a
multiple DC setup, how to edit config files for seeds, why a seed node
does not bootstrap.

If we remove the concept of seed, it will get much easier for users.
After this series, seed config option is only used once when a new node
joins a cluster.

Major changes:

- Seed nodes are only used as the initial contact point nodes.

- Seed nodes now perform bootstrap. The only exception is the first node
  in the cluster.

- The unsafe auto_bootstrap option is now ignored.

- Gossip shadow round now attempts to talk to all nodes instead of just seed nodes.

Manual test:

- bootstrap n1, n2, n3  (n1 and n2 are listed as seed, check only n1
  will skip bootstrap, n2 and n3 will bootstrap)
- shutdown n1, n2, n3
- start n2 (check non seed node can boot)
- start n1 (check n1 talks to both n2 and n3)
- start n3 (check n3 talks to both n1 and n2)

Upgrade/Downgrade test:

- Initialize cluster
  Start 3 nodes n1, n2, n3 using old version
  n1 and n2 are listed as seed

- Test upgrade starting from seed nodes
  Rolling restart n1 using new version
  Rolling restart n2 using new version
  Rolling restart n3 using new version

- Test downgrade to old version
  Rolling restart n1 using old version
  Rolling restart n2 using old version
  Rolling restart n3 using old version

- Test upgrade starting from non seed nodes
  Rolling restart n3 using new version
  Rolling restart n2 using new version
  Rolling restart n1 using new version

Notes on upgrade procedure:

No special procedure is needed to upgrade to a Scylla version without the
seed concept. A rolling upgrade, one node at a time, is good enough.

Fixes: #6845
Tests: ./test.py + update_cluster_layout_tests.py + manual test
This commit is contained in:
Asias He
2020-07-22 10:01:49 +08:00
parent cd7d64f588
commit d0b3f3dfe8
4 changed files with 139 additions and 117 deletions

View File

@@ -1763,38 +1763,86 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map<application_stat
});
}
// Gossip shadow round.
// NOTE(review): this listing appears to interleave two versions of the
// function body: one that sends empty GossipDigestSyn messages to _seeds, and
// one that sends get_endpoint_states requests to the `nodes` argument. The
// added comments below describe the `nodes`-based version only.
future<> gossiper::do_shadow_round() {
return seastar::async([this, g = this->shared_from_this()] {
build_seeds_list();
_in_shadow_round = true;
auto t = clk::now();
// When peer node receives a syn message, it will send back a ack message.
// So, we need to register gossip message handlers before sending syn message.
get_gossiper().invoke_on_all([] (gossiper& g) {
return g.init_messaging_service_handler();
// nodes:   addresses to contact; the local broadcast address is erased from
//          the set before any request is sent.
// do_bind: forwarded to init_messaging_service_handler() on every gossiper
//          instance.
// Throws std::runtime_error when no exit condition of the request loop is met
// within _cfg.shadow_round_ms() milliseconds, and likewise in the SYN-based
// fallback loop.
future<> gossiper::do_shadow_round(std::unordered_set<gms::inet_address> nodes, bind_messaging_port do_bind) {
return seastar::async([this, g = this->shared_from_this(), nodes = std::move(nodes), do_bind] () mutable {
get_gossiper().invoke_on_all([do_bind] (gossiper& g) {
return g.init_messaging_service_handler(do_bind);
}).get();
while (this->_in_shadow_round) {
// send a completely empty syn
for (inet_address seed : _seeds) {
utils::chunked_vector<gossip_digest> digests;
gossip_digest_syn message(get_cluster_name(), get_partitioner_name(), digests);
auto id = get_msg_addr(seed);
logger.trace("Sending a GossipDigestSyn (ShadowRound) to {} ...", id);
// Do it in the background.
(void)ms().send_gossip_digest_syn(id, std::move(message)).handle_exception([id] (auto ep) {
logger.trace("Fail to send GossipDigestSyn (ShadowRound) to {}: {}", id, ep);
// Never send a request to ourselves.
nodes.erase(get_broadcast_address());
// The application states asked for in every get_endpoint_states request.
gossip_get_endpoint_states_request request{{
gms::application_state::STATUS,
gms::application_state::HOST_ID,
gms::application_state::TOKENS,
gms::application_state::SUPPORTED_FEATURES}};
logger.info("Gossip shadow round started with nodes={}", nodes);
std::unordered_set<gms::inet_address> nodes_talked;
// NOTE(review): nodes_down is declared outside the retry loop and is not
// reset between passes, while it is compared against nodes.size() after
// each pass - confirm this is intended.
size_t nodes_down = 0;
auto start_time = clk::now();
bool fall_back_to_syn_msg = false;
std::list<gms::gossip_get_endpoint_states_response> responses;
for (;;) {
// One request per node; the 5000 ms argument is presumably a per-request
// timeout (TODO confirm against the messaging service API).
// NOTE(review): only unknown_verb_error and closed_error are handled in the
// continuation chain; any other exception type would presumably propagate
// out of the .get() below and abort the shadow round - confirm.
parallel_for_each(nodes.begin(), nodes.end(), [this, &request, &responses, &nodes_talked, &nodes_down, &fall_back_to_syn_msg] (gms::inet_address node) {
logger.debug("Sent get_endpoint_states request to {}, request={}", node, request.application_states);
return ms().send_gossip_get_endpoint_states(msg_addr(node), std::chrono::milliseconds(5000), request).then(
[node, &nodes_talked, &responses] (gms::gossip_get_endpoint_states_response response) {
logger.debug("Got get_endpoint_states response from {}, response={}", node, response.endpoint_state_map);
responses.push_back(std::move(response));
nodes_talked.insert(node);
}).handle_exception_type([node, &fall_back_to_syn_msg] (seastar::rpc::unknown_verb_error&) {
logger.warn("Node {} does not support get_endpoint_states verb", node);
fall_back_to_syn_msg = true;
}).handle_exception_type([node, &nodes_down] (seastar::rpc::closed_error&) {
nodes_down++;
logger.warn("Node {} is down for get_endpoint_states verb", node);
});
}).get();
// Apply everything received so far to the local endpoint state map.
for (auto& response : responses) {
apply_state_locally_without_listener_notification(response.endpoint_state_map);
}
// Done as soon as at least one node has answered.
if (!nodes_talked.empty()) {
break;
}
if (nodes_down == nodes.size()) {
logger.warn("All nodes={} are down for get_endpoint_states verb. Skip ShadowRound.", nodes);
break;
}
// At least one node rejected the verb: leave this loop and use the
// GossipDigestSyn-based method further down.
if (fall_back_to_syn_msg) {
break;
}
// Give up once shadow_round_ms has elapsed since start_time.
if (clk::now() > start_time + std::chrono::milliseconds(_cfg.shadow_round_ms())) {
throw std::runtime_error(format("Unable to gossip with any nodes={} (ShadowRound).", nodes));
}
sleep_abortable(std::chrono::seconds(1), _abort_source).get();
if (this->_in_shadow_round) {
if (clk::now() > t + std::chrono::milliseconds(_cfg.shadow_round_ms())) {
throw std::runtime_error(format("Unable to gossip with any seeds (ShadowRound)"));
logger.info("Connect nodes={} again ... ({} seconds passed)",
nodes, std::chrono::duration_cast<std::chrono::seconds>(clk::now() - start_time).count());
}
if (fall_back_to_syn_msg) {
logger.info("Fallback to old method for ShadowRound");
auto t = clk::now();
_in_shadow_round = true;
// Keep sending empty SYN messages once per second while _in_shadow_round
// stays set; it is not cleared anywhere inside this function.
while (this->_in_shadow_round) {
// send a completely empty syn
for (const auto& node : nodes) {
utils::chunked_vector<gossip_digest> digests;
gossip_digest_syn message(get_cluster_name(), get_partitioner_name(), digests);
auto id = get_msg_addr(node);
logger.trace("Sending a GossipDigestSyn (ShadowRound) to {} ...", id);
// Do it in the background.
(void)ms().send_gossip_digest_syn(id, std::move(message)).handle_exception([id] (auto ep) {
logger.trace("Fail to send GossipDigestSyn (ShadowRound) to {}: {}", id, ep);
});
}
sleep_abortable(std::chrono::seconds(1), _abort_source).get();
if (this->_in_shadow_round) {
if (clk::now() > t + std::chrono::milliseconds(_cfg.shadow_round_ms())) {
throw std::runtime_error(format("Unable to gossip with any nodes={} (ShadowRound),", nodes));
}
logger.info("Connect nodes={} again ... ({} seconds passed)",
nodes, std::chrono::duration_cast<std::chrono::seconds>(clk::now() - t).count());
}
logger.info("Connect seeds again ... ({} seconds passed)", std::chrono::duration_cast<std::chrono::seconds>(clk::now() - t).count());
}
}
logger.info("Gossip shadow round finisehd with nodes_talked={}", nodes_talked);
});
}

View File

@@ -485,7 +485,7 @@ public:
* Do a single 'shadow' round of gossip, where we do not modify any state
* Used when replacing a node (to get and assume its states) and when checking
* remote features or endpoint collisions before joining the cluster
*/
future<> do_shadow_round();
future<> do_shadow_round(std::unordered_set<gms::inet_address> nodes = {}, bind_messaging_port do_bind = bind_messaging_port::yes);
private:
void build_seeds_list();

View File

@@ -206,12 +206,36 @@ bool get_property_load_ring_state() {
return get_local_storage_service().db().local().get_config().load_ring_state();
}
bool storage_service::should_bootstrap() const {
return is_auto_bootstrap() && !db::system_keyspace::bootstrap_complete() && !_gossiper.get_seeds().count(get_broadcast_address());
// Tells whether this node is the very first node of the cluster, i.e. the
// single node that skips bootstrap.
//
// Returns false when this node is replacing another node or when the seed
// list is empty. Otherwise returns true only if this node's broadcast address
// is the smallest address in the seed list; an info message is logged in that
// case.
bool storage_service::is_first_node() {
    if (db().local().is_replacing()) {
        return false;
    }
    auto seeds = _gossiper.get_seeds();
    if (seeds.empty()) {
        return false;
    }
    // Node with the smallest IP address is chosen as the very first node
    // in the cluster. The first node is the only node that does not
    // bootstrap in the cluster. All other nodes will bootstrap.
    //
    // Only the minimum is needed, so scan for it rather than copying the
    // seeds into a vector and sorting. The seed list is non-empty here, so
    // the returned iterator is dereferenceable.
    const auto smallest = std::min_element(seeds.begin(), seeds.end());
    if (*smallest == get_broadcast_address()) {
        slogger.info("I am the first node in the cluster. Skip bootstrap. Node={}", get_broadcast_address());
        return true;
    }
    return false;
}
// A node bootstraps unless it has already completed bootstrap or it is the
// first node of the cluster. is_first_node() is consulted only when bootstrap
// has not completed yet.
bool storage_service::should_bootstrap() {
    if (db::system_keyspace::bootstrap_complete()) {
        return false;
    }
    return !is_first_node();
}
// Runs inside seastar::async context
void storage_service::prepare_to_join(std::vector<inet_address> loaded_endpoints, const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features, bind_messaging_port do_bind) {
void storage_service::prepare_to_join(
std::unordered_set<gms::inet_address> initial_contact_nodes,
std::unordered_set<gms::inet_address> loaded_endpoints,
std::unordered_map<gms::inet_address, sstring> loaded_peer_features,
bind_messaging_port do_bind) {
std::map<gms::application_state, gms::versioned_value> app_states;
if (db::system_keyspace::was_decommissioned()) {
if (db().local().get_config().override_decommission()) {
@@ -233,62 +257,20 @@ void storage_service::prepare_to_join(std::vector<inet_address> loaded_endpoints
if (db::system_keyspace::bootstrap_complete()) {
throw std::runtime_error("Cannot replace address with a node that is already bootstrapped");
}
if (!is_auto_bootstrap()) {
throw std::runtime_error("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration");
}
std::tie(_bootstrap_tokens, _cdc_streams_ts) = prepare_replacement_info(loaded_peer_features).get0();
std::tie(_bootstrap_tokens, _cdc_streams_ts) = prepare_replacement_info(initial_contact_nodes, loaded_peer_features, do_bind).get0();
auto replace_address = db().local().get_replace_address();
replacing_a_node_with_same_ip = replace_address && *replace_address == get_broadcast_address();
replacing_a_node_with_diff_ip = replace_address && *replace_address != get_broadcast_address();
} else if (should_bootstrap()) {
check_for_endpoint_collision(loaded_peer_features).get();
check_for_endpoint_collision(initial_contact_nodes, loaded_peer_features, do_bind).get();
} else {
auto seeds = _gossiper.get_seeds();
auto my_ep = get_broadcast_address();
auto local_features = _feature_service.known_feature_set();
if (seeds.count(my_ep)) {
// This node is a seed node
if (loaded_peer_features.empty()) {
// This is a competely new seed node, skip the check
slogger.info("Checking remote features skipped, since this node is a new seed node which knows nothing about the cluster");
} else {
// This is a existing seed node
if (seeds.size() == 1) {
// This node is the only seed node, check features with system table
slogger.info("Checking remote features with system table, since this node is the only seed node");
_gossiper.check_knows_remote_features(local_features, loaded_peer_features);
} else {
// More than one seed node in the seed list, do shadow round with other seed nodes
try {
slogger.info("Checking remote features with gossip and system tables");
_gossiper.do_shadow_round().get();
} catch (...) {
slogger.info("Shadow round failed with {}, checking remote features with system tables only",
std::current_exception());
_gossiper.finish_shadow_round();
}
_gossiper.check_knows_remote_features(local_features, loaded_peer_features);
_gossiper.reset_endpoint_state_map().get();
for (auto ep : loaded_endpoints) {
_gossiper.add_saved_endpoint(ep);
}
}
}
} else {
// This node is a non-seed node
// Do shadow round to check if this node knows all the features
// advertised by all other nodes, otherwise this node is too old
// (missing features) to join the cluser.
slogger.info("Checking remote features with gossip");
_gossiper.do_shadow_round().get();
_gossiper.check_knows_remote_features(local_features, loaded_peer_features);
_gossiper.reset_endpoint_state_map().get();
for (auto ep : loaded_endpoints) {
_gossiper.add_saved_endpoint(ep);
}
slogger.info("Checking remote features with gossip, initial_contact_nodes={}", initial_contact_nodes);
_gossiper.do_shadow_round(initial_contact_nodes, gms::bind_messaging_port(bool(do_bind))).get();
_gossiper.check_knows_remote_features(local_features, loaded_peer_features);
_gossiper.reset_endpoint_state_map().get();
for (auto ep : loaded_endpoints) {
_gossiper.add_saved_endpoint(ep);
}
}
@@ -412,14 +394,6 @@ void storage_service::join_token_ring(int delay) {
// We attempted to replace this with a schema-presence check, but you need a meaningful sleep
// to get schema info from gossip which defeats the purpose. See CASSANDRA-4427 for the gory details.
std::unordered_set<inet_address> current;
slogger.debug("Bootstrap variables: {} {} {} {}",
is_auto_bootstrap(),
db::system_keyspace::bootstrap_in_progress(),
db::system_keyspace::bootstrap_complete(),
_gossiper.get_seeds().count(get_broadcast_address()));
if (is_auto_bootstrap() && !db::system_keyspace::bootstrap_complete() && _gossiper.get_seeds().count(get_broadcast_address())) {
slogger.info("This node will not auto bootstrap because it is configured to be a seed node.");
}
if (should_bootstrap()) {
bool resume_bootstrap = db::system_keyspace::bootstrap_in_progress();
if (resume_bootstrap) {
@@ -547,19 +521,6 @@ void storage_service::join_token_ring(int delay) {
}
}
if (!is_auto_bootstrap()) {
slogger.warn("auto_bootstrap set to \"off\". This causes UNDEFINED BEHAVIOR. YOU MAY LOSE DATA.");
}
if (!db::system_keyspace::bootstrap_complete() && _gossiper.get_seeds().count(get_broadcast_address())) {
slogger.warn("Bootstrapping node marked as seed (present in the seed list)."
" This can only be done for the very first node in a new cluster."
" If this is not the first node, YOU MAY LOSE DATA."
" Bootstrapping new nodes into an existing cluster as seeds"
" causes UNDEFINED BEHAVIOR. DO NOT EVER do that.");
}
slogger.debug("Setting tokens to {}", _bootstrap_tokens);
// This node must know about its chosen tokens before other nodes do
// since they may start sending writes to this node after it gossips status = NORMAL.
@@ -962,8 +923,6 @@ void storage_service::bootstrap() {
}
}
_gossiper.check_seen_seeds();
_db.invoke_on_all([this] (database& db) {
for (auto& cf : db.get_non_system_column_families()) {
cf->notify_bootstrap_or_replace_start();
@@ -1643,7 +1602,7 @@ future<> storage_service::init_server(bind_messaging_port do_bind) {
// pending ranges when keyspace is chagned
_mnotifier.local().register_listener(this);
std::vector<inet_address> loaded_endpoints;
std::unordered_set<inet_address> loaded_endpoints;
if (get_property_load_ring_state()) {
slogger.info("Loading persisted ring state");
auto loaded_tokens = db::system_keyspace::load_tokens().get0();
@@ -1668,19 +1627,27 @@ future<> storage_service::init_server(bind_messaging_port do_bind) {
if (loaded_host_ids.count(ep)) {
_token_metadata.update_host_id(loaded_host_ids.at(ep), ep);
}
loaded_endpoints.push_back(ep);
loaded_endpoints.insert(ep);
_gossiper.add_saved_endpoint(ep);
}
}
}
// Seeds are now only used as the initial contact point nodes. If the
// loaded_endpoints are empty which means this node is a completely new
// node, we use the nodes specified in seeds as the initial contact
// point nodes, otherwise use the peer nodes persisted in system table.
auto seeds = _gossiper.get_seeds();
auto initial_contact_nodes = loaded_endpoints.empty() ?
std::unordered_set<gms::inet_address>(seeds.begin(), seeds.end()) :
loaded_endpoints;
auto loaded_peer_features = db::system_keyspace::load_peer_features().get0();
slogger.info("loaded_peer_features: peer_features size={}", loaded_peer_features.size());
slogger.info("initial_contact_nodes={}, loaded_endpoints={}, loaded_peer_features={}",
initial_contact_nodes, loaded_endpoints, loaded_peer_features.size());
for (auto& x : loaded_peer_features) {
slogger.info("loaded_peer_features: peer={}, supported_features={}", x.first, x.second);
slogger.info("peer={}, supported_features={}", x.first, x.second);
}
prepare_to_join(std::move(loaded_endpoints), loaded_peer_features, do_bind);
prepare_to_join(std::move(initial_contact_nodes), std::move(loaded_endpoints), std::move(loaded_peer_features), do_bind);
});
}
@@ -1745,16 +1712,16 @@ future<> storage_service::stop() {
});
}
future<> storage_service::check_for_endpoint_collision(const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features) {
future<> storage_service::check_for_endpoint_collision(std::unordered_set<gms::inet_address> initial_contact_nodes, const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features, bind_messaging_port do_bind) {
slogger.debug("Starting shadow gossip round to check for endpoint collision");
return seastar::async([this, loaded_peer_features] {
return seastar::async([this, initial_contact_nodes, loaded_peer_features, do_bind] {
auto t = gms::gossiper::clk::now();
bool found_bootstrapping_node = false;
auto local_features = _feature_service.known_feature_set();
do {
slogger.info("Checking remote features with gossip");
_gossiper.do_shadow_round().get();
_gossiper.do_shadow_round(initial_contact_nodes, gms::bind_messaging_port(bool(do_bind))).get();
_gossiper.check_knows_remote_features(local_features, loaded_peer_features);
auto addr = get_broadcast_address();
if (!_gossiper.is_safe_for_bootstrap(addr)) {
@@ -1807,7 +1774,7 @@ void storage_service::remove_endpoint(inet_address endpoint) {
}
future<storage_service::replacement_info>
storage_service::prepare_replacement_info(const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features) {
storage_service::prepare_replacement_info(std::unordered_set<gms::inet_address> initial_contact_nodes, const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features, bind_messaging_port do_bind) {
if (!db().local().get_replace_address()) {
throw std::runtime_error(format("replace_address is empty"));
}
@@ -1823,7 +1790,7 @@ storage_service::prepare_replacement_info(const std::unordered_map<gms::inet_add
// make magic happen
slogger.info("Checking remote features with gossip");
return _gossiper.do_shadow_round().then([this, loaded_peer_features, replace_address] {
return _gossiper.do_shadow_round(initial_contact_nodes, gms::bind_messaging_port(bool(do_bind))).then([this, loaded_peer_features, replace_address] {
auto local_features = _feature_service.known_feature_set();
_gossiper.check_knows_remote_features(local_features, loaded_peer_features);

View File

@@ -303,12 +303,14 @@ private:
// Tokens and the CDC streams timestamp of the replaced node.
using replacement_info = std::pair<std::unordered_set<token>, std::optional<db_clock::time_point>>;
future<replacement_info> prepare_replacement_info(const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features);
future<replacement_info> prepare_replacement_info(std::unordered_set<gms::inet_address> initial_contact_nodes,
const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features, bind_messaging_port do_bind = bind_messaging_port::yes);
public:
future<bool> is_initialized();
future<> check_for_endpoint_collision(const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features);
future<> check_for_endpoint_collision(std::unordered_set<gms::inet_address> initial_contact_nodes,
const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features, bind_messaging_port do_bind = bind_messaging_port::yes);
/*!
* \brief Init the messaging service part of the service.
@@ -345,8 +347,13 @@ public:
void flush_column_families();
private:
bool should_bootstrap() const;
void prepare_to_join(std::vector<inet_address> loaded_endpoints, const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features, bind_messaging_port do_bind = bind_messaging_port::yes);
bool should_bootstrap();
bool is_first_node();
void prepare_to_join(
std::unordered_set<gms::inet_address> initial_contact_nodes,
std::unordered_set<gms::inet_address> loaded_endpoints,
std::unordered_map<gms::inet_address, sstring> loaded_peer_features,
bind_messaging_port do_bind = bind_messaging_port::yes);
void join_token_ring(int delay);
void maybe_start_sys_dist_ks();
public: