This reverts commit ceec703bb7.
The commit was fixed with abort source handling for alternator
standalone path so it's safe to reapply.
237 lines
8.8 KiB
C++
237 lines
8.8 KiB
C++
/*
|
|
* Copyright (C) 2021-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include "perf.hh"
|
|
#include <seastar/core/reactor.hh>
|
|
#include <seastar/core/memory.hh>
|
|
#include "seastarx.hh"
|
|
#include "reader_concurrency_semaphore.hh"
|
|
#include "schema/schema.hh"
|
|
#include "utils/logalloc.hh"
|
|
#include "release.hh"
|
|
#include <boost/algorithm/string.hpp>
|
|
#include <fstream>
|
|
|
|
|
|
uint64_t perf_mallocs() {
|
|
return memory::stats().mallocs();
|
|
}
|
|
|
|
uint64_t perf_logallocs() {
|
|
return logalloc::shard_tracker().statistics().num_allocations;
|
|
}
|
|
|
|
uint64_t perf_tasks_processed() {
|
|
return engine().get_sched_stats().tasks_processed;
|
|
}
|
|
|
|
void scheduling_latency_measurer::schedule_tick() {
|
|
seastar::schedule(make_task(default_scheduling_group(), [self = weak_from_this()] () mutable {
|
|
if (self) {
|
|
self->tick();
|
|
}
|
|
}));
|
|
}
|
|
|
|
/// Formats a scheduling_latency_measurer as
/// `{count: N, 99%: X [ms], max: Y [ms]}`.
///
/// The min / 50% / 90% fields are currently disabled; they are kept below as
/// comments so they can be re-enabled together with their format specifiers.
auto fmt::formatter<scheduling_latency_measurer>::format(const scheduling_latency_measurer& slm, fmt::format_context& ctx) const
        -> decltype(ctx.out()) {
    // The histogram and max values are divided by 1e6 to match the "[ms]"
    // unit printed in the output.
    const auto nanos_to_millis = [] (int64_t nanos) {
        return float(nanos) / 1e6;
    };

    const auto sample_count = slm.histogram().count();
    //const auto min_ms = nanos_to_millis(slm.min().count());
    //const auto p50_ms = nanos_to_millis(slm.histogram().percentile(0.5));
    //const auto p90_ms = nanos_to_millis(slm.histogram().percentile(0.9));
    const auto p99_ms = nanos_to_millis(slm.histogram().percentile(0.99));
    const auto max_ms = nanos_to_millis(slm.max().count());

    return fmt::format_to(ctx.out(), "{{count: {}, "
                                     //"min: {:.6f} [ms], "
                                     //"50%: {:.6f} [ms], "
                                     //"90%: {:.6f} [ms], "
                                     "99%: {:.6f} [ms], "
                                     "max: {:.6f} [ms]}}",
                          sample_count,
                          //min_ms,
                          //p50_ms,
                          //p90_ms,
                          p99_ms,
                          max_ms);
}
|
|
|
|
/// Formats a single perf_result as one human-readable line: throughput
/// followed by the per-operation counters and the error count.
auto fmt::formatter<perf_result>::format(const perf_result& result, fmt::format_context& ctx) const
        -> decltype(ctx.out()) {
    auto out = ctx.out();
    return fmt::format_to(out,
            "{:.2f} tps ({:5.1f} allocs/op, {:5.1f} logallocs/op, {:5.1f} tasks/op, {:7.0f} insns/op, {:7.0f} cycles/op, {:8} errors)",
            result.throughput,
            result.mallocs_per_op,
            result.logallocs_per_op,
            result.tasks_per_op,
            result.instructions_per_op,
            result.cpu_cycles_per_op,
            result.errors);
}
|
|
|
|
/// Builds summary statistics for "throughput", "instructions_per_op" and
/// "cpu_cycles_per_op" from a set of per-run results.
///
/// `results` is taken by non-const reference because calculate_stats()
/// sorts it in place; on return it is ordered by cpu_cycles_per_op (the last
/// statistic computed).
///
/// If `results` is empty, all statistics are value-initialized and
/// `median_by_throughput` is left untouched.
aggregated_perf_results::aggregated_perf_results(std::vector<perf_result>& results) {
    stats["throughput"] = calculate_stats(results, std::mem_fn(&perf_result::throughput));
    // Must be read right after the throughput pass: at this point `results`
    // is sorted by throughput, so the middle element is the median run.
    if (!results.empty()) {
        median_by_throughput = results[results.size() / 2];
    }
    stats["instructions_per_op"] = calculate_stats(results, std::mem_fn(&perf_result::instructions_per_op));
    stats["cpu_cycles_per_op"] = calculate_stats(results, std::mem_fn(&perf_result::cpu_cycles_per_op));
}

/// Computes min, median, max, mean, sample standard deviation and median
/// absolute deviation (around the mean) of `get_stat` over `results`.
///
/// Side effect: `results` is sorted in ascending order of `get_stat`.
/// For an even number of samples the upper of the two middle elements is
/// used as the median.
///
/// Returns a value-initialized stats_t when `results` is empty, instead of
/// indexing into an empty vector and dividing by zero.
aggregated_perf_results::stats_t aggregated_perf_results::calculate_stats(std::vector<perf_result>& results, std::function<double(const perf_result&)> get_stat) const {
    stats_t ret{};
    if (results.empty()) {
        return ret;
    }
    const auto n = results.size();

    std::sort(results.begin(), results.end(), [&] (const perf_result& a, const perf_result& b) {
        return get_stat(a) < get_stat(b);
    });
    ret.min = get_stat(results.front());
    ret.median = get_stat(results[n / 2]);
    ret.max = get_stat(results.back());

    ret.mean = 0;
    for (const auto& pr : results) {
        ret.mean += get_stat(pr);
    }
    ret.mean /= n;

    double var = 0;
    std::vector<double> abs_deviations;
    abs_deviations.reserve(n);
    for (const auto& pr : results) {
        const auto dev = get_stat(pr) - ret.mean;
        abs_deviations.emplace_back(std::abs(dev));
        var += dev * dev;
    }
    // Since the results are samples of the total population,
    // divide the sum of squared deviations by (n - 1) rather than by n.
    ret.stdev = std::sqrt(n > 1 ? var / (n - 1) : var);

    std::sort(abs_deviations.begin(), abs_deviations.end());
    ret.median_absolute_deviation = abs_deviations[abs_deviations.size() / 2];

    return ret;
}
|
|
|
|
std::ostream&
|
|
operator<<(std::ostream& os, const aggregated_perf_results& result) {
|
|
for (const auto& s : {"throughput", "instructions_per_op", "cpu_cycles_per_op"}) {
|
|
auto& t = result.stats.at(s);
|
|
fmt::print(os, "{}:\n\tmean= {:.2f} standard-deviation={:.2f}\n\tmedian= {:.2f} median-absolute-deviation={:.2f}\n\tmaximum={:.2f} minimum={:.2f}\n",
|
|
s, t.mean, t.stdev, t.median, t.median_absolute_deviation, t.max, t.min);
|
|
}
|
|
return os;
|
|
}
|
|
|
|
/// Snapshots the reactor's current aio write counters.
///
/// The stored values act as a baseline: update() later subtracts them from
/// the then-current counters to obtain the amount of I/O done in between.
aio_writes_result_mixin::aio_writes_result_mixin()
    : aio_writes(engine().get_io_stats().aio_writes)
    , aio_write_bytes(engine().get_io_stats().aio_write_bytes)
{
}
|
|
|
|
/// Replaces the baseline counters stored in `result` with per-invocation
/// averages: (current counter - baseline) / stats.invocations.
///
/// NOTE(review): there is no guard for `stats.invocations == 0`; presumably
/// callers always run at least one invocation -- confirm.
void aio_writes_result_mixin::update(aio_writes_result_mixin& result, const executor_shard_stats& stats) {
    const auto& io = engine().get_io_stats();
    const auto invocations = stats.invocations;
    result.aio_writes = (io.aio_writes - result.aio_writes) / invocations;
    result.aio_write_bytes = (io.aio_write_bytes - result.aio_write_bytes) / invocations;
}
|
|
|
|
/// Formats a perf_result_with_aio_writes as one human-readable line: the
/// same fields as a plain perf_result, followed by the aio write bytes and
/// aio write count per operation.
auto fmt::formatter<perf_result_with_aio_writes>::format(const perf_result_with_aio_writes& result, fmt::format_context& ctx) const
        -> decltype(ctx.out()) {
    auto out = ctx.out();
    return fmt::format_to(out,
            "{:.2f} tps ({:5.1f} allocs/op, {:5.1f} logallocs/op, {:5.1f} tasks/op, {:7.0f} insns/op, {:7.0f} cycles/op, {:8} errors, {:7.2f} bytes/op, {:5.1f} writes/op)",
            result.throughput,
            result.mallocs_per_op,
            result.logallocs_per_op,
            result.tasks_per_op,
            result.instructions_per_op,
            result.cpu_cycles_per_op,
            result.errors,
            result.aio_write_bytes,
            result.aio_writes);
}
|
|
|
|
namespace perf {
|
|
|
|
/// Creates the wrapped reader_concurrency_semaphore with the given name,
/// constructed with the `no_limits` tag and with metrics registration
/// turned off.
reader_concurrency_semaphore_wrapper::reader_concurrency_semaphore_wrapper(sstring name)
    : _semaphore(std::make_unique<reader_concurrency_semaphore>(
            reader_concurrency_semaphore::no_limits{},
            std::move(name),
            reader_concurrency_semaphore::register_metrics::no))
{
}
|
|
|
|
/// Starts stopping the semaphore without waiting for it to finish.
///
/// Ownership of the semaphore is moved into the `finally` continuation so
/// that the object stays alive until stop() has completed; the resulting
/// future is deliberately discarded because a destructor cannot wait.
reader_concurrency_semaphore_wrapper::~reader_concurrency_semaphore_wrapper() {
    // stop() is invoked on the semaphore before the capture below moves the
    // owning pointer out of `_semaphore`.
    (void)_semaphore->stop().finally([keep_alive = std::move(_semaphore)] {
        // Intentionally empty: the capture only extends the lifetime.
    });
}
|
|
|
|
/// Creates a tracking-only permit on the wrapped semaphore.
///
/// The permit is created with a null first argument, the description
/// "perf", no timeout, and a default-constructed last argument.
reader_permit reader_concurrency_semaphore_wrapper::make_permit() {
    auto& sem = *_semaphore;
    return sem.make_tracking_only_permit(nullptr, "perf", db::no_timeout, {});
}
|
|
|
|
/// Removes the first occurrence of `name`, together with the arguments that
/// follow it, from an argv-style array.
///
/// @param ac        number of entries in `av`
/// @param av        argument vector; modified in place
/// @param name      argument to look for (av[0] is never considered)
/// @param num_args  total number of entries to remove, counting `name`
///                  itself (e.g. 2 for "--opt value", 1 for a bare flag)
/// @return the new argument count and `av`
///
/// The array is left untouched when `name` is not found, when `num_args` is
/// not positive, or when fewer than `num_args` entries remain starting at
/// `name`. After a removal, `av[new_ac]` is set to nullptr so the array
/// keeps the usual argv termination.
std::tuple<int, char**> cut_arg(int ac, char** av, std::string name, int num_args) {
    if (num_args <= 0) {
        return std::make_tuple(ac, av);
    }
    // Only positions with at least `num_args` entries left can match; this
    // keeps the removal inside the array and the new count consistent.
    for (int i = 1; i <= ac - num_args; ++i) {
        if (name == av[i]) {
            // Close the gap by moving the tail down over the removed entries.
            std::copy(av + i + num_args, av + ac, av + i);
            ac -= num_args;
            av[ac] = nullptr;
            break;
        }
    }
    return std::make_tuple(ac, av);
}
|
|
|
|
/// Writes the aggregated results of a perf run to `filename` as JSON.
///
/// The document has four sections:
///  - "parameters":      `params`, copied verbatim
///  - "stats":           counters of the median-by-throughput run plus the
///                       median absolute deviation, max and min of throughput
///  - "test_properties": currently only the test "type"
///  - "versions":        the scylla-server version, split into its
///                       components when it has the expected shape, and the
///                       local date/time of this call
///
/// @param filename   path of the output file
/// @param agg        aggregated results; must contain a "throughput" entry
///                   in `agg.stats` (otherwise `at()` throws)
/// @param params     test parameters to record
/// @param test_type  value stored under test_properties.type
void write_json_result(const std::string& filename, const aggregated_perf_results& agg, const Json::Value& params, const std::string& test_type) {
    Json::Value results;

    results["parameters"] = params;

    Json::Value stats;
    const auto& med = agg.median_by_throughput;
    stats["median tps"] = med.throughput;
    stats["allocs_per_op"] = med.mallocs_per_op;
    stats["logallocs_per_op"] = med.logallocs_per_op;
    stats["tasks_per_op"] = med.tasks_per_op;
    stats["instructions_per_op"] = med.instructions_per_op;
    stats["cpu_cycles_per_op"] = med.cpu_cycles_per_op;
    const auto& tps = agg.stats.at("throughput");
    stats["mad tps"] = tps.median_absolute_deviation;
    stats["max tps"] = tps.max;
    stats["min tps"] = tps.min;
    results["stats"] = std::move(stats);

    results["test_properties"]["type"] = test_type;

    // The version string is expected to look like <version>-<release>.
    auto version_components = std::vector<std::string>{};
    auto sver = scylla_version();
    boost::algorithm::split(version_components, sver, boost::is_any_of("-"));

    Json::Value version;
    if (version_components.size() >= 2) {
        // The release part is expected to look like <scylla-build>.<date>.<git-hash>.
        auto release_components = std::vector<std::string>{};
        boost::algorithm::split(release_components, version_components[1], boost::is_any_of("."));

        if (release_components.size() >= 3) {
            version["commit_id"] = release_components[2];
            version["date"] = release_components[1];
        }
        version["version"] = version_components[0];
    } else {
        // Unrecognized shape: record the whole string as the version.
        version["version"] = sver;
    }

    auto current_time = std::time(nullptr);
    char time_str[100];
    ::tm time_buf;
    std::strftime(time_str, sizeof(time_str), "%Y-%m-%d %H:%M:%S", ::localtime_r(&current_time, &time_buf));
    version["run_date_time"] = time_str;

    results["versions"]["scylla-server"] = std::move(version);

    // NOTE(review): a failure to open or write the file is not reported.
    auto out = std::ofstream(filename);
    out << results;
}
|
|
|
|
/// Runs `fun` inside a seastar::async context with a sharded abort_source
/// that is triggered on SIGINT or SIGTERM.
///
/// Sequence:
///  1. start a sharded<abort_source>;
///  2. install SIGINT/SIGTERM handlers that request abort on every shard;
///  3. run `fun`, passing it a pointer to the sharded abort source;
///  4. stop the sharded abort source, whether or not `fun` threw;
///  5. rethrow the exception from `fun`, if there was one.
///
/// The pointer handed to `fun` is only valid until this function completes.
///
/// NOTE(review): the signal handler calls `.get()` on the future returned by
/// invoke_on_all(); confirm that the handler always runs in a context where
/// blocking on a future is permitted. The handlers are also not removed
/// after the abort source is stopped -- confirm no signal can arrive later.
future<> run_standalone(std::function<void(sharded<abort_source>*)> fun) {
    auto abort_sources = make_shared<sharded<abort_source>>();
    co_await abort_sources->start();

    // Captures the shared pointer by value so the handlers keep the sharded
    // abort source object alive for as long as they are installed.
    auto request_abort_everywhere = [abort_sources] {
        abort_sources->invoke_on_all(&abort_source::request_abort).get();
    };
    seastar::handle_signal(SIGINT, request_abort_everywhere);
    seastar::handle_signal(SIGTERM, request_abort_everywhere);

    // Defer any failure from `fun` until after the abort source is stopped.
    std::exception_ptr failure;
    try {
        co_await seastar::async([fun = std::move(fun), raw_abort_sources = abort_sources.get()] {
            fun(raw_abort_sources);
        });
    } catch (...) {
        failure = std::current_exception();
    }

    co_await abort_sources->stop();

    if (failure) {
        std::rethrow_exception(failure);
    }
}
|
|
|
|
} // namespace perf
|