diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 06aee5f5b5..568e441aa8 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -2136,6 +2136,32 @@ bool gossiper::is_alive(inet_address ep) const { return false; } +// Runs inside seastar::async context +void gossiper::wait_alive(std::vector nodes, std::chrono::milliseconds timeout) { + auto start_time = std::chrono::steady_clock::now(); + for (;;) { + std::vector live_nodes; + for (const auto& node: nodes) { + size_t nr_alive = container().map_reduce0([node] (gossiper& g) -> size_t { + return g.is_alive(node) ? 1 : 0; + }, 0, std::plus()).get0(); + logger.debug("Marked node={} as alive on {} out of {} shards", node, nr_alive, smp::count); + if (nr_alive == smp::count) { + live_nodes.push_back(node); + } + } + logger.debug("Waited for marking node as up, replace_nodes={}, live_nodes={}", nodes, live_nodes); + if (live_nodes.size() == nodes.size()) { + break; + } + if (std::chrono::steady_clock::now() > timeout + start_time) { + throw std::runtime_error(format("Failed to mark node as alive in {} ms, nodes={}, live_nodes={}", + timeout.count(), nodes, live_nodes)); + } + sleep_abortable(std::chrono::milliseconds(100), _abort_source).get(); + } +} + const versioned_value* gossiper::get_application_state_ptr(inet_address endpoint, application_state appstate) const noexcept { auto* eps = get_endpoint_state_for_endpoint_ptr(std::move(endpoint)); if (!eps) { diff --git a/gms/gossiper.hh b/gms/gossiper.hh index 904006c596..32cdcfaf57 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -442,6 +442,8 @@ private: public: bool is_alive(inet_address ep) const; bool is_dead_state(const endpoint_state& eps) const; + // Wait for nodes to be alive on all shards + void wait_alive(std::vector nodes, std::chrono::milliseconds timeout); future<> apply_state_locally(std::map map);