mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
Adding dropped messages counter to messaging_service
This adds a drop messages counter per verb type to the messaging service. It will be used by the API to return the number of dropped messages. Signed-off-by: Amnon Heiman <amnon@cloudius-systems.com>
This commit is contained in:
@@ -63,6 +63,7 @@ enum class messaging_verb : int32_t {
|
||||
UNUSED_1,
|
||||
UNUSED_2,
|
||||
UNUSED_3,
|
||||
LAST,
|
||||
};
|
||||
|
||||
} // namespace net
|
||||
@@ -273,6 +274,20 @@ public:
|
||||
f(i->first, i->second);
|
||||
}
|
||||
}
|
||||
|
||||
void increment_dropped_messages(messaging_verb verb) {
|
||||
_dropped_messages[static_cast<int32_t>(verb)]++;
|
||||
}
|
||||
|
||||
uint64_t get_dropped_messages(messaging_verb verb) const {
|
||||
return _dropped_messages[static_cast<int32_t>(verb)];
|
||||
}
|
||||
|
||||
const uint64_t* get_dropped_messages() const {
|
||||
return _dropped_messages;
|
||||
}
|
||||
|
||||
|
||||
private:
|
||||
static constexpr uint16_t _default_port = 7000;
|
||||
gms::inet_address _listen_address;
|
||||
@@ -280,6 +295,7 @@ private:
|
||||
rpc::protocol<serializer, messaging_verb> _rpc;
|
||||
rpc::protocol<serializer, messaging_verb>::server _server;
|
||||
std::unordered_map<shard_id, shard_info, shard_id::hash> _clients;
|
||||
uint64_t _dropped_messages[static_cast<int32_t>(messaging_verb::LAST)] = {};
|
||||
public:
|
||||
messaging_service(gms::inet_address ip = gms::inet_address("0.0.0.0"))
|
||||
: _listen_address(ip)
|
||||
@@ -313,9 +329,10 @@ public:
|
||||
auto send_message(messaging_verb verb, shard_id id, MsgOut&&... msg) {
|
||||
auto& rpc_client = get_rpc_client(id);
|
||||
auto rpc_handler = _rpc.make_client<MsgIn(MsgOut...)>(verb);
|
||||
return rpc_handler(rpc_client, std::forward<MsgOut>(msg)...).then_wrapped([this, id] (auto&& f) {
|
||||
return rpc_handler(rpc_client, std::forward<MsgOut>(msg)...).then_wrapped([this, id, verb] (auto&& f) {
|
||||
try {
|
||||
if (f.failed()) {
|
||||
this->increment_dropped_messages(verb);
|
||||
f.get();
|
||||
assert(false); // never reached
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user