diff --git a/service/storage_service.cc b/service/storage_service.cc index 6018a7d49f..95fe9ffa9e 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -78,6 +78,7 @@ #include #include #include +#include using token = dht::token; using UUID = utils::UUID; @@ -1953,6 +1954,10 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi // may trivially finish without waiting for anyone. co_await _gossiper.wait_for_live_nodes_to_show_up(2); + // Note: in Raft topology mode this is unnecessary. + // Node state changes are propagated to the cluster through explicit global barriers. + co_await wait_for_normal_state_handled_on_boot(); + auto ignore_nodes = ri ? parse_node_list(_db.local().get_config().ignore_dead_nodes_for_replace(), get_token_metadata()) // TODO: specify ignore_nodes for bootstrap @@ -1962,10 +1967,6 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi sync_nodes.erase(ri->address); } - // Note: in Raft topology mode this is unnecessary. - // Node state changes are propagated to the cluster through explicit global barriers. - co_await wait_for_normal_state_handled_on_boot(sync_nodes); - // NORMAL doesn't necessarily mean UP (#14042). Wait for these nodes to be UP as well // to reduce flakiness (we need them to be UP to perform CDC generation write and for repair/streaming). 
//
@@ -5706,21 +5707,53 @@ bool storage_service::is_normal_state_handled_on_boot(gms::inet_address node) {
     return _normal_state_handled_on_boot.contains(node);
 }
 
-// Wait for normal state handler to finish on boot
-future<> storage_service::wait_for_normal_state_handled_on_boot(const std::unordered_set<gms::inet_address>& nodes) {
-    slogger.info("Started waiting for normal state handler for nodes {}", nodes);
+// Wait for normal state handlers to finish on boot.
+//
+// Polls every 100ms until every endpoint returned by the gossiper is either this node,
+// not a normal ring member, or has had its normal state handled on boot.
+// Throws std::runtime_error if some endpoints are still pending after 60 seconds.
+future<> storage_service::wait_for_normal_state_handled_on_boot() {
+    static logger::rate_limit rate_limit{std::chrono::seconds{5}};
+    // Deliberately not `static`: the lambda captures `this`, and a function-local static
+    // is initialized only once, so it would keep using the first caller's `this`.
+    auto fmt_nodes_with_statuses = [this] (const auto& eps) {
+        return boost::algorithm::join(
+                eps | boost::adaptors::transformed([this] (const auto& ep) {
+                    return ::format("({}, status={})", ep, _gossiper.get_gossip_status(ep));
+                }), ", ");
+    };
+
+    slogger.info("Started waiting for normal state handlers to finish");
     auto start_time = std::chrono::steady_clock::now();
-    for (auto& node: nodes) {
-        while (!is_normal_state_handled_on_boot(node)) {
-            slogger.debug("Waiting for normal state handler for node {}", node);
-            co_await sleep_abortable(std::chrono::milliseconds(100), _abort_source);
-            if (std::chrono::steady_clock::now() > start_time + std::chrono::seconds(60)) {
-                throw std::runtime_error(::format("Node {} did not finish normal state handler, reject the node ops", node));
-            }
+    std::vector<gms::inet_address> eps;
+    while (true) {
+        eps = _gossiper.get_endpoints();
+        // Endpoints we do not need to wait for are moved to the front;
+        // [it, eps.end()) holds the ones whose handlers are still pending.
+        auto it = std::partition(eps.begin(), eps.end(),
+                [this, me = get_broadcast_address()] (const gms::inet_address& ep) {
+            return ep == me || !_gossiper.is_normal_ring_member(ep) || is_normal_state_handled_on_boot(ep);
+        });
+
+        if (it == eps.end()) {
+            break;
         }
+
+        if (std::chrono::steady_clock::now() > start_time + std::chrono::seconds(60)) {
+            auto err = ::format("Timed out waiting for normal state handlers to finish for nodes {}",
+                    fmt_nodes_with_statuses(boost::make_iterator_range(it, eps.end())));
+            slogger.error("{}", err);
+            throw std::runtime_error{std::move(err)};
+        }
+
+        slogger.log(log_level::info, rate_limit, "Normal state handlers not yet finished for nodes {}",
+                fmt_nodes_with_statuses(boost::make_iterator_range(it, eps.end())));
+
+        co_await sleep_abortable(std::chrono::milliseconds{100}, _abort_source);
     }
-    slogger.info("Finished waiting for normal state handler for nodes {}", nodes);
-    co_return;
+
+    slogger.info("Finished waiting for normal state handlers; endpoints observed in gossip: {}",
+            fmt_nodes_with_statuses(eps));
 }
 
 future<bool> storage_service::is_cleanup_allowed(sstring keyspace) {
diff --git a/service/storage_service.hh b/service/storage_service.hh
index c0ba08ac67..c5eaf5448b 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -749,7 +749,7 @@ public:
 private:
     std::unordered_set<gms::inet_address> _normal_state_handled_on_boot;
     bool is_normal_state_handled_on_boot(gms::inet_address);
-    future<> wait_for_normal_state_handled_on_boot(const std::unordered_set<gms::inet_address>& nodes);
+    future<> wait_for_normal_state_handled_on_boot();
     friend class group0_state_machine;
 
     bool _raft_topology_change_enabled = false;