Files
scylladb/test/boost/mutation_reader_test.cc
Botond Dénes 6004e84f18 test: move away from tombstone_gc_state(nullptr) ctor
Use for_tests() instead (or no_gc() where appropriate).
2026-03-03 14:09:28 +02:00

4635 lines
194 KiB
C++

/*
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <list>
#include <random>
#include <source_location>
#include <algorithm>
#include <fmt/ranges.h>
#include <fmt/std.h>
#include <seastar/core/sleep.hh>
#include <seastar/core/do_with.hh>
#include <seastar/core/thread.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/util/closeable.hh>
#include "sstables/generation_type.hh"
#undef SEASTAR_TESTING_MAIN
#include <seastar/testing/test_case.hh>
#include <seastar/testing/thread_test_case.hh>
#include "test/lib/eventually.hh"
#include "test/lib/mutation_assertions.hh"
#include "test/lib/mutation_reader_assertions.hh"
#include "test/lib/sstable_utils.hh"
#include "test/lib/simple_schema.hh"
#include "test/lib/mutation_source_test.hh"
#include "test/lib/cql_test_env.hh"
#include "test/lib/make_random_string.hh"
#include "test/lib/dummy_sharder.hh"
#include "test/lib/reader_lifecycle_policy.hh"
#include "test/lib/reader_concurrency_semaphore.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/simple_position_reader_queue.hh"
#include "test/lib/fragment_scatterer.hh"
#include "test/lib/key_utils.hh"
#include "test/lib/test_utils.hh"
#include <boost/test/unit_test.hpp>
#include "dht/sharder.hh"
#include "schema/schema_builder.hh"
#include "replica/cell_locking.hh"
#include "sstables/sstables.hh"
#include "sstables/sstable_set_impl.hh"
#include "replica/database.hh"
#include "partition_slice_builder.hh"
#include "schema/schema_registry.hh"
#include "utils/assert.hh"
#include "mutation/mutation_rebuilder.hh"
#include "readers/from_mutations.hh"
#include "readers/forwardable.hh"
#include "readers/from_fragments.hh"
#include "readers/mutation_fragment_v1_stream.hh"
#include "readers/generating.hh"
#include "readers/empty.hh"
#include "readers/next_partition_adaptor.hh"
#include "readers/combined.hh"
#include "readers/compacting.hh"
#include "readers/foreign.hh"
#include "readers/filtering.hh"
#include "readers/evictable.hh"
#include "readers/queue.hh"
// All test cases in this file belong to the "mutation_reader_test" Boost suite.
BOOST_AUTO_TEST_SUITE(mutation_reader_test)
// Shorthand for Boost.Test decorators/labels used by test cases below.
namespace test_label = boost::unit_test;
// Builds the throw-away "ks.cf" schema shared by the combining-reader tests:
// a single bytes partition key ("pk") and one regular bytes column ("v").
static schema_ptr make_schema() {
    auto builder = schema_builder("ks", "cf");
    builder.with_column("pk", bytes_type, column_kind::partition_key);
    builder.with_column("v", bytes_type, column_kind::regular_column);
    return builder.build();
}
// Two readers carrying the same partition and the same cell: the merged
// stream must contain a single partition whose cell is the one with the
// higher write timestamp.
SEASTAR_TEST_CASE(test_combining_two_readers_with_the_same_row) {
    return seastar::async([] {
        auto schema = make_schema();
        tests::reader_concurrency_semaphore_wrapper semaphore;
        auto permit = semaphore.make_permit();
        mutation older(schema, partition_key::from_single_value(*schema, "key1"));
        older.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v1")), 1);
        mutation newer(schema, partition_key::from_single_value(*schema, "key1"));
        newer.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v2")), 2);
        auto reader_a = make_mutation_reader_from_mutations(schema, permit, older);
        auto reader_b = make_mutation_reader_from_mutations(schema, permit, newer);
        // Timestamp 2 beats timestamp 1, so the merge result equals `newer`.
        assert_that(make_combined_reader(schema, permit, std::move(reader_a), std::move(reader_b)))
            .produces(newer)
            .produces_end_of_stream();
    });
}
// Two readers with distinct partitions: the combined reader must emit both,
// ordered by their decorated keys (here keyA's token precedes keyB's).
SEASTAR_TEST_CASE(test_combining_two_non_overlapping_readers) {
    return seastar::async([] {
        auto schema = make_schema();
        tests::reader_concurrency_semaphore_wrapper semaphore;
        auto permit = semaphore.make_permit();
        mutation mut_b(schema, partition_key::from_single_value(*schema, "keyB"));
        mut_b.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v1")), 1);
        mutation mut_a(schema, partition_key::from_single_value(*schema, "keyA"));
        mut_a.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v2")), 2);
        auto combined = make_combined_reader(schema, permit,
                make_mutation_reader_from_mutations(schema, permit, mut_b),
                make_mutation_reader_from_mutations(schema, permit, mut_a));
        assert_that(std::move(combined))
            .produces(mut_a)
            .produces(mut_b)
            .produces_end_of_stream();
    });
}
// Two readers sharing one partition (keyB): the combined stream must merge
// the shared partition and emit each key exactly once, in ring order.
SEASTAR_TEST_CASE(test_combining_two_partially_overlapping_readers) {
    return seastar::async([] {
        auto schema = make_schema();
        tests::reader_concurrency_semaphore_wrapper semaphore;
        auto permit = semaphore.make_permit();
        mutation mut_a(schema, partition_key::from_single_value(*schema, "keyA"));
        mut_a.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v1")), 1);
        mutation mut_b(schema, partition_key::from_single_value(*schema, "keyB"));
        mut_b.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v2")), 1);
        mutation mut_c(schema, partition_key::from_single_value(*schema, "keyC"));
        mut_c.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v3")), 1);
        auto first = make_mutation_reader_from_mutations(schema, permit, {mut_a, mut_b});
        auto second = make_mutation_reader_from_mutations(schema, permit, {mut_b, mut_c});
        assert_that(make_combined_reader(schema, permit, std::move(first), std::move(second)))
            .produces(mut_a)
            .produces(mut_b)
            .produces(mut_c)
            .produces_end_of_stream();
    });
}
// Degenerate case: a combined reader built over a single underlying reader
// must pass all of its partitions through unchanged and in order.
SEASTAR_TEST_CASE(test_combining_one_reader_with_many_partitions) {
    return seastar::async([] {
        auto schema = make_schema();
        tests::reader_concurrency_semaphore_wrapper semaphore;
        auto permit = semaphore.make_permit();
        mutation mut_a(schema, partition_key::from_single_value(*schema, "keyA"));
        mut_a.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v1")), 1);
        mutation mut_b(schema, partition_key::from_single_value(*schema, "keyB"));
        mut_b.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v2")), 1);
        mutation mut_c(schema, partition_key::from_single_value(*schema, "keyC"));
        mut_c.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v3")), 1);
        std::vector<mutation_reader> underlying;
        underlying.push_back(make_mutation_reader_from_mutations(schema, permit, {mut_a, mut_b, mut_c}));
        assert_that(make_combined_reader(schema, permit, std::move(underlying), streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
            .produces(mut_a)
            .produces(mut_b)
            .produces(mut_c)
            .produces_end_of_stream();
    });
}
// Exercises the combined reader's galloping optimization inside a single
// partition: each input reader contributes a disjoint contiguous run of
// clustering rows ([0,5) and [5,10)), so the merger can keep pulling from
// one reader at a time. The merged partition must contain all ten rows.
SEASTAR_THREAD_TEST_CASE(combined_reader_galloping_within_partition_test) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
auto permit = semaphore.make_permit();
const auto pk = s.make_pkey();
const auto ckeys = s.make_ckeys(10);
// Builds one mutation for `pk` containing a row for each requested ckey
// index, with value "val_NN" at timestamp 1.
auto make_partition = [&] (auto&& ckey_indexes) -> mutation {
mutation mut(s.schema(), pk);
for (auto ckey_index : ckey_indexes) {
s.add_row(mut, ckeys[ckey_index], format("val_{:02d}", ckey_index), 1);
}
return mut;
};
std::vector<mutation_reader> v;
v.push_back(make_mutation_reader_from_mutations(s.schema(), permit, make_partition(std::views::iota(0, 5))));
v.push_back(make_mutation_reader_from_mutations(s.schema(), permit, make_partition(std::views::iota(5, 10))));
assert_that(make_combined_reader(s.schema(), permit, std::move(v), streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
.produces(make_partition(std::views::iota(0, 10)))
.produces_end_of_stream();
}
// Builds a mutation for `pkey` containing one clustering row per element of
// `ckey_nums`, each with value "val_NN" written at timestamp 1.
template<typename Collection>
mutation make_partition_with_clustering_rows(simple_schema& s, const dht::decorated_key& pkey, Collection&& ckey_nums) {
    mutation m(s.schema(), pkey);
    for (auto num : ckey_nums) {
        s.add_row(m, s.make_ckey(num), format("val_{:02d}", num), 1);
    }
    return m;
}
// Galloping across a partition boundary where ckey-range ownership swaps
// between the readers: reader #1 owns [5,10) of k[0] but [0,5) of k[1],
// and vice versa for reader #2. Both merged partitions must contain all
// ten rows regardless of which reader the merger was galloping on.
SEASTAR_THREAD_TEST_CASE(combined_mutation_reader_galloping_over_multiple_partitions_test) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
auto permit = semaphore.make_permit();
const auto k = s.make_pkeys(2);
std::vector<mutation_reader> v;
v.push_back(make_mutation_reader_from_mutations(s.schema(), permit, {
make_partition_with_clustering_rows(s, k[0], std::views::iota(5, 10)),
make_partition_with_clustering_rows(s, k[1], std::views::iota(0, 5))
}));
v.push_back(make_mutation_reader_from_mutations(s.schema(), permit, {
make_partition_with_clustering_rows(s, k[0], std::views::iota(0, 5)),
make_partition_with_clustering_rows(s, k[1], std::views::iota(5, 10))
}));
assert_that(make_combined_reader(s.schema(), permit, std::move(v), streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
.produces(make_partition_with_clustering_rows(s, k[0], std::views::iota(0, 10)))
.produces(make_partition_with_clustering_rows(s, k[1], std::views::iota(0, 10)))
.produces_end_of_stream();
}
// Like the test above, but here ckey-range ownership is the same in both
// partitions (reader #1 always owns [0,5), reader #2 always owns [5,10)),
// so galloping has to survive the switch to the next partition while the
// leading reader stays the same.
SEASTAR_THREAD_TEST_CASE(combined_reader_galloping_changing_multiple_partitions_test) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
auto permit = semaphore.make_permit();
const auto k = s.make_pkeys(2);
std::vector<mutation_reader> v;
v.push_back(make_mutation_reader_from_mutations(s.schema(), permit, {
make_partition_with_clustering_rows(s, k[0], std::views::iota(0, 5)),
make_partition_with_clustering_rows(s, k[1], std::views::iota(0, 5))
}));
v.push_back(make_mutation_reader_from_mutations(s.schema(), permit, {
make_partition_with_clustering_rows(s, k[0], std::views::iota(5, 10)),
make_partition_with_clustering_rows(s, k[1], std::views::iota(5, 10)),
}));
assert_that(make_combined_reader(s.schema(), permit, std::move(v), streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
.produces(make_partition_with_clustering_rows(s, k[0], std::views::iota(0, 10)))
.produces(make_partition_with_clustering_rows(s, k[1], std::views::iota(0, 10)))
.produces_end_of_stream();
}
// Verifies range_tombstone_change merging in the combined reader: at every
// position the merged stream must carry the highest-timestamp tombstone
// active among the inputs, and changes that do not alter the effective
// tombstone must be omitted (see the "not representing timestamp upgrades"
// case below).
SEASTAR_THREAD_TEST_CASE(test_combined_reader_range_tombstone_change_merging) {
simple_schema s;
const auto schema = s.schema();
tests::reader_concurrency_semaphore_wrapper semaphore;
auto permit = semaphore.make_permit();
// Shorthand: a tombstone change positioned just before ckey `ckey`; a
// disengaged `ts` denotes the tombstone ending at that position.
auto rtc = [&] (uint32_t ckey, std::optional<api::timestamp_type> ts) {
return range_tombstone_change(
position_in_partition::before_key(s.make_ckey(ckey)),
ts ? tombstone(*ts, {}) : tombstone());
};
struct input {
std::vector<range_tombstone_change> rtcs;
};
struct output {
std::vector<range_tombstone_change> rtcs;
};
// Wraps each input's changes into a single-partition fragment stream
// (always pkey 0), merges all streams with a combined reader, and asserts
// the merged partition contains exactly `out.rtcs` in order.
auto check = [&] (const char* desc, std::vector<input> in, output out, std::source_location sl = std::source_location::current()) {
testlog.info("check() {} @ {}:{}", desc, sl.file_name(), sl.line());
std::vector<mutation_reader> readers;
for (auto& i : in) {
std::deque<mutation_fragment_v2> fragments;
fragments.emplace_back(*schema, permit, partition_start(s.make_pkey(0), {}));
for (auto& rtc : i.rtcs) {
fragments.emplace_back(*schema, permit, std::move(rtc));
}
fragments.emplace_back(*schema, permit, partition_end{});
readers.emplace_back(make_mutation_reader_from_fragments(schema, permit, std::move(fragments)));
}
auto combined_reader = assert_that(make_combined_reader(schema, permit, std::move(readers)));
combined_reader.produces_partition_start(s.make_pkey(0));
for (const auto& rtc : out.rtcs) {
combined_reader.produces_range_tombstone_change(rtc);
}
combined_reader.produces_partition_end();
combined_reader.produces_end_of_stream();
};
check("single stream",
{
input{{rtc(1, 100), rtc(2, {})}}
},
output{{rtc(1, 100), rtc(2, {})}});
check("two streams, two rtc with same pos",
{
input{{rtc(1, 100), rtc(2, {})}},
input{{rtc(1, 200), rtc(2, {})}}
},
output{{rtc(1, 200), rtc(2, {})}});
check("two streams, alternating updates to both streams",
{
input{{rtc(1, 100), rtc(3, 200), rtc(4, {})}},
input{{rtc(1, 200), rtc(2, 100), rtc(4, {})}}
},
output{{rtc(1, 200), rtc(2, 100), rtc(3, 200), rtc(4, {})}});
check("two streams, active tombstone terminated, fallback to overshadowed tombstone",
{
input{{rtc(1, 100), rtc(3, 200), rtc(4, {})}},
input{{rtc(1, 200), rtc(2, {})}}
},
output{{rtc(1, 200), rtc(2, 100), rtc(3, 200), rtc(4, {})}});
check("three streams, active tombstone terminated, fallback to overshadowed tombstones",
{
input{{rtc(1, 100), rtc(4, 200), rtc(5, {})}},
input{{rtc(1, 200), rtc(3, {}), rtc(4, {})}},
input{{rtc(1, 300), rtc(2, {})}}
},
output{{rtc(1, 300), rtc(2, 200), rtc(3, 100), rtc(4, 200), rtc(5, {})}});
check("three streams, tombstones not representing timestamp upgrades are omitted",
{
input{{rtc(1, 100), rtc(6, {})}},
input{{rtc(2, 100), rtc(5, {})}},
input{{rtc(3, 100), rtc(4, {})}}
},
output{{rtc(1, 100), rtc(6, {})}});
check("two streams, new tombstone has smaller timestamp",
{
input{{rtc(1, 200), rtc(3, {})}},
input{{rtc(2, 100), rtc(4, {})}},
},
output{{rtc(1, 200), rtc(3, 100), rtc(4, {})}});
}
// Builds a single-cell mutation for `dk`: column "v" = "v1" at timestamp 1.
static mutation make_mutation_with_key(schema_ptr s, dht::decorated_key dk) {
    mutation result(s, std::move(dk));
    result.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v1")), 1);
    return result;
}
// Convenience overload: decorates the raw `key` bytes and delegates to the
// decorated-key overload above.
static mutation make_mutation_with_key(schema_ptr s, const char* key) {
    auto pk = partition_key::from_single_value(*s, bytes(key));
    return make_mutation_with_key(s, dht::decorate_key(*s, std::move(pk)));
}
// make_filtering_reader: the predicate is evaluated per decorated key and
// partitions that fail it are dropped from the stream. Covers keeping all,
// dropping all, and trimming partitions from the front, back and middle.
SEASTAR_TEST_CASE(test_filtering) {
return seastar::async([] {
auto s = make_schema();
tests::reader_concurrency_semaphore_wrapper semaphore;
auto m1 = make_mutation_with_key(s, "key1");
auto m2 = make_mutation_with_key(s, "key2");
auto m3 = make_mutation_with_key(s, "key3");
auto m4 = make_mutation_with_key(s, "key4");
// All pass
assert_that(make_filtering_reader(make_mutation_reader_from_mutations(s, semaphore.make_permit(), {m1, m2, m3, m4}),
[] (const dht::decorated_key& dk) { return true; }))
.produces(m1)
.produces(m2)
.produces(m3)
.produces(m4)
.produces_end_of_stream();
// None pass
assert_that(make_filtering_reader(make_mutation_reader_from_mutations(s, semaphore.make_permit(), {m1, m2, m3, m4}),
[] (const dht::decorated_key& dk) { return false; }))
.produces_end_of_stream();
// Trim front
assert_that(make_filtering_reader(make_mutation_reader_from_mutations(s, semaphore.make_permit(), {m1, m2, m3, m4}),
[&] (const dht::decorated_key& dk) { return !dk.key().equal(*s, m1.key()); }))
.produces(m2)
.produces(m3)
.produces(m4)
.produces_end_of_stream();
assert_that(make_filtering_reader(make_mutation_reader_from_mutations(s, semaphore.make_permit(), {m1, m2, m3, m4}),
[&] (const dht::decorated_key& dk) { return !dk.key().equal(*s, m1.key()) && !dk.key().equal(*s, m2.key()); }))
.produces(m3)
.produces(m4)
.produces_end_of_stream();
// Trim back
assert_that(make_filtering_reader(make_mutation_reader_from_mutations(s, semaphore.make_permit(), {m1, m2, m3, m4}),
[&] (const dht::decorated_key& dk) { return !dk.key().equal(*s, m4.key()); }))
.produces(m1)
.produces(m2)
.produces(m3)
.produces_end_of_stream();
assert_that(make_filtering_reader(make_mutation_reader_from_mutations(s, semaphore.make_permit(), {m1, m2, m3, m4}),
[&] (const dht::decorated_key& dk) { return !dk.key().equal(*s, m4.key()) && !dk.key().equal(*s, m3.key()); }))
.produces(m1)
.produces(m2)
.produces_end_of_stream();
// Trim middle
assert_that(make_filtering_reader(make_mutation_reader_from_mutations(s, semaphore.make_permit(), {m1, m2, m3, m4}),
[&] (const dht::decorated_key& dk) { return !dk.key().equal(*s, m3.key()); }))
.produces(m1)
.produces(m2)
.produces(m4)
.produces_end_of_stream();
assert_that(make_filtering_reader(make_mutation_reader_from_mutations(s, semaphore.make_permit(), {m1, m2, m3, m4}),
[&] (const dht::decorated_key& dk) { return !dk.key().equal(*s, m2.key()) && !dk.key().equal(*s, m3.key()); }))
.produces(m1)
.produces(m4)
.produces_end_of_stream();
});
}
// An empty reader merged with a non-empty one must not affect the output.
SEASTAR_TEST_CASE(test_combining_two_readers_with_one_reader_empty) {
    return seastar::async([] {
        auto schema = make_schema();
        tests::reader_concurrency_semaphore_wrapper semaphore;
        auto permit = semaphore.make_permit();
        mutation mut(schema, partition_key::from_single_value(*schema, "key1"));
        mut.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v1")), 1);
        auto non_empty = make_mutation_reader_from_mutations(schema, permit, mut);
        auto empty = make_empty_mutation_reader(schema, permit);
        assert_that(make_combined_reader(schema, permit, std::move(non_empty), std::move(empty)))
            .produces(mut)
            .produces_end_of_stream();
    });
}
// Merging two empty streams must yield an empty stream.
SEASTAR_TEST_CASE(test_combining_two_empty_readers) {
    return seastar::async([] {
        auto schema = make_schema();
        tests::reader_concurrency_semaphore_wrapper semaphore;
        auto permit = semaphore.make_permit();
        auto first = make_empty_mutation_reader(schema, permit);
        auto second = make_empty_mutation_reader(schema, permit);
        assert_that(make_combined_reader(schema, permit, std::move(first), std::move(second)))
            .produces_end_of_stream();
    });
}
// A combined reader over a single empty reader is itself empty.
SEASTAR_TEST_CASE(test_combining_one_empty_reader) {
    return seastar::async([] {
        tests::reader_concurrency_semaphore_wrapper semaphore;
        auto permit = semaphore.make_permit();
        auto schema = make_schema();
        std::vector<mutation_reader> underlying;
        underlying.push_back(make_empty_mutation_reader(schema, permit));
        assert_that(make_combined_reader(schema, permit, std::move(underlying), streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
            .produces_end_of_stream();
    });
}
// Produces `count` decorated keys (single int32 partition-key component,
// values 0..count-1), sorted by their position on the ring.
std::vector<dht::decorated_key> generate_keys(schema_ptr s, int count) {
    std::vector<dht::decorated_key> keys;
    keys.reserve(count);
    for (int value = 0; value < count; ++value) {
        auto pk = partition_key::from_single_value(*s, int32_type->decompose(data_value(value)));
        keys.push_back(dht::decorate_key(*s, std::move(pk)));
    }
    std::ranges::sort(keys, dht::decorated_key::less_comparator(s));
    return keys;
}
// Converts decorated keys to the equivalent ring positions, preserving order.
std::vector<dht::ring_position> to_ring_positions(const std::vector<dht::decorated_key>& keys) {
    std::vector<dht::ring_position> positions;
    positions.reserve(keys.size());
    for (const auto& key : keys) {
        positions.emplace_back(key);
    }
    return positions;
}
// Partition-range fast-forwarding through a combined reader: four readers
// with interleaved partition sets are merged, first over the whole ring,
// then restarted on a single-key range and fast-forwarded through
// successively later ranges (the test only ever moves forward).
SEASTAR_TEST_CASE(test_fast_forwarding_combining_reader) {
return seastar::async([] {
auto s = make_schema();
tests::reader_concurrency_semaphore_wrapper semaphore;
auto keys = generate_keys(s, 7);
auto ring = to_ring_positions(keys);
// Each inner vector is one reader's partitions; keys 0-6 are spread so
// that several keys appear in more than one reader.
std::vector<utils::chunked_vector<mutation>> mutations {
{
make_mutation_with_key(s, keys[0]),
make_mutation_with_key(s, keys[1]),
make_mutation_with_key(s, keys[2]),
},
{
make_mutation_with_key(s, keys[2]),
make_mutation_with_key(s, keys[3]),
make_mutation_with_key(s, keys[4]),
},
{
make_mutation_with_key(s, keys[1]),
make_mutation_with_key(s, keys[3]),
make_mutation_with_key(s, keys[5]),
},
{
make_mutation_with_key(s, keys[0]),
make_mutation_with_key(s, keys[5]),
make_mutation_with_key(s, keys[6]),
},
};
auto make_reader = [&] (reader_permit permit, const dht::partition_range& pr) {
return make_combined_reader(s, permit, mutations | std::views::transform([&pr, s, permit] (auto& ms) {
return make_mutation_reader_from_mutations(s, permit, {ms}, pr);
}) | std::ranges::to<std::vector>());
};
// Over the full ring every key appears exactly once despite duplicates.
auto pr = dht::partition_range::make_open_ended_both_sides();
assert_that(make_reader(semaphore.make_permit(), pr))
.produces(keys[0])
.produces(keys[1])
.produces(keys[2])
.produces(keys[3])
.produces(keys[4])
.produces(keys[5])
.produces(keys[6])
.produces_end_of_stream();
pr = dht::partition_range::make(ring[0], ring[0]);
assert_that(make_reader(semaphore.make_permit(), pr))
.produces(keys[0])
.produces_end_of_stream()
.fast_forward_to(dht::partition_range::make(ring[1], ring[1]))
.produces(keys[1])
.produces_end_of_stream()
.fast_forward_to(dht::partition_range::make(ring[3], ring[4]))
.produces(keys[3])
// Fast-forwarding away before draining the previous range drops its
// remainder: keys[4] is never produced.
.fast_forward_to(dht::partition_range::make({ ring[4], false }, ring[5]))
.produces(keys[5])
.produces_end_of_stream()
.fast_forward_to(dht::partition_range::make_starting_with(ring[6]))
.produces(keys[6])
.produces_end_of_stream();
});
}
// Partition-range fast-forwarding combined with galloping: both readers
// cover the same 7 partitions but own disjoint ckey halves ([0,5) vs
// [5,10)), so every produced partition is the merge of the two halves.
SEASTAR_THREAD_TEST_CASE(test_fast_forwarding_combining_reader_with_galloping) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
auto permit = semaphore.make_permit();
const auto pkeys = s.make_pkeys(7);
const auto ckeys = s.make_ckeys(10);
auto ring = to_ring_positions(pkeys);
// Builds `n` partitions (pkeys[0..n)) each holding the given ckey range.
auto make_n_mutations = [&] (auto ckeys, int n) {
utils::chunked_vector<mutation> ret;
for (int i = 0; i < n; i++) {
ret.push_back(make_partition_with_clustering_rows(s, pkeys[i], ckeys));
}
return ret;
};
auto pr = dht::partition_range::make(ring[0], ring[0]);
std::vector<mutation_reader> v;
v.push_back(make_mutation_reader_from_mutations(s.schema(), permit, make_n_mutations(std::views::iota(0, 5), 7), pr));
v.push_back(make_mutation_reader_from_mutations(s.schema(), permit, make_n_mutations(std::views::iota(5, 10), 7), pr));
assert_that(make_combined_reader(s.schema(), permit, std::move(v), streamed_mutation::forwarding::no, mutation_reader::forwarding::yes))
.produces(make_partition_with_clustering_rows(s, pkeys[0], std::views::iota(0, 10)))
.produces_end_of_stream()
.fast_forward_to(dht::partition_range::make(ring[1], ring[1]))
.produces(make_partition_with_clustering_rows(s, pkeys[1], std::views::iota(0, 10)))
.produces_end_of_stream()
.fast_forward_to(dht::partition_range::make(ring[3], ring[4]))
.produces(make_partition_with_clustering_rows(s, pkeys[3], std::views::iota(0, 10)))
// Fast-forwarding away without draining the previous range: pkeys[4]
// is intentionally skipped.
.fast_forward_to(dht::partition_range::make({ ring[4], false }, ring[5]))
.produces(make_partition_with_clustering_rows(s, pkeys[5], std::views::iota(0, 10)))
.produces_end_of_stream()
.fast_forward_to(dht::partition_range::make_starting_with(ring[6]))
.produces(make_partition_with_clustering_rows(s, pkeys[6], std::views::iota(0, 10)))
.produces_end_of_stream();
}
// Intra-partition (streamed_mutation::forwarding::yes) fast-forwarding:
// after each partition start the reader emits nothing until a position
// range is opened with fast_forward_to(), and next_partition() abandons
// the rest of the current partition.
SEASTAR_TEST_CASE(test_sm_fast_forwarding_combining_reader) {
return seastar::async([] {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
auto permit = semaphore.make_permit();
const auto pkeys = s.make_pkeys(4);
const auto ckeys = s.make_ckeys(4);
// Builds partition `pkeys[n]` with rows at ckeys[0..3].
auto make_mutation = [&] (uint32_t n) {
mutation m(s.schema(), pkeys[n]);
int i{0};
s.add_row(m, ckeys[i], format("val_{:d}", i));
++i;
s.add_row(m, ckeys[i], format("val_{:d}", i));
++i;
s.add_row(m, ckeys[i], format("val_{:d}", i));
++i;
s.add_row(m, ckeys[i], format("val_{:d}", i));
return m;
};
std::vector<utils::chunked_vector<mutation>> readers_mutations{
{make_mutation(0), make_mutation(1), make_mutation(2), make_mutation(3)},
{make_mutation(0)},
{make_mutation(2)},
};
std::vector<mutation_reader> readers;
for (auto& mutations : readers_mutations) {
readers.emplace_back(make_mutation_reader_from_mutations(s.schema(), permit, mutations, streamed_mutation::forwarding::yes));
}
assert_that(make_combined_reader(s.schema(), permit, std::move(readers), streamed_mutation::forwarding::yes, mutation_reader::forwarding::no))
.produces_partition_start(pkeys[0])
.produces_end_of_stream()
.fast_forward_to(position_range::all_clustered_rows())
.produces_row_with_key(ckeys[0])
// next_partition() before draining: rows ckeys[1..3] of pkeys[0] are
// abandoned.
.next_partition()
.produces_partition_start(pkeys[1])
.produces_end_of_stream()
.fast_forward_to(position_range(position_in_partition::before_key(ckeys[2]), position_in_partition::after_key(*s.schema(), ckeys[2])))
.produces_row_with_key(ckeys[2])
.produces_end_of_stream()
.fast_forward_to(position_range(position_in_partition::after_key(*s.schema(), ckeys[2]), position_in_partition::after_all_clustered_rows()))
.produces_row_with_key(ckeys[3])
.produces_end_of_stream()
.next_partition()
.produces_partition_start(pkeys[2])
.fast_forward_to(position_range::all_clustered_rows())
.produces_row_with_key(ckeys[0])
.produces_row_with_key(ckeys[1])
.produces_row_with_key(ckeys[2])
.produces_row_with_key(ckeys[3])
.produces_end_of_stream();
});
}
// Intra-partition fast-forwarding combined with galloping: the two readers
// own disjoint ckey halves ([0,5) vs [5,10)) of the same 3 partitions, so
// forwarded position ranges can land entirely within one reader's half or
// span both.
SEASTAR_THREAD_TEST_CASE(test_sm_fast_forwarding_combining_reader_with_galloping) {
simple_schema s;
tests::reader_concurrency_semaphore_wrapper semaphore;
auto permit = semaphore.make_permit();
const auto pkeys = s.make_pkeys(3);
const auto ckeys = s.make_ckeys(10);
auto ring = to_ring_positions(pkeys);
// Builds `n` partitions (pkeys[0..n)) each holding the given ckey range.
auto make_n_mutations = [&] (auto ckeys, int n) {
utils::chunked_vector<mutation> ret;
for (int i = 0; i < n; i++) {
ret.push_back(make_partition_with_clustering_rows(s, pkeys[i], ckeys));
}
return ret;
};
auto pr = dht::partition_range::make(ring[0], ring[0]);
std::vector<mutation_reader> v;
v.push_back(make_mutation_reader_from_mutations(s.schema(), permit, make_n_mutations(std::views::iota(0, 5), 3), streamed_mutation::forwarding::yes));
v.push_back(make_mutation_reader_from_mutations(s.schema(), permit, make_n_mutations(std::views::iota(5, 10), 3), streamed_mutation::forwarding::yes));
auto reader = make_combined_reader(s.schema(), permit, std::move(v), streamed_mutation::forwarding::yes, mutation_reader::forwarding::no);
auto assertions = assert_that(std::move(reader));
assertions.produces_partition_start(pkeys[0])
.produces_end_of_stream()
.fast_forward_to(position_range::all_clustered_rows())
.produces_row_with_key(ckeys[0])
.produces_row_with_key(ckeys[1])
.produces_row_with_key(ckeys[2])
.produces_row_with_key(ckeys[3])
// next_partition() mid-partition: the remaining rows of pkeys[0] are
// abandoned.
.next_partition()
.produces_partition_start(pkeys[1])
.produces_end_of_stream()
.fast_forward_to(position_range(position_in_partition::before_key(ckeys[0]), position_in_partition::after_key(*s.schema(), ckeys[3])))
.produces_row_with_key(ckeys[0])
.produces_row_with_key(ckeys[1])
.produces_row_with_key(ckeys[2])
.produces_row_with_key(ckeys[3])
.produces_end_of_stream()
.fast_forward_to(position_range(position_in_partition::after_key(*s.schema(), ckeys[6]), position_in_partition::after_all_clustered_rows()))
.produces_row_with_key(ckeys[7])
.produces_row_with_key(ckeys[8])
.produces_row_with_key(ckeys[9])
.produces_end_of_stream()
.next_partition()
.produces_partition_start(pkeys[2])
.fast_forward_to(position_range::all_clustered_rows());
for (int i = 0; i < 10; i++) {
assertions.produces_row_with_key(ckeys[i]);
}
assertions.produces_end_of_stream();
}
class selector_of_empty_readers : public reader_selector {
schema_ptr _schema;
reader_permit _permit;
size_t _remaining;
public:
selector_of_empty_readers(schema_ptr s, reader_permit permit, size_t count)
: reader_selector(s, dht::ring_position_view::min(), count)
, _schema(s)
, _permit(std::move(permit))
, _remaining(count) {
}
virtual std::vector<mutation_reader> create_new_readers(const std::optional<dht::ring_position_view>& pos) override {
if (_remaining == 0) {
return {};
}
--_remaining;
std::vector<mutation_reader> ret;
ret.push_back(make_empty_mutation_reader(_schema, _permit));
return ret;
}
virtual std::vector<mutation_reader> fast_forward_to(const dht::partition_range& pr) override {
SCYLLA_ASSERT(false); // Fast forward not supported by this reader
return {};
}
};
// Reproduces scylladb/scylladb#14415
SEASTAR_THREAD_TEST_CASE(test_combined_reader_with_incrementally_opened_empty_readers) {
    static constexpr size_t empty_reader_count = 10 * 1000;
    simple_schema s;
    tests::reader_concurrency_semaphore_wrapper semaphore;
    auto permit = semaphore.make_permit();
    auto selector = std::make_unique<selector_of_empty_readers>(s.schema(), permit, empty_reader_count);
    auto combined = make_combined_reader(s.schema(), permit,
            std::move(selector),
            streamed_mutation::forwarding::no,
            mutation_reader::forwarding::no);
    // Draining thousands of incrementally-opened empty readers must complete
    // without producing a stack overflow.
    assert_that(std::move(combined))
        .produces_end_of_stream();
}
// Writes overlapping mutation sets into five sstables (across LCS levels
// 0-2) and checks that two ways of reading them back — a plain combined
// reader over per-sstable readers, and the sstable_set's incremental
// local-shard reader — both produce the same fully-merged partitions.
SEASTAR_TEST_CASE(combined_mutation_reader_test) {
    return sstables::test_env::do_with_async([] (sstables::test_env& env) {
        simple_schema s;
        auto pkeys = s.make_pkeys(6);
        const auto ckeys = s.make_ckeys(4);
        std::ranges::sort(pkeys, [&s] (const dht::decorated_key& a, const dht::decorated_key& b) {
            return a.less_compare(*s.schema(), b);
        });
        // Builds one sstable's content: a row at ckeys[ckey_index] (plus an
        // optional static row) in each partition selected by pkey_indexes.
        auto make_sstable_mutations = [&] (sstring value_prefix, unsigned ckey_index, bool static_row, std::vector<unsigned> pkey_indexes) {
            utils::chunked_vector<mutation> muts;
            for (auto pkey_index : pkey_indexes) {
                muts.emplace_back(s.schema(), pkeys[pkey_index]);
                auto& mut = muts.back();
                s.add_row(mut, ckeys[ckey_index], format("{}_{:d}_val", value_prefix, ckey_index));
                if (static_row) {
                    s.add_static_row(mut, format("{}_static_val", value_prefix));
                }
            }
            return muts;
        };
        utils::chunked_vector<mutation> sstable_level_0_0_mutations = make_sstable_mutations("level_0_0", 0, true, {0, 1, 4});
        utils::chunked_vector<mutation> sstable_level_1_0_mutations = make_sstable_mutations("level_1_0", 1, false, {0, 1});
        utils::chunked_vector<mutation> sstable_level_1_1_mutations = make_sstable_mutations("level_1_1", 1, false, {2, 3});
        utils::chunked_vector<mutation> sstable_level_2_0_mutations = make_sstable_mutations("level_2_0", 2, false, {1, 4});
        utils::chunked_vector<mutation> sstable_level_2_1_mutations = make_sstable_mutations("level_2_1", 2, false, {5});
        // Expected merged content per partition key, across all sstables.
        // (Fixed the "expexted" typo in the original variable names.)
        const mutation expected_mutation_0 = sstable_level_0_0_mutations[0] + sstable_level_1_0_mutations[0];
        const mutation expected_mutation_1 = sstable_level_0_0_mutations[1] + sstable_level_1_0_mutations[1] + sstable_level_2_0_mutations[0];
        const mutation expected_mutation_2 = sstable_level_1_1_mutations[0];
        const mutation expected_mutation_3 = sstable_level_1_1_mutations[1];
        const mutation expected_mutation_4 = sstable_level_0_0_mutations[2] + sstable_level_2_0_mutations[1];
        const mutation expected_mutation_5 = sstable_level_2_1_mutations[0];
        auto sst_factory = [&, s = s.schema()] (uint32_t level) {
            auto sst = env.make_sstable(s);
            sst->set_sstable_level(level);
            return sst;
        };
        std::vector<sstables::shared_sstable> sstable_list = {
            make_sstable_containing(sst_factory(0), std::move(sstable_level_0_0_mutations)),
            make_sstable_containing(sst_factory(1), std::move(sstable_level_1_0_mutations)),
            make_sstable_containing(sst_factory(1), std::move(sstable_level_1_1_mutations)),
            make_sstable_containing(sst_factory(2), std::move(sstable_level_2_0_mutations)),
            make_sstable_containing(sst_factory(2), std::move(sstable_level_2_1_mutations)),
        };
        auto cs = compaction::make_compaction_strategy(compaction::compaction_strategy_type::leveled, {});
        auto sstable_set = make_lw_shared<sstables::sstable_set>(env.make_sstable_set(cs, s.schema()));
        std::vector<mutation_reader> sstable_mutation_readers;
        auto list_permit = env.make_reader_permit();
        for (auto sst : sstable_list) {
            sstable_set->insert(sst);
            sstable_mutation_readers.emplace_back(
                sst->as_mutation_source().make_mutation_reader(
                    s.schema(),
                    list_permit,
                    query::full_partition_range,
                    s.schema()->full_slice(),
                    nullptr,
                    streamed_mutation::forwarding::no,
                    mutation_reader::forwarding::no));
        }
        // Path 1: a plain combined reader over one reader per sstable.
        auto list_reader = make_combined_reader(s.schema(), list_permit,
                std::move(sstable_mutation_readers));
        // Path 2: the sstable_set's incremental reader over the same set.
        auto incremental_reader = sstable_set->make_local_shard_sstable_reader(
                s.schema(),
                env.make_reader_permit(),
                query::full_partition_range,
                s.schema()->full_slice(),
                nullptr,
                streamed_mutation::forwarding::no,
                mutation_reader::forwarding::no);
        assert_that(std::move(list_reader))
            .produces(expected_mutation_0)
            .produces(expected_mutation_1)
            .produces(expected_mutation_2)
            .produces(expected_mutation_3)
            .produces(expected_mutation_4)
            .produces(expected_mutation_5)
            .produces_end_of_stream();
        assert_that(std::move(incremental_reader))
            .produces(expected_mutation_0)
            .produces(expected_mutation_1)
            .produces(expected_mutation_2)
            .produces(expected_mutation_3)
            .produces(expected_mutation_4)
            .produces(expected_mutation_5)
            .produces_end_of_stream();
    });
}
// Builds a one-row mutation for `dk`. Note: the clustering key index comes
// from a function-local static counter, so successive calls — across all
// tests in this translation unit — produce rows with distinct, increasing
// clustering keys (the counter is shared, deliberate for these tests).
static mutation make_mutation_with_key(simple_schema& s, dht::decorated_key dk) {
static int i{0};
mutation m(s.schema(), std::move(dk));
s.add_row(m, s.make_ckey(++i), format("val_{:d}", i));
return m;
}
// A test reader_selector that opens readers incrementally: each mutation
// set in `reader_mutations` becomes one reader, handed out when the
// combined reader's cursor reaches the set's first partition key.
class dummy_incremental_selector : public reader_selector {
// To back _selector_position.
schema_ptr _schema;
reader_permit _permit;
dht::ring_position _position;
std::vector<utils::chunked_vector<mutation>> _readers_mutations;
streamed_mutation::forwarding _fwd;
dht::partition_range _pr;
// Hands out a reader over the next mutation set and advances
// _selector_position to the first key of the following set (or to
// ring_position::max() when no sets remain).
mutation_reader pop_reader() {
auto muts = std::move(_readers_mutations.back());
_readers_mutations.pop_back();
_position = _readers_mutations.empty() ? dht::ring_position::max() : _readers_mutations.back().front().decorated_key();
_selector_position = _position;
return make_mutation_reader_from_mutations(_schema, _permit, std::move(muts), _pr, _fwd);
}
public:
// readers_mutations is expected to be sorted on both levels.
// 1) the inner vector is expected to be sorted by decorated_key.
// 2) the outer vector is expected to be sorted by the decorated_key
// of its first mutation.
dummy_incremental_selector(schema_ptr s,
reader_permit permit,
std::vector<utils::chunked_vector<mutation>> reader_mutations,
dht::partition_range pr = query::full_partition_range,
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no)
: reader_selector(s, dht::ring_position_view::min(),
std::accumulate(reader_mutations.begin(), reader_mutations.end(), 0, [](size_t count, const auto& readers) { return count + readers.size(); }))
, _schema(s)
, _permit(std::move(permit))
, _position(dht::ring_position::min())
, _readers_mutations(std::move(reader_mutations))
, _fwd(fwd)
, _pr(std::move(pr)) {
// So we can pop the next reader off the back
std::ranges::reverse(_readers_mutations);
}
// Opens every reader whose first partition key is at or before *pos;
// with no position hint, opens just the next pending reader.
virtual std::vector<mutation_reader> create_new_readers(const std::optional<dht::ring_position_view>& pos) override {
if (_readers_mutations.empty()) {
return {};
}
std::vector<mutation_reader> readers;
if (!pos) {
readers.emplace_back(pop_reader());
return readers;
}
while (!_readers_mutations.empty() && dht::ring_position_tri_compare(*_s, _selector_position, *pos) <= 0) {
readers.emplace_back(pop_reader());
}
return readers;
}
// Remembers the new range (applied to readers opened from now on) and
// opens any readers whose first key precedes the range's start.
virtual std::vector<mutation_reader> fast_forward_to(const dht::partition_range& pr) override {
_pr = pr;
return create_new_readers(dht::ring_position_view::for_range_start(_pr));
}
};
// Combined reader over an incremental selector where the underlying readers
// hold distinct (and partially shared) partition keys: partitions with the
// same key arriving from different readers must be merged into one.
SEASTAR_TEST_CASE(reader_selector_gap_between_readers_test) {
    return seastar::async([] {
        simple_schema ss;
        tests::reader_concurrency_semaphore_wrapper semaphore;
        auto keys = ss.make_pkeys(3);
        std::ranges::sort(keys, [&ss] (const dht::decorated_key& x, const dht::decorated_key& y) {
            return x.less_compare(*ss.schema(), y);
        });
        auto m1 = make_mutation_with_key(ss, keys[0]);
        auto m2a = make_mutation_with_key(ss, keys[1]);
        auto m2b = make_mutation_with_key(ss, keys[1]);
        auto m3 = make_mutation_with_key(ss, keys[2]);
        // One single-mutation reader per entry; keys[1] is split across two readers.
        std::vector<utils::chunked_vector<mutation>> readers_mutations;
        readers_mutations.push_back({m1});
        readers_mutations.push_back({m2a});
        readers_mutations.push_back({m2b});
        readers_mutations.push_back({m3});
        auto permit = semaphore.make_permit();
        auto combined = make_combined_reader(ss.schema(), permit,
                std::make_unique<dummy_incremental_selector>(ss.schema(), permit, std::move(readers_mutations)),
                streamed_mutation::forwarding::no,
                mutation_reader::forwarding::no);
        assert_that(std::move(combined))
                .produces_partition(m1)
                .produces_partition(m2a + m2b)
                .produces_partition(m3)
                .produces_end_of_stream();
    });
}
// Combined reader over an incremental selector whose readers overlap on
// several partition keys; verifies that rows and a partition tombstone
// contributed by different readers are merged per partition.
SEASTAR_TEST_CASE(reader_selector_overlapping_readers_test) {
    return seastar::async([] {
        simple_schema ss;
        tests::reader_concurrency_semaphore_wrapper semaphore;
        auto keys = ss.make_pkeys(4);
        std::ranges::sort(keys, [&ss] (const dht::decorated_key& x, const dht::decorated_key& y) {
            return x.less_compare(*ss.schema(), y);
        });
        auto m1 = make_mutation_with_key(ss, keys[0]);
        auto m2a = make_mutation_with_key(ss, keys[1]);
        auto m2b = make_mutation_with_key(ss, keys[1]);
        auto m3a = make_mutation_with_key(ss, keys[2]);
        auto m3b = make_mutation_with_key(ss, keys[2]);
        auto m3c = make_mutation_with_key(ss, keys[2]);
        auto m4a = make_mutation_with_key(ss, keys[3]);
        auto m4b = make_mutation_with_key(ss, keys[3]);
        // One contribution carries a partition tombstone as well.
        tombstone tomb(100, {});
        m2b.partition().apply(tomb);
        ss.add_row(m2a, ss.make_ckey(1), "a");
        ss.add_row(m2b, ss.make_ckey(2), "b");
        ss.add_row(m3a, ss.make_ckey(1), "a");
        ss.add_row(m3b, ss.make_ckey(2), "b");
        ss.add_row(m3c, ss.make_ckey(3), "c");
        ss.add_row(m4a, ss.make_ckey(1), "a");
        ss.add_row(m4b, ss.make_ckey(2), "b");
        std::vector<utils::chunked_vector<mutation>> readers_mutations;
        readers_mutations.push_back({m1, m2a, m3a});
        readers_mutations.push_back({m2b, m3b});
        readers_mutations.push_back({m3c, m4a});
        readers_mutations.push_back({m4b});
        auto permit = semaphore.make_permit();
        auto combined = make_combined_reader(ss.schema(), permit,
                std::make_unique<dummy_incremental_selector>(ss.schema(), permit, std::move(readers_mutations)),
                streamed_mutation::forwarding::no,
                mutation_reader::forwarding::no);
        assert_that(std::move(combined))
                .produces_partition(m1)
                .produces_partition(m2a + m2b)
                .produces_partition(m3a + m3b + m3c)
                .produces_partition(m4a + m4b)
                .produces_end_of_stream();
    });
}
// Fast-forwarding through a combined reader built on an incremental selector:
// the initial range covers only keys[0], then the reader is forwarded over
// [keys[2], keys[3]] and finally [keys[4], +inf). Partitions outside the
// active range must never be produced.
SEASTAR_TEST_CASE(reader_selector_fast_forwarding_test) {
    return seastar::async([] {
        simple_schema ss;
        tests::reader_concurrency_semaphore_wrapper semaphore;
        auto keys = ss.make_pkeys(5);
        std::ranges::sort(keys, [&ss] (const dht::decorated_key& x, const dht::decorated_key& y) {
            return x.less_compare(*ss.schema(), y);
        });
        auto m1a = make_mutation_with_key(ss, keys[0]);
        auto m1b = make_mutation_with_key(ss, keys[0]);
        auto m2a = make_mutation_with_key(ss, keys[1]);
        auto m2c = make_mutation_with_key(ss, keys[1]);
        auto m3a = make_mutation_with_key(ss, keys[2]);
        auto m3d = make_mutation_with_key(ss, keys[2]);
        auto m4b = make_mutation_with_key(ss, keys[3]);
        auto m5b = make_mutation_with_key(ss, keys[4]);
        std::vector<utils::chunked_vector<mutation>> readers_mutations;
        readers_mutations.push_back({m1a, m2a, m3a});
        readers_mutations.push_back({m1b, m4b, m5b});
        readers_mutations.push_back({m2c});
        readers_mutations.push_back({m3d});
        auto permit = semaphore.make_permit();
        // The initial range ends (exclusively) before keys[1].
        auto initial_range = dht::partition_range::make_ending_with(dht::partition_range::bound(keys[1], false));
        auto combined = make_combined_reader(ss.schema(), permit,
                std::make_unique<dummy_incremental_selector>(ss.schema(), permit,
                        std::move(readers_mutations),
                        std::move(initial_range)),
                streamed_mutation::forwarding::no,
                mutation_reader::forwarding::yes);
        assert_that(std::move(combined))
                .produces_partition(m1a + m1b)
                .produces_end_of_stream()
                .fast_forward_to(dht::partition_range::make(dht::partition_range::bound(keys[2], true), dht::partition_range::bound(keys[3], true)))
                .produces_partition(m3a + m3d)
                .fast_forward_to(dht::partition_range::make_starting_with(dht::partition_range::bound(keys[4], true)))
                .produces_partition(m5b)
                .produces_end_of_stream();
    });
}
// Returns a copy of `m` compacted the way compaction would do it, relative to
// the current time (all tombstones are eligible for GC via always_gc).
static mutation compacted(const mutation& m) {
    mutation ret(m);
    ret.partition().compact_for_compaction(*ret.schema(), always_gc, ret.decorated_key(), gc_clock::now(), tombstone_gc_state::for_tests());
    return ret;
}
// Checks that a combined reader over several sstable sources, driven with
// both partition-level and intra-partition (position range) fast-forwarding,
// yields exactly what slicing the union of the sources to the same clustering
// ranges would yield.
SEASTAR_TEST_CASE(test_fast_forwarding_combined_reader_is_consistent_with_slicing) {
    return sstables::test_env::do_with_async([&] (sstables::test_env& env) {
        random_mutation_generator gen(random_mutation_generator::generate_counters::no);
        auto s = gen.schema();
        auto permit = env.make_reader_permit();
        const int n_readers = 10;
        auto keys = gen.make_partition_keys(3);
        // Expected merged content: combined[i] is the union of every source's
        // mutation for keys[i].
        utils::chunked_vector<mutation> combined;
        // std::list so back() stays stable; each reader keeps a reference to
        // its range for its whole lifetime.
        std::list<dht::partition_range> reader_ranges;
        std::vector<mutation_reader> readers;
        for (int i = 0; i < n_readers; ++i) {
            utils::chunked_vector<mutation> muts;
            for (auto&& key : keys) {
                mutation m = compacted(gen());
                // Re-key the random mutation so every source shares the same
                // partition keys.
                muts.push_back(mutation(s, key, std::move(m.partition())));
            }
            if (combined.empty()) {
                combined = muts;
            } else {
                int j = 0;
                for (auto&& m : muts) {
                    combined[j++].apply(m);
                }
            }
            mutation_source ds = make_sstable_containing(env.make_sstable(s), muts)->as_mutation_source();
            // Each reader starts on a singular range over keys[0]; the test
            // later fast-forwards past it.
            reader_ranges.push_back(dht::partition_range::make({keys[0]}, {keys[0]}));
            readers.push_back(ds.make_mutation_reader(s,
                    permit,
                    reader_ranges.back(),
                    s->full_slice(), nullptr,
                    streamed_mutation::forwarding::yes,
                    mutation_reader::forwarding::yes));
        }
        auto rd = mutation_fragment_v1_stream(make_combined_reader(s, permit, std::move(readers),
                streamed_mutation::forwarding::yes,
                mutation_reader::forwarding::yes));
        auto close_rd = deferred_close(rd);
        std::vector<query::clustering_range> ranges = gen.make_random_ranges(3);
        // Consumes the current partition piecewise: first everything before
        // the clustered rows (no clustering fragment may appear before the
        // first intra-partition fast-forward), then each range in `ranges`
        // via fast_forward_to(), validating that every fragment received is
        // relevant for — and within — the active range. The reassembled
        // partition must equal `expected` restricted to `ranges`.
        auto check_next_partition = [&] (const mutation& expected) {
            mutation result(expected.schema(), expected.decorated_key());
            rd.consume_pausable([&](mutation_fragment&& mf) {
                position_in_partition::less_compare less(*s);
                if (!less(mf.position(), position_in_partition_view::before_all_clustered_rows())) {
                    BOOST_FAIL(format("Received clustering fragment: {}", mutation_fragment::printer(*s, mf)));
                }
                result.partition().apply(*s, std::move(mf));
                return stop_iteration::no;
            }).get();
            for (auto&& range : ranges) {
                auto prange = position_range(range);
                rd.fast_forward_to(prange).get();
                rd.consume_pausable([&](mutation_fragment&& mf) {
                    if (!mf.relevant_for_range(*s, prange.start())) {
                        BOOST_FAIL(format("Received fragment which is not relevant for range: {}, range: {}", mutation_fragment::printer(*s, mf), prange));
                    }
                    position_in_partition::less_compare less(*s);
                    if (!less(mf.position(), prange.end())) {
                        BOOST_FAIL(format("Received fragment is out of range: {}, range: {}", mutation_fragment::printer(*s, mf), prange));
                    }
                    result.partition().apply(*s, std::move(mf));
                    return stop_iteration::no;
                }).get();
            }
            assert_that(result).is_equal_to(expected, ranges);
        };
        check_next_partition(combined[0]);
        // Skip keys[1] entirely with a partition-level fast-forward.
        auto prange = dht::partition_range::make_singular(keys[2]);
        rd.fast_forward_to(prange).get();
        check_next_partition(combined[2]);
    });
}
// Two sstable sources carry overlapping range tombstones over the same
// partition (one also carries a row inside the overlap). Verifies that the
// combined reader merges them correctly, both when restricting via the
// partition slice and when fast-forwarding within the partition.
SEASTAR_TEST_CASE(test_combined_reader_slicing_with_overlapping_range_tombstones) {
    return sstables::test_env::do_with_async([&] (sstables::test_env& env) {
        simple_schema ss;
        auto s = ss.schema();
        auto rt1 = ss.make_range_tombstone(ss.make_ckey_range(1, 10));
        auto rt2 = ss.make_range_tombstone(ss.make_ckey_range(1, 5)); // rt1 + rt2 = {[1, 5], (5, 10]}
        mutation m1(ss.schema(), tests::generate_partition_key(ss.schema()));
        m1.partition().apply_delete(*s, rt1);
        mutation m2 = m1;
        m2.partition().apply_delete(*s, rt2);
        ss.add_row(m2, ss.make_ckey(4), "v2"); // position after rt2.position() but before rt2.end_position().
        std::vector<mutation_reader> readers;
        mutation_source ds1 = make_sstable_containing(env.make_sstable(s), {m1})->as_mutation_source();
        mutation_source ds2 = make_sstable_containing(env.make_sstable(s), {m2})->as_mutation_source();
        // upper bound ends before the row in m2, so that the row is fetched after the next fast forward.
        auto range = ss.make_ckey_range(0, 3);
        {
            // Case 1: restrict the read via the partition slice.
            auto permit = env.make_reader_permit();
            auto slice = partition_slice_builder(*s).with_range(range).build();
            readers.push_back(ds1.make_mutation_reader(s, permit, query::full_partition_range, slice));
            readers.push_back(ds2.make_mutation_reader(s, permit, query::full_partition_range, slice));
            auto rd = mutation_fragment_v1_stream(make_combined_reader(s, permit, std::move(readers),
                    streamed_mutation::forwarding::no, mutation_reader::forwarding::no));
            auto close_rd = deferred_close(rd);
            auto prange = position_range(range);
            mutation result(m1.schema(), m1.decorated_key());
            rd.consume_pausable([&] (mutation_fragment&& mf) {
                // Every clustered fragment must overlap the sliced range.
                if (mf.position().has_clustering_key() && !mf.range(*s).overlaps(*s, prange.start(), prange.end())) {
                    BOOST_FAIL(format("Received fragment which is not relevant for the slice: {}, slice: {}", mutation_fragment::printer(*s, mf), range));
                }
                result.partition().apply(*s, std::move(mf));
                return stop_iteration::no;
            }).get();
            assert_that(result).is_equal_to(m1 + m2, query::clustering_row_ranges({range}));
        }
        // Check fast_forward_to()
        {
            // Case 2: full slice, restrict via intra-partition fast-forwarding.
            auto permit = env.make_reader_permit();
            readers.push_back(ds1.make_mutation_reader(s, permit, query::full_partition_range, s->full_slice(),
                    nullptr, streamed_mutation::forwarding::yes));
            readers.push_back(ds2.make_mutation_reader(s, permit, query::full_partition_range, s->full_slice(),
                    nullptr, streamed_mutation::forwarding::yes));
            auto rd = mutation_fragment_v1_stream(make_combined_reader(s, permit, std::move(readers),
                    streamed_mutation::forwarding::yes, mutation_reader::forwarding::no));
            auto close_rd = deferred_close(rd);
            auto prange = position_range(range);
            mutation result(m1.schema(), m1.decorated_key());
            // Before the first fast-forward only non-clustered fragments
            // (partition start / static row) may be produced.
            rd.consume_pausable([&](mutation_fragment&& mf) {
                BOOST_REQUIRE(!mf.position().has_clustering_key());
                result.partition().apply(*s, std::move(mf));
                return stop_iteration::no;
            }).get();
            rd.fast_forward_to(prange).get();
            position_in_partition last_pos = position_in_partition::before_all_clustered_rows();
            // Clustered fragments must arrive in non-decreasing position order.
            auto consume_clustered = [&] (mutation_fragment&& mf) {
                position_in_partition::less_compare less(*s);
                if (less(mf.position(), last_pos)) {
                    BOOST_FAIL(format("Out of order fragment: {}, last pos: {}", mutation_fragment::printer(*s, mf), last_pos));
                }
                last_pos = position_in_partition(mf.position());
                result.partition().apply(*s, std::move(mf));
                return stop_iteration::no;
            };
            rd.consume_pausable(consume_clustered).get();
            // Forward over the remainder of the partition and drain it; the
            // two windows together cover all clustered content, so the
            // reassembled mutation must equal the full merged partition.
            rd.fast_forward_to(position_range(prange.end(), position_in_partition::after_all_clustered_rows())).get();
            rd.consume_pausable(consume_clustered).get();
            assert_that(result).is_equal_to(m1 + m2);
        }
    });
}
// Runs the generic mutation-source conformance suite against a combined
// mutation source built from 1, 2 and 3 underlying memtables.
SEASTAR_TEST_CASE(test_combined_mutation_source_is_a_mutation_source) {
    return seastar::async([] {
        tests::reader_concurrency_semaphore_wrapper semaphore;
        // Creates a mutation source which combines N mutation sources with
        // each mutation's fragments spread among them in a round-robin
        // fashion.
        auto make_combined_populator = [&semaphore] (int n_sources) mutable {
            return [=, &semaphore] (schema_ptr s, const utils::chunked_vector<mutation>& muts) mutable {
                std::vector<lw_shared_ptr<replica::memtable>> memtables;
                for (int idx = 0; idx < n_sources; ++idx) {
                    memtables.push_back(make_lw_shared<replica::memtable>(s));
                }
                for (const auto& m : muts) {
                    auto rd = make_mutation_reader_from_mutations(s, semaphore.make_permit(), m);
                    auto close_rd = deferred_close(rd);
                    // Split the mutation's fragments round-robin into one
                    // piece per source.
                    auto pieces = rd.consume(fragment_scatterer(s, n_sources)).get();
                    for (int idx = 0; idx < n_sources; ++idx) {
                        memtables[idx]->apply(std::move(pieces[idx]));
                    }
                }
                std::vector<mutation_source> sources;
                for (const auto& mt : memtables) {
                    sources.push_back(mt->as_data_source());
                }
                return make_combined_mutation_source(std::move(sources));
            };
        };
        run_mutation_source_tests(make_combined_populator(1));
        run_mutation_source_tests(make_combined_populator(2));
        run_mutation_source_tests(make_combined_populator(3));
    });
}
// Best run with SMP >= 2
SEASTAR_THREAD_TEST_CASE(test_foreign_reader_as_mutation_source, *test_label::label("nightly")) {
    if (smp::count < 2) {
        std::cerr << "Cannot run test " << get_name() << " with smp::count < 2" << std::endl;
        return;
    }
    do_with_cql_env_thread([] (cql_test_env& env) -> future<> {
        // Populator: loads the mutations into a memtable owned by a remote
        // shard and returns a mutation source whose readers are
        // foreign_readers wrapping readers created on that shard. The
        // generic mutation-source suite then exercises it locally.
        auto populate = [&env] (schema_ptr s, const utils::chunked_vector<mutation>& mutations) {
            const auto remote_shard = (this_shard_id() + 1) % smp::count;
            // Mutations have to cross the shard boundary in frozen form.
            auto frozen_mutations =
                    mutations
                    | std::views::transform([] (const mutation& m) { return freeze(m); })
                    | std::ranges::to<utils::chunked_vector<frozen_mutation>>();
            auto remote_mt = smp::submit_to(remote_shard, [s = global_schema_ptr(s), &frozen_mutations] {
                auto mt = make_lw_shared<replica::memtable>(s.get());
                for (auto& mut : frozen_mutations) {
                    mt->apply(mut, s.get());
                }
                return make_foreign(mt);
            }).get();
            // Creates the underlying reader on the remote shard and wraps it
            // into a foreign_reader usable from the current shard.
            auto reader_factory = [&env, remote_shard, remote_mt = std::move(remote_mt)] (schema_ptr s,
                    reader_permit permit,
                    const dht::partition_range& range,
                    const query::partition_slice& slice,
                    tracing::trace_state_ptr trace_state,
                    streamed_mutation::forwarding fwd_sm,
                    mutation_reader::forwarding fwd_mr) {
                auto remote_reader = env.db().invoke_on(remote_shard,
                        [&, s = global_schema_ptr(s), fwd_sm, fwd_mr, trace_state = tracing::global_trace_state_ptr(trace_state)] (replica::database& db) {
                    return make_foreign(std::make_unique<mutation_reader>(remote_mt->make_mutation_reader(s.get(),
                            make_reader_permit(env),
                            range,
                            slice,
                            trace_state.get(),
                            fwd_sm,
                            fwd_mr)));
                }).get();
                return make_foreign_reader(s, std::move(permit), std::move(remote_reader), fwd_sm);
            };
            // Shared so the mutation_source's copies all reference the same
            // factory (and its captured remote memtable).
            auto reader_factory_ptr = make_lw_shared<decltype(reader_factory)>(std::move(reader_factory));
            return mutation_source([reader_factory_ptr] (schema_ptr s,
                    reader_permit permit,
                    const dht::partition_range& range,
                    const query::partition_slice& slice,
                    tracing::trace_state_ptr trace_state,
                    streamed_mutation::forwarding fwd_sm,
                    mutation_reader::forwarding fwd_mr) {
                return (*reader_factory_ptr)(std::move(s), std::move(permit), range, slice, std::move(trace_state), fwd_sm, fwd_mr);
            });
        };
        run_mutation_source_tests(populate);
        return make_ready_future<>();
    }).get();
}
// Exercises query::trim_clustering_row_ranges_to(): trimming a list of
// clustering ranges so that only positions after the given key remain —
// in both the schema's native order and its reversed counterpart.
SEASTAR_TEST_CASE(test_trim_clustering_row_ranges_to) {
    // Tag types for expressing a two-component clustering key whose second
    // component is an int, an explicit null, or absent (prefix key).
    struct null { };
    struct missing { };
    struct key {
        int c0;
        std::variant<int, null, missing> c1;
        key(int c0, int c1) : c0(c0), c1(c1) { }
        key(int c0, null) : c0(c0), c1(null{}) { }
        key(int c0) : c0(c0), c1(missing{}) { }
        // Serializes the key; a `missing` second component yields a
        // single-component (prefix) clustering key.
        clustering_key to_clustering_key(const schema& s) const {
            std::vector<bytes> v;
            v.push_back(int32_type->decompose(data_value(c0)));
            std::visit(make_visitor(
                    [&v] (int c1) { v.push_back(int32_type->decompose(data_value(c1))); },
                    [&v] (null c1) { v.push_back(bytes{}); },
                    [] (missing) { }),
                c1);
            return clustering_key::from_exploded(s, std::move(v));
        }
    };
    // Shorthands for inclusive/exclusive range bounds.
    struct incl {
        key value;
        incl(int c0, int c1) : value(c0, c1) { }
        incl(int c0, null) : value(c0, null{}) { }
        incl(int c0) : value(c0) { }
    };
    struct excl {
        key value;
        excl(int c0, int c1) : value(c0, c1) { }
        excl(int c0, null) : value(c0, null{}) { }
        excl(int c0) : value(c0) { }
    };
    struct bound {
        key value;
        bool inclusive;
        bound(incl b) : value(b.value), inclusive(true) { }
        bound(excl b) : value(b.value), inclusive(false) { }
    };
    // Marks an open (infinite) end of a range.
    struct inf {
    };
    struct range {
        std::optional<bound> start;
        std::optional<bound> end;
        bool singular = false;
        range(bound s, bound e) : start(s), end(e) { }
        range(inf, bound e) : end(e) { }
        range(bound s, inf) : start(s) { }
        range(inf, inf) { }
        range(bound b) : start(b), end(b), singular(true) { }
        static std::optional<interval_bound<clustering_key>> to_bound(const schema& s, std::optional<bound> b) {
            if (b) {
                return interval_bound<clustering_key>(b->value.to_clustering_key(s), b->inclusive);
            }
            return {};
        }
        query::clustering_range to_clustering_range(const schema& s) const {
            return query::clustering_range(to_bound(s, start), to_bound(s, end), singular);
        }
    };
    const auto schema = schema_builder("ks", get_name())
            .with_column("p0", int32_type, column_kind::partition_key)
            .with_column("c0", int32_type, column_kind::clustering_key)
            .with_column("c1", int32_type, column_kind::clustering_key)
            .with_column("v1", int32_type, column_kind::regular_column)
            .build();
    // Trims `ranges` to the part after `key` and compares (bound-by-bound)
    // against `output_ranges`, reporting the caller's location on mismatch.
    const auto check = [](std::vector<range> ranges, key key, std::vector<range> output_ranges, schema_ptr schema,
            std::source_location sl = std::source_location::current()) {
        auto actual_ranges = ranges | std::views::transform(
                [&] (const range& r) { return r.to_clustering_range(*schema); })
                | std::ranges::to<query::clustering_row_ranges>();
        query::trim_clustering_row_ranges_to(*schema, actual_ranges, key.to_clustering_key(*schema));
        const auto expected_ranges = output_ranges | std::views::transform(
                [&] (const range& r) { return r.to_clustering_range(*schema); })
                | std::ranges::to<query::clustering_row_ranges>();
        if (!std::equal(actual_ranges.begin(), actual_ranges.end(), expected_ranges.begin(), expected_ranges.end(),
                [tri_cmp = clustering_key::tri_compare(*schema)] (const query::clustering_range& a, const query::clustering_range& b) {
                    return a.equal(b, tri_cmp);
                })) {
            BOOST_FAIL(fmt::format("Unexpected result\nexpected {}\ngot {}\ncalled from {}:{}", expected_ranges, actual_ranges, sl.file_name(), sl.line()));
        }
    };
    // NOTE(review): these wrappers are non-mutable lambdas, so the captured
    // schema member is const and std::move() below actually copies — harmless
    // here since check() takes schema_ptr by value anyway.
    auto check_forward = [schema, &check] (std::vector<range> ranges, key key, std::vector<range> output_ranges,
            std::source_location sl = std::source_location::current()) {
        return check(std::move(ranges), std::move(key), std::move(output_ranges), std::move(schema), sl);
    };
    auto check_reversed = [schema = schema->make_reversed(), &check] (std::vector<range> ranges, key key, std::vector<range> output_ranges,
            std::source_location sl = std::source_location::current()) {
        return check(std::move(ranges), std::move(key), std::move(output_ranges), std::move(schema), sl);
    };
    // We want to check the following cases:
    // 1) Before range
    // 2) Equal to begin(range with incl begin)
    // 3) Equal to begin(range with excl begin)
    // 4) Intersect with range (excl end)
    // 5) Intersect with range (incl end)
    // 6) Intersect with range (inf end)
    // 7) Equal to end(range with incl end)
    // 8) Equal to end(range with excl end)
    // 9) After range
    // 10) Full range
    // 11) Prefix key is before range
    // 12) Prefix key is equal to prefix start of range
    // 13) Prefix key intersects with range
    // 14) Prefix key is after range
    // (1)
    check_forward(
            { {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
            {1, 0},
            { {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} });
    // (2)
    check_forward(
            { {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
            {1, 6},
            { {excl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} });
    // (2) - prefix
    check_forward(
            { {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
            {3, 6},
            { {excl{3, 6}, excl{4}}, {incl{7, 9}, incl{999, 0}} });
    // (3)
    check_forward(
            { {incl{1, 6}, excl{2, 3}}, {excl{2, 3}, incl{2, 4}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
            {2, 3},
            { {excl{2, 3}, incl{2, 4}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} });
    // (3) - prefix
    check_forward(
            { {incl{1, 6}, excl{2, 3}}, {excl{2, 3}, incl{2, 4}}, {excl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
            {3, 7},
            { {excl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} });
    // (4)
    check_forward(
            { {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
            {2, 0},
            { {excl{2, 0}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} });
    // (5)
    check_forward(
            { {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, incl{999, 0}} },
            {90, 90},
            { {excl{90, 90}, incl{999, 0}} });
    // (6)
    check_forward(
            { {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, inf{}} },
            {90, 90},
            { {excl{90, 90}, inf{}} });
    // (7)
    check_forward(
            { {incl{1, 6}, incl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} },
            {2, 3},
            { {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} });
    // (7) - prefix
    check_forward(
            { {incl{1, 6}, incl{2, 3}}, {incl{3}, incl{4}}, {incl{7, 9}, excl{999, 0}} },
            {4, 39},
            { {excl{4, 39}, incl{4}}, {incl{7, 9}, excl{999, 0}} });
    // (8)
    check_forward(
            { {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} },
            {2, 3},
            { {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} });
    // (8) - prefix
    check_forward(
            { {incl{1, 6}, incl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} },
            {4, 11},
            { {incl{7, 9}, excl{999, 0}} });
    // (9)
    check_forward(
            { {incl{1, 6}, excl{2, 3}}, {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} },
            {2, 4},
            { {incl{3}, excl{4}}, {incl{7, 9}, excl{999, 0}} });
    // (10)
    check_forward(
            { {inf{}, inf{}} },
            {7, 9},
            { {excl{7, 9}, inf{}} });
    // (11)
    check_forward(
            { {incl{10, 10}, excl{10, 30}} },
            {10},
            { {incl{10, 10}, excl{10, 30}} });
    // (12)
    check_forward(
            { {incl{10}, excl{10, 30}} },
            {10},
            { {incl{10, null{}}, excl{10, 30}} });
    // (13)
    check_forward(
            { {incl{9, 10}, excl{10, 30}} },
            {10},
            { {incl{10, null{}}, excl{10, 30}} });
    // (14)
    check_forward(
            { {incl{9, 10}, excl{10, 30}} },
            {11},
            { });
    // In reversed now
    // (1)
    check_reversed(
            { {incl{999, 0}, incl{7, 9}}, {excl{4}, incl{3}}, {excl{2, 3}, incl{1, 6}} },
            {999, 1},
            { {incl{999, 0}, incl{7, 9}}, {excl{4}, incl{3}}, {excl{2, 3}, incl{1, 6}} });
    // (2)
    check_reversed(
            { {incl{999, 0}, incl{7, 9}}, {excl{4}, incl{3}}, {incl{2, 4}, excl{2, 3}}, {excl{2, 3}, incl{1, 6}} },
            {2, 4},
            { {excl{2, 4}, excl{2, 3}}, {excl{2, 3}, incl{1, 6}} });
    // (2) - prefix
    check_reversed(
            { {incl{999, 0}, incl{7, 9}}, {incl{4}, incl{3}} , {incl{2, 4}, excl{2, 3}}, {excl{2, 3}, incl{1, 6}} },
            {4, 43453},
            { {excl{4, 43453}, incl{3}}, {incl{2, 4}, excl{2, 3}}, {excl{2, 3}, incl{1, 6}} });
    // (3)
    check_reversed(
            { { incl{999, 0}, incl{7, 9} }, { excl{4}, incl{3} }, { excl{2, 3}, incl{1, 6} } },
            {2, 3},
            { {excl{2, 3}, incl{1, 6}} });
    // (3) - prefix
    check_reversed(
            { {incl{999, 0}, incl{7, 9}}, {excl{4}, incl{3}}, {excl{2, 3}, incl{1, 6}} },
            {4, 3},
            { {excl{4}, incl{3}}, {excl{2, 3}, incl{1, 6}} });
    // (4)
    check_reversed(
            { {excl{999, 0}, incl{7, 9}}, {excl{4}, incl{3}}, {excl{2, 3}, incl{1, 6}} },
            {8, 0},
            { {excl{8, 0}, incl{7, 9}}, {excl{4}, incl{3}}, {excl{2, 3}, incl{1, 6}} });
    // (5)
    check_reversed(
            { {incl{999, 0}, incl{7, 9}}, {excl{4}, incl{3}}, {excl{2, 3}, incl{1, 6}} },
            {90, 90},
            { {excl{90, 90}, incl{7, 9}}, {excl{4}, incl{3}}, {excl{2, 3}, incl{1, 6}} });
    // (6)
    check_reversed(
            { {inf{}, incl{7, 9}}, {excl{4}, incl{3}}, {excl{2, 3}, inf{}} },
            {1, 90},
            { {excl{1, 90}, inf{}} });
    // (7)
    check_reversed(
            { {excl{999, 0}, incl{7, 9}}, {excl{4}, incl{3}}, {incl{2, 3}, incl{1, 6}} },
            {7, 9},
            { {excl{4}, incl{3}}, {incl{2, 3}, incl{1, 6}} });
    // (7) - prefix
    check_reversed(
            { {excl{999, 0}, incl{7, 9}}, {excl{4}, incl{3}}, {incl{2, 3}, incl{1, 6}} },
            {3, 673},
            { {excl{3, 673}, incl{3}}, {incl{2, 3}, incl{1, 6}} });
    // (8)
    check_reversed(
            { {excl{999, 0}, incl{7, 9}}, {excl{4}, incl{3}}, {excl{2, 3}, excl{1, 6}} },
            {1, 6},
            { });
    // (8) - prefix
    check_reversed(
            { {excl{999, 0}, incl{7, 9} }, {excl{4}, excl{3} }, {incl{2, 3}, incl{1, 6} } },
            {3, 673},
            { {incl{2, 3}, incl{1, 6}} });
    // (9)
    check_reversed(
            { {excl{999, 0}, incl{7, 9}}, {excl{4}, incl{3}}, {excl{2, 3}, incl{1, 6}} },
            {0, 4},
            {});
    // (10)
    check_reversed(
            { {inf{}, inf{}} },
            {7, 9},
            { {excl{7, 9}, inf{}} });
    // (11)
    check_reversed(
            { {excl{10, 30}, incl{10, 10}} },
            {11},
            { {excl{10, 30}, incl{10, 10}} });
    // (12)
    check_reversed(
            { {incl{10}, excl{9, 39}} },
            {10},
            { {incl{10, null{}}, excl{9, 39}} });
    // (13)
    check_reversed(
            { {incl{10, 30}, incl{9, 10}} },
            {10},
            { {incl{10, null{}}, incl{9, 10}} });
    // (14)
    check_reversed(
            { {excl{10, 30}, incl{9, 10}} },
            {9},
            { });
    return make_ready_future<>();
}
// Best run with SMP >= 3
// Verifies that a multishard combining reader over an empty table produces
// nothing, yet creates a shard reader on every shard (each shard must be
// consulted before end-of-stream can be declared).
SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_reading_empty_table) {
    if (smp::count < 3) {
        // BUG FIX: the message used to claim "smp::count < 2" while the
        // guard actually requires at least 3 shards.
        std::cerr << "Cannot run test " << get_name() << " with smp::count < 3" << std::endl;
        return;
    }
    do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
        // One flag per shard, flipped by the factory when invoked there.
        std::vector<std::atomic<bool>> shards_touched(smp::count);
        simple_schema s;
        env.execute_cql(s.cql()).get();
        auto& table = env.db().local().find_column_family(s.schema()->ks_name(), s.schema()->cf_name());
        auto erm = table.get_effective_replication_map();
        auto factory = [&shards_touched] (
                schema_ptr s,
                reader_permit permit,
                const dht::partition_range& range,
                const query::partition_slice& slice,
                tracing::trace_state_ptr trace_state,
                mutation_reader::forwarding fwd_mr) {
            shards_touched[this_shard_id()] = true;
            return make_empty_mutation_reader(s, std::move(permit));
        };
        assert_that(make_multishard_combining_reader(
                seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory)),
                s.schema(),
                erm,
                make_reader_permit(env),
                query::full_partition_range,
                s.schema()->full_slice()))
            .produces_end_of_stream();
        // Every shard must have been visited despite the table being empty.
        for (unsigned i = 0; i < smp::count; ++i) {
            BOOST_REQUIRE(shards_touched.at(i));
        }
        return make_ready_future<>();
    }).get();
}
// A reader that can be controlled by its "creator" after it's created.
//
// It can execute one of a set of actions on each fill_buffer() call:
// * fill the buffer completely with generated data
// * block until the puppet master releases it
//
// Its primary purpose is to aid in testing multishard_combining_reader's
// read-ahead related corner-cases. It allows the test code to have
// fine-grained control over which shard will fill the multishard reader's
// buffer, how much read-ahead it launches and consequently when the
// read-ahead terminates.
class puppet_reader : public mutation_reader::impl {
public:
    // Shared state through which the test (the "puppet master") observes and
    // drives the reader from outside.
    struct control {
        // Resolved by the test to release a fill_buffer() blocked on it.
        promise<> buffer_filled;
        // Starts true (no live reader yet); set to false by the constructor
        // and back to true by the destructor.
        bool destroyed = true;
        // True while a fill_buffer() is parked waiting on buffer_filled.
        bool pending = false;
        // Number of fast_forward_to(partition_range) calls seen.
        unsigned fast_forward_to = 0;
    };
    // What a single fill_buffer() invocation should do; actions are consumed
    // one per call, in the order they were passed to the constructor.
    enum class fill_buffer_action {
        fill,
        block
    };
private:
    simple_schema _s;
    control& _ctrl;
    std::vector<fill_buffer_action> _actions;
    std::vector<uint32_t> _pkeys;
    unsigned _partition_index = 0;

    // Emits the partition-start for the next pkey; returns false (and flags
    // end-of-stream) once all pkeys have been emitted.
    bool maybe_push_next_partition() {
        if (_partition_index == _pkeys.size()) {
            _end_of_stream = true;
            return false;
        }
        push_mutation_fragment(*_s.schema(), _permit, partition_start(_s.make_pkey(_pkeys.at(_partition_index++)), {}));
        return true;
    }
    // Fills the buffer with one whole partition: start, enough random rows to
    // fill the buffer, then the partition end.
    void do_fill_buffer() {
        if (!maybe_push_next_partition()) {
            return;
        }
        auto ck = uint32_t(0);
        while (!is_buffer_full()) {
            push_mutation_fragment(*_s.schema(), _permit, _s.make_row_v2(_permit, _s.make_ckey(ck++), make_random_string(2 << 5)));
        }
        push_mutation_fragment(*_s.schema(), _permit, partition_end());
    }
public:
    puppet_reader(simple_schema s, reader_permit permit, control& ctrl, std::vector<fill_buffer_action> actions, std::vector<uint32_t> pkeys)
        : impl(s.schema(), std::move(permit))
        , _s(std::move(s))
        , _ctrl(ctrl)
        , _actions(std::move(actions))
        , _pkeys(std::move(pkeys)) {
        // Reversed so the next action can be popped off the back.
        std::ranges::reverse(_actions);
        _end_of_stream = false;
        _ctrl.destroyed = false;
    }
    ~puppet_reader() {
        _ctrl.destroyed = true;
    }
    // Executes the next queued action; with no actions left it declares
    // end-of-stream. A `block` action fills the buffer and then waits until
    // the test resolves _ctrl.buffer_filled.
    virtual future<> fill_buffer() override {
        if (is_end_of_stream() || !is_buffer_empty()) {
            return make_ready_future<>();
        }
        if (_actions.empty()) {
            _end_of_stream = true;
            return make_ready_future<>();
        }
        auto action = _actions.back();
        _actions.pop_back();
        switch (action) {
        case fill_buffer_action::fill:
            do_fill_buffer();
            return make_ready_future<>();
        case fill_buffer_action::block:
            do_fill_buffer();
            _ctrl.pending = true;
            return _ctrl.buffer_filled.get_future().then([this] {
                BOOST_REQUIRE(!_ctrl.destroyed);
                _ctrl.pending = false;
                return make_ready_future<>();
            });
        }
        abort();
    }
    virtual future<> next_partition() override { return make_ready_future<>(); }
    // Only counts the call and terminates the stream; this reader does not
    // actually serve data from the new range.
    virtual future<> fast_forward_to(const dht::partition_range& pr) override {
        ++_ctrl.fast_forward_to;
        clear_buffer();
        _end_of_stream = true;
        return make_ready_future<>();
    }
    // Intra-partition forwarding is not supported by this test reader.
    virtual future<> fast_forward_to(position_range) override {
        return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
    }
    virtual future<> close() noexcept override {
        return make_ready_future<>();
    };
};
// Test a background pending read-ahead.
//
// Foreign reader launches a new background read-ahead (fill_buffer()) after
// each remote operation (fill_buffer() and fast_forward_to()) is completed.
// This read-ahead executes in the background and is only synchronized with
// when the next remote operation is executed. If the reader is destroyed before
// this synchronization can happen then the remote read-ahead will outlive its
// owner. Check that when the reader is closed, it waits for any background
// read-ahead to complete gracefully and will not cause any memory errors.
//
// Theory of operation:
// 1) Call foreign_reader::fill_buffer() -> will start read-ahead in the
//    background;
// 2) [shard 1] puppet_reader blocks the read-ahead;
// 3) Start closing the foreign_reader;
// 4) Unblock read-ahead -> the now orphan read-ahead fiber executes;
//
// Best run with smp >= 2
SEASTAR_THREAD_TEST_CASE(test_stopping_reader_with_pending_read_ahead) {
    if (smp::count < 2) {
        std::cerr << "Cannot run test " << get_name() << " with smp::count < 2" << std::endl;
        return;
    }
    do_with_cql_env_thread([] (cql_test_env& env) -> future<> {
        const auto shard_of_interest = (this_shard_id() + 1) % smp::count;
        auto s = simple_schema();
        // Create, on the remote shard, a puppet reader (first fill succeeds,
        // second blocks) together with its control handle.
        auto remote_control_remote_reader = smp::submit_to(shard_of_interest, [&env, gs = global_simple_schema(s)] {
            using control_type = foreign_ptr<std::unique_ptr<puppet_reader::control>>;
            using reader_type = foreign_ptr<std::unique_ptr<mutation_reader>>;
            auto control = make_foreign(std::make_unique<puppet_reader::control>());
            auto reader = make_foreign(std::make_unique<mutation_reader>(make_mutation_reader<puppet_reader>(gs.get(),
                    make_reader_permit(env),
                    *control,
                    std::vector{puppet_reader::fill_buffer_action::fill, puppet_reader::fill_buffer_action::block},
                    std::vector<uint32_t>{0, 1})));
            return make_ready_future<std::tuple<control_type, reader_type>>(std::tuple(std::move(control), std::move(reader)));
        }).get();
        auto& remote_control = std::get<0>(remote_control_remote_reader);
        auto& remote_reader = std::get<1>(remote_control_remote_reader);
        auto reader = make_foreign_reader(
                s.schema(),
                make_reader_permit(env),
                std::move(remote_reader));
        // First fill succeeds and kicks off a background read-ahead, which
        // blocks on the puppet's control.
        reader.fill_buffer().get();
        BOOST_REQUIRE(!reader.is_buffer_empty());
        BOOST_REQUIRE(!smp::submit_to(shard_of_interest, [remote_control = remote_control.get()] {
            return remote_control->destroyed;
        }).get());
        bool buffer_filled = false;
        // Close while the read-ahead is still blocked; close() must not
        // resolve until the read-ahead is released below.
        auto destroyed_after_close = reader.close().then([&] {
            // close should wait on readahead and complete
            // only after `remote_control->buffer_filled.set_value()`
            // is executed below.
            BOOST_REQUIRE(buffer_filled);
            return smp::submit_to(shard_of_interest, [remote_control = remote_control.get()] {
                return remote_control->destroyed;
            });
        });
        // Release the blocked read-ahead fiber.
        smp::submit_to(shard_of_interest, [remote_control = remote_control.get(), &buffer_filled] {
            buffer_filled = true;
            remote_control->buffer_filled.set_value();
        }).get();
        // The remote reader must be destroyed by the time close() resolves.
        BOOST_REQUIRE(destroyed_after_close.get());
        return make_ready_future<>();
    }).get();
}
// Bundle returned by prepare_multishard_reader_for_read_ahead_test(): the
// reader under test plus everything that has to stay alive for as long as
// the reader is used (sharder, per-shard puppet controls, read range).
struct multishard_reader_for_read_ahead {
    // The test setup requires at least this many shards.
    static const unsigned min_shards = 3;
    // The shard whose puppet reader blocks in fill_buffer() until released.
    static const unsigned blocked_shard = 2;
    mutation_reader reader;
    std::unique_ptr<dht::sharder> sharder;
    std::vector<foreign_ptr<std::unique_ptr<puppet_reader::control>>> remote_controls;
    std::unique_ptr<dht::partition_range> pr;
};
// Builds a multishard combining reader over per-shard puppet readers, wired
// up so that reading it will eventually trigger a background read-ahead on
// multishard_reader_for_read_ahead::blocked_shard, whose fill_buffer()
// blocks until that shard's control->buffer_filled promise is resolved.
// Shards below the blocked one produce a filled buffer, shards above it
// produce nothing.
multishard_reader_for_read_ahead prepare_multishard_reader_for_read_ahead_test(simple_schema& s, reader_permit permit) {
    auto remote_controls = std::vector<foreign_ptr<std::unique_ptr<puppet_reader::control>>>();
    remote_controls.reserve(smp::count);
    for (unsigned i = 0; i < smp::count; ++i) {
        remote_controls.emplace_back(nullptr);
    }
    // Allocate each puppet-reader control on its home shard.
    parallel_for_each(std::views::iota(0u, smp::count), [&remote_controls] (unsigned shard) mutable {
        return smp::submit_to(shard, [] {
            return make_foreign(std::make_unique<puppet_reader::control>());
        }).then([shard, &remote_controls] (foreign_ptr<std::unique_ptr<puppet_reader::control>>&& ctr) mutable {
            remote_controls[shard] = std::move(ctr);
        });
    }).get();

    // We need two tokens for each shard
    std::map<dht::token, unsigned> pkeys_by_tokens;
    for (unsigned i = 0; i < smp::count * 2; ++i) {
        pkeys_by_tokens.emplace(s.make_pkey(i).token(), i);
    }

    // Distribute the token-sorted pkeys over the shards round-robin, so each
    // shard's puppet reader serves two of them.
    auto shard_pkeys = std::vector<std::vector<uint32_t>>(smp::count, std::vector<uint32_t>{});
    auto i = unsigned(0);
    for (auto pkey : pkeys_by_tokens | std::views::values) {
        shard_pkeys[i++ % smp::count].push_back(pkey);
    }

    // Raw pointers are safe to capture in the factory: the foreign_ptr
    // owners are returned to the caller and outlive the reader.
    auto remote_control_refs = std::vector<puppet_reader::control*>();
    remote_control_refs.reserve(smp::count);
    for (auto& rc : remote_controls) {
        remote_control_refs.push_back(rc.get());
    }

    auto factory = [gs = global_simple_schema(s), remote_controls = std::move(remote_control_refs), shard_pkeys = std::move(shard_pkeys)] (
            schema_ptr,
            reader_permit permit,
            const dht::partition_range& range,
            const query::partition_slice& slice,
            tracing::trace_state_ptr trace_state,
            mutation_reader::forwarding) mutable {
        const auto shard = this_shard_id();
        // Shards before the blocked one fill a buffer, the blocked shard
        // blocks in fill_buffer(), later shards do nothing.
        auto actions = [shard] () -> std::vector<puppet_reader::fill_buffer_action> {
            if (shard < multishard_reader_for_read_ahead::blocked_shard) {
                return {puppet_reader::fill_buffer_action::fill};
            } else if (shard == multishard_reader_for_read_ahead::blocked_shard) {
                return {puppet_reader::fill_buffer_action::block};
            } else {
                return {};
            }
        }();
        return make_mutation_reader<puppet_reader>(gs.get(), permit, *remote_controls.at(shard), std::move(actions), shard_pkeys.at(shard));
    };

    // Read range spanning all generated tokens; the dummy sharder maps each
    // token to the shard it was assigned to above.
    auto pr = std::make_unique<dht::partition_range>(dht::partition_range::make(dht::ring_position::starting_at(pkeys_by_tokens.begin()->first),
                dht::ring_position::ending_at(pkeys_by_tokens.rbegin()->first)));

    auto sharder = std::make_unique<dummy_sharder>(s.schema()->get_sharder(), std::move(pkeys_by_tokens));

    auto reader = make_multishard_combining_reader_for_tests(*sharder, seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory)),
            s.schema(), permit, *pr, s.schema()->full_slice());

    return {std::move(reader), std::move(sharder), std::move(remote_controls), std::move(pr)};
}
// Regression test for #7945
// When the reader is driven by a sharder with fewer shards than smp::count,
// it must create shard readers only on the shards the sharder knows about
// and never touch the extra local shards.
SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_custom_shard_number) {
    if (smp::count < 2) {
        std::cerr << "Cannot run test " << get_name() << " with smp::count < 2" << std::endl;
        return;
    }

    // Sharder with one shard less than the local shard count.
    auto no_shards = smp::count - 1;

    do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
        // One flag per local shard, set by the factory when a shard reader
        // is created there.
        std::vector<std::atomic<bool>> shards_touched(smp::count);
        simple_schema s;
        auto sharder = std::make_unique<dht::static_sharder>(no_shards, 0);
        auto factory = [&shards_touched] (
                schema_ptr s,
                reader_permit permit,
                const dht::partition_range& range,
                const query::partition_slice& slice,
                tracing::trace_state_ptr trace_state,
                mutation_reader::forwarding fwd_mr) {
            shards_touched[this_shard_id()] = true;
            return make_empty_mutation_reader(s, std::move(permit));
        };

        // Drain a full-range read; the readers are all empty so this only
        // exercises shard-reader creation.
        assert_that(make_multishard_combining_reader_for_tests(
                    *sharder,
                    seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory)),
                    s.schema(),
                    make_reader_permit(env),
                    query::full_partition_range,
                    s.schema()->full_slice()))
                .produces_end_of_stream();

        // All sharder-known shards were visited...
        for (unsigned i = 0; i < no_shards; ++i) {
            BOOST_REQUIRE(shards_touched[i]);
        }
        // ...and the shard beyond the sharder's count was not.
        BOOST_REQUIRE(!shards_touched[no_shards]);

        return make_ready_future<>();
    }).get();
}
// Regression test for #8161
// The multishard combining reader must create shard readers only on the
// shards that own part of the queried token range, instead of eagerly
// touching every shard.
SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_only_reads_from_needed_shards) {
    if (smp::count < 2) {
        std::cerr << "Cannot run test " << get_name() << " with smp::count < 2" << std::endl;
        return;
    }
    do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
        // One flag per shard, set by the factory when a shard reader is
        // created there.
        std::vector<std::atomic<bool>> shards_touched(smp::count);
        simple_schema s;
        auto factory = [&shards_touched] (
                schema_ptr s,
                reader_permit permit,
                const dht::partition_range& range,
                const query::partition_slice& slice,
                tracing::trace_state_ptr trace_state,
                mutation_reader::forwarding fwd_mr) {
            shards_touched[this_shard_id()] = true;
            return make_empty_mutation_reader(s, std::move(permit));
        };

        env.execute_cql(s.cql()).get();
        auto& table = env.db().local().find_column_family(s.schema()->ks_name(), s.schema()->cf_name());
        auto erm = table.get_effective_replication_map();

        // Build a range covering a random number of consecutive shards,
        // starting with the shard owning token 0, by repeatedly extending
        // the end token to the next shard's first token.
        std::vector<bool> expected_shards_touched(smp::count);
        const dht::sharder& sharder = erm->get_sharder(*s.schema());
        dht::token start_token(0);
        dht::token end_token(0);
        const auto additional_shards = tests::random::get_int<unsigned>(0, smp::count - 1);
        auto shard = sharder.shard_for_reads(start_token);
        expected_shards_touched[shard] = true;
        for (auto i = 0u; i < additional_shards; ++i) {
            shard = (shard + 1) % smp::count;
            end_token = sharder.token_for_next_shard_for_reads(end_token, shard);
            expected_shards_touched[shard] = true;
        }
        const auto inclusive_end = !additional_shards || tests::random::get_bool();
        auto pr = dht::partition_range::make(
                dht::ring_position(start_token, dht::ring_position::token_bound::start),
                dht::ring_position(end_token, inclusive_end ? dht::ring_position::token_bound::end : dht::ring_position::token_bound::start));
        // An exclusive end bound excludes the last shard's first token, so
        // that shard must not be touched at all.
        if (!inclusive_end) {
            expected_shards_touched[shard] = false;
        }

        testlog.info("{}: including {} additional shards out of a total of {}, with an {} end", get_name(), additional_shards, smp::count,
                inclusive_end ? "inclusive" : "exclusive");

        assert_that(make_multishard_combining_reader(
                    seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory)),
                    s.schema(),
                    erm,
                    make_reader_permit(env),
                    pr,
                    s.schema()->full_slice()))
                .produces_end_of_stream();

        // The set of shards actually touched must exactly match the shards
        // the range covers.
        for (unsigned i = 0; i < smp::count; ++i) {
            testlog.info("[{}]: {} == {}", i, shards_touched[i], expected_shards_touched[i]);
            BOOST_CHECK(shards_touched[i] == expected_shards_touched[i]);
        }
        return make_ready_future<>();
    }).get();
}
// Test a background pending read-ahead outliving the reader.
//
// The multishard reader will issue read-aheads according to its internal
// concurrency. This concurrency starts from 1 and is increased every time
// a remote reader blocks (buffer is empty) within the same fill_buffer() call.
// The read-ahead is run in the background and the fiber will not be
// synchronized with until the multishard reader reaches the shard in question
// with the normal reading. If the multishard reader is destroyed before the
// synchronization happens the fiber is orphaned. Test that the fiber is
// prepared for this possibility and doesn't attempt to read any members of any
// destroyed objects causing memory errors.
//
// Theory of operation:
// 1) First read a full buffer from shard 0;
// 2) Shard 0 has no more data so move on to shard 1; puppet reader's buffer is
// empty -> increase concurrency to 2 because we traversed to another shard
// in the same fill_buffer() call;
// 3) [shard 2] puppet reader -> read-ahead launched in the background but it's
// blocked;
// 4) Reader is destroyed;
// 5) Resume the shard 2's puppet reader -> the now orphan read-ahead fiber
// executes;
//
// Best run with smp >= 3
SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_destroyed_with_pending_read_ahead) {
    if (smp::count < multishard_reader_for_read_ahead::min_shards) {
        std::cerr << "Cannot run test " << get_name() << " with smp::count < " << multishard_reader_for_read_ahead::min_shards << std::endl;
        return;
    }

    do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
        auto s = simple_schema();
        auto reader_sharder_remote_controls__ = prepare_multishard_reader_for_read_ahead_test(s, make_reader_permit(env));
        auto&& reader = reader_sharder_remote_controls__.reader;
        auto&& remote_controls = reader_sharder_remote_controls__.remote_controls;

        // This will read shard 0's buffer only
        reader.fill_buffer().get();
        BOOST_REQUIRE(reader.is_buffer_full());
        reader.detach_buffer();

        // This will move to shard 1 and trigger read-ahead on shard 2
        reader.fill_buffer().get();
        BOOST_REQUIRE(reader.is_buffer_full());

        // Check that shard with read-ahead is indeed blocked.
        BOOST_REQUIRE(eventually_true([&] {
            return smp::submit_to(multishard_reader_for_read_ahead::blocked_shard,
                    [control = remote_controls.at(multishard_reader_for_read_ahead::blocked_shard).get()] {
                return control->pending;
            }).get();
        }));

        // Destroy reader.
        testlog.debug("Starting to close the reader");
        auto fut = reader.close();

        // Unblock all puppet readers so the orphaned read-ahead fiber (and
        // close()) can run to completion.
        parallel_for_each(std::views::iota(0u, smp::count), [&remote_controls] (unsigned shard) mutable {
            return smp::submit_to(shard, [control = remote_controls.at(shard).get()] {
                control->buffer_filled.set_value();
            });
        }).get();

        fut.get();
        testlog.debug("Reader is closed");

        // Every shard reader must eventually be destroyed -- i.e. the
        // orphaned fiber cleaned up after itself without touching freed
        // memory (the latter is caught by ASAN).
        BOOST_REQUIRE(eventually_true([&] {
            return map_reduce(std::views::iota(0u, smp::count), [&] (unsigned shard) {
                return smp::submit_to(shard, [&remote_controls, shard] {
                    return remote_controls.at(shard)->destroyed;
                });
            },
            true,
            std::logical_and<bool>()).get();
        }));

        return make_ready_future<>();
    }).get();
}
// Like the previous test, but instead of destroying the reader while shard
// 2's read-ahead is pending, fast-forward it to a range past all the data.
// The fast-forward must be propagated to the already-created shard readers
// and the subsequent read must yield a clean end-of-stream.
SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_fast_forwarded_with_pending_read_ahead) {
    if (smp::count < multishard_reader_for_read_ahead::min_shards) {
        std::cerr << "Cannot run test " << get_name() << " with smp::count < " << multishard_reader_for_read_ahead::min_shards << std::endl;
        return;
    }

    do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
        auto s = simple_schema();
        auto reader_sharder_remote_controls_pr = prepare_multishard_reader_for_read_ahead_test(s, make_reader_permit(env));
        auto&& reader = reader_sharder_remote_controls_pr.reader;
        auto&& remote_controls = reader_sharder_remote_controls_pr.remote_controls;
        auto&& pr = reader_sharder_remote_controls_pr.pr;

        // Read shard 0's buffer, then move to shard 1 which triggers the
        // read-ahead on shard 2 (see prepare_multishard_reader_for_read_ahead_test()).
        reader.fill_buffer().get();
        BOOST_REQUIRE(reader.is_buffer_full());
        reader.detach_buffer();

        reader.fill_buffer().get();
        BOOST_REQUIRE(reader.is_buffer_full());
        reader.detach_buffer();

        // Wait for the read-ahead on shard 2 to become blocked.
        BOOST_REQUIRE(eventually_true([&] {
            return smp::submit_to(multishard_reader_for_read_ahead::blocked_shard,
                    [control = remote_controls.at(multishard_reader_for_read_ahead::blocked_shard).get()] {
                return control->pending;
            }).get();
        }));

        // Fast forward to a range starting right after the original range's
        // end token, i.e. past all data.
        auto end_token = dht::token(pr->end()->value().token());
        ++end_token._data;

        auto next_pr = dht::partition_range::make_starting_with(dht::ring_position::starting_at(end_token));
        auto fut = reader.fast_forward_to(next_pr);

        // Release the blocked read-ahead so the fast-forward can complete.
        smp::submit_to(multishard_reader_for_read_ahead::blocked_shard,
                [control = remote_controls.at(multishard_reader_for_read_ahead::blocked_shard).get()] {
            control->buffer_filled.set_value();
        }).get();

        fut.get();

        // All shard readers created so far (shards 0..blocked_shard) must
        // have seen exactly one fast_forward_to() call.
        const auto all_shard_fast_forwarded = map_reduce(
                std::views::iota(0u, multishard_reader_for_read_ahead::blocked_shard + 1u),
                [&] (unsigned shard) {
                    return smp::submit_to(shard, [control = remote_controls.at(shard).get()] {
                        return control->fast_forward_to == 1;
                    });
                },
                true,
                std::logical_and<bool>()).get();

        BOOST_REQUIRE(all_shard_fast_forwarded);

        // Nothing lives in the fast-forwarded range.
        reader.fill_buffer().get();

        BOOST_REQUIRE(reader.is_buffer_empty());
        BOOST_REQUIRE(reader.is_end_of_stream());

        reader.close().get();

        // All shard readers must eventually be destroyed.
        BOOST_REQUIRE(eventually_true([&] {
            return map_reduce(std::views::iota(0u, smp::count), [&] (unsigned shard) {
                return smp::submit_to(shard, [&remote_controls, shard] {
                    return remote_controls.at(shard)->destroyed;
                });
            },
            true,
            std::logical_and<bool>()).get();
        }));

        return make_ready_future<>();
    }).get();
}
// Read 1000 real partitions through the multishard combining reader with
// minimal (single-fragment) buffers, so advancing to the next partition
// regularly has to be forwarded to the owning shard reader instead of being
// resolved from buffered fragments.
SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_next_partition) {
    do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
        env.execute_cql("CREATE KEYSPACE multishard_combining_reader_next_partition_ks"
                " WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1};").get();
        env.execute_cql("CREATE TABLE multishard_combining_reader_next_partition_ks.test (pk int, v int, PRIMARY KEY(pk));").get();

        const auto insert_id = env.prepare("INSERT INTO multishard_combining_reader_next_partition_ks.test (\"pk\", \"v\") VALUES (?, ?);").get();

        const auto partition_count = 1000;

        for (int pk = 0; pk < partition_count; ++pk) {
            env.execute_prepared(insert_id, {{
                    cql3::raw_value::make_value(serialized(pk)),
                    cql3::raw_value::make_value(serialized(0))}}).get();
        }

        auto schema = env.local_db().find_column_family("multishard_combining_reader_next_partition_ks", "test").schema();
        auto& partitioner = schema->get_partitioner();

        // The decorated keys for all inserted partitions; sorted into ring
        // order further below.
        auto pkeys =
                std::views::iota(0, partition_count) |
                std::views::transform([schema, &partitioner] (int i) {
                    return partitioner.decorate_key(*schema, partition_key::from_singular(*schema, i));
                }) |
                std::ranges::to<std::vector<dht::decorated_key>>();

        // We want to test corner cases around next_partition() called when it
        // cannot be resolved with just the buffer and it has to be forwarded
        // to the correct shard reader, so set a buffer size so that only a
        // single fragment can fit into it at a time.
        const auto max_buffer_size = size_t{1};

        auto factory = [db = &env.db()] (
                schema_ptr schema,
                reader_permit permit,
                const dht::partition_range& range,
                const query::partition_slice& slice,
                tracing::trace_state_ptr trace_state,
                mutation_reader::forwarding fwd_mr) {
            auto& table = db->local().find_column_family(schema);
            auto reader = table.as_mutation_source().make_mutation_reader(
                    schema,
                    std::move(permit),
                    range,
                    slice,
                    std::move(trace_state),
                    streamed_mutation::forwarding::no,
                    fwd_mr);
            // Constrain the shard readers too, not just the combining reader.
            reader.set_max_buffer_size(max_buffer_size);
            return reader;
        };
        auto& table = env.db().local().find_column_family(schema);
        auto reader = make_multishard_combining_reader(
                seastar::make_shared<test_reader_lifecycle_policy>(std::move(factory)),
                schema,
                table.get_effective_replication_map(),
                make_reader_permit(env),
                query::full_partition_range,
                schema->full_slice());
        reader.set_max_buffer_size(max_buffer_size);

        std::ranges::sort(pkeys, [schema] (const dht::decorated_key& a, const dht::decorated_key& b) {
            return dht::ring_position_tri_compare(*schema, a, b) < 0;
        });

        testlog.info("Start test");
        auto assertions = assert_that(std::move(reader));
        // produces(pkey) advances over each partition -- presumably via
        // next_partition() -- exercising the cross-shard skip path.
        for (int i = 0; i < partition_count; ++i) {
            assertions.produces(pkeys[i]);
        }
        assertions.produces_end_of_stream();

        return make_ready_future<>();
    }).get();
}
// Test the multishard streaming reader in the context it was designed to work
// in: as a mean to read data belonging to a shard according to a different
// sharding configuration.
// The reference data is provided by a filtering reader.
SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) {
    if (smp::count < 3) {
        std::cerr << "Cannot run test " << get_name() << " with smp::count < 3" << std::endl;
        return;
    }

    do_with_cql_env_thread([&] (cql_test_env& env) -> future<> {
        env.execute_cql("CREATE KEYSPACE multishard_streaming_reader_ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1};").get();
        env.execute_cql("CREATE TABLE multishard_streaming_reader_ks.test (pk int, v int, PRIMARY KEY(pk));").get();

        const auto insert_id = env.prepare("INSERT INTO multishard_streaming_reader_ks.test (\"pk\", \"v\") VALUES (?, ?);").get();

        const auto partition_count = 10000;

        for (int pk = 0; pk < partition_count; ++pk) {
            env.execute_prepared(insert_id, {{
                    cql3::raw_value::make_value(serialized(pk)),
                    cql3::raw_value::make_value(serialized(0))}}).get();
        }

        auto schema = env.local_db().find_column_family("multishard_streaming_reader_ks", "test").schema();

        auto token_range = dht::token_range::make_open_ended_both_sides();
        auto partition_range = dht::to_partition_range(token_range);

        // The "remote" sharding configuration: one shard fewer than the
        // local one, so remote shard 0's data is spread across several local
        // shards.
        auto& local_partitioner = schema->get_sharder();
        auto remote_partitioner = dht::static_sharder(local_partitioner.shard_count() - 1, local_partitioner.sharding_ignore_msb());

        // The reader under test: streams, range by range, the data owned by
        // shard 0 of the remote sharding configuration.
        auto tested_reader = make_multishard_streaming_reader(env.db(), schema, make_reader_permit(env),
                [sharder = dht::selective_token_range_sharder(remote_partitioner, token_range, 0)] () mutable -> std::optional<dht::partition_range> {
            if (auto next = sharder.next()) {
                return dht::to_partition_range(*next);
            }
            return std::nullopt;
        }, gc_clock::now(), {}, read_ahead::yes);
        auto close_tested_reader = deferred_close(tested_reader);

        auto reader_factory = [db = &env.db()] (
                schema_ptr s,
                reader_permit permit,
                const dht::partition_range& range,
                const query::partition_slice& slice,
                tracing::trace_state_ptr trace_state,
                mutation_reader::forwarding fwd_mr) mutable {
            auto& table = db->local().find_column_family(s);
            return table.as_mutation_source().make_mutation_reader(std::move(s), std::move(permit), range, slice, std::move(trace_state),
                    streamed_mutation::forwarding::no, fwd_mr);
        };
        auto& table = env.db().local().find_column_family(schema);
        auto erm = table.get_effective_replication_map();
        // The reference: a plain full-range multishard read, filtered down
        // to the keys that remote shard 0 owns.
        auto reference_reader = make_filtering_reader(
                make_multishard_combining_reader(seastar::make_shared<test_reader_lifecycle_policy>(std::move(reader_factory)),
                    schema, erm, make_reader_permit(env), partition_range, schema->full_slice()),
                [&remote_partitioner] (const dht::decorated_key& pkey) {
                    return remote_partitioner.shard_of(pkey.token()) == 0;
                });
        auto close_reference_reader = deferred_close(reference_reader);

        utils::chunked_vector<mutation> reference_muts;
        while (auto mut_opt = read_mutation_from_mutation_reader(reference_reader).get()) {
            reference_muts.push_back(std::move(*mut_opt));
        }

        utils::chunked_vector<mutation> tested_muts;
        while (auto mut_opt = read_mutation_from_mutation_reader(tested_reader).get()) {
            tested_muts.push_back(std::move(*mut_opt));
        }

        BOOST_CHECK_EQUAL(reference_muts.size(), tested_muts.size());

        // Compare pairwise up to the shorter of the two, so a size mismatch
        // (reported above, non-fatal) still yields useful diffs.
        const auto min_size = std::min(reference_muts.size(), tested_muts.size());
        for (size_t i = 0; i < min_size; ++i) {
            testlog.trace("Comparing mutation {:d}/{:d}", i, min_size - 1);
            assert_that(tested_muts[i]).is_equal_to(reference_muts[i]);
        }

        return make_ready_future<>();
    }).get();
}
// Exercises the queue reader and its handle: concurrent produce/consume,
// abort propagation in both directions, handle detachment and the various
// handle life-cycle corner cases (move-assignment, destruction,
// push_end_of_stream()). The life-cycle sections rely on ASAN to catch
// use-after-free rather than on explicit assertions.
SEASTAR_THREAD_TEST_CASE(test_queue_reader) {
    tests::reader_concurrency_semaphore_wrapper semaphore;
    auto gen = random_mutation_generator(random_mutation_generator::generate_counters::no);

    const auto expected_muts = gen(20);

    // Simultaneous read and write
    {
        // Consumer fiber: drain the queue reader into `muts`.
        auto read_all = [] (mutation_reader& reader, utils::chunked_vector<mutation>& muts) {
            return async([&reader, &muts] {
                auto close_reader = deferred_close(reader);
                while (auto mut_opt = read_mutation_from_mutation_reader(reader).get()) {
                    muts.emplace_back(std::move(*mut_opt));
                }
            });
        };

        // Producer fiber: push all fragments of `muts` through the handle,
        // then signal end-of-stream.
        auto write_all = [&semaphore] (queue_reader_handle& handle, const utils::chunked_vector<mutation>& muts) {
            return async([&] {
                auto reader = make_mutation_reader_from_mutations(muts.front().schema(), semaphore.make_permit(), muts);
                auto close_reader = deferred_close(reader);
                while (auto mf_opt = reader().get()) {
                    handle.push(std::move(*mf_opt)).get();
                }
                handle.push_end_of_stream();
            });
        };

        auto actual_muts = utils::chunked_vector<mutation>{};
        // Reserve for exactly the generated mutation count, so this stays in
        // sync with the gen(...) call above.
        actual_muts.reserve(expected_muts.size());

        auto p = make_queue_reader(gen.schema(), semaphore.make_permit());
        auto& reader = std::get<0>(p);
        auto& handle = std::get<1>(p);
        auto close_reader = deferred_close(reader);

        when_all_succeed(read_all(reader, actual_muts), write_all(handle, expected_muts)).get();
        BOOST_REQUIRE_EQUAL(actual_muts.size(), expected_muts.size());
        for (size_t i = 0; i < expected_muts.size(); ++i) {
            BOOST_REQUIRE_EQUAL(actual_muts.at(i), expected_muts.at(i));
        }
    }

    // abort() -- check that consumer is aborted
    {
        auto p = make_queue_reader(gen.schema(), semaphore.make_permit());
        auto& reader = std::get<0>(p);
        auto& handle = std::get<1>(p);
        auto close_reader = deferred_close(reader);

        auto fill_buffer_fut = reader.fill_buffer();

        auto expected_reader = make_mutation_reader_from_mutations(expected_muts.front().schema(), semaphore.make_permit(), expected_muts);
        auto close_expected_reader = deferred_close(expected_reader);

        // One fragment is not enough to complete the fill; the consumer is
        // left waiting so abort() can interrupt it.
        handle.push(std::move(*expected_reader().get())).get();

        BOOST_REQUIRE(!fill_buffer_fut.available());

        handle.abort(std::make_exception_ptr<std::runtime_error>(std::runtime_error("error")));

        // The pending fill fails, further pushes fail, and the reader is not
        // at end-of-stream.
        BOOST_REQUIRE_THROW(fill_buffer_fut.get(), std::runtime_error);
        BOOST_REQUIRE_THROW(handle.push(mutation_fragment_v2(*gen.schema(), semaphore.make_permit(), partition_end{})).get(), std::runtime_error);
        BOOST_REQUIRE(!reader.is_end_of_stream());
    }

    // abort() -- check that producer is aborted
    {
        auto p = make_queue_reader(gen.schema(), semaphore.make_permit());
        auto& reader = std::get<0>(p);
        auto& handle = std::get<1>(p);
        auto close_reader = deferred_close(reader);
        // Tiny buffer so the producer blocks quickly.
        reader.set_max_buffer_size(1);

        auto expected_reader = make_mutation_reader_from_mutations(expected_muts.front().schema(), semaphore.make_permit(), expected_muts);
        auto close_expected_reader = deferred_close(expected_reader);

        // Push until the buffer is full and the producer blocks.
        auto push_fut = make_ready_future<>();
        while (push_fut.available()) {
            push_fut = handle.push(std::move(*expected_reader().get()));
        }

        BOOST_REQUIRE(!push_fut.available());

        handle.abort(std::make_exception_ptr<std::runtime_error>(std::runtime_error("error")));

        // Both the consumer side and the blocked push observe the exception.
        BOOST_REQUIRE_THROW(reader.fill_buffer().get(), std::runtime_error);
        BOOST_REQUIRE_THROW(push_fut.get(), std::runtime_error);
        BOOST_REQUIRE(!reader.is_end_of_stream());
    }

    // Detached handle
    {
        auto p = make_queue_reader(gen.schema(), semaphore.make_permit());
        auto& reader = std::get<0>(p);
        auto& handle = std::get<1>(p);

        auto fill_buffer_fut = reader.fill_buffer();

        // Destroy the reader while a fill is pending; the handle becomes
        // detached.
        {
            auto throwaway_reader = std::move(reader);
            throwaway_reader.close().get();
        }

        // Pushing through a detached handle fails, but the pending fill
        // completes cleanly.
        BOOST_REQUIRE_THROW(handle.push(mutation_fragment_v2(*gen.schema(), semaphore.make_permit(), partition_end{})).get(), std::runtime_error);
        BOOST_REQUIRE_THROW(handle.push_end_of_stream(), std::runtime_error);
        BOOST_REQUIRE_NO_THROW(fill_buffer_fut.get());
    }

    // Abandoned handle aborts, move-assignment
    {
        auto p = make_queue_reader(gen.schema(), semaphore.make_permit());
        auto& reader = std::get<0>(p);
        auto& handle = std::get<1>(p);
        auto close_reader = deferred_close(reader);

        auto fill_buffer_fut = reader.fill_buffer();

        auto expected_reader = make_mutation_reader_from_mutations(expected_muts.front().schema(), semaphore.make_permit(), expected_muts);
        auto close_expected_reader = deferred_close(expected_reader);

        handle.push(std::move(*expected_reader().get())).get();

        BOOST_REQUIRE(!fill_buffer_fut.available());

        {
            auto p = make_queue_reader(gen.schema(), semaphore.make_permit());
            auto& throwaway_reader = std::get<0>(p);
            auto& throwaway_handle = std::get<1>(p);
            auto close_throwaway_reader = deferred_close(throwaway_reader);
            // Overwrite handle -- abandoning the original handle must abort
            // its reader.
            handle = std::move(throwaway_handle);
        }

        BOOST_REQUIRE_THROW(fill_buffer_fut.get(), std::runtime_error);
        BOOST_REQUIRE_THROW(handle.push(mutation_fragment_v2(*gen.schema(), semaphore.make_permit(), partition_end{})).get(), std::runtime_error);
    }

    // Abandoned handle aborts, destructor
    {
        auto p = make_queue_reader(gen.schema(), semaphore.make_permit());
        auto& reader = std::get<0>(p);
        auto& handle = std::get<1>(p);
        auto close_reader = deferred_close(reader);

        auto fill_buffer_fut = reader.fill_buffer();

        auto expected_reader = make_mutation_reader_from_mutations(expected_muts.front().schema(), semaphore.make_permit(), expected_muts);
        auto close_expected_reader = deferred_close(expected_reader);

        handle.push(std::move(*expected_reader().get())).get();

        BOOST_REQUIRE(!fill_buffer_fut.available());

        {
            // Destroy handle -- destruction of a live handle must abort the
            // reader.
            queue_reader_handle throwaway_handle(std::move(handle));
        }

        BOOST_REQUIRE_THROW(fill_buffer_fut.get(), std::runtime_error);
        BOOST_REQUIRE_THROW(handle.push(mutation_fragment_v2(*gen.schema(), semaphore.make_permit(), partition_end{})).get(), std::runtime_error);
    }

    // Life-cycle, relies on ASAN for error reporting
    {
        auto p = make_queue_reader(gen.schema(), semaphore.make_permit());
        auto& reader = std::get<0>(p);
        auto& handle = std::get<1>(p);
        auto close_reader = deferred_close(reader);

        {
            auto throwaway_p = make_queue_reader(gen.schema(), semaphore.make_permit());
            auto& throwaway_reader = std::get<0>(throwaway_p);
            auto& throwaway_handle = std::get<1>(throwaway_p);
            auto close_throwaway_reader = deferred_close(throwaway_reader);
            // Overwrite handle
            handle = std::move(throwaway_handle);

            auto another_throwaway_p = make_queue_reader(gen.schema(), semaphore.make_permit());
            auto& another_throwaway_reader = std::get<0>(another_throwaway_p);
            auto& another_throwaway_handle = std::get<1>(another_throwaway_p);
            auto close_another_throwaway_reader = deferred_close(another_throwaway_reader);

            // Overwrite with moved-from handle (move assignment operator)
            another_throwaway_handle = std::move(throwaway_handle);

            // Overwrite with moved-from handle (move constructor)
            queue_reader_handle yet_another_throwaway_handle(std::move(throwaway_handle));
        }
    }

    // push_end_of_stream() detaches handle from reader, relies on ASAN for error reporting
    {
        auto p = make_queue_reader(gen.schema(), semaphore.make_permit());
        auto& reader = std::get<0>(p);
        auto& handle = std::get<1>(p);
        auto close_reader = deferred_close(reader);

        {
            auto throwaway_handle = std::move(handle);
            throwaway_handle.push_end_of_stream();
        }
    }
}
// Run the generic mutation-source conformance suite over a compacting
// reader wrapping a memtable read, once with default buffering and once
// with a single-fragment buffer to exercise refill boundaries.
SEASTAR_THREAD_TEST_CASE(test_compacting_reader_as_mutation_source) {
    auto make_populate = [] (bool single_fragment_buffer) {
        return [single_fragment_buffer] (schema_ptr s, const utils::chunked_vector<mutation>& mutations, gc_clock::time_point query_time) mutable {
            auto mt = make_memtable(s, mutations);
            return mutation_source([=] (
                    schema_ptr s,
                    reader_permit permit,
                    const dht::partition_range& range,
                    const query::partition_slice& slice,
                    tracing::trace_state_ptr trace_state,
                    streamed_mutation::forwarding fwd_sm,
                    mutation_reader::forwarding fwd_mr) mutable {
                // The compacting reader doesn't support sm-forwarding itself;
                // forward the underlying source instead.
                auto source = mt->make_mutation_reader(s, std::move(permit), range, slice, std::move(trace_state), streamed_mutation::forwarding::no, fwd_mr);
                if (fwd_sm == streamed_mutation::forwarding::yes) {
                    source = make_forwardable(std::move(source));
                }
                auto mr = make_compacting_reader(std::move(source), query_time,
                        can_never_purge,
                        tombstone_gc_state::for_tests(), fwd_sm);
                if (single_fragment_buffer) {
                    mr.set_max_buffer_size(1);
                }
                return mr;
            });
        };
    };

    BOOST_TEST_MESSAGE("run_mutation_source_tests(single_fragment_buffer=false)");
    run_mutation_source_tests(make_populate(false));
    BOOST_TEST_MESSAGE("run_mutation_source_tests(single_fragment_buffer=true)");
    run_mutation_source_tests(make_populate(true));
}
// Check that next_partition() in the middle of a partition works properly.
SEASTAR_THREAD_TEST_CASE(test_compacting_reader_next_partition) {
    simple_schema ss(simple_schema::with_static::no);
    const auto& schema = *ss.schema();
    tests::reader_concurrency_semaphore_wrapper semaphore;
    std::deque<mutation_fragment_v2> expected;

    auto mr = [&] () {
        auto permit = semaphore.make_permit();
        const size_t buffer_size = 1024;
        std::deque<mutation_fragment_v2> mfs;
        auto dks = ss.make_pkeys(2);
        const auto& dk0 = dks[0];
        const auto& dk1 = dks[1];

        // First partition: enough rows to overflow the reader's buffer, so
        // next_partition() cannot be resolved from buffered fragments alone.
        mfs.emplace_back(*ss.schema(), permit, partition_start(dk0, tombstone{}));

        auto i = 0;
        size_t mfs_size = 0;
        while (mfs_size <= buffer_size) {
            mfs.emplace_back(*ss.schema(), permit, ss.make_row_v2(permit, ss.make_ckey(i++), "v"));
            mfs_size += mfs.back().memory_usage();
        }
        mfs.emplace_back(*ss.schema(), permit, partition_end{});

        // Second, single-row partition.
        mfs.emplace_back(*ss.schema(), permit, partition_start(dk1, tombstone{}));
        mfs.emplace_back(*ss.schema(), permit, ss.make_row_v2(permit, ss.make_ckey(0), "v"));
        mfs.emplace_back(*ss.schema(), permit, partition_end{});

        // Deep-copy the input fragments as the expected output; nothing here
        // is purgeable so compaction should pass everything through.
        for (const auto& mf : mfs) {
            if (mf.is_partition_start()) {
                expected.emplace_back(*ss.schema(), permit, partition_start(mf.as_partition_start()));
            } else if (mf.is_clustering_row()) {
                expected.emplace_back(*ss.schema(), permit, clustering_row(*ss.schema(), mf.as_clustering_row()));
            } else { // partition-end
                expected.emplace_back(*ss.schema(), permit, partition_end{});
            }
        }

        auto mr = make_compacting_reader(make_mutation_reader_from_fragments(ss.schema(), permit, std::move(mfs)),
                gc_clock::now(),
                can_never_purge,
                tombstone_gc_state::for_tests());
        mr.set_max_buffer_size(buffer_size);

        return mr;
    }();
    auto reader_assertions = assert_that(std::move(mr));

    reader_assertions
        .produces(schema, expected[0]) // partition start
        .produces(schema, expected[1]) // first row
        .next_partition();

    // After the mid-partition skip we expect exactly the second partition,
    // i.e. the last three fragments.
    auto it = expected.end() - 3;
    while (it != expected.end()) {
        reader_assertions.produces(schema, *it++);
    }
    reader_assertions.produces_end_of_stream();
}
// A partition tombstone that covers a range tombstone must cause the
// compacting reader to drop the range tombstone changes from its output,
// matching what regular compaction would produce.
SEASTAR_THREAD_TEST_CASE(test_compacting_reader_is_consistent_with_compaction) {
    simple_schema ss;
    schema_ptr s = ss.schema();
    tests::reader_concurrency_semaphore_wrapper semaphore;
    auto permit = semaphore.make_permit();

    auto m = ss.new_mutation("pk");
    auto r = ss.make_ckey_range(1, 2);
    auto rt = ss.new_tombstone();
    ss.delete_range(m, r, rt);
    // Applied after rt -- presumably carries a later timestamp, shadowing
    // the range tombstone.
    auto p_tomb = ss.new_tombstone();
    m.partition().apply(p_tomb);

    auto read_m = [&] {
        return make_mutation_reader_from_mutations(m.schema(), permit, m);
    };

    // Uncompacted, the range tombstone changes are emitted.
    assert_that(read_m())
            .produces_partition_start(m.decorated_key(), p_tomb)
            .produces_range_tombstone_change({position_in_partition::for_range_start(r), rt})
            .produces_range_tombstone_change({position_in_partition::for_range_end(r), {}})
            .produces_partition_end();

    // Compacted (with nothing purgeable), the shadowed range tombstone is
    // gone and only the partition tombstone remains.
    assert_that(make_compacting_reader(read_m(), gc_clock::time_point::min(), can_never_purge, tombstone_gc_state::for_tests()))
            .exact()
            .produces_partition_start(m.decorated_key(), p_tomb)
            .produces_partition_end();
}
// Run the generic mutation-source conformance suite against an auto-paused
// evictable reader backed by a memtable.
SEASTAR_THREAD_TEST_CASE(test_auto_paused_evictable_reader_is_mutation_source) {
    auto populate = [] (schema_ptr table_schema, const utils::chunked_vector<mutation>& muts, gc_clock::time_point) {
        auto mt = make_memtable(table_schema, muts);
        return mutation_source([=] (
                schema_ptr s,
                reader_permit permit,
                const dht::partition_range& range,
                const query::partition_slice& slice,
                tracing::trace_state_ptr trace_state,
                streamed_mutation::forwarding fwd_sm,
                mutation_reader::forwarding fwd_mr) mutable {
            auto rd = make_auto_paused_evictable_reader(mt->as_data_source(), std::move(s), permit, range, slice, std::move(trace_state), fwd_mr);
            if (fwd_sm != streamed_mutation::forwarding::yes) {
                return rd;
            }
            // sm-forwarding is provided by wrapping, not natively.
            return make_forwardable(std::move(rd));
        });
    };
    run_mutation_source_tests(populate);
}
// Run the generic mutation-source conformance suite against a manually
// paused evictable reader, with random pauses injected after fills and
// fast-forwards to exercise eviction/resume at arbitrary points.
SEASTAR_THREAD_TEST_CASE(test_manual_paused_evictable_reader_is_mutation_source) {
    // Wraps a manually-paused evictable reader and randomly pauses it
    // (1-in-5 chance) after each fill_buffer() and fast_forward_to(),
    // forcing the wrapped reader to recreate/resume transparently.
    class maybe_pausing_reader : public mutation_reader::impl {
        mutation_reader _reader;
        std::optional<evictable_reader_handle> _handle;

    private:
        void maybe_pause() {
            // get_int(0, 4) == 0 with probability 1/5.
            if (!tests::random::get_int(0, 4)) {
                _handle->pause();
            }
        }

    public:
        maybe_pausing_reader(
                replica::memtable& mt,
                schema_ptr query_schema,
                reader_permit permit,
                const dht::partition_range& pr,
                const query::partition_slice& ps,
                tracing::trace_state_ptr trace_state,
                mutation_reader::forwarding fwd_mr)
            : impl(std::move(query_schema), std::move(permit)), _reader(nullptr) {
            std::tie(_reader, _handle) = make_manually_paused_evictable_reader(mt.as_data_source(), _schema, _permit, pr, ps,
                    std::move(trace_state), fwd_mr);
        }
        virtual future<> fill_buffer() override {
            // Drain the inner reader's buffer into ours, then maybe pause.
            return _reader.fill_buffer().then([this] {
                _end_of_stream = _reader.is_end_of_stream();
                _reader.move_buffer_content_to(*this);
            }).then([this] {
                maybe_pause();
            });
        }
        virtual future<> next_partition() override {
            // If the buffer still holds a fragment of the next partition we
            // are done; only otherwise forward to the inner reader.
            clear_buffer_to_next_partition();
            if (!is_buffer_empty()) {
                return make_ready_future<>();
            }
            _end_of_stream = false;
            return _reader.next_partition();
        }
        virtual future<> fast_forward_to(const dht::partition_range& pr) override {
            clear_buffer();
            _end_of_stream = false;
            return _reader.fast_forward_to(pr).then([this] {
                maybe_pause();
            });
        }
        virtual future<> fast_forward_to(position_range pr) override {
            // sm-forwarding is handled by make_forwardable() below.
            throw_with_backtrace<std::bad_function_call>();
        }
        virtual future<> close() noexcept override {
            return _reader.close();
        }
    };

    auto make_populate = [] (schema_ptr s, const utils::chunked_vector<mutation>& mutations, gc_clock::time_point query_time) {
        auto mt = make_memtable(s, mutations);
        return mutation_source([=] (
                schema_ptr s,
                reader_permit permit,
                const dht::partition_range& range,
                const query::partition_slice& slice,
                tracing::trace_state_ptr trace_state,
                streamed_mutation::forwarding fwd_sm,
                mutation_reader::forwarding fwd_mr) mutable {
            auto mr = make_mutation_reader<maybe_pausing_reader>(*mt, s, std::move(permit), range, slice, std::move(trace_state), fwd_mr);
            if (fwd_sm == streamed_mutation::forwarding::yes) {
                return make_forwardable(std::move(mr));
            }
            return mr;
        });
    };

    run_mutation_source_tests(make_populate);
}
namespace {

// Deep-copies a buffer of mutation fragments. Fragments are re-created one by
// one against the given schema/permit, which lets the same logical buffer be
// served by several reader instances.
std::deque<mutation_fragment_v2> copy_fragments(const schema& s, reader_permit permit, const std::deque<mutation_fragment_v2>& o) {
    std::deque<mutation_fragment_v2> buf;
    for (const auto& mf : o) {
        buf.emplace_back(s, permit, mf);
    }
    return buf;
}

// Builds an auto-paused evictable reader over a mutation source that serves
// the given buffers one at a time: each (re-)creation of the underlying
// reader consumes the next buffer from the list, and an empty reader is
// returned once the list is exhausted. Fills the first buffer, verifies it
// ends exactly at `first_buf_last_fragment_position`, optionally detaches the
// buffer, then evicts all inactive reads so that the next fill has to
// re-create the underlying reader -- exercising the evictable reader's
// resume-and-validate path.
mutation_reader create_evictable_reader_and_evict_after_first_buffer(
        schema_ptr schema,
        reader_permit permit,
        const dht::partition_range& prange,
        const query::partition_slice& slice,
        std::list<std::deque<mutation_fragment_v2>> buffers,
        position_in_partition_view first_buf_last_fragment_position,
        size_t max_buffer_size,
        bool detach_buffer = true) {
    // Factory handed to mutation_source. It has to be copyable (fragments
    // aren't), hence the deep-copying copy-constructor.
    class factory {
        schema_ptr _schema;
        reader_permit _permit;
        std::list<std::deque<mutation_fragment_v2>> _buffers;
        size_t _max_buffer_size;
    public:
        factory(schema_ptr schema, reader_permit permit, std::list<std::deque<mutation_fragment_v2>> buffers, size_t max_buffer_size)
            : _schema(std::move(schema))
            , _permit(std::move(permit))
            , _buffers(std::move(buffers))
            , _max_buffer_size(max_buffer_size) {
        }
        factory(const factory& o)
            : _schema(o._schema)
            , _permit(o._permit) {
            for (const auto& buf : o._buffers) {
                _buffers.emplace_back(copy_fragments(*_schema, _permit, buf));
            }
        }
        factory(factory&& o) = default;
        mutation_reader operator()(
                schema_ptr s,
                reader_permit permit,
                const dht::partition_range& range,
                const query::partition_slice& slice,
                tracing::trace_state_ptr trace_state,
                streamed_mutation::forwarding fwd_sm,
                mutation_reader::forwarding fwd_mr) {
            BOOST_REQUIRE(s == _schema);
            if (!_buffers.empty()) {
                auto buf = std::move(_buffers.front());
                _buffers.pop_front();
                auto rd = make_mutation_reader_from_fragments(_schema, std::move(permit), std::move(buf));
                rd.set_max_buffer_size(_max_buffer_size);
                return rd;
            }
            return make_empty_mutation_reader(_schema, std::move(permit));
        }
    };

    auto ms = mutation_source(factory(schema, permit, std::move(buffers), max_buffer_size));
    auto rd = make_auto_paused_evictable_reader(
            std::move(ms),
            schema,
            permit,
            prange,
            slice,
            nullptr,
            mutation_reader::forwarding::yes);
    rd.set_max_buffer_size(max_buffer_size);
    rd.fill_buffer().get();
    const auto eq_cmp = position_in_partition::equal_compare(*schema);
    BOOST_REQUIRE(rd.is_buffer_full());
    BOOST_REQUIRE(eq_cmp(rd.buffer().back().position(), first_buf_last_fragment_position));
    BOOST_REQUIRE(!rd.is_end_of_stream());
    if (detach_buffer) {
        rd.detach_buffer();
    }
    // Evict the (auto-paused, hence inactive) underlying reader so the next
    // fill has to re-create it via the factory.
    while (permit.semaphore().try_evict_one_inactive_read());
    return rd;
}

// Two-buffer convenience overload of the above.
mutation_reader create_evictable_reader_and_evict_after_first_buffer(
        schema_ptr schema,
        reader_permit permit,
        const dht::partition_range& prange,
        const query::partition_slice& slice,
        std::deque<mutation_fragment_v2> first_buffer,
        position_in_partition_view last_fragment_position,
        std::deque<mutation_fragment_v2> last_buffer,
        size_t max_buffer_size,
        bool detach_buffer = true) {
    std::list<std::deque<mutation_fragment_v2>> list;
    list.emplace_back(std::move(first_buffer));
    list.emplace_back(std::move(last_buffer));
    return create_evictable_reader_and_evict_after_first_buffer(
            std::move(schema),
            std::move(permit),
            prange,
            slice,
            std::move(list),
            last_fragment_position,
            max_buffer_size,
            detach_buffer);
}

// Runs a resume-after-eviction scenario and checks whether the evictable
// reader's self-validation fires: with an empty `error_prefix` the resume
// must succeed; otherwise a std::runtime_error whose message starts with
// `error_prefix` must be thrown.
void check_evictable_reader_validation_is_triggered(
        std::string_view test_name,
        std::string_view error_prefix, // empty str if no exception is expected
        schema_ptr schema,
        reader_permit permit,
        const dht::partition_range& prange,
        const query::partition_slice& slice,
        std::deque<mutation_fragment_v2> first_buffer,
        position_in_partition_view last_fragment_position,
        std::deque<mutation_fragment_v2> second_buffer,
        size_t max_buffer_size) {
    testlog.info("check_evictable_reader_validation_is_triggered(): checking {} test case: {}", error_prefix.empty() ? "positive" : "negative", test_name);
    auto rd = create_evictable_reader_and_evict_after_first_buffer(std::move(schema), std::move(permit), prange, slice, std::move(first_buffer),
            last_fragment_position, std::move(second_buffer), max_buffer_size);
    auto close_rd = deferred_close(rd);
    const bool fail = !error_prefix.empty();
    try {
        rd.fill_buffer().get();
    } catch (std::runtime_error& e) {
        if (fail) {
            // Use starts_with(): constructing string_view(e.what(),
            // error_prefix.size()) would read past the end of the message
            // whenever it is shorter than the expected prefix.
            if (std::string_view(e.what()).starts_with(error_prefix)) {
                testlog.trace("Expected exception caught: {}", std::current_exception());
                return;
            } else {
                BOOST_FAIL(fmt::format("Exception with unexpected message caught: {}", std::current_exception()));
            }
        } else {
            BOOST_FAIL(fmt::format("Unexpected exception caught: {}", std::current_exception()));
        }
    }
    if (fail) {
        BOOST_FAIL(fmt::format("Expected exception not thrown"));
    }
}

}
// Exercises the evictable reader's self-validation: after eviction, the
// reader resumes from a freshly created underlying reader and validates the
// first fragments it produces against the last remembered position/partition.
// Each check below states whether validation must pass ("" prefix) or fail
// with the given error-message prefix.
SEASTAR_THREAD_TEST_CASE(test_evictable_reader_self_validation) {
// Validation failures are reported via on_internal_error(); disable aborting
// so they surface as exceptions the checks can catch.
set_abort_on_internal_error(false);
auto reset_on_internal_abort = defer([] {
set_abort_on_internal_error(true);
});
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name(), reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
simple_schema s;
auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {});
auto pkeys = s.make_pkeys(4);
std::ranges::sort(pkeys, dht::decorated_key::less_comparator(s.schema()));
size_t max_buffer_size = 512;
// Clustering-key layout: first buffer covers [first_ck, second_buffer_ck),
// the second buffer is expected to continue at second_buffer_ck, and the
// queried clustering range ends at last_ck.
const int first_ck = 100;
const int second_buffer_ck = first_ck + 100;
const int last_ck = second_buffer_ck + 100;
static const char partition_error_prefix[] = "validate_partition_start(): validation failed";
static const char position_in_partition_error_prefix[] = "validate_position_in_partition(): validation failed";
// Query only covers partitions pkeys[1..2]; pkeys[0] and pkeys[3] are out of range.
const auto prange = dht::partition_range::make(
dht::partition_range::bound(pkeys[1], true),
dht::partition_range::bound(pkeys[2], true));
const auto ckrange = query::clustering_range::make(
query::clustering_range::bound(s.make_ckey(first_ck), true),
query::clustering_range::bound(s.make_ckey(last_ck), true));
const auto slice = partition_slice_builder(*s.schema()).with_range(ckrange).build();
// First buffer: an *unfinished* partition (no partition_end before the cut),
// so the reader must resume mid-partition.
std::deque<mutation_fragment_v2> first_buffer;
first_buffer.emplace_back(*s.schema(), permit, partition_start{pkeys[1], {}});
size_t mem_usage = first_buffer.back().memory_usage();
for (int i = 0; i < second_buffer_ck; ++i) {
// NOTE(review): `i` is incremented both here and in the loop header, so
// only every other ckey is emitted -- presumably just filler rows whose
// exact keys don't matter; confirm intentional.
first_buffer.emplace_back(*s.schema(), permit, s.make_row_v2(permit, s.make_ckey(i++), "v"));
mem_usage += first_buffer.back().memory_usage();
}
// Size the buffer so the cut falls exactly after the last row added above.
max_buffer_size = mem_usage;
auto last_fragment_position = position_in_partition(first_buffer.back().position());
first_buffer.emplace_back(*s.schema(), permit, s.make_row_v2(permit, s.make_ckey(second_buffer_ck), "v"));
// Builds the buffer served by the re-created reader: a partition starting at
// `pkey` with rows from `first_ckey` (default: second_buffer_ck) onwards.
auto make_second_buffer = [&s, permit, &max_buffer_size] (dht::decorated_key pkey, std::optional<int> first_ckey = {}) mutable {
auto ckey = first_ckey ? *first_ckey : second_buffer_ck;
std::deque<mutation_fragment_v2> second_buffer;
second_buffer.emplace_back(*s.schema(), permit, partition_start{std::move(pkey), {}});
size_t mem_usage = second_buffer.back().memory_usage();
while (mem_usage <= max_buffer_size) {
second_buffer.emplace_back(*s.schema(), permit, s.make_row_v2(permit, s.make_ckey(ckey++), "v"));
mem_usage += second_buffer.back().memory_usage();
}
second_buffer.emplace_back(*s.schema(), permit, partition_end{});
return second_buffer;
};
//
// Continuing the same partition
//
check_evictable_reader_validation_is_triggered(
"pkey < _last_pkey; pkey ∉ prange",
partition_error_prefix,
s.schema(),
permit,
prange,
slice,
copy_fragments(*s.schema(), permit, first_buffer),
last_fragment_position,
make_second_buffer(pkeys[0]),
max_buffer_size);
check_evictable_reader_validation_is_triggered(
"pkey == _last_pkey",
"",
s.schema(),
permit,
prange,
slice,
copy_fragments(*s.schema(), permit, first_buffer),
last_fragment_position,
make_second_buffer(pkeys[1]),
max_buffer_size);
check_evictable_reader_validation_is_triggered(
"pkey == _last_pkey; position_in_partition ∉ ckrange (<)",
position_in_partition_error_prefix,
s.schema(),
permit,
prange,
slice,
copy_fragments(*s.schema(), permit, first_buffer),
last_fragment_position,
make_second_buffer(pkeys[1], first_ck - 10),
max_buffer_size);
check_evictable_reader_validation_is_triggered(
"pkey == _last_pkey; position_in_partition ∉ ckrange; position_in_partition < _next_position_in_partition",
position_in_partition_error_prefix,
s.schema(),
permit,
prange,
slice,
copy_fragments(*s.schema(), permit, first_buffer),
last_fragment_position,
make_second_buffer(pkeys[1], second_buffer_ck - 2),
max_buffer_size);
check_evictable_reader_validation_is_triggered(
"pkey == _last_pkey; position_in_partition ∈ ckrange",
"",
s.schema(),
permit,
prange,
slice,
copy_fragments(*s.schema(), permit, first_buffer),
last_fragment_position,
make_second_buffer(pkeys[1], second_buffer_ck),
max_buffer_size);
check_evictable_reader_validation_is_triggered(
"pkey == _last_pkey; position_in_partition ∉ ckrange (>)",
position_in_partition_error_prefix,
s.schema(),
permit,
prange,
slice,
copy_fragments(*s.schema(), permit, first_buffer),
last_fragment_position,
make_second_buffer(pkeys[1], last_ck + 10),
max_buffer_size);
check_evictable_reader_validation_is_triggered(
"pkey > _last_pkey; pkey ∈ pkrange",
"",
s.schema(),
permit,
prange,
slice,
copy_fragments(*s.schema(), permit, first_buffer),
last_fragment_position,
make_second_buffer(pkeys[2]),
max_buffer_size);
check_evictable_reader_validation_is_triggered(
"pkey > _last_pkey; pkey ∉ pkrange",
partition_error_prefix,
s.schema(),
permit,
prange,
slice,
copy_fragments(*s.schema(), permit, first_buffer),
last_fragment_position,
make_second_buffer(pkeys[3]),
max_buffer_size);
//
// Continuing from next partition
//
// Rebuild the first buffer so it ends on a partition_end: the reader now
// resumes *between* partitions, which changes which resumptions are valid
// (continuing the same partition is no longer allowed).
first_buffer.clear();
first_buffer.emplace_back(*s.schema(), permit, partition_start{pkeys[1], {}});
mem_usage = first_buffer.back().memory_usage();
for (int i = 0; i < second_buffer_ck; ++i) {
// NOTE(review): same double-increment filler pattern as above.
first_buffer.emplace_back(*s.schema(), permit, s.make_row_v2(permit, s.make_ckey(i++), "v"));
mem_usage += first_buffer.back().memory_usage();
}
first_buffer.emplace_back(*s.schema(), permit, partition_end{});
mem_usage += first_buffer.back().memory_usage();
last_fragment_position = position_in_partition(first_buffer.back().position());
max_buffer_size = mem_usage;
first_buffer.emplace_back(*s.schema(), permit, partition_start{pkeys[2], {}});
check_evictable_reader_validation_is_triggered(
"pkey < _last_pkey; pkey ∉ pkrange",
partition_error_prefix,
s.schema(),
permit,
prange,
slice,
copy_fragments(*s.schema(), permit, first_buffer),
last_fragment_position,
make_second_buffer(pkeys[0]),
max_buffer_size);
check_evictable_reader_validation_is_triggered(
"pkey == _last_pkey",
partition_error_prefix,
s.schema(),
permit,
prange,
slice,
copy_fragments(*s.schema(), permit, first_buffer),
last_fragment_position,
make_second_buffer(pkeys[1]),
max_buffer_size);
check_evictable_reader_validation_is_triggered(
"pkey > _last_pkey; pkey ∈ pkrange",
"",
s.schema(),
permit,
prange,
slice,
copy_fragments(*s.schema(), permit, first_buffer),
last_fragment_position,
make_second_buffer(pkeys[2]),
max_buffer_size);
check_evictable_reader_validation_is_triggered(
"pkey > _last_pkey; pkey ∉ pkrange",
partition_error_prefix,
s.schema(),
permit,
prange,
slice,
copy_fragments(*s.schema(), permit, first_buffer),
last_fragment_position,
make_second_buffer(pkeys[3]),
max_buffer_size);
}
// Reproduces the case where the evictable reader is evicted while paused and
// then fast-forwarded to a new partition range: the underlying reader must be
// re-created for the *new* range rather than resumed at the position saved
// from the old one. A regression would manifest as on_internal_error()
// (noted below at the final produces() call).
SEASTAR_THREAD_TEST_CASE(test_evictable_reader_recreate_before_fast_forward_to) {
    // Serves the subset of `pkeys` that falls into the current range, filling
    // each partition with random 1K rows until the buffer is full.
    class test_reader : public mutation_reader::impl {
        simple_schema _s;
        const std::vector<dht::decorated_key> _pkeys;
        std::vector<dht::decorated_key>::const_iterator _it;
        std::vector<dht::decorated_key>::const_iterator _end;
    private:
        // Positions [_it, _end) on the contiguous run of keys contained in `pr`.
        void on_range_change(const dht::partition_range& pr) {
            dht::ring_position_comparator cmp(*_schema);
            _it = _pkeys.begin();
            while (_it != _pkeys.end() && !pr.contains(*_it, cmp)) {
                ++_it;
            }
            _end = _it;
            while (_end != _pkeys.end() && pr.contains(*_end, cmp)) {
                ++_end;
            }
        }
    public:
        test_reader(simple_schema s, reader_permit permit, const dht::partition_range& pr, std::vector<dht::decorated_key> pkeys)
            : impl(s.schema(), std::move(permit))
            , _s(std::move(s))
            , _pkeys(std::move(pkeys)) {
            on_range_change(pr);
        }
        virtual future<> fill_buffer() override {
            if (_it == _end) {
                _end_of_stream = true;
                return make_ready_future<>();
            }
            // Emit one whole partition per fill, padded with rows until the
            // buffer limit is reached. The row content is random so the
            // partition is guaranteed to span multiple buffer fills.
            push_mutation_fragment(*_schema, _permit, partition_start(*_it++, {}));
            uint32_t ck = 0;
            while (!is_buffer_full()) {
                push_mutation_fragment(*_schema, _permit, _s.make_row_v2(_permit, _s.make_ckey(ck++), make_random_string(1024)));
            }
            push_mutation_fragment(*_schema, _permit, partition_end());
            return make_ready_future<>();
        }
        virtual future<> next_partition() override {
            return make_ready_future<>();
        }
        virtual future<> fast_forward_to(const dht::partition_range& pr) override {
            on_range_change(pr);
            clear_buffer();
            _end_of_stream = false;
            return make_ready_future<>();
        }
        virtual future<> fast_forward_to(position_range) override {
            return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
        }
        virtual future<> close() noexcept override {
            return make_ready_future<>();
        }
    };
    reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name(), reader_concurrency_semaphore::register_metrics::no);
    auto stop_sem = deferred_stop(semaphore);
    simple_schema s;
    auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {});
    auto pkeys = s.make_pkeys(6);
    std::ranges::sort(pkeys, dht::decorated_key::less_comparator(s.schema()));
    auto ms = mutation_source([&] (schema_ptr schema,
            reader_permit permit,
            const dht::partition_range& range,
            const query::partition_slice& slice,
            tracing::trace_state_ptr tr,
            streamed_mutation::forwarding fwd,
            mutation_reader::forwarding fwd_mr) {
        // Only every other key (indices 0, 2, 4) has data.
        std::vector<dht::decorated_key> pkeys_with_data;
        bool include = false;
        for (const auto& pkey : pkeys) {
            include = !include;
            if (include) {
                pkeys_with_data.push_back(pkey);
            }
        }
        return make_mutation_reader<test_reader>(
                s,
                std::move(permit),
                range,
                std::move(pkeys_with_data));
    });
    auto pr0 = dht::partition_range::make({pkeys[0], true}, {pkeys[3], true});
    auto [reader, handle] = make_manually_paused_evictable_reader(std::move(ms), s.schema(), permit, pr0, s.schema()->full_slice(),
            {}, mutation_reader::forwarding::yes);
    auto reader_assert = assert_that(std::move(reader));
    reader_assert.produces(pkeys[0]);
    reader_assert.produces(pkeys[2]);
    // Pause and evict the underlying reader, then fast-forward: the reader
    // must be re-created with pr1, not resumed with the position from pr0.
    handle.pause();
    BOOST_REQUIRE(semaphore.try_evict_one_inactive_read());
    reader_assert.produces_end_of_stream();
    auto pr1 = dht::partition_range::make({pkeys[4], true}, {pkeys[5], true});
    reader_assert.fast_forward_to(pr1);
    // Failure will happen in the form of `on_internal_error()`.
    reader_assert.produces(pkeys[4]);
}
// Tests that when the evictable reader resumes after eviction, it correctly
// drops the fragments the re-created underlying reader replays (the
// partition_start and, when present, the static row of the partition it
// stopped in) while re-emitting them for genuinely new partitions. Includes
// the #8923 regression (next partition having a static row while the resumed
// one does not).
SEASTAR_THREAD_TEST_CASE(test_evictable_reader_drop_flags) {
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 0);
auto stop_sem = deferred_stop(semaphore);
simple_schema s;
auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {});
auto pkeys = s.make_pkeys(2);
std::sort(pkeys.begin(), pkeys.end(), [&s] (const auto& pk1, const auto& pk2) {
return pk1.less_compare(*s.schema(), pk2);
});
const auto& pkey1 = pkeys[0];
const auto& pkey2 = pkeys[1];
const int second_buffer_ck = 10;
// Helper that builds, in lock-step, the raw fragment stream fed to the
// underlying reader (`frags`) and the equivalent mutations (`muts`) used as
// the expected query result. It also tracks the accumulated fragment size
// (`size`) so each scenario can set the buffer limit at a precise fragment
// boundary, and remembers a position (`last_pos`) where the first buffer is
// expected to end.
struct buffer {
simple_schema& s;
reader_permit permit;
std::deque<mutation_fragment_v2> frags;
utils::chunked_vector<mutation> muts;
std::optional<mutation_rebuilder_v2> mut_rebuilder;
size_t size = 0;
std::optional<position_in_partition_view> last_pos;
buffer(simple_schema& s_, reader_permit permit_, dht::decorated_key key)
: s(s_), permit(std::move(permit_)) {
add_partition(key);
}
// Starts a new partition; flushes the previous one into `muts` first.
size_t add_partition(dht::decorated_key key) {
size += frags.emplace_back(*s.schema(), permit, partition_start{key, {}}).memory_usage();
if (mut_rebuilder) {
muts.emplace_back(*std::move(*mut_rebuilder).consume_end_of_stream());
}
mut_rebuilder.emplace(s.schema());
mut_rebuilder->consume(partition_start{key, {}});
return size;
}
// Appends a fragment; with only_to_frags the fragment goes into the raw
// stream but not into the expected mutation (used for trailing fragments
// past the buffer cut that must never be consumed).
size_t add_mutation_fragment(mutation_fragment_v2&& mf, bool only_to_frags = false) {
if (!only_to_frags) {
SCYLLA_ASSERT(mut_rebuilder);
mut_rebuilder->consume(mutation_fragment_v2(*s.schema(), permit, mf));
}
size += frags.emplace_back(*s.schema(), permit, std::move(mf)).memory_usage();
return size;
}
size_t add_static_row(std::optional<mutation_fragment_v2> sr = {}) {
auto srow = sr ? std::move(*sr) : s.make_static_row_v2(permit, "s");
return add_mutation_fragment(std::move(srow));
}
size_t add_clustering_row(int i, bool only_to_frags = false) {
return add_mutation_fragment(mutation_fragment_v2(*s.schema(), permit, s.make_row_v2(permit, s.make_ckey(i), "v")), only_to_frags);
}
size_t add_clustering_rows(int start, int end) {
for (int i = start; i < end; ++i) {
add_clustering_row(i);
}
return size;
}
size_t add_partition_end() {
size += frags.emplace_back(*s.schema(), permit, partition_end{}).memory_usage();
return size;
}
// Flushes the final partition into `muts`.
void end_of_stream() {
muts.emplace_back(*std::move(*mut_rebuilder).consume_end_of_stream());
}
void save_position() { last_pos = frags.back().position(); }
// Finds the position of the fragment at which a buffer of `buf_size`
// bytes would be cut.
void find_position(size_t buf_size) {
size_t s = 0;
for (const auto& frag : frags) {
s += frag.memory_usage();
if (s >= buf_size) {
last_pos = frag.position();
break;
}
}
BOOST_REQUIRE(last_pos);
}
};
// Builds the evictable reader: first_buffer is served, the reader is then
// evicted (buffer kept -- detach_buffer=false), and subsequent fills are
// served from second_buffer (and third_buffer, when given).
auto make_reader = [&] (const buffer& first_buffer, const buffer& second_buffer, const buffer* const third_buffer, size_t max_buffer_size) {
std::list<std::deque<mutation_fragment_v2>> buffers;
buffers.emplace_back(copy_fragments(*s.schema(), permit, first_buffer.frags));
buffers.emplace_back(copy_fragments(*s.schema(), permit, second_buffer.frags));
if (third_buffer) {
buffers.emplace_back(copy_fragments(*s.schema(), permit, third_buffer->frags));
}
return create_evictable_reader_and_evict_after_first_buffer(
s.schema(),
permit,
query::full_partition_range,
s.schema()->full_slice(),
std::move(buffers),
*first_buffer.last_pos,
max_buffer_size,
false);
};
testlog.info("Same partition, with static row");
{
buffer first_buffer(s, permit, pkey1);
first_buffer.add_static_row();
auto srow = mutation_fragment_v2(*s.schema(), permit, first_buffer.frags.back());
const auto buf_size = first_buffer.add_clustering_rows(0, second_buffer_ck);
first_buffer.save_position();
first_buffer.add_clustering_row(second_buffer_ck);
first_buffer.end_of_stream();
// The re-created reader replays partition-start and static row; both
// must be dropped on resume.
buffer second_buffer(s, permit, pkey1);
second_buffer.add_static_row(std::move(srow));
second_buffer.add_clustering_row(second_buffer_ck);
second_buffer.add_clustering_row(second_buffer_ck + 1);
second_buffer.add_partition_end();
second_buffer.end_of_stream();
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
.has_monotonic_positions();
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
.produces(first_buffer.muts[0] + second_buffer.muts[0])
.produces_end_of_stream();
}
testlog.info("Same partition, no static row");
{
buffer first_buffer(s, permit, pkey1);
const auto buf_size = first_buffer.add_clustering_rows(0, second_buffer_ck);
first_buffer.save_position();
first_buffer.add_clustering_row(second_buffer_ck);
first_buffer.end_of_stream();
buffer second_buffer(s, permit, pkey1);
second_buffer.add_clustering_row(second_buffer_ck);
second_buffer.add_clustering_row(second_buffer_ck + 1);
second_buffer.add_partition_end();
second_buffer.end_of_stream();
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
.has_monotonic_positions();
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
.produces(first_buffer.muts[0] + second_buffer.muts[0])
.produces_end_of_stream();
}
testlog.info("Same partition as expected, no static row, next partition has static row (#8923)");
{
buffer second_buffer(s, permit, pkey1);
second_buffer.add_clustering_rows(second_buffer_ck, second_buffer_ck + second_buffer_ck / 2);
// We want to end the buffer on the partition-start below, but since a
// partition start will be dropped from it, we have to use the size
// without it.
const auto buf_size = second_buffer.add_partition_end();
second_buffer.add_partition(pkey2);
second_buffer.add_static_row();
auto srow = mutation_fragment_v2(*s.schema(), permit, second_buffer.frags.back());
second_buffer.add_clustering_rows(0, 2);
second_buffer.end_of_stream();
buffer first_buffer(s, permit, pkey1);
for (int i = 0; first_buffer.add_clustering_row(i) < buf_size; ++i);
first_buffer.save_position();
first_buffer.add_mutation_fragment(mutation_fragment_v2(*s.schema(), permit, second_buffer.frags[1]));
first_buffer.end_of_stream();
buffer third_buffer(s, permit, pkey2);
third_buffer.add_static_row(std::move(srow));
third_buffer.add_clustering_rows(0, 2);
third_buffer.add_partition_end();
third_buffer.end_of_stream();
first_buffer.find_position(buf_size);
assert_that(make_reader(first_buffer, second_buffer, &third_buffer, buf_size))
.has_monotonic_positions();
assert_that(make_reader(first_buffer, second_buffer, &third_buffer, buf_size))
.produces(first_buffer.muts[0] + second_buffer.muts[0])
.produces(second_buffer.muts[1] + third_buffer.muts[0])
.produces_end_of_stream();
}
testlog.info("Next partition, with no static row");
{
buffer first_buffer(s, permit, pkey1);
const auto buf_size = first_buffer.add_clustering_rows(0, second_buffer_ck);
first_buffer.save_position();
first_buffer.add_clustering_row(second_buffer_ck + 1, true);
first_buffer.end_of_stream();
// Resume lands on a *different* partition: nothing may be dropped.
buffer second_buffer(s, permit, pkey2);
second_buffer.add_clustering_rows(0, second_buffer_ck / 2);
second_buffer.add_partition_end();
second_buffer.end_of_stream();
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
.has_monotonic_positions();
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
.produces(first_buffer.muts[0])
.produces(second_buffer.muts[0])
.produces_end_of_stream();
}
testlog.info("Next partition, with static row");
{
buffer first_buffer(s, permit, pkey1);
const auto buf_size = first_buffer.add_clustering_rows(0, second_buffer_ck);
first_buffer.save_position();
first_buffer.add_clustering_row(second_buffer_ck + 1, true);
first_buffer.end_of_stream();
buffer second_buffer(s, permit, pkey2);
second_buffer.add_static_row();
second_buffer.add_clustering_rows(0, second_buffer_ck / 2);
second_buffer.add_partition_end();
second_buffer.end_of_stream();
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
.has_monotonic_positions();
assert_that(make_reader(first_buffer, second_buffer, nullptr, buf_size))
.produces(first_buffer.muts[0])
.produces(second_buffer.muts[0])
.produces_end_of_stream();
}
}
// Checks that the evictable reader reproduces a partition exactly even when
// the underlying stream contains runs of range-tombstone-change fragments
// that share the same position: an eviction+resume in the middle of such a
// run must not lose or duplicate fragments.
SEASTAR_THREAD_TEST_CASE(test_evictable_reader_non_monotonic_positions) {
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 0);
auto stop_sem = deferred_stop(semaphore);
simple_schema s;
auto schema = s.schema();
auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {});
auto pkey = s.make_pkey();
const auto prange = dht::partition_range::make_open_ended_both_sides();
mutation expected_mut(schema, pkey);
std::deque<mutation_fragment_v2> frags;
{
mutation_rebuilder_v2 mut_builder(schema);
// Appends the fragment both to the raw stream and to the rebuilder that
// accumulates the expected merged mutation.
auto push_mf = [&] (auto mf) {
using mf_type = decltype(mf);
frags.emplace_back(*schema, permit, mf_type(mf));
mut_builder.consume(mutation_fragment_v2(*schema, permit, std::move(mf)));
};
push_mf(partition_start(pkey, {}));
// For each clustering key, ten consecutive range-tombstone-changes at
// the *same* position (alternating after/before anchors between keys).
for (int i = 0; i < 10; ++i) {
const auto ckey = s.make_ckey(i);
const auto pos = i % 2 ? position_in_partition::after_key(*s.schema(), ckey) : position_in_partition::before_key(ckey);
for (int j = 0; j < 10; ++j) {
push_mf(range_tombstone_change(pos, tombstone(s.new_timestamp(), {})));
}
}
push_mf(range_tombstone_change(position_in_partition::after_key(*s.schema(), s.make_ckey(11)), tombstone{}));
push_mf(partition_end{});
auto mut_opt = mut_builder.consume_end_of_stream();
BOOST_REQUIRE(mut_opt);
expected_mut = std::move(*mut_opt);
}
// Max buffer size of 1 forces the underlying reader to stop after every
// fragment, so eviction can strike between fragments sharing a position.
auto ms = mutation_source([&frags] (
schema_ptr schema,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd_sm,
mutation_reader::forwarding fwd_mr) {
auto reader = make_mutation_reader_from_fragments(schema, permit, copy_fragments(*schema, permit, frags), range, slice);
reader.set_max_buffer_size(1);
return reader;
});
auto reader = make_auto_paused_evictable_reader(std::move(ms), schema, permit, prange, schema->full_slice(),
nullptr, mutation_reader::forwarding::no);
auto close_reader = deferred_close(reader);
// The reader must still reconstruct the partition exactly.
auto actual_mut = read_mutation_from_mutation_reader(reader).get();
BOOST_REQUIRE(actual_mut);
BOOST_REQUIRE(reader.is_end_of_stream());
BOOST_REQUIRE_EQUAL(*actual_mut, expected_mut);
}
// Checks that when a resumed (re-created) underlying reader no longer
// contains the partition the evictable reader stopped in, the evictable
// reader closes the still-open range tombstone before moving on to the next
// partition (or ending the stream), instead of leaving it dangling.
SEASTAR_THREAD_TEST_CASE(test_evictable_reader_clear_tombstone_in_discontinued_partition) {
    reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 0);
    auto stop_sem = deferred_stop(semaphore);
    simple_schema s;
    auto schema = s.schema();
    auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {});
    auto pkeys = s.make_pkeys(2);
    std::sort(pkeys.begin(), pkeys.end(), [&s] (const auto& pk1, const auto& pk2) {
        return pk1.less_compare(*s.schema(), pk2);
    });
    const auto& pkey1 = pkeys[0];
    const auto& pkey2 = pkeys[1];
    // A tombstone opened at the start of pkey1 and its closing change near
    // the end of the partition; the reader is never allowed to reach the
    // latter, so it must synthesize an earlier close.
    const auto first_rtc = range_tombstone_change(position_in_partition::before_key(s.make_ckey(0)), tombstone{s.new_timestamp(), {}});
    const auto last_rtc = range_tombstone_change(position_in_partition::after_key(*s.schema(), s.make_ckey(11)), tombstone{});
    size_t buffer_size = 0;
    std::deque<mutation_fragment_v2> first_buffer;
    {
        auto push_mf = [&] (auto mf) {
            first_buffer.emplace_back(*schema, permit, std::move(mf));
        };
        push_mf(partition_start(pkey1, {}));
        buffer_size += first_buffer.back().memory_usage();
        push_mf(first_rtc);
        buffer_size += first_buffer.back().memory_usage();
        for (int i = 0; i < 5; ++i) {
            push_mf(s.make_row_v2(permit, s.make_ckey(i), "v"));
            buffer_size += first_buffer.back().memory_usage();
        }
        // Add more data after cutting buffer_size; continue the clustering
        // key sequence so the fragment stream stays position-monotonic.
        for (int i = 5; i < 10; ++i) {
            push_mf(s.make_row_v2(permit, s.make_ckey(i), "v"));
        }
        push_mf(last_rtc);
        push_mf(partition_end{});
        // Shrink the limit slightly so the underlying reader is guaranteed
        // to stop mid-partition, well before the closing tombstone change.
        buffer_size -= first_buffer.back().memory_usage();
    }
    // After eviction the stream continues with a different partition: pkey1
    // (and its open tombstone) is gone.
    std::deque<mutation_fragment_v2> second_buffer;
    {
        auto push_mf = [&] (auto mf) {
            second_buffer.emplace_back(*schema, permit, std::move(mf));
        };
        push_mf(partition_start(pkey2, {}));
        push_mf(partition_end{});
    }
    std::deque<mutation_fragment_v2> empty_buffer;
    auto prange = dht::partition_range::make_open_ended_both_sides();
    auto check = [&] (const std::deque<mutation_fragment_v2>& second_buffer, const char* scenario) {
        testlog.info("check() scenario {}", scenario);
        // The first reader creation serves first_buffer; the re-creation
        // after eviction serves second_buffer.
        auto ms = mutation_source([&buffer_size, &first_buffer, &second_buffer, first = true] (
                schema_ptr schema,
                reader_permit permit,
                const dht::partition_range& range,
                const query::partition_slice& slice,
                tracing::trace_state_ptr trace_state,
                streamed_mutation::forwarding fwd_sm,
                mutation_reader::forwarding fwd_mr) mutable {
            const auto& buf = first ? first_buffer : second_buffer;
            first = false;
            auto reader = make_mutation_reader_from_fragments(schema, permit, copy_fragments(*schema, permit, buf), range, slice);
            reader.set_max_buffer_size(buffer_size);
            return reader;
        });
        auto reader = make_auto_paused_evictable_reader(std::move(ms), schema, permit, prange, schema->full_slice(),
                nullptr, mutation_reader::forwarding::no);
        auto close_reader = deferred_close(reader);
        reader.fill_buffer().get();
        BOOST_REQUIRE(!reader.is_end_of_stream());
        // Collect every range-tombstone-change the reader produces.
        std::vector<range_tombstone_change> tombs;
        {
            auto mf_opt = reader().get();
            BOOST_REQUIRE(mf_opt->is_partition_start());
        }
        while (auto mf_opt = reader().get()) {
            if (mf_opt->is_range_tombstone_change()) {
                tombs.push_back(std::move(mf_opt->as_range_tombstone_change()));
            }
        }
        // Exactly the opening change and a synthesized close; the close must
        // land strictly *before* the position of the original closing change.
        BOOST_REQUIRE_EQUAL(tombs.size(), 2);
        BOOST_REQUIRE(tombs.front().equal(*schema, first_rtc));
        auto cmp = position_in_partition::tri_compare(*schema);
        BOOST_REQUIRE(!tombs.back().tombstone());
        BOOST_REQUIRE(cmp(tombs.back().position(), last_rtc.position()) < 0);
    };
    check(second_buffer, "continue from another partition");
    check(empty_buffer, "end of stream");
}
// Reproducer for #13491: when the evictable reader's resume position is a
// partition start, a fill must still respect the buffer-size limit rather
// than consuming the entire partition in one go.
SEASTAR_THREAD_TEST_CASE(test_evictable_reader_next_pos_is_partition_start) {
    reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), 1, 0);
    auto stop_sem = deferred_stop(semaphore);
    simple_schema s;
    auto schema = s.schema();
    auto permit = semaphore.make_tracking_only_permit(s.schema(), get_name(), db::no_timeout, {});
    auto pk = s.make_pkey();
    const auto prange = dht::partition_range::make_open_ended_both_sides();
    // One partition consisting almost entirely of range-tombstone-changes.
    std::deque<mutation_fragment_v2> frags;
    frags.emplace_back(*schema, permit, partition_start(pk, {}));
    size_t ck = 0;
    while (ck < 1000) {
        frags.emplace_back(*schema, permit, range_tombstone_change(position_in_partition::before_key(s.make_ckey(ck)), tombstone(s.new_timestamp(), {})));
        ++ck;
    }
    frags.emplace_back(*schema, permit, range_tombstone_change(position_in_partition::before_key(s.make_ckey(1001)), tombstone()));
    frags.emplace_back(*schema, permit, partition_end{});
    // Allow only the first few fragments per fill of the underlying reader.
    const auto max_buf_size = frags[0].memory_usage() + frags[1].memory_usage() + frags[2].memory_usage();
    auto ms = mutation_source([&frags, max_buf_size] (
            schema_ptr schema,
            reader_permit permit,
            const dht::partition_range& pr,
            const query::partition_slice& ps) {
        auto rd = make_mutation_reader_from_fragments(std::move(schema), std::move(permit), std::move(frags), pr, ps);
        rd.set_max_buffer_size(max_buf_size);
        return rd;
    });
    auto [rd, handle] = make_manually_paused_evictable_reader(ms, schema, permit, prange, schema->full_slice(), {},
            mutation_reader::forwarding::no);
    auto stop_rd = deferred_close(rd);
    rd.set_max_buffer_size(max_buf_size);
    // #13491 - the reader must not consume the entire partition but a small batch of fragments based on the buffer size.
    rd.fill_buffer().get();
    rd.fill_buffer().get();
    auto buf1 = rd.detach_buffer();
    // There should be 6-7 fragments, but to avoid computing the exact number of fragments that should fit in `max_buf_size`,
    // just ensure that there are <= 10 (consuming the whole partition would give ~1000 fragments).
    BOOST_REQUIRE_LE(buf1.size(), 10);
}
// A (possibly absent) mutation together with the clustering-position bounds
// between which its reader may produce fragments; `m` is disengaged for a
// reader that yields no data.
struct mutation_bounds {
std::optional<mutation> m;
position_in_partition lower;
position_in_partition upper;
};
// Materializes mutation_bounds into reader_bounds: builds a reader over the
// contained mutation (or an empty reader if there is none) and carries the
// lower/upper clustering bounds over unchanged. A null `slice` means the
// schema's full slice.
static reader_bounds make_reader_bounds(
        schema_ptr s, reader_permit permit, mutation_bounds mb, streamed_mutation::forwarding fwd,
        const query::partition_slice* slice = nullptr) {
    const auto& effective_slice = slice ? *slice : s->full_slice();
    auto rd = mb.m
            ? make_mutation_reader_from_mutations(s, permit, std::move(*mb.m), effective_slice, fwd)
            : make_empty_mutation_reader(s, permit);
    return reader_bounds{
        .r = std::move(rd),
        .lower = std::move(mb.lower),
        .upper = std::move(mb.upper),
    };
}
// Generator of random scenarios for the clustering-order reader merger.
// All generated data lives in a single partition (_pk) of a simple schema
// with an int32 clustering key and one regular int32 column.
struct clustering_order_merger_test_generator {
    struct scenario {
        // Per-reader inputs; entries with a disengaged mutation model empty readers.
        std::vector<mutation_bounds> readers_data;
        // Sorted, disjoint position ranges to drive fast-forwarding tests.
        std::vector<position_range> fwd_ranges;
    };
    schema_ptr _s;
    dht::decorated_key _pk;
    // If no partition key is supplied, a random one is generated for the schema.
    clustering_order_merger_test_generator(std::optional<dht::decorated_key> pk = std::nullopt)
        : _s(make_schema()), _pk(pk ? *pk : tests::generate_partition_key(_s))
    {}
    static schema_ptr make_schema() {
        return schema_builder("ks", "t")
            .with_column("pk", utf8_type, column_kind::partition_key)
            .with_column("ck", int32_type, column_kind::clustering_key)
            .with_column("v", int32_type, column_kind::regular_column)
            .build();
    }
    // Clustering key for the integer k.
    clustering_key mk_ck(int k) const {
        return clustering_key::from_single_value(*_s, int32_type->decompose(k));
    }
    // Position of the row keyed by k.
    position_in_partition mk_pos_for(int k) const {
        return position_in_partition::for_key(mk_ck(k));
    }
    // Mutation in partition _pk setting column `v` for every (key, value)
    // pair in kvs; all cells share the fixed timestamp 42.
    mutation mk_mutation(const std::map<int, int>& kvs) const {
        mutation m(_s, _pk);
        auto& cdef = *_s->get_column_definition(to_bytes("v"));
        for (auto [k, v]: kvs) {
            m.set_clustered_cell(mk_ck(k), cdef,
                    atomic_cell::make_live(*cdef.type, 42, cdef.type->decompose(v)));
        }
        return m;
    }
    scenario generate_scenario(std::mt19937& engine) const {
        // Set keeps all used clustering keys sorted and de-duplicated.
        std::set<int> all_ks;
        std::vector<mutation_bounds> readers_data;
        auto num_readers = tests::random::get_int(1, 10, engine);
        auto num_empty_readers = tests::random::get_int(1, num_readers, engine);
        // Empty readers: no mutation, with bounds randomly straddling key 0.
        while (num_empty_readers--) {
            auto lower = -tests::random::get_int(0, 5, engine);
            auto upper = tests::random::get_int(0, 5, engine);
            readers_data.push_back(mutation_bounds{std::nullopt, mk_pos_for(lower), mk_pos_for(upper)});
            num_readers--; // empty readers count towards the total
        }
        // Non-empty readers: each holds a random subset of keys from [0, 100),
        // with bounds extended randomly beyond the first/last populated key.
        while (num_readers--) {
            auto len = tests::random::get_int(0, 15, engine);
            auto ks = tests::random::random_subset<int>(100, len, engine);
            std::map<int, int> kvs;
            for (auto k: ks) {
                all_ks.insert(k);
                kvs.emplace(k, tests::random::get_int(0, 100, engine));
            }
            auto lower = (kvs.empty() ? 0 : kvs.begin()->first) - tests::random::get_int(0, 5, engine);
            auto upper = (kvs.empty() ? 0 : std::prev(kvs.end())->first) + tests::random::get_int(0, 5, engine);
            readers_data.push_back(mutation_bounds{mk_mutation(kvs), mk_pos_for(lower), mk_pos_for(upper)});
        }
        // The merger expects its inputs ordered by lower bound.
        std::sort(readers_data.begin(), readers_data.end(), [less = position_in_partition::less_compare(*_s)]
                (const mutation_bounds& a, const mutation_bounds& b) { return less(a.lower, b.lower); });
        // Candidate positions for forwarding ranges: before/at/after every used
        // key, bracketed by the partition-wide extremes.
        std::vector<position_in_partition> positions { position_in_partition::before_all_clustered_rows() };
        for (auto k: all_ks) {
            auto ck = mk_ck(k);
            positions.push_back(position_in_partition::before_key(ck));
            positions.push_back(position_in_partition::for_key(ck));
            positions.push_back(position_in_partition::after_key(*_s, ck));
        }
        positions.push_back(position_in_partition::after_all_clustered_rows());
        size_t num_ranges = tests::random::get_int(1ul, std::max(all_ks.size() / 3, 1ul));
        // Pick 2 endpoints per range, sort them, and pair consecutive ones up
        // so the resulting ranges are disjoint and increasing.
        positions = tests::random::random_subset(std::move(positions), num_ranges * 2, engine);
        std::sort(positions.begin(), positions.end(), position_in_partition::less_compare(*_s));
        std::vector<position_range> fwd_ranges;
        for (size_t i = 0; i < num_ranges; ++i) {
            SCYLLA_ASSERT(2*i+1 < positions.size());
            fwd_ranges.push_back(position_range(std::move(positions[2*i]), std::move(positions[2*i+1])));
        }
        return scenario {
            std::move(readers_data),
            std::move(fwd_ranges)
        };
    }
};
// Compares the clustering-order combined reader (tested) against a reader over
// the plain in-memory merge of the same mutations (authority), across many
// randomly generated scenarios, in both forwarding modes.
SEASTAR_THREAD_TEST_CASE(test_clustering_order_merger_in_memory) {
    tests::reader_concurrency_semaphore_wrapper semaphore;
    clustering_order_merger_test_generator g;
    // Authority: a single reader over the pre-merged mutation (or empty reader).
    auto make_authority = [s = g._s, &semaphore] (std::optional<mutation> mut, streamed_mutation::forwarding fwd) {
        auto permit = semaphore.make_permit();
        if (mut) {
            return make_mutation_reader_from_mutations(s, permit, std::move(*mut), fwd);
        }
        return make_empty_mutation_reader(s, permit);
    };
    // Tested: the clustering combined reader over one reader per mutation_bounds.
    auto make_tested = [s = g._s, &semaphore] (std::vector<mutation_bounds> ms, streamed_mutation::forwarding fwd) {
        auto permit = semaphore.make_permit();
        auto rs = std::move(ms)
            | std::views::transform([s, fwd, &permit] (auto&& mb) {
                return make_reader_bounds(s, permit, std::move(mb), fwd);
            })
            | std::ranges::to<std::vector<reader_bounds>>();
        auto q = std::make_unique<simple_position_reader_queue>(*s, std::move(rs));
        return make_clustering_combined_reader(s, permit, fwd, std::move(q));
    };
    // Print the seed so failing runs can be reproduced.
    auto seed = tests::random::get_int<uint32_t>();
    std::cout << "test_clustering_order_merger_in_memory seed: " << seed << std::endl;
    auto engine = std::mt19937(seed);
    for (int run = 0; run < 1000; ++run) {
        auto scenario = g.generate_scenario(engine);
        // Merge all non-empty mutations into one; stays disengaged if all are empty.
        auto merged = std::accumulate(scenario.readers_data.begin(), scenario.readers_data.end(),
                std::optional<mutation>{}, [&g] (std::optional<mutation> curr, const mutation_bounds& mb) {
                    if (mb.m) {
                        if (!curr) {
                            curr = mutation(g._s, g._pk);
                        }
                        *curr += *mb.m;
                    }
                    return curr;
                });
        {
            auto fwd = streamed_mutation::forwarding::no;
            compare_readers(*g._s, make_authority(merged, fwd), make_tested(scenario.readers_data, fwd));
        }
        // Forwarding mode additionally exercises the generated fwd_ranges.
        auto fwd = streamed_mutation::forwarding::yes;
        compare_readers(*g._s, make_authority(std::move(merged), fwd),
                make_tested(std::move(scenario.readers_data), fwd), scenario.fwd_ranges);
    }
    // Test case with 0 readers
    for (auto fwd: {streamed_mutation::forwarding::no, streamed_mutation::forwarding::yes}) {
        auto r = make_clustering_combined_reader(g._s, semaphore.make_permit(), fwd,
                std::make_unique<simple_position_reader_queue>(*g._s, std::vector<reader_bounds>{}));
        assert_that(std::move(r)).produces_end_of_stream();
    }
}
// Like the in-memory merger test, but the tested reader is built on top of a
// time_series_sstable_set: each mutation_bounds becomes an sstable, a random
// ~90% subset of sstables is included via a generation filter, and the result
// is compared against a reader over the merged included mutations.
// `reversed` runs the whole exercise with a reversed table schema and a
// legacy-reversed query slice.
static future<> do_test_clustering_order_merger_sstable_set(bool reversed) {
    return sstables::test_env::do_with_async([reversed] (sstables::test_env& env) {
        // Two keys: pkeys[0] holds the data, pkeys[1] is used to build
        // "empty" sstables that never match the queried partition.
        auto pkeys = tests::generate_partition_keys(2, clustering_order_merger_test_generator::make_schema());
        clustering_order_merger_test_generator g(pkeys[0]);
        auto query_schema = g._s;
        auto table_schema = g._s;
        auto query_slice = query_schema->full_slice();
        if (reversed) {
            table_schema = table_schema->make_reversed();
            query_slice.options.set(query::partition_slice::option::reversed);
            query_slice = query::native_reverse_slice_to_legacy_reverse_slice(*table_schema, std::move(query_slice));
        }
        auto make_authority = [&env, &query_schema, &query_slice] (mutation mut, streamed_mutation::forwarding fwd) {
            return make_mutation_reader_from_mutations(query_schema, env.make_reader_permit(), std::move(mut), query_slice, fwd);
        };
        auto pr = dht::partition_range::make_singular(dht::ring_position(g._pk));
        // Tested: clustering combined reader over the sstable set's position
        // reader queue, restricted to the sstables in `included_gens`.
        auto make_tested = [&env, query_schema, pk = g._pk, &pr, &query_slice, reversed]
                (const time_series_sstable_set& sst_set,
                 const std::unordered_set<sstables::generation_type>& included_gens, streamed_mutation::forwarding fwd) {
            auto permit = env.make_reader_permit();
            auto q = sst_set.make_position_reader_queue(
                [query_schema, &pr, &query_slice, fwd, permit] (sstable& sst) {
                    return sst.make_reader(query_schema, permit, pr,
                                            query_slice, nullptr, fwd);
                },
                [included_gens] (const sstable& sst) { return included_gens.contains(sst.generation()); },
                pk.key(), query_schema, permit, fwd, reversed);
            return make_clustering_combined_reader(query_schema, permit, fwd, std::move(q));
        };
        // Print the seed so failing runs can be reproduced.
        auto seed = tests::random::get_int<uint32_t>();
        std::cout << "test_clustering_order_merger_sstable_set seed: " << seed << std::endl;
        auto engine = std::mt19937(seed);
        std::bernoulli_distribution dist(0.9);
        for (int run = 0; run < 100; ++run) {
            auto scenario = g.generate_scenario(engine);
            if (reversed) {
                // Sstables are written in table (reversed) order.
                for (auto& mb: scenario.readers_data) {
                    if (mb.m) {
                        mb.m = reverse(std::move(*mb.m));
                    }
                }
            }
            time_series_sstable_set sst_set(table_schema, true);
            mutation merged(table_schema, g._pk);
            std::unordered_set<sstables::generation_type> included_gens;
            auto sst_factory = env.make_sst_factory(table_schema);
            for (auto& mb: scenario.readers_data) {
                sstables::shared_sstable sst;
                if (mb.m) {
                    sst = make_sstable_containing(sst_factory, {*mb.m});
                    sst_set.insert(sst);
                } else {
                    // We want to have an sstable that won't return any fragments when we query it
                    // for our partition (not even `partition_start`). For that we create an sstable
                    // with a different partition.
                    auto pk = pkeys[1];
                    SCYLLA_ASSERT(!pk.equal(*g._s, g._pk));
                    sst = make_sstable_containing(sst_factory, {mutation(table_schema, pk)});
                    sst_set.insert(sst);
                }
                // Include ~90% of the sstables in the filtered read; `merged`
                // accumulates only the included mutations.
                if (dist(engine)) {
                    included_gens.insert(sst->generation());
                    if (mb.m) {
                        merged += *mb.m;
                    }
                }
            }
            {
                auto fwd = streamed_mutation::forwarding::no;
                compare_readers(*query_schema, make_authority(merged, fwd), make_tested(sst_set, included_gens, fwd));
            }
            auto fwd = streamed_mutation::forwarding::yes;
            compare_readers(*query_schema, make_authority(std::move(merged), fwd),
                    make_tested(sst_set, included_gens, fwd), scenario.fwd_ranges);
        }
    });
}
// Forward-order variant of the sstable-set clustering-order merger test.
SEASTAR_TEST_CASE(test_clustering_order_merger_sstable_set) {
    return do_test_clustering_order_merger_sstable_set(false);
}
// Reverse-order variant of the sstable-set clustering-order merger test.
SEASTAR_TEST_CASE(test_clustering_order_merger_sstable_set_reversed) {
    return do_test_clustering_order_merger_sstable_set(true);
}
SEASTAR_THREAD_TEST_CASE(clustering_combined_reader_mutation_source_test) {
    // The clustering combined reader (based on `clustering_order_reader_merger`) supports only
    // single partition readers, but the mutation source test framework tests the provided source
    // on multiple partitions. Furthermore, our reader doesn't support static rows or partition tombstones.
    // We deal with this as follows:
    // 1. we process all mutations provided by the framework, extracting unsupported fragments
    //    (partition tombstones and static rows) into separate mutations, called ``bad mutations'' below.
    //    Each bad mutation is used to create a separate reader.
    // 2. The remaining mutations (``good mutations'') are sorted w.r.t their partition keys;
    //    for each partition key, we use the set of mutations with this key to create our clustering combined reader.
    //    The readers are then fed into a meta-reader `multi_partition_reader` which supports multi-partition queries,
    //    calling the provided readers appropriately as the query goes through the partition range.
    // 3. The ``bad mutation'' readers from step 1 and the reader from step 2 are combined together
    //    using the generic combined reader.
    // Multi partition reader from single partition readers.
    // precondition: readers must be nonempty (they return a partition_start)
    struct multi_partition_reader : public mutation_reader::impl {
        using container_t = std::map<dht::decorated_key, mutation_reader, dht::decorated_key::less_comparator>;
        std::reference_wrapper<const dht::partition_range> _range;
        container_t _readers;
        container_t::iterator _it;
        // invariants:
        // 1. _it = end() or _it->first is not before the current partition range (_range)
        // 2. _it->second did not return partition_end
        // did we fetch a fragment from _it? (due to invariant 2, it wasn't partition end)
        bool _inside_partition = false;
        multi_partition_reader(schema_ptr s, reader_permit permit,
                container_t readers, const dht::partition_range& range)
            : impl(std::move(s), std::move(permit))
            , _range(range), _readers(std::move(readers))
            // Position _it at the first reader whose key is not before `range`
            // (establishes invariant 1).
            , _it(std::partition_point(_readers.begin(), _readers.end(), [this, cmp = dht::ring_position_comparator(*_schema)]
                    (auto& r) { return _range.get().before(r.first, cmp); }))
        {
            SCYLLA_ASSERT(!_readers.empty());
        }
        virtual future<> fill_buffer() override {
            while (!is_buffer_full()) {
                auto mfo = co_await next_fragment();
                if (!mfo) {
                    _end_of_stream = true;
                    break;
                }
                push_mutation_fragment(std::move(*mfo));
            }
        }
        virtual future<> next_partition() override {
            clear_buffer_to_next_partition();
            _end_of_stream = false;
            if (is_buffer_empty()) {
                // all fragments that were in the buffer came from the same partition
                if (_inside_partition) {
                    // last fetched fragment came from _it, so all fragments previously in the buffer came from _it
                    // => current partition is _it, we need to move forward
                    // _it might be the end of current forwarding range, but that's no problem;
                    // in that case we'll go into eos mode until forwarded
                    SCYLLA_ASSERT(_it != _readers.end());
                    _inside_partition = false;
                    ++_it;
                } else {
                    // either no previously fetched fragment or must have come from before _it. Nothing to do
                }
            }
            return make_ready_future<>();
        }
        virtual future<> fast_forward_to(const dht::partition_range& pr) override {
            // all fragments currently in the buffer come from the current partition range
            // and pr must be strictly greater, so just clear the buffer
            clear_buffer();
            _end_of_stream = false;
            _inside_partition = false;
            _range = pr;
            // restore invariant 2
            _it = std::partition_point(_it, _readers.end(), [this, cmp = dht::ring_position_comparator(*_schema)]
                    (auto& r) { return _range.get().before(r.first, cmp); });
            co_return;
        }
        virtual future<> fast_forward_to(position_range pr) override {
            if (!_inside_partition) {
                // this should not happen - the specification of fast_forward_to says that it can only be called
                // while inside partition. But if it happens for whatever reason just do nothing
                return make_ready_future<>();
            }
            SCYLLA_ASSERT(_it != _readers.end());
            // all fragments currently in the buffer come from the current position range
            // and pr must be strictly greater, so just clear the buffer
            clear_buffer();
            _end_of_stream = false;
            return _it->second.fast_forward_to(std::move(pr));
        }
        virtual future<> close() noexcept override {
            return parallel_for_each(_readers, [] (auto& p) {
                return p.second.close();
            });
        }
        // Fetch the next fragment from the current reader, or a disengaged
        // optional when the range/stream is exhausted.
        future<mutation_fragment_v2_opt> next_fragment() {
            if (_it == _readers.end() || _range.get().after(_it->first, dht::ring_position_comparator(*_schema))) {
                co_return mutation_fragment_v2_opt{};
            }
            auto mfo = co_await _it->second();
            if (mfo) {
                if (mfo->is_end_of_partition()) {
                    ++_it;
                    _inside_partition = false;
                } else {
                    _inside_partition = true;
                }
                co_return mfo;
            }
            // our readers must be sm::forwarding (invariant 2) and the range was just exhausted,
            // waiting for ff or next_partition
            co_return mutation_fragment_v2_opt{};
        }
    };
    tests::reader_concurrency_semaphore_wrapper semaphore;
    auto populate = [&semaphore] (schema_ptr s, const utils::chunked_vector<mutation>& muts) {
        std::map<dht::decorated_key, std::vector<mutation_bounds>, dht::decorated_key::less_comparator>
            good_mutations{dht::decorated_key::less_comparator(s)};
        utils::chunked_vector<mutation> bad_mutations;
        for (auto& m: muts) {
            auto& dk = m.decorated_key();
            // Tracks the first/last clustering positions seen in this mutation.
            std::optional<std::pair<position_in_partition, position_in_partition>> bounds;
            position_in_partition::less_compare less{*s};
            mutation_rebuilder_v2 good(m.schema());
            good.consume_new_partition(dk);
            std::optional<mutation_rebuilder_v2> bad;
            auto rd = make_mutation_reader_from_mutations(s, semaphore.make_permit(), m);
            auto close_rd = deferred_close(rd);
            // Split the mutation: partition tombstones and static rows go to
            // `bad`, everything else (clustered fragments) goes to `good`.
            rd.consume_pausable([&] (mutation_fragment_v2&& mf) {
                if (mf.is_partition_start()) {
                    if (mf.as_partition_start().partition_tombstone()) {
                        bad.emplace(m.schema());
                        bad->consume(std::move(mf));
                    }
                } else if (mf.is_static_row()) {
                    if (!bad) {
                        bad.emplace(m.schema());
                        bad->consume_new_partition(dk);
                    }
                    bad->consume(std::move(mf));
                } else if (!mf.is_end_of_partition()) {
                    if (!bounds) {
                        bounds = std::pair{mf.position(), mf.position()};
                    } else {
                        bounds->second = mf.position();
                    }
                    good.consume(std::move(mf));
                }
                return stop_iteration::no;
            }).get();
            mutation_bounds mb {
                std::move(*good.consume_end_of_stream()),
                bounds ? std::move(bounds->first) : position_in_partition::before_all_clustered_rows(),
                bounds ? std::move(bounds->second) : position_in_partition::after_all_clustered_rows()
            };
            auto it = good_mutations.find(dk);
            if (it == good_mutations.end()) {
                good_mutations.emplace(dk, std::vector<mutation_bounds>{std::move(mb)});
            } else {
                it->second.push_back(std::move(mb));
            }
            if (bad) {
                bad_mutations.push_back(std::move(*bad->consume_end_of_stream()));
            }
        }
        return mutation_source([good = std::move(good_mutations), bad = std::move(bad_mutations)] (
                schema_ptr s,
                reader_permit permit,
                const dht::partition_range& range,
                const query::partition_slice& slice,
                tracing::trace_state_ptr trace_state,
                streamed_mutation::forwarding fwd_sm,
                mutation_reader::forwarding fwd_mr) {
            auto reversed = slice.is_reversed();
            // One clustering combined reader per "good" partition.
            std::map<dht::decorated_key, mutation_reader, dht::decorated_key::less_comparator>
                good_readers{dht::decorated_key::less_comparator(s)};
            for (auto& [k, ms]: good) {
                auto rs = ms
                    | std::views::transform([&] (auto&& mb) {
                        auto rb = make_reader_bounds(s, permit, std::move(mb), fwd_sm, &slice);
                        if (reversed) {
                            // The bounds are calculated in 'table order' (using the mutation and its schema),
                            // but we need them in 'query order' (using the query schema), so for reversed queries
                            // we need to swap and reverse them.
                            std::swap(rb.lower, rb.upper);
                            rb.lower = std::move(rb.lower).reversed();
                            rb.upper = std::move(rb.upper).reversed();
                        }
                        return rb;
                    })
                    | std::ranges::to<std::vector<reader_bounds>>();
                std::sort(rs.begin(), rs.end(), [less = position_in_partition::less_compare(*s)]
                        (const reader_bounds& a, const reader_bounds& b) { return less(a.lower, b.lower); });
                auto q = std::make_unique<simple_position_reader_queue>(*s, std::move(rs));
                good_readers.emplace(k, make_clustering_combined_reader(s, permit, fwd_sm, std::move(q)));
            }
            // Combine the "bad" mutation readers with the multi-partition
            // wrapper over the "good" readers.
            std::vector<mutation_reader> readers;
            for (auto& m: bad) {
                readers.push_back(make_mutation_reader_from_mutations(s, permit, {m}, range, slice, fwd_sm));
            }
            readers.push_back(make_mutation_reader<multi_partition_reader>(s, permit, std::move(good_readers), range));
            return make_combined_reader(std::move(s), std::move(permit), std::move(readers), fwd_sm, fwd_mr);
        });
    };
    run_mutation_source_tests(std::move(populate));
}
// Regression test for #8445: a clustering combined reader built solely from
// empty readers must not emit any fragment.
SEASTAR_THREAD_TEST_CASE(test_clustering_combining_of_empty_readers) {
    auto s = clustering_order_merger_test_generator::make_schema();
    tests::reader_concurrency_semaphore_wrapper semaphore;
    auto permit = semaphore.make_permit();
    // A single empty reader spanning the whole clustering range.
    std::vector<reader_bounds> bounds;
    reader_bounds empty_bounds {
        .r = make_empty_mutation_reader(s, permit),
        .lower = position_in_partition::before_all_clustered_rows(),
        .upper = position_in_partition::after_all_clustered_rows()
    };
    bounds.push_back(std::move(empty_bounds));
    auto queue = std::make_unique<simple_position_reader_queue>(*s, std::move(bounds));
    auto r = make_clustering_combined_reader(
            s, permit, streamed_mutation::forwarding::no, std::move(queue));
    auto close_r = deferred_close(r);
    // The very first pull must yield a disengaged optional (end of stream).
    auto mf = r().get();
    if (mf) {
        BOOST_FAIL(format("reader combined of empty readers returned fragment {}", mutation_fragment_v2::printer(*s, *mf)));
    }
}
// Move-only wrapper that owns a reader, forwards calls to it, and closes it
// in the destructor (blocking, so only usable in seastar thread context).
// NOTE(review): the defaulted move constructor means a moved-from holder's
// destructor still calls close() on the moved-from reader — presumably safe
// for the reader types used in these tests; confirm before reusing elsewhere.
template <typename Reader>
class closing_holder {
    Reader _rd;
public:
    closing_holder(Reader&& rd) : _rd(std::move(rd)) { }
    closing_holder(closing_holder&&) = default;
    ~closing_holder() { _rd.close().get(); }
    // Fetch the next fragment from the wrapped reader.
    auto operator()() { return _rd(); }
};
// Runs the mutation-source test suite against a generating reader fed by a
// v1 fragment stream over the squashed input mutations.
SEASTAR_THREAD_TEST_CASE(test_generating_reader_v1) {
    auto populator_v1 = [] (schema_ptr s, const utils::chunked_vector<mutation>& muts) {
        return mutation_source([muts] (
                schema_ptr schema,
                reader_permit permit,
                const dht::partition_range& range,
                const query::partition_slice& slice,
                tracing::trace_state_ptr trace_state,
                streamed_mutation::forwarding fwd_sm,
                mutation_reader::forwarding fwd_mr) {
            auto underlying = mutation_fragment_v1_stream
                    (make_mutation_reader_from_mutations(schema, permit, squash_mutations(muts), range, slice));
            auto rd = make_next_partition_adaptor(make_generating_reader_v1(schema, permit, closing_holder(std::move(underlying))));
            // The generating reader itself is not forwardable; wrap it if needed.
            if (fwd_sm == streamed_mutation::forwarding::yes) {
                return make_forwardable(std::move(rd));
            }
            return rd;
        });
    };
    run_mutation_source_tests(populator_v1, false);
}
// Runs the mutation-source test suite against a generating reader fed by a
// v2 reader over the squashed input mutations.
SEASTAR_THREAD_TEST_CASE(test_generating_reader_v2) {
    auto populator_v2 = [] (schema_ptr s, const utils::chunked_vector<mutation>& muts) {
        return mutation_source([muts] (
                schema_ptr schema,
                reader_permit permit,
                const dht::partition_range& range,
                const query::partition_slice& slice,
                tracing::trace_state_ptr trace_state,
                streamed_mutation::forwarding fwd_sm,
                mutation_reader::forwarding fwd_mr) {
            auto underlying = make_mutation_reader_from_mutations(schema, permit, squash_mutations(muts), range, slice);
            auto rd = make_next_partition_adaptor(make_generating_reader(schema, permit, closing_holder(std::move(underlying))));
            // The generating reader itself is not forwardable; wrap it if needed.
            if (fwd_sm == streamed_mutation::forwarding::yes) {
                return make_forwardable(std::move(rd));
            }
            return rd;
        });
    };
    run_mutation_source_tests(populator_v2, false);
}
// Check that the multishard reader is safe to create with an admitted permit,
// i.e. a permit which already has a count resource (and memory resources).
// Create semaphores with a single count resource, admit a permit and create a
// multishard reader with said permit and ensure this doesn't end up in a
// deadlock (timeout) when the multishard reader creates the shard reader on the
// same shard.
SEASTAR_TEST_CASE(test_multishard_reader_safe_to_create_with_admitted_permit) {
    // Hands out the externally created per-shard semaphores instead of
    // creating new ones, so the test controls the count/memory budgets.
    class semaphore_factory : public test_reader_lifecycle_policy::semaphore_factory {
        std::vector<foreign_ptr<lw_shared_ptr<reader_concurrency_semaphore>>>& _semaphores;
    public:
        explicit semaphore_factory(std::vector<foreign_ptr<lw_shared_ptr<reader_concurrency_semaphore>>>& semaphores) : _semaphores(semaphores) { }
        virtual lw_shared_ptr<reader_concurrency_semaphore> create(sstring name) override {
            // Release and re-wrap so the registry keeps a live reference too.
            auto semaphore = _semaphores.at(this_shard_id()).release();
            _semaphores[this_shard_id()] = make_foreign(semaphore);
            return semaphore;
        }
        virtual future<> stop(reader_concurrency_semaphore& semaphore) override {
            return make_ready_future<>(); // NOOP, we stop the semaphore in the layer above
        }
    };
    return do_with_cql_env_thread([] (cql_test_env& env) {
        simple_schema s;
        // One semaphore per shard, each with a single count resource, so a
        // second admission on the same shard would block.
        std::vector<foreign_ptr<lw_shared_ptr<reader_concurrency_semaphore>>> semaphores;
        semaphores.resize(smp::count);
        parallel_for_each(std::views::iota(0u, smp::count), [&semaphores] (shard_id shard) {
            return smp::submit_to(shard, [&semaphores] {
                semaphores[this_shard_id()] = make_foreign(make_lw_shared<reader_concurrency_semaphore>(
                        reader_concurrency_semaphore::for_tests{},
                        seastar::format("{}:{}", get_name(), this_shard_id()),
                        1,
                        1 * 1024 * 1024));
            });
        }).get();
        auto stop_semaphores = defer([&semaphores] {
            parallel_for_each(std::views::iota(0u, smp::count), [&semaphores] (shard_id shard) {
                return smp::submit_to(shard, [&semaphores] () -> future<> {
                    auto semaphore = semaphores[this_shard_id()].release();
                    co_await semaphore->stop();
                });
            }).get();
        });
        // Distribute a couple of tokens per shard via the dummy sharder.
        std::map<dht::token, unsigned> pkeys_by_tokens;
        for (unsigned i = 0; i < smp::count * 2; ++i) {
            pkeys_by_tokens.emplace(s.make_pkey(i).token(), i);
        }
        auto sharder = std::make_unique<dummy_sharder>(s.schema()->get_sharder(), std::move(pkeys_by_tokens));
        auto reader_factory = [] (
                schema_ptr schema,
                reader_permit permit,
                const dht::partition_range&,
                const query::partition_slice&,
                tracing::trace_state_ptr,
                mutation_reader::forwarding) {
            return make_empty_mutation_reader(std::move(schema), std::move(permit));
        };
        // timeout is used to break the deadlock in case this test fails
        auto permit = semaphores.at(this_shard_id())->obtain_permit(s.schema(), "multishard_reader", 128 * 1024, db::timeout_clock::now() + 60s, {}).get();
        auto lifecycle_policy = seastar::make_shared<test_reader_lifecycle_policy>(reader_factory, std::make_unique<semaphore_factory>(semaphores));
        auto reader = make_multishard_combining_reader_for_tests(*sharder, std::move(lifecycle_policy), s.schema(), std::move(permit),
                query::full_partition_range, s.schema()->full_slice());
        auto close_reader = deferred_close(reader);
        // Pull once; must not deadlock on the already-admitted permit.
        reader().get();
    });
}
class evicting_semaphore_factory : public test_reader_lifecycle_policy::semaphore_factory {
std::vector<lw_shared_ptr<reader_concurrency_semaphore>>& _registry;
public:
explicit evicting_semaphore_factory(std::vector<lw_shared_ptr<reader_concurrency_semaphore>>& registry) : _registry(registry) { }
virtual lw_shared_ptr<reader_concurrency_semaphore> create(sstring name) override {
// Create with no memory, so all inactive reads are immediately evicted.
auto semaphore = make_lw_shared<reader_concurrency_semaphore>(reader_concurrency_semaphore::for_tests{}, std::move(name), 1, 0);
_registry.at(this_shard_id()) = semaphore;
return semaphore;
}
virtual future<> stop(reader_concurrency_semaphore& semaphore) override {
_registry.at(this_shard_id()) = {};
return semaphore.stop();
}
};
// Verifies that the multishard combining reader's buffer-size hint controls
// how much each shard reader fills per visit: the test simulates the expected
// fill algorithm and checks the per-shard admission/eviction stats match.
SEASTAR_TEST_CASE(test_multishard_reader_buffer_hint_large_partitions) {
    return do_with_cql_env_thread([] (cql_test_env& env) {
        simple_schema ss;
        const auto& schema = ss.schema();
        const sstring value(1024, 'v');
        const size_t num_rows = 200;
        tests::reader_concurrency_semaphore_wrapper semaphore;
        // Two large partitions per shard, distributed round-robin below.
        const auto pkeys_by_tokens = ss.make_pkeys(2 * smp::count)
                | std::views::transform([] (const dht::decorated_key& dk) { return std::pair(dk.token(), dk); })
                | std::ranges::to<std::map<dht::token, dht::decorated_key>>();
        std::vector<utils::chunked_vector<frozen_mutation>> frozen_muts(smp::count, utils::chunked_vector<frozen_mutation>{});
        std::vector<lw_shared_ptr<reader_concurrency_semaphore>> semaphore_registry(smp::count, nullptr);
        unsigned i = 0;
        for (const auto& [token, dk] : pkeys_by_tokens) {
            mutation mut(schema, dk);
            for (uint32_t ck = 0; ck < num_rows; ++ck) {
                ss.add_row(mut, ss.make_ckey(ck), value);
            }
            frozen_muts[i++ % smp::count].emplace_back(mut);
        }
        // Measure the per-fragment memory sizes by reading one partition fully.
        size_t partition_size{0};
        size_t partition_start_size{0};
        size_t row_size{0};
        size_t partition_end_size{0};
        size_t range_tombstone_size{0};
        {
            auto reader = make_mutation_reader_from_mutations(schema, semaphore.make_permit(), frozen_muts.front().front().unfreeze(schema), schema->full_slice());
            auto close_reader = deferred_close(reader);
            reader.set_max_buffer_size(1024*1024);
            reader.fill_buffer().get();
            partition_size = reader.buffer_size();
            const auto buf = reader.detach_buffer();
            BOOST_REQUIRE_GT(buf.size(), 2);
            BOOST_REQUIRE(buf[0].is_partition_start());
            BOOST_REQUIRE(buf[1].is_clustering_row());
            BOOST_REQUIRE(buf.back().is_end_of_partition());
            BOOST_REQUIRE(reader.is_end_of_stream());
            partition_start_size = buf[0].memory_usage();
            row_size = buf[1].memory_usage();
            partition_end_size = buf.back().memory_usage();
            auto rtc = mutation_fragment_v2(*schema, semaphore.make_permit(), range_tombstone_change(ss.make_ckey(0), tombstone()));
            range_tombstone_size = rtc.memory_usage();
        }
        // Track the amount of buffered data each shard contributes.
        std::vector<std::vector<size_t>> data_per_shard(smp::count, std::vector<size_t>{});
        size_t total_data{0};
        for (unsigned shard_id = 0; shard_id != smp::count; ++shard_id) {
            for (const auto& _ : frozen_muts.at(shard_id)) {
                data_per_shard.at(shard_id).push_back(partition_size);
                total_data += partition_size;
            }
        }
        auto sharder = std::make_unique<dummy_sharder>(schema->get_sharder(), std::move(pkeys_by_tokens));
        const auto reader_factory = [frozen_muts] (
                schema_ptr schema,
                reader_permit permit,
                const dht::partition_range&,
                const query::partition_slice& ps,
                tracing::trace_state_ptr,
                mutation_reader::forwarding) {
            auto muts = frozen_muts.at(this_shard_id())
                    | std::views::transform([schema] (const frozen_mutation& fm) { return fm.unfreeze(schema); })
                    | std::ranges::to<utils::chunked_vector<mutation>>();
            return make_mutation_reader_from_mutations(std::move(schema), std::move(permit), std::move(muts), ps);
        };
        auto run_test = [&] (size_t buffer_size, multishard_reader_buffer_hint buffer_hint,
                std::source_location sl = std::source_location::current()) {
            testlog.info("run_test(): buffer_size: {}, buffer_hint: {} @ {}:{}", buffer_size, bool(buffer_hint), sl.file_name(), sl.line());
            auto reader = make_multishard_combining_reader_for_tests(
                    *sharder,
                    seastar::make_shared<test_reader_lifecycle_policy>(reader_factory, std::make_unique<evicting_semaphore_factory>(semaphore_registry)),
                    schema,
                    semaphore.make_permit(),
                    query::full_partition_range,
                    schema->full_slice(),
                    {},
                    mutation_reader::forwarding::no,
                    buffer_hint,
                    read_ahead::no);
            reader.set_max_buffer_size(buffer_size);
            auto close_reader = deferred_close(reader);
            reader.fill_buffer().get();
            // simulate the expected read algorithm to calculate the amount each shard should have read
            std::vector<size_t> buffer_fill_calls_per_shard(smp::count, 0);
            size_t shards_visited{0};
            {
                size_t to_read = buffer_size;
                size_t data_left = total_data;
                auto shard_data_left = data_per_shard;
                // With the hint, shard readers use the top-level buffer size;
                // without it they use the default buffer size.
                const auto shard_reader_buffer_size = buffer_hint ? buffer_size : mutation_reader::default_max_buffer_size_in_bytes();
                while (to_read > 0 && data_left > 0) {
                    for (unsigned shard_id = 0; shard_id != smp::count && to_read > 0 && data_left > 0; ++shard_id) {
                        auto& shard_data = shard_data_left[shard_id];
                        BOOST_REQUIRE(!shard_data.empty());
                        size_t current_buffer_size{0};
                        auto* current_partition = &shard_data.back();
                        bool stop_on_full_buffer = false;
                        while (to_read > 0 && data_left > 0) {
                            // Derive the next fragment's size from how much of
                            // the partition remains: full => partition_start,
                            // exactly the end marker => partition_end, else row.
                            const auto fragment_size = *current_partition == partition_size
                                ? partition_start_size
                                : (*current_partition == partition_end_size ? partition_end_size : row_size);
                            current_buffer_size += fragment_size;
                            testlog.trace("fill buffer loop for shard#{}: to_read={}, current_partition={}, fragment_size={}, current_buffer_size={}", shard_id, to_read, *current_partition, fragment_size, current_buffer_size);
                            if (current_buffer_size >= shard_reader_buffer_size) {
                                ++buffer_fill_calls_per_shard.at(shard_id);
                                testlog.trace("fill buffer loop for shard#{}: finished buffer #{} with size {}", shard_id, buffer_fill_calls_per_shard.at(shard_id), current_buffer_size);
                                // After each eviction, the evictable reader will
                                // emit a range tombstone change resetting the
                                // current tombstone.
                                current_buffer_size = range_tombstone_size;
                                if (stop_on_full_buffer) {
                                    break;
                                }
                            }
                            *current_partition -= fragment_size;
                            to_read -= std::min(to_read, fragment_size);
                            data_left -= fragment_size;
                            if (!*current_partition) {
                                if (buffer_hint) {
                                    break;
                                } else {
                                    shard_data.pop_back();
                                    current_partition = &shard_data.back();
                                    stop_on_full_buffer = true;
                                }
                            }
                        }
                        // Account for a final, partially filled buffer.
                        if (current_buffer_size > range_tombstone_size) {
                            ++buffer_fill_calls_per_shard.at(shard_id);
                            testlog.trace("fill buffer loop for shard#{}: finished buffer #{} with size {}", shard_id, buffer_fill_calls_per_shard.at(shard_id), current_buffer_size);
                        }
                        if (!shard_data.back()) {
                            shard_data.pop_back();
                        }
                        ++shards_visited;
                    }
                }
            }
            // Each visited shard should have exactly as many admissions (and,
            // due to the zero-memory semaphores, evictions) as simulated fills.
            for (unsigned shard_id = 0; shard_id != smp::count; ++shard_id) {
                testlog.trace("shard#{}", shard_id);
                if (shards_visited > shard_id) {
                    const auto reads_from_shard = buffer_fill_calls_per_shard.at(shard_id);
                    auto& shard_semaphore = semaphore_registry.at(shard_id);
                    BOOST_REQUIRE(bool(shard_semaphore));
                    BOOST_REQUIRE_EQUAL(shard_semaphore->get_stats().reads_admitted, reads_from_shard);
                    BOOST_REQUIRE_EQUAL(shard_semaphore->get_stats().permit_based_evictions, reads_from_shard);
                } else {
                    BOOST_REQUIRE(!semaphore_registry.at(shard_id));
                }
            }
        };
        run_test(mutation_reader::default_max_buffer_size_in_bytes(), multishard_reader_buffer_hint::no);
        run_test(100 * 1024, multishard_reader_buffer_hint::no);
        run_test(100 * 1024, multishard_reader_buffer_hint::yes);
        run_test(200 * 1024, multishard_reader_buffer_hint::no);
        run_test(200 * 1024, multishard_reader_buffer_hint::yes);
        run_test(400 * 1024, multishard_reader_buffer_hint::no);
        run_test(400 * 1024, multishard_reader_buffer_hint::yes);
    });
}
// With many small partitions and the buffer hint enabled, a single top-level
// fill should need exactly one admission (and one eviction) per shard.
SEASTAR_TEST_CASE(test_multishard_reader_buffer_hint_small_partitions) {
    return do_with_cql_env_thread([] (cql_test_env& env) {
        simple_schema ss;
        const auto& schema = ss.schema();
        const sstring value(128, 'v');
        const size_t num_rows = 2;
        tests::reader_concurrency_semaphore_wrapper semaphore;
        // Many tiny partitions (64 per shard), distributed round-robin below.
        const auto pkeys_by_tokens = ss.make_pkeys(64 * smp::count)
                | std::views::transform([] (const dht::decorated_key& dk) { return std::pair(dk.token(), dk); })
                | std::ranges::to<std::map<dht::token, dht::decorated_key>>();
        std::vector<utils::chunked_vector<frozen_mutation>> frozen_muts(smp::count, utils::chunked_vector<frozen_mutation>{});
        std::vector<lw_shared_ptr<reader_concurrency_semaphore>> semaphore_registry(smp::count, nullptr);
        unsigned i = 0;
        for (const auto& [token, dk] : pkeys_by_tokens) {
            mutation mut(schema, dk);
            for (uint32_t ck = 0; ck < num_rows; ++ck) {
                ss.add_row(mut, ss.make_ckey(ck), value);
            }
            frozen_muts[i++ % smp::count].emplace_back(mut);
        }
        auto sharder = std::make_unique<dummy_sharder>(schema->get_sharder(), std::move(pkeys_by_tokens));
        const auto reader_factory = [frozen_muts] (
                schema_ptr schema,
                reader_permit permit,
                const dht::partition_range&,
                const query::partition_slice& ps,
                tracing::trace_state_ptr,
                mutation_reader::forwarding) {
            auto muts = frozen_muts.at(this_shard_id())
                    | std::views::transform([schema] (const frozen_mutation& fm) { return fm.unfreeze(schema); })
                    | std::ranges::to<utils::chunked_vector<mutation>>();
            return make_mutation_reader_from_mutations(std::move(schema), std::move(permit), std::move(muts), ps);
        };
        auto reader = make_multishard_combining_reader_for_tests(
                *sharder,
                seastar::make_shared<test_reader_lifecycle_policy>(reader_factory, std::make_unique<evicting_semaphore_factory>(semaphore_registry)),
                schema,
                semaphore.make_permit(),
                query::full_partition_range,
                schema->full_slice(),
                {},
                mutation_reader::forwarding::no,
                multishard_reader_buffer_hint::yes,
                read_ahead::no);
        auto close_reader = deferred_close(reader);
        reader.fill_buffer().get();
        // Every shard must have been read from exactly once.
        for (unsigned shard_id = 0; shard_id != smp::count; ++shard_id) {
            const auto reads_from_shard = 1;
            auto& shard_semaphore = semaphore_registry.at(shard_id);
            BOOST_REQUIRE(bool(shard_semaphore));
            BOOST_REQUIRE_EQUAL(shard_semaphore->get_stats().reads_admitted, reads_from_shard);
            BOOST_REQUIRE_EQUAL(shard_semaphore->get_stats().permit_based_evictions, reads_from_shard);
        }
    });
}
BOOST_AUTO_TEST_SUITE_END()