|
|
|
|
@@ -155,10 +155,10 @@ SEASTAR_THREAD_TEST_CASE(combined_reader_galloping_within_partition_test) {
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
std::vector<flat_mutation_reader> v;
|
|
|
|
|
v.push_back(flat_mutation_reader_from_mutations(tests::make_permit(), {make_partition(std::views::iota(0, 5))}));
|
|
|
|
|
v.push_back(flat_mutation_reader_from_mutations(tests::make_permit(), {make_partition(std::views::iota(5, 10))}));
|
|
|
|
|
v.push_back(flat_mutation_reader_from_mutations(tests::make_permit(), {make_partition(boost::irange(0, 5))}));
|
|
|
|
|
v.push_back(flat_mutation_reader_from_mutations(tests::make_permit(), {make_partition(boost::irange(5, 10))}));
|
|
|
|
|
assert_that(make_combined_reader(s.schema(), tests::make_permit(), std::move(v), streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
|
|
|
|
|
.produces(make_partition(std::views::iota(0, 10)))
|
|
|
|
|
.produces(make_partition(boost::irange(0, 10)))
|
|
|
|
|
.produces_end_of_stream();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -178,16 +178,16 @@ SEASTAR_THREAD_TEST_CASE(combined_mutation_reader_galloping_over_multiple_partit
|
|
|
|
|
|
|
|
|
|
std::vector<flat_mutation_reader> v;
|
|
|
|
|
v.push_back(flat_mutation_reader_from_mutations(tests::make_permit(), {
|
|
|
|
|
make_partition_with_clustering_rows(s, k[0], std::views::iota(5, 10)),
|
|
|
|
|
make_partition_with_clustering_rows(s, k[1], std::views::iota(0, 5))
|
|
|
|
|
make_partition_with_clustering_rows(s, k[0], boost::irange(5, 10)),
|
|
|
|
|
make_partition_with_clustering_rows(s, k[1], boost::irange(0, 5))
|
|
|
|
|
}));
|
|
|
|
|
v.push_back(flat_mutation_reader_from_mutations(tests::make_permit(), {
|
|
|
|
|
make_partition_with_clustering_rows(s, k[0], std::views::iota(0, 5)),
|
|
|
|
|
make_partition_with_clustering_rows(s, k[1], std::views::iota(5, 10))
|
|
|
|
|
make_partition_with_clustering_rows(s, k[0], boost::irange(0, 5)),
|
|
|
|
|
make_partition_with_clustering_rows(s, k[1], boost::irange(5, 10))
|
|
|
|
|
}));
|
|
|
|
|
assert_that(make_combined_reader(s.schema(), tests::make_permit(), std::move(v), streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
|
|
|
|
|
.produces(make_partition_with_clustering_rows(s, k[0], std::views::iota(0, 10)))
|
|
|
|
|
.produces(make_partition_with_clustering_rows(s, k[1], std::views::iota(0, 10)))
|
|
|
|
|
.produces(make_partition_with_clustering_rows(s, k[0], boost::irange(0, 10)))
|
|
|
|
|
.produces(make_partition_with_clustering_rows(s, k[1], boost::irange(0, 10)))
|
|
|
|
|
.produces_end_of_stream();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -198,16 +198,16 @@ SEASTAR_THREAD_TEST_CASE(combined_reader_galloping_changing_multiple_partitions_
|
|
|
|
|
|
|
|
|
|
std::vector<flat_mutation_reader> v;
|
|
|
|
|
v.push_back(flat_mutation_reader_from_mutations(tests::make_permit(), {
|
|
|
|
|
make_partition_with_clustering_rows(s, k[0], std::views::iota(0, 5)),
|
|
|
|
|
make_partition_with_clustering_rows(s, k[1], std::views::iota(0, 5))
|
|
|
|
|
make_partition_with_clustering_rows(s, k[0], boost::irange(0, 5)),
|
|
|
|
|
make_partition_with_clustering_rows(s, k[1], boost::irange(0, 5))
|
|
|
|
|
}));
|
|
|
|
|
v.push_back(flat_mutation_reader_from_mutations(tests::make_permit(), {
|
|
|
|
|
make_partition_with_clustering_rows(s, k[0], std::views::iota(5, 10)),
|
|
|
|
|
make_partition_with_clustering_rows(s, k[1], std::views::iota(5, 10)),
|
|
|
|
|
make_partition_with_clustering_rows(s, k[0], boost::irange(5, 10)),
|
|
|
|
|
make_partition_with_clustering_rows(s, k[1], boost::irange(5, 10)),
|
|
|
|
|
}));
|
|
|
|
|
assert_that(make_combined_reader(s.schema(), tests::make_permit(), std::move(v), streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
|
|
|
|
|
.produces(make_partition_with_clustering_rows(s, k[0], std::views::iota(0, 10)))
|
|
|
|
|
.produces(make_partition_with_clustering_rows(s, k[1], std::views::iota(0, 10)))
|
|
|
|
|
.produces(make_partition_with_clustering_rows(s, k[0], boost::irange(0, 10)))
|
|
|
|
|
.produces(make_partition_with_clustering_rows(s, k[1], boost::irange(0, 10)))
|
|
|
|
|
.produces_end_of_stream();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -320,7 +320,7 @@ SEASTAR_TEST_CASE(test_combining_one_empty_reader) {
|
|
|
|
|
|
|
|
|
|
std::vector<dht::decorated_key> generate_keys(schema_ptr s, int count) {
|
|
|
|
|
auto keys = ranges::to<std::vector<dht::decorated_key>>(
|
|
|
|
|
std::views::iota(0, count) | std::views::transform([s] (int key) {
|
|
|
|
|
boost::irange(0, count) | boost::adaptors::transformed([s] (int key) {
|
|
|
|
|
auto pk = partition_key::from_single_value(*s, int32_type->decompose(data_value(key)));
|
|
|
|
|
return dht::decorate_key(*s, std::move(pk));
|
|
|
|
|
}));
|
|
|
|
|
@@ -329,7 +329,7 @@ std::vector<dht::decorated_key> generate_keys(schema_ptr s, int count) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
std::vector<dht::ring_position> to_ring_positions(const std::vector<dht::decorated_key>& keys) {
|
|
|
|
|
return ranges::to<std::vector<dht::ring_position>>(keys | std::views::transform([] (const dht::decorated_key& key) {
|
|
|
|
|
return ranges::to<std::vector<dht::ring_position>>(keys | boost::adaptors::transformed([] (const dht::decorated_key& key) {
|
|
|
|
|
return dht::ring_position(key);
|
|
|
|
|
}));
|
|
|
|
|
}
|
|
|
|
|
@@ -365,7 +365,7 @@ SEASTAR_TEST_CASE(test_fast_forwarding_combining_reader) {
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
auto make_reader = [&] (const dht::partition_range& pr) {
|
|
|
|
|
return make_combined_reader(s, tests::make_permit(), ranges::to<std::vector<flat_mutation_reader>>(mutations | std::views::transform([&pr] (auto& ms) {
|
|
|
|
|
return make_combined_reader(s, tests::make_permit(), ranges::to<std::vector<flat_mutation_reader>>(mutations | boost::adaptors::transformed([&pr] (auto& ms) {
|
|
|
|
|
return flat_mutation_reader_from_mutations(tests::make_permit(), {ms}, pr);
|
|
|
|
|
})));
|
|
|
|
|
};
|
|
|
|
|
@@ -416,22 +416,22 @@ SEASTAR_THREAD_TEST_CASE(test_fast_forwarding_combining_reader_with_galloping) {
|
|
|
|
|
|
|
|
|
|
auto pr = dht::partition_range::make(ring[0], ring[0]);
|
|
|
|
|
std::vector<flat_mutation_reader> v;
|
|
|
|
|
v.push_back(flat_mutation_reader_from_mutations(tests::make_permit(), make_n_mutations(std::views::iota(0, 5), 7), pr));
|
|
|
|
|
v.push_back(flat_mutation_reader_from_mutations(tests::make_permit(), make_n_mutations(std::views::iota(5, 10), 7), pr));
|
|
|
|
|
v.push_back(flat_mutation_reader_from_mutations(tests::make_permit(), make_n_mutations(boost::irange(0, 5), 7), pr));
|
|
|
|
|
v.push_back(flat_mutation_reader_from_mutations(tests::make_permit(), make_n_mutations(boost::irange(5, 10), 7), pr));
|
|
|
|
|
|
|
|
|
|
assert_that(make_combined_reader(s.schema(), tests::make_permit(), std::move(v), streamed_mutation::forwarding::no, mutation_reader::forwarding::yes))
|
|
|
|
|
.produces(make_partition_with_clustering_rows(s, pkeys[0], std::views::iota(0, 10)))
|
|
|
|
|
.produces(make_partition_with_clustering_rows(s, pkeys[0], boost::irange(0, 10)))
|
|
|
|
|
.produces_end_of_stream()
|
|
|
|
|
.fast_forward_to(dht::partition_range::make(ring[1], ring[1]))
|
|
|
|
|
.produces(make_partition_with_clustering_rows(s, pkeys[1], std::views::iota(0, 10)))
|
|
|
|
|
.produces(make_partition_with_clustering_rows(s, pkeys[1], boost::irange(0, 10)))
|
|
|
|
|
.produces_end_of_stream()
|
|
|
|
|
.fast_forward_to(dht::partition_range::make(ring[3], ring[4]))
|
|
|
|
|
.produces(make_partition_with_clustering_rows(s, pkeys[3], std::views::iota(0, 10)))
|
|
|
|
|
.produces(make_partition_with_clustering_rows(s, pkeys[3], boost::irange(0, 10)))
|
|
|
|
|
.fast_forward_to(dht::partition_range::make({ ring[4], false }, ring[5]))
|
|
|
|
|
.produces(make_partition_with_clustering_rows(s, pkeys[5], std::views::iota(0, 10)))
|
|
|
|
|
.produces(make_partition_with_clustering_rows(s, pkeys[5], boost::irange(0, 10)))
|
|
|
|
|
.produces_end_of_stream()
|
|
|
|
|
.fast_forward_to(dht::partition_range::make_starting_with(ring[6]))
|
|
|
|
|
.produces(make_partition_with_clustering_rows(s, pkeys[6], std::views::iota(0, 10)))
|
|
|
|
|
.produces(make_partition_with_clustering_rows(s, pkeys[6], boost::irange(0, 10)))
|
|
|
|
|
.produces_end_of_stream();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -511,8 +511,8 @@ SEASTAR_THREAD_TEST_CASE(test_sm_fast_forwarding_combining_reader_with_galloping
|
|
|
|
|
|
|
|
|
|
auto pr = dht::partition_range::make(ring[0], ring[0]);
|
|
|
|
|
std::vector<flat_mutation_reader> v;
|
|
|
|
|
v.push_back(flat_mutation_reader_from_mutations(tests::make_permit(), make_n_mutations(std::views::iota(0, 5), 3), streamed_mutation::forwarding::yes));
|
|
|
|
|
v.push_back(flat_mutation_reader_from_mutations(tests::make_permit(), make_n_mutations(std::views::iota(5, 10), 3), streamed_mutation::forwarding::yes));
|
|
|
|
|
v.push_back(flat_mutation_reader_from_mutations(tests::make_permit(), make_n_mutations(boost::irange(0, 5), 3), streamed_mutation::forwarding::yes));
|
|
|
|
|
v.push_back(flat_mutation_reader_from_mutations(tests::make_permit(), make_n_mutations(boost::irange(5, 10), 3), streamed_mutation::forwarding::yes));
|
|
|
|
|
|
|
|
|
|
auto reader = make_combined_reader(s.schema(), tests::make_permit(), std::move(v), streamed_mutation::forwarding::yes, mutation_reader::forwarding::no);
|
|
|
|
|
auto assertions = assert_that(std::move(reader));
|
|
|
|
|
@@ -1563,7 +1563,7 @@ SEASTAR_THREAD_TEST_CASE(test_foreign_reader_as_mutation_source) {
|
|
|
|
|
const auto remote_shard = (this_shard_id() + 1) % smp::count;
|
|
|
|
|
auto frozen_mutations = ranges::to<std::vector<frozen_mutation>>(
|
|
|
|
|
mutations
|
|
|
|
|
| std::views::transform([] (const mutation& m) { return freeze(m); }));
|
|
|
|
|
| boost::adaptors::transformed([] (const mutation& m) { return freeze(m); }));
|
|
|
|
|
auto remote_mt = smp::submit_to(remote_shard, [s = global_schema_ptr(s), &frozen_mutations] {
|
|
|
|
|
auto mt = make_lw_shared<memtable>(s.get());
|
|
|
|
|
|
|
|
|
|
@@ -1690,12 +1690,12 @@ SEASTAR_TEST_CASE(test_trim_clustering_row_ranges_to) {
|
|
|
|
|
|
|
|
|
|
const auto check = [&schema] (std::vector<range> ranges, key key, std::vector<range> output_ranges, bool reversed = false,
|
|
|
|
|
std::experimental::source_location sl = std::experimental::source_location::current()) {
|
|
|
|
|
auto actual_ranges = ranges::to<query::clustering_row_ranges>(ranges | std::views::transform(
|
|
|
|
|
auto actual_ranges = ranges::to<query::clustering_row_ranges>(ranges | boost::adaptors::transformed(
|
|
|
|
|
[&] (const range& r) { return r.to_clustering_range(*schema); }));
|
|
|
|
|
|
|
|
|
|
query::trim_clustering_row_ranges_to(*schema, actual_ranges, key.to_clustering_key(*schema), reversed);
|
|
|
|
|
|
|
|
|
|
const auto expected_ranges = ranges::to<query::clustering_row_ranges>(output_ranges | std::views::transform(
|
|
|
|
|
const auto expected_ranges = ranges::to<query::clustering_row_ranges>(output_ranges | boost::adaptors::transformed(
|
|
|
|
|
[&] (const range& r) { return r.to_clustering_range(*schema); }));
|
|
|
|
|
|
|
|
|
|
if (!std::equal(actual_ranges.begin(), actual_ranges.end(), expected_ranges.begin(), expected_ranges.end(),
|
|
|
|
|
@@ -2180,7 +2180,7 @@ multishard_reader_for_read_ahead prepare_multishard_reader_for_read_ahead_test(s
|
|
|
|
|
remote_controls.emplace_back(nullptr);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
parallel_for_each(std::views::iota(0u, smp::count), [&remote_controls] (unsigned shard) mutable {
|
|
|
|
|
parallel_for_each(boost::irange(0u, smp::count), [&remote_controls] (unsigned shard) mutable {
|
|
|
|
|
return smp::submit_to(shard, [] {
|
|
|
|
|
return make_foreign(std::make_unique<puppet_reader::control>());
|
|
|
|
|
}).then([shard, &remote_controls] (foreign_ptr<std::unique_ptr<puppet_reader::control>>&& ctr) mutable {
|
|
|
|
|
@@ -2196,7 +2196,7 @@ multishard_reader_for_read_ahead prepare_multishard_reader_for_read_ahead_test(s
|
|
|
|
|
|
|
|
|
|
auto shard_pkeys = std::vector<std::vector<uint32_t>>(smp::count, std::vector<uint32_t>{});
|
|
|
|
|
auto i = unsigned(0);
|
|
|
|
|
for (auto pkey : pkeys_by_tokens | std::views::values) {
|
|
|
|
|
for (auto pkey : pkeys_by_tokens | boost::adaptors::map_values) {
|
|
|
|
|
shard_pkeys[i++ % smp::count].push_back(pkey);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -2297,14 +2297,14 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_destroyed_with_pending
|
|
|
|
|
// Destroy reader.
|
|
|
|
|
reader = flat_mutation_reader(nullptr);
|
|
|
|
|
|
|
|
|
|
parallel_for_each(std::views::iota(0u, smp::count), [&remote_controls] (unsigned shard) mutable {
|
|
|
|
|
parallel_for_each(boost::irange(0u, smp::count), [&remote_controls] (unsigned shard) mutable {
|
|
|
|
|
return smp::submit_to(shard, [control = remote_controls.at(shard).get()] {
|
|
|
|
|
control->buffer_filled.set_value();
|
|
|
|
|
});
|
|
|
|
|
}).get();
|
|
|
|
|
|
|
|
|
|
BOOST_REQUIRE(eventually_true([&] {
|
|
|
|
|
return map_reduce(std::views::iota(0u, smp::count), [&] (unsigned shard) {
|
|
|
|
|
return map_reduce(boost::irange(0u, smp::count), [&] (unsigned shard) {
|
|
|
|
|
return smp::submit_to(shard, [&remote_controls, shard] {
|
|
|
|
|
return remote_controls.at(shard)->destroyed;
|
|
|
|
|
});
|
|
|
|
|
@@ -2363,7 +2363,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_fast_forwarded_with_pe
|
|
|
|
|
fut.get();
|
|
|
|
|
|
|
|
|
|
const auto all_shard_fast_forwarded = map_reduce(
|
|
|
|
|
std::views::iota(0u, multishard_reader_for_read_ahead::blocked_shard + 1u),
|
|
|
|
|
boost::irange(0u, multishard_reader_for_read_ahead::blocked_shard + 1u),
|
|
|
|
|
[&] (unsigned shard) {
|
|
|
|
|
return smp::submit_to(shard, [control = remote_controls.at(shard).get()] {
|
|
|
|
|
return control->fast_forward_to == 1;
|
|
|
|
|
@@ -2382,7 +2382,7 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_fast_forwarded_with_pe
|
|
|
|
|
reader = flat_mutation_reader(nullptr);
|
|
|
|
|
|
|
|
|
|
BOOST_REQUIRE(eventually_true([&] {
|
|
|
|
|
return map_reduce(std::views::iota(0u, smp::count), [&] (unsigned shard) {
|
|
|
|
|
return map_reduce(boost::irange(0u, smp::count), [&] (unsigned shard) {
|
|
|
|
|
return smp::submit_to(shard, [&remote_controls, shard] {
|
|
|
|
|
return remote_controls.at(shard)->destroyed;
|
|
|
|
|
});
|
|
|
|
|
@@ -2417,8 +2417,8 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_combining_reader_next_partition) {
|
|
|
|
|
auto& partitioner = schema->get_partitioner();
|
|
|
|
|
|
|
|
|
|
auto pkeys = ranges::to<std::vector<dht::decorated_key>>(
|
|
|
|
|
std::views::iota(0, partition_count) |
|
|
|
|
|
std::views::transform([schema, &partitioner] (int i) {
|
|
|
|
|
boost::irange(0, partition_count) |
|
|
|
|
|
boost::adaptors::transformed([schema, &partitioner] (int i) {
|
|
|
|
|
return partitioner.decorate_key(*schema, partition_key::from_singular(*schema, i));
|
|
|
|
|
}));
|
|
|
|
|
|
|
|
|
|
|