/* * Copyright (C) 2015-present ScyllaDB */
/* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */

// Unit tests for mutation / mutation_partition behavior (memtable apply,
// tombstones, collection cells, query results).
//
// NOTE(review): several #include directives below have lost their <...>
// targets, and template argument lists throughout this file appear to have
// been stripped (looks like angle-bracket spans were eaten by an extraction
// step) — restore them from the upstream file before compiling. The code
// here is preserved byte-for-byte.
#include
#include
#include
#include "compaction/compaction_garbage_collector.hh"
#include "mutation_query.hh"
#include "utils/assert.hh"
#include "utils/hashers.hh"
#include "utils/preempt.hh"
#include "utils/xx_hasher.hh"
#include
#include
#include
#include
#include
#include
#include "replica/database.hh"
#include "utils/UUID_gen.hh"
#include "keys/clustering_interval_set.hh"
#include "schema/schema_builder.hh"
#include "query/query-result-set.hh"
#include "query/query-result-reader.hh"
#include "partition_slice_builder.hh"
#include "test/lib/tmpdir.hh"
#include "compaction/compaction_manager.hh"
#include "test/lib/scylla_test_case.hh"
#include
#include "test/lib/mutation_assertions.hh"
#include "test/lib/reader_concurrency_semaphore.hh"
#include "test/lib/result_set_assertions.hh"
#include "test/lib/sstable_test_env.hh"
#include "test/lib/random_schema.hh"
#include "test/lib/mutation_source_test.hh"
#include "replica/cell_locking.hh"
#include "test/lib/mutation_reader_assertions.hh"
#include "test/lib/mutation_assertions.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/simple_schema.hh"
#include "test/lib/sstable_utils.hh"
#include "test/lib/test_utils.hh"
#include "test/lib/log.hh"
#include "types/map.hh"
#include "types/list.hh"
#include "types/set.hh"
#include "types/user.hh"
#include "mutation/mutation_rebuilder.hh"
#include "mutation/mutation_partition.hh"
#include "mutation/async_utils.hh"
#include "keys/clustering_key_filter.hh"
#include "readers/from_mutations.hh"
#include "readers/from_fragments.hh"

using namespace std::chrono_literals;

// Default keyspace/table names shared by the tests below.
static sstring some_keyspace("ks");
static sstring some_column_family("cf");

// Builds a live bytes-typed atomic cell holding `value`, at timestamp 0.
static atomic_cell make_atomic_cell(bytes value) { return atomic_cell::make_live(*bytes_type, 0, std::move(value)); }

// Builds a live bytes-typed atomic cell with an empty value, at timestamp 0.
static atomic_cell make_atomic_cell() { return atomic_cell::make_live(*bytes_type, 0, bytes_view()); }
// NOTE(review): template parameter lists (e.g. the `<typename T>` on the two
// helpers below) and other angle-bracket spans were stripped from this text;
// code is kept byte-identical — restore against upstream before compiling.

// Builds a live atomic cell of type `dt` holding the decomposed `value`,
// at timestamp 0.
template static atomic_cell make_atomic_cell(data_type dt, T value) {
    return atomic_cell::make_live(*dt, 0, dt->decompose(std::move(value)));
};

// Same as make_atomic_cell(dt, value), but marks the cell as a collection
// member (used as a map/set/list/UDT element cell).
template static atomic_cell make_collection_member(data_type dt, T value) {
    return atomic_cell::make_live(*dt, 0, dt->decompose(std::move(value)), atomic_cell::collection_member::yes);
};

// Reads the single partition identified by `key` out of memtable `mt` and
// returns its mutation_partition. Requires the partition to exist
// (BOOST_REQUIREs a non-empty read).
static mutation_partition get_partition(reader_permit permit, replica::memtable& mt, const partition_key& key) {
    auto dk = dht::decorate_key(*mt.schema(), key);
    auto range = dht::partition_range::make_singular(dk);
    auto reader = mt.make_mutation_reader(mt.schema(), std::move(permit), range);
    // deferred_close guarantees the reader is closed even if a REQUIRE throws.
    auto close_reader = deferred_close(reader);
    auto mo = read_mutation_from_mutation_reader(reader).get();
    BOOST_REQUIRE(bool(mo));
    return std::move(mo->partition());
}

// Runs `func` against a freshly constructed column_family, once per
// compaction-group configuration (1 CG and 8 CGs), stopping the table
// after each run.
future<> with_column_family(schema_ptr s, replica::column_family::config cfg, sstables::sstables_manager& sm, noncopyable_function (replica::column_family&)> func) {
    std::vector x_log2_compaction_group_values = { 0 /* 1 CG */, 3 /* 8 CGs */ };
    for (auto x_log2_compaction_groups : x_log2_compaction_group_values) {
        auto tracker = make_lw_shared();
        auto dir = tmpdir();
        cfg.x_log2_compaction_groups = x_log2_compaction_groups;
        tasks::task_manager tm;
        auto cm = make_lw_shared(tm, compaction::compaction_manager::for_testing_tag{});
        auto cl_stats = make_lw_shared();
        auto s_opts = make_lw_shared(data_dictionary::make_local_options(dir.path()));
        auto cf = make_lw_shared(s, cfg, s_opts, *cm, sm, *cl_stats, *tracker, nullptr);
        cf->mark_ready_for_writes(nullptr);
        co_await func(*cf);
        co_await cf->stop();
    }
}

// A mutation applied to a memtable can be read back with the written
// clustered cell live and holding the written value.
SEASTAR_TEST_CASE(test_mutation_is_applied) {
    return seastar::async([] {
        tests::reader_concurrency_semaphore_wrapper semaphore;
        auto s = schema_builder(some_keyspace, some_column_family)
            .with_column("p1", utf8_type, column_kind::partition_key)
            .with_column("c1", int32_type, column_kind::clustering_key)
            .with_column("r1", int32_type)
            .build();
        auto mt = make_lw_shared(s);
        const column_definition& r1_col = *s->get_column_definition("r1");
        auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
        auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(2)});
        mutation m(s, key);
        auto c = make_atomic_cell(int32_type, 3);
        m.set_clustered_cell(c_key, r1_col, std::move(c));
        mt->apply(std::move(m));
        auto p = get_partition(semaphore.make_permit(), *mt, key);
        row& r = p.clustered_row(*s, c_key).cells();
        auto i = r.find_cell(r1_col.id);
        BOOST_REQUIRE(i);
        auto cell = i->as_atomic_cell(r1_col);
        BOOST_REQUIRE(cell.is_live());
        BOOST_REQUIRE(int32_type->equal(cell.value().linearize(), int32_type->decompose(3)));
    });
}

// Range tombstones on clustering prefixes of different lengths: a row's
// effective tombstone is the one with the highest timestamp among all
// prefixes covering it.
SEASTAR_TEST_CASE(test_multi_level_row_tombstones) {
    auto s = schema_builder(some_keyspace, some_column_family)
        .with_column("p1", utf8_type, column_kind::partition_key)
        .with_column("c1", int32_type, column_kind::clustering_key)
        .with_column("c2", int32_type, column_kind::clustering_key)
        .with_column("c3", int32_type, column_kind::clustering_key)
        .with_column("r1", int32_type)
        .build();
    auto ttl = gc_clock::now() + std::chrono::seconds(1);
    mutation m(s, partition_key::from_exploded(*s, {to_bytes("key1")}));
    auto make_prefix = [s] (const std::vector& v) { return clustering_key_prefix::from_deeply_exploded(*s, v); };
    auto make_key = [s] (const std::vector& v) { return clustering_key::from_deeply_exploded(*s, v); };
    m.partition().apply_row_tombstone(*s, make_prefix({1, 2}), tombstone(9, ttl));
    BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 2, 3})), row_tombstone(tombstone(9, ttl)));
    m.partition().apply_row_tombstone(*s, make_prefix({1, 3}), tombstone(8, ttl));
    BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 2, 0})), row_tombstone(tombstone(9, ttl)));
    BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 3, 0})), row_tombstone(tombstone(8, ttl)));
    // A wider prefix {1} with a newer timestamp shadows both narrower ones.
    m.partition().apply_row_tombstone(*s, make_prefix({1}), tombstone(11, ttl));
    BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 2, 0})), row_tombstone(tombstone(11, ttl)));
    BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 3, 0})), row_tombstone(tombstone(11, ttl)));
    // An older, narrower tombstone does not lower the effective tombstone.
    m.partition().apply_row_tombstone(*s, make_prefix({1, 4}), tombstone(6, ttl));
    BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 2, 0})), row_tombstone(tombstone(11, ttl)));
    BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 3, 0})), row_tombstone(tombstone(11, ttl)));
    BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 4, 0})), row_tombstone(tombstone(11, ttl)));
    return make_ready_future<>();
}

// Re-applying a row tombstone with a newer timestamp updates the stored
// tombstone for that prefix.
SEASTAR_TEST_CASE(test_row_tombstone_updates) {
    auto s = schema_builder(some_keyspace, some_column_family)
        .with_column("p1", utf8_type, column_kind::partition_key)
        .with_column("c1", int32_type, column_kind::clustering_key)
        .with_column("c2", int32_type, column_kind::clustering_key)
        .with_column("r1", int32_type)
        .build();
    auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
    auto c_key1 = clustering_key::from_deeply_exploded(*s, {1, 0});
    auto c_key1_prefix = clustering_key_prefix::from_deeply_exploded(*s, {1});
    auto c_key2 = clustering_key::from_deeply_exploded(*s, {2, 0});
    auto c_key2_prefix = clustering_key_prefix::from_deeply_exploded(*s, {2});
    auto ttl = gc_clock::now() + std::chrono::seconds(1);
    mutation m(s, key);
    m.partition().apply_row_tombstone(*s, c_key1_prefix, tombstone(1, ttl));
    m.partition().apply_row_tombstone(*s, c_key2_prefix, tombstone(0, ttl));
    BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, c_key1), row_tombstone(tombstone(1, ttl)));
    BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, c_key2), row_tombstone(tombstone(0, ttl)));
    m.partition().apply_row_tombstone(*s, c_key2_prefix, tombstone(1, ttl));
    BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, c_key2), row_tombstone(tombstone(1, ttl)));
    return make_ready_future<>();
}

// Builds a single-cell collection mutation description (tombstone + one
// key/cell pair).
collection_mutation_description make_collection_mutation(tombstone t, bytes key, atomic_cell cell) {
    collection_mutation_description m;
    m.tomb = t;
    m.cells.emplace_back(std::move(key), std::move(cell));
    return m;
}

// Two-cell overload of the above.
collection_mutation_description make_collection_mutation(tombstone t, bytes key1, atomic_cell cell1, bytes key2, atomic_cell cell2) {
    collection_mutation_description m;
    m.tomb = t;
    m.cells.emplace_back(std::move(key1), std::move(cell1));
    m.cells.emplace_back(std::move(key2), std::move(cell2));
    return m;
}

// Applying several map-column mutations (including one overriding an
// existing key) yields 3 distinct keys after merging in the memtable.
SEASTAR_TEST_CASE(test_map_mutations) {
    return seastar::async([] {
        tests::reader_concurrency_semaphore_wrapper semaphore;
        auto my_map_type = map_type_impl::get_instance(int32_type, utf8_type, true);
        auto s = schema_builder(some_keyspace, some_column_family)
            .with_column("p1", utf8_type, column_kind::partition_key)
            .with_column("c1", int32_type, column_kind::clustering_key)
            .with_column("s1", my_map_type, column_kind::static_column)
            .build();
        auto mt = make_lw_shared(s);
        auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
        auto& column = *s->get_column_definition("s1");
        auto mmut1 = make_collection_mutation({}, int32_type->decompose(101), make_collection_member(utf8_type, sstring("101")));
        mutation m1(s, key);
        m1.set_static_cell(column, mmut1.serialize(*my_map_type));
        mt->apply(m1);
        auto mmut2 = make_collection_mutation({}, int32_type->decompose(102), make_collection_member(utf8_type, sstring("102")));
        mutation m2(s, key);
        m2.set_static_cell(column, mmut2.serialize(*my_map_type));
        mt->apply(m2);
        auto mmut3 = make_collection_mutation({}, int32_type->decompose(103), make_collection_member(utf8_type, sstring("103")));
        mutation m3(s, key);
        m3.set_static_cell(column, mmut3.serialize(*my_map_type));
        mt->apply(m3);
        // Same key as mmut2 — merging must keep one cell for key 102.
        auto mmut2o = make_collection_mutation({}, int32_type->decompose(102), make_collection_member(utf8_type, sstring("102 override")));
        mutation m2o(s, key);
        m2o.set_static_cell(column, mmut2o.serialize(*my_map_type));
        mt->apply(m2o);
        auto p = get_partition(semaphore.make_permit(), *mt, key);
        lazy_row& r = p.static_row();
        auto i = r.find_cell(column.id);
        BOOST_REQUIRE(i);
        i->as_collection_mutation().with_deserialized(*my_map_type, [] (collection_mutation_view_description muts) {
            BOOST_REQUIRE(muts.cells.size() == 3);
        });
        // FIXME: more strict tests
    });
}

// Same scenario as test_map_mutations, for a set column: duplicate member
// 102 merges, leaving 3 cells.
SEASTAR_TEST_CASE(test_set_mutations) {
    return seastar::async([] {
        tests::reader_concurrency_semaphore_wrapper semaphore;
        auto my_set_type = set_type_impl::get_instance(int32_type, true);
        auto s = schema_builder(some_keyspace, some_column_family)
            .with_column("p1", utf8_type, column_kind::partition_key)
            .with_column("c1", int32_type, column_kind::clustering_key)
            .with_column("s1", my_set_type, column_kind::static_column)
            .build();
        auto mt = make_lw_shared(s);
        auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
        auto& column = *s->get_column_definition("s1");
        auto mmut1 = make_collection_mutation({}, int32_type->decompose(101), make_atomic_cell());
        mutation m1(s, key);
        m1.set_static_cell(column, mmut1.serialize(*my_set_type));
        mt->apply(m1);
        auto mmut2 = make_collection_mutation({}, int32_type->decompose(102), make_atomic_cell());
        mutation m2(s, key);
        m2.set_static_cell(column, mmut2.serialize(*my_set_type));
        mt->apply(m2);
        auto mmut3 = make_collection_mutation({}, int32_type->decompose(103), make_atomic_cell());
        mutation m3(s, key);
        m3.set_static_cell(column, mmut3.serialize(*my_set_type));
        mt->apply(m3);
        auto mmut2o = make_collection_mutation({}, int32_type->decompose(102), make_atomic_cell());
        mutation m2o(s, key);
        m2o.set_static_cell(column, mmut2o.serialize(*my_set_type));
        mt->apply(m2o);
        auto p = get_partition(semaphore.make_permit(), *mt, key);
        lazy_row& r = p.static_row();
        auto i = r.find_cell(column.id);
        BOOST_REQUIRE(i);
        i->as_collection_mutation().with_deserialized(*my_set_type, [] (collection_mutation_view_description muts) {
            BOOST_REQUIRE(muts.cells.size() == 3);
        });
        // FIXME: more strict tests
    });
}

// List columns key cells by timeuuid, so every append — including the
// "duplicate" 102 — gets a distinct key: 4 cells survive merging.
SEASTAR_TEST_CASE(test_list_mutations) {
    return seastar::async([] {
        tests::reader_concurrency_semaphore_wrapper semaphore;
        auto my_list_type = list_type_impl::get_instance(int32_type, true);
        auto s = schema_builder(some_keyspace, some_column_family)
            .with_column("p1", utf8_type, column_kind::partition_key)
            .with_column("c1", int32_type, column_kind::clustering_key)
            .with_column("s1", my_list_type, column_kind::static_column)
            .build();
        auto mt = make_lw_shared(s);
        auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
        auto& column = *s->get_column_definition("s1");
        auto make_key = [] { return timeuuid_type->decompose(utils::UUID_gen::get_time_UUID()); };
        auto mmut1 = make_collection_mutation({}, make_key(), make_collection_member(int32_type, 101));
        mutation m1(s, key);
        m1.set_static_cell(column, mmut1.serialize(*my_list_type));
        mt->apply(m1);
        auto mmut2 = make_collection_mutation({}, make_key(), make_collection_member(int32_type, 102));
        mutation m2(s, key);
        m2.set_static_cell(column, mmut2.serialize(*my_list_type));
        mt->apply(m2);
        auto mmut3 = make_collection_mutation({}, make_key(), make_collection_member(int32_type, 103));
        mutation m3(s, key);
        m3.set_static_cell(column, mmut3.serialize(*my_list_type));
        mt->apply(m3);
        auto mmut2o = make_collection_mutation({}, make_key(), make_collection_member(int32_type, 102));
        mutation m2o(s, key);
        m2o.set_static_cell(column, mmut2o.serialize(*my_list_type));
        mt->apply(m2o);
        auto p = get_partition(semaphore.make_permit(), *mt, key);
        lazy_row& r = p.static_row();
        auto i = r.find_cell(column.id);
        BOOST_REQUIRE(i);
        i->as_collection_mutation().with_deserialized(*my_list_type, [] (collection_mutation_view_description muts) {
            BOOST_REQUIRE(muts.cells.size() == 4);
        });
        // FIXME: more strict tests
    });
}

// UDT field cells merge by field index; the latest write per field wins
// and materialize() reproduces the merged view.
SEASTAR_THREAD_TEST_CASE(test_udt_mutations) {
    tests::reader_concurrency_semaphore_wrapper semaphore;
    // (a int, b text, c long, d text)
    auto ut = user_type_impl::get_instance("ks", to_bytes("ut"), {to_bytes("a"), to_bytes("b"), to_bytes("c"), to_bytes("d")}, {int32_type, utf8_type, long_type, utf8_type}, true);
    auto s = schema_builder(some_keyspace, some_column_family)
        .with_column("p1", utf8_type, column_kind::partition_key)
        .with_column("c1", int32_type, column_kind::clustering_key)
        .with_column("s1", ut, column_kind::static_column)
        .build();
    auto mt = make_lw_shared(s);
    auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
    auto& column = *s->get_column_definition("s1");
    // {a: 0, c: 2}
    auto mut1 = make_collection_mutation({}, serialize_field_index(0), make_collection_member(int32_type, 0), serialize_field_index(2), make_collection_member(long_type, int64_t(2)));
    mutation m1(s, key);
    m1.set_static_cell(column, mut1.serialize(*ut));
    mt->apply(m1);
    // {d: "text"}
    auto mut2 = make_collection_mutation({}, serialize_field_index(3), make_collection_member(utf8_type, "text"));
    mutation m2(s, key);
    m2.set_static_cell(column, mut2.serialize(*ut));
    mt->apply(m2);
    // {c: 3}
    auto mut3 = make_collection_mutation({}, serialize_field_index(2), make_collection_member(long_type, int64_t(3)));
    mutation m3(s, key);
    m3.set_static_cell(column, mut3.serialize(*ut));
    mt->apply(m3);
    auto p = get_partition(semaphore.make_permit(), *mt, key);
    lazy_row& r = p.static_row();
    auto i = r.find_cell(column.id);
    BOOST_REQUIRE(i);
    i->as_collection_mutation().with_deserialized(*ut, [&] (collection_mutation_view_description m) {
        // one cell for each field that has been set. mut3 and mut1 should have been merged
        BOOST_REQUIRE(m.cells.size() == 3);
        BOOST_REQUIRE(std::all_of(m.cells.begin(), m.cells.end(), [] (const auto& c) { return c.second.is_live(); }));
        // Cells compare equal when both the field index and the linearized
        // value match.
        auto cells_equal = [] (const auto& c1, const auto& c2) {
            return c1.first == c2.first && c1.second.value().linearize() == c2.second.value().linearize();
        };
        auto cell_a = std::make_pair(serialize_field_index(0), make_collection_member(int32_type, 0));
        BOOST_REQUIRE(cells_equal(m.cells[0], std::pair(cell_a.first, cell_a.second)));
        auto cell_c = std::make_pair(serialize_field_index(2), make_collection_member(long_type, int64_t(3)));
        BOOST_REQUIRE(cells_equal(m.cells[1], std::pair(cell_c.first, cell_c.second)));
        auto cell_d = std::make_pair(serialize_field_index(3), make_collection_member(utf8_type, "text"));
        BOOST_REQUIRE(cells_equal(m.cells[2], std::pair(cell_d.first, cell_d.second)));
        auto mm = m.materialize(*ut);
        BOOST_REQUIRE(mm.cells.size() == 3);
        BOOST_REQUIRE(cells_equal(mm.cells[0], cell_a));
        BOOST_REQUIRE(cells_equal(mm.cells[1], cell_c));
        BOOST_REQUIRE(cells_equal(mm.cells[2], cell_d));
    });
}

// Verify that serializing and unserializing a large collection doesn't
// trigger any large allocations.
// We create a 8MB collection, composed of key/value pairs of varying
// size, apply it to a memtable and verify that during usual memtable
// operations like merging two collections and compaction query results
// there are no allocations larger than our usual 128KB buffer size.
SEASTAR_THREAD_TEST_CASE(test_large_collection_allocation) { tests::reader_concurrency_semaphore_wrapper semaphore; const auto key_type = int32_type; const auto value_type = utf8_type; const auto collection_type = map_type_impl::get_instance(key_type, value_type, true); auto schema = schema_builder("test", "test_large_collection_allocation") .with_column("pk", int32_type, column_kind::partition_key) .with_column("v", collection_type) .build(); const std::array sizes_kb{size_t(1), size_t(10), size_t(64)}; auto mt = make_lw_shared(schema); auto make_mutation_with_collection = [&schema, &semaphore, collection_type] (partition_key pk, collection_mutation_description cmd) { const auto& cdef = schema->column_at(column_kind::regular_column, 0); mutation mut(schema, pk); row r; r.apply(cdef, atomic_cell_or_collection(cmd.serialize(*collection_type))); mut.apply(mutation_fragment(*schema, semaphore.make_permit(), clustering_row(clustering_key_prefix::make_empty(), {}, {}, std::move(r)))); return mut; }; for (size_t i = 0; i != sizes_kb.size(); ++i) { const auto pk = partition_key::from_single_value(*schema, int32_type->decompose(int(i))); const auto blob_size = sizes_kb[i] * 1024; const bytes blob(blob_size, 'a'); const auto stats_before = memory::stats(); const memory::scoped_large_allocation_warning_threshold _{128 * 1024 + 1}; const api::timestamp_type ts1 = 1; const api::timestamp_type ts2 = 2; collection_mutation_description cmd1; collection_mutation_description cmd2; for (size_t j = 0; j < size_t(8 * 1024 * 1024) / blob_size; ++j) { // we want no more than 8MB total size cmd1.cells.emplace_back(int32_type->decompose(int(j)), atomic_cell::make_live(*value_type, ts1, blob, atomic_cell::collection_member::yes)); cmd2.cells.emplace_back(int32_type->decompose(int(j)), atomic_cell::make_live(*value_type, ts2, blob, atomic_cell::collection_member::yes)); } mt->apply(make_mutation_with_collection(pk, std::move(cmd1))); mt->apply(make_mutation_with_collection(pk, 
std::move(cmd2))); // this should trigger a merge of the two collections auto rd = mt->make_mutation_reader(schema, semaphore.make_permit()); auto close_rd = deferred_close(rd); auto res_mut_opt = read_mutation_from_mutation_reader(rd).get(); BOOST_REQUIRE(res_mut_opt); res_mut_opt->partition().compact_for_query(*schema, res_mut_opt->decorated_key(), gc_clock::now(), {query::full_clustering_range}, true, std::numeric_limits::max()); const auto stats_after = memory::stats(); BOOST_REQUIRE_EQUAL(stats_before.large_allocations(), stats_after.large_allocations()); } } SEASTAR_THREAD_TEST_CASE(test_large_collection_serialization_exception_safety) { #ifndef SEASTAR_ENABLE_ALLOC_FAILURE_INJECTION std::cout << "Test case " << get_name() << " will not run because SEASTAR_ENABLE_ALLOC_FAILURE_INJECTION is not defined." << std::endl; return; #endif const auto key_type = int32_type; const auto value_type = utf8_type; const auto collection_type = map_type_impl::get_instance(key_type, value_type, true); const auto blob_size = 1024; const bytes blob(blob_size, 'a'); const api::timestamp_type ts = 1; collection_mutation_description cmd; for (size_t i = 0; i != 256; ++i) { cmd.cells.emplace_back(int32_type->decompose(int(i)), atomic_cell::make_live(*value_type, ts, blob, atomic_cell::collection_member::yes)); } // We need an undisturbed run first to create all thread_local variables. 
cmd.serialize(*collection_type); memory::with_allocation_failures([&] { cmd.serialize(*collection_type); }); } SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) { return sstables::test_env::do_with_async([] (sstables::test_env& env) { auto s = schema_builder(some_keyspace, some_column_family) .with_column("p1", utf8_type, column_kind::partition_key) .with_column("c1", int32_type, column_kind::clustering_key) .with_column("r1", int32_type) .build(); auto cf_stats = make_lw_shared(); replica::column_family::config cfg = env.make_table_config(); cfg.enable_disk_reads = false; cfg.enable_disk_writes = false; cfg.enable_incremental_backups = false; cfg.cf_stats = &*cf_stats; with_column_family(s, cfg, env.manager(), [s, &env] (replica::column_family& cf) -> future<> { const column_definition& r1_col = *s->get_column_definition("r1"); auto key = partition_key::from_exploded(*s, {to_bytes("key1")}); auto insert_row = [&] (int32_t c1, int32_t r1) { auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(c1)}); mutation m(s, key); m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type, r1)); cf.apply(std::move(m)); return cf.flush(); }; co_await insert_row(1001, 2001); co_await insert_row(1002, 2002); co_await insert_row(1003, 2003); { auto verify_row = [&] (int32_t c1, int32_t r1) -> future<> { auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(c1)}); auto p_key = dht::decorate_key(*s, key); auto r = co_await cf.find_row(cf.schema(), env.make_reader_permit(), p_key, c_key); { BOOST_REQUIRE(r); auto i = r->find_cell(r1_col.id); BOOST_REQUIRE(i); auto cell = i->as_atomic_cell(r1_col); BOOST_REQUIRE(cell.is_live()); BOOST_REQUIRE(int32_type->equal(cell.value().linearize(), int32_type->decompose(r1))); } }; co_await verify_row(1001, 2001); co_await verify_row(1002, 2002); co_await verify_row(1003, 2003); } }).get(); }); } SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) { return sstables::test_env::do_with_async([] 
(sstables::test_env& env) { auto s = schema_builder("ks", "cf") .with_column("pk", bytes_type, column_kind::partition_key) .with_column("v", bytes_type) .build(); auto cf_stats = make_lw_shared(); replica::column_family::config cfg = env.make_table_config(); cfg.enable_disk_reads = true; cfg.enable_disk_writes = true; cfg.enable_cache = true; cfg.enable_incremental_backups = false; cfg.cf_stats = &*cf_stats; with_column_family(s, cfg, env.manager(), [&env, s](replica::column_family& cf) { return seastar::async([&env, s, &cf] { // populate auto new_key = [&] { static thread_local int next = 0; return dht::decorate_key(*s, partition_key::from_single_value(*s, to_bytes(format("key{:d}", next++)))); }; auto make_mutation = [&] { mutation m(s, new_key()); m.set_clustered_cell(clustering_key::make_empty(), "v", data_value(to_bytes("value")), 1); return m; }; utils::chunked_vector mutations; for (int i = 0; i < 1000; ++i) { auto m = make_mutation(); cf.apply(m); mutations.emplace_back(std::move(m)); } std::sort(mutations.begin(), mutations.end(), mutation_decorated_key_less_comparator()); // Flush will happen in the middle of reading for this scanner auto assert_that_scanner1 = assert_that(cf.make_mutation_reader(s, env.make_reader_permit(), query::full_partition_range)); // Flush will happen before it is invoked auto assert_that_scanner2 = assert_that(cf.make_mutation_reader(s, env.make_reader_permit(), query::full_partition_range)); // Flush will happen after all data was read, but before EOS was consumed auto assert_that_scanner3 = assert_that(cf.make_mutation_reader(s, env.make_reader_permit(), query::full_partition_range)); assert_that_scanner1.produces(mutations[0]); assert_that_scanner1.produces(mutations[1]); for (unsigned i = 0; i < mutations.size(); ++i) { assert_that_scanner3.produces(mutations[i]); } auto ms = active_memtables(cf); // held by scanners auto flushed = cf.flush(); while (!std::ranges::all_of(ms, std::mem_fn(&replica::memtable::is_flushed))) { 
sleep(10ms).get(); } for (unsigned i = 2; i < mutations.size(); ++i) { assert_that_scanner1.produces(mutations[i]); } assert_that_scanner1.produces_end_of_stream(); for (unsigned i = 0; i < mutations.size(); ++i) { assert_that_scanner2.produces(mutations[i]); } assert_that_scanner2.produces_end_of_stream(); assert_that_scanner3.produces_end_of_stream(); flushed.get(); }); }).get(); }); } SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) { return sstables::test_env::do_with_async([] (sstables::test_env& env) { auto s = schema_builder(some_keyspace, some_column_family) .with_column("p1", int32_type, column_kind::partition_key) .with_column("c1", int32_type, column_kind::clustering_key) .with_column("r1", int32_type) .build(); auto cf_stats = make_lw_shared(); replica::column_family::config cfg = env.make_table_config(); cfg.enable_disk_reads = false; cfg.enable_disk_writes = false; cfg.enable_incremental_backups = false; cfg.cf_stats = &*cf_stats; with_column_family(s, cfg, env.manager(), [s, &env] (auto& cf) mutable -> future<> { std::map> shadow, result; const column_definition& r1_col = *s->get_column_definition("r1"); api::timestamp_type ts = 0; auto insert_row = [&] (int32_t p1, int32_t c1, int32_t r1) { auto key = partition_key::from_exploded(*s, {int32_type->decompose(p1)}); auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(c1)}); mutation m(s, key); m.set_clustered_cell(c_key, r1_col, atomic_cell::make_live(*int32_type, ts++, int32_type->decompose(r1))); cf.apply(std::move(m)); shadow[p1][c1] = r1; }; std::minstd_rand random_engine; std::normal_distribution<> pk_distribution(0, 10); std::normal_distribution<> ck_distribution(0, 5); std::normal_distribution<> r_distribution(0, 100); for (unsigned i = 0; i < 10; ++i) { for (unsigned j = 0; j < 100; ++j) { insert_row(pk_distribution(random_engine), ck_distribution(random_engine), r_distribution(random_engine)); } // In the background, cf.stop() will wait for this. 
(void)cf.flush(); } auto reader = cf.make_mutation_reader(s, env.make_reader_permit()); while (true) { mutation_opt mo = co_await read_mutation_from_mutation_reader(reader); if (!mo) { break; } const dht::decorated_key& pk = mo->decorated_key(); const mutation_partition& mp = mo->partition(); auto p1 = value_cast(int32_type->deserialize(pk._key.explode(*s)[0])); for (const rows_entry& re : mp.range(*s, interval())) { auto c1 = value_cast(int32_type->deserialize(re.key().explode(*s)[0])); auto cell = re.row().cells().find_cell(r1_col.id); if (cell) { result[p1][c1] = value_cast(int32_type->deserialize(cell->as_atomic_cell(r1_col).value().linearize())); } } } co_await reader.close(); BOOST_REQUIRE(shadow == result); }).then([cf_stats] {}).get(); }); } SEASTAR_TEST_CASE(test_cell_ordering) { auto now = gc_clock::now(); auto ttl_1 = gc_clock::duration(1); auto ttl_2 = gc_clock::duration(2); auto expiry_1 = now + ttl_1; auto expiry_2 = now + ttl_2; auto assert_order = [] (atomic_cell_view first, atomic_cell_view second) { testlog.trace("Expected {} < {}", first, second); BOOST_REQUIRE(compare_atomic_cell_for_merge(first, second) < 0); testlog.trace("Expected {} > {}", second, first); BOOST_REQUIRE(compare_atomic_cell_for_merge(second, first) > 0); }; auto assert_equal = [] (atomic_cell_view c1, atomic_cell_view c2) { testlog.trace("Expected {} == {}", c1, c2); BOOST_REQUIRE(compare_atomic_cell_for_merge(c1, c2) == 0); BOOST_REQUIRE(compare_atomic_cell_for_merge(c2, c1) == 0); }; testlog.debug("Live cells with same value are equal"); assert_equal( atomic_cell::make_live(*bytes_type, 0, bytes("value")), atomic_cell::make_live(*bytes_type, 0, bytes("value"))); testlog.debug("Non-expiring live cells are ordered before expiring cells"); assert_order( atomic_cell::make_live(*bytes_type, 1, bytes("value")), atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_1)); testlog.debug("Non-expiring live cells are ordered before expiring cells, regardless of their 
value"); assert_order( atomic_cell::make_live(*bytes_type, 1, bytes("value2")), atomic_cell::make_live(*bytes_type, 1, bytes("value1"), expiry_1, ttl_1)); testlog.debug("Dead cells with same expiry are equal"); assert_equal( atomic_cell::make_dead(1, expiry_1), atomic_cell::make_dead(1, expiry_1)); testlog.debug("Non-expiring live cells are ordered before expiring cells, with empty value"); assert_order( atomic_cell::make_live(*bytes_type, 1, bytes()), atomic_cell::make_live(*bytes_type, 1, bytes(), expiry_2, ttl_2)); // Origin doesn't compare ttl (is it wise?) // But we do. See https://github.com/scylladb/scylla/issues/10156 // and https://github.com/scylladb/scylla/issues/10173 testlog.debug("Expiring cells with higher ttl are ordered before expiring cells with smaller ttl and same expiry time"); assert_order( atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_2), atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_1, ttl_1)); testlog.debug("Cells are ordered by value if all else is equal"); assert_order( atomic_cell::make_live(*bytes_type, 0, bytes("value1")), atomic_cell::make_live(*bytes_type, 0, bytes("value2"))); testlog.debug("Cells are ordered by value in lexicographical order if all else is equal"); assert_order( atomic_cell::make_live(*bytes_type, 0, bytes("value12")), atomic_cell::make_live(*bytes_type, 0, bytes("value2"))); testlog.debug("Live cells are ordered first by timestamp..."); assert_order( atomic_cell::make_live(*bytes_type, 0, bytes("value2")), atomic_cell::make_live(*bytes_type, 1, bytes("value1"))); testlog.debug("...then by expiry"); assert_order( atomic_cell::make_live(*bytes_type, 1, bytes(), expiry_1, ttl_1), atomic_cell::make_live(*bytes_type, 1, bytes(), expiry_2, ttl_1)); testlog.debug("...then by ttl (in reverse)"); assert_order( atomic_cell::make_live(*bytes_type, 1, bytes(), expiry_1, ttl_2), atomic_cell::make_live(*bytes_type, 1, bytes(), expiry_1, ttl_1)); testlog.debug("...then by value"); 
assert_order(
        atomic_cell::make_live(*bytes_type, 1, bytes("value1"), expiry_1, ttl_1),
        atomic_cell::make_live(*bytes_type, 1, bytes("value2"), expiry_1, ttl_1));

    testlog.debug("Dead wins");
    assert_order(
        atomic_cell::make_live(*bytes_type, 1, bytes("value")),
        atomic_cell::make_dead(1, expiry_1));

    testlog.debug("Dead wins with expiring cell");
    assert_order(
        atomic_cell::make_live(*bytes_type, 1, bytes("value"), expiry_2, ttl_2),
        atomic_cell::make_dead(1, expiry_1));

    testlog.debug("Deleted cells are ordered first by timestamp...");
    assert_order(
        atomic_cell::make_dead(1, expiry_2),
        atomic_cell::make_dead(2, expiry_1));

    testlog.debug("...then by expiry");
    assert_order(
        atomic_cell::make_dead(1, expiry_1),
        atomic_cell::make_dead(1, expiry_2));

    return make_ready_future<>();
}

// Builds a partition slice selecting all columns and the full clustering range of s.
static query::partition_slice make_full_slice(const schema& s) {
    return partition_slice_builder(s).build();
}

// Querying a mutation returns its live cells; a newer partition tombstone
// (timestamp 2 > cell timestamp 1) hides previously written data.
SEASTAR_TEST_CASE(test_querying_of_mutation) {
    return seastar::async([] {
        auto s = schema_builder("ks", "cf")
            .with_column("pk", bytes_type, column_kind::partition_key)
            .with_column("v", bytes_type, column_kind::regular_column)
            .build();

        // Runs a full-slice data query against a copy of m and materializes the result.
        auto resultify = [s] (const mutation& m) -> query::result_set {
            auto slice = make_full_slice(*s);
            return query::result_set::from_raw_result(s, slice, query_mutation(mutation(m), slice));
        };

        mutation m(s, partition_key::from_single_value(*s, "key1"));
        m.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v1")), 1);

        assert_that(resultify(m))
            .has_only(a_row()
                .with_column("pk", data_value(bytes("key1")))
                .with_column("v", data_value(bytes("v1"))));

        m.partition().apply(tombstone(2, gc_clock::now()));
        assert_that(resultify(m)).is_empty();
    });
}

// A partition whose static row and clustered rows contain only dead cells
// does not appear in data query results at all.
SEASTAR_TEST_CASE(test_partition_with_no_live_data_is_absent_in_data_query_results) {
    return seastar::async([] {
        auto s = schema_builder("ks", "cf")
            .with_column("pk", bytes_type, column_kind::partition_key)
            .with_column("sc1", bytes_type, column_kind::static_column)
            .with_column("ck", bytes_type, column_kind::clustering_key)
            .with_column("v", bytes_type, column_kind::regular_column)
            .build();

        mutation m(s, partition_key::from_single_value(*s, "key1"));
        m.partition().apply(tombstone(1, gc_clock::now()));
        m.partition().static_row().apply(*s->get_column_definition("sc1"),
            atomic_cell::make_dead(2, gc_clock::now()));
        m.set_clustered_cell(clustering_key::from_single_value(*s, bytes_type->decompose(data_value(bytes("A")))),
            *s->get_column_definition("v"), atomic_cell::make_dead(2, gc_clock::now()));

        auto slice = make_full_slice(*s);

        assert_that(query::result_set::from_raw_result(s, slice,
                query_mutation(mutation(m), slice)))
            .is_empty();
    });
}

// A partition with live static-row data shows up in the result (as a row of
// nulls for the queried regular columns) even when the slice selects no
// static columns.
SEASTAR_TEST_CASE(test_partition_with_live_data_in_static_row_is_present_in_the_results_even_if_static_row_was_not_queried) {
    return seastar::async([] {
        auto s = schema_builder("ks", "cf")
            .with_column("pk", bytes_type, column_kind::partition_key)
            .with_column("sc1", bytes_type, column_kind::static_column)
            .with_column("ck", bytes_type, column_kind::clustering_key)
            .with_column("v", bytes_type, column_kind::regular_column)
            .build();

        mutation m(s, partition_key::from_single_value(*s, "key1"));
        m.partition().static_row().apply(*s->get_column_definition("sc1"),
            atomic_cell::make_live(*bytes_type, 2, bytes_type->decompose(data_value(bytes("sc1:value")))));

        auto slice = partition_slice_builder(*s)
            .with_no_static_columns()
            .with_regular_column("v")
            .build();

        assert_that(query::result_set::from_raw_result(s, slice,
                query_mutation(mutation(m), slice)))
            .has_only(a_row()
                .with_column("pk", data_value(bytes("key1")))
                .with_column("v", data_value::make_null(bytes_type)));
    });
}

// A regular column with no written cell comes back as null while populated
// columns of the same row are returned normally.
SEASTAR_TEST_CASE(test_query_result_with_one_regular_column_missing) {
    return seastar::async([] {
        auto s = schema_builder("ks", "cf")
            .with_column("pk", bytes_type, column_kind::partition_key)
            .with_column("ck", bytes_type, column_kind::clustering_key)
            .with_column("v1", bytes_type, column_kind::regular_column)
            .with_column("v2", bytes_type,
column_kind::regular_column)
            .build();

        mutation m(s, partition_key::from_single_value(*s, "key1"));
        m.set_clustered_cell(clustering_key::from_single_value(*s, bytes("ck:A")),
            *s->get_column_definition("v1"),
            atomic_cell::make_live(*bytes_type, 2, bytes_type->decompose(data_value(bytes("v1:value")))));

        auto slice = partition_slice_builder(*s).build();

        assert_that(query::result_set::from_raw_result(s, slice,
                query_mutation(mutation(m), slice)))
            .has_only(a_row()
                .with_column("pk", data_value(bytes("key1")))
                .with_column("ck", data_value(bytes("ck:A")))
                .with_column("v1", data_value(bytes("v1:value")))
                .with_column("v2", data_value::make_null(bytes_type)));
    });
}

// Exercises mutation::live_row_count() across live cells, dead cells, row
// markers, and partition tombstones: only rows with live payload count.
SEASTAR_TEST_CASE(test_row_counting) {
    return seastar::async([] {
        auto s = schema_builder("ks", "cf")
            .with_column("pk", bytes_type, column_kind::partition_key)
            .with_column("sc1", bytes_type, column_kind::static_column)
            .with_column("ck", bytes_type, column_kind::clustering_key)
            .with_column("v", bytes_type, column_kind::regular_column)
            .build();

        auto col_v = *s->get_column_definition("v");

        mutation m(s, partition_key::from_single_value(*s, "key1"));
        BOOST_REQUIRE_EQUAL(0, m.live_row_count());

        auto ckey1 = clustering_key::from_single_value(*s, bytes_type->decompose(data_value(bytes("A"))));
        auto ckey2 = clustering_key::from_single_value(*s, bytes_type->decompose(data_value(bytes("B"))));

        m.set_clustered_cell(ckey1, col_v, atomic_cell::make_live(*bytes_type, 2, bytes_type->decompose(data_value(bytes("v:value")))));
        BOOST_REQUIRE_EQUAL(1, m.live_row_count());

        // A live static row plus one live clustered row still counts as 1 here.
        m.partition().static_row().apply(*s->get_column_definition("sc1"),
            atomic_cell::make_live(*bytes_type, 2, bytes_type->decompose(data_value(bytes("sc1:value")))));
        BOOST_REQUIRE_EQUAL(1, m.live_row_count());

        // Killing the clustered cell leaves the live static row counting as 1.
        m.set_clustered_cell(ckey1, col_v, atomic_cell::make_dead(2, gc_clock::now()));
        BOOST_REQUIRE_EQUAL(1, m.live_row_count());

        m.partition().static_row().apply(*s->get_column_definition("sc1"),
            atomic_cell::make_dead(2, gc_clock::now()));
        BOOST_REQUIRE_EQUAL(0, m.live_row_count());

        // A row marker alone makes the row live.
        m.partition().clustered_row(*s, ckey1).apply(row_marker(api::timestamp_type(3)));
        BOOST_REQUIRE_EQUAL(1, m.live_row_count());

        // A newer partition tombstone shadows the marker.
        m.partition().apply(tombstone(3, gc_clock::now()));
        BOOST_REQUIRE_EQUAL(0, m.live_row_count());

        m.set_clustered_cell(ckey1, col_v, atomic_cell::make_live(*bytes_type, 4, bytes_type->decompose(data_value(bytes("v:value")))));
        m.set_clustered_cell(ckey2, col_v, atomic_cell::make_live(*bytes_type, 4, bytes_type->decompose(data_value(bytes("v:value")))));
        BOOST_REQUIRE_EQUAL(2, m.live_row_count());
    });
}

// Applying a mutation carrying a partition tombstone transfers the tombstone
// to the target mutation.
SEASTAR_TEST_CASE(test_tombstone_apply) {
    auto s = schema_builder("ks", "cf")
        .with_column("pk", bytes_type, column_kind::partition_key)
        .with_column("v", bytes_type, column_kind::regular_column)
        .build();

    auto pkey = partition_key::from_single_value(*s, "key1");

    mutation m1(s, pkey);
    BOOST_REQUIRE_EQUAL(m1.partition().partition_tombstone(), tombstone());

    mutation m2(s, pkey);
    auto tomb = tombstone(api::new_timestamp(), gc_clock::now());
    m2.partition().apply(tomb);
    BOOST_REQUIRE_EQUAL(m2.partition().partition_tombstone(), tomb);

    m1.apply(m2);
    BOOST_REQUIRE_EQUAL(m1.partition().partition_tombstone(), tomb);

    return make_ready_future<>();
}

// Row markers (both non-expiring and TTLed) survive mutation::apply() into an
// empty target row.
SEASTAR_TEST_CASE(test_marker_apply) {
    auto s = schema_builder("ks", "cf")
        .with_column("pk", bytes_type, column_kind::partition_key)
        .with_column("ck", bytes_type, column_kind::clustering_key)
        .with_column("v", bytes_type, column_kind::regular_column)
        .build();

    auto pkey = partition_key::from_single_value(*s, "pk1");
    auto ckey = clustering_key::from_single_value(*s, "ck1");

    // Builds a mutation whose only content is the given row marker on ckey.
    auto mutation_with_marker = [&] (row_marker rm) {
        mutation m(s, pkey);
        m.partition().clustered_row(*s, ckey).marker() = rm;
        return m;
    };

    {
        mutation m(s, pkey);
        auto marker = row_marker(api::new_timestamp());
        auto mm = mutation_with_marker(marker);
        m.apply(mm);
        BOOST_REQUIRE_EQUAL(m.partition().clustered_row(*s, ckey).marker(), marker);
    }

    {
        mutation m(s, pkey);
        auto marker =
row_marker(api::new_timestamp(), std::chrono::seconds(1), gc_clock::now());
        m.apply(mutation_with_marker(marker));
        BOOST_REQUIRE_EQUAL(m.partition().clustered_row(*s, ckey).marker(), marker);
    }

    return make_ready_future<>();
}

// Checks that mutation_partition::apply_monotonically() is monotonic under
// injected allocation failures: on failure, merged continuity never exceeds
// the expected continuity, and the apply can be completed to target + second.
SEASTAR_TEST_CASE(test_apply_monotonically_is_monotonic) {
    auto do_test = [](auto&& gen) {
        auto&& alloc = standard_allocator();
        with_allocator(alloc, [&] {
            mutation_application_stats app_stats;
            auto&& s = *gen.schema();
            mutation target = gen();
            mutation second = gen();
            target.partition().set_continuity(s, position_range::all_clustered_rows(), is_continuous::no);
            second.partition().set_continuity(s, position_range::all_clustered_rows(), is_continuous::no);

            // Mark random ranges as continuous in target and second.
            // Note that continuity merging rules mandate that the ranges are disjoint
            // between the two.
            {
                int which = 0;
                for (auto&& ck_range : gen.make_random_ranges(7)) {
                    bool use_second = which++ % 2;
                    mutation& dst = use_second ? second : target;
                    dst.partition().set_continuity(s, position_range::from_range(ck_range), is_continuous::yes);
                    // Continuity merging rules mandate that continuous range in the newer version
                    // contains all rows which are in the old versions.
                    if (use_second) {
                        second.partition().apply(s, target.partition().sliced(s, {ck_range}), app_stats);
                    }
                }
            }

            auto expected = target + second;

            mutation m = target;
            auto m2 = mutation_partition(*m.schema(), second.partition());
            memory::with_allocation_failures([&] {
                // Runs when the apply below failed with bad_alloc: verify the
                // continuity invariant, finish the apply without failure
                // injection, then restore the inputs for the next retry.
                auto d = defer([&] {
                    auto&& s = *gen.schema();
                    auto c1 = m.partition().get_continuity(s);
                    auto c2 = m2.get_continuity(s);
                    clustering_interval_set actual;
                    actual.add(s, c1);
                    actual.add(s, c2);
                    auto expected_cont = expected.partition().get_continuity(s);
                    if (!actual.contained_in(expected_cont)) {
                        BOOST_FAIL(format("Continuity should be contained in the expected one, expected {} ({} + {}), got {} ({} + {})",
                                          expected_cont, target.partition().get_continuity(s), second.partition().get_continuity(s),
                                          actual, c1, c2));
                    }
                    apply_resume res;
                    m.partition().apply_monotonically(*m.schema(), std::move(m2), no_cache_tracker, app_stats, is_preemptible::no, res);
                    assert_that(m).is_equal_to(expected);
                    m = target;
                    m2 = mutation_partition(*m.schema(), second.partition());
                });
                apply_resume res;
                m.partition().apply_monotonically(*m.schema(), std::move(m2), no_cache_tracker, app_stats, is_preemptible::no, res);
                d.cancel();
            });
            assert_that(m).is_equal_to(expected).has_same_continuity(expected);
        });
    };
    do_test(random_mutation_generator(random_mutation_generator::generate_counters::no));
    do_test(random_mutation_generator(random_mutation_generator::generate_counters::yes));
    return make_ready_future<>();
}

// Exercises the preemptible variant of apply_monotonically() under allocation
// failures, resuming until it reports stop_iteration::yes.
SEASTAR_THREAD_TEST_CASE(test_apply_monotonically_with_preemption) {
    auto do_test = [](auto&& gen) {
        auto&& alloc = standard_allocator();
        with_allocator(alloc, [&] {
            mutation_application_stats app_stats;
            mutation target = gen();
            mutation second = gen();
            auto expected = target + second;
            mutation m = target;
            auto m2 = mutation_partition(*m.schema(), second.partition());
            apply_resume res;
            memory::with_allocation_failures([&] {
                while (m.partition().apply_monotonically(*m.schema(), std::move(m2), no_cache_tracker, app_stats,
is_preemptible::yes, res) == stop_iteration::no) {
                    yield().get();
                }
            });
            assert_that(m).is_equal_to_compacted(expected);
        });
    };
    do_test(random_mutation_generator(random_mutation_generator::generate_counters::no));
    do_test(random_mutation_generator(random_mutation_generator::generate_counters::yes));
}

// Same monotonicity check as above but for mutation_partition_v2 and the
// preempt_check-driven apply_monotonically() overload.
SEASTAR_TEST_CASE(test_v2_apply_monotonically_is_monotonic_on_alloc_failures) {
    auto do_test = [](auto&& gen) {
        auto&& alloc = standard_allocator();
        with_allocator(alloc, [&] {
            mutation_application_stats app_stats;
            const schema& s = *gen.schema();
            mutation target = gen();
            mutation second = gen();
            target.partition().set_continuity(s, position_range::all_clustered_rows(), is_continuous::no);
            second.partition().set_continuity(s, position_range::all_clustered_rows(), is_continuous::no);

            // Mark random ranges as continuous in target and second.
            // Note that continuity merging rules mandate that the ranges are disjoint
            // between the two.
            {
                int which = 0;
                for (auto&& ck_range : gen.make_random_ranges(7)) {
                    bool use_second = which++ % 2;
                    mutation& dst = use_second ? second : target;
                    dst.partition().set_continuity(s, position_range::from_range(ck_range), is_continuous::yes);
                    // Continuity merging rules mandate that continuous range in the newer version
                    // contains all rows which are in the old versions.
                    if (use_second) {
                        second.partition().apply(s, target.partition().sliced(s, {ck_range}), app_stats);
                    }
                }
            }

            auto expected = target + second;
            auto expected_cont = mutation_partition_v2(s, expected.partition()).get_continuity(s);

            testlog.trace("target: {}", target);
            testlog.trace("second: {}", second);
            testlog.trace("expected: {}", expected);

            // Treats an injected allocation failure as a preemption request.
            auto preempt_check = [] () noexcept {
                try {
                    memory::local_failure_injector().on_alloc_point();
                    return false;
                } catch (const std::bad_alloc&) {
                    return true;
                }
            };

            auto m = mutation_partition_v2(s, target.partition());
            auto m2 = mutation_partition_v2(s, second.partition());

            memory::with_allocation_failures([&] {
                auto reset_m = defer([&] {
                    m = mutation_partition_v2(s, target.partition());
                    m2 = mutation_partition_v2(s, second.partition());
                });
                auto check = defer([&] {
                    m.apply(s, std::move(m2));
                    assert_that(target.schema(), m).is_equal_to_compacted(expected.partition());
                });
                auto continuity_check = defer([&] {
                    auto c1 = m.get_continuity(s);
                    auto c2 = m2.get_continuity(s);
                    clustering_interval_set actual;
                    actual.add(s, c1);
                    actual.add(s, c2);
                    if (!actual.equals(s, expected_cont)) {
                        testlog.trace("c1: {}", mutation_partition_v2::printer(s, m));
                        testlog.trace("c2: {}", mutation_partition_v2::printer(s, m2));
                        BOOST_FAIL(format("Continuity should be contained in the expected one, expected {}, got {} ({} + {})",
                                          expected_cont, actual, c1, c2));
                    }
                });
                apply_resume res;
                if (m.apply_monotonically(s, s, std::move(m2), no_cache_tracker, app_stats, preempt_check, res, is_evictable::yes) == stop_iteration::yes) {
                    continuity_check.cancel();
                    seastar::memory::local_failure_injector().cancel();
                }
                reset_m.cancel();
            });
        });
    };
    do_test(random_mutation_generator(random_mutation_generator::generate_counters::no));
    do_test(random_mutation_generator(random_mutation_generator::generate_counters::yes));
    return make_ready_future<>();
}

// Splits random mutations into size-bounded pieces and verifies that each
// piece obeys the size limit (modulo single-row pieces) and that squashing the
// pieces reproduces the original mutation.
SEASTAR_THREAD_TEST_CASE(test_split_mutations) {
    random_mutation_generator
mut_gen1(random_mutation_generator::generate_counters::no); random_mutation_generator mut_gen2(random_mutation_generator::generate_counters::yes); const double fracs[] = {0.1, 0.3, 0.6, 0.8, 2.0}; for (unsigned i = 0; i < 100; ++i) { auto& mut_gen = (i & 1) == 0 ? mut_gen1 : mut_gen2; auto s = mut_gen.schema(); auto mut = mut_gen(); const auto mut_size = mut.memory_usage(*s); for (const auto frac: fracs) { const auto max_size = size_t(double(mut_size) * frac); if (max_size == 0) { continue; } utils::chunked_vector splitted; split_mutation(mut, splitted, max_size / 2).get(); BOOST_REQUIRE(!splitted.empty()); for (const auto& m: splitted) { BOOST_REQUIRE_EQUAL(m.schema(), s); BOOST_REQUIRE(!m.partition().empty()); if (m.memory_usage(*s) > max_size) { // We don't split rows into cells, so one row can be bigger than the limit. const auto rows_count = m.partition().row_count() + (m.partition().static_row().empty() ? 0 : 1); BOOST_REQUIRE_EQUAL(rows_count, 1); } } const auto squashed = squash_mutations(splitted); BOOST_REQUIRE_EQUAL(squashed.size(), 1); assert_that(squashed.front()).is_equal_to(mut); } } } SEASTAR_TEST_CASE(test_mutation_diff) { return seastar::async([] { mutation_application_stats app_stats; auto my_set_type = set_type_impl::get_instance(int32_type, true); auto s = schema_builder("ks", "cf") .with_column("pk", bytes_type, column_kind::partition_key) .with_column("sc1", bytes_type, column_kind::static_column) .with_column("ck", bytes_type, column_kind::clustering_key) .with_column("v1", bytes_type, column_kind::regular_column) .with_column("v2", bytes_type, column_kind::regular_column) .with_column("v3", my_set_type, column_kind::regular_column) .build(); auto ckey1 = clustering_key::from_single_value(*s, bytes_type->decompose(data_value(bytes("A")))); auto ckey2 = clustering_key::from_single_value(*s, bytes_type->decompose(data_value(bytes("B")))); mutation m1(s, partition_key::from_single_value(*s, "key1")); 
m1.set_static_cell(*s->get_column_definition("sc1"), atomic_cell::make_dead(2, gc_clock::now())); m1.partition().apply(tombstone { 1, gc_clock::now() }); m1.set_clustered_cell(ckey1, *s->get_column_definition("v1"), atomic_cell::make_live(*bytes_type, 2, bytes_type->decompose(data_value(bytes("v1:value1"))))); m1.set_clustered_cell(ckey1, *s->get_column_definition("v2"), atomic_cell::make_live(*bytes_type, 2, bytes_type->decompose(data_value(bytes("v2:value2"))))); m1.partition().clustered_row(*s, ckey2).apply(row_marker(3)); m1.set_clustered_cell(ckey2, *s->get_column_definition("v2"), atomic_cell::make_live(*bytes_type, 2, bytes_type->decompose(data_value(bytes("v2:value4"))))); auto mset1 = make_collection_mutation({}, int32_type->decompose(1), make_atomic_cell(), int32_type->decompose(2), make_atomic_cell()); m1.set_clustered_cell(ckey2, *s->get_column_definition("v3"), mset1.serialize(*my_set_type)); mutation m2(s, partition_key::from_single_value(*s, "key1")); m2.set_clustered_cell(ckey1, *s->get_column_definition("v1"), atomic_cell::make_live(*bytes_type, 1, bytes_type->decompose(data_value(bytes("v1:value1a"))))); m2.set_clustered_cell(ckey1, *s->get_column_definition("v2"), atomic_cell::make_live(*bytes_type, 2, bytes_type->decompose(data_value(bytes("v2:value2"))))); m2.set_clustered_cell(ckey2, *s->get_column_definition("v1"), atomic_cell::make_live(*bytes_type, 2, bytes_type->decompose(data_value(bytes("v1:value3"))))); m2.set_clustered_cell(ckey2, *s->get_column_definition("v2"), atomic_cell::make_live(*bytes_type, 3, bytes_type->decompose(data_value(bytes("v2:value4a"))))); auto mset2 = make_collection_mutation({}, int32_type->decompose(1), make_atomic_cell(), int32_type->decompose(3), make_atomic_cell()); m2.set_clustered_cell(ckey2, *s->get_column_definition("v3"), mset2.serialize(*my_set_type)); mutation m3(s, partition_key::from_single_value(*s, "key1")); m3.set_clustered_cell(ckey1, *s->get_column_definition("v1"), 
atomic_cell::make_live(*bytes_type, 2, bytes_type->decompose(data_value(bytes("v1:value1"))))); m3.set_clustered_cell(ckey2, *s->get_column_definition("v1"), atomic_cell::make_live(*bytes_type, 2, bytes_type->decompose(data_value(bytes("v1:value3"))))); m3.set_clustered_cell(ckey2, *s->get_column_definition("v2"), atomic_cell::make_live(*bytes_type, 3, bytes_type->decompose(data_value(bytes("v2:value4a"))))); auto mset3 = make_collection_mutation({}, int32_type->decompose(1), make_atomic_cell()); m3.set_clustered_cell(ckey2, *s->get_column_definition("v3"), mset3.serialize(*my_set_type)); mutation m12(s, partition_key::from_single_value(*s, "key1")); m12.apply(m1); m12.apply(m2); auto m2_1 = m2.partition().difference(*s, m1.partition()); BOOST_REQUIRE_EQUAL(m2_1.partition_tombstone(), tombstone()); BOOST_REQUIRE(!m2_1.static_row().size()); BOOST_REQUIRE(!m2_1.find_row(*s, ckey1)); BOOST_REQUIRE(m2_1.find_row(*s, ckey2)); BOOST_REQUIRE(m2_1.find_row(*s, ckey2)->find_cell(2)); auto cmv = m2_1.find_row(*s, ckey2)->find_cell(2)->as_collection_mutation(); cmv.with_deserialized(*my_set_type, [] (collection_mutation_view_description cm) { BOOST_REQUIRE(cm.cells.size() == 1); BOOST_REQUIRE(cm.cells.front().first == int32_type->decompose(3)); }); mutation m12_1(s, partition_key::from_single_value(*s, "key1")); m12_1.apply(m1); m12_1.partition().apply(*s, m2_1, *s, app_stats); BOOST_REQUIRE_EQUAL(m12, m12_1); auto m1_2 = m1.partition().difference(*s, m2.partition()); BOOST_REQUIRE_EQUAL(m1_2.partition_tombstone(), m12.partition().partition_tombstone()); BOOST_REQUIRE(m1_2.find_row(*s, ckey1)); BOOST_REQUIRE(m1_2.find_row(*s, ckey2)); BOOST_REQUIRE(!m1_2.find_row(*s, ckey1)->find_cell(1)); BOOST_REQUIRE(!m1_2.find_row(*s, ckey2)->find_cell(0)); BOOST_REQUIRE(!m1_2.find_row(*s, ckey2)->find_cell(1)); cmv = m1_2.find_row(*s, ckey2)->find_cell(2)->as_collection_mutation(); cmv.with_deserialized(*my_set_type, [] (collection_mutation_view_description cm) { 
BOOST_REQUIRE(cm.cells.size() == 1); BOOST_REQUIRE(cm.cells.front().first == int32_type->decompose(2)); }); mutation m12_2(s, partition_key::from_single_value(*s, "key1")); m12_2.apply(m2); m12_2.partition().apply(*s, m1_2, *s, app_stats); BOOST_REQUIRE_EQUAL(m12, m12_2); auto m3_12 = m3.partition().difference(*s, m12.partition()); BOOST_REQUIRE(m3_12.empty()); auto m12_3 = m12.partition().difference(*s, m3.partition()); BOOST_REQUIRE_EQUAL(m12_3.partition_tombstone(), m12.partition().partition_tombstone()); mutation m123(s, partition_key::from_single_value(*s, "key1")); m123.apply(m3); m123.partition().apply(*s, m12_3, *s, app_stats); BOOST_REQUIRE_EQUAL(m12, m123); }); } SEASTAR_TEST_CASE(test_large_blobs) { return seastar::async([] { tests::reader_concurrency_semaphore_wrapper semaphore; auto s = schema_builder(some_keyspace, some_column_family) .with_column("p1", utf8_type, column_kind::partition_key) .with_column("s1", bytes_type, column_kind::static_column) .build(); auto mt = make_lw_shared(s); auto blob1 = make_blob(1234567); auto blob2 = make_blob(2345678); const column_definition& s1_col = *s->get_column_definition("s1"); auto key = partition_key::from_exploded(*s, {to_bytes("key1")}); mutation m(s, key); m.set_static_cell(s1_col, make_atomic_cell(bytes_type, data_value(blob1))); mt->apply(std::move(m)); auto p = get_partition(semaphore.make_permit(), *mt, key); lazy_row& r = p.static_row(); auto i = r.find_cell(s1_col.id); BOOST_REQUIRE(i); auto cell = i->as_atomic_cell(s1_col); BOOST_REQUIRE(cell.is_live()); BOOST_REQUIRE(bytes_type->equal(cell.value().linearize(), bytes_type->decompose(data_value(blob1)))); // Stress managed_bytes::linearize and scatter by merging a value into the cell mutation m2(s, key); m2.set_static_cell(s1_col, atomic_cell::make_live(*bytes_type, 7, bytes_type->decompose(data_value(blob2)))); mt->apply(std::move(m2)); auto p2 = get_partition(semaphore.make_permit(), *mt, key); lazy_row& r2 = p2.static_row(); auto i2 = 
r2.find_cell(s1_col.id);
        BOOST_REQUIRE(i2);
        auto cell2 = i2->as_atomic_cell(s1_col);
        BOOST_REQUIRE(cell2.is_live());
        BOOST_REQUIRE(bytes_type->equal(cell2.value().linearize(), bytes_type->decompose(data_value(blob2))));
    });
}

// Runs every generated mutation pair through the (in)equality assertions.
SEASTAR_TEST_CASE(test_mutation_equality) {
    return seastar::async([] {
        for_each_mutation_pair([] (auto&& m1, auto&& m2, are_equal eq) {
            if (eq) {
                assert_that(m1).is_equal_to(m2);
            } else {
                assert_that(m1).is_not_equal_to(m2);
            }
        });
    });
}

// Equal mutations must hash identically; unequal ones should (with
// overwhelming probability) hash differently, for both md5 and xx hashers.
SEASTAR_TEST_CASE(test_mutation_hash) {
    return seastar::async([] {
        for_each_mutation_pair([] (auto&& m1, auto&& m2, are_equal eq) {
            auto test_with_hasher = [&] (auto hasher) {
                auto get_hash = [&] (const mutation &m) {
                    auto h = hasher;
                    feed_hash(h, m);
                    return h.finalize();
                };
                auto h1 = get_hash(m1);
                auto h2 = get_hash(m2);
                if (eq) {
                    if (h1 != h2) {
                        BOOST_FAIL(format("Hash should be equal for {} and {}", m1, m2));
                    }
                } else {
                    // We're using a strong hasher, collision should be unlikely
                    if (h1 == h2) {
                        BOOST_FAIL(format("Hash should be different for {} and {}", m1, m2));
                    }
                }
            };
            test_with_hasher(md5_hasher());
            test_with_hasher(xx_hasher());
        });
    });
}

// Returns a copy of m compacted for compaction at time point `now`.
static mutation compacted(const mutation& m, gc_clock::time_point now) {
    auto result = m;
    result.partition().compact_for_compaction(*result.schema(), always_gc, result.decorated_key(), now, tombstone_gc_state(nullptr));
    return result;
}

// Query digests must be insensitive to compaction for equal mutations, and
// unequal mutations must converge to equal digests after exchanging diffs.
SEASTAR_TEST_CASE(test_query_digest) {
    return seastar::async([] {
        auto now = gc_clock::now();
        auto check_digests_equal = [now] (const mutation& m1, const mutation& m2) {
            auto ps1 = partition_slice_builder(*m1.schema()).build();
            auto ps2 = partition_slice_builder(*m2.schema()).build();
            auto digest1 = *query_mutation(mutation(m1), ps1, query::max_rows, now,
                    query::result_options::only_digest(query::digest_algorithm::xxHash)).digest();
            auto digest2 = *query_mutation(mutation(m2), ps2, query::max_rows, now,
                    query::result_options::only_digest(query::digest_algorithm::xxHash)).digest();
            if (digest1 != digest2) {
                BOOST_FAIL(format("Digest should be the same for {} and {}", m1, m2));
            }
        };

        for_each_mutation_pair([&] (const mutation& m1, const mutation& m2, are_equal eq) {
            if (m1.schema()->version() != m2.schema()->version()) {
                return;
            }
            if (eq) {
                check_digests_equal(compacted(m1, now), m2);
                check_digests_equal(m1, compacted(m2, now));
            } else {
                testlog.info("If not equal, they should become so after applying diffs mutually");
                mutation_application_stats app_stats;
                schema_ptr s = m1.schema();
                auto m3 = m2;
                {
                    auto diff = m1.partition().difference(*s, m2.partition());
                    m3.partition().apply(*m3.schema(), std::move(diff), app_stats);
                }
                auto m4 = m1;
                {
                    auto diff = m2.partition().difference(*s, m1.partition());
                    m4.partition().apply(*m4.schema(), std::move(diff), app_stats);
                }
                check_digests_equal(m3, m4);
            }
        });
    });
}

// Equal mutations must stay equivalent when upgraded to each other's schema.
SEASTAR_TEST_CASE(test_mutation_upgrade_of_equal_mutations) {
    return seastar::async([] {
        for_each_mutation_pair([](auto&& m1, auto&& m2, are_equal eq) {
            if (eq == are_equal::yes) {
                assert_that(m1).is_upgrade_equivalent(m2.schema());
                assert_that(m2).is_upgrade_equivalent(m1.schema());
            }
        });
    });
}

// Exercises mutation::upgrade() across schema changes: dropped columns,
// columns changing kind (regular <-> static), and newly inserted columns.
SEASTAR_TEST_CASE(test_mutation_upgrade) {
    return seastar::async([] {
        auto make_builder = [] {
            return schema_builder("ks", "cf")
                .with_column("pk", bytes_type, column_kind::partition_key)
                .with_column("ck", bytes_type, column_kind::clustering_key);
        };

        auto s = make_builder()
            .with_column("sc1", bytes_type, column_kind::static_column)
            .with_column("v1", bytes_type, column_kind::regular_column)
            .with_column("v2", bytes_type, column_kind::regular_column)
            .build();

        auto pk = partition_key::from_singular(*s, data_value(bytes("key1")));
        auto ckey1 = clustering_key::from_singular(*s, data_value(bytes("A")));

        {
            mutation m(s, pk);
            m.set_clustered_cell(ckey1, "v2", data_value(bytes("v2:value")), 1);

            assert_that(m).is_upgrade_equivalent(
                make_builder() // without v1
                    .with_column("sc1", bytes_type, column_kind::static_column)
                    .with_column("v2", bytes_type, column_kind::regular_column)
                    .build());
assert_that(m).is_upgrade_equivalent(
                make_builder() // without sc1
                    .with_column("v1", bytes_type, column_kind::static_column)
                    .with_column("v2", bytes_type, column_kind::regular_column)
                    .build());

            assert_that(m).is_upgrade_equivalent(
                make_builder() // with v1 recreated as static
                    .with_column("sc1", bytes_type, column_kind::static_column)
                    .with_column("v1", bytes_type, column_kind::static_column)
                    .with_column("v2", bytes_type, column_kind::regular_column)
                    .build());

            assert_that(m).is_upgrade_equivalent(
                make_builder() // with new column inserted before v1
                    .with_column("sc1", bytes_type, column_kind::static_column)
                    .with_column("v0", bytes_type, column_kind::regular_column)
                    .with_column("v1", bytes_type, column_kind::regular_column)
                    .with_column("v2", bytes_type, column_kind::regular_column)
                    .build());

            assert_that(m).is_upgrade_equivalent(
                make_builder() // with new column inserted after v2
                    .with_column("sc1", bytes_type, column_kind::static_column)
                    .with_column("v0", bytes_type, column_kind::regular_column)
                    .with_column("v2", bytes_type, column_kind::regular_column)
                    .with_column("v3", bytes_type, column_kind::regular_column)
                    .build());
        }

        {
            mutation m(s, pk);
            m.set_clustered_cell(ckey1, "v1", data_value(bytes("v2:value")), 1);
            m.set_clustered_cell(ckey1, "v2", data_value(bytes("v2:value")), 1);

            auto s2 = make_builder() // v2 changed into a static column, v1 removed
                .with_column("v2", bytes_type, column_kind::static_column)
                .build();

            m.upgrade(s2);

            // Only the (now empty) clustered row survives the upgrade.
            mutation m2(s2, pk);
            m2.partition().clustered_row(*s2, ckey1);
            assert_that(m).is_equal_to(m2);
        }

        {
            mutation m(make_builder()
                .with_column("v1", bytes_type, column_kind::regular_column)
                .with_column("v2", bytes_type, column_kind::regular_column)
                .with_column("v3", bytes_type, column_kind::regular_column)
                .build(), pk);
            m.set_clustered_cell(ckey1, "v1", data_value(bytes("v1:value")), 1);
            m.set_clustered_cell(ckey1, "v2", data_value(bytes("v2:value")), 1);
            m.set_clustered_cell(ckey1, "v3", data_value(bytes("v3:value")), 1);

            auto s2 = make_builder() // v2 changed into a static column
                .with_column("v1", bytes_type, column_kind::regular_column)
                .with_column("v2", bytes_type, column_kind::static_column)
                .with_column("v3", bytes_type, column_kind::regular_column)
                .build();

            m.upgrade(s2);

            // The regular cell of v2 is dropped by the kind change.
            mutation m2(s2, pk);
            m2.set_clustered_cell(ckey1, "v1", data_value(bytes("v1:value")), 1);
            m2.set_clustered_cell(ckey1, "v3", data_value(bytes("v3:value")), 1);
            assert_that(m).is_equal_to(m2);
        }
    });
}

// Upgrading across a compatible column type change (int32 -> bytes) preserves
// the cell's serialized value.
SEASTAR_THREAD_TEST_CASE(test_mutation_upgrade_type_change) {
    auto make_builder = [] {
        return schema_builder("ks", "cf")
            .with_column("pk", bytes_type, column_kind::partition_key)
            .with_column("ck", bytes_type, column_kind::clustering_key);
    };

    auto s1 = make_builder()
        .with_column("v1", int32_type)
        .build();
    auto s2 = make_builder()
        .with_column("v1", bytes_type)
        .build();

    auto pk = partition_key::from_singular(*s1, data_value(bytes("key1")));
    auto ck1 = clustering_key::from_singular(*s1, data_value(bytes("A")));

    mutation m(s1, pk);
    m.set_clustered_cell(ck1, "v1", data_value(int32_t(0x1234abcd)), 1);

    m.upgrade(s2);

    mutation m2(s2, pk);
    m2.set_clustered_cell(ck1, "v1", data_value(from_hex("1234abcd")), 1);
    assert_that(m).is_equal_to(m2);
}

// This test checks the behavior of row_marker::{is_live, is_dead, compact_and_expire}. Those functions have some
// duplicated logic that decides if a row is expired, and this test verifies that they behave the same with respect
// to TTL.
SEASTAR_THREAD_TEST_CASE(test_row_marker_expiry) {
    // A marker considered alive must agree across all three predicates.
    auto must_be_alive = [&] (row_marker mark, gc_clock::time_point t) {
        testlog.trace("must_be_alive({}, {})", mark, t);
        BOOST_REQUIRE(mark.is_live(tombstone(), t));
        BOOST_REQUIRE(mark.is_missing() || !mark.is_dead(t));
        BOOST_REQUIRE(mark.compact_and_expire(tombstone(), t, never_gc, gc_clock::time_point()));
    };

    auto must_be_dead = [&] (row_marker mark, gc_clock::time_point t) {
        testlog.trace("must_be_dead({}, {})", mark, t);
        BOOST_REQUIRE(!mark.is_live(tombstone(), t));
        BOOST_REQUIRE(mark.is_missing() || mark.is_dead(t));
        BOOST_REQUIRE(!mark.compact_and_expire(tombstone(), t, never_gc, gc_clock::time_point()));
    };

    const auto timestamp = api::timestamp_type(1);
    const auto t0 = gc_clock::now();
    const auto t1 = t0 + 1s;
    const auto t2 = t0 + 2s;
    const auto t3 = t0 + 3s;

    // Without timestamp the marker is missing (doesn't exist)
    const row_marker m1;
    must_be_dead(m1, t0);
    must_be_dead(m1, t1);
    must_be_dead(m1, t2);
    must_be_dead(m1, t3);

    // With timestamp and without ttl, a row_marker is always alive
    const row_marker m2(timestamp);
    must_be_alive(m2, t0);
    must_be_alive(m2, t1);
    must_be_alive(m2, t2);
    must_be_alive(m2, t3);

    // A row_marker becomes dead exactly at the moment of expiry
    // Reproduces #4263, #5290
    const auto ttl = 1s;
    const row_marker m3(timestamp, ttl, t2);
    must_be_alive(m3, t0);
    must_be_alive(m3, t1);
    must_be_dead(m3, t2);
    must_be_dead(m3, t3);
}

// Rows whose TTLed markers expire at increasing time points drop out of query
// results one by one as the query time advances.
SEASTAR_THREAD_TEST_CASE(test_querying_expired_rows) {
    auto s = schema_builder("ks", "cf")
            .with_column("pk", bytes_type, column_kind::partition_key)
            .with_column("ck", bytes_type, column_kind::clustering_key)
            .build();

    auto pk = partition_key::from_singular(*s, data_value(bytes("key1")));
    auto ckey1 = clustering_key::from_singular(*s, data_value(bytes("A")));
    auto ckey2 = clustering_key::from_singular(*s, data_value(bytes("B")));
    auto ckey3 = clustering_key::from_singular(*s, data_value(bytes("C")));

    auto ttl = 1s;
    auto t0 = gc_clock::now();
    auto t1 = t0 + 1s;
    auto t2 = t0 + 2s;
    auto t3 = t0 + 3s;

    // Queries a copy of m as of time point t (result + digest request).
    auto results_at_time = [s] (const mutation& m, gc_clock::time_point t) {
        auto slice = partition_slice_builder(*s)
                .without_partition_key_columns()
                .build();
        auto opts = query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash};
        return query::result_set::from_raw_result(s, slice, query_mutation(mutation(m), slice, query::max_rows, t, opts));
    };

    mutation m(s, pk);
    m.partition().clustered_row(*m.schema(), ckey1).apply(row_marker(api::new_timestamp(), ttl, t1));
    m.partition().clustered_row(*m.schema(), ckey2).apply(row_marker(api::new_timestamp(), ttl, t2));
    m.partition().clustered_row(*m.schema(), ckey3).apply(row_marker(api::new_timestamp(), ttl, t3));

    assert_that(results_at_time(m, t0))
            .has_size(3)
            .has(a_row().with_column("ck", data_value(bytes("A"))))
            .has(a_row().with_column("ck", data_value(bytes("B"))))
            .has(a_row().with_column("ck", data_value(bytes("C"))));

    assert_that(results_at_time(m, t1))
            .has_size(2)
            .has(a_row().with_column("ck", data_value(bytes("B"))))
            .has(a_row().with_column("ck", data_value(bytes("C"))));

    assert_that(results_at_time(m, t2))
            .has_size(1)
            .has(a_row().with_column("ck", data_value(bytes("C"))));

    assert_that(results_at_time(m, t3)).is_empty();
}

// Like test_querying_expired_rows, but for TTLed regular and static cells:
// each cell disappears from the result once its expiry time is reached.
SEASTAR_TEST_CASE(test_querying_expired_cells) {
    return seastar::async([] {
        auto s = schema_builder("ks", "cf")
                .with_column("pk", bytes_type, column_kind::partition_key)
                .with_column("ck", bytes_type, column_kind::clustering_key)
                .with_column("s1", bytes_type, column_kind::static_column)
                .with_column("s2", bytes_type, column_kind::static_column)
                .with_column("s3", bytes_type, column_kind::static_column)
                .with_column("v1", bytes_type)
                .with_column("v2", bytes_type)
                .with_column("v3", bytes_type)
                .build();

        auto pk = partition_key::from_singular(*s, data_value(bytes("key1")));
        auto ckey1 = clustering_key::from_singular(*s, data_value(bytes("A")));

        auto ttl = std::chrono::seconds(1);
        auto t1 = gc_clock::now();
        auto t0 = t1 -
std::chrono::seconds(1);
        auto t2 = t1 + std::chrono::seconds(1);
        auto t3 = t2 + std::chrono::seconds(1);

        auto v1 = data_value(bytes("1"));
        auto v2 = data_value(bytes("2"));
        auto v3 = data_value(bytes("3"));

        // Queries a copy of m as of time point t, selecting only the value
        // columns (result + digest request).
        auto results_at_time = [s] (const mutation& m, gc_clock::time_point t) {
            auto slice = partition_slice_builder(*s)
                    .with_regular_column("v1")
                    .with_regular_column("v2")
                    .with_regular_column("v3")
                    .with_static_column("s1")
                    .with_static_column("s2")
                    .with_static_column("s3")
                    .without_clustering_key_columns()
                    .without_partition_key_columns()
                    .build();
            auto opts = query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash};
            return query::result_set::from_raw_result(s, slice, query_mutation(mutation(m), slice, query::max_rows, t, opts));
        };

        {
            mutation m(s, pk);
            m.set_clustered_cell(ckey1, *s->get_column_definition("v1"), atomic_cell::make_live(*bytes_type, api::new_timestamp(), v1.serialize_nonnull(), t1, ttl));
            m.set_clustered_cell(ckey1, *s->get_column_definition("v2"), atomic_cell::make_live(*bytes_type, api::new_timestamp(), v2.serialize_nonnull(), t2, ttl));
            m.set_clustered_cell(ckey1, *s->get_column_definition("v3"), atomic_cell::make_live(*bytes_type, api::new_timestamp(), v3.serialize_nonnull(), t3, ttl));
            m.set_static_cell(*s->get_column_definition("s1"), atomic_cell::make_live(*bytes_type, api::new_timestamp(), v1.serialize_nonnull(), t1, ttl));
            m.set_static_cell(*s->get_column_definition("s2"), atomic_cell::make_live(*bytes_type, api::new_timestamp(), v2.serialize_nonnull(), t2, ttl));
            m.set_static_cell(*s->get_column_definition("s3"), atomic_cell::make_live(*bytes_type, api::new_timestamp(), v3.serialize_nonnull(), t3, ttl));

            assert_that(results_at_time(m, t0))
                    .has_only(a_row()
                            .with_column("s1", v1)
                            .with_column("s2", v2)
                            .with_column("s3", v3)
                            .with_column("v1", v1)
                            .with_column("v2", v2)
                            .with_column("v3", v3)
                            .and_only_that());

            assert_that(results_at_time(m, t1))
                    .has_only(a_row()
                            .with_column("s2", v2)
                            .with_column("s3", v3)
                            .with_column("v2", v2)
                            .with_column("v3", v3)
                            .and_only_that());

            assert_that(results_at_time(m, t2))
                    .has_only(a_row()
                            .with_column("s3", v3)
                            .with_column("v3", v3)
                            .and_only_that());

            assert_that(results_at_time(m, t3)).is_empty();
        }

        {
            mutation m(s, pk);
            m.set_clustered_cell(ckey1, *s->get_column_definition("v1"), atomic_cell::make_live(*bytes_type, api::new_timestamp(), v1.serialize_nonnull(), t1, ttl));
            m.set_static_cell(*s->get_column_definition("s1"), atomic_cell::make_live(*bytes_type, api::new_timestamp(), v1.serialize_nonnull(), t3, ttl));

            // The clustered cell has expired, but the live static cell keeps
            // the partition visible.
            assert_that(results_at_time(m, t2))
                    .has_only(a_row().with_column("s1", v1).and_only_that());

            assert_that(results_at_time(m, t3)).is_empty();
        }
    });
}

// With gc_grace_seconds = 0, a tombstone that already covers the row is itself
// purged by compact_for_compaction().
SEASTAR_TEST_CASE(test_tombstone_purge) {
    auto builder = schema_builder("tests", "tombstone_purge")
        .with_column("id", utf8_type, column_kind::partition_key)
        .with_column("value", int32_type);
    builder.set_gc_grace_seconds(0);
    auto s = builder.build();

    auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
    const column_definition& col = *s->get_column_definition("value");

    mutation m(s, key);
    m.set_clustered_cell(clustering_key::make_empty(), col, make_atomic_cell(int32_type, 1));

    tombstone tomb(api::new_timestamp(), gc_clock::now() - std::chrono::seconds(1));
    m.partition().apply(tomb);
    BOOST_REQUIRE(!m.partition().empty());
    m.partition().compact_for_compaction(*s, always_gc, m.decorated_key(), gc_clock::now(), tombstone_gc_state(nullptr));
    // Check that row was covered by tombstone.
    BOOST_REQUIRE(m.partition().empty());
    // Check that tombstone was purged after compact_for_compaction().
BOOST_REQUIRE(!m.partition().partition_tombstone());
return make_ready_future<>();
}

// Verifies mutation_partition's slicing constructors: both the copy-slice and
// the move-slice constructor must keep exactly the rows falling into the
// given clustering ranges, and must agree with each other.
// Fixed: restored the element types stripped from std::vector<>/std::deque<>
// (the file's angle-bracket contents were lost), and verify_rows now also
// checks that no expected row is missing (previously a partition with fewer
// rows than expected would pass silently).
SEASTAR_TEST_CASE(test_slicing_mutation) {
    auto s = schema_builder("ks", "cf")
        .with_column("pk", int32_type, column_kind::partition_key)
        .with_column("ck", int32_type, column_kind::clustering_key)
        .with_column("v", int32_type)
        .build();

    auto pk = partition_key::from_exploded(*s, { int32_type->decompose(0) });
    mutation m(s, pk);
    constexpr auto row_count = 8;
    // Rows with clustering keys 0..7.
    for (auto i = 0; i < row_count; i++) {
        m.set_clustered_cell(clustering_key_prefix::from_single_value(*s, int32_type->decompose(i)),
                             to_bytes("v"), data_value(i), api::new_timestamp());
    }

    // Asserts that mp contains exactly the rows with the given ck values, in order.
    auto verify_rows = [&] (mutation_partition& mp, std::vector<int> rows) {
        std::deque<clustering_key_prefix> cks;
        for (auto&& cr : rows) {
            cks.emplace_back(clustering_key_prefix::from_single_value(*s, int32_type->decompose(cr)));
        }
        clustering_key::equality ck_eq(*s);
        for (auto&& cr : mp.clustered_rows()) {
            BOOST_REQUIRE(!cks.empty()); // no unexpected extra rows
            BOOST_REQUIRE(ck_eq(cr.key(), cks.front()));
            cks.pop_front();
        }
        BOOST_REQUIRE(cks.empty()); // all expected rows were present
    };

    // Slices m with `ranges` via both constructors and checks the outcome.
    auto test_slicing = [&] (query::clustering_row_ranges ranges, std::vector<int> expected_rows) {
        mutation_partition mp1(m.partition(), *s, ranges);
        auto mp_temp = mutation_partition(*s, m.partition());
        mutation_partition mp2(std::move(mp_temp), *s, ranges);
        BOOST_REQUIRE(mp1.equal(*s, mp2));
        verify_rows(mp1, expected_rows);
    };

    // (-inf, 2), {5} (singular), [7, 10]
    test_slicing(query::clustering_row_ranges {
            query::clustering_range {
                { },
                query::clustering_range::bound { clustering_key_prefix::from_single_value(*s, int32_type->decompose(2)), false },
            },
            clustering_key_prefix::from_single_value(*s, int32_type->decompose(5)),
            query::clustering_range {
                query::clustering_range::bound { clustering_key_prefix::from_single_value(*s, int32_type->decompose(7)) },
                query::clustering_range::bound { clustering_key_prefix::from_single_value(*s, int32_type->decompose(10)) },
            },
        },
        std::vector<int> { 0, 1, 5, 7 });

    // [1, 2], (4, 6], (7, +inf)
    test_slicing(query::clustering_row_ranges {
            query::clustering_range {
                query::clustering_range::bound { clustering_key_prefix::from_single_value(*s,
int32_type->decompose(1)) }, query::clustering_range::bound { clustering_key_prefix::from_single_value(*s, int32_type->decompose(2)) }, }, query::clustering_range { query::clustering_range::bound { clustering_key_prefix::from_single_value(*s, int32_type->decompose(4)), false }, query::clustering_range::bound { clustering_key_prefix::from_single_value(*s, int32_type->decompose(6)) }, }, query::clustering_range { query::clustering_range::bound { clustering_key_prefix::from_single_value(*s, int32_type->decompose(7)), false }, { }, }, }, std::vector { 1, 2, 5, 6 }); test_slicing(query::clustering_row_ranges { query::clustering_range { { }, { }, }, }, std::vector { 0, 1, 2, 3, 4, 5, 6, 7 }); return make_ready_future<>(); } SEASTAR_TEST_CASE(test_trim_rows) { return seastar::async([] { auto s = schema_builder("ks", "cf") .with_column("pk", int32_type, column_kind::partition_key) .with_column("ck", int32_type, column_kind::clustering_key) .with_column("v", int32_type) .build(); auto pk = partition_key::from_exploded(*s, { int32_type->decompose(0) }); mutation m(s, pk); constexpr auto row_count = 8; for (auto i = 0; i < row_count; i++) { m.set_clustered_cell(clustering_key_prefix::from_single_value(*s, int32_type->decompose(i)), to_bytes("v"), data_value(i), api::new_timestamp() - 5); } m.partition().apply(tombstone(api::new_timestamp(), gc_clock::now())); auto now = gc_clock::now() + gc_clock::duration(std::chrono::hours(1)); auto compact_and_expect_empty = [&] (mutation m, std::vector ranges) { mutation m2 = m; m.partition().compact_for_query(*s, m.decorated_key(), now, ranges, false, query::max_rows); BOOST_REQUIRE(m.partition().clustered_rows().empty()); std::reverse(ranges.begin(), ranges.end()); for (auto& range : ranges) { if (!range.is_singular()) { range = query::clustering_range(range.end(), range.start()); } } m2 = reverse(m); auto reversed_schema = s->make_reversed(); m2.partition().compact_for_query(*reversed_schema, m2.decorated_key(), now, ranges, false, 
query::max_rows); BOOST_REQUIRE(m2.partition().clustered_rows().empty()); }; std::vector ranges = { query::clustering_range::make_starting_with(clustering_key_prefix::from_single_value(*s, int32_type->decompose(5))) }; compact_and_expect_empty(m, ranges); ranges = { query::clustering_range::make_starting_with(clustering_key_prefix::from_single_value(*s, int32_type->decompose(50))) }; compact_and_expect_empty(m, ranges); ranges = { query::clustering_range::make_ending_with(clustering_key_prefix::from_single_value(*s, int32_type->decompose(5))) }; compact_and_expect_empty(m, ranges); ranges = { query::clustering_range::make_open_ended_both_sides() }; compact_and_expect_empty(m, ranges); }); } SEASTAR_TEST_CASE(test_collection_cell_diff) { return seastar::async([] { auto s = schema_builder(some_keyspace, some_column_family) .with_column("p", utf8_type, column_kind::partition_key) .with_column("v", list_type_impl::get_instance(bytes_type, true)) .build(); auto& col = s->column_at(column_kind::regular_column, 0); auto k = dht::decorate_key(*s, partition_key::from_single_value(*s, to_bytes("key"))); mutation m1(s, k); auto uuid = utils::UUID_gen::get_time_UUID_bytes(); collection_mutation_description mcol1; mcol1.cells.emplace_back( bytes(reinterpret_cast(uuid.data()), uuid.size()), atomic_cell::make_live(*bytes_type, api::timestamp_type(1), to_bytes("element"))); m1.set_clustered_cell(clustering_key::make_empty(), col, mcol1.serialize(*col.type)); mutation m2(s, k); collection_mutation_description mcol2; mcol2.tomb = tombstone(api::timestamp_type(2), gc_clock::now()); m2.set_clustered_cell(clustering_key::make_empty(), col, mcol2.serialize(*col.type)); mutation m12 = m1; m12.apply(m2); auto diff = m12.partition().difference(*s, m1.partition()); BOOST_REQUIRE(!diff.empty()); BOOST_REQUIRE(m2.partition().equal(*s, diff)); }); } SEASTAR_TEST_CASE(test_apply_is_commutative) { return seastar::async([] { for_each_mutation_pair([] (auto&& m1, auto&& m2, are_equal eq) { auto s 
= m1.schema();
if (s != m2.schema()) {
    return; // mutations with different schemas not commutative
}
assert_that(m1 + m2).is_equal_to(m2 + m1);
});
});
}

// Checks mutation_partition::difference() against apply(): applying the
// difference of m2 over m1 onto m1 must reproduce m1 + m2. Also checks the
// identities involving an empty partition, and repeats everything with
// apply_gently().
SEASTAR_TEST_CASE(test_mutation_diff_with_random_generator) {
    return seastar::async([] {
        auto check_partitions_match = [] (const mutation_partition& mp1, const mutation_partition& mp2, const schema& s) {
            if (!mp1.equal(s, mp2)) {
                BOOST_FAIL(format("Partitions don't match, got: {}\n...and: {}",
                        mutation_partition::printer(s, mp1), mutation_partition::printer(s, mp2)));
            }
        };
        const auto now = gc_clock::now();
        for_each_mutation_pair([&] (auto m1, auto m2, are_equal eq) {
            mutation_application_stats app_stats;
            auto s = m1.schema();
            if (s != m2.schema()) {
                return;
            }
            // Compact first so both sides start from a canonical form.
            m1.partition().compact_for_compaction(*s, never_gc, m1.decorated_key(), now, tombstone_gc_state(nullptr));
            m2.partition().compact_for_compaction(*s, never_gc, m2.decorated_key(), now, tombstone_gc_state(nullptr));
            auto m12 = m1;
            m12.apply(m2);
            auto m12_with_diff = m1;
            m12_with_diff.partition().apply(*s, m2.partition().difference(*s, m1.partition()), app_stats);
            check_partitions_match(m12.partition(), m12_with_diff.partition(), *s);
            // diff(x, x) == empty, diff(x, empty) == x, diff(empty, x) == empty.
            check_partitions_match(mutation_partition{*s}, m1.partition().difference(*s, m1.partition()), *s);
            check_partitions_match(m1.partition(), m1.partition().difference(*s, mutation_partition{*s}), *s);
            check_partitions_match(mutation_partition{*s}, mutation_partition{*s}.difference(*s, m1.partition()), *s);
            // same as above, just using apply_gently
            m12 = m1;
            apply_gently(m12, m2).get();
            m12_with_diff = m1;
            apply_gently(m12_with_diff.partition(), *s, m2.partition().difference(*s, m1.partition()), app_stats).get();
            check_partitions_match(m12.partition(), m12_with_diff.partition(), *s);
            check_partitions_match(mutation_partition{*s}, m1.partition().difference(*s, m1.partition()), *s);
            check_partitions_match(m1.partition(), m1.partition().difference(*s, mutation_partition{*s}), *s);
            check_partitions_match(mutation_partition{*s}, mutation_partition{*s}.difference(*s, m1.partition()), *s);
        });
    });
}

// Merging two fully-continuous mutations must yield a fully-continuous result.
SEASTAR_TEST_CASE(test_continuity_merging_of_complete_mutations) {
    random_mutation_generator gen(random_mutation_generator::generate_counters::no);

    mutation m1 = gen();
    m1.partition().make_fully_continuous();
    mutation m2 = gen();
    m2.partition().make_fully_continuous();
    mutation m3 = m1 + m2;

    assert_that(m3).is_continuous(position_range::all_clustered_rows(), is_continuous::yes);

    return make_ready_future<>();
}

// mutation::operator+ must be commutative and associative; the small key
// cardinality forces overlapping keys between the generated mutations.
SEASTAR_TEST_CASE(test_commutativity_and_associativity) {
    random_mutation_generator gen(random_mutation_generator::generate_counters::no);
    gen.set_key_cardinality(7);

    for (int i = 0; i < 10; ++i) {
        mutation m1 = gen();
        m1.partition().make_fully_continuous();
        mutation m2 = gen();
        m2.partition().make_fully_continuous();
        mutation m3 = gen();
        m3.partition().make_fully_continuous();

        assert_that(m1 + m2 + m3)
            .is_equal_to(m1 + m3 + m2)
            .is_equal_to(m2 + m1 + m3)
            .is_equal_to(m2 + m3 + m1)
            .is_equal_to(m3 + m1 + m2)
            .is_equal_to(m3 + m2 + m1);
    }

    return make_ready_future<>();
}

// NOTE(review): r1 is moved-from before the second apply_monotonically();
// the test appears to check that applying r3 into a moved-from row leaves
// it equal to a copy of r3 — confirm against row's documented move semantics.
SEASTAR_THREAD_TEST_CASE(test_row_merging) {
    simple_schema table;
    auto&& s = *table.schema();

    row r1;
    table.set_cell(r1, "v1");

    row r2;
    table.set_cell(r2, "v2");

    row r3;
    table.set_cell(r3, "v3");

    r2.apply_monotonically(s, column_kind::regular_column, std::move(r1));
    auto r3_backup = row(s, column_kind::regular_column, r3);
    r1.apply_monotonically(s, column_kind::regular_column, std::move(r3));
    BOOST_REQUIRE(r1.equal(column_kind::regular_column, s, r3_backup, s));
}

// Verifies the continuity-merging rules of mutation::operator+ for every
// combination of dummy/non-dummy entries and continuity flags (keys 0-8),
// plus static-row continuity. `result` spells out the expected outcome of
// merging `left` over `right` for each key.
SEASTAR_TEST_CASE(test_continuity_merging) {
    return seastar::async([] {
        simple_schema table;
        auto&& s = *table.schema();

        auto new_mutation = [&] {
            return mutation(table.schema(), table.make_pkey(0));
        };

        {
            auto left = new_mutation();
            auto right = new_mutation();
            auto result = new_mutation();

            left.partition().clustered_row(s, table.make_ckey(0), is_dummy::no, is_continuous::yes);
            right.partition().clustered_row(s, table.make_ckey(0), is_dummy::no, is_continuous::no);
            result.partition().clustered_row(s, table.make_ckey(0), is_dummy::no, is_continuous::yes);

            left.partition().clustered_row(s, table.make_ckey(1), is_dummy::yes, is_continuous::yes);
            right.partition().clustered_row(s, table.make_ckey(2), is_dummy::yes, is_continuous::no);
            result.partition().clustered_row(s, table.make_ckey(1), is_dummy::yes, is_continuous::yes);
            result.partition().clustered_row(s, table.make_ckey(2), is_dummy::yes, is_continuous::no);

            left.partition().clustered_row(s, table.make_ckey(3), is_dummy::yes, is_continuous::yes);
            right.partition().clustered_row(s, table.make_ckey(3), is_dummy::no, is_continuous::no);
            result.partition().clustered_row(s, table.make_ckey(3), is_dummy::no, is_continuous::yes);

            left.partition().clustered_row(s, table.make_ckey(4), is_dummy::no, is_continuous::no);
            right.partition().clustered_row(s, table.make_ckey(4), is_dummy::no, is_continuous::yes);
            result.partition().clustered_row(s, table.make_ckey(4), is_dummy::no, is_continuous::yes);

            left.partition().clustered_row(s, table.make_ckey(5), is_dummy::no, is_continuous::no);
            right.partition().clustered_row(s, table.make_ckey(5), is_dummy::yes, is_continuous::yes);
            result.partition().clustered_row(s, table.make_ckey(5), is_dummy::no, is_continuous::yes);

            left.partition().clustered_row(s, table.make_ckey(6), is_dummy::no, is_continuous::yes);
            right.partition().clustered_row(s, table.make_ckey(6), is_dummy::yes, is_continuous::no);
            result.partition().clustered_row(s, table.make_ckey(6), is_dummy::no, is_continuous::yes);

            left.partition().clustered_row(s, table.make_ckey(7), is_dummy::yes, is_continuous::yes);
            right.partition().clustered_row(s, table.make_ckey(7), is_dummy::yes, is_continuous::no);
            result.partition().clustered_row(s, table.make_ckey(7), is_dummy::yes, is_continuous::yes);

            left.partition().clustered_row(s, table.make_ckey(8), is_dummy::yes, is_continuous::no);
            right.partition().clustered_row(s, table.make_ckey(8), is_dummy::yes, is_continuous::yes);
result.partition().clustered_row(s, table.make_ckey(8), is_dummy::yes, is_continuous::yes); assert_that(right + left).has_same_continuity(result); } // static row continuity { auto complete = mutation(table.schema(), table.make_pkey(0)); auto incomplete = mutation(table.schema(), table.make_pkey(0)); incomplete.partition().set_static_row_continuous(false); assert_that(complete + complete).has_same_continuity(complete); assert_that(complete + incomplete).has_same_continuity(complete); assert_that(incomplete + complete).has_same_continuity(complete); assert_that(incomplete + incomplete).has_same_continuity(incomplete); } }); } static void test_all_preemption_points(std::function func) { uint64_t preempt_after = 0; bool preempted; do { testlog.trace("preempt after {}", preempt_after); preempted = false; uint64_t check_count = 0; func([&] () noexcept { if (check_count++ == preempt_after) { testlog.trace("preempted"); preempted = true; return true; } else { return false; } }); preempt_after++; } while (preempted); } SEASTAR_TEST_CASE(test_v2_merging_in_non_evictable_snapshot) { return seastar::async([] { random_mutation_generator gen(random_mutation_generator::generate_counters::no); const schema& s = *gen.schema(); mutation m1 = gen(); mutation m2 = gen(); m1.partition().make_fully_continuous(); m2.partition().make_fully_continuous(); testlog.trace("m1 = {}", m1); testlog.trace("m2 = {}", m2); mutation_partition_v2 m1_v2(s, m1.partition()); mutation_partition_v2 m2_v2(s, m2.partition()); m1_v2.compact(s, no_cache_tracker); m2_v2.compact(s, no_cache_tracker); BOOST_REQUIRE(!has_redundant_dummies(m1_v2)); BOOST_REQUIRE(!has_redundant_dummies(m2_v2)); testlog.trace("m1_v2 = {}", mutation_partition_v2::printer(s, m1_v2)); testlog.trace("m2_v2 = {}", mutation_partition_v2::printer(s, m2_v2)); mutation_application_stats app_stats; auto result_v1 = mutation_partition(s, (m1 + m2).partition()); auto check = [&] (preemption_check preempt) { auto result_v2 = 
mutation_partition_v2(s, m1_v2);
            auto to_apply = mutation_partition_v2(s, m2_v2);
            apply_resume res;
            // Re-drive apply_monotonically() until it completes, yielding on preemption.
            while (result_v2.apply_monotonically(s, s, std::move(to_apply), no_cache_tracker, app_stats,
                    [&] () noexcept { return preempt(); }, res, is_evictable::no) == stop_iteration::no) {
                seastar::thread::maybe_yield();
            }
            BOOST_REQUIRE(!has_redundant_dummies(result_v2));
            testlog.trace("result_v1 = {}", mutation_partition::printer(s, result_v1));
            testlog.trace("result_v2 = {}", mutation_partition_v2::printer(s, result_v2));
            testlog.trace("result_v2_as_v1 = {}", value_of([&] { return mutation_partition::printer(s, result_v2.as_mutation_partition(s)); }));
            assert_that(gen.schema(), result_v2).is_equal_to_compacted(result_v1);
        };

        testlog.info("always_preempt");
        check(always_preempt());
        testlog.info("random_preempt");
        check(tests::random::random_preempt());
        testlog.info("exhaustive check of all preemption points");
        test_all_preemption_points([&] (preemption_check preempt) {
            check(std::move(preempt));
        });
    });
}

// Clears `p` in chunks and re-registers a fresh empty partition with the
// tracker, keeping the tracker's accounting balanced.
static void clear(cache_tracker& tracker, const schema& s, mutation_partition_v2& p) {
    while (p.clear_gently(&tracker) == stop_iteration::no) {}
    p = mutation_partition_v2(s);
    tracker.insert(p);
}

// Merges an older and a newer version (as in cache MVCC), with continuity
// broken eviction-style in the older one, and verifies both the data and the
// continuity of the merged result.
SEASTAR_TEST_CASE(test_v2_merging_in_evictable_snapshot) {
    return seastar::async([] {
        mutation_application_stats app_stats;
        cache_tracker tracker;
        random_mutation_generator gen(random_mutation_generator::generate_counters::no);
        const schema& s = *gen.schema();
        mutation m1 = gen(); // older
        mutation m2 = gen() + m1; // newer

        mutation_partition_v2 m1_v2(s, m1.partition());
        mutation_partition_v2 m2_v2(s, m2.partition());

        // Newer version cannot have entries with range tombstones
        // if the range to the left is discontinuous. apply_monotonically() asserts this.
        for (rows_entry& e : m2_v2.mutable_clustered_rows()) {
            if (e.range_tombstone() && !e.continuous()) {
                e.set_continuous(true);
            }
        }

        // Break some continuity in the oldest version as if during eviction.
        // We want this to generate some non-dummy entries with continuity flag not set.
        bool succeeded = false;
        while (!succeeded) {
            bool has_non_dummies = false;
            for (rows_entry& e : m1_v2.mutable_clustered_rows()) {
                if (!e.dummy()) {
                    has_non_dummies = true;
                }
                if (tests::random::with_probability(0.17)) {
                    e.set_continuous(false);
                    if (!e.dummy()) {
                        succeeded = true;
                    }
                }
            }
            if (!succeeded && !has_non_dummies) {
                break; // no chance to succeed
            }
        }

        testlog.trace("m1_v2 = {}", mutation_partition_v2::printer(s, m1_v2));
        testlog.trace("m2_v2 = {}", mutation_partition_v2::printer(s, m2_v2));

        // Expected continuity of the merge is the union of both versions'.
        auto expected_continuity = m1_v2.get_continuity(s, is_continuous::yes);
        testlog.trace("m1 cont = {}", expected_continuity);
        expected_continuity.add(s, m2_v2.get_continuity(s, is_continuous::yes));
        testlog.trace("m2 cont = {}", m2_v2.get_continuity(s, is_continuous::yes));

        tracker.insert(m1_v2);
        tracker.insert(m2_v2);
        auto drop_entries = defer([&] {
            // Don't let the cleaner free them. He assumes entries are allocated using its region() and they're not.
            clear(tracker, s, m1_v2);
            clear(tracker, s, m2_v2);
        });

        auto result_v2 = mutation_partition_v2(s, m1_v2);
        tracker.insert(result_v2);
        auto clear_result_v2 = defer([&] {
            clear(tracker, s, result_v2);
        });

        apply_resume res;
        while (result_v2.apply_monotonically(s, s, std::move(m2_v2), &tracker, app_stats,
                default_preemption_check(), res, is_evictable::yes) == stop_iteration::no) {
            seastar::thread::maybe_yield();
        }

        testlog.trace("result_v2 = {}", mutation_partition_v2::printer(s, result_v2));

        auto v2_continuity = result_v2.get_continuity(s, is_continuous::yes);
        if (!v2_continuity.equals(s, expected_continuity)) {
            BOOST_FAIL(format("Continuity mismatch, expected: {}\nbut got: {}", expected_continuity, v2_continuity));
        }

        auto result_v1 = mutation_partition(s, m2.partition());
        // Set result_v1 continuity to expected_continuity
        result_v1.set_continuity(s, position_range::all_clustered_rows(), is_continuous::no);
        for (auto&& r : expected_continuity) {
            result_v1.set_continuity(s, r, is_continuous::yes);
        }

        testlog.trace("result_v1 = {}", mutation_partition::printer(s, result_v1));
        if (testlog.is_enabled(seastar::log_level::trace)) {
            testlog.trace("result_v2_as_v1 = {}", mutation_partition::printer(s, result_v2.as_mutation_partition(s)));
        }
        assert_that(gen.schema(), result_v2).is_equal_to_compacted(result_v1);
    });
}

// Round-trip: v1 -> v2 -> v1 must preserve the partition (compacted compare).
SEASTAR_TEST_CASE(test_mutation_partition_conversion_between_v2_and_v1) {
    return seastar::async([] {
        simple_schema ss;
        random_mutation_generator gen(random_mutation_generator::generate_counters::no);
        const schema& s = *gen.schema();
        mutation m = gen();
        m.partition().make_fully_continuous();
        mutation_partition_v2 v2(s, m.partition());
        auto v1_from_v2 = v2.as_mutation_partition(s);
        assert_that(m.schema(), v1_from_v2).is_equal_to_compacted(s, m.partition());
    });
}

// Converting overlapping/adjacent/singular range tombstones to v2 must keep
// the partition fully continuous and preserve the data.
SEASTAR_TEST_CASE(test_conversion_of_range_tombstones_to_v2) {
    return seastar::async([] {
        simple_schema ss;
        const schema& s = *ss.schema();
        mutation m(ss.schema(), ss.make_pkey());
        ss.delete_range(m, ss.make_ckey_range(1, 3));
        ss.delete_range(m, ss.make_ckey_range(3, 7));
        ss.delete_range(m, ss.make_ckey_range(0, 0));
        ss.delete_range(m, ss.make_ckey_range(13, 13));

        // range tombstone which is past all the row entries, falls into implicit continuous range.
        ss.delete_range(m, ss.make_ckey_range(17, 19));

        mutation_partition_v2 m_v2(*ss.schema(), m.partition());
        BOOST_REQUIRE(m_v2.is_fully_continuous());
        assert_that(m.schema(), m_v2).is_equal_to_compacted(s, m.partition());
    });
}

// Merging past the last entry of an evictable partition must extend
// continuity correctly; two sub-cases differ in which ranges of the applied
// (newer) version are marked continuous.
SEASTAR_TEST_CASE(test_continuity_merging_past_last_entry_in_evictable) {
    return seastar::async([] {
        simple_schema ss;
        const schema& s = *ss.schema();
        mutation m1(ss.schema(), ss.make_pkey());
        ss.delete_range(m1, ss.make_ckey_range(1, 3));
        mutation_partition_v2 m1_v2(*ss.schema(), m1.partition());

        {
            mutation m2(ss.schema(), ss.make_pkey());
            ss.delete_range(m2, ss.make_ckey_range(5, 7));
            mutation_partition_v2 m2_v2(*ss.schema(), m2.partition());
            // All ranges are marked discontinuous so all dummies in m2_v2 should have no effect
            m2_v2.set_continuity(s, position_range::all_clustered_rows(), is_continuous::no);

            // m2_v2: --------------- 5 [rt] --- 7 [rt] ---
            // m1_v2: === 1 === 3 =========================
            mutation_application_stats app_stats;
            apply_resume resume;
            m1_v2.apply_monotonically(s, s, std::move(m2_v2), nullptr, app_stats, never_preempt(), resume, is_evictable::yes);

            BOOST_REQUIRE(m1_v2.is_fully_continuous());
            assert_that(ss.schema(), m1_v2).is_equal_to_compacted(s, (m1).partition());
        }

        {
            mutation m2(ss.schema(), ss.make_pkey());
            ss.delete_range(m2, ss.make_ckey_range(5, 7));
            mutation_partition_v2 m2_v2(*ss.schema(), m2.partition());
            // Leave only [5, 7] continuous
            m2_v2.set_continuity(s, position_range::all_clustered_rows(), is_continuous::no);
            m2_v2.set_continuity(s, position_range(ss.make_ckey_range(5, 7)), is_continuous::yes);

            mutation_application_stats app_stats;
            apply_resume resume;
            // m2_v2: --------------- 5 ==(rt)== 7 [rt] ---
            // m1_v2: === 1 === 3 =========================
m1_v2.apply_monotonically(s, s, std::move(m2_v2), nullptr, app_stats, never_preempt(), resume, is_evictable::yes); BOOST_REQUIRE(m1_v2.is_fully_continuous()); assert_that(ss.schema(), m1_v2).is_equal_to_compacted(s, (m1 + m2).partition()); } }); } class measuring_allocator final : public allocation_strategy { size_t _allocated_bytes = 0; public: measuring_allocator() { _preferred_max_contiguous_allocation = standard_allocator().preferred_max_contiguous_allocation(); } virtual void* alloc(migrate_fn mf, size_t size, size_t alignment) override { _allocated_bytes += size; return standard_allocator().alloc(mf, size, alignment); } virtual void free(void* ptr, size_t size) override { standard_allocator().free(ptr, size); } virtual void free(void* ptr) override { standard_allocator().free(ptr); } virtual size_t object_memory_size_in_allocator(const void* obj) const noexcept override { return standard_allocator().object_memory_size_in_allocator(obj); } size_t allocated_bytes() const { return _allocated_bytes; } }; SEASTAR_THREAD_TEST_CASE(test_external_memory_usage) { measuring_allocator alloc; auto s = simple_schema(); auto generate = [&s] { size_t data_size = 0; auto m = mutation(s.schema(), s.make_pkey("pk")); auto row_count = tests::random::get_int(1, 16); auto ck_values = std::set(); for (auto i = 0; i < row_count; i++) { auto ck_value = to_hex(tests::random::get_bytes(tests::random::get_int(1023) + 1)); if (!ck_values.insert(ck_value).second) { // This clustering key was already added. Retry. 
--i; continue; } data_size += ck_value.size(); auto ck = s.make_ckey(ck_value); auto value = to_hex(tests::random::get_bytes(tests::random::get_int(128 * 1024))); data_size += value.size(); s.add_row(m, ck, value); } return std::pair(std::move(m), data_size); }; for (auto i = 0; i < 16; i++) { auto m_and_size = generate(); auto&& m = m_and_size.first; auto&& size = m_and_size.second; with_allocator(alloc, [&] { auto before = alloc.allocated_bytes(); auto m2 = mutation_partition(*m.schema(), m.partition()); auto after = alloc.allocated_bytes(); BOOST_CHECK_EQUAL(m.partition().external_memory_usage(*s.schema()), m2.external_memory_usage(*s.schema())); BOOST_CHECK_GE(m.partition().external_memory_usage(*s.schema()), size); BOOST_CHECK_EQUAL(m.partition().external_memory_usage(*s.schema()), after - before); }); } } SEASTAR_THREAD_TEST_CASE(test_external_memory_usage_v2) { measuring_allocator alloc; auto s = simple_schema(); auto generate = [&s] { size_t data_size = 0; auto m = mutation(s.schema(), s.make_pkey("pk")); auto row_count = tests::random::get_int(1, 16); auto ck_values = std::set(); for (auto i = 0; i < row_count; i++) { auto ck_value = to_hex(tests::random::get_bytes(tests::random::get_int(1023) + 1)); if (!ck_values.insert(ck_value).second) { // This clustering key was already added. Retry. 
--i;
            continue;
        }
        data_size += ck_value.size();
        auto ck = s.make_ckey(ck_value);
        auto value = to_hex(tests::random::get_bytes(tests::random::get_int(128 * 1024)));
        data_size += value.size();
        s.add_row(m, ck, value);
    }
    return std::pair(std::move(m), data_size);
};

for (auto i = 0; i < 16; i++) {
    auto m_and_size = generate();
    auto&& m = m_and_size.first;
    auto&& size = m_and_size.second;

    // Convert outside the measured section; only the v2 copy is measured.
    mutation_partition_v2 m_v2(*m.schema(), m.partition());

    with_allocator(alloc, [&] {
        auto before = alloc.allocated_bytes();
        auto m2_v2 = mutation_partition_v2(*m.schema(), m_v2);
        auto after = alloc.allocated_bytes();

        BOOST_CHECK_EQUAL(m_v2.external_memory_usage(*s.schema()), m2_v2.external_memory_usage(*s.schema()));
        BOOST_CHECK_GE(m_v2.external_memory_usage(*s.schema()), size);
        BOOST_CHECK_EQUAL(m_v2.external_memory_usage(*s.schema()), after - before);
    });
}
}

// Inequality checks between dead, live, counter-update and expiring cells.
SEASTAR_THREAD_TEST_CASE(test_cell_equals) {
    auto now = gc_clock::now();
    auto ttl = gc_clock::duration(0);

    // live (expiring) vs dead
    auto c1 = atomic_cell_or_collection(atomic_cell::make_live(*bytes_type, 1, bytes(1, 'a'), now, ttl));
    auto c2 = atomic_cell_or_collection(atomic_cell::make_dead(1, now));
    BOOST_REQUIRE(!c1.equals(*bytes_type, c2));
    BOOST_REQUIRE(!c2.equals(*bytes_type, c1));

    // counter update vs plain live
    auto c3 = atomic_cell_or_collection(atomic_cell::make_live_counter_update(1, 2));
    auto c4 = atomic_cell_or_collection(atomic_cell::make_live(*bytes_type, 1, bytes(1, 'a')));
    BOOST_REQUIRE(!c3.equals(*bytes_type, c4));
    BOOST_REQUIRE(!c4.equals(*bytes_type, c3));

    // expiring vs non-expiring live
    BOOST_REQUIRE(!c1.equals(*bytes_type, c4));
    BOOST_REQUIRE(!c4.equals(*bytes_type, c1));
}

// Global to avoid elimination by the compiler; see below for use
thread_local data_type force_type_thread_local_init_evaluation [[gnu::used]];

// external_memory_usage() of individual atomic cells and collection cells
// must match the bytes actually allocated when the cell is created/copied.
SEASTAR_THREAD_TEST_CASE(test_cell_external_memory_usage) {
    measuring_allocator alloc;

    // Force evaluation of int32_type and all the other types. This is so
    // the compiler doesn't reorder it into the with_allocator section, below,
    // and any managed_bytes instances created during that operation interfere
    // with the measurements.
    force_type_thread_local_init_evaluation = int32_type;
    // optimization barrier
    std::atomic_signal_fence(std::memory_order_seq_cst);

    auto test_live_atomic_cell = [&] (data_type dt, bytes_view bv) {
        with_allocator(alloc, [&] {
            auto before = alloc.allocated_bytes();
            auto ac = atomic_cell_or_collection(atomic_cell::make_live(*dt, 1, bv));
            auto after = alloc.allocated_bytes();
            BOOST_CHECK_EQUAL(ac.external_memory_usage(*dt), after - before);
        });
    };

    test_live_atomic_cell(int32_type, { });
    test_live_atomic_cell(int32_type, int32_type->decompose(int32_t(1)));
    test_live_atomic_cell(bytes_type, { });
    test_live_atomic_cell(bytes_type, bytes(1, 'a'));
    test_live_atomic_cell(bytes_type, bytes(16, 'a'));
    test_live_atomic_cell(bytes_type, bytes(32, 'a'));
    test_live_atomic_cell(bytes_type, bytes(1024, 'a'));
    // Sizes around the 64 KiB mark and a large 1 MiB value.
    test_live_atomic_cell(bytes_type, bytes(64 * 1024 - 1, 'a'));
    test_live_atomic_cell(bytes_type, bytes(64 * 1024, 'a'));
    test_live_atomic_cell(bytes_type, bytes(64 * 1024 + 1, 'a'));
    test_live_atomic_cell(bytes_type, bytes(1024 * 1024, 'a'));

    // Same measurement for a map cell holding a single value of size |bv|.
    auto test_collection = [&] (bytes_view bv) {
        auto collection_type = map_type_impl::get_instance(int32_type, bytes_type, true);
        auto m = make_collection_mutation({ }, int32_type->decompose(0), make_collection_member(bytes_type, data_value(bytes(bv))));
        auto cell = atomic_cell_or_collection(m.serialize(*collection_type));
        with_allocator(alloc, [&] {
            auto before = alloc.allocated_bytes();
            auto cell2 = cell.copy(*collection_type);
            auto after = alloc.allocated_bytes();
            BOOST_CHECK_EQUAL(cell2.external_memory_usage(*collection_type), cell.external_memory_usage(*collection_type));
            BOOST_CHECK_EQUAL(cell2.external_memory_usage(*collection_type), after - before);
        });
    };

    test_collection({ });
    test_collection(bytes(1, 'a'));
    test_collection(bytes(16, 'a'));
test_collection(bytes(32, 'a')); test_collection(bytes(1024, 'a')); test_collection(bytes(64 * 1024 - 1, 'a')); test_collection(bytes(64 * 1024, 'a')); test_collection(bytes(64 * 1024 + 1, 'a')); test_collection(bytes(1024 * 1024, 'a')); } // external_memory_usage() must be invariant to the merging order, // so that accounting of a clustering_row produced by partition_snapshot_flat_reader // doesn't give a greater result than what is used by the memtable region, possibly // after all MVCC versions are merged. // Overaccounting leads to assertion failure in ~flush_memory_accounter. SEASTAR_THREAD_TEST_CASE(test_row_size_is_immune_to_application_order) { auto s = schema_builder("ks", "cf") .with_column("pk", utf8_type, column_kind::partition_key) .with_column("v1", utf8_type) .with_column("v2", utf8_type) .with_column("v3", utf8_type) .with_column("v4", utf8_type) .with_column("v5", utf8_type) .with_column("v6", utf8_type) .with_column("v7", utf8_type) .with_column("v8", utf8_type) .with_column("v9", utf8_type) .build(); auto value = utf8_type->decompose(data_value("value")); row r1; r1.append_cell(7, make_atomic_cell(value)); row r2; r2.append_cell(8, make_atomic_cell(value)); auto size1 = [&] { auto r3 = row(*s, column_kind::regular_column, r1); r3.apply(*s, column_kind::regular_column, r2); return r3.external_memory_usage(*s, column_kind::regular_column); }(); auto size2 = [&] { auto r3 = row(*s, column_kind::regular_column, r2); r3.apply(*s, column_kind::regular_column, r1); return r3.external_memory_usage(*s, column_kind::regular_column); }(); BOOST_REQUIRE_EQUAL(size1, size2); } SEASTAR_THREAD_TEST_CASE(test_schema_changes) { for_each_schema_change([] (schema_ptr base, const utils::chunked_vector& base_mutations, schema_ptr changed, const utils::chunked_vector& changed_mutations) { BOOST_REQUIRE_EQUAL(base_mutations.size(), changed_mutations.size()); for (auto bc : boost::range::combine(base_mutations, changed_mutations)) { auto b = boost::get<0>(bc); 
b.upgrade(changed); BOOST_CHECK_EQUAL(b, boost::get<1>(bc)); } }); } SEASTAR_THREAD_TEST_CASE(test_collection_compaction) { auto key = to_bytes("key"); auto value = data_value(to_bytes("value")); // No collection tombstone, row tombstone covers all cells auto cmut = make_collection_mutation({}, key, make_collection_member(bytes_type, value)); auto row_tomb = row_tombstone(tombstone { 1, gc_clock::time_point() }); auto res = cmut.compact_and_expire(0, row_tomb, gc_clock::time_point(), always_gc, gc_clock::time_point()); BOOST_CHECK(!res.is_live()); BOOST_CHECK(!cmut.tomb); BOOST_CHECK(cmut.cells.empty()); // No collection tombstone, row tombstone doesn't cover anything cmut = make_collection_mutation({}, key, make_collection_member(bytes_type, value)); row_tomb = row_tombstone(tombstone { -1, gc_clock::time_point() }); res = cmut.compact_and_expire(0, row_tomb, gc_clock::time_point(), always_gc, gc_clock::time_point()); BOOST_CHECK(res.is_live()); BOOST_CHECK(!cmut.tomb); BOOST_CHECK_EQUAL(cmut.cells.size(), 1); // Collection tombstone covers everything cmut = make_collection_mutation(tombstone { 2, gc_clock::time_point() }, key, make_collection_member(bytes_type, value)); row_tomb = row_tombstone(tombstone { 1, gc_clock::time_point() }); res = cmut.compact_and_expire(0, row_tomb, gc_clock::time_point(), always_gc, gc_clock::time_point()); BOOST_CHECK(!res.is_live()); BOOST_CHECK(cmut.tomb); BOOST_CHECK_EQUAL(cmut.tomb.timestamp, 2); BOOST_CHECK(cmut.cells.empty()); // Collection tombstone covered by row tombstone cmut = make_collection_mutation(tombstone { 2, gc_clock::time_point() }, key, make_collection_member(bytes_type, value)); row_tomb = row_tombstone(tombstone { 3, gc_clock::time_point() }); res = cmut.compact_and_expire(0, row_tomb, gc_clock::time_point(), always_gc, gc_clock::time_point()); BOOST_CHECK(!res.is_live()); BOOST_CHECK(!cmut.tomb); BOOST_CHECK(cmut.cells.empty()); } namespace { template class basic_compacted_fragments_consumer_base { const 
schema& _schema;                    // schema of the consumed stream ("const" is on the preceding line)
    gc_clock::time_point _query_time;   // compaction/query wall-clock time
    gc_clock::time_point _gc_before;    // tombstones deleted before this point are gcable
    max_purgeable_fn _get_max_purgeable;
    max_purgeable _max_purgeable;       // refreshed per partition in consume_new_partition()
    utils::chunked_vector _mutations;   // rebuilt output, one entry per consumed partition
    mutation_rebuilder_v2 _mutation;

private:
    // A tombstone may be gc-ed only if nothing newer than it may still need it
    // (i.e. its timestamp is below the per-partition max_purgeable threshold).
    bool can_gc(tombstone t) {
        if (!t) {
            return true;
        }
        return t.timestamp < _max_purgeable.timestamp();
    }
    bool is_tombstone_purgeable(const tombstone& t) {
        return t.deletion_time < _gc_before && can_gc(t);
    }
    bool is_tombstone_purgeable(const row_tombstone& t) {
        return t.max_deletion_time() < _gc_before && can_gc(t.tomb());
    }
    // A row marker is purgeable when shadowed by the row tombstone, or when it
    // is an expired, gcable TTL-ed marker.
    bool is_marker_purgeable(const row_marker& marker, tombstone tomb) {
        return marker.timestamp() <= tomb.timestamp ||
                (marker.is_dead(_query_time) && marker.expiry() < _gc_before && can_gc(tombstone(marker.timestamp(), marker.expiry())));
    }
    bool is_cell_purgeable(const atomic_cell_view& cell) {
        return (cell.has_expired(_query_time) || !cell.is_live()) &&
                cell.deletion_time() < _gc_before &&
                can_gc(tombstone(cell.timestamp(), cell.deletion_time()));
    }
    // Asserts that an individual cell (atomic or collection/UDT) belongs on
    // this side of the purged/survived split.
    void examine_cell(const column_definition& cdef, const atomic_cell_or_collection& cell_or_collection, const row_tombstone& tomb) {
        if (cdef.type->is_atomic()) {
            auto cell = cell_or_collection.as_atomic_cell(cdef);
            if constexpr (OnlyPurged) {
                // Cells shadowed by the row tombstone must not reach the purged stream.
                BOOST_REQUIRE(!cell.is_covered_by(tomb.tomb(), cdef.is_counter()));
            }
            BOOST_REQUIRE_EQUAL(is_cell_purgeable(cell), OnlyPurged);
        } else if (cdef.type->is_collection() || cdef.type->is_user_type()) {
            auto cell = cell_or_collection.as_collection_mutation();
            cell.with_deserialized(*cdef.type, [&] (collection_mutation_view_description m_view) {
                BOOST_REQUIRE(m_view.tomb.timestamp == api::missing_timestamp ||
                        m_view.tomb.timestamp > tomb.tomb().timestamp ||
                        is_tombstone_purgeable(m_view.tomb) == OnlyPurged);
                // Check the individual collection cells against the merged
                // collection + row tombstone.
                auto t = m_view.tomb;
                t.apply(tomb.tomb());
                for (const auto& [key, cell] : m_view.cells) {
                    if constexpr (OnlyPurged) {
                        BOOST_REQUIRE(!cell.is_covered_by(t, false));
                    }
                    BOOST_REQUIRE_EQUAL(is_cell_purgeable(cell), OnlyPurged);
                }
            });
        } else {
            throw std::runtime_error(fmt::format("Cannot check cell {} of unknown type {}", cdef.name_as_text(), cdef.type->name()));
        }
    }
    void examine_row(column_kind kind, const row& r, const row_tombstone& tomb) {
        r.for_each_cell([&, this, kind] (column_id id, const atomic_cell_or_collection& cell) {
            examine_cell(_schema.column_at(kind, id), cell, tomb);
        });
    }

public:
    basic_compacted_fragments_consumer_base(const schema& schema, gc_clock::time_point query_time, max_purgeable_fn get_max_purgeable)
        : _schema(schema)
        , _query_time(query_time)
        , _gc_before(saturating_subtract(query_time, _schema.gc_grace_seconds()))
        , _get_max_purgeable(std::move(get_max_purgeable))
        , _mutation(_schema.shared_from_this()) {
    }
    void consume_new_partition(const dht::decorated_key& dk) {
        _max_purgeable = _get_max_purgeable(dk, is_shadowable::no);
        _mutation.consume_new_partition(dk);
    }
    // Partition tombstone.
    void consume(tombstone t) {
        BOOST_REQUIRE(t);
        BOOST_REQUIRE_EQUAL(is_tombstone_purgeable(t), OnlyPurged);
        _mutation.consume(t);
    }
    stop_iteration consume(static_row&& sr, tombstone tomb, bool is_live) {
        // The purged stream must only carry dead rows.
        BOOST_REQUIRE(!OnlyPurged || !is_live);
        examine_row(column_kind::static_column, sr.cells(), row_tombstone(tomb));
        _mutation.consume(std::move(sr));
        return stop_iteration::no;
    }
    stop_iteration consume(clustering_row&& cr, row_tombstone tomb, bool is_live) {
        BOOST_REQUIRE(!OnlyPurged || !is_live);
        if (!cr.marker().is_missing()) {
            BOOST_REQUIRE_EQUAL(is_marker_purgeable(cr.marker(), tomb.tomb()), OnlyPurged);
        }
        if (cr.tomb().regular()) {
            BOOST_REQUIRE_EQUAL(is_tombstone_purgeable(cr.tomb()), OnlyPurged);
        }
        examine_row(column_kind::regular_column, cr.cells(), tomb);
        _mutation.consume(std::move(cr));
        return stop_iteration::no;
    }
    stop_iteration consume(range_tombstone_change&& rtc) {
        if (OnlyPurged) {
            BOOST_REQUIRE(is_tombstone_purgeable(rtc.tombstone()));
        } else {
            // An engaged tombstone on the survived stream must not be purgeable.
            BOOST_REQUIRE(!rtc.tombstone() || !is_tombstone_purgeable(rtc.tombstone()));
        }
        _mutation.consume(std::move(rtc));
        return stop_iteration::no;
    }
    stop_iteration consume_end_of_partition() {
        _mutation.consume_end_of_partition();
        auto mut_opt = _mutation.consume_end_of_stream();
        BOOST_REQUIRE(mut_opt);
        _mutations.emplace_back(std::move(*mut_opt));
        return stop_iteration::no;
    }
    utils::chunked_vector consume_end_of_stream() {
        return _mutations;
    }
};

// NOTE(review): the template arguments (presumably <false> / <true> for
// OnlyPurged) appear to have been stripped from these aliases -- confirm
// against the upstream source.
using survived_compacted_fragments_consumer = basic_compacted_fragments_consumer_base;
using purged_compacted_fragments_consumer = basic_compacted_fragments_consumer_base;

// Runs the given mutations through compact_for_compaction() with a
// survived/purged consumer pair, then verifies that the union of the two
// output streams reconstructs the (pre-compacted) input exactly.
void run_compaction_data_stream_split_test(const schema& schema, reader_permit permit, gc_clock::time_point query_time,
        utils::chunked_vector mutations) {
    // Pre-compact (without gc) so that the input is in canonical form and can
    // be compared against the merged outputs below.
    for (auto& mut : mutations) {
        mut.partition().compact_for_compaction(schema, never_gc, mut.decorated_key(), query_time, tombstone_gc_state(nullptr));
    }
    auto reader = make_mutation_reader_from_mutations(schema.shared_from_this(), std::move(permit), mutations);
    auto close_reader = deferred_close(reader);
    auto get_max_purgeable = can_always_purge;
    auto consumer = compact_for_compaction(
            schema, query_time, get_max_purgeable, tombstone_gc_state(nullptr),
            survived_compacted_fragments_consumer(schema, query_time, get_max_purgeable),
            purged_compacted_fragments_consumer(schema, query_time, get_max_purgeable));
    auto [survived_muts, purged_muts] = reader.consume(std::move(consumer)).get();

    // Walk the two output streams in lockstep with the input; each input
    // partition may contribute to either or both outputs.
    auto survived_muts_it = survived_muts.begin();
    const auto survived_muts_end = survived_muts.end();
    auto purged_muts_it = purged_muts.begin();
    const auto purged_muts_end = purged_muts.end();
    for (const auto& expected_mut : mutations) {
        const auto& dkey = expected_mut.decorated_key();
        auto actual_mut = mutation(schema.shared_from_this(), dkey);
        if (survived_muts_it != survived_muts_end && survived_muts_it->decorated_key().equal(schema, dkey)) {
            actual_mut.apply(*survived_muts_it++);
        }
        if (purged_muts_it != purged_muts_end && purged_muts_it->decorated_key().equal(schema, dkey)) {
            actual_mut.apply(*purged_muts_it++);
        }
        BOOST_REQUIRE_EQUAL(actual_mut, expected_mut);
    }
BOOST_REQUIRE(survived_muts_it == survived_muts_end);
    BOOST_REQUIRE(purged_muts_it == purged_muts_end);
}

} // anonymous namespace

// Feeds randomly generated data through run_compaction_data_stream_split_test()
// in three regimes: mixed (some data purgeable), all-purged and none-purged.
SEASTAR_THREAD_TEST_CASE(test_compaction_data_stream_split) {
    tests::reader_concurrency_semaphore_wrapper semaphore;

    auto spec = tests::make_random_schema_specification(get_name());
    tests::random_schema random_schema(tests::random::get_int(), *spec);
    const auto& schema = *random_schema.schema();
    testlog.info("Random schema:\n{}", random_schema.cql());

    const auto query_time = gc_clock::now();
    // TTL well above gc_grace_seconds so expiry offsets below can straddle it.
    const auto ttl = gc_clock::duration{schema.gc_grace_seconds().count() * 4};

    const std::uniform_int_distribution partition_count_dist = std::uniform_int_distribution(16, 128);
    const std::uniform_int_distribution clustering_row_count_dist = std::uniform_int_distribution(2, 32);

    // Random data
    {
        testlog.info("Random data");

        const auto ts_gen = tests::default_timestamp_generator();
        // Half of the tombstones are gcable.
        // Half of the cells are expiring. Half of those is expired.
        const auto exp_gen = [query_time, ttl, schema] (std::mt19937& engine, tests::timestamp_destination destination) -> std::optional {
            const auto is_tombstone = (destination == tests::timestamp_destination::partition_tombstone ||
                    destination == tests::timestamp_destination::row_tombstone ||
                    destination == tests::timestamp_destination::range_tombstone ||
                    destination == tests::timestamp_destination::collection_tombstone);
            if (!is_tombstone && tests::random::get_bool(engine)) {
                return std::nullopt;
            }
            // Center the expiry around query_time so about half end up expired.
            const auto offset = (is_tombstone ? schema.gc_grace_seconds().count() : ttl.count()) / 2;
            auto offset_dist = std::uniform_int_distribution(-offset, offset);
            return tests::expiry_info{ttl, query_time + gc_clock::duration{offset_dist(engine)}};
        };
        const auto mutations = tests::generate_random_mutations(random_schema, ts_gen, exp_gen, partition_count_dist,
                clustering_row_count_dist).get();

        run_compaction_data_stream_split_test(schema, semaphore.make_permit(), query_time, mutations);
    }

    // All data is purged
    {
        testlog.info("All data is purged");

        // Tombstones get strictly higher timestamps than the data they cover,
        // so everything is shadowed; expiry times are all far in the past.
        const auto ts_gen = [] (std::mt19937& engine, tests::timestamp_destination destination, api::timestamp_type min_timestamp) {
            static const api::timestamp_type tomb_ts_min = 10000;
            static const api::timestamp_type tomb_ts_max = 99999;
            static const api::timestamp_type collection_tomb_ts_min = 100;
            static const api::timestamp_type collection_tomb_ts_max = 999;
            static const api::timestamp_type other_ts_min = 1000;
            static const api::timestamp_type other_ts_max = 9999;
            if (destination == tests::timestamp_destination::partition_tombstone ||
                    destination == tests::timestamp_destination::row_tombstone ||
                    destination == tests::timestamp_destination::range_tombstone) {
                SCYLLA_ASSERT(min_timestamp < tomb_ts_max);
                return tests::random::get_int(tomb_ts_min, tomb_ts_max, engine);
            } else if (destination == tests::timestamp_destination::collection_tombstone) {
                SCYLLA_ASSERT(min_timestamp < collection_tomb_ts_max);
                return tests::random::get_int(collection_tomb_ts_min, collection_tomb_ts_max, engine);
            } else {
                SCYLLA_ASSERT(min_timestamp < other_ts_max);
                return tests::random::get_int(other_ts_min, other_ts_max, engine);
            }
        };
        const auto all_purged_exp_gen = [query_time, ttl, schema] (std::mt19937& engine, tests::timestamp_destination destination) -> std::optional {
            // Expiry between -2*offset and -offset before query_time: always
            // past both the TTL and gc_grace_seconds, hence gcable.
            const auto offset = std::max(ttl.count(), schema.gc_grace_seconds().count());
            auto offset_dist = std::uniform_int_distribution(-offset * 2, -offset);
            return tests::expiry_info{ttl, query_time + gc_clock::duration{offset_dist(engine)}};
        };
        const auto mutations = tests::generate_random_mutations(random_schema, ts_gen, all_purged_exp_gen, partition_count_dist,
                clustering_row_count_dist).get();

        run_compaction_data_stream_split_test(schema, semaphore.make_permit(), query_time, mutations);
    }

    // No data is purged
    {
        testlog.info("No data is purged");

        // Inverse of the above: data timestamps are strictly higher than the
        // tombstones', and nothing expires.
        const auto ts_gen = [] (std::mt19937& engine, tests::timestamp_destination destination, api::timestamp_type min_timestamp) {
            static const api::timestamp_type tomb_ts_min = 100;
            static const api::timestamp_type tomb_ts_max = 999;
            static const api::timestamp_type collection_tomb_ts_min = 1000;
            static const api::timestamp_type collection_tomb_ts_max = 9999;
            static const api::timestamp_type other_ts_min = 10000;
            static const api::timestamp_type other_ts_max = 99999;
            if (destination == tests::timestamp_destination::partition_tombstone ||
                    destination == tests::timestamp_destination::row_tombstone ||
                    destination == tests::timestamp_destination::range_tombstone) {
                SCYLLA_ASSERT(min_timestamp < tomb_ts_max);
                return tests::random::get_int(tomb_ts_min, tomb_ts_max, engine);
            } else if (destination == tests::timestamp_destination::collection_tombstone) {
                // NOTE(review): this asserts against tomb_ts_max while the
                // parallel branch in "All data is purged" above uses
                // collection_tomb_ts_max -- looks like a copy-paste slip;
                // confirm the intended bound.
                SCYLLA_ASSERT(min_timestamp < tomb_ts_max);
                return tests::random::get_int(collection_tomb_ts_min, collection_tomb_ts_max, engine);
            } else {
                SCYLLA_ASSERT(min_timestamp < other_ts_max);
                return tests::random::get_int(other_ts_min, other_ts_max, engine);
            }
        };
        const auto mutations = tests::generate_random_mutations(random_schema, ts_gen, tests::no_expiry_expiry_generator(),
                partition_count_dist, clustering_row_count_dist).get();

        run_compaction_data_stream_split_test(schema, semaphore.make_permit(), query_time, mutations);
    }
}

// Reproducer for #4567: "appending_hash ignores cells after first null"
SEASTAR_THREAD_TEST_CASE(test_appending_hash_row_4567) {
    auto s = schema_builder("ks", "cf")
            .with_column("pk", bytes_type, column_kind::partition_key)
            .with_column("ck", bytes_type,
column_kind::clustering_key)
            .with_column("r1", bytes_type)
            .with_column("r2", bytes_type)
            .with_column("r3", bytes_type)
            .build();

    // r1: {"", NULL, "aaa"}, r2: {"", NULL, "bbb"}, r3: {"", "bbb", NULL}
    // (per regular-column ids 0..2); column 1 is left null in r1/r2.
    auto r1 = row();
    r1.append_cell(0, atomic_cell::make_live(*bytes_type, 1, bytes{}));
    r1.append_cell(2, atomic_cell::make_live(*bytes_type, 1, to_bytes("aaa")));
    auto r2 = row();
    r2.append_cell(0, atomic_cell::make_live(*bytes_type, 1, bytes{}));
    r2.append_cell(2, atomic_cell::make_live(*bytes_type, 1, to_bytes("bbb")));
    auto r3 = row();
    r3.append_cell(0, atomic_cell::make_live(*bytes_type, 1, bytes{}));
    r3.append_cell(1, atomic_cell::make_live(*bytes_type, 1, to_bytes("bbb")));
    BOOST_CHECK(!r1.equal(column_kind::regular_column, *s, r2, *s));

    auto compute_hash = [&] (const row& r, const query::column_id_vector& columns) {
        auto hasher = xx_hasher{};
        max_timestamp ts;
        appending_hash{}(hasher, r, *s, column_kind::regular_column, columns, ts);
        return hasher.finalize_uint64();
    };

    // Hashes over column subsets that exclude the differing column must match...
    BOOST_CHECK_EQUAL(compute_hash(r1, { 1 }), compute_hash(r2, { 1 }));
    BOOST_CHECK_EQUAL(compute_hash(r1, { 0, 1 }), compute_hash(r2, { 0, 1 }));
    // ...and subsets that include column 2 (after the null column) must differ;
    // before the #4567 fix, hashing stopped at the first null.
    BOOST_CHECK_NE(compute_hash(r1, { 0, 2 }), compute_hash(r2, { 0, 2 }));
    BOOST_CHECK_NE(compute_hash(r1, { 0, 1, 2 }), compute_hash(r2, { 0, 1, 2 }));

    // Additional test for making sure that {"", NULL, "bbb"} is not equal to {"", "bbb", NULL}
    // due to ignoring NULL in a hash
    BOOST_CHECK_NE(compute_hash(r2, { 0, 1, 2 }), compute_hash(r3, { 0, 1, 2 }));
}

// Consuming a mutation through mutation_rebuilder_v2, forward and in reverse,
// must reproduce the original mutation.
SEASTAR_THREAD_TEST_CASE(test_mutation_consume) {
    std::mt19937 engine(tests::random::get_int());
    tests::reader_concurrency_semaphore_wrapper semaphore;
    auto permit = semaphore.make_permit();

    auto rnd_schema_spec = tests::make_random_schema_specification(
            get_name(),
            std::uniform_int_distribution(1, 2),
            std::uniform_int_distribution(1, 8));
    auto rnd_schema = tests::random_schema(engine(), *rnd_schema_spec);

    auto forward_schema = rnd_schema.schema();
    auto reverse_schema = forward_schema->make_reversed();

    const auto muts = tests::generate_random_mutations(
            rnd_schema,
            tests::default_timestamp_generator(),
            tests::no_expiry_expiry_generator(),
            std::uniform_int_distribution(1, 10)).get();

    testlog.info("Forward");
    {
        for (const auto& mut : muts) {
            mutation_rebuilder_v2 rebuilder(forward_schema);
            auto rebuilt_mut = *mutation(mut).consume(rebuilder, consume_in_reverse::no).result;
            assert_that(std::move(rebuilt_mut)).is_equal_to(mut);
        }
    }
    testlog.info("Reverse");
    {
        for (const auto& mut : muts) {
            // Rebuilt against the reversed schema; reverse() flips it back
            // before comparing with the original.
            mutation_rebuilder_v2 rebuilder(reverse_schema);
            auto rebuilt_mut = *mutation(mut).consume(rebuilder, consume_in_reverse::yes).result;
            assert_that(reverse(std::move(rebuilt_mut))).is_equal_to(mut);
        }
    }
}

// Runs a mutation through validating_consumer in all four consume modes;
// the consumer asserts the fragment positions are strictly monotonic.
SEASTAR_THREAD_TEST_CASE(test_mutation_consume_position_monotonicity) {
    std::mt19937 engine(tests::random::get_int());
    tests::reader_concurrency_semaphore_wrapper semaphore;
    auto permit = semaphore.make_permit();

    auto rnd_schema_spec = tests::make_random_schema_specification(
            get_name(),
            std::uniform_int_distribution(1, 2),
            std::uniform_int_distribution(1, 8));
    auto rnd_schema = tests::random_schema(engine(), *rnd_schema_spec);

    auto forward_schema = rnd_schema.schema();
    auto reverse_schema = forward_schema->make_reversed();

    const auto muts = tests::generate_random_mutations(
            rnd_schema,
            tests::default_timestamp_generator(),
            tests::no_expiry_expiry_generator(),
            std::uniform_int_distribution(1, 1)).get();

    BOOST_TEST_MESSAGE("Forward");
    {
        auto mut = muts.front();
        validating_consumer consumer(*forward_schema);
        std::move(mut).consume(consumer, consume_in_reverse::no);
    }
    BOOST_TEST_MESSAGE("Reverse");
    {
        auto mut = muts.front();
        validating_consumer consumer(*reverse_schema);
        std::move(mut).consume(consumer, consume_in_reverse::yes);
    }
    BOOST_TEST_MESSAGE("Forward gently");
    {
        auto mut = muts.front();
        validating_consumer consumer(*forward_schema);
        std::move(mut).consume_gently(consumer, consume_in_reverse::no).get();
    }
    BOOST_TEST_MESSAGE("Reverse gently");
    {
        auto mut = muts.front();
        validating_consumer consumer(*reverse_schema);
std::move(mut).consume_gently(consumer, consume_in_reverse::yes).get();
    }
}

// Tests mutation_rebuilder_v2::flush().
SEASTAR_THREAD_TEST_CASE(test_mutation_rebuilder_v2_flush) {
    simple_schema ss;
    schema_ptr s = ss.schema();
    auto pk = ss.make_pkey();
    tests::reader_concurrency_semaphore_wrapper semaphore;
    auto p = semaphore.make_permit();

    // Main idea of the test: we prepare a stream with all "interesting"
    // situations (with respect to positions), for example:
    // - RTC right before and after a key
    // - Overlapping RTCs
    // - Keys without a RTC in between, but with an active RTC from before
    // - Keys without a RTC in between, but without an active RTC from before
    // etc.
    //
    // Then we pass this stream through mutation_rebuilder_v2 with two flushes
    // in between (on all possible positions), and check that the result is
    // the same as without flushes.
    auto frags = std::vector();
    frags.emplace_back(*s, p, partition_start(pk, {}));
    frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::before_all_clustered_rows(), ss.new_tombstone()));
    frags.emplace_back(*s, p, clustering_row(ss.make_ckey(0)));
    frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::before_key(ss.make_ckey(1)), ss.new_tombstone()));
    frags.emplace_back(*s, p, clustering_row(ss.make_ckey(1)));
    // Two consecutive RTCs at the same position (overlap case).
    frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::after_key(*s, ss.make_ckey(1)), ss.new_tombstone()));
    frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::after_key(*s, ss.make_ckey(1)), ss.new_tombstone()));
    frags.emplace_back(*s, p, clustering_row(ss.make_ckey(2)));
    // Empty tombstone: closes the active range before ckey 3.
    frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::before_key(ss.make_ckey(3)), tombstone{}));
    frags.emplace_back(*s, p, clustering_row(ss.make_ckey(3)));
    frags.emplace_back(*s, p, clustering_row(ss.make_ckey(4)));
    frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::after_key(*s, ss.make_ckey(4)), ss.new_tombstone()));
    frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::before_key(ss.make_ckey(5)), ss.new_tombstone()));
    frags.emplace_back(*s, p, clustering_row(ss.make_ckey(5)));
    frags.emplace_back(*s, p, clustering_row(ss.make_ckey(6)));
    frags.emplace_back(*s, p, range_tombstone_change(position_in_partition::after_all_clustered_rows(), tombstone{}));
    frags.emplace_back(*s, p, partition_end());

    // Reference result: same stream consumed with no intermediate flushes.
    mutation_rebuilder_v2 rebuilder_without_flush(s);
    for (unsigned i = 0; i < frags.size(); ++i) {
        rebuilder_without_flush.consume(mutation_fragment_v2(*s, p, frags[i]));
    }
    auto m_expected = std::move(*rebuilder_without_flush.consume_end_of_stream());

    // We do two flushes (we test all possible combinations of their positions,
    // including no flush).
    // This is to test that the first flush doesn't break the rebuilder in
    // a way that prevents another flush.
    for (unsigned first_flush = 0; first_flush < frags.size(); ++first_flush) {
        for (unsigned second_flush = first_flush; second_flush < frags.size(); ++second_flush) {
            mutation_rebuilder_v2 rebuilder(s);
            auto m1 = mutation(s, pk); // Contents of flush 1.
            auto m2 = mutation(s, pk); // Contents of flush 2.
            auto m3 = mutation(s, pk); // Contents of final flush.
            for (unsigned i = 0; i < frags.size(); ++i) {
                rebuilder.consume(mutation_fragment_v2(*s, p, frags[i]));
                if (i == first_flush) {
                    m1 = rebuilder.flush();
                }
                if (i == second_flush) {
                    m2 = rebuilder.flush();
                }
            }
            m3 = std::move(*rebuilder.consume_end_of_stream());
            // The three partial results must merge back to the reference.
            assert_that(m1 + m2 + m3).is_equal_to(m_expected);
        }
    }
}

// Consuming a mutation that contains dummy rows entries and range tombstones,
// forward and in reverse, must produce position-monotonic fragment streams.
SEASTAR_TEST_CASE(mutation_with_dummy_clustering_row_is_consumed_monotonically) {
    return seastar::async([] {
        tests::reader_concurrency_semaphore_wrapper semaphore;
        schema_ptr s = schema_builder{"ks", "cf"}
                .with_column("pk", bytes_type, column_kind::partition_key)
                .with_column("ck1", bytes_type, column_kind::clustering_key)
                .build();
        // ck_e is an empty (prefix) key; used as an open range bound below.
        clustering_key ck_e{std::vector{}};
        clustering_key ck_0{{serialized((int32_t)0)}};
        clustering_key ck_1{{serialized((int32_t)1)}};
        clustering_key ck_2{{serialized((int32_t)2)}};
        clustering_key ck_3{{serialized((int32_t)3)}};
        mutation m{s, dht::decorate_key(*s, partition_key{{"pk"}})};
        reader_permit p = semaphore.make_permit();
        api::timestamp_type ts = api::min_timestamp;
        gc_clock::time_point tp{};
        m.partition().apply(tombstone{api::min_timestamp + 2, gc_clock::time_point{}});
        // Two adjacent range tombstones covering (-inf, ck_1] and (ck_1, +inf).
        range_tombstone rt1{ck_e, bound_kind::incl_start, ck_1, bound_kind::incl_end, tombstone{ts + 0, tp}};
        range_tombstone rt2{ck_1, bound_kind::excl_start, ck_e, bound_kind::incl_end, tombstone{ts + 1, tp}};
        clustering_row cr1{*s, rows_entry{*s, position_in_partition{partition_region::clustered, bound_weight::equal, ck_0}, is_dummy::no, is_continuous::no}};
        clustering_row cr2{*s, rows_entry{*s, position_in_partition{partition_region::clustered, bound_weight::equal, ck_2}, is_dummy::no, is_continuous::no}};
        clustering_row cr3{*s, rows_entry{*s, position_in_partition{partition_region::clustered, bound_weight::equal, ck_3}, is_dummy::no, is_continuous::no}};
        m.apply(mutation_fragment(*s, p, std::move(rt1)));
        m.apply(mutation_fragment(*s, p, std::move(rt2)));
        m.apply(mutation_fragment(*s, p, std::move(cr1)));
        m.apply(mutation_fragment(*s, p, std::move(cr2)));
m.apply(mutation_fragment(*s, p, std::move(cr3)));
        // Adds a dummy entry after all rows -- the case this test targets.
        m.partition().ensure_last_dummy(*s);

        mutation m1{m};
        {
            schema_ptr reverse_schema = s->make_reversed();
            validating_consumer consumer{*reverse_schema};
            std::move(m).consume(consumer, consume_in_reverse::yes);
        }
        {
            validating_consumer consumer{*s};
            std::move(m1).consume(consumer, consume_in_reverse::no);
        }
    });
}

// position_in_partition::reversed() must flip the relative order of
// before/after positions, while partition-start/end and the static row
// positions are reversal-invariant.
SEASTAR_THREAD_TEST_CASE(test_position_in_partition_reversal) {
    using p_i_p = position_in_partition;
    simple_schema ss;
    auto pk = ss.make_pkey();
    position_in_partition::tri_compare fwd_cmp(*ss.schema());
    auto rev_s = ss.schema()->make_reversed();
    position_in_partition::tri_compare rev_cmp(*rev_s);

    // Forward order sanity checks.
    BOOST_REQUIRE(fwd_cmp(p_i_p::before_key(ss.make_ckey(1)), p_i_p::after_key(*ss.schema(), ss.make_ckey(1))) < 0);
    BOOST_REQUIRE(fwd_cmp(p_i_p::before_key(ss.make_ckey(1)), ss.make_ckey(0)) > 0);
    BOOST_REQUIRE(fwd_cmp(p_i_p::after_key(*ss.schema(), ss.make_ckey(1)), p_i_p::before_key(ss.make_ckey(1))) > 0);
    BOOST_REQUIRE(fwd_cmp(p_i_p::after_key(*ss.schema(), ss.make_ckey(1)), ss.make_ckey(2)) < 0);

    // Same comparisons with reversed positions under the reversed schema.
    BOOST_REQUIRE(rev_cmp(p_i_p::before_key(ss.make_ckey(1)).reversed(), p_i_p::after_key(*ss.schema(), ss.make_ckey(1)).reversed()) > 0);
    BOOST_REQUIRE(rev_cmp(p_i_p::before_key(ss.make_ckey(1)).reversed(), ss.make_ckey(0)) < 0);
    BOOST_REQUIRE(rev_cmp(p_i_p::after_key(*ss.schema(), ss.make_ckey(1)).reversed(), p_i_p::before_key(ss.make_ckey(1)).reversed()) < 0);
    BOOST_REQUIRE(rev_cmp(p_i_p::after_key(*ss.schema(), ss.make_ckey(1)).reversed(), ss.make_ckey(2)) > 0);

    // Test reversal-invariant positions
    BOOST_REQUIRE(rev_cmp(p_i_p::for_partition_start().reversed(), p_i_p::for_partition_start()) == 0);
    BOOST_REQUIRE(rev_cmp(p_i_p(p_i_p::for_partition_end()).reversed(), p_i_p(p_i_p::for_partition_end())) == 0);
    BOOST_REQUIRE(rev_cmp(p_i_p::for_static_row().reversed(), p_i_p::for_static_row()) == 0);
}

// Ordering of positions built from prefix clustering keys vs. full keys,
// including the ownership ("holder") semantics of position_in_partition_view
// factories and behavior under schema reversal.
SEASTAR_THREAD_TEST_CASE(test_position_in_partition_order_with_prefix_keys) {
    using pip = position_in_partition;
    using pipv = position_in_partition_view;
    schema_ptr s = schema_builder("ks", "cf")
            .with_column("pk", utf8_type, column_kind::partition_key)
            .with_column("ck1", utf8_type, column_kind::clustering_key)
            .with_column("ck2", utf8_type, column_kind::clustering_key)
            .with_column("v", utf8_type)
            .build();

    position_in_partition::tri_compare cmp(*s);

    auto make_ck = [&] (sstring ck1, std::optional ck2 = {}) {
        if (ck2) {
            return clustering_key::from_exploded(*s, {serialized(ck1), serialized(*ck2)});
        }
        return clustering_key::from_exploded(*s, {serialized(ck1)});
    };

    auto prefix_a = make_ck("a");        // prefix key ("a")
    auto full_a = prefix_a;
    clustering_key::make_full(*s, full_a);  // full key ("a", <missing>)
    auto full_a_a = make_ck("a", "a");      // full key ("a", "a")

    // after_key() on a prefix must allocate an owning holder...
    auto holder = pipv::after_key(*s, prefix_a);
    BOOST_REQUIRE(holder.holder);
    {
        // ...and moving it must preserve both the view and the ownership.
        auto holder2 = std::move(holder);
        BOOST_REQUIRE(cmp(pip::after_key(*s, prefix_a), holder2.view) == 0);
        BOOST_REQUIRE(holder2.holder);
    }

    // after_key() on a full key needs no holder.
    holder = pipv::after_key(*s, full_a_a);
    BOOST_REQUIRE(!holder.holder);
    {
        auto holder2 = std::move(holder);
        BOOST_REQUIRE(cmp(pip::after_key(*s, full_a_a), holder2.view) == 0);
        BOOST_REQUIRE(!holder2.holder);
    }

    BOOST_REQUIRE(cmp(pip::before_key(prefix_a), prefix_a) < 0);
    BOOST_REQUIRE(cmp(pip::after_key(*s, prefix_a), prefix_a) > 0);
    BOOST_REQUIRE(cmp(prefix_a, full_a_a) < 0);
    BOOST_REQUIRE(cmp(pip::after_key(*s, prefix_a), prefix_a) > 0);
    // after_key() of a prefix sorts before keys extending that prefix,
    // whereas after_all_prefixed() sorts after them.
    BOOST_REQUIRE(cmp(pip::after_key(*s, prefix_a), full_a_a) < 0);
    BOOST_REQUIRE(cmp(pipv::after_all_prefixed(prefix_a), full_a_a) > 0);
    BOOST_REQUIRE(cmp(prefix_a, full_a) < 0);
    BOOST_REQUIRE(cmp(pip::after_key(*s, prefix_a), full_a) < 0);
    BOOST_REQUIRE(cmp(pip::after_key(*s, prefix_a), pip::before_key(full_a)) <= 0);

    // before_key()/after_key() applied to dummy does not change the position.
    BOOST_REQUIRE(cmp(pip::after_key(*s, prefix_a), pip::after_key(*s, pip::after_key(*s, prefix_a))) == 0);
    BOOST_REQUIRE(cmp(pip::before_key(prefix_a), pip::before_key(pip::before_key(prefix_a))) == 0);
    BOOST_REQUIRE(cmp(pip::after_key(*s, prefix_a), pip::after_key(*s, pip::after_key(*s, prefix_a))) == 0);

    BOOST_REQUIRE(cmp(pip::for_key(prefix_a), pip::for_key(full_a_a)) < 0);
    BOOST_REQUIRE(cmp(pip::for_key(full_a_a), pip::for_key(prefix_a)) > 0);
    BOOST_REQUIRE(cmp(pip::for_key(prefix_a), pip::for_key(full_a)) < 0);
    BOOST_REQUIRE(cmp(pip::after_key(*s, prefix_a), pip::for_key(prefix_a)) > 0);

    // Check reversed schema
    auto rev_s = s->make_reversed();
    position_in_partition::tri_compare rev_cmp(*rev_s);
    BOOST_REQUIRE(rev_cmp(pip::before_key(prefix_a).reversed(), pip::for_key(full_a_a).reversed()) > 0);
    BOOST_REQUIRE(rev_cmp(pip::before_key(prefix_a).reversed(), pip::for_key(full_a).reversed()) > 0);
    BOOST_REQUIRE(rev_cmp(pip::before_key(prefix_a).reversed(), pip::for_key(full_a_a).reversed()) > 0);
    BOOST_REQUIRE(rev_cmp(pip::before_key(prefix_a).reversed(), pip::for_key(full_a).reversed()) > 0);
    BOOST_REQUIRE(rev_cmp(pipv::after_all_prefixed(prefix_a).reversed(), pip::for_key(full_a_a).reversed()) < 0);
    BOOST_REQUIRE(rev_cmp(pipv::after_all_prefixed(prefix_a).reversed(), pip::for_key(full_a).reversed()) < 0);
    BOOST_REQUIRE(rev_cmp(pip::after_key(*rev_s, prefix_a).reversed(), pip::for_key(full_a_a).reversed()) > 0);
    BOOST_REQUIRE(rev_cmp(pip::after_key(*rev_s, prefix_a).reversed(), pip::for_key(full_a).reversed()) > 0);

    // FIXME: Below don't work due to https://github.com/scylladb/scylladb/issues/12258
    //
    // BOOST_REQUIRE(rev_cmp(pip::for_key(prefix_a).reversed(),
    //                       pip::for_key(full_a_a).reversed()) > 0);
    //
    // BOOST_REQUIRE(rev_cmp(pip::for_key(full_a_a).reversed(),
    //                       pip::for_key(prefix_a).reversed()) < 0);
    //
    // BOOST_REQUIRE(rev_cmp(pip::for_key(prefix_a).reversed(),
    //                       pip::for_key(full_a).reversed()) > 0);
    //
    // BOOST_REQUIRE(rev_cmp(pip::after_key(*rev_s, prefix_a).reversed(),
    //                       pip::for_key(prefix_a).reversed()) < 0);
}

// A range tombstone that spans many pages must be carried over correctly
// between pages by the query compactor, both with a shared compaction state
// and with detach_state()/restore (all results compared against a reference
// single-pass compaction).
SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) {
    simple_schema ss;
    auto pk = ss.make_pkey();
    auto s = ss.schema();

    tests::reader_concurrency_semaphore_wrapper semaphore;
    auto permit = semaphore.make_permit();

    const auto marker_ts = ss.new_timestamp();
    const auto tomb_ts = ss.new_timestamp();
    const auto row_ts = ss.new_timestamp();

    // One partition: a range tombstone opened before ckey 0, ten live rows
    // under it, then the tombstone closed after ckey 10.
    auto make_frags = [&] {
        std::deque frags;
        frags.emplace_back(*s, permit, partition_start(pk, {}));
        const auto& v_def = *s->get_column_definition(to_bytes("v"));
        frags.emplace_back(*s, permit, range_tombstone_change(position_in_partition::before_key(ss.make_ckey(0)), tombstone{tomb_ts, {}}));
        for (uint32_t ck = 0; ck < 10; ++ck) {
            auto row = clustering_row(ss.make_ckey(ck));
            row.cells().apply(v_def, atomic_cell::make_live(*v_def.type, row_ts, serialized("v")));
            row.marker() = row_marker(marker_ts);
            frags.emplace_back(mutation_fragment_v2(*s, permit, std::move(row)));
        }
        frags.emplace_back(*s, permit, range_tombstone_change(position_in_partition::after_key(*s, ss.make_ckey(10)), tombstone{}));
        frags.emplace_back(*s, permit, partition_end{});
        return frags;
    };

    // Pushes a detached compaction state back onto the reader so the next
    // page starts exactly where the previous one left off.
    auto restore_state = [&] (mutation_reader& reader, detached_compaction_state&& state) {
        if (auto rt_opt = state.current_tombstone) {
            reader.unpop_mutation_fragment(mutation_fragment_v2(*s, permit, std::move(*rt_opt)));
        }
        if (state.static_row) {
            reader.unpop_mutation_fragment(mutation_fragment_v2(*s, permit, std::move(*state.static_row)));
        }
        reader.unpop_mutation_fragment(mutation_fragment_v2(*s, permit, std::move(state.partition_start)));
    };

    const auto query_time = gc_clock::now();
    const auto max_rows = std::numeric_limits::max();
    const auto max_partitions = std::numeric_limits::max();

    // Reference: the full stream rebuilt and compacted in one go.
    mutation ref_mut(s, pk);
    {
        auto reader = make_mutation_reader_from_fragments(s, permit, make_frags());
        auto close_reader = deferred_close(reader);
        auto mut_opt = reader.consume(mutation_rebuilder_v2(s)).get();
BOOST_REQUIRE(mut_opt);
        ref_mut = std::move(*mut_opt);
        ref_mut.partition().compact_for_query(*s, pk, query_time, {query::clustering_range::make_open_ended_both_sides()}, true, max_rows);
    }

    // Rebuilds whatever it consumes into `mut`, stopping after `row_limit`
    // fragments to emulate page boundaries.
    struct consumer {
        reader_permit permit;
        mutation& mut;                  // accumulates results across pages
        const uint64_t row_limit;       // fragments consumed before stopping
        uint64_t rows = 0;
        mutation_rebuilder_v2 builder;

        consumer(reader_permit permit, mutation& mut, uint64_t row_limit, uint64_t rows = 0)
            : permit(std::move(permit)), mut(mut), row_limit(row_limit), rows(rows), builder(mut.schema()) {
        }

        void consume_new_partition(const dht::decorated_key& dk) {
            BOOST_REQUIRE(mut.decorated_key().equal(*mut.schema(), dk));
            builder.consume_new_partition(dk);
        }
        void consume(const tombstone& t) {
            BOOST_REQUIRE_EQUAL(t, mut.partition().partition_tombstone());
            builder.consume(t);
        }
        stop_iteration consume(static_row&& sr, tombstone, bool) {
            builder.consume(std::move(sr));
            return stop_iteration(++rows >= row_limit);
        }
        stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) {
            builder.consume(std::move(cr));
            return stop_iteration(++rows >= row_limit);
        }
        stop_iteration consume(range_tombstone_change&& rtc) {
            builder.consume(std::move(rtc));
            return stop_iteration(++rows >= row_limit);
        }
        stop_iteration consume_end_of_partition() {
            builder.consume_end_of_partition();
            return stop_iteration::yes;
        }
        void consume_end_of_stream() {
            // Merge this page's partial mutation into the accumulated result.
            if (auto mut_opt = builder.consume_end_of_stream()) {
                mut += *mut_opt;
            }
        }
    };

    // Scenario 1: everything in a single pass.
    testlog.info("non-paged v2");
    {
        mutation res_mut(s, pk);
        auto c = compact_for_query(*s, query_time, s->full_slice(), max_rows, max_partitions, tombstone_gc_state(nullptr),
                consumer{permit, res_mut, max_rows});
        auto reader = make_mutation_reader_from_fragments(s, permit, make_frags());
        auto close_reader = deferred_close(reader);
        reader.consume(std::move(c)).get();
        BOOST_REQUIRE_EQUAL(res_mut, ref_mut);
    }

    // Scenario 2: shared compaction state, page size limited to 1 row by the
    // compactor itself.
    // NOTE(review): make_lw_shared's template argument list appears to have
    // been stripped in this copy (here and below) -- confirm against upstream.
    testlog.info("limited pages v2");
    {
        mutation res_mut(s, pk);
        auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), 1, max_partitions,
                tombstone_gc_state(nullptr));
        auto reader = make_mutation_reader_from_fragments(s, permit, make_frags());
        auto close_reader = deferred_close(reader);

        while (!reader.is_buffer_empty() || !reader.is_end_of_stream()) {
            auto c = consumer{permit, res_mut, max_rows};
            compaction_state->start_new_page(1, max_partitions, query_time, reader.peek().get()->position().region(), c);
            reader.consume(compact_for_query(compaction_state, std::move(c))).get();
        }
        BOOST_REQUIRE_EQUAL(res_mut, ref_mut);
    }

    // Scenario 3: shared compaction state, pages cut short by the consumer
    // (2 fragments per page).
    testlog.info("short pages v2");
    {
        mutation res_mut(s, pk);
        auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), max_rows, max_partitions,
                tombstone_gc_state(nullptr));
        auto reader = make_mutation_reader_from_fragments(s, permit, make_frags());
        auto close_reader = deferred_close(reader);

        while (!reader.is_buffer_empty() || !reader.is_end_of_stream()) {
            auto c = consumer{permit, res_mut, 2};
            compaction_state->start_new_page(max_rows, max_partitions, query_time, reader.peek().get()->position().region(), c);
            reader.consume(compact_for_query(compaction_state, std::move(c))).get();
        }
        BOOST_REQUIRE_EQUAL(res_mut, ref_mut);
    }

    // Scenario 4: fresh compaction state per page, carried over via
    // detach_state() + restore_state().
    testlog.info("limited pages - detach state v2");
    {
        mutation res_mut(s, pk);
        auto reader = make_mutation_reader_from_fragments(s, permit, make_frags());
        auto close_reader = deferred_close(reader);

        std::optional detached_state;

        while (!reader.is_buffer_empty() || !reader.is_end_of_stream()) {
            if (detached_state) {
                restore_state(reader, std::move(*detached_state));
            }
            auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), 1, max_partitions, tombstone_gc_state(nullptr));
            auto c = consumer{permit, res_mut, max_rows};
            reader.consume(compact_for_query(compaction_state, std::move(c))).get();
            detached_state = std::move(*compaction_state).detach_state();
        }
        BOOST_REQUIRE_EQUAL(res_mut, ref_mut);
    }

    // Scenario 5: as above, but with consumer-driven short pages.
    testlog.info("short pages - detach state v2");
    {
        mutation res_mut(s, pk);
        auto reader = make_mutation_reader_from_fragments(s, permit, make_frags());
auto close_reader = deferred_close(reader);
// NOTE(review): template argument lists appear stripped by extraction in this
// file (e.g. "std::optional", "make_lw_shared>") -- tokens kept verbatim.
std::optional detached_state;
// Consume the stream in short pages (max 2 rows per page), detaching the
// compactor state after every page and restoring it before the next one.
while (!reader.is_buffer_empty() || !reader.is_end_of_stream()) {
    if (detached_state) {
        restore_state(reader, std::move(*detached_state));
    }
    auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), max_rows, max_partitions, tombstone_gc_state(nullptr));
    auto c = consumer{permit, res_mut, 2};
    reader.consume(compact_for_query(compaction_state, std::move(c))).get();
    detached_state = std::move(*compaction_state).detach_state();
}
// The page-by-page result must equal the reference one-shot compaction.
BOOST_REQUIRE_EQUAL(res_mut, ref_mut);
}
}

// Checks when compact_for_query yields a detached (resumable) state: only when
// the consumer stops the stream mid-partition and requests a final stop at
// consume_end_of_partition(); once the stream reached the partition end there
// must be nothing left to detach.
SEASTAR_THREAD_TEST_CASE(test_compactor_detach_state) {
    simple_schema ss;
    auto pk = ss.make_pkey();
    auto s = ss.schema();
    tests::reader_concurrency_semaphore_wrapper semaphore;
    auto permit = semaphore.make_permit();
    const auto expiry_point = gc_clock::now() + std::chrono::days(10);
    const auto marker_ts = ss.new_timestamp();
    const auto tomb_ts = ss.new_timestamp();
    const auto row_ts = ss.new_timestamp();
    const auto query_time = gc_clock::now();
    const auto max_rows = std::numeric_limits::max();
    const auto max_partitions = std::numeric_limits::max();
    // Builds a single-partition stream: static row, an active range tombstone,
    // one live clustering row, the range tombstone's end, then partition end.
    auto make_frags = [&] {
        std::deque frags;
        frags.emplace_back(*s, permit, partition_start(pk, {}));
        frags.emplace_back(*s, permit, ss.make_static_row_v2(permit, "static_row"));
        const auto& v_def = *s->get_column_definition(to_bytes("v"));
        frags.emplace_back(*s, permit, range_tombstone_change(position_in_partition::before_key(ss.make_ckey(0)), tombstone{tomb_ts, expiry_point}));
        for (uint32_t ck = 0; ck < 1; ++ck) {
            auto row = clustering_row(ss.make_ckey(ck));
            row.cells().apply(v_def, atomic_cell::make_live(*v_def.type, row_ts, serialized("v")));
            row.marker() = row_marker(marker_ts);
            frags.emplace_back(mutation_fragment_v2(*s, permit, std::move(row)));
        }
        frags.emplace_back(*s, permit, range_tombstone_change(position_in_partition::after_key(*s, ss.make_ckey(10)), tombstone{}));
        frags.emplace_back(*s, permit, partition_end{});
        return frags;
    };
    // Counts consumed fragments and stops the stream once frag_limit is hit;
    // final_stop decides the verdict returned from consume_end_of_partition().
    struct consumer {
        uint64_t frags = 0;
        const uint64_t frag_limit;
        const bool final_stop;
        consumer(uint64_t stop_at, bool final_stop) : frag_limit(stop_at + 1), final_stop(final_stop) { }
        void consume_new_partition(const dht::decorated_key& dk) { }
        void consume(const tombstone& t) { }
        stop_iteration consume(static_row&& sr, tombstone, bool) {
            const auto ret = ++frags >= frag_limit;
            testlog.trace("consume(static_row) ret={}", ret);
            return stop_iteration(ret);
        }
        stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) {
            const auto ret = ++frags >= frag_limit;
            testlog.trace("consume(clustering_row) ret={}", ret);
            return stop_iteration(ret);
        }
        stop_iteration consume(range_tombstone_change&& rtc) {
            const auto ret = ++frags >= frag_limit;
            testlog.trace("consume(range_tombstone) ret={}", ret);
            return stop_iteration(ret);
        }
        stop_iteration consume_end_of_partition() {
            testlog.trace("consume_end_of_partition()");
            return stop_iteration(final_stop);
        }
        void consume_end_of_stream() { }
    };
    // deduct 2 for partition start and end respectively
    const auto inter_partition_frag_count = make_frags().size() - 2;
    auto check = [&] (uint64_t stop_at, bool final_stop) {
        testlog.debug("stop_at={}, final_stop={}", stop_at, final_stop);
        auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), max_rows, max_partitions, tombstone_gc_state(nullptr));
        auto reader = make_mutation_reader_from_fragments(s, permit, make_frags());
        auto close_reader = deferred_close(reader);
        reader.consume(compact_for_query(compaction_state, consumer(stop_at, final_stop))).get();
        const auto has_detached_state = bool(std::move(*compaction_state).detach_state());
        // Detached state is expected only for a mid-partition final stop.
        if (stop_at < inter_partition_frag_count) {
            BOOST_CHECK_EQUAL(has_detached_state, final_stop);
        } else {
            BOOST_CHECK(!has_detached_state);
        }
    };
    // Exercise every possible stop position, with and without a final stop.
    for (unsigned stop_at = 0; stop_at < inter_partition_frag_count; ++stop_at) {
        check(stop_at, true);
        check(stop_at, false);
    }
};

// Checks that the compactor's fragment-stream validator (at clustering_key
// validation level) accepts well-formed streams and throws
// invalid_mutation_fragment_stream for out-of-order or ill-formed ones.
SEASTAR_THREAD_TEST_CASE(test_compactor_validator) {
    const auto abort_ie =
set_abort_on_internal_error(false);
    // Restore abort-on-internal-error on scope exit -- the invalid streams
    // below intentionally trigger internal errors.
    auto reset_abort_ie = defer([abort_ie] { set_abort_on_internal_error(abort_ie); });
    simple_schema ss;
    auto pks = ss.make_pkeys(2);
    auto cks = ss.make_ckeys(3);
    auto s = ss.schema();
    tests::reader_concurrency_semaphore_wrapper semaphore;
    auto permit = semaphore.make_permit();
    const auto expiry_point = gc_clock::now() + std::chrono::days(10);
    // Timestamps ordered so that row < rtc < partition tombstone < row_ts2.
    const auto base_ts = ss.new_timestamp();
    const auto row_ts = base_ts + 1;
    const auto rtc_tombstone_ts = base_ts + 4;
    const auto partition_tombstone_ts = base_ts + 5;
    const auto row_ts2 = base_ts + 6;
    // Builds a live clustering row with the given key and write timestamp.
    auto make_cr = [&] (const clustering_key& ckey, api::timestamp_type ts) {
        const auto& v_def = *s->get_column_definition(to_bytes("v"));
        auto row = clustering_row(ckey);
        row.cells().apply(v_def, atomic_cell::make_live(*v_def.type, ts, serialized("v")));
        row.marker() = row_marker(ts);
        return mutation_fragment_v2(*s, permit, std::move(row));
    };
    // Building blocks for the streams checked below.
    mutation_fragment_v2 ps1(*s, permit, partition_start(pks[0], {}));
    mutation_fragment_v2 ps1_tomb(*s, permit, partition_start(pks[0], {partition_tombstone_ts, expiry_point}));
    mutation_fragment_v2 ps2(*s, permit, partition_start(pks[1], {}));
    mutation_fragment_v2 sr(*s, permit, ss.make_static_row_v2(permit, "static_row"));
    mutation_fragment_v2 cr1(*s, permit, make_cr(cks[0], row_ts));
    mutation_fragment_v2 cr1_high_ts(*s, permit, make_cr(cks[0], row_ts2));
    mutation_fragment_v2 cr2(*s, permit, make_cr(cks[1], row_ts));
    mutation_fragment_v2 cr3(*s, permit, make_cr(cks[2], row_ts));
    mutation_fragment_v2 rtc1(*s, permit, range_tombstone_change(position_in_partition::before_key(cks[0]), tombstone{rtc_tombstone_ts, expiry_point}));
    mutation_fragment_v2 rtc2(*s, permit, range_tombstone_change(position_in_partition::before_key(cks[1]), tombstone{rtc_tombstone_ts, expiry_point}));
    mutation_fragment_v2 rtc_end(*s, permit, range_tombstone_change(position_in_partition::after_key(*s, cks[2]), tombstone{}));
    mutation_fragment_v2 pe(*s, permit, partition_end());
    // Discards every fragment; we only care whether the validator throws.
    struct consumer {
        void consume_new_partition(const dht::decorated_key& dk) { }
        void consume(const tombstone& t) { }
        stop_iteration consume(static_row&& sr, tombstone, bool) { auto _ = std::move(sr); return stop_iteration::no; }
        stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) { auto _ = std::move(cr); return stop_iteration::no; }
        stop_iteration consume(range_tombstone_change&& rtc) { auto _ = std::move(rtc); return stop_iteration::no; }
        stop_iteration consume_end_of_partition() { return stop_iteration::no; }
        void consume_end_of_stream() { }
    };
    // Runs the given fragment sequence through the compactor and verifies
    // whether validation passes or throws, matching expected_is_valid.
    auto check = [&] (std::initializer_list> frag_refs, bool expected_is_valid) {
        std::deque frags;
        for (const auto& frag_ref : frag_refs) {
            frags.emplace_back(*s, permit, frag_ref.get());
        }
        auto compaction_state = make_lw_shared>(*s, gc_clock::now(), s->full_slice(), std::numeric_limits::max(), std::numeric_limits::max(), tombstone_gc_state(nullptr), mutation_fragment_stream_validation_level::clustering_key);
        auto reader = make_mutation_reader_from_fragments(s, permit, std::move(frags));
        auto close_reader = deferred_close(reader);
        bool is_valid = true;
        try {
            reader.consume(compact_for_query(compaction_state, consumer{})).get();
        } catch (invalid_mutation_fragment_stream& ex) {
            is_valid = false;
        }
        if (expected_is_valid != is_valid) {
            // Dump the offending fragment sequence to make failures debuggable.
            auto msg = fmt::format("expected_is_valid ({}) != is_valid ({}), fragments:\n{}", expected_is_valid, is_valid,
                    fmt::join(frag_refs | std::views::transform([&] (std::reference_wrapper mf) { return fmt::format("{}", mutation_fragment_v2::printer(*s, mf.get())); }), "\n"));
            BOOST_FAIL(msg);
        }
    };
    auto check_valid = [&] (std::initializer_list> frag_refs) { return check(frag_refs, true); };
    auto check_invalid = [&] (std::initializer_list> frag_refs) { return check(frag_refs, false); };
    // Partitions
    check_valid({ps1, pe});
    check_valid({ps1, pe, ps2, pe});
    check_invalid({pe, ps1, pe});
    check_invalid({ps2, pe, ps1, pe});
    check_invalid({ps1});
    check_invalid({ps1, pe, ps2});
    // + static row
check_valid({ps1, sr, pe});
    check_valid({ps1_tomb, sr, pe});
    check_valid({ps1, sr, pe, ps2, sr, pe});
    check_invalid({ps1, pe, sr, ps2, pe});
    check_invalid({sr, ps1, pe});
    // + clustering row
    check_valid({ps1, cr1, pe});
    check_valid({ps1, sr, cr1, pe});
    check_valid({ps1, cr1, cr2, pe});
    check_valid({ps1, sr, cr1, cr2, pe});
    check_valid({ps1_tomb, cr1, pe});
    check_valid({ps1_tomb, cr1, cr2, pe});
    check_valid({ps1, cr1, pe, ps2, cr1, pe});
    check_invalid({ps1, pe, cr1, ps2, pe});
    check_invalid({cr1, ps1, pe});
    check_invalid({ps1, cr1, sr, pe});
    check_invalid({ps1_tomb, cr1, sr, pe});
    check_invalid({ps1_tomb, cr1_high_ts, sr, pe});
    check_invalid({ps1, cr2, cr1, pe});
    // + range tombstones
    check_valid({ps1, rtc1, rtc_end, pe});
    check_valid({ps1, rtc1, rtc1, rtc_end, pe});
    check_valid({ps1, rtc1, rtc1, cr1, rtc_end, pe});
    check_valid({ps1, sr, rtc1, cr1, rtc_end, pe});
    check_valid({ps1, rtc1, rtc2, rtc_end, pe});
    check_valid({ps1, sr, rtc1, cr1, rtc2, cr2, rtc_end, pe});
    check_valid({ps1_tomb, rtc1, cr1, rtc_end, pe});
    check_valid({ps1_tomb, rtc1, cr1, rtc2, cr2, rtc_end, pe});
    check_valid({ps1, rtc1, cr1, rtc_end, pe, ps2, rtc1, cr1, rtc_end, pe});
    check_invalid({ps1, rtc1, pe});
    check_invalid({ps1, pe, rtc1, rtc_end, ps2, pe});
    check_invalid({rtc1, ps1, pe});
    check_invalid({ps1, rtc1, rtc_end, sr, pe});
    check_invalid({ps1, sr, cr1, rtc1, rtc_end, pe});
    check_invalid({ps1_tomb, cr1, rtc1, rtc_end, pe});
    check_invalid({ps1_tomb, cr1_high_ts, rtc1, rtc_end, pe});
    check_invalid({ps1, rtc2, rtc1, rtc_end, pe});
    check_invalid({ps1_tomb, rtc2, rtc1, rtc_end, pe});
};

SEASTAR_TEST_CASE(test_tracing_format) {
    // scylla-dtest/tools/cdc_utils.py::CDCTraceInfoMatcher matches the
    // formatted token with "{key: pk(.*?), token:(.*)}", so let's make
    // sure we don't break it
    dht::token token{42};
    int8_t bytes[] = {0x01, 0x03, 0x00};
    dht::decorated_key key{token, partition_key::from_bytes(bytes)};
    std::string formatted = fmt::to_string(key);
    BOOST_CHECK_EQUAL(formatted, "{key: pk{0103}, token: 42}");
    return make_ready_future();
}

// Base case of the variadic helper below: append a single (key, cell) pair to
// the collection mutation being built.
void do_make_collection(collection_mutation_description& desc, std::pair&& cell) {
    desc.cells.push_back(std::move(cell));
};

// Recursive case: append the first pair, then recurse on the remaining ones.
// NOTE(review): the template parameter list appears stripped by extraction
// ("template void ..."); tokens kept verbatim.
template
void do_make_collection(collection_mutation_description& desc, std::pair&& cell, Cell&& ...cells) {
    desc.cells.push_back(std::move(cell));
    do_make_collection(desc, std::forward(cells)...);
};

// Checks the live/dead cell and collection-tombstone statistics returned by
// row::compact_and_expire(), for atomic and collection cells in both static
// and regular columns.
SEASTAR_TEST_CASE(test_compact_and_expire_cell_stats) {
    const auto collection_type = map_type_impl::get_instance(int32_type, int32_type, true);
    auto schema = schema_builder("test", "test_compact_and_expire_cell_stats")
            .with_column("pk", int32_type, column_kind::partition_key)
            .with_column("ck", int32_type, column_kind::clustering_key)
            .with_column("static_atomic", int32_type, column_kind::static_column)
            .with_column("static_collection", collection_type, column_kind::static_column)
            .with_column("regular_atomic", int32_type, column_kind::regular_column)
            .with_column("regular_collection", collection_type, column_kind::regular_column)
            .build();
    auto& s = *schema;
    // dead_ts < tomb_ts < live_ts: cells written at dead_ts are shadowed by a
    // tombstone at tomb_ts, cells at live_ts survive it.
    api::timestamp_type live_ts = 10;
    api::timestamp_type tomb_ts = 5;
    api::timestamp_type dead_ts = 0;
    const auto now = gc_clock::now();
    const auto value = data_value(10).serialize_nonnull();
    const auto value1 = data_value(11).serialize_nonnull();
    // The cell content to place into the row under test, per column flavor.
    struct row_content {
        std::optional atomic_column;
        std::optional collection_column;
    };
    const auto make_collection = [&] (tombstone tomb, auto&&...
cells) {
        collection_mutation_description desc;
        desc.tomb = tomb;
        do_make_collection(desc, std::forward(cells)...);
        return desc.serialize(*collection_type);
    };
    // Builds a row with the given content, compacts it against the given row
    // tombstone, and verifies the returned statistics. Runs the same scenario
    // for both the static and the regular column flavor.
    const auto check = [&] (row_content rc, row_tombstone rt, compact_and_expire_result expected_res, std::source_location sl = std::source_location::current()) {
        testlog.info("check() @ {}:{}", sl.file_name(), sl.line());
        const static std::unordered_map> column_names = {
            {column_kind::static_column, {"static_atomic", "static_collection"}},
            {column_kind::regular_column, {"regular_atomic", "regular_collection"}},
        };
        for (const auto col_kind : {column_kind::static_column, column_kind::regular_column}) {
            row r;
            if (rc.atomic_column) {
                const auto cdef = *s.get_column_definition(column_names.at(col_kind)[0]);
                r.apply(cdef, atomic_cell_or_collection(atomic_cell(*int32_type, *rc.atomic_column)));
            }
            if (rc.collection_column) {
                const auto cdef = *s.get_column_definition(column_names.at(col_kind)[1]);
                r.apply(cdef, atomic_cell_or_collection(collection_mutation(*collection_type, *rc.collection_column)));
            }
            auto res = r.compact_and_expire(s, col_kind, rt, now, always_gc, now);
            BOOST_REQUIRE_EQUAL(res, expected_res);
        }
    };
    // Empty row: no stats.
    check(row_content{}, row_tombstone{}, {});
    // Single live atomic cell, with and without an older row tombstone.
    check(row_content{.atomic_column = atomic_cell::make_live(*int32_type, live_ts, value)}, row_tombstone{}, {.live_cells = 1});
    check(row_content{.atomic_column = atomic_cell::make_live(*int32_type, live_ts, value)}, row_tombstone{tombstone(tomb_ts, now)}, {.live_cells = 1});
    // Dead atomic cell; live cell shadowed by a newer row tombstone.
    check(row_content{.atomic_column = atomic_cell::make_dead(dead_ts, now)}, row_tombstone{}, {.dead_cells = 1});
    check(row_content{.atomic_column = atomic_cell::make_live(*int32_type, dead_ts, value)}, row_tombstone{tombstone(tomb_ts, now)}, {.dead_cells = 1});
    // Collection cells, alone and combined with atomic cells.
    check(row_content{ .collection_column = make_collection({}, std::pair(value, atomic_cell::make_live(*int32_type, live_ts, value))) }, row_tombstone{}, {.live_cells = 1});
    check(row_content{ .collection_column = make_collection({}, std::pair(value, atomic_cell::make_live(*int32_type, live_ts, value)), std::pair(value1, atomic_cell::make_live(*int32_type, live_ts, value1))) }, row_tombstone{}, {.live_cells = 2});
    check(row_content{
            .atomic_column = atomic_cell::make_dead(dead_ts, now),
            .collection_column = make_collection({}, std::pair(value, atomic_cell::make_live(*int32_type, live_ts, value)), std::pair(value1, atomic_cell::make_live(*int32_type, live_ts, value1)))
    }, row_tombstone{}, {.live_cells = 2, .dead_cells = 1});
    check(row_content{
            .atomic_column = atomic_cell::make_live(*int32_type, live_ts, value),
            .collection_column = make_collection({}, std::pair(value, atomic_cell::make_live(*int32_type, live_ts, value)), std::pair(value1, atomic_cell::make_live(*int32_type, live_ts, value1)))
    }, row_tombstone{}, {.live_cells = 3});
    check(row_content{
            .atomic_column = atomic_cell::make_live(*int32_type, live_ts, value),
            .collection_column = make_collection({}, std::pair(value, atomic_cell::make_dead(dead_ts, now)), std::pair(value1, atomic_cell::make_live(*int32_type, live_ts, value1)))
    }, row_tombstone{}, {.live_cells = 2, .dead_cells = 1});
    // Collection tombstones count separately from dead cells.
    check(row_content{
            .atomic_column = atomic_cell::make_live(*int32_type, live_ts, value),
            .collection_column = make_collection(tombstone(tomb_ts, now), std::pair(value, atomic_cell::make_dead(dead_ts, now)), std::pair(value1, atomic_cell::make_live(*int32_type, live_ts, value1)))
    }, row_tombstone{}, {.live_cells = 2, .dead_cells = 1, .collection_tombstones = 1});
    check(row_content{
            .atomic_column = atomic_cell::make_live(*int32_type, live_ts, value),
            .collection_column = make_collection(tombstone(tomb_ts, now), std::pair(value, atomic_cell::make_live(*int32_type, dead_ts, value)), std::pair(value1, atomic_cell::make_live(*int32_type, live_ts, value1)))
    }, row_tombstone{}, {.live_cells = 2, .dead_cells = 1, .collection_tombstones = 1});
    // Row tombstone shadowing both atomic and collection cells.
    check(row_content{
            .atomic_column = atomic_cell::make_live(*int32_type, dead_ts, value),
            .collection_column = make_collection({}, std::pair(value, atomic_cell::make_dead(dead_ts, now)), std::pair(value1, atomic_cell::make_live(*int32_type, live_ts, value1)))
    }, row_tombstone(tombstone(tomb_ts, now)), {.live_cells = 1, .dead_cells = 2});
    check(row_content{
            .atomic_column = atomic_cell::make_live(*int32_type, dead_ts, value),
            .collection_column = make_collection(tombstone(dead_ts, now), std::pair(value, atomic_cell::make_dead(dead_ts, now)), std::pair(value1, atomic_cell::make_live(*int32_type, live_ts, value1)))
    }, row_tombstone(tombstone(tomb_ts, now)), {.live_cells = 1, .dead_cells = 2, .collection_tombstones = 1});
    return make_ready_future();
}

// Checks that to_data_query_result() honors the distinct option (one row per
// partition) and the per-partition row limit when converting a reconcilable
// result of 4 partitions x 10 rows.
SEASTAR_THREAD_TEST_CASE(test_to_data_query_results_with_distinct_and_per_partition_limit) {
    simple_schema ss;
    const auto& s = *ss.schema();
    query::result_memory_limiter limiter(query::result_memory_limiter::maximum_result_size * 100);
    const auto max_size = query::max_result_size( query::result_memory_limiter::maximum_result_size, query::result_memory_limiter::maximum_result_size, query::result_memory_limiter::maximum_result_size);
    reconcilable_result_builder builder(s, s.full_slice(), limiter.new_mutation_read(max_size, query::short_read::yes).get());
    const auto& v_def = *s.get_column_definition(to_bytes("v"));
    const auto value = serialized("v");
    auto pkeys = ss.make_pkeys(4);
    // Build 4 partitions with 10 live rows each.
    for (const auto& pkey : pkeys) {
        builder.consume_new_partition(pkey);
        for (uint32_t ck = 0; ck < 10; ck++) {
            auto row = clustering_row(ss.make_ckey(ck));
            row.cells().apply(v_def, atomic_cell::make_live(*v_def.type, ss.new_timestamp(), value));
            builder.consume(std::move(row), {}, true);
        }
        builder.consume_end_of_partition();
    }
    auto rr = builder.consume_end_of_stream();
    BOOST_REQUIRE_EQUAL(rr.partitions().size(), pkeys.size());
    BOOST_REQUIRE_EQUAL(rr.row_count(), pkeys.size() * 10);
    // SELECT DISTINCT
    {
        auto slice = partition_slice_builder(s)
                .with_no_static_columns()
                .with_no_regular_columns()
                .with_range(query::clustering_range::make_open_ended_both_sides())
.with_option()
                .with_option()
                .with_option()
                .build();
        // NOTE(review): the with_option template arguments appear stripped by
        // extraction here and below; tokens kept verbatim.
        auto result = to_data_query_result(rr, ss.schema(), slice, query::max_rows, query::max_partitions, {}).get();
        // Distinct: exactly one row per partition.
        BOOST_REQUIRE_EQUAL(result.row_count(), pkeys.size());
    }
    // per-partition limit
    {
        auto slice = partition_slice_builder(s)
                .with_partition_row_limit(2)
                .with_range(query::clustering_range::make_open_ended_both_sides())
                .with_option()
                .build();
        auto result = to_data_query_result(rr, ss.schema(), slice, query::max_rows, query::max_partitions, {}).get();
        // Limit of 2 rows applies per partition, not globally.
        BOOST_REQUIRE_EQUAL(result.row_count(), pkeys.size() * 2);
    }
}

// Max-purgeable has two values: one for regular and one for shadowable
// tombstones. Check that the value is not sticky -- if a shadowable is requested
// first, it won't apply to regular tombstones and vice-versa.
SEASTAR_THREAD_TEST_CASE(test_mutation_compactor_sticky_max_purgeable) {
    simple_schema ss;
    auto s = ss.schema();
    tests::reader_concurrency_semaphore_wrapper semaphore;
    auto permit = semaphore.make_permit();
    auto dk = ss.make_pkey(1);
    const auto& v_def = *s->get_column_definition(to_bytes("v"));
    const auto value = serialized("v");
    // Old enough (past gc_grace_seconds) that tombstones are GC candidates.
    const auto deletion_time = gc_clock::now() - std::chrono::hours(1) - s->gc_grace_seconds();
    const auto compaction_time = gc_clock::now();
    // Tombstones are written at `timestamp` (100): purgeable under the
    // shadowable limit (110) but not under the regular one (50).
    const api::timestamp_type shadowable_max_purgeable = 110;
    const api::timestamp_type regular_max_purgeable = 50;
    const api::timestamp_type timestamp = 100;
    // Rebuilds the compacted stream into a mutation for comparison.
    class mutation_rebuilding_consumer {
        mutation_rebuilder_v2 _mr;
    public:
        explicit mutation_rebuilding_consumer(schema_ptr s) : _mr(std::move(s)) { }
        void consume_new_partition(dht::decorated_key dk) { _mr.consume_new_partition(std::move(dk)); }
        void consume(tombstone t) { _mr.consume(t); }
        stop_iteration consume(static_row&& sr, tombstone, bool) { return _mr.consume(std::move(sr)); }
        stop_iteration consume(clustering_row&& cr, row_tombstone, bool) { return _mr.consume(std::move(cr)); }
        stop_iteration consume(range_tombstone_change&& rtc) { return _mr.consume(std::move(rtc)); }
        stop_iteration consume_end_of_partition() { return _mr.consume_end_of_partition(); }
        mutation_opt consume_end_of_stream() { return _mr.consume_end_of_stream(); }
    };
    // Hands out a different purge limit depending on the tombstone kind asked about.
    auto get_max_purgeable = [] (const dht::decorated_key&, is_shadowable is) {
        const auto ts = is == is_shadowable::yes ? shadowable_max_purgeable : regular_max_purgeable;
        return max_purgeable{ts, max_purgeable::timestamp_source::none};
    };
    auto compact_and_expire = [&] (mutation mut) {
        auto reader = make_mutation_reader_from_mutations(s, permit, std::move(mut));
        auto close_reader = deferred_close(reader);
        auto compactor = compact_for_compaction( *s, compaction_time, get_max_purgeable, tombstone_gc_state(nullptr), mutation_rebuilding_consumer(s));
        auto mut_opt = reader.consume(std::move(compactor)).get();
        BOOST_REQUIRE(mut_opt);
        return *mut_opt;
    };
    // shadowable tombstone queried first: its limit (110) purges row1, but must
    // not stick to the regular tombstone on row2, which survives (100 > 50)
    {
        mutation mut(s, dk);
        mutation mut_compacted(s, dk);
        auto row1 = clustering_row(ss.make_ckey(1));
        row1.apply(shadowable_tombstone(timestamp, deletion_time));
        auto row2 = clustering_row(ss.make_ckey(2));
        row2.apply(tombstone(timestamp, deletion_time));
        auto row3 = clustering_row(ss.make_ckey(3));
        row3.cells().apply(v_def, atomic_cell::make_live(*v_def.type, timestamp, value));
        // Expected: row1 purged, row2 and row3 kept.
        mut_compacted.apply(mutation_fragment(*s, permit, clustering_row(*s, row2)));
        mut_compacted.apply(mutation_fragment(*s, permit, clustering_row(*s, row3)));
        mut.apply(mutation_fragment(*s, permit, std::move(row1)));
        mut.apply(mutation_fragment(*s, permit, std::move(row2)));
        mut.apply(mutation_fragment(*s, permit, std::move(row3)));
        assert_that(compact_and_expire(std::move(mut))).is_equal_to(mut_compacted);
    }
    // regular tombstone queried first: its limit (50) keeps row1, and must not
    // stick to the shadowable tombstone on row2, which is purged (100 < 110)
    {
        mutation mut(s, dk);
        mutation mut_compacted(s, dk);
        auto row1 = clustering_row(ss.make_ckey(1));
        row1.apply(tombstone(timestamp, deletion_time));
        auto row2 = clustering_row(ss.make_ckey(2));
        row2.apply(shadowable_tombstone(timestamp, deletion_time));
        auto row3 = clustering_row(ss.make_ckey(3));
        row3.cells().apply(v_def, atomic_cell::make_live(*v_def.type, timestamp, value));
        // Expected: row2 purged, row1 and row3 kept.
        mut_compacted.apply(mutation_fragment(*s, permit, clustering_row(*s, row1)));
        mut_compacted.apply(mutation_fragment(*s, permit, clustering_row(*s, row3)));
        mut.apply(mutation_fragment(*s, permit, std::move(row1)));
        mut.apply(mutation_fragment(*s, permit, std::move(row2)));
        mut.apply(mutation_fragment(*s, permit, std::move(row3)));
        assert_that(compact_and_expire(std::move(mut))).is_equal_to(mut_compacted);
    }
}

// Checks that freezing/serializing a mutation containing an empty or non-full
// clustering key fails with std::runtime_error on unfreeze/to_mutation,
// instead of producing a corrupt mutation.
SEASTAR_THREAD_TEST_CASE(test_serialized_mutation_empty_and_nonfull_keys) {
    auto random_spec = tests::make_random_schema_specification( get_name(), std::uniform_int_distribution(1, 4), std::uniform_int_distribution(1, 4), std::uniform_int_distribution(2, 8), std::uniform_int_distribution(2, 8));
    auto random_schema = tests::random_schema(tests::random::get_int(), *random_spec);
    auto schema = random_schema.schema();
    // A schema variant with one regular column dropped, to also exercise
    // unfreezing against a different schema version.
    auto updated_schema = schema_builder(schema)
            .remove_column(schema->regular_column_at(0).name())
            .build();
    tests::reader_concurrency_semaphore_wrapper semaphore;
    testlog.info("Random schema:\n{}", random_schema.cql());
    const auto mutations = tests::generate_random_mutations(random_schema, 1).get();
    // Sanity check: a well-formed mutation round-trips through freezing.
    {
        const auto& mut = mutations.back();
        frozen_mutation fm(mut);
        assert_that(fm.unfreeze(schema)).is_equal_to(mut);
    }
    auto reset_abort = defer([abort = set_abort_on_internal_error(false)] { set_abort_on_internal_error(abort); });
    auto check = [&] (const clustering_key& ckey) {
        testlog.info("Adding row with bad key: {}", ckey);
        auto mut = mutations.back();
        // Need to sneak in the bad key via the back-door, the mutation_partition
        // will reject it via the regular insert/update methods.
auto& rows = mut.partition().mutable_clustered_rows();
        auto e = alloc_strategy_unique_ptr(current_allocator().construct(ckey, deletable_row(*schema, mut.partition().clustered_rows().begin()->row())));
        rows.insert_before_hint(rows.end(), std::move(e), rows_entry::tri_compare(*schema));
        // Both serialization formats must refuse to materialize the bad key,
        // against the original schema and the updated one alike.
        frozen_mutation fm(mut);
        BOOST_REQUIRE_THROW(fm.unfreeze(schema), std::runtime_error);
        BOOST_REQUIRE_THROW(fm.unfreeze(updated_schema), std::runtime_error);
        canonical_mutation cm(mut);
        BOOST_REQUIRE_THROW(cm.to_mutation(schema), std::runtime_error);
        BOOST_REQUIRE_THROW(cm.to_mutation(updated_schema), std::runtime_error);
    };
    check(clustering_key::make_empty());
    if (schema->clustering_key_size() > 1) {
        // Drop the last component to produce a non-full (prefix) key.
        auto full_ckey = random_schema.make_ckey(0);
        full_ckey.erase(full_ckey.end() - 1);
        check(clustering_key::from_exploded(*schema, full_ckey));
    }
}

// Checks that mutation_partition's row-insertion entry points reject empty,
// non-full and non-key positions with std::runtime_error.
SEASTAR_THREAD_TEST_CASE(test_mutation_empty_and_nonfull_keys) {
    auto random_spec = tests::make_random_schema_specification( get_name(), std::uniform_int_distribution(1, 4), std::uniform_int_distribution(1, 4), std::uniform_int_distribution(2, 8), std::uniform_int_distribution(2, 8));
    auto random_schema = tests::random_schema(tests::random::get_int(), *random_spec);
    auto schema = random_schema.schema();
    testlog.info("Random schema:\n{}", random_schema.cql());
    auto reset_abort = defer([abort = set_abort_on_internal_error(false)] { set_abort_on_internal_error(abort); });
    // Every clustered-row accessor must throw for the given bad key.
    auto check = [&] (const clustering_key& ckey) {
        testlog.info("Adding row with bad key: {}", ckey);
        mutation_partition mp(*schema);
        BOOST_REQUIRE_THROW(mp.clustered_row(*schema, ckey), std::runtime_error);
        BOOST_REQUIRE_THROW(mp.clustered_row(*schema, clustering_key(ckey)), std::runtime_error);
        BOOST_REQUIRE_THROW(mp.clustered_row(*schema, clustering_key_view(ckey)), std::runtime_error);
        BOOST_REQUIRE_THROW(mp.clustered_row(*schema, position_in_partition_view::for_key(ckey), is_dummy::no, is_continuous::no), std::runtime_error);
        BOOST_REQUIRE_THROW(mp.clustered_rows_entry(*schema, position_in_partition_view::for_key(ckey), is_dummy::no, is_continuous::no), std::runtime_error);
        BOOST_REQUIRE_THROW(mp.append_clustered_row(*schema, position_in_partition_view::for_key(ckey), is_dummy::no, is_continuous::no), std::runtime_error);
    };
    check(clustering_key::make_empty());
    if (schema->clustering_key_size() > 1) {
        // Drop the last component to produce a non-full (prefix) key.
        auto full_ckey = random_schema.make_ckey(0);
        full_ckey.erase(full_ckey.end() - 1);
        check(clustering_key::from_exploded(*schema, full_ckey));
    }
    // Non-key positions (before/after a key, the static row) must be rejected
    // too, even when built from a valid full key.
    mutation_partition mp(*schema);
    const auto ckey = clustering_key::from_exploded(*schema, random_schema.make_ckey(0));
    BOOST_REQUIRE_THROW(mp.clustered_row(*schema, position_in_partition_view::before_key(ckey), is_dummy::no, is_continuous::no), std::runtime_error);
    BOOST_REQUIRE_THROW(mp.clustered_row(*schema, position_in_partition_view::after_all_prefixed(ckey), is_dummy::no, is_continuous::no), std::runtime_error);
    BOOST_REQUIRE_THROW(mp.clustered_row(*schema, position_in_partition_view::for_static_row(), is_dummy::no, is_continuous::no), std::runtime_error);
    BOOST_REQUIRE_THROW(mp.clustered_rows_entry(*schema, position_in_partition_view::before_key(ckey), is_dummy::no, is_continuous::no), std::runtime_error);
    BOOST_REQUIRE_THROW(mp.clustered_rows_entry(*schema, position_in_partition_view::after_all_prefixed(ckey), is_dummy::no, is_continuous::no), std::runtime_error);
    BOOST_REQUIRE_THROW(mp.clustered_rows_entry(*schema, position_in_partition_view::for_static_row(), is_dummy::no, is_continuous::no), std::runtime_error);
    BOOST_REQUIRE_THROW(mp.append_clustered_row(*schema, position_in_partition_view::before_key(ckey), is_dummy::no, is_continuous::no), std::runtime_error);
    BOOST_REQUIRE_THROW(mp.append_clustered_row(*schema, position_in_partition_view::after_all_prefixed(ckey), is_dummy::no, is_continuous::no), std::runtime_error);
    BOOST_REQUIRE_THROW(mp.append_clustered_row(*schema, position_in_partition_view::for_static_row(), is_dummy::no, is_continuous::no), std::runtime_error);
}
SEASTAR_THREAD_TEST_CASE(test_mutation_partition_v2_empty_and_nonfull_keys) {
    // Same checks as test_mutation_empty_and_nonfull_keys, but against the
    // mutation_partition_v2 representation.
    auto random_spec = tests::make_random_schema_specification( get_name(), std::uniform_int_distribution(1, 4), std::uniform_int_distribution(1, 4), std::uniform_int_distribution(1, 1), std::uniform_int_distribution(1, 1));
    auto random_schema = tests::random_schema(tests::random::get_int(), *random_spec);
    auto schema = random_schema.schema();
    testlog.info("Random schema:\n{}", random_schema.cql());
    auto reset_abort = defer([abort = set_abort_on_internal_error(false)] { set_abort_on_internal_error(abort); });
    // Every clustered-row accessor must throw for the given bad key.
    auto check = [&] (const clustering_key& ckey) {
        testlog.info("Adding row with bad key: {}", ckey);
        mutation_partition_v2 mp(*schema);
        BOOST_REQUIRE_THROW(mp.clustered_row(*schema, ckey), std::runtime_error);
        BOOST_REQUIRE_THROW(mp.clustered_row(*schema, clustering_key(ckey)), std::runtime_error);
        BOOST_REQUIRE_THROW(mp.clustered_row(*schema, clustering_key_view(ckey)), std::runtime_error);
        BOOST_REQUIRE_THROW(mp.clustered_row(*schema, position_in_partition_view::for_key(ckey), is_dummy::no, is_continuous::no), std::runtime_error);
        BOOST_REQUIRE_THROW(mp.clustered_rows_entry(*schema, position_in_partition_view::for_key(ckey), is_dummy::no, is_continuous::no), std::runtime_error);
        BOOST_REQUIRE_THROW(mp.append_clustered_row(*schema, position_in_partition_view::for_key(ckey), is_dummy::no, is_continuous::no), std::runtime_error);
    };
    check(clustering_key::make_empty());
    if (schema->clustering_key_size() > 1) {
        // Drop the last component to produce a non-full (prefix) key.
        auto full_ckey = random_schema.make_ckey(0);
        full_ckey.erase(full_ckey.end() - 1);
        check(clustering_key::from_exploded(*schema, full_ckey));
    }
    // Non-key positions (before/after a key, the static row) must be rejected
    // too, even when built from a valid full key.
    mutation_partition_v2 mp(*schema);
    const auto ckey = clustering_key::from_exploded(*schema, random_schema.make_ckey(0));
    BOOST_REQUIRE_THROW(mp.clustered_row(*schema, position_in_partition_view::before_key(ckey), is_dummy::no, is_continuous::no), std::runtime_error);
    BOOST_REQUIRE_THROW(mp.clustered_row(*schema, position_in_partition_view::after_all_prefixed(ckey), is_dummy::no, is_continuous::no), std::runtime_error);
    BOOST_REQUIRE_THROW(mp.clustered_row(*schema, position_in_partition_view::for_static_row(), is_dummy::no, is_continuous::no), std::runtime_error);
    BOOST_REQUIRE_THROW(mp.clustered_rows_entry(*schema, position_in_partition_view::before_key(ckey), is_dummy::no, is_continuous::no), std::runtime_error);
    BOOST_REQUIRE_THROW(mp.clustered_rows_entry(*schema, position_in_partition_view::after_all_prefixed(ckey), is_dummy::no, is_continuous::no), std::runtime_error);
    BOOST_REQUIRE_THROW(mp.clustered_rows_entry(*schema, position_in_partition_view::for_static_row(), is_dummy::no, is_continuous::no), std::runtime_error);
    BOOST_REQUIRE_THROW(mp.append_clustered_row(*schema, position_in_partition_view::before_key(ckey), is_dummy::no, is_continuous::no), std::runtime_error);
    BOOST_REQUIRE_THROW(mp.append_clustered_row(*schema, position_in_partition_view::after_all_prefixed(ckey), is_dummy::no, is_continuous::no), std::runtime_error);
    BOOST_REQUIRE_THROW(mp.append_clustered_row(*schema, position_in_partition_view::for_static_row(), is_dummy::no, is_continuous::no), std::runtime_error);
}