From 28c0a27467a16eacfa17fa8692fb2e433671055e Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Mon, 17 Jun 2024 13:01:02 +0300 Subject: [PATCH] Wait for booting node to be marked UP before complete booting. Currently a node does not wait to be marked UP by other nodes before complete booting which creates a usability issue: during a rolling restart it is not enough to wait for local CQL port to be opened before restarting next node, but it is also needed to check that all other nodes already see this node as alive otherwise if next node is restarted some nodes may see two nodes as dead instead of one. This patch improves the situation by making sure that boot process does not complete before all other nodes see the booting one as alive. This is still a best effort thing: if some nodes are unreachable or gossiper propagation takes too much time the boot process continues anyway. Fixes scylladb/scylladb#19206 --- gms/gossiper.cc | 41 +++++++++++++++++++++++++++++++------- gms/gossiper.hh | 4 +++- idl/gossip.idl.hh | 2 +- service/storage_service.cc | 20 +++++++++++++++++++ test/manual/message.cc | 4 ++-- 5 files changed, 60 insertions(+), 11 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 74742c3406..b5056a28e5 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -42,6 +42,7 @@ #include #include "gms/generation-number.hh" #include "locator/token_metadata.hh" +#include "seastar/rpc/rpc_types.hh" #include "utils/exceptions.hh" #include "utils/error_injection.hh" #include "utils/to_string.hh" @@ -409,7 +410,7 @@ future<> gossiper::handle_ack2_msg(msg_addr from, gossip_digest_ack2 msg) { co_await apply_state_locally(std::move(remote_ep_state_map)); } -future<> gossiper::handle_echo_msg(gms::inet_address from, std::optional generation_number_opt) { +future<> gossiper::handle_echo_msg(gms::inet_address from, seastar::rpc::opt_time_point timeout, std::optional generation_number_opt, bool notify_up) { bool respond = true; if (!_advertise_myself) {
respond = false; @@ -434,9 +435,22 @@ future<> gossiper::handle_echo_msg(gms::inet_address from, std::optional future<> { + try { + while (rpc::rpc_clock_type::now() < *timeout && !g.is_alive(from)) { + co_await sleep_abortable(std::chrono::milliseconds(100), g._abort_source); + } + } catch(...) { + logger.warn("handle_echo_msg: UP notification from {} failed with {}", from, std::current_exception()); + } + }); } - return make_ready_future<>(); } future<> gossiper::handle_shutdown_msg(inet_address from, std::optional generation_number_opt) { @@ -513,9 +527,9 @@ void gossiper::init_messaging_service_handler() { return gossiper.handle_ack2_msg(from, std::move(msg)); }); }); - ser::gossip_rpc_verbs::register_gossip_echo(&_messaging, [this] (const rpc::client_info& cinfo, seastar::rpc::opt_time_point, rpc::optional generation_number_opt) { + ser::gossip_rpc_verbs::register_gossip_echo(&_messaging, [this] (const rpc::client_info& cinfo, seastar::rpc::opt_time_point timeout, rpc::optional generation_number_opt, rpc::optional notify_up_opt) { auto from = cinfo.retrieve_auxiliary("baddr"); - return handle_echo_msg(from, generation_number_opt); + return handle_echo_msg(from, timeout, generation_number_opt, notify_up_opt.value_or(false)); }); ser::gossip_rpc_verbs::register_gossip_shutdown(&_messaging, [this] (inet_address from, rpc::optional generation_number_opt) { return background_msg("GOSSIP_SHUTDOWN", [from, generation_number_opt] (gms::gossiper& gossiper) { @@ -951,7 +965,7 @@ future<> gossiper::failure_detector_loop_for_node(gms::inet_address node, genera bool failed = false; try { logger.debug("failure_detector_loop: Send echo to node {}, status = started", node); - co_await ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, netw::msg_addr(node), netw::messaging_service::clock_type::now() + max_duration, gossip_generation.value()); + co_await ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, netw::msg_addr(node), netw::messaging_service::clock_type::now() + 
max_duration, gossip_generation.value(), false); logger.debug("failure_detector_loop: Send echo to node {}, status = ok", node); } catch (...) { failed = true; @@ -1651,6 +1665,19 @@ void gossiper::update_timestamp_for_nodes(const std::map gossiper::notify_nodes_on_up(std::unordered_set dsts) { + co_await coroutine::parallel_for_each(dsts, [this] (inet_address dst) -> future<> { + if (dst != get_broadcast_address()) { + try { + auto generation = my_endpoint_state().get_heart_beat_state().get_generation(); + co_await ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, netw::msg_addr(dst), netw::messaging_service::clock_type::now() + std::chrono::seconds(10), generation.value(), true); + } catch (...) { + logger.warn("Failed to notify node {} that I am UP: {}", dst, std::current_exception()); + } + } + }); +} + void gossiper::mark_alive(inet_address addr) { // Enter the _background_msg gate so stop() would wait on it auto inserted = _pending_mark_alive_endpoints.insert(addr).second; @@ -1673,7 +1700,7 @@ void gossiper::mark_alive(inet_address addr) { // Enter the _background_msg gate so stop() would wait on it auto gh = _background_msg.hold(); logger.debug("Sending a EchoMessage to {}, with generation_number={}", id, generation); - (void) ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, id, netw::messaging_service::clock_type::now() + std::chrono::milliseconds(15000), generation.value()).then([this, addr] { + (void) ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, id, netw::messaging_service::clock_type::now() + std::chrono::milliseconds(15000), generation.value(), false).then([this, addr] { logger.trace("Got EchoMessage Reply"); return real_mark_alive(addr); }).handle_exception([addr, gh = std::move(gh), unmark_pending = std::move(unmark_pending)] (auto ep) { diff --git a/gms/gossiper.hh b/gms/gossiper.hh index 8862ef3fa0..4f147b8fd9 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -111,7 +111,7 @@ private: future<> handle_syn_msg(msg_addr from, 
gossip_digest_syn syn_msg); future<> handle_ack_msg(msg_addr from, gossip_digest_ack ack_msg); future<> handle_ack2_msg(msg_addr from, gossip_digest_ack2 msg); - future<> handle_echo_msg(inet_address from, std::optional generation_number_opt); + future<> handle_echo_msg(inet_address from, seastar::rpc::opt_time_point, std::optional generation_number_opt, bool notify_up); future<> handle_shutdown_msg(inet_address from, std::optional generation_number_opt); future<> do_send_ack_msg(msg_addr from, gossip_digest_syn syn_msg); future<> do_send_ack2_msg(msg_addr from, utils::chunked_vector ack_msg_digest); @@ -704,6 +704,8 @@ public: void check_snitch_name_matches(sstring local_snitch_name) const; int get_down_endpoint_count() const noexcept; int get_up_endpoint_count() const noexcept; + // Send UP notification to all nodes in the set + future<> notify_nodes_on_up(std::unordered_set); private: future<> failure_detector_loop(); future<> failure_detector_loop_for_node(gms::inet_address node, generation_type gossip_generation, uint64_t live_endpoints_version); diff --git a/idl/gossip.idl.hh b/idl/gossip.idl.hh index 1f499e016b..2a2124cc36 100644 --- a/idl/gossip.idl.hh +++ b/idl/gossip.idl.hh @@ -9,7 +9,7 @@ #include "gms/gossip_digest_syn.hh" namespace gms { -verb [[with_client_info, with_timeout]] gossip_echo (int64_t generation_number [[version 4.6.0]]) +verb [[with_client_info, with_timeout]] gossip_echo (int64_t generation_number [[version 4.6.0]], bool notify_up [[version 6.1.0]]) verb [[one_way]] gossip_shutdown (gms::inet_address from, int64_t generation_number [[version 4.6.0]]) verb [[with_client_info, one_way]] gossip_digest_syn (gms::gossip_digest_syn syn) verb [[with_client_info, one_way]] gossip_digest_ack (gms::gossip_digest_ack ask) diff --git a/service/storage_service.cc b/service/storage_service.cc index fce7af4bf7..e132d52d62 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1824,6 +1824,17 @@ future<> 
storage_service::join_token_ring(sharded ips; + const auto& am = _group0->address_map(); + for (auto id : _topology_state_machine._topology.normal_nodes | boost::adaptors::map_keys) { + auto ip = am.find(id); + if (ip) { + ips.insert(*ip); + } + } + + co_await _gossiper.notify_nodes_on_up(std::move(ips)); + co_return; } @@ -1996,6 +2007,15 @@ future<> storage_service::join_token_ring(sharded ips; + _gossiper.for_each_endpoint_state([this, &ips] (const inet_address& addr, const gms::endpoint_state&) { + if (_gossiper.is_normal(addr)) { + ips.insert(addr); + } + }); + + co_await _gossiper.notify_nodes_on_up(std::move(ips)); } future<> storage_service::track_upgrade_progress_to_topology_coordinator(sharded& sys_dist_ks, sharded& proxy) { diff --git a/test/manual/message.cc b/test/manual/message.cc index 846f57cba8..3188ad3fb1 100644 --- a/test/manual/message.cc +++ b/test/manual/message.cc @@ -109,7 +109,7 @@ public: return make_ready_future(netw::messaging_service::no_wait()); }); - ser::gossip_rpc_verbs::register_gossip_echo(&ms, [] (const rpc::client_info& cinfo, rpc::opt_time_point, rpc::optional gen_opt) { + ser::gossip_rpc_verbs::register_gossip_echo(&ms, [] (const rpc::client_info& cinfo, rpc::opt_time_point, rpc::optional gen_opt, rpc::optional notify_up) { test_logger.info("Server got gossip echo msg"); throw std::runtime_error("I'm throwing runtime_error exception"); return make_ready_future<>(); @@ -155,7 +155,7 @@ public: test_logger.info("=== {} ===", __func__); auto id = get_msg_addr(); int64_t gen = 0x1; - return ser::gossip_rpc_verbs::send_gossip_echo(&ms, id, netw::messaging_service::clock_type::now() + std::chrono::seconds(10), gen).then_wrapped([] (auto&& f) { + return ser::gossip_rpc_verbs::send_gossip_echo(&ms, id, netw::messaging_service::clock_type::now() + std::chrono::seconds(10), gen, false).then_wrapped([] (auto&& f) { try { f.get(); return make_ready_future<>();