gossiper: replicate_live_endpoints_on_change: use _live_endpoints_version to detect change

Rather than keeping an expensive copy of `_live_endpoint`
and `_unreachable_endpoints` in shadow members, while they aren't
currently used for their content anyhow, just to detect when
their corresponding members change.

With that, it is renamed to replicate_live_and_unreachable_endpoints.

This still doesn't provide strong exception safety guarantees,
but at least we don't "cheat" about shard state
and we don't mark shard 0 state as "replicated" by
updating the shadow members.
Also, we save some unneeded allocations.

Refs scylladb/scylladb#15089
Refs scylladb/scylladb#15088

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2023-08-19 09:38:20 +03:00
parent d666fbfe8f
commit ce2b8724ed
2 changed files with 25 additions and 39 deletions

View File

@@ -947,44 +947,32 @@ future<> gossiper::replicate_live_endpoints_on_change() {
//
// Gossiper task runs only on CPU0:
//
// - If endpoint_state_map or _live_endpoints have changed - replicate
// them across all other shards.
// - Reschedule the gossiper only after execution on all nodes is done.
// - replicate _live_endpoints and _unreachable_endpoints
// across all other shards.
// - use _live_endpoints_version on each shard
// to determine if it has the latest copy, and replicate the respective
// member from shard 0, if the shard is outdated.
//
bool live_endpoint_changed = (_live_endpoints != _shadow_live_endpoints);
bool unreachable_endpoint_changed = (_unreachable_endpoints != _shadow_unreachable_endpoints);
logger.debug("replicating live and unreachable endpoints to other shards");
if (live_endpoint_changed || unreachable_endpoint_changed) {
logger.debug("replicating live endpoints to other shards");
co_await container().invoke_on_others([this,
es = _endpoint_state_map] (gossiper& local_gossiper) {
if (local_gossiper._live_endpoints_version != _live_endpoints_version) {
local_gossiper._live_endpoints = _live_endpoints;
local_gossiper._unreachable_endpoints = _unreachable_endpoints;
if (live_endpoint_changed) {
_shadow_live_endpoints = _live_endpoints;
}
if (unreachable_endpoint_changed) {
_shadow_unreachable_endpoints = _unreachable_endpoints;
}
co_await container().invoke_on_all([this, live_endpoint_changed, unreachable_endpoint_changed,
es = _endpoint_state_map] (gossiper& local_gossiper) {
// Don't copy gossiper(CPU0) maps into themselves!
if (this_shard_id() != 0) {
if (live_endpoint_changed) {
local_gossiper._live_endpoints = _shadow_live_endpoints;
}
if (unreachable_endpoint_changed) {
local_gossiper._unreachable_endpoints = _shadow_unreachable_endpoints;
}
for (auto&& e : es) {
local_gossiper._endpoint_state_map[e.first].set_alive(e.second.is_alive());
}
local_gossiper._live_endpoints_version = _live_endpoints_version;
for (auto&& e : es) {
local_gossiper._endpoint_state_map[e.first].set_alive(e.second.is_alive());
}
});
}
// Set local _live_endpoints_version last to protect against exceptions
// on this shards. In this case replicate_live_endpoints_on_change will
// retry setting the above next time around.
local_gossiper._live_endpoints_version = _live_endpoints_version;
} else {
logger.trace("shard already has the latest live and unreachable endpoints");
}
});
}
// Depends on:
@@ -1405,8 +1393,6 @@ future<> gossiper::reset_endpoint_state_map() {
g._unreachable_endpoints.clear();
g._live_endpoints.clear();
g._live_endpoints_version = version;
g._shadow_unreachable_endpoints.clear();
g._shadow_live_endpoints.clear();
g._endpoint_state_map.clear();
});
}

View File

@@ -244,8 +244,8 @@ private:
bool _in_shadow_round = false;
std::unordered_map<inet_address, clk::time_point> _shadow_unreachable_endpoints;
std::unordered_set<inet_address> _shadow_live_endpoints;
uint64_t _shadow_live_endpoints_version = 0;
uint64_t _shadow_unreachable_endpoints_version = 0;
// Must be called on shard 0.
future<semaphore_units<>> lock_endpoint_update_semaphore();
@@ -263,7 +263,7 @@ private:
// they all shards are consistent with each other.
future<> mutate_live_and_unreachable_endpoints(std::function<void(gossiper&)>);
// replicate shard 0 live endpoints across all other shards.
// replicate shard 0 live and unreachable endpoints sets across all other shards.
// _endpoint_update_semaphore must be held for the whole duration
future<> replicate_live_endpoints_on_change();