mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-28 18:50:53 +00:00
gossiper: coroutinize do_stop_gossiping
Simplify the function. It does not need to spawn a seastar thread. While at it, declare it as private since it's called only internally by the gossiper (and on shard 0). Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -2234,9 +2234,9 @@ future<> gossiper::do_stop_gossiping() {
|
||||
// before _enabled is set to false down below.
|
||||
if (!_enabled) {
|
||||
logger.info("gossip is already stopped");
|
||||
return make_ready_future<>();
|
||||
co_return;
|
||||
}
|
||||
return seastar::async([this, g = this->shared_from_this()] {
|
||||
// FIXME: indentation
|
||||
auto my_ep_state = get_endpoint_state_ptr(get_broadcast_address());
|
||||
if (my_ep_state) {
|
||||
logger.info("My status = {}", get_gossip_status(*my_ep_state));
|
||||
@@ -2244,36 +2244,33 @@ future<> gossiper::do_stop_gossiping() {
|
||||
if (my_ep_state && !is_silent_shutdown_state(*my_ep_state)) {
|
||||
auto local_generation = my_ep_state->get_heart_beat_state().get_generation();
|
||||
logger.info("Announcing shutdown");
|
||||
add_local_application_state(application_state::STATUS, versioned_value::shutdown(true)).get();
|
||||
co_await add_local_application_state(application_state::STATUS, versioned_value::shutdown(true));
|
||||
auto live_endpoints = _live_endpoints;
|
||||
for (inet_address addr : live_endpoints) {
|
||||
msg_addr id = get_msg_addr(addr);
|
||||
logger.info("Sending a GossipShutdown to {} with generation {}", id.addr, local_generation);
|
||||
_messaging.send_gossip_shutdown(id, get_broadcast_address(), local_generation.value()).then_wrapped([id] (auto&&f) {
|
||||
try {
|
||||
f.get();
|
||||
try {
|
||||
co_await _messaging.send_gossip_shutdown(id, get_broadcast_address(), local_generation.value());
|
||||
logger.trace("Got GossipShutdown Reply");
|
||||
} catch (...) {
|
||||
logger.warn("Fail to send GossipShutdown to {}: {}", id, std::current_exception());
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}).get();
|
||||
}
|
||||
sleep(std::chrono::milliseconds(_gcfg.shutdown_announce_ms)).get();
|
||||
co_await sleep(std::chrono::milliseconds(_gcfg.shutdown_announce_ms));
|
||||
} else {
|
||||
logger.warn("No local state or state is in silent shutdown, not announcing shutdown");
|
||||
}
|
||||
logger.info("Disable and wait for gossip loop started");
|
||||
// Set disable flag and cancel the timer makes sure gossip loop will not be scheduled
|
||||
container().invoke_on_all([] (gms::gossiper& g) {
|
||||
co_await container().invoke_on_all([] (gms::gossiper& g) {
|
||||
g._enabled = false;
|
||||
}).get();
|
||||
});
|
||||
_scheduled_gossip_task.cancel();
|
||||
// Take the semaphore makes sure existing gossip loop is finished
|
||||
get_units(_callback_running, 1).get0();
|
||||
container().invoke_on_all([] (auto& g) {
|
||||
auto units = co_await get_units(_callback_running, 1);
|
||||
co_await container().invoke_on_all([] (auto& g) {
|
||||
return std::move(g._failure_detector_loop_done);
|
||||
}).get();
|
||||
});
|
||||
logger.info("Gossip is now stopped");
|
||||
});
|
||||
}
|
||||
|
||||
@@ -598,6 +598,8 @@ public:
|
||||
|
||||
private:
|
||||
void build_seeds_list();
|
||||
// Must be called on shard 0
|
||||
future<> do_stop_gossiping();
|
||||
|
||||
public:
|
||||
/**
|
||||
@@ -622,7 +624,6 @@ public:
|
||||
future<> shutdown();
|
||||
// Needed by seastar::sharded
|
||||
future<> stop();
|
||||
future<> do_stop_gossiping();
|
||||
|
||||
public:
|
||||
bool is_enabled() const;
|
||||
|
||||
Reference in New Issue
Block a user