Files
scylladb/test/unit/row_cache_stress_test.cc
Botond Dénes a0d8102a1f replica/memtable: s/make_flat_reader/make_mutation_reader/
Following the recent refactoring of removing "flat" and "v2" from reader
names, replacing all the fully qualified names with simply "mutation_reader".

Closes scylladb/scylladb#23346
2025-04-01 17:58:13 +03:00

419 lines
16 KiB
C++

/*
* Copyright (C) 2017-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "seastarx.hh"
#include "test/lib/simple_schema.hh"
#include "test/lib/log.hh"
#include <seastar/core/app-template.hh>
#include "replica/memtable.hh"
#include "db/row_cache.hh"
#include "partition_slice_builder.hh"
#include "utils/assert.hh"
#include "utils/int_range.hh"
#include "utils/div_ceil.hh"
#include "utils/to_string.hh"
#include "test/lib/memtable_snapshot_source.hh"
#include <seastar/core/reactor.hh>
#include <fmt/core.h>
#include <fmt/std.h>
static thread_local bool cancelled = false;
using namespace std::chrono_literals;
namespace row_cache_stress_test {
struct table {
simple_schema s;
reader_concurrency_semaphore semaphore;
std::vector<dht::decorated_key> p_keys;
std::vector<api::timestamp_type> p_writetime; // committed writes
std::vector<clustering_key> c_keys;
uint64_t mutation_phase = 0;
uint64_t mutations = 0;
uint64_t reads_started = 0;
uint64_t scans_started = 0;
lw_shared_ptr<replica::memtable> mt;
lw_shared_ptr<replica::memtable> prev_mt;
memtable_snapshot_source underlying;
cache_tracker tracker;
row_cache cache;
table(unsigned partitions, unsigned rows)
: semaphore(reader_concurrency_semaphore::no_limits{}, __FILE__, reader_concurrency_semaphore::register_metrics::no)
, mt(make_lw_shared<replica::memtable>(s.schema()))
, underlying(s.schema())
, cache(s.schema(), snapshot_source([this] { return underlying(); }), tracker)
{
p_keys = s.make_pkeys(partitions);
p_writetime.resize(p_keys.size());
c_keys = s.make_ckeys(rows);
}
reader_permit make_permit() {
return semaphore.make_tracking_only_permit(s.schema(), "test", db::no_timeout, {});
}
future<> stop() noexcept {
return semaphore.stop();
}
void set_schema(schema_ptr new_s) {
s.set_schema(new_s);
mt->set_schema(new_s);
if (prev_mt) {
prev_mt->set_schema(new_s);
}
cache.set_schema(new_s);
underlying.set_schema(new_s);
}
size_t index_of_key(const dht::decorated_key& dk) {
for (auto i : std::views::iota(0u, p_keys.size())) {
if (p_keys[i].equal(*s.schema(), dk)) {
return i;
}
}
throw std::runtime_error(format("key not found: {}", dk));
}
sstring value_tag(int key, uint64_t phase) {
return format("k_0x{:x}_p_0x{:x}", key, phase);
}
mutation get_mutation(int key, api::timestamp_type t, const sstring& tag) {
mutation m(s.schema(), p_keys[key]);
for (auto ck : c_keys) {
s.add_row(m, ck, tag, t);
}
return m;
}
// Must not be called concurrently
void flush() {
testlog.trace("flushing");
prev_mt = std::exchange(mt, make_lw_shared<replica::memtable>(s.schema()));
auto flushed = make_lw_shared<replica::memtable>(s.schema());
flushed->apply(*prev_mt, make_permit()).get();
prev_mt->mark_flushed(flushed->as_data_source());
testlog.trace("updating cache");
cache.update(row_cache::external_updater([&] {
underlying.apply(flushed);
}), *prev_mt).get();
testlog.trace("flush done");
prev_mt = {};
}
void mutate_next_phase() {
testlog.trace("mutating, phase={}", mutation_phase);
for (auto i : std::views::iota(0u, p_keys.size())) {
auto t = s.new_timestamp();
auto tag = value_tag(i, mutation_phase);
auto m = get_mutation(i, t, tag);
mt->apply(std::move(m));
p_writetime[i] = t;
testlog.trace("updated key {}, {} @{}", i, tag, t);
++mutations;
yield().get();
}
testlog.trace("mutated whole ring");
++mutation_phase;
// FIXME: mutate concurrently with flush
flush();
}
struct reader {
dht::partition_range pr;
query::partition_slice slice;
std::optional<mutation_fragment_v1_stream> rd;
reader(dht::partition_range pr_, query::partition_slice slice_) noexcept
: pr(std::move(pr_))
, slice(std::move(slice_))
{ }
~reader() {
rd->close().get();
}
};
void alter_schema() {
static thread_local int col_id = 0;
auto new_s = schema_builder(s.schema())
.with_column(to_bytes(format("_a{}", col_id++)), byte_type)
.build();
testlog.trace("changing schema to {}", *new_s);
set_schema(new_s);
}
std::unique_ptr<reader> make_reader(dht::partition_range pr, query::partition_slice slice) {
testlog.trace("making reader, pk={} ck={}", pr, slice);
auto r = std::make_unique<reader>(std::move(pr), std::move(slice));
std::vector<mutation_reader> rd;
auto permit = make_permit();
if (prev_mt) {
rd.push_back(prev_mt->make_mutation_reader(s.schema(), permit, r->pr, r->slice, nullptr,
streamed_mutation::forwarding::no, mutation_reader::forwarding::no));
}
rd.push_back(mt->make_mutation_reader(s.schema(), permit, r->pr, r->slice, nullptr,
streamed_mutation::forwarding::no, mutation_reader::forwarding::no));
rd.push_back(cache.make_reader(s.schema(), permit, r->pr, r->slice, nullptr,
streamed_mutation::forwarding::no, mutation_reader::forwarding::no));
r->rd = mutation_fragment_v1_stream(make_combined_reader(s.schema(), permit, std::move(rd), streamed_mutation::forwarding::no, mutation_reader::forwarding::no));
return r;
}
std::unique_ptr<reader> make_single_key_reader(int pk, int_range ck_range) {
++reads_started;
auto slice = partition_slice_builder(*s.schema())
.with_range(ck_range.transform([this] (int key) { return c_keys[key]; }))
.build();
auto pr = dht::partition_range::make_singular(p_keys[pk]);
return make_reader(std::move(pr), std::move(slice));
}
std::unique_ptr<reader> make_scanning_reader() {
++scans_started;
return make_reader(query::full_partition_range, s.schema()->full_slice());
}
};
struct reader_id {
sstring name;
};
} // namespace row_cache_stress_test
// TODO: use format_as() after {fmt} v10
template <> struct fmt::formatter<row_cache_stress_test::reader_id> : fmt::formatter<string_view> {
auto format(const row_cache_stress_test::reader_id& id, fmt::format_context& ctx) const {
return fmt::format_to(ctx.out(), "{}", id.name);
}
};
namespace row_cache_stress_test {
class validating_consumer {
table& _t;
reader_id _id;
std::optional<sstring> _value;
size_t _row_count = 0;
size_t _key = 0;
std::vector<api::timestamp_type> _writetimes;
schema_ptr _s;
public:
validating_consumer(table& t, reader_id id, schema_ptr s)
: _t(t)
, _id(id)
, _writetimes(t.p_writetime)
, _s(s)
{ }
void consume_new_partition(const dht::decorated_key& key) {
testlog.trace("reader {}: enters partition {}", _id, key);
_value = {};
_key = _t.index_of_key(key);
}
stop_iteration consume_end_of_partition() { return stop_iteration::no; }
stop_iteration consume(tombstone) { return stop_iteration::no; }
stop_iteration consume(const static_row&) { return stop_iteration::no; }
stop_iteration consume(const range_tombstone&) { return stop_iteration::no; }
stop_iteration consume(const clustering_row& row) {
++_row_count;
sstring value;
api::timestamp_type t;
std::tie(value, t) = _t.s.get_value(*_s, row);
testlog.trace("reader {}: {} @{}, {}", _id, value, t, clustering_row::printer(*_s, row));
if (_value && value != _value) {
throw std::runtime_error(fmt::format("Saw values from two different writes in partition {:d}: {} and {}", _key, _value, value));
}
auto lowest_timestamp = _writetimes[_key];
if (t < lowest_timestamp) {
throw std::runtime_error(fmt::format("Expected to see the write @{:d}, but saw @{:d} ({}), c_key={}", lowest_timestamp, t, value, row.key()));
}
_value = std::move(value);
return stop_iteration::no;
}
size_t consume_end_of_stream() {
testlog.trace("reader {}: done, {} rows", _id, _row_count);
return _row_count;
}
};
template<typename T>
class monotonic_counter {
std::function<T()> _getter;
T _prev;
public:
monotonic_counter(std::function<T()> getter)
: _getter(std::move(getter)) {
_prev = _getter();
}
// Return change in value since the last call to change() or rate().
auto change() {
auto now = _getter();
return now - std::exchange(_prev, now);
}
};
}
using namespace row_cache_stress_test;
int main(int argc, char** argv) {
namespace bpo = boost::program_options;
app_template app;
app.add_options()
("trace", "Enables trace-level logging for the test actions")
("concurrency", bpo::value<unsigned>()->default_value(10), "Number of concurrent single partition readers")
("scan-concurrency", bpo::value<unsigned>()->default_value(2), "Number of concurrent ring scanners")
("partitions", bpo::value<unsigned>()->default_value(10), "Number of partitions")
("rows", bpo::value<unsigned>()->default_value(10000), "Number of rows in each partitions")
("seconds", bpo::value<unsigned>()->default_value(600), "Duration [s] after which the test terminates with a success")
;
return app.run(argc, argv, [&app] {
if (app.configuration().contains("trace")) {
testlog.set_level(seastar::log_level::trace);
}
return seastar::async([&app] {
auto concurrency = app.configuration()["concurrency"].as<unsigned>();
auto scan_concurrency = app.configuration()["scan-concurrency"].as<unsigned>();
auto partitions = app.configuration()["partitions"].as<unsigned>();
auto rows = app.configuration()["rows"].as<unsigned>();
auto seconds = app.configuration()["seconds"].as<unsigned>();
row_cache_stress_test::table t(partitions, rows);
auto stop_t = deferred_stop(t);
auto stop_test = defer([] {
cancelled = true;
});
timer<> completion_timer;
completion_timer.set_callback([&] {
testlog.info("Test done.");
cancelled = true;
});
completion_timer.arm(std::chrono::seconds(seconds));
auto fail = [&] (sstring msg) {
testlog.error("{}", msg);
cancelled = true;
completion_timer.cancel();
};
// Stats printer
timer<> stats_printer;
monotonic_counter<uint64_t> reads([&] { return t.reads_started; });
monotonic_counter<uint64_t> scans([&] { return t.scans_started; });
monotonic_counter<uint64_t> mutations([&] { return t.mutations; });
monotonic_counter<uint64_t> flushes([&] { return t.mutation_phase; });
stats_printer.set_callback([&] {
auto MB = 1024 * 1024;
testlog.info("reads/s: {}, scans/s: {}, mutations/s: {}, flushes/s: {}, Cache: {}/{} [MB], LSA: {}/{} [MB], std free: {} [MB]",
reads.change(), scans.change(), mutations.change(), flushes.change(),
t.tracker.region().occupancy().used_space() / MB,
t.tracker.region().occupancy().total_space() / MB,
logalloc::shard_tracker().region_occupancy().used_space() / MB,
logalloc::shard_tracker().region_occupancy().total_space() / MB,
seastar::memory::stats().free_memory() / MB);
});
stats_printer.arm_periodic(1s);
auto single_partition_reader = [&] (int i, reader_id id) {
auto n_keys = t.c_keys.size();
// Assign ranges so that there is ~30% overlap between adjacent readers.
auto len = div_ceil(n_keys, concurrency);
len = std::min(n_keys, len + div_ceil(len, 3)); // so that read ranges overlap
auto start = (n_keys - len) * i / (std::max(concurrency - 1, 1u));
int_range ck_range = make_int_range(start, start + len);
int pk = t.p_keys.size() / 2; // FIXME: spread over 3 consecutive partitions
testlog.info("{} is using pk={} ck={}", id, pk, ck_range);
while (!cancelled) {
testlog.trace("{}: starting read", id);
auto rd = t.make_single_key_reader(pk, ck_range);
auto row_count = rd->rd->consume(validating_consumer(t, id, t.s.schema())).get();
if (row_count != len) {
throw std::runtime_error(format("Expected {:d} fragments, got {:d}", len, row_count));
}
}
};
auto scanning_reader = [&] (reader_id id) {
auto expected_row_count = t.p_keys.size() * t.c_keys.size();
while (!cancelled) {
testlog.trace("{}: starting read", id);
auto rd = t.make_scanning_reader();
auto row_count = rd->rd->consume(validating_consumer(t, id, t.s.schema())).get();
if (row_count != expected_row_count) {
throw std::runtime_error(format("Expected {:d} fragments, got {:d}", expected_row_count, row_count));
}
}
};
// populate the initial phase, readers expect constant fragment count.
t.mutate_next_phase();
auto readers = parallel_for_each(std::views::iota(0u, concurrency), [&] (auto i) {
reader_id id{format("single-{:d}", i)};
return seastar::async([&, i, id] {
single_partition_reader(i, id);
}).handle_exception([&, id] (auto e) {
fail(format("{} failed: {}", id, e));
});
});
auto scanning_readers = parallel_for_each(std::views::iota(0u, scan_concurrency), [&] (auto i) {
reader_id id{format("scan-{:d}", i)};
return seastar::async([&, id] {
scanning_reader(id);
}).handle_exception([&, id] (auto e) {
fail(format("{} failed: {}", id, e));
});
});
timer<> evictor;
evictor.set_callback([&] {
testlog.trace("evicting");
t.cache.evict();
});
evictor.arm_periodic(3s);
timer<> schema_changer;
schema_changer.set_callback([&] {
t.alter_schema();
});
schema_changer.arm_periodic(1s);
// Mutator
while (!cancelled) {
t.mutate_next_phase();
}
stats_printer.cancel();
completion_timer.cancel();
evictor.cancel();
readers.get();
scanning_readers.get();
t.cache.evict();
t.tracker.cleaner().drain().get();
t.tracker.memtable_cleaner().drain().get();
SCYLLA_ASSERT(t.tracker.get_stats().partitions == 0);
SCYLLA_ASSERT(t.tracker.get_stats().rows == 0);
});
});
}