diff --git a/api/api-doc/messaging_service.json b/api/api-doc/messaging_service.json new file mode 100644 index 0000000000..789a49912b --- /dev/null +++ b/api/api-doc/messaging_service.json @@ -0,0 +1,216 @@ +{ + "apiVersion":"0.0.1", + "swaggerVersion":"1.2", + "basePath":"{{Protocol}}://{{Host}}", + "resourcePath":"/messaging_service", + "produces":[ + "application/json" + ], + "apis":[ + { + "path":"/messaging_service/totaltimeouts", + "operations":[ + { + "method":"GET", + "summary":"Total number of timeouts happened on this node", + "type":"long", + "nickname":"get_totaltimeouts", + "produces":[ + "application/json" + ], + "parameters":[ + + ] + } + ] + }, + { + "path":"/messaging_service/messages/dropped", + "operations":[ + { + "method":"GET", + "summary":"Get the number of dropped messages per verb", + "type":"array", + "items":{ + "type":"verb_counter" + }, + "nickname":"get_dropped_messages", + "produces":[ + "application/json" + ], + "parameters":[ + + ] + } + ] + }, + { + "path":"/messaging_service/messages/replied", + "operations":[ + { + "method":"GET", + "summary":"Get the number of replied messages", + "type":"array", + "items":{ + "type":"message_counter" + }, + "nickname":"get_completed_messages", + "produces":[ + "application/json" + ], + "parameters":[ + + ] + } + ] + }, + { + "path":"/messaging_service/messages/sent", + "operations":[ + { + "method":"GET", + "summary":"Get the number of sent messages", + "type":"array", + "items":{ + "type":"message_counter" + }, + "nickname":"get_sent_messages", + "produces":[ + "application/json" + ], + "parameters":[ + + ] + } + ] + }, + { + "path":"/messaging_service/messages/pending", + "operations":[ + { + "method":"GET", + "summary":"Get the number of pending messages", + "type":"array", + "items":{ + "type":"message_counter" + }, + "nickname":"get_pending_messages", + "produces":[ + "application/json" + ], + "parameters":[ + + ] + } + ] + }, + { + "path":"/messaging_service/messages/exception", + "operations":[ + { + "method":"GET", + "summary":"Get the number of messages return with an exception", + "type":"array", + "items":{ + "type":"message_counter" + }, + "nickname":"get_exception_messages", + "produces":[ + "application/json" + ], + "parameters":[ + + ] + } + ] + }, + { + "path":"/messaging_service/messages/respond_pending", + "operations":[ + { + "method":"GET", + "summary":"Get the number of messages waiting for respond", + "type":"array", + "items":{ + "type":"message_counter" + }, + "nickname":"get_respond_pending_messages", + "produces":[ + "application/json" + ], + "parameters":[ + + ] + } + ] + } + ], + "models":{ + "message_counter":{ + "id":"message_counter", + "description":"Holds command counters", + "properties":{ + "count":{ + "type":"long" + }, + "ip":{ + "type":"string" + } + } + }, + "verb_counter":{ + "id":"verb_counters", + "description":"Holds verb counters", + "properties":{ + "count":{ + "type":"long" + }, + "verb":{ + "type":"string", + "enum":[ + "MUTATION", + "BINARY", + "READ_REPAIR", + "READ", + "REQUEST_RESPONSE", + "STREAM_INITIATE", + "STREAM_INITIATE_DONE", + "STREAM_REPLY", + "STREAM_REQUEST", + "RANGE_SLICE", + "BOOTSTRAP_TOKEN", + "TREE_REQUEST", + "TREE_RESPONSE", + "JOIN", + "GOSSIP_DIGEST_SYN", + "GOSSIP_DIGEST_ACK", + "GOSSIP_DIGEST_ACK2", + "DEFINITIONS_ANNOUNCE", + "DEFINITIONS_UPDATE", + "TRUNCATE", + "SCHEMA_CHECK", + "INDEX_SCAN", + "REPLICATION_FINISHED", + "INTERNAL_RESPONSE", + "COUNTER_MUTATION", + "STREAMING_REPAIR_REQUEST", + "STREAMING_REPAIR_RESPONSE", + "SNAPSHOT", + "MIGRATION_REQUEST", + "GOSSIP_SHUTDOWN", + "_TRACE", + "ECHO", + "REPAIR_MESSAGE", + "PAXOS_PREPARE", + "PAXOS_PROPOSE", + "PAXOS_COMMIT", + "PAGED_RANGE", + "UNUSED_1", + "UNUSED_2", + "UNUSED_3" + ] + } + } + } + } +} diff --git a/api/api.cc b/api/api.cc index 27d7364872..55fc2934ec 100644 --- a/api/api.cc +++ b/api/api.cc @@ -9,6 +9,7 @@ #include "gossiper.hh" #include "failure_detector.hh" #include "column_family.hh" +#include "messaging_service.hh" namespace api { @@ -33,6 +34,10 @@ future<> set_server(http_context& ctx) { "The failure detector API"); set_failure_detector(ctx,r); + rb->register_function(r, "messaging_service", + "The messaging service API"); + set_messaging_service(ctx, r); + }); } diff --git a/api/api.hh b/api/api.hh index e620288438..3a2b5bfe1a 100644 --- a/api/api.hh +++ b/api/api.hh @@ -37,6 +37,16 @@ std::vector map_to_key_value(const std::map& map) { } return res; } + +template +T map_sum(T&& dest, const S& src) { + for (auto i : src) { + dest[i.first] += i.second; + } + return dest; +} + + } #endif /* API_API_HH_ */ diff --git a/api/messaging_service.cc b/api/messaging_service.cc new file mode 100644 index 0000000000..5bf78aaa36 --- /dev/null +++ b/api/messaging_service.cc @@ -0,0 +1,96 @@ +/* + * Copyright 2015 Cloudius Systems + */ + +#include "messaging_service.hh" +#include "message/messaging_service.hh" +#include "api/api-doc/messaging_service.json.hh" +#include +#include + +using namespace httpd::messaging_service_json; +using namespace net; + +namespace api { +using client = rpc::protocol::client; +static const int32_t num_verb = static_cast(messaging_verb::UNUSED_3) + 1; + +std::vector map_to_message_counters( + const std::unordered_map& map) { + std::vector res; + for (auto i : map) { + res.push_back(message_counter()); + res.back().ip = boost::lexical_cast(i.first); + res.back().count = i.second; + } + return res; +} + +/** + * Return a function that performs a map_reduce on messaging_service + * For each instance it calls its foreach_client method set the value + * according to a function that it gets as a parameter. + * + */ +future_json_function get_client_getter(std::function f) { + return [f](std::unique_ptr req) { + using map_type = std::unordered_map; + auto get_shard_map = [f](messaging_service& ms) { + std::unordered_map map; + ms.foreach_client([&map, f] (const messaging_service::shard_id& id, + const messaging_service::shard_info& info) { + map[id.addr] = f(*info.rpc_client.get()); + }); + return map; + }; + return get_messaging_service().map_reduce0(get_shard_map, map_type(), map_sum). + then([](map_type&& map) { + return make_ready_future(map_to_message_counters(map)); + }); + }; +} + +void set_messaging_service(http_context& ctx, routes& r) { + + get_sent_messages.set(r, get_client_getter([](const client& c) { + return c.get_stats().sent_messages; + })); + + get_exception_messages.set(r, get_client_getter([](const client& c) { + return c.get_stats().exception_received; + })); + + get_pending_messages.set(r, get_client_getter([](const client& c) { + return c.get_stats().pending; + })); + + get_respond_pending_messages.set(r, get_client_getter([](const client& c) { + return c.get_stats().wait_reply; + })); + + get_dropped_messages.set(r, [](std::unique_ptr req) { + shared_ptr> map = make_shared>(num_verb, 0); + + return net::get_messaging_service().map_reduce([map](const uint64_t* local_map) mutable { + for (auto i = 0; i < num_verb; i++) { + (*map)[i]+= local_map[i]; + } + },[](messaging_service& ms) { + return make_ready_future(ms.get_dropped_messages()); + }).then([map]{ + std::vector res; + for (auto i : verb_counter::verb_wrapper::all_items()) { + verb_counter c; + messaging_verb v = i; // for type safety we use messaging_verb values + if ((*map)[static_cast(v)] > 0) { + c.count = (*map)[static_cast(v)]; + c.verb = i; + res.push_back(c); + } + } + return make_ready_future(res); + }); + }); +} +} + diff --git a/api/messaging_service.hh b/api/messaging_service.hh new file mode 100644 index 0000000000..73548cd945 --- /dev/null +++ b/api/messaging_service.hh @@ -0,0 +1,16 @@ +/* + * Copyright 2015 Cloudius Systems + */ + +#ifndef API_MESSAGING_SERVICE_HH_ +#define API_MESSAGING_SERVICE_HH_ + +#include "api.hh" + +namespace api { + +void set_messaging_service(http_context& ctx, routes& r); + +} + +#endif /* API_MESSAGING_SERVICE_HH_ */ diff --git a/configure.py b/configure.py index f00075aacc..52d92e5b3a 100755 --- a/configure.py +++ b/configure.py @@ -319,6 +319,8 @@ api = ['api/api.cc', 'api/failure_detector.cc', 'api/api-doc/column_family.json', 'api/column_family.cc', + 'api/messaging_service.cc', + 'api/api-doc/messaging_service.json', ] boost_test_lib = [ @@ -465,7 +467,7 @@ urchin_core = (['database.cc', + [Thrift('interface/cassandra.thrift', 'Cassandra')] + core + libnet) -urchin_tests_dependencies = urchin_core + [ +urchin_tests_dependencies = urchin_core + http + api + [ 'tests/urchin/cql_test_env.cc', 'tests/urchin/cql_assertions.cc', ] diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 5f7c0fd752..8c8745cc7e 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -63,6 +63,7 @@ enum class messaging_verb : int32_t { UNUSED_1, UNUSED_2, UNUSED_3, + LAST, }; } // namespace net @@ -266,6 +267,27 @@ public: } std::unique_ptr::client> rpc_client; }; + + void foreach_client(std::function f) const { + for (auto i = _clients.cbegin(); i != _clients.cend(); i++) { + f(i->first, i->second); + } + } + + void increment_dropped_messages(messaging_verb verb) { + _dropped_messages[static_cast(verb)]++; + } + + uint64_t get_dropped_messages(messaging_verb verb) const { + return _dropped_messages[static_cast(verb)]; + } + + const uint64_t* get_dropped_messages() const { + return _dropped_messages; + } + + private: static constexpr uint16_t _default_port = 7000; gms::inet_address _listen_address; @@ -273,6 +295,7 @@ private: rpc::protocol _rpc; rpc::protocol::server _server; std::unordered_map _clients; + uint64_t _dropped_messages[static_cast(messaging_verb::LAST)] = {}; public: messaging_service(gms::inet_address ip = gms::inet_address("0.0.0.0")) : _listen_address(ip) @@ -306,9 +329,10 @@ public: auto send_message(messaging_verb verb, shard_id id, MsgOut&&... msg) { auto& rpc_client = get_rpc_client(id); auto rpc_handler = _rpc.make_client(verb); - return rpc_handler(rpc_client, std::forward(msg)...).then_wrapped([this, id] (auto&& f) { + return rpc_handler(rpc_client, std::forward(msg)...).then_wrapped([this, id, verb] (auto&& f) { try { if (f.failed()) { + this->increment_dropped_messages(verb); f.get(); assert(false); // never reached } diff --git a/tests/urchin/message.cc b/tests/urchin/message.cc index 315d869d4a..210108c3e6 100644 --- a/tests/urchin/message.cc +++ b/tests/urchin/message.cc @@ -7,6 +7,8 @@ #include "gms/gossip_digest_ack2.hh" #include "gms/gossip_digest.hh" #include "core/sleep.hh" +#include "http/httpd.hh" +#include "api/api.hh" using namespace std::chrono_literals; using namespace net; @@ -169,18 +171,37 @@ int main(int ac, char ** av) { app.add_options() ("server", bpo::value(), "Server ip") ("listen-address", bpo::value()->default_value("0.0.0.0"), "IP address to listen") + ("api-port", bpo::value()->default_value(10000), "Http Rest API port") + ("stay-alive", bpo::value()->default_value(false), "Do not kill the test server after the test") ("cpuid", bpo::value()->default_value(0), "Server cpuid"); - return app.run(ac, av, [&app] { + + distributed db; + api::http_context ctx(db); + + return app.run(ac, av, [&app, &ctx] { auto config = app.configuration(); + uint16_t api_port = config["api-port"].as(); + bool stay_alive = config["stay-alive"].as(); + if (config.count("server")) { + api_port++; + } const gms::inet_address listen = gms::inet_address(config["listen-address"].as()); - net::get_messaging_service().start(listen).then([config] () { + net::get_messaging_service().start(listen).then([config, api_port, &ctx, stay_alive] () { auto testers = new distributed; testers->start().then([testers]{ auto& server = net::get_local_messaging_service(); auto port = server.port(); std::cout << "Messaging server listening on port " << port << " ...\n"; return testers->invoke_on_all(&tester::init_handler); - }).then([testers, config] { + }).then([api_port, &ctx] { + return ctx.http_server.start(); + }).then([api_port, &ctx] { + return set_server(ctx); + }).then([&ctx, api_port] { + return ctx.http_server.listen(api_port); + }).then([api_port] { + std::cout << "Seastar HTTP server listening on port " << api_port << " ...\n"; + }).then([testers, config, stay_alive] { auto t = &testers->local(); if (!config.count("server")) { return; @@ -197,7 +218,10 @@ int main(int ac, char ** av) { return t->test_echo(); }).then([testers, t] { return t->test_exception(); - }).then([testers, t] { + }).then([testers, t, stay_alive] { + if (stay_alive) { + return; + } print("=============TEST DONE===========\n"); testers->stop().then([testers] { delete testers;