Wrap output for customization. Move all output handling to a single managing class.

Instead of passing the output parameters to std::cout straight away, use
helper wrappers. This will allow us to add more formats for gathered
tests results.

Introduce helper writer classes hierarchy that can be extended to
support different output formats (JSON, XML, etc).

Signed-off-by: Vladimir Krivopalov <vladimir@scylladb.com>
This commit is contained in:
Vladimir Krivopalov
2018-01-31 12:34:28 -08:00
parent d9f0c1f097
commit e810fc4e09

View File

@@ -21,6 +21,7 @@
#include <boost/algorithm/string/replace.hpp>
#include <boost/range/irange.hpp>
#include <boost/range/algorithm_ext.hpp>
#include "tests/cql_test_env.hh"
#include "tests/perf/perf.hh"
#include "core/app-template.hh"
@@ -41,7 +42,7 @@ reactor::io_stats s;
static bool errors_found = false;
static void print_error(const sstring& msg) {
std::cout << "^^^ ERROR: " << msg << "\n";
std::cerr << "^^^ ERROR: " << msg << "\n";
errors_found = true;
}
@@ -64,15 +65,160 @@ struct metrics_snapshot {
}
};
class make_printable {
using func_type = std::function<void(std::ostream&)>;
func_type _func;
public:
make_printable(func_type func) : _func(std::move(func)) {}
friend std::ostream& operator<<(std::ostream& out, const make_printable& p) {
p._func(out);
return out;
struct output_item {
sstring value;
sstring format;
};
using output_items = std::vector<output_item>;
using sstring_vec = std::vector<sstring>;
template <typename... Args>
sstring_vec to_sstrings(Args... args)
{
return { to_sstring(args)... };
}
struct test_group {
using requires_cache = seastar::bool_class<class requires_cache_tag>;
enum type {
large_partition,
small_partition,
};
std::string name;
std::string message;
requires_cache needs_cache;
type partition_type;
void (*test_fn)(column_family& cf);
};
using stats_values = std::tuple<
double, // time
uint64_t, // frags
double, // frags_per_second
uint64_t, // aio
uint64_t, // kb
uint64_t, // blocked
uint64_t, // dropped
uint64_t, // idx_hit
uint64_t, // idx_miss
uint64_t, // idx_blk
uint64_t, // c_hit
uint64_t, // c_miss
uint64_t, // c_blk
float // cpu
>;
struct output_writer {
virtual void write_test_group(const test_group& group, bool running) = 0;
virtual void write_test_names(const output_items& param_names, const output_items& stats_names) = 0;
virtual void write_test_static_param(sstring name, sstring description) = 0;
virtual void write_test_values(const sstring_vec& params, const stats_values& stats,
const output_items& param_names, const output_items& stats_names) = 0;
};
std::array<sstring, std::tuple_size<stats_values>::value> stats_formats =
{
"{:.6f}",
"{}",
"{:.0f}",
"{}",
"{}",
"{}",
"{}",
"{}",
"{}",
"{}",
"{}",
"{}",
"{}",
"{:.1f}%",
};
class text_output_writer final
: public output_writer {
private:
template <std::size_t... Is>
inline sstring_vec stats_values_to_strings_impl(const stats_values& values, std::index_sequence<Is...> seq) {
static_assert(stats_formats.size() == seq.size());
sstring_vec result {format(stats_formats[Is].c_str(), std::get<Is>(values))...};
return result;
}
template <typename ...Ts>
sstring_vec stats_values_to_strings(const std::tuple<Ts...>& values) {
return stats_values_to_strings_impl(values, std::index_sequence_for<Ts...>{});
};
public:
void write_test_group(const test_group& group, bool running) override {
std::cout << std::endl << (running ? "running: " : "skipping: ") << group.name << std::endl;
if (running) {
std::cout << group.message << ":" << std::endl;
}
}
void write_test_names(const output_items& param_names, const output_items& stats_names) override {
for (const auto& name: param_names) {
std::cout << format(name.format.c_str(), name.value) << " ";
}
for (const auto& name: stats_names) {
std::cout << format(name.format.c_str(), name.value) << " ";
}
std::cout << std::endl;
}
void write_test_static_param(sstring name, sstring description) override {
std::cout << description << std::endl;
}
void write_test_values(const sstring_vec& params, const stats_values& stats,
const output_items& param_names, const output_items& stats_names) override {
for (size_t i = 0; i < param_names.size(); ++i) {
std::cout << format(param_names.at(i).format.c_str(), params.at(i)) << " ";
}
sstring_vec stats_strings = stats_values_to_strings(stats);
for (size_t i = 0; i < stats_names.size(); ++i) {
std::cout << format(stats_names.at(i).format.c_str(), stats_strings.at(i)) << " ";
}
std::cout << std::endl;
}
};
class output_manager {
private:
std::unique_ptr<output_writer> _writer;
output_items _param_names;
output_items _stats_names;
public:
output_manager() {
_writer = std::make_unique<text_output_writer>();
}
void add_test_group(const test_group& group, bool running) {
_writer->write_test_group(group, running);
}
void set_test_param_names(output_items param_names, output_items stats_names) {
_param_names = std::move(param_names);
_stats_names = std::move(stats_names);
_writer->write_test_names(_param_names, _stats_names);
}
void add_test_values(const sstring_vec& params, const stats_values& stats) {
_writer->write_test_values(params, stats, _param_names, _stats_names);
}
void add_test_static_param(sstring name, sstring description) {
_writer->write_test_static_param(name, description);
}
};
struct test_result {
@@ -110,25 +256,42 @@ struct test_result {
return float(busy_delta) / (busy_delta + idle_delta);
}
static auto table_header() {
return make_printable([] (std::ostream& out) {
out << sprint("%10s %9s %10s %6s %10s %7s %7s %8s %8s %8s %8s %8s %8s %6s",
"time [s]", "frags", "frag/s", "aio", "[KiB]", "blocked", "dropped",
"idx hit", "idx miss", "idx blk",
"c hit", "c miss", "c ins",
"cpu");
});
static output_items stats_names() {
return {
{"time (s)", "{:>10}"},
{"frags", "{:>9}"},
{"frag/s", "{:>10}"},
{"aio", "{:>6}"},
{"(KiB)", "{:>10}"},
{"blocked", "{:>7}"},
{"dropped", "{:>7}"},
{"idx hit", "{:>8}"},
{"idx miss", "{:>8}"},
{"idx blk", "{:>8}"},
{"c hit", "{:>8}"},
{"c miss", "{:>8}"},
{"c blk", "{:>8}"},
{"cpu", "{:>6}"}
};
}
auto table_row() {
return make_printable([this] (std::ostream& out) {
out << sprint("%10.6f %9d %10.0f %6d %10d %7d %7d %8d %8d %8d %8d %8d %8d %5.1f%%",
duration_in_seconds(), fragments_read, fragment_rate(),
aio_reads(), aio_read_bytes() / 1024, reads_blocked(), read_aheads_discarded(),
index_hits(), index_misses(), index_blocks(),
cache_hits(), cache_misses(), cache_insertions(),
cpu_utilization() * 100);
});
stats_values get_stats_values() {
return stats_values{
duration_in_seconds(),
fragments_read,
fragment_rate(),
aio_reads(),
aio_read_bytes() / 1024,
reads_blocked(),
read_aheads_discarded(),
index_hits(),
index_misses(),
index_blocks(),
cache_hits(),
cache_misses(),
cache_insertions(),
cpu_utilization() * 100
};
}
};
@@ -512,6 +675,8 @@ bool new_test_case = false;
table_config cfg;
int_range live_range;
std::unique_ptr<output_manager> output_mgr;
void clear_cache() {
global_cache_tracker().clear();
}
@@ -534,14 +699,12 @@ void on_test_case() {
};
void test_large_partition_single_key_slice(column_family& cf) {
std::cout << sprint("%-2s %-14s ", "", "range") << test_result::table_header() << "\n";
output_mgr->set_test_param_names({{"", "{:<2}"}, {"range", "{:<14}"}}, test_result::stats_names());
struct first {
};
auto test = [&](int_range range) {
auto r = test_slicing_using_restrictions(cf, range);
std::cout << sprint("%-2s %-14s ", new_test_case ? "->" : "", sprint("%s", range))
<< r.table_row() << "\n";
new_test_case = false;
output_mgr->add_test_values(to_sstrings(new_test_case ? "->": 0, format("{}", range)), r.get_stats_values());
check_fragment_count(r, cardinality(intersection(range, live_range)));
return r;
};
@@ -626,10 +789,10 @@ void test_large_partition_single_key_slice(column_family& cf) {
}
void test_large_partition_skips(column_family& cf) {
std::cout << sprint("%-7s %-7s ", "read", "skip") << test_result::table_header() << "\n";
output_mgr->set_test_param_names({{"read", "{:<7}"}, {"skip", "{:<7}"}}, test_result::stats_names());
auto do_test = [&] (int n_read, int n_skip) {
auto r = scan_rows_with_stride(cf, cfg.n_rows, n_read, n_skip);
std::cout << sprint("%-7d %-7d ", n_read, n_skip) << r.table_row() << "\n";
output_mgr->add_test_values(to_sstrings(n_read, n_skip), r.get_stats_values());
check_fragment_count(r, count_for_skip_pattern(cfg.n_rows, n_read, n_skip));
};
auto test = [&] (int n_read, int n_skip) {
@@ -658,7 +821,7 @@ void test_large_partition_skips(column_family& cf) {
test(64, 4096);
if (cache_enabled) {
std::cout << "Testing cache scan of large partition with varying row continuity.\n";
output_mgr->add_test_static_param("cache_enabled", "Testing cache scan of large partition with varying row continuity.");
for (auto n_read : {1, 64}) {
for (auto n_skip : {1, 64}) {
on_test_case();
@@ -670,11 +833,11 @@ void test_large_partition_skips(column_family& cf) {
}
void test_large_partition_slicing(column_family& cf) {
std::cout << sprint("%-7s %-7s ", "offset", "read") << test_result::table_header() << "\n";
output_mgr->set_test_param_names({{"offset", "{:<7}"}, {"read", "{:<7}"}}, test_result::stats_names());
auto test = [&] (int offset, int read) {
on_test_case();
auto r = slice_rows(cf, offset, read);
std::cout << sprint("%-7d %-7d ", offset, read) << r.table_row() << "\n";
output_mgr->add_test_values(to_sstrings(offset, read), r.get_stats_values());
check_fragment_count(r, std::min(cfg.n_rows - offset, read));
};
@@ -690,11 +853,11 @@ void test_large_partition_slicing(column_family& cf) {
}
void test_large_partition_slicing_clustering_keys(column_family& cf) {
std::cout << sprint("%-7s %-7s ", "offset", "read") << test_result::table_header() << "\n";
output_mgr->set_test_param_names({{"offset", "{:<7}"}, {"read", "{:<7}"}}, test_result::stats_names());
auto test = [&] (int offset, int read) {
on_test_case();
auto r = slice_rows_by_ck(cf, offset, read);
std::cout << sprint("%-7d %-7d ", offset, read) << r.table_row() << "\n";
output_mgr->add_test_values(to_sstrings(offset, read), r.get_stats_values());
check_fragment_count(r, std::min(cfg.n_rows - offset, read));
};
@@ -710,12 +873,11 @@ void test_large_partition_slicing_clustering_keys(column_family& cf) {
}
void test_large_partition_slicing_single_partition_reader(column_family& cf) {
std::cout << sprint("%-7s %-7s ", "offset", "read") << test_result::table_header()
<< "\n";
output_mgr->set_test_param_names({{"offset", "{:<7}"}, {"read", "{:<7}"}}, test_result::stats_names());
auto test = [&](int offset, int read) {
on_test_case();
auto r = slice_rows_single_key(cf, offset, read);
std::cout << sprint("%-7d %-7d ", offset, read) << r.table_row() << "\n";
output_mgr->add_test_values(to_sstrings(offset, read), r.get_stats_values());
check_fragment_count(r, std::min(cfg.n_rows - offset, read));
};
@@ -731,12 +893,11 @@ void test_large_partition_slicing_single_partition_reader(column_family& cf) {
}
void test_large_partition_select_few_rows(column_family& cf) {
std::cout << sprint("%-7s %-7s ", "stride", "rows") << test_result::table_header()
<< "\n";
output_mgr->set_test_param_names({{"stride", "{:<7}"}, {"rows", "{:<7}"}}, test_result::stats_names());
auto test = [&](int stride, int read) {
on_test_case();
auto r = select_spread_rows(cf, stride, read);
std::cout << sprint("%-7d %-7d ", stride, read) << r.table_row() << "\n";
output_mgr->add_test_values(to_sstrings(stride, read), r.get_stats_values());
check_fragment_count(r, read);
};
@@ -749,25 +910,24 @@ void test_large_partition_select_few_rows(column_family& cf) {
}
void test_large_partition_forwarding(column_family& cf) {
std::cout << sprint("%-7s ", "pk-scan") << test_result::table_header() << "\n";
output_mgr->set_test_param_names({{"pk-scan", "{:<7}"}}, test_result::stats_names());
on_test_case();
auto r = test_forwarding_with_restriction(cf, cfg, false);
check_fragment_count(r, 2);
std::cout << sprint("%-7s ", "yes") << r.table_row() << "\n";
output_mgr->add_test_values(to_sstrings("yes"), r.get_stats_values());
on_test_case();
r = test_forwarding_with_restriction(cf, cfg, true);
check_fragment_count(r, 2);
std::cout << sprint("%-7s ", "no") << r.table_row() << "\n";
output_mgr->add_test_values(to_sstrings("no"), r.get_stats_values());
}
void test_small_partition_skips(column_family& cf2) {
std::cout << sprint("%-2s %-7s %-7s ", "", "read", "skip") << test_result::table_header() << "\n";
output_mgr->set_test_param_names({{"", "{:<2}"}, {"read", "{:<7}"}, {"skip", "{:<7}"}}, test_result::stats_names());
auto do_test = [&] (int n_read, int n_skip) {
auto r = scan_with_stride_partitions(cf2, cfg.n_rows, n_read, n_skip);
std::cout << sprint("%-2s %-7d %-7d ", new_test_case ? "->" : "", n_read, n_skip) << r.table_row() << "\n";
output_mgr->add_test_values(to_sstrings(new_test_case ? "->" : "", n_read, n_skip), r.get_stats_values());
new_test_case = false;
check_fragment_count(r, count_for_skip_pattern(cfg.n_rows, n_read, n_skip));
return r;
@@ -799,7 +959,7 @@ void test_small_partition_skips(column_family& cf2) {
test(64, 4096);
if (cache_enabled) {
std::cout << "Testing cache scan with small partitions with varying continuity.\n";
output_mgr->add_test_static_param("cache_enabled", "Testing cache scan with small partitions with varying continuity.");
for (auto n_read : {1, 64}) {
for (auto n_skip : {1, 64}) {
on_test_case();
@@ -811,11 +971,11 @@ void test_small_partition_skips(column_family& cf2) {
}
void test_small_partition_slicing(column_family& cf2) {
std::cout << sprint("%-7s %-7s ", "offset", "read") << test_result::table_header() << "\n";
output_mgr->set_test_param_names({{"offset", "{:<7}"}, {"read", "{:<7}"}}, test_result::stats_names());
auto test = [&] (int offset, int read) {
on_test_case();
auto r = slice_partitions(cf2, cfg.n_rows, offset, read);
std::cout << sprint("%-7d %-7d ", offset, read) << r.table_row() << "\n";
output_mgr->add_test_values(to_sstrings(offset, read), r.get_stats_values());
check_fragment_count(r, std::min(cfg.n_rows - offset, read));
};
@@ -830,20 +990,6 @@ void test_small_partition_slicing(column_family& cf2) {
test(cfg.n_rows / 2, 4096);
}
struct test_group {
using requires_cache = seastar::bool_class<class requires_cache_tag>;
enum type {
large_partition,
small_partition,
};
std::string name;
std::string message;
requires_cache needs_cache;
type partition_type;
void (*test_fn)(column_family& cf);
};
static std::initializer_list<test_group> test_groups = {
{
"large-partition-single-key-slice",
@@ -986,6 +1132,8 @@ int main(int argc, char** argv) {
std::cout << "Config: rows: " << cfg.n_rows << ", value size: " << cfg.value_size << "\n";
output_mgr = std::make_unique<output_manager>();
sleep(1s).get(); // wait for system table flushes to quiesce
engine().at_exit([&] {
@@ -1004,17 +1152,15 @@ int main(int argc, char** argv) {
cf.run_with_compaction_disabled([&] {
return seastar::async([&] {
live_range = int_range({0}, {cfg.n_rows - 1});
boost::for_each(
enabled_test_groups
| boost::adaptors::filtered([type] (auto&& tc) { return tc.partition_type == type; }),
[&cf] (auto&& tc) {
if (tc.needs_cache && !cache_enabled) {
std::cout << "\nskipping: " << tc.name << "\n";
output_mgr->add_test_group(tc, false);
} else {
std::cout << "\nrunning: " << tc.name << "\n";
output_mgr->add_test_group(tc, true);
on_test_group();
std::cout << tc.message << ":\n";
tc.test_fn(cf);
}
}