Merge "messaging service API" from Amnon

"This patch series adds counters to the rpc clients then it expose them in the
messaging service with a rest API, it was tested with the messaging_service
test app that was modified to run the API."
This commit is contained in:
Avi Kivity
2015-06-16 15:23:30 +03:00
8 changed files with 399 additions and 6 deletions

View File

@@ -0,0 +1,216 @@
{
"apiVersion":"0.0.1",
"swaggerVersion":"1.2",
"basePath":"{{Protocol}}://{{Host}}",
"resourcePath":"/messaging_service",
"produces":[
"application/json"
],
"apis":[
{
"path":"/messaging_service/totaltimeouts",
"operations":[
{
"method":"GET",
"summary":"Total number of timeouts happened on this node",
"type":"long",
"nickname":"get_totaltimeouts",
"produces":[
"application/json"
],
"parameters":[
]
}
]
},
{
"path":"/messaging_service/messages/dropped",
"operations":[
{
"method":"GET",
"summary":"Get the number of dropped messages per verb",
"type":"array",
"items":{
"type":"verb_counter"
},
"nickname":"get_dropped_messages",
"produces":[
"application/json"
],
"parameters":[
]
}
]
},
{
"path":"/messaging_service/messages/replied",
"operations":[
{
"method":"GET",
"summary":"Get the number of replied messages",
"type":"array",
"items":{
"type":"message_counter"
},
"nickname":"get_completed_messages",
"produces":[
"application/json"
],
"parameters":[
]
}
]
},
{
"path":"/messaging_service/messages/sent",
"operations":[
{
"method":"GET",
"summary":"Get the number of sent messages",
"type":"array",
"items":{
"type":"message_counter"
},
"nickname":"get_sent_messages",
"produces":[
"application/json"
],
"parameters":[
]
}
]
},
{
"path":"/messaging_service/messages/pending",
"operations":[
{
"method":"GET",
"summary":"Get the number of pending messages",
"type":"array",
"items":{
"type":"message_counter"
},
"nickname":"get_pending_messages",
"produces":[
"application/json"
],
"parameters":[
]
}
]
},
{
"path":"/messaging_service/messages/exception",
"operations":[
{
"method":"GET",
"summary":"Get the number of messages return with an exception",
"type":"array",
"items":{
"type":"message_counter"
},
"nickname":"get_exception_messages",
"produces":[
"application/json"
],
"parameters":[
]
}
]
},
{
"path":"/messaging_service/messages/respond_pending",
"operations":[
{
"method":"GET",
"summary":"Get the number of messages waiting for respond",
"type":"array",
"items":{
"type":"message_counter"
},
"nickname":"get_respond_pending_messages",
"produces":[
"application/json"
],
"parameters":[
]
}
]
}
],
"models":{
"message_counter":{
"id":"message_counter",
"description":"Holds command counters",
"properties":{
"count":{
"type":"long"
},
"ip":{
"type":"string"
}
}
},
"verb_counter":{
"id":"verb_counters",
"description":"Holds verb counters",
"properties":{
"count":{
"type":"long"
},
"verb":{
"type":"string",
"enum":[
"MUTATION",
"BINARY",
"READ_REPAIR",
"READ",
"REQUEST_RESPONSE",
"STREAM_INITIATE",
"STREAM_INITIATE_DONE",
"STREAM_REPLY",
"STREAM_REQUEST",
"RANGE_SLICE",
"BOOTSTRAP_TOKEN",
"TREE_REQUEST",
"TREE_RESPONSE",
"JOIN",
"GOSSIP_DIGEST_SYN",
"GOSSIP_DIGEST_ACK",
"GOSSIP_DIGEST_ACK2",
"DEFINITIONS_ANNOUNCE",
"DEFINITIONS_UPDATE",
"TRUNCATE",
"SCHEMA_CHECK",
"INDEX_SCAN",
"REPLICATION_FINISHED",
"INTERNAL_RESPONSE",
"COUNTER_MUTATION",
"STREAMING_REPAIR_REQUEST",
"STREAMING_REPAIR_RESPONSE",
"SNAPSHOT",
"MIGRATION_REQUEST",
"GOSSIP_SHUTDOWN",
"_TRACE",
"ECHO",
"REPAIR_MESSAGE",
"PAXOS_PREPARE",
"PAXOS_PROPOSE",
"PAXOS_COMMIT",
"PAGED_RANGE",
"UNUSED_1",
"UNUSED_2",
"UNUSED_3"
]
}
}
}
}
}

View File

@@ -9,6 +9,7 @@
#include "gossiper.hh"
#include "failure_detector.hh"
#include "column_family.hh"
#include "messaging_service.hh"
namespace api {
@@ -33,6 +34,10 @@ future<> set_server(http_context& ctx) {
"The failure detector API");
set_failure_detector(ctx,r);
rb->register_function(r, "messaging_service",
"The messaging service API");
set_messaging_service(ctx, r);
});
}

View File

@@ -37,6 +37,16 @@ std::vector<T> map_to_key_value(const std::map<sstring, sstring>& map) {
}
return res;
}
template <typename T, typename S = T>
T map_sum(T&& dest, const S& src) {
for (auto i : src) {
dest[i.first] += i.second;
}
return dest;
}
}
#endif /* API_API_HH_ */

96
api/messaging_service.cc Normal file
View File

@@ -0,0 +1,96 @@
/*
* Copyright 2015 Cloudius Systems
*/
#include "messaging_service.hh"
#include "message/messaging_service.hh"
#include "api/api-doc/messaging_service.json.hh"
#include <iostream>
#include <sstream>
using namespace httpd::messaging_service_json;
using namespace net;
namespace api {
using client = rpc::protocol<serializer, messaging_verb>::client;
static const int32_t num_verb = static_cast<int32_t>(messaging_verb::UNUSED_3) + 1;
std::vector<message_counter> map_to_message_counters(
const std::unordered_map<gms::inet_address, unsigned long>& map) {
std::vector<message_counter> res;
for (auto i : map) {
res.push_back(message_counter());
res.back().ip = boost::lexical_cast<sstring>(i.first);
res.back().count = i.second;
}
return res;
}
/**
* Return a function that performs a map_reduce on messaging_service
* For each instance it calls its foreach_client method set the value
* according to a function that it gets as a parameter.
*
*/
future_json_function get_client_getter(std::function<uint64_t(const client&)> f) {
return [f](std::unique_ptr<request> req) {
using map_type = std::unordered_map<gms::inet_address, uint64_t>;
auto get_shard_map = [f](messaging_service& ms) {
std::unordered_map<gms::inet_address, unsigned long> map;
ms.foreach_client([&map, f] (const messaging_service::shard_id& id,
const messaging_service::shard_info& info) {
map[id.addr] = f(*info.rpc_client.get());
});
return map;
};
return get_messaging_service().map_reduce0(get_shard_map, map_type(), map_sum<map_type>).
then([](map_type&& map) {
return make_ready_future<json::json_return_type>(map_to_message_counters(map));
});
};
}
void set_messaging_service(http_context& ctx, routes& r) {
get_sent_messages.set(r, get_client_getter([](const client& c) {
return c.get_stats().sent_messages;
}));
get_exception_messages.set(r, get_client_getter([](const client& c) {
return c.get_stats().exception_received;
}));
get_pending_messages.set(r, get_client_getter([](const client& c) {
return c.get_stats().pending;
}));
get_respond_pending_messages.set(r, get_client_getter([](const client& c) {
return c.get_stats().wait_reply;
}));
get_dropped_messages.set(r, [](std::unique_ptr<request> req) {
shared_ptr<std::vector<uint64_t>> map = make_shared<std::vector<uint64_t>>(num_verb, 0);
return net::get_messaging_service().map_reduce([map](const uint64_t* local_map) mutable {
for (auto i = 0; i < num_verb; i++) {
(*map)[i]+= local_map[i];
}
},[](messaging_service& ms) {
return make_ready_future<const uint64_t*>(ms.get_dropped_messages());
}).then([map]{
std::vector<verb_counter> res;
for (auto i : verb_counter::verb_wrapper::all_items()) {
verb_counter c;
messaging_verb v = i; // for type safety we use messaging_verb values
if ((*map)[static_cast<int32_t>(v)] > 0) {
c.count = (*map)[static_cast<int32_t>(v)];
c.verb = i;
res.push_back(c);
}
}
return make_ready_future<json::json_return_type>(res);
});
});
}
}

16
api/messaging_service.hh Normal file
View File

@@ -0,0 +1,16 @@
/*
* Copyright 2015 Cloudius Systems
*/
#ifndef API_MESSAGING_SERVICE_HH_
#define API_MESSAGING_SERVICE_HH_
#include "api.hh"
namespace api {
void set_messaging_service(http_context& ctx, routes& r);
}
#endif /* API_MESSAGING_SERVICE_HH_ */

View File

@@ -319,6 +319,8 @@ api = ['api/api.cc',
'api/failure_detector.cc',
'api/api-doc/column_family.json',
'api/column_family.cc',
'api/messaging_service.cc',
'api/api-doc/messaging_service.json',
]
boost_test_lib = [
@@ -465,7 +467,7 @@ urchin_core = (['database.cc',
+ [Thrift('interface/cassandra.thrift', 'Cassandra')]
+ core + libnet)
urchin_tests_dependencies = urchin_core + [
urchin_tests_dependencies = urchin_core + http + api + [
'tests/urchin/cql_test_env.cc',
'tests/urchin/cql_assertions.cc',
]

View File

@@ -63,6 +63,7 @@ enum class messaging_verb : int32_t {
UNUSED_1,
UNUSED_2,
UNUSED_3,
LAST,
};
} // namespace net
@@ -266,6 +267,27 @@ public:
}
std::unique_ptr<rpc::protocol<serializer, messaging_verb>::client> rpc_client;
};
void foreach_client(std::function<void(const messaging_service::shard_id& id,
const messaging_service::shard_info& info)> f) const {
for (auto i = _clients.cbegin(); i != _clients.cend(); i++) {
f(i->first, i->second);
}
}
void increment_dropped_messages(messaging_verb verb) {
_dropped_messages[static_cast<int32_t>(verb)]++;
}
uint64_t get_dropped_messages(messaging_verb verb) const {
return _dropped_messages[static_cast<int32_t>(verb)];
}
const uint64_t* get_dropped_messages() const {
return _dropped_messages;
}
private:
static constexpr uint16_t _default_port = 7000;
gms::inet_address _listen_address;
@@ -273,6 +295,7 @@ private:
rpc::protocol<serializer, messaging_verb> _rpc;
rpc::protocol<serializer, messaging_verb>::server _server;
std::unordered_map<shard_id, shard_info, shard_id::hash> _clients;
uint64_t _dropped_messages[static_cast<int32_t>(messaging_verb::LAST)] = {};
public:
messaging_service(gms::inet_address ip = gms::inet_address("0.0.0.0"))
: _listen_address(ip)
@@ -306,9 +329,10 @@ public:
auto send_message(messaging_verb verb, shard_id id, MsgOut&&... msg) {
auto& rpc_client = get_rpc_client(id);
auto rpc_handler = _rpc.make_client<MsgIn(MsgOut...)>(verb);
return rpc_handler(rpc_client, std::forward<MsgOut>(msg)...).then_wrapped([this, id] (auto&& f) {
return rpc_handler(rpc_client, std::forward<MsgOut>(msg)...).then_wrapped([this, id, verb] (auto&& f) {
try {
if (f.failed()) {
this->increment_dropped_messages(verb);
f.get();
assert(false); // never reached
}

View File

@@ -7,6 +7,8 @@
#include "gms/gossip_digest_ack2.hh"
#include "gms/gossip_digest.hh"
#include "core/sleep.hh"
#include "http/httpd.hh"
#include "api/api.hh"
using namespace std::chrono_literals;
using namespace net;
@@ -169,18 +171,37 @@ int main(int ac, char ** av) {
app.add_options()
("server", bpo::value<std::string>(), "Server ip")
("listen-address", bpo::value<std::string>()->default_value("0.0.0.0"), "IP address to listen")
("api-port", bpo::value<uint16_t>()->default_value(10000), "Http Rest API port")
("stay-alive", bpo::value<bool>()->default_value(false), "Do not kill the test server after the test")
("cpuid", bpo::value<uint32_t>()->default_value(0), "Server cpuid");
return app.run(ac, av, [&app] {
distributed<database> db;
api::http_context ctx(db);
return app.run(ac, av, [&app, &ctx] {
auto config = app.configuration();
uint16_t api_port = config["api-port"].as<uint16_t>();
bool stay_alive = config["stay-alive"].as<bool>();
if (config.count("server")) {
api_port++;
}
const gms::inet_address listen = gms::inet_address(config["listen-address"].as<std::string>());
net::get_messaging_service().start(listen).then([config] () {
net::get_messaging_service().start(listen).then([config, api_port, &ctx, stay_alive] () {
auto testers = new distributed<tester>;
testers->start().then([testers]{
auto& server = net::get_local_messaging_service();
auto port = server.port();
std::cout << "Messaging server listening on port " << port << " ...\n";
return testers->invoke_on_all(&tester::init_handler);
}).then([testers, config] {
}).then([api_port, &ctx] {
return ctx.http_server.start();
}).then([api_port, &ctx] {
return set_server(ctx);
}).then([&ctx, api_port] {
return ctx.http_server.listen(api_port);
}).then([api_port] {
std::cout << "Seastar HTTP server listening on port " << api_port << " ...\n";
}).then([testers, config, stay_alive] {
auto t = &testers->local();
if (!config.count("server")) {
return;
@@ -197,7 +218,10 @@ int main(int ac, char ** av) {
return t->test_echo();
}).then([testers, t] {
return t->test_exception();
}).then([testers, t] {
}).then([testers, t, stay_alive] {
if (stay_alive) {
return;
}
print("=============TEST DONE===========\n");
testers->stop().then([testers] {
delete testers;