diff --git a/gms/gossiper.cc b/gms/gossiper.cc index aac03a12e8..30dcf46172 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -34,6 +34,7 @@ #include #include #include +#include #include #include "locator/host_id.hh" #include @@ -717,11 +718,12 @@ future<> gossiper::apply_state_locally(std::map ma } future<> gossiper::force_remove_endpoint(locator::host_id id, permit_id pid) { - return container().invoke_on(0, [this, pid, id] (auto& gossiper) mutable -> future<> { + co_await coroutine::switch_to(_gcfg.gossip_scheduling_group); + co_await container().invoke_on(0, [pid, id] (auto& gossiper) mutable -> future<> { auto permit = co_await gossiper.lock_endpoint(id, pid); pid = permit.id(); try { - if (id == my_host_id()) { + if (id == gossiper.my_host_id()) { throw std::runtime_error(format("Can not force remove node {} itself", id)); } if (!gossiper._endpoint_state_map.contains(id)) { @@ -891,6 +893,10 @@ gossiper::endpoint_lock_entry::endpoint_lock_entry() noexcept {} future gossiper::lock_endpoint(locator::host_id ep, permit_id pid, seastar::compat::source_location l) { + if (current_scheduling_group() != _gcfg.gossip_scheduling_group) { + logger.warn("Incorrect scheduling group used for gossiper::lock_endpoint: {}, should be {}, backtrace {}", current_scheduling_group().name(), _gcfg.gossip_scheduling_group.name(), current_backtrace()); + } + if (this_shard_id() != 0) { on_internal_error(logger, "lock_endpoint must be called on shard 0"); } @@ -2082,6 +2088,7 @@ void gossiper::examine_gossiper(utils::chunked_vector& g_digest_l } future<> gossiper::start_gossiping(gms::generation_type generation_nbr, application_state_map preload_local_states) { + co_await coroutine::switch_to(_gcfg.gossip_scheduling_group); auto permit = co_await lock_endpoint(my_host_id(), null_permit_id); build_seeds_list(); @@ -2141,6 +2148,7 @@ future<> gossiper::advertise_to_nodes(generation_for_nodes advertise_to_nodes) { } future<> gossiper::do_shadow_round(std::unordered_set nodes, mandatory is_mandatory) { + co_await coroutine::switch_to(_gcfg.gossip_scheduling_group); nodes.erase(get_broadcast_address()); gossip_get_endpoint_states_request request{{ gms::application_state::STATUS, @@ -2227,6 +2235,7 @@ void gossiper::build_seeds_list() { } future<> gossiper::add_saved_endpoint(locator::host_id host_id, gms::loaded_endpoint_state st, permit_id pid) { + co_await coroutine::switch_to(_gcfg.gossip_scheduling_group); if (host_id == my_host_id()) { logger.debug("Attempt to add self as saved endpoint"); co_return; @@ -2299,7 +2308,8 @@ future<> gossiper::add_local_application_state(application_state_map states) { co_return; } try { - co_await container().invoke_on(0, [&] (gossiper& gossiper) mutable -> future<> { + co_await coroutine::switch_to(_gcfg.gossip_scheduling_group); + co_await container().invoke_on(0, [&](gossiper& gossiper) mutable -> future<> { inet_address ep_addr = gossiper.get_broadcast_address(); auto ep_id = gossiper.my_host_id(); // for symmetry with other apply, use endpoint lock for our own address.