diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 74742c3406..b5056a28e5 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -42,6 +42,7 @@ #include #include "gms/generation-number.hh" #include "locator/token_metadata.hh" +#include "seastar/rpc/rpc_types.hh" #include "utils/exceptions.hh" #include "utils/error_injection.hh" #include "utils/to_string.hh" @@ -409,7 +410,7 @@ future<> gossiper::handle_ack2_msg(msg_addr from, gossip_digest_ack2 msg) { co_await apply_state_locally(std::move(remote_ep_state_map)); } -future<> gossiper::handle_echo_msg(gms::inet_address from, std::optional generation_number_opt) { +future<> gossiper::handle_echo_msg(gms::inet_address from, seastar::rpc::opt_time_point timeout, std::optional generation_number_opt, bool notify_up) { bool respond = true; if (!_advertise_myself) { respond = false; @@ -434,9 +435,22 @@ future<> gossiper::handle_echo_msg(gms::inet_address from, std::optional future<> { + try { + while (rpc::rpc_clock_type::now() < *timeout && !g.is_alive(from)) { + co_await sleep_abortable(std::chrono::milliseconds(100), g._abort_source); + } + } catch(...) { + logger.warn("handle_echo_msg: UP notification from {} failed with {}", from, std::current_exception()); + } + }); } - return make_ready_future<>(); } future<> gossiper::handle_shutdown_msg(inet_address from, std::optional generation_number_opt) { @@ -513,9 +527,9 @@ void gossiper::init_messaging_service_handler() { return gossiper.handle_ack2_msg(from, std::move(msg)); }); }); - ser::gossip_rpc_verbs::register_gossip_echo(&_messaging, [this] (const rpc::client_info& cinfo, seastar::rpc::opt_time_point, rpc::optional generation_number_opt) { + ser::gossip_rpc_verbs::register_gossip_echo(&_messaging, [this] (const rpc::client_info& cinfo, seastar::rpc::opt_time_point timeout, rpc::optional generation_number_opt, rpc::optional notify_up_opt) { auto from = cinfo.retrieve_auxiliary("baddr"); - return handle_echo_msg(from, generation_number_opt); + return handle_echo_msg(from, timeout, generation_number_opt, notify_up_opt.value_or(false)); }); ser::gossip_rpc_verbs::register_gossip_shutdown(&_messaging, [this] (inet_address from, rpc::optional generation_number_opt) { return background_msg("GOSSIP_SHUTDOWN", [from, generation_number_opt] (gms::gossiper& gossiper) { @@ -951,7 +965,7 @@ future<> gossiper::failure_detector_loop_for_node(gms::inet_address node, genera bool failed = false; try { logger.debug("failure_detector_loop: Send echo to node {}, status = started", node); - co_await ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, netw::msg_addr(node), netw::messaging_service::clock_type::now() + max_duration, gossip_generation.value()); + co_await ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, netw::msg_addr(node), netw::messaging_service::clock_type::now() + max_duration, gossip_generation.value(), false); logger.debug("failure_detector_loop: Send echo to node {}, status = ok", node); } catch (...) { failed = true; @@ -1651,6 +1665,19 @@ void gossiper::update_timestamp_for_nodes(const std::map gossiper::notify_nodes_on_up(std::unordered_set dsts) { + co_await coroutine::parallel_for_each(dsts, [this] (inet_address dst) -> future<> { + if (dst != get_broadcast_address()) { + try { + auto generation = my_endpoint_state().get_heart_beat_state().get_generation(); + co_await ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, netw::msg_addr(dst), netw::messaging_service::clock_type::now() + std::chrono::seconds(10), generation.value(), true); + } catch (...) { + logger.warn("Failed to notify node {} that I am UP: {}", dst, std::current_exception()); + } + } + }); +} + void gossiper::mark_alive(inet_address addr) { // Enter the _background_msg gate so stop() would wait on it auto inserted = _pending_mark_alive_endpoints.insert(addr).second; @@ -1673,7 +1700,7 @@ void gossiper::mark_alive(inet_address addr) { // Enter the _background_msg gate so stop() would wait on it auto gh = _background_msg.hold(); logger.debug("Sending a EchoMessage to {}, with generation_number={}", id, generation); - (void) ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, id, netw::messaging_service::clock_type::now() + std::chrono::milliseconds(15000), generation.value()).then([this, addr] { + (void) ser::gossip_rpc_verbs::send_gossip_echo(&_messaging, id, netw::messaging_service::clock_type::now() + std::chrono::milliseconds(15000), generation.value(), false).then([this, addr] { logger.trace("Got EchoMessage Reply"); return real_mark_alive(addr); }).handle_exception([addr, gh = std::move(gh), unmark_pending = std::move(unmark_pending)] (auto ep) { diff --git a/gms/gossiper.hh b/gms/gossiper.hh index 8862ef3fa0..4f147b8fd9 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -111,7 +111,7 @@ private: future<> handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg); future<> handle_ack_msg(msg_addr from, gossip_digest_ack ack_msg); future<> handle_ack2_msg(msg_addr from, gossip_digest_ack2 msg); - future<> handle_echo_msg(inet_address from, std::optional generation_number_opt); + future<> handle_echo_msg(inet_address from, seastar::rpc::opt_time_point, std::optional generation_number_opt, bool notify_up); future<> handle_shutdown_msg(inet_address from, std::optional generation_number_opt); future<> do_send_ack_msg(msg_addr from, gossip_digest_syn syn_msg); future<> do_send_ack2_msg(msg_addr from, utils::chunked_vector ack_msg_digest); @@ -704,6 +704,8 @@ public: void check_snitch_name_matches(sstring local_snitch_name) const; int get_down_endpoint_count() const noexcept; int get_up_endpoint_count() const noexcept; + // Send UP notification to all nodes in the set + future<> notify_nodes_on_up(std::unordered_set); private: future<> failure_detector_loop(); future<> failure_detector_loop_for_node(gms::inet_address node, generation_type gossip_generation, uint64_t live_endpoints_version); diff --git a/idl/gossip.idl.hh b/idl/gossip.idl.hh index 1f499e016b..2a2124cc36 100644 --- a/idl/gossip.idl.hh +++ b/idl/gossip.idl.hh @@ -9,7 +9,7 @@ #include "gms/gossip_digest_syn.hh" namespace gms { -verb [[with_client_info, with_timeout]] gossip_echo (int64_t generation_number [[version 4.6.0]]) +verb [[with_client_info, with_timeout]] gossip_echo (int64_t generation_number [[version 4.6.0]], bool notify_up [[version 6.1.0]]) verb [[one_way]] gossip_shutdown (gms::inet_address from, int64_t generation_number [[version 4.6.0]]) verb [[with_client_info, one_way]] gossip_digest_syn (gms::gossip_digest_syn syn) verb [[with_client_info, one_way]] gossip_digest_ack (gms::gossip_digest_ack ask) diff --git a/service/storage_service.cc b/service/storage_service.cc index fce7af4bf7..e132d52d62 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1824,6 +1824,17 @@ future<> storage_service::join_token_ring(sharded ips; + const auto& am = _group0->address_map(); + for (auto id : _topology_state_machine._topology.normal_nodes | boost::adaptors::map_keys) { + auto ip = am.find(id); + if (ip) { + ips.insert(*ip); + } + } + + co_await _gossiper.notify_nodes_on_up(std::move(ips)); + co_return; } @@ -1996,6 +2007,15 @@ future<> storage_service::join_token_ring(sharded ips; + _gossiper.for_each_endpoint_state([this, &ips] (const inet_address& addr, const gms::endpoint_state&) { + if (_gossiper.is_normal(addr)) { + ips.insert(addr); + } + }); + + co_await _gossiper.notify_nodes_on_up(std::move(ips)); } future<> storage_service::track_upgrade_progress_to_topology_coordinator(sharded& sys_dist_ks, sharded& proxy) { diff --git a/test/manual/message.cc b/test/manual/message.cc index 846f57cba8..3188ad3fb1 100644 --- a/test/manual/message.cc +++ b/test/manual/message.cc @@ -109,7 +109,7 @@ public: return make_ready_future(netw::messaging_service::no_wait()); }); - ser::gossip_rpc_verbs::register_gossip_echo(&ms, [] (const rpc::client_info& cinfo, rpc::opt_time_point, rpc::optional gen_opt) { + ser::gossip_rpc_verbs::register_gossip_echo(&ms, [] (const rpc::client_info& cinfo, rpc::opt_time_point, rpc::optional gen_opt, rpc::optional notify_up) { test_logger.info("Server got gossip echo msg"); throw std::runtime_error("I'm throwing runtime_error exception"); return make_ready_future<>(); @@ -155,7 +155,7 @@ public: test_logger.info("=== {} ===", __func__); auto id = get_msg_addr(); int64_t gen = 0x1; - return ser::gossip_rpc_verbs::send_gossip_echo(&ms, id, netw::messaging_service::clock_type::now() + std::chrono::seconds(10), gen).then_wrapped([] (auto&& f) { + return ser::gossip_rpc_verbs::send_gossip_echo(&ms, id, netw::messaging_service::clock_type::now() + std::chrono::seconds(10), gen, false).then_wrapped([] (auto&& f) { try { f.get(); return make_ready_future<>();