From ce2b8724ed4b775ff03240282923eca4d1e15f08 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sat, 19 Aug 2023 09:38:20 +0300 Subject: [PATCH] gossiper: replicate_live_endpoints_on_change: use _live_endpoints_version to detect change Rather than keeping an expensive copy of `_live_endpoints` and `_unreachable_endpoints` in shadow members, while they aren't currently used for their content anyhow, just to detect when their corresponding members change. With that, it is renamed to replicate_live_and_unreachable_endpoints. This still doesn't provide strong exception safety guarantees, but at least we don't "cheat" about shard state and we don't mark shard 0 state as "replicated" by updating the shadow members. Also, we save some unneeded allocations. Refs scylladb/scylladb#15089 Refs scylladb/scylladb#15088 Signed-off-by: Benny Halevy --- gms/gossiper.cc | 58 +++++++++++++++++++------------------------------ gms/gossiper.hh | 6 ++--- 2 files changed, 25 insertions(+), 39 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 846ca12e74..470332adef 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -947,44 +947,32 @@ future<> gossiper::replicate_live_endpoints_on_change() { // // Gossiper task runs only on CPU0: // - // - If endpoint_state_map or _live_endpoints have changed - replicate - // them across all other shards. - // - Reschedule the gossiper only after execution on all nodes is done. + // - replicate _live_endpoints and _unreachable_endpoints + // across all other shards. + // - use _live_endpoints_version on each shard + // to determine if it has the latest copy, and replicate the respective + // member from shard 0, if the shard is outdated. 
// - bool live_endpoint_changed = (_live_endpoints != _shadow_live_endpoints); - bool unreachable_endpoint_changed = (_unreachable_endpoints != _shadow_unreachable_endpoints); + logger.debug("replicating live and unreachable endpoints to other shards"); - if (live_endpoint_changed || unreachable_endpoint_changed) { - logger.debug("replicating live endpoints to other shards"); + co_await container().invoke_on_others([this, + es = _endpoint_state_map] (gossiper& local_gossiper) { + if (local_gossiper._live_endpoints_version != _live_endpoints_version) { + local_gossiper._live_endpoints = _live_endpoints; + local_gossiper._unreachable_endpoints = _unreachable_endpoints; - if (live_endpoint_changed) { - _shadow_live_endpoints = _live_endpoints; - } - - if (unreachable_endpoint_changed) { - _shadow_unreachable_endpoints = _unreachable_endpoints; - } - - co_await container().invoke_on_all([this, live_endpoint_changed, unreachable_endpoint_changed, - es = _endpoint_state_map] (gossiper& local_gossiper) { - // Don't copy gossiper(CPU0) maps into themselves! - if (this_shard_id() != 0) { - if (live_endpoint_changed) { - local_gossiper._live_endpoints = _shadow_live_endpoints; - } - - if (unreachable_endpoint_changed) { - local_gossiper._unreachable_endpoints = _shadow_unreachable_endpoints; - } - - for (auto&& e : es) { - local_gossiper._endpoint_state_map[e.first].set_alive(e.second.is_alive()); - } - - local_gossiper._live_endpoints_version = _live_endpoints_version; + for (auto&& e : es) { + local_gossiper._endpoint_state_map[e.first].set_alive(e.second.is_alive()); } - }); - } + + // Set local _live_endpoints_version last to protect against exceptions + // on this shard. In this case replicate_live_endpoints_on_change will + // retry setting the above next time around. 
+ local_gossiper._live_endpoints_version = _live_endpoints_version; + } else { + logger.trace("shard already has the latest live and unreachable endpoints"); + } + }); } // Depends on: @@ -1405,8 +1393,6 @@ future<> gossiper::reset_endpoint_state_map() { g._unreachable_endpoints.clear(); g._live_endpoints.clear(); g._live_endpoints_version = version; - g._shadow_unreachable_endpoints.clear(); - g._shadow_live_endpoints.clear(); g._endpoint_state_map.clear(); }); } diff --git a/gms/gossiper.hh b/gms/gossiper.hh index 544e53a079..9dc9b9fe41 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -244,8 +244,8 @@ private: bool _in_shadow_round = false; - std::unordered_map _shadow_unreachable_endpoints; - std::unordered_set _shadow_live_endpoints; + uint64_t _shadow_live_endpoints_version = 0; + uint64_t _shadow_unreachable_endpoints_version = 0; // Must be called on shard 0. future> lock_endpoint_update_semaphore(); @@ -263,7 +263,7 @@ private: // they all shards are consistent with each other. future<> mutate_live_and_unreachable_endpoints(std::function); - // replicate shard 0 live endpoints across all other shards. + // replicate shard 0 live and unreachable endpoints sets across all other shards. // _endpoint_update_semaphore must be held for the whole duration future<> replicate_live_endpoints_on_change();