periodically calculate avg cache hit rate between all shards
This patch adds new class cache_hitrate_calculator whose responsibility is to periodically calculate average cache hit rates between all shards for each CF.
This commit is contained in:
@@ -1187,6 +1187,11 @@ void column_family::set_metrics() {
|
||||
ms::make_gauge("live_sstable", ms::description("Live sstable count"), _stats.live_sstable_count)(cf)(ks),
|
||||
ms::make_gauge("pending_compaction", ms::description("Estimated number of compactions pending for this column family"), _stats.pending_compactions)(cf)(ks)
|
||||
});
|
||||
if (_schema->ks_name() != db::system_keyspace::NAME) {
|
||||
_metrics.add_group("column_family", {
|
||||
ms::make_gauge("cache_hit_rate", ms::description("Cache hit rate"), [this] {return float(_global_cache_hit_rate);})(cf)(ks)
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void column_family::rebuild_statistics() {
|
||||
|
||||
12
database.hh
12
database.hh
@@ -554,6 +554,10 @@ private:
|
||||
std::unique_ptr<cell_locker> _counter_cell_locks;
|
||||
void set_metrics();
|
||||
seastar::metrics::metric_groups _metrics;
|
||||
|
||||
// holds average cache hit rate of all shards
|
||||
// recalculated periodically
|
||||
cache_temperature _global_cache_hit_rate = cache_temperature(0.0f);
|
||||
private:
|
||||
void update_stats_for_new_sstable(uint64_t disk_space_used_by_sstable, std::vector<unsigned>&& shards_for_the_sstable);
|
||||
// Adds new sstable to the set of sstables
|
||||
@@ -845,6 +849,14 @@ public:
|
||||
return _compaction_manager;
|
||||
}
|
||||
|
||||
cache_temperature get_global_cache_hit_rate() const {
|
||||
return _global_cache_hit_rate;
|
||||
}
|
||||
|
||||
void set_global_cache_hit_rate(cache_temperature rate) {
|
||||
_global_cache_hit_rate = rate;
|
||||
}
|
||||
|
||||
template<typename Func, typename Result = futurize_t<std::result_of_t<Func()>>>
|
||||
Result run_with_compaction_disabled(Func && func) {
|
||||
++_compaction_disabled;
|
||||
|
||||
12
main.cc
12
main.cc
@@ -53,6 +53,7 @@
|
||||
#include "core/prometheus.hh"
|
||||
#include "message/messaging_service.hh"
|
||||
#include <seastar/net/dns.hh>
|
||||
#include "service/cache_hitrate_calculator.hh"
|
||||
|
||||
thread_local disk_error_signal_type commit_error;
|
||||
thread_local disk_error_signal_type general_disk_error;
|
||||
@@ -290,6 +291,7 @@ int main(int ac, char** av) {
|
||||
;
|
||||
|
||||
distributed<database> db;
|
||||
seastar::sharded<service::cache_hitrate_calculator> cf_cache_hitrate_calculator;
|
||||
debug::db = &db;
|
||||
auto& qp = cql3::get_query_processor();
|
||||
auto& proxy = service::get_storage_proxy();
|
||||
@@ -332,7 +334,7 @@ int main(int ac, char** av) {
|
||||
|
||||
tcp_syncookies_sanity();
|
||||
|
||||
return seastar::async([cfg, &db, &qp, &proxy, &mm, &ctx, &opts, &dirs, &pctx, &prometheus_server, &return_value] {
|
||||
return seastar::async([cfg, &db, &qp, &proxy, &mm, &ctx, &opts, &dirs, &pctx, &prometheus_server, &return_value, &cf_cache_hitrate_calculator] {
|
||||
read_config(opts, *cfg).get();
|
||||
apply_logger_settings(cfg->default_log_level(), cfg->logger_log_level(),
|
||||
cfg->log_to_stdout(), cfg->log_to_syslog());
|
||||
@@ -617,9 +619,13 @@ int main(int ac, char** av) {
|
||||
lb->start_broadcasting();
|
||||
service::get_local_storage_service().set_load_broadcaster(lb);
|
||||
engine().at_exit([lb = std::move(lb)] () mutable { return lb->stop_broadcasting(); });
|
||||
gms::get_local_gossiper().wait_for_gossip_to_settle().get();
|
||||
api::set_server_gossip_settle(ctx).get();
|
||||
cf_cache_hitrate_calculator.start(std::ref(db), std::ref(cf_cache_hitrate_calculator)).get();
|
||||
engine().at_exit([&cf_cache_hitrate_calculator] { return cf_cache_hitrate_calculator.stop(); });
|
||||
cf_cache_hitrate_calculator.local().run_on(engine().cpu_id());
|
||||
supervisor::notify("starting native transport");
|
||||
gms::get_local_gossiper().wait_for_gossip_to_settle();
|
||||
api::set_server_gossip_settle(ctx).get();
|
||||
supervisor::notify("starting cf cache hit rate calculator");
|
||||
service::get_local_storage_service().start_native_transport().get();
|
||||
if (start_thrift) {
|
||||
service::get_local_storage_service().start_rpc_server().get();
|
||||
|
||||
46
service/cache_hitrate_calculator.hh
Normal file
46
service/cache_hitrate_calculator.hh
Normal file
@@ -0,0 +1,46 @@
|
||||
/*
|
||||
* Copyright (C) 2016 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "database.hh"
|
||||
#include "core/timer.hh"
|
||||
#include "core/sharded.hh"
|
||||
|
||||
namespace service {
|
||||
|
||||
class cache_hitrate_calculator : public seastar::async_sharded_service<cache_hitrate_calculator> {
|
||||
seastar::sharded<database>& _db;
|
||||
seastar::sharded<cache_hitrate_calculator>& _me;
|
||||
timer<lowres_clock> _timer;
|
||||
bool _stopped = false;
|
||||
float _diff = 0;
|
||||
|
||||
future<lowres_clock::duration> recalculate_hitrates();
|
||||
void recalculate_timer();
|
||||
public:
|
||||
cache_hitrate_calculator(seastar::sharded<database>& db, seastar::sharded<cache_hitrate_calculator>& me);
|
||||
void run_on(size_t master, lowres_clock::duration d = std::chrono::milliseconds(2000));
|
||||
|
||||
future<> stop();
|
||||
};
|
||||
|
||||
}
|
||||
@@ -39,6 +39,8 @@
|
||||
*/
|
||||
|
||||
#include "load_broadcaster.hh"
|
||||
#include "cache_hitrate_calculator.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
|
||||
namespace service {
|
||||
|
||||
@@ -78,4 +80,93 @@ future<> load_broadcaster::stop_broadcasting() {
|
||||
return std::move(_done);
|
||||
}
|
||||
|
||||
|
||||
// cache_hitrate_calculator implementation
|
||||
cache_hitrate_calculator::cache_hitrate_calculator(seastar::sharded<database>& db, seastar::sharded<cache_hitrate_calculator>& me) : _db(db), _me(me),
|
||||
_timer(std::bind(std::mem_fn(&cache_hitrate_calculator::recalculate_timer), this))
|
||||
{}
|
||||
|
||||
void cache_hitrate_calculator::recalculate_timer() {
|
||||
recalculate_hitrates().then_wrapped([p = shared_from_this()] (future<lowres_clock::duration> f) {
|
||||
lowres_clock::duration d;
|
||||
if (f.failed()) {
|
||||
d = std::chrono::milliseconds(2000);
|
||||
} else {
|
||||
d = f.get0();
|
||||
}
|
||||
p->run_on((engine().cpu_id() + 1) % smp::count, d);
|
||||
});
|
||||
}
|
||||
|
||||
void cache_hitrate_calculator::run_on(size_t master, lowres_clock::duration d) {
|
||||
if (!_stopped) {
|
||||
_me.invoke_on(master, [d] (cache_hitrate_calculator& local) {
|
||||
local._timer.arm(d);
|
||||
}).handle_exception_type([] (seastar::no_sharded_instance_exception&) { /* ignore */ });
|
||||
}
|
||||
}
|
||||
|
||||
future<lowres_clock::duration> cache_hitrate_calculator::recalculate_hitrates() {
|
||||
struct stat {
|
||||
float h = 0;
|
||||
float m = 0;
|
||||
stat& operator+=(stat& o) {
|
||||
h += o.h;
|
||||
m += o.m;
|
||||
return *this;
|
||||
}
|
||||
};
|
||||
|
||||
static auto non_system_filter = [] (const std::pair<utils::UUID, lw_shared_ptr<column_family>>& cf) {
|
||||
return cf.second->schema()->ks_name() != db::system_keyspace::NAME;
|
||||
};
|
||||
|
||||
auto cf_to_cache_hit_stats = [] (database& db) {
|
||||
return boost::copy_range<std::unordered_map<utils::UUID, stat>>(db.get_column_families() | boost::adaptors::filtered(non_system_filter) |
|
||||
boost::adaptors::transformed([] (const std::pair<utils::UUID, lw_shared_ptr<column_family>>& cf) {
|
||||
auto& stats = cf.second->get_row_cache().stats();
|
||||
return std::make_pair(cf.first, stat{float(stats.hits.rate().rates[0]), float(stats.misses.rate().rates[0])});
|
||||
}));
|
||||
};
|
||||
|
||||
auto sum_stats_per_cf = [] (std::unordered_map<utils::UUID, stat> a, std::unordered_map<utils::UUID, stat> b) {
|
||||
for (auto& r : b) {
|
||||
a[r.first] += r.second;
|
||||
}
|
||||
return std::move(a);
|
||||
};
|
||||
|
||||
return _db.map_reduce0(cf_to_cache_hit_stats, std::unordered_map<utils::UUID, stat>(), sum_stats_per_cf).then([this] (std::unordered_map<utils::UUID, stat> rates) mutable {
|
||||
_diff = 0;
|
||||
// set calculated rates on all shards
|
||||
return _db.invoke_on_all([this, rates = std::move(rates), cpuid = engine().cpu_id()] (database& db) {
|
||||
for (auto& cf : db.get_column_families() | boost::adaptors::filtered(non_system_filter)) {
|
||||
stat s = rates.at(cf.first);
|
||||
float rate = 0;
|
||||
if (s.h) {
|
||||
rate = s.h / (s.h + s.m);
|
||||
}
|
||||
if (engine().cpu_id() == cpuid) {
|
||||
// calculate max difference between old rate and new one for all cfs
|
||||
_diff = std::max(_diff, std::abs(float(cf.second->get_global_cache_hit_rate()) - rate));
|
||||
}
|
||||
cf.second->set_global_cache_hit_rate(cache_temperature(rate));
|
||||
}
|
||||
});
|
||||
}).then([this] {
|
||||
// if max difference during this round is big schedule next recalculate earlier
|
||||
if (_diff < 0.01) {
|
||||
return std::chrono::milliseconds(2000);
|
||||
} else {
|
||||
return std::chrono::milliseconds(500);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<> cache_hitrate_calculator::stop() {
|
||||
_timer.cancel();
|
||||
_stopped = true;
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user