Merge "gossip: optimize apply_state_locally for large cluster" from Asias
"This series tries to improve the bootstrap of a node in a large cluster by
improving how gossip applies the gossip node state. In #2404, the joining node
failed to bootstrap, because it did not see the seed node when
storage_service::bootstrap ran. After this series, we apply the whole gossip
state contained in the gossip ack/ack2 message before applying the next one,
and we apply the state of the seed node earlier than non-seed node so we can
have the seed node's state faster. We also add some randomness to the order of
applying gossip node state to prevent some of the nodes' state are always
applied earlier than the others.
This series improves apply_state_locally for large cluster:
- Tune the order of applying endpoint_state
- Serialize apply_state_locally
- Avoid copying of the gossip state map
Fixes #2404"
* tag 'asias/gossip_issue_2404_v2' of github.com:scylladb/seastar-dev:
gossip: Avoid copying with apply_state_locally
gossip: Serialize apply_state_locally
gossip: Tune the order of applying endpoint_state in apply_state_locally
gossip: Introduce is_seed helper
gossip: Pass const endpoint_state& in notify_failure_detector
gossip: Pass reference in notify_failure_detector
(cherry picked from commit d2632ddf1d)
This commit is contained in:
@@ -68,7 +68,11 @@ public:
|
||||
return _digests;
|
||||
}
|
||||
|
||||
std::map<inet_address, endpoint_state> get_endpoint_state_map() const {
|
||||
std::map<inet_address, endpoint_state>& get_endpoint_state_map() {
|
||||
return _map;
|
||||
}
|
||||
|
||||
const std::map<inet_address, endpoint_state>& get_endpoint_state_map() const {
|
||||
return _map;
|
||||
}
|
||||
|
||||
|
||||
@@ -59,6 +59,7 @@
|
||||
#include <chrono>
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include <boost/range/algorithm/set_algorithm.hpp>
|
||||
#include <boost/range/adaptors.hpp>
|
||||
|
||||
namespace gms {
|
||||
|
||||
@@ -222,13 +223,13 @@ future<> gossiper::handle_ack_msg(msg_addr id, gossip_digest_ack ack_msg) {
|
||||
}
|
||||
|
||||
auto g_digest_list = ack_msg.get_gossip_digest_list();
|
||||
auto ep_state_map = ack_msg.get_endpoint_state_map();
|
||||
auto& ep_state_map = ack_msg.get_endpoint_state_map();
|
||||
|
||||
auto f = make_ready_future<>();
|
||||
if (ep_state_map.size() > 0) {
|
||||
/* Notify the Failure Detector */
|
||||
this->notify_failure_detector(ep_state_map);
|
||||
f = this->apply_state_locally(ep_state_map);
|
||||
f = this->apply_state_locally(std::move(ep_state_map));
|
||||
}
|
||||
|
||||
return f.then([id, g_digest_list = std::move(g_digest_list), this] {
|
||||
@@ -268,7 +269,7 @@ future<> gossiper::handle_ack2_msg(gossip_digest_ack2 msg) {
|
||||
auto& remote_ep_state_map = msg.get_endpoint_state_map();
|
||||
/* Notify the Failure Detector */
|
||||
notify_failure_detector(remote_ep_state_map);
|
||||
return apply_state_locally(remote_ep_state_map);
|
||||
return apply_state_locally(std::move(remote_ep_state_map));
|
||||
}
|
||||
|
||||
future<> gossiper::handle_echo_msg() {
|
||||
@@ -370,7 +371,7 @@ future<> gossiper::send_gossip(gossip_digest_syn message, std::set<inet_address>
|
||||
}
|
||||
|
||||
|
||||
void gossiper::notify_failure_detector(inet_address endpoint, endpoint_state remote_endpoint_state) {
|
||||
void gossiper::notify_failure_detector(inet_address endpoint, const endpoint_state& remote_endpoint_state) {
|
||||
/*
|
||||
* If the local endpoint state exists then report to the FD only
|
||||
* if the versions workout.
|
||||
@@ -405,10 +406,17 @@ void gossiper::notify_failure_detector(inet_address endpoint, endpoint_state rem
|
||||
}
|
||||
}
|
||||
|
||||
future<> gossiper::apply_state_locally(const std::map<inet_address, endpoint_state>& map) {
|
||||
return seastar::async([this, g = this->shared_from_this(), map] () mutable {
|
||||
for (auto& entry : map) {
|
||||
const auto& ep = entry.first;
|
||||
future<> gossiper::apply_state_locally(std::map<inet_address, endpoint_state> map) {
|
||||
return seastar::with_semaphore(_apply_state_locally_semaphore, 1, [this, g = this->shared_from_this(), map = std::move(map)] {
|
||||
return seastar::async([this, g, map = std::move(map)] () mutable {
|
||||
std::vector<inet_address> endpoints;
|
||||
boost::copy(map | boost::adaptors::map_keys, std::inserter(endpoints, endpoints.begin()));
|
||||
std::shuffle(endpoints.begin(), endpoints.end(), _random_engine);
|
||||
auto node_is_seed = [this] (gms::inet_address ip) { return is_seed(ip); };
|
||||
boost::partition(endpoints, node_is_seed);
|
||||
logger.debug("apply_state_locally_endpoints={}", endpoints);
|
||||
|
||||
for (auto& ep : endpoints) {
|
||||
if (ep == get_broadcast_address() && !is_in_shadow_round()) {
|
||||
continue;
|
||||
}
|
||||
@@ -420,14 +428,13 @@ future<> gossiper::apply_state_locally(const std::map<inet_address, endpoint_sta
|
||||
If state does not exist just add it. If it does then add it if the remote generation is greater.
|
||||
If there is a generation tie, attempt to break it by heartbeat version.
|
||||
*/
|
||||
const endpoint_state& remote_state = entry.second;
|
||||
const endpoint_state& remote_state = map[ep];
|
||||
auto it = endpoint_state_map.find(ep);
|
||||
if (it != endpoint_state_map.end()) {
|
||||
endpoint_state& local_ep_state_ptr = it->second;
|
||||
int local_generation = local_ep_state_ptr.get_heart_beat_state().get_generation();
|
||||
int remote_generation = remote_state.get_heart_beat_state().get_generation();
|
||||
logger.trace("{} local generation {}, remote generation {}", ep, local_generation, remote_generation);
|
||||
// }
|
||||
if (local_generation != 0 && remote_generation > local_generation + MAX_GENERATION_DIFFERENCE) {
|
||||
// assume some peer has corrupted memory and is broadcasting an unbelievable generation about another peer (or itself)
|
||||
logger.warn("received an invalid gossip generation for peer {}; local generation = {}, received generation = {}",
|
||||
@@ -459,6 +466,7 @@ future<> gossiper::apply_state_locally(const std::map<inet_address, endpoint_sta
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Runs inside seastar::async context
|
||||
@@ -726,6 +734,10 @@ bool gossiper::seen_any_seed() {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool gossiper::is_seed(const gms::inet_address& endpoint) const {
|
||||
return _seeds.count(endpoint);
|
||||
}
|
||||
|
||||
void gossiper::register_(shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
|
||||
_subscribers.push_back(subscriber);
|
||||
}
|
||||
@@ -1152,7 +1164,7 @@ int gossiper::compare_endpoint_startup(inet_address addr1, inet_address addr2) {
|
||||
return ep1->get_heart_beat_state().get_generation() - ep2->get_heart_beat_state().get_generation();
|
||||
}
|
||||
|
||||
void gossiper::notify_failure_detector(std::map<inet_address, endpoint_state> remoteEpStateMap) {
|
||||
void gossiper::notify_failure_detector(const std::map<inet_address, endpoint_state>& remoteEpStateMap) {
|
||||
for (auto& entry : remoteEpStateMap) {
|
||||
notify_failure_detector(entry.first, entry.second);
|
||||
}
|
||||
|
||||
@@ -105,6 +105,7 @@ private:
|
||||
std::set<inet_address> _seeds_from_config;
|
||||
sstring _cluster_name;
|
||||
semaphore _callback_running{1};
|
||||
semaphore _apply_state_locally_semaphore{1};
|
||||
public:
|
||||
future<> timer_callback_lock() { return _callback_running.wait(); }
|
||||
void timer_callback_unlock() { _callback_running.signal(); }
|
||||
@@ -404,10 +405,10 @@ public:
|
||||
*/
|
||||
int compare_endpoint_startup(inet_address addr1, inet_address addr2);
|
||||
|
||||
void notify_failure_detector(std::map<inet_address, endpoint_state> remoteEpStateMap);
|
||||
void notify_failure_detector(const std::map<inet_address, endpoint_state>& remoteEpStateMap);
|
||||
|
||||
|
||||
void notify_failure_detector(inet_address endpoint, endpoint_state remote_endpoint_state);
|
||||
void notify_failure_detector(inet_address endpoint, const endpoint_state& remote_endpoint_state);
|
||||
|
||||
private:
|
||||
void mark_alive(inet_address addr, endpoint_state& local_state);
|
||||
@@ -428,7 +429,7 @@ public:
|
||||
bool is_alive(inet_address ep);
|
||||
bool is_dead_state(const endpoint_state& eps) const;
|
||||
|
||||
future<> apply_state_locally(const std::map<inet_address, endpoint_state>& map);
|
||||
future<> apply_state_locally(std::map<inet_address, endpoint_state> map);
|
||||
|
||||
private:
|
||||
void apply_new_states(inet_address addr, endpoint_state& local_state, const endpoint_state& remote_state);
|
||||
@@ -504,6 +505,7 @@ public:
|
||||
void dump_endpoint_state_map();
|
||||
void debug_show();
|
||||
public:
|
||||
bool is_seed(const inet_address& endpoint) const;
|
||||
bool is_shutdown(const inet_address& endpoint) const;
|
||||
bool is_normal(const inet_address& endpoint) const;
|
||||
bool is_silent_shutdown_state(const endpoint_state& ep_state) const;
|
||||
|
||||
Reference in New Issue
Block a user