gms: gossiper: coroutinize do_on_change_notifications

Signed-off-by: Pavel Solodovnikov <pa.solodovnikov@scylladb.com>
This commit is contained in:
Pavel Solodovnikov
2022-01-16 13:59:30 +03:00
parent 37066039df
commit 529f4d0f98
2 changed files with 6 additions and 7 deletions

View File

@@ -1678,7 +1678,7 @@ void gossiper::apply_new_states(inet_address addr, endpoint_state& local_state,
// Listeners should decide which failures are non-fatal and swallow them.
auto run_listeners = seastar::defer([&] () noexcept {
for (auto&& key : changed) {
do_on_change_notifications(addr, key, remote_map.at(key));
do_on_change_notifications(addr, key, remote_map.at(key)).get();
}
});
@@ -1717,11 +1717,10 @@ future<> gossiper::do_before_change_notifications(inet_address addr, const endpo
});
}
// Runs inside seastar::async context
void gossiper::do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value) {
_subscribers.for_each([addr, state, value] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
future<> gossiper::do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value) {
co_await _subscribers.for_each([addr, state, value] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
return subscriber->on_change(addr, state, value);
}).get();
});
}
void gossiper::request_all(gossip_digest& g_digest,
@@ -2079,7 +2078,7 @@ future<> gossiper::add_local_application_state(std::list<std::pair<application_s
// ensured the whole set of values are monotonically versioned and
// applied to endpoint state.
gossiper.replicate(ep_addr, state, value).get();
gossiper.do_on_change_notifications(ep_addr, state, value);
gossiper.do_on_change_notifications(ep_addr, state, value).get();
}
}).handle_exception([] (auto ep) {
logger.warn("Fail to apply application_state: {}", ep);

View File

@@ -433,7 +433,7 @@ private:
future<> do_before_change_notifications(inet_address addr, const endpoint_state& ep_state, const application_state& ap_state, const versioned_value& new_value);
// notify that an application state has changed
void do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value);
future<> do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value);
/* Request all the state for the endpoint in the g_digest */
void request_all(gossip_digest& g_digest, utils::chunked_vector<gossip_digest>& delta_gossip_digest_list, int remote_generation);