diff --git a/gms/gossiper.cc b/gms/gossiper.cc index f11ea00dd8..3a11eec030 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -404,29 +404,28 @@ future<> gossiper::handle_echo_msg(gms::inet_address from, std::optional gossiper::handle_shutdown_msg(inet_address from, std::optional generation_number_opt) { if (!is_enabled()) { logger.debug("Ignoring shutdown message from {} because gossip is disabled", from); - return make_ready_future<>(); + co_return; } - return seastar::async([this, from, generation_number_opt] { - auto permit = this->lock_endpoint(from).get0(); - if (generation_number_opt) { - auto es = this->get_endpoint_state_for_endpoint_ptr(from); - if (es) { - int local_generation = es->get_heart_beat_state().get_generation(); - logger.info("Got shutdown message from {}, received_generation={}, local_generation={}", + + auto permit = co_await this->lock_endpoint(from); + if (generation_number_opt) { + auto es = this->get_endpoint_state_for_endpoint_ptr(from); + if (es) { + int local_generation = es->get_heart_beat_state().get_generation(); + logger.info("Got shutdown message from {}, received_generation={}, local_generation={}", + from, generation_number_opt.value(), local_generation); + if (local_generation != generation_number_opt.value()) { + logger.warn("Ignoring shutdown message from {} because generation number does not match, received_generation={}, local_generation={}", from, generation_number_opt.value(), local_generation); - if (local_generation != generation_number_opt.value()) { - logger.warn("Ignoring shutdown message from {} because generation number does not match, received_generation={}, local_generation={}", - from, generation_number_opt.value(), local_generation); - return; - } - } else { - logger.warn("Ignoring shutdown message from {} because generation number does not match, received_generation={}, local_generation=not found", - from, generation_number_opt.value()); - return; + co_return; } + } else { + logger.warn("Ignoring shutdown message from {} because generation number does not match, received_generation={}, local_generation=not found", + from, generation_number_opt.value()); + co_return; } - this->mark_as_shutdown(from); - }); + } + co_await this->mark_as_shutdown(from); } future @@ -545,7 +544,7 @@ void gossiper::do_apply_state_locally(gms::inet_address node, const endpoint_sta if (listener_notification) { logger.trace("Updating heartbeat state generation to {} from {} for {}", remote_generation, local_generation, node); // major state change will handle the update by inserting the remote state directly - this->handle_major_state_change(node, remote_state); + this->handle_major_state_change(node, remote_state).get(); } else { logger.debug("Applying remote_state for node {} (remote generation > local generation)", node); endpoint_state_map[node] = remote_state; @@ -583,7 +582,7 @@ void gossiper::do_apply_state_locally(gms::inet_address node, const endpoint_sta } } else { if (listener_notification) { - this->handle_major_state_change(node, remote_state); + this->handle_major_state_change(node, remote_state).get(); } else { logger.debug("Applying remote_state for node {} (new node)", node); endpoint_state_map[node] = remote_state; @@ -754,9 +753,7 @@ future<> gossiper::failure_detector_loop_for_node(gms::inet_address node, int64_ if (diff > max_duration) { logger.info("failure_detector_loop: Mark node {} as DOWN", node); co_await container().invoke_on(0, [node] (gms::gossiper& g) { - return seastar::async([node, &g] { - g.convict(node); - }); + return g.convict(node); }); co_return; } @@ -812,11 +809,9 @@ future<> gossiper::failure_detector_loop() { if (!nodes_down.empty()) { logger.debug("failure_detector_loop: previous_live_nodes={}, current_live_nodes={}, nodes_down={}", nodes, _live_endpoints, nodes_down); - co_await seastar::async([this, &nodes_down] { - for (const auto& node : nodes_down) { - convict(node); - } - }); + for (const auto& node : nodes_down) { + co_await convict(node); + } } // Make sure _live_endpoints do not change when nodes_down are being convicted above. This guarantees no down nodes will miss the convict. logger.debug("failure_detector_loop: previous_live_nodes={}, current_live_nodes={}, nodes_down={}, version_before={}, version_after={}", @@ -1046,17 +1041,15 @@ int64_t gossiper::get_endpoint_downtime(inet_address ep) const noexcept { // Depends on // - on_dead callbacks // It is called from failure_detector -// -// Runs inside seastar::async context -void gossiper::convict(inet_address endpoint) { +future<> gossiper::convict(inet_address endpoint) { auto* state = get_endpoint_state_for_endpoint_ptr(endpoint); if (!state || !state->is_alive()) { - return; + co_return; } if (is_shutdown(endpoint)) { - mark_as_shutdown(endpoint); + co_await mark_as_shutdown(endpoint); } else { - mark_dead(endpoint, *state); + co_await mark_dead(endpoint, *state); } } @@ -1155,45 +1148,41 @@ future<> gossiper::replicate(inet_address ep, application_state key, const versi } future<> gossiper::advertise_removing(inet_address endpoint, utils::UUID host_id, utils::UUID local_host_id) { - return seastar::async([this, g = this->shared_from_this(), endpoint, host_id, local_host_id] { - auto& state = get_endpoint_state(endpoint); - // remember this node's generation - int generation = state.get_heart_beat_state().get_generation(); - logger.info("Removing host: {}", host_id); - auto ring_delay = std::chrono::milliseconds(_cfg.ring_delay_ms()); - logger.info("Sleeping for {}ms to ensure {} does not change", ring_delay.count(), endpoint); - sleep_abortable(ring_delay, _abort_source).get(); - // make sure it did not change - auto& eps = get_endpoint_state(endpoint); - if (eps.get_heart_beat_state().get_generation() != generation) { - throw std::runtime_error(format("Endpoint {} generation changed while trying to remove it", endpoint)); - } + auto& state = get_endpoint_state(endpoint); + // remember this node's generation + int generation = state.get_heart_beat_state().get_generation(); + logger.info("Removing host: {}", host_id); + auto ring_delay = std::chrono::milliseconds(_cfg.ring_delay_ms()); + logger.info("Sleeping for {}ms to ensure {} does not change", ring_delay.count(), endpoint); + co_await sleep_abortable(ring_delay, _abort_source); + // make sure it did not change + auto& eps = get_endpoint_state(endpoint); + if (eps.get_heart_beat_state().get_generation() != generation) { + throw std::runtime_error(format("Endpoint {} generation changed while trying to remove it", endpoint)); + } - // update the other node's generation to mimic it as if it had changed it itself - logger.info("Advertising removal for {}", endpoint); - eps.update_timestamp(); // make sure we don't evict it too soon - eps.get_heart_beat_state().force_newer_generation_unsafe(); - eps.add_application_state(application_state::STATUS, versioned_value::removing_nonlocal(host_id)); - eps.add_application_state(application_state::REMOVAL_COORDINATOR, versioned_value::removal_coordinator(local_host_id)); - endpoint_state_map[endpoint] = eps; - replicate(endpoint, eps).get(); - }); + // update the other node's generation to mimic it as if it had changed it itself + logger.info("Advertising removal for {}", endpoint); + eps.update_timestamp(); // make sure we don't evict it too soon + eps.get_heart_beat_state().force_newer_generation_unsafe(); + eps.add_application_state(application_state::STATUS, versioned_value::removing_nonlocal(host_id)); + eps.add_application_state(application_state::REMOVAL_COORDINATOR, versioned_value::removal_coordinator(local_host_id)); + endpoint_state_map[endpoint] = eps; + co_await replicate(endpoint, eps); } future<> gossiper::advertise_token_removed(inet_address endpoint, utils::UUID host_id) { - return seastar::async([this, g = this->shared_from_this(), endpoint, host_id] { - auto& eps = get_endpoint_state(endpoint); - eps.update_timestamp(); // make sure we don't evict it too soon - eps.get_heart_beat_state().force_newer_generation_unsafe(); - auto expire_time = compute_expire_time(); - eps.add_application_state(application_state::STATUS, versioned_value::removed_nonlocal(host_id, expire_time.time_since_epoch().count())); - logger.info("Completing removal of {}", endpoint); - add_expire_time_for_endpoint(endpoint, expire_time); - endpoint_state_map[endpoint] = eps; - replicate(endpoint, eps).get(); - // ensure at least one gossip round occurs before returning - sleep_abortable(INTERVAL * 2, _abort_source).get(); - }); + auto& eps = get_endpoint_state(endpoint); + eps.update_timestamp(); // make sure we don't evict it too soon + eps.get_heart_beat_state().force_newer_generation_unsafe(); + auto expire_time = compute_expire_time(); + eps.add_application_state(application_state::STATUS, versioned_value::removed_nonlocal(host_id, expire_time.time_since_epoch().count())); + logger.info("Completing removal of {}", endpoint); + add_expire_time_for_endpoint(endpoint, expire_time); + endpoint_state_map[endpoint] = eps; + co_await replicate(endpoint, eps); + // ensure at least one gossip round occurs before returning + co_await sleep_abortable(INTERVAL * 2, _abort_source); } future<> gossiper::unsafe_assassinate_endpoint(sstring address) { @@ -1246,7 +1235,7 @@ future<> gossiper::assassinate_endpoint(sstring address) { std::unordered_set tokens_set(tokens.begin(), tokens.end()); auto expire_time = gossiper.compute_expire_time(); ep_state.add_application_state(application_state::STATUS, versioned_value::left(tokens_set, expire_time.time_since_epoch().count())); - gossiper.handle_major_state_change(endpoint, ep_state); + gossiper.handle_major_state_change(endpoint, ep_state).get(); sleep_abortable(INTERVAL * 4, gossiper._abort_source).get(); logger.warn("Finished assassinating {}", endpoint); }); @@ -1470,7 +1459,6 @@ void gossiper::update_timestamp_for_nodes(const std::map gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) { logger.trace("marking as alive {}", addr); // Do not mark a node with status shutdown as UP. auto status = sstring(get_gossip_status(local_state)); if (status == sstring(versioned_value::SHUTDOWN)) { logger.warn("Skip marking node {} with status = {} as UP", addr, status); - return; + co_return; } local_state.mark_alive(); @@ -1534,13 +1520,13 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) { auto it_ = std::find(_live_endpoints.begin(), _live_endpoints.end(), addr); bool was_live = it_ != _live_endpoints.end(); if (was_live) { - return; + co_return; } // Make a copy for endpoint_state because the code below can yield endpoint_state state = local_state; _live_endpoints.push_back(addr); - update_live_endpoints_version().get(); + co_await update_live_endpoints_version(); if (_endpoints_to_talk_with.empty()) { _endpoints_to_talk_with.push_back({addr}); } else { @@ -1551,29 +1537,27 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) { logger.info("InetAddress {} is now UP, status = {}", addr, status); } - _subscribers.for_each([addr, state] (shared_ptr subscriber) -> future<> { + co_await _subscribers.for_each([addr, state] (shared_ptr subscriber) -> future<> { co_await subscriber->on_alive(addr, state); logger.trace("Notified {}", fmt::ptr(subscriber.get())); - }).get(); + }); } -// Runs inside seastar::async context -void gossiper::mark_dead(inet_address addr, endpoint_state& local_state) { +future<> gossiper::mark_dead(inet_address addr, endpoint_state& local_state) { logger.trace("marking as down {}", addr); local_state.mark_dead(); endpoint_state state = local_state; _live_endpoints.resize(std::distance(_live_endpoints.begin(), std::remove(_live_endpoints.begin(), _live_endpoints.end(), addr))); - update_live_endpoints_version().get(); + co_await update_live_endpoints_version(); _unreachable_endpoints[addr] = now(); logger.info("InetAddress {} is now DOWN, status = {}", addr, get_gossip_status(state)); - _subscribers.for_each([addr, state] (shared_ptr subscriber) -> future<> { + co_await _subscribers.for_each([addr, state] (shared_ptr subscriber) -> future<> { co_await subscriber->on_dead(addr, state); logger.trace("Notified {}", fmt::ptr(subscriber.get())); - }).get(); + }); } -// Runs inside seastar::async context -void gossiper::handle_major_state_change(inet_address ep, const endpoint_state& eps) { +future<> gossiper::handle_major_state_change(inet_address ep, const endpoint_state& eps) { auto eps_old = get_endpoint_state_for_endpoint(ep); if (!is_dead_state(eps) && !is_in_shadow_round()) { if (endpoint_state_map.contains(ep)) { @@ -1584,7 +1568,7 @@ void gossiper::handle_major_state_change(inet_address ep, const endpoint_state& } logger.trace("Adding endpoint state for {}, status = {}", ep, get_gossip_status(eps)); endpoint_state_map[ep] = eps; - replicate(ep, eps).get(); + co_await replicate(ep, eps); if (is_in_shadow_round()) { // In shadow round, we only interested in the peer's endpoint_state, @@ -1593,14 +1577,14 @@ void gossiper::handle_major_state_change(inet_address ep, const endpoint_state& // procedure with EchoMessage gossip message. We will do them during // normal gossip runs anyway. logger.debug("In shadow round addr={}, eps={}", ep, eps); - return; + co_return; } if (eps_old) { // the node restarted: it is up to the subscriber to take whatever action is necessary - _subscribers.for_each([ep, eps_old] (shared_ptr subscriber) { + co_await _subscribers.for_each([ep, eps_old] (shared_ptr subscriber) { return subscriber->on_restart(ep, *eps_old); - }).get(); + }); } auto& ep_state = endpoint_state_map.at(ep); @@ -1608,18 +1592,18 @@ void gossiper::handle_major_state_change(inet_address ep, const endpoint_state& mark_alive(ep, ep_state); } else { logger.debug("Not marking {} alive due to dead state {}", ep, get_gossip_status(eps)); - mark_dead(ep, ep_state); + co_await mark_dead(ep, ep_state); } auto* eps_new = get_endpoint_state_for_endpoint_ptr(ep); if (eps_new) { - _subscribers.for_each([ep, eps_new] (shared_ptr subscriber) { + co_await _subscribers.for_each([ep, eps_new] (shared_ptr subscriber) { return subscriber->on_join(ep, *eps_new); - }).get(); + }); } // check this at the end so nodes will learn about the endpoint if (is_shutdown(ep)) { - mark_as_shutdown(ep); + co_await mark_as_shutdown(ep); } } @@ -1680,7 +1664,7 @@ void gossiper::apply_new_states(inet_address addr, endpoint_state& local_state, // Listeners should decide which failures are non-fatal and swallow them. auto run_listeners = seastar::defer([&] () noexcept { for (auto&& key : changed) { - do_on_change_notifications(addr, key, remote_map.at(key)); + do_on_change_notifications(addr, key, remote_map.at(key)).get(); } }); @@ -1713,18 +1697,16 @@ void gossiper::apply_new_states(inet_address addr, endpoint_state& local_state, } } -// Runs inside seastar::async context -void gossiper::do_before_change_notifications(inet_address addr, const endpoint_state& ep_state, const application_state& ap_state, const versioned_value& new_value) { - _subscribers.for_each([addr, ep_state, ap_state, new_value] (shared_ptr subscriber) { +future<> gossiper::do_before_change_notifications(inet_address addr, const endpoint_state& ep_state, const application_state& ap_state, const versioned_value& new_value) { + co_await _subscribers.for_each([addr, ep_state, ap_state, new_value] (shared_ptr subscriber) { return subscriber->before_change(addr, ep_state, ap_state, new_value); - }).get(); + }); } -// Runs inside seastar::async context -void gossiper::do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value) { - _subscribers.for_each([addr, state, value] (shared_ptr subscriber) { +future<> gossiper::do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value) { + co_await _subscribers.for_each([addr, state, value] (shared_ptr subscriber) { return subscriber->on_change(addr, state, value); - }).get(); + }); } void gossiper::request_all(gossip_digest& g_digest, @@ -2056,7 +2038,7 @@ future<> gossiper::add_local_application_state(std::list gossiper::add_local_application_state(std::list gossiper::mark_as_shutdown(const inet_address& endpoint) { auto es = get_endpoint_state_for_endpoint_ptr(endpoint); if (es) { auto& ep_state = *es; ep_state.add_application_state(application_state::STATUS, versioned_value::shutdown(true)); ep_state.get_heart_beat_state().force_highest_possible_version_unsafe(); - replicate(endpoint, ep_state).get(); - mark_dead(endpoint, ep_state); - convict(endpoint); + co_await replicate(endpoint, ep_state); + co_await mark_dead(endpoint, ep_state); + co_await convict(endpoint); } } @@ -2307,42 +2288,40 @@ future<> gossiper::wait_for_gossip(std::chrono::milliseconds initial_delay, std: if (force_after && *force_after == 0) { logger.warn("Skipped to wait for gossip to settle by user request since skip_wait_for_gossip_to_settle is set zero. Do not use this in production!"); - return make_ready_future<>(); + co_return; } - return seastar::async([this, initial_delay, force_after] { - int32_t total_polls = 0; - int32_t num_okay = 0; - int32_t ep_size = endpoint_state_map.size(); + int32_t total_polls = 0; + int32_t num_okay = 0; + int32_t ep_size = endpoint_state_map.size(); - auto delay = initial_delay; + auto delay = initial_delay; - sleep_abortable(GOSSIP_SETTLE_MIN_WAIT_MS, _abort_source).get(); - while (num_okay < GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED) { - sleep_abortable(delay, _abort_source).get(); - delay = GOSSIP_SETTLE_POLL_INTERVAL_MS; + co_await sleep_abortable(GOSSIP_SETTLE_MIN_WAIT_MS, _abort_source); + while (num_okay < GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED) { + co_await sleep_abortable(delay, _abort_source); + delay = GOSSIP_SETTLE_POLL_INTERVAL_MS; - int32_t current_size = endpoint_state_map.size(); - total_polls++; - if (current_size == ep_size && _msg_processing == 0) { - logger.debug("Gossip looks settled"); - num_okay++; - } else { - logger.info("Gossip not settled after {} polls.", total_polls); - num_okay = 0; - } - ep_size = current_size; - if (force_after && *force_after > 0 && total_polls > *force_after) { - logger.warn("Gossip not settled but startup forced by skip_wait_for_gossip_to_settle. Gossp total polls: {}", total_polls); - break; - } - } - if (total_polls > GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED) { - logger.info("Gossip settled after {} extra polls; proceeding", total_polls - GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED); + int32_t current_size = endpoint_state_map.size(); + total_polls++; + if (current_size == ep_size && _msg_processing == 0) { + logger.debug("Gossip looks settled"); + num_okay++; } else { - logger.info("No gossip backlog; proceeding"); + logger.info("Gossip not settled after {} polls.", total_polls); + num_okay = 0; } - }); + ep_size = current_size; + if (force_after && *force_after > 0 && total_polls > *force_after) { + logger.warn("Gossip not settled but startup forced by skip_wait_for_gossip_to_settle. Gossp total polls: {}", total_polls); + break; + } + } + if (total_polls > GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED) { + logger.info("Gossip settled after {} extra polls; proceeding", total_polls - GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED); + } else { + logger.info("No gossip backlog; proceeding"); + } } future<> gossiper::wait_for_gossip_to_settle() { diff --git a/gms/gossiper.hh b/gms/gossiper.hh index a32887d8d1..40603a01de 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -260,7 +260,7 @@ public: /** * @param endpoint end point that is convicted. */ - void convict(inet_address endpoint); + future<> convict(inet_address endpoint); /** * Return either: the greatest heartbeat or application state @@ -401,9 +401,9 @@ private: void mark_alive(inet_address addr, endpoint_state& local_state); - void real_mark_alive(inet_address addr, endpoint_state& local_state); + future<> real_mark_alive(inet_address addr, endpoint_state& local_state); - void mark_dead(inet_address addr, endpoint_state& local_state); + future<> mark_dead(inet_address addr, endpoint_state& local_state); /** * This method is called whenever there is a "big" change in ep state (a generation change for a known node). @@ -411,7 +411,7 @@ private: * @param ep endpoint * @param ep_state EndpointState for the endpoint */ - void handle_major_state_change(inet_address ep, const endpoint_state& eps); + future<> handle_major_state_change(inet_address ep, const endpoint_state& eps); public: bool is_alive(inet_address ep) const; @@ -430,10 +430,10 @@ private: void apply_new_states(inet_address addr, endpoint_state& local_state, const endpoint_state& remote_state); // notify that a local application state is going to change (doesn't get triggered for remote changes) - void do_before_change_notifications(inet_address addr, const endpoint_state& ep_state, const application_state& ap_state, const versioned_value& new_value); + future<> do_before_change_notifications(inet_address addr, const endpoint_state& ep_state, const application_state& ap_state, const versioned_value& new_value); // notify that an application state has changed - void do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value); + future<> do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value); /* Request all the state for the endpoint in the g_digest */ void request_all(gossip_digest& g_digest, utils::chunked_vector& delta_gossip_digest_list, int remote_generation); @@ -554,7 +554,7 @@ public: bool is_normal_ring_member(const inet_address& endpoint) const; bool is_cql_ready(const inet_address& endpoint) const; bool is_silent_shutdown_state(const endpoint_state& ep_state) const; - void mark_as_shutdown(const inet_address& endpoint); + future<> mark_as_shutdown(const inet_address& endpoint); void force_newer_generation(); public: std::string_view get_gossip_status(const endpoint_state& ep_state) const noexcept;