From 896f562de752b88f3be0e4827744f903ccfba2f9 Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Thu, 14 May 2015 14:25:06 +0300 Subject: [PATCH] Adding dropped messages counter to messaging_service This adds a drop messages counter per verb type to the messaging service. It will be used by the API to return the number of dropped messages. Signed-off-by: Amnon Heiman --- message/messaging_service.hh | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 4fe6272fed..8c8745cc7e 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -63,6 +63,7 @@ enum class messaging_verb : int32_t { UNUSED_1, UNUSED_2, UNUSED_3, + LAST, }; } // namespace net @@ -273,6 +274,20 @@ public: f(i->first, i->second); } } + + void increment_dropped_messages(messaging_verb verb) { + _dropped_messages[static_cast(verb)]++; + } + + uint64_t get_dropped_messages(messaging_verb verb) const { + return _dropped_messages[static_cast(verb)]; + } + + const uint64_t* get_dropped_messages() const { + return _dropped_messages; + } + + private: static constexpr uint16_t _default_port = 7000; gms::inet_address _listen_address; @@ -280,6 +295,7 @@ private: rpc::protocol _rpc; rpc::protocol::server _server; std::unordered_map _clients; + uint64_t _dropped_messages[static_cast(messaging_verb::LAST)] = {}; public: messaging_service(gms::inet_address ip = gms::inet_address("0.0.0.0")) : _listen_address(ip) @@ -313,9 +329,10 @@ public: auto send_message(messaging_verb verb, shard_id id, MsgOut&&... msg) { auto& rpc_client = get_rpc_client(id); auto rpc_handler = _rpc.make_client(verb); - return rpc_handler(rpc_client, std::forward(msg)...).then_wrapped([this, id] (auto&& f) { + return rpc_handler(rpc_client, std::forward(msg)...).then_wrapped([this, id, verb] (auto&& f) { try { if (f.failed()) { + this->increment_dropped_messages(verb); f.get(); assert(false); // never reached }