diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 808d458018..62c80c1f1b 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -1678,7 +1678,7 @@ void gossiper::apply_new_states(inet_address addr, endpoint_state& local_state, // Listeners should decide which failures are non-fatal and swallow them. auto run_listeners = seastar::defer([&] () noexcept { for (auto&& key : changed) { - do_on_change_notifications(addr, key, remote_map.at(key)); + do_on_change_notifications(addr, key, remote_map.at(key)).get(); } }); @@ -1717,11 +1717,10 @@ future<> gossiper::do_before_change_notifications(inet_address addr, const endpo }); } -// Runs inside seastar::async context -void gossiper::do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value) { - _subscribers.for_each([addr, state, value] (shared_ptr subscriber) { +future<> gossiper::do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value) { + co_await _subscribers.for_each([addr, state, value] (shared_ptr subscriber) { return subscriber->on_change(addr, state, value); - }).get(); + }); } void gossiper::request_all(gossip_digest& g_digest, @@ -2079,7 +2078,7 @@ future<> gossiper::add_local_application_state(std::list do_before_change_notifications(inet_address addr, const endpoint_state& ep_state, const application_state& ap_state, const versioned_value& new_value); // notify that an application state has changed - void do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value); + future<> do_on_change_notifications(inet_address addr, const application_state& state, const versioned_value& value); /* Request all the state for the endpoint in the g_digest */ void request_all(gossip_digest& g_digest, utils::chunked_vector& delta_gossip_digest_list, int remote_generation);