gossip: Make gossip message handling async
In each gossip round, i.e., gossiper::run(), we do:
1) send syn message
2) peer node: receive syn message, send back ack message
3) process ack message in handle_ack_msg
apply_state_locally
mark_alive
send_gossip_echo
handle_major_state_change
on_restart
mark_alive
send_gossip_echo
mark_dead
on_dead
on_join
apply_new_states
do_on_change_notifications
on_change
4) send back ack2 message
5) peer node: process ack2 message
apply_state_locally
At the moment, syn is a "wait" message; it times out in 3 seconds. In step
3, all the registered gossip callbacks are called, which might take a
significant amount of time to complete.
In order to reduce the gossip round latency, we make syn "no-wait" and
do not run handle_ack_msg inside gossiper::run(). As a result, we
will not get an ack message as the return value of a syn message any
more, so a GOSSIP_DIGEST_ACK message verb is introduced.
With this patch, the gossip message exchange is now async. It is useful
when some nodes are down in the cluster. We will not delay the gossip
round, which is supposed to run every second, by 3*n seconds (n = 1-3,
since it talks to 1-3 peer nodes in each gossip round) or even
longer (considering the time to run gossip callbacks).
Later, we can talk to the 1-3 peer nodes in parallel to reduce
latency even more.
Refs: #900
This commit is contained in:
118
gms/gossiper.cc
118
gms/gossiper.cc
@@ -160,22 +160,21 @@ void gossiper::do_sort(std::vector<gossip_digest>& g_digest_list) {
|
||||
}
|
||||
}
|
||||
|
||||
future<gossip_digest_ack> gossiper::handle_syn_msg(gossip_digest_syn syn_msg) {
|
||||
future<> gossiper::handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg) {
|
||||
logger.trace("cluster_name:peer={},local={},partitioner_name:peer={},local={}",
|
||||
syn_msg.cluster_id(), get_cluster_name(), syn_msg.partioner(), get_partitioner_name());
|
||||
this->set_last_processed_message_at();
|
||||
inet_address from;
|
||||
if (!this->is_enabled()) {
|
||||
return make_ready_future<gossip_digest_ack>(gossip_digest_ack());
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
/* If the message is from a different cluster throw it away. */
|
||||
if (syn_msg.cluster_id() != get_cluster_name()) {
|
||||
return make_ready_future<gossip_digest_ack>(gossip_digest_ack());
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
if (syn_msg.partioner() != "" && syn_msg.partioner() != get_partitioner_name()) {
|
||||
return make_ready_future<gossip_digest_ack>(gossip_digest_ack());
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
auto g_digest_list = syn_msg.get_gossip_digests();
|
||||
@@ -184,7 +183,7 @@ future<gossip_digest_ack> gossiper::handle_syn_msg(gossip_digest_syn syn_msg) {
|
||||
std::map<inet_address, endpoint_state> delta_ep_state_map;
|
||||
this->examine_gossiper(g_digest_list, delta_gossip_digest_list, delta_ep_state_map);
|
||||
gms::gossip_digest_ack ack_msg(std::move(delta_gossip_digest_list), std::move(delta_ep_state_map));
|
||||
return make_ready_future<gossip_digest_ack>(std::move(ack_msg));
|
||||
return this->ms().send_gossip_digest_ack(from, std::move(ack_msg));
|
||||
}
|
||||
|
||||
future<> gossiper::handle_ack_msg(msg_addr id, gossip_digest_ack ack_msg) {
|
||||
@@ -220,19 +219,50 @@ future<> gossiper::handle_ack_msg(msg_addr id, gossip_digest_ack ack_msg) {
|
||||
}
|
||||
gms::gossip_digest_ack2 ack2_msg(std::move(delta_ep_state_map));
|
||||
logger.trace("Sending a GossipDigestACK2 to {}", id);
|
||||
return this->ms().send_gossip_digest_ack2(id, std::move(ack2_msg)).then_wrapped([id] (auto&& f) {
|
||||
try {
|
||||
f.get();
|
||||
logger.trace("Got GossipDigestACK2 Reply");
|
||||
} catch (...) {
|
||||
logger.warn("Fail to send GossipDigestACK2 to {}: {}", id, std::current_exception());
|
||||
}
|
||||
return make_ready_future<>();
|
||||
return this->ms().send_gossip_digest_ack2(id, std::move(ack2_msg)).handle_exception([id] (auto ep) {
|
||||
logger.warn("Fail to send GossipDigestACK2 to {}: {}", id, ep);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void gossiper::init_messaging_service_handler() {
|
||||
if (_ms_registered) {
|
||||
return;
|
||||
}
|
||||
_ms_registered = true;
|
||||
ms().register_gossip_digest_syn([] (const rpc::client_info& cinfo, gossip_digest_syn syn_msg) {
|
||||
auto from = net::messaging_service::get_source(cinfo);
|
||||
smp::submit_to(0, [from, syn_msg = std::move(syn_msg)] () mutable {
|
||||
auto& gossiper = gms::get_local_gossiper();
|
||||
return gossiper.handle_syn_msg(from, std::move(syn_msg));
|
||||
}).handle_exception([] (auto ep) {
|
||||
logger.warn("Fail to handle GOSSIP_DIGEST_SYN: {}", ep);
|
||||
});
|
||||
return messaging_service::no_wait();
|
||||
});
|
||||
ms().register_gossip_digest_ack([] (const rpc::client_info& cinfo, gossip_digest_ack msg) {
|
||||
auto from = net::messaging_service::get_source(cinfo);
|
||||
smp::submit_to(0, [from, msg = std::move(msg)] () mutable {
|
||||
auto& gossiper = gms::get_local_gossiper();
|
||||
return gossiper.handle_ack_msg(from, std::move(msg));
|
||||
}).handle_exception([] (auto ep) {
|
||||
logger.warn("Fail to handle GOSSIP_DIGEST_ACK: {}", ep);
|
||||
});
|
||||
return messaging_service::no_wait();
|
||||
});
|
||||
ms().register_gossip_digest_ack2([] (gossip_digest_ack2 msg) {
|
||||
smp::submit_to(0, [msg = std::move(msg)] () mutable {
|
||||
auto& gossiper = gms::get_local_gossiper();
|
||||
gossiper.set_last_processed_message_at();
|
||||
auto& remote_ep_state_map = msg.get_endpoint_state_map();
|
||||
/* Notify the Failure Detector */
|
||||
gossiper.notify_failure_detector(remote_ep_state_map);
|
||||
return gossiper.apply_state_locally(remote_ep_state_map);
|
||||
}).handle_exception([] (auto ep) {
|
||||
logger.warn("Fail to handle GOSSIP_DIGEST_ACK2: {}", ep);
|
||||
});
|
||||
return messaging_service::no_wait();
|
||||
});
|
||||
ms().register_gossip_echo([] {
|
||||
return smp::submit_to(0, [] {
|
||||
auto& gossiper = gms::get_local_gossiper();
|
||||
@@ -256,25 +286,6 @@ void gossiper::init_messaging_service_handler() {
|
||||
});
|
||||
return messaging_service::no_wait();
|
||||
});
|
||||
ms().register_gossip_digest_syn([] (gossip_digest_syn syn_msg) {
|
||||
return smp::submit_to(0, [syn_msg = std::move(syn_msg)] () mutable {
|
||||
auto& gossiper = gms::get_local_gossiper();
|
||||
return gossiper.handle_syn_msg(std::move(syn_msg));
|
||||
});
|
||||
});
|
||||
ms().register_gossip_digest_ack2([] (gossip_digest_ack2 msg) {
|
||||
smp::submit_to(0, [msg = std::move(msg)] () mutable {
|
||||
auto& gossiper = gms::get_local_gossiper();
|
||||
gossiper.set_last_processed_message_at();
|
||||
auto& remote_ep_state_map = msg.get_endpoint_state_map();
|
||||
/* Notify the Failure Detector */
|
||||
gossiper.notify_failure_detector(remote_ep_state_map);
|
||||
return gossiper.apply_state_locally(remote_ep_state_map);
|
||||
}).handle_exception([] (auto ep) {
|
||||
logger.warn("Fail to handle GOSSIP_DIGEST_ACK: {}", ep);
|
||||
});
|
||||
return messaging_service::no_wait();
|
||||
});
|
||||
}
|
||||
|
||||
void gossiper::uninit_messaging_service_handler() {
|
||||
@@ -282,7 +293,9 @@ void gossiper::uninit_messaging_service_handler() {
|
||||
ms.unregister_gossip_echo();
|
||||
ms.unregister_gossip_shutdown();
|
||||
ms.unregister_gossip_digest_syn();
|
||||
ms.unregister_gossip_digest_ack();
|
||||
ms.unregister_gossip_digest_ack2();
|
||||
_ms_registered = false;
|
||||
}
|
||||
|
||||
future<bool> gossiper::send_gossip(gossip_digest_syn message, std::set<inet_address> epset) {
|
||||
@@ -297,18 +310,11 @@ future<bool> gossiper::send_gossip(gossip_digest_syn message, std::set<inet_addr
|
||||
inet_address to = __live_endpoints[index];
|
||||
auto id = get_msg_addr(to);
|
||||
logger.trace("Sending a GossipDigestSyn to {} ...", id);
|
||||
return ms().send_gossip_digest_syn(id, std::move(message)).then_wrapped([this, id] (auto&& f) {
|
||||
try {
|
||||
auto ack_msg = f.get0();
|
||||
logger.trace("Got GossipDigestSyn Reply");
|
||||
return this->handle_ack_msg(id, std::move(ack_msg));
|
||||
} catch (...) {
|
||||
// It is normal to reach here because it is normal that a node
|
||||
// tries to send a SYN message to a peer node which is down before
|
||||
// failure_detector thinks that peer node is down.
|
||||
logger.trace("Fail to send GossipDigestSyn to {}: {}", id, std::current_exception());
|
||||
}
|
||||
return make_ready_future<>();
|
||||
return ms().send_gossip_digest_syn(id, std::move(message)).handle_exception([id] (auto ep) {
|
||||
// It is normal to reach here because it is normal that a node
|
||||
// tries to send a SYN message to a peer node which is down before
|
||||
// failure_detector thinks that peer node is down.
|
||||
logger.trace("Fail to send GossipDigestSyn to {}: {}", id, ep);
|
||||
}).then([this, to] {
|
||||
return make_ready_future<bool>(_seeds.count(to));
|
||||
});
|
||||
@@ -1319,6 +1325,13 @@ future<> gossiper::do_shadow_round() {
|
||||
build_seeds_list();
|
||||
_in_shadow_round = true;
|
||||
auto t = clk::now();
|
||||
|
||||
// When peer node receives a syn message, it will send back a ack message.
|
||||
// So, we need to register gossip message handlers before sending syn message.
|
||||
get_gossiper().invoke_on_all([] (gossiper& g) {
|
||||
g.init_messaging_service_handler();
|
||||
}).get();
|
||||
|
||||
while (this->_in_shadow_round) {
|
||||
// send a completely empty syn
|
||||
for (inet_address seed : _seeds) {
|
||||
@@ -1326,24 +1339,17 @@ future<> gossiper::do_shadow_round() {
|
||||
gossip_digest_syn message(get_cluster_name(), get_partitioner_name(), digests);
|
||||
auto id = get_msg_addr(seed);
|
||||
logger.trace("Sending a GossipDigestSyn (ShadowRound) to {} ...", id);
|
||||
ms().send_gossip_digest_syn(id, std::move(message)).then_wrapped([this, id] (auto&& f) {
|
||||
try {
|
||||
auto ack_msg = f.get0();
|
||||
logger.trace("Got GossipDigestSyn (ShadowRound) Reply");
|
||||
return this->handle_ack_msg(id, std::move(ack_msg));
|
||||
} catch (...) {
|
||||
logger.trace("Fail to send GossipDigestSyn (ShadowRound) to {}: {}", id, std::current_exception());
|
||||
}
|
||||
return make_ready_future<>();
|
||||
ms().send_gossip_digest_syn(id, std::move(message)).handle_exception([id] (auto ep) {
|
||||
logger.trace("Fail to send GossipDigestSyn (ShadowRound) to {}: {}", id, ep);
|
||||
}).get();
|
||||
}
|
||||
auto& ss = service::get_local_storage_service();
|
||||
sleep(std::chrono::seconds(1)).get();
|
||||
if (clk::now() > t + ss.get_ring_delay() * 60) {
|
||||
throw std::runtime_error(sprint("Unable to gossip with any seeds (ShadowRound)"));
|
||||
}
|
||||
if (this->_in_shadow_round) {
|
||||
logger.info("Sleep 1 second and connect seeds again ... ({} seconds passed)", std::chrono::duration_cast<std::chrono::seconds>(clk::now() - t).count());
|
||||
sleep(std::chrono::seconds(1)).get();
|
||||
logger.info("Connect seeds again ... ({} seconds passed)", std::chrono::duration_cast<std::chrono::seconds>(clk::now() - t).count());
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -88,8 +88,8 @@ private:
|
||||
}
|
||||
void init_messaging_service_handler();
|
||||
void uninit_messaging_service_handler();
|
||||
future<gossip_digest_ack> handle_syn_msg(gossip_digest_syn syn_msg);
|
||||
future<> handle_ack_msg(msg_addr id, gossip_digest_ack ack_msg);
|
||||
future<> handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg);
|
||||
future<> handle_ack_msg(msg_addr from, gossip_digest_ack ack_msg);
|
||||
static constexpr uint32_t _default_cpuid = 0;
|
||||
msg_addr get_msg_addr(inet_address to) {
|
||||
return msg_addr{to, _default_cpuid};
|
||||
@@ -506,6 +506,7 @@ public:
|
||||
future<> wait_for_gossip_to_settle();
|
||||
private:
|
||||
uint64_t _nr_run = 0;
|
||||
bool _ms_registered = false;
|
||||
};
|
||||
|
||||
extern distributed<gossiper> _the_gossiper;
|
||||
|
||||
@@ -614,16 +614,29 @@ future<> messaging_service::send_gossip_shutdown(msg_addr id, inet_address from)
|
||||
return send_message_oneway(this, messaging_verb::GOSSIP_SHUTDOWN, std::move(id), std::move(from));
|
||||
}
|
||||
|
||||
void messaging_service::register_gossip_digest_syn(std::function<future<gossip_digest_ack> (gossip_digest_syn)>&& func) {
|
||||
// gossip syn
|
||||
void messaging_service::register_gossip_digest_syn(std::function<rpc::no_wait_type (const rpc::client_info& cinfo, gossip_digest_syn)>&& func) {
|
||||
register_handler(this, messaging_verb::GOSSIP_DIGEST_SYN, std::move(func));
|
||||
}
|
||||
void messaging_service::unregister_gossip_digest_syn() {
|
||||
_rpc->unregister_handler(net::messaging_verb::GOSSIP_DIGEST_SYN);
|
||||
}
|
||||
future<gossip_digest_ack> messaging_service::send_gossip_digest_syn(msg_addr id, gossip_digest_syn msg) {
|
||||
return send_message_timeout<gossip_digest_ack>(this, messaging_verb::GOSSIP_DIGEST_SYN, std::move(id), 3000ms, std::move(msg));
|
||||
future<> messaging_service::send_gossip_digest_syn(msg_addr id, gossip_digest_syn msg) {
|
||||
return send_message_oneway(this, messaging_verb::GOSSIP_DIGEST_SYN, std::move(id), std::move(msg));
|
||||
}
|
||||
|
||||
// gossip ack
|
||||
void messaging_service::register_gossip_digest_ack(std::function<rpc::no_wait_type (const rpc::client_info& cinfo, gossip_digest_ack)>&& func) {
|
||||
register_handler(this, messaging_verb::GOSSIP_DIGEST_ACK, std::move(func));
|
||||
}
|
||||
void messaging_service::unregister_gossip_digest_ack() {
|
||||
_rpc->unregister_handler(net::messaging_verb::GOSSIP_DIGEST_ACK);
|
||||
}
|
||||
future<> messaging_service::send_gossip_digest_ack(msg_addr id, gossip_digest_ack msg) {
|
||||
return send_message_oneway(this, messaging_verb::GOSSIP_DIGEST_ACK, std::move(id), std::move(msg));
|
||||
}
|
||||
|
||||
// gossip ack2
|
||||
void messaging_service::register_gossip_digest_ack2(std::function<rpc::no_wait_type (gossip_digest_ack2)>&& func) {
|
||||
register_handler(this, messaging_verb::GOSSIP_DIGEST_ACK2, std::move(func));
|
||||
}
|
||||
|
||||
@@ -234,9 +234,14 @@ public:
|
||||
future<> send_gossip_shutdown(msg_addr id, inet_address from);
|
||||
|
||||
// Wrapper for GOSSIP_DIGEST_SYN
|
||||
void register_gossip_digest_syn(std::function<future<gms::gossip_digest_ack> (gms::gossip_digest_syn)>&& func);
|
||||
void register_gossip_digest_syn(std::function<rpc::no_wait_type (const rpc::client_info& cinfo, gms::gossip_digest_syn)>&& func);
|
||||
void unregister_gossip_digest_syn();
|
||||
future<gms::gossip_digest_ack> send_gossip_digest_syn(msg_addr id, gms::gossip_digest_syn msg);
|
||||
future<> send_gossip_digest_syn(msg_addr id, gms::gossip_digest_syn msg);
|
||||
|
||||
// Wrapper for GOSSIP_DIGEST_ACK
|
||||
void register_gossip_digest_ack(std::function<rpc::no_wait_type (const rpc::client_info& cinfo, gms::gossip_digest_ack)>&& func);
|
||||
void unregister_gossip_digest_ack();
|
||||
future<> send_gossip_digest_ack(msg_addr id, gms::gossip_digest_ack msg);
|
||||
|
||||
// Wrapper for GOSSIP_DIGEST_ACK2
|
||||
void register_gossip_digest_ack2(std::function<rpc::no_wait_type (gms::gossip_digest_ack2)>&& func);
|
||||
|
||||
@@ -20,9 +20,10 @@
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "core/reactor.hh"
|
||||
#include "core/app-template.hh"
|
||||
#include "core/sstring.hh"
|
||||
#include <core/reactor.hh>
|
||||
#include <core/app-template.hh>
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <seastar/rpc/rpc_types.hh>
|
||||
#include "message/messaging_service.hh"
|
||||
#include "gms/gossip_digest_syn.hh"
|
||||
#include "gms/gossip_digest_ack.hh"
|
||||
@@ -57,10 +58,13 @@ public:
|
||||
future<> stop() {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
promise<> digest_test_done;
|
||||
public:
|
||||
void init_handler() {
|
||||
ms.register_gossip_digest_syn([] (gms::gossip_digest_syn msg) {
|
||||
ms.register_gossip_digest_syn([this] (const rpc::client_info& cinfo, gms::gossip_digest_syn msg) {
|
||||
print("Server got syn msg = %s\n", msg);
|
||||
|
||||
auto from = net::messaging_service::get_source(cinfo);
|
||||
auto ep1 = inet_address("1.1.1.1");
|
||||
auto ep2 = inet_address("2.2.2.2");
|
||||
int32_t gen = 800;
|
||||
@@ -74,7 +78,26 @@ public:
|
||||
{ep2, endpoint_state()},
|
||||
};
|
||||
gms::gossip_digest_ack ack(std::move(digests), std::move(eps));
|
||||
return make_ready_future<gms::gossip_digest_ack>(std::move(ack));
|
||||
ms.send_gossip_digest_ack(from, std::move(ack)).handle_exception([] (auto ep) {
|
||||
print("Fail to send ack : %s", ep);
|
||||
});
|
||||
return messaging_service::no_wait();
|
||||
});
|
||||
|
||||
ms.register_gossip_digest_ack([this] (const rpc::client_info& cinfo, gms::gossip_digest_ack msg) {
|
||||
print("Server got ack msg = %s\n", msg);
|
||||
auto from = net::messaging_service::get_source(cinfo);
|
||||
// Prepare gossip_digest_ack2 message
|
||||
auto ep1 = inet_address("3.3.3.3");
|
||||
std::map<inet_address, endpoint_state> eps{
|
||||
{ep1, endpoint_state()},
|
||||
};
|
||||
gms::gossip_digest_ack2 ack2(std::move(eps));
|
||||
ms.send_gossip_digest_ack2(from, std::move(ack2)).handle_exception([] (auto ep) {
|
||||
print("Fail to send ack2 : %s", ep);
|
||||
});
|
||||
digest_test_done.set_value();
|
||||
return messaging_service::no_wait();
|
||||
});
|
||||
|
||||
ms.register_gossip_digest_ack2([] (gms::gossip_digest_ack2 msg) {
|
||||
@@ -108,18 +131,8 @@ public:
|
||||
{ep2, gen++, ver++},
|
||||
};
|
||||
gms::gossip_digest_syn syn("my_cluster", "my_partition", digests);
|
||||
return ms.send_gossip_digest_syn(id, std::move(syn)).then([this, id] (gms::gossip_digest_ack ack) {
|
||||
print("Client sent gossip_digest_syn got gossip_digest_ack reply = %s\n", ack);
|
||||
// Prepare gossip_digest_ack2 message
|
||||
auto ep1 = inet_address("3.3.3.3");
|
||||
std::map<inet_address, endpoint_state> eps{
|
||||
{ep1, endpoint_state()},
|
||||
};
|
||||
gms::gossip_digest_ack2 ack2(std::move(eps));
|
||||
return ms.send_gossip_digest_ack2(id, std::move(ack2)).then([] () {
|
||||
print("Client sent gossip_digest_ack2 got reply = void\n");
|
||||
return make_ready_future<>();
|
||||
});
|
||||
return ms.send_gossip_digest_syn(id, std::move(syn)).then([this] {
|
||||
return digest_test_done.get_future();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -169,6 +182,7 @@ int main(int ac, char ** av) {
|
||||
api_port++;
|
||||
}
|
||||
const gms::inet_address listen = gms::inet_address(config["listen-address"].as<std::string>());
|
||||
utils::fb_utilities::set_broadcast_address(listen);
|
||||
net::get_messaging_service().start(listen).then([config, api_port, stay_alive] () {
|
||||
auto testers = new distributed<tester>;
|
||||
testers->start().then([testers]{
|
||||
|
||||
Reference in New Issue
Block a user