Files
scylladb/test/boost/mutation_fragment_test.cc
Benny Halevy 3feb759943 everywhere: use utils::chunked_vector for list of mutations
Currently, we use std::vector<*mutation> to keep
a list of mutations for processing.
This can lead to large allocations, e.g. when the vector
size is a function of the number of tables.

Use a chunked vector instead to prevent oversized allocations.

`perf-simple-query --smp 1` results obtained for fixed 400MHz frequency
and PGO disabled:

Before (read path):
```
enable-cache=1
Running test with config: {partitions=10000, concurrency=100, mode=read, query_single_key=no, counters=no}
Disabling auto compaction
Creating 10000 partitions...

89055.97 tps ( 66.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   39417 insns/op,   18003 cycles/op,        0 errors)
103372.72 tps ( 66.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   39380 insns/op,   17300 cycles/op,        0 errors)
98942.27 tps ( 66.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   39413 insns/op,   17336 cycles/op,        0 errors)
103752.93 tps ( 66.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   39407 insns/op,   17252 cycles/op,        0 errors)
102516.77 tps ( 66.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   39403 insns/op,   17288 cycles/op,        0 errors)
throughput:
	mean=   99528.13 standard-deviation=6155.71
	median= 102516.77 median-absolute-deviation=3844.59
	maximum=103752.93 minimum=89055.97
instructions_per_op:
	mean=   39403.99 standard-deviation=14.25
	median= 39406.75 median-absolute-deviation=9.30
	maximum=39416.63 minimum=39380.39
cpu_cycles_per_op:
	mean=   17435.81 standard-deviation=318.24
	median= 17300.40 median-absolute-deviation=147.59
	maximum=18002.53 minimum=17251.75
```

After (read path):
```
enable-cache=1
Running test with config: {partitions=10000, concurrency=100, mode=read, query_single_key=no, counters=no}
Disabling auto compaction
Creating 10000 partitions...
59755.04 tps ( 66.2 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   39466 insns/op,   22834 cycles/op,        0 errors)
71854.16 tps ( 66.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   39417 insns/op,   17883 cycles/op,        0 errors)
82149.45 tps ( 66.1 allocs/op,   0.0 logallocs/op,  14.2 tasks/op,   39411 insns/op,   17409 cycles/op,        0 errors)
49640.04 tps ( 66.1 allocs/op,   0.0 logallocs/op,  14.3 tasks/op,   39474 insns/op,   19975 cycles/op,        0 errors)
54963.22 tps ( 66.1 allocs/op,   0.0 logallocs/op,  14.3 tasks/op,   39474 insns/op,   18235 cycles/op,        0 errors)
throughput:
	mean=   63672.38 standard-deviation=13195.12
	median= 59755.04 median-absolute-deviation=8709.16
	maximum=82149.45 minimum=49640.04
instructions_per_op:
	mean=   39448.38 standard-deviation=31.60
	median= 39466.17 median-absolute-deviation=25.75
	maximum=39474.12 minimum=39411.42
cpu_cycles_per_op:
	mean=   19267.01 standard-deviation=2217.03
	median= 18234.80 median-absolute-deviation=1384.25
	maximum=22834.26 minimum=17408.67
```

`perf-simple-query --smp 1 --write` results obtained for fixed 400MHz frequency
and PGO disabled:

Before (write path):
```
enable-cache=1
Running test with config: {partitions=10000, concurrency=100, mode=write, query_single_key=no, counters=no}
Disabling auto compaction
63736.96 tps ( 59.4 allocs/op,  16.4 logallocs/op,  14.3 tasks/op,   49667 insns/op,   19924 cycles/op,        0 errors)
64109.41 tps ( 59.3 allocs/op,  16.0 logallocs/op,  14.3 tasks/op,   49992 insns/op,   20084 cycles/op,        0 errors)
56950.47 tps ( 59.3 allocs/op,  16.0 logallocs/op,  14.3 tasks/op,   50005 insns/op,   20501 cycles/op,        0 errors)
44858.42 tps ( 59.3 allocs/op,  16.0 logallocs/op,  14.3 tasks/op,   50014 insns/op,   21947 cycles/op,        0 errors)
28592.87 tps ( 59.3 allocs/op,  16.0 logallocs/op,  14.3 tasks/op,   50027 insns/op,   27659 cycles/op,        0 errors)
throughput:
	mean=   51649.63 standard-deviation=15059.74
	median= 56950.47 median-absolute-deviation=12087.33
	maximum=64109.41 minimum=28592.87
instructions_per_op:
	mean=   49941.18 standard-deviation=153.76
	median= 50005.24 median-absolute-deviation=73.01
	maximum=50027.07 minimum=49667.05
cpu_cycles_per_op:
	mean=   22023.01 standard-deviation=3249.92
	median= 20500.74 median-absolute-deviation=1938.76
	maximum=27658.75 minimum=19924.32
```

After (write path):
```
enable-cache=1
Running test with config: {partitions=10000, concurrency=100, mode=write, query_single_key=no, counters=no}
Disabling auto compaction
53395.93 tps ( 59.4 allocs/op,  16.5 logallocs/op,  14.3 tasks/op,   50326 insns/op,   21252 cycles/op,        0 errors)
46527.83 tps ( 59.3 allocs/op,  16.0 logallocs/op,  14.3 tasks/op,   50704 insns/op,   21555 cycles/op,        0 errors)
55846.30 tps ( 59.3 allocs/op,  16.0 logallocs/op,  14.3 tasks/op,   50731 insns/op,   21060 cycles/op,        0 errors)
55669.30 tps ( 59.3 allocs/op,  16.0 logallocs/op,  14.3 tasks/op,   50735 insns/op,   21521 cycles/op,        0 errors)
52130.17 tps ( 59.3 allocs/op,  16.0 logallocs/op,  14.3 tasks/op,   50757 insns/op,   21334 cycles/op,        0 errors)
throughput:
	mean=   52713.91 standard-deviation=3795.38
	median= 53395.93 median-absolute-deviation=2955.40
	maximum=55846.30 minimum=46527.83
instructions_per_op:
	mean=   50650.57 standard-deviation=182.46
	median= 50731.38 median-absolute-deviation=84.09
	maximum=50756.62 minimum=50325.87
cpu_cycles_per_op:
	mean=   21344.42 standard-deviation=202.86
	median= 21334.00 median-absolute-deviation=176.37
	maximum=21554.61 minimum=21060.24
```

Fixes #24815

Improvement for rare corner cases. No backport required

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>

Closes scylladb/scylladb#24919
2025-07-13 19:13:11 +03:00

707 lines
30 KiB
C++

/*
* Copyright (C) 2016-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <seastar/core/thread.hh>
#include <seastar/testing/on_internal_error.hh>
#include "test/lib/scylla_test_case.hh"
#include <seastar/testing/thread_test_case.hh>
#include <seastar/util/closeable.hh>
#include <fmt/std.h>
#include "test/lib/mutation_source_test.hh"
#include "mutation/mutation_fragment.hh"
#include "mutation/frozen_mutation.hh"
#include "schema/schema_builder.hh"
#include "test/boost/total_order_check.hh"
#include "schema_upgrader.hh"
#include "readers/combined.hh"
#include "replica/memtable.hh"
#include "utils/to_string.hh"
#include "test/lib/mutation_assertions.hh"
#include "test/lib/reader_concurrency_semaphore.hh"
#include "test/lib/simple_schema.hh"
#include "test/lib/fragment_scatterer.hh"
#include "test/lib/test_utils.hh"
#include "readers/from_mutations.hh"
#include <boost/range/join.hpp>
SEASTAR_TEST_CASE(test_mutation_merger_conforms_to_mutation_source) {
    return seastar::async([] {
        tests::reader_concurrency_semaphore_wrapper semaphore;
        run_mutation_source_tests([&](schema_ptr s, const utils::chunked_vector<mutation>& partitions) -> mutation_source {
            // Mutation source under test: every input partition is scattered
            // across several memtables and read back through a combined reader,
            // which must then satisfy the generic mutation-source contract.
            const int memtable_count = 5;

            std::vector<lw_shared_ptr<replica::memtable>> memtables;
            for (int idx = 0; idx < memtable_count; ++idx) {
                memtables.push_back(make_lw_shared<replica::memtable>(s));
            }

            // fragment_scatterer(s, memtable_count) yields one mutation per
            // memtable slot; slot `idx` is applied to memtable `idx`.
            for (const auto& partition : partitions) {
                auto source_reader = make_mutation_reader_from_mutations(s, semaphore.make_permit(), {partition});
                auto close_source_reader = deferred_close(source_reader);
                auto scattered = source_reader.consume(fragment_scatterer(s, memtable_count)).get();
                for (int idx = 0; idx < memtable_count; ++idx) {
                    memtables[idx]->apply(std::move(scattered[idx]));
                }
            }

            // The returned source owns (copies of) the memtable pointers and
            // merges one reader per memtable on every read.
            return mutation_source([memtables] (schema_ptr s,
                    reader_permit permit,
                    const dht::partition_range& range,
                    const query::partition_slice& slice,
                    tracing::trace_state_ptr trace_state,
                    streamed_mutation::forwarding fwd,
                    mutation_reader::forwarding fwd_mr)
            {
                std::vector<mutation_reader> readers;
                for (const auto& mt : memtables) {
                    readers.push_back(mt->make_mutation_reader(s, permit, range, slice, trace_state, fwd, fwd_mr));
                }
                return make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr);
            });
        });
    });
}
// Checks range_tombstone_stream: tombstones applied to it are handed out via
// get_next(pos) only once the caller reaches their start position, and
// overlapping tombstones come out de-overlapped (rt1 is split around the
// newer rt2, as the expected1/expected2 fragments below show).
SEASTAR_TEST_CASE(test_range_tombstones_stream) {
return seastar::async([] {
auto s = schema_builder("ks", "cf")
.with_column("pk", int32_type, column_kind::partition_key)
.with_column("ck1", int32_type, column_kind::clustering_key)
.with_column("ck2", int32_type, column_kind::clustering_key)
.with_column("r", int32_type)
.build();
tests::reader_concurrency_semaphore_wrapper semaphore;
// NOTE(review): pk is not used anywhere below.
auto pk = partition_key::from_single_value(*s, int32_type->decompose(0));
// Builds a (possibly partial) clustering prefix from int components.
auto create_ck = [&] (std::vector<int> v) {
std::vector<bytes> vs;
std::ranges::transform(v, std::back_inserter(vs), [] (int x) { return int32_type->decompose(x); });
return clustering_key_prefix::from_exploded(*s, std::move(vs));
};
// t1 has the higher timestamp, so rt2 shadows rt1 where they overlap.
tombstone t0(0, { });
tombstone t1(1, { });
auto rt1 = range_tombstone(create_ck({ 1 }), t0, bound_kind::incl_start, create_ck({ 1, 3 }), bound_kind::incl_end);
auto rt2 = range_tombstone(create_ck({ 1, 1 }), t1, bound_kind::incl_start, create_ck({ 1, 3 }), bound_kind::excl_end);
// rt3 is never applied to the stream; it is only used as a position probe.
auto rt3 = range_tombstone(create_ck({ 1, 1 }), t0, bound_kind::incl_start, create_ck({ 2 }), bound_kind::incl_end);
auto rt4 = range_tombstone(create_ck({ 2 }), t0, bound_kind::incl_start, create_ck({ 2, 2 }), bound_kind::incl_end);
auto permit = semaphore.make_permit();
mutation_fragment cr1(*s, permit, clustering_row(create_ck({ 0, 0 })));
mutation_fragment cr2(*s, permit, clustering_row(create_ck({ 1, 0 })));
mutation_fragment cr3(*s, permit, clustering_row(create_ck({ 1, 1 })));
auto cr4 = rows_entry(create_ck({ 1, 2 }));
auto cr5 = rows_entry(create_ck({ 1, 3 }));
range_tombstone_stream rts(*s, permit);
rts.apply(range_tombstone(rt1));
rts.apply(range_tombstone(rt2));
rts.apply(range_tombstone(rt4));
// (0,0) precedes every tombstone: nothing to emit yet.
mutation_fragment_opt mf = rts.get_next(cr1);
BOOST_REQUIRE(!mf);
// At (1,0): the part of rt1 before rt2's start is emitted.
mf = rts.get_next(cr2);
BOOST_REQUIRE(mf && mf->is_range_tombstone());
auto expected1 = range_tombstone(create_ck({ 1 }), t0, bound_kind::incl_start, create_ck({ 1, 1 }), bound_kind::excl_end);
BOOST_REQUIRE(mf->as_range_tombstone().equal(*s, expected1));
// Asking again at the same position yields nothing more.
mf = rts.get_next(cr2);
BOOST_REQUIRE(!mf);
// At position (1,1) (start of rt3): rt2 is emitted unchanged.
mf = rts.get_next(mutation_fragment(*s, permit, range_tombstone(rt3)));
BOOST_REQUIRE(mf && mf->is_range_tombstone());
BOOST_REQUIRE(mf->as_range_tombstone().equal(*s, rt2));
mf = rts.get_next(cr3);
BOOST_REQUIRE(!mf);
mf = rts.get_next(cr4);
BOOST_REQUIRE(!mf);
// At (1,3), where rt2 ends (exclusive): the remainder of rt1 is emitted.
mf = rts.get_next(cr5);
BOOST_REQUIRE(mf && mf->is_range_tombstone());
auto expected2 = range_tombstone(create_ck({ 1, 3 }), t0, bound_kind::incl_start, create_ck({ 1, 3 }), bound_kind::incl_end);
BOOST_REQUIRE(mf->as_range_tombstone().equal(*s, expected2));
// get_next() without a position drains what is left: rt4, then nothing.
mf = rts.get_next();
BOOST_REQUIRE(mf && mf->is_range_tombstone());
BOOST_REQUIRE(mf->as_range_tombstone().equal(*s, rt4));
mf = rts.get_next();
BOOST_REQUIRE(!mf);
});
}
// Serialized cell name for regular column `col` in row `ck`.
// Dense tables: the cell name is the clustering key alone.
// Otherwise: the clustering components followed by the column name.
static
composite cell_name(const schema& s, const clustering_key& ck, const column_definition& col) {
    if (s.is_dense()) {
        return composite::serialize_value(ck.components(s), s.is_compound());
    }

    // Single trailing component holding the column name; it must outlive the
    // joined range passed to serialize_value() below.
    const managed_bytes_view column_name = bytes_view(col.name());
    auto key_components = boost::make_iterator_range(ck.begin(s), ck.end(s));
    auto name_component = boost::make_iterator_range(&column_name, &column_name + 1);
    return composite::serialize_value(boost::range::join(key_components, name_component), s.is_compound());
}
// Serialized cell name for static column `cdef`: a static composite whose only
// component is the column name.
static
composite cell_name_for_static_column(const schema& s, const column_definition& cdef) {
    const bytes_view name_bytes = cdef.name();
    return composite::serialize_static(s, boost::make_iterator_range(&name_bytes, &name_bytes + 1));
}
// Composite-serialization helpers for clustering keys, used as comparison
// operands in the ordering tests of this file. They are `static` so these
// file-local helpers get internal linkage and cannot clash (ODR) with
// same-named helpers in other translation units linked into the same binary.

// `ck`'s components serialized without an explicit end-of-component marker.
static inline
composite composite_for_key(const schema& s, const clustering_key& ck) {
    return composite::serialize_value(ck.components(s), s.is_compound());
}

// `ck`'s components serialized with the eoc::start end-of-component marker.
static inline
composite composite_before_key(const schema& s, const clustering_key& ck) {
    return composite::serialize_value(ck.components(s), s.is_compound(), composite::eoc::start);
}

// `ck`'s components serialized with the eoc::end end-of-component marker.
static inline
composite composite_after_prefixed(const schema& s, const clustering_key& ck) {
    return composite::serialize_value(ck.components(s), s.is_compound(), composite::eoc::end);
}
// position_in_partition helpers for clustering keys, used by the ordering
// tests of this file. They are `static` so these file-local helpers get
// internal linkage and cannot clash (ODR) with same-named helpers in other
// translation units linked into the same binary.

// Position of the clustering row with key `ck`.
static inline
position_in_partition position_for_row(const clustering_key& ck) {
    return position_in_partition(position_in_partition::clustering_row_tag_t(), ck);
}

// Range position at the inclusive start bound of `ck`.
static inline
position_in_partition position_before(const clustering_key& ck) {
    return position_in_partition(position_in_partition::range_tag_t(), bound_view(ck, bound_kind::incl_start));
}

// Range position at the inclusive end bound of `ck`; when `ck` is a prefix
// this sorts after the keys it prefixes.
static inline
position_in_partition position_after_prefixed(const clustering_key& ck) {
    return position_in_partition(position_in_partition::range_tag_t(), bound_view(ck, bound_kind::incl_end));
}
// Checks that position_in_partition::composite_tri_compare gives one
// consistent total order over position_in_partition values and serialized
// composites (cell names and clustering-key composites) for a non-dense
// table with a two-component clustering key and a static column.
// NOTE(review): total_order_check presumably asserts that each .next()
// element sorts strictly after the previous group and each .equal_to()
// element compares equal to the current one; confirm in total_order_check.hh.
SEASTAR_TEST_CASE(test_ordering_of_position_in_partition_and_composite_view) {
return seastar::async([] {
auto s = schema_builder("ks", "cf")
.with_column("pk", int32_type, column_kind::partition_key)
.with_column("ck1", int32_type, column_kind::clustering_key)
.with_column("ck2", int32_type, column_kind::clustering_key)
.with_column("s1", int32_type, column_kind::static_column)
.with_column("v", int32_type)
.build();
const column_definition& v_def = *s->get_column_definition("v");
const column_definition& s_def = *s->get_column_definition("s1");
// Full (two-component) clustering key.
auto make_ck = [&] (int ck1, int ck2) {
std::vector<data_value> cells;
cells.push_back(data_value(ck1));
cells.push_back(data_value(ck2));
return clustering_key::from_deeply_exploded(*s, cells);
};
// Keys in ascending clustering order.
auto ck1 = make_ck(1, 2);
auto ck2 = make_ck(2, 1);
auto ck3 = make_ck(2, 3);
auto ck4 = make_ck(3, 1);
using cmp = position_in_partition::composite_tri_compare;
// Expected order: the static cell (equal to the full range's start), then for
// each key ascending: the before-key position (equal to its eoc::start and
// plain composites and to its row position), the key's "v" cell name, and the
// after-prefixed position; finally the end of the full range.
total_order_check<cmp, position_in_partition, composite>(cmp(*s))
.next(cell_name_for_static_column(*s, s_def))
.equal_to(position_range::full().start())
.next(position_before(ck1))
.equal_to(composite_before_key(*s, ck1))
.equal_to(composite_for_key(*s, ck1))
.equal_to(position_for_row(ck1))
.next(cell_name(*s, ck1, v_def))
.next(position_after_prefixed(ck1))
.equal_to(composite_after_prefixed(*s, ck1))
.next(position_before(ck2))
.equal_to(composite_before_key(*s, ck2))
.equal_to(composite_for_key(*s, ck2))
.equal_to(position_for_row(ck2))
.next(cell_name(*s, ck2, v_def))
.next(position_after_prefixed(ck2))
.equal_to(composite_after_prefixed(*s, ck2))
.next(position_before(ck3))
.equal_to(composite_before_key(*s, ck3))
.equal_to(composite_for_key(*s, ck3))
.equal_to(position_for_row(ck3))
.next(cell_name(*s, ck3, v_def))
.next(position_after_prefixed(ck3))
.equal_to(composite_after_prefixed(*s, ck3))
.next(position_before(ck4))
.equal_to(composite_before_key(*s, ck4))
.equal_to(composite_for_key(*s, ck4))
.equal_to(position_for_row(ck4))
.next(cell_name(*s, ck4, v_def))
.next(position_after_prefixed(ck4))
.equal_to(composite_after_prefixed(*s, ck4))
.next(position_range::full().end())
.check();
});
}
// Same ordering check as the non-dense variant, but for a dense table where
// clustering keys may be prefixes (one component) of other keys. The expected
// order shows that the after-prefixed position of a prefix key (ck1, ck3)
// sorts after every longer key it prefixes (ck2, and ck4/ck5 respectively).
SEASTAR_TEST_CASE(test_ordering_of_position_in_partition_and_composite_view_in_a_dense_table) {
return seastar::async([] {
auto s = schema_builder("ks", "cf")
.with_column("pk", int32_type, column_kind::partition_key)
.with_column("ck1", int32_type, column_kind::clustering_key)
.with_column("ck2", int32_type, column_kind::clustering_key)
.with_column("v", int32_type)
.set_is_dense(true)
.build();
// Builds a full key when ck2 is given, otherwise a single-component prefix.
auto make_ck = [&] (int ck1, std::optional<int> ck2 = std::nullopt) {
std::vector<data_value> cells;
cells.push_back(data_value(ck1));
if (ck2) {
cells.push_back(data_value(ck2));
}
return clustering_key::from_deeply_exploded(*s, cells);
};
// ck1 prefixes ck2; ck3 prefixes ck4 and ck5; ck6 is a standalone prefix.
auto ck1 = make_ck(1);
auto ck2 = make_ck(1, 2);
auto ck3 = make_ck(2);
auto ck4 = make_ck(2, 3);
auto ck5 = make_ck(2, 4);
auto ck6 = make_ck(3);
using cmp = position_in_partition::composite_tri_compare;
// The empty composite sorts first, before the full range's start.
total_order_check<cmp, position_in_partition, composite>(cmp(*s))
.next(composite())
.next(position_range::full().start())
.next(position_before(ck1))
.equal_to(composite_before_key(*s, ck1))
.equal_to(composite_for_key(*s, ck1))
.equal_to(position_for_row(ck1))
// .next(position_after(ck1)) // FIXME: #1446
.next(position_before(ck2))
.equal_to(composite_before_key(*s, ck2))
.equal_to(composite_for_key(*s, ck2))
.equal_to(position_for_row(ck2))
.next(position_after_prefixed(ck2))
.equal_to(composite_after_prefixed(*s, ck2))
.next(position_after_prefixed(ck1)) // prefix of ck2
.equal_to(composite_after_prefixed(*s, ck1))
.next(position_before(ck3))
.equal_to(composite_before_key(*s, ck3))
.equal_to(composite_for_key(*s, ck3))
.equal_to(position_for_row(ck3))
// .next(position_after(ck3)) // FIXME: #1446
.next(position_before(ck4))
.equal_to(composite_before_key(*s, ck4))
.equal_to(composite_for_key(*s, ck4))
.equal_to(position_for_row(ck4))
.next(position_after_prefixed(ck4))
.equal_to(composite_after_prefixed(*s, ck4))
.next(position_before(ck5))
.equal_to(composite_before_key(*s, ck5))
.equal_to(composite_for_key(*s, ck5))
.equal_to(position_for_row(ck5))
.next(position_after_prefixed(ck5))
.equal_to(composite_after_prefixed(*s, ck5))
.next(position_after_prefixed(ck3)) // prefix of ck4-ck5
.equal_to(composite_after_prefixed(*s, ck3))
.next(position_before(ck6))
.equal_to(composite_before_key(*s, ck6))
.equal_to(composite_for_key(*s, ck6))
.equal_to(position_for_row(ck6))
.next(position_after_prefixed(ck6))
.equal_to(composite_after_prefixed(*s, ck6))
.next(position_range::full().end())
.check();
});
}
// For every mutation pair whose schemas differ, upgrading m1 to m2's schema by
// streaming it through schema_upgrader_v2 must give the same mutation as
// calling mutation::upgrade() on a copy of m1.
SEASTAR_TEST_CASE(test_schema_upgrader_is_equivalent_with_mutation_upgrade) {
    return seastar::async([] {
        tests::reader_concurrency_semaphore_wrapper semaphore;
        for_each_mutation_pair([&](const mutation& m1, const mutation& m2, are_equal eq) {
            // Pairs sharing a schema version have nothing to upgrade.
            if (m1.schema()->version() == m2.schema()->version()) {
                return;
            }

            // Path 1: stream m1 through the reader-level schema upgrader.
            auto upgrading_reader = transform(make_mutation_reader_from_mutations(m1.schema(), semaphore.make_permit(), {m1}), schema_upgrader_v2(m2.schema()));
            auto close_upgrading_reader = deferred_close(upgrading_reader);
            auto via_reader = read_mutation_from_mutation_reader(upgrading_reader).get();

            // Path 2: upgrade a copy of m1 in place.
            mutation via_upgrade(m1);
            via_upgrade.upgrade(m2.schema());

            assert_that(via_reader).has_mutation().is_equal_to(via_upgrade);
        });
    });
}
// Checks that when the callback passed to mutation_fragment::mutate_as_*()
// throws, no semaphore resources leak: once each fragment has gone out of
// scope, the semaphore's available resources must equal the value captured
// before any fragment was created.
SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_mutate_exception_safety) {
    struct dummy_exception { };

    simple_schema s;
    reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100);
    auto stop_sem = deferred_stop(sem);
    auto permit = sem.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {});
    const auto available_res = sem.available_resources();
    // Cell payload for the static and clustering rows below.
    const sstring val(1024, 'a');

    // partition start
    {
        try {
            auto ps = mutation_fragment(*s.schema(), permit, partition_start(s.make_pkey(0), {}));
            ps.mutate_as_partition_start(*s.schema(), [&] (partition_start&) {
                throw dummy_exception{};
            });
        } catch (dummy_exception&) { }
        BOOST_REQUIRE(available_res == sem.available_resources());
    }

    // static row
    {
        try {
            auto sr = s.make_static_row(permit, val);
            // Copy to move to our permit.
            sr = mutation_fragment(*s.schema(), permit, sr);
            // The fragment holds a static_row, so it must be mutated through the
            // static-row mutator (previously this called mutate_as_clustering_row,
            // which neither matches the fragment kind nor tests the static path).
            sr.mutate_as_static_row(*s.schema(), [&] (static_row&) {
                throw dummy_exception{};
            });
        } catch (dummy_exception&) { }
        BOOST_REQUIRE(available_res == sem.available_resources());
    }

    // clustering row
    {
        try {
            auto cr = s.make_row(permit, s.make_ckey(0), val);
            // Copy to move to our permit.
            cr = mutation_fragment(*s.schema(), permit, cr);
            cr.mutate_as_clustering_row(*s.schema(), [&] (clustering_row&) {
                throw dummy_exception{};
            });
        } catch (dummy_exception&) { }
        BOOST_REQUIRE(available_res == sem.available_resources());
    }

    // range tombstone
    {
        try {
            auto rt = mutation_fragment(*s.schema(), permit, s.make_range_tombstone(query::clustering_range::make_ending_with(s.make_ckey(0))));
            rt.mutate_as_range_tombstone(*s.schema(), [&] (range_tombstone&) {
                throw dummy_exception{};
            });
        } catch (dummy_exception&) { }
        BOOST_REQUIRE(available_res == sem.available_resources());
    }
}
// Runs hand-built mutation_fragment_v2 streams through both
// mutation_fragment_stream_validator and mutation_fragment_stream_validating_filter
// (clustering_key level). It checks that valid streams are accepted and that
// invalid ones are rejected exactly at a known fragment or at end-of-stream.
SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator) {
// Invalid streams are fed on purpose; presumably this keeps internal-error
// reports from aborting the test process.
testing::scoped_no_abort_on_internal_error _;
simple_schema ss;
const auto dkeys = ss.make_pkeys(3);
// dk_ is the key used when expect() has to inject a leading partition start.
const auto& dk_ = dkeys[0];
const auto& dk0 = dkeys[1];
const auto& dk1 = dkeys[2];
const auto ck0 = ss.make_ckey(0);
const auto ck1 = ss.make_ckey(1);
const auto ck2 = ss.make_ckey(2);
// NOTE(review): ck3 is not used in this test.
const auto ck3 = ss.make_ckey(3);
reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100);
auto stop_sem = deferred_stop(sem);
auto permit = sem.make_tracking_only_permit(ss.schema(), get_name(), db::no_timeout, {});
// expect(): build a fragment vector from first_mf and mf... and check it.
// If the first fragment is not a partition start, a partition_start(dk_) is
// injected in front, and `at` is shifted by one so it still points at the
// same fragment. `at` is the index of the fragment expected to be rejected;
// numeric_limits<unsigned>::max() means "no fragment-level failure" (the
// stream is either valid or expected to fail only at end-of-stream).
auto expect = [&] (bool expect_valid, const char* desc, unsigned at, auto&& first_mf, auto&&... mf) {
std::vector<mutation_fragment_v2> mfs;
{
bool need_inject_ps = false;
if constexpr (std::is_same_v<std::remove_reference_t<decltype(first_mf)>, mutation_fragment_v2>) {
need_inject_ps = !first_mf.is_partition_start();
} else {
need_inject_ps = !std::is_same_v<std::remove_reference_t<decltype(first_mf)>, partition_start>;
}
if (need_inject_ps) {
testlog.trace("Injecting partition start");
mfs.emplace_back(*ss.schema(), permit, partition_start(dk_, {}));
if (at != std::numeric_limits<unsigned>::max()) {
++at;
}
}
mfs.emplace_back(*ss.schema(), permit, std::move(first_mf));
// Emplace the remaining fragments in argument order: the throwaway
// vector only serves to expand the pack, and braced-init-list elements
// are evaluated left to right. The stored pointers are never used.
auto _ = std::vector<mutation_fragment_v2*>{&mfs.emplace_back(*ss.schema(), permit, std::move(mf))..., };
}
testlog.info("Checking scenario {} with validator", desc);
{
unsigned i = 0;
mutation_fragment_stream_validator validator(*ss.schema());
// Sticky: once any fragment is rejected, `valid` stays false.
bool valid = true;
for (const auto& mf : mfs) {
testlog.trace("validate fragment [{}] {} @ {}", i, mf.mutation_fragment_kind(), mf.position());
valid &= bool(validator(mf));
if (expect_valid) {
if (!valid) {
BOOST_FAIL(fmt::format("Unexpected invalid fragment {} @ {}", mf.mutation_fragment_kind(), mf.position()));
}
} else {
if (i == at && valid) {
BOOST_FAIL(fmt::format("Unexpected valid fragment {} @ {}", mf.mutation_fragment_kind(), mf.position()));
}
}
++i;
}
// Here i == mfs.size(); the end-of-stream check only runs when no
// fragment inside the stream was expected to fail.
if (expect_valid || i <= at) {
valid &= bool(validator.on_end_of_stream());
BOOST_REQUIRE(valid == expect_valid);
}
}
testlog.info("Checking scenario {} with validating filter", desc);
{
unsigned i = 0;
mutation_fragment_stream_validating_filter validator(get_name(), *ss.schema(), mutation_fragment_stream_validation_level::clustering_key);
// The filter reports violations by throwing invalid_mutation_fragment_stream.
for (const auto& mf : mfs) {
testlog.trace("validate fragment [{}] {} @ {}", i, mf.mutation_fragment_kind(), mf.position());
try {
validator(mf);
if (!expect_valid && i == at) {
BOOST_FAIL(fmt::format("Unexpected valid fragment {} @ {}", mf.mutation_fragment_kind(), mf.position()));
}
} catch (invalid_mutation_fragment_stream& e) {
if (expect_valid || i < at) {
BOOST_FAIL(fmt::format("Unexpected invalid fragment {} @ {}: {}", mf.mutation_fragment_kind(), mf.position(), e));
} else {
testlog.trace("Got expected exception for fragment {} @ {}: {}", mf.mutation_fragment_kind(), mf.position(), e);
}
}
++i;
}
if (expect_valid || i <= at) {
try {
validator.on_end_of_stream();
if (!expect_valid) {
BOOST_FAIL("Unexpected valid EOS");
}
} catch (invalid_mutation_fragment_stream& e) {
if (expect_valid) {
BOOST_FAIL(fmt::format("Unexpected invalid EOS: {}", e));
} else {
testlog.trace("Got expected exception at EOS: {}", e);
}
}
}
}
};
// Convenience wrappers over expect() for the three kinds of scenario.
auto expect_valid = [&] (const char* desc, auto&&... mf) {
return expect(true, desc, std::numeric_limits<unsigned>::max(), std::move(mf)...);
};
auto expect_invalid_at_eos = [&] (const char* desc, auto&&... mf) {
return expect(false, desc, std::numeric_limits<unsigned>::max(), std::move(mf)...);
};
auto expect_invalid_at_fragment = [&] (const char* desc, unsigned at, auto&&... mf) {
return expect(false, desc, at, std::move(mf)...);
};
expect_valid(
"kitchen sink",
partition_start(dk0, {}),
ss.make_static_row_v2(permit, "v"),
ss.make_row_v2(permit, ck0, "ck0"),
ss.make_row_v2(permit, ck1, "ck1"),
range_tombstone_change(position_in_partition::after_key(*ss.schema(), ck1), {ss.new_tombstone()}),
range_tombstone_change(position_in_partition::after_key(*ss.schema(), ck1), {ss.new_tombstone()}),
range_tombstone_change(position_in_partition::before_key(ck2), {ss.new_tombstone()}),
ss.make_row_v2(permit, ck2, "ck2"),
range_tombstone_change(position_in_partition::after_key(*ss.schema(), ck2), {}),
partition_end{},
partition_start(dk1, {}),
partition_end{});
expect_valid(
"static row alone",
partition_start(dk0, {}),
ss.make_static_row_v2(permit, "v"),
partition_end{});
expect_valid(
"clustering row alone",
partition_start(dk0, {}),
ss.make_row_v2(permit, ck0, "ck0"),
partition_end{});
expect_valid(
"2 range tombstone changes",
partition_start(dk0, {}),
range_tombstone_change(position_in_partition::after_key(*ss.schema(), ck1), {ss.new_tombstone()}),
range_tombstone_change(position_in_partition::after_key(*ss.schema(), ck2), {}),
partition_end{});
expect_valid(
"null range tombstone change alone",
partition_start(dk0, {}),
range_tombstone_change(position_in_partition::after_key(*ss.schema(), ck2), {}),
partition_end{});
expect_invalid_at_eos(
"missing partition end at EOS",
partition_start(dk0, {}));
expect_invalid_at_fragment(
"active range tombstone end at partition end",
2,
partition_start(dk0, {}),
range_tombstone_change(position_in_partition::after_key(*ss.schema(), ck1), {ss.new_tombstone()}),
partition_end{});
// One fragment of each kind, used by check_invalid_after() below.
const auto ps = mutation_fragment_v2(*ss.schema(), permit, partition_start(dk1, {}));
const auto sr = ss.make_static_row_v2(permit, "v");
const auto cr = ss.make_row_v2(permit, ck2, "ck2");
const auto rtc = mutation_fragment_v2(*ss.schema(), permit, range_tombstone_change(position_in_partition::after_key(*ss.schema(), ck1), {ss.new_tombstone()}));
const auto pe = mutation_fragment_v2(*ss.schema(), permit, partition_end{});
// For each fragment in invalid_mfs, checks that a stream of mf_raw followed
// directly by it is rejected at that second fragment. The index 1 passed
// below is shifted by expect() when it injects a leading partition start.
auto check_invalid_after = [&] (auto&& mf_raw, std::initializer_list<const mutation_fragment_v2*> invalid_mfs) {
auto mf = mutation_fragment_v2(*ss.schema(), permit, std::move(mf_raw));
for (const auto invalid_mf : invalid_mfs) {
std::string desc;
if (mf.position().region() == partition_region::clustered) {
desc = fmt::format("{} @ {} after {} @ {}", invalid_mf->mutation_fragment_kind(), invalid_mf->position(), mf.mutation_fragment_kind(), mf.position());
} else {
desc = fmt::format("{} after {}", invalid_mf->mutation_fragment_kind(), mf.mutation_fragment_kind());
}
expect_invalid_at_fragment(
desc.c_str(),
1,
mutation_fragment_v2(*ss.schema(), permit, mf),
mutation_fragment_v2(*ss.schema(), permit, *invalid_mf));
}
};
check_invalid_after(partition_start(dk0, {}), {&ps});
check_invalid_after(sr, {&sr, &ps});
check_invalid_after(cr, {&ps, &sr, &cr});
check_invalid_after(rtc, {&ps, &sr});
check_invalid_after(pe, {&sr, &cr, &rtc, &pe});
}
// Drives one mutation_fragment_stream_validator through its lower-level
// overloads (fragment kind with or without a position, a bare token, a
// decorated key), mixed freely within a single stream.
SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator_mixed_api_usage) {
simple_schema ss;
const auto dkeys = ss.make_pkeys(3);
const auto& dk_ = dkeys[0];
const auto& dk0 = dkeys[1];
const auto ck0 = ss.make_ckey(0);
const auto ck1 = ss.make_ckey(1);
// NOTE(review): ck2, ck3 and permit are not used in this test.
const auto ck2 = ss.make_ckey(2);
const auto ck3 = ss.make_ckey(3);
reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100);
auto stop_sem = deferred_stop(sem);
auto permit = sem.make_tracking_only_permit(ss.schema(), get_name(), db::no_timeout, {});
mutation_fragment_stream_validator validator(*ss.schema());
using mf_kind = mutation_fragment_v2::kind;
BOOST_REQUIRE(validator(mf_kind::partition_start, {}));
BOOST_REQUIRE(validator(dk_.token()));
BOOST_REQUIRE(validator(mf_kind::static_row, position_in_partition_view(position_in_partition_view::static_row_tag_t{}), {}));
// Clustering rows reported without a position are accepted between
// positioned ones; only repeating the positioned key ck0 is rejected.
BOOST_REQUIRE(validator(mf_kind::clustering_row, {}));
BOOST_REQUIRE(validator(mf_kind::clustering_row, {}));
BOOST_REQUIRE(validator(mf_kind::clustering_row, position_in_partition_view::for_key(ck0), {}));
BOOST_REQUIRE(validator(mf_kind::clustering_row, {}));
BOOST_REQUIRE(!validator(mf_kind::clustering_row, position_in_partition_view::for_key(ck0), {}));
BOOST_REQUIRE(validator(mf_kind::clustering_row, {}));
BOOST_REQUIRE(validator(mf_kind::clustering_row, position_in_partition_view::for_key(ck1), {}));
BOOST_REQUIRE(validator(mf_kind::clustering_row, {}));
// Two range tombstone changes at the same position are accepted.
BOOST_REQUIRE(validator(mf_kind::range_tombstone_change, position_in_partition::after_key(*ss.schema(), ck1), {}));
BOOST_REQUIRE(validator(mf_kind::range_tombstone_change, position_in_partition::after_key(*ss.schema(), ck1), {}));
BOOST_REQUIRE(validator(mf_kind::partition_end, {}));
// A later key is accepted once; repeating the same key is rejected.
BOOST_REQUIRE(validator(dk0));
BOOST_REQUIRE(!validator(dk0));
}
// Feeds the same partly out-of-order sequence to a
// mutation_fragment_stream_validating_filter at every validation level.
// Each violation must be reported at every level that covers it. At lower
// levels, the offending fragment is simply not fed to the filter (the
// `level < vl::X || ...` short-circuit). The exception is the partition end
// with an active range tombstone: at vl::none it must be accepted.
// The filter is built with a trailing `false`; since violations are observed
// through the boolean results below, this presumably disables throwing.
SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator_validation_level) {
    simple_schema ss;
    const auto dkeys = ss.make_pkeys(5);
    const auto& dk_ = dkeys[0];
    const auto& dk0 = dkeys[1];
    const auto& dk1 = dkeys[2];
    const auto ck0 = ss.make_ckey(0);
    const auto ck1 = ss.make_ckey(1);
    const auto ck2 = ss.make_ckey(2);
    const auto ck3 = ss.make_ckey(3);
    reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 100);
    auto stop_sem = deferred_stop(sem);
    auto permit = sem.make_tracking_only_permit(ss.schema(), get_name(), db::no_timeout, {});
    using vl = mutation_fragment_stream_validation_level;
    using mf_kind = mutation_fragment_v2::kind;
    const auto ps_pos = position_in_partition_view(position_in_partition_view::partition_start_tag_t{});
    const auto sr_pos = position_in_partition_view(position_in_partition_view::static_row_tag_t{});
    const auto pe_pos = position_in_partition_view(position_in_partition_view::end_of_partition_tag_t{});
    // Levels go from weakest to strongest; the `validation_level < vl::X`
    // guards below rely on this ordering.
    for (const auto validation_level : {vl::none, vl::partition_region, vl::token, vl::partition_key, vl::clustering_key}) {
        testlog.info("validation_level={}", static_cast<int>(validation_level));
        mutation_fragment_stream_validating_filter validator("test", *ss.schema(), validation_level, false);
        BOOST_REQUIRE(validator(mf_kind::partition_start, ps_pos, {}));
        BOOST_REQUIRE(validator(dk_));
        BOOST_REQUIRE(validator(mf_kind::static_row, sr_pos, {}));
        // OOO fragment kind
        BOOST_REQUIRE(validator(mf_kind::clustering_row, position_in_partition::for_key(ck0), {}));
        BOOST_REQUIRE(validation_level < vl::partition_region || !validator(mf_kind::static_row, sr_pos, {}));
        // OOO clustering row
        BOOST_REQUIRE(validator(mf_kind::clustering_row, position_in_partition::for_key(ck1), {}));
        BOOST_REQUIRE(validation_level < vl::clustering_key || !validator(mf_kind::clustering_row, position_in_partition::for_key(ck0), {}));
        // Active range tombstone at partition-end
        BOOST_REQUIRE(validator(mf_kind::range_tombstone_change, position_in_partition::after_key(*ss.schema(), ck2), ss.new_tombstone()));
        if (validation_level == vl::none) {
            BOOST_REQUIRE(validator(mf_kind::partition_end, pe_pos, {}));
        } else {
            // Rejected until the active tombstone is closed by a null change.
            BOOST_REQUIRE(!validator(mf_kind::partition_end, pe_pos, {}));
            BOOST_REQUIRE(validator(mf_kind::range_tombstone_change, position_in_partition::after_key(*ss.schema(), ck3), tombstone()));
            BOOST_REQUIRE(validator(mf_kind::partition_end, pe_pos, {}));
        }
        BOOST_REQUIRE(validator(dk1));
        // OOO partition-key
        BOOST_REQUIRE(validation_level < vl::partition_key || !validator(dk1));
        // OOO token
        BOOST_REQUIRE(validation_level < vl::token || !validator(dk0));
    }
}