periodically calculate avg cache hit rate between all shards

This patch adds a new class, cache_hitrate_calculator, whose responsibility
is to periodically calculate the average cache hit rate across all shards
for each CF.
This commit is contained in:
Gleb Natapov
2016-10-09 13:35:04 +03:00
parent fab18c0c5a
commit 991ec4a16c
5 changed files with 163 additions and 3 deletions

View File

@@ -1187,6 +1187,11 @@ void column_family::set_metrics() {
ms::make_gauge("live_sstable", ms::description("Live sstable count"), _stats.live_sstable_count)(cf)(ks),
ms::make_gauge("pending_compaction", ms::description("Estimated number of compactions pending for this column family"), _stats.pending_compactions)(cf)(ks)
});
if (_schema->ks_name() != db::system_keyspace::NAME) {
_metrics.add_group("column_family", {
ms::make_gauge("cache_hit_rate", ms::description("Cache hit rate"), [this] {return float(_global_cache_hit_rate);})(cf)(ks)
});
}
}
void column_family::rebuild_statistics() {

View File

@@ -554,6 +554,10 @@ private:
std::unique_ptr<cell_locker> _counter_cell_locks;
void set_metrics();
seastar::metrics::metric_groups _metrics;
// holds average cache hit rate of all shards
// recalculated periodically
cache_temperature _global_cache_hit_rate = cache_temperature(0.0f);
private:
void update_stats_for_new_sstable(uint64_t disk_space_used_by_sstable, std::vector<unsigned>&& shards_for_the_sstable);
// Adds new sstable to the set of sstables
@@ -845,6 +849,14 @@ public:
return _compaction_manager;
}
// Returns the currently stored global cache hit rate of this column family
// (the value last passed to set_global_cache_hit_rate(), 0.0 initially).
cache_temperature get_global_cache_hit_rate() const {
return _global_cache_hit_rate;
}
// Overwrites the stored global cache hit rate of this column family with `rate`.
void set_global_cache_hit_rate(cache_temperature rate) {
_global_cache_hit_rate = rate;
}
template<typename Func, typename Result = futurize_t<std::result_of_t<Func()>>>
Result run_with_compaction_disabled(Func && func) {
++_compaction_disabled;

12
main.cc
View File

@@ -53,6 +53,7 @@
#include "core/prometheus.hh"
#include "message/messaging_service.hh"
#include <seastar/net/dns.hh>
#include "service/cache_hitrate_calculator.hh"
thread_local disk_error_signal_type commit_error;
thread_local disk_error_signal_type general_disk_error;
@@ -290,6 +291,7 @@ int main(int ac, char** av) {
;
distributed<database> db;
seastar::sharded<service::cache_hitrate_calculator> cf_cache_hitrate_calculator;
debug::db = &db;
auto& qp = cql3::get_query_processor();
auto& proxy = service::get_storage_proxy();
@@ -332,7 +334,7 @@ int main(int ac, char** av) {
tcp_syncookies_sanity();
return seastar::async([cfg, &db, &qp, &proxy, &mm, &ctx, &opts, &dirs, &pctx, &prometheus_server, &return_value] {
return seastar::async([cfg, &db, &qp, &proxy, &mm, &ctx, &opts, &dirs, &pctx, &prometheus_server, &return_value, &cf_cache_hitrate_calculator] {
read_config(opts, *cfg).get();
apply_logger_settings(cfg->default_log_level(), cfg->logger_log_level(),
cfg->log_to_stdout(), cfg->log_to_syslog());
@@ -617,9 +619,13 @@ int main(int ac, char** av) {
lb->start_broadcasting();
service::get_local_storage_service().set_load_broadcaster(lb);
engine().at_exit([lb = std::move(lb)] () mutable { return lb->stop_broadcasting(); });
gms::get_local_gossiper().wait_for_gossip_to_settle().get();
api::set_server_gossip_settle(ctx).get();
cf_cache_hitrate_calculator.start(std::ref(db), std::ref(cf_cache_hitrate_calculator)).get();
engine().at_exit([&cf_cache_hitrate_calculator] { return cf_cache_hitrate_calculator.stop(); });
cf_cache_hitrate_calculator.local().run_on(engine().cpu_id());
supervisor::notify("starting native transport");
gms::get_local_gossiper().wait_for_gossip_to_settle();
api::set_server_gossip_settle(ctx).get();
supervisor::notify("starting cf cache hit rate calculator");
service::get_local_storage_service().start_native_transport().get();
if (start_thrift) {
service::get_local_storage_service().start_rpc_server().get();

View File

@@ -0,0 +1,46 @@
/*
* Copyright (C) 2016 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "database.hh"
#include "core/timer.hh"
#include "core/sharded.hh"
namespace service {
// Sharded service that periodically recomputes, for every non-system column
// family, the cache hit rate aggregated over all shards and stores the result
// in the column family on every shard.
class cache_hitrate_calculator : public seastar::async_sharded_service<cache_hitrate_calculator> {
// Sharded database whose column families are read and updated.
seastar::sharded<database>& _db;
// Sharded container of this service; used to arm the timer on another shard.
seastar::sharded<cache_hitrate_calculator>& _me;
// Fires recalculate_timer() on this shard's instance.
timer<lowres_clock> _timer;
// Set by stop(); once set, run_on() no longer arms any timer.
bool _stopped = false;
// Largest absolute change of a hit rate observed during the last recalculation round.
float _diff = 0;
// Recomputes the rates and resolves to the delay to use before the next round.
future<lowres_clock::duration> recalculate_hitrates();
// Timer callback: runs one recalculation and schedules the next one.
void recalculate_timer();
public:
cache_hitrate_calculator(seastar::sharded<database>& db, seastar::sharded<cache_hitrate_calculator>& me);
// Arms the timer of the instance on shard `master` to fire after `d`
// (2000 ms when not specified).
void run_on(size_t master, lowres_clock::duration d = std::chrono::milliseconds(2000));
// Cancels this instance's timer and marks it as stopped.
future<> stop();
};
}

View File

@@ -39,6 +39,8 @@
*/
#include "load_broadcaster.hh"
#include "cache_hitrate_calculator.hh"
#include "db/system_keyspace.hh"
namespace service {
@@ -78,4 +80,93 @@ future<> load_broadcaster::stop_broadcasting() {
return std::move(_done);
}
// cache_hitrate_calculator implementation
cache_hitrate_calculator::cache_hitrate_calculator(seastar::sharded<database>& db, seastar::sharded<cache_hitrate_calculator>& me) : _db(db), _me(me),
_timer(std::bind(std::mem_fn(&cache_hitrate_calculator::recalculate_timer), this))
{}
void cache_hitrate_calculator::recalculate_timer() {
recalculate_hitrates().then_wrapped([p = shared_from_this()] (future<lowres_clock::duration> f) {
lowres_clock::duration d;
if (f.failed()) {
d = std::chrono::milliseconds(2000);
} else {
d = f.get0();
}
p->run_on((engine().cpu_id() + 1) % smp::count, d);
});
}
void cache_hitrate_calculator::run_on(size_t master, lowres_clock::duration d) {
if (!_stopped) {
_me.invoke_on(master, [d] (cache_hitrate_calculator& local) {
local._timer.arm(d);
}).handle_exception_type([] (seastar::no_sharded_instance_exception&) { /* ignore */ });
}
}
// Recomputes the global cache hit rate of every non-system column family.
//
// The per-shard hit and miss rates of each column family are gathered from
// all shards and summed; the resulting rate h / (h + m) (0 when there are no
// hits) is then stored in the column family on every shard.
//
// Resolves to the delay to wait before the next round: 500 ms when some rate
// changed by 0.01 or more during this round, 2000 ms otherwise.
future<lowres_clock::duration> cache_hitrate_calculator::recalculate_hitrates() {
    // Hit (h) and miss (m) rates of one column family, summed across shards.
    struct stat {
        float h = 0;
        float m = 0;
        stat& operator+=(const stat& o) {
            h += o.h;
            m += o.m;
            return *this;
        }
    };

    static auto non_system_filter = [] (const std::pair<utils::UUID, lw_shared_ptr<column_family>>& cf) {
        return cf.second->schema()->ks_name() != db::system_keyspace::NAME;
    };

    // Collects the row cache hit/miss rates of the local shard, keyed by column family id.
    auto cf_to_cache_hit_stats = [] (database& db) {
        return boost::copy_range<std::unordered_map<utils::UUID, stat>>(db.get_column_families() | boost::adaptors::filtered(non_system_filter) |
                boost::adaptors::transformed([] (const std::pair<utils::UUID, lw_shared_ptr<column_family>>& cf) {
            auto& stats = cf.second->get_row_cache().stats();
            return std::make_pair(cf.first, stat{float(stats.hits.rate().rates[0]), float(stats.misses.rate().rates[0])});
        }));
    };

    // Merges the stats of one shard (b) into the accumulated stats (a).
    auto sum_stats_per_cf = [] (std::unordered_map<utils::UUID, stat> a, std::unordered_map<utils::UUID, stat> b) {
        for (const auto& r : b) {
            a[r.first] += r.second;
        }
        return a;
    };

    return _db.map_reduce0(cf_to_cache_hit_stats, std::unordered_map<utils::UUID, stat>(), sum_stats_per_cf).then([this] (std::unordered_map<utils::UUID, stat> rates) mutable {
        _diff = 0;
        // set calculated rates on all shards
        return _db.invoke_on_all([this, rates = std::move(rates), cpuid = engine().cpu_id()] (database& db) {
            for (auto& cf : db.get_column_families() | boost::adaptors::filtered(non_system_filter)) {
                auto it = rates.find(cf.first);
                if (it == rates.end()) {
                    // The column family was not present when the stats were
                    // gathered (e.g. it was added in the meantime); leave its
                    // rate untouched instead of throwing and aborting the
                    // update for the remaining column families.
                    continue;
                }
                const stat& s = it->second;
                float rate = 0;
                if (s.h) {
                    rate = s.h / (s.h + s.m);
                }
                if (engine().cpu_id() == cpuid) {
                    // calculate max difference between old rate and new one for all cfs;
                    // done on the initiating shard only, since _diff belongs to this instance
                    _diff = std::max(_diff, std::abs(float(cf.second->get_global_cache_hit_rate()) - rate));
                }
                cf.second->set_global_cache_hit_rate(cache_temperature(rate));
            }
        });
    }).then([this] {
        // if max difference during this round is big schedule next recalculate earlier
        if (_diff < 0.01) {
            return std::chrono::milliseconds(2000);
        } else {
            return std::chrono::milliseconds(500);
        }
    });
}
// Stops this instance: marks it as stopped, so run_on() no longer arms any
// timer, and cancels its own pending timer. Completes immediately.
future<> cache_hitrate_calculator::stop() {
    _stopped = true;
    _timer.cancel();
    return make_ready_future<>();
}
}