From c929f23b8dbbf1b4805efc49c644a94b3b6c3109 Mon Sep 17 00:00:00 2001 From: Pavel Solodovnikov Date: Sun, 16 Jan 2022 13:14:17 +0300 Subject: [PATCH 01/13] gms: gossiper: coroutinize `mark_dead` Signed-off-by: Pavel Solodovnikov --- gms/gossiper.cc | 15 +++++++-------- gms/gossiper.hh | 2 +- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index f11ea00dd8..61182c5a61 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -1056,7 +1056,7 @@ void gossiper::convict(inet_address endpoint) { if (is_shutdown(endpoint)) { mark_as_shutdown(endpoint); } else { - mark_dead(endpoint, *state); + mark_dead(endpoint, *state).get(); } } @@ -1557,19 +1557,18 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) { }).get(); } -// Runs inside seastar::async context -void gossiper::mark_dead(inet_address addr, endpoint_state& local_state) { +future<> gossiper::mark_dead(inet_address addr, endpoint_state& local_state) { logger.trace("marking as down {}", addr); local_state.mark_dead(); endpoint_state state = local_state; _live_endpoints.resize(std::distance(_live_endpoints.begin(), std::remove(_live_endpoints.begin(), _live_endpoints.end(), addr))); - update_live_endpoints_version().get(); + co_await update_live_endpoints_version(); _unreachable_endpoints[addr] = now(); logger.info("InetAddress {} is now DOWN, status = {}", addr, get_gossip_status(state)); - _subscribers.for_each([addr, state] (shared_ptr subscriber) -> future<> { + co_await _subscribers.for_each([addr, state] (shared_ptr subscriber) -> future<> { co_await subscriber->on_dead(addr, state); logger.trace("Notified {}", fmt::ptr(subscriber.get())); - }).get(); + }); } // Runs inside seastar::async context @@ -1608,7 +1607,7 @@ void gossiper::handle_major_state_change(inet_address ep, const endpoint_state& mark_alive(ep, ep_state); } else { logger.debug("Not marking {} alive due to dead state {}", ep, get_gossip_status(eps)); - mark_dead(ep, ep_state); + mark_dead(ep, ep_state).get(); } auto* eps_new = get_endpoint_state_for_endpoint_ptr(ep); @@ -2265,7 +2264,7 @@ void gossiper::mark_as_shutdown(const inet_address& endpoint) { ep_state.add_application_state(application_state::STATUS, versioned_value::shutdown(true)); ep_state.get_heart_beat_state().force_highest_possible_version_unsafe(); replicate(endpoint, ep_state).get(); - mark_dead(endpoint, ep_state); + mark_dead(endpoint, ep_state).get(); convict(endpoint); } } diff --git a/gms/gossiper.hh b/gms/gossiper.hh index a32887d8d1..e20d2a585b 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -403,7 +403,7 @@ private: void real_mark_alive(inet_address addr, endpoint_state& local_state); - void mark_dead(inet_address addr, endpoint_state& local_state); + future<> mark_dead(inet_address addr, endpoint_state& local_state); /** * This method is called whenever there is a "big" change in ep state (a generation change for a known node). From 231d8a3ad415659f98a5f03019a1808f9540f08d Mon Sep 17 00:00:00 2001 From: Pavel Solodovnikov Date: Sun, 16 Jan 2022 13:20:06 +0300 Subject: [PATCH 02/13] gms: gossiper: coroutinize `real_mark_alive` Signed-off-by: Pavel Solodovnikov --- gms/gossiper.cc | 15 +++++++-------- gms/gossiper.hh | 2 +- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 61182c5a61..1f586832cd 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -1503,7 +1503,7 @@ void gossiper::mark_alive(inet_address addr, endpoint_state& local_state) { } else { endpoint_state& state = *es; logger.debug("Mark Node {} alive after EchoMessage", addr); - real_mark_alive(addr, state); + real_mark_alive(addr, state).get(); } }); }).finally([this, addr] { @@ -1513,15 +1513,14 @@ void gossiper::mark_alive(inet_address addr, endpoint_state& local_state) { }); } -// Runs inside seastar::async context -void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) { +future<> gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) { logger.trace("marking as alive {}", addr); // Do not mark a node with status shutdown as UP. auto status = sstring(get_gossip_status(local_state)); if (status == sstring(versioned_value::SHUTDOWN)) { logger.warn("Skip marking node {} with status = {} as UP", addr, status); - return; + co_return; } local_state.mark_alive(); @@ -1534,13 +1533,13 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) { auto it_ = std::find(_live_endpoints.begin(), _live_endpoints.end(), addr); bool was_live = it_ != _live_endpoints.end(); if (was_live) { - return; + co_return; } // Make a copy for endpoint_state because the code below can yield endpoint_state state = local_state; _live_endpoints.push_back(addr); - update_live_endpoints_version().get(); + co_await update_live_endpoints_version(); if (_endpoints_to_talk_with.empty()) { _endpoints_to_talk_with.push_back({addr}); } else { @@ -1551,10 +1550,10 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) { logger.info("InetAddress {} is now UP, status = {}", addr, status); } - _subscribers.for_each([addr, state] (shared_ptr subscriber) -> future<> { + co_await _subscribers.for_each([addr, state] (shared_ptr subscriber) -> future<> { co_await subscriber->on_alive(addr, state); logger.trace("Notified {}", fmt::ptr(subscriber.get())); - }).get(); + }); } future<> gossiper::mark_dead(inet_address addr, endpoint_state& local_state) { diff --git a/gms/gossiper.hh b/gms/gossiper.hh index e20d2a585b..0be430dd14 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -401,7 +401,7 @@ private: void mark_alive(inet_address addr, endpoint_state& local_state); - void real_mark_alive(inet_address addr, endpoint_state& local_state); + future<> real_mark_alive(inet_address addr, endpoint_state& local_state); future<> mark_dead(inet_address addr, endpoint_state& local_state); From 37066039dfb0d856ed2d56f13ccb540fbee4606d Mon Sep 17 00:00:00 2001 From: Pavel Solodovnikov Date: Sun, 16 Jan 2022 13:55:59 +0300 Subject: [PATCH 03/13] gms: gossiper: coroutinize `do_before_change_notifications` Signed-off-by: Pavel Solodovnikov --- gms/gossiper.cc | 9 ++++----- gms/gossiper.hh | 2 +- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 1f586832cd..808d458018 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -1711,11 +1711,10 @@ void gossiper::apply_new_states(inet_address addr, endpoint_state& local_state, } } -// Runs inside seastar::async context -void gossiper::do_before_change_notifications(inet_address addr, const endpoint_state& ep_state, const application_state& ap_state, const versioned_value& new_value) { - _subscribers.for_each([addr, ep_state, ap_state, new_value] (shared_ptr subscriber) { +future<> gossiper::do_before_change_notifications(inet_address addr, const endpoint_state& ep_state, const application_state& ap_state, const versioned_value& new_value) { + co_await _subscribers.for_each([addr, ep_state, ap_state, new_value] (shared_ptr subscriber) { return subscriber->before_change(addr, ep_state, ap_state, new_value); - }).get(); + }); } // Runs inside seastar::async context @@ -2054,7 +2053,7 @@ future<> gossiper::add_local_application_state(std::list do_before_change_notifications(inet_address addr, const endpoint_state& ep_state, const application_state& ap_state, const versioned_value& new_value); // notify that an application state has changed void do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value); From 529f4d0f98dfe5cf7a59a86ea3becb85a8a5cf0f Mon Sep 17 00:00:00 2001 From: Pavel Solodovnikov Date: Sun, 16 Jan 2022 13:59:30 +0300 Subject: [PATCH 04/13] gms: gossiper: coroutinize `do_on_change_notifications` Signed-off-by: Pavel Solodovnikov --- gms/gossiper.cc | 11 +++++------ gms/gossiper.hh | 2 +- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 808d458018..62c80c1f1b 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -1678,7 +1678,7 @@ void gossiper::apply_new_states(inet_address addr, endpoint_state& local_state, // Listeners should decide which failures are non-fatal and swallow them. auto run_listeners = seastar::defer([&] () noexcept { for (auto&& key : changed) { - do_on_change_notifications(addr, key, remote_map.at(key)); + do_on_change_notifications(addr, key, remote_map.at(key)).get(); } }); @@ -1717,11 +1717,10 @@ future<> gossiper::do_before_change_notifications(inet_address addr, const endpo }); } -// Runs inside seastar::async context -void gossiper::do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value) { - _subscribers.for_each([addr, state, value] (shared_ptr subscriber) { +future<> gossiper::do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value) { + co_await _subscribers.for_each([addr, state, value] (shared_ptr subscriber) { return subscriber->on_change(addr, state, value); - }).get(); + }); } void gossiper::request_all(gossip_digest& g_digest, @@ -2079,7 +2078,7 @@ future<> gossiper::add_local_application_state(std::list do_before_change_notifications(inet_address addr, const endpoint_state& ep_state, const application_state& ap_state, const versioned_value& new_value); // notify that an application state has changed - void do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value); + future<> do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value); /* Request all the state for the endpoint in the g_digest */ void request_all(gossip_digest& g_digest, utils::chunked_vector& delta_gossip_digest_list, int remote_generation); From ee30d0a385aace85eec8f7f64a3fc967407a0361 Mon Sep 17 00:00:00 2001 From: Pavel Solodovnikov Date: Sun, 16 Jan 2022 14:14:03 +0300 Subject: [PATCH 05/13] gms: gossiper: don't use `seastar::async` in `mark_alive` Since `real_mark_alive` does not require `seastar::async` now, we can eliminate the wrapping async call, as well. Signed-off-by: Pavel Solodovnikov --- gms/gossiper.cc | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 62c80c1f1b..f241932d77 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -1493,19 +1493,18 @@ void gossiper::mark_alive(inet_address addr, endpoint_state& local_state) { // Do it in the background. (void)_messaging.send_gossip_echo(id, generation, std::chrono::milliseconds(15000)).then([this, addr] { logger.trace("Got EchoMessage Reply"); - return seastar::async([this, addr] { - // After sending echo message, the Node might not be in the - // endpoint_state_map anymore, use the reference of local_state - // might cause user-after-free - auto es = get_endpoint_state_for_endpoint_ptr(addr); - if (!es) { - logger.info("Node {} is not in endpoint_state_map anymore", addr); - } else { - endpoint_state& state = *es; - logger.debug("Mark Node {} alive after EchoMessage", addr); - real_mark_alive(addr, state).get(); - } - }); + // After sending echo message, the Node might not be in the + // endpoint_state_map anymore, use the reference of local_state + // might cause user-after-free + auto es = get_endpoint_state_for_endpoint_ptr(addr); + if (!es) { + logger.info("Node {} is not in endpoint_state_map anymore", addr); + } else { + endpoint_state& state = *es; + logger.debug("Mark Node {} alive after EchoMessage", addr); + return real_mark_alive(addr, state); + } + return make_ready_future(); }).finally([this, addr] { _pending_mark_alive_endpoints.erase(addr); }).handle_exception([addr] (auto ep) { From c584a9cc1fff7a82fba6bf2626d43362ccb23d49 Mon Sep 17 00:00:00 2001 From: Pavel Solodovnikov Date: Sun, 16 Jan 2022 14:18:18 +0300 Subject: [PATCH 06/13] gms: gossiper: remove comment about requiring thread context in `mark_alive` Signed-off-by: Pavel Solodovnikov --- gms/gossiper.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index f241932d77..9ce2f228c0 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -1470,7 +1470,6 @@ void gossiper::update_timestamp_for_nodes(const std::map Date: Sun, 16 Jan 2022 14:27:00 +0300 Subject: [PATCH 07/13] gms: gossiper: coroutinize `mark_as_shutdown` and `convict` Since these two functions call each other, convert to coroutines and eliminate the dependency on `seastar::async` for both of them at the same time. Signed-off-by: Pavel Solodovnikov --- gms/gossiper.cc | 27 ++++++++++++--------------- gms/gossiper.hh | 4 ++-- 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 9ce2f228c0..317250fdb0 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -425,7 +425,7 @@ future<> gossiper::handle_shutdown_msg(inet_address from, std::optional return; } } - this->mark_as_shutdown(from); + this->mark_as_shutdown(from).get(); }); } @@ -755,7 +755,7 @@ future<> gossiper::failure_detector_loop_for_node(gms::inet_address node, int64_ logger.info("failure_detector_loop: Mark node {} as DOWN", node); co_await container().invoke_on(0, [node] (gms::gossiper& g) { return seastar::async([node, &g] { - g.convict(node); + g.convict(node).get(); }); }); co_return; @@ -814,7 +814,7 @@ future<> gossiper::failure_detector_loop() { nodes, _live_endpoints, nodes_down); co_await seastar::async([this, &nodes_down] { for (const auto& node : nodes_down) { - convict(node); + convict(node).get(); } }); } @@ -1046,17 +1046,15 @@ int64_t gossiper::get_endpoint_downtime(inet_address ep) const noexcept { // Depends on // - on_dead callbacks // It is called from failure_detector -// -// Runs inside seastar::async context -void gossiper::convict(inet_address endpoint) { +future<> gossiper::convict(inet_address endpoint) { auto* state = get_endpoint_state_for_endpoint_ptr(endpoint); if (!state || !state->is_alive()) { - return; + co_return; } if (is_shutdown(endpoint)) { - mark_as_shutdown(endpoint); + co_await mark_as_shutdown(endpoint); } else { - mark_dead(endpoint, *state).get(); + co_await mark_dead(endpoint, *state); } } @@ -1615,7 +1613,7 @@ void gossiper::handle_major_state_change(inet_address ep, const endpoint_state& } // check this at the end so nodes will learn about the endpoint if (is_shutdown(ep)) { - mark_as_shutdown(ep); + mark_as_shutdown(ep).get(); } } @@ -2251,16 +2249,15 @@ sstring gossiper::get_application_state_value(inet_address endpoint, application * This method is used to mark a node as shutdown; that is it gracefully exited on its own and told us about it * @param endpoint endpoint that has shut itself down */ -// Runs inside seastar::async context -void gossiper::mark_as_shutdown(const inet_address& endpoint) { +future<> gossiper::mark_as_shutdown(const inet_address& endpoint) { auto es = get_endpoint_state_for_endpoint_ptr(endpoint); if (es) { auto& ep_state = *es; ep_state.add_application_state(application_state::STATUS, versioned_value::shutdown(true)); ep_state.get_heart_beat_state().force_highest_possible_version_unsafe(); - replicate(endpoint, ep_state).get(); - mark_dead(endpoint, ep_state).get(); - convict(endpoint); + co_await replicate(endpoint, ep_state); + co_await mark_dead(endpoint, ep_state); + co_await convict(endpoint); } } diff --git a/gms/gossiper.hh b/gms/gossiper.hh index 1565f0c938..f2466156a6 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -260,7 +260,7 @@ public: /** * @param endpoint end point that is convicted. */ - void convict(inet_address endpoint); + future<> convict(inet_address endpoint); /** * Return either: the greatest heartbeat or application state @@ -554,7 +554,7 @@ public: bool is_normal_ring_member(const inet_address& endpoint) const; bool is_cql_ready(const inet_address& endpoint) const; bool is_silent_shutdown_state(const endpoint_state& ep_state) const; - void mark_as_shutdown(const inet_address& endpoint); + future<> mark_as_shutdown(const inet_address& endpoint); void force_newer_generation(); public: std::string_view get_gossip_status(const endpoint_state& ep_state) const noexcept; From 705a7598911be3d06e7f7595300c3e53216a44df Mon Sep 17 00:00:00 2001 From: Pavel Solodovnikov Date: Sun, 16 Jan 2022 14:39:41 +0300 Subject: [PATCH 08/13] gms: gossiper: coroutinize `handle_shutdown_msg` Signed-off-by: Pavel Solodovnikov --- gms/gossiper.cc | 37 ++++++++++++++++++------------------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 317250fdb0..7253090a44 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -404,29 +404,28 @@ future<> gossiper::handle_echo_msg(gms::inet_address from, std::optional gossiper::handle_shutdown_msg(inet_address from, std::optional generation_number_opt) { if (!is_enabled()) { logger.debug("Ignoring shutdown message from {} because gossip is disabled", from); - return make_ready_future<>(); + co_return; } - return seastar::async([this, from, generation_number_opt] { - auto permit = this->lock_endpoint(from).get0(); - if (generation_number_opt) { - auto es = this->get_endpoint_state_for_endpoint_ptr(from); - if (es) { - int local_generation = es->get_heart_beat_state().get_generation(); - logger.info("Got shutdown message from {}, received_generation={}, local_generation={}", + + auto permit = co_await this->lock_endpoint(from); + if (generation_number_opt) { + auto es = this->get_endpoint_state_for_endpoint_ptr(from); + if (es) { + int local_generation = es->get_heart_beat_state().get_generation(); + logger.info("Got shutdown message from {}, received_generation={}, local_generation={}", + from, generation_number_opt.value(), local_generation); + if (local_generation != generation_number_opt.value()) { + logger.warn("Ignoring shutdown message from {} because generation number does not match, received_generation={}, local_generation={}", from, generation_number_opt.value(), local_generation); - if (local_generation != generation_number_opt.value()) { - logger.warn("Ignoring shutdown message from {} because generation number does not match, received_generation={}, local_generation={}", - from, generation_number_opt.value(), local_generation); - return; - } - } else { - logger.warn("Ignoring shutdown message from {} because generation number does not match, received_generation={}, local_generation=not found", - from, generation_number_opt.value()); - return; + co_return; } + } else { + logger.warn("Ignoring shutdown message from {} because generation number does not match, received_generation={}, local_generation=not found", + from, generation_number_opt.value()); + co_return; } - this->mark_as_shutdown(from).get(); - }); + } + co_await this->mark_as_shutdown(from); } future From e26829e202b7347de846d3206f0198cc116ff61a Mon Sep 17 00:00:00 2001 From: Pavel Solodovnikov Date: Sun, 16 Jan 2022 14:46:21 +0300 Subject: [PATCH 09/13] gms: gossiper: coroutinize `handle_major_state_change` Signed-off-by: Pavel Solodovnikov --- gms/gossiper.cc | 25 ++++++++++++------------- gms/gossiper.hh | 2 +- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 7253090a44..c67ddb66d2 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -544,7 +544,7 @@ void gossiper::do_apply_state_locally(gms::inet_address node, const endpoint_sta if (listener_notification) { logger.trace("Updating heartbeat state generation to {} from {} for {}", remote_generation, local_generation, node); // major state change will handle the update by inserting the remote state directly - this->handle_major_state_change(node, remote_state); + this->handle_major_state_change(node, remote_state).get(); } else { logger.debug("Applying remote_state for node {} (remote generation > local generation)", node); endpoint_state_map[node] = remote_state; @@ -582,7 +582,7 @@ void gossiper::do_apply_state_locally(gms::inet_address node, const endpoint_sta } } else { if (listener_notification) { - this->handle_major_state_change(node, remote_state); + this->handle_major_state_change(node, remote_state).get(); } else { logger.debug("Applying remote_state for node {} (new node)", node); endpoint_state_map[node] = remote_state; @@ -1243,7 +1243,7 @@ future<> gossiper::assassinate_endpoint(sstring address) { std::unordered_set tokens_set(tokens.begin(), tokens.end()); auto expire_time = gossiper.compute_expire_time(); ep_state.add_application_state(application_state::STATUS, versioned_value::left(tokens_set, expire_time.time_since_epoch().count())); - gossiper.handle_major_state_change(endpoint, ep_state); + gossiper.handle_major_state_change(endpoint, ep_state).get(); sleep_abortable(INTERVAL * 4, gossiper._abort_source).get(); logger.warn("Finished assassinating {}", endpoint); }); @@ -1565,8 +1565,7 @@ future<> gossiper::mark_dead(inet_address addr, endpoint_state& local_state) { }); } -// Runs inside seastar::async context -void gossiper::handle_major_state_change(inet_address ep, const endpoint_state& eps) { +future<> gossiper::handle_major_state_change(inet_address ep, const endpoint_state& eps) { auto eps_old = get_endpoint_state_for_endpoint(ep); if (!is_dead_state(eps) && !is_in_shadow_round()) { if (endpoint_state_map.contains(ep)) { @@ -1577,7 +1576,7 @@ void gossiper::handle_major_state_change(inet_address ep, const endpoint_state& } logger.trace("Adding endpoint state for {}, status = {}", ep, get_gossip_status(eps)); endpoint_state_map[ep] = eps; - replicate(ep, eps).get(); + co_await replicate(ep, eps); if (is_in_shadow_round()) { // In shadow round, we only interested in the peer's endpoint_state, @@ -1586,14 +1585,14 @@ void gossiper::handle_major_state_change(inet_address ep, const endpoint_state& // procedure with EchoMessage gossip message. We will do them during // normal gossip runs anyway. logger.debug("In shadow round addr={}, eps={}", ep, eps); - return; + co_return; } if (eps_old) { // the node restarted: it is up to the subscriber to take whatever action is necessary - _subscribers.for_each([ep, eps_old] (shared_ptr subscriber) { + co_await _subscribers.for_each([ep, eps_old] (shared_ptr subscriber) { return subscriber->on_restart(ep, *eps_old); - }).get(); + }); } auto& ep_state = endpoint_state_map.at(ep); @@ -1601,18 +1600,18 @@ void gossiper::handle_major_state_change(inet_address ep, const endpoint_state& mark_alive(ep, ep_state); } else { logger.debug("Not marking {} alive due to dead state {}", ep, get_gossip_status(eps)); - mark_dead(ep, ep_state).get(); + co_await mark_dead(ep, ep_state); } auto* eps_new = get_endpoint_state_for_endpoint_ptr(ep); if (eps_new) { - _subscribers.for_each([ep, eps_new] (shared_ptr subscriber) { + co_await _subscribers.for_each([ep, eps_new] (shared_ptr subscriber) { return subscriber->on_join(ep, *eps_new); - }).get(); + }); } // check this at the end so nodes will learn about the endpoint if (is_shutdown(ep)) { - mark_as_shutdown(ep).get(); + co_await mark_as_shutdown(ep); } } diff --git a/gms/gossiper.hh b/gms/gossiper.hh index f2466156a6..40603a01de 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -411,7 +411,7 @@ private: * @param ep endpoint * @param ep_state EndpointState for the endpoint */ - void handle_major_state_change(inet_address ep, const endpoint_state& eps); + future<> handle_major_state_change(inet_address ep, const endpoint_state& eps); public: bool is_alive(inet_address ep) const; From e9f5da9507d2e8c0aa4b04242a42e8b89ab531e8 Mon Sep 17 00:00:00 2001 From: Pavel Solodovnikov Date: Mon, 17 Jan 2022 08:26:23 +0300 Subject: [PATCH 10/13] gms: gossiper: don't wrap `convict` calls into `seastar::async` Signed-off-by: Pavel Solodovnikov --- gms/gossiper.cc | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index c67ddb66d2..d05713bf70 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -753,9 +753,7 @@ future<> gossiper::failure_detector_loop_for_node(gms::inet_address node, int64_ if (diff > max_duration) { logger.info("failure_detector_loop: Mark node {} as DOWN", node); co_await container().invoke_on(0, [node] (gms::gossiper& g) { - return seastar::async([node, &g] { - g.convict(node).get(); - }); + return g.convict(node); }); co_return; } @@ -811,11 +809,9 @@ future<> gossiper::failure_detector_loop() { if (!nodes_down.empty()) { logger.debug("failure_detector_loop: previous_live_nodes={}, current_live_nodes={}, nodes_down={}", nodes, _live_endpoints, nodes_down); - co_await seastar::async([this, &nodes_down] { - for (const auto& node : nodes_down) { - convict(node).get(); - } - }); + for (const auto& node : nodes_down) { + co_await convict(node); + } } // Make sure _live_endpoints do not change when nodes_down are being convicted above. This guarantees no down nodes will miss the convict. logger.debug("failure_detector_loop: previous_live_nodes={}, current_live_nodes={}, nodes_down={}, version_before={}, version_after={}", From 4416070f56f5afb860122a9c3343ff890a3154ee Mon Sep 17 00:00:00 2001 From: Pavel Solodovnikov Date: Mon, 17 Jan 2022 08:29:10 +0300 Subject: [PATCH 11/13] gms: gossiper: coroutinize `advertise_removing` Signed-off-by: Pavel Solodovnikov --- gms/gossiper.cc | 42 ++++++++++++++++++++---------------------- 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index d05713bf70..49190e67fd 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -1148,29 +1148,27 @@ future<> gossiper::replicate(inet_address ep, application_state key, const versi } future<> gossiper::advertise_removing(inet_address endpoint, utils::UUID host_id, utils::UUID local_host_id) { - return seastar::async([this, g = this->shared_from_this(), endpoint, host_id, local_host_id] { - auto& state = get_endpoint_state(endpoint); - // remember this node's generation - int generation = state.get_heart_beat_state().get_generation(); - logger.info("Removing host: {}", host_id); - auto ring_delay = std::chrono::milliseconds(_cfg.ring_delay_ms()); - logger.info("Sleeping for {}ms to ensure {} does not change", ring_delay.count(), endpoint); - sleep_abortable(ring_delay, _abort_source).get(); - // make sure it did not change - auto& eps = get_endpoint_state(endpoint); - if (eps.get_heart_beat_state().get_generation() != generation) { - throw std::runtime_error(format("Endpoint {} generation changed while trying to remove it", endpoint)); - } + auto& state = get_endpoint_state(endpoint); + // remember this node's generation + int generation = state.get_heart_beat_state().get_generation(); + logger.info("Removing host: {}", host_id); + auto ring_delay = std::chrono::milliseconds(_cfg.ring_delay_ms()); + logger.info("Sleeping for {}ms to ensure {} does not change", ring_delay.count(), endpoint); + co_await sleep_abortable(ring_delay, _abort_source); + // make sure it did not change + auto& eps = get_endpoint_state(endpoint); + if (eps.get_heart_beat_state().get_generation() != generation) { + throw std::runtime_error(format("Endpoint {} generation changed while trying to remove it", endpoint)); + } - // update the other node's generation to mimic it as if it had changed it itself - logger.info("Advertising removal for {}", endpoint); - eps.update_timestamp(); // make sure we don't evict it too soon - eps.get_heart_beat_state().force_newer_generation_unsafe(); - eps.add_application_state(application_state::STATUS, versioned_value::removing_nonlocal(host_id)); - eps.add_application_state(application_state::REMOVAL_COORDINATOR, versioned_value::removal_coordinator(local_host_id)); - endpoint_state_map[endpoint] = eps; - replicate(endpoint, eps).get(); - }); + // update the other node's generation to mimic it as if it had changed it itself + logger.info("Advertising removal for {}", endpoint); + eps.update_timestamp(); // make sure we don't evict it too soon + eps.get_heart_beat_state().force_newer_generation_unsafe(); + eps.add_application_state(application_state::STATUS, versioned_value::removing_nonlocal(host_id)); + eps.add_application_state(application_state::REMOVAL_COORDINATOR, versioned_value::removal_coordinator(local_host_id)); + endpoint_state_map[endpoint] = eps; + co_await replicate(endpoint, eps); } future<> gossiper::advertise_token_removed(inet_address endpoint, utils::UUID host_id) { From ab41151a41ceb8e6fab8fcfb64876d0ccd5f9cdb Mon Sep 17 00:00:00 2001 From: Pavel Solodovnikov Date: Mon, 17 Jan 2022 08:31:14 +0300 Subject: [PATCH 12/13] gms: gossiper: coroutinize `advertise_token_removed` Signed-off-by: Pavel Solodovnikov --- gms/gossiper.cc | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 49190e67fd..694a7367c5 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -1172,19 +1172,17 @@ future<> gossiper::advertise_removing(inet_address endpoint, utils::UUID host_id } future<> gossiper::advertise_token_removed(inet_address endpoint, utils::UUID host_id) { - return seastar::async([this, g = this->shared_from_this(), endpoint, host_id] { - auto& eps = get_endpoint_state(endpoint); - eps.update_timestamp(); // make sure we don't evict it too soon - eps.get_heart_beat_state().force_newer_generation_unsafe(); - auto expire_time = compute_expire_time(); - eps.add_application_state(application_state::STATUS, versioned_value::removed_nonlocal(host_id, expire_time.time_since_epoch().count())); - logger.info("Completing removal of {}", endpoint); - add_expire_time_for_endpoint(endpoint, expire_time); - endpoint_state_map[endpoint] = eps; - replicate(endpoint, eps).get(); - // ensure at least one gossip round occurs before returning - sleep_abortable(INTERVAL * 2, _abort_source).get(); - }); + auto& eps = get_endpoint_state(endpoint); + eps.update_timestamp(); // make sure we don't evict it too soon + eps.get_heart_beat_state().force_newer_generation_unsafe(); + auto expire_time = compute_expire_time(); + eps.add_application_state(application_state::STATUS, versioned_value::removed_nonlocal(host_id, expire_time.time_since_epoch().count())); + logger.info("Completing removal of {}", endpoint); + add_expire_time_for_endpoint(endpoint, expire_time); + endpoint_state_map[endpoint] = eps; + co_await replicate(endpoint, eps); + // ensure at least one gossip round occurs before returning + co_await sleep_abortable(INTERVAL * 2, _abort_source); } future<> gossiper::unsafe_assassinate_endpoint(sstring address) { From dce3159156f4d312e00941e0a24cd57904cd30f3 Mon Sep 17 00:00:00 2001 From: Pavel Solodovnikov Date: Mon, 17 Jan 2022 08:38:07 +0300 Subject: [PATCH 13/13] gms: gossiper: coroutinize `wait_for_gossip` Signed-off-by: Pavel Solodovnikov --- gms/gossiper.cc | 56 ++++++++++++++++++++++++------------------------- 1 file changed, 27 insertions(+), 29 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 694a7367c5..3a11eec030 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -2288,42 +2288,40 @@ future<> gossiper::wait_for_gossip(std::chrono::milliseconds initial_delay, std: if (force_after && *force_after == 0) { logger.warn("Skipped to wait for gossip to settle by user request since skip_wait_for_gossip_to_settle is set zero. Do not use this in production!"); - return make_ready_future<>(); + co_return; } - return seastar::async([this, initial_delay, force_after] { - int32_t total_polls = 0; - int32_t num_okay = 0; - int32_t ep_size = endpoint_state_map.size(); + int32_t total_polls = 0; + int32_t num_okay = 0; + int32_t ep_size = endpoint_state_map.size(); - auto delay = initial_delay; + auto delay = initial_delay; - sleep_abortable(GOSSIP_SETTLE_MIN_WAIT_MS, _abort_source).get(); - while (num_okay < GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED) { - sleep_abortable(delay, _abort_source).get(); - delay = GOSSIP_SETTLE_POLL_INTERVAL_MS; + co_await sleep_abortable(GOSSIP_SETTLE_MIN_WAIT_MS, _abort_source); + while (num_okay < GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED) { + co_await sleep_abortable(delay, _abort_source); + delay = GOSSIP_SETTLE_POLL_INTERVAL_MS; - int32_t current_size = endpoint_state_map.size(); - total_polls++; - if (current_size == ep_size && _msg_processing == 0) { - logger.debug("Gossip looks settled"); - num_okay++; - } else { - logger.info("Gossip not settled after {} polls.", total_polls); - num_okay = 0; - } - ep_size = current_size; - if (force_after && *force_after > 0 && total_polls > *force_after) { - logger.warn("Gossip not settled but startup forced by skip_wait_for_gossip_to_settle. Gossp total polls: {}", total_polls); - break; - } - } - if (total_polls > GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED) { - logger.info("Gossip settled after {} extra polls; proceeding", total_polls - GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED); + int32_t current_size = endpoint_state_map.size(); + total_polls++; + if (current_size == ep_size && _msg_processing == 0) { + logger.debug("Gossip looks settled"); + num_okay++; } else { - logger.info("No gossip backlog; proceeding"); + logger.info("Gossip not settled after {} polls.", total_polls); + num_okay = 0; } - }); + ep_size = current_size; + if (force_after && *force_after > 0 && total_polls > *force_after) { + logger.warn("Gossip not settled but startup forced by skip_wait_for_gossip_to_settle. Gossp total polls: {}", total_polls); + break; + } + } + if (total_polls > GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED) { + logger.info("Gossip settled after {} extra polls; proceeding", total_polls - GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED); + } else { + logger.info("No gossip backlog; proceeding"); + } } future<> gossiper::wait_for_gossip_to_settle() {