gms: gossiper: coroutinize do_apply_state_locally

Signed-off-by: Pavel Solodovnikov <pa.solodovnikov@scylladb.com>
This commit is contained in:
Pavel Solodovnikov
2022-01-16 15:49:50 +03:00
parent c48dcf607a
commit b7322c3f5d
2 changed files with 8 additions and 9 deletions

View File

@@ -532,11 +532,10 @@ future<> gossiper::send_gossip(gossip_digest_syn message, std::set<inet_address>
}
// Runs inside seastar::async context
void gossiper::do_apply_state_locally(gms::inet_address node, const endpoint_state& remote_state, bool listener_notification) {
future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint_state& remote_state, bool listener_notification) {
// If state does not exist just add it. If it does then add it if the remote generation is greater.
// If there is a generation tie, attempt to break it by heartbeat version.
auto permit = this->lock_endpoint(node).get0();
auto permit = co_await this->lock_endpoint(node);
auto es = this->get_endpoint_state_for_endpoint_ptr(node);
if (es) {
endpoint_state& local_state = *es;
@@ -551,7 +550,7 @@ void gossiper::do_apply_state_locally(gms::inet_address node, const endpoint_sta
if (listener_notification) {
logger.trace("Updating heartbeat state generation to {} from {} for {}", remote_generation, local_generation, node);
// major state change will handle the update by inserting the remote state directly
this->handle_major_state_change(node, remote_state).get();
co_await this->handle_major_state_change(node, remote_state);
} else {
logger.debug("Applying remote_state for node {} (remote generation > local generation)", node);
endpoint_state_map[node] = remote_state;
@@ -563,7 +562,7 @@ void gossiper::do_apply_state_locally(gms::inet_address node, const endpoint_sta
int remote_max_version = this->get_max_endpoint_state_version(remote_state);
if (remote_max_version > local_max_version) {
// apply states, but do not notify since there is no major change
this->apply_new_states(node, local_state, remote_state).get();
co_await this->apply_new_states(node, local_state, remote_state);
} else {
logger.trace("Ignoring remote version {} <= {} for {}", remote_max_version, local_max_version, node);
}
@@ -589,7 +588,7 @@ void gossiper::do_apply_state_locally(gms::inet_address node, const endpoint_sta
}
} else {
if (listener_notification) {
this->handle_major_state_change(node, remote_state).get();
co_await this->handle_major_state_change(node, remote_state);
} else {
logger.debug("Applying remote_state for node {} (new node)", node);
endpoint_state_map[node] = remote_state;
@@ -602,7 +601,7 @@ void gossiper::apply_state_locally_without_listener_notification(std::unordered_
for (auto& x : map) {
const inet_address& node = x.first;
const endpoint_state& remote_state = x.second;
do_apply_state_locally(node, remote_state, false);
do_apply_state_locally(node, remote_state, false).get();
}
}
@@ -625,7 +624,7 @@ future<> gossiper::apply_state_locally(std::map<inet_address, endpoint_state> ma
}
return seastar::with_semaphore(_apply_state_locally_semaphore, 1, [this, &ep, &map] () mutable {
return seastar::async([this, &ep, &map] () mutable {
do_apply_state_locally(ep, map[ep], true);
do_apply_state_locally(ep, map[ep], true).get();
});
});
});

View File

@@ -438,7 +438,7 @@ public:
// filter `endpoints` by `local_rack`
inet_address_vector_replica_set endpoint_filter(const sstring& local_rack, const std::unordered_map<sstring, std::unordered_set<gms::inet_address>>& endpoints);
private:
void do_apply_state_locally(gms::inet_address node, const endpoint_state& remote_state, bool listener_notification);
future<> do_apply_state_locally(gms::inet_address node, const endpoint_state& remote_state, bool listener_notification);
void apply_state_locally_without_listener_notification(std::unordered_map<inet_address, endpoint_state> map);
future<> apply_new_states(inet_address addr, endpoint_state& local_state, const endpoint_state& remote_state);