gossip: Make gossip message handling async

In each gossip round, i.e., gossiper::run(), we do:

1) send syn message
2)                           peer node: receive syn message, send back ack message
3) process ack message in handle_ack_msg
   apply_state_locally
     mark_alive
       send_gossip_echo
     handle_major_state_change
       on_restart
       mark_alive
         send_gossip_echo
       mark_dead
         on_dead
       on_join
     apply_new_states
       do_on_change_notifications
          on_change
4) send back ack2 message
5)                            peer node: process ack2 message
   			      apply_state_locally

At the moment, syn is "wait" message, it times out in 3 seconds. In step
3, all the registered gossip callbacks are called which might take
significant amount of time to complete.

In order to reduce the gossip round latency, we make syn "no-wait" and
do not run the handle_ack_msg insdie the gossip::run(). As a result, we
will not get a ack message as the return value of a syn message any
more, so a GOSSIP_DIGEST_ACK message verb is introduced.

With this patch, the gossip message exchange is now async. It is useful
when some nodes are down in the cluster. We will not delay the gossip
round, which is supposed to run every second, 3*n seconds (n = 1-3,
since it talks to 1-3 peer nodes in each gossip round) or even
longer (considering the time to run gossip callbacks).

Later, we can make talking to the 1-3 peer nodes in parallel to reduce
latency even more.

Refs: #900
This commit is contained in:
Asias He
2016-02-18 09:30:18 +08:00
parent 63df54b368
commit 697b16414a
5 changed files with 119 additions and 80 deletions

View File

@@ -160,22 +160,21 @@ void gossiper::do_sort(std::vector<gossip_digest>& g_digest_list) {
}
}
future<gossip_digest_ack> gossiper::handle_syn_msg(gossip_digest_syn syn_msg) {
future<> gossiper::handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg) {
logger.trace("cluster_name:peer={},local={},partitioner_name:peer={},local={}",
syn_msg.cluster_id(), get_cluster_name(), syn_msg.partioner(), get_partitioner_name());
this->set_last_processed_message_at();
inet_address from;
if (!this->is_enabled()) {
return make_ready_future<gossip_digest_ack>(gossip_digest_ack());
return make_ready_future<>();
}
/* If the message is from a different cluster throw it away. */
if (syn_msg.cluster_id() != get_cluster_name()) {
return make_ready_future<gossip_digest_ack>(gossip_digest_ack());
return make_ready_future<>();
}
if (syn_msg.partioner() != "" && syn_msg.partioner() != get_partitioner_name()) {
return make_ready_future<gossip_digest_ack>(gossip_digest_ack());
return make_ready_future<>();
}
auto g_digest_list = syn_msg.get_gossip_digests();
@@ -184,7 +183,7 @@ future<gossip_digest_ack> gossiper::handle_syn_msg(gossip_digest_syn syn_msg) {
std::map<inet_address, endpoint_state> delta_ep_state_map;
this->examine_gossiper(g_digest_list, delta_gossip_digest_list, delta_ep_state_map);
gms::gossip_digest_ack ack_msg(std::move(delta_gossip_digest_list), std::move(delta_ep_state_map));
return make_ready_future<gossip_digest_ack>(std::move(ack_msg));
return this->ms().send_gossip_digest_ack(from, std::move(ack_msg));
}
future<> gossiper::handle_ack_msg(msg_addr id, gossip_digest_ack ack_msg) {
@@ -220,19 +219,50 @@ future<> gossiper::handle_ack_msg(msg_addr id, gossip_digest_ack ack_msg) {
}
gms::gossip_digest_ack2 ack2_msg(std::move(delta_ep_state_map));
logger.trace("Sending a GossipDigestACK2 to {}", id);
return this->ms().send_gossip_digest_ack2(id, std::move(ack2_msg)).then_wrapped([id] (auto&& f) {
try {
f.get();
logger.trace("Got GossipDigestACK2 Reply");
} catch (...) {
logger.warn("Fail to send GossipDigestACK2 to {}: {}", id, std::current_exception());
}
return make_ready_future<>();
return this->ms().send_gossip_digest_ack2(id, std::move(ack2_msg)).handle_exception([id] (auto ep) {
logger.warn("Fail to send GossipDigestACK2 to {}: {}", id, ep);
});
});
}
void gossiper::init_messaging_service_handler() {
if (_ms_registered) {
return;
}
_ms_registered = true;
ms().register_gossip_digest_syn([] (const rpc::client_info& cinfo, gossip_digest_syn syn_msg) {
auto from = net::messaging_service::get_source(cinfo);
smp::submit_to(0, [from, syn_msg = std::move(syn_msg)] () mutable {
auto& gossiper = gms::get_local_gossiper();
return gossiper.handle_syn_msg(from, std::move(syn_msg));
}).handle_exception([] (auto ep) {
logger.warn("Fail to handle GOSSIP_DIGEST_SYN: {}", ep);
});
return messaging_service::no_wait();
});
ms().register_gossip_digest_ack([] (const rpc::client_info& cinfo, gossip_digest_ack msg) {
auto from = net::messaging_service::get_source(cinfo);
smp::submit_to(0, [from, msg = std::move(msg)] () mutable {
auto& gossiper = gms::get_local_gossiper();
return gossiper.handle_ack_msg(from, std::move(msg));
}).handle_exception([] (auto ep) {
logger.warn("Fail to handle GOSSIP_DIGEST_ACK: {}", ep);
});
return messaging_service::no_wait();
});
ms().register_gossip_digest_ack2([] (gossip_digest_ack2 msg) {
smp::submit_to(0, [msg = std::move(msg)] () mutable {
auto& gossiper = gms::get_local_gossiper();
gossiper.set_last_processed_message_at();
auto& remote_ep_state_map = msg.get_endpoint_state_map();
/* Notify the Failure Detector */
gossiper.notify_failure_detector(remote_ep_state_map);
return gossiper.apply_state_locally(remote_ep_state_map);
}).handle_exception([] (auto ep) {
logger.warn("Fail to handle GOSSIP_DIGEST_ACK2: {}", ep);
});
return messaging_service::no_wait();
});
ms().register_gossip_echo([] {
return smp::submit_to(0, [] {
auto& gossiper = gms::get_local_gossiper();
@@ -256,25 +286,6 @@ void gossiper::init_messaging_service_handler() {
});
return messaging_service::no_wait();
});
ms().register_gossip_digest_syn([] (gossip_digest_syn syn_msg) {
return smp::submit_to(0, [syn_msg = std::move(syn_msg)] () mutable {
auto& gossiper = gms::get_local_gossiper();
return gossiper.handle_syn_msg(std::move(syn_msg));
});
});
ms().register_gossip_digest_ack2([] (gossip_digest_ack2 msg) {
smp::submit_to(0, [msg = std::move(msg)] () mutable {
auto& gossiper = gms::get_local_gossiper();
gossiper.set_last_processed_message_at();
auto& remote_ep_state_map = msg.get_endpoint_state_map();
/* Notify the Failure Detector */
gossiper.notify_failure_detector(remote_ep_state_map);
return gossiper.apply_state_locally(remote_ep_state_map);
}).handle_exception([] (auto ep) {
logger.warn("Fail to handle GOSSIP_DIGEST_ACK: {}", ep);
});
return messaging_service::no_wait();
});
}
void gossiper::uninit_messaging_service_handler() {
@@ -282,7 +293,9 @@ void gossiper::uninit_messaging_service_handler() {
ms.unregister_gossip_echo();
ms.unregister_gossip_shutdown();
ms.unregister_gossip_digest_syn();
ms.unregister_gossip_digest_ack();
ms.unregister_gossip_digest_ack2();
_ms_registered = false;
}
future<bool> gossiper::send_gossip(gossip_digest_syn message, std::set<inet_address> epset) {
@@ -297,18 +310,11 @@ future<bool> gossiper::send_gossip(gossip_digest_syn message, std::set<inet_addr
inet_address to = __live_endpoints[index];
auto id = get_msg_addr(to);
logger.trace("Sending a GossipDigestSyn to {} ...", id);
return ms().send_gossip_digest_syn(id, std::move(message)).then_wrapped([this, id] (auto&& f) {
try {
auto ack_msg = f.get0();
logger.trace("Got GossipDigestSyn Reply");
return this->handle_ack_msg(id, std::move(ack_msg));
} catch (...) {
// It is normal to reach here because it is normal that a node
// tries to send a SYN message to a peer node which is down before
// failure_detector thinks that peer node is down.
logger.trace("Fail to send GossipDigestSyn to {}: {}", id, std::current_exception());
}
return make_ready_future<>();
return ms().send_gossip_digest_syn(id, std::move(message)).handle_exception([id] (auto ep) {
// It is normal to reach here because it is normal that a node
// tries to send a SYN message to a peer node which is down before
// failure_detector thinks that peer node is down.
logger.trace("Fail to send GossipDigestSyn to {}: {}", id, ep);
}).then([this, to] {
return make_ready_future<bool>(_seeds.count(to));
});
@@ -1319,6 +1325,13 @@ future<> gossiper::do_shadow_round() {
build_seeds_list();
_in_shadow_round = true;
auto t = clk::now();
// When peer node receives a syn message, it will send back a ack message.
// So, we need to register gossip message handlers before sending syn message.
get_gossiper().invoke_on_all([] (gossiper& g) {
g.init_messaging_service_handler();
}).get();
while (this->_in_shadow_round) {
// send a completely empty syn
for (inet_address seed : _seeds) {
@@ -1326,24 +1339,17 @@ future<> gossiper::do_shadow_round() {
gossip_digest_syn message(get_cluster_name(), get_partitioner_name(), digests);
auto id = get_msg_addr(seed);
logger.trace("Sending a GossipDigestSyn (ShadowRound) to {} ...", id);
ms().send_gossip_digest_syn(id, std::move(message)).then_wrapped([this, id] (auto&& f) {
try {
auto ack_msg = f.get0();
logger.trace("Got GossipDigestSyn (ShadowRound) Reply");
return this->handle_ack_msg(id, std::move(ack_msg));
} catch (...) {
logger.trace("Fail to send GossipDigestSyn (ShadowRound) to {}: {}", id, std::current_exception());
}
return make_ready_future<>();
ms().send_gossip_digest_syn(id, std::move(message)).handle_exception([id] (auto ep) {
logger.trace("Fail to send GossipDigestSyn (ShadowRound) to {}: {}", id, ep);
}).get();
}
auto& ss = service::get_local_storage_service();
sleep(std::chrono::seconds(1)).get();
if (clk::now() > t + ss.get_ring_delay() * 60) {
throw std::runtime_error(sprint("Unable to gossip with any seeds (ShadowRound)"));
}
if (this->_in_shadow_round) {
logger.info("Sleep 1 second and connect seeds again ... ({} seconds passed)", std::chrono::duration_cast<std::chrono::seconds>(clk::now() - t).count());
sleep(std::chrono::seconds(1)).get();
logger.info("Connect seeds again ... ({} seconds passed)", std::chrono::duration_cast<std::chrono::seconds>(clk::now() - t).count());
}
}
});

View File

@@ -88,8 +88,8 @@ private:
}
void init_messaging_service_handler();
void uninit_messaging_service_handler();
future<gossip_digest_ack> handle_syn_msg(gossip_digest_syn syn_msg);
future<> handle_ack_msg(msg_addr id, gossip_digest_ack ack_msg);
future<> handle_syn_msg(msg_addr from, gossip_digest_syn syn_msg);
future<> handle_ack_msg(msg_addr from, gossip_digest_ack ack_msg);
static constexpr uint32_t _default_cpuid = 0;
msg_addr get_msg_addr(inet_address to) {
return msg_addr{to, _default_cpuid};
@@ -506,6 +506,7 @@ public:
future<> wait_for_gossip_to_settle();
private:
uint64_t _nr_run = 0;
bool _ms_registered = false;
};
extern distributed<gossiper> _the_gossiper;

View File

@@ -614,16 +614,29 @@ future<> messaging_service::send_gossip_shutdown(msg_addr id, inet_address from)
return send_message_oneway(this, messaging_verb::GOSSIP_SHUTDOWN, std::move(id), std::move(from));
}
void messaging_service::register_gossip_digest_syn(std::function<future<gossip_digest_ack> (gossip_digest_syn)>&& func) {
// gossip syn
void messaging_service::register_gossip_digest_syn(std::function<rpc::no_wait_type (const rpc::client_info& cinfo, gossip_digest_syn)>&& func) {
register_handler(this, messaging_verb::GOSSIP_DIGEST_SYN, std::move(func));
}
void messaging_service::unregister_gossip_digest_syn() {
_rpc->unregister_handler(net::messaging_verb::GOSSIP_DIGEST_SYN);
}
future<gossip_digest_ack> messaging_service::send_gossip_digest_syn(msg_addr id, gossip_digest_syn msg) {
return send_message_timeout<gossip_digest_ack>(this, messaging_verb::GOSSIP_DIGEST_SYN, std::move(id), 3000ms, std::move(msg));
future<> messaging_service::send_gossip_digest_syn(msg_addr id, gossip_digest_syn msg) {
return send_message_oneway(this, messaging_verb::GOSSIP_DIGEST_SYN, std::move(id), std::move(msg));
}
// gossip ack
void messaging_service::register_gossip_digest_ack(std::function<rpc::no_wait_type (const rpc::client_info& cinfo, gossip_digest_ack)>&& func) {
register_handler(this, messaging_verb::GOSSIP_DIGEST_ACK, std::move(func));
}
void messaging_service::unregister_gossip_digest_ack() {
_rpc->unregister_handler(net::messaging_verb::GOSSIP_DIGEST_ACK);
}
future<> messaging_service::send_gossip_digest_ack(msg_addr id, gossip_digest_ack msg) {
return send_message_oneway(this, messaging_verb::GOSSIP_DIGEST_ACK, std::move(id), std::move(msg));
}
// gossip ack2
void messaging_service::register_gossip_digest_ack2(std::function<rpc::no_wait_type (gossip_digest_ack2)>&& func) {
register_handler(this, messaging_verb::GOSSIP_DIGEST_ACK2, std::move(func));
}

View File

@@ -234,9 +234,14 @@ public:
future<> send_gossip_shutdown(msg_addr id, inet_address from);
// Wrapper for GOSSIP_DIGEST_SYN
void register_gossip_digest_syn(std::function<future<gms::gossip_digest_ack> (gms::gossip_digest_syn)>&& func);
void register_gossip_digest_syn(std::function<rpc::no_wait_type (const rpc::client_info& cinfo, gms::gossip_digest_syn)>&& func);
void unregister_gossip_digest_syn();
future<gms::gossip_digest_ack> send_gossip_digest_syn(msg_addr id, gms::gossip_digest_syn msg);
future<> send_gossip_digest_syn(msg_addr id, gms::gossip_digest_syn msg);
// Wrapper for GOSSIP_DIGEST_ACK
void register_gossip_digest_ack(std::function<rpc::no_wait_type (const rpc::client_info& cinfo, gms::gossip_digest_ack)>&& func);
void unregister_gossip_digest_ack();
future<> send_gossip_digest_ack(msg_addr id, gms::gossip_digest_ack msg);
// Wrapper for GOSSIP_DIGEST_ACK2
void register_gossip_digest_ack2(std::function<rpc::no_wait_type (gms::gossip_digest_ack2)>&& func);

View File

@@ -20,9 +20,10 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "core/reactor.hh"
#include "core/app-template.hh"
#include "core/sstring.hh"
#include <core/reactor.hh>
#include <core/app-template.hh>
#include <seastar/core/sstring.hh>
#include <seastar/rpc/rpc_types.hh>
#include "message/messaging_service.hh"
#include "gms/gossip_digest_syn.hh"
#include "gms/gossip_digest_ack.hh"
@@ -57,10 +58,13 @@ public:
future<> stop() {
return make_ready_future<>();
}
promise<> digest_test_done;
public:
void init_handler() {
ms.register_gossip_digest_syn([] (gms::gossip_digest_syn msg) {
ms.register_gossip_digest_syn([this] (const rpc::client_info& cinfo, gms::gossip_digest_syn msg) {
print("Server got syn msg = %s\n", msg);
auto from = net::messaging_service::get_source(cinfo);
auto ep1 = inet_address("1.1.1.1");
auto ep2 = inet_address("2.2.2.2");
int32_t gen = 800;
@@ -74,7 +78,26 @@ public:
{ep2, endpoint_state()},
};
gms::gossip_digest_ack ack(std::move(digests), std::move(eps));
return make_ready_future<gms::gossip_digest_ack>(std::move(ack));
ms.send_gossip_digest_ack(from, std::move(ack)).handle_exception([] (auto ep) {
print("Fail to send ack : %s", ep);
});
return messaging_service::no_wait();
});
ms.register_gossip_digest_ack([this] (const rpc::client_info& cinfo, gms::gossip_digest_ack msg) {
print("Server got ack msg = %s\n", msg);
auto from = net::messaging_service::get_source(cinfo);
// Prepare gossip_digest_ack2 message
auto ep1 = inet_address("3.3.3.3");
std::map<inet_address, endpoint_state> eps{
{ep1, endpoint_state()},
};
gms::gossip_digest_ack2 ack2(std::move(eps));
ms.send_gossip_digest_ack2(from, std::move(ack2)).handle_exception([] (auto ep) {
print("Fail to send ack2 : %s", ep);
});
digest_test_done.set_value();
return messaging_service::no_wait();
});
ms.register_gossip_digest_ack2([] (gms::gossip_digest_ack2 msg) {
@@ -108,18 +131,8 @@ public:
{ep2, gen++, ver++},
};
gms::gossip_digest_syn syn("my_cluster", "my_partition", digests);
return ms.send_gossip_digest_syn(id, std::move(syn)).then([this, id] (gms::gossip_digest_ack ack) {
print("Client sent gossip_digest_syn got gossip_digest_ack reply = %s\n", ack);
// Prepare gossip_digest_ack2 message
auto ep1 = inet_address("3.3.3.3");
std::map<inet_address, endpoint_state> eps{
{ep1, endpoint_state()},
};
gms::gossip_digest_ack2 ack2(std::move(eps));
return ms.send_gossip_digest_ack2(id, std::move(ack2)).then([] () {
print("Client sent gossip_digest_ack2 got reply = void\n");
return make_ready_future<>();
});
return ms.send_gossip_digest_syn(id, std::move(syn)).then([this] {
return digest_test_done.get_future();
});
}
@@ -169,6 +182,7 @@ int main(int ac, char ** av) {
api_port++;
}
const gms::inet_address listen = gms::inet_address(config["listen-address"].as<std::string>());
utils::fb_utilities::set_broadcast_address(listen);
net::get_messaging_service().start(listen).then([config, api_port, stay_alive] () {
auto testers = new distributed<tester>;
testers->start().then([testers]{