/* * Copyright (C) 2015-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #include #include #include #include #include "partition_slice_builder.hh" #include "schema/schema_builder.hh" #include "test/lib/mutation_source_test.hh" #include "readers/mutation_source.hh" #include "mutation/counters.hh" #include "mutation/mutation_rebuilder.hh" #include "test/lib/simple_schema.hh" #include "readers/mutation_reader.hh" #include "test/lib/mutation_reader_assertions.hh" #include "mutation_query.hh" #include "mutation/mutation_rebuilder.hh" #include "test/lib/random_utils.hh" #include "cql3/cql3_type.hh" #include "test/lib/make_random_string.hh" #include "test/lib/data_model.hh" #include "test/lib/key_utils.hh" #include "test/lib/log.hh" #include "test/lib/reader_concurrency_semaphore.hh" #include "types/user.hh" #include "types/map.hh" #include "types/list.hh" #include "types/set.hh" #include #include "utils/assert.hh" #include "utils/UUID_gen.hh" // partitions must be sorted by decorated key static void require_no_token_duplicates(const utils::chunked_vector& partitions) { std::optional last_token; for (auto&& p : partitions) { const dht::decorated_key& key = p.decorated_key(); if (last_token && key.token() == *last_token) { BOOST_FAIL("token duplicate detected"); } last_token = key.token(); } } static api::timestamp_type new_timestamp() { static thread_local api::timestamp_type ts = api::min_timestamp; return ts++; } namespace { // Helper class for testing mutation_reader::fast_forward_to(dht::partition_range). 
class partition_range_walker { std::vector _ranges; size_t _current_position = 0; private: const dht::partition_range& current_range() const { return _ranges[_current_position]; } public: explicit partition_range_walker(std::vector ranges) : _ranges(ranges) { } const dht::partition_range& initial_range() const { return _ranges[0]; } void fast_forward_if_needed(mutation_reader_assertions& mr, const mutation& expected, bool verify_eos = true) { while (!current_range().contains(expected.decorated_key(), dht::ring_position_comparator(*expected.schema()))) { _current_position++; SCYLLA_ASSERT(_current_position < _ranges.size()); if (verify_eos) { mr.produces_end_of_stream(); } mr.fast_forward_to(current_range()); } } }; } static void test_slicing_and_fast_forwarding(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) { testlog.info(__PRETTY_FUNCTION__); simple_schema s; const sstring value = "v"; constexpr unsigned ckey_count = 4; auto dkeys = s.make_pkeys(128); auto dkeys_pos = 0; utils::chunked_vector mutations; { // All clustered rows and a static row, range tombstones covering each row auto m = mutation(s.schema(), dkeys.at(dkeys_pos++)); s.add_static_row(m, value); for (auto ckey = 0u; ckey < ckey_count; ckey++) { s.delete_range(m, query::clustering_range::make({s.make_ckey(ckey)}, {s.make_ckey(ckey + 1), false})); } for (auto ckey = 0u; ckey < ckey_count; ckey++) { s.add_row(m, s.make_ckey(ckey), value); } mutations.emplace_back(std::move(m)); } { // All clustered rows and a static row, a range tombstone covering all rows auto m = mutation(s.schema(), dkeys.at(dkeys_pos++)); s.add_static_row(m, value); s.delete_range(m, query::clustering_range::make({s.make_ckey(0)},{s.make_ckey(ckey_count)})); for (auto ckey = 0u; ckey < ckey_count; ckey++) { s.add_row(m, s.make_ckey(ckey), value); } mutations.emplace_back(std::move(m)); } { // All clustered rows and a static row, range tombstones disjoint with rows auto m = mutation(s.schema(), 
dkeys.at(dkeys_pos++)); s.add_static_row(m, value); for (auto ckey = 0u; ckey < ckey_count; ckey++) { s.delete_range(m, query::clustering_range::make({s.make_ckey(ckey), false}, {s.make_ckey(ckey + 1), false})); } for (auto ckey = 0u; ckey < ckey_count; ckey++) { s.add_row(m, s.make_ckey(ckey), value); } mutations.emplace_back(std::move(m)); } { // All clustered rows but no static row and no range tombstones auto m = mutation(s.schema(), dkeys.at(dkeys_pos++)); s.add_static_row(m, value); for (auto ckey = 0u; ckey < ckey_count; ckey++) { s.add_row(m, s.make_ckey(ckey), value); } mutations.emplace_back(std::move(m)); } { // Just a static row auto m = mutation(s.schema(), dkeys.at(dkeys_pos++)); s.add_static_row(m, value); mutations.emplace_back(std::move(m)); } { // Every other clustered row and a static row auto m = mutation(s.schema(), dkeys.at(dkeys_pos++)); s.add_static_row(m, value); for (auto ckey = 0u; ckey < ckey_count; ckey += 2) { s.add_row(m, s.make_ckey(ckey), value); } mutations.emplace_back(std::move(m)); } { // Every other clustered row but no static row auto m = mutation(s.schema(), dkeys.at(dkeys_pos++)); s.add_static_row(m, value); for (auto ckey = 0u; ckey < ckey_count; ckey += 2) { s.add_row(m, s.make_ckey(ckey), value); } mutations.emplace_back(std::move(m)); } mutation_source ms = populate(s.schema(), mutations, gc_clock::now()); auto test_ckey = [&] (std::vector pranges, utils::chunked_vector mutations, mutation_reader::forwarding fwd_mr) { for (auto range_size = 1u; range_size <= ckey_count + 1; range_size++) { for (auto start = 0u; start <= ckey_count; start++) { auto range = range_size == 1 ? 
query::clustering_range::make_singular(s.make_ckey(start)) : query::clustering_range::make({s.make_ckey(start)}, {s.make_ckey(start + range_size), false}); testlog.info("Clustering key range {}", range); auto test_common = [&] (const query::partition_slice& slice) { testlog.info("Read whole partitions at once"); auto pranges_walker = partition_range_walker(pranges); auto mr = ms.make_mutation_reader(s.schema(), semaphore.make_permit(), pranges_walker.initial_range(), slice, nullptr, streamed_mutation::forwarding::no, fwd_mr); auto actual = assert_that(std::move(mr)); for (auto& expected : mutations) { pranges_walker.fast_forward_if_needed(actual, expected); actual.produces_partition_start(expected.decorated_key()); if (!expected.partition().static_row().empty()) { actual.produces_static_row(); } auto start_position = position_in_partition(position_in_partition::after_static_row_tag_t()); for (auto current = start; current < start + range_size; current++) { auto ck = s.make_ckey(current); if (expected.partition().find_row(*s.schema(), ck)) { auto end_position = position_in_partition::after_key(*s.schema(), ck); actual.may_produce_tombstones(position_range(start_position, end_position)); actual.produces_row_with_key(ck, expected.partition().range_tombstone_for_row(*s.schema(), ck)); actual.may_produce_tombstones(position_range(start_position, end_position)); start_position = std::move(end_position); } } actual.may_produce_tombstones(position_range(start_position, position_in_partition::for_partition_end())); actual.produces_partition_end(); } actual.produces_end_of_stream(); testlog.info("Read partitions with fast-forwarding to each individual row"); pranges_walker = partition_range_walker(pranges); mr = ms.make_mutation_reader(s.schema(), semaphore.make_permit(), pranges_walker.initial_range(), slice, nullptr, streamed_mutation::forwarding::yes, fwd_mr); actual = assert_that(std::move(mr)); for (auto& expected : mutations) { 
pranges_walker.fast_forward_if_needed(actual, expected); actual.produces_partition_start(expected.decorated_key()); if (!expected.partition().static_row().empty()) { actual.produces_static_row(); } actual.produces_end_of_stream(); for (auto current = start; current < start + range_size; current++) { auto ck = s.make_ckey(current); auto pos_range = position_range( position_in_partition(position_in_partition::before_clustering_row_tag_t(), ck), position_in_partition::after_key(*s.schema(), ck)); actual.fast_forward_to(pos_range); actual.may_produce_tombstones(pos_range); if (expected.partition().find_row(*s.schema(), ck)) { actual.produces_row_with_key(ck, expected.partition().range_tombstone_for_row(*s.schema(), ck)); actual.may_produce_tombstones(pos_range); } actual.produces_end_of_stream(); } actual.next_partition(); } actual.produces_end_of_stream(); }; testlog.info("Single-range slice"); auto slice = partition_slice_builder(*s.schema()) .with_range(range) .build(); test_common(slice); testlog.info("Test monotonic positions"); auto mr = ms.make_mutation_reader(s.schema(), semaphore.make_permit(), query::full_partition_range, slice, nullptr, streamed_mutation::forwarding::no, fwd_mr); assert_that(std::move(mr)).has_monotonic_positions(); if (range_size != 1) { testlog.info("Read partitions fast-forwarded to the range of interest"); auto pranges_walker = partition_range_walker(pranges); mr = ms.make_mutation_reader(s.schema(), semaphore.make_permit(), pranges_walker.initial_range(), slice, nullptr, streamed_mutation::forwarding::yes, fwd_mr); auto actual = assert_that(std::move(mr)); for (auto& expected : mutations) { pranges_walker.fast_forward_if_needed(actual, expected); actual.produces_partition_start(expected.decorated_key()); if (!expected.partition().static_row().empty()) { actual.produces_static_row(); } actual.produces_end_of_stream(); auto start_ck = s.make_ckey(start); auto end_ck = s.make_ckey(start + range_size); actual.fast_forward_to(position_range( 
position_in_partition(position_in_partition::clustering_row_tag_t(), start_ck), position_in_partition(position_in_partition::clustering_row_tag_t(), end_ck))); auto current_position = position_in_partition(position_in_partition::after_static_row_tag_t()); for (auto current = start; current < start + range_size; current++) { auto ck = s.make_ckey(current); if (expected.partition().find_row(*s.schema(), ck)) { auto end_position = position_in_partition::after_key(*s.schema(), ck); actual.may_produce_tombstones(position_range(current_position, end_position)); actual.produces_row_with_key(ck, expected.partition().range_tombstone_for_row(*s.schema(), ck)); current_position = std::move(end_position); } } actual.may_produce_tombstones(position_range(current_position, position_in_partition(position_in_partition::clustering_row_tag_t(), end_ck))); actual.produces_end_of_stream(); actual.next_partition(); } actual.produces_end_of_stream(); } testlog.info("Slice with not clustering ranges"); slice = partition_slice_builder(*s.schema()) .with_ranges({}) .build(); testlog.info("Read partitions with just static rows"); auto pranges_walker = partition_range_walker(pranges); mr = ms.make_mutation_reader(s.schema(), semaphore.make_permit(), pranges_walker.initial_range(), slice, nullptr, streamed_mutation::forwarding::no, fwd_mr); auto actual = assert_that(std::move(mr)); for (auto& expected : mutations) { pranges_walker.fast_forward_if_needed(actual, expected); actual.produces_partition_start(expected.decorated_key()); if (!expected.partition().static_row().empty()) { actual.produces_static_row(); } actual.produces_partition_end(); } actual.produces_end_of_stream(); if (range_size != 1) { testlog.info("Slice with single-row ranges"); std::vector ranges; for (auto i = start; i < start + range_size; i++) { ranges.emplace_back(query::clustering_range::make_singular(s.make_ckey(i))); } slice = partition_slice_builder(*s.schema()) .with_ranges(ranges) .build(); test_common(slice); 
testlog.info("Test monotonic positions"); auto mr = ms.make_mutation_reader(s.schema(), semaphore.make_permit(), query::full_partition_range, slice, nullptr, streamed_mutation::forwarding::no, fwd_mr); assert_that(std::move(mr)).has_monotonic_positions(); } } } }; test_ckey({query::full_partition_range}, mutations, mutation_reader::forwarding::no); for (auto prange_size = 1u; prange_size < mutations.size(); prange_size += 2) { for (auto pstart = 0u; pstart + prange_size <= mutations.size(); pstart++) { auto ms = mutations | std::views::drop(pstart) | std::views::take(prange_size) | std::ranges::to>(); if (prange_size == 1) { test_ckey({dht::partition_range::make_singular(mutations[pstart].decorated_key())}, ms, mutation_reader::forwarding::yes); test_ckey({dht::partition_range::make_singular(mutations[pstart].decorated_key())}, ms, mutation_reader::forwarding::no); } else { test_ckey({dht::partition_range::make({mutations[pstart].decorated_key()}, {mutations[pstart + prange_size - 1].decorated_key()})}, ms, mutation_reader::forwarding::no); } { auto pranges = std::vector(); for (auto current = pstart; current < pstart + prange_size; current++) { pranges.emplace_back(dht::partition_range::make_singular(mutations[current].decorated_key())); } test_ckey(pranges, ms, mutation_reader::forwarding::yes); } if (prange_size > 1) { auto pranges = std::vector(); for (auto current = pstart; current < pstart + prange_size;) { if (current + 1 < pstart + prange_size) { pranges.emplace_back(dht::partition_range::make({mutations[current].decorated_key()}, {mutations[current + 1].decorated_key()})); } else { pranges.emplace_back(dht::partition_range::make_singular(mutations[current].decorated_key())); } current += 2; } test_ckey(pranges, ms, mutation_reader::forwarding::yes); } } } } static void test_streamed_mutation_forwarding_is_consistent_with_slicing(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) { testlog.info(__PRETTY_FUNCTION__); // 
Generates few random mutations and row slices and verifies that using // fast_forward_to() over the slices gives the same mutations as using those // slices in partition_slice without forwarding. random_mutation_generator gen(random_mutation_generator::generate_counters::no, local_shard_only::yes, random_mutation_generator::generate_uncompactable::yes); for (int i = 0; i < 10; ++i) { mutation m = gen(); std::vector ranges = gen.make_random_ranges(10); auto prange = dht::partition_range::make_singular(m.decorated_key()); query::partition_slice full_slice = partition_slice_builder(*m.schema()).build(); query::partition_slice slice_with_ranges = partition_slice_builder(*m.schema()) .with_ranges(ranges) .build(); testlog.info("ranges: {}", ranges); mutation_source ms = populate(m.schema(), {m}, gc_clock::now()); auto sliced_reader = ms.make_mutation_reader(m.schema(), semaphore.make_permit(), prange, slice_with_ranges); auto close_sliced_reader = deferred_close(sliced_reader); auto fwd_reader = ms.make_mutation_reader(m.schema(), semaphore.make_permit(), prange, full_slice, nullptr, streamed_mutation::forwarding::yes); std::vector position_ranges; for (auto& r: ranges) { position_ranges.emplace_back(r); } auto fwd_m = forwardable_reader_to_mutation(std::move(fwd_reader), position_ranges); mutation_opt sliced_m = read_mutation_from_mutation_reader(sliced_reader).get(); BOOST_REQUIRE(bool(sliced_m)); assert_that(*sliced_m).is_equal_to(fwd_m, slice_with_ranges.row_ranges(*m.schema(), m.key())); } } static void test_streamed_mutation_forwarding_guarantees(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) { testlog.info(__PRETTY_FUNCTION__); simple_schema table; schema_ptr s = table.schema(); // mutation will include odd keys auto contains_key = [] (int i) { return i % 2 == 1; }; const int n_keys = 1001; SCYLLA_ASSERT(!contains_key(n_keys - 1)); // so that we can form a range with position greater than all keys mutation m(s, 
table.make_pkey()); std::vector keys; for (int i = 0; i < n_keys; ++i) { keys.push_back(table.make_ckey(i)); if (contains_key(i)) { table.add_row(m, keys.back(), "value"); } } table.add_static_row(m, "static_value"); mutation_source ms = populate(s, utils::chunked_vector({m}), gc_clock::now()); auto new_stream = [&ms, s, &semaphore, &m] () -> mutation_reader_assertions { testlog.info("Creating new streamed_mutation"); auto res = assert_that(ms.make_mutation_reader(s, semaphore.make_permit(), query::full_partition_range, s->full_slice(), nullptr, streamed_mutation::forwarding::yes)); res.produces_partition_start(m.decorated_key()); return res; }; auto verify_range = [&] (mutation_reader_assertions& sm, int start, int end) { sm.fast_forward_to(keys[start], keys[end]); for (; start < end; ++start) { if (!contains_key(start)) { testlog.trace("skip {:d}", start); continue; } sm.produces_row_with_key(keys[start]); } sm.produces_end_of_stream(); }; // Test cases start here { auto sm = new_stream(); sm.produces_static_row(); sm.produces_end_of_stream(); } { auto sm = new_stream(); sm.fast_forward_to(position_range(query::full_clustering_range)); for (int i = 0; i < n_keys; ++i) { if (contains_key(i)) { sm.produces_row_with_key(keys[i]); } } sm.produces_end_of_stream(); } { auto sm = new_stream(); verify_range(sm, 0, 1); verify_range(sm, 1, 2); verify_range(sm, 2, 4); verify_range(sm, 7, 7); verify_range(sm, 7, 9); verify_range(sm, 11, 15); verify_range(sm, 21, 32); verify_range(sm, 132, 200); verify_range(sm, 300, n_keys - 1); } // Skip before EOS { auto sm = new_stream(); sm.fast_forward_to(keys[0], keys[4]); sm.produces_row_with_key(keys[1]); sm.fast_forward_to(keys[5], keys[8]); sm.produces_row_with_key(keys[5]); sm.produces_row_with_key(keys[7]); sm.produces_end_of_stream(); sm.fast_forward_to(keys[9], keys[12]); sm.fast_forward_to(keys[12], keys[13]); sm.fast_forward_to(keys[13], keys[13]); sm.produces_end_of_stream(); sm.fast_forward_to(keys[13], keys[16]); 
sm.produces_row_with_key(keys[13]); sm.produces_row_with_key(keys[15]); sm.produces_end_of_stream(); } { auto sm = new_stream(); verify_range(sm, n_keys - 2, n_keys - 1); } { auto sm = new_stream(); verify_range(sm, 0, n_keys - 1); } // Few random ranges auto& rnd = seastar::testing::local_random_engine; std::uniform_int_distribution key_dist{0, n_keys - 1}; for (int i = 0; i < 10; ++i) { std::vector indices; const int n_ranges = 7; for (int j = 0; j < n_ranges * 2; ++j) { indices.push_back(key_dist(rnd)); } std::sort(indices.begin(), indices.end()); auto sm = new_stream(); for (int j = 0; j < n_ranges; ++j) { verify_range(sm, indices[j*2], indices[j*2 + 1]); } } } // Reproduces https://github.com/scylladb/scylla/issues/2733 static void test_fast_forwarding_across_partitions_to_empty_range(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) { testlog.info(__PRETTY_FUNCTION__); simple_schema table; schema_ptr s = table.schema(); utils::chunked_vector partitions; const unsigned ckeys_per_part = 100; auto keys = table.make_pkeys(10); auto missing_key = keys[3]; keys.erase(keys.begin() + 3); auto key_after_all = keys.back(); keys.erase(keys.begin() + (keys.size() - 1)); unsigned next_ckey = 0; for (auto&& key : keys) { mutation m(s, key); sstring val = make_random_string(1024); for (auto i : std::views::iota(0u, ckeys_per_part)) { table.add_row(m, table.make_ckey(next_ckey + i), val); } next_ckey += ckeys_per_part; partitions.push_back(m); } mutation_source ms = populate(s, partitions, gc_clock::now()); auto pr = dht::partition_range::make({keys[0]}, {keys[1]}); auto rd = assert_that(ms.make_mutation_reader(s, semaphore.make_permit(), pr, s->full_slice(), nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::yes)); rd.fill_buffer().get(); BOOST_REQUIRE(rd.is_buffer_full()); // if not, increase n_ckeys rd.produces_partition_start(keys[0]) .produces_row_with_key(table.make_ckey(0)) 
.produces_row_with_key(table.make_ckey(1)); // ...don't finish consumption to leave the reader in the middle of partition pr = dht::partition_range::make({missing_key}, {missing_key}); rd.fast_forward_to(pr); rd.produces_end_of_stream(); pr = dht::partition_range::make({keys[3]}, {keys[3]}); rd.fast_forward_to(pr) .produces_partition_start(keys[3]) .produces_row_with_key(table.make_ckey(ckeys_per_part * 3)) .produces_row_with_key(table.make_ckey(ckeys_per_part * 3 + 1)); rd.next_partition(); rd.produces_end_of_stream(); pr = dht::partition_range::make_starting_with({keys[keys.size() - 1]}); rd.fast_forward_to(pr) .produces_partition_start(keys.back()) .produces_row_with_key(table.make_ckey(ckeys_per_part * (keys.size() - 1))); // ...don't finish consumption to leave the reader in the middle of partition pr = dht::partition_range::make({key_after_all}, {key_after_all}); rd.fast_forward_to(pr) .produces_end_of_stream(); } static void test_streamed_mutation_slicing_returns_only_relevant_tombstones(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) { testlog.info(__PRETTY_FUNCTION__); simple_schema table; schema_ptr s = table.schema(); mutation m(s, table.make_pkey()); std::vector keys; for (int i = 0; i < 20; ++i) { keys.push_back(table.make_ckey(i)); } auto rt1 = table.delete_range(m, query::clustering_range::make( query::clustering_range::bound(keys[0], true), query::clustering_range::bound(keys[1], true) )); table.add_row(m, keys[2], "value"); auto rt2 = table.delete_range(m, query::clustering_range::make( query::clustering_range::bound(keys[3], true), query::clustering_range::bound(keys[4], true) )); table.add_row(m, keys[5], "value"); auto rt3 = table.delete_range(m, query::clustering_range::make( query::clustering_range::bound(keys[6], true), query::clustering_range::bound(keys[7], true) )); table.add_row(m, keys[8], "value"); auto rt4 = table.delete_range(m, query::clustering_range::make( query::clustering_range::bound(keys[9], 
true), query::clustering_range::bound(keys[10], true) )); auto rt5 = table.delete_range(m, query::clustering_range::make( query::clustering_range::bound(keys[11], true), query::clustering_range::bound(keys[12], true) )); table.add_row(m, keys[10], "value"); auto pr = dht::partition_range::make_singular(m.decorated_key()); mutation_source ms = populate(s, utils::chunked_vector({m}), gc_clock::now()); { auto slice = partition_slice_builder(*s) .with_range(query::clustering_range::make( query::clustering_range::bound(keys[2], true), query::clustering_range::bound(keys[2], true) )) .with_range(query::clustering_range::make( query::clustering_range::bound(keys[7], true), query::clustering_range::bound(keys[9], true) )) .build(); auto rd = assert_that(ms.make_mutation_reader(s, semaphore.make_permit(), pr, slice)); auto rt3_trimmed = rt3; trim_range_tombstone(*s, rt3_trimmed, slice.row_ranges(*s, m.key())); auto rt4_trimmed = rt4; trim_range_tombstone(*s, rt4_trimmed, slice.row_ranges(*s, m.key())); rd.produces_partition_start(m.decorated_key()); rd.produces_row_with_key(keys[2]); rd.produces_range_tombstone_change({rt3_trimmed.position(), rt3.tomb}); rd.produces_range_tombstone_change({rt3_trimmed.end_position(), {}}); rd.produces_row_with_key(keys[8]); rd.produces_range_tombstone_change({rt4_trimmed.position(), rt4.tomb}); rd.produces_range_tombstone_change({rt4_trimmed.end_position(), {}}); rd.produces_partition_end(); rd.produces_end_of_stream(); } { auto slice = partition_slice_builder(*s) .with_range(query::clustering_range::make( query::clustering_range::bound(keys[7], true), query::clustering_range::bound(keys[9], true) )) .build(); auto rd = assert_that(ms.make_mutation_reader(s, semaphore.make_permit(), pr, slice)); auto rt3_trimmed = rt3; trim_range_tombstone(*s, rt3_trimmed, slice.row_ranges(*s, m.key())); auto rt4_trimmed = rt4; trim_range_tombstone(*s, rt4_trimmed, slice.row_ranges(*s, m.key())); rd.produces_partition_start(m.decorated_key()) 
.produces_range_tombstone_change({rt3_trimmed.position(), rt3.tomb}) .produces_range_tombstone_change({rt3_trimmed.end_position(), {}}) .produces_row_with_key(keys[8]) .produces_range_tombstone_change({rt4_trimmed.position(), rt4.tomb}) .produces_range_tombstone_change({rt4_trimmed.end_position(), {}}) .produces_partition_end() .produces_end_of_stream(); } } static void test_streamed_mutation_forwarding_across_range_tombstones(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) { testlog.info(__PRETTY_FUNCTION__); simple_schema table; schema_ptr s = table.schema(); mutation m(s, table.make_pkey()); std::vector keys; for (int i = 0; i < 20; ++i) { keys.push_back(table.make_ckey(i)); } auto rt1 = table.delete_range(m, query::clustering_range::make( query::clustering_range::bound(keys[0], true), query::clustering_range::bound(keys[1], false) )); table.add_row(m, keys[2], "value"); auto rt2_range = query::clustering_range::make( query::clustering_range::bound(keys[3], true), query::clustering_range::bound(keys[6], true) ); auto rt2 = table.delete_range(m, rt2_range); table.add_row(m, keys[4], "value"); auto rt3_range = query::clustering_range::make( query::clustering_range::bound(keys[7], true), query::clustering_range::bound(keys[8], true) ); auto rt3 = table.delete_range(m, rt3_range); auto rt4_range = query::clustering_range::make( query::clustering_range::bound(keys[9], true), query::clustering_range::bound(keys[10], true) ); auto rt4 = table.delete_range(m, rt4_range); auto rt5_range = query::clustering_range::make( query::clustering_range::bound(keys[11], true), query::clustering_range::bound(keys[13], true) ); auto rt5 = table.delete_range(m, rt5_range); mutation_source ms = populate(s, utils::chunked_vector({m}), gc_clock::now()); auto rd = assert_that(ms.make_mutation_reader(s, semaphore.make_permit(), query::full_partition_range, s->full_slice(), nullptr, streamed_mutation::forwarding::yes)); 
rd.produces_partition_start(m.decorated_key()); auto ff0 = query::clustering_range::make( query::clustering_range::bound(keys[1], true), query::clustering_range::bound(keys[2], true) ); rd.fast_forward_to(position_range(ff0)); // there might be a dummy rt [{keys[1],before},{keys[1],before}] rd.may_produce_tombstones(position_range(ff0)); rd.produces_row_with_key(keys[2]); auto ff1 = query::clustering_range::make( query::clustering_range::bound(keys[4], true), query::clustering_range::bound(keys[8], false) ); rd.fast_forward_to(position_range(ff1)); auto rt2_trimmed = rt2; trim_range_tombstone(*s, rt2_trimmed, {ff1}); rd.produces_range_tombstone_change({rt2_trimmed.position(), rt2.tomb}); rd.produces_row_with_key(keys[4]); rd.produces_range_tombstone_change({rt2_trimmed.end_position(), {}}); auto rt3_trimmed = rt3; trim_range_tombstone(*s, rt3_trimmed, {ff1}); rd.produces_range_tombstone_change({rt3_trimmed.position(), rt3.tomb}); rd.produces_range_tombstone_change({rt3_trimmed.end_position(), {}}); auto ff2 = query::clustering_range::make( query::clustering_range::bound(keys[10], true), query::clustering_range::bound(keys[12], false) ); rd.fast_forward_to(position_range(ff2)); auto rt4_trimmed = rt4; trim_range_tombstone(*s, rt4_trimmed, {ff2}); auto rt5_trimmed = rt5; trim_range_tombstone(*s, rt5_trimmed, {ff2}); rd.produces_range_tombstone_change({rt4_trimmed.position(), rt4.tomb}); rd.produces_range_tombstone_change({rt4_trimmed.end_position(), {}}); rd.produces_range_tombstone_change({rt5_trimmed.position(), rt5.tomb}); rd.produces_range_tombstone_change({rt5_trimmed.end_position(), {}}); rd.produces_end_of_stream(); rd.fast_forward_to(position_range(query::clustering_range::make( query::clustering_range::bound(keys[14], true), query::clustering_range::bound(keys[15], false) ))); rd.produces_end_of_stream(); rd.fast_forward_to(position_range(query::clustering_range::make( query::clustering_range::bound(keys[15], true), query::clustering_range::bound(keys[16], 
false) ))); rd.produces_end_of_stream(); } static void test_range_queries(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) { testlog.info(__PRETTY_FUNCTION__); auto s = schema_builder("ks", "cf") .with_column("key", bytes_type, column_kind::partition_key) .with_column("v", bytes_type) .build(); auto make_partition_mutation = [s] (dht::decorated_key key) -> mutation { mutation m(s, std::move(key)); m.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v1")), 1); return m; }; int partition_count = 300; auto keys = tests::generate_partition_keys(partition_count, s); utils::chunked_vector sorted_partitions; for (int i = 0; i < partition_count; ++i) { sorted_partitions.emplace_back( make_partition_mutation(keys[i])); } std::sort(sorted_partitions.begin(), sorted_partitions.end(), mutation_decorated_key_less_comparator()); require_no_token_duplicates(sorted_partitions); dht::decorated_key key_before_all = sorted_partitions.front().decorated_key(); dht::decorated_key key_after_all = sorted_partitions.back().decorated_key(); utils::chunked_vector partitions; std::move(std::make_move_iterator(sorted_partitions.begin()) + 1, std::make_move_iterator(sorted_partitions.end()) - 1, std::back_inserter(partitions)); auto ds = populate(s, partitions, gc_clock::now()); auto test_slice = [&] (dht::partition_range r) { testlog.info("Testing range {}", r); assert_that(ds.make_mutation_reader(s, semaphore.make_permit(), r)) .produces(slice(partitions, r)) .produces_end_of_stream(); }; auto inclusive_token_range = [&] (size_t start, size_t end) { return dht::partition_range::make( {dht::ring_position::starting_at(partitions[start].token())}, {dht::ring_position::ending_at(partitions[end].token())}); }; test_slice(dht::partition_range::make( {key_before_all, true}, {partitions.front().decorated_key(), true})); test_slice(dht::partition_range::make( {key_before_all, false}, {partitions.front().decorated_key(), true})); 
test_slice(dht::partition_range::make( {key_before_all, false}, {partitions.front().decorated_key(), false})); test_slice(dht::partition_range::make( {dht::ring_position::starting_at(key_before_all.token())}, {dht::ring_position::ending_at(partitions.front().token())})); test_slice(dht::partition_range::make( {dht::ring_position::ending_at(key_before_all.token())}, {dht::ring_position::ending_at(partitions.front().token())})); test_slice(dht::partition_range::make( {dht::ring_position::ending_at(key_before_all.token())}, {dht::ring_position::starting_at(partitions.front().token())})); test_slice(dht::partition_range::make( {partitions.back().decorated_key(), true}, {key_after_all, true})); test_slice(dht::partition_range::make( {partitions.back().decorated_key(), true}, {key_after_all, false})); test_slice(dht::partition_range::make( {partitions.back().decorated_key(), false}, {key_after_all, false})); test_slice(dht::partition_range::make( {dht::ring_position::starting_at(partitions.back().token())}, {dht::ring_position::ending_at(key_after_all.token())})); test_slice(dht::partition_range::make( {dht::ring_position::starting_at(partitions.back().token())}, {dht::ring_position::starting_at(key_after_all.token())})); test_slice(dht::partition_range::make( {dht::ring_position::ending_at(partitions.back().token())}, {dht::ring_position::starting_at(key_after_all.token())})); test_slice(dht::partition_range::make( {partitions[0].decorated_key(), false}, {partitions[1].decorated_key(), true})); test_slice(dht::partition_range::make( {partitions[0].decorated_key(), true}, {partitions[1].decorated_key(), false})); test_slice(dht::partition_range::make( {partitions[1].decorated_key(), true}, {partitions[3].decorated_key(), false})); test_slice(dht::partition_range::make( {partitions[1].decorated_key(), false}, {partitions[3].decorated_key(), true})); test_slice(dht::partition_range::make_ending_with( {partitions[3].decorated_key(), true})); 
test_slice(dht::partition_range::make_starting_with( {partitions[partitions.size() - 4].decorated_key(), true})); test_slice(inclusive_token_range(0, 0)); test_slice(inclusive_token_range(1, 1)); test_slice(inclusive_token_range(2, 4)); test_slice(inclusive_token_range(127, 128)); test_slice(inclusive_token_range(128, 128)); test_slice(inclusive_token_range(128, 129)); test_slice(inclusive_token_range(127, 129)); test_slice(inclusive_token_range(partitions.size() - 1, partitions.size() - 1)); test_slice(inclusive_token_range(0, partitions.size() - 1)); test_slice(inclusive_token_range(0, partitions.size() - 2)); test_slice(inclusive_token_range(0, partitions.size() - 3)); test_slice(inclusive_token_range(0, partitions.size() - 128)); test_slice(inclusive_token_range(1, partitions.size() - 1)); test_slice(inclusive_token_range(2, partitions.size() - 1)); test_slice(inclusive_token_range(3, partitions.size() - 1)); test_slice(inclusive_token_range(128, partitions.size() - 1)); } void test_all_data_is_read_back(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) { testlog.info(__PRETTY_FUNCTION__); const auto query_time = gc_clock::now(); for_each_mutation([&semaphore, &populate, query_time] (const mutation& m) mutable { auto ms = populate(m.schema(), {m}, query_time); mutation copy(m); copy.partition().compact_for_compaction(*copy.schema(), always_gc, copy.decorated_key(), query_time, tombstone_gc_state::for_tests()); assert_that(ms.make_mutation_reader(m.schema(), semaphore.make_permit())).produces_compacted(copy, query_time); }); } void test_mutation_reader_fragments_have_monotonic_positions(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) { testlog.info(__PRETTY_FUNCTION__); for_each_mutation([&semaphore, &populate] (const mutation& m) { auto ms = populate(m.schema(), {m}, gc_clock::now()); auto rd = ms.make_mutation_reader(m.schema(), semaphore.make_permit()); 
        assert_that(std::move(rd)).has_monotonic_positions();
    });
}

// Slicing against a time-window-compaction-strategy schema: exercises the
// sstable clustering-key filter both for a range with no matching rows and
// for a singular range that does match, with a partition tombstone and a
// static row present.
static void test_time_window_clustering_slicing(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) {
    testlog.info(__PRETTY_FUNCTION__);

    simple_schema ss;
    auto s = schema_builder(ss.schema())
        .set_compaction_strategy(compaction::compaction_strategy_type::time_window)
        .build();

    auto pkey = ss.make_pkey();

    mutation m1(s, pkey);
    m1.partition().apply(ss.new_tombstone());
    ss.add_static_row(m1, "s");
    ss.add_row(m1, ss.make_ckey(0), "v1");

    mutation_source ms = populate(s, {m1}, gc_clock::now());

    // query row outside the range of existing rows to exercise sstable clustering key filter
    {
        auto slice = partition_slice_builder(*s)
            .with_range(ss.make_ckey_range(1, 2))
            .build();
        auto prange = dht::partition_range::make_singular(pkey);
        assert_that(ms.make_mutation_reader(s, semaphore.make_permit(), prange, slice))
            .produces(m1, slice.row_ranges(*s, pkey.key()))
            .produces_end_of_stream();
    }

    {
        auto slice = partition_slice_builder(*s)
            .with_range(query::clustering_range::make_singular(ss.make_ckey(0)))
            .build();
        auto prange = dht::partition_range::make_singular(pkey);
        assert_that(ms.make_mutation_reader(s, semaphore.make_permit(), prange, slice))
            .produces(m1)
            .produces_end_of_stream();
    }
}

// Slicing over a 3-component clustering key: verifies that prefix-singular
// ranges return exactly the rows and range tombstones relevant to each
// prefix (body continues over the following source lines).
static void test_clustering_slices(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) {
    testlog.info(__PRETTY_FUNCTION__);
    auto s = schema_builder("ks", "cf")
        .with_column("key", bytes_type, column_kind::partition_key)
        .with_column("c1", int32_type, column_kind::clustering_key)
        .with_column("c2", int32_type, column_kind::clustering_key)
        .with_column("c3", int32_type, column_kind::clustering_key)
        .with_column("v", bytes_type)
        .build();

    // Builds a (possibly partial) clustering key from up to three components.
    // NOTE(review): template arguments appear stripped by extraction here
    // (likely std::optional<int> and std::vector<data_value>) — confirm
    // against the upstream source.
    auto make_ck = [&] (int ck1, std::optional ck2 = std::nullopt, std::optional ck3 = std::nullopt) {
        std::vector components;
        components.push_back(data_value(ck1));
        if (ck2) {
            components.push_back(data_value(ck2));
        }
        if (ck3) {
// --- interior of test_clustering_slices() (opened on the previous source line) ---
            components.push_back(data_value(ck3));
        }
        return clustering_key::from_deeply_exploded(*s, components);
    };

    auto partition_count = 3;
    auto keys = tests::generate_partition_keys(partition_count, s);
    std::sort(keys.begin(), keys.end(), dht::ring_position_less_comparator(*s));
    // The middle key is populated; keys[0]/keys[2] stay empty for
    // out-of-range checks below.
    auto pk = keys[1];

    // A row at clustering key k; the cell's timestamp is v so sums of rows
    // are deterministic.
    auto make_row = [&] (clustering_key k, int v) {
        mutation m(s, pk);
        m.set_clustered_cell(k, "v", data_value(bytes("v1")), v);
        return m;
    };

    // A range tombstone covering r, with a fresh (monotonic) timestamp.
    auto make_delete = [&] (const query::clustering_range& r) {
        mutation m(s, pk);
        auto bv_range = bound_view::from_range(r);
        range_tombstone rt(bv_range.first, bv_range.second, tombstone(new_timestamp(), gc_clock::now()));
        m.partition().apply_delete(*s, rt);
        return m;
    };

    auto ck1 = make_ck(1, 1, 1);
    auto ck2 = make_ck(1, 1, 2);
    auto ck3 = make_ck(1, 2, 1);
    auto ck4 = make_ck(1, 2, 2);
    auto ck5 = make_ck(1, 3, 1);
    auto ck6 = make_ck(2, 1, 1);
    auto ck7 = make_ck(2, 1, 2);
    auto ck8 = make_ck(3, 1, 1);

    mutation row1 = make_row(ck1, 1);
    mutation row2 = make_row(ck2, 2);
    mutation row3 = make_row(ck3, 3);
    mutation row4 = make_row(ck4, 4);
    // del_1 spans the c1=1 / c1=2 prefix boundary, so it is relevant to both.
    mutation del_1 = make_delete(query::clustering_range::make({make_ck(1, 2, 1), true}, {make_ck(2, 0, 0), true}));
    mutation row5 = make_row(ck5, 5);
    mutation del_2 = make_delete(query::clustering_range::make({make_ck(2, 1), true}, {make_ck(2), true}));
    mutation row6 = make_row(ck6, 6);
    mutation row7 = make_row(ck7, 7);
    mutation del_3 = make_delete(query::clustering_range::make({make_ck(3), true}, {make_ck(3), true}));
    mutation row8 = make_row(ck8, 8);
    mutation m = row1 + row2 + row3 + row4 + row5 + row6 + row7 + del_1 + del_2 + row8 + del_3;

    mutation_source ds = populate(s, {m}, gc_clock::now());

    auto pr = dht::partition_range::make_singular(pk);

    // Singular slice on a prefix with no data.
    {
        auto slice = partition_slice_builder(*s)
            .with_range(query::clustering_range::make_singular(make_ck(0)))
            .build();
        assert_that(ds.make_mutation_reader(s, semaphore.make_permit(), pr, slice))
            .produces_eos_or_empty_mutation();
    }

    // Full slice with intra-partition fast-forwarding: ff before reading.
    {
        auto slice = partition_slice_builder(*s)
            .build();
        auto rd = assert_that(ds.make_mutation_reader(s, semaphore.make_permit(), pr, slice, nullptr, streamed_mutation::forwarding::yes));
        rd.produces_partition_start(pk)
            .fast_forward_to(position_range(position_in_partition::for_key(ck1), position_in_partition::after_key(*s, ck2)))
            .produces_row_with_key(ck1)
            .produces_row_with_key(ck2)
            .produces_end_of_stream();
    }

    // Same, but drain to end-of-stream before fast-forwarding.
    {
        auto slice = partition_slice_builder(*s)
            .build();
        auto rd = assert_that(ds.make_mutation_reader(s, semaphore.make_permit(), pr, slice, nullptr, streamed_mutation::forwarding::yes));
        rd.produces_partition_start(pk)
            .produces_end_of_stream()
            .fast_forward_to(position_range(position_in_partition::for_key(ck1), position_in_partition::after_key(*s, ck2)))
            .produces_row_with_key(ck1)
            .produces_row_with_key(ck2)
            .produces_end_of_stream();
    }

    // Prefix c1=1: rows 1-5 plus the tombstone overlapping that prefix.
    {
        auto slice = partition_slice_builder(*s)
            .with_range(query::clustering_range::make_singular(make_ck(1)))
            .build();
        assert_that(ds.make_mutation_reader(s, semaphore.make_permit(), pr, slice))
            .produces(row1 + row2 + row3 + row4 + row5 + del_1, slice.row_ranges(*s, pk.key()))
            .produces_end_of_stream();
    }

    // Prefix c1=2: rows 6-7 plus both tombstones overlapping that prefix.
    {
        auto slice = partition_slice_builder(*s)
            .with_range(query::clustering_range::make_singular(make_ck(2)))
            .build();
        assert_that(ds.make_mutation_reader(s, semaphore.make_permit(), pr, slice))
            .produces(row6 + row7 + del_1 + del_2, slice.row_ranges(*s, pk.key()))
            .produces_end_of_stream();
    }

    // Two-component prefix (1, 2).
    {
        auto slice = partition_slice_builder(*s)
            .with_range(query::clustering_range::make_singular(make_ck(1, 2)))
            .build();
        assert_that(ds.make_mutation_reader(s, semaphore.make_permit(), pr, slice))
            .produces(row3 + row4 + del_1, slice.row_ranges(*s, pk.key()))
            .produces_end_of_stream();
    }

    // Prefix c1=3.
    {
        auto slice = partition_slice_builder(*s)
            .with_range(query::clustering_range::make_singular(make_ck(3)))
            .build();
        assert_that(ds.make_mutation_reader(s, semaphore.make_permit(), pr, slice))
            .produces(row8 + del_3, slice.row_ranges(*s, pk.key()))
            .produces_end_of_stream();
    }

    // Test out-of-range partition keys
    {
        auto pr = dht::partition_range::make_singular(keys[0]);
        assert_that(ds.make_mutation_reader(s, semaphore.make_permit(), pr, s->full_slice()))
            .produces_eos_or_empty_mutation();
    }

    {
        auto pr = dht::partition_range::make_singular(keys[2]);
        assert_that(ds.make_mutation_reader(s, semaphore.make_permit(), pr, s->full_slice()))
            .produces_eos_or_empty_mutation();
    }
}

// A slice with no clustering ranges must still return the static row (both
// in the multi-partition and single-partition read paths).
static void test_query_only_static_row(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) {
    testlog.info(__PRETTY_FUNCTION__);

    simple_schema s;

    auto pkeys = s.make_pkeys(1);

    mutation m1(s.schema(), pkeys[0]);
    m1.partition().apply(s.new_tombstone());
    s.add_static_row(m1, "s1");
    s.add_row(m1, s.make_ckey(0), "v1");
    s.add_row(m1, s.make_ckey(1), "v2");

    mutation_source ms = populate(s.schema(), {m1}, gc_clock::now());

    // fully populate cache
    {
        auto prange = dht::partition_range::make_ending_with(dht::ring_position(m1.decorated_key()));
        assert_that(ms.make_mutation_reader(s.schema(), semaphore.make_permit(), prange, s.schema()->full_slice()))
            .produces(m1)
            .produces_end_of_stream();
    }

    // query just a static row
    {
        auto slice = partition_slice_builder(*s.schema())
            .with_ranges({})
            .build();
        auto prange = dht::partition_range::make_ending_with(dht::ring_position(m1.decorated_key()));
        assert_that(ms.make_mutation_reader(s.schema(), semaphore.make_permit(), prange, slice))
            .produces(m1, slice.row_ranges(*s.schema(), m1.key()))
            .produces_end_of_stream();
    }

    // query just a static row, single-partition case
    {
        auto slice = partition_slice_builder(*s.schema())
            .with_ranges({})
            .build();
        auto prange = dht::partition_range::make_singular(m1.decorated_key());
        assert_that(ms.make_mutation_reader(s.schema(), semaphore.make_permit(), prange, slice))
            .produces(m1, slice.row_ranges(*s.schema(), m1.key()))
            .produces_end_of_stream();
    }
}

// Like the test above but with no static columns at all: an empty clustering
// range set must still surface the partition tombstone
// (body continues on the following source line).
static void test_query_no_clustering_ranges_no_static_columns(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) {
// --- body of test_query_no_clustering_ranges_no_static_columns() (signature on the previous source line) ---
    testlog.info(__PRETTY_FUNCTION__);

    simple_schema s(simple_schema::with_static::no);

    auto pkeys = s.make_pkeys(1);

    mutation m1(s.schema(), pkeys[0]);
    m1.partition().apply(s.new_tombstone());
    s.add_row(m1, s.make_ckey(0), "v1");
    s.add_row(m1, s.make_ckey(1), "v2");

    mutation_source ms = populate(s.schema(), {m1}, gc_clock::now());

    {
        auto prange = dht::partition_range::make_ending_with(dht::ring_position(m1.decorated_key()));
        assert_that(ms.make_mutation_reader(s.schema(), semaphore.make_permit(), prange, s.schema()->full_slice()))
            .produces(m1)
            .produces_end_of_stream();
    }

    // multi-partition case
    {
        auto slice = partition_slice_builder(*s.schema())
            .with_ranges({})
            .build();
        auto prange = dht::partition_range::make_ending_with(dht::ring_position(m1.decorated_key()));
        assert_that(ms.make_mutation_reader(s.schema(), semaphore.make_permit(), prange, slice))
            .produces(m1, slice.row_ranges(*s.schema(), m1.key()))
            .produces_end_of_stream();
    }

    // single-partition case
    {
        auto slice = partition_slice_builder(*s.schema())
            .with_ranges({})
            .build();
        auto prange = dht::partition_range::make_singular(m1.decorated_key());
        assert_that(ms.make_mutation_reader(s.schema(), semaphore.make_permit(), prange, slice))
            .produces(m1, slice.row_ranges(*s.schema(), m1.key()))
            .produces_end_of_stream();
    }
}

// Fast-forwarding into ranges that contain no data must simply produce
// end-of-stream, both before and after next_partition().
void test_streamed_mutation_forwarding_succeeds_with_no_data(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) {
    testlog.info(__PRETTY_FUNCTION__);

    simple_schema s;
    auto cks = s.make_ckeys(6);

    auto pkey = s.make_pkey();
    mutation m(s.schema(), pkey);
    s.add_row(m, cks[0], "data");

    auto source = populate(s.schema(), {m}, gc_clock::now());
    assert_that(source.make_mutation_reader(s.schema(),
                semaphore.make_permit(),
                query::full_partition_range,
                s.schema()->full_slice(),
                nullptr,
                streamed_mutation::forwarding::yes
                ))
        .produces_partition_start(pkey)
        .produces_end_of_stream()
        .fast_forward_to(position_range(position_in_partition::for_key(cks[0]),
                                        position_in_partition::before_key(cks[1])))
        .produces_row_with_key(cks[0])
        .produces_end_of_stream()
        .fast_forward_to(position_range(position_in_partition::for_key(cks[1]),
                                        position_in_partition::before_key(cks[3])))
        .produces_end_of_stream()
        .fast_forward_to(position_range(position_in_partition::for_key(cks[4]),
                                        position_in_partition::before_key(cks[5])))
        .produces_end_of_stream()
        .next_partition()
        .produces_end_of_stream()
        .fast_forward_to(position_range(position_in_partition::for_key(cks[0]),
                                        position_in_partition::before_key(cks[1])))
        .produces_end_of_stream()
        .fast_forward_to(position_range(position_in_partition::for_key(cks[1]),
                                        position_in_partition::before_key(cks[3])))
        .produces_end_of_stream()
        .fast_forward_to(position_range(position_in_partition::for_key(cks[4]),
                                        position_in_partition::before_key(cks[5])))
        .produces_end_of_stream();
}

// Two overlapping range tombstones from two source mutations must be merged
// and clipped correctly by slicing and by intra-partition fast-forwarding
// (body continues over the following source lines).
static void test_slicing_with_overlapping_range_tombstones(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) {
    testlog.info(__PRETTY_FUNCTION__);

    simple_schema ss;
    auto s = ss.schema();

    auto rt1 = ss.make_range_tombstone(ss.make_ckey_range(1, 10));
    auto rt2 = ss.make_range_tombstone(ss.make_ckey_range(1, 5)); // rt1 + rt2 = {[1, 5], (5, 10]}

    auto key = tests::generate_partition_key(s);
    mutation m1(s, key);
    m1.partition().apply_delete(*s, rt1);

    mutation m2(s, key);
    m2.partition().apply_delete(*s, rt2);
    ss.add_row(m2, ss.make_ckey(4), "v2"); // position after rt2.position() but before rt2.end_position().

    mutation_source ds = populate(s, {m1, m2}, gc_clock::now());

    // upper bound ends before the row in m2, so that the row is fetched after next fast forward.
// --- interior of test_slicing_with_overlapping_range_tombstones() ---
    auto range = ss.make_ckey_range(0, 3);

    // Non-forwarding read restricted to `range`: no clustering row outside
    // the slice may be emitted, and the rebuilt result must equal m1+m2
    // clipped to the range.
    {
        auto slice = partition_slice_builder(*s).with_range(range).build();
        auto rd = ds.make_mutation_reader(s, semaphore.make_permit(), query::full_partition_range, slice);
        auto close_rd = deferred_close(rd);

        auto prange = position_range(range);
        mutation_rebuilder_v2 rebuilder(s);
        rd.consume_pausable([&] (mutation_fragment_v2&& mf) {
            testlog.trace("mf: {}", mutation_fragment_v2::printer(*s, mf));
            if (mf.position().is_clustering_row() && !prange.contains(*s, mf.position())) {
                testlog.trace("m1: {}", m1);
                testlog.trace("m2: {}", m2);
                BOOST_FAIL(format("Received row which is not relevant for the slice: {}, slice: {}",
                                  mutation_fragment_v2::printer(*s, mf), prange));
            }
            return rebuilder.consume(std::move(mf));
        }).get();
        auto result = *rebuilder.consume_end_of_stream();

        assert_that(result).is_equal_to(m1 + m2, query::clustering_row_ranges({range}));
    }

    // Check fast_forward_to()
    {
        auto rd = ds.make_fragment_v1_stream(s, semaphore.make_permit(), query::full_partition_range, s->full_slice(),
            nullptr, streamed_mutation::forwarding::yes);
        auto close_rd = deferred_close(rd);

        auto prange = position_range(range);
        mutation result(m1.schema(), m1.decorated_key());

        // First drain the non-clustered (partition-level) fragments.
        rd.consume_pausable([&](mutation_fragment&& mf) {
            BOOST_REQUIRE(!mf.position().has_clustering_key());
            result.partition().apply(*s, std::move(mf));
            return stop_iteration::no;
        }).get();

        rd.fast_forward_to(prange).get();

        position_in_partition last_pos = position_in_partition::before_all_clustered_rows();
        // Applies clustered fragments while verifying position monotonicity.
        auto consume_clustered = [&] (mutation_fragment&& mf) {
            position_in_partition::less_compare less(*s);
            if (less(mf.position(), last_pos)) {
                BOOST_FAIL(format("Out of order fragment: {}, last pos: {}", mutation_fragment::printer(*s, mf), last_pos));
            }
            last_pos = position_in_partition(mf.position());
            result.partition().apply(*s, std::move(mf));
            return stop_iteration::no;
        };

        // Consume [0, 3] then forward to the remainder of the partition.
        rd.consume_pausable(consume_clustered).get();
        rd.fast_forward_to(position_range(prange.end(),
            position_in_partition::after_all_clustered_rows())).get();
        rd.consume_pausable(consume_clustered).get();

        assert_that(result).is_equal_to(m1 + m2);
    }
}

// Exercises the v2 fragment stream (range_tombstone_change representation):
// three overlapping/disjoint deletions interleaved with rows must produce
// correctly placed tombstone-change fragments both in plain reads and under
// fast-forwarding (body continues over the following source lines).
// NOTE(review): template arguments appear stripped by extraction below
// (likely utils::chunked_vector<mutation>) — confirm against upstream.
void test_range_tombstones_v2(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) {
    testlog.info(__PRETTY_FUNCTION__);

    simple_schema s;
    auto pkey = s.make_pkey();

    utils::chunked_vector mutations;

    mutation m(s.schema(), pkey);
    s.add_row(m, s.make_ckey(0), "v1");
    auto t1 = s.new_tombstone();
    s.delete_range(m, s.make_ckey_range(1, 10), t1);     // covers the row at ckey 5
    s.add_row(m, s.make_ckey(5), "v2");
    auto t2 = s.new_tombstone();
    s.delete_range(m, s.make_ckey_range(7, 12), t2);     // overlaps the tail of t1
    s.add_row(m, s.make_ckey(15), "v2");
    auto t3 = s.new_tombstone();
    s.delete_range(m, s.make_ckey_range(17, 19), t3);    // disjoint from the others
    mutations.push_back(std::move(m));

    auto ms = populate(s.schema(), mutations, gc_clock::now());

    auto pr = dht::partition_range::make_singular(pkey);

    // Plain (non-forwarding) read: full fragment sequence.
    assert_that(ms.make_mutation_reader(s.schema(), semaphore.make_permit()))
        .next_partition() // Does nothing before first partition
        .produces_partition_start(pkey)
        .produces_row_with_key(s.make_ckey(0))
        .produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::before_key(s.make_ckey(1)), t1))
        .produces_row_with_key(s.make_ckey(5))
        .produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::before_key(s.make_ckey(7)), t2))
        .produces_range_tombstone_change(range_tombstone_change(position_in_partition::after_key(*s.schema(), s.make_ckey(12)), tombstone()))
        .produces_row_with_key(s.make_ckey(15))
        .produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::before_key(s.make_ckey(17)), t3))
        .produces_range_tombstone_change(range_tombstone_change(position_in_partition::after_key(*s.schema(), s.make_ckey(19)), tombstone()))
        .produces_partition_end()
        .produces_end_of_stream();

    assert_that(ms.make_mutation_reader(s.schema(), semaphore.make_permit(), pr, s.schema()->full_slice(),
                                         nullptr,
                                         streamed_mutation::forwarding::yes,
// --- continuation of test_range_tombstones_v2(): forwarding scenarios ---
                                         mutation_reader::forwarding::no))
        .produces_partition_start(pkey)
        .produces_end_of_stream()
        // Window (after 0, before 2): t1 opens at its real start, closes at
        // the window end.
        .fast_forward_to(position_range(
            position_in_partition::after_key(*s.schema(), s.make_ckey(0)),
            position_in_partition::before_key(s.make_ckey(2))))
        .produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::before_key(s.make_ckey(1)), t1))
        .produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::before_key(s.make_ckey(2)), {}))
        .produces_end_of_stream()
        // Window around the covered row at ckey 5.
        .fast_forward_to(position_range(
            position_in_partition::before_key(s.make_ckey(5)),
            position_in_partition::after_key(*s.schema(), s.make_ckey(5))))
        .produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::before_key(s.make_ckey(5)), t1))
        .produces_row_with_key(s.make_ckey(5))
        .produces_range_tombstone_change(range_tombstone_change(position_in_partition::after_key(*s.schema(), s.make_ckey(5)), {}))
        .produces_end_of_stream();

    // Window ending at for_key(2): same clipping behavior as before_key(2).
    assert_that(ms.make_mutation_reader(s.schema(), semaphore.make_permit(), pr, s.schema()->full_slice(),
                                         nullptr,
                                         streamed_mutation::forwarding::yes,
                                         mutation_reader::forwarding::no))
        .produces_partition_start(pkey)
        .produces_end_of_stream()
        .fast_forward_to(position_range(
            position_in_partition::after_key(*s.schema(), s.make_ckey(0)),
            position_in_partition::for_key(s.make_ckey(2))))
        .produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::before_key(s.make_ckey(1)), t1))
        .produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::before_key(s.make_ckey(2)), {}))
        .produces_end_of_stream();

    // Successive adjacent windows: the live row first, then the tombstone.
    assert_that(ms.make_mutation_reader(s.schema(), semaphore.make_permit(), pr, s.schema()->full_slice(),
                                         nullptr,
                                         streamed_mutation::forwarding::yes,
                                         mutation_reader::forwarding::no))
        .produces_partition_start(pkey)
        .produces_end_of_stream()
        .fast_forward_to(position_range(
            position_in_partition::before_key(s.make_ckey(0)),
            position_in_partition::before_key(s.make_ckey(1))))
        .produces_row_with_key(s.make_ckey(0))
        .produces_end_of_stream()
        .fast_forward_to(position_range(
            position_in_partition::before_key(s.make_ckey(1)),
            position_in_partition::before_key(s.make_ckey(2))))
        .produces_range_tombstone_change({position_in_partition_view::before_key(s.make_ckey(1)), t1})
        .produces_range_tombstone_change({position_in_partition_view::before_key(s.make_ckey(2)), {}})
        .produces_end_of_stream();

    // A window containing both a tombstone start and a covered row.
    assert_that(ms.make_mutation_reader(s.schema(), semaphore.make_permit(), pr, s.schema()->full_slice(),
                                         nullptr,
                                         streamed_mutation::forwarding::yes,
                                         mutation_reader::forwarding::no))
        .produces_partition_start(pkey)
        .produces_end_of_stream()
        .fast_forward_to(position_range(
            position_in_partition::before_key(s.make_ckey(1)),
            position_in_partition::before_key(s.make_ckey(6))))
        .produces_range_tombstone_change({position_in_partition_view::before_key(s.make_ckey(1)), t1})
        .produces_row_with_key(s.make_ckey(5))
        .produces_range_tombstone_change({position_in_partition_view::before_key(s.make_ckey(6)), {}})
        .produces_end_of_stream();

    // A window entirely inside t1 (no rows): open/close pair only.
    assert_that(ms.make_mutation_reader(s.schema(), semaphore.make_permit(), pr, s.schema()->full_slice(),
                                         nullptr,
                                         streamed_mutation::forwarding::yes,
                                         mutation_reader::forwarding::no))
        .produces_partition_start(pkey)
        .produces_end_of_stream()
        .fast_forward_to(position_range(
            position_in_partition::before_key(s.make_ckey(6)),
            position_in_partition::before_key(s.make_ckey(7))))
        .produces_range_tombstone_change({position_in_partition_view::before_key(s.make_ckey(6)), t1})
        .produces_range_tombstone_change({position_in_partition_view::before_key(s.make_ckey(7)), {}})
        .produces_end_of_stream();

    // Windows crossing the t1->t2 handover, t2's interior, its tail, the
    // untombstoned gap, and finally t3.
    assert_that(ms.make_mutation_reader(s.schema(), semaphore.make_permit(), pr, s.schema()->full_slice(),
                                         nullptr,
                                         streamed_mutation::forwarding::yes,
                                         mutation_reader::forwarding::no))
        .produces_partition_start(pkey)
        .produces_end_of_stream()
        .fast_forward_to(position_range(
            position_in_partition::before_key(s.make_ckey(6)),
            position_in_partition::before_key(s.make_ckey(8))))
        .produces_range_tombstone_change({position_in_partition_view::before_key(s.make_ckey(6)), t1})
        .produces_range_tombstone_change({position_in_partition_view::before_key(s.make_ckey(7)), t2})
        .produces_range_tombstone_change({position_in_partition_view::before_key(s.make_ckey(8)), {}})
        .produces_end_of_stream()
        .fast_forward_to(position_range(
            position_in_partition::before_key(s.make_ckey(9)),
            position_in_partition::before_key(s.make_ckey(10))))
        .produces_range_tombstone_change({position_in_partition_view::before_key(s.make_ckey(9)), t2})
        .produces_range_tombstone_change({position_in_partition_view::before_key(s.make_ckey(10)), {}})
        .produces_end_of_stream()
        .fast_forward_to(position_range(
            position_in_partition::before_key(s.make_ckey(10)),
            position_in_partition::before_key(s.make_ckey(13))))
        .produces_range_tombstone_change({position_in_partition_view::before_key(s.make_ckey(10)), t2})
        .produces_range_tombstone_change({position_in_partition::after_key(*s.schema(), s.make_ckey(12)), {}})
        .produces_end_of_stream()
        .fast_forward_to(position_range(
            position_in_partition::before_key(s.make_ckey(16)),
            position_in_partition::after_key(*s.schema(), s.make_ckey(16))))
        .produces_end_of_stream()
        .fast_forward_to(position_range(
            position_in_partition::before_key(s.make_ckey(17)),
            position_in_partition::after_key(*s.schema(), s.make_ckey(18))))
        .produces_range_tombstone_change({position_in_partition_view::before_key(s.make_ckey(17)), t3})
        .produces_range_tombstone_change({position_in_partition::after_key(*s.schema(), s.make_ckey(18)), {}})
        .produces_end_of_stream();

    // Slicing using query restrictions
    {
        auto slice = partition_slice_builder(*s.schema())
            .with_range(s.make_ckey_range(16, 18))
            .build();
        assert_that(ms.make_mutation_reader(s.schema(), semaphore.make_permit(), pr, slice))
            .produces_partition_start(pkey)
            .produces_range_tombstone_change({position_in_partition_view::before_key(s.make_ckey(17)), t3})
            .produces_range_tombstone_change({position_in_partition::after_key(*s.schema(), s.make_ckey(18)), {}})
            .produces_partition_end()
            .produces_end_of_stream();
    }

    {
        auto slice = partition_slice_builder(*s.schema())
            .with_range(s.make_ckey_range(0, 3))
            .with_range(s.make_ckey_range(8, 11))
            .build();
        assert_that(ms.make_mutation_reader(s.schema(), semaphore.make_permit(), pr, slice))
            .produces_partition_start(pkey)
            .produces_row_with_key(s.make_ckey(0))
            .produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::before_key(s.make_ckey(1)), t1))
            .produces_range_tombstone_change(range_tombstone_change(position_in_partition::after_key(*s.schema(), s.make_ckey(3)), {}))
            .produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::before_key(s.make_ckey(8)), t2))
            .produces_range_tombstone_change(range_tombstone_change(position_in_partition::after_key(*s.schema(), s.make_ckey(11)), {}))
            .produces_partition_end()
            .produces_end_of_stream();
    }
}

// Round-trips every generated mutation through the v1 fragment stream and
// checks the result against the compacted original
// (body continues on the following source line).
// NOTE(review): template arguments appear stripped by extraction below
// (likely utils::chunked_vector<mutation>) — confirm against upstream.
void test_reader_conversions(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) {
    testlog.info(__PRETTY_FUNCTION__);

    for_each_mutation([&] (const mutation& m) mutable {
        utils::chunked_vector mutations = { m };
        auto ms = populate(m.schema(), mutations, gc_clock::now());

        // Query time must be fetched after populate. If compaction is executed
        // during populate it may be executed with timestamp later than query_time.
        // This would cause the compaction below and compaction during flush to
        // be executed at different time points producing different
        // results. The result would be sporadic test failures depending on relative
        // timing of those operations. If no other mutations happen after populate,
        // and query_time is later than the compaction time during population, we're
        // guaranteed to have the same results.
// --- tail of test_reader_conversions() (opened on the previous source line) ---
        const auto query_time = gc_clock::now();

        mutation m_compacted(m);
        m_compacted.partition().compact_for_compaction(*m_compacted.schema(), always_gc, m_compacted.decorated_key(), query_time, tombstone_gc_state::for_tests());

        {
            auto rd = ms.make_fragment_v1_stream(m.schema(), semaphore.make_permit());
            auto close_rd = deferred_close(rd);
            match_compacted_mutation(read_mutation_from_mutation_reader(rd).get(), m_compacted, query_time);
        }
    });
}

// Defined later in this file; declared here so the runner below can call it.
void test_next_partition(tests::reader_concurrency_semaphore_wrapper&, populate_fn_ex);

// Runs the fast subset of the mutation-source conformance tests against the
// given populate function. Partition-range fast-forwarding tests are only
// run when the source supports them.
void run_mutation_reader_tests_basic(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate, bool with_partition_range_forwarding) {
    testlog.info(__PRETTY_FUNCTION__);

    test_range_tombstones_v2(semaphore, populate);
    test_time_window_clustering_slicing(semaphore, populate);
    test_clustering_slices(semaphore, populate);
    test_streamed_mutation_forwarding_across_range_tombstones(semaphore, populate);
    test_streamed_mutation_forwarding_guarantees(semaphore, populate);
    test_streamed_mutation_slicing_returns_only_relevant_tombstones(semaphore, populate);
    test_streamed_mutation_forwarding_is_consistent_with_slicing(semaphore, populate);
    test_range_queries(semaphore, populate);
    test_query_only_static_row(semaphore, populate);
    test_query_no_clustering_ranges_no_static_columns(semaphore, populate);
    test_next_partition(semaphore, populate);
    test_streamed_mutation_forwarding_succeeds_with_no_data(semaphore, populate);
    test_slicing_with_overlapping_range_tombstones(semaphore, populate);

    if (with_partition_range_forwarding) {
        test_fast_forwarding_across_partitions_to_empty_range(semaphore, populate);
        test_slicing_and_fast_forwarding(semaphore, populate);
    }
}

// Runs the basic suite plus the expensive whole-dataset tests.
void run_mutation_reader_tests_all(populate_fn_ex populate, bool with_partition_range_forwarding) {
    testlog.info(__PRETTY_FUNCTION__);

    tests::reader_concurrency_semaphore_wrapper semaphore;

    run_mutation_reader_tests_basic(semaphore, populate, with_partition_range_forwarding);

    // Most of the time is spent in the following tests
    test_reader_conversions(semaphore, populate);
    test_mutation_reader_fragments_have_monotonic_positions(semaphore, populate);
    test_all_data_is_read_back(semaphore, populate);
}

// Verifies next_partition() semantics: a no-op before the first partition
// and between partitions, and a skip-to-next from anywhere inside one.
void test_next_partition(tests::reader_concurrency_semaphore_wrapper& semaphore, populate_fn_ex populate) {
    testlog.info(__PRETTY_FUNCTION__);

    simple_schema s;
    auto pkeys = s.make_pkeys(4);

    utils::chunked_vector mutations;
    for (auto key : pkeys) {
        mutation m(s.schema(), key);
        s.add_static_row(m, "s1");
        s.add_row(m, s.make_ckey(0), "v1");
        s.add_row(m, s.make_ckey(1), "v2");
        mutations.push_back(std::move(m));
    }
    auto source = populate(s.schema(), mutations, gc_clock::now());
    assert_that(source.make_mutation_reader(s.schema(), semaphore.make_permit()))
        .next_partition() // Does nothing before first partition
        .produces_partition_start(pkeys[0])
        .produces_static_row()
        .produces_row_with_key(s.make_ckey(0))
        .produces_row_with_key(s.make_ckey(1))
        .produces_partition_end()
        .next_partition() // Does nothing between partitions
        .produces_partition_start(pkeys[1])
        .next_partition() // Moves to next partition
        .produces_partition_start(pkeys[2])
        .produces_static_row()
        .next_partition()
        .produces_partition_start(pkeys[3])
        .produces_static_row()
        .produces_row_with_key(s.make_ckey(0))
        .next_partition()
        .produces_end_of_stream();
}

// Adapter: wraps a time-point-unaware populate function into populate_fn_ex.
void run_mutation_source_tests(populate_fn populate, bool with_partition_range_forwarding) {
    auto populate_ex = [populate = std::move(populate)] (schema_ptr s, const utils::chunked_vector& muts, gc_clock::time_point) {
        return populate(std::move(s), muts);
    };
    run_mutation_source_tests(std::move(populate_ex), with_partition_range_forwarding);
}

// Top-level entry point: runs the suite both in natural and reversed order.
void run_mutation_source_tests(populate_fn_ex populate, bool with_partition_range_forwarding) {
    run_mutation_source_tests_plain(populate, with_partition_range_forwarding);
    run_mutation_source_tests_reverse(populate, with_partition_range_forwarding);
    // Some tests call the sub-types individually, mind
    // checking them // if adding new stuff here
}

// Natural-order (non-reversed) suite runners; each is a thin named wrapper
// so callers can run a sub-suite in isolation.
void run_mutation_source_tests_plain(populate_fn_ex populate, bool with_partition_range_forwarding) {
    testlog.info(__PRETTY_FUNCTION__);
    run_mutation_reader_tests_all(populate, with_partition_range_forwarding);
}

void run_mutation_source_tests_plain_basic(populate_fn_ex populate, bool with_partition_range_forwarding) {
    testlog.info(__PRETTY_FUNCTION__);
    tests::reader_concurrency_semaphore_wrapper semaphore;
    run_mutation_reader_tests_basic(semaphore, populate, with_partition_range_forwarding);
}

void run_mutation_source_tests_plain_reader_conversion(populate_fn_ex populate, bool with_partition_range_forwarding) {
    testlog.info(__PRETTY_FUNCTION__);
    tests::reader_concurrency_semaphore_wrapper semaphore;
    test_reader_conversions(semaphore, populate);
}

void run_mutation_source_tests_plain_fragments_monotonic(populate_fn_ex populate, bool with_partition_range_forwarding) {
    testlog.info(__PRETTY_FUNCTION__);
    tests::reader_concurrency_semaphore_wrapper semaphore;
    test_mutation_reader_fragments_have_monotonic_positions(semaphore, populate);
}

void run_mutation_source_tests_plain_read_back(populate_fn_ex populate, bool with_partition_range_forwarding) {
    testlog.info(__PRETTY_FUNCTION__);
    tests::reader_concurrency_semaphore_wrapper semaphore;
    test_all_data_is_read_back(semaphore, populate);
}

// read in reverse
// Wraps `populate` so that data is stored under the reversed schema, then
// serves reads by toggling the slice's reverse option — this makes any
// forward-reading source behave as a natively reversing one for the suite.
// NOTE(review): template arguments appear stripped by extraction below
// (utils::chunked_vector<mutation>, std::list<query::partition_slice>, and
// .with_option<query::partition_slice::option::reversed>() are the likely
// originals) — confirm against upstream.
static mutation_source make_mutation_source(populate_fn_ex populate, schema_ptr s, const utils::chunked_vector& m, gc_clock::time_point t) {
    auto table_schema = s->make_reversed();

    utils::chunked_vector reversed_mutations;
    reversed_mutations.reserve(m.size());
    for (const auto& mut : m) {
        reversed_mutations.emplace_back(reverse(mut));
    }
    auto ms = populate(table_schema, reversed_mutations, t);

    // reversed_slices keeps each built slice alive for the reader's lifetime.
    return mutation_source([table_schema, ms = std::move(ms), reversed_slices = std::list()] (
                schema_ptr query_schema,
                reader_permit permit,
                const dht::partition_range& pr,
                const query::partition_slice& slice,
                tracing::trace_state_ptr tr,
                streamed_mutation::forwarding fwd,
                mutation_reader::forwarding mr_fwd) mutable {
        // Note that the clustering ranges of the provided slice are already reversed in relation to the table_schema
        // above. Thus toggling its reverse option is all that needs to be done here.
        reversed_slices.emplace_back(partition_slice_builder(*table_schema, slice)
                .with_option()
                .build());

        return ms.make_mutation_reader(query_schema, std::move(permit), pr, reversed_slices.back(), tr, fwd, mr_fwd);
    });
}

// Reversed-order suite runners: each feeds the reversing adapter above into
// the corresponding test group.
void run_mutation_source_tests_reverse(populate_fn_ex populate, bool with_partition_range_forwarding) {
    testlog.info(__PRETTY_FUNCTION__);
    run_mutation_reader_tests_all([&populate] (schema_ptr s, const utils::chunked_vector& m, gc_clock::time_point t) -> mutation_source {
        return make_mutation_source(populate, s, m, t);
    }, false); // FIXME: pass with_partition_range_forwarding after all natively reversing sources have fast-forwarding support
}

void run_mutation_source_tests_reverse_basic(populate_fn_ex populate, bool with_partition_range_forwarding) {
    testlog.info(__PRETTY_FUNCTION__);
    tests::reader_concurrency_semaphore_wrapper semaphore;
    run_mutation_reader_tests_basic(semaphore, [&populate] (schema_ptr s, const utils::chunked_vector& m, gc_clock::time_point t) -> mutation_source {
        return make_mutation_source(populate, s, m, t);
    }, false); // FIXME: pass with_partition_range_forwarding after all natively reversing sources have fast-forwarding support
}

void run_mutation_source_tests_reverse_reader_conversion(populate_fn_ex populate, bool with_partition_range_forwarding) {
    testlog.info(__PRETTY_FUNCTION__);
    tests::reader_concurrency_semaphore_wrapper semaphore;
    test_reader_conversions(semaphore, [&populate] (schema_ptr s, const utils::chunked_vector& m, gc_clock::time_point t) -> mutation_source {
        return make_mutation_source(populate, s, m, t);
    });
}

void run_mutation_source_tests_reverse_fragments_monotonic(populate_fn_ex populate, bool with_partition_range_forwarding) {
    testlog.info(__PRETTY_FUNCTION__);
    tests::reader_concurrency_semaphore_wrapper semaphore;
    test_mutation_reader_fragments_have_monotonic_positions(semaphore, [&populate] (schema_ptr s, const utils::chunked_vector& m, gc_clock::time_point t) -> mutation_source {
        return make_mutation_source(populate, s, m, t);
    });
}

void run_mutation_source_tests_reverse_read_back(populate_fn_ex populate, bool with_partition_range_forwarding) {
    testlog.info(__PRETTY_FUNCTION__);
    tests::reader_concurrency_semaphore_wrapper semaphore;
    test_all_data_is_read_back(semaphore, [&populate] (schema_ptr s, const utils::chunked_vector& m, gc_clock::time_point t) -> mutation_source {
        return make_mutation_source(populate, s, m, t);
    });
}

// Parallel lists of mutation sets: `equal` holds groups that must compare
// equal, `unequal` holds groups that must not.
// NOTE(review): element types appear stripped by extraction (likely
// std::vector<utils::chunked_vector<mutation>>) — confirm against upstream.
struct mutation_sets {
    std::vector> equal;
    std::vector> unequal;
    mutation_sets(){}
};

// A tombstone with a fresh monotonic timestamp and a deletion time far in
// the future, so tests that ignore query time never GC it.
static tombstone new_tombstone() {
    return { new_timestamp(), gc_clock::now() + std::chrono::hours(10) };
}

// Builds pairs of mutations over two slightly different schemas (one extra
// regular column each) and records which pairs must compare equal/unequal
// (body continues past the end of this chunk).
static mutation_sets generate_mutation_sets() {
    using mutations = utils::chunked_vector;
    mutation_sets result;
    {
        auto common_schema = schema_builder("ks", "test")
                .with_column("pk_col", bytes_type, column_kind::partition_key)
                .with_column("ck_col_1", bytes_type, column_kind::clustering_key)
                .with_column("ck_col_2", bytes_type, column_kind::clustering_key)
                .with_column("regular_col_1", bytes_type)
                .with_column("regular_col_2", bytes_type)
                .with_column("static_col_1", bytes_type, column_kind::static_column)
                .with_column("static_col_2", bytes_type, column_kind::static_column);

        auto s1 = common_schema
                .with_column("regular_col_1_s1", bytes_type) // will have id in between common columns
                .build();

        auto s2 = common_schema
                .with_column("regular_col_1_s2", bytes_type) // will have id in between common columns
                .build();

        auto local_keys = tests::generate_partition_keys(2, s1); // use only one schema as s1 and s2 don't differ in representation.
auto& key1 = local_keys[0]; auto& key2 = local_keys[1]; // Differing keys result.unequal.emplace_back(mutations{ mutation(s1, key1), mutation(s2, key2) }); auto m1 = mutation(s1, key1); auto m2 = mutation(s2, key1); result.equal.emplace_back(mutations{m1, m2}); clustering_key ck1 = clustering_key::from_deeply_exploded(*s1, {data_value(bytes("ck1_0")), data_value(bytes("ck1_1"))}); clustering_key ck2 = clustering_key::from_deeply_exploded(*s1, {data_value(bytes("ck2_0")), data_value(bytes("ck2_1"))}); auto ttl = gc_clock::duration(10000); // Note: large value to avoid deletion on tests ignoring query time { auto tomb = new_tombstone(); m1.partition().apply(tomb); result.unequal.emplace_back(mutations{m1, m2}); m2.partition().apply(tomb); result.equal.emplace_back(mutations{m1, m2}); } { auto tomb = new_tombstone(); auto key = clustering_key_prefix::from_deeply_exploded(*s1, {data_value(bytes("ck2_0"))}); m1.partition().apply_row_tombstone(*s1, key, tomb); result.unequal.emplace_back(mutations{m1, m2}); m2.partition().apply_row_tombstone(*s2, key, tomb); result.equal.emplace_back(mutations{m1, m2}); } { auto tomb = new_tombstone(); m1.partition().apply_delete(*s1, ck2, tomb); result.unequal.emplace_back(mutations{m1, m2}); m2.partition().apply_delete(*s2, ck2, tomb); result.equal.emplace_back(mutations{m1, m2}); } { // Add a row which falls under the tombstone prefix. 
auto ts = new_timestamp(); auto key_full = clustering_key_prefix::from_deeply_exploded(*s1, {data_value(bytes("ck2_0")), data_value(bytes("ck1_1")), }); m1.set_clustered_cell(key_full, "regular_col_2", data_value(bytes("regular_col_value")), ts, ttl); result.unequal.emplace_back(mutations{m1, m2}); m2.set_clustered_cell(key_full, "regular_col_2", data_value(bytes("regular_col_value")), ts, ttl); result.equal.emplace_back(mutations{m1, m2}); } { auto ts = new_timestamp(); m1.set_clustered_cell(ck1, "regular_col_1", data_value(bytes("regular_col_value")), ts, ttl); result.unequal.emplace_back(mutations{m1, m2}); m2.set_clustered_cell(ck1, "regular_col_1", data_value(bytes("regular_col_value")), ts, ttl); result.equal.emplace_back(mutations{m1, m2}); } { auto ts = new_timestamp(); m1.set_clustered_cell(ck1, "regular_col_2", data_value(bytes("regular_col_value")), ts, ttl); result.unequal.emplace_back(mutations{m1, m2}); m2.set_clustered_cell(ck1, "regular_col_2", data_value(bytes("regular_col_value")), ts, ttl); result.equal.emplace_back(mutations{m1, m2}); } { auto ts = new_timestamp(); m1.partition().apply_insert(*s1, ck2, ts); result.unequal.emplace_back(mutations{m1, m2}); m2.partition().apply_insert(*s2, ck2, ts); result.equal.emplace_back(mutations{m1, m2}); } { auto ts = new_timestamp(); m1.set_clustered_cell(ck2, "regular_col_1", data_value(bytes("ck2_regular_col_1_value")), ts); result.unequal.emplace_back(mutations{m1, m2}); m2.set_clustered_cell(ck2, "regular_col_1", data_value(bytes("ck2_regular_col_1_value")), ts); result.equal.emplace_back(mutations{m1, m2}); } { auto ts = new_timestamp(); m1.set_static_cell("static_col_1", data_value(bytes("static_col_value")), ts, ttl); result.unequal.emplace_back(mutations{m1, m2}); m2.set_static_cell("static_col_1", data_value(bytes("static_col_value")), ts, ttl); result.equal.emplace_back(mutations{m1, m2}); } { auto ts = new_timestamp(); m1.set_static_cell("static_col_2", data_value(bytes("static_col_value")), ts); 
result.unequal.emplace_back(mutations{m1, m2}); m2.set_static_cell("static_col_2", data_value(bytes("static_col_value")), ts); result.equal.emplace_back(mutations{m1, m2}); } { m1.partition().ensure_last_dummy(*m1.schema()); result.equal.emplace_back(mutations{m1, m2}); m2.partition().ensure_last_dummy(*m2.schema()); result.equal.emplace_back(mutations{m1, m2}); } { auto ts = new_timestamp(); m1.set_clustered_cell(ck2, "regular_col_1_s1", data_value(bytes("x")), ts); result.unequal.emplace_back(mutations{m1, m2}); m2.set_clustered_cell(ck2, "regular_col_1_s2", data_value(bytes("x")), ts); result.unequal.emplace_back(mutations{m1, m2}); } } static constexpr auto rmg_iterations = 10; { random_mutation_generator gen(random_mutation_generator::generate_counters::no, local_shard_only::yes, random_mutation_generator::generate_uncompactable::yes); for (int i = 0; i < rmg_iterations; ++i) { auto m = gen(); result.unequal.emplace_back(mutations{m, gen()}); // collision unlikely result.equal.emplace_back(mutations{m, m}); } } { random_mutation_generator gen(random_mutation_generator::generate_counters::yes, local_shard_only::yes, random_mutation_generator::generate_uncompactable::yes); for (int i = 0; i < rmg_iterations; ++i) { auto m = gen(); result.unequal.emplace_back(mutations{m, gen()}); // collision unlikely result.equal.emplace_back(mutations{m, m}); } } return result; } static const mutation_sets& get_mutation_sets() { static thread_local const auto ms = generate_mutation_sets(); return ms; } void for_each_mutation_pair(std::function callback) { auto&& ms = get_mutation_sets(); for (auto&& mutations : ms.equal) { auto i = mutations.begin(); SCYLLA_ASSERT(i != mutations.end()); const mutation& first = *i++; while (i != mutations.end()) { callback(first, *i, are_equal::yes); ++i; } } for (auto&& mutations : ms.unequal) { auto i = mutations.begin(); SCYLLA_ASSERT(i != mutations.end()); const mutation& first = *i++; while (i != mutations.end()) { callback(first, *i, 
// (continuation) tail of for_each_mutation_pair(): report each remaining
// mutation of an "unequal" set against the set's first mutation.
are_equal::no); ++i; } } }

// Invokes `callback` once for every mutation in the shared test corpus —
// both the "equal" and the "unequal" sets built by get_mutation_sets().
void for_each_mutation(std::function callback) {
    auto&& ms = get_mutation_sets();
    for (auto&& mutations : ms.equal) {
        for (auto&& m : mutations) {
            callback(m);
        }
    }
    for (auto&& mutations : ms.unequal) {
        for (auto&& m : mutations) {
            callback(m);
        }
    }
}

// Returns `blob_size` random bytes.
bytes make_blob(size_t blob_size) {
    return tests::random::get_bytes(blob_size);
};

// Implementation of the random mutation generator used by the mutation-source
// tests: produces randomized schemas and mutations covering cells, collection
// cells, every tombstone kind, and (optionally) counters.
// NOTE(review): template argument lists appear to have been stripped from this
// file by an extraction step (std::function, std::vector and
// std::uniform_int_distribution below lack their <...> arguments) — restore
// them against the upstream source before compiling.
class random_mutation_generator::impl {
    // Ordering of timestamp "levels" consumed by gen_timestamp(): in
    // "uncompactable" mode each level is offset so no higher-level tombstone
    // covers a lower-level one, and no tombstone covers data.
    enum class timestamp_level {
        partition_tombstone = 0,
        range_tombstone = 1,
        row_shadowable_tombstone = 2,
        row_tombstone = 3,
        row_marker_tombstone = 4,
        collection_tombstone = 5,
        cell_tombstone = 6,
        data = 7,
    };
private:
    // Set to true in order to produce mutations which are easier to work with during debugging.
    static const bool debuggable = false;
    // The "333" prefix is so that it's easily distinguishable from other numbers in the printout.
    static const api::timestamp_type min_timestamp = debuggable ? 3330000 : ::api::min_timestamp;
    friend class random_mutation_generator;
    generate_counters _generate_counters;
    local_shard_only _local_shard_only;
    generate_uncompactable _uncompactable;
    const size_t _external_blob_size = debuggable ? 4 : 128; // Should be enough to force use of external bytes storage
    const size_t n_blobs = debuggable ? 32 : 1024;
    const column_id column_count = debuggable ? 3 : 64;
    std::mt19937 _gen;      // PRNG driving all randomness; seeded in the constructor
    schema_ptr _schema;
    std::vector _blobs;     // pool of random blobs used for keys and values
    std::uniform_int_distribution _ck_index_dist{0, n_blobs - 1};
    std::uniform_int_distribution _bool_dist{0, 1};
    std::uniform_int_distribution _not_dummy_dist{0, 19};
    std::uniform_int_distribution _range_tombstone_dist{0, 29};
    std::uniform_int_distribution _timestamp_dist{min_timestamp, min_timestamp + 2};
    // Sequence number for mutation elements.
    // Intended to be put as "deletion time".
    // The "777" prefix is so that it's easily distinguishable from other numbers in the printout.
    // Also makes it easy to grep for a particular element.
uint64_t _seq = 777000000; compress _compress = compress::yes; template static gc_clock::time_point expiry_dist(Generator& gen) { static thread_local std::uniform_int_distribution dist(0, 2); return gc_clock::time_point() + std::chrono::seconds(dist(gen)); } schema_ptr do_make_schema(data_type type, const char* ks_name, const char* cf_name) { auto builder = schema_builder(ks_name, cf_name) .with_column("pk", bytes_type, column_kind::partition_key) .with_column("ck1", bytes_type, column_kind::clustering_key) .with_column("ck2", bytes_type, column_kind::clustering_key); auto add_column = [&] (const sstring& column_name, column_kind kind) { auto col_type = type == counter_type || _bool_dist(_gen) ? type : list_type_impl::get_instance(type, true); builder.with_column(to_bytes(column_name), col_type, kind); }; for (column_id i = 0; i < column_count; ++i) { add_column(format("v{:d}", i), column_kind::regular_column); add_column(format("s{:d}", i), column_kind::static_column); } if(!_compress) { builder.set_compressor_params(compression_parameters::no_compression()); } return builder.build(); } schema_ptr make_schema(const char* ks_name, const char* cf_name) { return _generate_counters ? do_make_schema(counter_type, ks_name, cf_name) : do_make_schema(bytes_type, ks_name, cf_name); } api::timestamp_type gen_timestamp(timestamp_level l) { auto ts = _timestamp_dist(_gen); if (_uncompactable) { // Offset the timestamp such that no higher level tombstones // covers any lower level tombstone, and no tombstone covers data. return ts + static_cast>(l) * 10; } return ts; } gc_clock::time_point new_expiry() { return debuggable ? 
gc_clock::time_point(gc_clock::time_point::duration(_seq++)) : expiry_dist(_gen); } tombstone random_tombstone(timestamp_level l) { return tombstone(gen_timestamp(l), new_expiry()); } public: explicit impl(generate_counters counters, local_shard_only lso = local_shard_only::yes, generate_uncompactable uc = generate_uncompactable::no, std::optional seed_opt = std::nullopt, const char* ks_name="ks", const char* cf_name="cf", compress c = compress::yes) : _generate_counters(counters), _local_shard_only(lso), _uncompactable(uc), _compress(c) { // In case of errors, reproduce using the --random-seed command line option with the test_runner seed. auto seed = seed_opt.value_or(tests::random::get_int()); std::cout << "random_mutation_generator seed: " << seed << "\n"; _gen = std::mt19937(seed); _schema = make_schema(ks_name, cf_name); // The pre-existing assumption here is that the type of all the primary key components is blob. // So we generate partition keys and take the single blob component and save it as a random blob value. 
auto keys = tests::generate_partition_keys(n_blobs, _schema, _local_shard_only, tests::key_size{_external_blob_size, _external_blob_size}); _blobs = keys | std::views::transform([] (const dht::decorated_key& dk) { return dk.key().explode().front(); }) | std::ranges::to>(); } void set_key_cardinality(size_t n_keys) { SCYLLA_ASSERT(n_keys <= n_blobs); _ck_index_dist = std::uniform_int_distribution{0, n_keys - 1}; } bytes random_blob() { return _blobs[std::min(_blobs.size() - 1, std::max(0, _ck_index_dist(_gen)))]; } clustering_key make_random_key() { return clustering_key::from_exploded(*_schema, { random_blob(), random_blob() }); } clustering_key_prefix make_random_prefix(std::optional max_components_opt = std::nullopt) { std::vector components = { random_blob() }; auto max_components = max_components_opt.value_or(_schema->clustering_key_size()); for (size_t i = 1; i < max_components; i++) { if (_bool_dist(_gen)) { components.push_back(random_blob()); } } return clustering_key_prefix::from_exploded(*_schema, std::move(components)); } std::vector make_random_ranges(unsigned n_ranges) { std::vector ranges; if (n_ranges == 0) { return ranges; } auto keys = std::set{clustering_key::less_compare(*_schema)}; while (keys.size() < n_ranges * 2) { keys.insert(make_random_key()); } auto i = keys.begin(); bool open_start = _bool_dist(_gen); bool open_end = _bool_dist(_gen); if (open_start && open_end && n_ranges == 1) { ranges.push_back(query::clustering_range::make_open_ended_both_sides()); return ranges; } if (open_start) { ranges.push_back(query::clustering_range( { }, { query::clustering_range::bound(*i++, _bool_dist(_gen)) } )); } n_ranges -= unsigned(open_start); n_ranges -= unsigned(open_end); while (n_ranges--) { auto start_key = *i++; auto end_key = *i++; ranges.push_back(query::clustering_range( { query::clustering_range::bound(start_key, _bool_dist(_gen)) }, { query::clustering_range::bound(end_key, _bool_dist(_gen)) } )); } if (open_end) { 
ranges.push_back(query::clustering_range( { query::clustering_range::bound(*i++, _bool_dist(_gen)) }, { } )); } return ranges; } range_tombstone make_random_range_tombstone() { auto t = random_tombstone(timestamp_level::range_tombstone); switch (_range_tombstone_dist(_gen)) { case 0: { // singular prefix auto prefix = make_random_prefix(_schema->clustering_key_size()-1); // make sure the prefix is partial auto start = bound_view(prefix, bound_kind::incl_start); auto end = bound_view(prefix, bound_kind::incl_end); return range_tombstone(std::move(start), std::move(end), std::move(t)); } case 1: { // unbound start auto prefix = make_random_prefix(); auto start = bound_view::bottom(); auto end = bound_view(prefix, _bool_dist(_gen) ? bound_kind::incl_end : bound_kind::excl_end); return range_tombstone(std::move(start), std::move(end), std::move(t)); } case 2: { // unbound end auto prefix = make_random_prefix(); auto start = bound_view(prefix, _bool_dist(_gen) ? bound_kind::incl_start : bound_kind::excl_start); auto end = bound_view::top(); return range_tombstone(std::move(start), std::move(end), std::move(t)); } default: // fully bounded auto start_prefix = make_random_prefix(); auto end_prefix = make_random_prefix(); clustering_key_prefix::tri_compare cmp(*_schema); auto d = cmp(end_prefix, start_prefix); while (d == 0) { end_prefix = make_random_prefix(); d = cmp(end_prefix, start_prefix); } if (d < 0) { std::swap(end_prefix, start_prefix); } auto start = bound_view(std::move(start_prefix), _bool_dist(_gen) ? bound_kind::incl_start : bound_kind::excl_start); auto end = bound_view(std::move(end_prefix), _bool_dist(_gen) ? 
bound_kind::incl_end : bound_kind::excl_end); return range_tombstone(std::move(start), std::move(end), std::move(t)); } } mutation operator()() { std::uniform_int_distribution column_count_dist(1, column_count); std::uniform_int_distribution column_id_dist(0, column_count - 1); std::uniform_int_distribution value_blob_index_dist(0, 2); auto pkey = partition_key::from_single_value(*_schema, _blobs[0]); mutation m(_schema, pkey); std::map> counter_used_clock_values; std::vector counter_ids; std::generate_n(std::back_inserter(counter_ids), 8, counter_id::create_random_id); auto random_counter_cell = [&] { std::uniform_int_distribution shard_count_dist(1, counter_ids.size()); std::uniform_int_distribution value_dist(-100, 100); std::uniform_int_distribution clock_dist(0, 20000); auto shard_count = shard_count_dist(_gen); std::set shards; for (auto i = 0u; i < shard_count; i++) { shards.emplace(counter_ids[shard_count_dist(_gen) - 1]); } counter_cell_builder ccb; for (auto&& id : shards) { // Make sure we don't get shards with the same id and clock // but different value. 
int64_t clock = clock_dist(_gen); while (counter_used_clock_values[id].contains(clock)) { clock = clock_dist(_gen); } counter_used_clock_values[id].emplace(clock); ccb.add_shard(counter_shard(id, value_dist(_gen), clock)); } return ccb.build(gen_timestamp(timestamp_level::data)); }; auto set_random_cells = [&] (row& r, column_kind kind) { auto columns_to_set = column_count_dist(_gen); for (column_id i = 0; i < columns_to_set; ++i) { auto cid = column_id_dist(_gen); auto& col = _schema->column_at(kind, cid); auto get_live_cell = [&] () -> atomic_cell_or_collection { if (_generate_counters) { return random_counter_cell(); } if (col.is_atomic()) { return atomic_cell::make_live(*col.type, gen_timestamp(timestamp_level::data), _blobs[value_blob_index_dist(_gen)]); } static thread_local std::uniform_int_distribution element_dist{1, 13}; static thread_local std::uniform_int_distribution uuid_ts_dist{-12219292800000L, -12219292800000L + 1000}; collection_mutation_description m; auto num_cells = element_dist(_gen); m.cells.reserve(num_cells); std::unordered_set unique_cells; unique_cells.reserve(num_cells); auto ctype = static_pointer_cast(col.type); for (auto i = 0; i < num_cells; ++i) { auto uuid = utils::UUID_gen::min_time_UUID(std::chrono::milliseconds{uuid_ts_dist(_gen)}).serialize(); if (unique_cells.emplace(uuid).second) { m.cells.emplace_back( bytes(reinterpret_cast(uuid.data()), uuid.size()), atomic_cell::make_live(*ctype->value_comparator(), gen_timestamp(timestamp_level::data), _blobs[value_blob_index_dist(_gen)], atomic_cell::collection_member::yes)); } } std::sort(m.cells.begin(), m.cells.end(), [] (auto&& c1, auto&& c2) { return timeuuid_type->as_less_comparator()(c1.first, c2.first); }); return m.serialize(*ctype); }; auto get_dead_cell = [&] () -> atomic_cell_or_collection{ if (col.is_atomic() || col.is_counter()) { return atomic_cell::make_dead(gen_timestamp(timestamp_level::cell_tombstone), new_expiry()); } collection_mutation_description m; m.tomb = 
tombstone(gen_timestamp(timestamp_level::collection_tombstone), new_expiry()); return m.serialize(*col.type); }; // FIXME: generate expiring cells auto cell = _bool_dist(_gen) ? get_live_cell() : get_dead_cell(); r.apply(_schema->column_at(kind, cid), std::move(cell)); } }; auto random_row_marker = [&] { static thread_local std::uniform_int_distribution dist(0, 3); switch (dist(_gen)) { case 0: return row_marker(); case 1: return row_marker(random_tombstone(timestamp_level::row_marker_tombstone)); case 2: return row_marker(gen_timestamp(timestamp_level::data)); case 3: return row_marker(gen_timestamp(timestamp_level::data), std::chrono::seconds(1), new_expiry()); default: SCYLLA_ASSERT(0); } abort(); }; if (tests::random::with_probability(0.11)) { m.partition().apply(random_tombstone(timestamp_level::partition_tombstone)); } m.partition().set_static_row_continuous(_bool_dist(_gen)); set_random_cells(m.partition().static_row().maybe_create(), column_kind::static_column); auto row_count_dist = [&] (auto& gen) { static thread_local std::normal_distribution<> dist(32, 1.5); return static_cast(std::min(100.0, std::max(0.0, dist(gen)))); }; size_t row_count = row_count_dist(_gen); std::unordered_set keys( 0, clustering_key::hashing(*_schema), clustering_key::equality(*_schema)); while (keys.size() < row_count) { keys.emplace(make_random_key()); } for (auto&& ckey : keys) { is_continuous continuous = is_continuous(_bool_dist(_gen)); if (_not_dummy_dist(_gen)) { deletable_row& row = m.partition().clustered_row(*_schema, ckey, is_dummy::no, continuous); row.apply(random_row_marker()); if (!row.marker().is_missing() && !row.marker().is_live()) { // Mutations are not associative if dead marker is not matched with a dead row // due to shadowable tombstone merging rules. See #11307. 
row.apply(tombstone(row.marker().timestamp(), row.marker().deletion_time())); } if (_bool_dist(_gen)) { set_random_cells(row.cells(), column_kind::regular_column); } else { bool is_regular = _bool_dist(_gen); if (is_regular) { row.apply(random_tombstone(timestamp_level::row_tombstone)); } else { row.apply(shadowable_tombstone{random_tombstone(timestamp_level::row_shadowable_tombstone)}); } bool second_tombstone = _bool_dist(_gen); if (second_tombstone) { // Need to add the opposite of what has been just added if (is_regular) { row.apply(shadowable_tombstone{random_tombstone(timestamp_level::row_shadowable_tombstone)}); } else { row.apply(random_tombstone(timestamp_level::row_tombstone)); } } } } else { m.partition().clustered_row(*_schema, position_in_partition::after_key(*_schema, ckey), is_dummy::yes, continuous); } } size_t range_tombstone_count = row_count_dist(_gen); for (size_t i = 0; i < range_tombstone_count; ++i) { m.partition().apply_row_tombstone(*_schema, make_random_range_tombstone()); } if (_bool_dist(_gen)) { m.partition().ensure_last_dummy(*_schema); m.partition().clustered_rows().rbegin()->set_continuous(is_continuous(_bool_dist(_gen))); } return m; } std::vector make_partition_keys(size_t n) { return tests::generate_partition_keys(n, _schema, _local_shard_only); } utils::chunked_vector operator()(size_t n) { auto keys = make_partition_keys(n); utils::chunked_vector mutations; for (auto&& dkey : keys) { auto m = operator()(); mutations.emplace_back(_schema, std::move(dkey), std::move(m.partition())); } return mutations; } }; random_mutation_generator::~random_mutation_generator() {} random_mutation_generator::random_mutation_generator(generate_counters counters, local_shard_only lso, generate_uncompactable uc, std::optional seed_opt, const char* ks_name, const char* cf_name, compress c) : _impl(std::make_unique(counters, lso, uc, seed_opt, ks_name, cf_name, c)) { } mutation random_mutation_generator::operator()() { return (*_impl)(); } 
utils::chunked_vector random_mutation_generator::operator()(size_t n) { return (*_impl)(n); } std::vector random_mutation_generator::make_partition_keys(size_t n) { return _impl->make_partition_keys(n); } schema_ptr random_mutation_generator::schema() const { return _impl->_schema; } range_tombstone random_mutation_generator::make_random_range_tombstone() { return _impl->make_random_range_tombstone(); } clustering_key random_mutation_generator::make_random_key() { return _impl->make_random_key(); } std::vector random_mutation_generator::make_random_ranges(unsigned n_ranges) { return _impl->make_random_ranges(n_ranges); } void random_mutation_generator::set_key_cardinality(size_t n_keys) { _impl->set_key_cardinality(n_keys); } void for_each_schema_change(std::function&, schema_ptr, const utils::chunked_vector&)> fn) { auto map_of_int_to_int = map_type_impl::get_instance(int32_type, int32_type, true); auto map_of_int_to_bytes = map_type_impl::get_instance(int32_type, bytes_type, true); auto frozen_map_of_int_to_int = map_type_impl::get_instance(int32_type, int32_type, false); auto frozen_map_of_int_to_bytes = map_type_impl::get_instance(int32_type, bytes_type, false); auto tuple_of_int_long = tuple_type_impl::get_instance({ int32_type, long_type }); auto tuple_of_bytes_long = tuple_type_impl::get_instance( { bytes_type, long_type }); auto tuple_of_bytes_bytes = tuple_type_impl::get_instance( { bytes_type, bytes_type }); auto set_of_text = set_type_impl::get_instance(utf8_type, true); auto set_of_bytes = set_type_impl::get_instance(bytes_type, true); auto udt_int_text = user_type_impl::get_instance("ks", "udt", { utf8_type->decompose("f1"), utf8_type->decompose("f2"), }, { int32_type, utf8_type }, true); auto udt_int_blob_long = user_type_impl::get_instance("ks", "udt", { utf8_type->decompose("v1"), utf8_type->decompose("v2"), utf8_type->decompose("v3"), }, { int32_type, bytes_type, long_type }, true); auto frozen_udt_int_text = user_type_impl::get_instance("ks", 
"udt", { utf8_type->decompose("f1"), utf8_type->decompose("f2"), }, { int32_type, utf8_type }, false); auto frozen_udt_int_blob_long = user_type_impl::get_instance("ks", "udt", { utf8_type->decompose("v1"), utf8_type->decompose("v2"), utf8_type->decompose("v3"), }, { int32_type, bytes_type, long_type }, false); auto random_int32_value = [] { return int32_type->decompose(tests::random::get_int()); }; auto random_text_value = [] { return utf8_type->decompose(tests::random::get_sstring()); }; int32_t key_id = 0; auto random_partition_key = [&] () -> tests::data_model::mutation_description::key { return { random_int32_value(), random_int32_value(), int32_type->decompose(key_id++), }; }; auto random_clustering_key = [&] () -> tests::data_model::mutation_description::key { return { utf8_type->decompose(tests::random::get_sstring()), utf8_type->decompose(tests::random::get_sstring()), utf8_type->decompose(format("{}", key_id++)), }; }; auto random_map = [&] () -> tests::data_model::mutation_description::collection { return { { int32_type->decompose(1), random_int32_value() }, { int32_type->decompose(2), random_int32_value() }, { int32_type->decompose(3), random_int32_value() }, }; }; auto random_frozen_map = [&] { return map_of_int_to_int->decompose(make_map_value(map_of_int_to_int, map_type_impl::native_type({ { 1, tests::random::get_int() }, { 2, tests::random::get_int() }, { 3, tests::random::get_int() }, }))); }; auto random_tuple = [&] { return tuple_of_int_long->decompose(make_tuple_value(tuple_of_int_long, tuple_type_impl::native_type{ tests::random::get_int(), tests::random::get_int(), })); }; auto random_set = [&] () -> tests::data_model::mutation_description::collection { return { { utf8_type->decompose("a"), bytes() }, { utf8_type->decompose("b"), bytes() }, { utf8_type->decompose("c"), bytes() }, }; }; auto random_udt = [&] () -> tests::data_model::mutation_description::collection { return { { serialize_field_index(0), random_int32_value() }, { 
serialize_field_index(1), random_text_value() }, }; }; auto random_frozen_udt = [&] { return frozen_udt_int_text->decompose(make_user_value(udt_int_text, user_type_impl::native_type{ tests::random::get_int(), tests::random::get_sstring(), })); }; struct column_description { int id; data_type type; std::vector alter_to; std::vector> data_generators; data_type old_type; }; auto columns = std::vector { { 100, int32_type, { varint_type, bytes_type }, { [&] { return random_int32_value(); }, [&] { return bytes(); } }, uuid_type }, { 200, map_of_int_to_int, { map_of_int_to_bytes }, { [&] { return random_map(); } }, empty_type }, { 300, int32_type, { varint_type, bytes_type }, { [&] { return random_int32_value(); }, [&] { return bytes(); } }, empty_type }, { 400, frozen_map_of_int_to_int, { frozen_map_of_int_to_bytes }, { [&] { return random_frozen_map(); } }, empty_type }, { 500, tuple_of_int_long, { tuple_of_bytes_long, tuple_of_bytes_bytes }, { [&] { return random_tuple(); } }, empty_type }, { 600, set_of_text, { set_of_bytes }, { [&] { return random_set(); } }, empty_type }, { 700, udt_int_text, { udt_int_blob_long }, { [&] { return random_udt(); } }, empty_type }, { 800, frozen_udt_int_text, { frozen_udt_int_blob_long }, { [&] { return random_frozen_udt(); } }, empty_type }, }; auto static_columns = columns; auto regular_columns = columns; // Base schema auto s = tests::data_model::table_description({ { "pk1", int32_type }, { "pk2", int32_type }, { "pk3", int32_type }, }, { { "ck1", utf8_type }, { "ck2", utf8_type }, { "ck3", utf8_type }, }); for (auto& sc : static_columns) { auto name = format("s{}", sc.id); s.add_static_column(name, sc.type); if (sc.old_type != empty_type) { s.add_old_static_column(name, sc.old_type); } } for (auto& rc : regular_columns) { auto name = format("r{}", rc.id); s.add_regular_column(name, rc.type); if (rc.old_type != empty_type) { s.add_old_regular_column(name, rc.old_type); } } auto max_generator_count = std::max( 
std::ranges::fold_left(static_columns | std::views::transform([] (const column_description& c) { return c.data_generators.size(); }), 0u, [] (size_t a, size_t b) { return std::max(a, b); }), std::ranges::fold_left(regular_columns | std::views::transform([] (const column_description& c) { return c.data_generators.size(); }), 0u, [] (size_t a, size_t b) { return std::max(a, b); }) ); // Base data // Single column in a static row, nothing else for (auto& [id, type, alter_to, data_generators, old_type] : static_columns) { auto name = format("s{}", id); for (auto& dg : data_generators) { auto m = tests::data_model::mutation_description(random_partition_key()); m.add_static_cell(name, dg()); s.unordered_mutations().emplace_back(std::move(m)); } } // Partition with rows each having a single column auto m = tests::data_model::mutation_description(random_partition_key()); for (auto& [id, type, alter_to, data_generators, old_type] : regular_columns) { auto name = format("r{}", id); for (auto& dg : data_generators) { m.add_clustered_cell(random_clustering_key(), name, dg()); } } s.unordered_mutations().emplace_back(std::move(m)); // Absolutely everything for (auto i = 0u; i < max_generator_count; i++) { auto m = tests::data_model::mutation_description(random_partition_key()); for (auto& [id, type, alter_to, data_generators, old_type] : static_columns) { auto name = format("s{}", id); m.add_static_cell(name, data_generators[std::min(i, data_generators.size() - 1)]()); } for (auto& [id, type, alter_to, data_generators, old_type] : regular_columns) { auto name = format("r{}", id); m.add_clustered_cell(random_clustering_key(), name, data_generators[std::min(i, data_generators.size() - 1)]()); } m.add_range_tombstone(random_clustering_key(), random_clustering_key()); m.add_range_tombstone(random_clustering_key(), random_clustering_key()); m.add_range_tombstone(random_clustering_key(), random_clustering_key()); s.unordered_mutations().emplace_back(std::move(m)); } // 
Transformations — build mutated schemas from the base description and feed each pair to `fn`.
auto base = s.build();
std::vector schemas;
schemas.emplace_back(base);
// Runs `fn` for base -> each accumulated schema, then for each consecutive
// pair of mutated schemas, and finally resets `schemas` to just the base.
auto test_mutated_schemas = [&] {
    auto& [ base_change_log, base_schema, base_mutations ] = base;
    for (auto&& [ mutated_change_log, mutated_schema, mutated_mutations ] : schemas) {
        testlog.info("\nSchema change from:\n\n{}\n\nto:\n\n{}\n", base_change_log, mutated_change_log);
        fn(base_schema, base_mutations, mutated_schema, mutated_mutations);
    }
    // Entry 0 is the base itself, so consecutive comparison starts at i = 2,
    // pairing schemas[i - 1] with schemas[i].
    for (auto i = 2u; i < schemas.size(); i++) {
        auto& [ base_change_log, base_schema, base_mutations ] = schemas[i - 1];
        auto& [ mutated_change_log, mutated_schema, mutated_mutations ] = schemas[i];
        testlog.info("\nSchema change from:\n\n{}\n\nto:\n\n{}\n", base_change_log, mutated_change_log);
        fn(base_schema, base_mutations, mutated_schema, mutated_mutations);
    }
    schemas.clear();
    schemas.emplace_back(base);
};
auto original_s = s;
// Remove and add back all static columns
for (auto& sc : static_columns) {
    s.remove_static_column(format("s{}", sc.id));
    schemas.emplace_back(s.build());
}
for (auto& sc : static_columns) {
    s.add_static_column(format("s{}", sc.id), uuid_type);
    // NOTE(review): `mutated` is unused and s.build() is invoked twice here —
    // likely meant schemas.emplace_back(std::move(mutated)); confirm upstream.
    auto mutated = s.build();
    schemas.emplace_back(s.build());
}
test_mutated_schemas();
s = original_s;
// Remove and add back all regular columns
for (auto& rc : regular_columns) {
    s.remove_regular_column(format("r{}", rc.id));
    schemas.emplace_back(s.build());
}
// Snapshot the "all regular columns removed" state so the three add-back
// variants below (plain, collection, frozen collection) start from the
// same point.
auto temp_s = s;
auto temp_schemas = schemas;
for (auto& rc : regular_columns) {
    s.add_regular_column(format("r{}", rc.id), uuid_type);
    schemas.emplace_back(s.build());
}
test_mutated_schemas();
s = temp_s;
schemas = temp_schemas;
// Add back all regular columns as collections
for (auto& rc : regular_columns) {
    s.add_regular_column(format("r{}", rc.id), map_of_int_to_bytes);
    schemas.emplace_back(s.build());
}
test_mutated_schemas();
s = temp_s;
schemas = temp_schemas;
// Add back all regular columns as frozen collections
for (auto& rc : regular_columns) {
    s.add_regular_column(format("r{}", rc.id),
frozen_map_of_int_to_int);
    schemas.emplace_back(s.build());
}
test_mutated_schemas();
s = original_s;
// Add more static columns
for (auto& sc : static_columns) {
    s.add_static_column(format("s{}", sc.id + 1), uuid_type);
    schemas.emplace_back(s.build());
}
test_mutated_schemas();
s = original_s;
// Add more regular columns
for (auto& rc : regular_columns) {
    s.add_regular_column(format("r{}", rc.id + 1), uuid_type);
    schemas.emplace_back(s.build());
}
test_mutated_schemas();
s = original_s;
// Alter column types
for (auto& sc : static_columns) {
    for (auto& target : sc.alter_to) {
        s.alter_static_column_type(format("s{}", sc.id), target);
        schemas.emplace_back(s.build());
    }
}
for (auto& rc : regular_columns) {
    for (auto& target : rc.alter_to) {
        s.alter_regular_column_type(format("r{}", rc.id), target);
        schemas.emplace_back(s.build());
    }
}
for (auto i = 1; i <= 3; i++) {
    s.alter_clustering_column_type(format("ck{}", i), bytes_type);
    schemas.emplace_back(s.build());
}
for (auto i = 1; i <= 3; i++) {
    s.alter_partition_column_type(format("pk{}", i), bytes_type);
    schemas.emplace_back(s.build());
}
test_mutated_schemas();
s = original_s;
// Rename clustering key
for (auto i = 1; i <= 3; i++) {
    s.rename_clustering_column(format("ck{}", i), format("ck{}", 100 - i));
    schemas.emplace_back(s.build());
}
test_mutated_schemas();
s = original_s;
// Rename partition key
for (auto i = 1; i <= 3; i++) {
    s.rename_partition_column(format("pk{}", i), format("pk{}", 100 - i));
    schemas.emplace_back(s.build());
}
test_mutated_schemas();
}

// Drains `authority` and asserts that `tested` produces the same sequence of
// mutations, ending with end-of-stream. Returns true iff the stream was
// non-empty.
static bool compare_readers(const schema& s, mutation_reader& authority, mutation_reader_assertions& tested) {
    bool empty = true;
    while (auto expected = authority().get()) {
        tested.produces(s, *expected);
        empty = false;
    }
    tested.produces_end_of_stream();
    return !empty;
}

// Compares two readers over their full streams; `exact` is forwarded to the
// assertion wrapper's exact() mode.
void compare_readers(const schema& s, mutation_reader authority, mutation_reader tested, bool exact) {
    auto close_authority = deferred_close(authority);
    auto assertions =
assert_that(std::move(tested)).exact(exact);
compare_readers(s, authority, assertions);
}

// Assumes that the readers return fragments from (at most) a single (and the same) partition.
void compare_readers(const schema& s, mutation_reader authority, mutation_reader tested, const std::vector& fwd_ranges) {
    auto close_authority = deferred_close(authority);
    auto assertions = assert_that(std::move(tested));
    // Fast-forward only if the initial read produced data; each range is
    // applied to both readers so the comparison stays in lock-step.
    if (compare_readers(s, authority, assertions)) {
        for (auto& r: fwd_ranges) {
            authority.fast_forward_to(r).get();
            assertions.fast_forward_to(r);
            compare_readers(s, authority, assertions);
        }
    }
}

// Consumes a forwardable reader, fast-forwarding through `fwd_ranges`, and
// rebuilds everything it produced into a single mutation.
mutation forwardable_reader_to_mutation(mutation_reader r, const std::vector& fwd_ranges) {
    auto close_reader = deferred_close(r);
    // Feeds fragments into a shared rebuilder held by reference; the builder
    // is created on the first partition start and reused across fast-forwards
    // so fragments from every range merge into one mutation.
    struct consumer {
        schema_ptr _s;
        std::optional& _builder;
        consumer(schema_ptr s, std::optional& builder)
            : _s(std::move(s))
            , _builder(builder)
        { }
        void consume_new_partition(const dht::decorated_key& dk) {
            SCYLLA_ASSERT(!_builder); // only a single partition is expected
            _builder = mutation_rebuilder_v2(std::move(_s));
            _builder->consume_new_partition(dk);
        }
        stop_iteration consume(tombstone t) {
            SCYLLA_ASSERT(_builder);
            return _builder->consume(t);
        }
        stop_iteration consume(range_tombstone_change&& rt) {
            SCYLLA_ASSERT(_builder);
            return _builder->consume(std::move(rt));
        }
        stop_iteration consume(static_row&& sr) {
            SCYLLA_ASSERT(_builder);
            return _builder->consume(std::move(sr));
        }
        stop_iteration consume(clustering_row&& cr) {
            SCYLLA_ASSERT(_builder);
            return _builder->consume(std::move(cr));
        }
        stop_iteration consume_end_of_partition() {
            SCYLLA_ASSERT(_builder);
            return stop_iteration::yes;
        }
        void consume_end_of_stream() { }
    };
    std::optional builder{};
    r.consume(consumer(r.schema(), builder)).get();
    BOOST_REQUIRE(builder);
    for (auto& range : fwd_ranges) {
        testlog.trace("forwardable_reader_to_mutation: forwarding to {}", range);
        r.fast_forward_to(range).get();
        // Same builder instance: fragments from this range are merged in.
        r.consume(consumer(r.schema(), builder)).get();
    }
    auto m = builder->consume_end_of_stream();
    BOOST_REQUIRE(m);
    return std::move(*m);
}
// Merges mutations that share a decorated key into one mutation each.
// Returns one mutation per distinct partition key, ordered by decorated key
// (ring order, per the first mutation's schema), which is what the
// mutation-source populators expect.
//
// NOTE(review): the extracted source had all template argument lists stripped
// (e.g. `utils::chunked_vector`, `std::map merged_muts{...}`), which is
// ill-formed; the argument lists below are restored from the surrounding
// usage — confirm against upstream.
utils::chunked_vector<mutation> squash_mutations(utils::chunked_vector<mutation> mutations) {
    if (mutations.empty()) {
        return {};
    }
    // All inputs are assumed to share a schema; the first one's schema drives
    // the ring-position comparison that orders the map.
    std::map<dht::decorated_key, mutation, dht::ring_position_less_comparator> merged_muts{
            dht::ring_position_less_comparator{*mutations.front().schema()}};
    for (const auto& mut : mutations) {
        // try_emplace avoids constructing a second mutation when the key is
        // already present; on collision, merge into the existing entry.
        auto [it, inserted] = merged_muts.try_emplace(mut.decorated_key(), mut);
        if (!inserted) {
            it->second.apply(mut);
        }
    }
    return merged_muts | std::views::values | std::ranges::to<utils::chunked_vector<mutation>>();
}