Merge 'Fix issues with endpoint state replication to other shards' from Tomasz
Fixes #3798 Fixes #3694 Tests: unit(release), dtest([new] cql_tests.py:TruncateTester.truncate_after_restart_test) * tag 'fix-gossip-shard-replication-v1' of github.com:tgrabiec/scylla: gms/gossiper: Replicate enpoint states in add_saved_endpoint() gms/gossiper: Make reset_endpoint_state_map() have effect on all shards gms/gossiper: Replicate STATUS change from mark_as_shutdown() to other shards gms/gossiper: Always override states from older generations
This commit is contained in:
@@ -129,26 +129,8 @@ public:
|
||||
update_is_normal();
|
||||
}
|
||||
|
||||
void apply_application_state(application_state key, versioned_value&& value) {
|
||||
auto&& e = _application_state[key];
|
||||
if (e.version < value.version) {
|
||||
e = std::move(value);
|
||||
}
|
||||
update_is_normal();
|
||||
}
|
||||
|
||||
void apply_application_state(application_state key, const versioned_value& value) {
|
||||
auto&& e = _application_state[key];
|
||||
if (e.version < value.version) {
|
||||
e = value;
|
||||
}
|
||||
update_is_normal();
|
||||
}
|
||||
|
||||
void apply_application_state(const endpoint_state& es) {
|
||||
for (auto&& e : es._application_state) {
|
||||
apply_application_state(e.first, e.second);
|
||||
}
|
||||
void add_application_state(const endpoint_state& es) {
|
||||
_application_state = es._application_state;
|
||||
update_is_normal();
|
||||
}
|
||||
|
||||
|
||||
@@ -930,7 +930,7 @@ void gossiper::make_random_gossip_digest(utils::chunked_vector<gossip_digest>& g
|
||||
future<> gossiper::replicate(inet_address ep, const endpoint_state& es) {
|
||||
return container().invoke_on_all([ep, es, orig = engine().cpu_id(), self = shared_from_this()] (gossiper& g) {
|
||||
if (engine().cpu_id() != orig) {
|
||||
g.endpoint_state_map[ep].apply_application_state(es);
|
||||
g.endpoint_state_map[ep].add_application_state(es);
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -939,7 +939,7 @@ future<> gossiper::replicate(inet_address ep, const std::map<application_state,
|
||||
return container().invoke_on_all([ep, &src, &changed, orig = engine().cpu_id(), self = shared_from_this()] (gossiper& g) {
|
||||
if (engine().cpu_id() != orig) {
|
||||
for (auto&& key : changed) {
|
||||
g.endpoint_state_map[ep].apply_application_state(key, src.at(key));
|
||||
g.endpoint_state_map[ep].add_application_state(key, src.at(key));
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -948,7 +948,7 @@ future<> gossiper::replicate(inet_address ep, const std::map<application_state,
|
||||
future<> gossiper::replicate(inet_address ep, application_state key, const versioned_value& value) {
|
||||
return container().invoke_on_all([ep, key, &value, orig = engine().cpu_id(), self = shared_from_this()] (gossiper& g) {
|
||||
if (engine().cpu_id() != orig) {
|
||||
g.endpoint_state_map[ep].apply_application_state(key, value);
|
||||
g.endpoint_state_map[ep].add_application_state(key, value);
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -1175,11 +1175,13 @@ stdx::optional<endpoint_state> gossiper::get_endpoint_state_for_endpoint(inet_ad
|
||||
}
|
||||
}
|
||||
|
||||
void gossiper::reset_endpoint_state_map() {
|
||||
endpoint_state_map.clear();
|
||||
future<> gossiper::reset_endpoint_state_map() {
|
||||
_unreachable_endpoints.clear();
|
||||
_live_endpoints.clear();
|
||||
_live_endpoints_just_added.clear();
|
||||
return container().invoke_on_all([] (gossiper& g) {
|
||||
g.endpoint_state_map.clear();
|
||||
});
|
||||
}
|
||||
|
||||
std::unordered_map<inet_address, endpoint_state>& gms::gossiper::get_endpoint_states() {
|
||||
@@ -1662,6 +1664,7 @@ void gossiper::maybe_initialize_local_state(int generation_nbr) {
|
||||
}
|
||||
}
|
||||
|
||||
// Runs inside seastar::async context
|
||||
void gossiper::add_saved_endpoint(inet_address ep) {
|
||||
if (ep == get_broadcast_address()) {
|
||||
logger.debug("Attempt to add self as saved endpoint");
|
||||
@@ -1687,6 +1690,7 @@ void gossiper::add_saved_endpoint(inet_address ep) {
|
||||
}
|
||||
ep_state.mark_dead();
|
||||
endpoint_state_map[ep] = ep_state;
|
||||
replicate(ep, ep_state).get();
|
||||
_unreachable_endpoints[ep] = now();
|
||||
logger.trace("Adding saved endpoint {} {}", ep, ep_state.get_heart_beat_state().get_generation());
|
||||
}
|
||||
@@ -1924,6 +1928,7 @@ void gossiper::mark_as_shutdown(const inet_address& endpoint) {
|
||||
auto& ep_state = *es;
|
||||
ep_state.add_application_state(application_state::STATUS, storage_service_value_factory().shutdown(true));
|
||||
ep_state.get_heart_beat_state().force_highest_possible_version_unsafe();
|
||||
replicate(endpoint, ep_state).get();
|
||||
mark_dead(endpoint, ep_state);
|
||||
get_local_failure_detector().force_conviction(endpoint);
|
||||
}
|
||||
|
||||
@@ -417,7 +417,7 @@ public:
|
||||
stdx::optional<endpoint_state> get_endpoint_state_for_endpoint(inet_address ep) const;
|
||||
|
||||
// removes ALL endpoint states; should only be called after shadow gossip
|
||||
void reset_endpoint_state_map();
|
||||
future<> reset_endpoint_state_map();
|
||||
|
||||
std::unordered_map<inet_address, endpoint_state>& get_endpoint_states();
|
||||
|
||||
|
||||
@@ -353,7 +353,7 @@ void storage_service::prepare_to_join(std::vector<inet_address> loaded_endpoints
|
||||
gossiper.check_knows_remote_features(local_features, peer_features);
|
||||
}
|
||||
|
||||
gossiper.reset_endpoint_state_map();
|
||||
gossiper.reset_endpoint_state_map().get();
|
||||
for (auto ep : loaded_endpoints) {
|
||||
gossiper.add_saved_endpoint(ep);
|
||||
}
|
||||
@@ -367,7 +367,7 @@ void storage_service::prepare_to_join(std::vector<inet_address> loaded_endpoints
|
||||
slogger.info("Checking remote features with gossip");
|
||||
gossiper.do_shadow_round().get();
|
||||
gossiper.check_knows_remote_features(local_features);
|
||||
gossiper.reset_endpoint_state_map();
|
||||
gossiper.reset_endpoint_state_map().get();
|
||||
for (auto ep : loaded_endpoints) {
|
||||
gossiper.add_saved_endpoint(ep);
|
||||
}
|
||||
@@ -1570,7 +1570,7 @@ future<> storage_service::check_for_endpoint_collision() {
|
||||
throw std::runtime_error("Other bootstrapping/leaving/moving nodes detected, cannot bootstrap while consistent_rangemovement is true (check_for_endpoint_collision)");
|
||||
} else {
|
||||
gossiper.goto_shadow_round();
|
||||
gossiper.reset_endpoint_state_map();
|
||||
gossiper.reset_endpoint_state_map().get();
|
||||
found_bootstrapping_node = true;
|
||||
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(gms::gossiper::clk::now() - t).count();
|
||||
slogger.info("Checking bootstrapping/leaving/moving nodes: node={}, status={}, sleep 1 second and check again ({} seconds elapsed) (check_for_endpoint_collision)", addr, state, elapsed);
|
||||
@@ -1582,7 +1582,7 @@ future<> storage_service::check_for_endpoint_collision() {
|
||||
}
|
||||
} while (found_bootstrapping_node);
|
||||
slogger.info("Checking bootstrapping/leaving/moving nodes: ok (check_for_endpoint_collision)");
|
||||
gossiper.reset_endpoint_state_map();
|
||||
gossiper.reset_endpoint_state_map().get();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1632,8 +1632,9 @@ future<std::unordered_set<token>> storage_service::prepare_replacement_info() {
|
||||
auto tokens = get_tokens_for(replace_address);
|
||||
// use the replacee's host Id as our own so we receive hints, etc
|
||||
return db::system_keyspace::set_local_host_id(host_id).discard_result().then([replace_address, tokens = std::move(tokens)] {
|
||||
gms::get_local_gossiper().reset_endpoint_state_map(); // clean up since we have what we need
|
||||
return make_ready_future<std::unordered_set<token>>(std::move(tokens));
|
||||
return gms::get_local_gossiper().reset_endpoint_state_map().then([tokens = std::move(tokens)] { // clean up since we have what we need
|
||||
return make_ready_future<std::unordered_set<token>>(std::move(tokens));
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user