From adff3b9c795e4b48eccd0dca056c6d56ac0c2c0e Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 16 Apr 2015 16:59:53 +0800 Subject: [PATCH 1/8] gossip: Drop redundant print in heart_beat_state --- gms/heart_beat_state.hh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gms/heart_beat_state.hh b/gms/heart_beat_state.hh index ef0ea01ccc..9b4a04deac 100644 --- a/gms/heart_beat_state.hh +++ b/gms/heart_beat_state.hh @@ -62,7 +62,7 @@ public: } friend inline std::ostream& operator<<(std::ostream& os, const heart_beat_state& h) { - return os << "HeartBeat: generation = " << h._generation << ", version = " << h._version; + return os << "generation = " << h._generation << ", version = " << h._version; } // The following replaces HeartBeatStateSerializer from the Java code From d661827045a0d51c949d9c2f1224f523a3106d55 Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 16 Apr 2015 17:01:52 +0800 Subject: [PATCH 2/8] gossip: Fix get_broadcast_address It is default to listen_address. --- gms/gossiper.hh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gms/gossiper.hh b/gms/gossiper.hh index c82c03e61b..e4dadd3fd9 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -98,7 +98,7 @@ private: private: inet_address get_broadcast_address() { // FIXME: Helper for FBUtilities.getBroadcastAddress - return inet_address(0xffffff); + return ms().listen_address(); } std::set _seeds_from_config; public: From eeafdf5815b7e99a6bd4e522b00bcd5b10923b32 Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 16 Apr 2015 17:03:46 +0800 Subject: [PATCH 3/8] gossip: Make gms::versioned_value::load static We are supposed to call it without an instance. We will convert other similar functions in follow up patches. --- gms/versioned_value.hh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gms/versioned_value.hh b/gms/versioned_value.hh index df8ec9425d..5757ea3c87 100644 --- a/gms/versioned_value.hh +++ b/gms/versioned_value.hh @@ -139,7 +139,7 @@ public: } #endif - versioned_value load(double load) + static inline versioned_value load(double load) { return versioned_value(to_sstring_sprintf(load, "%g")); } From 7f98644742abd4f57d6f0b58e80a51b89596dafd Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 16 Apr 2015 17:08:19 +0800 Subject: [PATCH 4/8] gossip: Fix send_gossip Insert when local_ep_state_ptr is engaged not otherwise. --- gms/gossiper.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 713981b806..347754dea0 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -168,7 +168,7 @@ bool gossiper::send_gossip(gossip_digest_syn message, std::set eps for (auto g_digest : g_digest_list) { inet_address addr = g_digest.get_endpoint(); auto local_ep_state_ptr = this->get_state_for_version_bigger_than(addr, g_digest.get_max_version()); - if (!local_ep_state_ptr) { + if (local_ep_state_ptr) { delta_ep_state_map.emplace(addr, *local_ep_state_ptr); } } From 4cffb5513d8abaed4822733efce0f2b1a7ea9174 Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 16 Apr 2015 17:40:16 +0800 Subject: [PATCH 5/8] gossip: Drop unnecessary FIXME --- gms/gossiper.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 347754dea0..eeea110acb 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -1113,8 +1113,6 @@ void gossiper::maybe_initialize_local_state(int generation_nbr) { heart_beat_state hb_state(generation_nbr); endpoint_state local_state(hb_state); local_state.mark_alive(); - // FIXME - // endpoint_state_map.putIfAbsent(FBUtilities.getBroadcastAddress(), local_state); inet_address ep = get_broadcast_address(); auto it = endpoint_state_map.find(ep); if (it == endpoint_state_map.end()) { From 4abee75c040cf63940522423fe15a122d804c362 Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 16 Apr 2015 17:36:16 +0800 Subject: [PATCH 6/8] gossip: Drop fail guard in mark_alive and apply_state_locally --- gms/gossiper.cc | 3 --- 1 file changed, 3 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index eeea110acb..57d100ea6d 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -273,7 +273,6 @@ void gossiper::apply_state_locally(std::map& map) // this is a new node, report it to the FD in case it is the first time we are seeing it AND it's not alive get_local_failure_detector().report(ep); handle_major_state_change(ep, remote_state); - fail(unimplemented::cause::GOSSIP); } } } @@ -838,8 +837,6 @@ void gossiper::notify_failure_detector(std::map re } void gossiper::mark_alive(inet_address addr, endpoint_state local_state) { - fail(unimplemented::cause::GOSSIP); - // if (MessagingService.instance().getVersion(addr) < MessagingService.VERSION_20) { // real_mark_alive(addr, local_state); // return; From 02f8c9d9652057e124d28c60317741a1b46ed668 Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 16 Apr 2015 17:37:59 +0800 Subject: [PATCH 7/8] gossip: Add dump_endpoint_state_map for debug --- gms/gossiper.cc | 8 ++++++++ gms/gossiper.hh | 2 ++ 2 files changed, 10 insertions(+) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 57d100ea6d..380a04e05a 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -1201,4 +1201,12 @@ int64_t gossiper::compute_expire_time() { return now_millis() + A_VERY_LONG_TIME; } +void gossiper::dump_endpoint_state_map() { + print("----------- endpoint_state_map dump beg -----------\n"); + for (auto& x : endpoint_state_map) { + print("ep=%s, eps=%s\n", x.first, x.second); + } + print("----------- endpoint_state_map dump end -----------\n"); +} + } // namespace gms diff --git a/gms/gossiper.hh b/gms/gossiper.hh index e4dadd3fd9..87eb23afa2 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -472,6 +472,8 @@ public: void add_expire_time_for_endpoint(inet_address endpoint, int64_t expire_time); static int64_t compute_expire_time(); +public: + void dump_endpoint_state_map(); }; extern distributed _the_gossiper; From 6a2eed05fd33a5df7c6a48b0f46c70da9a58c2b3 Mon Sep 17 00:00:00 2001 From: Asias He Date: Thu, 16 Apr 2015 17:38:46 +0800 Subject: [PATCH 8/8] tests: Gossip around node load info $ ./gossip --seed 127.0.0.1 --listen-address 127.0.0.1 $ ./gossip --seed 127.0.0.1 --listen-address 127.0.0.2 $ ./gossip --seed 127.0.0.1 --listen-address 127.0.0.3 After a few seconds, all the 3 nodes will know each other's load info by gossip. ----------- endpoint_state_map dump beg ----------- ep=127.0.0.1, eps=EndpointState: HeartBeatState = generation = 1, version = 0, AppStateMap = { 1 : Value(0.5,1) } ep=127.0.0.2, eps=EndpointState: HeartBeatState = generation = 1, version = 0, AppStateMap = { 1 : Value(0.5,1) } ep=127.0.0.3, eps=EndpointState: HeartBeatState = generation = 1, version = 0, AppStateMap = { 1 : Value(0.5,1) } ----------- endpoint_state_map dump end ----------- --- tests/urchin/gossip.cc | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/tests/urchin/gossip.cc b/tests/urchin/gossip.cc index 797cd769c4..ee29870dae 100644 --- a/tests/urchin/gossip.cc +++ b/tests/urchin/gossip.cc @@ -3,6 +3,7 @@ #include "message/messaging_service.hh" #include "gms/failure_detector.hh" #include "gms/gossiper.hh" +#include "gms/application_state.hh" namespace bpo = boost::program_options; @@ -31,8 +32,20 @@ int main(int ac, char ** av) { std::cout << "Start gossiper service ...\n"; auto& gossiper = gms::get_local_gossiper(); gossiper.set_seeds(std::move(seeds)); + + std::map app_states = { + { gms::application_state::LOAD, gms::versioned_value::versioned_value_factory::load(0.5) }, + }; + int generation_number = 1; - gossiper.start(generation_number); + gossiper.start(generation_number, app_states); + + auto reporter = std::make_shared>(); + reporter->set_callback ([reporter] { + auto& gossiper = gms::get_local_gossiper(); + gossiper.dump_endpoint_state_map(); + }); + reporter->arm_periodic(std::chrono::milliseconds(1000)); }); }); });