From d0b3f3dfe80cfd52adbbd002dd3a8794cfb44fb5 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 22 Jul 2020 10:01:49 +0800 Subject: [PATCH] gossip: Get rid of seed concept The concept of seed and the different behaviour between seed nodes and non seed nodes generate a lot of confusion, complication and error for users. For example, how to add a seed node into a cluster, how to promote a non seed node to a seed node, how to choose seed nodes in multiple DC setup, edit config files for seeds, why seed node does not bootstrap. If we remove the concept of seed, it will get much easier for users. After this series, seed config option is only used once when a new node joins a cluster. Major changes: - Seed nodes are only used as the initial contact point nodes. - Seed nodes now perform bootstrap. The only exception is the first node in the cluster. - The unsafe auto_bootstrap option is now ignored. - Gossip shadow round now attempts to talk to all nodes instead of just seed nodes. Manual test: - bootstrap n1, n2, n3 (n1 and n2 are listed as seed, check only n1 will skip bootstrap, n2 and n3 will bootstrap) - shutdown n1, n2, n3 - start n2 (check non seed node can boot) - start n1 (check n1 talks to both n2 and n3) - start n3 (check n3 talks to both n1 and n2) Upgrade/Downgrade test: - Initialize cluster Start 3 nodes with n1, n2, n3 using old version n1 and n2 are listed as seed - Test upgrade starting from seed nodes Rolling restart n1 using new version Rolling restart n2 using new version Rolling restart n3 using new version - Test downgrade to old version Rolling restart n1 using old version Rolling restart n2 using old version Rolling restart n3 using old version - Test upgrade starting from non seed nodes Rolling restart n3 using new version Rolling restart n2 using new version Rolling restart n1 using new version Notes on upgrade procedure: There is no special procedure needed to upgrade to Scylla without seed concept. 
Rolling upgrade node one by one is good enough. Fixes: #6845 Tests: ./test.py + update_cluster_layout_tests.py + manual test --- gms/gossiper.cc | 98 +++++++++++++++++++------- gms/gossiper.hh | 2 +- service/storage_service.cc | 141 ++++++++++++++----------------------- service/storage_service.hh | 15 ++-- 4 files changed, 139 insertions(+), 117 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 3f47a0de54..701b8065f9 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -1763,38 +1763,86 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map gossiper::do_shadow_round() { - return seastar::async([this, g = this->shared_from_this()] { - build_seeds_list(); - _in_shadow_round = true; - auto t = clk::now(); - - // When peer node receives a syn message, it will send back a ack message. - // So, we need to register gossip message handlers before sending syn message. - get_gossiper().invoke_on_all([] (gossiper& g) { - return g.init_messaging_service_handler(); +future<> gossiper::do_shadow_round(std::unordered_set nodes, bind_messaging_port do_bind) { + return seastar::async([this, g = this->shared_from_this(), nodes = std::move(nodes), do_bind] () mutable { + get_gossiper().invoke_on_all([do_bind] (gossiper& g) { + return g.init_messaging_service_handler(do_bind); }).get(); - - while (this->_in_shadow_round) { - // send a completely empty syn - for (inet_address seed : _seeds) { - utils::chunked_vector digests; - gossip_digest_syn message(get_cluster_name(), get_partitioner_name(), digests); - auto id = get_msg_addr(seed); - logger.trace("Sending a GossipDigestSyn (ShadowRound) to {} ...", id); - // Do it in the background. 
- (void)ms().send_gossip_digest_syn(id, std::move(message)).handle_exception([id] (auto ep) { - logger.trace("Fail to send GossipDigestSyn (ShadowRound) to {}: {}", id, ep); + nodes.erase(get_broadcast_address()); + gossip_get_endpoint_states_request request{{ + gms::application_state::STATUS, + gms::application_state::HOST_ID, + gms::application_state::TOKENS, + gms::application_state::SUPPORTED_FEATURES}}; + logger.info("Gossip shadow round started with nodes={}", nodes); + std::unordered_set nodes_talked; + size_t nodes_down = 0; + auto start_time = clk::now(); + bool fall_back_to_syn_msg = false; + std::list responses; + for (;;) { + parallel_for_each(nodes.begin(), nodes.end(), [this, &request, &responses, &nodes_talked, &nodes_down, &fall_back_to_syn_msg] (gms::inet_address node) { + logger.debug("Sent get_endpoint_states request to {}, request={}", node, request.application_states); + return ms().send_gossip_get_endpoint_states(msg_addr(node), std::chrono::milliseconds(5000), request).then( + [node, &nodes_talked, &responses] (gms::gossip_get_endpoint_states_response response) { + logger.debug("Got get_endpoint_states response from {}, response={}", node, response.endpoint_state_map); + responses.push_back(std::move(response)); + nodes_talked.insert(node); + }).handle_exception_type([node, &fall_back_to_syn_msg] (seastar::rpc::unknown_verb_error&) { + logger.warn("Node {} does not support get_endpoint_states verb", node); + fall_back_to_syn_msg = true; + }).handle_exception_type([node, &nodes_down] (seastar::rpc::closed_error&) { + nodes_down++; + logger.warn("Node {} is down for get_endpoint_states verb", node); }); + }).get(); + for (auto& response : responses) { + apply_state_locally_without_listener_notification(response.endpoint_state_map); + } + if (!nodes_talked.empty()) { + break; + } + if (nodes_down == nodes.size()) { + logger.warn("All nodes={} are down for get_endpoint_states verb. 
Skip ShadowRound.", nodes); + break; + } + if (fall_back_to_syn_msg) { + break; + } + if (clk::now() > start_time + std::chrono::milliseconds(_cfg.shadow_round_ms())) { + throw std::runtime_error(format("Unable to gossip with any nodes={} (ShadowRound).", nodes)); } sleep_abortable(std::chrono::seconds(1), _abort_source).get(); - if (this->_in_shadow_round) { - if (clk::now() > t + std::chrono::milliseconds(_cfg.shadow_round_ms())) { - throw std::runtime_error(format("Unable to gossip with any seeds (ShadowRound)")); + logger.info("Connect nodes={} again ... ({} seconds passed)", + nodes, std::chrono::duration_cast(clk::now() - start_time).count()); + } + if (fall_back_to_syn_msg) { + logger.info("Fallback to old method for ShadowRound"); + auto t = clk::now(); + _in_shadow_round = true; + while (this->_in_shadow_round) { + // send a completely empty syn + for (const auto& node : nodes) { + utils::chunked_vector digests; + gossip_digest_syn message(get_cluster_name(), get_partitioner_name(), digests); + auto id = get_msg_addr(node); + logger.trace("Sending a GossipDigestSyn (ShadowRound) to {} ...", id); + // Do it in the background. + (void)ms().send_gossip_digest_syn(id, std::move(message)).handle_exception([id] (auto ep) { + logger.trace("Fail to send GossipDigestSyn (ShadowRound) to {}: {}", id, ep); + }); + } + sleep_abortable(std::chrono::seconds(1), _abort_source).get(); + if (this->_in_shadow_round) { + if (clk::now() > t + std::chrono::milliseconds(_cfg.shadow_round_ms())) { + throw std::runtime_error(format("Unable to gossip with any nodes={} (ShadowRound),", nodes)); + } + logger.info("Connect nodes={} again ... ({} seconds passed)", + nodes, std::chrono::duration_cast(clk::now() - t).count()); } - logger.info("Connect seeds again ... 
({} seconds passed)", std::chrono::duration_cast(clk::now() - t).count()); } } + logger.info("Gossip shadow round finisehd with nodes_talked={}", nodes_talked); }); } diff --git a/gms/gossiper.hh b/gms/gossiper.hh index 2e52cdce99..59d32df295 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -485,7 +485,7 @@ public: * Do a single 'shadow' round of gossip, where we do not modify any state * Only used when replacing a node, to get and assume its states */ - future<> do_shadow_round(); + future<> do_shadow_round(std::unordered_set nodes = {}, bind_messaging_port do_bind = bind_messaging_port::yes); private: void build_seeds_list(); diff --git a/service/storage_service.cc b/service/storage_service.cc index fab1572a8b..d164d12922 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -206,12 +206,36 @@ bool get_property_load_ring_state() { return get_local_storage_service().db().local().get_config().load_ring_state(); } -bool storage_service::should_bootstrap() const { - return is_auto_bootstrap() && !db::system_keyspace::bootstrap_complete() && !_gossiper.get_seeds().count(get_broadcast_address()); +bool storage_service::is_first_node() { + if (db().local().is_replacing()) { + return false; + } + auto seeds = _gossiper.get_seeds(); + if (seeds.empty()) { + return false; + } + // Node with the smallest IP address is chosen as the very first node + // in the cluster. The first node is the only node that does not + // bootstrap in the cluser. All other nodes will bootstrap. + std::vector sorted_seeds(seeds.begin(), seeds.end()); + std::sort(sorted_seeds.begin(), sorted_seeds.end()); + if (sorted_seeds.front() == get_broadcast_address()) { + slogger.info("I am the first node in the cluster. Skip bootstrap. 
Node={}", get_broadcast_address()); + return true; + } + return false; +} + +bool storage_service::should_bootstrap() { + return !db::system_keyspace::bootstrap_complete() && !is_first_node(); } // Runs inside seastar::async context -void storage_service::prepare_to_join(std::vector loaded_endpoints, const std::unordered_map& loaded_peer_features, bind_messaging_port do_bind) { +void storage_service::prepare_to_join( + std::unordered_set initial_contact_nodes, + std::unordered_set loaded_endpoints, + std::unordered_map loaded_peer_features, + bind_messaging_port do_bind) { std::map app_states; if (db::system_keyspace::was_decommissioned()) { if (db().local().get_config().override_decommission()) { @@ -233,62 +257,20 @@ void storage_service::prepare_to_join(std::vector loaded_endpoints if (db::system_keyspace::bootstrap_complete()) { throw std::runtime_error("Cannot replace address with a node that is already bootstrapped"); } - if (!is_auto_bootstrap()) { - throw std::runtime_error("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration"); - } - - std::tie(_bootstrap_tokens, _cdc_streams_ts) = prepare_replacement_info(loaded_peer_features).get0(); + std::tie(_bootstrap_tokens, _cdc_streams_ts) = prepare_replacement_info(initial_contact_nodes, loaded_peer_features, do_bind).get0(); auto replace_address = db().local().get_replace_address(); replacing_a_node_with_same_ip = replace_address && *replace_address == get_broadcast_address(); replacing_a_node_with_diff_ip = replace_address && *replace_address != get_broadcast_address(); } else if (should_bootstrap()) { - check_for_endpoint_collision(loaded_peer_features).get(); + check_for_endpoint_collision(initial_contact_nodes, loaded_peer_features, do_bind).get(); } else { - auto seeds = _gossiper.get_seeds(); - auto my_ep = get_broadcast_address(); auto local_features = _feature_service.known_feature_set(); - - if (seeds.count(my_ep)) { - // This node is a seed node - if 
(loaded_peer_features.empty()) { - // This is a competely new seed node, skip the check - slogger.info("Checking remote features skipped, since this node is a new seed node which knows nothing about the cluster"); - } else { - // This is a existing seed node - if (seeds.size() == 1) { - // This node is the only seed node, check features with system table - slogger.info("Checking remote features with system table, since this node is the only seed node"); - _gossiper.check_knows_remote_features(local_features, loaded_peer_features); - } else { - // More than one seed node in the seed list, do shadow round with other seed nodes - try { - slogger.info("Checking remote features with gossip and system tables"); - _gossiper.do_shadow_round().get(); - } catch (...) { - slogger.info("Shadow round failed with {}, checking remote features with system tables only", - std::current_exception()); - _gossiper.finish_shadow_round(); - } - - _gossiper.check_knows_remote_features(local_features, loaded_peer_features); - _gossiper.reset_endpoint_state_map().get(); - for (auto ep : loaded_endpoints) { - _gossiper.add_saved_endpoint(ep); - } - } - } - } else { - // This node is a non-seed node - // Do shadow round to check if this node knows all the features - // advertised by all other nodes, otherwise this node is too old - // (missing features) to join the cluser. 
- slogger.info("Checking remote features with gossip"); - _gossiper.do_shadow_round().get(); - _gossiper.check_knows_remote_features(local_features, loaded_peer_features); - _gossiper.reset_endpoint_state_map().get(); - for (auto ep : loaded_endpoints) { - _gossiper.add_saved_endpoint(ep); - } + slogger.info("Checking remote features with gossip, initial_contact_nodes={}", initial_contact_nodes); + _gossiper.do_shadow_round(initial_contact_nodes, gms::bind_messaging_port(bool(do_bind))).get(); + _gossiper.check_knows_remote_features(local_features, loaded_peer_features); + _gossiper.reset_endpoint_state_map().get(); + for (auto ep : loaded_endpoints) { + _gossiper.add_saved_endpoint(ep); } } @@ -412,14 +394,6 @@ void storage_service::join_token_ring(int delay) { // We attempted to replace this with a schema-presence check, but you need a meaningful sleep // to get schema info from gossip which defeats the purpose. See CASSANDRA-4427 for the gory details. std::unordered_set current; - slogger.debug("Bootstrap variables: {} {} {} {}", - is_auto_bootstrap(), - db::system_keyspace::bootstrap_in_progress(), - db::system_keyspace::bootstrap_complete(), - _gossiper.get_seeds().count(get_broadcast_address())); - if (is_auto_bootstrap() && !db::system_keyspace::bootstrap_complete() && _gossiper.get_seeds().count(get_broadcast_address())) { - slogger.info("This node will not auto bootstrap because it is configured to be a seed node."); - } if (should_bootstrap()) { bool resume_bootstrap = db::system_keyspace::bootstrap_in_progress(); if (resume_bootstrap) { @@ -547,19 +521,6 @@ void storage_service::join_token_ring(int delay) { } } - - if (!is_auto_bootstrap()) { - slogger.warn("auto_bootstrap set to \"off\". This causes UNDEFINED BEHAVIOR. YOU MAY LOSE DATA."); - } - - if (!db::system_keyspace::bootstrap_complete() && _gossiper.get_seeds().count(get_broadcast_address())) { - slogger.warn("Bootstrapping node marked as seed (present in the seed list)." 
- " This can only be done for the very first node in a new cluster." - " If this is not the first node, YOU MAY LOSE DATA." - " Bootstrapping new nodes into an existing cluster as seeds" - " causes UNDEFINED BEHAVIOR. DO NOT EVER do that."); - } - slogger.debug("Setting tokens to {}", _bootstrap_tokens); // This node must know about its chosen tokens before other nodes do // since they may start sending writes to this node after it gossips status = NORMAL. @@ -962,8 +923,6 @@ void storage_service::bootstrap() { } } - _gossiper.check_seen_seeds(); - _db.invoke_on_all([this] (database& db) { for (auto& cf : db.get_non_system_column_families()) { cf->notify_bootstrap_or_replace_start(); @@ -1643,7 +1602,7 @@ future<> storage_service::init_server(bind_messaging_port do_bind) { // pending ranges when keyspace is chagned _mnotifier.local().register_listener(this); - std::vector loaded_endpoints; + std::unordered_set loaded_endpoints; if (get_property_load_ring_state()) { slogger.info("Loading persisted ring state"); auto loaded_tokens = db::system_keyspace::load_tokens().get0(); @@ -1668,19 +1627,27 @@ future<> storage_service::init_server(bind_messaging_port do_bind) { if (loaded_host_ids.count(ep)) { _token_metadata.update_host_id(loaded_host_ids.at(ep), ep); } - loaded_endpoints.push_back(ep); + loaded_endpoints.insert(ep); _gossiper.add_saved_endpoint(ep); } } } + // Seeds are now only used as the initial contact point nodes. If the + // loaded_endpoints are empty which means this node is a completely new + // node, we use the nodes specified in seeds as the initial contact + // point nodes, otherwise use the peer nodes persisted in system table. + auto seeds = _gossiper.get_seeds(); + auto initial_contact_nodes = loaded_endpoints.empty() ? 
+ std::unordered_set(seeds.begin(), seeds.end()) : + loaded_endpoints; auto loaded_peer_features = db::system_keyspace::load_peer_features().get0(); - slogger.info("loaded_peer_features: peer_features size={}", loaded_peer_features.size()); + slogger.info("initial_contact_nodes={}, loaded_endpoints={}, loaded_peer_features={}", + initial_contact_nodes, loaded_endpoints, loaded_peer_features.size()); for (auto& x : loaded_peer_features) { - slogger.info("loaded_peer_features: peer={}, supported_features={}", x.first, x.second); + slogger.info("peer={}, supported_features={}", x.first, x.second); } - - prepare_to_join(std::move(loaded_endpoints), loaded_peer_features, do_bind); + prepare_to_join(std::move(initial_contact_nodes), std::move(loaded_endpoints), std::move(loaded_peer_features), do_bind); }); } @@ -1745,16 +1712,16 @@ future<> storage_service::stop() { }); } -future<> storage_service::check_for_endpoint_collision(const std::unordered_map& loaded_peer_features) { +future<> storage_service::check_for_endpoint_collision(std::unordered_set initial_contact_nodes, const std::unordered_map& loaded_peer_features, bind_messaging_port do_bind) { slogger.debug("Starting shadow gossip round to check for endpoint collision"); - return seastar::async([this, loaded_peer_features] { + return seastar::async([this, initial_contact_nodes, loaded_peer_features, do_bind] { auto t = gms::gossiper::clk::now(); bool found_bootstrapping_node = false; auto local_features = _feature_service.known_feature_set(); do { slogger.info("Checking remote features with gossip"); - _gossiper.do_shadow_round().get(); + _gossiper.do_shadow_round(initial_contact_nodes, gms::bind_messaging_port(bool(do_bind))).get(); _gossiper.check_knows_remote_features(local_features, loaded_peer_features); auto addr = get_broadcast_address(); if (!_gossiper.is_safe_for_bootstrap(addr)) { @@ -1807,7 +1774,7 @@ void storage_service::remove_endpoint(inet_address endpoint) { } future 
-storage_service::prepare_replacement_info(const std::unordered_map& loaded_peer_features) { +storage_service::prepare_replacement_info(std::unordered_set initial_contact_nodes, const std::unordered_map& loaded_peer_features, bind_messaging_port do_bind) { if (!db().local().get_replace_address()) { throw std::runtime_error(format("replace_address is empty")); } @@ -1823,7 +1790,7 @@ storage_service::prepare_replacement_info(const std::unordered_map, std::optional>; - future prepare_replacement_info(const std::unordered_map& loaded_peer_features); + future prepare_replacement_info(std::unordered_set initial_contact_nodes, + const std::unordered_map& loaded_peer_features, bind_messaging_port do_bind = bind_messaging_port::yes); public: future is_initialized(); - future<> check_for_endpoint_collision(const std::unordered_map& loaded_peer_features); + future<> check_for_endpoint_collision(std::unordered_set initial_contact_nodes, + const std::unordered_map& loaded_peer_features, bind_messaging_port do_bind = bind_messaging_port::yes); /*! * \brief Init the messaging service part of the service. @@ -345,8 +347,13 @@ public: void flush_column_families(); private: - bool should_bootstrap() const; - void prepare_to_join(std::vector loaded_endpoints, const std::unordered_map& loaded_peer_features, bind_messaging_port do_bind = bind_messaging_port::yes); + bool should_bootstrap(); + bool is_first_node(); + void prepare_to_join( + std::unordered_set initial_contact_nodes, + std::unordered_set loaded_endpoints, + std::unordered_map loaded_peer_features, + bind_messaging_port do_bind = bind_messaging_port::yes); void join_token_ring(int delay); void maybe_start_sys_dist_ks(); public: