This commit is contained in:
Avi Kivity
2015-08-24 10:58:39 +03:00
8 changed files with 136 additions and 77 deletions

View File

@@ -158,4 +158,37 @@ inline int64_t min_int64(int64_t a, int64_t b) {
inline int64_t max_int64(int64_t a, int64_t b) {
return std::max(a,b);
}
/**
* A helper struct for ratio calculation
* It combine total and the sub set for the ratio and its
* to_json method return the ration sub/total
*/
struct ratio_holder : public json::jsonable {
double total = 0;
double sub = 0;
virtual std::string to_json() const {
if (total == 0) {
return "0";
}
return std::to_string(sub/total);
}
ratio_holder() = default;
ratio_holder& add(double _total, double _sub) {
total += _total;
sub += _sub;
return *this;
}
ratio_holder(double _total, double _sub) {
total = _total;
sub = _sub;
}
ratio_holder& operator+=(const ratio_holder& a) {
return add(a.total, a.sub);
}
friend ratio_holder operator+(ratio_holder a, const ratio_holder& b) {
return a += b;
}
};
}

View File

@@ -4,6 +4,7 @@
#include "cache_service.hh"
#include "api/api-doc/cache_service.json.hh"
#include "column_family.hh"
namespace api {
using namespace json;
@@ -139,34 +140,43 @@ void set_cache_service(http_context& ctx, routes& r) {
return make_ready_future<json::json_return_type>(0);
});
cs::get_row_capacity.set(r, [] (std::unique_ptr<request> req) {
// TBD
return make_ready_future<json::json_return_type>(0);
cs::get_row_capacity.set(r, [&ctx] (std::unique_ptr<request> req) {
return map_reduce_cf(ctx, 0, [](const column_family& cf) {
return cf.get_row_cache().get_cache_tracker().region().occupancy().used_space();
}, std::plus<uint64_t>());
});
cs::get_row_hits.set(r, [] (std::unique_ptr<request> req) {
// TBD
return make_ready_future<json::json_return_type>(0);
cs::get_row_hits.set(r, [&ctx] (std::unique_ptr<request> req) {
return map_reduce_cf(ctx, 0, [](const column_family& cf) {
return cf.get_row_cache().stats().hits;
}, std::plus<int64_t>());
});
cs::get_row_requests.set(r, [] (std::unique_ptr<request> req) {
// TBD
return make_ready_future<json::json_return_type>(0);
cs::get_row_requests.set(r, [&ctx] (std::unique_ptr<request> req) {
return map_reduce_cf(ctx, 0, [](const column_family& cf) {
return cf.get_row_cache().stats().hits + cf.get_row_cache().stats().misses;
}, std::plus<int64_t>());
});
cs::get_row_hit_rate.set(r, [] (std::unique_ptr<request> req) {
// TBD
return make_ready_future<json::json_return_type>(0);
cs::get_row_hit_rate.set(r, [&ctx] (std::unique_ptr<request> req) {
return map_reduce_cf(ctx, ratio_holder(), [](const column_family& cf) {
return ratio_holder(cf.get_row_cache().stats().hits + cf.get_row_cache().stats().misses,
cf.get_row_cache().stats().hits);
}, std::plus<ratio_holder>());
});
cs::get_row_size.set(r, [] (std::unique_ptr<request> req) {
// TBD
return make_ready_future<json::json_return_type>(0);
cs::get_row_size.set(r, [&ctx] (std::unique_ptr<request> req) {
// In origin row size is the weighted size.
// We currently do not support weights, so we use num entries instead
return map_reduce_cf(ctx, 0, [](const column_family& cf) {
return cf.get_row_cache().num_entries();
}, std::plus<uint64_t>());
});
cs::get_row_entries.set(r, [] (std::unique_ptr<request> req) {
// TBD
return make_ready_future<json::json_return_type>(0);
cs::get_row_entries.set(r, [&ctx] (std::unique_ptr<request> req) {
return map_reduce_cf(ctx, 0, [](const column_family& cf) {
return cf.get_row_cache().num_entries();
}, std::plus<uint64_t>());
});
cs::get_counter_capacity.set(r, [] (std::unique_ptr<request> req) {

View File

@@ -17,7 +17,7 @@ using namespace std;
using namespace json;
namespace cf = httpd::column_family_json;
auto get_uuid(const sstring& name, const database& db) {
const utils::UUID& get_uuid(const sstring& name, const database& db) {
auto pos = name.find("%3A");
size_t end;
if (pos == sstring::npos) {
@@ -45,43 +45,6 @@ future<> foreach_column_family(http_context& ctx, const sstring& name, function<
});
}
template<class Mapper, class I, class Reducer>
future<json::json_return_type> map_reduce_cf(http_context& ctx, const sstring& name, I init,
Mapper mapper, Reducer reducer) {
auto uuid = get_uuid(name, ctx.db.local());
return ctx.db.map_reduce0([mapper, uuid](database& db) {
return mapper(db.find_column_family(uuid));
}, init, reducer).then([](const I& res) {
return make_ready_future<json::json_return_type>(res);
});
}
template<class Mapper, class I, class Reducer, class Result>
future<json::json_return_type> map_reduce_cf(http_context& ctx, const sstring& name, I init,
Mapper mapper, Reducer reducer, Result result) {
auto uuid = get_uuid(name, ctx.db.local());
return ctx.db.map_reduce0([mapper, uuid](database& db) {
return mapper(db.find_column_family(uuid));
}, init, reducer).then([result](const I& res) mutable {
result = res;
return make_ready_future<json::json_return_type>(result);
});
}
template<class Mapper, class I, class Reducer>
future<json::json_return_type> map_reduce_cf(http_context& ctx, I init,
Mapper mapper, Reducer reducer) {
return ctx.db.map_reduce0([mapper, init, reducer](database& db) {
auto res = init;
for (auto i : db.get_column_families()) {
res = reducer(res, mapper(*i.second.get()));
}
return res;
}, init, reducer).then([](const I& res) {
return make_ready_future<json::json_return_type>(res);
});
}
static future<json::json_return_type> get_cf_stats(http_context& ctx, const sstring& name,
int64_t column_family::stats::*f) {
return map_reduce_cf(ctx, name, 0, [f](const column_family& cf) {

View File

@@ -5,9 +5,52 @@
#pragma once
#include "api.hh"
#include "api/api-doc/column_family.json.hh"
namespace api {
void set_column_family(http_context& ctx, routes& r);
const utils::UUID& get_uuid(const sstring& name, const database& db);
future<> foreach_column_family(http_context& ctx, const sstring& name, std::function<void(column_family&)> f);
template<class Mapper, class I, class Reducer>
future<json::json_return_type> map_reduce_cf(http_context& ctx, const sstring& name, I init,
Mapper mapper, Reducer reducer) {
auto uuid = get_uuid(name, ctx.db.local());
return ctx.db.map_reduce0([mapper, uuid](database& db) {
return mapper(db.find_column_family(uuid));
}, init, reducer).then([](const I& res) {
return make_ready_future<json::json_return_type>(res);
});
}
template<class Mapper, class I, class Reducer, class Result>
future<json::json_return_type> map_reduce_cf(http_context& ctx, const sstring& name, I init,
Mapper mapper, Reducer reducer, Result result) {
auto uuid = get_uuid(name, ctx.db.local());
return ctx.db.map_reduce0([mapper, uuid](database& db) {
return mapper(db.find_column_family(uuid));
}, init, reducer).then([result](const I& res) mutable {
result = res;
return make_ready_future<json::json_return_type>(result);
});
}
template<class Mapper, class I, class Reducer>
future<json::json_return_type> map_reduce_cf(http_context& ctx, I init,
Mapper mapper, Reducer reducer) {
return ctx.db.map_reduce0([mapper, init, reducer](database& db) {
auto res = init;
for (auto i : db.get_column_families()) {
res = reducer(res, mapper(*i.second.get()));
}
return res;
}, init, reducer).then([](const I& res) {
return make_ready_future<json::json_return_type>(res);
});
}
}

View File

@@ -153,6 +153,9 @@ public:
using const_mutation_partition_ptr = std::unique_ptr<const mutation_partition>;
using const_row_ptr = std::unique_ptr<const row>;
memtable& active_memtable() { return *_memtables->back(); }
const row_cache& get_row_cache() const {
return _cache;
}
public:
column_family(schema_ptr schema, config cfg, db::commitlog& cl, compaction_manager&);
column_family(schema_ptr schema, config cfg, no_commitlog, compaction_manager&);

View File

@@ -77,6 +77,10 @@ logalloc::region& cache_tracker::region() {
return _region;
}
const logalloc::region& cache_tracker::region() const {
return _region;
}
// Reader which populates the cache using data from the delegate.
class populating_reader {
row_cache& _cache;

View File

@@ -96,6 +96,7 @@ public:
void insert(cache_entry&);
allocation_strategy& allocator();
logalloc::region& region();
const logalloc::region& region() const;
};
// Returns a reference to shard-wide cache_tracker.
@@ -149,4 +150,10 @@ public:
// The memtable can be queried during the process, but must not be written.
// After the update is complete, memtable is empty.
future<> update(memtable&, negative_mutation_reader underlying_negative);
auto num_entries() const {
return _partitions.size();
}
const cache_tracker& get_cache_tracker() const {
return _tracker;
}
};

View File

@@ -243,39 +243,35 @@ public:
return 0;
}
#endif
/**
* @return the mean histogram value (average of bucket offsets, weighted by count)
* @throws IllegalStateException if any values were greater than the largest bucket threshold
*/
public long mean()
{
int lastBucket = buckets.length() - 1;
if (buckets.get(lastBucket) > 0)
throw new IllegalStateException("Unable to compute ceiling for max when histogram overflowed");
long elements = 0;
long sum = 0;
for (int i = 0; i < lastBucket; i++)
{
long bCount = buckets.get(i);
int64_t mean() {
auto lastBucket = buckets.size() - 1;
int64_t elements = 0;
int64_t sum = 0;
for (size_t i = 0; i < lastBucket; i++) {
long bCount = buckets[i];
elements += bCount;
sum += bCount * bucketOffsets[i];
sum += bCount * bucket_offsets[i];
}
return (long) Math.ceil((double) sum / elements);
return ((double) (sum + elements -1)/ elements);
}
/**
* @return the total number of non-zero values
*/
public long count()
{
long sum = 0L;
for (int i = 0; i < buckets.length(); i++)
sum += buckets.get(i);
return sum;
int64_t count() const {
int64_t sum = 0L;
for (size_t i = 0; i < buckets.size(); i++) {
sum += buckets[i];
}
return sum;
}
#if 0
/**
* @return true if this histogram has overflowed -- that is, a value larger than our largest bucket could bound was added
*/