diff --git a/api/column_family.cc b/api/column_family.cc index 0354675480..f416d50142 100644 --- a/api/column_family.cc +++ b/api/column_family.cc @@ -5,16 +5,95 @@ #include "column_family.hh" #include "api/api-doc/column_family.json.hh" #include +#include "http/exception.hh" namespace api { +using namespace httpd; using namespace std; +using namespace json; namespace cf = httpd::column_family_json; auto get_uuid(const sstring& name, const database& db) { - auto pos = name.find(':'); - return db.find_uuid(name.substr(0, pos), name.substr(pos + 1)); + auto pos = name.find("%3A"); + size_t end; + if (pos == sstring::npos) { + pos = name.find(":"); + if (pos == sstring::npos) { + throw bad_param_exception("Column family name should be in keyspace::column_family format"); + } + end = pos + 1; + } else { + end = pos + 3; + } + try { + return db.find_uuid(name.substr(0, pos), name.substr(end)); + } catch (std::out_of_range& e) { + throw bad_param_exception("Column family '" + name.substr(0, pos) + ":" + + name.substr(end) + "' not found"); + } } + +future<> foreach_column_family(http_context& ctx, const sstring& name, function f) { + auto uuid = get_uuid(name, ctx.db.local()); + + return ctx.db.invoke_on_all([f, uuid](database& db) { + f(db.find_column_family(uuid)); + }); +} + +template +future map_reduce_cf(http_context& ctx, const sstring& name, I init, + Mapper mapper, Reducer reducer) { + auto uuid = get_uuid(name, ctx.db.local()); + return ctx.db.map_reduce0([mapper, uuid](database& db) { + return mapper(db.find_column_family(uuid)); + }, init, reducer).then([](const I& res) { + return make_ready_future(res); + }); +} + +template +future map_reduce_cf(http_context& ctx, I init, + Mapper mapper, Reducer reducer) { + return ctx.db.map_reduce0([mapper, init, reducer](database& db) { + auto res = init; + for (auto i : db.get_column_families()) { + res = reducer(res, mapper(*i.second.get())); + } + return res; + }, init, reducer).then([](const I& res) { + return make_ready_future(res); + }); +} + +static future get_cf_stats(http_context& ctx, const sstring& name, + int64_t column_family::stats::*f) { + return map_reduce_cf(ctx, name, 0, [f](const column_family& cf) { + return cf.get_stats().*f; + }, std::plus()); +} + +static future get_cf_stats(http_context& ctx, + int64_t column_family::stats::*f) { + return map_reduce_cf(ctx, 0, [f](const column_family& cf) { + return cf.get_stats().*f; + }, std::plus()); +} + +static future map_cf_stats(http_context& ctx, + int64_t column_family::stats::*f) { + return ctx.db.map([f](const database& db) { + int64_t res = 0; + for (auto i : db.get_column_families()) { + res += i.second.get()->get_stats().*f; + } + return res; + }).then([](const std::vector& res) { + return make_ready_future(res); + }); +} + void set_column_family(http_context& ctx, routes& r) { cf::get_column_family_name.set(r, [&ctx] (const_req req){ vector res; @@ -45,14 +124,15 @@ void set_column_family(http_context& ctx, routes& r) { }); cf::get_memtable_columns_count.set(r, [&ctx] (std::unique_ptr req) { - //TBD - //auto id = get_uuid(req->param["name"], ctx.db.local()); - return make_ready_future(0); + return map_reduce_cf(ctx, req->param["name"], 0, [](column_family& cf) { + return cf.active_memtable().all_partitions().size(); + }, std::plus()); }); - cf::get_all_memtable_columns_count.set(r, [] (std::unique_ptr req) { - //TBD - return make_ready_future(0); + cf::get_all_memtable_columns_count.set(r, [&ctx] (std::unique_ptr req) { + return map_reduce_cf(ctx, 0, [](column_family& cf) { + return cf.active_memtable().all_partitions().size(); + }, std::plus()); }); cf::get_memtable_on_heap_size.set(r, [] (std::unique_ptr req) { @@ -121,15 +201,12 @@ void set_column_family(http_context& ctx, routes& r) { return make_ready_future(0); }); - cf::get_memtable_switch_count.set(r, [] (std::unique_ptr req) { - //TBD - //auto id = get_uuid(req->param["name"], ctx.db.local()); - return make_ready_future(0); + cf::get_memtable_switch_count.set(r, [&ctx] (std::unique_ptr req) { + return get_cf_stats(ctx,req->param["name"] ,&column_family::stats::memtable_switch_count); }); - cf::get_all_memtable_switch_count.set(r, [] (std::unique_ptr req) { - //TBD - return make_ready_future(0); + cf::get_all_memtable_switch_count.set(r, [&ctx] (std::unique_ptr req) { + return get_cf_stats(ctx, &column_family::stats::memtable_switch_count); }); cf::get_estimated_row_size_histogram.set(r, [] (std::unique_ptr req) { @@ -157,49 +234,28 @@ void set_column_family(http_context& ctx, routes& r) { return make_ready_future(0); }); - cf::get_read_latency.set(r, [] (std::unique_ptr req) { - //TBD - //auto id = get_uuid(req->param["name"], ctx.db.local()); - return make_ready_future(0); + cf::get_pending_flushes.set(r, [&ctx] (std::unique_ptr req) { + return get_cf_stats(ctx,req->param["name"] ,&column_family::stats::pending_flushes); }); - cf::get_all_read_latency.set(r, [] (std::unique_ptr req) { - //TBD - //auto id = get_uuid(req->param["name"], ctx.db.local()); - return make_ready_future(0); + cf::get_all_pending_flushes.set(r, [&ctx] (std::unique_ptr req) { + return get_cf_stats(ctx, &column_family::stats::pending_flushes); }); - cf::get_range_latency.set(r, [] (std::unique_ptr req) { - //TBD - //auto id = get_uuid(req->param["name"], ctx.db.local()); - return make_ready_future(0); + cf::get_read.set(r, [&ctx] (std::unique_ptr req) { + return get_cf_stats(ctx,req->param["name"] ,&column_family::stats::reads); }); - cf::get_all_range_latency.set(r, [] (std::unique_ptr req) { - //TBD - return make_ready_future(0); + cf::get_all_read.set(r, [&ctx] (std::unique_ptr req) { + return map_cf_stats(ctx, &column_family::stats::reads); }); - cf::get_write_latency.set(r, [] (std::unique_ptr req) { - //TBD - //auto id = get_uuid(req->param["name"], ctx.db.local()); - return make_ready_future(0); + cf::get_write.set(r, [&ctx] (std::unique_ptr req) { + return get_cf_stats(ctx,req->param["name"] ,&column_family::stats::writes); }); - cf::get_all_write_latency.set(r, [] (std::unique_ptr req) { - //TBD - return make_ready_future(0); - }); - - cf::get_pending_flushes.set(r, [] (std::unique_ptr req) { - //TBD - //auto id = get_uuid(req->param["name"], ctx.db.local()); - return make_ready_future(0); - }); - - cf::get_all_pending_flushes.set(r, [] (std::unique_ptr req) { - //TBD - return make_ready_future(0); + cf::get_all_write.set(r, [&ctx] (std::unique_ptr req) { + return map_cf_stats(ctx, &column_family::stats::writes); }); cf::get_pending_compactions.set(r, [] (std::unique_ptr req) { @@ -474,6 +530,5 @@ void set_column_family(http_context& ctx, routes& r) { std::vector res; return make_ready_future(res); }); - } }