mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-22 01:20:39 +00:00
Merge "Adding stats to the column family" from Amnon
This commit is contained in:
@@ -915,6 +915,49 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"path":"/column_family/metrics/read/{name}",
|
||||
"operations":[
|
||||
{
|
||||
"method":"GET",
|
||||
"summary":"Get number of reads",
|
||||
"type":"long",
|
||||
"nickname":"get_read",
|
||||
"produces":[
|
||||
"application/json"
|
||||
],
|
||||
"parameters":[
|
||||
{
|
||||
"name":"name",
|
||||
"description":"The column family name in keysspace:name format",
|
||||
"required":true,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"path"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"path":"/column_family/metrics/read/",
|
||||
"operations":[
|
||||
{
|
||||
"method":"GET",
|
||||
"summary":"Get number of reads from all column family, per shard",
|
||||
"type":"array",
|
||||
"items":{
|
||||
"type":"long"
|
||||
},
|
||||
"nickname":"get_all_read",
|
||||
"produces":[
|
||||
"application/json"
|
||||
],
|
||||
"parameters":[
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"path":"/column_family/metrics/read_latency",
|
||||
"operations":[
|
||||
@@ -995,6 +1038,49 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"path":"/column_family/metrics/write/{name}",
|
||||
"operations":[
|
||||
{
|
||||
"method":"GET",
|
||||
"summary":"Get number of writes",
|
||||
"type":"long",
|
||||
"nickname":"get_write",
|
||||
"produces":[
|
||||
"application/json"
|
||||
],
|
||||
"parameters":[
|
||||
{
|
||||
"name":"name",
|
||||
"description":"The column family name in keysspace:name format",
|
||||
"required":true,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"path"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"path":"/column_family/metrics/write/",
|
||||
"operations":[
|
||||
{
|
||||
"method":"GET",
|
||||
"summary":"Get number of writes from all column family, per shard",
|
||||
"type":"array",
|
||||
"items":{
|
||||
"type":"long"
|
||||
},
|
||||
"nickname":"get_all_write",
|
||||
"produces":[
|
||||
"application/json"
|
||||
],
|
||||
"parameters":[
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"path":"/column_family/metrics/write_latency",
|
||||
"operations":[
|
||||
|
||||
@@ -5,16 +5,95 @@
|
||||
#include "column_family.hh"
|
||||
#include "api/api-doc/column_family.json.hh"
|
||||
#include <vector>
|
||||
#include "http/exception.hh"
|
||||
|
||||
namespace api {
|
||||
using namespace httpd;
|
||||
|
||||
using namespace std;
|
||||
using namespace json;
|
||||
namespace cf = httpd::column_family_json;
|
||||
|
||||
auto get_uuid(const sstring& name, const database& db) {
|
||||
auto pos = name.find(':');
|
||||
return db.find_uuid(name.substr(0, pos), name.substr(pos + 1));
|
||||
auto pos = name.find("%3A");
|
||||
size_t end;
|
||||
if (pos == sstring::npos) {
|
||||
pos = name.find(":");
|
||||
if (pos == sstring::npos) {
|
||||
throw bad_param_exception("Column family name should be in keyspace::column_family format");
|
||||
}
|
||||
end = pos + 1;
|
||||
} else {
|
||||
end = pos + 3;
|
||||
}
|
||||
try {
|
||||
return db.find_uuid(name.substr(0, pos), name.substr(end));
|
||||
} catch (std::out_of_range& e) {
|
||||
throw bad_param_exception("Column family '" + name.substr(0, pos) + ":"
|
||||
+ name.substr(end) + "' not found");
|
||||
}
|
||||
}
|
||||
|
||||
future<> foreach_column_family(http_context& ctx, const sstring& name, function<void(column_family&)> f) {
|
||||
auto uuid = get_uuid(name, ctx.db.local());
|
||||
|
||||
return ctx.db.invoke_on_all([f, uuid](database& db) {
|
||||
f(db.find_column_family(uuid));
|
||||
});
|
||||
}
|
||||
|
||||
template<class Mapper, class I, class Reducer>
|
||||
future<json::json_return_type> map_reduce_cf(http_context& ctx, const sstring& name, I init,
|
||||
Mapper mapper, Reducer reducer) {
|
||||
auto uuid = get_uuid(name, ctx.db.local());
|
||||
return ctx.db.map_reduce0([mapper, uuid](database& db) {
|
||||
return mapper(db.find_column_family(uuid));
|
||||
}, init, reducer).then([](const I& res) {
|
||||
return make_ready_future<json::json_return_type>(res);
|
||||
});
|
||||
}
|
||||
|
||||
template<class Mapper, class I, class Reducer>
|
||||
future<json::json_return_type> map_reduce_cf(http_context& ctx, I init,
|
||||
Mapper mapper, Reducer reducer) {
|
||||
return ctx.db.map_reduce0([mapper, init, reducer](database& db) {
|
||||
auto res = init;
|
||||
for (auto i : db.get_column_families()) {
|
||||
res = reducer(res, mapper(*i.second.get()));
|
||||
}
|
||||
return res;
|
||||
}, init, reducer).then([](const I& res) {
|
||||
return make_ready_future<json::json_return_type>(res);
|
||||
});
|
||||
}
|
||||
|
||||
static future<json::json_return_type> get_cf_stats(http_context& ctx, const sstring& name,
|
||||
int64_t column_family::stats::*f) {
|
||||
return map_reduce_cf(ctx, name, 0, [f](const column_family& cf) {
|
||||
return cf.get_stats().*f;
|
||||
}, std::plus<int64_t>());
|
||||
}
|
||||
|
||||
static future<json::json_return_type> get_cf_stats(http_context& ctx,
|
||||
int64_t column_family::stats::*f) {
|
||||
return map_reduce_cf(ctx, 0, [f](const column_family& cf) {
|
||||
return cf.get_stats().*f;
|
||||
}, std::plus<int64_t>());
|
||||
}
|
||||
|
||||
static future<json::json_return_type> map_cf_stats(http_context& ctx,
|
||||
int64_t column_family::stats::*f) {
|
||||
return ctx.db.map([f](const database& db) {
|
||||
int64_t res = 0;
|
||||
for (auto i : db.get_column_families()) {
|
||||
res += i.second.get()->get_stats().*f;
|
||||
}
|
||||
return res;
|
||||
}).then([](const std::vector<int64_t>& res) {
|
||||
return make_ready_future<json::json_return_type>(res);
|
||||
});
|
||||
}
|
||||
|
||||
void set_column_family(http_context& ctx, routes& r) {
|
||||
cf::get_column_family_name.set(r, [&ctx] (const_req req){
|
||||
vector<sstring> res;
|
||||
@@ -45,14 +124,15 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
});
|
||||
|
||||
cf::get_memtable_columns_count.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
//TBD
|
||||
//auto id = get_uuid(req->param["name"], ctx.db.local());
|
||||
return make_ready_future<json::json_return_type>(0);
|
||||
return map_reduce_cf(ctx, req->param["name"], 0, [](column_family& cf) {
|
||||
return cf.active_memtable().all_partitions().size();
|
||||
}, std::plus<int>());
|
||||
});
|
||||
|
||||
cf::get_all_memtable_columns_count.set(r, [] (std::unique_ptr<request> req) {
|
||||
//TBD
|
||||
return make_ready_future<json::json_return_type>(0);
|
||||
cf::get_all_memtable_columns_count.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return map_reduce_cf(ctx, 0, [](column_family& cf) {
|
||||
return cf.active_memtable().all_partitions().size();
|
||||
}, std::plus<int>());
|
||||
});
|
||||
|
||||
cf::get_memtable_on_heap_size.set(r, [] (std::unique_ptr<request> req) {
|
||||
@@ -121,15 +201,12 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
return make_ready_future<json::json_return_type>(0);
|
||||
});
|
||||
|
||||
cf::get_memtable_switch_count.set(r, [] (std::unique_ptr<request> req) {
|
||||
//TBD
|
||||
//auto id = get_uuid(req->param["name"], ctx.db.local());
|
||||
return make_ready_future<json::json_return_type>(0);
|
||||
cf::get_memtable_switch_count.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return get_cf_stats(ctx,req->param["name"] ,&column_family::stats::memtable_switch_count);
|
||||
});
|
||||
|
||||
cf::get_all_memtable_switch_count.set(r, [] (std::unique_ptr<request> req) {
|
||||
//TBD
|
||||
return make_ready_future<json::json_return_type>(0);
|
||||
cf::get_all_memtable_switch_count.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return get_cf_stats(ctx, &column_family::stats::memtable_switch_count);
|
||||
});
|
||||
|
||||
cf::get_estimated_row_size_histogram.set(r, [] (std::unique_ptr<request> req) {
|
||||
@@ -157,49 +234,28 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
return make_ready_future<json::json_return_type>(0);
|
||||
});
|
||||
|
||||
cf::get_read_latency.set(r, [] (std::unique_ptr<request> req) {
|
||||
//TBD
|
||||
//auto id = get_uuid(req->param["name"], ctx.db.local());
|
||||
return make_ready_future<json::json_return_type>(0);
|
||||
cf::get_pending_flushes.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return get_cf_stats(ctx,req->param["name"] ,&column_family::stats::pending_flushes);
|
||||
});
|
||||
|
||||
cf::get_all_read_latency.set(r, [] (std::unique_ptr<request> req) {
|
||||
//TBD
|
||||
//auto id = get_uuid(req->param["name"], ctx.db.local());
|
||||
return make_ready_future<json::json_return_type>(0);
|
||||
cf::get_all_pending_flushes.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return get_cf_stats(ctx, &column_family::stats::pending_flushes);
|
||||
});
|
||||
|
||||
cf::get_range_latency.set(r, [] (std::unique_ptr<request> req) {
|
||||
//TBD
|
||||
//auto id = get_uuid(req->param["name"], ctx.db.local());
|
||||
return make_ready_future<json::json_return_type>(0);
|
||||
cf::get_read.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return get_cf_stats(ctx,req->param["name"] ,&column_family::stats::reads);
|
||||
});
|
||||
|
||||
cf::get_all_range_latency.set(r, [] (std::unique_ptr<request> req) {
|
||||
//TBD
|
||||
return make_ready_future<json::json_return_type>(0);
|
||||
cf::get_all_read.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return map_cf_stats(ctx, &column_family::stats::reads);
|
||||
});
|
||||
|
||||
cf::get_write_latency.set(r, [] (std::unique_ptr<request> req) {
|
||||
//TBD
|
||||
//auto id = get_uuid(req->param["name"], ctx.db.local());
|
||||
return make_ready_future<json::json_return_type>(0);
|
||||
cf::get_write.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return get_cf_stats(ctx,req->param["name"] ,&column_family::stats::writes);
|
||||
});
|
||||
|
||||
cf::get_all_write_latency.set(r, [] (std::unique_ptr<request> req) {
|
||||
//TBD
|
||||
return make_ready_future<json::json_return_type>(0);
|
||||
});
|
||||
|
||||
cf::get_pending_flushes.set(r, [] (std::unique_ptr<request> req) {
|
||||
//TBD
|
||||
//auto id = get_uuid(req->param["name"], ctx.db.local());
|
||||
return make_ready_future<json::json_return_type>(0);
|
||||
});
|
||||
|
||||
cf::get_all_pending_flushes.set(r, [] (std::unique_ptr<request> req) {
|
||||
//TBD
|
||||
return make_ready_future<json::json_return_type>(0);
|
||||
cf::get_all_write.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
return map_cf_stats(ctx, &column_family::stats::writes);
|
||||
});
|
||||
|
||||
cf::get_pending_compactions.set(r, [] (std::unique_ptr<request> req) {
|
||||
@@ -474,6 +530,5 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
std::vector<double> res;
|
||||
return make_ready_future<json::json_return_type>(res);
|
||||
});
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
27
database.hh
27
database.hh
@@ -88,9 +88,19 @@ public:
|
||||
bool enable_commitlog = true;
|
||||
};
|
||||
struct no_commitlog {};
|
||||
struct stats {
|
||||
/** Number of times flush has resulted in the memtable being switched out. */
|
||||
int64_t memtable_switch_count = 0;
|
||||
/** Estimated number of tasks pending for this column family */
|
||||
int64_t pending_flushes = 0;
|
||||
int64_t reads = 0;
|
||||
int64_t writes = 0;
|
||||
};
|
||||
|
||||
private:
|
||||
schema_ptr _schema;
|
||||
config _config;
|
||||
stats _stats;
|
||||
lw_shared_ptr<memtable_list> _memtables;
|
||||
// generation -> sstable. Ordered by key so we can easily get the most recent.
|
||||
lw_shared_ptr<sstable_list> _sstables;
|
||||
@@ -107,7 +117,6 @@ private:
|
||||
private:
|
||||
void add_sstable(sstables::sstable&& sstable);
|
||||
void add_memtable();
|
||||
memtable& active_memtable() { return *_memtables->back(); }
|
||||
future<> update_cache(memtable&);
|
||||
struct merge_comparator;
|
||||
private:
|
||||
@@ -132,6 +141,7 @@ public:
|
||||
// FIXME: in case a query is satisfied from a single memtable, avoid a copy
|
||||
using const_mutation_partition_ptr = std::unique_ptr<const mutation_partition>;
|
||||
using const_row_ptr = std::unique_ptr<const row>;
|
||||
memtable& active_memtable() { return *_memtables->back(); }
|
||||
public:
|
||||
column_family(schema_ptr schema, config cfg, db::commitlog& cl);
|
||||
column_family(schema_ptr schema, config cfg, no_commitlog);
|
||||
@@ -156,7 +166,14 @@ public:
|
||||
future<> flush() {
|
||||
// FIXME: this will synchronously wait for this write to finish, but doesn't guarantee
|
||||
// anything about previous writes.
|
||||
return seal_active_memtable();
|
||||
_stats.pending_flushes++;
|
||||
return seal_active_memtable().finally([this]() mutable {
|
||||
_stats.pending_flushes--;
|
||||
// In origin memtable_switch_count is incremented inside
|
||||
// ColumnFamilyMeetrics Flush.run
|
||||
_stats.memtable_switch_count++;
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
// FIXME: this is just an example, should be changed to something more
|
||||
// general. compact_all_sstables() starts a compaction of all sstables.
|
||||
@@ -172,6 +189,10 @@ public:
|
||||
void start_compaction();
|
||||
void trigger_compaction();
|
||||
void set_compaction_strategy(sstables::compaction_strategy_type strategy);
|
||||
const stats& get_stats() const {
|
||||
return _stats;
|
||||
}
|
||||
|
||||
private:
|
||||
// One does not need to wait on this future if all we are interested in, is
|
||||
// initiating the write. The writes initiated here will eventually
|
||||
@@ -440,6 +461,7 @@ inline
|
||||
void
|
||||
column_family::apply(const mutation& m, const db::replay_position& rp) {
|
||||
active_memtable().apply(m, rp);
|
||||
_stats.writes++;
|
||||
seal_on_overflow();
|
||||
}
|
||||
|
||||
@@ -466,6 +488,7 @@ void
|
||||
column_family::apply(const frozen_mutation& m, const db::replay_position& rp) {
|
||||
check_valid_rp(rp);
|
||||
active_memtable().apply(m, rp);
|
||||
_stats.writes++;
|
||||
seal_on_overflow();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user