/*
 * Copyright (C) 2015-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

// (The targets of the angle-bracket includes were lost in this copy; the
// <...> headers below are a best-effort reconstruction from what these
// tests use.)
#include <boost/test/unit_test.hpp>
#include <seastar/core/thread.hh>
#include "replica/database.hh"
#include "db/config.hh"
#include "utils/assert.hh"
#include "utils/UUID_gen.hh"

#undef SEASTAR_TESTING_MAIN
#include <seastar/testing/test_case.hh>
#include <seastar/testing/thread_test_case.hh>
#include "schema/schema_builder.hh"
#include <seastar/util/closeable.hh>
#include "service/migration_manager.hh"
#include <seastar/core/sleep.hh>
#include <seastar/util/defer.hh>
#include "replica/memtable.hh"
#include "test/lib/cql_test_env.hh"
#include "test/lib/cql_assertions.hh"
#include "test/lib/mutation_source_test.hh"
#include "test/lib/mutation_assertions.hh"
#include "test/lib/mutation_reader_assertions.hh"
#include "test/lib/data_model.hh"
#include "test/lib/eventually.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/log.hh"
#include "test/lib/simple_schema.hh"
#include "test/lib/reader_concurrency_semaphore.hh"
#include "test/lib/key_utils.hh"
#include "test/lib/sstable_utils.hh"
#include "utils/error_injection.hh"
#include "db/commitlog/commitlog.hh"
#include "test/lib/make_random_string.hh"
#include "db/extensions.hh"
#include "service/storage_service.hh"

BOOST_AUTO_TEST_SUITE(memtable_test)

using namespace std::literals::chrono_literals;

static api::timestamp_type next_timestamp() {
    static thread_local api::timestamp_type next_timestamp = 1;
    return next_timestamp++;
}

static bytes make_unique_bytes() {
    return to_bytes(fmt::to_string(utils::UUID_gen::get_time_UUID()));
}

static void set_column(mutation& m, const sstring& column_name) {
    SCYLLA_ASSERT(m.schema()->get_column_definition(to_bytes(column_name))->type == bytes_type);
    auto value = data_value(make_unique_bytes());
    m.set_clustered_cell(clustering_key::make_empty(), to_bytes(column_name), value, next_timestamp());
}

static mutation make_unique_mutation(schema_ptr s) {
    return mutation(s, partition_key::from_single_value(*s, make_unique_bytes()));
}

// Returns a vector of empty mutations in ring order.
utils::chunked_vector<mutation> make_ring(schema_ptr s, int n_mutations) {
    utils::chunked_vector<mutation> ring;
    for (int i = 0; i < n_mutations; ++i) {
        ring.push_back(make_unique_mutation(s));
    }
    std::sort(ring.begin(), ring.end(), mutation_decorated_key_less_comparator());
    return ring;
}

SEASTAR_TEST_CASE(test_memtable_conforms_to_mutation_source) {
    return seastar::async([] {
        run_mutation_source_tests([](schema_ptr s, const utils::chunked_vector<mutation>& partitions) {
            auto mt = make_memtable(s, partitions);
            logalloc::shard_tracker().full_compaction();
            return mt->as_data_source();
        });
    });
}

static future<> test_memtable(void (*run_tests)(populate_fn_ex, bool)) {
    return seastar::async([run_tests] {
        tests::reader_concurrency_semaphore_wrapper semaphore;
        lw_shared_ptr<replica::memtable> mt;
        std::vector<mutation_reader> readers;
        auto clear_readers = [&readers] {
            parallel_for_each(readers, [] (mutation_reader& rd) {
                return rd.close();
            }).finally([&readers] {
                readers.clear();
            }).get();
        };
        auto cleanup_readers = defer([&] { clear_readers(); });
        std::deque<dht::partition_range> ranges_storage;
        lw_shared_ptr<bool> finished = make_lw_shared<bool>(false);
        auto full_compaction_in_background = seastar::do_until([finished] { return *finished; }, [] {
            // do_refresh_state is called when we detect a new partition snapshot version.
            // If the snapshot version changes while mutation fragments are being read from a
            // clustering range, the partition_snapshot_reader state is refreshed with the saved
            // last position of the emitted row and range tombstone. full_compaction() increases
            // the change mark.
            logalloc::shard_tracker().full_compaction();
            return seastar::sleep(100us);
        });

        run_tests([&] (schema_ptr s, const utils::chunked_vector<mutation>& muts, gc_clock::time_point) {
            clear_readers();
            mt = make_lw_shared<replica::memtable>(s);

            for (auto&& m : muts) {
                mt->apply(m);
                // Create a reader so that each mutation ends up in a separate version.
                auto rd = mt->make_mutation_reader(s, semaphore.make_permit(),
                        ranges_storage.emplace_back(dht::partition_range::make_singular(m.decorated_key())));
                rd.set_max_buffer_size(1);
                rd.fill_buffer().get();
                readers.emplace_back(std::move(rd));
            }

            return mt->as_data_source();
        }, true);

        *finished = true;
        full_compaction_in_background.get();
    });
}

// plain
SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source_basic) {
    return test_memtable(run_mutation_source_tests_plain_basic);
}

SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source_plain_reader_conversion) {
    return test_memtable(run_mutation_source_tests_plain_reader_conversion);
}

SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source_plain_fragments_monotonic) {
    return test_memtable(run_mutation_source_tests_plain_fragments_monotonic);
}

SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source_plain_read_back) {
    return test_memtable(run_mutation_source_tests_plain_read_back);
}

// reverse
SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source_reverse_basic) {
    return test_memtable(run_mutation_source_tests_reverse_basic);
}

SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source_reverse_reader_conversion) {
    return test_memtable(run_mutation_source_tests_reverse_reader_conversion);
}

SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source_reverse_fragments_monotonic) {
    return test_memtable(run_mutation_source_tests_reverse_fragments_monotonic);
}

SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source_reverse_read_back) {
    return test_memtable(run_mutation_source_tests_reverse_read_back);
}

SEASTAR_TEST_CASE(test_memtable_flush_reader) {
    // The memtable flush reader is severely limited: it always assumes that
    // the full partition range is being read and that
    // streamed_mutation::forwarding is set to no. Therefore, we cannot use
    // run_mutation_source_tests() to test it.
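    // Instead, the test below feeds randomly generated mutations into a
    // memtable and checks the flush reader directly against the same
    // mutations compacted with compact_for_compaction(), since the flush
    // reader is expected to emit data in compacted form.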
    return seastar::async([] {
        tests::reader_concurrency_semaphore_wrapper semaphore;

        auto make_memtable = [] (replica::dirty_memory_manager& mgr, replica::memtable_table_shared_data& table_shared_data,
                replica::table_stats& tbl_stats, utils::chunked_vector<mutation> muts) {
            SCYLLA_ASSERT(!muts.empty());
            auto mt = make_lw_shared<replica::memtable>(muts.front().schema(), mgr, table_shared_data, tbl_stats);
            for (auto& m : muts) {
                mt->apply(m);
            }
            return mt;
        };

        auto test_random_streams = [&] (random_mutation_generator&& gen) {
            for (auto i = 0; i < 4; i++) {
                replica::table_stats tbl_stats;
                replica::memtable_table_shared_data table_shared_data;
                replica::dirty_memory_manager mgr;
                const auto muts = gen(4);
                const auto now = gc_clock::now();
                auto compacted_muts = muts;
                for (auto& mut : compacted_muts) {
                    mut.partition().compact_for_compaction(*mut.schema(), always_gc, mut.decorated_key(), now, tombstone_gc_state::for_tests());
                }

                testlog.info("Simple read");
                auto mt = make_memtable(mgr, table_shared_data, tbl_stats, muts);
                assert_that(mt->make_flush_reader(gen.schema(), semaphore.make_permit()))
                    .produces_compacted(compacted_muts[0], now)
                    .produces_compacted(compacted_muts[1], now)
                    .produces_compacted(compacted_muts[2], now)
                    .produces_compacted(compacted_muts[3], now)
                    .produces_end_of_stream();

                testlog.info("Read with next_partition() calls between partitions");
                mt = make_memtable(mgr, table_shared_data, tbl_stats, muts);
                assert_that(mt->make_flush_reader(gen.schema(), semaphore.make_permit()))
                    .next_partition()
                    .produces_compacted(compacted_muts[0], now)
                    .next_partition()
                    .produces_compacted(compacted_muts[1], now)
                    .next_partition()
                    .produces_compacted(compacted_muts[2], now)
                    .next_partition()
                    .produces_compacted(compacted_muts[3], now)
                    .next_partition()
                    .produces_end_of_stream();

                testlog.info("Read with next_partition() calls inside partitions");
                mt = make_memtable(mgr, table_shared_data, tbl_stats, muts);
                assert_that(mt->make_flush_reader(gen.schema(), semaphore.make_permit()))
                    .produces_compacted(compacted_muts[0], now)
                    .produces_partition_start(muts[1].decorated_key(), muts[1].partition().partition_tombstone())
                    .next_partition()
                    .produces_compacted(compacted_muts[2], now)
                    .next_partition()
                    .produces_partition_start(muts[3].decorated_key(), muts[3].partition().partition_tombstone())
                    .next_partition()
                    .produces_end_of_stream();
            }
        };

        test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no));
        test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::yes));
    });
}

SEASTAR_TEST_CASE(test_adding_a_column_during_reading_doesnt_affect_read_result) {
    return seastar::async([] {
        auto common_builder = schema_builder("ks", "cf")
            .with_column("pk", bytes_type, column_kind::partition_key);

        auto s1 = common_builder
            .with_column("v2", bytes_type, column_kind::regular_column)
            .build();

        auto s2 = common_builder
            .with_column("v1", bytes_type, column_kind::regular_column) // new column
            .with_column("v2", bytes_type, column_kind::regular_column)
            .build();

        tests::reader_concurrency_semaphore_wrapper semaphore;
        auto mt = make_lw_shared<replica::memtable>(s1);

        utils::chunked_vector<mutation> ring = make_ring(s1, 3);
        for (auto&& m : ring) {
            set_column(m, "v2");
            mt->apply(m);
        }

        auto check_rd_s1 = assert_that(mt->make_mutation_reader(s1, semaphore.make_permit()));
        auto check_rd_s2 = assert_that(mt->make_mutation_reader(s2, semaphore.make_permit()));
        check_rd_s1.next_mutation().has_schema(s1).is_equal_to(ring[0]);
        check_rd_s2.next_mutation().has_schema(s2).is_equal_to(ring[0]);

        mt->set_schema(s2);
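        // The memtable now stores data under s2. Both readers were created
        // before the switch, so they should keep producing the remaining
        // partitions unchanged, each converted to its own requested schema.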
        check_rd_s1.next_mutation().has_schema(s1).is_equal_to(ring[1]);
        check_rd_s2.next_mutation().has_schema(s2).is_equal_to(ring[1]);

        check_rd_s1.next_mutation().has_schema(s1).is_equal_to(ring[2]);
        check_rd_s2.next_mutation().has_schema(s2).is_equal_to(ring[2]);

        check_rd_s1.produces_end_of_stream();
        check_rd_s2.produces_end_of_stream();

        assert_that(mt->make_mutation_reader(s1, semaphore.make_permit()))
            .produces(ring[0])
            .produces(ring[1])
            .produces(ring[2])
            .produces_end_of_stream();

        assert_that(mt->make_mutation_reader(s2, semaphore.make_permit()))
            .produces(ring[0])
            .produces(ring[1])
            .produces(ring[2])
            .produces_end_of_stream();
    });
}

SEASTAR_TEST_CASE(test_unspooled_dirty_accounting_on_flush) {
    return seastar::async([] {
        schema_ptr s = schema_builder("ks", "cf")
            .with_column("pk", bytes_type, column_kind::partition_key)
            .with_column("col", bytes_type, column_kind::regular_column)
            .build();

        tests::reader_concurrency_semaphore_wrapper semaphore;
        replica::dirty_memory_manager mgr;
        replica::memtable_table_shared_data table_shared_data;
        replica::table_stats tbl_stats;
        auto mt = make_lw_shared<replica::memtable>(s, mgr, table_shared_data, tbl_stats);

        utils::chunked_vector<mutation> ring = make_ring(s, 3);
        utils::chunked_vector<mutation> current_ring;

        for (auto&& m : ring) {
            auto m_with_cell = m;
            m_with_cell.set_clustered_cell(clustering_key::make_empty(), to_bytes("col"),
                    data_value(bytes(bytes::initialized_later(), 4096)), next_timestamp());
            mt->apply(m_with_cell);
            current_ring.push_back(m_with_cell);
        }

        // Create a reader which will cause many partition versions to be created.
        mutation_reader_opt rd1 = mt->make_mutation_reader(s, semaphore.make_permit());
        auto close_rd1 = deferred_close(*rd1);
        rd1->set_max_buffer_size(1);
        rd1->fill_buffer().get();

        // Override the large cell value with a short one.
        {
            auto part0_update = ring[0];
            part0_update.set_clustered_cell(clustering_key::make_empty(), to_bytes("col"),
                    data_value(bytes(bytes::initialized_later(), 8)), next_timestamp());
            // Record the update before moving the mutation into the memtable.
            current_ring[0] = part0_update;
            mt->apply(std::move(part0_update));
        }

        std::vector<size_t> unspooled_dirty_values;
        unspooled_dirty_values.push_back(mgr.unspooled_dirty_memory());

        auto flush_reader_check = assert_that(mt->make_flush_reader(s, semaphore.make_permit()));
        flush_reader_check.produces_partition(current_ring[0]);
        unspooled_dirty_values.push_back(mgr.unspooled_dirty_memory());
        flush_reader_check.produces_partition(current_ring[1]);
        unspooled_dirty_values.push_back(mgr.unspooled_dirty_memory());

        while ((*rd1)().get()) ;
        close_rd1.close_now();
        logalloc::shard_tracker().full_compaction();

        flush_reader_check.produces_partition(current_ring[2]);
        unspooled_dirty_values.push_back(mgr.unspooled_dirty_memory());
        flush_reader_check.produces_end_of_stream();
        unspooled_dirty_values.push_back(mgr.unspooled_dirty_memory());

        std::reverse(unspooled_dirty_values.begin(), unspooled_dirty_values.end());
        BOOST_REQUIRE(std::is_sorted(unspooled_dirty_values.begin(), unspooled_dirty_values.end()));
    });
}

// Reproducer for #1753
SEASTAR_TEST_CASE(test_partition_version_consistency_after_lsa_compaction_happens) {
    return seastar::async([] {
        schema_ptr s = schema_builder("ks", "cf")
            .with_column("pk", bytes_type, column_kind::partition_key)
            .with_column("ck", bytes_type, column_kind::clustering_key)
            .with_column("col", bytes_type, column_kind::regular_column)
            .build();

        tests::reader_concurrency_semaphore_wrapper semaphore;
        auto mt = make_lw_shared<replica::memtable>(s);

        auto empty_m = make_unique_mutation(s);
        auto ck1 = clustering_key::from_single_value(*s, serialized(make_unique_bytes()));
        auto ck2 =
        clustering_key::from_single_value(*s, serialized(make_unique_bytes()));
        auto ck3 = clustering_key::from_single_value(*s, serialized(make_unique_bytes()));

        auto m1 = empty_m;
        m1.set_clustered_cell(ck1, to_bytes("col"), data_value(bytes(bytes::initialized_later(), 8)), next_timestamp());

        auto m2 = empty_m;
        m2.set_clustered_cell(ck2, to_bytes("col"), data_value(bytes(bytes::initialized_later(), 8)), next_timestamp());

        auto m3 = empty_m;
        m3.set_clustered_cell(ck3, to_bytes("col"), data_value(bytes(bytes::initialized_later(), 8)), next_timestamp());

        mt->apply(m1);
        std::optional rd1 = assert_that(mt->make_mutation_reader(s, semaphore.make_permit()));
        rd1->set_max_buffer_size(1);
        rd1->fill_buffer().get();

        mt->apply(m2);
        std::optional rd2 = assert_that(mt->make_mutation_reader(s, semaphore.make_permit()));
        rd2->set_max_buffer_size(1);
        rd2->fill_buffer().get();

        mt->apply(m3);
        std::optional rd3 = assert_that(mt->make_mutation_reader(s, semaphore.make_permit()));
        rd3->set_max_buffer_size(1);
        rd3->fill_buffer().get();

        logalloc::shard_tracker().full_compaction();

        auto rd4 = assert_that(mt->make_mutation_reader(s, semaphore.make_permit()));
        rd4.set_max_buffer_size(1);
        rd4.fill_buffer().get();

        auto rd5 = assert_that(mt->make_mutation_reader(s, semaphore.make_permit()));
        rd5.set_max_buffer_size(1);
        rd5.fill_buffer().get();

        auto rd6 = assert_that(mt->make_mutation_reader(s, semaphore.make_permit()));
        rd6.set_max_buffer_size(1);
        rd6.fill_buffer().get();

        rd1->next_mutation().is_equal_to(m1);
        rd2->next_mutation().is_equal_to(m1 + m2);
        rd3->next_mutation().is_equal_to(m1 + m2 + m3);
        rd3 = {};
        rd4.next_mutation().is_equal_to(m1 + m2 + m3);
        rd1 = {};
        rd5.next_mutation().is_equal_to(m1 + m2 + m3);
        rd2 = {};
        rd6.next_mutation().is_equal_to(m1 + m2 + m3);
    });
}

// Reproducer for #1746
SEASTAR_TEST_CASE(test_segment_migration_during_flush) {
    return seastar::async([] {
        schema_ptr s = schema_builder("ks", "cf")
            .with_column("pk", bytes_type, column_kind::partition_key)
            .with_column("ck", bytes_type, column_kind::clustering_key)
            .with_column("col", bytes_type, column_kind::regular_column)
            .build();

        tests::reader_concurrency_semaphore_wrapper semaphore;
        replica::table_stats tbl_stats;
        replica::memtable_table_shared_data table_shared_data;
        replica::dirty_memory_manager mgr;
        auto mt = make_lw_shared<replica::memtable>(s, mgr, table_shared_data, tbl_stats);

        const int rows_per_partition = 300;
        const int partitions = 3;
        utils::chunked_vector<mutation> ring = make_ring(s, partitions);

        for (auto& m : ring) {
            for (int i = 0; i < rows_per_partition; ++i) {
                auto ck = clustering_key::from_single_value(*s, serialized(make_unique_bytes()));
                auto col_value = data_value(bytes(bytes::initialized_later(), 8));
                m.set_clustered_cell(ck, to_bytes("col"), col_value, next_timestamp());
            }
            mt->apply(m);
        }

        auto rd = mt->make_flush_reader(s, semaphore.make_permit());
        auto close_rd = deferred_close(rd);

        for (int i = 0; i < partitions; ++i) {
            auto mfopt = rd().get();
            BOOST_REQUIRE(bool(mfopt));
            BOOST_REQUIRE(mfopt->is_partition_start());
            while (!mfopt->is_end_of_partition()) {
                logalloc::shard_tracker().full_compaction();
                mfopt = rd().get();
            }
            BOOST_REQUIRE_LE(mgr.unspooled_dirty_memory(), mgr.real_dirty_memory());
        }

        BOOST_REQUIRE(!rd().get());
    });
}

// Reproducer for #2854
SEASTAR_TEST_CASE(test_fast_forward_to_after_memtable_is_flushed) {
    return seastar::async([] {
        schema_ptr s = schema_builder("ks", "cf")
            .with_column("pk", bytes_type, column_kind::partition_key)
            .with_column("col", bytes_type, column_kind::regular_column)
            .build();
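        // Scenario for #2854: a reader created before the memtable is marked
        // as flushed must still be able to fast-forward to a later range.
        // The second memtable below stands in for the flushed data source.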
        tests::reader_concurrency_semaphore_wrapper semaphore;
        utils::chunked_vector<mutation> ring = make_ring(s, 5);

        auto mt = make_memtable(s, ring);
        auto mt2 = make_memtable(s, ring);

        auto rd = assert_that(mt->make_mutation_reader(s, semaphore.make_permit()));
        rd.produces(ring[0]);
        mt->mark_flushed(mt2->as_data_source());
        rd.produces(ring[1]);

        auto range = dht::partition_range::make_starting_with(dht::ring_position(ring[3].decorated_key()));
        rd.fast_forward_to(range);
        rd.produces(ring[3]).produces(ring[4]).produces_end_of_stream();
    });
}

SEASTAR_TEST_CASE(test_exception_safety_of_partition_range_reads) {
    return seastar::async([] {
        random_mutation_generator gen(random_mutation_generator::generate_counters::no);
        auto s = gen.schema();
        tests::reader_concurrency_semaphore_wrapper semaphore;
        utils::chunked_vector<mutation> ms = gen(2);
        auto mt = make_memtable(s, ms);

        memory::with_allocation_failures([&] {
            assert_that(mt->make_mutation_reader(s, semaphore.make_permit(), query::full_partition_range))
                .produces(ms);
        });
    });
}

SEASTAR_TEST_CASE(test_exception_safety_of_flush_reads) {
    return seastar::async([] {
        random_mutation_generator gen(random_mutation_generator::generate_counters::no);
        auto s = gen.schema();
        tests::reader_concurrency_semaphore_wrapper semaphore;
        utils::chunked_vector<mutation> ms = gen(2);
        auto mt = make_memtable(s, ms);

        memory::with_allocation_failures([&] {
            auto revert = defer([&] {
                mt->revert_flushed_memory();
            });
            assert_that(mt->make_flush_reader(s, semaphore.make_permit()))
                .produces(ms);
        });
    });
}

SEASTAR_TEST_CASE(test_exception_safety_of_single_partition_reads) {
    return seastar::async([] {
        random_mutation_generator gen(random_mutation_generator::generate_counters::no);
        auto s = gen.schema();
        tests::reader_concurrency_semaphore_wrapper semaphore;
        utils::chunked_vector<mutation> ms = gen(2);
        auto mt = make_memtable(s, ms);

        memory::with_allocation_failures([&] {
            assert_that(mt->make_mutation_reader(s, semaphore.make_permit(), dht::partition_range::make_singular(ms[1].decorated_key())))
                .produces(ms[1]);
        });
    });
}

SEASTAR_THREAD_TEST_CASE(test_tombstone_compaction_during_flush) {
    tests::reader_concurrency_semaphore_wrapper semaphore;
    simple_schema ss;
    auto s = ss.schema();
    auto mt = make_lw_shared<replica::memtable>(ss.schema());

    auto pk = ss.make_pkey(0);
    auto pr = dht::partition_range::make_singular(pk);
    int n_rows = 10000;
    {
        mutation m(ss.schema(), pk);
        for (int i = 0; i < n_rows; ++i) {
            ss.add_row(m, ss.make_ckey(i), "v1");
        }
        mt->apply(m);
    }

    auto rd1 = mt->make_mutation_reader(ss.schema(), semaphore.make_permit(), pr, s->full_slice(),
            nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
    auto close_rd1 = defer([&] { rd1.close().get(); });
    rd1.fill_buffer().get();

    mutation rt_m(ss.schema(), pk);
    auto rt = ss.delete_range(rt_m, ss.make_ckey_range(0, n_rows));
    mt->apply(rt_m);

    auto rd2 = mt->make_mutation_reader(ss.schema(), semaphore.make_permit(), pr, s->full_slice(),
            nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
    auto close_rd2 = defer([&] { rd2.close().get(); });
    rd2.fill_buffer().get();

    mt->apply(rt_m); // whatever

    auto flush_rd = mt->make_flush_reader(ss.schema(), semaphore.make_permit());
    auto close_flush_rd = defer([&] { flush_rd.close().get(); });
    while (!flush_rd.is_end_of_stream()) {
        flush_rd().get();
    }

    { auto close_rd = std::move(close_rd2); }
    { auto rd = std::move(rd2); }
    { auto close_rd = std::move(close_rd1); }
    { auto rd = std::move(rd1); }

    mt->cleaner().drain().get();
}

SEASTAR_THREAD_TEST_CASE(test_tombstone_merging_with_multiple_versions) {
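    // Applies three mutations with overlapping range tombstones while readers
    // pin the intermediate partition versions, then checks that a fresh read
    // sees monotonic positions and the fully merged result m1 + m2 + m3.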
    tests::reader_concurrency_semaphore_wrapper semaphore;
    simple_schema ss;
    auto s = ss.schema();
    auto mt = make_lw_shared<replica::memtable>(ss.schema());

    auto pk = ss.make_pkey(0);
    auto pr = dht::partition_range::make_singular(pk);

    auto t0 = ss.new_tombstone();
    auto t1 = ss.new_tombstone();
    auto t2 = ss.new_tombstone();
    auto t3 = ss.new_tombstone();

    mutation m1(s, pk);
    ss.delete_range(m1, *position_range_to_clustering_range(position_range(
            position_in_partition::before_key(ss.make_ckey(0)),
            position_in_partition::for_key(ss.make_ckey(3))), *s), t1);
    ss.add_row(m1, ss.make_ckey(0), "v");
    ss.add_row(m1, ss.make_ckey(1), "v");

    // Fill so that rd1 stays in the partition snapshot.
    int n_rows = 1000;
    auto v = make_random_string(512);
    for (int i = 0; i < n_rows; ++i) {
        ss.add_row(m1, ss.make_ckey(i), v);
    }

    mutation m2(s, pk);
    ss.delete_range(m2, *position_range_to_clustering_range(position_range(
            position_in_partition::before_key(ss.make_ckey(0)),
            position_in_partition::before_key(ss.make_ckey(1))), *s), t2);
    ss.delete_range(m2, *position_range_to_clustering_range(position_range(
            position_in_partition::before_key(ss.make_ckey(1)),
            position_in_partition::for_key(ss.make_ckey(3))), *s), t3);

    mutation m3(s, pk);
    ss.delete_range(m3, *position_range_to_clustering_range(position_range(
            position_in_partition::before_key(ss.make_ckey(0)),
            position_in_partition::for_key(ss.make_ckey(4))), *s), t0);

    mt->apply(m1);

    auto rd1 = mt->make_mutation_reader(s, semaphore.make_permit(), pr, s->full_slice(),
            nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
    auto close_rd1 = defer([&] { rd1.close().get(); });
    rd1.fill_buffer().get();
    BOOST_REQUIRE(!rd1.is_end_of_stream()); // rd1 must keep the m1 version alive

    mt->apply(m2);

    auto rd2 = mt->make_mutation_reader(s, semaphore.make_permit(), pr, s->full_slice(),
            nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
    auto close_r2 = defer([&] { rd2.close().get(); });
    rd2.fill_buffer().get();
    BOOST_REQUIRE(!rd2.is_end_of_stream()); // rd2 must keep the m2 version alive

    mt->apply(m3);

    assert_that(mt->make_mutation_reader(s, semaphore.make_permit(), pr))
        .has_monotonic_positions();

    assert_that(mt->make_mutation_reader(s, semaphore.make_permit(), pr))
        .produces(m1 + m2 + m3);
}

SEASTAR_THREAD_TEST_CASE(test_tombstone_merging_with_mvcc_and_preemption) {
    tests::reader_concurrency_semaphore_wrapper semaphore;
    simple_schema ss;
    auto s = ss.schema();
    auto mt = make_lw_shared<replica::memtable>(ss.schema());

    auto pk = ss.make_pkey(0);
    auto pr = dht::partition_range::make_singular(pk);

    // Produce a large m0 so that merging the range tombstone from m1 into m0
    // is likely to be preempted in the middle.
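    // Preemption makes the m1 -> m0 merge yield part-way through, so a later
    // apply can race with the still-incomplete merge; this test exercises
    // that interleaving.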
    int n_tombstones = 10000;
    int key_delta_per_tombstone = 3;

    mutation m0(s, pk);
    {
        int key = 0;
        for (int i = 0; i < n_tombstones; ++i) {
            ss.add_row(m0, ss.make_ckey(key), "value");
            key += key_delta_per_tombstone;
        }
    }
    mt->apply(m0);

    std::optional<mutation_reader> rd0 = mt->make_mutation_reader(s, semaphore.make_permit(), pr, s->full_slice(),
            nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
    auto close_rd0 = defer([&] { rd0->close().get(); });
    rd0->fill_buffer().get();
    BOOST_REQUIRE(!rd0->is_end_of_stream());

    auto k1 = n_tombstones * key_delta_per_tombstone / 3;
    auto k2 = k1 + n_tombstones * key_delta_per_tombstone / 2;

    mutation m1(s, pk);
    ss.delete_range(m1, ss.make_ckey_range(k1, k2));
    mt->apply(m1);

    std::optional<mutation_reader> rd1 = mt->make_mutation_reader(s, semaphore.make_permit(), pr, s->full_slice(),
            nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
    auto close_rd1 = defer([&] { rd1->close().get(); });
    rd1->fill_buffer().get();
    BOOST_REQUIRE(!rd1->is_end_of_stream());

    // Trigger merging of m1 into m0.
    close_rd0.cancel();
    rd0->close().get();
    rd0 = {};

    mutation m2(s, pk);
    ss.delete_range(m2, ss.make_ckey_range(0, 1));
    // Shadow the earlier range tombstone in m1 to test whether applying this range
    // tombstone to m1 (from m2), while m1 is still merging its range tombstones into
    // m0, doesn't lead to loss of information from m2 due to the way preemption is
    // handled in the m1 -> m0 merge.
    ss.delete_range(m2, ss.make_ckey_range(k1, k2));
    mt->apply(m2);

    // Trigger merging of m2 into m1.
    // Some of it will complete immediately.
    // Let's see if updates of m1 from m2 are not lost while merging of m1 into m0 is still in progress.
    close_rd1.cancel();
    rd1->close().get();
    rd1 = {};

    // Wait for merging to complete so that we read the final result later.
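    // cleaner().drain() below blocks until all pending background version
    // merges have finished.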
    mt->cleaner().drain().get();

    assert_that(mt->make_mutation_reader(s, semaphore.make_permit(), pr))
        .produces(m0 + m1 + m2);
}

SEASTAR_THREAD_TEST_CASE(test_range_tombstones_are_compacted_with_data) {
    tests::reader_concurrency_semaphore_wrapper semaphore;
    simple_schema ss;
    auto s = ss.schema();
    auto mt = make_lw_shared<replica::memtable>(ss.schema());

    auto pk = ss.make_pkey(0);
    auto pr = dht::partition_range::make_singular(pk);

    auto old_tombstone = ss.new_tombstone(); // older than any write, does not cover anything

    {
        mutation m(ss.schema(), pk);
        ss.add_row(m, ss.make_ckey(1), "v1");
        ss.add_row(m, ss.make_ckey(2), "v1");
        ss.add_row(m, ss.make_ckey(3), "v1");
        ss.add_row(m, ss.make_ckey(4), "v1");
        mt->apply(m);
    }

    mutation rt_m(ss.schema(), pk);
    auto rt = ss.delete_range(rt_m, ss.make_ckey_range(2, 3));
    mt->apply(rt_m);
    mt->cleaner().drain().get();

    assert_that(mt->make_mutation_reader(ss.schema(), semaphore.make_permit(), pr))
        .produces_partition_start(pk)
        .produces_row_with_key(ss.make_ckey(1))
        .produces_range_tombstone_change({rt.position(), rt.tomb})
        .produces_range_tombstone_change({rt.end_position(), {}})
        .produces_row_with_key(ss.make_ckey(4))
        .produces_partition_end()
        .produces_end_of_stream();

    {
        mutation m(ss.schema(), pk);
        m.partition().apply(old_tombstone);
        mt->apply(m);
        mt->cleaner().drain().get();
    }

    // No change.
    assert_that(mt->make_mutation_reader(ss.schema(), semaphore.make_permit(), pr))
        .produces_partition_start(pk, {old_tombstone})
        .produces_row_with_key(ss.make_ckey(1))
        .produces_range_tombstone_change({rt.position(), rt.tomb})
        .produces_range_tombstone_change({rt.end_position(), {}})
        .produces_row_with_key(ss.make_ckey(4))
        .produces_partition_end()
        .produces_end_of_stream();

    auto new_tomb = ss.new_tombstone();

    {
        mutation m(ss.schema(), pk);
        m.partition().apply(new_tomb);
        mt->apply(m);
        mt->cleaner().drain().get();
    }

    assert_that(mt->make_mutation_reader(ss.schema(), semaphore.make_permit(), pr))
        .produces_partition_start(pk, {new_tomb})
        .produces_range_tombstone_change({rt.position(), rt.tomb})
        .produces_range_tombstone_change({rt.end_position(), {}})
        .produces_partition_end()
        .produces_end_of_stream();
}

SEASTAR_TEST_CASE(test_hash_is_cached) {
    return seastar::async([] {
        auto s = schema_builder("ks", "cf")
            .with_column("pk", bytes_type, column_kind::partition_key)
            .with_column("v", bytes_type, column_kind::regular_column)
            .build();

        tests::reader_concurrency_semaphore_wrapper semaphore;
        auto mt = make_lw_shared<replica::memtable>(s);

        auto m = make_unique_mutation(s);
        set_column(m, "v");
        mt->apply(m);

        {
            auto rd = mt->make_mutation_reader(s, semaphore.make_permit());
            auto close_rd = deferred_close(rd);
            rd().get()->as_partition_start();
            clustering_row row = std::move(*rd().get()).as_clustering_row();
            BOOST_REQUIRE(!row.cells().cell_hash_for(0));
        }

        {
            auto slice = s->full_slice();
            slice.options.set<query::partition_slice::option::with_digest>();
            auto rd = mt->make_mutation_reader(s, semaphore.make_permit(), query::full_partition_range, slice);
            auto close_rd = deferred_close(rd);
            rd().get()->as_partition_start();
            clustering_row row = std::move(*rd().get()).as_clustering_row();
            BOOST_REQUIRE(row.cells().cell_hash_for(0));
        }

        {
            auto rd = mt->make_mutation_reader(s, semaphore.make_permit());
            auto close_rd = deferred_close(rd);
            rd().get()->as_partition_start();
            clustering_row row = std::move(*rd().get()).as_clustering_row();
            BOOST_REQUIRE(row.cells().cell_hash_for(0));
        }

        set_column(m, "v");
        mt->apply(m);

        {
            auto rd = mt->make_mutation_reader(s, semaphore.make_permit());
            auto close_rd = deferred_close(rd);
            rd().get()->as_partition_start();
            clustering_row row =
                std::move(*rd().get()).as_clustering_row();
            BOOST_REQUIRE(!row.cells().cell_hash_for(0));
        }

        {
            auto slice = s->full_slice();
            slice.options.set<query::partition_slice::option::with_digest>();
            auto rd = mt->make_mutation_reader(s, semaphore.make_permit(), query::full_partition_range, slice);
            auto close_rd = deferred_close(rd);
            rd().get()->as_partition_start();
            clustering_row row = std::move(*rd().get()).as_clustering_row();
            BOOST_REQUIRE(row.cells().cell_hash_for(0));
        }

        {
            auto rd = mt->make_mutation_reader(s, semaphore.make_permit());
            auto close_rd = deferred_close(rd);
            rd().get()->as_partition_start();
            clustering_row row = std::move(*rd().get()).as_clustering_row();
            BOOST_REQUIRE(row.cells().cell_hash_for(0));
        }
    });
}

SEASTAR_THREAD_TEST_CASE(test_collecting_encoding_stats) {
    auto random_int32_value = [] {
        return int32_type->decompose(tests::random::get_int<int32_t>());
    };

    auto now = gc_clock::now();

    auto td = tests::data_model::table_description({ { "pk", int32_type } }, { { "ck", utf8_type } });

    auto td1 = td;
    td1.add_static_column("s1", int32_type);
    td1.add_regular_column("v1", int32_type);
    td1.add_regular_column("v2", int32_type);
    auto built_schema = td1.build();
    auto s = built_schema.schema;

    auto md1 = tests::data_model::mutation_description({ to_bytes("pk1") });
    md1.add_clustered_row_marker({ to_bytes("ck1") });
    md1.add_clustered_cell({ to_bytes("ck1") }, "v1", random_int32_value());
    auto m1 = md1.build(s);

    auto md2 = tests::data_model::mutation_description({ to_bytes("pk2") });
    auto md2_ttl = gc_clock::duration(std::chrono::seconds(1));
    api::timestamp_type md2_timestamp = -10;
    md2.add_clustered_row_marker({ to_bytes("ck1") }, md2_timestamp);
    md2.add_clustered_cell({ to_bytes("ck1") }, "v1", random_int32_value());
    md2.add_clustered_cell({ to_bytes("ck2") }, "v2",
            tests::data_model::mutation_description::atomic_value(random_int32_value(), tests::data_model::data_timestamp, md2_ttl, now + md2_ttl));
    auto m2 = md2.build(s);

    auto md3 = tests::data_model::mutation_description({ to_bytes("pk3") });
    auto md3_ttl = gc_clock::duration(std::chrono::seconds(2));
    auto md3_expiry_point = now - std::chrono::hours(8);
    md3.add_static_cell("s1",
            tests::data_model::mutation_description::atomic_value(random_int32_value(), tests::data_model::data_timestamp, md3_ttl, md3_expiry_point));
    auto m3 = md3.build(s);

    auto md4 = tests::data_model::mutation_description({ to_bytes("pk1") });
    auto md4_tombstone = tombstone(md2_timestamp - 10, now - std::chrono::hours(9));
    md4.set_partition_tombstone(md4_tombstone);
    auto m4 = md4.build(s);

    auto mt = make_lw_shared<replica::memtable>(s);

    auto stats = mt->get_encoding_stats();
    BOOST_CHECK(stats.min_local_deletion_time == gc_clock::time_point::max());
    BOOST_CHECK_EQUAL(stats.min_timestamp, api::max_timestamp);
    BOOST_CHECK(stats.min_ttl == gc_clock::duration::max());
    BOOST_CHECK_EQUAL(mt->get_min_timestamp(), 0);
    BOOST_CHECK_EQUAL(mt->get_max_timestamp(), 0);
    BOOST_CHECK_EQUAL(mt->get_min_live_timestamp(), api::max_timestamp);
    BOOST_CHECK_EQUAL(mt->get_min_live_row_marker_timestamp(), api::max_timestamp);

    mt->apply(m1);
    stats = mt->get_encoding_stats();
    BOOST_CHECK(stats.min_local_deletion_time == gc_clock::time_point::max());
    BOOST_CHECK_EQUAL(stats.min_timestamp, tests::data_model::data_timestamp);
    BOOST_CHECK(stats.min_ttl == gc_clock::duration::max());
    BOOST_CHECK_EQUAL(mt->get_min_timestamp(), tests::data_model::data_timestamp);
    BOOST_CHECK_EQUAL(mt->get_max_timestamp(), tests::data_model::data_timestamp);
    BOOST_CHECK_EQUAL(mt->get_min_live_timestamp(), tests::data_model::data_timestamp);
    BOOST_CHECK_EQUAL(mt->get_min_live_row_marker_timestamp(), tests::data_model::data_timestamp);

    mt->apply(m2);
    stats = mt->get_encoding_stats();
    BOOST_CHECK(stats.min_local_deletion_time == now + md2_ttl);
    BOOST_CHECK_EQUAL(stats.min_timestamp, md2_timestamp);
    BOOST_CHECK(stats.min_ttl == md2_ttl);
    BOOST_CHECK_EQUAL(mt->get_min_timestamp(), md2_timestamp);
    BOOST_CHECK_EQUAL(mt->get_max_timestamp(), tests::data_model::data_timestamp);
    BOOST_CHECK_EQUAL(mt->get_min_live_timestamp(), md2_timestamp);
    BOOST_CHECK_EQUAL(mt->get_min_live_row_marker_timestamp(), md2_timestamp);

    mt->apply(m3);
    stats = mt->get_encoding_stats();
    BOOST_CHECK(stats.min_local_deletion_time == md3_expiry_point);
    BOOST_CHECK_EQUAL(stats.min_timestamp, md2_timestamp);
    BOOST_CHECK(stats.min_ttl == md2_ttl);
    BOOST_CHECK_EQUAL(mt->get_min_timestamp(), md2_timestamp);
    BOOST_CHECK_EQUAL(mt->get_max_timestamp(), tests::data_model::data_timestamp);
    BOOST_CHECK_EQUAL(mt->get_min_live_timestamp(), md2_timestamp);
    BOOST_CHECK_EQUAL(mt->get_min_live_row_marker_timestamp(), md2_timestamp);

    mt->apply(m4);
    stats = mt->get_encoding_stats();
    BOOST_CHECK(stats.min_local_deletion_time == md4_tombstone.deletion_time);
    BOOST_CHECK_EQUAL(stats.min_timestamp, md4_tombstone.timestamp);
    BOOST_CHECK(stats.min_ttl == md2_ttl);
    BOOST_CHECK_EQUAL(mt->get_min_timestamp(), md4_tombstone.timestamp);
    BOOST_CHECK_EQUAL(mt->get_max_timestamp(), tests::data_model::data_timestamp);
    BOOST_CHECK_EQUAL(mt->get_min_live_timestamp(), md2_timestamp);
    BOOST_CHECK_EQUAL(mt->get_min_live_row_marker_timestamp(), md2_timestamp);
}

SEASTAR_TEST_CASE(memtable_flush_compresses_mutations) {
    auto db_config = make_shared<db::config>();
    db_config->enable_cache.set(false);
    return do_with_cql_env_thread([](cql_test_env& env) {
        // Create a table and insert some data.
        char const* ks_name = "keyspace_name";
        char const* table_name = "table_name";
        env.execute_cql(format("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}};", ks_name)).get();
        env.execute_cql(format("CREATE TABLE {}.{} (pk int, ck int, id int, PRIMARY KEY(pk, ck));", ks_name, table_name)).get();

        replica::database& db = env.local_db();
        replica::table& t = db.find_column_family(ks_name, table_name);
        tests::reader_concurrency_semaphore_wrapper semaphore;
        schema_ptr s = t.schema();

        // Build the expected mutation with partition key 1, clustering key 2 and value 3 in the id column.
        dht::decorated_key pk = dht::decorate_key(*s, partition_key::from_single_value(*s, serialized(1)));
        clustering_key ck = clustering_key::from_single_value(*s, serialized(2));

        mutation m1 = mutation(s, pk);
        m1.set_clustered_cell(ck, to_bytes("id"), data_value(3), api::new_timestamp());

        mutation m2 = mutation(s, pk);
        m2.partition().apply_delete(*s, clustering_key_prefix::from_singular(*s, 2), tombstone{api::new_timestamp(), gc_clock::now()});

        t.apply(m1);
        t.apply(m2);

        // Flush to make sure all the modifications make it to disk.
        t.flush().get();

        // Treat the table as a mutation_source and SCYLLA_ASSERT that we get the expected mutation and end of stream.
        mutation_source ms = t.as_mutation_source();
        assert_that(ms.make_mutation_reader(s, semaphore.make_permit()))
            .produces(m2)
            .produces_end_of_stream();
    }, db_config);
}

static auto check_has_error_injection() {
    return boost::unit_test::precondition([](auto) {
        return
#ifdef SCYLLA_ENABLE_ERROR_INJECTION
            true
#else
            false
#endif
        ;
    });
}

SEASTAR_TEST_CASE(memtable_flush_period, *check_has_error_injection()) {
#ifdef SCYLLA_ENABLE_ERROR_INJECTION
    auto db_config = make_shared<db::config>();
    db_config->enable_cache.set(false);
    return do_with_cql_env_thread([](cql_test_env& env) {
        // Create a table and insert some data.
        char const* ks_name = "keyspace_name";
        char const* table_name = "table_name";
        env.execute_cql(format("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}};", ks_name)).get();
        env.execute_cql(format("CREATE TABLE {}.{} (pk int, ck int, id int, PRIMARY KEY(pk, ck));", ks_name, table_name)).get();

        replica::database& db = env.local_db();
        replica::table& t = db.find_column_family(ks_name, table_name);
        tests::reader_concurrency_semaphore_wrapper semaphore;
        auto s1 = t.schema();

        dht::decorated_key pk = dht::decorate_key(*s1, partition_key::from_single_value(*s1, serialized(1)));
        clustering_key ck = clustering_key::from_single_value(*s1, serialized(2));
        mutation m = mutation(s1, pk);
        m.set_clustered_cell(ck, to_bytes("id"), data_value(3), api::new_timestamp());
        t.apply(m);
        // Add a mutation and check that there are no sstables for this table yet.
        BOOST_REQUIRE_EQUAL(t.sstables_count(), 0);

        auto& errj = utils::get_local_injector();
        errj.enable("table_seal_post_flush_waiters", true);

        // Change the schema to set the memtable flush period. We use a small
        // value in this test, even though a period below 60000 ms cannot be
        // set via the ALTER TABLE statement.
        schema_builder b(t.schema());
        b.set_memtable_flush_period(200);
        schema_ptr s2 = b.build();
        t.set_schema(s2);

        BOOST_TEST_MESSAGE("Wait for flush");
        errj.inject("table_seal_post_flush_waiters", utils::wait_for_message(std::chrono::minutes(2))).get();
        BOOST_TEST_MESSAGE("Flush received");

        BOOST_REQUIRE(eventually_true([&] {
            // Wait until the memtable has been flushed at least once.
            return t.sstables_count() == 1;
        }));

        // Check that the mutation is present in the table.
        mutation_source ms = t.as_mutation_source();
        assert_that(ms.make_mutation_reader(s2, semaphore.make_permit()))
            .produces(m)
            .produces_end_of_stream();
    }, db_config);
#else
    BOOST_TEST_MESSAGE("Skipping test as it depends on error injection. Please run in a mode where it is enabled (debug, dev)");
    return make_ready_future<>();
#endif
}

SEASTAR_TEST_CASE(sstable_compaction_does_not_resurrect_data) {
    auto db_config = make_shared<db::config>();
    db_config->enable_cache.set(false);
    return do_with_cql_env_thread([](cql_test_env& env) {
        replica::database& db = env.local_db();
        service::migration_manager& mm = env.migration_manager().local();

        sstring ks_name = "ks";
        sstring table_name = "table_name";

        schema_ptr s = schema_builder(ks_name, table_name)
            .with_column(to_bytes("pk"), int32_type, column_kind::partition_key)
            .with_column(to_bytes("ck"), int32_type, column_kind::clustering_key)
            .with_column(to_bytes("id"), int32_type)
            .set_gc_grace_seconds(1)
            .build();
        auto group0_guard = mm.start_group0_operation().get();
        auto ts = group0_guard.write_timestamp();
        mm.announce(service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s, ts).get(), std::move(group0_guard), "").get();

        replica::table& t = db.find_column_family(ks_name, table_name);

        dht::decorated_key pk = dht::decorate_key(*s, partition_key::from_single_value(*s, serialized(1)));
        clustering_key ck_to_delete = clustering_key::from_single_value(*s, serialized(2));
        clustering_key ck = clustering_key::from_single_value(*s, serialized(3));

        api::timestamp_type insertion_timestamp_before_delete = api::new_timestamp();
        forward_jump_clocks(1s);
        api::timestamp_type deletion_timestamp = api::new_timestamp();
        forward_jump_clocks(1s);
        api::timestamp_type insertion_timestamp_after_delete = api::new_timestamp();

        mutation m_delete = mutation(s, pk);
        m_delete.partition().apply_delete(*s, ck_to_delete, tombstone{deletion_timestamp, gc_clock::now()});
        t.apply(m_delete);

        // Insert data that won't be removed by the tombstone, to prevent compaction from skipping the whole partition.
        mutation m_insert = mutation(s, pk);
        m_insert.set_clustered_cell(ck, to_bytes("id"), data_value(3), insertion_timestamp_after_delete);
        t.apply(m_insert);

        // Flush and wait until gc_grace_seconds passes.
        t.flush().get();
        forward_jump_clocks(2s);

        // Apply the past mutation to the memtable to simulate repair. This row should be deleted by the tombstone.
        mutation m_past_insert = mutation(s, pk);
        m_past_insert.set_clustered_cell(ck_to_delete, to_bytes("id"), data_value(4), insertion_timestamp_before_delete);
        t.apply(m_past_insert);

        // Trigger compaction. If all goes well, compaction should check whether a relevant row is in the memtable
        // and should not purge the tombstone.
        t.compact_all_sstables(tasks::task_info{}).get();

        // If we get an additional row (1, 2, 4), that means the tombstone was purged and the data was resurrected.
        assert_that(env.execute_cql(format("SELECT * FROM {}.{};", ks_name, table_name)).get())
            .is_rows()
            .with_rows_ignore_order({
                {serialized(1), serialized(3), serialized(3)},
            });
    }, db_config);
}

SEASTAR_TEST_CASE(failed_flush_prevents_writes) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
    std::cerr << "Skipping test as it depends on error injection. Please run in a mode where it is enabled (debug, dev).\n";
    return make_ready_future<>();
#else
    auto db_config = make_shared<db::config>();
    db_config->unspooled_dirty_soft_limit.set(1.0);

    return do_with_cql_env_thread([](cql_test_env& env) {
        replica::database& db = env.local_db();
        service::migration_manager& mm = env.migration_manager().local();

        simple_schema ss;
        schema_ptr s = ss.schema();
        auto group0_guard = mm.start_group0_operation().get();
        auto ts = group0_guard.write_timestamp();
        mm.announce(service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s, ts).get(), std::move(group0_guard), "").get();

        replica::table& t = db.find_column_family("ks", "cf");
        auto memtables = active_memtables(t);

        // Insert something so that we have data in the memtable to flush.
        // It has to be somewhat large, as automatic flushing picks the
        // largest memtable to flush.
        mutation mt = {s, tests::generate_partition_key(s)};
        for (uint32_t i = 0; i < 1000; ++i) {
            ss.add_row(mt, ss.make_ckey(i), format("{}", i));
        }
        t.apply(mt);

        BOOST_REQUIRE_LT(t.min_memtable_timestamp(), api::max_timestamp);
        BOOST_REQUIRE_LT(t.min_memtable_live_timestamp(), api::max_timestamp);

        auto failed_memtables_flushes_count = db.cf_stats()->failed_memtables_flushes_count;

        utils::get_local_injector().enable("table_seal_active_memtable_add_memtable", true /* oneshot */);
        utils::get_local_injector().enable("table_seal_active_memtable_start_op", true /* oneshot */);
        utils::get_local_injector().enable("table_seal_active_memtable_try_flush", true /* oneshot */);
        utils::get_local_injector().enable("table_seal_active_memtable_reacquire_write_permit");

        // Trigger the flush.
        auto f = t.flush();

        BOOST_REQUIRE(eventually_true([&] {
            return db.cf_stats()->failed_memtables_flushes_count - failed_memtables_flushes_count >= 4;
        }));

        // The flush failed; make sure there is still data in the memtable.
        BOOST_REQUIRE_LT(t.min_memtable_timestamp(), api::max_timestamp);
        BOOST_REQUIRE_LT(t.min_memtable_live_timestamp(), api::max_timestamp);
        utils::get_local_injector().disable("table_seal_active_memtable_reacquire_write_permit");

        BOOST_REQUIRE(eventually_true([&] {
            // The error above is no longer being injected, so the
            // seal_active_memtable retry loop should eventually succeed.
            return t.min_memtable_timestamp() == api::max_timestamp &&
                   t.min_memtable_live_timestamp() == api::max_timestamp &&
                   t.min_memtable_live_row_marker_timestamp() == api::max_timestamp;
        }));

        std::move(f).get();
    }, db_config);
#endif
}

SEASTAR_TEST_CASE(flushing_rate_is_reduced_if_compaction_doesnt_keep_up) {
#ifdef DEBUG
    // This test was observed to take multiple minutes to run in debug mode on CI machines.
    // It checks that a certain behaviour is triggered when compaction falls behind.
    // Not critical to run in debug mode; both compaction and memtable have their own
    // correctness tests, which do run in debug mode.
    return make_ready_future<>();
#else
    BOOST_ASSERT(smp::count == 2);

    // The test simulates a situation where 2 threads issue flushes to 2
    // tables. Both issue small flushes, but one has injected reactor stalls.
    // This can lead to a situation where lots of small sstables accumulate on
    // disk, and, if compaction never has a chance to keep up, resources can be
    // exhausted.
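    // The BOOST_ASSERT inside the flusher loop is the actual check: the
    // sstable count must stay below 4x the compaction threshold, which only
    // holds if the flushing rate is reduced when compaction falls behind.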
    return do_with_cql_env([](cql_test_env& env) -> future<> {
        struct flusher {
            cql_test_env& env;
            const int num_flushes;
            const int sleep_ms;

            static sstring cf_name(unsigned thread_id) {
                return format("cf_{}", thread_id);
            }
            static sstring ks_name() {
                return "ks";
            }
            future<> create_table(schema_ptr s) {
                return env.migration_manager().invoke_on(0, [s = global_schema_ptr(std::move(s))] (service::migration_manager& mm) -> future<> {
                    auto group0_guard = co_await mm.start_group0_operation();
                    auto ts = group0_guard.write_timestamp();
                    auto announcement = co_await service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s, ts);
                    co_await mm.announce(std::move(announcement), std::move(group0_guard), "");
                });
            }
            future<> drop_table() {
                return env.migration_manager().invoke_on(0, [shard = this_shard_id()] (service::migration_manager& mm) -> future<> {
                    auto group0_guard = co_await mm.start_group0_operation();
                    auto ts = group0_guard.write_timestamp();
                    auto announcement = co_await service::prepare_column_family_drop_announcement(mm.get_storage_proxy(), ks_name(), cf_name(shard), ts);
                    co_await mm.announce(std::move(announcement), std::move(group0_guard), "");
                });
            }
            future<> operator()() {
                const sstring ks_name = this->ks_name();
                const sstring cf_name = this->cf_name(this_shard_id());
                random_mutation_generator gen{
                    random_mutation_generator::generate_counters::no,
                    local_shard_only::yes,
                    random_mutation_generator::generate_uncompactable::no,
                    std::nullopt,
                    ks_name.c_str(),
                    cf_name.c_str()
                };
                schema_ptr s = gen.schema();
                co_await create_table(s);
                replica::database& db = env.local_db();
                replica::table& t = db.find_column_family(ks_name, cf_name);
                for ([[maybe_unused]] int value : std::views::iota(0, num_flushes)) {
                    ::usleep(sleep_ms * 1000);
                    co_await db.apply(t.schema(), freeze(gen()), tracing::trace_state_ptr(), db::commitlog::force_sync::yes, db::no_timeout);
                    co_await t.flush();
                    BOOST_ASSERT(t.sstables_count() < size_t(t.schema()->max_compaction_threshold() * 4));
                }
                co_await drop_table();
            }
        };

        int sleep_ms = 2;
        for ([[maybe_unused]] int i : std::views::iota(0, 8)) {
            future<> f0 = smp::submit_to(0, flusher{.env=env, .num_flushes=100, .sleep_ms=0});
            future<> f1 = smp::submit_to(1, flusher{.env=env, .num_flushes=3, .sleep_ms=sleep_ms});
            co_await std::move(f0);
            co_await std::move(f1);
            sleep_ms *= 2;
        }
    });
#endif
}

static future<> exceptions_in_flush_helper(std::unique_ptr<sstables::file_io_extension> mep, bool& should_fail, const bool& did_fail, const schema*& schema_filter, bool expect_isolate) {
    auto ext = std::make_shared<db::extensions>();
    auto cfg = seastar::make_shared<db::config>(ext);

    ext->add_sstable_file_io_extension("test", std::move(mep));

    co_await do_with_cql_env([&](cql_test_env& env) -> future<> {
        co_await env.execute_cql(fmt::format("create table t0 (pk text primary key, v text)"));

        schema_filter = env.local_db().find_column_family("ks", "t0").schema().get();
        should_fail = true;

        int i = 0;
        testlog.debug("Wait for fail");
        auto f = make_ready_future<>();
        while (!did_fail) {
            std::string pk = "apa" + std::to_string(i++);
            std::string v = "ko";
            co_await env.execute_cql(fmt::format("insert into ks.t0 (pk, v) values ('{}', '{}')", pk, v));
            f = f.then([&] {
                return env.db().invoke_on_all([] (replica::database& db) {
                    return db.flush_all_memtables();
                });
            });
        }

        BOOST_REQUIRE(did_fail);
        testlog.debug("Reset fail trigger");
        should_fail = false;

        if (expect_isolate) {
            bool isolated = false;
            // We can't use eventually_true here, because neither we nor the
            // invoke on shard 0 runs in a seastar thread.
            for (int i = 0; i < 10; ++i) {
                isolated = co_await env.get_storage_service().invoke_on(0, [&](service::storage_service& ss) {
                    return ss.is_isolated();
                });
                if (isolated) {
                    break;
                }
                // Isolation is not synchronous.
                co_await sleep(2s);
            }
            BOOST_REQUIRE(isolated);
        }

        testlog.debug("Trying to stop");
        co_await std::move(f);
    }, cfg);
}

static future<> exceptions_in_flush_on_sstable_write_helper(std::function<void()> throw_func, bool expect_isolate = true) {
    class myext : public sstables::file_io_extension {
    public:
        bool should_fail = false;
        bool did_fail = false;
        const schema* schema_filter = nullptr;
        std::function<void()> throw_func;

        bool match_schema_filter(const schema& s) const {
            const auto ret = !schema_filter || schema_filter->id() == s.id();
            testlog.info("exceptions_in_flush_on_sstable_write_helper()::match_schema_filter({}.{}#{}) -> {}", s.ks_name(), s.cf_name(), s.id(), ret);
            return ret;
        }
        future<file> wrap_file(const sstables::sstable& t, sstables::component_type type, file f, open_flags flags) override {
            if (should_fail && match_schema_filter(*t.get_schema())) {
                class myimpl : public seastar::file_impl {
                    file _file;
                    myext& _myext;
                public:
                    myimpl(file f, myext& ext)
                        : _file(std::move(f))
                        , _myext(ext)
                    {}
                    void fail() const {
                        if (_myext.should_fail) {
                            _myext.did_fail = true;
                            testlog.debug("Throwing exception");
                            _myext.throw_func();
                        }
                    }
                    future<size_t> write_dma(uint64_t pos, const void* buffer, size_t len, io_intent* intent) override {
                        fail();
                        return get_file_impl(_file)->write_dma(pos, buffer, len, intent);
                    }
                    future<size_t> write_dma(uint64_t pos, std::vector<iovec> iov, io_intent* intent) override {
                        fail();
                        return get_file_impl(_file)->write_dma(pos, std::move(iov), intent);
                    }
                    future<size_t> read_dma(uint64_t pos, void* buffer, size_t len, io_intent* intent) override {
                        fail();
                        return get_file_impl(_file)->read_dma(pos, buffer, len, intent);
                    }
                    future<size_t> read_dma(uint64_t pos, std::vector<iovec> iov, io_intent* intent) override {
                        fail();
                        return get_file_impl(_file)->read_dma(pos, std::move(iov), intent);
                    }
                    future<> flush(void) override {
                        fail();
                        return get_file_impl(_file)->flush();
                    }
                    future<struct stat> stat(void) override {
                        fail();
                        return get_file_impl(_file)->stat();
                    }
                    future<> truncate(uint64_t length) override {
                        fail();
                        return get_file_impl(_file)->truncate(length);
                    }
                    future<> discard(uint64_t offset, uint64_t length) override {
                        fail();
                        return get_file_impl(_file)->discard(offset, length);
                    }
                    future<> allocate(uint64_t position, uint64_t length) override {
                        fail();
                        return get_file_impl(_file)->allocate(position, length);
                    }
                    future<uint64_t> size(void) override {
                        fail();
                        return get_file_impl(_file)->size();
                    }
                    future<> close() override {
                        fail();
                        return get_file_impl(_file)->close();
                    }
                    std::unique_ptr<seastar::file_handle_impl> dup() override {
                        fail();
                        return get_file_impl(_file)->dup();
                    }
                    subscription<directory_entry> list_directory(std::function<future<> (directory_entry de)> next) override {
                        fail();
                        return get_file_impl(_file)->list_directory(std::move(next));
                    }
                    future<temporary_buffer<uint8_t>> dma_read_bulk(uint64_t offset, size_t range_size, io_intent* intent) override {
                        fail();
                        return get_file_impl(_file)->dma_read_bulk(offset, range_size, intent);
                    }
                };
                co_return file(make_shared<myimpl>(std::move(f), *this));
            }
            co_return f;
        }
    };

    auto mep = std::make_unique<myext>();
    auto& me = *mep;
    me.throw_func = std::move(throw_func);
    co_await exceptions_in_flush_helper(std::move(mep), me.should_fail, me.did_fail, me.schema_filter, expect_isolate);
}

SEASTAR_TEST_CASE(test_exceptions_in_flush_on_sstable_write) {
    co_await exceptions_in_flush_on_sstable_write_helper(
        [] { throw std::system_error(EACCES, std::system_category()); }
    );
}
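// The following cases exercise extension-defined error types during flush:
// permission and misconfiguration errors are expected to isolate the node
// (like the EACCES case above), while a transient resource-unavailable error
// is not, hence expect_isolate = false for that case.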
SEASTAR_TEST_CASE(test_ext_permission_exceptions_in_flush_on_sstable_write) {
    co_await exceptions_in_flush_on_sstable_write_helper(
        [] { throw db::extension_storage_permission_error(get_name()); }
    );
}

SEASTAR_TEST_CASE(test_ext_resource_exceptions_in_flush_on_sstable_write) {
    co_await exceptions_in_flush_on_sstable_write_helper(
        [] { throw db::extension_storage_resource_unavailable(get_name()); }
        , false // like ENOENT; does not cause isolation
    );
}

SEASTAR_TEST_CASE(test_ext_config_exceptions_in_flush_on_sstable_write) {
    co_await exceptions_in_flush_on_sstable_write_helper(
        [] { throw db::extension_storage_misconfigured(get_name()); }
    );
}

static future<> exceptions_in_flush_on_sstable_open_helper(std::function<void()> throw_func, bool expect_isolate = true) {
    auto ext = std::make_shared<db::extensions>();
    auto cfg = seastar::make_shared<db::config>(ext);

    class myext : public sstables::file_io_extension {
    public:
        bool should_fail = false;
        bool did_fail = false;
        const schema* schema_filter = nullptr;
        std::function<void()> throw_func;

        bool match_schema_filter(const schema& s) const {
            const auto ret = !schema_filter || schema_filter->id() == s.id();
            testlog.info("exceptions_in_flush_on_sstable_open_helper()::match_schema_filter({}.{}#{}) -> {}", s.ks_name(), s.cf_name(), s.id(), ret);
            return ret;
        }
        future<file> wrap_file(const sstables::sstable& t, sstables::component_type type, file f, open_flags flags) override {
            if (should_fail && match_schema_filter(*t.get_schema())) {
                did_fail = true;
                testlog.debug("Throwing exception");
                throw_func();
            }
            co_return f;
        }
    };

    auto mep = std::make_unique<myext>();
    auto& me = *mep;
    me.throw_func = std::move(throw_func);
    co_await exceptions_in_flush_helper(std::move(mep), me.should_fail, me.did_fail, me.schema_filter, expect_isolate);
}

SEASTAR_TEST_CASE(test_exceptions_in_flush_on_sstable_open) {
    co_await exceptions_in_flush_on_sstable_open_helper(
        [] { throw std::system_error(EACCES, std::system_category()); }
    );
}

SEASTAR_TEST_CASE(test_ext_permission_exceptions_in_flush_on_sstable_open) {
    co_await exceptions_in_flush_on_sstable_open_helper(
        [] { throw db::extension_storage_permission_error(get_name()); }
    );
}

SEASTAR_TEST_CASE(test_ext_resource_exceptions_in_flush_on_sstable_open) {
    co_await exceptions_in_flush_on_sstable_open_helper(
        [] { throw db::extension_storage_resource_unavailable(get_name()); }
        , false // like ENOENT; does not cause isolation
    );
}

SEASTAR_TEST_CASE(test_ext_config_exceptions_in_flush_on_sstable_open) {
    co_await exceptions_in_flush_on_sstable_open_helper(
        [] { throw db::extension_storage_misconfigured(get_name()); }
    );
}

SEASTAR_TEST_CASE(memtable_reader_after_tablet_migration) {
    cql_test_config cfg;
    cfg.initial_tablets = 1;
    cfg.ms_listen = true;
    return do_with_cql_env_thread([](cql_test_env& env) {
        replica::database& db = env.local_db();
        auto& ss = env.get_storage_service().local();

        // This test needs a specific tablet layout; disable the load balancer
        // to have manual control over it.
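        // With balancing disabled, the single tablet stays on the replica we
        // observe here until move_tablet() is called explicitly below.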
        ss.set_tablet_balancing_enabled(false).get();

        // Create a table and insert some data.
        char const* table_name = "tbl";
        env.execute_cql(format("CREATE TABLE ks.{} (pk int, ck int, v text, PRIMARY KEY(pk, ck));", table_name)).get();

        auto& tbl = db.find_column_family("ks", table_name);
        const auto schema = tbl.schema();
        BOOST_REQUIRE(tbl.uses_tablets());
        const auto& tablet_map = db.get_token_metadata().tablets().get_tablet_map(schema->id());
        BOOST_REQUIRE_EQUAL(tablet_map.tablet_count(), 1);

        const int32_t pk = 0;
        mutation expected_mut(schema, dht::decorate_key(*schema, partition_key::from_single_value(*schema, int32_type->decompose(pk))));
        const api::timestamp_type ts = 100;
        const auto raw_v = utf8_type->decompose(sstring(1024, 'v'));
        const auto& v_def = *schema->get_column_definition(to_bytes("v"));
        // Add enough data to fill at least two buffers.
        for (int32_t ck = 0; size_t(ck) < (2 * mutation_reader::default_max_buffer_size_in_bytes()) / 1024; ++ck) {
            const auto ckey = clustering_key::from_single_value(*schema, int32_type->decompose(ck));
            expected_mut.set_clustered_cell(ckey, v_def, atomic_cell::make_live(*v_def.type, ts, raw_v));
        }

        const auto first_tablet_id = tablet_map.first_tablet();
        const auto first_tablet_info = tablet_map.get_tablet_info(first_tablet_id);

        struct remote_data {
            schema_ptr schema;
            mutation expected_mut;
            mutation_reader reader;
        };
        auto data_ptr = env.db().invoke_on(first_tablet_info.replicas.front().shard,
                [&table_name, fm = freeze(expected_mut)] (replica::database& db) -> future<foreign_ptr<std::unique_ptr<remote_data>>> {
            auto& tbl = db.find_column_family("ks", table_name);
            const auto schema = tbl.schema();

            co_await db.apply(schema, fm, {}, db::commitlog::force_sync::no, db::no_timeout);

            testlog.info("create reader -- first buffer fill");
            auto reader = tbl.make_mutation_reader(schema, co_await db.obtain_reader_permit(tbl, "read", db::no_timeout, {}),
                    query::full_partition_range, schema->full_slice());
            std::exception_ptr ex;
            try {
                co_await reader.fill_buffer();
                co_return make_foreign(std::make_unique<remote_data>(remote_data{schema, fm.unfreeze(schema), std::move(reader)}));
            } catch (...) {
                ex = std::current_exception();
            }
            // If we are here, there was an exception, but check to be sure.
            SCYLLA_ASSERT(ex);
            co_await reader.close();
            std::rethrow_exception(std::move(ex));
        }).get();

        // Migrate the tablet to another shard.
        {
            const auto src = first_tablet_info.replicas.front();
            auto dst = src;
            dst.shard = (src.shard + 1) % smp::count;
            // Closing the storage group is done in the background, so it is
            // fine to wait for this.
            ss.move_tablet(schema->id(), tablet_map.get_last_token(first_tablet_id), src, dst).get();
        }

        smp::submit_to(data_ptr.get_owner_shard(), [&data_ptr] {
            return async([&data_ptr] {
                testlog.info("exhaust reader");
                auto data = data_ptr.release();
                auto close_reader = deferred_close(data->reader);
                auto m_opt = read_mutation_from_mutation_reader(data->reader).get();
                BOOST_REQUIRE(m_opt);
                BOOST_REQUIRE(data->reader.is_end_of_stream());
                assert_that(*m_opt).is_equal_to(data->expected_mut);
            });
        }).get();
    }, cfg);
}

SEASTAR_THREAD_TEST_CASE(test_memtable_reader_abort) {
    simple_schema ss;
    const auto s = ss.schema();
    tests::reader_concurrency_semaphore_wrapper semaphore;
    replica::table_stats tbl_stats;
    replica::memtable_table_shared_data table_shared_data;
    replica::dirty_memory_manager mgr;
    auto mt = make_lw_shared<replica::memtable>(s, mgr, table_shared_data, tbl_stats);

    auto pk = ss.make_pkey(0);
    auto pr = dht::partition_range::make_singular(pk);

    mutation m(s, pk);
    for (int i = 0; i < 10; ++i) {
        ss.add_row(m, ss.make_ckey(i), "v1");
    }
    mt->apply(m);

    auto permit = semaphore.make_permit();
    auto reader_opt = mt->make_mutation_reader_opt(s, permit, pr, s->full_slice());
    BOOST_REQUIRE(reader_opt);
    auto close_reader = deferred_close(*reader_opt);

    permit.set_timeout(db::timeout_clock::now());
    // Wait for the timer to fire so the permit is timed out.
    BOOST_REQUIRE(eventually_true([&] {
        return bool(permit.get_abort_exception());
    }));

    BOOST_REQUIRE_THROW((*reader_opt)().get(), named_semaphore_timed_out);
}

BOOST_AUTO_TEST_SUITE_END()