API: Add read, write, and flush statistic to column_family

This adds the API implementation for the read, write, number of
panding flushes and memtable switch count.

The implementation uses a helper function to perform map and map_reduce
on column_family.

The get_uuid helper method now supports both colon notations (i.e.
either as a ":" or as %3A)

Signed-off-by: Amnon Heiman <amnon@cloudius-systems.com>
This commit is contained in:
Amnon Heiman
2015-07-26 09:20:56 +03:00
parent 8356b493a3
commit cea73277ca

View File

@@ -5,16 +5,95 @@
#include "column_family.hh"
#include "api/api-doc/column_family.json.hh"
#include <vector>
#include "http/exception.hh"
namespace api {
using namespace httpd;
using namespace std;
using namespace json;
namespace cf = httpd::column_family_json;
auto get_uuid(const sstring& name, const database& db) {
auto pos = name.find(':');
return db.find_uuid(name.substr(0, pos), name.substr(pos + 1));
auto pos = name.find("%3A");
size_t end;
if (pos == sstring::npos) {
pos = name.find(":");
if (pos == sstring::npos) {
throw bad_param_exception("Column family name should be in keyspace::column_family format");
}
end = pos + 1;
} else {
end = pos + 3;
}
try {
return db.find_uuid(name.substr(0, pos), name.substr(end));
} catch (std::out_of_range& e) {
throw bad_param_exception("Column family '" + name.substr(0, pos) + ":"
+ name.substr(end) + "' not found");
}
}
future<> foreach_column_family(http_context& ctx, const sstring& name, function<void(column_family&)> f) {
auto uuid = get_uuid(name, ctx.db.local());
return ctx.db.invoke_on_all([f, uuid](database& db) {
f(db.find_column_family(uuid));
});
}
template<class Mapper, class I, class Reducer>
future<json::json_return_type> map_reduce_cf(http_context& ctx, const sstring& name, I init,
Mapper mapper, Reducer reducer) {
auto uuid = get_uuid(name, ctx.db.local());
return ctx.db.map_reduce0([mapper, uuid](database& db) {
return mapper(db.find_column_family(uuid));
}, init, reducer).then([](const I& res) {
return make_ready_future<json::json_return_type>(res);
});
}
template<class Mapper, class I, class Reducer>
future<json::json_return_type> map_reduce_cf(http_context& ctx, I init,
Mapper mapper, Reducer reducer) {
return ctx.db.map_reduce0([mapper, init, reducer](database& db) {
auto res = init;
for (auto i : db.get_column_families()) {
res = reducer(res, mapper(*i.second.get()));
}
return res;
}, init, reducer).then([](const I& res) {
return make_ready_future<json::json_return_type>(res);
});
}
static future<json::json_return_type> get_cf_stats(http_context& ctx, const sstring& name,
int64_t column_family::stats::*f) {
return map_reduce_cf(ctx, name, 0, [f](const column_family& cf) {
return cf.get_stats().*f;
}, std::plus<int64_t>());
}
static future<json::json_return_type> get_cf_stats(http_context& ctx,
int64_t column_family::stats::*f) {
return map_reduce_cf(ctx, 0, [f](const column_family& cf) {
return cf.get_stats().*f;
}, std::plus<int64_t>());
}
static future<json::json_return_type> map_cf_stats(http_context& ctx,
int64_t column_family::stats::*f) {
return ctx.db.map([f](const database& db) {
int64_t res = 0;
for (auto i : db.get_column_families()) {
res += i.second.get()->get_stats().*f;
}
return res;
}).then([](const std::vector<int64_t>& res) {
return make_ready_future<json::json_return_type>(res);
});
}
void set_column_family(http_context& ctx, routes& r) {
cf::get_column_family_name.set(r, [&ctx] (const_req req){
vector<sstring> res;
@@ -45,14 +124,15 @@ void set_column_family(http_context& ctx, routes& r) {
});
cf::get_memtable_columns_count.set(r, [&ctx] (std::unique_ptr<request> req) {
//TBD
//auto id = get_uuid(req->param["name"], ctx.db.local());
return make_ready_future<json::json_return_type>(0);
return map_reduce_cf(ctx, req->param["name"], 0, [](column_family& cf) {
return cf.active_memtable().all_partitions().size();
}, std::plus<int>());
});
cf::get_all_memtable_columns_count.set(r, [] (std::unique_ptr<request> req) {
//TBD
return make_ready_future<json::json_return_type>(0);
cf::get_all_memtable_columns_count.set(r, [&ctx] (std::unique_ptr<request> req) {
return map_reduce_cf(ctx, 0, [](column_family& cf) {
return cf.active_memtable().all_partitions().size();
}, std::plus<int>());
});
cf::get_memtable_on_heap_size.set(r, [] (std::unique_ptr<request> req) {
@@ -121,15 +201,12 @@ void set_column_family(http_context& ctx, routes& r) {
return make_ready_future<json::json_return_type>(0);
});
cf::get_memtable_switch_count.set(r, [] (std::unique_ptr<request> req) {
//TBD
//auto id = get_uuid(req->param["name"], ctx.db.local());
return make_ready_future<json::json_return_type>(0);
cf::get_memtable_switch_count.set(r, [&ctx] (std::unique_ptr<request> req) {
return get_cf_stats(ctx,req->param["name"] ,&column_family::stats::memtable_switch_count);
});
cf::get_all_memtable_switch_count.set(r, [] (std::unique_ptr<request> req) {
//TBD
return make_ready_future<json::json_return_type>(0);
cf::get_all_memtable_switch_count.set(r, [&ctx] (std::unique_ptr<request> req) {
return get_cf_stats(ctx, &column_family::stats::memtable_switch_count);
});
cf::get_estimated_row_size_histogram.set(r, [] (std::unique_ptr<request> req) {
@@ -157,49 +234,28 @@ void set_column_family(http_context& ctx, routes& r) {
return make_ready_future<json::json_return_type>(0);
});
cf::get_read_latency.set(r, [] (std::unique_ptr<request> req) {
//TBD
//auto id = get_uuid(req->param["name"], ctx.db.local());
return make_ready_future<json::json_return_type>(0);
cf::get_pending_flushes.set(r, [&ctx] (std::unique_ptr<request> req) {
return get_cf_stats(ctx,req->param["name"] ,&column_family::stats::pending_flushes);
});
cf::get_all_read_latency.set(r, [] (std::unique_ptr<request> req) {
//TBD
//auto id = get_uuid(req->param["name"], ctx.db.local());
return make_ready_future<json::json_return_type>(0);
cf::get_all_pending_flushes.set(r, [&ctx] (std::unique_ptr<request> req) {
return get_cf_stats(ctx, &column_family::stats::pending_flushes);
});
cf::get_range_latency.set(r, [] (std::unique_ptr<request> req) {
//TBD
//auto id = get_uuid(req->param["name"], ctx.db.local());
return make_ready_future<json::json_return_type>(0);
cf::get_read.set(r, [&ctx] (std::unique_ptr<request> req) {
return get_cf_stats(ctx,req->param["name"] ,&column_family::stats::reads);
});
cf::get_all_range_latency.set(r, [] (std::unique_ptr<request> req) {
//TBD
return make_ready_future<json::json_return_type>(0);
cf::get_all_read.set(r, [&ctx] (std::unique_ptr<request> req) {
return map_cf_stats(ctx, &column_family::stats::reads);
});
cf::get_write_latency.set(r, [] (std::unique_ptr<request> req) {
//TBD
//auto id = get_uuid(req->param["name"], ctx.db.local());
return make_ready_future<json::json_return_type>(0);
cf::get_write.set(r, [&ctx] (std::unique_ptr<request> req) {
return get_cf_stats(ctx,req->param["name"] ,&column_family::stats::writes);
});
cf::get_all_write_latency.set(r, [] (std::unique_ptr<request> req) {
//TBD
return make_ready_future<json::json_return_type>(0);
});
cf::get_pending_flushes.set(r, [] (std::unique_ptr<request> req) {
//TBD
//auto id = get_uuid(req->param["name"], ctx.db.local());
return make_ready_future<json::json_return_type>(0);
});
cf::get_all_pending_flushes.set(r, [] (std::unique_ptr<request> req) {
//TBD
return make_ready_future<json::json_return_type>(0);
cf::get_all_write.set(r, [&ctx] (std::unique_ptr<request> req) {
return map_cf_stats(ctx, &column_family::stats::writes);
});
cf::get_pending_compactions.set(r, [] (std::unique_ptr<request> req) {
@@ -474,6 +530,5 @@ void set_column_family(http_context& ctx, routes& r) {
std::vector<double> res;
return make_ready_future<json::json_return_type>(res);
});
}
}