gms/gossiper: Replicate changes incrementally to other shards

storage_service depends on endpoint states to be replicated to all
shards before token metadata is replicated. Currently this is taken
care of by storage_service::replicate_to_all_cores(), invoked from
storage_service's change listener. It copies whole endpoint state map,
which is expensive in large clusters. It's more efficient to replicate
only incremental changes, and only once, rather than for each
application state.
This commit is contained in:
Tomasz Grabiec
2017-10-06 16:31:14 +02:00
parent 28c9609370
commit 2d5fb9d109
5 changed files with 79 additions and 63 deletions

View File

@@ -123,6 +123,26 @@ public:
_application_state[key] = std::move(value);
}
void apply_application_state(application_state key, versioned_value&& value) {
auto&& e = _application_state[key];
if (e.version < value.version) {
e = std::move(value);
}
}
void apply_application_state(application_state key, const versioned_value& value) {
auto&& e = _application_state[key];
if (e.version < value.version) {
e = value;
}
}
void apply_application_state(const endpoint_state& es) {
for (auto&& e : es._application_state) {
apply_application_state(e.first, e.second);
}
}
/* getters and setters */
/**
* @return System.nanoTime() when state was updated last time.

View File

@@ -824,9 +824,12 @@ int gossiper::get_max_endpoint_state_version(endpoint_state state) {
return max_version;
}
// Runs inside seastar::async context
void gossiper::evict_from_membership(inet_address endpoint) {
_unreachable_endpoints.erase(endpoint);
endpoint_state_map.erase(endpoint);
container().invoke_on_all([endpoint] (auto& g) {
g.endpoint_state_map.erase(endpoint);
}).get();
_expire_time_endpoint_map.erase(endpoint);
get_local_failure_detector().remove(endpoint);
quarantine_endpoint(endpoint);
@@ -886,6 +889,32 @@ void gossiper::make_random_gossip_digest(std::vector<gossip_digest>& g_digests)
#endif
}
future<> gossiper::replicate(inet_address ep, const endpoint_state& es) {
return container().invoke_on_all([ep, es, orig = engine().cpu_id(), self = shared_from_this()] (gossiper& g) {
if (engine().cpu_id() != orig) {
g.endpoint_state_map[ep].apply_application_state(es);
}
});
}
future<> gossiper::replicate(inet_address ep, const std::map<application_state, versioned_value>& src, const std::vector<application_state>& changed) {
return container().invoke_on_all([ep, &src, &changed, orig = engine().cpu_id(), self = shared_from_this()] (gossiper& g) {
if (engine().cpu_id() != orig) {
for (auto&& key : changed) {
g.endpoint_state_map[ep].apply_application_state(key, src.at(key));
}
}
});
}
future<> gossiper::replicate(inet_address ep, application_state key, const versioned_value& value) {
return container().invoke_on_all([ep, key, &value, orig = engine().cpu_id(), self = shared_from_this()] (gossiper& g) {
if (engine().cpu_id() != orig) {
g.endpoint_state_map[ep].apply_application_state(key, value);
}
});
}
future<> gossiper::advertise_removing(inet_address endpoint, utils::UUID host_id, utils::UUID local_host_id) {
return seastar::async([this, g = this->shared_from_this(), endpoint, host_id, local_host_id] {
auto& state = get_endpoint_state(endpoint);
@@ -908,6 +937,7 @@ future<> gossiper::advertise_removing(inet_address endpoint, utils::UUID host_id
eps.add_application_state(application_state::STATUS, storage_service_value_factory().removing_nonlocal(host_id));
eps.add_application_state(application_state::REMOVAL_COORDINATOR, storage_service_value_factory().removal_coordinator(local_host_id));
endpoint_state_map[endpoint] = eps;
replicate(endpoint, eps).get();
});
}
@@ -921,6 +951,7 @@ future<> gossiper::advertise_token_removed(inet_address endpoint, utils::UUID ho
logger.info("Completing removal of {}", endpoint);
add_expire_time_for_endpoint(endpoint, expire_time);
endpoint_state_map[endpoint] = eps;
replicate(endpoint, eps).get();
// ensure at least one gossip round occurs before returning
sleep(INTERVAL * 2).get();
});
@@ -1274,6 +1305,7 @@ void gossiper::handle_major_state_change(inet_address ep, const endpoint_state&
}
logger.trace("Adding endpoint state for {}, status = {}", ep, get_gossip_status(eps));
endpoint_state_map[ep] = eps;
replicate(ep, eps).get();
if (_in_shadow_round) {
// In shadow round, we only interested in the peer's endpoint_state,
@@ -1364,6 +1396,14 @@ void gossiper::apply_new_states(inet_address addr, endpoint_state& local_state,
}
});
// We must replicate endpoint states before listeners run.
// Exceptions during replication will cause abort because node's state
// would be inconsistent across shards. Changes listeners depend on state
// being replicated to all shards.
auto replicate_changes = seastar::defer([&] () noexcept {
replicate(addr, remote_map, changed).get();
});
// we need to make two loops here, one to apply, then another to notify,
// this way all states in an update are present and current when the notifications are received
for (const auto& remote_entry : remote_map) {
@@ -1505,8 +1545,10 @@ future<> gossiper::start_gossiping(int generation_nbr, std::map<application_stat
auto generation = local_state.get_heart_beat_state().get_generation();
//notify snitches that Gossiper is about to start
return locator::i_endpoint_snitch::get_local_snitch_ptr()->gossiper_starting().then([this, generation] {
return replicate(get_broadcast_address(), local_state).then([] {
//notify snitches that Gossiper is about to start
return locator::i_endpoint_snitch::get_local_snitch_ptr()->gossiper_starting();
}).then([this, generation] {
logger.trace("gossip started with generation {}", generation);
_enabled = true;
_nr_run = 0;
@@ -1616,6 +1658,7 @@ future<> gossiper::add_local_application_state(application_state state, versione
es = gossiper.get_endpoint_state_for_endpoint_ptr(ep_addr);
if (es) {
es->add_application_state(state, value);
gossiper.replicate(ep_addr, state, value).get();
gossiper.do_on_change_notifications(ep_addr, state, value);
}
}).handle_exception([] (auto ep) {

View File

@@ -81,7 +81,7 @@ class i_failure_detector;
* Upon hearing a GossipShutdownMessage, this module will instantly mark the remote node as down in
* the Failure Detector.
*/
class gossiper : public i_failure_detection_event_listener, public seastar::async_sharded_service<gossiper> {
class gossiper : public i_failure_detection_event_listener, public seastar::async_sharded_service<gossiper>, public seastar::peering_sharded_service<gossiper> {
public:
using clk = seastar::lowres_system_clock;
private:
@@ -224,6 +224,15 @@ private:
std::vector<inet_address> _shadow_live_endpoints;
void run();
// Replicates given endpoint_state to all other shards.
// The state state doesn't have to be kept alive around until completes.
future<> replicate(inet_address, const endpoint_state&);
// Replicates "states" from "src" to all other shards.
// "src" and "states" must be kept alive until completes and must not change.
future<> replicate(inet_address, const std::map<application_state, versioned_value>& src, const std::vector<application_state>& states);
// Replicates given value to all other shards.
// The value must be kept alive until completes and not change.
future<> replicate(inet_address, application_state key, const versioned_value& value);
public:
gossiper();

View File

@@ -1373,30 +1373,6 @@ future<> storage_service::replicate_tm_only() {
});
}
// Should run under gossiper::timer_callback lock. Serialized.
future<> storage_service::replicate_tm_and_ep_map(shared_ptr<gms::gossiper> g0) {
// sanity: check that gossiper is fully initialized like we expect it to be
return get_storage_service().invoke_on_all([](storage_service& local_ss) {
if (!gms::get_gossiper().local_is_initialized()) {
auto err = sprint("replicate_to_all_cores is called before gossiper is fully initialized");
slogger.warn("{}", err);
throw std::runtime_error(err);
}
}).then([this, g0] {
_shadow_token_metadata = _token_metadata;
g0->shadow_endpoint_state_map = g0->endpoint_state_map;
g0->maybe_enable_features();
return get_storage_service().invoke_on_all([g0, this](storage_service& local_ss) {
if (engine().cpu_id() != 0) {
gms::get_local_gossiper().endpoint_state_map = g0->shadow_endpoint_state_map;
gms::get_local_gossiper().maybe_enable_features();
local_ss._token_metadata = _shadow_token_metadata;
}
});
});
}
future<> storage_service::replicate_to_all_cores() {
// sanity checks: this function is supposed to be run on shard 0 only and
// when gossiper has already been initialized.
@@ -1406,31 +1382,13 @@ future<> storage_service::replicate_to_all_cores() {
throw std::runtime_error(err);
}
if (!gms::get_gossiper().local_is_initialized()) {
auto err = sprint("replicate_to_all_cores is called before gossiper on shard0 is initialized");
slogger.warn("{}", err);
throw std::runtime_error(err);
}
return _replicate_action.trigger_later().then([self = shared_from_this()] {});
}
future<> storage_service::do_replicate_to_all_cores() {
auto g0 = gms::get_local_gossiper().shared_from_this();
return g0->timer_callback_lock().then([this, g0] {
bool endpoint_map_changed = g0->shadow_endpoint_state_map != g0->endpoint_state_map;
if (endpoint_map_changed) {
return replicate_tm_and_ep_map(g0).finally([g0] {
g0->timer_callback_unlock();
});
} else {
g0->timer_callback_unlock();
return replicate_tm_only();
}
}).handle_exception([g0] (auto e) {
slogger.error("Fail to replicate _token_metadata: {}", e);
});
return replicate_tm_only().handle_exception([] (auto e) {
slogger.error("Fail to replicate _token_metadata: {}", e);
});
}
future<> storage_service::gossip_snitch_info() {

View File

@@ -699,20 +699,6 @@ private:
*/
future<> replicate_tm_only();
/**
* Replicates token_metadata and gossiper::endpoint_state_map contents on
* shard0 instances to other shards.
*
* Should be called with a _replicate_task and a gossiper::timer_callback
* semaphores taken.
* Should run on shard 0 only.
*
* @param g0 a "shared_from_this()" pointer to a gossiper instance on shard0
*
* @return a ready future when replication is complete.
*/
future<> replicate_tm_and_ep_map(shared_ptr<gms::gossiper> g0);
/**
* Handle node bootstrap
*