diff --git a/tests/perf/perf_fast_forward.cc b/tests/perf/perf_fast_forward.cc index 8394997768..965f0add3c 100644 --- a/tests/perf/perf_fast_forward.cc +++ b/tests/perf/perf_fast_forward.cc @@ -21,6 +21,7 @@ #include #include +#include #include "tests/cql_test_env.hh" #include "tests/perf/perf.hh" #include "core/app-template.hh" @@ -41,7 +42,7 @@ reactor::io_stats s; static bool errors_found = false; static void print_error(const sstring& msg) { - std::cout << "^^^ ERROR: " << msg << "\n"; + std::cerr << "^^^ ERROR: " << msg << "\n"; errors_found = true; } @@ -64,15 +65,160 @@ struct metrics_snapshot { } }; -class make_printable { - using func_type = std::function; - func_type _func; -public: - make_printable(func_type func) : _func(std::move(func)) {} - friend std::ostream& operator<<(std::ostream& out, const make_printable& p) { - p._func(out); - return out; +struct output_item { + sstring value; + sstring format; +}; + +using output_items = std::vector; +using sstring_vec = std::vector; + +template +sstring_vec to_sstrings(Args... args) +{ + return { to_sstring(args)... }; +} + +struct test_group { + using requires_cache = seastar::bool_class; + enum type { + large_partition, + small_partition, + }; + + std::string name; + std::string message; + requires_cache needs_cache; + type partition_type; + void (*test_fn)(column_family& cf); +}; + +using stats_values = std::tuple< + double, // time + uint64_t, // frags + double, // frags_per_second + uint64_t, // aio + uint64_t, // kb + uint64_t, // blocked + uint64_t, // dropped + uint64_t, // idx_hit + uint64_t, // idx_miss + uint64_t, // idx_blk + uint64_t, // c_hit + uint64_t, // c_miss + uint64_t, // c_blk + float // cpu +>; + + +struct output_writer { + virtual void write_test_group(const test_group& group, bool running) = 0; + + virtual void write_test_names(const output_items& param_names, const output_items& stats_names) = 0; + + virtual void write_test_static_param(sstring name, sstring description) = 0; + + virtual void write_test_values(const sstring_vec& params, const stats_values& stats, + const output_items& param_names, const output_items& stats_names) = 0; +}; + +std::array::value> stats_formats = +{ + "{:.6f}", + "{}", + "{:.0f}", + "{}", + "{}", + "{}", + "{}", + "{}", + "{}", + "{}", + "{}", + "{}", + "{}", + "{:.1f}%", +}; + +class text_output_writer final + : public output_writer { +private: + + template + inline sstring_vec stats_values_to_strings_impl(const stats_values& values, std::index_sequence seq) { + static_assert(stats_formats.size() == seq.size()); + sstring_vec result {format(stats_formats[Is].c_str(), std::get(values))...}; + return result; } + + template + sstring_vec stats_values_to_strings(const std::tuple& values) { + return stats_values_to_strings_impl(values, std::index_sequence_for{}); + }; +public: + void write_test_group(const test_group& group, bool running) override { + std::cout << std::endl << (running ? "running: " : "skipping: ") << group.name << std::endl; + if (running) { + std::cout << group.message << ":" << std::endl; + } + } + + void write_test_names(const output_items& param_names, const output_items& stats_names) override { + for (const auto& name: param_names) { + std::cout << format(name.format.c_str(), name.value) << " "; + } + for (const auto& name: stats_names) { + std::cout << format(name.format.c_str(), name.value) << " "; + } + std::cout << std::endl; + } + + void write_test_static_param(sstring name, sstring description) override { + std::cout << description << std::endl; + } + + void write_test_values(const sstring_vec& params, const stats_values& stats, + const output_items& param_names, const output_items& stats_names) override { + for (size_t i = 0; i < param_names.size(); ++i) { + std::cout << format(param_names.at(i).format.c_str(), params.at(i)) << " "; + } + sstring_vec stats_strings = stats_values_to_strings(stats); + for (size_t i = 0; i < stats_names.size(); ++i) { + std::cout << format(stats_names.at(i).format.c_str(), stats_strings.at(i)) << " "; + } + std::cout << std::endl; + } +}; + +class output_manager { +private: + std::unique_ptr _writer; + output_items _param_names; + output_items _stats_names; +public: + + output_manager() { + _writer = std::make_unique(); + } + + void add_test_group(const test_group& group, bool running) { + _writer->write_test_group(group, running); + } + + void set_test_param_names(output_items param_names, output_items stats_names) { + _param_names = std::move(param_names); + _stats_names = std::move(stats_names); + _writer->write_test_names(_param_names, _stats_names); + } + + void add_test_values(const sstring_vec& params, const stats_values& stats) { + _writer->write_test_values(params, stats, _param_names, _stats_names); + } + + void add_test_static_param(sstring name, sstring description) { + _writer->write_test_static_param(name, description); + } + }; struct test_result { @@ -110,25 +256,42 @@ struct test_result { return float(busy_delta) / (busy_delta + idle_delta); } - static auto table_header() { - return make_printable([] (std::ostream& out) { - out << sprint("%10s %9s %10s %6s %10s %7s %7s %8s %8s %8s %8s %8s %8s %6s", - "time [s]", "frags", "frag/s", "aio", "[KiB]", "blocked", "dropped", - "idx hit", "idx miss", "idx blk", - "c hit", "c miss", "c ins", - "cpu"); - }); + static output_items stats_names() { + return { + {"time (s)", "{:>10}"}, + {"frags", "{:>9}"}, + {"frag/s", "{:>10}"}, + {"aio", "{:>6}"}, + {"(KiB)", "{:>10}"}, + {"blocked", "{:>7}"}, + {"dropped", "{:>7}"}, + {"idx hit", "{:>8}"}, + {"idx miss", "{:>8}"}, + {"idx blk", "{:>8}"}, + {"c hit", "{:>8}"}, + {"c miss", "{:>8}"}, + {"c blk", "{:>8}"}, + {"cpu", "{:>6}"} + }; } - auto table_row() { - return make_printable([this] (std::ostream& out) { - out << sprint("%10.6f %9d %10.0f %6d %10d %7d %7d %8d %8d %8d %8d %8d %8d %5.1f%%", - duration_in_seconds(), fragments_read, fragment_rate(), - aio_reads(), aio_read_bytes() / 1024, reads_blocked(), read_aheads_discarded(), - index_hits(), index_misses(), index_blocks(), - cache_hits(), cache_misses(), cache_insertions(), - cpu_utilization() * 100); - }); + stats_values get_stats_values() { + return stats_values{ + duration_in_seconds(), + fragments_read, + fragment_rate(), + aio_reads(), + aio_read_bytes() / 1024, + reads_blocked(), + read_aheads_discarded(), + index_hits(), + index_misses(), + index_blocks(), + cache_hits(), + cache_misses(), + cache_insertions(), + cpu_utilization() * 100 + }; } }; @@ -512,6 +675,8 @@ bool new_test_case = false; table_config cfg; int_range live_range; +std::unique_ptr output_mgr; + void clear_cache() { global_cache_tracker().clear(); } @@ -534,14 +699,12 @@ void on_test_case() { }; void test_large_partition_single_key_slice(column_family& cf) { - std::cout << sprint("%-2s %-14s ", "", "range") << test_result::table_header() << "\n"; + output_mgr->set_test_param_names({{"", "{:<2}"}, {"range", "{:<14}"}}, test_result::stats_names()); struct first { }; auto test = [&](int_range range) { auto r = test_slicing_using_restrictions(cf, range); - std::cout << sprint("%-2s %-14s ", new_test_case ? "->" : "", sprint("%s", range)) - << r.table_row() << "\n"; - new_test_case = false; + output_mgr->add_test_values(to_sstrings(new_test_case ? "->": 0, format("{}", range)), r.get_stats_values()); check_fragment_count(r, cardinality(intersection(range, live_range))); return r; }; @@ -626,10 +789,10 @@ void test_large_partition_single_key_slice(column_family& cf) { } void test_large_partition_skips(column_family& cf) { - std::cout << sprint("%-7s %-7s ", "read", "skip") << test_result::table_header() << "\n"; + output_mgr->set_test_param_names({{"read", "{:<7}"}, {"skip", "{:<7}"}}, test_result::stats_names()); auto do_test = [&] (int n_read, int n_skip) { auto r = scan_rows_with_stride(cf, cfg.n_rows, n_read, n_skip); - std::cout << sprint("%-7d %-7d ", n_read, n_skip) << r.table_row() << "\n"; + output_mgr->add_test_values(to_sstrings(n_read, n_skip), r.get_stats_values()); check_fragment_count(r, count_for_skip_pattern(cfg.n_rows, n_read, n_skip)); }; auto test = [&] (int n_read, int n_skip) { @@ -658,7 +821,7 @@ void test_large_partition_skips(column_family& cf) { test(64, 4096); if (cache_enabled) { - std::cout << "Testing cache scan of large partition with varying row continuity.\n"; + output_mgr->add_test_static_param("cache_enabled", "Testing cache scan of large partition with varying row continuity."); for (auto n_read : {1, 64}) { for (auto n_skip : {1, 64}) { on_test_case(); @@ -670,11 +833,11 @@ void test_large_partition_skips(column_family& cf) { } void test_large_partition_slicing(column_family& cf) { - std::cout << sprint("%-7s %-7s ", "offset", "read") << test_result::table_header() << "\n"; + output_mgr->set_test_param_names({{"offset", "{:<7}"}, {"read", "{:<7}"}}, test_result::stats_names()); auto test = [&] (int offset, int read) { on_test_case(); auto r = slice_rows(cf, offset, read); - std::cout << sprint("%-7d %-7d ", offset, read) << r.table_row() << "\n"; + output_mgr->add_test_values(to_sstrings(offset, read), r.get_stats_values()); check_fragment_count(r, std::min(cfg.n_rows - offset, read)); }; @@ -690,11 +853,11 @@ void test_large_partition_slicing(column_family& cf) { } void test_large_partition_slicing_clustering_keys(column_family& cf) { - std::cout << sprint("%-7s %-7s ", "offset", "read") << test_result::table_header() << "\n"; + output_mgr->set_test_param_names({{"offset", "{:<7}"}, {"read", "{:<7}"}}, test_result::stats_names()); auto test = [&] (int offset, int read) { on_test_case(); auto r = slice_rows_by_ck(cf, offset, read); - std::cout << sprint("%-7d %-7d ", offset, read) << r.table_row() << "\n"; + output_mgr->add_test_values(to_sstrings(offset, read), r.get_stats_values()); check_fragment_count(r, std::min(cfg.n_rows - offset, read)); }; @@ -710,12 +873,11 @@ void test_large_partition_slicing_clustering_keys(column_family& cf) { } void test_large_partition_slicing_single_partition_reader(column_family& cf) { - std::cout << sprint("%-7s %-7s ", "offset", "read") << test_result::table_header() - << "\n"; + output_mgr->set_test_param_names({{"offset", "{:<7}"}, {"read", "{:<7}"}}, test_result::stats_names()); auto test = [&](int offset, int read) { on_test_case(); auto r = slice_rows_single_key(cf, offset, read); - std::cout << sprint("%-7d %-7d ", offset, read) << r.table_row() << "\n"; + output_mgr->add_test_values(to_sstrings(offset, read), r.get_stats_values()); check_fragment_count(r, std::min(cfg.n_rows - offset, read)); }; @@ -731,12 +893,11 @@ void test_large_partition_slicing_single_partition_reader(column_family& cf) { } void test_large_partition_select_few_rows(column_family& cf) { - std::cout << sprint("%-7s %-7s ", "stride", "rows") << test_result::table_header() - << "\n"; + output_mgr->set_test_param_names({{"stride", "{:<7}"}, {"rows", "{:<7}"}}, test_result::stats_names()); auto test = [&](int stride, int read) { on_test_case(); auto r = select_spread_rows(cf, stride, read); - std::cout << sprint("%-7d %-7d ", stride, read) << r.table_row() << "\n"; + output_mgr->add_test_values(to_sstrings(stride, read), r.get_stats_values()); check_fragment_count(r, read); }; @@ -749,25 +910,24 @@ void test_large_partition_select_few_rows(column_family& cf) { } void test_large_partition_forwarding(column_family& cf) { - std::cout << sprint("%-7s ", "pk-scan") << test_result::table_header() << "\n"; + output_mgr->set_test_param_names({{"pk-scan", "{:<7}"}}, test_result::stats_names()); on_test_case(); auto r = test_forwarding_with_restriction(cf, cfg, false); check_fragment_count(r, 2); - std::cout << sprint("%-7s ", "yes") << r.table_row() << "\n"; + output_mgr->add_test_values(to_sstrings("yes"), r.get_stats_values()); on_test_case(); r = test_forwarding_with_restriction(cf, cfg, true); check_fragment_count(r, 2); - std::cout << sprint("%-7s ", "no") << r.table_row() << "\n"; + output_mgr->add_test_values(to_sstrings("no"), r.get_stats_values()); } void test_small_partition_skips(column_family& cf2) { - std::cout << sprint("%-2s %-7s %-7s ", "", "read", "skip") << test_result::table_header() << "\n"; - + output_mgr->set_test_param_names({{"", "{:<2}"}, {"read", "{:<7}"}, {"skip", "{:<7}"}}, test_result::stats_names()); auto do_test = [&] (int n_read, int n_skip) { auto r = scan_with_stride_partitions(cf2, cfg.n_rows, n_read, n_skip); - std::cout << sprint("%-2s %-7d %-7d ", new_test_case ? "->" : "", n_read, n_skip) << r.table_row() << "\n"; + output_mgr->add_test_values(to_sstrings(new_test_case ? "->" : "", n_read, n_skip), r.get_stats_values()); new_test_case = false; check_fragment_count(r, count_for_skip_pattern(cfg.n_rows, n_read, n_skip)); return r; @@ -799,7 +959,7 @@ void test_small_partition_skips(column_family& cf2) { test(64, 4096); if (cache_enabled) { - std::cout << "Testing cache scan with small partitions with varying continuity.\n"; + output_mgr->add_test_static_param("cache_enabled", "Testing cache scan with small partitions with varying continuity."); for (auto n_read : {1, 64}) { for (auto n_skip : {1, 64}) { on_test_case(); @@ -811,11 +971,11 @@ void test_small_partition_skips(column_family& cf2) { } void test_small_partition_slicing(column_family& cf2) { - std::cout << sprint("%-7s %-7s ", "offset", "read") << test_result::table_header() << "\n"; + output_mgr->set_test_param_names({{"offset", "{:<7}"}, {"read", "{:<7}"}}, test_result::stats_names()); auto test = [&] (int offset, int read) { on_test_case(); auto r = slice_partitions(cf2, cfg.n_rows, offset, read); - std::cout << sprint("%-7d %-7d ", offset, read) << r.table_row() << "\n"; + output_mgr->add_test_values(to_sstrings(offset, read), r.get_stats_values()); check_fragment_count(r, std::min(cfg.n_rows - offset, read)); }; @@ -830,20 +990,6 @@ void test_small_partition_slicing(column_family& cf2) { test(cfg.n_rows / 2, 4096); } -struct test_group { - using requires_cache = seastar::bool_class; - enum type { - large_partition, - small_partition, - }; - - std::string name; - std::string message; - requires_cache needs_cache; - type partition_type; - void (*test_fn)(column_family& cf); -}; - static std::initializer_list test_groups = { { "large-partition-single-key-slice", @@ -986,6 +1132,8 @@ int main(int argc, char** argv) { std::cout << "Config: rows: " << cfg.n_rows << ", value size: " << cfg.value_size << "\n"; + output_mgr = std::make_unique(); + sleep(1s).get(); // wait for system table flushes to quiesce engine().at_exit([&] { @@ -1004,17 +1152,15 @@ int main(int argc, char** argv) { cf.run_with_compaction_disabled([&] { return seastar::async([&] { live_range = int_range({0}, {cfg.n_rows - 1}); - boost::for_each( enabled_test_groups | boost::adaptors::filtered([type] (auto&& tc) { return tc.partition_type == type; }), [&cf] (auto&& tc) { if (tc.needs_cache && !cache_enabled) { - std::cout << "\nskipping: " << tc.name << "\n"; + output_mgr->add_test_group(tc, false); } else { - std::cout << "\nrunning: " << tc.name << "\n"; + output_mgr->add_test_group(tc, true); on_test_group(); - std::cout << tc.message << ":\n"; tc.test_fn(cf); } }