Fixes: SCYLLADB-942 Adds an injection signal _from_ table::seal_active_memtable to allow us to reliably wait for flushing. And does so. Closes scylladb/scylladb#29070
1671 lines
69 KiB
C++
1671 lines
69 KiB
C++
/*
|
|
* Copyright (C) 2015-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include <boost/test/unit_test.hpp>
|
|
#include <boost/test/framework.hpp>
|
|
#include "replica/database.hh"
|
|
#include "db/config.hh"
|
|
#include "utils/assert.hh"
|
|
#include "utils/UUID_gen.hh"
|
|
#undef SEASTAR_TESTING_MAIN
|
|
#include <seastar/testing/test_case.hh>
|
|
#include <seastar/testing/thread_test_case.hh>
|
|
#include "schema/schema_builder.hh"
|
|
#include <seastar/util/closeable.hh>
|
|
#include "service/migration_manager.hh"
|
|
|
|
#include <fmt/ranges.h>
|
|
#include <seastar/core/thread.hh>
|
|
#include "replica/memtable.hh"
|
|
#include "test/lib/cql_test_env.hh"
|
|
#include "test/lib/cql_assertions.hh"
|
|
#include "test/lib/mutation_source_test.hh"
|
|
#include "test/lib/mutation_assertions.hh"
|
|
#include "test/lib/mutation_reader_assertions.hh"
|
|
#include "test/lib/data_model.hh"
|
|
#include "test/lib/eventually.hh"
|
|
#include "test/lib/random_utils.hh"
|
|
#include "test/lib/log.hh"
|
|
#include "test/lib/simple_schema.hh"
|
|
#include "test/lib/reader_concurrency_semaphore.hh"
|
|
#include "test/lib/simple_schema.hh"
|
|
#include "test/lib/key_utils.hh"
|
|
#include "test/lib/sstable_utils.hh"
|
|
#include "utils/error_injection.hh"
|
|
#include "db/commitlog/commitlog.hh"
|
|
#include "test/lib/make_random_string.hh"
|
|
#include "db/extensions.hh"
|
|
#include "db/config.hh"
|
|
#include "service/storage_service.hh"
|
|
|
|
BOOST_AUTO_TEST_SUITE(memtable_test)
|
|
|
|
using namespace std::literals::chrono_literals;
|
|
|
|
// Hands out a strictly increasing, shard-local write timestamp so that
// successive test mutations never collide on timestamp.
static api::timestamp_type next_timestamp() {
    static thread_local api::timestamp_type ts_counter = 1;
    auto ts = ts_counter;
    ++ts_counter;
    return ts;
}
|
|
|
|
// Produces a value that is unique across the test run by stringifying a
// freshly generated time-UUID.
static bytes make_unique_bytes() {
    return to_bytes(fmt::to_string(utils::UUID_gen::get_time_UUID()));
}
|
|
|
|
// Writes a unique value into the named regular column of `m` (empty
// clustering key) using a fresh timestamp. The column must exist in the
// mutation's schema and be of type `bytes`.
static void set_column(mutation& m, const sstring& column_name) {
    SCYLLA_ASSERT(m.schema()->get_column_definition(to_bytes(column_name))->type == bytes_type);
    auto value = data_value(make_unique_bytes());
    m.set_clustered_cell(clustering_key::make_empty(), to_bytes(column_name), value, next_timestamp());
}
|
|
|
|
// Returns an empty mutation keyed by a unique partition key, so each call
// lands in a distinct partition.
static
mutation make_unique_mutation(schema_ptr s) {
    return mutation(s, partition_key::from_single_value(*s, make_unique_bytes()));
}
|
|
|
|
// Returns a vector of empty mutations in ring order
|
|
utils::chunked_vector<mutation> make_ring(schema_ptr s, int n_mutations) {
|
|
utils::chunked_vector<mutation> ring;
|
|
for (int i = 0; i < n_mutations; ++i) {
|
|
ring.push_back(make_unique_mutation(s));
|
|
}
|
|
std::sort(ring.begin(), ring.end(), mutation_decorated_key_less_comparator());
|
|
return ring;
|
|
}
|
|
|
|
// Runs the generic mutation-source conformance suite against a plain
// memtable, with a full LSA compaction thrown in after population to
// exercise reads over relocated memory.
SEASTAR_TEST_CASE(test_memtable_conforms_to_mutation_source) {
    return seastar::async([] {
        run_mutation_source_tests([](schema_ptr s, const utils::chunked_vector<mutation>& partitions) {
            auto mt = make_memtable(s, partitions);
            logalloc::shard_tracker().full_compaction();
            return mt->as_data_source();
        });
    });
}
|
|
|
|
// Shared driver for the many-versions conformance tests below. It feeds
// `run_tests` a populate function that applies each mutation and then opens
// (and keeps open) a partially-consumed reader per mutation, forcing every
// mutation into a separate partition snapshot version. Meanwhile a
// background fiber keeps triggering full LSA compactions so that reads race
// with memory relocation and snapshot version changes.
static future<> test_memtable(void (*run_tests)(populate_fn_ex, bool)) {
    return seastar::async([run_tests] {
        tests::reader_concurrency_semaphore_wrapper semaphore;
        lw_shared_ptr<replica::memtable> mt;
        std::vector<mutation_reader> readers;
        // Close and drop all outstanding readers; must be called before the
        // memtable they pin is replaced.
        auto clear_readers = [&readers] {
            parallel_for_each(readers, [] (mutation_reader& rd) {
                return rd.close();
            }).finally([&readers] {
                readers.clear();
            }).get();
        };
        auto cleanup_readers = defer([&] { clear_readers(); });
        // Keeps the singular ranges alive for the lifetime of the readers
        // that reference them (deque: stable element addresses).
        std::deque<dht::partition_range> ranges_storage;
        lw_shared_ptr<bool> finished = make_lw_shared(false);
        auto full_compaction_in_background = seastar::do_until([finished] {return *finished;}, [] {
            // do_refresh_state is called when we detect a new partition snapshot version.
            // If snapshot version changes in process of reading mutation fragments from a
            // clustering range, the partition_snapshot_reader state is refreshed with saved
            // last position of emitted row and range tombstone. full_compaction increases the
            // change mark.
            logalloc::shard_tracker().full_compaction();
            return seastar::sleep(100us);
        });
        run_tests([&] (schema_ptr s, const utils::chunked_vector<mutation>& muts, gc_clock::time_point) {
            clear_readers();
            mt = make_lw_shared<replica::memtable>(s);

            for (auto&& m : muts) {
                mt->apply(m);
                // Create reader so that each mutation is in a separate version
                auto rd = mt->make_mutation_reader(s, semaphore.make_permit(), ranges_storage.emplace_back(dht::partition_range::make_singular(m.decorated_key())));
                rd.set_max_buffer_size(1);
                rd.fill_buffer().get();
                readers.emplace_back(std::move(rd));
            }

            return mt->as_data_source();
        }, true);
        // Stop the background compaction fiber and wait for it to finish.
        *finished = true;
        full_compaction_in_background.get();
    });
}
|
|
|
|
// plain
// Each case below runs one subset of the generic mutation-source suite
// against the many-versions memtable driver (see test_memtable()).
SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source_basic) {
    return test_memtable(run_mutation_source_tests_plain_basic);
}

SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source_plain_reader_conversion) {
    return test_memtable(run_mutation_source_tests_plain_reader_conversion);
}

SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source_plain_fragments_monotonic) {
    return test_memtable(run_mutation_source_tests_plain_fragments_monotonic);
}

SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source_plain_read_back) {
    return test_memtable(run_mutation_source_tests_plain_read_back);
}
|
|
|
|
// reverse
// Same as the "plain" group above but with the reverse-reading variants of
// the mutation-source suite.
SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source_reverse_basic) {
    return test_memtable(run_mutation_source_tests_reverse_basic);
}

SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source_reverse_reader_conversion) {
    return test_memtable(run_mutation_source_tests_reverse_reader_conversion);
}

SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source_reverse_fragments_monotonic) {
    return test_memtable(run_mutation_source_tests_reverse_fragments_monotonic);
}

SEASTAR_TEST_CASE(test_memtable_with_many_versions_conforms_to_mutation_source_reverse_read_back) {
    return test_memtable(run_mutation_source_tests_reverse_read_back);
}
|
|
|
|
SEASTAR_TEST_CASE(test_memtable_flush_reader) {
    // Memtable flush reader is severely limited, it always assumes that
    // the full partition range is being read and that
    // streamed_mutation::forwarding is set to no. Therefore, we cannot use
    // run_mutation_source_tests() to test it.
    return seastar::async([] {
        tests::reader_concurrency_semaphore_wrapper semaphore;

        // Builds a memtable (tied to the given dirty-memory manager and
        // stats) containing all of `muts`.
        auto make_memtable = [] (replica::dirty_memory_manager& mgr, replica::memtable_table_shared_data& table_shared_data, replica::table_stats& tbl_stats, utils::chunked_vector<mutation> muts) {
            SCYLLA_ASSERT(!muts.empty());
            auto mt = make_lw_shared<replica::memtable>(muts.front().schema(), mgr, table_shared_data, tbl_stats);
            for (auto& m : muts) {
                mt->apply(m);
            }
            return mt;
        };

        auto test_random_streams = [&] (random_mutation_generator&& gen) {
            for (auto i = 0; i < 4; i++) {
                replica::table_stats tbl_stats;
                replica::memtable_table_shared_data table_shared_data;
                replica::dirty_memory_manager mgr;
                const auto muts = gen(4);
                const auto now = gc_clock::now();
                // Expected output: the same mutations after compaction,
                // since the flush reader emits compacted partitions.
                auto compacted_muts = muts;
                for (auto& mut : compacted_muts) {
                    mut.partition().compact_for_compaction(*mut.schema(), always_gc, mut.decorated_key(), now, tombstone_gc_state::for_tests());
                }

                testlog.info("Simple read");
                auto mt = make_memtable(mgr, table_shared_data, tbl_stats, muts);

                assert_that(mt->make_flush_reader(gen.schema(), semaphore.make_permit()))
                    .produces_compacted(compacted_muts[0], now)
                    .produces_compacted(compacted_muts[1], now)
                    .produces_compacted(compacted_muts[2], now)
                    .produces_compacted(compacted_muts[3], now)
                    .produces_end_of_stream();

                testlog.info("Read with next_partition() calls between partition");
                mt = make_memtable(mgr, table_shared_data, tbl_stats, muts);
                assert_that(mt->make_flush_reader(gen.schema(), semaphore.make_permit()))
                    .next_partition()
                    .produces_compacted(compacted_muts[0], now)
                    .next_partition()
                    .produces_compacted(compacted_muts[1], now)
                    .next_partition()
                    .produces_compacted(compacted_muts[2], now)
                    .next_partition()
                    .produces_compacted(compacted_muts[3], now)
                    .next_partition()
                    .produces_end_of_stream();

                testlog.info("Read with next_partition() calls inside partitions");
                // next_partition() mid-partition must skip the rest of that
                // partition and resume at the next one.
                mt = make_memtable(mgr, table_shared_data, tbl_stats, muts);
                assert_that(mt->make_flush_reader(gen.schema(), semaphore.make_permit()))
                    .produces_compacted(compacted_muts[0], now)
                    .produces_partition_start(muts[1].decorated_key(), muts[1].partition().partition_tombstone())
                    .next_partition()
                    .produces_compacted(compacted_muts[2], now)
                    .next_partition()
                    .produces_partition_start(muts[3].decorated_key(), muts[3].partition().partition_tombstone())
                    .next_partition()
                    .produces_end_of_stream();
            }
        };

        test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no));
        test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::yes));
    });
}
|
|
|
|
// Verifies that switching the memtable's schema (adding column v1) in the
// middle of two concurrent reads — one using the old schema, one the new —
// does not change what either reader produces, and that fresh readers under
// both schemas still see the original data.
SEASTAR_TEST_CASE(test_adding_a_column_during_reading_doesnt_affect_read_result) {
    return seastar::async([] {
        auto common_builder = schema_builder("ks", "cf")
            .with_column("pk", bytes_type, column_kind::partition_key);

        auto s1 = common_builder
            .with_column("v2", bytes_type, column_kind::regular_column)
            .build();

        auto s2 = common_builder
            .with_column("v1", bytes_type, column_kind::regular_column) // new column
            .with_column("v2", bytes_type, column_kind::regular_column)
            .build();

        tests::reader_concurrency_semaphore_wrapper semaphore;

        auto mt = make_lw_shared<replica::memtable>(s1);

        utils::chunked_vector<mutation> ring = make_ring(s1, 3);

        for (auto&& m : ring) {
            set_column(m, "v2");
            mt->apply(m);
        }

        // Two readers opened before the schema change, one per schema.
        auto check_rd_s1 = assert_that(mt->make_mutation_reader(s1, semaphore.make_permit()));
        auto check_rd_s2 = assert_that(mt->make_mutation_reader(s2, semaphore.make_permit()));
        check_rd_s1.next_mutation().has_schema(s1).is_equal_to(ring[0]);
        check_rd_s2.next_mutation().has_schema(s2).is_equal_to(ring[0]);
        // Upgrade the memtable schema mid-read.
        mt->set_schema(s2);
        check_rd_s1.next_mutation().has_schema(s1).is_equal_to(ring[1]);
        check_rd_s2.next_mutation().has_schema(s2).is_equal_to(ring[1]);
        check_rd_s1.next_mutation().has_schema(s1).is_equal_to(ring[2]);
        check_rd_s2.next_mutation().has_schema(s2).is_equal_to(ring[2]);
        check_rd_s1.produces_end_of_stream();
        check_rd_s2.produces_end_of_stream();

        // Fresh readers after the schema change still see everything.
        assert_that(mt->make_mutation_reader(s1, semaphore.make_permit()))
            .produces(ring[0])
            .produces(ring[1])
            .produces(ring[2])
            .produces_end_of_stream();

        assert_that(mt->make_mutation_reader(s2, semaphore.make_permit()))
            .produces(ring[0])
            .produces(ring[1])
            .produces(ring[2])
            .produces_end_of_stream();
    });
}
|
|
|
|
// Verifies that unspooled ("virtual") dirty memory accounted by the
// dirty_memory_manager decreases monotonically as partitions are consumed by
// the flush reader, even while an old MVCC version is pinned by a concurrent
// reader and later compacted away.
//
// Fix: the original code did `mt->apply(std::move(part0_update))` and then
// read `part0_update` again to update `current_ring[0]` — a use of a
// moved-from object (valid-but-unspecified state). The copy into
// `current_ring[0]` must happen before the move.
SEASTAR_TEST_CASE(test_unspooled_dirty_accounting_on_flush) {
    return seastar::async([] {
        schema_ptr s = schema_builder("ks", "cf")
            .with_column("pk", bytes_type, column_kind::partition_key)
            .with_column("col", bytes_type, column_kind::regular_column)
            .build();

        tests::reader_concurrency_semaphore_wrapper semaphore;

        replica::dirty_memory_manager mgr;
        replica::memtable_table_shared_data table_shared_data;
        replica::table_stats tbl_stats;

        auto mt = make_lw_shared<replica::memtable>(s, mgr, table_shared_data, tbl_stats);

        utils::chunked_vector<mutation> ring = make_ring(s, 3);
        // `current_ring` tracks the expected merged content of each partition.
        utils::chunked_vector<mutation> current_ring;

        for (auto&& m : ring) {
            auto m_with_cell = m;
            m_with_cell.set_clustered_cell(clustering_key::make_empty(), to_bytes("col"),
                                           data_value(bytes(bytes::initialized_later(), 4096)), next_timestamp());
            mt->apply(m_with_cell);
            current_ring.push_back(m_with_cell);
        }

        // Create a reader which will cause many partition versions to be created
        mutation_reader_opt rd1 = mt->make_mutation_reader(s, semaphore.make_permit());
        auto close_rd1 = deferred_close(*rd1);
        rd1->set_max_buffer_size(1);
        rd1->fill_buffer().get();

        // Override large cell value with a short one
        {
            auto part0_update = ring[0];
            part0_update.set_clustered_cell(clustering_key::make_empty(), to_bytes("col"),
                                            data_value(bytes(bytes::initialized_later(), 8)), next_timestamp());
            // Record the expected content first; only then give the mutation
            // away to apply(). (Previously this read part0_update after
            // moving from it.)
            current_ring[0] = part0_update;
            mt->apply(std::move(part0_update));
        }

        // Sample unspooled dirty memory after each step of the flush; the
        // sequence must be non-increasing.
        std::vector<size_t> unspooled_dirty_values;
        unspooled_dirty_values.push_back(mgr.unspooled_dirty_memory());

        auto flush_reader_check = assert_that(mt->make_flush_reader(s, semaphore.make_permit()));
        flush_reader_check.produces_partition(current_ring[0]);
        unspooled_dirty_values.push_back(mgr.unspooled_dirty_memory());
        flush_reader_check.produces_partition(current_ring[1]);
        unspooled_dirty_values.push_back(mgr.unspooled_dirty_memory());

        // Drain and close rd1 so that the old version it pins can be freed,
        // then compact to actually merge/release it.
        while ((*rd1)().get()) ;
        close_rd1.close_now();

        logalloc::shard_tracker().full_compaction();

        flush_reader_check.produces_partition(current_ring[2]);
        unspooled_dirty_values.push_back(mgr.unspooled_dirty_memory());
        flush_reader_check.produces_end_of_stream();
        unspooled_dirty_values.push_back(mgr.unspooled_dirty_memory());

        // The samples were taken in decreasing order; reversed they must be
        // sorted ascending.
        std::reverse(unspooled_dirty_values.begin(), unspooled_dirty_values.end());
        BOOST_REQUIRE(std::is_sorted(unspooled_dirty_values.begin(), unspooled_dirty_values.end()));
    });
}
|
|
|
|
// Reproducer for #1753
// Builds a chain of partition versions (one per apply, kept alive by
// partially-consumed readers), runs a full LSA compaction, and then checks
// that readers opened at each point in time still observe exactly the
// mutations that existed when they were created — including after earlier
// readers are destroyed and their versions can be merged.
SEASTAR_TEST_CASE(test_partition_version_consistency_after_lsa_compaction_happens) {
    return seastar::async([] {
        schema_ptr s = schema_builder("ks", "cf")
            .with_column("pk", bytes_type, column_kind::partition_key)
            .with_column("ck", bytes_type, column_kind::clustering_key)
            .with_column("col", bytes_type, column_kind::regular_column)
            .build();

        tests::reader_concurrency_semaphore_wrapper semaphore;

        auto mt = make_lw_shared<replica::memtable>(s);

        // Three mutations on the same partition, each writing a distinct row.
        auto empty_m = make_unique_mutation(s);
        auto ck1 = clustering_key::from_single_value(*s, serialized(make_unique_bytes()));
        auto ck2 = clustering_key::from_single_value(*s, serialized(make_unique_bytes()));
        auto ck3 = clustering_key::from_single_value(*s, serialized(make_unique_bytes()));

        auto m1 = empty_m;
        m1.set_clustered_cell(ck1, to_bytes("col"), data_value(bytes(bytes::initialized_later(), 8)), next_timestamp());

        auto m2 = empty_m;
        m2.set_clustered_cell(ck2, to_bytes("col"), data_value(bytes(bytes::initialized_later(), 8)), next_timestamp());

        auto m3 = empty_m;
        m3.set_clustered_cell(ck3, to_bytes("col"), data_value(bytes(bytes::initialized_later(), 8)), next_timestamp());

        // After each apply, open a reader and fill one fragment so the
        // current version stays pinned and the next apply creates a new one.
        mt->apply(m1);
        std::optional<mutation_reader_assertions> rd1 = assert_that(mt->make_mutation_reader(s, semaphore.make_permit()));
        rd1->set_max_buffer_size(1);
        rd1->fill_buffer().get();

        mt->apply(m2);
        std::optional<mutation_reader_assertions> rd2 = assert_that(mt->make_mutation_reader(s, semaphore.make_permit()));
        rd2->set_max_buffer_size(1);
        rd2->fill_buffer().get();

        mt->apply(m3);
        std::optional<mutation_reader_assertions> rd3 = assert_that(mt->make_mutation_reader(s, semaphore.make_permit()));
        rd3->set_max_buffer_size(1);
        rd3->fill_buffer().get();

        logalloc::shard_tracker().full_compaction();

        auto rd4 = assert_that(mt->make_mutation_reader(s, semaphore.make_permit()));
        rd4.set_max_buffer_size(1);
        rd4.fill_buffer().get();
        auto rd5 = assert_that(mt->make_mutation_reader(s, semaphore.make_permit()));
        rd5.set_max_buffer_size(1);
        rd5.fill_buffer().get();
        auto rd6 = assert_that(mt->make_mutation_reader(s, semaphore.make_permit()));
        rd6.set_max_buffer_size(1);
        rd6.fill_buffer().get();

        // Each early reader sees exactly the data present at its creation.
        rd1->next_mutation().is_equal_to(m1);
        rd2->next_mutation().is_equal_to(m1 + m2);
        rd3->next_mutation().is_equal_to(m1 + m2 + m3);
        rd3 = {};

        // Late readers keep seeing the full merge even as earlier readers
        // (and the versions they pinned) are destroyed in between.
        rd4.next_mutation().is_equal_to(m1 + m2 + m3);
        rd1 = {};

        rd5.next_mutation().is_equal_to(m1 + m2 + m3);
        rd2 = {};

        rd6.next_mutation().is_equal_to(m1 + m2 + m3);
    });
}
|
|
|
|
// Reproducer for #1746
// Interleaves full LSA compactions (which migrate segments) with flush-reader
// consumption, row by row, and checks the dirty-memory invariant
// unspooled <= real after every partition.
SEASTAR_TEST_CASE(test_segment_migration_during_flush) {
    return seastar::async([] {
        schema_ptr s = schema_builder("ks", "cf")
            .with_column("pk", bytes_type, column_kind::partition_key)
            .with_column("ck", bytes_type, column_kind::clustering_key)
            .with_column("col", bytes_type, column_kind::regular_column)
            .build();

        tests::reader_concurrency_semaphore_wrapper semaphore;

        replica::table_stats tbl_stats;
        replica::memtable_table_shared_data table_shared_data;
        replica::dirty_memory_manager mgr;

        auto mt = make_lw_shared<replica::memtable>(s, mgr, table_shared_data, tbl_stats);

        // Enough rows per partition that compaction can strike mid-partition.
        const int rows_per_partition = 300;
        const int partitions = 3;
        utils::chunked_vector<mutation> ring = make_ring(s, partitions);

        for (auto& m : ring) {
            for (int i = 0; i < rows_per_partition; ++i) {
                auto ck = clustering_key::from_single_value(*s, serialized(make_unique_bytes()));
                auto col_value = data_value(bytes(bytes::initialized_later(), 8));
                m.set_clustered_cell(ck, to_bytes("col"), col_value, next_timestamp());
            }
            mt->apply(m);
        }

        auto rd = mt->make_flush_reader(s, semaphore.make_permit());
        auto close_rd = deferred_close(rd);

        for (int i = 0; i < partitions; ++i) {
            auto mfopt = rd().get();
            BOOST_REQUIRE(bool(mfopt));
            BOOST_REQUIRE(mfopt->is_partition_start());
            // Compact before pulling every fragment so segment migration
            // happens while the flush reader is mid-partition.
            while (!mfopt->is_end_of_partition()) {
                logalloc::shard_tracker().full_compaction();
                mfopt = rd().get();
            }
            BOOST_REQUIRE_LE(mgr.unspooled_dirty_memory(), mgr.real_dirty_memory());
        }

        // The stream must be exhausted after the last partition.
        BOOST_REQUIRE(!rd().get());
    });
}
|
|
|
|
// Reproducer for #2854
// A reader created on a memtable must survive the memtable being marked as
// flushed (reads redirected to the underlying source) mid-stream, including
// a subsequent fast_forward_to() over the redirected reader.
SEASTAR_TEST_CASE(test_fast_forward_to_after_memtable_is_flushed) {
    return seastar::async([] {
        schema_ptr s = schema_builder("ks", "cf")
            .with_column("pk", bytes_type, column_kind::partition_key)
            .with_column("col", bytes_type, column_kind::regular_column)
            .build();

        tests::reader_concurrency_semaphore_wrapper semaphore;

        utils::chunked_vector<mutation> ring = make_ring(s, 5);
        auto mt = make_memtable(s, ring);
        // mt2 holds the same data and stands in for the flushed sstable.
        auto mt2 = make_memtable(s, ring);

        auto rd = assert_that(mt->make_mutation_reader(s, semaphore.make_permit()));
        rd.produces(ring[0]);
        // Mark flushed mid-read: remaining reads go through mt2's source.
        mt->mark_flushed(mt2->as_data_source());
        rd.produces(ring[1]);
        auto range = dht::partition_range::make_starting_with(dht::ring_position(ring[3].decorated_key()));
        rd.fast_forward_to(range);
        rd.produces(ring[3]).produces(ring[4]).produces_end_of_stream();
    });
}
|
|
|
|
// Injects allocation failures at every allocation point and retries until a
// full-range memtable read completes successfully, proving the read path is
// exception-safe (no corruption, correct result once allocations succeed).
SEASTAR_TEST_CASE(test_exception_safety_of_partition_range_reads) {
    return seastar::async([] {
        random_mutation_generator gen(random_mutation_generator::generate_counters::no);
        auto s = gen.schema();
        tests::reader_concurrency_semaphore_wrapper semaphore;
        utils::chunked_vector<mutation> ms = gen(2);

        auto mt = make_memtable(s, ms);
        memory::with_allocation_failures([&] {
            assert_that(mt->make_mutation_reader(s, semaphore.make_permit(), query::full_partition_range))
                .produces(ms);
        });
    });
}
|
|
|
|
// Same allocation-failure-injection scheme as above, but for the flush
// reader. Flush accounting must be reverted after each failed attempt so the
// next attempt starts from a clean state.
SEASTAR_TEST_CASE(test_exception_safety_of_flush_reads) {
    return seastar::async([] {
        random_mutation_generator gen(random_mutation_generator::generate_counters::no);
        auto s = gen.schema();
        tests::reader_concurrency_semaphore_wrapper semaphore;
        utils::chunked_vector<mutation> ms = gen(2);

        auto mt = make_memtable(s, ms);
        memory::with_allocation_failures([&] {
            // Undo partial flush accounting whether the attempt throws or not.
            auto revert = defer([&] {
                mt->revert_flushed_memory();
            });
            assert_that(mt->make_flush_reader(s, semaphore.make_permit()))
                .produces(ms);
        });
    });
}
|
|
|
|
// Allocation-failure-injection test for the single-partition (singular
// range) memtable read path.
SEASTAR_TEST_CASE(test_exception_safety_of_single_partition_reads) {
    return seastar::async([] {
        random_mutation_generator gen(random_mutation_generator::generate_counters::no);
        auto s = gen.schema();
        tests::reader_concurrency_semaphore_wrapper semaphore;
        utils::chunked_vector<mutation> ms = gen(2);

        auto mt = make_memtable(s, ms);
        memory::with_allocation_failures([&] {
            assert_that(mt->make_mutation_reader(s, semaphore.make_permit(), dht::partition_range::make_singular(ms[1].decorated_key())))
                .produces(ms[1]);
        });
    });
}
|
|
|
|
// Exercises the flush reader over a partition where a large range tombstone
// was applied on top of many rows while older snapshot versions are pinned
// by two concurrent readers. The flush must complete cleanly, and the MVCC
// cleaner must be able to drain afterwards.
SEASTAR_THREAD_TEST_CASE(test_tombstone_compaction_during_flush) {
    tests::reader_concurrency_semaphore_wrapper semaphore;
    simple_schema ss;
    auto s = ss.schema();
    auto mt = make_lw_shared<replica::memtable>(ss.schema());

    auto pk = ss.make_pkey(0);
    auto pr = dht::partition_range::make_singular(pk);
    int n_rows = 10000;
    {
        mutation m(ss.schema(), pk);
        for (int i = 0; i < n_rows; ++i) {
            ss.add_row(m, ss.make_ckey(i), "v1");
        }
        mt->apply(m);
    }

    // rd1 pins the rows-only version.
    auto rd1 = mt->make_mutation_reader(ss.schema(), semaphore.make_permit(), pr, s->full_slice(),
                                        nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
    auto close_rd1 = defer([&] { rd1.close().get(); });

    rd1.fill_buffer().get();

    // Range tombstone covering all the rows, applied as a newer version.
    mutation rt_m(ss.schema(), pk);
    auto rt = ss.delete_range(rt_m, ss.make_ckey_range(0, n_rows));
    mt->apply(rt_m);

    // rd2 pins the version with one tombstone layer.
    auto rd2 = mt->make_mutation_reader(ss.schema(), semaphore.make_permit(), pr, s->full_slice(),
                                        nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
    auto close_rd2 = defer([&] { rd2.close().get(); });

    rd2.fill_buffer().get();

    mt->apply(rt_m); // whatever

    // Drain the flush reader to the end while the older versions are still
    // pinned by rd1/rd2.
    auto flush_rd = mt->make_flush_reader(ss.schema(), semaphore.make_permit());
    auto close_flush_rd = defer([&] { flush_rd.close().get(); });

    while (!flush_rd.is_end_of_stream()) {
        flush_rd().get();
    }

    // Release the pinned versions (close, then destroy the readers).
    { auto close_rd = std::move(close_rd2); }
    { auto rd = std::move(rd2); }

    { auto close_rd = std::move(close_rd1); }
    { auto rd = std::move(rd1); }

    // Version merging must be able to finish.
    mt->cleaner().drain().get();
}
|
|
|
|
// Applies three mutations with overlapping range tombstones of different
// timestamps (t0 oldest .. t3 newest) into separate MVCC versions, each kept
// alive by a partially-consumed reader, and checks that a fresh read over
// the stacked versions produces monotonic positions and equals m1 + m2 + m3.
SEASTAR_THREAD_TEST_CASE(test_tombstone_merging_with_multiple_versions) {
    tests::reader_concurrency_semaphore_wrapper semaphore;
    simple_schema ss;
    auto s = ss.schema();
    auto mt = make_lw_shared<replica::memtable>(ss.schema());

    auto pk = ss.make_pkey(0);
    auto pr = dht::partition_range::make_singular(pk);

    // new_tombstone() order: t0 is the oldest, t3 the newest.
    auto t0 = ss.new_tombstone();
    auto t1 = ss.new_tombstone();
    auto t2 = ss.new_tombstone();
    auto t3 = ss.new_tombstone();

    mutation m1(s, pk);
    ss.delete_range(m1, *position_range_to_clustering_range(position_range(
        position_in_partition::before_key(ss.make_ckey(0)),
        position_in_partition::for_key(ss.make_ckey(3))), *s), t1);
    ss.add_row(m1, ss.make_ckey(0), "v");
    ss.add_row(m1, ss.make_ckey(1), "v");

    // Fill so that rd1 stays in the partition snapshot
    int n_rows = 1000;
    auto v = make_random_string(512);
    for (int i = 0; i < n_rows; ++i) {
        ss.add_row(m1, ss.make_ckey(i), v);
    }

    // m2: two adjacent tombstones (t2, t3) splitting m1's t1 range.
    mutation m2(s, pk);
    ss.delete_range(m2, *position_range_to_clustering_range(position_range(
        position_in_partition::before_key(ss.make_ckey(0)),
        position_in_partition::before_key(ss.make_ckey(1))), *s), t2);
    ss.delete_range(m2, *position_range_to_clustering_range(position_range(
        position_in_partition::before_key(ss.make_ckey(1)),
        position_in_partition::for_key(ss.make_ckey(3))), *s), t3);

    // m3: tombstone older than everything above (t0) — should be shadowed.
    mutation m3(s, pk);
    ss.delete_range(m3, *position_range_to_clustering_range(position_range(
        position_in_partition::before_key(ss.make_ckey(0)),
        position_in_partition::for_key(ss.make_ckey(4))), *s), t0);

    mt->apply(m1);

    auto rd1 = mt->make_mutation_reader(s, semaphore.make_permit(), pr, s->full_slice(),
                                        nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
    auto close_rd1 = defer([&] { rd1.close().get(); });

    rd1.fill_buffer().get();
    BOOST_REQUIRE(!rd1.is_end_of_stream()); // rd1 must keep the m1 version alive

    mt->apply(m2);

    auto rd2 = mt->make_mutation_reader(s, semaphore.make_permit(), pr, s->full_slice(),
                                        nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
    auto close_r2 = defer([&] { rd2.close().get(); });

    rd2.fill_buffer().get();
    BOOST_REQUIRE(!rd2.is_end_of_stream()); // rd2 must keep the m1 version alive

    mt->apply(m3);

    assert_that(mt->make_mutation_reader(s, semaphore.make_permit(), pr))
        .has_monotonic_positions();

    assert_that(mt->make_mutation_reader(s, semaphore.make_permit(), pr))
        .produces(m1 + m2 + m3);
}
|
|
|
|
// Checks that range-tombstone merging between MVCC versions is correct in
// the presence of preemption: m0 is made large enough that merging m1 into
// it is likely preempted mid-way, and m2 is applied on top while that merge
// is still in progress. No updates from m2 may be lost.
SEASTAR_THREAD_TEST_CASE(test_tombstone_merging_with_mvcc_and_preemption) {
    tests::reader_concurrency_semaphore_wrapper semaphore;
    simple_schema ss;
    auto s = ss.schema();
    auto mt = make_lw_shared<replica::memtable>(ss.schema());

    auto pk = ss.make_pkey(0);
    auto pr = dht::partition_range::make_singular(pk);

    // Produce large m0 so that merging range tombstone from m1 into m0 is likely to be preempted in the middle.
    int n_tombstones = 10000;
    int key_delta_per_tombstone = 3;
    mutation m0(s, pk);
    {
        int key = 0;
        for (int i = 0; i < n_tombstones; ++i) {
            ss.add_row(m0, ss.make_ckey(key), "value");
            key += key_delta_per_tombstone;
        }
    }
    mt->apply(m0);

    // rd0 pins the m0 version so that m1 lands in a new version.
    std::optional<mutation_reader> rd0 = mt->make_mutation_reader(
        s, semaphore.make_permit(), pr, s->full_slice(),
        nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
    auto close_rd0 = defer([&] { rd0->close().get(); });
    rd0->fill_buffer().get();
    BOOST_REQUIRE(!rd0->is_end_of_stream());

    // A range tombstone spanning a large middle chunk of the keys.
    auto k1 = n_tombstones * key_delta_per_tombstone / 3;
    auto k2 = k1 + n_tombstones * key_delta_per_tombstone / 2;

    mutation m1(s, pk);
    ss.delete_range(m1, ss.make_ckey_range(k1, k2));
    mt->apply(m1);

    // rd1 pins the m1 version in turn.
    std::optional<mutation_reader> rd1 = mt->make_mutation_reader(
        s, semaphore.make_permit(), pr, s->full_slice(),
        nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
    auto close_rd1 = defer([&] { rd1->close().get(); });
    rd1->fill_buffer().get();
    BOOST_REQUIRE(!rd1->is_end_of_stream());

    // Trigger merging of m1 into m0.
    close_rd0.cancel();
    rd0->close().get();
    rd0 = {};

    mutation m2(s, pk);
    ss.delete_range(m2, ss.make_ckey_range(0, 1));
    // Shadow earlier range tombstone in m1 to test whether applying this range tombstone
    // to m1 (from m2) while m1 is still merging its range tombstones to m0 doesn't
    // lead to loss of information from m2 due to the way preemption is handled in m1 -> m0 merging.
    ss.delete_range(m2, ss.make_ckey_range(k1, k2));
    mt->apply(m2);

    // Trigger merging of m2 into m1.
    // Some of it will complete immediately.
    // Let's see if updates of m1 from m2 are not lost while merging of m1 into m0 is still in progress.
    close_rd1.cancel();
    rd1->close().get();
    rd1 = {};

    // Wait for merging to complete so that we read the final result later.
    mt->cleaner().drain().get();

    assert_that(mt->make_mutation_reader(s, semaphore.make_permit(), pr))
        .produces(m0 + m1 + m2);
}
|
|
|
|
// Verifies that the memtable cleaner compacts rows covered by a range
// tombstone (rows at ckeys 2 and 3 disappear), that an older partition
// tombstone changes nothing, and that a newer partition tombstone wipes the
// remaining rows while the still-newer range tombstone survives.
SEASTAR_THREAD_TEST_CASE(test_range_tombstones_are_compacted_with_data) {
    tests::reader_concurrency_semaphore_wrapper semaphore;
    simple_schema ss;
    auto s = ss.schema();
    auto mt = make_lw_shared<replica::memtable>(ss.schema());

    auto pk = ss.make_pkey(0);
    auto pr = dht::partition_range::make_singular(pk);

    auto old_tombstone = ss.new_tombstone(); // older than any write, does not cover anything

    {
        mutation m(ss.schema(), pk);
        ss.add_row(m, ss.make_ckey(1), "v1");
        ss.add_row(m, ss.make_ckey(2), "v1");
        ss.add_row(m, ss.make_ckey(3), "v1");
        ss.add_row(m, ss.make_ckey(4), "v1");
        mt->apply(m);
    }

    // Range tombstone over ckeys [2, 3]; drain() lets the cleaner compact
    // the covered rows away.
    mutation rt_m(ss.schema(), pk);
    auto rt = ss.delete_range(rt_m, ss.make_ckey_range(2,3));
    mt->apply(rt_m);
    mt->cleaner().drain().get();

    assert_that(mt->make_mutation_reader(ss.schema(), semaphore.make_permit(), pr))
        .produces_partition_start(pk)
        .produces_row_with_key(ss.make_ckey(1))
        .produces_range_tombstone_change({rt.position(), rt.tomb})
        .produces_range_tombstone_change({rt.end_position(), {}})
        .produces_row_with_key(ss.make_ckey(4))
        .produces_partition_end()
        .produces_end_of_stream();

    {
        mutation m(ss.schema(), pk);
        m.partition().apply(old_tombstone);
        mt->apply(m);
        mt->cleaner().drain().get();
    }

    // No change
    assert_that(mt->make_mutation_reader(ss.schema(), semaphore.make_permit(), pr))
        .produces_partition_start(pk, {old_tombstone})
        .produces_row_with_key(ss.make_ckey(1))
        .produces_range_tombstone_change({rt.position(), rt.tomb})
        .produces_range_tombstone_change({rt.end_position(), {}})
        .produces_row_with_key(ss.make_ckey(4))
        .produces_partition_end()
        .produces_end_of_stream();

    // Newer than the rows: wipes them, but the range tombstone (newer
    // still) is kept.
    auto new_tomb = ss.new_tombstone();

    {
        mutation m(ss.schema(), pk);
        m.partition().apply(new_tomb);
        mt->apply(m);
        mt->cleaner().drain().get();
    }

    assert_that(mt->make_mutation_reader(ss.schema(), semaphore.make_permit(), pr))
        .produces_partition_start(pk, {new_tomb})
        .produces_range_tombstone_change({rt.position(), rt.tomb})
        .produces_range_tombstone_change({rt.end_position(), {}})
        .produces_partition_end()
        .produces_end_of_stream();
}
|
|
|
|
// Verifies cell-hash caching behavior: a plain read computes no hash, a
// digest read computes and caches it, a later plain read then sees the
// cached hash, and re-writing the cell invalidates the cache — after which
// the same compute/reuse cycle repeats.
SEASTAR_TEST_CASE(test_hash_is_cached) {
    return seastar::async([] {
        auto s = schema_builder("ks", "cf")
            .with_column("pk", bytes_type, column_kind::partition_key)
            .with_column("v", bytes_type, column_kind::regular_column)
            .build();
        tests::reader_concurrency_semaphore_wrapper semaphore;

        auto mt = make_lw_shared<replica::memtable>(s);

        auto m = make_unique_mutation(s);
        set_column(m, "v");
        mt->apply(m);

        {
            // Plain read: no digest requested, so no hash is computed.
            auto rd = mt->make_mutation_reader(s, semaphore.make_permit());
            auto close_rd = deferred_close(rd);
            rd().get()->as_partition_start();
            clustering_row row = std::move(*rd().get()).as_clustering_row();
            BOOST_REQUIRE(!row.cells().cell_hash_for(0));
        }

        {
            // Digest read: forces hash computation (and caching).
            auto slice = s->full_slice();
            slice.options.set<query::partition_slice::option::with_digest>();
            auto rd = mt->make_mutation_reader(s, semaphore.make_permit(), query::full_partition_range, slice);
            auto close_rd = deferred_close(rd);
            rd().get()->as_partition_start();
            clustering_row row = std::move(*rd().get()).as_clustering_row();
            BOOST_REQUIRE(row.cells().cell_hash_for(0));
        }

        {
            // Plain read now sees the hash cached by the digest read.
            auto rd = mt->make_mutation_reader(s, semaphore.make_permit());
            auto close_rd = deferred_close(rd);
            rd().get()->as_partition_start();
            clustering_row row = std::move(*rd().get()).as_clustering_row();
            BOOST_REQUIRE(row.cells().cell_hash_for(0));
        }

        // Overwrite the cell: the cached hash must be dropped.
        set_column(m, "v");
        mt->apply(m);

        {
            auto rd = mt->make_mutation_reader(s, semaphore.make_permit());
            auto close_rd = deferred_close(rd);
            rd().get()->as_partition_start();
            clustering_row row = std::move(*rd().get()).as_clustering_row();
            BOOST_REQUIRE(!row.cells().cell_hash_for(0));
        }

        {
            auto slice = s->full_slice();
            slice.options.set<query::partition_slice::option::with_digest>();
            auto rd = mt->make_mutation_reader(s, semaphore.make_permit(), query::full_partition_range, slice);
            auto close_rd = deferred_close(rd);
            rd().get()->as_partition_start();
            clustering_row row = std::move(*rd().get()).as_clustering_row();
            BOOST_REQUIRE(row.cells().cell_hash_for(0));
        }

        {
            auto rd = mt->make_mutation_reader(s, semaphore.make_permit());
            auto close_rd = deferred_close(rd);
            rd().get()->as_partition_start();
            clustering_row row = std::move(*rd().get()).as_clustering_row();
            BOOST_REQUIRE(row.cells().cell_hash_for(0));
        }
    });
}
|
|
|
|
// Verifies that a memtable aggregates encoding stats (min deletion time, min
// timestamp, min TTL) and the min/max timestamp trackers incrementally as
// mutations with different timestamp/TTL/tombstone characteristics are applied.
SEASTAR_THREAD_TEST_CASE(test_collecting_encoding_stats) {
    auto random_int32_value = [] {
        return int32_type->decompose(tests::random::get_int<int32_t>());
    };

    auto now = gc_clock::now();

    auto td = tests::data_model::table_description({ { "pk", int32_type } }, { { "ck", utf8_type } });

    auto td1 = td;
    td1.add_static_column("s1", int32_type);
    td1.add_regular_column("v1", int32_type);
    td1.add_regular_column("v2", int32_type);
    auto built_schema = td1.build();
    auto s = built_schema.schema;

    // m1: plain live row with the default data timestamp — no TTL, no tombstone.
    auto md1 = tests::data_model::mutation_description({ to_bytes("pk1") });
    md1.add_clustered_row_marker({ to_bytes("ck1") });
    md1.add_clustered_cell({ to_bytes("ck1") }, "v1", random_int32_value());
    auto m1 = md1.build(s);

    // m2: row marker with a negative timestamp (lower than data_timestamp) and
    // one TTL'd cell expiring at now + 1s.
    auto md2 = tests::data_model::mutation_description({ to_bytes("pk2") });
    auto md2_ttl = gc_clock::duration(std::chrono::seconds(1));
    api::timestamp_type md2_timestamp = -10;
    md2.add_clustered_row_marker({ to_bytes("ck1") }, md2_timestamp);
    md2.add_clustered_cell({ to_bytes("ck1") }, "v1", random_int32_value());
    md2.add_clustered_cell({ to_bytes("ck2") }, "v2",
            tests::data_model::mutation_description::atomic_value(random_int32_value(), tests::data_model::data_timestamp, md2_ttl, now + md2_ttl));
    auto m2 = md2.build(s);

    // m3: static cell whose expiry point lies in the past (now - 8h), so it
    // lowers min_local_deletion_time below m2's.
    auto md3 = tests::data_model::mutation_description({ to_bytes("pk3") });
    auto md3_ttl = gc_clock::duration(std::chrono::seconds(2));
    auto md3_expiry_point = now - std::chrono::hours(8);
    md3.add_static_cell("s1",
            tests::data_model::mutation_description::atomic_value(random_int32_value(), tests::data_model::data_timestamp, md3_ttl, md3_expiry_point));
    auto m3 = md3.build(s);

    // m4: partition tombstone with the lowest timestamp and deletion time seen
    // so far. Being a deletion, it must not affect the "live" minimums.
    auto md4 = tests::data_model::mutation_description({ to_bytes("pk1") });
    auto md4_tombstone = tombstone(md2_timestamp - 10, now - std::chrono::hours(9));
    md4.set_partition_tombstone(md4_tombstone);
    auto m4 = md4.build(s);

    auto mt = make_lw_shared<replica::memtable>(s);

    // Empty memtable: stats hold their neutral (max) sentinels; the min/max
    // timestamp accessors report 0 until something is applied.
    auto stats = mt->get_encoding_stats();
    BOOST_CHECK(stats.min_local_deletion_time == gc_clock::time_point::max());
    BOOST_CHECK_EQUAL(stats.min_timestamp, api::max_timestamp);
    BOOST_CHECK(stats.min_ttl == gc_clock::duration::max());
    BOOST_CHECK_EQUAL(mt->get_min_timestamp(), 0);
    BOOST_CHECK_EQUAL(mt->get_max_timestamp(), 0);
    BOOST_CHECK_EQUAL(mt->get_min_live_timestamp(), api::max_timestamp);
    BOOST_CHECK_EQUAL(mt->get_min_live_row_marker_timestamp(), api::max_timestamp);

    // After m1 every timestamp tracker equals data_timestamp; deletion time
    // and TTL remain at their sentinels (nothing expires, nothing is deleted).
    mt->apply(m1);
    stats = mt->get_encoding_stats();
    BOOST_CHECK(stats.min_local_deletion_time == gc_clock::time_point::max());
    BOOST_CHECK_EQUAL(stats.min_timestamp, tests::data_model::data_timestamp);
    BOOST_CHECK(stats.min_ttl == gc_clock::duration::max());
    BOOST_CHECK_EQUAL(mt->get_min_timestamp(), tests::data_model::data_timestamp);
    BOOST_CHECK_EQUAL(mt->get_max_timestamp(), tests::data_model::data_timestamp);
    BOOST_CHECK_EQUAL(mt->get_min_live_timestamp(), tests::data_model::data_timestamp);
    BOOST_CHECK_EQUAL(mt->get_min_live_row_marker_timestamp(), tests::data_model::data_timestamp);

    // m2 introduces a smaller (negative) live timestamp, the first TTL and the
    // first finite expiry point.
    mt->apply(m2);
    stats = mt->get_encoding_stats();
    BOOST_CHECK(stats.min_local_deletion_time == now + md2_ttl);
    BOOST_CHECK_EQUAL(stats.min_timestamp, md2_timestamp);
    BOOST_CHECK(stats.min_ttl == md2_ttl);
    BOOST_CHECK_EQUAL(mt->get_min_timestamp(), md2_timestamp);
    BOOST_CHECK_EQUAL(mt->get_max_timestamp(), tests::data_model::data_timestamp);
    BOOST_CHECK_EQUAL(mt->get_min_live_timestamp(), md2_timestamp);
    BOOST_CHECK_EQUAL(mt->get_min_live_row_marker_timestamp(), md2_timestamp);

    // m3 only lowers min_local_deletion_time (its expiry point is in the past);
    // its TTL (2s) is larger than m2's, so min_ttl is unchanged.
    mt->apply(m3);
    stats = mt->get_encoding_stats();
    BOOST_CHECK(stats.min_local_deletion_time == md3_expiry_point);
    BOOST_CHECK_EQUAL(stats.min_timestamp, md2_timestamp);
    BOOST_CHECK(stats.min_ttl == md2_ttl);
    BOOST_CHECK_EQUAL(mt->get_min_timestamp(), md2_timestamp);
    BOOST_CHECK_EQUAL(mt->get_max_timestamp(), tests::data_model::data_timestamp);
    BOOST_CHECK_EQUAL(mt->get_min_live_timestamp(), md2_timestamp);
    BOOST_CHECK_EQUAL(mt->get_min_live_row_marker_timestamp(), md2_timestamp);

    // m4's tombstone lowers the overall minimums but, being a deletion, leaves
    // the live-timestamp minimums untouched.
    mt->apply(m4);
    stats = mt->get_encoding_stats();
    BOOST_CHECK(stats.min_local_deletion_time == md4_tombstone.deletion_time);
    BOOST_CHECK_EQUAL(stats.min_timestamp, md4_tombstone.timestamp);
    BOOST_CHECK(stats.min_ttl == md2_ttl);
    BOOST_CHECK_EQUAL(mt->get_min_timestamp(), md4_tombstone.timestamp);
    BOOST_CHECK_EQUAL(mt->get_max_timestamp(), tests::data_model::data_timestamp);
    BOOST_CHECK_EQUAL(mt->get_min_live_timestamp(), md2_timestamp);
    BOOST_CHECK_EQUAL(mt->get_min_live_row_marker_timestamp(), md2_timestamp);
}
|
|
|
|
|
|
// A newer row tombstone must compact away an older cell write to the same
// clustering position during flush: reading the table back after the flush
// yields only the deletion.
SEASTAR_TEST_CASE(memtable_flush_compresses_mutations) {
    auto db_config = make_shared<db::config>();
    // Bypass the cache so reads go straight to the flushed sstable data.
    db_config->enable_cache.set(false);
    return do_with_cql_env_thread([](cql_test_env& env) {
        char const* ks_name = "keyspace_name";
        char const* table_name = "table_name";
        env.execute_cql(format("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}};", ks_name)).get();
        env.execute_cql(format("CREATE TABLE {}.{} (pk int, ck int, id int, PRIMARY KEY(pk, ck));", ks_name, table_name)).get();

        replica::database& db = env.local_db();
        replica::table& t = db.find_column_family(ks_name, table_name);
        tests::reader_concurrency_semaphore_wrapper semaphore;
        schema_ptr s = t.schema();

        // Partition key 1, clustering key 2.
        dht::decorated_key dk = dht::decorate_key(*s, partition_key::from_single_value(*s, serialized(1)));
        clustering_key ckey = clustering_key::from_single_value(*s, serialized(2));

        // First a live cell write...
        mutation live_write = mutation(s, dk);
        live_write.set_clustered_cell(ckey, to_bytes("id"), data_value(3), api::new_timestamp());

        // ...then a newer row tombstone covering the same clustering position.
        mutation row_delete = mutation(s, dk);
        row_delete.partition().apply_delete(*s, clustering_key_prefix::from_singular(*s, 2), tombstone{api::new_timestamp(), gc_clock::now()});

        t.apply(live_write);
        t.apply(row_delete);

        // Persist everything so the subsequent read exercises the sstable path.
        t.flush().get();

        // Reading the table as a mutation_source must produce only the
        // deletion — the older write was compressed away during flush.
        mutation_source ms = t.as_mutation_source();
        assert_that(ms.make_mutation_reader(s, semaphore.make_permit()))
                .produces(row_delete)
                .produces_end_of_stream();
    }, db_config);
}
|
|
|
|
static auto check_has_error_injection() {
|
|
return boost::unit_test::precondition([](auto){
|
|
return
|
|
#ifdef SCYLLA_ENABLE_ERROR_INJECTION
|
|
true
|
|
#else
|
|
false
|
|
#endif
|
|
;
|
|
});
|
|
}
|
|
|
|
// Verifies that setting memtable_flush_period on a schema triggers periodic
// flushes. Uses the "table_seal_post_flush_waiters" injection point — signalled
// from table::seal_active_memtable after a flush — to reliably wait for the
// first flush instead of polling on a timer.
SEASTAR_TEST_CASE(memtable_flush_period, *check_has_error_injection()) {
#ifdef SCYLLA_ENABLE_ERROR_INJECTION
    auto db_config = make_shared<db::config>();
    db_config->enable_cache.set(false);
    return do_with_cql_env_thread([](cql_test_env& env) {
        // Create table and insert some data
        char const* ks_name = "keyspace_name";
        char const* table_name = "table_name";
        env.execute_cql(format("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}};", ks_name)).get();
        env.execute_cql(format("CREATE TABLE {}.{} (pk int, ck int, id int, PRIMARY KEY(pk, ck));", ks_name, table_name)).get();

        replica::database& db = env.local_db();
        replica::table& t = db.find_column_family(ks_name, table_name);
        tests::reader_concurrency_semaphore_wrapper semaphore;

        auto s1 = t.schema();

        dht::decorated_key pk = dht::decorate_key(*s1, partition_key::from_single_value(*s1, serialized(1)));
        clustering_key ck = clustering_key::from_single_value(*s1, serialized(2));

        mutation m = mutation(s1, pk);
        m.set_clustered_cell(ck, to_bytes("id"), data_value(3), api::new_timestamp());
        t.apply(m);
        BOOST_REQUIRE_EQUAL(t.sstables_count(), 0); // add mutation and check there are no sstables for this table

        // Arm the injection point before enabling the flush period, so the
        // very first periodic flush already signals it.
        auto& errj = utils::get_local_injector();
        errj.enable("table_seal_post_flush_waiters", true);

        // change schema to set memtable flush period
        // we use small value in this test but it is impossible to set the period less than 60000ms using ALTER TABLE construction
        schema_builder b(t.schema());
        b.set_memtable_flush_period(200);
        schema_ptr s2 = b.build();
        t.set_schema(s2);

        // Block (up to 2 minutes) until the injection point signals, i.e.
        // until seal_active_memtable has run a flush to completion.
        BOOST_TEST_MESSAGE("Wait for flush");
        errj.inject("table_seal_post_flush_waiters", utils::wait_for_message(std::chrono::minutes(2))).get();
        BOOST_TEST_MESSAGE("Flush received");

        BOOST_REQUIRE(eventually_true([&] { // wait until memtable will be flushed at least once
            return t.sstables_count() == 1;
        }));

        // Check mutation presents in the table
        mutation_source ms = t.as_mutation_source();
        assert_that(ms.make_mutation_reader(s2, semaphore.make_permit()))
                .produces(m)
                .produces_end_of_stream();
    }, db_config);
#else
    BOOST_TEST_MESSAGE("Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev)");
    return make_ready_future<>();
#endif
}
|
|
|
|
// Reproduces a data-resurrection scenario: a tombstone older than
// gc_grace_seconds covers a row that (via simulated repair) re-appears in the
// memtable. Compaction must notice the memtable data and keep the tombstone,
// otherwise the deleted row would come back to life.
SEASTAR_TEST_CASE(sstable_compaction_does_not_resurrect_data) {
    auto db_config = make_shared<db::config>();
    db_config->enable_cache.set(false);
    return do_with_cql_env_thread([](cql_test_env& env) {
        replica::database& db = env.local_db();
        service::migration_manager& mm = env.migration_manager().local();

        sstring ks_name = "ks";
        sstring table_name = "table_name";

        // gc_grace_seconds of 1s lets the test expire the tombstone quickly
        // with small clock jumps.
        schema_ptr s = schema_builder(ks_name, table_name)
                .with_column(to_bytes("pk"), int32_type, column_kind::partition_key)
                .with_column(to_bytes("ck"), int32_type, column_kind::clustering_key)
                .with_column(to_bytes("id"), int32_type)
                .set_gc_grace_seconds(1)
                .build();
        auto group0_guard = mm.start_group0_operation().get();
        auto ts = group0_guard.write_timestamp();
        mm.announce(service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s, ts).get(), std::move(group0_guard), "").get();

        replica::table& t = db.find_column_family(ks_name, table_name);

        dht::decorated_key pk = dht::decorate_key(*s, partition_key::from_single_value(*s, serialized(1)));
        clustering_key ck_to_delete = clustering_key::from_single_value(*s, serialized(2));
        clustering_key ck = clustering_key::from_single_value(*s, serialized(3));

        // Establish strictly ordered timestamps:
        // insert-before-delete < delete < insert-after-delete.
        api::timestamp_type insertion_timestamp_before_delete = api::new_timestamp();
        forward_jump_clocks(1s);
        api::timestamp_type deletion_timestamp = api::new_timestamp();
        forward_jump_clocks(1s);
        api::timestamp_type insertion_timestamp_after_delete = api::new_timestamp();

        // Row tombstone for ck=2.
        mutation m_delete = mutation(s, pk);
        m_delete.partition().apply_delete(
                *s,
                ck_to_delete,
                tombstone{deletion_timestamp, gc_clock::now()});
        t.apply(m_delete);

        // Insert data that won't be removed by tombstone to prevent compaction from skipping whole partition
        mutation m_insert = mutation(s, pk);
        m_insert.set_clustered_cell(ck, to_bytes("id"), data_value(3), insertion_timestamp_after_delete);
        t.apply(m_insert);

        // Flush and wait until the gc_grace_seconds pass
        t.flush().get();
        forward_jump_clocks(2s);

        // Apply the past mutation to memtable to simulate repair. This row should be deleted by tombstone
        mutation m_past_insert = mutation(s, pk);
        m_past_insert.set_clustered_cell(
                ck_to_delete,
                to_bytes("id"),
                data_value(4),
                insertion_timestamp_before_delete);
        t.apply(m_past_insert);

        // Trigger compaction. If all goes well, compaction should check if a relevant row is in the memtable
        // and should not purge the tombstone.
        t.compact_all_sstables(tasks::task_info{}).get();

        // If we get additional row (1, 2, 4), that means the tombstone was purged and data was resurrected
        assert_that(env.execute_cql(format("SELECT * FROM {}.{};", ks_name, table_name)).get())
                .is_rows()
                .with_rows_ignore_order({
                    {serialized(1), serialized(3), serialized(3)},
                });
    }, db_config);
}
|
|
|
|
// Verifies flush-failure semantics: while seal_active_memtable keeps failing
// (via injected errors), the memtable must retain its data; once the injection
// is lifted, the retry loop must eventually flush everything out.
SEASTAR_TEST_CASE(failed_flush_prevents_writes) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
    std::cerr << "Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev).\n";
    return make_ready_future<>();
#else
    auto db_config = make_shared<db::config>();
    db_config->unspooled_dirty_soft_limit.set(1.0);

    return do_with_cql_env_thread([](cql_test_env& env) {
        replica::database& db = env.local_db();
        service::migration_manager& mm = env.migration_manager().local();

        simple_schema ss;
        schema_ptr s = ss.schema();
        auto group0_guard = mm.start_group0_operation().get();
        auto ts = group0_guard.write_timestamp();
        mm.announce(service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s, ts).get(), std::move(group0_guard), "").get();

        replica::table& t = db.find_column_family("ks", "cf");
        auto memtables = active_memtables(t);

        // Insert something so that we have data in memtable to flush
        // it has to be somewhat large, as automatic flushing picks the
        // largest memtable to flush
        mutation mt = {s, tests::generate_partition_key(s)};
        for (uint32_t i = 0; i < 1000; ++i) {
            ss.add_row(mt, ss.make_ckey(i), format("{}", i));
        }
        t.apply(mt);

        // Data is present, so the memtable minimum timestamps are finite.
        BOOST_REQUIRE_LT(t.min_memtable_timestamp(), api::max_timestamp);
        BOOST_REQUIRE_LT(t.min_memtable_live_timestamp(), api::max_timestamp);

        // Baseline, so the check below measures only failures caused here.
        auto failed_memtables_flushes_count = db.cf_stats()->failed_memtables_flushes_count;

        // Inject one failure at each of three distinct stages of
        // seal_active_memtable (oneshot each), plus a persistent failure at
        // write-permit reacquisition to keep the retry loop failing.
        utils::get_local_injector().enable("table_seal_active_memtable_add_memtable", true /* oneshot */);
        utils::get_local_injector().enable("table_seal_active_memtable_start_op", true /* oneshot */);
        utils::get_local_injector().enable("table_seal_active_memtable_try_flush", true /* oneshot */);
        utils::get_local_injector().enable("table_seal_active_memtable_reacquire_write_permit");

        // Trigger flush
        auto f = t.flush();

        // Expect at least one failure per injection point above.
        BOOST_REQUIRE(eventually_true([&] {
            return db.cf_stats()->failed_memtables_flushes_count - failed_memtables_flushes_count >= 4;
        }));

        // The flush failed, make sure there is still data in memtable.
        BOOST_REQUIRE_LT(t.min_memtable_timestamp(), api::max_timestamp);
        BOOST_REQUIRE_LT(t.min_memtable_live_timestamp(), api::max_timestamp);
        utils::get_local_injector().disable("table_seal_active_memtable_reacquire_write_permit");

        BOOST_REQUIRE(eventually_true([&] {
            // The error above is no longer being injected, so
            // seal_active_memtable retry loop should eventually succeed
            return t.min_memtable_timestamp() == api::max_timestamp
                    && t.min_memtable_live_timestamp() == api::max_timestamp
                    && t.min_memtable_live_row_marker_timestamp() == api::max_timestamp;
        }));

        // The flush future itself must resolve successfully after the retries.
        std::move(f).get();
    }, db_config);
#endif
}
|
|
|
|
SEASTAR_TEST_CASE(flushing_rate_is_reduced_if_compaction_doesnt_keep_up) {
#ifdef DEBUG
    // This test was observed to take multiple minutes to run in debug mode on CI machines.
    // This test checks that a certain behaviour is triggered when compaction falls behind.
    // Not critical to run in debug mode. Both compaction and memtable have their own
    // correctness tests, which do run in debug mode.
    return make_ready_future<>();
#else
    BOOST_ASSERT(smp::count == 2);
    // The test simulates a situation where 2 threads issue flushes to 2
    // tables. Both issue small flushes, but one has injected reactor stalls.
    // This can lead to a situation where lots of small sstables accumulate on
    // disk, and, if compaction never has a chance to keep up, resources can be
    // exhausted.
    return do_with_cql_env([](cql_test_env& env) -> future<> {
        // Per-shard worker: creates its own table, applies random mutations and
        // flushes `num_flushes` times with `sleep_ms` of deliberate reactor
        // blocking before each write, then drops the table.
        struct flusher {
            cql_test_env& env;
            const int num_flushes;   // how many apply+flush iterations to run
            const int sleep_ms;      // blocking sleep per iteration (simulated stall)

            // One table per shard, named after the shard id.
            static sstring cf_name(unsigned thread_id) {
                return format("cf_{}", thread_id);
            }

            static sstring ks_name() {
                return "ks";
            }

            // Schema changes must go through the migration manager on shard 0.
            future<> create_table(schema_ptr s) {
                return env.migration_manager().invoke_on(0, [s = global_schema_ptr(std::move(s))] (service::migration_manager& mm) -> future<> {
                    auto group0_guard = co_await mm.start_group0_operation();
                    auto ts = group0_guard.write_timestamp();
                    auto announcement = co_await service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s, ts);
                    co_await mm.announce(std::move(announcement), std::move(group0_guard), "");
                });
            }

            // Drops this shard's table, again via shard 0.
            future<> drop_table() {
                return env.migration_manager().invoke_on(0, [shard = this_shard_id()] (service::migration_manager& mm) -> future<> {
                    auto group0_guard = co_await mm.start_group0_operation();
                    auto ts = group0_guard.write_timestamp();
                    auto announcement = co_await service::prepare_column_family_drop_announcement(mm.get_storage_proxy(), ks_name(), cf_name(shard), ts);
                    co_await mm.announce(std::move(announcement), std::move(group0_guard), "");
                });
            }

            future<> operator()() {
                const sstring ks_name = this->ks_name();
                const sstring cf_name = this->cf_name(this_shard_id());
                random_mutation_generator gen{
                    random_mutation_generator::generate_counters::no,
                    local_shard_only::yes,
                    random_mutation_generator::generate_uncompactable::no,
                    std::nullopt,
                    ks_name.c_str(),
                    cf_name.c_str()
                };
                schema_ptr s = gen.schema();

                co_await create_table(s);
                replica::database& db = env.local_db();
                replica::table& t = db.find_column_family(ks_name, cf_name);

                for ([[maybe_unused]] int value : std::views::iota(0, num_flushes)) {
                    // Deliberate blocking sleep: stalls the reactor to simulate
                    // a shard that cannot service compaction promptly.
                    ::usleep(sleep_ms * 1000);
                    co_await db.apply(t.schema(), freeze(gen()), tracing::trace_state_ptr(), db::commitlog::force_sync::yes, db::no_timeout);
                    co_await t.flush();
                    // The core assertion: sstables must not pile up unboundedly
                    // even though we flush much faster than compaction runs.
                    BOOST_ASSERT(t.sstables_count() < size_t(t.schema()->max_compaction_threshold() * 4));
                }
                co_await drop_table();
            }
        };

        // Repeat with exponentially growing stall lengths on shard 1 while
        // shard 0 flushes rapidly with no stalls.
        int sleep_ms = 2;
        for ([[maybe_unused]] int i : std::views::iota(0, 8)) {
            future<> f0 = smp::submit_to(0, flusher{.env=env, .num_flushes=100, .sleep_ms=0});
            future<> f1 = smp::submit_to(1, flusher{.env=env, .num_flushes=3, .sleep_ms=sleep_ms});
            co_await std::move(f0);
            co_await std::move(f1);
            sleep_ms *= 2;
        }
    });
#endif
}
|
|
|
|
// Shared driver for the flush-exception tests. Installs `mep` as an sstable
// file-io extension, keeps inserting rows and flushing until the extension
// reports a failure (`did_fail`), then — if `expect_isolate` — waits for the
// node to enter isolation. The bool/schema references alias fields inside the
// extension object so the driver can arm/observe it after ownership is moved.
static future<> exceptions_in_flush_helper(std::unique_ptr<sstables::file_io_extension> mep, bool& should_fail, const bool& did_fail, const schema*& schema_filter, bool expect_isolate) {
    auto ext = std::make_shared<db::extensions>();
    auto cfg = seastar::make_shared<db::config>(ext);

    ext->add_sstable_file_io_extension("test", std::move(mep));

    co_await do_with_cql_env([&](cql_test_env& env) -> future<> {

        co_await env.execute_cql(fmt::format("create table t0 (pk text primary key, v text)"));

        // Restrict the injected failures to this test's table only.
        schema_filter = env.local_db().find_column_family("ks", "t0").schema().get();

        should_fail = true;

        int i = 0;

        testlog.debug("Wait for fail");

        auto f = make_ready_future<>();

        // Insert unique rows and chain background flushes until the extension
        // has thrown at least once.
        while (!did_fail) {
            std::string pk = "apa" + std::to_string(i++);
            std::string v = "ko";
            co_await env.execute_cql(fmt::format("insert into ks.t0 (pk, v) values ('{}', '{}')", pk, v));

            f = f.then([&] {
                return env.db().invoke_on_all([] (replica::database& db) {
                    return db.flush_all_memtables();
                });
            });
        }

        BOOST_REQUIRE(did_fail);
        testlog.debug("Reset fail trigger");

        // Stop injecting so pending flush retries can complete.
        should_fail = false;

        if (expect_isolate) {
            bool isolated = false;
            // can't use eventually_true here, because neither we nor the invoke on shard 0 is in seastar
            // thread.
            for (int i = 0; i < 10; ++i) {
                isolated = co_await env.get_storage_service().invoke_on(0, [&](service::storage_service& ss) {
                    return ss.is_isolated();
                });
                if (isolated) {
                    break;
                }
                // isolation is not synchronous;
                co_await sleep(2s);
            }

            BOOST_REQUIRE(isolated);
        }

        testlog.debug("Trying to stop");

        // Wait for the chained background flushes before tearing down.
        co_await std::move(f);
    }, cfg);
}
|
|
|
|
// Drives a flush-failure scenario where the injected exception is thrown from
// the sstable *file operations* (write/read/flush/...) rather than at open
// time. `throw_func` produces the exception; `expect_isolate` tells the shared
// helper whether this error class is expected to isolate the node.
static future<> exceptions_in_flush_on_sstable_write_helper(std::function<void()> throw_func, bool expect_isolate = true) {
    class myext : public sstables::file_io_extension {
    public:
        bool should_fail = false;                   // armed/disarmed by the driver
        bool did_fail = false;                      // set once an exception was thrown
        const schema* schema_filter = nullptr;      // limit injection to one table
        std::function<void()> throw_func;

        // Only inject for the table under test (when a filter is set).
        bool match_schema_filter(const schema& s) const {
            const auto ret = !schema_filter || schema_filter->id() == s.id();
            testlog.info("exceptions_in_flush_on_sstable_write_helper()::match_schema_filter({}.{}#{}) -> {}", s.ks_name(), s.cf_name(), s.id(), ret);
            return ret;
        }

        future<file> wrap_file(const sstable& t, component_type type, file f, open_flags flags) override {
            if (should_fail && match_schema_filter(*t.get_schema())) {
                // file_impl wrapper: every operation first consults fail() —
                // which throws via throw_func while injection is armed — and
                // then forwards to the wrapped file.
                class myimpl : public seastar::file_impl {
                    file _file;
                    myext& _myext;
                public:
                    myimpl(file f, myext& ext)
                        : _file(std::move(f))
                        , _myext(ext)
                    {}
                    void fail() const {
                        if (_myext.should_fail) {
                            _myext.did_fail = true;
                            testlog.debug("Throwing exception");
                            _myext.throw_func();
                        }
                    }
                    future<size_t> write_dma(uint64_t pos, const void* buffer, size_t len, io_intent* intent) override {
                        fail();
                        return get_file_impl(_file)->write_dma(pos, buffer, len, intent);
                    }
                    future<size_t> write_dma(uint64_t pos, std::vector<iovec> iov, io_intent* intent) override {
                        fail();
                        return get_file_impl(_file)->write_dma(pos, std::move(iov), intent);
                    }
                    future<size_t> read_dma(uint64_t pos, void* buffer, size_t len, io_intent* intent) override {
                        fail();
                        return get_file_impl(_file)->read_dma(pos, buffer, len, intent);
                    }
                    future<size_t> read_dma(uint64_t pos, std::vector<iovec> iov, io_intent* intent) override {
                        fail();
                        return get_file_impl(_file)->read_dma(pos, std::move(iov), intent);
                    }
                    future<> flush(void) override {
                        fail();
                        return get_file_impl(_file)->flush();
                    }
                    future<struct stat> stat(void) override {
                        fail();
                        return get_file_impl(_file)->stat();
                    }
                    future<> truncate(uint64_t length) override {
                        fail();
                        return get_file_impl(_file)->truncate(length);
                    }
                    future<> discard(uint64_t offset, uint64_t length) override {
                        fail();
                        return get_file_impl(_file)->discard(offset, length);
                    }
                    future<> allocate(uint64_t position, uint64_t length) override {
                        fail();
                        return get_file_impl(_file)->allocate(position, length);
                    }
                    future<uint64_t> size(void) override {
                        fail();
                        return get_file_impl(_file)->size();
                    }
                    future<> close() override {
                        fail();
                        return get_file_impl(_file)->close();
                    }
                    std::unique_ptr<seastar::file_handle_impl> dup() override {
                        fail();
                        return get_file_impl(_file)->dup();
                    }
                    subscription<directory_entry> list_directory(std::function<future<> (directory_entry de)> next) override {
                        fail();
                        return get_file_impl(_file)->list_directory(std::move(next));
                    }
                    future<temporary_buffer<uint8_t>> dma_read_bulk(uint64_t offset, size_t range_size, io_intent* intent) override {
                        fail();
                        return get_file_impl(_file)->dma_read_bulk(offset, range_size, intent);
                    }
                };
                co_return file(make_shared<myimpl>(std::move(f), *this));
            }
            co_return f;
        }
    };
    auto mep = std::make_unique<myext>();
    // Keep a reference to the extension's state: ownership of `mep` moves into
    // the helper, but the driver still needs to arm/observe these fields.
    auto& me = *mep;
    me.throw_func = std::move(throw_func);
    co_await exceptions_in_flush_helper(std::move(mep), me.should_fail, me.did_fail, me.schema_filter, expect_isolate);
}
|
|
|
|
// A plain filesystem error (EACCES) thrown during sstable writes must
// isolate the node (expect_isolate defaults to true).
SEASTAR_TEST_CASE(test_exceptions_in_flush_on_sstable_write) {
    auto thrower = [] { throw std::system_error(EACCES, std::system_category()); };
    co_await exceptions_in_flush_on_sstable_write_helper(std::move(thrower));
}
|
|
|
|
// An extension-reported permission error during sstable writes must isolate
// the node (expect_isolate defaults to true).
SEASTAR_TEST_CASE(test_ext_permission_exceptions_in_flush_on_sstable_write) {
    auto thrower = [] { throw db::extension_storage_permission_error(get_name()); };
    co_await exceptions_in_flush_on_sstable_write_helper(std::move(thrower));
}
|
|
|
|
// A resource-unavailable extension error during sstable writes is treated as
// transient (comparable to ENOENT) and must NOT isolate the node.
SEASTAR_TEST_CASE(test_ext_resource_exceptions_in_flush_on_sstable_write) {
    auto thrower = [] { throw db::extension_storage_resource_unavailable(get_name()); };
    co_await exceptions_in_flush_on_sstable_write_helper(std::move(thrower), false /* expect_isolate */);
}
|
|
|
|
// An extension-misconfiguration error during sstable writes must isolate the
// node (expect_isolate defaults to true).
SEASTAR_TEST_CASE(test_ext_config_exceptions_in_flush_on_sstable_write) {
    auto thrower = [] { throw db::extension_storage_misconfigured(get_name()); };
    co_await exceptions_in_flush_on_sstable_write_helper(std::move(thrower));
}
|
|
|
|
// Drives a flush-failure scenario where the injected exception is thrown at
// sstable *open* time (wrap_file throws before returning a file), as opposed
// to exceptions_in_flush_on_sstable_write_helper, which fails the individual
// file operations. `throw_func` produces the exception; `expect_isolate`
// tells the shared helper whether this error class should isolate the node.
//
// Fixes vs. previous version: removed the unused local db::extensions/
// db::config pair (leftover — exceptions_in_flush_helper constructs its own,
// mirroring the write-path helper) and a stray double semicolon.
static future<> exceptions_in_flush_on_sstable_open_helper(std::function<void()> throw_func, bool expect_isolate = true) {
    class myext : public sstables::file_io_extension {
    public:
        bool should_fail = false;                   // armed/disarmed by the driver
        bool did_fail = false;                      // set once an exception was thrown
        const schema* schema_filter = nullptr;      // limit injection to one table
        std::function<void()> throw_func;

        // Only inject for the table under test (when a filter is set).
        bool match_schema_filter(const schema& s) const {
            const auto ret = !schema_filter || schema_filter->id() == s.id();
            testlog.info("exceptions_in_flush_on_sstable_open_helper()::match_schema_filter({}.{}#{}) -> {}", s.ks_name(), s.cf_name(), s.id(), ret);
            return ret;
        }

        // Throw instead of wrapping: the failure happens while the sstable
        // component file is being opened.
        future<file> wrap_file(const sstable& t, component_type type, file f, open_flags flags) override {
            if (should_fail && match_schema_filter(*t.get_schema())) {
                did_fail = true;
                testlog.debug("Throwing exception");
                throw_func();
            }
            co_return f;
        }
    };
    auto mep = std::make_unique<myext>();
    // Keep a reference to the extension's state: ownership of `mep` moves into
    // the helper, but the driver still needs to arm/observe these fields.
    auto& me = *mep;
    me.throw_func = std::move(throw_func);
    co_await exceptions_in_flush_helper(std::move(mep), me.should_fail, me.did_fail, me.schema_filter, expect_isolate);
}
|
|
|
|
// A plain filesystem error (EACCES) thrown while opening an sstable file must
// isolate the node (expect_isolate defaults to true).
SEASTAR_TEST_CASE(test_exceptions_in_flush_on_sstable_open) {
    auto thrower = [] { throw std::system_error(EACCES, std::system_category()); };
    co_await exceptions_in_flush_on_sstable_open_helper(std::move(thrower));
}
|
|
|
|
// An extension-reported permission error at sstable-open time must isolate
// the node (expect_isolate defaults to true).
SEASTAR_TEST_CASE(test_ext_permission_exceptions_in_flush_on_sstable_open) {
    auto thrower = [] { throw db::extension_storage_permission_error(get_name()); };
    co_await exceptions_in_flush_on_sstable_open_helper(std::move(thrower));
}
|
|
|
|
// A resource-unavailable extension error at sstable-open time is treated as
// transient (comparable to ENOENT) and must NOT isolate the node.
SEASTAR_TEST_CASE(test_ext_resource_exceptions_in_flush_on_sstable_open) {
    auto thrower = [] { throw db::extension_storage_resource_unavailable(get_name()); };
    co_await exceptions_in_flush_on_sstable_open_helper(std::move(thrower), false /* expect_isolate */);
}
|
|
|
|
// An extension-misconfiguration error at sstable-open time must isolate the
// node (expect_isolate defaults to true).
SEASTAR_TEST_CASE(test_ext_config_exceptions_in_flush_on_sstable_open) {
    auto thrower = [] { throw db::extension_storage_misconfigured(get_name()); };
    co_await exceptions_in_flush_on_sstable_open_helper(std::move(thrower));
}
|
|
|
|
// Verifies that a memtable reader created on one shard keeps working after
// its tablet is migrated to another shard: the reader, opened and half-drained
// before the move, must still produce the complete, correct partition.
SEASTAR_TEST_CASE(memtable_reader_after_tablet_migration) {
    cql_test_config cfg;
    cfg.initial_tablets = 1;
    cfg.ms_listen = true;

    return do_with_cql_env_thread([](cql_test_env& env) {
        replica::database& db = env.local_db();
        auto& ss = env.get_storage_service().local();

        // This test needs specific tablet layout, disable the load balancer to
        // have manual control over it.
        ss.set_tablet_balancing_enabled(false).get();

        // Create table and insert some data
        char const* table_name = "tbl";
        env.execute_cql(format("CREATE TABLE ks.{} (pk int, ck int, v text, PRIMARY KEY(pk, ck));", table_name)).get();
        auto& tbl = db.find_column_family("ks", table_name);
        const auto schema = tbl.schema();

        BOOST_REQUIRE(tbl.uses_tablets());

        // With initial_tablets = 1 the table has exactly one tablet, so the
        // whole partition lives on one replica shard.
        const auto& tablet_map = db.get_token_metadata().tablets().get_tablet_map(schema->id());
        BOOST_REQUIRE_EQUAL(tablet_map.tablet_count(), 1);

        const int32_t pk = 0;

        mutation expected_mut(schema, dht::decorate_key(*schema, partition_key::from_single_value(*schema, int32_type->decompose(pk))));

        const api::timestamp_type ts = 100;
        // 1 KiB cell value, so row count below translates directly to bytes.
        const auto raw_v = utf8_type->decompose(sstring(1024, 'v'));
        const auto& v_def = *schema->get_column_definition(to_bytes("v"));

        // Add enough data to fill at least two buffers.
        for (int32_t ck = 0; size_t(ck) < (2 * mutation_reader::default_max_buffer_size_in_bytes()) / 1024; ++ck) {
            const auto ckey = clustering_key::from_single_value(*schema, int32_type->decompose(ck));
            expected_mut.set_clustered_cell(ckey, v_def, atomic_cell::make_live(*v_def.type, ts, raw_v));
        }

        const auto first_tablet_id = tablet_map.first_tablet();
        const auto first_tablet_info = tablet_map.get_tablet_info(first_tablet_id);

        // Bundle kept on the replica shard; passed back via foreign_ptr so
        // this (possibly different) shard can later resume the reader there.
        struct remote_data {
            schema_ptr schema;
            mutation expected_mut;
            mutation_reader reader;
        };

        // On the tablet's replica shard: apply the mutation, open a reader and
        // fill its first buffer — i.e. leave the reader mid-stream.
        auto data_ptr = env.db().invoke_on(first_tablet_info.replicas.front().shard, [&table_name, fm = freeze(expected_mut)] (replica::database& db)
                -> future<foreign_ptr<std::unique_ptr<remote_data>>> {
            auto& tbl = db.find_column_family("ks", table_name);
            const auto schema = tbl.schema();

            co_await db.apply(schema, fm, {}, db::commitlog_force_sync::no, db::no_timeout);

            testlog.info("create reader -- first buffer fill");

            auto reader = tbl.make_mutation_reader(schema, co_await db.obtain_reader_permit(tbl, "read", db::no_timeout, {}), query::full_partition_range, schema->full_slice());

            std::exception_ptr ex;
            try {
                co_await reader.fill_buffer();
                co_return make_foreign(std::make_unique<remote_data>(remote_data{schema, fm.unfreeze(schema), std::move(reader)}));
            } catch (...) {
                ex = std::current_exception();
            }

            // If we are here, there was an exception, but check to be sure.
            SCYLLA_ASSERT(ex);
            // The reader must be closed before propagating the failure.
            co_await reader.close();
            std::rethrow_exception(std::move(ex));
        }).get();

        // Migrate the tablet to another shard
        {
            const auto src = first_tablet_info.replicas.front();
            auto dst = src;
            dst.shard = (src.shard + 1) % smp::count;
            // Closing the storage-group is done in the background, so it is fine
            // to wait for this.
            ss.move_tablet(schema->id(), tablet_map.get_last_token(first_tablet_id), src, dst).get();
        }

        // Back on the shard that owns the reader: drain it to end-of-stream and
        // check the full partition survived the migration intact.
        smp::submit_to(data_ptr.get_owner_shard(), [&data_ptr] {
            return async([&data_ptr] {
                testlog.info("exhaust reader");

                auto data = data_ptr.release();
                auto close_reader = deferred_close(data->reader);

                auto m_opt = read_mutation_from_mutation_reader(data->reader).get();
                BOOST_REQUIRE(m_opt);
                BOOST_REQUIRE(data->reader.is_end_of_stream());

                assert_that(*m_opt).is_equal_to(data->expected_mut);
            });
        }).get();
    }, cfg);
}
|
|
|
|
// A memtable reader must observe its permit's abort — here triggered by an
// already-expired timeout — and fail the next read with the reader-concurrency
// semaphore's timeout exception.
SEASTAR_THREAD_TEST_CASE(test_memtable_reader_abort) {
    simple_schema ss;
    const auto sch = ss.schema();

    tests::reader_concurrency_semaphore_wrapper semaphore;

    replica::table_stats tbl_stats;
    replica::memtable_table_shared_data table_shared_data;
    replica::dirty_memory_manager mgr;

    auto mt = make_lw_shared<replica::memtable>(sch, mgr, table_shared_data, tbl_stats);

    const auto pkey = ss.make_pkey(0);
    const auto prange = dht::partition_range::make_singular(pkey);

    // Populate a single partition with a handful of rows.
    mutation mut(sch, pkey);
    for (int ck_idx = 0; ck_idx != 10; ++ck_idx) {
        ss.add_row(mut, ss.make_ckey(ck_idx), "v1");
    }
    mt->apply(mut);

    auto permit = semaphore.make_permit();

    auto reader_opt = mt->make_mutation_reader_opt(sch, permit, prange, sch->full_slice());
    BOOST_REQUIRE(reader_opt);
    auto close_reader = deferred_close(*reader_opt);

    // Set a deadline that has already passed.
    permit.set_timeout(db::timeout_clock::now());

    // Wait for the timer to fire so the permit carries an abort exception.
    BOOST_REQUIRE(eventually_true([&] { return bool(permit.get_abort_exception()); }));

    // The next read attempt must surface the timeout.
    BOOST_REQUIRE_THROW((*reader_opt)().get(), named_semaphore_timed_out);
}
|
|
|
|
BOOST_AUTO_TEST_SUITE_END()
|