Merge 'gms: gossiper: coroutinize code (continued)' from Pavel Solodovnikov

This series continues the effort of https://github.com/scylladb/scylla/pull/9844 to reduce `seastar::async` usage and coroutinize in the gossiper code.

There are mostly trivial conversions from using `.get()` to `co_await`, where appropriate, as well, as elimination of `seastar::async()` wrappers.

A few more functions are not yet converted, though (e.g. `apply_new_states`, `do_apply_state_locally`, `apply_state_locally`, `apply_state_locally_without_listener_notification`, maybe a few others, as well).

The motivation is to be able to call every public API function of `gossiper` class without requiring `seastar::async` context.

Tests: unit(debug, dev), dtest (topology-related tests)

Closes #10032

* github.com:scylladb/scylla:
  gms: gossiper: coroutinize `wait_for_gossip`
  gms: gossiper: coroutinize `advertise_token_removed`
  gms: gossiper: coroutinize `advertise_removing`
  gms: gossiper: don't wrap `convict` calls into `seastar::async`
  gms: gossiper: coroutinize `handle_major_state_change`
  gms: gossiper: coroutinize `handle_shutdown_msg`
  gms: gossiper: coroutinize `mark_as_shutdown` and `convict`
  gms: gossiper: remove comment about requiring thread context in `mark_alive`
  gms: gossiper: don't use `seastar::async` in `mark_alive`
  gms: gossiper: coroutinize `do_on_change_notifications`
  gms: gossiper: coroutinize `do_before_change_notifications`
  gms: gossiper: coroutinize `real_mark_alive`
  gms: gossiper: coroutinize `mark_dead`
This commit is contained in:
Avi Kivity
2022-02-13 11:51:44 +02:00
2 changed files with 138 additions and 159 deletions

View File

@@ -404,29 +404,28 @@ future<> gossiper::handle_echo_msg(gms::inet_address from, std::optional<int64_t
future<> gossiper::handle_shutdown_msg(inet_address from, std::optional<int64_t> generation_number_opt) {
if (!is_enabled()) {
logger.debug("Ignoring shutdown message from {} because gossip is disabled", from);
return make_ready_future<>();
co_return;
}
return seastar::async([this, from, generation_number_opt] {
auto permit = this->lock_endpoint(from).get0();
if (generation_number_opt) {
auto es = this->get_endpoint_state_for_endpoint_ptr(from);
if (es) {
int local_generation = es->get_heart_beat_state().get_generation();
logger.info("Got shutdown message from {}, received_generation={}, local_generation={}",
auto permit = co_await this->lock_endpoint(from);
if (generation_number_opt) {
auto es = this->get_endpoint_state_for_endpoint_ptr(from);
if (es) {
int local_generation = es->get_heart_beat_state().get_generation();
logger.info("Got shutdown message from {}, received_generation={}, local_generation={}",
from, generation_number_opt.value(), local_generation);
if (local_generation != generation_number_opt.value()) {
logger.warn("Ignoring shutdown message from {} because generation number does not match, received_generation={}, local_generation={}",
from, generation_number_opt.value(), local_generation);
if (local_generation != generation_number_opt.value()) {
logger.warn("Ignoring shutdown message from {} because generation number does not match, received_generation={}, local_generation={}",
from, generation_number_opt.value(), local_generation);
return;
}
} else {
logger.warn("Ignoring shutdown message from {} because generation number does not match, received_generation={}, local_generation=not found",
from, generation_number_opt.value());
return;
co_return;
}
} else {
logger.warn("Ignoring shutdown message from {} because generation number does not match, received_generation={}, local_generation=not found",
from, generation_number_opt.value());
co_return;
}
this->mark_as_shutdown(from);
});
}
co_await this->mark_as_shutdown(from);
}
future<gossip_get_endpoint_states_response>
@@ -545,7 +544,7 @@ void gossiper::do_apply_state_locally(gms::inet_address node, const endpoint_sta
if (listener_notification) {
logger.trace("Updating heartbeat state generation to {} from {} for {}", remote_generation, local_generation, node);
// major state change will handle the update by inserting the remote state directly
this->handle_major_state_change(node, remote_state);
this->handle_major_state_change(node, remote_state).get();
} else {
logger.debug("Applying remote_state for node {} (remote generation > local generation)", node);
endpoint_state_map[node] = remote_state;
@@ -583,7 +582,7 @@ void gossiper::do_apply_state_locally(gms::inet_address node, const endpoint_sta
}
} else {
if (listener_notification) {
this->handle_major_state_change(node, remote_state);
this->handle_major_state_change(node, remote_state).get();
} else {
logger.debug("Applying remote_state for node {} (new node)", node);
endpoint_state_map[node] = remote_state;
@@ -754,9 +753,7 @@ future<> gossiper::failure_detector_loop_for_node(gms::inet_address node, int64_
if (diff > max_duration) {
logger.info("failure_detector_loop: Mark node {} as DOWN", node);
co_await container().invoke_on(0, [node] (gms::gossiper& g) {
return seastar::async([node, &g] {
g.convict(node);
});
return g.convict(node);
});
co_return;
}
@@ -812,11 +809,9 @@ future<> gossiper::failure_detector_loop() {
if (!nodes_down.empty()) {
logger.debug("failure_detector_loop: previous_live_nodes={}, current_live_nodes={}, nodes_down={}",
nodes, _live_endpoints, nodes_down);
co_await seastar::async([this, &nodes_down] {
for (const auto& node : nodes_down) {
convict(node);
}
});
for (const auto& node : nodes_down) {
co_await convict(node);
}
}
// Make sure _live_endpoints do not change when nodes_down are being convicted above. This guarantees no down nodes will miss the convict.
logger.debug("failure_detector_loop: previous_live_nodes={}, current_live_nodes={}, nodes_down={}, version_before={}, version_after={}",
@@ -1046,17 +1041,15 @@ int64_t gossiper::get_endpoint_downtime(inet_address ep) const noexcept {
// Depends on
// - on_dead callbacks
// It is called from failure_detector
//
// Runs inside seastar::async context
void gossiper::convict(inet_address endpoint) {
future<> gossiper::convict(inet_address endpoint) {
auto* state = get_endpoint_state_for_endpoint_ptr(endpoint);
if (!state || !state->is_alive()) {
return;
co_return;
}
if (is_shutdown(endpoint)) {
mark_as_shutdown(endpoint);
co_await mark_as_shutdown(endpoint);
} else {
mark_dead(endpoint, *state);
co_await mark_dead(endpoint, *state);
}
}
@@ -1155,45 +1148,41 @@ future<> gossiper::replicate(inet_address ep, application_state key, const versi
}
future<> gossiper::advertise_removing(inet_address endpoint, utils::UUID host_id, utils::UUID local_host_id) {
return seastar::async([this, g = this->shared_from_this(), endpoint, host_id, local_host_id] {
auto& state = get_endpoint_state(endpoint);
// remember this node's generation
int generation = state.get_heart_beat_state().get_generation();
logger.info("Removing host: {}", host_id);
auto ring_delay = std::chrono::milliseconds(_cfg.ring_delay_ms());
logger.info("Sleeping for {}ms to ensure {} does not change", ring_delay.count(), endpoint);
sleep_abortable(ring_delay, _abort_source).get();
// make sure it did not change
auto& eps = get_endpoint_state(endpoint);
if (eps.get_heart_beat_state().get_generation() != generation) {
throw std::runtime_error(format("Endpoint {} generation changed while trying to remove it", endpoint));
}
auto& state = get_endpoint_state(endpoint);
// remember this node's generation
int generation = state.get_heart_beat_state().get_generation();
logger.info("Removing host: {}", host_id);
auto ring_delay = std::chrono::milliseconds(_cfg.ring_delay_ms());
logger.info("Sleeping for {}ms to ensure {} does not change", ring_delay.count(), endpoint);
co_await sleep_abortable(ring_delay, _abort_source);
// make sure it did not change
auto& eps = get_endpoint_state(endpoint);
if (eps.get_heart_beat_state().get_generation() != generation) {
throw std::runtime_error(format("Endpoint {} generation changed while trying to remove it", endpoint));
}
// update the other node's generation to mimic it as if it had changed it itself
logger.info("Advertising removal for {}", endpoint);
eps.update_timestamp(); // make sure we don't evict it too soon
eps.get_heart_beat_state().force_newer_generation_unsafe();
eps.add_application_state(application_state::STATUS, versioned_value::removing_nonlocal(host_id));
eps.add_application_state(application_state::REMOVAL_COORDINATOR, versioned_value::removal_coordinator(local_host_id));
endpoint_state_map[endpoint] = eps;
replicate(endpoint, eps).get();
});
// update the other node's generation to mimic it as if it had changed it itself
logger.info("Advertising removal for {}", endpoint);
eps.update_timestamp(); // make sure we don't evict it too soon
eps.get_heart_beat_state().force_newer_generation_unsafe();
eps.add_application_state(application_state::STATUS, versioned_value::removing_nonlocal(host_id));
eps.add_application_state(application_state::REMOVAL_COORDINATOR, versioned_value::removal_coordinator(local_host_id));
endpoint_state_map[endpoint] = eps;
co_await replicate(endpoint, eps);
}
future<> gossiper::advertise_token_removed(inet_address endpoint, utils::UUID host_id) {
return seastar::async([this, g = this->shared_from_this(), endpoint, host_id] {
auto& eps = get_endpoint_state(endpoint);
eps.update_timestamp(); // make sure we don't evict it too soon
eps.get_heart_beat_state().force_newer_generation_unsafe();
auto expire_time = compute_expire_time();
eps.add_application_state(application_state::STATUS, versioned_value::removed_nonlocal(host_id, expire_time.time_since_epoch().count()));
logger.info("Completing removal of {}", endpoint);
add_expire_time_for_endpoint(endpoint, expire_time);
endpoint_state_map[endpoint] = eps;
replicate(endpoint, eps).get();
// ensure at least one gossip round occurs before returning
sleep_abortable(INTERVAL * 2, _abort_source).get();
});
auto& eps = get_endpoint_state(endpoint);
eps.update_timestamp(); // make sure we don't evict it too soon
eps.get_heart_beat_state().force_newer_generation_unsafe();
auto expire_time = compute_expire_time();
eps.add_application_state(application_state::STATUS, versioned_value::removed_nonlocal(host_id, expire_time.time_since_epoch().count()));
logger.info("Completing removal of {}", endpoint);
add_expire_time_for_endpoint(endpoint, expire_time);
endpoint_state_map[endpoint] = eps;
co_await replicate(endpoint, eps);
// ensure at least one gossip round occurs before returning
co_await sleep_abortable(INTERVAL * 2, _abort_source);
}
future<> gossiper::unsafe_assassinate_endpoint(sstring address) {
@@ -1246,7 +1235,7 @@ future<> gossiper::assassinate_endpoint(sstring address) {
std::unordered_set<dht::token> tokens_set(tokens.begin(), tokens.end());
auto expire_time = gossiper.compute_expire_time();
ep_state.add_application_state(application_state::STATUS, versioned_value::left(tokens_set, expire_time.time_since_epoch().count()));
gossiper.handle_major_state_change(endpoint, ep_state);
gossiper.handle_major_state_change(endpoint, ep_state).get();
sleep_abortable(INTERVAL * 4, gossiper._abort_source).get();
logger.warn("Finished assassinating {}", endpoint);
});
@@ -1470,7 +1459,6 @@ void gossiper::update_timestamp_for_nodes(const std::map<inet_address, endpoint_
}
}
// Runs inside seastar::async context
void gossiper::mark_alive(inet_address addr, endpoint_state& local_state) {
// if (MessagingService.instance().getVersion(addr) < MessagingService.VERSION_20) {
// real_mark_alive(addr, local_state);
@@ -1493,19 +1481,18 @@ void gossiper::mark_alive(inet_address addr, endpoint_state& local_state) {
// Do it in the background.
(void)_messaging.send_gossip_echo(id, generation, std::chrono::milliseconds(15000)).then([this, addr] {
logger.trace("Got EchoMessage Reply");
return seastar::async([this, addr] {
// After sending echo message, the Node might not be in the
// endpoint_state_map anymore, use the reference of local_state
// might cause user-after-free
auto es = get_endpoint_state_for_endpoint_ptr(addr);
if (!es) {
logger.info("Node {} is not in endpoint_state_map anymore", addr);
} else {
endpoint_state& state = *es;
logger.debug("Mark Node {} alive after EchoMessage", addr);
real_mark_alive(addr, state);
}
});
// After sending echo message, the Node might not be in the
// endpoint_state_map anymore, use the reference of local_state
// might cause user-after-free
auto es = get_endpoint_state_for_endpoint_ptr(addr);
if (!es) {
logger.info("Node {} is not in endpoint_state_map anymore", addr);
} else {
endpoint_state& state = *es;
logger.debug("Mark Node {} alive after EchoMessage", addr);
return real_mark_alive(addr, state);
}
return make_ready_future();
}).finally([this, addr] {
_pending_mark_alive_endpoints.erase(addr);
}).handle_exception([addr] (auto ep) {
@@ -1513,15 +1500,14 @@ void gossiper::mark_alive(inet_address addr, endpoint_state& local_state) {
});
}
// Runs inside seastar::async context
void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
future<> gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
logger.trace("marking as alive {}", addr);
// Do not mark a node with status shutdown as UP.
auto status = sstring(get_gossip_status(local_state));
if (status == sstring(versioned_value::SHUTDOWN)) {
logger.warn("Skip marking node {} with status = {} as UP", addr, status);
return;
co_return;
}
local_state.mark_alive();
@@ -1534,13 +1520,13 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
auto it_ = std::find(_live_endpoints.begin(), _live_endpoints.end(), addr);
bool was_live = it_ != _live_endpoints.end();
if (was_live) {
return;
co_return;
}
// Make a copy for endpoint_state because the code below can yield
endpoint_state state = local_state;
_live_endpoints.push_back(addr);
update_live_endpoints_version().get();
co_await update_live_endpoints_version();
if (_endpoints_to_talk_with.empty()) {
_endpoints_to_talk_with.push_back({addr});
} else {
@@ -1551,29 +1537,27 @@ void gossiper::real_mark_alive(inet_address addr, endpoint_state& local_state) {
logger.info("InetAddress {} is now UP, status = {}", addr, status);
}
_subscribers.for_each([addr, state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) -> future<> {
co_await _subscribers.for_each([addr, state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) -> future<> {
co_await subscriber->on_alive(addr, state);
logger.trace("Notified {}", fmt::ptr(subscriber.get()));
}).get();
});
}
// Runs inside seastar::async context
void gossiper::mark_dead(inet_address addr, endpoint_state& local_state) {
future<> gossiper::mark_dead(inet_address addr, endpoint_state& local_state) {
logger.trace("marking as down {}", addr);
local_state.mark_dead();
endpoint_state state = local_state;
_live_endpoints.resize(std::distance(_live_endpoints.begin(), std::remove(_live_endpoints.begin(), _live_endpoints.end(), addr)));
update_live_endpoints_version().get();
co_await update_live_endpoints_version();
_unreachable_endpoints[addr] = now();
logger.info("InetAddress {} is now DOWN, status = {}", addr, get_gossip_status(state));
_subscribers.for_each([addr, state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) -> future<> {
co_await _subscribers.for_each([addr, state] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) -> future<> {
co_await subscriber->on_dead(addr, state);
logger.trace("Notified {}", fmt::ptr(subscriber.get()));
}).get();
});
}
// Runs inside seastar::async context
void gossiper::handle_major_state_change(inet_address ep, const endpoint_state& eps) {
future<> gossiper::handle_major_state_change(inet_address ep, const endpoint_state& eps) {
auto eps_old = get_endpoint_state_for_endpoint(ep);
if (!is_dead_state(eps) && !is_in_shadow_round()) {
if (endpoint_state_map.contains(ep)) {
@@ -1584,7 +1568,7 @@ void gossiper::handle_major_state_change(inet_address ep, const endpoint_state&
}
logger.trace("Adding endpoint state for {}, status = {}", ep, get_gossip_status(eps));
endpoint_state_map[ep] = eps;
replicate(ep, eps).get();
co_await replicate(ep, eps);
if (is_in_shadow_round()) {
// In shadow round, we only interested in the peer's endpoint_state,
@@ -1593,14 +1577,14 @@ void gossiper::handle_major_state_change(inet_address ep, const endpoint_state&
// procedure with EchoMessage gossip message. We will do them during
// normal gossip runs anyway.
logger.debug("In shadow round addr={}, eps={}", ep, eps);
return;
co_return;
}
if (eps_old) {
// the node restarted: it is up to the subscriber to take whatever action is necessary
_subscribers.for_each([ep, eps_old] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
co_await _subscribers.for_each([ep, eps_old] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
return subscriber->on_restart(ep, *eps_old);
}).get();
});
}
auto& ep_state = endpoint_state_map.at(ep);
@@ -1608,18 +1592,18 @@ void gossiper::handle_major_state_change(inet_address ep, const endpoint_state&
mark_alive(ep, ep_state);
} else {
logger.debug("Not marking {} alive due to dead state {}", ep, get_gossip_status(eps));
mark_dead(ep, ep_state);
co_await mark_dead(ep, ep_state);
}
auto* eps_new = get_endpoint_state_for_endpoint_ptr(ep);
if (eps_new) {
_subscribers.for_each([ep, eps_new] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
co_await _subscribers.for_each([ep, eps_new] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
return subscriber->on_join(ep, *eps_new);
}).get();
});
}
// check this at the end so nodes will learn about the endpoint
if (is_shutdown(ep)) {
mark_as_shutdown(ep);
co_await mark_as_shutdown(ep);
}
}
@@ -1680,7 +1664,7 @@ void gossiper::apply_new_states(inet_address addr, endpoint_state& local_state,
// Listeners should decide which failures are non-fatal and swallow them.
auto run_listeners = seastar::defer([&] () noexcept {
for (auto&& key : changed) {
do_on_change_notifications(addr, key, remote_map.at(key));
do_on_change_notifications(addr, key, remote_map.at(key)).get();
}
});
@@ -1713,18 +1697,16 @@ void gossiper::apply_new_states(inet_address addr, endpoint_state& local_state,
}
}
// Runs inside seastar::async context
void gossiper::do_before_change_notifications(inet_address addr, const endpoint_state& ep_state, const application_state& ap_state, const versioned_value& new_value) {
_subscribers.for_each([addr, ep_state, ap_state, new_value] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
future<> gossiper::do_before_change_notifications(inet_address addr, const endpoint_state& ep_state, const application_state& ap_state, const versioned_value& new_value) {
co_await _subscribers.for_each([addr, ep_state, ap_state, new_value] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
return subscriber->before_change(addr, ep_state, ap_state, new_value);
}).get();
});
}
// Runs inside seastar::async context
void gossiper::do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value) {
_subscribers.for_each([addr, state, value] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
future<> gossiper::do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value) {
co_await _subscribers.for_each([addr, state, value] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
return subscriber->on_change(addr, state, value);
}).get();
});
}
void gossiper::request_all(gossip_digest& g_digest,
@@ -2056,7 +2038,7 @@ future<> gossiper::add_local_application_state(std::list<std::pair<application_s
auto& value = p.second;
// Fire "before change" notifications:
// Not explicit, but apparently we allow this to defer (inside out implicit seastar::async)
gossiper.do_before_change_notifications(ep_addr, ep_state_before, state, value);
gossiper.do_before_change_notifications(ep_addr, ep_state_before, state, value).get();
}
es = gossiper.get_endpoint_state_for_endpoint_ptr(ep_addr);
@@ -2082,7 +2064,7 @@ future<> gossiper::add_local_application_state(std::list<std::pair<application_s
// ensured the whole set of values are monotonically versioned and
// applied to endpoint state.
gossiper.replicate(ep_addr, state, value).get();
gossiper.do_on_change_notifications(ep_addr, state, value);
gossiper.do_on_change_notifications(ep_addr, state, value).get();
}
}).handle_exception([] (auto ep) {
logger.warn("Fail to apply application_state: {}", ep);
@@ -2257,16 +2239,15 @@ sstring gossiper::get_application_state_value(inet_address endpoint, application
* This method is used to mark a node as shutdown; that is it gracefully exited on its own and told us about it
* @param endpoint endpoint that has shut itself down
*/
// Runs inside seastar::async context
void gossiper::mark_as_shutdown(const inet_address& endpoint) {
future<> gossiper::mark_as_shutdown(const inet_address& endpoint) {
auto es = get_endpoint_state_for_endpoint_ptr(endpoint);
if (es) {
auto& ep_state = *es;
ep_state.add_application_state(application_state::STATUS, versioned_value::shutdown(true));
ep_state.get_heart_beat_state().force_highest_possible_version_unsafe();
replicate(endpoint, ep_state).get();
mark_dead(endpoint, ep_state);
convict(endpoint);
co_await replicate(endpoint, ep_state);
co_await mark_dead(endpoint, ep_state);
co_await convict(endpoint);
}
}
@@ -2307,42 +2288,40 @@ future<> gossiper::wait_for_gossip(std::chrono::milliseconds initial_delay, std:
if (force_after && *force_after == 0) {
logger.warn("Skipped to wait for gossip to settle by user request since skip_wait_for_gossip_to_settle is set zero. Do not use this in production!");
return make_ready_future<>();
co_return;
}
return seastar::async([this, initial_delay, force_after] {
int32_t total_polls = 0;
int32_t num_okay = 0;
int32_t ep_size = endpoint_state_map.size();
int32_t total_polls = 0;
int32_t num_okay = 0;
int32_t ep_size = endpoint_state_map.size();
auto delay = initial_delay;
auto delay = initial_delay;
sleep_abortable(GOSSIP_SETTLE_MIN_WAIT_MS, _abort_source).get();
while (num_okay < GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED) {
sleep_abortable(delay, _abort_source).get();
delay = GOSSIP_SETTLE_POLL_INTERVAL_MS;
co_await sleep_abortable(GOSSIP_SETTLE_MIN_WAIT_MS, _abort_source);
while (num_okay < GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED) {
co_await sleep_abortable(delay, _abort_source);
delay = GOSSIP_SETTLE_POLL_INTERVAL_MS;
int32_t current_size = endpoint_state_map.size();
total_polls++;
if (current_size == ep_size && _msg_processing == 0) {
logger.debug("Gossip looks settled");
num_okay++;
} else {
logger.info("Gossip not settled after {} polls.", total_polls);
num_okay = 0;
}
ep_size = current_size;
if (force_after && *force_after > 0 && total_polls > *force_after) {
logger.warn("Gossip not settled but startup forced by skip_wait_for_gossip_to_settle. Gossp total polls: {}", total_polls);
break;
}
}
if (total_polls > GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED) {
logger.info("Gossip settled after {} extra polls; proceeding", total_polls - GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED);
int32_t current_size = endpoint_state_map.size();
total_polls++;
if (current_size == ep_size && _msg_processing == 0) {
logger.debug("Gossip looks settled");
num_okay++;
} else {
logger.info("No gossip backlog; proceeding");
logger.info("Gossip not settled after {} polls.", total_polls);
num_okay = 0;
}
});
ep_size = current_size;
if (force_after && *force_after > 0 && total_polls > *force_after) {
logger.warn("Gossip not settled but startup forced by skip_wait_for_gossip_to_settle. Gossp total polls: {}", total_polls);
break;
}
}
if (total_polls > GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED) {
logger.info("Gossip settled after {} extra polls; proceeding", total_polls - GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED);
} else {
logger.info("No gossip backlog; proceeding");
}
}
future<> gossiper::wait_for_gossip_to_settle() {

View File

@@ -260,7 +260,7 @@ public:
/**
* @param endpoint end point that is convicted.
*/
void convict(inet_address endpoint);
future<> convict(inet_address endpoint);
/**
* Return either: the greatest heartbeat or application state
@@ -401,9 +401,9 @@ private:
void mark_alive(inet_address addr, endpoint_state& local_state);
void real_mark_alive(inet_address addr, endpoint_state& local_state);
future<> real_mark_alive(inet_address addr, endpoint_state& local_state);
void mark_dead(inet_address addr, endpoint_state& local_state);
future<> mark_dead(inet_address addr, endpoint_state& local_state);
/**
* This method is called whenever there is a "big" change in ep state (a generation change for a known node).
@@ -411,7 +411,7 @@ private:
* @param ep endpoint
* @param ep_state EndpointState for the endpoint
*/
void handle_major_state_change(inet_address ep, const endpoint_state& eps);
future<> handle_major_state_change(inet_address ep, const endpoint_state& eps);
public:
bool is_alive(inet_address ep) const;
@@ -430,10 +430,10 @@ private:
void apply_new_states(inet_address addr, endpoint_state& local_state, const endpoint_state& remote_state);
// notify that a local application state is going to change (doesn't get triggered for remote changes)
void do_before_change_notifications(inet_address addr, const endpoint_state& ep_state, const application_state& ap_state, const versioned_value& new_value);
future<> do_before_change_notifications(inet_address addr, const endpoint_state& ep_state, const application_state& ap_state, const versioned_value& new_value);
// notify that an application state has changed
void do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value);
future<> do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value);
/* Request all the state for the endpoint in the g_digest */
void request_all(gossip_digest& g_digest, utils::chunked_vector<gossip_digest>& delta_gossip_digest_list, int remote_generation);
@@ -554,7 +554,7 @@ public:
bool is_normal_ring_member(const inet_address& endpoint) const;
bool is_cql_ready(const inet_address& endpoint) const;
bool is_silent_shutdown_state(const endpoint_state& ep_state) const;
void mark_as_shutdown(const inet_address& endpoint);
future<> mark_as_shutdown(const inet_address& endpoint);
void force_newer_generation();
public:
std::string_view get_gossip_status(const endpoint_state& ep_state) const noexcept;