From b236e8596130d9a45f89c6d3dae021eda6eb1139 Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Mon, 11 May 2015 17:59:39 +0300 Subject: [PATCH 1/6] Add a foreach_client method to the messaging service The messaging service holds a table of clients which the API needs information from. This adds a foreach_client method that receives a function and iterates over all the clients, calling the given function on each of them. This implementation supports the current table that holds unique_ptr. Signed-off-by: Amnon Heiman --- message/messaging_service.hh | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 5f7c0fd752..4fe6272fed 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -266,6 +266,13 @@ public: } std::unique_ptr::client> rpc_client; }; + + void foreach_client(std::function f) const { + for (auto i = _clients.cbegin(); i != _clients.cend(); i++) { + f(i->first, i->second); + } + } private: static constexpr uint16_t _default_port = 7000; gms::inet_address _listen_address; From 896f562de752b88f3be0e4827744f903ccfba2f9 Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Thu, 14 May 2015 14:25:06 +0300 Subject: [PATCH 2/6] Adding dropped messages counter to messaging_service This adds a dropped-messages counter per verb type to the messaging service. It will be used by the API to return the number of dropped messages. 
Signed-off-by: Amnon Heiman --- message/messaging_service.hh | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 4fe6272fed..8c8745cc7e 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -63,6 +63,7 @@ enum class messaging_verb : int32_t { UNUSED_1, UNUSED_2, UNUSED_3, + LAST, }; } // namespace net @@ -273,6 +274,20 @@ public: f(i->first, i->second); } } + + void increment_dropped_messages(messaging_verb verb) { + _dropped_messages[static_cast(verb)]++; + } + + uint64_t get_dropped_messages(messaging_verb verb) const { + return _dropped_messages[static_cast(verb)]; + } + + const uint64_t* get_dropped_messages() const { + return _dropped_messages; + } + + private: static constexpr uint16_t _default_port = 7000; gms::inet_address _listen_address; @@ -280,6 +295,7 @@ private: rpc::protocol _rpc; rpc::protocol::server _server; std::unordered_map _clients; + uint64_t _dropped_messages[static_cast(messaging_verb::LAST)] = {}; public: messaging_service(gms::inet_address ip = gms::inet_address("0.0.0.0")) : _listen_address(ip) @@ -313,9 +329,10 @@ public: auto send_message(messaging_verb verb, shard_id id, MsgOut&&... msg) { auto& rpc_client = get_rpc_client(id); auto rpc_handler = _rpc.make_client(verb); - return rpc_handler(rpc_client, std::forward(msg)...).then_wrapped([this, id] (auto&& f) { + return rpc_handler(rpc_client, std::forward(msg)...).then_wrapped([this, id, verb] (auto&& f) { try { if (f.failed()) { + this->increment_dropped_messages(verb); f.get(); assert(false); // never reached } From 04de4382f364a5ce259af2652574d823407d5ac2 Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Mon, 11 May 2015 19:33:37 +0300 Subject: [PATCH 3/6] API Doc: Adding the messaging_service swagger definition This adds the messaging_service API definition it will expose the messaging_service metrics. 
Signed-off-by: Amnon Heiman --- api/api-doc/messaging_service.json | 216 +++++++++++++++++++++++++++++ 1 file changed, 216 insertions(+) create mode 100644 api/api-doc/messaging_service.json diff --git a/api/api-doc/messaging_service.json b/api/api-doc/messaging_service.json new file mode 100644 index 0000000000..789a49912b --- /dev/null +++ b/api/api-doc/messaging_service.json @@ -0,0 +1,216 @@ +{ + "apiVersion":"0.0.1", + "swaggerVersion":"1.2", + "basePath":"{{Protocol}}://{{Host}}", + "resourcePath":"/messaging_service", + "produces":[ + "application/json" + ], + "apis":[ + { + "path":"/messaging_service/totaltimeouts", + "operations":[ + { + "method":"GET", + "summary":"Total number of timeouts that happened on this node", + "type":"long", + "nickname":"get_totaltimeouts", + "produces":[ + "application/json" + ], + "parameters":[ + + ] + } + ] + }, + { + "path":"/messaging_service/messages/dropped", + "operations":[ + { + "method":"GET", + "summary":"Get the number of dropped messages per verb", + "type":"array", + "items":{ + "type":"verb_counter" + }, + "nickname":"get_dropped_messages", + "produces":[ + "application/json" + ], + "parameters":[ + + ] + } + ] + }, + { + "path":"/messaging_service/messages/replied", + "operations":[ + { + "method":"GET", + "summary":"Get the number of replied messages", + "type":"array", + "items":{ + "type":"message_counter" + }, + "nickname":"get_completed_messages", + "produces":[ + "application/json" + ], + "parameters":[ + + ] + } + ] + }, + { + "path":"/messaging_service/messages/sent", + "operations":[ + { + "method":"GET", + "summary":"Get the number of sent messages", + "type":"array", + "items":{ + "type":"message_counter" + }, + "nickname":"get_sent_messages", + "produces":[ + "application/json" + ], + "parameters":[ + + ] + } + ] + }, + { + "path":"/messaging_service/messages/pending", + "operations":[ + { + "method":"GET", + "summary":"Get the number of pending messages", + "type":"array", + "items":{ + 
"type":"message_counter" + }, + "nickname":"get_pending_messages", + "produces":[ + "application/json" + ], + "parameters":[ + + ] + } + ] + }, + { + "path":"/messaging_service/messages/exception", + "operations":[ + { + "method":"GET", + "summary":"Get the number of messages returned with an exception", + "type":"array", + "items":{ + "type":"message_counter" + }, + "nickname":"get_exception_messages", + "produces":[ + "application/json" + ], + "parameters":[ + + ] + } + ] + }, + { + "path":"/messaging_service/messages/respond_pending", + "operations":[ + { + "method":"GET", + "summary":"Get the number of messages waiting for a response", + "type":"array", + "items":{ + "type":"message_counter" + }, + "nickname":"get_respond_pending_messages", + "produces":[ + "application/json" + ], + "parameters":[ + + ] + } + ] + } + ], + "models":{ + "message_counter":{ + "id":"message_counter", + "description":"Holds command counters", + "properties":{ + "count":{ + "type":"long" + }, + "ip":{ + "type":"string" + } + } + }, + "verb_counter":{ + "id":"verb_counter", + "description":"Holds verb counters", + "properties":{ + "count":{ + "type":"long" + }, + "verb":{ + "type":"string", + "enum":[ + "MUTATION", + "BINARY", + "READ_REPAIR", + "READ", + "REQUEST_RESPONSE", + "STREAM_INITIATE", + "STREAM_INITIATE_DONE", + "STREAM_REPLY", + "STREAM_REQUEST", + "RANGE_SLICE", + "BOOTSTRAP_TOKEN", + "TREE_REQUEST", + "TREE_RESPONSE", + "JOIN", + "GOSSIP_DIGEST_SYN", + "GOSSIP_DIGEST_ACK", + "GOSSIP_DIGEST_ACK2", + "DEFINITIONS_ANNOUNCE", + "DEFINITIONS_UPDATE", + "TRUNCATE", + "SCHEMA_CHECK", + "INDEX_SCAN", + "REPLICATION_FINISHED", + "INTERNAL_RESPONSE", + "COUNTER_MUTATION", + "STREAMING_REPAIR_REQUEST", + "STREAMING_REPAIR_RESPONSE", + "SNAPSHOT", + "MIGRATION_REQUEST", + "GOSSIP_SHUTDOWN", + "_TRACE", + "ECHO", + "REPAIR_MESSAGE", + "PAXOS_PREPARE", + "PAXOS_PROPOSE", + "PAXOS_COMMIT", + "PAGED_RANGE", + "UNUSED_1", + "UNUSED_2", + "UNUSED_3" + ] + } + } + } + } +} From 
25188ed28e0545b3f3b50f6aba201039e950a4b9 Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Mon, 1 Jun 2015 09:23:01 +0300 Subject: [PATCH 4/6] API: Adding a helper function to sum a map In a typical scenario the API collects values from multiple distributed instances. Sometimes multiple maps need to be reduced by merging their keys and accumulating their values. This is a helper function that can be used in map_reduce to sum the maps. Signed-off-by: Amnon Heiman --- api/api.hh | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/api/api.hh b/api/api.hh index e620288438..3a2b5bfe1a 100644 --- a/api/api.hh +++ b/api/api.hh @@ -37,6 +37,16 @@ std::vector map_to_key_value(const std::map& map) { } return res; } + +template +T map_sum(T&& dest, const S& src) { + for (auto i : src) { + dest[i.first] += i.second; + } + return dest; +} + + } #endif /* API_API_HH_ */ From a928c7422d2cc46c307ac8bf4ad39d3e7a9b83b4 Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Mon, 11 May 2015 19:48:49 +0300 Subject: [PATCH 5/6] Add the messaging_service API implementation This API gathers information about messages passing in the system. In this patch the following APIs will be supported: /messaging_service/messages/replied /messaging_service/messages/sent /messaging_service/messages/pending /messaging_service/messages/exception /messaging_service/messages/respond_pending /messaging_service/messages/dropped The swagger definition can be retrieved from: /api-doc/messaging_service --- api/api.cc | 5 +++ api/messaging_service.cc | 96 ++++++++++++++++++++++++++++++++++++++++ api/messaging_service.hh | 16 +++++++ configure.py | 2 + 4 files changed, 119 insertions(+) create mode 100644 api/messaging_service.cc create mode 100644 api/messaging_service.hh diff --git a/api/api.cc b/api/api.cc index 27d7364872..55fc2934ec 100644 --- a/api/api.cc +++ b/api/api.cc @@ -9,6 +9,7 @@ #include "gossiper.hh" #include "failure_detector.hh" #include "column_family.hh" +#include 
"messaging_service.hh" namespace api { @@ -33,6 +34,10 @@ future<> set_server(http_context& ctx) { "The failure detector API"); set_failure_detector(ctx,r); + rb->register_function(r, "messaging_service", + "The messaging service API"); + set_messaging_service(ctx, r); + }); } diff --git a/api/messaging_service.cc b/api/messaging_service.cc new file mode 100644 index 0000000000..5bf78aaa36 --- /dev/null +++ b/api/messaging_service.cc @@ -0,0 +1,96 @@ +/* + * Copyright 2015 Cloudius Systems + */ + +#include "messaging_service.hh" +#include "message/messaging_service.hh" +#include "api/api-doc/messaging_service.json.hh" +#include +#include + +using namespace httpd::messaging_service_json; +using namespace net; + +namespace api { +using client = rpc::protocol::client; +static const int32_t num_verb = static_cast(messaging_verb::UNUSED_3) + 1; + +std::vector map_to_message_counters( + const std::unordered_map& map) { + std::vector res; + for (auto i : map) { + res.push_back(message_counter()); + res.back().ip = boost::lexical_cast(i.first); + res.back().count = i.second; + } + return res; +} + +/** + * Return a function that performs a map_reduce on messaging_service + * For each instance it calls its foreach_client method set the value + * according to a function that it gets as a parameter. + * + */ +future_json_function get_client_getter(std::function f) { + return [f](std::unique_ptr req) { + using map_type = std::unordered_map; + auto get_shard_map = [f](messaging_service& ms) { + std::unordered_map map; + ms.foreach_client([&map, f] (const messaging_service::shard_id& id, + const messaging_service::shard_info& info) { + map[id.addr] = f(*info.rpc_client.get()); + }); + return map; + }; + return get_messaging_service().map_reduce0(get_shard_map, map_type(), map_sum). 
+ then([](map_type&& map) { + return make_ready_future(map_to_message_counters(map)); + }); + }; +} + +void set_messaging_service(http_context& ctx, routes& r) { + + get_sent_messages.set(r, get_client_getter([](const client& c) { + return c.get_stats().sent_messages; + })); + + get_exception_messages.set(r, get_client_getter([](const client& c) { + return c.get_stats().exception_received; + })); + + get_pending_messages.set(r, get_client_getter([](const client& c) { + return c.get_stats().pending; + })); + + get_respond_pending_messages.set(r, get_client_getter([](const client& c) { + return c.get_stats().wait_reply; + })); + + get_dropped_messages.set(r, [](std::unique_ptr req) { + shared_ptr> map = make_shared>(num_verb, 0); + + return net::get_messaging_service().map_reduce([map](const uint64_t* local_map) mutable { + for (auto i = 0; i < num_verb; i++) { + (*map)[i]+= local_map[i]; + } + },[](messaging_service& ms) { + return make_ready_future(ms.get_dropped_messages()); + }).then([map]{ + std::vector res; + for (auto i : verb_counter::verb_wrapper::all_items()) { + verb_counter c; + messaging_verb v = i; // for type safety we use messaging_verb values + if ((*map)[static_cast(v)] > 0) { + c.count = (*map)[static_cast(v)]; + c.verb = i; + res.push_back(c); + } + } + return make_ready_future(res); + }); + }); +} +} + diff --git a/api/messaging_service.hh b/api/messaging_service.hh new file mode 100644 index 0000000000..73548cd945 --- /dev/null +++ b/api/messaging_service.hh @@ -0,0 +1,16 @@ +/* + * Copyright 2015 Cloudius Systems + */ + +#ifndef API_MESSAGING_SERVICE_HH_ +#define API_MESSAGING_SERVICE_HH_ + +#include "api.hh" + +namespace api { + +void set_messaging_service(http_context& ctx, routes& r); + +} + +#endif /* API_MESSAGING_SERVICE_HH_ */ diff --git a/configure.py b/configure.py index f00075aacc..3d0ca41e28 100755 --- a/configure.py +++ b/configure.py @@ -319,6 +319,8 @@ api = ['api/api.cc', 'api/failure_detector.cc', 
'api/api-doc/column_family.json', 'api/column_family.cc', + 'api/messaging_service.cc', + 'api/api-doc/messaging_service.json', ] boost_test_lib = [ From 0ffea496ac021df01723fcde7d9c615af89254c7 Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Mon, 11 May 2015 15:06:06 +0300 Subject: [PATCH 6/6] Adding the http server to the messaging_service test To test the messaging service API it needs to be included in the messaging service test. To test it, start the server with --stay-alive true; then you can use the API to get the messages information: http://localhost:10001/messaging_service/messages/exception will return the number of exceptions per connection http://localhost:10001/messaging_service/command/completed will return the number of completed commands per connection. Note that because the two servers are running on the same machine, to prevent a port conflict, the server port will be incremented by one (i.e. 10001) Signed-off-by: Amnon Heiman --- configure.py | 2 +- tests/urchin/message.cc | 32 ++++++++++++++++++++++++++++---- 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/configure.py b/configure.py index 3d0ca41e28..52d92e5b3a 100755 --- a/configure.py +++ b/configure.py @@ -467,7 +467,7 @@ urchin_core = (['database.cc', + [Thrift('interface/cassandra.thrift', 'Cassandra')] + core + libnet) -urchin_tests_dependencies = urchin_core + [ +urchin_tests_dependencies = urchin_core + http + api + [ 'tests/urchin/cql_test_env.cc', 'tests/urchin/cql_assertions.cc', ] diff --git a/tests/urchin/message.cc b/tests/urchin/message.cc index 315d869d4a..210108c3e6 100644 --- a/tests/urchin/message.cc +++ b/tests/urchin/message.cc @@ -7,6 +7,8 @@ #include "gms/gossip_digest_ack2.hh" #include "gms/gossip_digest.hh" #include "core/sleep.hh" +#include "http/httpd.hh" +#include "api/api.hh" using namespace std::chrono_literals; using namespace net; @@ -169,18 +171,37 @@ int main(int ac, char ** av) { app.add_options() ("server", bpo::value(), "Server ip") 
("listen-address", bpo::value()->default_value("0.0.0.0"), "IP address to listen") + ("api-port", bpo::value()->default_value(10000), "Http Rest API port") + ("stay-alive", bpo::value()->default_value(false), "Do not kill the test server after the test") ("cpuid", bpo::value()->default_value(0), "Server cpuid"); - return app.run(ac, av, [&app] { + + distributed db; + api::http_context ctx(db); + + return app.run(ac, av, [&app, &ctx] { auto config = app.configuration(); + uint16_t api_port = config["api-port"].as(); + bool stay_alive = config["stay-alive"].as(); + if (config.count("server")) { + api_port++; + } const gms::inet_address listen = gms::inet_address(config["listen-address"].as()); - net::get_messaging_service().start(listen).then([config] () { + net::get_messaging_service().start(listen).then([config, api_port, &ctx, stay_alive] () { auto testers = new distributed; testers->start().then([testers]{ auto& server = net::get_local_messaging_service(); auto port = server.port(); std::cout << "Messaging server listening on port " << port << " ...\n"; return testers->invoke_on_all(&tester::init_handler); - }).then([testers, config] { + }).then([api_port, &ctx] { + return ctx.http_server.start(); + }).then([api_port, &ctx] { + return set_server(ctx); + }).then([&ctx, api_port] { + return ctx.http_server.listen(api_port); + }).then([api_port] { + std::cout << "Seastar HTTP server listening on port " << api_port << " ...\n"; + }).then([testers, config, stay_alive] { auto t = &testers->local(); if (!config.count("server")) { return; @@ -197,7 +218,10 @@ int main(int ac, char ** av) { return t->test_echo(); }).then([testers, t] { return t->test_exception(); - }).then([testers, t] { + }).then([testers, t, stay_alive] { + if (stay_alive) { + return; + } print("=============TEST DONE===========\n"); testers->stop().then([testers] { delete testers;