diff --git a/gms/endpoint_state.hh b/gms/endpoint_state.hh index 5150ee6d59..8d873d32a5 100644 --- a/gms/endpoint_state.hh +++ b/gms/endpoint_state.hh @@ -129,26 +129,8 @@ public: update_is_normal(); } - void apply_application_state(application_state key, versioned_value&& value) { - auto&& e = _application_state[key]; - if (e.version < value.version) { - e = std::move(value); - } - update_is_normal(); - } - - void apply_application_state(application_state key, const versioned_value& value) { - auto&& e = _application_state[key]; - if (e.version < value.version) { - e = value; - } - update_is_normal(); - } - - void apply_application_state(const endpoint_state& es) { - for (auto&& e : es._application_state) { - apply_application_state(e.first, e.second); - } + void add_application_state(const endpoint_state& es) { + _application_state = es._application_state; update_is_normal(); } diff --git a/gms/gossiper.cc b/gms/gossiper.cc index aabd34223b..bac91ee878 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -930,7 +930,7 @@ void gossiper::make_random_gossip_digest(utils::chunked_vector& g future<> gossiper::replicate(inet_address ep, const endpoint_state& es) { return container().invoke_on_all([ep, es, orig = engine().cpu_id(), self = shared_from_this()] (gossiper& g) { if (engine().cpu_id() != orig) { - g.endpoint_state_map[ep].apply_application_state(es); + g.endpoint_state_map[ep].add_application_state(es); } }); } @@ -939,7 +939,7 @@ future<> gossiper::replicate(inet_address ep, const std::map gossiper::replicate(inet_address ep, const std::map gossiper::replicate(inet_address ep, application_state key, const versioned_value& value) { return container().invoke_on_all([ep, key, &value, orig = engine().cpu_id(), self = shared_from_this()] (gossiper& g) { if (engine().cpu_id() != orig) { - g.endpoint_state_map[ep].apply_application_state(key, value); + g.endpoint_state_map[ep].add_application_state(key, value); } }); } @@ -1175,11 +1175,13 @@ stdx::optional gossiper::get_endpoint_state_for_endpoint(inet_ad } } -void gossiper::reset_endpoint_state_map() { - endpoint_state_map.clear(); +future<> gossiper::reset_endpoint_state_map() { _unreachable_endpoints.clear(); _live_endpoints.clear(); _live_endpoints_just_added.clear(); + return container().invoke_on_all([] (gossiper& g) { + g.endpoint_state_map.clear(); + }); } std::unordered_map& gms::gossiper::get_endpoint_states() { @@ -1662,6 +1664,7 @@ void gossiper::maybe_initialize_local_state(int generation_nbr) { } } +// Runs inside seastar::async context void gossiper::add_saved_endpoint(inet_address ep) { if (ep == get_broadcast_address()) { logger.debug("Attempt to add self as saved endpoint"); @@ -1687,6 +1690,7 @@ void gossiper::add_saved_endpoint(inet_address ep) { } ep_state.mark_dead(); endpoint_state_map[ep] = ep_state; + replicate(ep, ep_state).get(); _unreachable_endpoints[ep] = now(); logger.trace("Adding saved endpoint {} {}", ep, ep_state.get_heart_beat_state().get_generation()); } @@ -1924,6 +1928,7 @@ void gossiper::mark_as_shutdown(const inet_address& endpoint) { auto& ep_state = *es; ep_state.add_application_state(application_state::STATUS, storage_service_value_factory().shutdown(true)); ep_state.get_heart_beat_state().force_highest_possible_version_unsafe(); + replicate(endpoint, ep_state).get(); mark_dead(endpoint, ep_state); get_local_failure_detector().force_conviction(endpoint); } diff --git a/gms/gossiper.hh b/gms/gossiper.hh index 89acabb2ff..ee416b5a99 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -417,7 +417,7 @@ public: stdx::optional get_endpoint_state_for_endpoint(inet_address ep) const; // removes ALL endpoint states; should only be called after shadow gossip - void reset_endpoint_state_map(); + future<> reset_endpoint_state_map(); std::unordered_map& get_endpoint_states(); diff --git a/service/storage_service.cc b/service/storage_service.cc index b4576169f0..0eb32d60f8 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -353,7 +353,7 @@ void storage_service::prepare_to_join(std::vector loaded_endpoints gossiper.check_knows_remote_features(local_features, peer_features); } - gossiper.reset_endpoint_state_map(); + gossiper.reset_endpoint_state_map().get(); for (auto ep : loaded_endpoints) { gossiper.add_saved_endpoint(ep); } @@ -367,7 +367,7 @@ void storage_service::prepare_to_join(std::vector loaded_endpoints slogger.info("Checking remote features with gossip"); gossiper.do_shadow_round().get(); gossiper.check_knows_remote_features(local_features); - gossiper.reset_endpoint_state_map(); + gossiper.reset_endpoint_state_map().get(); for (auto ep : loaded_endpoints) { gossiper.add_saved_endpoint(ep); } @@ -1570,7 +1570,7 @@ future<> storage_service::check_for_endpoint_collision() { throw std::runtime_error("Other bootstrapping/leaving/moving nodes detected, cannot bootstrap while consistent_rangemovement is true (check_for_endpoint_collision)"); } else { gossiper.goto_shadow_round(); - gossiper.reset_endpoint_state_map(); + gossiper.reset_endpoint_state_map().get(); found_bootstrapping_node = true; auto elapsed = std::chrono::duration_cast(gms::gossiper::clk::now() - t).count(); slogger.info("Checking bootstrapping/leaving/moving nodes: node={}, status={}, sleep 1 second and check again ({} seconds elapsed) (check_for_endpoint_collision)", addr, state, elapsed); @@ -1582,7 +1582,7 @@ future<> storage_service::check_for_endpoint_collision() { } } while (found_bootstrapping_node); slogger.info("Checking bootstrapping/leaving/moving nodes: ok (check_for_endpoint_collision)"); - gossiper.reset_endpoint_state_map(); + gossiper.reset_endpoint_state_map().get(); }); } @@ -1632,8 +1632,9 @@ future> storage_service::prepare_replacement_info() { auto tokens = get_tokens_for(replace_address); // use the replacee's host Id as our own so we receive hints, etc return db::system_keyspace::set_local_host_id(host_id).discard_result().then([replace_address, tokens = std::move(tokens)] { - gms::get_local_gossiper().reset_endpoint_state_map(); // clean up since we have what we need - return make_ready_future>(std::move(tokens)); + return gms::get_local_gossiper().reset_endpoint_state_map().then([tokens = std::move(tokens)] { // clean up since we have what we need + return make_ready_future>(std::move(tokens)); + }); }); }); }