Files
scylladb/test/boost/memtable_test.cc
Calle Wilund 0013f22374 memtable_test::memtable_flush_period: Change sleep to use injection signal instead
Fixes: SCYLLADB-942

Adds an injection signal _from_ table::seal_active_memtable so that the test can
reliably wait for flushing, and uses that signal in place of the sleep.

Closes scylladb/scylladb#29070
2026-03-18 16:23:13 +02:00

1671 lines
69 KiB
C++

/*
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <boost/test/unit_test.hpp>
#include <boost/test/framework.hpp>
#include "replica/database.hh"
#include "db/config.hh"
#include "utils/assert.hh"
#include "utils/UUID_gen.hh"
#undef SEASTAR_TESTING_MAIN
#include <seastar/testing/test_case.hh>
#include <seastar/testing/thread_test_case.hh>
#include "schema/schema_builder.hh"
#include <seastar/util/closeable.hh>
#include "service/migration_manager.hh"
#include <fmt/ranges.h>
#include <seastar/core/thread.hh>
#include "replica/memtable.hh"
#include "test/lib/cql_test_env.hh"
#include "test/lib/cql_assertions.hh"
#include "test/lib/mutation_source_test.hh"
#include "test/lib/mutation_assertions.hh"
#include "test/lib/mutation_reader_assertions.hh"
#include "test/lib/data_model.hh"
#include "test/lib/eventually.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/log.hh"
#include "test/lib/simple_schema.hh"
#include "test/lib/reader_concurrency_semaphore.hh"
#include "test/lib/simple_schema.hh"
#include "test/lib/key_utils.hh"
#include "test/lib/sstable_utils.hh"
#include "utils/error_injection.hh"
#include "db/commitlog/commitlog.hh"
#include "test/lib/make_random_string.hh"
#include "db/extensions.hh"
#include "db/config.hh"
#include "service/storage_service.hh"
BOOST_AUTO_TEST_SUITE(memtable_test)
using namespace std::literals::chrono_literals;
// Hands out write timestamps for the tests in this file: the first call returns 1
// and every later call on the same thread returns the previous value plus one.
static api::timestamp_type next_timestamp() {
    static thread_local api::timestamp_type counter = 1;
    const api::timestamp_type issued = counter;
    ++counter;
    return issued;
}
// Produces a fresh byte string: the formatted text of a newly generated time UUID.
static bytes make_unique_bytes() {
    const auto uuid = utils::UUID_gen::get_time_UUID();
    return to_bytes(fmt::to_string(uuid));
}
// Writes a fresh unique value into column `column_name` of the row with the empty
// clustering key in `m`, using the next test timestamp.
// The column must exist in m's schema and be of bytes_type (asserted).
static void set_column(mutation& m, const sstring& column_name) {
    const auto* column = m.schema()->get_column_definition(to_bytes(column_name));
    SCYLLA_ASSERT(column->type == bytes_type);
    auto cell_value = data_value(make_unique_bytes());
    m.set_clustered_cell(clustering_key::make_empty(), to_bytes(column_name), cell_value, next_timestamp());
}
// Creates an empty mutation for schema `s` whose partition key is a single,
// freshly generated unique value.
static
mutation make_unique_mutation(schema_ptr s) {
    auto unique_key = partition_key::from_single_value(*s, make_unique_bytes());
    return mutation(s, std::move(unique_key));
}
// Builds `n_mutations` empty mutations, each with its own unique partition key,
// and returns them sorted with mutation_decorated_key_less_comparator (ring order).
utils::chunked_vector<mutation> make_ring(schema_ptr s, int n_mutations) {
    utils::chunked_vector<mutation> result;
    int produced = 0;
    while (produced < n_mutations) {
        result.push_back(make_unique_mutation(s));
        ++produced;
    }
    mutation_decorated_key_less_comparator by_decorated_key;
    std::sort(result.begin(), result.end(), by_decorated_key);
    return result;
}
// Runs the generic mutation-source test suite against memtables built with
// make_memtable(); a full LSA compaction is forced before each memtable is
// handed out as a data source.
SEASTAR_TEST_CASE(test_memtable_conforms_to_mutation_source) {
    return seastar::async([] {
        auto populate = [](schema_ptr s, const utils::chunked_vector<mutation>& partitions) {
            auto populated = make_memtable(s, partitions);
            logalloc::shard_tracker().full_compaction();
            return populated->as_data_source();
        };
        run_mutation_source_tests(populate);
    });
}
// Runs one mutation-source test suite (`run_tests`) against memtables that are
// populated one mutation at a time, with a reader opened after every applied
// mutation. While the suite runs, a background fiber keeps calling
// full_compaction() on the LSA shard tracker, sleeping 100us between calls.
//
// run_tests: suite entry point taking a populate_fn_ex and a bool flag; the flag
//            is always passed as `true` here.
// Returns a future that resolves when the suite and the background fiber are done.
static future<> test_memtable(void (*run_tests)(populate_fn_ex, bool)) {
    return seastar::async([run_tests] {
        tests::reader_concurrency_semaphore_wrapper semaphore;
        lw_shared_ptr<replica::memtable> mt;
        // Readers created by the populate lambda; they stay open until the next
        // populate call or until this scope is left.
        std::vector<mutation_reader> readers;
        // Closes all retained readers, then empties the vector (even if a close failed).
        auto clear_readers = [&readers] {
            parallel_for_each(readers, [] (mutation_reader& rd) {
                return rd.close();
            }).finally([&readers] {
                readers.clear();
            }).get();
        };
        // Ensure the readers are closed when this scope is left.
        auto cleanup_readers = defer([&] { clear_readers(); });
        // Storage for the singular ranges passed to the readers.
        // NOTE(review): presumably a deque so that references to earlier ranges stay
        // valid while more ranges are appended — confirm.
        std::deque<dht::partition_range> ranges_storage;
        // Stop flag for the background compaction fiber.
        lw_shared_ptr<bool> finished = make_lw_shared(false);
        auto full_compaction_in_background = seastar::do_until([finished] {return *finished;}, [] {
            // do_refresh_state is called when we detect a new partition snapshot version.
            // If snapshot version changes in process of reading mutation fragments from a
            // clustering range, the partition_snapshot_reader state is refreshed with saved
            // last position of emitted row and range tombstone. full_compaction increases the
            // change mark.
            logalloc::shard_tracker().full_compaction();
            return seastar::sleep(100us);
        });
        run_tests([&] (schema_ptr s, const utils::chunked_vector<mutation>& muts, gc_clock::time_point) {
            // Readers from the previous populate call refer to the previous memtable; close them first.
            clear_readers();
            mt = make_lw_shared<replica::memtable>(s);
            for (auto&& m : muts) {
                mt->apply(m);
                // Create reader so that each mutation is in a separate version
                auto rd = mt->make_mutation_reader(s, semaphore.make_permit(), ranges_storage.emplace_back(dht::partition_range::make_singular(m.decorated_key())));
                // Buffer at most one fragment's worth, then keep the reader open.
                rd.set_max_buffer_size(1);
                rd.fill_buffer().get();
                readers.emplace_back(std::move(rd));
            }
            return mt->as_data_source();
        }, true);
        // Tell the background fiber to stop and wait for it to finish.
        *finished = true;
        full_compaction_in_background.get();
    });
}
// plain
// Runs the run_mutation_source_tests_plain_basic suite through test_memtable().
SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source_basic) {
    return test_memtable(run_mutation_source_tests_plain_basic);
}
// Runs the run_mutation_source_tests_plain_reader_conversion suite through test_memtable().
SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source_plain_reader_conversion) {
    return test_memtable(run_mutation_source_tests_plain_reader_conversion);
}
// Runs the run_mutation_source_tests_plain_fragments_monotonic suite through test_memtable().
SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source_plain_fragments_monotonic) {
    return test_memtable(run_mutation_source_tests_plain_fragments_monotonic);
}
// Runs the run_mutation_source_tests_plain_read_back suite through test_memtable().
SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source_plain_read_back) {
    return test_memtable(run_mutation_source_tests_plain_read_back);
}
// reverse
// Runs the run_mutation_source_tests_reverse_basic suite through test_memtable().
SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source_reverse_basic) {
    return test_memtable(run_mutation_source_tests_reverse_basic);
}
// Runs the run_mutation_source_tests_reverse_reader_conversion suite through test_memtable().
SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source_reverse_reader_conversion) {
    return test_memtable(run_mutation_source_tests_reverse_reader_conversion);
}
// Runs the run_mutation_source_tests_reverse_fragments_monotonic suite through test_memtable().
SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source_reverse_fragments_monotonic) {
    return test_memtable(run_mutation_source_tests_reverse_fragments_monotonic);
}
// Runs the run_mutation_source_tests_reverse_read_back suite through test_memtable().
SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source_reverse_read_back) {
    return test_memtable(run_mutation_source_tests_reverse_read_back);
}
// Exercises memtable::make_flush_reader() on randomly generated mutations, with
// and without counters, in three ways: a straight read, a read with
// next_partition() called between partitions, and a read with next_partition()
// called in the middle of partitions.
SEASTAR_TEST_CASE(test_memtable_flush_reader) {
    // Memtable flush reader is severely limited, it always assumes that
    // the full partition range is being read and that
    // streamed_mutation::forwarding is set to no. Therefore, we cannot use
    // run_mutation_source_tests() to test it.
    return seastar::async([] {
        tests::reader_concurrency_semaphore_wrapper semaphore;
        // Builds a memtable (using the schema of the first mutation) and applies all of `muts` to it.
        auto make_memtable = [] (replica::dirty_memory_manager& mgr, replica::memtable_table_shared_data& table_shared_data, replica::table_stats& tbl_stats, utils::chunked_vector<mutation> muts) {
            SCYLLA_ASSERT(!muts.empty());
            auto mt = make_lw_shared<replica::memtable>(muts.front().schema(), mgr, table_shared_data, tbl_stats);
            for (auto& m : muts) {
                mt->apply(m);
            }
            return mt;
        };
        // Runs the three read scenarios four times, each time on four freshly generated mutations.
        auto test_random_streams = [&] (random_mutation_generator&& gen) {
            for (auto i = 0; i < 4; i++) {
                replica::table_stats tbl_stats;
                replica::memtable_table_shared_data table_shared_data;
                replica::dirty_memory_manager mgr;
                const auto muts = gen(4);
                const auto now = gc_clock::now();
                // Expected output: the generated mutations after compact_for_compaction() at `now`.
                auto compacted_muts = muts;
                for (auto& mut : compacted_muts) {
                    mut.partition().compact_for_compaction(*mut.schema(), always_gc, mut.decorated_key(), now, tombstone_gc_state::for_tests());
                }
                testlog.info("Simple read");
                auto mt = make_memtable(mgr, table_shared_data, tbl_stats, muts);
                assert_that(mt->make_flush_reader(gen.schema(), semaphore.make_permit()))
                    .produces_compacted(compacted_muts[0], now)
                    .produces_compacted(compacted_muts[1], now)
                    .produces_compacted(compacted_muts[2], now)
                    .produces_compacted(compacted_muts[3], now)
                    .produces_end_of_stream();
                testlog.info("Read with next_partition() calls between partition");
                // A fresh memtable is built for every scenario.
                mt = make_memtable(mgr, table_shared_data, tbl_stats, muts);
                assert_that(mt->make_flush_reader(gen.schema(), semaphore.make_permit()))
                    .next_partition()
                    .produces_compacted(compacted_muts[0], now)
                    .next_partition()
                    .produces_compacted(compacted_muts[1], now)
                    .next_partition()
                    .produces_compacted(compacted_muts[2], now)
                    .next_partition()
                    .produces_compacted(compacted_muts[3], now)
                    .next_partition()
                    .produces_end_of_stream();
                testlog.info("Read with next_partition() calls inside partitions");
                mt = make_memtable(mgr, table_shared_data, tbl_stats, muts);
                // Partitions 1 and 3 are abandoned right after their partition start.
                assert_that(mt->make_flush_reader(gen.schema(), semaphore.make_permit()))
                    .produces_compacted(compacted_muts[0], now)
                    .produces_partition_start(muts[1].decorated_key(), muts[1].partition().partition_tombstone())
                    .next_partition()
                    .produces_compacted(compacted_muts[2], now)
                    .next_partition()
                    .produces_partition_start(muts[3].decorated_key(), muts[3].partition().partition_tombstone())
                    .next_partition()
                    .produces_end_of_stream();
            }
        };
        test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no));
        test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::yes));
    });
}
// Opens one reader per schema (s1 and s2) on a memtable created with s1, switches
// the memtable to s2 after the first partition has been read, and checks that
// both readers keep returning the same partitions, each under its own schema.
// Readers opened after the switch must see the same data as well.
SEASTAR_TEST_CASE(test_adding_a_column_during_reading_doesnt_affect_read_result) {
    return seastar::async([] {
        auto common_builder = schema_builder("ks", "cf")
                .with_column("pk", bytes_type, column_kind::partition_key);
        // Schema the memtable is created with.
        auto s1 = common_builder
                .with_column("v2", bytes_type, column_kind::regular_column)
                .build();
        // Schema the memtable is switched to while the readers are in flight.
        auto s2 = common_builder
                .with_column("v1", bytes_type, column_kind::regular_column) // new column
                .with_column("v2", bytes_type, column_kind::regular_column)
                .build();

        tests::reader_concurrency_semaphore_wrapper semaphore;
        auto mt = make_lw_shared<replica::memtable>(s1);

        utils::chunked_vector<mutation> expected = make_ring(s1, 3);
        for (auto& partition : expected) {
            set_column(partition, "v2");
            mt->apply(partition);
        }

        auto old_schema_rd = assert_that(mt->make_mutation_reader(s1, semaphore.make_permit()));
        auto new_schema_rd = assert_that(mt->make_mutation_reader(s2, semaphore.make_permit()));

        // Advances both in-flight readers by one partition and checks it against expected[idx].
        auto expect_next = [&] (size_t idx) {
            old_schema_rd.next_mutation().has_schema(s1).is_equal_to(expected[idx]);
            new_schema_rd.next_mutation().has_schema(s2).is_equal_to(expected[idx]);
        };

        expect_next(0);
        mt->set_schema(s2);
        expect_next(1);
        expect_next(2);
        old_schema_rd.produces_end_of_stream();
        new_schema_rd.produces_end_of_stream();

        // Fresh readers, first with s1 and then with s2, must return the full ring.
        for (const schema_ptr& reader_schema : {s1, s2}) {
            auto check = assert_that(mt->make_mutation_reader(reader_schema, semaphore.make_permit()));
            for (const auto& partition : expected) {
                check.produces(partition);
            }
            check.produces_end_of_stream();
        }
    });
}
// Samples dirty_memory_manager::unspooled_dirty_memory() before and during a read
// through the memtable's flush reader and requires that the sampled values never
// increase, even though a regular reader is open during part of the flush and one
// partition has been overwritten with a much smaller value.
SEASTAR_TEST_CASE(test_unspooled_dirty_accounting_on_flush) {
    return seastar::async([] {
        schema_ptr s = schema_builder("ks", "cf")
                .with_column("pk", bytes_type, column_kind::partition_key)
                .with_column("col", bytes_type, column_kind::regular_column)
                .build();
        tests::reader_concurrency_semaphore_wrapper semaphore;
        replica::dirty_memory_manager mgr;
        replica::memtable_table_shared_data table_shared_data;
        replica::table_stats tbl_stats;
        auto mt = make_lw_shared<replica::memtable>(s, mgr, table_shared_data, tbl_stats);
        utils::chunked_vector<mutation> ring = make_ring(s, 3);
        // What the flush reader is expected to produce, partition by partition.
        utils::chunked_vector<mutation> current_ring;
        for (auto&& m : ring) {
            auto m_with_cell = m;
            m_with_cell.set_clustered_cell(clustering_key::make_empty(), to_bytes("col"),
                    data_value(bytes(bytes::initialized_later(), 4096)), next_timestamp());
            mt->apply(m_with_cell);
            current_ring.push_back(m_with_cell);
        }
        // Create a reader which will cause many partition versions to be created
        mutation_reader_opt rd1 = mt->make_mutation_reader(s, semaphore.make_permit());
        auto close_rd1 = deferred_close(*rd1);
        rd1->set_max_buffer_size(1);
        rd1->fill_buffer().get();
        // Override large cell value with a short one
        {
            auto part0_update = ring[0];
            part0_update.set_clustered_cell(clustering_key::make_empty(), to_bytes("col"),
                    data_value(bytes(bytes::initialized_later(), 8)), next_timestamp());
            // part0_update is read again on the next line, so it is passed as an
            // lvalue; wrapping it in std::move here would make that a use-after-move.
            mt->apply(part0_update);
            current_ring[0] = part0_update;
        }
        // Samples are taken before the flush read and after each step of it.
        std::vector<size_t> unspooled_dirty_values;
        unspooled_dirty_values.push_back(mgr.unspooled_dirty_memory());
        auto flush_reader_check = assert_that(mt->make_flush_reader(s, semaphore.make_permit()));
        flush_reader_check.produces_partition(current_ring[0]);
        unspooled_dirty_values.push_back(mgr.unspooled_dirty_memory());
        flush_reader_check.produces_partition(current_ring[1]);
        unspooled_dirty_values.push_back(mgr.unspooled_dirty_memory());
        // Drain and close the regular reader, then compact, before flushing the last partition.
        while ((*rd1)().get()) ;
        close_rd1.close_now();
        logalloc::shard_tracker().full_compaction();
        flush_reader_check.produces_partition(current_ring[2]);
        unspooled_dirty_values.push_back(mgr.unspooled_dirty_memory());
        flush_reader_check.produces_end_of_stream();
        unspooled_dirty_values.push_back(mgr.unspooled_dirty_memory());
        // Reversed samples being sorted means the samples were non-increasing over time.
        std::reverse(unspooled_dirty_values.begin(), unspooled_dirty_values.end());
        BOOST_REQUIRE(std::is_sorted(unspooled_dirty_values.begin(), unspooled_dirty_values.end()));
    });
}
// Reproducer for #1753
//
// Applies three mutations (m1, m2, m3) to the same partition, opening a reader
// after each one, then forces a full LSA compaction, opens three more readers and
// checks that every reader returns the data that had been applied when it was
// created. The first three readers are destroyed interleaved with the last three reads.
SEASTAR_TEST_CASE(test_partition_version_consistency_after_lsa_compaction_happens) {
    return seastar::async([] {
        schema_ptr s = schema_builder("ks", "cf")
                .with_column("pk", bytes_type, column_kind::partition_key)
                .with_column("ck", bytes_type, column_kind::clustering_key)
                .with_column("col", bytes_type, column_kind::regular_column)
                .build();
        tests::reader_concurrency_semaphore_wrapper semaphore;
        auto mt = make_lw_shared<replica::memtable>(s);
        // All three mutations share empty_m's partition key and differ in clustering key.
        auto empty_m = make_unique_mutation(s);
        auto ck1 = clustering_key::from_single_value(*s, serialized(make_unique_bytes()));
        auto ck2 = clustering_key::from_single_value(*s, serialized(make_unique_bytes()));
        auto ck3 = clustering_key::from_single_value(*s, serialized(make_unique_bytes()));
        auto m1 = empty_m;
        m1.set_clustered_cell(ck1, to_bytes("col"), data_value(bytes(bytes::initialized_later(), 8)), next_timestamp());
        auto m2 = empty_m;
        m2.set_clustered_cell(ck2, to_bytes("col"), data_value(bytes(bytes::initialized_later(), 8)), next_timestamp());
        auto m3 = empty_m;
        m3.set_clustered_cell(ck3, to_bytes("col"), data_value(bytes(bytes::initialized_later(), 8)), next_timestamp());
        // rd1..rd3 are each opened right after one more mutation was applied; the
        // buffer is limited to 1 and filled once so the read has started.
        mt->apply(m1);
        std::optional<mutation_reader_assertions> rd1 = assert_that(mt->make_mutation_reader(s, semaphore.make_permit()));
        rd1->set_max_buffer_size(1);
        rd1->fill_buffer().get();
        mt->apply(m2);
        std::optional<mutation_reader_assertions> rd2 = assert_that(mt->make_mutation_reader(s, semaphore.make_permit()));
        rd2->set_max_buffer_size(1);
        rd2->fill_buffer().get();
        mt->apply(m3);
        std::optional<mutation_reader_assertions> rd3 = assert_that(mt->make_mutation_reader(s, semaphore.make_permit()));
        rd3->set_max_buffer_size(1);
        rd3->fill_buffer().get();
        logalloc::shard_tracker().full_compaction();
        // rd4..rd6 are opened after the compaction, with all three mutations applied.
        auto rd4 = assert_that(mt->make_mutation_reader(s, semaphore.make_permit()));
        rd4.set_max_buffer_size(1);
        rd4.fill_buffer().get();
        auto rd5 = assert_that(mt->make_mutation_reader(s, semaphore.make_permit()));
        rd5.set_max_buffer_size(1);
        rd5.fill_buffer().get();
        auto rd6 = assert_that(mt->make_mutation_reader(s, semaphore.make_permit()));
        rd6.set_max_buffer_size(1);
        rd6.fill_buffer().get();
        // Each early reader must see exactly what had been applied when it was opened.
        rd1->next_mutation().is_equal_to(m1);
        rd2->next_mutation().is_equal_to(m1 + m2);
        rd3->next_mutation().is_equal_to(m1 + m2 + m3);
        // Destroy the early readers one at a time, reading through a late reader after each.
        rd3 = {};
        rd4.next_mutation().is_equal_to(m1 + m2 + m3);
        rd1 = {};
        rd5.next_mutation().is_equal_to(m1 + m2 + m3);
        rd2 = {};
        rd6.next_mutation().is_equal_to(m1 + m2 + m3);
    });
}
// Reproducer for #1746
//
// Reads three partitions of 300 rows each through the flush reader, forcing a
// full LSA compaction before every fragment fetched inside a partition, and
// requires after each partition that unspooled dirty memory does not exceed real
// dirty memory.
SEASTAR_TEST_CASE(test_segment_migration_during_flush) {
    return seastar::async([] {
        schema_ptr s = schema_builder("ks", "cf")
                .with_column("pk", bytes_type, column_kind::partition_key)
                .with_column("ck", bytes_type, column_kind::clustering_key)
                .with_column("col", bytes_type, column_kind::regular_column)
                .build();
        tests::reader_concurrency_semaphore_wrapper semaphore;
        replica::table_stats tbl_stats;
        replica::memtable_table_shared_data table_shared_data;
        replica::dirty_memory_manager mgr;
        auto mt = make_lw_shared<replica::memtable>(s, mgr, table_shared_data, tbl_stats);
        const int rows_per_partition = 300;
        const int partitions = 3;
        utils::chunked_vector<mutation> ring = make_ring(s, partitions);
        for (auto& m : ring) {
            for (int i = 0; i < rows_per_partition; ++i) {
                auto ck = clustering_key::from_single_value(*s, serialized(make_unique_bytes()));
                auto col_value = data_value(bytes(bytes::initialized_later(), 8));
                m.set_clustered_cell(ck, to_bytes("col"), col_value, next_timestamp());
            }
            mt->apply(m);
        }
        auto rd = mt->make_flush_reader(s, semaphore.make_permit());
        auto close_rd = deferred_close(rd);
        for (int i = 0; i < partitions; ++i) {
            auto mfopt = rd().get();
            BOOST_REQUIRE(bool(mfopt));
            BOOST_REQUIRE(mfopt->is_partition_start());
            while (!mfopt->is_end_of_partition()) {
                logalloc::shard_tracker().full_compaction();
                mfopt = rd().get();
                // Fail the test, rather than dereference an empty optional, if the
                // stream ends before the partition end has been produced.
                BOOST_REQUIRE(bool(mfopt));
            }
            BOOST_REQUIRE_LE(mgr.unspooled_dirty_memory(), mgr.real_dirty_memory());
        }
        // Nothing may follow the last partition.
        BOOST_REQUIRE(!rd().get());
    });
}
// Reproducer for #2854
//
// Starts reading a memtable, marks it as flushed (backed by a second memtable
// holding the same data) in the middle of the read, and verifies that the reader
// can still continue and be fast-forwarded to a later range.
SEASTAR_TEST_CASE(test_fast_forward_to_after_memtable_is_flushed) {
    return seastar::async([] {
        schema_ptr s = schema_builder("ks", "cf")
                .with_column("pk", bytes_type, column_kind::partition_key)
                .with_column("col", bytes_type, column_kind::regular_column)
                .build();
        tests::reader_concurrency_semaphore_wrapper semaphore;

        utils::chunked_vector<mutation> partitions = make_ring(s, 5);
        auto flushed_mt = make_memtable(s, partitions);
        auto replacement_mt = make_memtable(s, partitions);

        auto reader_check = assert_that(flushed_mt->make_mutation_reader(s, semaphore.make_permit()));
        reader_check.produces(partitions[0]);

        // From here on the memtable is marked flushed, with replacement_mt as the underlying source.
        flushed_mt->mark_flushed(replacement_mt->as_data_source());
        reader_check.produces(partitions[1]);

        // Skip partition 2 by forwarding to the range starting at partition 3.
        auto tail_range = dht::partition_range::make_starting_with(dht::ring_position(partitions[3].decorated_key()));
        reader_check.fast_forward_to(tail_range);
        reader_check.produces(partitions[3]);
        reader_check.produces(partitions[4]);
        reader_check.produces_end_of_stream();
    });
}
// Reads the full partition range of a two-partition memtable under
// memory::with_allocation_failures() and expects the generated mutations back.
SEASTAR_TEST_CASE(test_exception_safety_of_partition_range_reads) {
    return seastar::async([] {
        random_mutation_generator generator(random_mutation_generator::generate_counters::no);
        auto schema = generator.schema();
        tests::reader_concurrency_semaphore_wrapper semaphore;
        utils::chunked_vector<mutation> expected = generator(2);
        auto populated = make_memtable(schema, expected);

        memory::with_allocation_failures([&] {
            auto permit = semaphore.make_permit();
            assert_that(populated->make_mutation_reader(schema, std::move(permit), query::full_partition_range))
                .produces(expected);
        });
    });
}
// Reads a two-partition memtable through its flush reader under
// memory::with_allocation_failures(); revert_flushed_memory() is called at the
// end of every attempt, whether or not it succeeded.
SEASTAR_TEST_CASE(test_exception_safety_of_flush_reads) {
    return seastar::async([] {
        random_mutation_generator generator(random_mutation_generator::generate_counters::no);
        auto schema = generator.schema();
        tests::reader_concurrency_semaphore_wrapper semaphore;
        utils::chunked_vector<mutation> expected = generator(2);
        auto populated = make_memtable(schema, expected);

        memory::with_allocation_failures([&] {
            // Runs when this attempt ends, after the flush reader below has been destroyed.
            auto undo_flush_accounting = defer([&] {
                populated->revert_flushed_memory();
            });
            auto permit = semaphore.make_permit();
            assert_that(populated->make_flush_reader(schema, std::move(permit)))
                .produces(expected);
        });
    });
}
// Reads the second of two generated partitions through a singular-range reader
// under memory::with_allocation_failures() and expects that partition back.
SEASTAR_TEST_CASE(test_exception_safety_of_single_partition_reads) {
    return seastar::async([] {
        random_mutation_generator generator(random_mutation_generator::generate_counters::no);
        auto schema = generator.schema();
        tests::reader_concurrency_semaphore_wrapper semaphore;
        utils::chunked_vector<mutation> generated = generator(2);
        auto populated = make_memtable(schema, generated);

        memory::with_allocation_failures([&] {
            const mutation& target = generated[1];
            // The range is built inside the callback so its allocations are subject to injected failures too.
            const auto target_range = dht::partition_range::make_singular(target.decorated_key());
            assert_that(populated->make_mutation_reader(schema, semaphore.make_permit(), target_range))
                .produces(target);
        });
    });
}
// Flushes a memtable partition that holds 10000 rows plus a range tombstone
// covering them, while two regular readers opened at different points are still
// alive, then releases the readers and drains the memtable's cleaner.
// The test has no explicit assertions; it passes if all of the above completes.
SEASTAR_THREAD_TEST_CASE(test_tombstone_compaction_during_flush) {
    tests::reader_concurrency_semaphore_wrapper semaphore;
    simple_schema ss;
    auto s = ss.schema();
    auto mt = make_lw_shared<replica::memtable>(ss.schema());
    auto pk = ss.make_pkey(0);
    auto pr = dht::partition_range::make_singular(pk);
    int n_rows = 10000;
    {
        mutation m(ss.schema(), pk);
        for (int i = 0; i < n_rows; ++i) {
            ss.add_row(m, ss.make_ckey(i), "v1");
        }
        mt->apply(m);
    }
    // rd1 is opened when only the rows have been applied.
    auto rd1 = mt->make_mutation_reader(ss.schema(), semaphore.make_permit(), pr, s->full_slice(),
            nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
    auto close_rd1 = defer([&] { rd1.close().get(); });
    rd1.fill_buffer().get();
    mutation rt_m(ss.schema(), pk);
    auto rt = ss.delete_range(rt_m, ss.make_ckey_range(0, n_rows));
    mt->apply(rt_m);
    // rd2 is opened after the range tombstone has been applied.
    auto rd2 = mt->make_mutation_reader(ss.schema(), semaphore.make_permit(), pr, s->full_slice(),
            nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
    auto close_rd2 = defer([&] { rd2.close().get(); });
    rd2.fill_buffer().get();
    mt->apply(rt_m); // whatever
    // Read the whole memtable through the flush reader while rd1 and rd2 are still open.
    auto flush_rd = mt->make_flush_reader(ss.schema(), semaphore.make_permit());
    auto close_flush_rd = defer([&] { flush_rd.close().get(); });
    while (!flush_rd.is_end_of_stream()) {
        flush_rd().get();
    }
    // Moving each deferred action / reader into a block-scoped variable ends its
    // lifetime at the closing brace: rd2 is closed and destroyed first, then rd1.
    { auto close_rd = std::move(close_rd2); }
    { auto rd = std::move(rd2); }
    { auto close_rd = std::move(close_rd1); }
    { auto rd = std::move(rd1); }
    mt->cleaner().drain().get();
}
// Applies three mutations with overlapping range tombstones (m1, m2, m3) to one
// partition, keeping a partially-consumed reader open after m1 and after m2, and
// checks that a fresh reader emits monotonic positions and produces m1 + m2 + m3.
SEASTAR_THREAD_TEST_CASE(test_tombstone_merging_with_multiple_versions) {
    tests::reader_concurrency_semaphore_wrapper semaphore;
    simple_schema ss;
    auto s = ss.schema();
    auto mt = make_lw_shared<replica::memtable>(ss.schema());
    auto pk = ss.make_pkey(0);
    auto pr = dht::partition_range::make_singular(pk);
    // Tombstones are created in this order; which one ends up in which mutation is set up below.
    auto t0 = ss.new_tombstone();
    auto t1 = ss.new_tombstone();
    auto t2 = ss.new_tombstone();
    auto t3 = ss.new_tombstone();
    // m1: range tombstone t1 from before ckey 0 to ckey 3, plus rows.
    mutation m1(s, pk);
    ss.delete_range(m1, *position_range_to_clustering_range(position_range(
            position_in_partition::before_key(ss.make_ckey(0)),
            position_in_partition::for_key(ss.make_ckey(3))), *s), t1);
    ss.add_row(m1, ss.make_ckey(0), "v");
    ss.add_row(m1, ss.make_ckey(1), "v");
    // Fill so that rd1 stays in the partition snapshot
    int n_rows = 1000;
    auto v = make_random_string(512);
    for (int i = 0; i < n_rows; ++i) {
        ss.add_row(m1, ss.make_ckey(i), v);
    }
    // m2: t2 from before ckey 0 to before ckey 1, and t3 from before ckey 1 to ckey 3.
    mutation m2(s, pk);
    ss.delete_range(m2, *position_range_to_clustering_range(position_range(
            position_in_partition::before_key(ss.make_ckey(0)),
            position_in_partition::before_key(ss.make_ckey(1))), *s), t2);
    ss.delete_range(m2, *position_range_to_clustering_range(position_range(
            position_in_partition::before_key(ss.make_ckey(1)),
            position_in_partition::for_key(ss.make_ckey(3))), *s), t3);
    // m3: t0 from before ckey 0 to ckey 4.
    mutation m3(s, pk);
    ss.delete_range(m3, *position_range_to_clustering_range(position_range(
            position_in_partition::before_key(ss.make_ckey(0)),
            position_in_partition::for_key(ss.make_ckey(4))), *s), t0);
    mt->apply(m1);
    auto rd1 = mt->make_mutation_reader(s, semaphore.make_permit(), pr, s->full_slice(),
            nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
    auto close_rd1 = defer([&] { rd1.close().get(); });
    rd1.fill_buffer().get();
    BOOST_REQUIRE(!rd1.is_end_of_stream()); // rd1 must keep the m1 version alive
    mt->apply(m2);
    auto rd2 = mt->make_mutation_reader(s, semaphore.make_permit(), pr, s->full_slice(),
            nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
    auto close_r2 = defer([&] { rd2.close().get(); });
    rd2.fill_buffer().get();
    BOOST_REQUIRE(!rd2.is_end_of_stream()); // rd2 must keep the m1 version alive
    mt->apply(m3);
    // With rd1 and rd2 still open, a fresh read must be well-ordered and complete.
    assert_that(mt->make_mutation_reader(s, semaphore.make_permit(), pr))
        .has_monotonic_positions();
    assert_that(mt->make_mutation_reader(s, semaphore.make_permit(), pr))
        .produces(m1 + m2 + m3);
}
// Builds three versions of one partition (m0: many rows, m1: a range tombstone,
// m2: two range tombstones, one over the same range as m1's), releases the readers
// holding the older versions one after another, waits for the memtable's cleaner
// to drain, and checks that the final read equals m0 + m1 + m2.
SEASTAR_THREAD_TEST_CASE(test_tombstone_merging_with_mvcc_and_preemption) {
    tests::reader_concurrency_semaphore_wrapper semaphore;
    simple_schema ss;
    auto s = ss.schema();
    auto mt = make_lw_shared<replica::memtable>(ss.schema());
    auto pk = ss.make_pkey(0);
    auto pr = dht::partition_range::make_singular(pk);
    // Produce large m0 so that merging range tombstone from m1 into m0 is likely to be preempted in the middle.
    int n_tombstones = 10000;
    int key_delta_per_tombstone = 3;
    mutation m0(s, pk);
    {
        // Rows are added at clustering keys 0, 3, 6, ...
        int key = 0;
        for (int i = 0; i < n_tombstones; ++i) {
            ss.add_row(m0, ss.make_ckey(key), "value");
            key += key_delta_per_tombstone;
        }
    }
    mt->apply(m0);
    // rd0 is opened with only m0 applied and must not have reached end of stream yet.
    std::optional<mutation_reader> rd0 = mt->make_mutation_reader(
            s, semaphore.make_permit(), pr, s->full_slice(),
            nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
    auto close_rd0 = defer([&] { rd0->close().get(); });
    rd0->fill_buffer().get();
    BOOST_REQUIRE(!rd0->is_end_of_stream());
    // [k1, k2) bounds of the range tombstone written by m1 (and again by m2).
    auto k1 = n_tombstones * key_delta_per_tombstone / 3;
    auto k2 = k1 + n_tombstones * key_delta_per_tombstone / 2;
    mutation m1(s, pk);
    ss.delete_range(m1, ss.make_ckey_range(k1, k2));
    mt->apply(m1);
    // rd1 is opened with m0 and m1 applied.
    std::optional<mutation_reader> rd1 = mt->make_mutation_reader(
            s, semaphore.make_permit(), pr, s->full_slice(),
            nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
    auto close_rd1 = defer([&] { rd1->close().get(); });
    rd1->fill_buffer().get();
    BOOST_REQUIRE(!rd1->is_end_of_stream());
    // Trigger merging of m1 into m0.
    // The deferred close is cancelled because the reader is closed and destroyed explicitly here.
    close_rd0.cancel();
    rd0->close().get();
    rd0 = {};
    mutation m2(s, pk);
    ss.delete_range(m2, ss.make_ckey_range(0, 1));
    // Shadow earlier range tombstone in m1 to test whether applying this range tombstone
    // to m1 (from m2) while m1 is still merging its range tombstones to m0 doesn't
    // lead to loss of information from m2 due to the way preemption is handled in m1 -> m0 merging.
    ss.delete_range(m2, ss.make_ckey_range(k1, k2));
    mt->apply(m2);
    // Trigger merging of m2 into m1.
    // Some of it will complete immediately.
    // Let's see if updates of m1 from m2 are not lost while merging of m1 into m0 is still in progress.
    close_rd1.cancel();
    rd1->close().get();
    rd1 = {};
    // Wait for merging to complete so that we read the final result later.
    mt->cleaner().drain().get();
    assert_that(mt->make_mutation_reader(s, semaphore.make_permit(), pr))
        .produces(m0 + m1 + m2);
}
// Checks what a memtable reader produces for one partition after, in turn:
// four rows plus a range tombstone over ckeys [2, 3]; a partition tombstone
// created before any of the writes; and a partition tombstone created after all
// of them. The cleaner is drained before every read.
SEASTAR_THREAD_TEST_CASE(test_range_tombstones_are_compacted_with_data) {
    tests::reader_concurrency_semaphore_wrapper semaphore;
    simple_schema ss;
    auto s = ss.schema();
    auto mt = make_lw_shared<replica::memtable>(ss.schema());
    auto pk = ss.make_pkey(0);
    auto pr = dht::partition_range::make_singular(pk);
    auto old_tombstone = ss.new_tombstone(); // older than any write, does not cover anything
    {
        mutation m(ss.schema(), pk);
        ss.add_row(m, ss.make_ckey(1), "v1");
        ss.add_row(m, ss.make_ckey(2), "v1");
        ss.add_row(m, ss.make_ckey(3), "v1");
        ss.add_row(m, ss.make_ckey(4), "v1");
        mt->apply(m);
    }
    mutation rt_m(ss.schema(), pk);
    auto rt = ss.delete_range(rt_m, ss.make_ckey_range(2,3));
    mt->apply(rt_m);
    mt->cleaner().drain().get();
    // Rows 2 and 3 are expected to be gone; only the tombstone boundaries remain between rows 1 and 4.
    assert_that(mt->make_mutation_reader(ss.schema(), semaphore.make_permit(), pr))
            .produces_partition_start(pk)
            .produces_row_with_key(ss.make_ckey(1))
            .produces_range_tombstone_change({rt.position(), rt.tomb})
            .produces_range_tombstone_change({rt.end_position(), {}})
            .produces_row_with_key(ss.make_ckey(4))
            .produces_partition_end()
            .produces_end_of_stream();
    {
        mutation m(ss.schema(), pk);
        m.partition().apply(old_tombstone);
        mt->apply(m);
        mt->cleaner().drain().get();
    }
    // No change
    assert_that(mt->make_mutation_reader(ss.schema(), semaphore.make_permit(), pr))
            .produces_partition_start(pk, {old_tombstone})
            .produces_row_with_key(ss.make_ckey(1))
            .produces_range_tombstone_change({rt.position(), rt.tomb})
            .produces_range_tombstone_change({rt.end_position(), {}})
            .produces_row_with_key(ss.make_ckey(4))
            .produces_partition_end()
            .produces_end_of_stream();
    // Partition tombstone created after the rows and after rt.
    auto new_tomb = ss.new_tombstone();
    {
        mutation m(ss.schema(), pk);
        m.partition().apply(new_tomb);
        mt->apply(m);
        mt->cleaner().drain().get();
    }
    // All rows are expected to be gone now.
    // NOTE(review): new_tomb is created after rt, yet the range tombstone changes
    // are still expected below — confirm this is the intended compaction result
    // and that only partition start/end should not be expected here instead.
    assert_that(mt->make_mutation_reader(ss.schema(), semaphore.make_permit(), pr))
            .produces_partition_start(pk, {new_tomb})
            .produces_range_tombstone_change({rt.position(), rt.tomb})
            .produces_range_tombstone_change({rt.end_position(), {}})
            .produces_partition_end()
            .produces_end_of_stream();
}
// Verifies when a cell hash is present on rows read from a memtable. After each
// write: a plain read finds no hash for cell 0, a read with the with_digest slice
// option finds one, and a following plain read finds one too. The sequence is run
// twice; the second write overwrites the cell.
SEASTAR_TEST_CASE(test_hash_is_cached) {
    return seastar::async([] {
        auto s = schema_builder("ks", "cf")
                .with_column("pk", bytes_type, column_kind::partition_key)
                .with_column("v", bytes_type, column_kind::regular_column)
                .build();
        tests::reader_concurrency_semaphore_wrapper semaphore;
        auto mt = make_lw_shared<replica::memtable>(s);
        auto m = make_unique_mutation(s);

        // Full slice with digests requested. Declared at test scope so that it
        // outlives every reader that is created with it.
        auto digest_slice = s->full_slice();
        digest_slice.options.set<query::partition_slice::option::with_digest>();

        // Consumes the partition start and the first clustering row from `rd`,
        // closes the reader, and reports whether cell 0 of that row carries a hash.
        auto first_cell_has_hash = [] (mutation_reader rd) {
            auto close_rd = deferred_close(rd);
            rd().get()->as_partition_start();
            clustering_row row = std::move(*rd().get()).as_clustering_row();
            return bool(row.cells().cell_hash_for(0));
        };
        auto plain_read = [&] {
            return first_cell_has_hash(mt->make_mutation_reader(s, semaphore.make_permit()));
        };
        auto digest_read = [&] {
            return first_cell_has_hash(
                    mt->make_mutation_reader(s, semaphore.make_permit(), query::full_partition_range, digest_slice));
        };

        for (int round = 0; round < 2; ++round) {
            set_column(m, "v");
            mt->apply(m);
            // Freshly written cell: no hash until a digest read has happened.
            BOOST_REQUIRE(!plain_read());
            BOOST_REQUIRE(digest_read());
            // The hash computed by the digest read is visible to later plain reads.
            BOOST_REQUIRE(plain_read());
        }
    });
}
// Checks how a memtable's encoding stats (min timestamp, min TTL, min local
// deletion time) and its min/max/min-live timestamp accessors change as four
// mutations are applied one after another. The expected values after each
// apply() depend on everything applied before it, so the order matters.
SEASTAR_THREAD_TEST_CASE(test_collecting_encoding_stats) {
// Produces a serialized random int32 to use as a cell value.
auto random_int32_value = [] {
return int32_type->decompose(tests::random::get_int<int32_t>());
};
auto now = gc_clock::now();
// Schema: pk int32, ck utf8, one static column (s1) and two regular columns (v1, v2).
auto td = tests::data_model::table_description({ { "pk", int32_type } }, { { "ck", utf8_type } });
auto td1 = td;
td1.add_static_column("s1", int32_type);
td1.add_regular_column("v1", int32_type);
td1.add_regular_column("v2", int32_type);
auto built_schema = td1.build();
auto s = built_schema.schema;
// m1: row marker plus one cell, no explicit timestamp, TTL or expiry given.
auto md1 = tests::data_model::mutation_description({ to_bytes("pk1") });
md1.add_clustered_row_marker({ to_bytes("ck1") });
md1.add_clustered_cell({ to_bytes("ck1") }, "v1", random_int32_value());
auto m1 = md1.build(s);
// m2: row marker with a negative timestamp (-10) and a cell that carries a
// 1 second TTL expiring at now + 1s.
auto md2 = tests::data_model::mutation_description({ to_bytes("pk2") });
auto md2_ttl = gc_clock::duration(std::chrono::seconds(1));
api::timestamp_type md2_timestamp = -10;
md2.add_clustered_row_marker({ to_bytes("ck1") }, md2_timestamp);
md2.add_clustered_cell({ to_bytes("ck1") }, "v1", random_int32_value());
md2.add_clustered_cell({ to_bytes("ck2") }, "v2",
tests::data_model::mutation_description::atomic_value(random_int32_value(), tests::data_model::data_timestamp, md2_ttl, now + md2_ttl));
auto m2 = md2.build(s);
// m3: static cell with a 2 second TTL whose expiry point lies 8 hours in the past.
auto md3 = tests::data_model::mutation_description({ to_bytes("pk3") });
auto md3_ttl = gc_clock::duration(std::chrono::seconds(2));
auto md3_expiry_point = now - std::chrono::hours(8);
md3.add_static_cell("s1",
tests::data_model::mutation_description::atomic_value(random_int32_value(), tests::data_model::data_timestamp, md3_ttl, md3_expiry_point));
auto m3 = md3.build(s);
// m4: partition tombstone on pk1 with timestamp md2_timestamp - 10 and a
// deletion time 9 hours in the past.
auto md4 = tests::data_model::mutation_description({ to_bytes("pk1") });
auto md4_tombstone = tombstone(md2_timestamp - 10, now - std::chrono::hours(9));
md4.set_partition_tombstone(md4_tombstone);
auto m4 = md4.build(s);
auto mt = make_lw_shared<replica::memtable>(s);
// Empty memtable: the encoding stats and the min-live accessors report their
// maximum values, while get_min_timestamp()/get_max_timestamp() report 0.
auto stats = mt->get_encoding_stats();
BOOST_CHECK(stats.min_local_deletion_time == gc_clock::time_point::max());
BOOST_CHECK_EQUAL(stats.min_timestamp, api::max_timestamp);
BOOST_CHECK(stats.min_ttl == gc_clock::duration::max());
BOOST_CHECK_EQUAL(mt->get_min_timestamp(), 0);
BOOST_CHECK_EQUAL(mt->get_max_timestamp(), 0);
BOOST_CHECK_EQUAL(mt->get_min_live_timestamp(), api::max_timestamp);
BOOST_CHECK_EQUAL(mt->get_min_live_row_marker_timestamp(), api::max_timestamp);
// After m1: every timestamp equals data_timestamp; no TTL or deletion time yet.
mt->apply(m1);
stats = mt->get_encoding_stats();
BOOST_CHECK(stats.min_local_deletion_time == gc_clock::time_point::max());
BOOST_CHECK_EQUAL(stats.min_timestamp, tests::data_model::data_timestamp);
BOOST_CHECK(stats.min_ttl == gc_clock::duration::max());
BOOST_CHECK_EQUAL(mt->get_min_timestamp(), tests::data_model::data_timestamp);
BOOST_CHECK_EQUAL(mt->get_max_timestamp(), tests::data_model::data_timestamp);
BOOST_CHECK_EQUAL(mt->get_min_live_timestamp(), tests::data_model::data_timestamp);
BOOST_CHECK_EQUAL(mt->get_min_live_row_marker_timestamp(), tests::data_model::data_timestamp);
// After m2: the minimums drop to md2_timestamp, and the expiring cell
// contributes its TTL and expiry point.
mt->apply(m2);
stats = mt->get_encoding_stats();
BOOST_CHECK(stats.min_local_deletion_time == now + md2_ttl);
BOOST_CHECK_EQUAL(stats.min_timestamp, md2_timestamp);
BOOST_CHECK(stats.min_ttl == md2_ttl);
BOOST_CHECK_EQUAL(mt->get_min_timestamp(), md2_timestamp);
BOOST_CHECK_EQUAL(mt->get_max_timestamp(), tests::data_model::data_timestamp);
BOOST_CHECK_EQUAL(mt->get_min_live_timestamp(), md2_timestamp);
BOOST_CHECK_EQUAL(mt->get_min_live_row_marker_timestamp(), md2_timestamp);
// After m3: only the min local deletion time moves (to the earlier expiry
// point); md3_ttl is larger than md2_ttl, so min_ttl is unchanged.
mt->apply(m3);
stats = mt->get_encoding_stats();
BOOST_CHECK(stats.min_local_deletion_time == md3_expiry_point);
BOOST_CHECK_EQUAL(stats.min_timestamp, md2_timestamp);
BOOST_CHECK(stats.min_ttl == md2_ttl);
BOOST_CHECK_EQUAL(mt->get_min_timestamp(), md2_timestamp);
BOOST_CHECK_EQUAL(mt->get_max_timestamp(), tests::data_model::data_timestamp);
BOOST_CHECK_EQUAL(mt->get_min_live_timestamp(), md2_timestamp);
BOOST_CHECK_EQUAL(mt->get_min_live_row_marker_timestamp(), md2_timestamp);
// After m4: the tombstone lowers the min timestamp and min local deletion
// time, but the min *live* timestamps are expected to stay at md2_timestamp.
mt->apply(m4);
stats = mt->get_encoding_stats();
BOOST_CHECK(stats.min_local_deletion_time == md4_tombstone.deletion_time);
BOOST_CHECK_EQUAL(stats.min_timestamp, md4_tombstone.timestamp);
BOOST_CHECK(stats.min_ttl == md2_ttl);
BOOST_CHECK_EQUAL(mt->get_min_timestamp(), md4_tombstone.timestamp);
BOOST_CHECK_EQUAL(mt->get_max_timestamp(), tests::data_model::data_timestamp);
BOOST_CHECK_EQUAL(mt->get_min_live_timestamp(), md2_timestamp);
BOOST_CHECK_EQUAL(mt->get_min_live_row_marker_timestamp(), md2_timestamp);
}
// Writes a cell and then a row deletion for the same row into one table,
// flushes the table, and verifies that reading the table back yields exactly
// the deletion mutation followed by end of stream. The cache is disabled in
// the config passed to the test environment.
SEASTAR_TEST_CASE(memtable_flush_compresses_mutations) {
    auto cfg = make_shared<db::config>();
    cfg->enable_cache.set(false);
    return do_with_cql_env_thread([](cql_test_env& env) {
        // Keyspace and table used by this test.
        const char* keyspace = "keyspace_name";
        const char* table = "table_name";
        env.execute_cql(format("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}};", keyspace)).get();
        env.execute_cql(format("CREATE TABLE {}.{} (pk int, ck int, id int, PRIMARY KEY(pk, ck));", keyspace, table)).get();

        replica::database& database = env.local_db();
        replica::table& tbl = database.find_column_family(keyspace, table);
        tests::reader_concurrency_semaphore_wrapper semaphore;
        schema_ptr schema = tbl.schema();

        // Partition key 1, clustering key 2.
        dht::decorated_key dkey = dht::decorate_key(*schema, partition_key::from_single_value(*schema, serialized(1)));
        clustering_key ckey = clustering_key::from_single_value(*schema, serialized(2));

        // First mutation: set column "id" to 3 for the row.
        mutation write(schema, dkey);
        write.set_clustered_cell(ckey, to_bytes("id"), data_value(3), api::new_timestamp());

        // Second mutation: delete the row, using a newer timestamp.
        mutation deletion(schema, dkey);
        deletion.partition().apply_delete(*schema, clustering_key_prefix::from_singular(*schema, 2), tombstone{api::new_timestamp(), gc_clock::now()});

        tbl.apply(write);
        tbl.apply(deletion);

        // Flush so that the modifications reach disk.
        tbl.flush().get();

        // Read the table back as a mutation_source: only the deletion is expected.
        mutation_source source = tbl.as_mutation_source();
        auto reader = source.make_mutation_reader(schema, semaphore.make_permit());
        assert_that(std::move(reader))
            .produces(deletion)
            .produces_end_of_stream();
    }, cfg);
}
// Builds a Boost.Test precondition decorator whose predicate reports whether
// this binary was compiled with SCYLLA_ENABLE_ERROR_INJECTION defined.
static auto check_has_error_injection() {
#ifdef SCYLLA_ENABLE_ERROR_INJECTION
    static constexpr bool injection_compiled_in = true;
#else
    static constexpr bool injection_compiled_in = false;
#endif
    // The predicate ignores its argument; the answer is fixed at compile time.
    return boost::unit_test::precondition([](auto) {
        return injection_compiled_in;
    });
}
// Verifies that setting a memtable flush period on a table's schema causes the
// memtable to be flushed to an sstable without an explicit flush() call.
// Guarded twice: by the check_has_error_injection() precondition and by the
// #ifdef below, because it waits on the "table_seal_post_flush_waiters"
// injection point.
SEASTAR_TEST_CASE(memtable_flush_period, *check_has_error_injection()) {
#ifdef SCYLLA_ENABLE_ERROR_INJECTION
auto db_config = make_shared<db::config>();
db_config->enable_cache.set(false);
return do_with_cql_env_thread([](cql_test_env& env) {
// Create table and insert some data
char const* ks_name = "keyspace_name";
char const* table_name = "table_name";
env.execute_cql(format("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}};", ks_name)).get();
env.execute_cql(format("CREATE TABLE {}.{} (pk int, ck int, id int, PRIMARY KEY(pk, ck));", ks_name, table_name)).get();
replica::database& db = env.local_db();
replica::table& t = db.find_column_family(ks_name, table_name);
tests::reader_concurrency_semaphore_wrapper semaphore;
auto s1 = t.schema();
// Single row: partition key 1, clustering key 2, id = 3.
dht::decorated_key pk = dht::decorate_key(*s1, partition_key::from_single_value(*s1, serialized(1)));
clustering_key ck = clustering_key::from_single_value(*s1, serialized(2));
mutation m = mutation(s1, pk);
m.set_clustered_cell(ck, to_bytes("id"), data_value(3), api::new_timestamp());
t.apply(m);
BOOST_REQUIRE_EQUAL(t.sstables_count(), 0); // add mutation and check there are no sstables for this table
// Arm the injection point before changing the schema, so the wait below
// cannot miss the signal. The second argument is presumably the one-shot
// flag, as annotated on the enable() calls in failed_flush_prevents_writes.
auto& errj = utils::get_local_injector();
errj.enable("table_seal_post_flush_waiters", true);
// Change the schema to set the memtable flush period.
// A small value (200) is used here; it is impossible to set a period below
// 60000ms through ALTER TABLE, so the schema is rebuilt and set directly.
schema_builder b(t.schema());
b.set_memtable_flush_period(200);
schema_ptr s2 = b.build();
t.set_schema(s2);
BOOST_TEST_MESSAGE("Wait for flush");
// Block (for at most 2 minutes) until a message arrives on the injection point.
errj.inject("table_seal_post_flush_waiters", utils::wait_for_message(std::chrono::minutes(2))).get();
BOOST_TEST_MESSAGE("Flush received");
BOOST_REQUIRE(eventually_true([&] { // wait until memtable will be flushed at least once
return t.sstables_count() == 1;
}));
// Check that the mutation is present in the table
mutation_source ms = t.as_mutation_source();
assert_that(ms.make_mutation_reader(s2, semaphore.make_permit()))
.produces(m)
.produces_end_of_stream();
}, db_config);
#else
BOOST_TEST_MESSAGE("Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev)");
return make_ready_future<>();
#endif
}
// Verifies that compacting sstables does not purge a row tombstone while the
// memtable still holds older data for that row. If the tombstone were purged,
// the late-arriving old write would become visible again.
SEASTAR_TEST_CASE(sstable_compaction_does_not_resurrect_data) {
    auto cfg = make_shared<db::config>();
    cfg->enable_cache.set(false);
    return do_with_cql_env_thread([](cql_test_env& env) {
        replica::database& database = env.local_db();
        service::migration_manager& migrations = env.migration_manager().local();

        // Table with gc_grace_seconds = 1, so the tombstone becomes purgeable quickly.
        sstring ks_name = "ks";
        sstring table_name = "table_name";
        schema_ptr schema = schema_builder(ks_name, table_name)
                .with_column(to_bytes("pk"), int32_type, column_kind::partition_key)
                .with_column(to_bytes("ck"), int32_type, column_kind::clustering_key)
                .with_column(to_bytes("id"), int32_type)
                .set_gc_grace_seconds(1)
                .build();

        // Announce the new table through the migration manager.
        auto guard = migrations.start_group0_operation().get();
        auto write_ts = guard.write_timestamp();
        auto announcement = service::prepare_new_column_family_announcement(migrations.get_storage_proxy(), schema, write_ts).get();
        migrations.announce(std::move(announcement), std::move(guard), "").get();

        replica::table& tbl = database.find_column_family(ks_name, table_name);

        dht::decorated_key dkey = dht::decorate_key(*schema, partition_key::from_single_value(*schema, serialized(1)));
        clustering_key deleted_ck = clustering_key::from_single_value(*schema, serialized(2));
        clustering_key surviving_ck = clustering_key::from_single_value(*schema, serialized(3));

        // Three timestamps taken one second apart: before, at and after the deletion.
        api::timestamp_type ts_before_delete = api::new_timestamp();
        forward_jump_clocks(1s);
        api::timestamp_type ts_delete = api::new_timestamp();
        forward_jump_clocks(1s);
        api::timestamp_type ts_after_delete = api::new_timestamp();

        // Row tombstone for clustering key 2.
        mutation row_delete(schema, dkey);
        row_delete.partition().apply_delete(*schema, deleted_ck, tombstone{ts_delete, gc_clock::now()});
        tbl.apply(row_delete);

        // A row the tombstone does not cover, so that compaction cannot skip the whole partition.
        mutation live_insert(schema, dkey);
        live_insert.set_clustered_cell(surviving_ck, to_bytes("id"), data_value(3), ts_after_delete);
        tbl.apply(live_insert);

        // Flush, then let gc_grace_seconds elapse.
        tbl.flush().get();
        forward_jump_clocks(2s);

        // Simulate repair: an old write for the deleted row lands in the memtable.
        // The tombstone must keep shadowing it.
        mutation stale_insert(schema, dkey);
        stale_insert.set_clustered_cell(deleted_ck, to_bytes("id"), data_value(4), ts_before_delete);
        tbl.apply(stale_insert);

        // Compaction is expected to notice the relevant row in the memtable
        // and therefore keep the tombstone.
        tbl.compact_all_sstables(tasks::task_info{}).get();

        // An extra row (1, 2, 4) would mean the tombstone was purged and the data resurrected.
        auto result = env.execute_cql(format("SELECT * FROM {}.{};", ks_name, table_name)).get();
        assert_that(std::move(result))
            .is_rows()
            .with_rows_ignore_order({
                {serialized(1), serialized(3), serialized(3)},
            });
    }, cfg);
}
// Injects failures into several steps of sealing the active memtable and
// checks that (a) the failures are counted, (b) the memtable data is still
// present while the flush keeps failing, and (c) the flush eventually
// succeeds once the persistent injection is disabled.
// Compiled to a no-op skip when error injection is not built in.
SEASTAR_TEST_CASE(failed_flush_prevents_writes) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
std::cerr << "Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev).\n";
return make_ready_future<>();
#else
auto db_config = make_shared<db::config>();
db_config->unspooled_dirty_soft_limit.set(1.0);
return do_with_cql_env_thread([](cql_test_env& env) {
replica::database& db = env.local_db();
service::migration_manager& mm = env.migration_manager().local();
simple_schema ss;
schema_ptr s = ss.schema();
// Announce the simple_schema table; it is looked up below as ks.cf.
auto group0_guard = mm.start_group0_operation().get();
auto ts = group0_guard.write_timestamp();
mm.announce(service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s, ts).get(), std::move(group0_guard), "").get();
replica::table& t = db.find_column_family("ks", "cf");
auto memtables = active_memtables(t);
// Insert something so that we have data in memtable to flush
// it has to be somewhat large, as automatic flushing picks the
// largest memtable to flush
mutation mt = {s, tests::generate_partition_key(s)};
for (uint32_t i = 0; i < 1000; ++i) {
ss.add_row(mt, ss.make_ckey(i), format("{}", i));
}
t.apply(mt);
// The memtable now holds data: its min timestamps are below the "empty" sentinel.
BOOST_REQUIRE_LT(t.min_memtable_timestamp(), api::max_timestamp);
BOOST_REQUIRE_LT(t.min_memtable_live_timestamp(), api::max_timestamp);
// Remember the failure counter so that only failures caused by this test are counted.
auto failed_memtables_flushes_count = db.cf_stats()->failed_memtables_flushes_count;
// Three one-shot injections plus one that stays enabled until disabled explicitly.
utils::get_local_injector().enable("table_seal_active_memtable_add_memtable", true /* oneshot */);
utils::get_local_injector().enable("table_seal_active_memtable_start_op", true /* oneshot */);
utils::get_local_injector().enable("table_seal_active_memtable_try_flush", true /* oneshot */);
utils::get_local_injector().enable("table_seal_active_memtable_reacquire_write_permit");
// Trigger flush
auto f = t.flush();
// Wait until at least four failures (one per injection enabled above) were recorded.
BOOST_REQUIRE(eventually_true([&] {
return db.cf_stats()->failed_memtables_flushes_count - failed_memtables_flushes_count >= 4;
}));
// The flush failed, make sure there is still data in memtable.
BOOST_REQUIRE_LT(t.min_memtable_timestamp(), api::max_timestamp);
BOOST_REQUIRE_LT(t.min_memtable_live_timestamp(), api::max_timestamp);
utils::get_local_injector().disable("table_seal_active_memtable_reacquire_write_permit");
BOOST_REQUIRE(eventually_true([&] {
// The error above is no longer being injected, so
// seal_active_memtable retry loop should eventually succeed
return t.min_memtable_timestamp() == api::max_timestamp
&& t.min_memtable_live_timestamp() == api::max_timestamp
&& t.min_memtable_live_row_marker_timestamp() == api::max_timestamp;
}));
// Collect the result of the flush started above.
std::move(f).get();
}, db_config);
#endif
}
// Runs two concurrent "flusher" tasks, one per shard, each creating its own
// table and repeatedly applying a random mutation and flushing. After every
// flush the sstable count must stay below 4 * max_compaction_threshold.
// Skipped entirely in DEBUG builds (see the comment below).
SEASTAR_TEST_CASE(flushing_rate_is_reduced_if_compaction_doesnt_keep_up) {
#ifdef DEBUG
// This test was observed to take multiple minutes to run in debug mode on CI machines.
// This test checks that a certain behaviour is triggered when compaction falls behind.
// Not critical to run in debug mode. Both compaction and memtable have their own
// correctness tests, which do run in debug mode.
return make_ready_future<>();
#else
// NOTE(review): BOOST_ASSERT is by default equivalent to assert(), which is
// compiled out when NDEBUG is defined - confirm that the checks in this test
// are active in the build modes that run it.
BOOST_ASSERT(smp::count == 2);
// The test simulates a situation where 2 threads issue flushes to 2
// tables. Both issue small flushes, but one has injected reactor stalls.
// This can lead to a situation where lots of small sstables accumulate on
// disk, and, if compaction never has a chance to keep up, resources can be
// exhausted.
return do_with_cql_env([](cql_test_env& env) -> future<> {
// Callable submitted to a shard: creates table cf_<shard>, performs
// num_flushes apply+flush rounds (each preceded by a blocking sleep of
// sleep_ms milliseconds), then drops the table.
struct flusher {
cql_test_env& env;
const int num_flushes;
const int sleep_ms;
// Table name derived from the shard (thread) id.
static sstring cf_name(unsigned thread_id) {
return format("cf_{}", thread_id);
}
static sstring ks_name() {
return "ks";
}
// Announces the new table via the migration manager on shard 0.
future<> create_table(schema_ptr s) {
return env.migration_manager().invoke_on(0, [s = global_schema_ptr(std::move(s))] (service::migration_manager& mm) -> future<> {
auto group0_guard = co_await mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
auto announcement = co_await service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s, ts);
co_await mm.announce(std::move(announcement), std::move(group0_guard), "");
});
}
// Drops the calling shard's table; the shard id is captured before hopping to shard 0.
future<> drop_table() {
return env.migration_manager().invoke_on(0, [shard = this_shard_id()] (service::migration_manager& mm) -> future<> {
auto group0_guard = co_await mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
auto announcement = co_await service::prepare_column_family_drop_announcement(mm.get_storage_proxy(), ks_name(), cf_name(shard), ts);
co_await mm.announce(std::move(announcement), std::move(group0_guard), "");
});
}
future<> operator()() {
const sstring ks_name = this->ks_name();
const sstring cf_name = this->cf_name(this_shard_id());
random_mutation_generator gen{
random_mutation_generator::generate_counters::no,
local_shard_only::yes,
random_mutation_generator::generate_uncompactable::no,
std::nullopt,
ks_name.c_str(),
cf_name.c_str()
};
schema_ptr s = gen.schema();
co_await create_table(s);
replica::database& db = env.local_db();
replica::table& t = db.find_column_family(ks_name, cf_name);
for ([[maybe_unused]] int value : std::views::iota(0, num_flushes)) {
// Blocking sleep (not a seastar sleep): this is the injected reactor stall.
::usleep(sleep_ms * 1000);
co_await db.apply(t.schema(), freeze(gen()), tracing::trace_state_ptr(), db::commitlog::force_sync::yes, db::no_timeout);
co_await t.flush();
// The property under test: sstables must not pile up without bound.
BOOST_ASSERT(t.sstables_count() < size_t(t.schema()->max_compaction_threshold() * 4));
}
co_await drop_table();
}
};
// Eight rounds; the stall injected on shard 1 doubles each round (2ms, 4ms, ...).
int sleep_ms = 2;
for ([[maybe_unused]] int i : std::views::iota(0, 8)) {
// Shard 0: many flushes without stalls. Shard 1: a few flushes with stalls.
future<> f0 = smp::submit_to(0, flusher{.env=env, .num_flushes=100, .sleep_ms=0});
future<> f1 = smp::submit_to(1, flusher{.env=env, .num_flushes=3, .sleep_ms=sleep_ms});
co_await std::move(f0);
co_await std::move(f1);
sleep_ms *= 2;
}
});
#endif
}
// Shared driver for the "exceptions in flush" tests.
//
// Registers `mep` as the sstable file I/O extension named "test", creates
// table ks.t0, then keeps inserting rows and queueing flushes of all memtables
// until the extension reports a failure through `did_fail`.
//
//   mep            - file I/O extension that performs the failure injection.
//   should_fail    - flag read by the extension; set to true here to arm it
//                    and reset to false after the first failure was seen.
//   did_fail       - flag set by the extension once it has thrown.
//   schema_filter  - set here to the schema of ks.t0; the callers' extensions
//                    use it to restrict failures to that table.
//   expect_isolate - when true, additionally requires that the storage service
//                    on shard 0 reports is_isolated() within 10 polls spaced
//                    2 seconds apart.
static future<> exceptions_in_flush_helper(std::unique_ptr<sstables::file_io_extension> mep, bool& should_fail, const bool& did_fail, const schema*& schema_filter, bool expect_isolate) {
auto ext = std::make_shared<db::extensions>();
auto cfg = seastar::make_shared<db::config>(ext);
ext->add_sstable_file_io_extension("test", std::move(mep));
co_await do_with_cql_env([&](cql_test_env& env) -> future<> {
co_await env.execute_cql(fmt::format("create table t0 (pk text primary key, v text)"));
schema_filter = env.local_db().find_column_family("ks", "t0").schema().get();
should_fail = true;
int i = 0;
testlog.debug("Wait for fail");
// Chain of flushes: each loop iteration appends one more flush of all
// memtables on all shards. The chain is awaited only at the very end.
auto f = make_ready_future<>();
while (!did_fail) {
std::string pk = "apa" + std::to_string(i++);
std::string v = "ko";
co_await env.execute_cql(fmt::format("insert into ks.t0 (pk, v) values ('{}', '{}')", pk, v));
f = f.then([&] {
return env.db().invoke_on_all([] (replica::database& db) {
return db.flush_all_memtables();
});
});
}
BOOST_REQUIRE(did_fail);
testlog.debug("Reset fail trigger");
should_fail = false;
if (expect_isolate) {
bool isolated = false;
// can't use eventually_true here, because neither we nor the invoke on shard 0 is in seastar
// thread.
for (int i = 0; i < 10; ++i) {
isolated = co_await env.get_storage_service().invoke_on(0, [&](service::storage_service& ss) {
return ss.is_isolated();
});
if (isolated) {
break;
}
// isolation is not synchronous;
co_await sleep(2s);
}
BOOST_REQUIRE(isolated);
}
testlog.debug("Trying to stop");
// Wait for the queued flushes before the environment is torn down.
co_await std::move(f);
}, cfg);
}
// Runs exceptions_in_flush_helper with a file I/O extension that makes sstable
// file *operations* fail: files opened while the extension is armed are
// wrapped so that each operation calls `throw_func` first.
//
//   throw_func     - callable that throws the exception to inject.
//   expect_isolate - forwarded to exceptions_in_flush_helper.
static future<> exceptions_in_flush_on_sstable_write_helper(std::function<void()> throw_func, bool expect_isolate = true) {
class myext : public sstables::file_io_extension {
public:
// Armed/disarmed by exceptions_in_flush_helper through a reference.
bool should_fail = false;
// Set once an exception has been injected.
bool did_fail = false;
// When non-null, only sstables whose schema id matches are affected.
const schema* schema_filter = nullptr;
std::function<void()> throw_func;
// True when no filter is set or when `s` has the same id as the filter schema.
bool match_schema_filter(const schema& s) const {
const auto ret = !schema_filter || schema_filter->id() == s.id();
testlog.info("exceptions_in_flush_on_sstable_write_helper()::match_schema_filter({}.{}#{}) -> {}", s.ks_name(), s.cf_name(), s.id(), ret);
return ret;
}
// Wraps `f` in a failing file implementation when armed and the sstable's
// schema matches; otherwise returns `f` unchanged.
future<file> wrap_file(const sstable& t, component_type type, file f, open_flags flags) override {
if (should_fail && match_schema_filter(*t.get_schema())) {
// file_impl that calls fail() and then forwards each operation to the wrapped file.
class myimpl : public seastar::file_impl {
file _file;
myext& _myext;
public:
myimpl(file f, myext& ext)
: _file(std::move(f))
, _myext(ext)
{}
// Throws via throw_func while the extension is still armed; a file wrapped
// earlier stops failing once should_fail has been reset.
void fail() const {
if (_myext.should_fail) {
_myext.did_fail = true;
testlog.debug("Throwing exception");
_myext.throw_func();
}
}
future<size_t> write_dma(uint64_t pos, const void* buffer, size_t len, io_intent* intent) override {
fail();
return get_file_impl(_file)->write_dma(pos, buffer, len, intent);
}
future<size_t> write_dma(uint64_t pos, std::vector<iovec> iov, io_intent* intent) override {
fail();
return get_file_impl(_file)->write_dma(pos, std::move(iov), intent);
}
future<size_t> read_dma(uint64_t pos, void* buffer, size_t len, io_intent* intent) override {
fail();
return get_file_impl(_file)->read_dma(pos, buffer, len, intent);
}
future<size_t> read_dma(uint64_t pos, std::vector<iovec> iov, io_intent* intent) override {
fail();
return get_file_impl(_file)->read_dma(pos, std::move(iov), intent);
}
future<> flush(void) override {
fail();
return get_file_impl(_file)->flush();
}
future<struct stat> stat(void) override {
fail();
return get_file_impl(_file)->stat();
}
future<> truncate(uint64_t length) override {
fail();
return get_file_impl(_file)->truncate(length);
}
future<> discard(uint64_t offset, uint64_t length) override {
fail();
return get_file_impl(_file)->discard(offset, length);
}
future<> allocate(uint64_t position, uint64_t length) override {
fail();
return get_file_impl(_file)->allocate(position, length);
}
future<uint64_t> size(void) override {
fail();
return get_file_impl(_file)->size();
}
future<> close() override {
fail();
return get_file_impl(_file)->close();
}
std::unique_ptr<seastar::file_handle_impl> dup() override {
fail();
return get_file_impl(_file)->dup();
}
subscription<directory_entry> list_directory(std::function<future<> (directory_entry de)> next) override {
fail();
return get_file_impl(_file)->list_directory(std::move(next));
}
future<temporary_buffer<uint8_t>> dma_read_bulk(uint64_t offset, size_t range_size, io_intent* intent) override {
fail();
return get_file_impl(_file)->dma_read_bulk(offset, range_size, intent);
}
};
co_return file(make_shared<myimpl>(std::move(f), *this));
}
co_return f;
}
};
auto mep = std::make_unique<myext>();
// Keep a reference to the extension: ownership moves into the helper, but the
// helper communicates with the extension through references to its flags.
auto& me = *mep;
me.throw_func = std::move(throw_func);
co_await exceptions_in_flush_helper(std::move(mep), me.should_fail, me.did_fail, me.schema_filter, expect_isolate);
}
// Write-path flush failure scenario with a std::system_error(EACCES) injected;
// the helper's default expect_isolate (true) applies.
SEASTAR_TEST_CASE(test_exceptions_in_flush_on_sstable_write) {
    auto raise_eacces = [] {
        throw std::system_error(EACCES, std::system_category());
    };
    co_await exceptions_in_flush_on_sstable_write_helper(std::move(raise_eacces));
}
// Write-path flush failure scenario with db::extension_storage_permission_error
// injected; the helper's default expect_isolate (true) applies.
SEASTAR_TEST_CASE(test_ext_permission_exceptions_in_flush_on_sstable_write) {
    auto raise_permission_error = [] {
        throw db::extension_storage_permission_error(get_name());
    };
    co_await exceptions_in_flush_on_sstable_write_helper(std::move(raise_permission_error));
}
// Write-path flush failure scenario with
// db::extension_storage_resource_unavailable injected. This is the one
// write-path variant that passes expect_isolate = false.
SEASTAR_TEST_CASE(test_ext_resource_exceptions_in_flush_on_sstable_write) {
    auto raise_resource_unavailable = [] {
        throw db::extension_storage_resource_unavailable(get_name());
    };
    // NOTE(review): the original remark here was "equal no ENOENT" - presumably
    // this error is treated like ENOENT and does not isolate the node; confirm.
    const bool expect_isolate = false;
    co_await exceptions_in_flush_on_sstable_write_helper(std::move(raise_resource_unavailable), expect_isolate);
}
// Write-path flush failure scenario with db::extension_storage_misconfigured
// injected; the helper's default expect_isolate (true) applies.
SEASTAR_TEST_CASE(test_ext_config_exceptions_in_flush_on_sstable_write) {
    auto raise_misconfigured = [] {
        throw db::extension_storage_misconfigured(get_name());
    };
    co_await exceptions_in_flush_on_sstable_write_helper(std::move(raise_misconfigured));
}
// Runs exceptions_in_flush_helper with a file I/O extension that fails while
// an sstable file is being *opened*: wrap_file() itself calls `throw_func`
// instead of returning a wrapped file.
//
//   throw_func     - callable that throws the exception to inject.
//   expect_isolate - forwarded to exceptions_in_flush_helper.
//
// The db::extensions and db::config objects are created by
// exceptions_in_flush_helper, so none are constructed here.
static future<> exceptions_in_flush_on_sstable_open_helper(std::function<void()> throw_func, bool expect_isolate = true) {
    class myext : public sstables::file_io_extension {
    public:
        // Armed/disarmed by exceptions_in_flush_helper through a reference.
        bool should_fail = false;
        // Set once an exception has been injected.
        bool did_fail = false;
        // When non-null, only sstables whose schema id matches are affected.
        const schema* schema_filter = nullptr;
        std::function<void()> throw_func;

        // True when no filter is set or when `s` has the same id as the filter schema.
        bool match_schema_filter(const schema& s) const {
            const auto ret = !schema_filter || schema_filter->id() == s.id();
            testlog.info("exceptions_in_flush_on_sstable_open_helper()::match_schema_filter({}.{}#{}) -> {}", s.ks_name(), s.cf_name(), s.id(), ret);
            return ret;
        }

        // Throws via throw_func when armed and the sstable's schema matches;
        // otherwise hands the file back untouched.
        future<file> wrap_file(const sstable& t, component_type type, file f, open_flags flags) override {
            if (should_fail && match_schema_filter(*t.get_schema())) {
                did_fail = true;
                testlog.debug("Throwing exception");
                throw_func();
            }
            co_return f;
        }
    };

    auto mep = std::make_unique<myext>();
    // Keep a reference to the extension: ownership moves into the helper, but the
    // helper communicates with the extension through references to its flags.
    auto& me = *mep;
    me.throw_func = std::move(throw_func);
    co_await exceptions_in_flush_helper(std::move(mep), me.should_fail, me.did_fail, me.schema_filter, expect_isolate);
}
// Open-path flush failure scenario with a std::system_error(EACCES) injected;
// the helper's default expect_isolate (true) applies.
SEASTAR_TEST_CASE(test_exceptions_in_flush_on_sstable_open) {
    auto raise_eacces = [] {
        throw std::system_error(EACCES, std::system_category());
    };
    co_await exceptions_in_flush_on_sstable_open_helper(std::move(raise_eacces));
}
// Open-path flush failure scenario with db::extension_storage_permission_error
// injected; the helper's default expect_isolate (true) applies.
SEASTAR_TEST_CASE(test_ext_permission_exceptions_in_flush_on_sstable_open) {
    auto raise_permission_error = [] {
        throw db::extension_storage_permission_error(get_name());
    };
    co_await exceptions_in_flush_on_sstable_open_helper(std::move(raise_permission_error));
}
// Open-path flush failure scenario with
// db::extension_storage_resource_unavailable injected. This is the one
// open-path variant that passes expect_isolate = false.
SEASTAR_TEST_CASE(test_ext_resource_exceptions_in_flush_on_sstable_open) {
    auto raise_resource_unavailable = [] {
        throw db::extension_storage_resource_unavailable(get_name());
    };
    // NOTE(review): the original remark here was "equal no ENOENT" - presumably
    // this error is treated like ENOENT and does not isolate the node; confirm.
    const bool expect_isolate = false;
    co_await exceptions_in_flush_on_sstable_open_helper(std::move(raise_resource_unavailable), expect_isolate);
}
// Open-path flush failure scenario with db::extension_storage_misconfigured
// injected; the helper's default expect_isolate (true) applies.
SEASTAR_TEST_CASE(test_ext_config_exceptions_in_flush_on_sstable_open) {
    auto raise_misconfigured = [] {
        throw db::extension_storage_misconfigured(get_name());
    };
    co_await exceptions_in_flush_on_sstable_open_helper(std::move(raise_misconfigured));
}
// Verifies that a memtable reader created before a tablet is migrated to
// another shard can still be read to completion afterwards and yields the
// full expected mutation.
SEASTAR_TEST_CASE(memtable_reader_after_tablet_migration) {
cql_test_config cfg;
cfg.initial_tablets = 1;
cfg.ms_listen = true;
return do_with_cql_env_thread([](cql_test_env& env) {
replica::database& db = env.local_db();
auto& ss = env.get_storage_service().local();
// This test needs specific tablet layout, disable the load balancer to
// have manual control over it.
ss.set_tablet_balancing_enabled(false).get();
// Create table and insert some data
char const* table_name = "tbl";
env.execute_cql(format("CREATE TABLE ks.{} (pk int, ck int, v text, PRIMARY KEY(pk, ck));", table_name)).get();
auto& tbl = db.find_column_family("ks", table_name);
const auto schema = tbl.schema();
BOOST_REQUIRE(tbl.uses_tablets());
const auto& tablet_map = db.get_token_metadata().tablets().get_tablet_map(schema->id());
BOOST_REQUIRE_EQUAL(tablet_map.tablet_count(), 1);
// One partition (pk = 0) whose rows each carry a 1024-byte value.
const int32_t pk = 0;
mutation expected_mut(schema, dht::decorate_key(*schema, partition_key::from_single_value(*schema, int32_type->decompose(pk))));
const api::timestamp_type ts = 100;
const auto raw_v = utf8_type->decompose(sstring(1024, 'v'));
const auto& v_def = *schema->get_column_definition(to_bytes("v"));
// Add enough data to fill at least two buffers.
for (int32_t ck = 0; size_t(ck) < (2 * mutation_reader::default_max_buffer_size_in_bytes()) / 1024; ++ck) {
const auto ckey = clustering_key::from_single_value(*schema, int32_type->decompose(ck));
expected_mut.set_clustered_cell(ckey, v_def, atomic_cell::make_live(*v_def.type, ts, raw_v));
}
const auto first_tablet_id = tablet_map.first_tablet();
const auto first_tablet_info = tablet_map.get_tablet_info(first_tablet_id);
// State created on the tablet's shard and carried across the migration:
// the shard-local schema, the expected mutation and the open reader.
struct remote_data {
schema_ptr schema;
mutation expected_mut;
mutation_reader reader;
};
// On the shard of the tablet's first replica: apply the mutation, open a
// reader and fill its buffer once, then hand everything back as a foreign_ptr.
auto data_ptr = env.db().invoke_on(first_tablet_info.replicas.front().shard, [&table_name, fm = freeze(expected_mut)] (replica::database& db)
-> future<foreign_ptr<std::unique_ptr<remote_data>>> {
auto& tbl = db.find_column_family("ks", table_name);
const auto schema = tbl.schema();
co_await db.apply(schema, fm, {}, db::commitlog_force_sync::no, db::no_timeout);
testlog.info("create reader -- first buffer fill");
auto reader = tbl.make_mutation_reader(schema, co_await db.obtain_reader_permit(tbl, "read", db::no_timeout, {}), query::full_partition_range, schema->full_slice());
// The exception is stashed so that the reader can be closed (which needs
// co_await) outside of the catch block before rethrowing.
std::exception_ptr ex;
try {
co_await reader.fill_buffer();
co_return make_foreign(std::make_unique<remote_data>(remote_data{schema, fm.unfreeze(schema), std::move(reader)}));
} catch (...) {
ex = std::current_exception();
}
// If we are here, there was an exception, but check to be sure.
SCYLLA_ASSERT(ex);
co_await reader.close();
std::rethrow_exception(std::move(ex));
}).get();
// Migrate the tablet to another shard
{
const auto src = first_tablet_info.replicas.front();
auto dst = src;
// Same replica, next shard (wrapping around at smp::count).
dst.shard = (src.shard + 1) % smp::count;
// Closing the storage-group is done in the background, so it is fine
// to wait for this.
ss.move_tablet(schema->id(), tablet_map.get_last_token(first_tablet_id), src, dst).get();
}
// Back on the shard that owns the reader: drain it in a seastar thread and
// compare the result with the expected mutation.
smp::submit_to(data_ptr.get_owner_shard(), [&data_ptr] {
return async([&data_ptr] {
testlog.info("exhaust reader");
auto data = data_ptr.release();
auto close_reader = deferred_close(data->reader);
auto m_opt = read_mutation_from_mutation_reader(data->reader).get();
BOOST_REQUIRE(m_opt);
BOOST_REQUIRE(data->reader.is_end_of_stream());
assert_that(*m_opt).is_equal_to(data->expected_mut);
});
}).get();
}, cfg);
}
// Verifies that reading from a memtable reader whose permit has timed out
// throws named_semaphore_timed_out.
SEASTAR_THREAD_TEST_CASE(test_memtable_reader_abort) {
    simple_schema ss;
    const auto schema = ss.schema();

    tests::reader_concurrency_semaphore_wrapper semaphore;
    replica::table_stats stats;
    replica::memtable_table_shared_data shared_data;
    replica::dirty_memory_manager dirty_mgr;
    auto memtbl = make_lw_shared<replica::memtable>(schema, dirty_mgr, shared_data, stats);

    // Populate a single partition with ten rows.
    auto pkey = ss.make_pkey(0);
    auto range = dht::partition_range::make_singular(pkey);
    auto mut = mutation(schema, pkey);
    for (int row : std::views::iota(0, 10)) {
        ss.add_row(mut, ss.make_ckey(row), "v1");
    }
    memtbl->apply(mut);

    // Open a reader restricted to that partition.
    auto permit = semaphore.make_permit();
    auto reader_opt = memtbl->make_mutation_reader_opt(schema, permit, range, schema->full_slice());
    BOOST_REQUIRE(reader_opt);
    auto close_reader = deferred_close(*reader_opt);

    // Set the timeout to "now" and wait for the timer to fire, so that the
    // permit carries an abort exception.
    permit.set_timeout(db::timeout_clock::now());
    BOOST_REQUIRE(eventually_true([&] {
        return bool(permit.get_abort_exception());
    }));

    // The next read through the reader must surface the timeout.
    BOOST_REQUIRE_THROW((*reader_opt)().get(), named_semaphore_timed_out);
}
BOOST_AUTO_TEST_SUITE_END()