diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 3f47a0de54..701b8065f9 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -1763,38 +1763,86 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map gossiper::do_shadow_round() { - return seastar::async([this, g = this->shared_from_this()] { - build_seeds_list(); - _in_shadow_round = true; - auto t = clk::now(); - - // When peer node receives a syn message, it will send back a ack message. - // So, we need to register gossip message handlers before sending syn message. - get_gossiper().invoke_on_all([] (gossiper& g) { - return g.init_messaging_service_handler(); +future<> gossiper::do_shadow_round(std::unordered_set nodes, bind_messaging_port do_bind) { + return seastar::async([this, g = this->shared_from_this(), nodes = std::move(nodes), do_bind] () mutable { + get_gossiper().invoke_on_all([do_bind] (gossiper& g) { + return g.init_messaging_service_handler(do_bind); }).get(); - - while (this->_in_shadow_round) { - // send a completely empty syn - for (inet_address seed : _seeds) { - utils::chunked_vector digests; - gossip_digest_syn message(get_cluster_name(), get_partitioner_name(), digests); - auto id = get_msg_addr(seed); - logger.trace("Sending a GossipDigestSyn (ShadowRound) to {} ...", id); - // Do it in the background. - (void)ms().send_gossip_digest_syn(id, std::move(message)).handle_exception([id] (auto ep) { - logger.trace("Fail to send GossipDigestSyn (ShadowRound) to {}: {}", id, ep); + nodes.erase(get_broadcast_address()); + gossip_get_endpoint_states_request request{{ + gms::application_state::STATUS, + gms::application_state::HOST_ID, + gms::application_state::TOKENS, + gms::application_state::SUPPORTED_FEATURES}}; + logger.info("Gossip shadow round started with nodes={}", nodes); + std::unordered_set nodes_talked; + size_t nodes_down = 0; + auto start_time = clk::now(); + bool fall_back_to_syn_msg = false; + std::list responses; + for (;;) { + parallel_for_each(nodes.begin(), nodes.end(), [this, &request, &responses, &nodes_talked, &nodes_down, &fall_back_to_syn_msg] (gms::inet_address node) { + logger.debug("Sent get_endpoint_states request to {}, request={}", node, request.application_states); + return ms().send_gossip_get_endpoint_states(msg_addr(node), std::chrono::milliseconds(5000), request).then( + [node, &nodes_talked, &responses] (gms::gossip_get_endpoint_states_response response) { + logger.debug("Got get_endpoint_states response from {}, response={}", node, response.endpoint_state_map); + responses.push_back(std::move(response)); + nodes_talked.insert(node); + }).handle_exception_type([node, &fall_back_to_syn_msg] (seastar::rpc::unknown_verb_error&) { + logger.warn("Node {} does not support get_endpoint_states verb", node); + fall_back_to_syn_msg = true; + }).handle_exception_type([node, &nodes_down] (seastar::rpc::closed_error&) { + nodes_down++; + logger.warn("Node {} is down for get_endpoint_states verb", node); }); + }).get(); + for (auto& response : responses) { + apply_state_locally_without_listener_notification(response.endpoint_state_map); + } + if (!nodes_talked.empty()) { + break; + } + if (nodes_down == nodes.size()) { + logger.warn("All nodes={} are down for get_endpoint_states verb. Skip ShadowRound.", nodes); + break; + } + if (fall_back_to_syn_msg) { + break; + } + if (clk::now() > start_time + std::chrono::milliseconds(_cfg.shadow_round_ms())) { + throw std::runtime_error(format("Unable to gossip with any nodes={} (ShadowRound).", nodes)); } sleep_abortable(std::chrono::seconds(1), _abort_source).get(); - if (this->_in_shadow_round) { - if (clk::now() > t + std::chrono::milliseconds(_cfg.shadow_round_ms())) { - throw std::runtime_error(format("Unable to gossip with any seeds (ShadowRound)")); + logger.info("Connect nodes={} again ... ({} seconds passed)", + nodes, std::chrono::duration_cast(clk::now() - start_time).count()); + } + if (fall_back_to_syn_msg) { + logger.info("Fallback to old method for ShadowRound"); + auto t = clk::now(); + _in_shadow_round = true; + while (this->_in_shadow_round) { + // send a completely empty syn + for (const auto& node : nodes) { + utils::chunked_vector digests; + gossip_digest_syn message(get_cluster_name(), get_partitioner_name(), digests); + auto id = get_msg_addr(node); + logger.trace("Sending a GossipDigestSyn (ShadowRound) to {} ...", id); + // Do it in the background. + (void)ms().send_gossip_digest_syn(id, std::move(message)).handle_exception([id] (auto ep) { + logger.trace("Fail to send GossipDigestSyn (ShadowRound) to {}: {}", id, ep); + }); + } + sleep_abortable(std::chrono::seconds(1), _abort_source).get(); + if (this->_in_shadow_round) { + if (clk::now() > t + std::chrono::milliseconds(_cfg.shadow_round_ms())) { + throw std::runtime_error(format("Unable to gossip with any nodes={} (ShadowRound),", nodes)); + } + logger.info("Connect nodes={} again ... ({} seconds passed)", + nodes, std::chrono::duration_cast(clk::now() - t).count()); } - logger.info("Connect seeds again ... ({} seconds passed)", std::chrono::duration_cast(clk::now() - t).count()); } } + logger.info("Gossip shadow round finisehd with nodes_talked={}", nodes_talked); }); } diff --git a/gms/gossiper.hh b/gms/gossiper.hh index 2e52cdce99..59d32df295 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -485,7 +485,7 @@ public: * Do a single 'shadow' round of gossip, where we do not modify any state * Only used when replacing a node, to get and assume its states */ - future<> do_shadow_round(); + future<> do_shadow_round(std::unordered_set nodes = {}, bind_messaging_port do_bind = bind_messaging_port::yes); private: void build_seeds_list(); diff --git a/service/storage_service.cc b/service/storage_service.cc index fab1572a8b..d164d12922 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -206,12 +206,36 @@ bool get_property_load_ring_state() { return get_local_storage_service().db().local().get_config().load_ring_state(); } -bool storage_service::should_bootstrap() const { - return is_auto_bootstrap() && !db::system_keyspace::bootstrap_complete() && !_gossiper.get_seeds().count(get_broadcast_address()); +bool storage_service::is_first_node() { + if (db().local().is_replacing()) { + return false; + } + auto seeds = _gossiper.get_seeds(); + if (seeds.empty()) { + return false; + } + // Node with the smallest IP address is chosen as the very first node + // in the cluster. The first node is the only node that does not + // bootstrap in the cluser. All other nodes will bootstrap. + std::vector sorted_seeds(seeds.begin(), seeds.end()); + std::sort(sorted_seeds.begin(), sorted_seeds.end()); + if (sorted_seeds.front() == get_broadcast_address()) { + slogger.info("I am the first node in the cluster. Skip bootstrap. Node={}", get_broadcast_address()); + return true; + } + return false; +} + +bool storage_service::should_bootstrap() { + return !db::system_keyspace::bootstrap_complete() && !is_first_node(); } // Runs inside seastar::async context -void storage_service::prepare_to_join(std::vector loaded_endpoints, const std::unordered_map& loaded_peer_features, bind_messaging_port do_bind) { +void storage_service::prepare_to_join( + std::unordered_set initial_contact_nodes, + std::unordered_set loaded_endpoints, + std::unordered_map loaded_peer_features, + bind_messaging_port do_bind) { std::map app_states; if (db::system_keyspace::was_decommissioned()) { if (db().local().get_config().override_decommission()) { @@ -233,62 +257,20 @@ void storage_service::prepare_to_join(std::vector loaded_endpoints if (db::system_keyspace::bootstrap_complete()) { throw std::runtime_error("Cannot replace address with a node that is already bootstrapped"); } - if (!is_auto_bootstrap()) { - throw std::runtime_error("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration"); - } - - std::tie(_bootstrap_tokens, _cdc_streams_ts) = prepare_replacement_info(loaded_peer_features).get0(); + std::tie(_bootstrap_tokens, _cdc_streams_ts) = prepare_replacement_info(initial_contact_nodes, loaded_peer_features, do_bind).get0(); auto replace_address = db().local().get_replace_address(); replacing_a_node_with_same_ip = replace_address && *replace_address == get_broadcast_address(); replacing_a_node_with_diff_ip = replace_address && *replace_address != get_broadcast_address(); } else if (should_bootstrap()) { - check_for_endpoint_collision(loaded_peer_features).get(); + check_for_endpoint_collision(initial_contact_nodes, loaded_peer_features, do_bind).get(); } else { - auto seeds = _gossiper.get_seeds(); - auto my_ep = get_broadcast_address(); auto local_features = _feature_service.known_feature_set(); - - if (seeds.count(my_ep)) { - // This node is a seed node - if (loaded_peer_features.empty()) { - // This is a competely new seed node, skip the check - slogger.info("Checking remote features skipped, since this node is a new seed node which knows nothing about the cluster"); - } else { - // This is a existing seed node - if (seeds.size() == 1) { - // This node is the only seed node, check features with system table - slogger.info("Checking remote features with system table, since this node is the only seed node"); - _gossiper.check_knows_remote_features(local_features, loaded_peer_features); - } else { - // More than one seed node in the seed list, do shadow round with other seed nodes - try { - slogger.info("Checking remote features with gossip and system tables"); - _gossiper.do_shadow_round().get(); - } catch (...) { - slogger.info("Shadow round failed with {}, checking remote features with system tables only", - std::current_exception()); - _gossiper.finish_shadow_round(); - } - - _gossiper.check_knows_remote_features(local_features, loaded_peer_features); - _gossiper.reset_endpoint_state_map().get(); - for (auto ep : loaded_endpoints) { - _gossiper.add_saved_endpoint(ep); - } - } - } - } else { - // This node is a non-seed node - // Do shadow round to check if this node knows all the features - // advertised by all other nodes, otherwise this node is too old - // (missing features) to join the cluser. - slogger.info("Checking remote features with gossip"); - _gossiper.do_shadow_round().get(); - _gossiper.check_knows_remote_features(local_features, loaded_peer_features); - _gossiper.reset_endpoint_state_map().get(); - for (auto ep : loaded_endpoints) { - _gossiper.add_saved_endpoint(ep); - } + slogger.info("Checking remote features with gossip, initial_contact_nodes={}", initial_contact_nodes); + _gossiper.do_shadow_round(initial_contact_nodes, gms::bind_messaging_port(bool(do_bind))).get(); + _gossiper.check_knows_remote_features(local_features, loaded_peer_features); + _gossiper.reset_endpoint_state_map().get(); + for (auto ep : loaded_endpoints) { + _gossiper.add_saved_endpoint(ep); } } @@ -412,14 +394,6 @@ void storage_service::join_token_ring(int delay) { // We attempted to replace this with a schema-presence check, but you need a meaningful sleep // to get schema info from gossip which defeats the purpose. See CASSANDRA-4427 for the gory details. std::unordered_set current; - slogger.debug("Bootstrap variables: {} {} {} {}", - is_auto_bootstrap(), - db::system_keyspace::bootstrap_in_progress(), - db::system_keyspace::bootstrap_complete(), - _gossiper.get_seeds().count(get_broadcast_address())); - if (is_auto_bootstrap() && !db::system_keyspace::bootstrap_complete() && _gossiper.get_seeds().count(get_broadcast_address())) { - slogger.info("This node will not auto bootstrap because it is configured to be a seed node."); - } if (should_bootstrap()) { bool resume_bootstrap = db::system_keyspace::bootstrap_in_progress(); if (resume_bootstrap) { @@ -547,19 +521,6 @@ void storage_service::join_token_ring(int delay) { } } - - if (!is_auto_bootstrap()) { - slogger.warn("auto_bootstrap set to \"off\". This causes UNDEFINED BEHAVIOR. YOU MAY LOSE DATA."); - } - - if (!db::system_keyspace::bootstrap_complete() && _gossiper.get_seeds().count(get_broadcast_address())) { - slogger.warn("Bootstrapping node marked as seed (present in the seed list)." - " This can only be done for the very first node in a new cluster." - " If this is not the first node, YOU MAY LOSE DATA." - " Bootstrapping new nodes into an existing cluster as seeds" - " causes UNDEFINED BEHAVIOR. DO NOT EVER do that."); - } - slogger.debug("Setting tokens to {}", _bootstrap_tokens); // This node must know about its chosen tokens before other nodes do // since they may start sending writes to this node after it gossips status = NORMAL. @@ -962,8 +923,6 @@ void storage_service::bootstrap() { } } - _gossiper.check_seen_seeds(); - _db.invoke_on_all([this] (database& db) { for (auto& cf : db.get_non_system_column_families()) { cf->notify_bootstrap_or_replace_start(); @@ -1643,7 +1602,7 @@ future<> storage_service::init_server(bind_messaging_port do_bind) { // pending ranges when keyspace is chagned _mnotifier.local().register_listener(this); - std::vector loaded_endpoints; + std::unordered_set loaded_endpoints; if (get_property_load_ring_state()) { slogger.info("Loading persisted ring state"); auto loaded_tokens = db::system_keyspace::load_tokens().get0(); @@ -1668,19 +1627,27 @@ future<> storage_service::init_server(bind_messaging_port do_bind) { if (loaded_host_ids.count(ep)) { _token_metadata.update_host_id(loaded_host_ids.at(ep), ep); } - loaded_endpoints.push_back(ep); + loaded_endpoints.insert(ep); _gossiper.add_saved_endpoint(ep); } } } + // Seeds are now only used as the initial contact point nodes. If the + // loaded_endpoints are empty which means this node is a completely new + // node, we use the nodes specified in seeds as the initial contact + // point nodes, otherwise use the peer nodes persisted in system table. + auto seeds = _gossiper.get_seeds(); + auto initial_contact_nodes = loaded_endpoints.empty() ? + std::unordered_set(seeds.begin(), seeds.end()) : + loaded_endpoints; auto loaded_peer_features = db::system_keyspace::load_peer_features().get0(); - slogger.info("loaded_peer_features: peer_features size={}", loaded_peer_features.size()); + slogger.info("initial_contact_nodes={}, loaded_endpoints={}, loaded_peer_features={}", + initial_contact_nodes, loaded_endpoints, loaded_peer_features.size()); for (auto& x : loaded_peer_features) { - slogger.info("loaded_peer_features: peer={}, supported_features={}", x.first, x.second); + slogger.info("peer={}, supported_features={}", x.first, x.second); } - - prepare_to_join(std::move(loaded_endpoints), loaded_peer_features, do_bind); + prepare_to_join(std::move(initial_contact_nodes), std::move(loaded_endpoints), std::move(loaded_peer_features), do_bind); }); } @@ -1745,16 +1712,16 @@ future<> storage_service::stop() { }); } -future<> storage_service::check_for_endpoint_collision(const std::unordered_map& loaded_peer_features) { +future<> storage_service::check_for_endpoint_collision(std::unordered_set initial_contact_nodes, const std::unordered_map& loaded_peer_features, bind_messaging_port do_bind) { slogger.debug("Starting shadow gossip round to check for endpoint collision"); - return seastar::async([this, loaded_peer_features] { + return seastar::async([this, initial_contact_nodes, loaded_peer_features, do_bind] { auto t = gms::gossiper::clk::now(); bool found_bootstrapping_node = false; auto local_features = _feature_service.known_feature_set(); do { slogger.info("Checking remote features with gossip"); - _gossiper.do_shadow_round().get(); + _gossiper.do_shadow_round(initial_contact_nodes, gms::bind_messaging_port(bool(do_bind))).get(); _gossiper.check_knows_remote_features(local_features, loaded_peer_features); auto addr = get_broadcast_address(); if (!_gossiper.is_safe_for_bootstrap(addr)) { @@ -1807,7 +1774,7 @@ void storage_service::remove_endpoint(inet_address endpoint) { } future -storage_service::prepare_replacement_info(const std::unordered_map& loaded_peer_features) { +storage_service::prepare_replacement_info(std::unordered_set initial_contact_nodes, const std::unordered_map& loaded_peer_features, bind_messaging_port do_bind) { if (!db().local().get_replace_address()) { throw std::runtime_error(format("replace_address is empty")); } @@ -1823,7 +1790,7 @@ storage_service::prepare_replacement_info(const std::unordered_map, std::optional>; - future prepare_replacement_info(const std::unordered_map& loaded_peer_features); + future prepare_replacement_info(std::unordered_set initial_contact_nodes, + const std::unordered_map& loaded_peer_features, bind_messaging_port do_bind = bind_messaging_port::yes); public: future is_initialized(); - future<> check_for_endpoint_collision(const std::unordered_map& loaded_peer_features); + future<> check_for_endpoint_collision(std::unordered_set initial_contact_nodes, + const std::unordered_map& loaded_peer_features, bind_messaging_port do_bind = bind_messaging_port::yes); /*! * \brief Init the messaging service part of the service. @@ -345,8 +347,13 @@ public: void flush_column_families(); private: - bool should_bootstrap() const; - void prepare_to_join(std::vector loaded_endpoints, const std::unordered_map& loaded_peer_features, bind_messaging_port do_bind = bind_messaging_port::yes); + bool should_bootstrap(); + bool is_first_node(); + void prepare_to_join( + std::unordered_set initial_contact_nodes, + std::unordered_set loaded_endpoints, + std::unordered_map loaded_peer_features, + bind_messaging_port do_bind = bind_messaging_port::yes); void join_token_ring(int delay); void maybe_start_sys_dist_ks(); public: