From ea08c7e0008006c2937c64eddd054087a26d54e3 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 21 Apr 2015 18:09:30 +0800 Subject: [PATCH 01/13] utils: Fix bounded_stats_deque::add --- utils/bounded_stats_deque.hh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/bounded_stats_deque.hh b/utils/bounded_stats_deque.hh index f86e3f3fd8..557dcd38a4 100644 --- a/utils/bounded_stats_deque.hh +++ b/utils/bounded_stats_deque.hh @@ -47,8 +47,8 @@ public: auto removed = _deque.front(); _deque.pop_front(); _sum -= removed; - _deque.push_back(i); } + _deque.push_back(i); _sum += i; } From 5a54ae3214ceaebdef50efbe33f31a425405264f Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 22 Apr 2015 08:13:18 +0800 Subject: [PATCH 02/13] gossip: Fix string serializer in messaging_service --- message/messaging_service.hh | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 847da9d8ba..a5921934d7 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -111,9 +111,11 @@ struct serializer { // For sstring inline auto operator()(output_stream& out, sstring& v) { - auto sz = serialize_int16_size + v.size(); + auto serialize_string_size = serialize_int16_size + v.size(); + auto sz = serialize_int16_size + serialize_string_size; bytes b(bytes::initialized_later(), sz); auto _out = b.begin(); + serialize_int16(_out, serialize_string_size); serialize_string(_out, v); return out.write(reinterpret_cast(b.c_str()), sz); } @@ -122,12 +124,13 @@ struct serializer { if (buf.size() != serialize_int16_size) { throw rpc::closed_error(); } - size_t sz = net::ntoh(*reinterpret_cast*>(buf.get())); - return in.read_exactly(sz).then([sz, &v] (temporary_buffer buf) mutable { - if (buf.size() != sz) { + size_t serialize_string_size = net::ntoh(*reinterpret_cast*>(buf.get())); + return in.read_exactly(serialize_string_size).then([serialize_string_size, &v] + (temporary_buffer buf) mutable { + if (buf.size() != serialize_string_size) { throw rpc::closed_error(); } - bytes_view bv(reinterpret_cast(buf.get()), sz); + bytes_view bv(reinterpret_cast(buf.get()), serialize_string_size); v = read_simple_short_string(bv); return make_ready_future<>(); }); From a800fbfe6425e197e2dd346de69b5f7f002a626c Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 21 Apr 2015 15:22:01 +0800 Subject: [PATCH 03/13] gossip: Set get_phi_convict_threshold to 8 It is the default value. --- gms/failure_detector.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gms/failure_detector.cc b/gms/failure_detector.cc index 4599205eb6..482a567941 100644 --- a/gms/failure_detector.cc +++ b/gms/failure_detector.cc @@ -183,10 +183,10 @@ void failure_detector::set_phi_convict_threshold(double phi) { } double failure_detector::get_phi_convict_threshold() { - // FIXME + // FIXME: phi_convict_threshold must be between 5 and 16" // return DatabaseDescriptor.getPhiConvictThreshold(); warn(unimplemented::cause::GOSSIP); - return 0; + return 8; } bool failure_detector::is_alive(inet_address ep) { From f2e840de5415fa8cf861318dfca7e7080363da0f Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 22 Apr 2015 09:54:33 +0800 Subject: [PATCH 04/13] gossip: Switch from fail to warn Warn is enough for now. --- gms/gossiper.cc | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 4ee714526e..16466d2021 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -289,7 +289,7 @@ void gossiper::remove_endpoint(inet_address endpoint) { get_local_failure_detector().remove(endpoint); // FIXME: MessagingService //MessagingService.instance().resetVersion(endpoint); - fail(unimplemented::cause::GOSSIP); + warn(unimplemented::cause::GOSSIP); quarantine_endpoint(endpoint); // FIXME: MessagingService //MessagingService.instance().destroyConnectionPool(endpoint); @@ -325,7 +325,6 @@ void gossiper::do_status_check() { } get_local_failure_detector().interpret(endpoint); - fail(unimplemented::cause::GOSSIP); auto it = endpoint_state_map.find(endpoint); if (it != endpoint_state_map.end()) { @@ -447,7 +446,7 @@ std::set gossiper::get_live_token_owners() { if (it != endpoint_state_map.end() && !is_dead_state(it->second) /* && StorageService.instance.getTokenMetadata().isMember(member) */) { token_owners.insert(member); } - fail(unimplemented::cause::GOSSIP); + warn(unimplemented::cause::GOSSIP); } return token_owners; } @@ -456,7 +455,7 @@ std::set gossiper::get_unreachable_token_owners() { std::set token_owners; for (auto&& x : _unreachable_endpoints) { auto& endpoint = x.first; - fail(unimplemented::cause::GOSSIP); + warn(unimplemented::cause::GOSSIP); if (true /* StorageService.instance.getTokenMetadata().isMember(endpoint) */) { token_owners.insert(endpoint); } @@ -572,7 +571,7 @@ void gossiper::advertise_removing(inet_address endpoint, utils::UUID host_id, ut // logger.info("Removing host: {}", host_id); // logger.info("Sleeping for {}ms to ensure {} does not change", StorageService.RING_DELAY, endpoint); // FIXME: sleep - fail(unimplemented::cause::GOSSIP); + warn(unimplemented::cause::GOSSIP); // Uninterruptibles.sleepUninterruptibly(StorageService.RING_DELAY, TimeUnit.MILLISECONDS); // make sure it did not change auto& eps = endpoint_state_map.at(endpoint); @@ -603,7 +602,7 @@ void gossiper::advertise_token_removed(inet_address endpoint, utils::UUID host_i // ensure at least one gossip round occurs before returning // FIXME: sleep //Uninterruptibles.sleepUninterruptibly(INTERVAL_IN_MILLIS * 2, TimeUnit.MILLISECONDS); - fail(unimplemented::cause::GOSSIP); + warn(unimplemented::cause::GOSSIP); } void gossiper::unsafe_assassinate_endpoint(sstring address) { @@ -620,7 +619,7 @@ void gossiper::assassinate_endpoint(sstring address) { // logger.warn("Assassinating {} via gossip", endpoint); if (is_exist) { // FIXME: - fail(unimplemented::cause::GOSSIP); + warn(unimplemented::cause::GOSSIP); #if 0 try { tokens = StorageService.instance.getTokenMetadata().getTokens(endpoint); @@ -757,7 +756,7 @@ std::map&gms::gossiper::get_endpoint_states() { bool gossiper::uses_host_id(inet_address endpoint) { // FIXME - fail(unimplemented::cause::GOSSIP); + warn(unimplemented::cause::GOSSIP); if (true /* MessagingService.instance().knowsVersion(endpoint) */) { return true; } else if (get_endpoint_state_for_endpoint(endpoint)->get_application_state(application_state::NET_VERSION)) { @@ -776,7 +775,7 @@ utils::UUID gossiper::get_host_id(inet_address endpoint) { } sstring uuid = get_endpoint_state_for_endpoint(endpoint)->get_application_state(application_state::HOST_ID)->value; // FIXME: Add UUID(const sstring& id) constructor - fail(unimplemented::cause::GOSSIP); + warn(unimplemented::cause::GOSSIP); return utils::UUID(0, 0); } @@ -1156,7 +1155,7 @@ void gossiper::add_lccal_application_states(std::list Date: Tue, 21 Apr 2015 15:34:53 +0800 Subject: [PATCH 05/13] gossip: Update heart beat --- gms/endpoint_state.hh | 2 +- gms/gossiper.cc | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/gms/endpoint_state.hh b/gms/endpoint_state.hh index 95e2cffb03..86622c2c2f 100644 --- a/gms/endpoint_state.hh +++ b/gms/endpoint_state.hh @@ -55,7 +55,7 @@ public: , _is_alive(true) { } - heart_beat_state get_heart_beat_state() { + heart_beat_state& get_heart_beat_state() { return _heart_beat_state; } diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 16466d2021..783b5706d8 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -367,9 +367,8 @@ void gossiper::run() { //wait on messaging service to start listening // MessagingService.instance().waitUntilListening(); - /* Update the local heartbeat counter. */ - //endpoint_state_map.get(FBUtilities.getBroadcastAddress()).get_heart_beat_state().updateHeartBeat(); + endpoint_state_map[get_broadcast_address()].get_heart_beat_state().update_heart_beat(); // if (logger.isTraceEnabled()) // logger.trace("My heartbeat is now {}", endpoint_state_map.get(FBUtilities.getBroadcastAddress()).get_heart_beat_state().get_heart_beat_version()); std::vector g_digests; From 5f0050dc973bf3bf6b81de1cd665cccb20b48bce Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 21 Apr 2015 16:08:37 +0800 Subject: [PATCH 06/13] gossip: Fix add_application_state If the key exists, we should update the new value. --- gms/endpoint_state.hh | 2 +- gms/versioned_value.hh | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/gms/endpoint_state.hh b/gms/endpoint_state.hh index 86622c2c2f..14a1917f4d 100644 --- a/gms/endpoint_state.hh +++ b/gms/endpoint_state.hh @@ -82,7 +82,7 @@ public: } void add_application_state(application_state key, versioned_value value) { - _application_state.emplace(key, value); + _application_state[key] = value; } /* getters and setters */ diff --git a/gms/versioned_value.hh b/gms/versioned_value.hh index 5757ea3c87..aff77b768d 100644 --- a/gms/versioned_value.hh +++ b/gms/versioned_value.hh @@ -65,8 +65,13 @@ public: // values for ApplicationState.REMOVAL_COORDINATOR static constexpr const char *REMOVAL_COORDINATOR = "REMOVER"; - const int version; - const sstring value; + int version; + sstring value; +public: + versioned_value() + : version(version_generator::get_next_version()) + , value("") { + } private: versioned_value(const sstring& value, int version = version_generator::get_next_version()) From 622ec0111d67a9f42409fe03c2904d701ca37efa Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 21 Apr 2015 16:40:44 +0800 Subject: [PATCH 07/13] gossip: Fix apply_new_states We should take a reference, otherwise remote's endpoint_state will not be updated locally. --- gms/gossiper.cc | 2 +- gms/gossiper.hh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 783b5706d8..5408dcd442 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -915,7 +915,7 @@ bool gossiper::is_dead_state(endpoint_state eps) { return false; } -void gossiper::apply_new_states(inet_address addr, endpoint_state local_state, endpoint_state remote_state) { +void gossiper::apply_new_states(inet_address addr, endpoint_state& local_state, endpoint_state& remote_state) { // don't assert here, since if the node restarts the version will go back to zero //int oldVersion = local_state.get_heart_beat_state().get_heart_beat_version(); diff --git a/gms/gossiper.hh b/gms/gossiper.hh index eca2a01b6f..7bcd434631 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -372,7 +372,7 @@ public: void apply_state_locally(std::map& map); private: - void apply_new_states(inet_address addr, endpoint_state local_state, endpoint_state remote_state); + void apply_new_states(inet_address addr, endpoint_state& local_state, endpoint_state& remote_state); // notify that a local application state is going to change (doesn't get triggered for remote changes) void do_before_change_notifications(inet_address addr, endpoint_state& ep_state, application_state& ap_state, versioned_value& new_value); From 0060eac413a4d33053b7736032e9bf7c538b9cfd Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 21 Apr 2015 18:11:03 +0800 Subject: [PATCH 08/13] gossip: Set last processed time when receiving gossip message --- gms/gossiper.cc | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 5408dcd442..5b6f8753db 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -85,15 +85,19 @@ void gossiper::do_sort(std::vector& g_digest_list) { } void gossiper::init_messaging_service_handler() { - ms().register_handler(messaging_verb::ECHO, [] (empty_msg msg) { + ms().register_handler(messaging_verb::ECHO, [this] (empty_msg msg) { + // TODO: Use time_point instead of long for timing. + this->set_last_processed_message_at(now_millis()); return make_ready_future(); }); - ms().register_handler_oneway(messaging_verb::GOSSIP_SHUTDOWN, [] (inet_address from) { + ms().register_handler_oneway(messaging_verb::GOSSIP_SHUTDOWN, [this] (inet_address from) { + this->set_last_processed_message_at(now_millis()); // TODO: Implement processing of incoming SHUTDOWN message get_local_failure_detector().force_conviction(from); return messaging_service::no_wait(); }); ms().register_handler(messaging_verb::GOSSIP_DIGEST_SYN, [this] (gossip_digest_syn syn_msg) { + this->set_last_processed_message_at(now_millis()); inet_address from; if (!this->is_enabled()) { return make_ready_future(gossip_digest_ack()); @@ -117,6 +121,7 @@ void gossiper::init_messaging_service_handler() { return make_ready_future(std::move(ack_msg)); }); ms().register_handler_oneway(messaging_verb::GOSSIP_DIGEST_ACK2, [this] (gossip_digest_ack2 msg) { + this->set_last_processed_message_at(now_millis()); auto& remote_ep_state_map = msg.get_endpoint_state_map(); /* Notify the Failure Detector */ this->notify_failure_detector(remote_ep_state_map); @@ -140,6 +145,7 @@ bool gossiper::send_gossip(gossip_digest_syn message, std::set eps using RetMsg = gossip_digest_ack; auto id = get_shard_id(to); ms().send_message(messaging_verb::GOSSIP_DIGEST_SYN, std::move(id), std::move(message)).then([this, id] (RetMsg ack_msg) { + this->set_last_processed_message_at(now_millis()); if (!this->is_enabled() && !this->is_in_shadow_round()) { return make_ready_future<>(); } @@ -837,6 +843,7 @@ void gossiper::mark_alive(inet_address addr, endpoint_state local_state) { //logger.trace("Sending a EchoMessage to {}", addr); shard_id id = get_shard_id(addr); ms().send_message(messaging_verb::ECHO, id).then([this, addr, local_state = std::move(local_state)] (empty_msg msg) mutable { + this->set_last_processed_message_at(now_millis()); this->real_mark_alive(addr, local_state); }); } @@ -1065,6 +1072,7 @@ void gossiper::do_shadow_round() { auto id = get_shard_id(seed); ms().send_message(messaging_verb::GOSSIP_DIGEST_SYN, std::move(id), std::move(message)).then([this, id] (gossip_digest_ack ack_msg) { + this->set_last_processed_message_at(now_millis()); if (this->is_in_shadow_round()) { this->finish_shadow_round(); } From bf3d6a4c06cc9c52757ba70f1af9bc535f5038a1 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 22 Apr 2015 10:04:29 +0800 Subject: [PATCH 09/13] gossip: Disable sleep and retry logic in do_status_check We do not have the ThreadPoolExecutor logic. Disable the sleep and retry logic. --- gms/gossiper.cc | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 5b6f8753db..ccb079130d 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -309,21 +309,6 @@ void gossiper::do_status_check() { int64_t now = now_millis(); - // FIXME: - // int64_t pending = ((JMXEnabledThreadPoolExecutor) StageManager.getStage(Stage.GOSSIP)).getPendingTasks(); - int64_t pending = 1; - if (pending > 0 && _last_processed_message_at < now - 1000) { - // FIXME: SLEEP - // if some new messages just arrived, give the executor some time to work on them - //Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - - // still behind? something's broke - if (_last_processed_message_at < now - 1000) { - // logger.warn("Gossip stage has {} pending tasks; skipping status check (no nodes will be marked down)", pending); - return; - } - } - for (auto& entry : endpoint_state_map) { const inet_address& endpoint = entry.first; if (endpoint == get_broadcast_address()) { From 3d36debcf85ce023b97a2bffd3117016f96fc0c2 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 22 Apr 2015 15:27:02 +0800 Subject: [PATCH 10/13] message: Do not call remove_rpc_client() under exception We can not remove the rpc client while the client might still "active". --- message/messaging_service.hh | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/message/messaging_service.hh b/message/messaging_service.hh index a5921934d7..26f85005f9 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -300,7 +300,9 @@ public: auto ret = f.get(); return make_ready_future(std::move(std::get<0>(ret))); } catch (std::runtime_error&) { - remove_rpc_client(id); + // FIXME: we need to distinguish between a transport error and + // a server error. + // remove_rpc_client(id); throw; } }); @@ -315,7 +317,8 @@ public: f.get(); return make_ready_future<>(); } catch (std::runtime_error&) { - remove_rpc_client(id); + // FIXME: as above + // remove_rpc_client(id); throw; } }); From 5bb42de01068688b43aa00000e5d0413e0ebc185 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 21 Apr 2015 16:10:19 +0800 Subject: [PATCH 11/13] tests: Update load info over the time in tests/urchin/gossip.cc --- tests/urchin/gossip.cc | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/urchin/gossip.cc b/tests/urchin/gossip.cc index c3f1e93258..2d6331649a 100644 --- a/tests/urchin/gossip.cc +++ b/tests/urchin/gossip.cc @@ -47,6 +47,17 @@ int main(int ac, char ** av) { print("%s", fd); }); reporter->arm_periodic(std::chrono::milliseconds(1000)); + + auto app_state_adder = std::make_shared>(); + app_state_adder->set_callback ([app_state_adder] { + static double load = 0.5; + auto& gossiper = gms::get_local_gossiper(); + auto state = gms::application_state::LOAD; + auto value = gms::versioned_value::versioned_value_factory::load(load); + gossiper.add_local_application_state(state, value); + load += 0.0001; + }); + app_state_adder->arm_periodic(std::chrono::seconds(1)); }); }); }); From 346d00cc8c42ceb2ef50f39250f0c3c3647ee12a Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 21 Apr 2015 10:37:05 +0800 Subject: [PATCH 12/13] tests: Add listen-address option to tests/urchin/message.cc --- tests/urchin/message.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/urchin/message.cc b/tests/urchin/message.cc index 521ba7dcbb..648092683c 100644 --- a/tests/urchin/message.cc +++ b/tests/urchin/message.cc @@ -144,9 +144,12 @@ int main(int ac, char ** av) { app_template app; app.add_options() ("server", bpo::value(), "Server ip") + ("listen-address", bpo::value()->default_value("0.0.0.0"), "IP address to listen") ("cpuid", bpo::value()->default_value(0), "Server cpuid"); return app.run(ac, av, [&] { - net::get_messaging_service().start().then([&] () { + auto&& config = app.configuration(); + auto listen = gms::inet_address(config["listen-address"].as()); + net::get_messaging_service().start(std::ref(listen)).then([&] () { auto&& config = app.configuration(); auto testers = new distributed; testers->start().then([testers]{ From 9377cdcc45176e4b6fd234bf614f4b9e64d7fa4b Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 21 Apr 2015 10:21:58 +0800 Subject: [PATCH 13/13] tests: Add exception tests to tests/urchin/message.cc --- tests/urchin/message.cc | 39 +++++++++++++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/tests/urchin/message.cc b/tests/urchin/message.cc index 648092683c..5e8bd0dd0b 100644 --- a/tests/urchin/message.cc +++ b/tests/urchin/message.cc @@ -81,6 +81,12 @@ public: std::tuple ret(x*x, y*y); return make_ready_future(std::move(ret)); }); + ms.register_handler(messaging_verb::UNUSED_1, [] (int x, long y) { + print("Server got echo msg = (%d, %ld) \n", x, y); + throw std::runtime_error("I'm throwing runtime_error exception"); + long ret = x + y; + return make_ready_future(ret); + }); } public: @@ -129,11 +135,34 @@ public: int msg1 = 30; int msg2 = 60; using RetMsg = std::tuple; - return ms.send_message(messaging_verb::ECHO, id, msg1, msg2).then([] (RetMsg msg) { - print("Client sent echo got reply = (%d , %ld)\n", std::get<0>(msg), std::get<1>(msg)); - return sleep(100ms).then([]{ + return ms.send_message(messaging_verb::ECHO, id, msg1, msg2).then_wrapped([] (future f) { + try { + auto msg = std::get<0>(f.get()); + print("Client sent echo got reply = (%d , %ld)\n", std::get<0>(msg), std::get<1>(msg)); + return sleep(100ms).then([]{ + return make_ready_future<>(); + }); + } catch (std::runtime_error& e) { + print("test_echo: %s\n", e.what()); + } + return make_ready_future<>(); + }); + } + + future<> test_exception() { + print("=== %s ===\n", __func__); + auto id = get_shard_id(); + int msg1 = 3; + int msg2 = 6; + return ms.send_message(messaging_verb::UNUSED_1, id, msg1, msg2).then_wrapped([] (future f) { + try { + auto ret = std::get<0>(f.get()); + print("Client sent UNUSED_1 got reply = %ld\n", ret); return make_ready_future<>(); - }); + } catch (std::runtime_error& e) { + print("Client sent UNUSED_1 got exception: %s\n", e.what()); + } + return make_ready_future<>(); }); } }; @@ -172,6 +201,8 @@ int main(int ac, char ** av) { return t->test_gossip_shutdown(); }).then([testers, t] { return t->test_echo(); + }).then([testers, t] { + return t->test_exception(); }).then([testers, t] { print("=============TEST DONE===========\n"); testers->stop().then([testers] {