Verify that the directory listers opened by get_snapshot_details are properly closed when handling an (injected) exception. Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2403 lines
116 KiB
C++
2403 lines
116 KiB
C++
/*
|
|
* Copyright (C) 2016-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
|
|
#include <boost/test/tools/old/interface.hpp>
|
|
#include <seastar/core/seastar.hh>
|
|
#include <seastar/core/shard_id.hh>
|
|
#include <seastar/core/smp.hh>
|
|
#include <seastar/core/thread.hh>
|
|
#include <seastar/core/coroutine.hh>
|
|
#include <seastar/core/bitops.hh>
|
|
#include <seastar/util/file.hh>
|
|
|
|
#undef SEASTAR_TESTING_MAIN
|
|
#include <seastar/testing/test_case.hh>
|
|
#include <seastar/testing/thread_test_case.hh>
|
|
#include <utility>
|
|
#include <fmt/ranges.h>
|
|
#include <fmt/std.h>
|
|
|
|
#include "test/lib/cql_test_env.hh"
|
|
#include "test/lib/result_set_assertions.hh"
|
|
#include "test/lib/log.hh"
|
|
#include "test/lib/random_utils.hh"
|
|
#include "test/lib/simple_schema.hh"
|
|
#include "test/lib/test_utils.hh"
|
|
#include "test/lib/key_utils.hh"
|
|
|
|
#include "replica/database.hh"
|
|
#include "utils/assert.hh"
|
|
#include "utils/lister.hh"
|
|
#include "utils/rjson.hh"
|
|
#include "partition_slice_builder.hh"
|
|
#include "mutation/frozen_mutation.hh"
|
|
#include "test/lib/mutation_source_test.hh"
|
|
#include "schema/schema_builder.hh"
|
|
#include "service/migration_manager.hh"
|
|
#include "sstables/sstables.hh"
|
|
#include "sstables/generation_type.hh"
|
|
#include "sstables/sstable_version.hh"
|
|
#include "db/config.hh"
|
|
#include "db/commitlog/commitlog_replayer.hh"
|
|
#include "db/commitlog/commitlog.hh"
|
|
#include "test/lib/tmpdir.hh"
|
|
#include "db/data_listeners.hh"
|
|
#include "replica/multishard_query.hh"
|
|
#include "mutation_query.hh"
|
|
#include "transport/messages/result_message.hh"
|
|
#include "compaction/compaction_manager.hh"
|
|
#include "db/snapshot-ctl.hh"
|
|
#include "db/system_keyspace.hh"
|
|
#include "db/view/view_builder.hh"
|
|
#include "replica/mutation_dump.hh"
|
|
|
|
using namespace std::chrono_literals;
|
|
using namespace sstables;
|
|
using namespace tests;
|
|
|
|
// Test-only wrapper exposing private internals of replica::database
// (presumably declared a friend there -- the accessors below touch private
// members directly; TODO confirm) so tests can inspect the database's
// reader-concurrency accounting.
class database_test_wrapper {
    replica::database& _db;
public:
    explicit database_test_wrapper(replica::database& db) : _db(db) { }

    // Semaphore gating regular user reads.
    reader_concurrency_semaphore& get_user_read_concurrency_semaphore() {
        return _db.read_concurrency_sem();
    }
    // Semaphore gating streaming reads.
    reader_concurrency_semaphore& get_streaming_read_concurrency_semaphore() {
        return _db._streaming_concurrency_sem;
    }
    // Semaphore gating system-table reads.
    reader_concurrency_semaphore& get_system_read_concurrency_semaphore() {
        return _db._system_read_concurrency_sem;
    }

    // Total memory tracked by the user reader-concurrency semaphore group.
    size_t get_total_user_reader_concurrency_semaphore_memory() {
        return _db._reader_concurrency_semaphores_group._total_memory;
    }

    // Total weight tracked by the user reader-concurrency semaphore group.
    size_t get_total_user_reader_concurrency_semaphore_weight() {
        return _db._reader_concurrency_semaphores_group._total_weight;
    }
};
|
|
|
|
// Freeze mutation `m` and apply it on the shard that owns its token,
// optionally flushing the table afterwards.
//
// The mutation is frozen on the calling shard; only the frozen form crosses
// the shard boundary.
static future<> apply_mutation(sharded<replica::database>& sharded_db, table_id uuid, const mutation& m, bool do_flush = false,
        db::commitlog::force_sync fs = db::commitlog::force_sync::no, db::timeout_clock::time_point timeout = db::no_timeout) {
    auto& local_table = sharded_db.local().find_column_family(uuid);
    const auto owner_shard = local_table.shard_for_reads(m.token());
    return sharded_db.invoke_on(owner_shard, [uuid, fm = freeze(m), do_flush, fs, timeout] (replica::database& db) {
        auto& tbl = db.find_column_family(uuid);
        auto applied = db.apply(tbl.schema(), fm, tracing::trace_state_ptr(), fs, timeout);
        return applied.then([do_flush, &tbl] {
            if (do_flush) {
                return tbl.flush();
            }
            return make_ready_future<>();
        });
    });
}
|
|
|
|
BOOST_AUTO_TEST_SUITE(database_test)
|
|
|
|
// Verify that truncation is durable: after truncating a table, neither
// regular queries nor a commitlog replay may resurrect the truncated data.
SEASTAR_TEST_CASE(test_safety_after_truncate) {
    auto cfg = make_shared<db::config>();
    cfg->auto_snapshot.set(false);
    return do_with_cql_env_thread([](cql_test_env& e) {
        e.execute_cql("create table ks.cf (k text, v int, primary key (k));").get();
        auto& db = e.local_db();
        sstring ks_name = "ks";
        sstring cf_name = "cf";
        auto s = db.find_schema(ks_name, cf_name);
        auto&& table = db.find_column_family(s);
        auto uuid = s->id();

        // Populate 1000 partitions, tracking per shard how many keys landed
        // there and the singular ranges to query them back.
        std::vector<size_t> keys_per_shard;
        std::vector<dht::partition_range_vector> pranges_per_shard;
        keys_per_shard.resize(smp::count);
        pranges_per_shard.resize(smp::count);
        for (uint32_t i = 1; i <= 1000; ++i) {
            auto pkey = partition_key::from_single_value(*s, to_bytes(fmt::format("key{}", i)));
            mutation m(s, pkey);
            m.set_clustered_cell(clustering_key_prefix::make_empty(), "v", int32_t(42), {});
            auto shard = table.shard_for_reads(m.token());
            keys_per_shard[shard]++;
            pranges_per_shard[shard].emplace_back(dht::partition_range::make_singular(dht::decorate_key(*s, std::move(pkey))));
            apply_mutation(e.db(), uuid, m).get();
        }

        // Query each shard for its own ranges and check the result size
        // matches the expected per-shard count.
        auto assert_query_result = [&] (const std::vector<size_t>& expected_sizes) {
            auto max_size = std::numeric_limits<size_t>::max();
            auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size),
                    query::tombstone_limit::max, query::row_limit(1000));
            e.db().invoke_on_all([&] (replica::database& db) -> future<> {
                auto shard = this_shard_id();
                auto s = db.find_schema(uuid);
                auto&& [result, cache_tempature] = co_await db.query(s, cmd, query::result_options::only_result(), pranges_per_shard[shard], nullptr, db::no_timeout);
                assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(expected_sizes[shard]);
            }).get();
        };
        assert_query_result(keys_per_shard);

        replica::database::truncate_table_on_all_shards(e.db(), e.get_system_keyspace(), "ks", "cf").get();

        // After truncation, every shard must see zero partitions.
        // (Replaces a hand-rolled iterator loop with vector::assign.)
        keys_per_shard.assign(keys_per_shard.size(), 0);
        assert_query_result(keys_per_shard);

        // Replay the commitlog; the saved replay positions must prevent
        // truncated data from coming back.
        e.db().invoke_on_all([&] (replica::database& db) -> future<> {
            auto cl = db.commitlog();
            auto rp = co_await db::commitlog_replayer::create_replayer(e.db(), e.get_system_keyspace());
            auto paths = co_await cl->list_existing_segments();
            co_await rp.recover(paths, db::commitlog::descriptor::FILENAME_PREFIX);
        }).get();

        assert_query_result(keys_per_shard);
        return make_ready_future<>();
    }, cfg);
}
|
|
|
|
// Reproducer for:
|
|
// https://github.com/scylladb/scylla/issues/10421
|
|
// https://github.com/scylladb/scylla/issues/10423
|
|
SEASTAR_TEST_CASE(test_truncate_without_snapshot_during_writes) {
    auto cfg = make_shared<db::config>();
    cfg->auto_snapshot.set(false);
    return do_with_cql_env_thread([] (cql_test_env& e) {
        sstring ks_name = "ks";
        sstring cf_name = "cf";
        e.execute_cql(fmt::format("create table {}.{} (k text, v int, primary key (k));", ks_name, cf_name)).get();
        auto& db = e.local_db();
        auto uuid = db.find_uuid(ks_name, cf_name);
        auto s = db.find_column_family(uuid).schema();
        // Number of writes that completed (or failed) so far. Only touched by
        // continuations running on this shard, so no synchronization needed.
        int count = 0;

        // Issue `end - begin` concurrent flushed writes with random keys,
        // bumping `count` as each one resolves (finally: also on failure).
        auto insert_data = [&] (uint32_t begin, uint32_t end) {
            return parallel_for_each(std::views::iota(begin, end), [&] (auto i) {
                auto pkey = partition_key::from_single_value(*s, to_bytes(fmt::format("key-{}", tests::random::get_int<uint64_t>())));
                mutation m(s, pkey);
                m.set_clustered_cell(clustering_key_prefix::make_empty(), "v", int32_t(42), {});
                return apply_mutation(e.db(), uuid, m, true /* do_flush */).finally([&] {
                    ++count;
                });
            });
        };

        uint32_t num_keys = 1000;

        // Run writes and truncations concurrently: keep truncating (without
        // snapshots) until all writes have completed, exercising the races of
        // scylladb#10421 / scylladb#10423.
        auto f0 = insert_data(0, num_keys);
        auto f1 = do_until([&] { return std::cmp_greater_equal(count, num_keys); }, [&, ts = db_clock::now()] {
            return replica::database::truncate_table_on_all_shards(e.db(), e.get_system_keyspace(), "ks", "cf", ts, false /* with_snapshot */).then([] {
                return yield();
            });
        });
        f0.get();
        f1.get();
    }, cfg);
}
|
|
|
|
// Reproducer for:
|
|
// https://github.com/scylladb/scylla/issues/21719
|
|
SEASTAR_TEST_CASE(test_truncate_saves_replay_position) {
    auto cfg = make_shared<db::config>();
    cfg->auto_snapshot.set(false);
    return do_with_cql_env_thread([] (cql_test_env& e) {
        // The regression only shows with multiple shards: every shard has to
        // record its own replay position in system.truncated.
        BOOST_REQUIRE_GT(smp::count, 1);
        const sstring ks_name = "ks";
        const sstring cf_name = "cf";
        e.execute_cql(fmt::format("CREATE TABLE {}.{} (k TEXT PRIMARY KEY, v INT);", ks_name, cf_name)).get();
        const table_id uuid = e.local_db().find_uuid(ks_name, cf_name);

        replica::database::truncate_table_on_all_shards(e.db(), e.get_system_keyspace(), ks_name, cf_name, db_clock::now(), false /* with_snapshot */).get();

        // Expect exactly one truncation record per shard for this table.
        auto msg = e.execute_cql(fmt::format("SELECT * FROM system.truncated WHERE table_uuid = {}", uuid)).get();
        auto rows_msg = dynamic_pointer_cast<cql_transport::messages::result_message::rows>(msg);
        BOOST_REQUIRE(rows_msg);
        const auto truncation_row_count = rows_msg->rs().result_set().size();
        BOOST_REQUIRE_EQUAL(truncation_row_count, smp::count);
    }, cfg);
}
|
|
|
|
// Verify that query row limits and partition limits are honored per shard,
// and that partitions covered only by tombstones do not count towards them.
SEASTAR_TEST_CASE(test_querying_with_limits) {
    return do_with_cql_env_thread([](cql_test_env& e) {
        e.execute_cql("create table ks.cf (k text, v int, primary key (k));").get();
        auto& db = e.local_db();
        auto s = db.find_schema("ks", "cf");
        auto&& table = db.find_column_family(s);
        auto uuid = s->id();
        std::vector<size_t> keys_per_shard;
        std::vector<dht::partition_range_vector> pranges_per_shard;
        keys_per_shard.resize(smp::count);
        pranges_per_shard.resize(smp::count);
        // First batch: partitions carrying only a partition tombstone -- dead
        // data that must not be counted in the live-key tally.
        for (uint32_t i = 1; i <= 3 * smp::count; ++i) {
            auto pkey = partition_key::from_single_value(*s, to_bytes(format("key{:d}", i)));
            mutation m(s, pkey);
            m.partition().apply(tombstone(api::timestamp_type(1), gc_clock::now()));
            apply_mutation(e.db(), uuid, m).get();
            auto shard = table.shard_for_reads(m.token());
            pranges_per_shard[shard].emplace_back(dht::partition_range::make_singular(dht::decorate_key(*s, std::move(pkey))));
        }
        // Second batch: live partitions, counted per shard.
        // NOTE(review): i == 3 * smp::count is written by both loops (the first
        // loop is inclusive of that bound), so that key gets both a tombstone
        // and a cell at timestamp 1 -- confirm the overlap is intentional.
        for (uint32_t i = 3 * smp::count; i <= 8 * smp::count; ++i) {
            auto pkey = partition_key::from_single_value(*s, to_bytes(format("key{:d}", i)));
            mutation m(s, pkey);
            m.set_clustered_cell(clustering_key_prefix::make_empty(), "v", int32_t(42), 1);
            apply_mutation(e.db(), uuid, m).get();
            auto shard = table.shard_for_reads(m.token());
            keys_per_shard[shard]++;
            pranges_per_shard[shard].emplace_back(dht::partition_range::make_singular(dht::decorate_key(*s, std::move(pkey))));
        }

        auto max_size = std::numeric_limits<size_t>::max();
        // Row limit of 3: each shard should get at most 3 rows back.
        {
            auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size),
                    query::tombstone_limit::max, query::row_limit(3));
            e.db().invoke_on_all([&] (replica::database& db) -> future<> {
                auto shard = this_shard_id();
                auto s = db.find_schema(uuid);
                auto result = std::get<0>(co_await db.query(s, cmd, query::result_options::only_result(), pranges_per_shard[shard], nullptr, db::no_timeout));
                auto expected_size = std::min<size_t>(keys_per_shard[shard], 3);
                assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(expected_size);
            }).get();
        }

        // Unlimited rows but at most 5 partitions per shard.
        {
            auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size),
                    query::tombstone_limit::max, query::row_limit(query::max_rows), query::partition_limit(5));
            e.db().invoke_on_all([&] (replica::database& db) -> future<> {
                auto shard = this_shard_id();
                auto s = db.find_schema(uuid);
                auto result = std::get<0>(co_await db.query(s, cmd, query::result_options::only_result(), pranges_per_shard[shard], nullptr, db::no_timeout));
                auto expected_size = std::min<size_t>(keys_per_shard[shard], 5);
                assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(expected_size);
            }).get();
        }

        // Unlimited rows but at most 3 partitions per shard.
        {
            auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size),
                    query::tombstone_limit::max, query::row_limit(query::max_rows), query::partition_limit(3));
            e.db().invoke_on_all([&] (replica::database& db) -> future<> {
                auto shard = this_shard_id();
                auto s = db.find_schema(uuid);
                auto result = std::get<0>(co_await db.query(s, cmd, query::result_options::only_result(), pranges_per_shard[shard], nullptr, db::no_timeout));
                auto expected_size = std::min<size_t>(keys_per_shard[shard], 3);
                assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(expected_size);
            }).get();
        }
    });
}
|
|
|
|
// Run the given mutation-source conformance suite against a table backed by
// real sstables: the populate callback writes the partitions through the
// database, flushes them to disk and invalidates the cache, so subsequent
// reads come from sstables.
static void test_database(void (*run_tests)(populate_fn_ex, bool)) {
    do_with_cql_env_thread([run_tests] (cql_test_env& e) {
        run_tests([&] (schema_ptr s, const utils::chunked_vector<mutation>& partitions, gc_clock::time_point) -> mutation_source {
            auto& mm = e.migration_manager().local();
            // Drop any previous incarnation of the table so each population
            // starts from a clean slate.
            try {
                auto group0_guard = mm.start_group0_operation().get();
                auto ts = group0_guard.write_timestamp();
                e.local_db().find_column_family(s->ks_name(), s->cf_name());
                mm.announce(service::prepare_column_family_drop_announcement(mm.get_storage_proxy(), s->ks_name(), s->cf_name(), ts).get(), std::move(group0_guard), "").get();
            } catch (const replica::no_such_column_family&) {
                // expected
            }
            // Create the table with the requested schema via group0.
            auto group0_guard = mm.start_group0_operation().get();
            auto ts = group0_guard.write_timestamp();
            mm.announce(service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s, ts).get(), std::move(group0_guard), "").get();
            replica::column_family& cf = e.local_db().find_column_family(s);
            auto uuid = cf.schema()->id();
            for (auto&& m : partitions) {
                apply_mutation(e.db(), uuid, m).get();
            }
            // Flush to sstables and drop the row cache so the returned
            // mutation source reads from disk, not from memtables/cache.
            cf.flush().get();
            cf.get_row_cache().invalidate(row_cache::external_updater([] {})).get();
            return mutation_source([&] (schema_ptr s,
                    reader_permit permit,
                    const dht::partition_range& range,
                    const query::partition_slice& slice,
                    tracing::trace_state_ptr trace_state,
                    streamed_mutation::forwarding fwd,
                    mutation_reader::forwarding fwd_mr) {
                return cf.make_mutation_reader(s, std::move(permit), range, slice, std::move(trace_state), fwd, fwd_mr);
            });
        }, true);
    }).get();
}
|
|
|
|
// Instantiations of the mutation-source conformance suite (plain and reverse
// reading variants) over sstable-backed tables; see test_database() above.
SEASTAR_THREAD_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_source_plain_basic) {
    test_database(run_mutation_source_tests_plain_basic);
}

SEASTAR_THREAD_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_source_plain_reader_conversion) {
    test_database(run_mutation_source_tests_plain_reader_conversion);
}

SEASTAR_THREAD_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_source_plain_fragments_monotonic) {
    test_database(run_mutation_source_tests_plain_fragments_monotonic);
}

SEASTAR_THREAD_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_source_plain_read_back) {
    test_database(run_mutation_source_tests_plain_read_back);
}

SEASTAR_THREAD_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_source_reverse_basic) {
    test_database(run_mutation_source_tests_reverse_basic);
}

SEASTAR_THREAD_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_source_reverse_reader_conversion) {
    test_database(run_mutation_source_tests_reverse_reader_conversion);
}

SEASTAR_THREAD_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_source_reverse_fragments_monotonic) {
    test_database(run_mutation_source_tests_reverse_fragments_monotonic);
}

SEASTAR_THREAD_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_source_reverse_read_back) {
    test_database(run_mutation_source_tests_reverse_read_back);
}
|
|
|
|
static void require_exist(const sstring& filename, bool should) {
|
|
auto exists = file_exists(filename).get();
|
|
BOOST_REQUIRE_EQUAL(exists, should);
|
|
}
|
|
|
|
// Recursively create directory `dirname` (including parents) and assert it
// exists afterwards.
static void touch_dir(const sstring& dirname) {
    recursive_touch_directory(dirname).get();
    require_exist(dirname, true);
}
|
|
|
|
// Create an empty file `filename` (via the test-lib helper) and assert it
// exists afterwards.
static void touch_file(const sstring& filename) {
    tests::touch_file(filename).get();
    require_exist(filename, true);
}
|
|
|
|
// Verify that on startup the loader cleans up leftovers of interrupted
// sstable operations: temporary directories, TemporaryTOC files, and any
// components belonging to an sstable that has a TemporaryTOC.
SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_incomplete_sstables) {
    using sst = sstables::sstable;

    tmpdir data_dir;
    auto db_cfg_ptr = make_shared<db::config>();
    auto& db_cfg = *db_cfg_ptr;

    db_cfg.data_file_directories({data_dir.path().string()}, db::config::config_source::CommandLine);

    // Create incomplete sstables in test data directory
    sstring ks = "system";
    sstring cf = "peers-37f71aca7dc2383ba70672528af04d4f";
    sstring sst_dir = (data_dir.path() / std::string_view(ks) / std::string_view(cf)).string();

    // Leftover temporary upload/creation directories for generations 2 and 3.
    auto temp_sst_dir_2 = fmt::format("{}/{}{}", sst_dir, generation_from_value(2), tempdir_extension);
    touch_dir(temp_sst_dir_2);

    auto temp_sst_dir_3 = fmt::format("{}/{}{}", sst_dir, generation_from_value(3), tempdir_extension);
    touch_dir(temp_sst_dir_3);

    auto temp_file_name = sst::filename(temp_sst_dir_3, ks, cf, sstables::get_highest_sstable_version(), generation_from_value(3), sst::format_types::big, component_type::TemporaryTOC);
    touch_file(temp_file_name);

    // Generation 4: TemporaryTOC plus stray components that must be removed
    // along with it.
    temp_file_name = sst::filename(sst_dir, ks, cf, sstables::get_highest_sstable_version(), generation_from_value(4), sst::format_types::big, component_type::TemporaryTOC);
    touch_file(temp_file_name);
    // Reproducer for #scylladb/scylladb#26393
    temp_file_name = sst::filename(sst_dir, ks, cf, sstables::get_highest_sstable_version(), generation_from_value(4), sst::format_types::big, component_type::TemporaryHashes);
    touch_file(temp_file_name);
    temp_file_name = sst::filename(sst_dir, ks, cf, sstables::get_highest_sstable_version(), generation_from_value(4), sst::format_types::big, component_type::Data);
    touch_file(temp_file_name);

    // Start the node over the pre-populated directory and verify that all
    // leftovers were removed during startup.
    auto test_config = cql_test_config(db_cfg_ptr);
    test_config.clean_data_dir_before_test = false;
    do_with_cql_env_thread([&sst_dir, &ks, &cf, &temp_sst_dir_2, &temp_sst_dir_3] (cql_test_env& e) {
        require_exist(temp_sst_dir_2, false);
        require_exist(temp_sst_dir_3, false);

        require_exist(sst::filename(sst_dir, ks, cf, sstables::get_highest_sstable_version(), generation_from_value(4), sst::format_types::big, component_type::TemporaryTOC), false);
        require_exist(sst::filename(sst_dir, ks, cf, sstables::get_highest_sstable_version(), generation_from_value(4), sst::format_types::big, component_type::TemporaryHashes), false);
        require_exist(sst::filename(sst_dir, ks, cf, sstables::get_highest_sstable_version(), generation_from_value(4), sst::format_types::big, component_type::Data), false);
    }, test_config).get();
}
|
|
|
|
// Verify that on startup the loader completes interrupted atomic-deletion
// logs: sstables named in a committed pending_delete log are removed, and
// temporary (uncommitted) logs are discarded without deleting anything.
SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_pending_delete) {
    using sst = sstables::sstable;

    tmpdir data_dir;
    auto db_cfg_ptr = make_shared<db::config>();
    auto& db_cfg = *db_cfg_ptr;

    db_cfg.data_file_directories({data_dir.path().string()}, db::config::config_source::CommandLine);

    // Create incomplete sstables in test data directory
    sstring ks = "system";
    sstring cf = "peers-37f71aca7dc2383ba70672528af04d4f";
    sstring sst_dir = (data_dir.path() / std::string_view(ks) / std::string_view(cf)).string();
    sstring pending_delete_dir = sst_dir + "/" + sstables::pending_delete_dir;

    // Synchronously write `text` into `file_name` and assert it was created.
    auto write_file = [] (const sstring& file_name, const sstring& text) {
        auto f = open_file_dma(file_name, open_flags::wo | open_flags::create | open_flags::truncate).get();
        auto os = make_file_output_stream(f, file_output_stream_options{}).get();
        os.write(text).get();
        os.flush().get();
        os.close().get();
        require_exist(file_name, true);
    };

    auto component_basename = [&ks, &cf] (sstables::generation_type gen, component_type ctype) {
        return sst::component_basename(ks, cf, sstables::get_highest_sstable_version(), gen, sst::format_types::big, ctype);
    };

    auto gen_filename = [&sst_dir, &ks, &cf] (sstables::generation_type gen, component_type ctype) {
        return sst::filename(sst_dir, ks, cf, sstables::get_highest_sstable_version(), gen, sst::format_types::big, ctype);
    };

    touch_dir(pending_delete_dir);

    // Empty log file
    touch_file(pending_delete_dir + "/sstables-0-0.log");

    // Empty temporary log file
    touch_file(pending_delete_dir + "/sstables-1-1.log.tmp");

    const sstring toc_text = "TOC.txt\nData.db\n";

    sstables::sstable_generation_generator gen_generator;
    std::vector<sstables::generation_type> gen;
    constexpr size_t num_gens = 9;
    std::generate_n(std::back_inserter(gen), num_gens, [&] {
        return gen_generator();
    });

    // Regular log file with single entry
    write_file(gen_filename(gen[2], component_type::TOC), toc_text);
    touch_file(gen_filename(gen[2], component_type::Data));
    write_file(pending_delete_dir + "/sstables-2-2.log",
               component_basename(gen[2], component_type::TOC) + "\n");

    // Temporary log file with single entry
    write_file(pending_delete_dir + "/sstables-3-3.log.tmp",
               component_basename(gen[3], component_type::TOC) + "\n");

    // Regular log file with multiple entries
    write_file(gen_filename(gen[4], component_type::TOC), toc_text);
    touch_file(gen_filename(gen[4], component_type::Data));
    write_file(gen_filename(gen[5], component_type::TOC), toc_text);
    touch_file(gen_filename(gen[5], component_type::Data));
    write_file(pending_delete_dir + "/sstables-4-5.log",
               component_basename(gen[4], component_type::TOC) + "\n" +
               component_basename(gen[5], component_type::TOC) + "\n");

    // Regular log file with multiple entries and some deleted sstables
    write_file(gen_filename(gen[6], component_type::TemporaryTOC), toc_text);
    touch_file(gen_filename(gen[6], component_type::Data));
    write_file(gen_filename(gen[7], component_type::TemporaryTOC), toc_text);
    write_file(pending_delete_dir + "/sstables-6-8.log",
               component_basename(gen[6], component_type::TOC) + "\n" +
               component_basename(gen[7], component_type::TOC) + "\n" +
               component_basename(gen[8], component_type::TOC) + "\n");

    // Start the node over the pre-populated directory and verify the
    // pending-delete logs were processed during startup.
    auto test_config = cql_test_config(db_cfg_ptr);
    test_config.clean_data_dir_before_test = false;
    do_with_cql_env_thread([&] (cql_test_env& e) {
        // Empty log file
        // NOTE(review): no assertion here for sstables-0-0.log created above --
        // confirm whether a require_exist(..., false) check is missing.

        // Empty temporary log file
        require_exist(pending_delete_dir + "/sstables-1-1.log.tmp", false);

        // Regular log file with single entry
        require_exist(gen_filename(gen[2], component_type::TOC), false);
        require_exist(gen_filename(gen[2], component_type::Data), false);
        require_exist(pending_delete_dir + "/sstables-2-2.log", false);

        // Temporary log file with single entry
        require_exist(pending_delete_dir + "/sstables-3-3.log.tmp", false);

        // Regular log file with multiple entries
        require_exist(gen_filename(gen[4], component_type::TOC), false);
        require_exist(gen_filename(gen[4], component_type::Data), false);
        require_exist(gen_filename(gen[5], component_type::TOC), false);
        require_exist(gen_filename(gen[5], component_type::Data), false);
        require_exist(pending_delete_dir + "/sstables-4-5.log", false);

        // Regular log file with multiple entries and some deleted sstables
        require_exist(gen_filename(gen[6], component_type::TemporaryTOC), false);
        require_exist(gen_filename(gen[6], component_type::Data), false);
        require_exist(gen_filename(gen[7], component_type::TemporaryTOC), false);
        require_exist(pending_delete_dir + "/sstables-6-8.log", false);
    }, test_config).get();
}
|
|
|
|
// Snapshot tests and their helpers
|
|
// \param func: function to be called back, in a seastar thread.
|
|
// Create a fresh cql_test_env, create each table in `cf_names` and populate
// it with `num_keys` random partitions (three clustering rows each),
// optionally creating a materialized view and an index per table, then call
// `func` in a seastar thread.
future<> do_with_some_data_in_thread(std::vector<sstring> cf_names, std::function<void (cql_test_env&)> func, bool create_mvs = false, shared_ptr<db::config> db_cfg_ptr = {}, size_t num_keys = 2) {
    return seastar::async([cf_names = std::move(cf_names), func = std::move(func), create_mvs, db_cfg_ptr = std::move(db_cfg_ptr), num_keys] () mutable {
        // When no config is supplied, keep data files in a temporary
        // directory that lives for the duration of the test.
        lw_shared_ptr<tmpdir> tmpdir_for_data;
        if (!db_cfg_ptr) {
            tmpdir_for_data = make_lw_shared<tmpdir>();
            db_cfg_ptr = make_shared<db::config>();
            db_cfg_ptr->data_file_directories(std::vector<sstring>({ tmpdir_for_data->path().string() }));
        }
        do_with_cql_env_thread([cf_names = std::move(cf_names), func = std::move(func), create_mvs, num_keys] (cql_test_env& e) {
            for (const auto& cf_name : cf_names) {
                e.create_table([&cf_name] (std::string_view ks_name) {
                    return *schema_builder(ks_name, cf_name)
                            .with_column("p1", utf8_type, column_kind::partition_key)
                            .with_column("c1", int32_type, column_kind::clustering_key)
                            .with_column("c2", int32_type, column_kind::clustering_key)
                            .with_column("r1", int32_type)
                            .build();
                }).get();
                auto stmt = e.prepare(fmt::format("insert into {} (p1, c1, c2, r1) values (?, ?, ?, ?)", cf_name)).get();
                auto make_key = [] (int64_t k) {
                    std::string s = fmt::format("key{}", k);
                    return cql3::raw_value::make_value(utf8_type->decompose(s));
                };
                auto make_val = [] (int64_t x) {
                    // Explicit cast: brace-initializing int32_t from a runtime
                    // int64_t is a narrowing conversion and ill-formed.
                    return cql3::raw_value::make_value(int32_type->decompose(static_cast<int32_t>(x)));
                };
                // Three clustering rows per random partition key.
                for (size_t i = 0; i < num_keys; ++i) {
                    auto key = tests::random::get_int<int32_t>(1, 1000000);
                    e.execute_prepared(stmt, {make_key(key), make_val(key), make_val(key + 1), make_val(key + 2)}).get();
                    e.execute_prepared(stmt, {make_key(key), make_val(key + 1), make_val(key + 1), make_val(key + 2)}).get();
                    e.execute_prepared(stmt, {make_key(key), make_val(key + 2), make_val(key + 1), make_val(key + 2)}).get();
                }

                if (create_mvs) {
                    // Start waiting before issuing the DDL so the build
                    // notification cannot be missed.
                    auto f1 = e.local_view_builder().wait_until_built("ks", seastar::format("view_{}", cf_name));
                    e.execute_cql(seastar::format("create materialized view view_{0} as select * from {0} where p1 is not null and c1 is not null and c2 is "
                                                  "not null primary key (p1, c1, c2)",
                                                  cf_name))
                            .get();
                    f1.get();

                    // The backing view of index "index_<cf>" is named
                    // "index_<cf>_index"; derive it from cf_name instead of
                    // hard-coding "index_cf_index", which only matched
                    // cf_name == "cf".
                    auto f2 = e.local_view_builder().wait_until_built("ks", seastar::format("index_{}_index", cf_name));
                    e.execute_cql(seastar::format("CREATE INDEX index_{0} ON {0} (r1);", cf_name)).get();
                    f2.get();
                }
            }

            func(e);
        }, db_cfg_ptr).get();
    });
}
|
|
|
|
// Future-returning flavor of do_with_some_data_in_thread(): the callback
// yields a future, which is waited for from within the seastar thread.
future<> do_with_some_data(std::vector<sstring> cf_names, std::function<future<> (cql_test_env&)> func, bool create_mvs = false, shared_ptr<db::config> db_cfg_ptr = {}) {
    auto thread_func = [&func] (cql_test_env& env) {
        func(env).get();
    };
    co_await do_with_some_data_in_thread(std::move(cf_names), std::move(thread_func), create_mvs, std::move(db_cfg_ptr));
}
|
|
|
|
// Take a snapshot named `snapshot_name` of `ks_name`.`cf_name` on all
// shards; log and rethrow on failure.
future<> take_snapshot(cql_test_env& e, sstring ks_name = "ks", sstring cf_name = "cf", sstring snapshot_name = "test", db::snapshot_options opts = {}) {
    try {
        const auto table_uuid = e.db().local().find_uuid(ks_name, cf_name);
        co_await replica::database::snapshot_table_on_all_shards(e.db(), table_uuid, snapshot_name, opts);
    } catch (...) {
        testlog.error("Could not take snapshot for {}.{} snapshot_name={} skip_flush={}: {}",
                      ks_name, cf_name, snapshot_name, opts.skip_flush, std::current_exception());
        throw;
    }
}
|
|
|
|
// Collect the names of all regular files directly under `path`.
//
// The directory_lister must be closed on all paths, including when an
// exception escapes the iteration (e.g. an injected error); otherwise the
// lister would be destroyed while still open. Capture the exception, close
// the lister unconditionally, then rethrow.
future<std::set<sstring>> collect_files(fs::path path) {
    std::set<sstring> ret;
    directory_lister lister(path, lister::dir_entry_types::of<directory_entry_type::regular>());
    std::exception_ptr ex;
    try {
        while (auto de = co_await lister.get()) {
            ret.insert(de->name);
        }
    } catch (...) {
        ex = std::current_exception();
    }
    co_await lister.close();
    if (ex) {
        std::rethrow_exception(ex);
    }
    co_return ret;
}
|
|
|
|
static bool is_component(const sstring& fname, const sstring& suffix) {
|
|
return fname.ends_with(suffix);
|
|
}
|
|
|
|
static std::set<sstring> collect_sstables(const std::set<sstring>& all_files, const sstring& suffix) {
|
|
// Verify manifest against the files in the snapshots dir
|
|
auto pred = [&suffix] (const sstring& fname) {
|
|
return is_component(fname, suffix);
|
|
};
|
|
return std::ranges::filter_view(all_files, pred) | std::ranges::to<std::set<sstring>>();
|
|
}
|
|
|
|
// Validate that the manifest.json lists exactly the SSTables present in the snapshot directory
|
|
static future<> validate_manifest(const locator::topology& topology, const fs::path& snapshot_dir, const std::set<sstring>& in_snapshot_dir, gc_clock::time_point min_time, bool tablets_enabled) {
    sstring suffix = "-TOC.txt";
    auto sstables_in_snapshot = collect_sstables(in_snapshot_dir, suffix);
    std::set<sstring> sstables_in_manifest;
    std::set<sstring> non_sstables_in_manifest;

    auto manifest_str = co_await util::read_entire_file_contiguous(snapshot_dir / "manifest.json");
    testlog.debug("manifest.json: {}", manifest_str);
    auto manifest_json = rjson::parse(manifest_str);
    BOOST_REQUIRE(manifest_json.IsObject());

    // "manifest" section: format version and scope.
    BOOST_REQUIRE(manifest_json.HasMember("manifest"));
    auto& manifest_info = manifest_json["manifest"];
    BOOST_REQUIRE(manifest_info.IsObject());
    BOOST_REQUIRE(manifest_info.HasMember("version"));
    auto& manifest_version = manifest_info["version"];
    BOOST_REQUIRE(manifest_version.IsString());
    BOOST_REQUIRE_EQUAL(manifest_version.GetString(), "1.0");
    BOOST_REQUIRE(manifest_info.HasMember("scope"));
    auto& manifest_scope = manifest_info["scope"];
    BOOST_REQUIRE(manifest_scope.IsString());
    BOOST_REQUIRE_EQUAL(manifest_scope.GetString(), "node");

    // "node" section must describe this node: host id, dc and rack.
    BOOST_REQUIRE(manifest_json.HasMember("node"));
    auto& node_info = manifest_json["node"];
    BOOST_REQUIRE(node_info.IsObject());
    BOOST_REQUIRE(node_info.HasMember("host_id"));
    auto& host_id_json = node_info["host_id"];
    BOOST_REQUIRE(host_id_json.IsString());
    auto id = utils::UUID(host_id_json.GetString());
    BOOST_REQUIRE_EQUAL(id, topology.my_host_id().uuid());
    BOOST_REQUIRE(node_info.HasMember("datacenter"));
    auto& datacenter = node_info["datacenter"];
    BOOST_REQUIRE(datacenter.IsString());
    BOOST_REQUIRE_EQUAL(datacenter.GetString(), topology.get_location().dc);
    BOOST_REQUIRE(node_info.HasMember("rack"));
    auto& rack = node_info["rack"];
    BOOST_REQUIRE(rack.IsString());
    BOOST_REQUIRE_EQUAL(rack.GetString(), topology.get_location().rack);

    // "snapshot" section: name matches the directory; created_at falls within
    // a minute of the caller-observed start time.
    BOOST_REQUIRE(manifest_json.HasMember("snapshot"));
    auto& manifest_snapshot = manifest_json["snapshot"];
    BOOST_REQUIRE(manifest_snapshot.IsObject());
    BOOST_REQUIRE(manifest_snapshot.HasMember("name"));
    auto& snapshot_name = manifest_snapshot["name"];
    BOOST_REQUIRE(snapshot_name.IsString());
    BOOST_REQUIRE_EQUAL(snapshot_dir.filename(), snapshot_name.GetString());
    BOOST_REQUIRE(manifest_snapshot.HasMember("created_at"));
    auto& created_at = manifest_snapshot["created_at"];
    BOOST_REQUIRE(created_at.IsNumber());
    time_t created_at_seconds = created_at.GetInt64();
    BOOST_REQUIRE_GE(created_at_seconds, min_time.time_since_epoch().count());
    BOOST_REQUIRE_LT(created_at_seconds, min_time.time_since_epoch().count() + 60);
    // expires_at is optional, but when present it must not precede created_at.
    if (manifest_snapshot.HasMember("expires_at")) {
        BOOST_REQUIRE(created_at_seconds > 0);
        auto& expires_at = manifest_snapshot["expires_at"];
        BOOST_REQUIRE(expires_at.IsNumber());
        BOOST_REQUIRE_GE(expires_at.GetInt64(), created_at_seconds);
    }

    // "table" section: keyspace/table names are derived from the snapshot
    // directory layout <ks>/<table>-<uuid>/snapshots/<name>.
    BOOST_REQUIRE(manifest_json.HasMember("table"));
    auto& manifest_table = manifest_json["table"];
    BOOST_REQUIRE(manifest_table.IsObject());
    BOOST_REQUIRE(manifest_table.HasMember("keyspace_name"));
    auto& manifest_table_ks_name = manifest_table["keyspace_name"];
    BOOST_REQUIRE(manifest_table_ks_name.IsString());
    BOOST_REQUIRE_EQUAL(snapshot_dir.parent_path().parent_path().parent_path().filename().native(), manifest_table_ks_name.GetString());
    // NOTE(review): unlike the other fields, "table_name" is accessed without
    // a preceding HasMember() check -- confirm this is intentional.
    auto& manifest_table_table_name = manifest_table["table_name"];
    BOOST_REQUIRE(manifest_table_table_name.IsString());
    BOOST_REQUIRE(snapshot_dir.parent_path().parent_path().filename().native().starts_with(manifest_table_table_name.GetString()));
    // Tablets metadata: required and power-of-two-sized when tablets are
    // enabled; absent, "none", or zero otherwise.
    std::optional<sstring> tablets_type;
    if (manifest_table.HasMember("tablets_type")) {
        auto& tablets_type_json = manifest_table["tablets_type"];
        BOOST_REQUIRE(tablets_type_json.IsString());
        tablets_type = tablets_type_json.GetString();
    }
    if (tablets_enabled) {
        BOOST_REQUIRE(tablets_type.has_value());
        BOOST_REQUIRE_EQUAL(*tablets_type, "powof2");
        BOOST_REQUIRE(manifest_table.HasMember("tablet_count"));
        auto& tablet_count_json = manifest_table["tablet_count"];
        BOOST_REQUIRE(tablet_count_json.IsNumber());
        uint64_t tablet_count = tablet_count_json.GetInt64();
        BOOST_REQUIRE_EQUAL(tablet_count, 1 << log2ceil(tablet_count));
    } else {
        if (tablets_type) {
            BOOST_REQUIRE_EQUAL(*tablets_type, "none");
        }
        if (manifest_table.HasMember("tablet_count")) {
            auto& tablet_count = manifest_table["tablet_count"];
            if (!tablet_count.IsNull()) {
                BOOST_REQUIRE(tablet_count.IsNumber());
                BOOST_REQUIRE_EQUAL(tablet_count.GetInt64(), 0);
            }
        }
    }

    // "sstables" array: each entry carries an id, a TOC name (collected for
    // comparison against the directory contents), sizes, and optionally a
    // first/last token pair.
    if (manifest_json.HasMember("sstables")) {
        auto& sstables = manifest_json["sstables"];
        BOOST_REQUIRE(sstables.IsArray());
        for (auto& sst_json : sstables.GetArray()) {
            BOOST_REQUIRE(sst_json.IsObject());

            auto& id = sst_json["id"];
            BOOST_REQUIRE(id.IsString());
            auto uuid = utils::UUID(id.GetString());
            BOOST_REQUIRE(!uuid.is_null());

            auto& toc_name = sst_json["toc_name"];
            BOOST_REQUIRE(toc_name.IsString());
            BOOST_REQUIRE(is_component(toc_name.GetString(), suffix));
            sstables_in_manifest.insert(toc_name.GetString());

            auto& data_size = sst_json["data_size"];
            BOOST_REQUIRE(data_size.IsNumber());
            auto& index_size = sst_json["index_size"];
            BOOST_REQUIRE(index_size.IsNumber());

            // Tokens come in pairs: either both present and ordered, or
            // neither.
            if (sst_json.HasMember("first_token")) {
                auto& first_token = sst_json["first_token"];
                BOOST_REQUIRE(first_token.IsNumber());

                BOOST_REQUIRE(sst_json.HasMember("last_token"));
                auto& last_token = sst_json["last_token"];
                BOOST_REQUIRE(last_token.IsNumber());
                BOOST_REQUIRE_LE(first_token.GetInt64(), last_token.GetInt64());
            } else {
                BOOST_REQUIRE(!sst_json.HasMember("last_token"));
            }
        }
    }

    // "files" array: any non-sstable files; expected to be empty below.
    if (manifest_json.HasMember("files")) {
        auto& manifest_files = manifest_json["files"];
        BOOST_REQUIRE(manifest_files.IsArray());
        for (auto& f : manifest_files.GetArray()) {
            non_sstables_in_manifest.insert(f.GetString());
        }
    }

    // The manifest must list exactly the sstables in the snapshot directory,
    // and nothing else.
    BOOST_REQUIRE_EQUAL(sstables_in_manifest, sstables_in_snapshot);
    BOOST_REQUIRE_EQUAL(non_sstables_in_manifest, std::set<sstring>{});
}
|
|
|
|
// Shared driver for the *_snapshot_works tests: take a snapshot of the given
// table and verify that every sstable file was copied into the snapshot
// directory, together with a generated manifest and schema dump.
static future<> snapshot_works(const std::string& table_name, bool create_mvs, bool tablets_enabled = false) {
    auto cfg = make_shared<db::config>();
    auto tablets_mode = tablets_enabled ? db::tablets_mode_t::mode::enabled : db::tablets_mode_t::mode::disabled;
    cfg->tablets_mode_for_new_keyspaces(tablets_mode);
    return do_with_some_data_in_thread({"cf"}, [table_name] (cql_test_env& e) {
        // Lower bound on the snapshot creation time, checked by validate_manifest().
        auto created_after = gc_clock::now();
        take_snapshot(e, "ks", table_name).get();

        auto& t = e.local_db().find_column_family("ks", table_name);
        auto tdir = table_dir(t);
        auto snap_dir = tdir / sstables::snapshots_dir / "test";

        auto table_files = collect_files(tdir).get();
        // snapshot triggered a flush and wrote the data down.
        BOOST_REQUIRE_GE(table_files.size(), 9);

        auto snapshot_files = collect_files(snap_dir).get();

        // The snapshot must contain exactly the table's files plus the
        // generated manifest and schema dump.
        table_files.insert("manifest.json");
        table_files.insert("schema.cql");
        BOOST_REQUIRE_EQUAL(table_files, snapshot_files);

        const auto& topology = e.local_db().get_token_metadata().get_topology();
        validate_manifest(topology, snap_dir, snapshot_files, created_after, t.uses_tablets()).get();
    }, create_mvs, cfg, 100);
}
|
|
|
|
SEASTAR_TEST_CASE(table_snapshot_works) {
    // Base table, with materialized views, vnode-based keyspace (no tablets).
    constexpr bool create_mvs = true;
    constexpr bool tablets_enabled = false;
    return snapshot_works("cf", create_mvs, tablets_enabled);
}
|
|
|
|
SEASTAR_TEST_CASE(table_snapshot_works_with_tablets) {
    // FIXME: do_with_some_data does not work with views and tablets yet
    constexpr bool create_mvs = false;
    constexpr bool tablets_enabled = true;
    return snapshot_works("cf", create_mvs, tablets_enabled);
}
|
|
|
|
SEASTAR_TEST_CASE(view_snapshot_works) {
    // Snapshot a materialized view table instead of the base table.
    constexpr bool create_mvs = true;
    constexpr bool tablets_enabled = false;
    return snapshot_works("view_cf", create_mvs, tablets_enabled);
}
|
|
|
|
SEASTAR_TEST_CASE(index_snapshot_works) {
    // Snapshot the hidden table backing a secondary index.
    constexpr bool create_mvs = true;
    constexpr bool tablets_enabled = false;
    return snapshot_works(::secondary_index::index_table_name("index_cf"), create_mvs, tablets_enabled);
}
|
|
|
|
SEASTAR_TEST_CASE(snapshot_skip_flush_works) {
    // With skip_flush set, taking a snapshot must not flush memtables: the
    // table directory stays empty and the snapshot contains only the
    // generated manifest and schema dump.
    return do_with_some_data_in_thread({"cf"}, [] (cql_test_env& e) {
        const db::snapshot_options opts = {.skip_flush = true};
        take_snapshot(e, "ks", "cf", "test", opts).get();

        auto& t = e.local_db().find_column_family("ks", "cf");
        const auto tdir = table_dir(t);

        // No flush happened, so no sstables were written.
        BOOST_REQUIRE(collect_files(tdir).get().empty());

        const auto snapshot_files = collect_files(tdir / sstables::snapshots_dir / "test").get();
        BOOST_REQUIRE_EQUAL(snapshot_files, std::set<sstring>({"manifest.json", "schema.cql"}));
    });
}
|
|
|
|
SEASTAR_TEST_CASE(snapshot_list_okay) {
    // Snapshot size accounting: right after taking a snapshot the details
    // report live == 0 and total > 0; once the table's own sstables are
    // deleted, the whole snapshot size is reported as live.
    return do_with_some_data_in_thread({"cf"}, [] (cql_test_env& e) {
        auto& t = e.local_db().find_column_family("ks", "cf");
        take_snapshot(e).get();

        auto snapshots = t.get_snapshot_details().get();
        BOOST_REQUIRE_EQUAL(snapshots.size(), 1);

        auto before = snapshots["test"];
        BOOST_REQUIRE_EQUAL(before.live, 0);
        BOOST_REQUIRE_GT(before.total, 0);

        // Remove the live sstables from the table directory.
        const auto tdir = table_dir(t);
        const auto files = collect_files(tdir).get();
        for (const auto& name : files) {
            fs::remove(tdir / name);
        }

        const auto after = t.get_snapshot_details().get().at("test");

        // All of the snapshot's size now counts as live, and it matches
        // the total size observed before the deletion.
        BOOST_REQUIRE_EQUAL(after.total, after.live);
        BOOST_REQUIRE_EQUAL(before.total, after.live);
    });
}
|
|
|
|
SEASTAR_TEST_CASE(snapshot_list_contains_dropped_tables) {
    // Dropping a table leaves a snapshot behind (observed: a listing entry
    // appears after DROP TABLE with no explicit take_snapshot). The
    // database-wide listing must report these alongside explicit snapshots.
    return do_with_some_data_in_thread({"cf1", "cf2", "cf3", "cf4"}, [] (cql_test_env& e) {
        auto list_snapshots = [&e] {
            return e.local_db().get_snapshot_details().get();
        };

        e.execute_cql("DROP TABLE ks.cf1;").get();

        auto listing = list_snapshots();
        BOOST_REQUIRE_EQUAL(listing.size(), 1);
        BOOST_REQUIRE_EQUAL(listing.begin()->second.size(), 1);

        // The drop-induced snapshot reports its full size as live.
        const auto& dropped_sd = listing.begin()->second.front().details;
        BOOST_REQUIRE_GT(dropped_sd.live, 0);
        BOOST_REQUIRE_EQUAL(dropped_sd.total, dropped_sd.live);

        take_snapshot(e, "ks", "cf2", "test2").get();
        take_snapshot(e, "ks", "cf3", "test3").get();

        listing = list_snapshots();
        BOOST_REQUIRE_EQUAL(listing.size(), 3);

        e.execute_cql("DROP TABLE ks.cf4;").get();

        listing = list_snapshots();
        BOOST_REQUIRE_EQUAL(listing.size(), 4);

        for (const auto& [tag, entries] : listing) {
            BOOST_REQUIRE_EQUAL(entries.size(), 1);
            const auto& entry_sd = entries.front().details;

            if (tag == "test2" || tag == "test3") {
                // Explicit snapshot of a still-live table: live == 0.
                BOOST_REQUIRE_EQUAL(entry_sd.live, 0);
                BOOST_REQUIRE_GT(entry_sd.total, 0);
            } else {
                // Snapshot left behind by a dropped table: all live.
                BOOST_REQUIRE_GT(entry_sd.live, 0);
                BOOST_REQUIRE_EQUAL(entry_sd.total, entry_sd.live);
            }
        }
    });
}
|
|
|
|
SEASTAR_TEST_CASE(snapshot_list_inexistent) {
    // Listing snapshots of a table that was never snapshotted yields an
    // empty result rather than an error.
    return do_with_some_data_in_thread({"cf"}, [] (cql_test_env& e) {
        auto& t = e.local_db().find_column_family("ks", "cf");
        const auto snapshots = t.get_snapshot_details().get();
        BOOST_REQUIRE_EQUAL(snapshots.size(), 0);
    });
}
|
|
|
|
|
|
SEASTAR_TEST_CASE(clear_snapshot) {
    // Taking a snapshot and then clearing its tag must remove the entire
    // snapshot directory for that tag.
    return do_with_some_data_in_thread({"cf"}, [] (cql_test_env& e) {
        take_snapshot(e).get();
        auto& cf = e.local_db().find_column_family("ks", "cf");
        const auto snapshot_path = table_dir(cf) / sstables::snapshots_dir / "test";

        const unsigned count = collect_files(snapshot_path).get().size();
        BOOST_REQUIRE_GT(count, 1); // expect more than the manifest alone

        e.local_db().clear_snapshot("test", {"ks"}, "").get();

        // The snapshot directory itself must be gone, not merely emptied.
        BOOST_REQUIRE_EQUAL(fs::exists(snapshot_path), false);
    });
}
|
|
|
|
SEASTAR_TEST_CASE(clear_multiple_snapshots) {
    // Exercises clear_snapshot() over several tags on one table:
    // clearing a bogus tag, a single tag, all tags at once, and finally
    // clearing the snapshots of an already-dropped table (which should
    // also remove the leftover table directory).
    sstring ks_name = "ks";
    sstring table_name = "cf";
    auto num_snapshots = 2;

    // Tag used for the idx-th snapshot.
    auto snapshot_name = [] (int idx) {
        return format("test-snapshot-{}", idx);
    };

    co_await do_with_some_data_in_thread({table_name}, [&] (cql_test_env& e) {
        auto& t = e.local_db().find_column_family(ks_name, table_name);
        auto tdir = table_dir(t);
        auto snapshots_dir = tdir / sstables::snapshots_dir;

        for (auto i = 0; i < num_snapshots; i++) {
            testlog.debug("Taking snapshot {} on {}.{}", snapshot_name(i), ks_name, table_name);
            take_snapshot(e, ks_name, table_name, snapshot_name(i)).get();
        }

        // Sanity-check: each snapshot directory holds sstable files, not
        // just the manifest.
        for (auto i = 0; i < num_snapshots; i++) {
            testlog.debug("Verifying {}", snapshots_dir / snapshot_name(i));
            unsigned count = collect_files(snapshots_dir / snapshot_name(i)).get().size();
            BOOST_REQUIRE_GT(count, 1); // expect more than the manifest alone
        }

        // non-existent tag
        // Clearing an unknown tag must leave all existing snapshots intact.
        testlog.debug("Clearing bogus tag");
        e.local_db().clear_snapshot("bogus", {ks_name}, table_name).get();
        for (auto i = 0; i < num_snapshots; i++) {
            BOOST_REQUIRE_EQUAL(fs::exists(snapshots_dir / snapshot_name(i)), true);
        }

        // clear single tag
        // Only the named tag's directory goes away; the others survive.
        testlog.debug("Clearing snapshot={} of {}.{}", snapshot_name(0), ks_name, table_name);
        e.local_db().clear_snapshot(snapshot_name(0), {ks_name}, table_name).get();
        BOOST_REQUIRE_EQUAL(fs::exists(snapshots_dir / snapshot_name(0)), false);
        for (auto i = 1; i < num_snapshots; i++) {
            BOOST_REQUIRE_EQUAL(fs::exists(snapshots_dir / snapshot_name(i)), true);
        }

        // clear all tags (all tables)
        // An empty tag clears every snapshot in the keyspace.
        testlog.debug("Clearing all snapshots in {}", ks_name);
        e.local_db().clear_snapshot("", {ks_name}, "").get();
        for (auto i = 0; i < num_snapshots; i++) {
            BOOST_REQUIRE_EQUAL(fs::exists(snapshots_dir / snapshot_name(i)), false);
        }

        testlog.debug("Taking an extra {} of {}.{}", snapshot_name(num_snapshots), ks_name, table_name);
        take_snapshot(e, ks_name, table_name, snapshot_name(num_snapshots)).get();

        // existing snapshots expected to remain after dropping the table
        testlog.debug("Dropping table {}.{}", ks_name, table_name);
        replica::database::legacy_drop_table_on_all_shards(e.db(), e.get_system_keyspace(), ks_name, table_name).get();
        BOOST_REQUIRE_EQUAL(fs::exists(snapshots_dir / snapshot_name(num_snapshots)), true);

        // clear all tags
        testlog.debug("Clearing all snapshots in {}.{} after it had been dropped", ks_name, table_name);
        e.local_db().clear_snapshot("", {ks_name}, table_name).get();

        // NOTE(review): this assert and the BOOST_REQUIRE_EQUAL below check
        // the same condition; the duplication looks intentional (hard assert
        // plus test-framework report) but could be collapsed.
        SCYLLA_ASSERT(!fs::exists(tdir));

        // after all snapshots had been cleared,
        // the dropped table directory is expected to be removed.
        BOOST_REQUIRE_EQUAL(fs::exists(tdir), false);
    });
}
|
|
|
|
SEASTAR_TEST_CASE(clear_nonexistent_snapshot) {
    // Clearing a tag that was never created must be a benign no-op:
    // no crashes, no exceptions.
    return do_with_some_data_in_thread({"cf"}, [] (cql_test_env& env) {
        env.local_db().clear_snapshot("test", {"ks"}, "").get();
    });
}
|
|
|
|
SEASTAR_TEST_CASE(test_snapshot_ctl_details) {
    // Verify that db::snapshot_ctl::get_snapshot_details() agrees with the
    // per-table table::get_snapshot_details() accounting, both right after
    // taking a snapshot and after the table's live sstables were deleted.
    return do_with_some_data_in_thread({"cf"}, [] (cql_test_env& e) {
        sharded<db::snapshot_ctl> sc;
        sc.start(std::ref(e.db()), std::ref(e.get_storage_proxy()), std::ref(e.get_task_manager()), std::ref(e.get_sstorage_manager()), db::snapshot_ctl::config{}).get();
        auto stop_sc = deferred_stop(sc);

        auto& cf = e.local_db().find_column_family("ks", "cf");
        take_snapshot(e).get();

        // Baseline: the per-table view of the single "test" snapshot.
        auto details = cf.get_snapshot_details().get();
        BOOST_REQUIRE_EQUAL(details.size(), 1);

        auto sd = details["test"];
        BOOST_REQUIRE_EQUAL(sd.live, 0);
        BOOST_REQUIRE_GT(sd.total, 0);

        // The controller must report the same snapshot, with matching
        // keyspace/table names and identical size figures.
        auto sc_details = sc.local().get_snapshot_details().get();
        BOOST_REQUIRE_EQUAL(sc_details.size(), 1);

        auto sc_sd_vec = sc_details["test"];
        BOOST_REQUIRE_EQUAL(sc_sd_vec.size(), 1);
        const auto &sc_sd = sc_sd_vec[0];
        BOOST_REQUIRE_EQUAL(sc_sd.ks, "ks");
        BOOST_REQUIRE_EQUAL(sc_sd.cf, "cf");
        BOOST_REQUIRE_EQUAL(sc_sd.details.live, sd.live);
        BOOST_REQUIRE_EQUAL(sc_sd.details.total, sd.total);

        // Delete the table's own sstables, leaving the snapshot files as
        // the only remaining copies.
        auto table_directory = table_dir(cf);
        for (auto& f : collect_files(table_directory).get()) {
            fs::remove(table_directory / f);
        }

        auto sd_post_deletion = cf.get_snapshot_details().get().at("test");

        // All of the snapshot's size now counts as live.
        BOOST_REQUIRE_EQUAL(sd_post_deletion.total, sd_post_deletion.live);
        BOOST_REQUIRE_EQUAL(sd.total, sd_post_deletion.live);

        // The controller's view must track the change as well.
        sc_details = sc.local().get_snapshot_details().get();
        auto sc_sd_post_deletion_vec = sc_details["test"];
        BOOST_REQUIRE_EQUAL(sc_sd_post_deletion_vec.size(), 1);
        const auto &sc_sd_post_deletion = sc_sd_post_deletion_vec[0];
        BOOST_REQUIRE_EQUAL(sc_sd_post_deletion.ks, "ks");
        BOOST_REQUIRE_EQUAL(sc_sd_post_deletion.cf, "cf");
        BOOST_REQUIRE_EQUAL(sc_sd_post_deletion.details.live, sd_post_deletion.live);
        BOOST_REQUIRE_EQUAL(sc_sd_post_deletion.details.total, sd_post_deletion.total);
    });
}
|
|
|
|
SEASTAR_TEST_CASE(test_snapshot_ctl_true_snapshots_size) {
    // Verify that db::snapshot_ctl::true_snapshots_size() matches the
    // per-table snapshot "live" figure, both before and after the table's
    // own sstables are deleted.
    return do_with_some_data_in_thread({"cf"}, [] (cql_test_env& e) {
        sharded<db::snapshot_ctl> sc;
        sc.start(std::ref(e.db()), std::ref(e.get_storage_proxy()), std::ref(e.get_task_manager()), std::ref(e.get_sstorage_manager()), db::snapshot_ctl::config{}).get();
        auto stop_sc = deferred_stop(sc);

        auto& cf = e.local_db().find_column_family("ks", "cf");
        take_snapshot(e).get();

        auto details = cf.get_snapshot_details().get();
        BOOST_REQUIRE_EQUAL(details.size(), 1);

        // Right after the snapshot: live == 0, so true_snapshots_size()
        // should also be 0.
        auto sd = details["test"];
        BOOST_REQUIRE_EQUAL(sd.live, 0);
        BOOST_REQUIRE_GT(sd.total, 0);

        auto sc_live_size = sc.local().true_snapshots_size().get();
        BOOST_REQUIRE_EQUAL(sc_live_size, sd.live);

        // Delete the table's live sstables; the snapshot becomes the only
        // remaining copy of the data.
        auto table_directory = table_dir(cf);
        for (auto& f : collect_files(table_directory).get()) {
            fs::remove(table_directory / f);
        }

        auto sd_post_deletion = cf.get_snapshot_details().get().at("test");

        BOOST_REQUIRE_EQUAL(sd_post_deletion.total, sd_post_deletion.live);
        BOOST_REQUIRE_EQUAL(sd.total, sd_post_deletion.live);

        // true_snapshots_size() must now report the full snapshot size.
        sc_live_size = sc.local().true_snapshots_size().get();
        BOOST_REQUIRE_EQUAL(sc_live_size, sd_post_deletion.live);
    });
}
|
|
|
|
SEASTAR_TEST_CASE(test_snapshot_ctl_details_exception_handling) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
    testlog.debug("Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev).\n");
    return make_ready_future();
#endif
    // Inject failures into table::get_snapshot_details() at two points and
    // verify that the error propagates to the caller, and that a subsequent
    // call succeeds -- i.e. the failed calls cleaned up after themselves
    // (in particular, the directory listers they opened were closed).
    return do_with_some_data_in_thread({"cf"}, [] (cql_test_env& e) {
        sharded<db::snapshot_ctl> sc;
        sc.start(std::ref(e.db()), std::ref(e.get_storage_proxy()), std::ref(e.get_task_manager()), std::ref(e.get_sstorage_manager()), db::snapshot_ctl::config{}).get();
        auto stop_sc = deferred_stop(sc);

        auto& cf = e.local_db().find_column_family("ks", "cf");
        take_snapshot(e).get();

        // Fail at the top level of get_snapshot_details().
        // NOTE(review): the second argument to enable() presumably makes the
        // injection one-shot, which is why the final call below succeeds --
        // confirm against the error-injection API.
        utils::get_local_injector().enable("get_snapshot_details", true);
        BOOST_REQUIRE_THROW(cf.get_snapshot_details().get(), std::runtime_error);

        // Fail inside the per-snapshot iteration.
        utils::get_local_injector().enable("per-snapshot-get_snapshot_details", true);
        BOOST_REQUIRE_THROW(cf.get_snapshot_details().get(), std::runtime_error);

        // With both injections consumed, the call must succeed again and
        // still see the one snapshot taken above.
        auto details = cf.get_snapshot_details().get();
        BOOST_REQUIRE_EQUAL(details.size(), 1);
    });
}
|
|
|
|
// toppartitions_query caused a lw_shared_ptr to cross shards when moving results, #5104
|
|
SEASTAR_TEST_CASE(toppartitions_cross_shard_schema_ptr) {
|
|
return do_with_cql_env_thread([] (cql_test_env& e) {
|
|
e.execute_cql("CREATE TABLE ks.tab (id int PRIMARY KEY)").get();
|
|
db::toppartitions_query tq(e.db(), {{"ks", "tab"}}, {}, 1s, 100, 100);
|
|
tq.scatter().get();
|
|
auto q = e.prepare("INSERT INTO ks.tab(id) VALUES(?)").get();
|
|
// Generate many values to ensure crossing shards
|
|
for (auto i = 0; i != 100; ++i) {
|
|
e.execute_prepared(q, {cql3::raw_value::make_value(int32_type->decompose(i))}).get();
|
|
}
|
|
// This should trigger the bug in debug mode
|
|
tq.gather().get();
|
|
});
|
|
}
|
|
|
|
// Verify that each of the read entry points enforces the configured
// max_result_size: queries whose result would exceed the limit must throw
// unless short reads are allowed, in which case they are cut short instead.
SEASTAR_THREAD_TEST_CASE(read_max_size) {
    do_with_cql_env_thread([] (cql_test_env& e) {
        e.execute_cql("CREATE TABLE test (pk text, ck int, v text, PRIMARY KEY (pk, ck));").get();
        auto id = e.prepare("INSERT INTO test (pk, ck, v) VALUES (?, ?, ?);").get();

        auto& db = e.local_db();
        auto& tab = db.find_column_family("ks", "test");
        auto s = tab.schema();

        // Populate a single partition with many rows so the full result
        // comfortably exceeds the smaller size limits tested below.
        auto pk = tests::generate_partition_key(s);
        const auto cql3_pk = cql3::raw_value::make_value(pk.key().explode().front());

        const auto value = sstring(1024, 'a');
        const auto raw_value = utf8_type->decompose(data_value(value));
        const auto cql3_value = cql3::raw_value::make_value(raw_value);

        const int num_rows = 1024;

        for (int i = 0; i != num_rows; ++i) {
            const auto cql3_ck = cql3::raw_value::make_value(int32_type->decompose(data_value(i)));
            e.execute_prepared(id, {cql3_pk, cql3_ck, cql3_value}).get();
        }

        const auto partition_ranges = std::vector<dht::partition_range>{query::full_partition_range};

        // The three read paths under test; each returns the size of the
        // result it produced.
        const std::vector<std::pair<sstring, std::function<future<size_t>(schema_ptr, const query::read_command&)>>> query_methods{
            {"query_mutations()", [&db, &partition_ranges] (schema_ptr s, const query::read_command& cmd) -> future<size_t> {
                return db.query_mutations(s, cmd, partition_ranges.front(), {}, db::no_timeout).then(
                        [] (const std::tuple<reconcilable_result, cache_temperature>& res) {
                    return std::get<0>(res).memory_usage();
                });
            }},
            {"query()", [&db, &partition_ranges] (schema_ptr s, const query::read_command& cmd) -> future<size_t> {
                return db.query(s, cmd, query::result_options::only_result(), partition_ranges, {}, db::no_timeout).then(
                        [] (const std::tuple<lw_shared_ptr<query::result>, cache_temperature>& res) {
                    return size_t(std::get<0>(res)->buf().size());
                });
            }},
            {"query_mutations_on_all_shards()", [&e, &partition_ranges] (schema_ptr s, const query::read_command& cmd) -> future<size_t> {
                return query_mutations_on_all_shards(e.db(), s, cmd, partition_ranges, {}, db::no_timeout).then(
                        [] (const std::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>& res) {
                    return std::get<0>(res)->memory_usage();
                });
            }}
        };

        // Cross product: read path x short-read permission x size limit.
        for (auto [query_method_name, query_method] : query_methods) {
            for (auto allow_short_read : {true, false}) {
                for (auto max_size : {1024u, 1024u * 1024u, 1024u * 1024u * 1024u}) {
                    // The factor of 2 leaves headroom for per-row overhead on
                    // top of the raw cell data when deciding which limits
                    // should trip.
                    const auto should_throw = max_size < (num_rows * value.size() * 2) && !allow_short_read;
                    testlog.info("checking: query_method={}, allow_short_read={}, max_size={}, should_throw={}", query_method_name, allow_short_read, max_size, should_throw);
                    auto slice = s->full_slice();
                    if (allow_short_read) {
                        slice.options.set<query::partition_slice::option::allow_short_read>();
                    } else {
                        slice.options.remove<query::partition_slice::option::allow_short_read>();
                    }
                    query::read_command cmd(s->id(), s->version(), slice, query::max_result_size(max_size), query::tombstone_limit::max);
                    try {
                        auto size = query_method(s, cmd).get();
                        // Just to ensure we are not interpreting empty results as success.
                        BOOST_REQUIRE(size != 0);
                        if (should_throw) {
                            BOOST_FAIL("Expected exception, but none was thrown.");
                        } else {
                            testlog.trace("No exception thrown, as expected.");
                        }
                    } catch (std::runtime_error& e) {
                        if (should_throw) {
                            testlog.trace("Exception thrown, as expected: {}", e.what());
                        } else {
                            BOOST_FAIL(fmt::format("Expected no exception, but caught: {}", e.what()));
                        }
                    }
                }
            }
        }
    }).get();
}
|
|
|
|
// Check that mutation queries, those that are stopped when the memory
// consumed by their results reach the local/global limit, are aborted
// instead of silently terminated when this happens.
SEASTAR_THREAD_TEST_CASE(unpaged_mutation_read_global_limit) {
    auto cfg = cql_test_config{};
    cfg.dbcfg.emplace();
    // The memory available to the result memory limiter (global limit) is
    // configured based on the available memory, so give a small amount to
    // the "node", so we don't have to work with large amount of data.
    cfg.dbcfg->available_memory = 2 * 1024 * 1024;
    do_with_cql_env_thread([] (cql_test_env& e) {
        e.execute_cql("CREATE TABLE test (pk text, ck int, v text, PRIMARY KEY (pk, ck));").get();
        auto id = e.prepare("INSERT INTO test (pk, ck, v) VALUES (?, ?, ?);").get();

        auto& db = e.local_db();
        auto& tab = db.find_column_family("ks", "test");
        auto s = tab.schema();

        // Populate a single partition with ~1MiB of cell data, enough to
        // exhaust the small global result-memory budget configured above.
        auto pk = tests::generate_partition_key(s);
        const auto cql3_pk = cql3::raw_value::make_value(pk.key().explode().front());

        const auto value = sstring(1024, 'a');
        const auto raw_value = utf8_type->decompose(data_value(value));
        const auto cql3_value = cql3::raw_value::make_value(raw_value);

        const int num_rows = 1024;
        // Per-command size limit is effectively unlimited, so only the
        // global limiter can stop the read.
        const auto max_size = 1024u * 1024u * 1024u;

        for (int i = 0; i != num_rows; ++i) {
            const auto cql3_ck = cql3::raw_value::make_value(int32_type->decompose(data_value(i)));
            e.execute_prepared(id, {cql3_pk, cql3_ck, cql3_value}).get();
        }

        const auto partition_ranges = std::vector<dht::partition_range>{query::full_partition_range};

        // The two mutation-read paths under test; each returns the size of
        // the result it produced.
        const std::vector<std::pair<sstring, std::function<future<size_t>(schema_ptr, const query::read_command&)>>> query_methods{
            {"query_mutations()", [&db, &partition_ranges] (schema_ptr s, const query::read_command& cmd) -> future<size_t> {
                return db.query_mutations(s, cmd, partition_ranges.front(), {}, db::no_timeout).then(
                        [] (const std::tuple<reconcilable_result, cache_temperature>& res) {
                    return std::get<0>(res).memory_usage();
                });
            }},
            {"query_mutations_on_all_shards()", [&e, &partition_ranges] (schema_ptr s, const query::read_command& cmd) -> future<size_t> {
                return query_mutations_on_all_shards(e.db(), s, cmd, partition_ranges, {}, db::no_timeout).then(
                        [] (const std::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>& res) {
                    return std::get<0>(res)->memory_usage();
                });
            }}
        };

        for (auto [query_method_name, query_method] : query_methods) {
            testlog.info("checking: query_method={}", query_method_name);
            auto slice = s->full_slice();
            // Short reads are disallowed, so hitting the global limit must
            // abort the read with an exception rather than truncating it.
            slice.options.remove<query::partition_slice::option::allow_short_read>();
            query::read_command cmd(s->id(), s->version(), slice, query::max_result_size(max_size), query::tombstone_limit::max);
            try {
                auto size = query_method(s, cmd).get();
                // Just to ensure we are not interpreting empty results as success.
                BOOST_REQUIRE(size != 0);
                BOOST_FAIL("Expected exception, but none was thrown.");
            } catch (std::runtime_error& e) {
                testlog.trace("Exception thrown, as expected: {}", e.what());
            }
        }
    }, std::move(cfg)).get();
}
|
|
|
|
// Verify that database::get_reader_concurrency_semaphore() picks the
// expected semaphore (user / system / streaming) based on the calling
// scheduling group; an unrecognized group falls back to the user semaphore.
SEASTAR_THREAD_TEST_CASE(reader_concurrency_semaphore_selection_test) {
    cql_test_config cfg;

    // A scheduling group the database knows nothing about, to test the
    // fallback path.
    scheduling_group unknown_scheduling_group = create_scheduling_group("unknown", 800).get();
    auto cleanup_unknown_scheduling_group = defer([&unknown_scheduling_group] {
        destroy_scheduling_group(unknown_scheduling_group).get();
    });

    const auto user_semaphore = std::mem_fn(&database_test_wrapper::get_user_read_concurrency_semaphore);
    const auto system_semaphore = std::mem_fn(&database_test_wrapper::get_system_read_concurrency_semaphore);
    const auto streaming_semaphore = std::mem_fn(&database_test_wrapper::get_streaming_read_concurrency_semaphore);

    // Expected scheduling-group -> semaphore mapping.
    std::vector<std::pair<scheduling_group, std::function<reader_concurrency_semaphore&(database_test_wrapper&)>>> scheduling_group_and_expected_semaphore{
        {default_scheduling_group(), system_semaphore}
    };

    auto sched_groups = get_scheduling_groups().get();

    scheduling_group_and_expected_semaphore.emplace_back(sched_groups.compaction_scheduling_group, system_semaphore);
    scheduling_group_and_expected_semaphore.emplace_back(sched_groups.memory_compaction_scheduling_group, system_semaphore);
    scheduling_group_and_expected_semaphore.emplace_back(sched_groups.streaming_scheduling_group, streaming_semaphore);
    scheduling_group_and_expected_semaphore.emplace_back(sched_groups.statement_scheduling_group, user_semaphore);
    scheduling_group_and_expected_semaphore.emplace_back(sched_groups.memtable_scheduling_group, system_semaphore);
    scheduling_group_and_expected_semaphore.emplace_back(sched_groups.memtable_to_cache_scheduling_group, system_semaphore);
    scheduling_group_and_expected_semaphore.emplace_back(sched_groups.gossip_scheduling_group, system_semaphore);
    scheduling_group_and_expected_semaphore.emplace_back(unknown_scheduling_group, user_semaphore);

    do_with_cql_env_thread([&scheduling_group_and_expected_semaphore] (cql_test_env& e) {
        auto& db = e.local_db();
        database_test_wrapper tdb(db);
        for (const auto& [sched_group, expected_sem_getter] : scheduling_group_and_expected_semaphore) {
            // Run the lookup from within the scheduling group under test.
            with_scheduling_group(sched_group, [&db, sched_group = sched_group, &tdb, &expected_sem_getter = expected_sem_getter] {
                auto expected_sem_ptr = &expected_sem_getter(tdb);
                auto& sem = db.get_reader_concurrency_semaphore();
                if (&sem != expected_sem_ptr) {
                    BOOST_FAIL(fmt::format("Unexpected semaphore for scheduling group {}, expected {}, got {}", sched_group.name(), expected_sem_ptr->name(), sem.name()));
                }
            }).get();
        }
    }, std::move(cfg)).get();
}
|
|
|
|
// Verify that database::get_query_max_result_size() returns the limits
// appropriate for the calling scheduling group: the configured soft/hard
// limits for user (statement) reads, unlimited sizes for system and
// maintenance work, and the user limits for unrecognized groups.
SEASTAR_THREAD_TEST_CASE(max_result_size_for_query_selection_test) {
    cql_test_config cfg;

    // Use distinctive soft/hard limits so user vs. system selection is
    // unambiguous.
    cfg.db_config->max_memory_for_unlimited_query_soft_limit(1 * 1024 * 1024, utils::config_file::config_source::CommandLine);
    cfg.db_config->max_memory_for_unlimited_query_hard_limit(2 * 1024 * 1024, utils::config_file::config_source::CommandLine);

    // A scheduling group the database knows nothing about, to test the
    // fallback path.
    scheduling_group unknown_scheduling_group = create_scheduling_group("unknown", 800).get();
    auto cleanup_unknown_scheduling_group = defer([&unknown_scheduling_group] {
        destroy_scheduling_group(unknown_scheduling_group).get();
    });

    const auto user_max_result_size = query::max_result_size(
            cfg.db_config->max_memory_for_unlimited_query_soft_limit(),
            cfg.db_config->max_memory_for_unlimited_query_hard_limit(),
            query::result_memory_limiter::maximum_result_size);
    const auto system_max_result_size = query::max_result_size(
            query::result_memory_limiter::unlimited_result_size,
            query::result_memory_limiter::unlimited_result_size,
            query::result_memory_limiter::maximum_result_size);
    const auto maintenance_max_result_size = system_max_result_size;

    // Expected scheduling-group -> max_result_size mapping.
    std::vector<std::pair<scheduling_group, query::max_result_size>> scheduling_group_and_expected_max_result_size{
        {default_scheduling_group(), system_max_result_size}
    };

    auto sched_groups = get_scheduling_groups().get();

    scheduling_group_and_expected_max_result_size.emplace_back(sched_groups.compaction_scheduling_group, system_max_result_size);
    scheduling_group_and_expected_max_result_size.emplace_back(sched_groups.memory_compaction_scheduling_group, system_max_result_size);
    scheduling_group_and_expected_max_result_size.emplace_back(sched_groups.streaming_scheduling_group, maintenance_max_result_size);
    scheduling_group_and_expected_max_result_size.emplace_back(sched_groups.statement_scheduling_group, user_max_result_size);
    scheduling_group_and_expected_max_result_size.emplace_back(sched_groups.memtable_scheduling_group, system_max_result_size);
    scheduling_group_and_expected_max_result_size.emplace_back(sched_groups.memtable_to_cache_scheduling_group, system_max_result_size);
    scheduling_group_and_expected_max_result_size.emplace_back(sched_groups.gossip_scheduling_group, system_max_result_size);
    scheduling_group_and_expected_max_result_size.emplace_back(unknown_scheduling_group, user_max_result_size);

    do_with_cql_env_thread([&scheduling_group_and_expected_max_result_size] (cql_test_env& e) {
        auto& db = e.local_db();
        database_test_wrapper tdb(db);
        for (const auto& [sched_group, expected_max_size] : scheduling_group_and_expected_max_result_size) {
            // Run the lookup from within the scheduling group under test.
            with_scheduling_group(sched_group, [&db, sched_group = sched_group, expected_max_size = expected_max_size] {
                const auto max_size = db.get_query_max_result_size();
                if (max_size != expected_max_size) {
                    BOOST_FAIL(fmt::format("Unexpected max_size for scheduling group {}, expected {{{}, {}}}, got {{{}, {}}}",
                            sched_group.name(),
                            expected_max_size.soft_limit,
                            expected_max_size.hard_limit,
                            max_size.soft_limit,
                            max_size.hard_limit));
                }
            }).get();
        }
    }, std::move(cfg)).get();
}
|
|
|
|
// Check that during a multi-page range scan:
// * semaphore mismatch is detected
// * code is exception safe w.r.t. to the mismatch exception, e.g. readers are closed properly
SEASTAR_TEST_CASE(multipage_range_scan_semaphore_mismatch) {
    return do_with_cql_env_thread([] (cql_test_env& e) {
        // The mismatch is reported as an internal error; disable the abort
        // so it surfaces as an exception the test can catch.
        const auto do_abort = set_abort_on_internal_error(false);
        auto reset_abort = defer([do_abort] {
            set_abort_on_internal_error(do_abort);
        });
        e.execute_cql("CREATE TABLE ks.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck));").get();

        auto insert_id = e.prepare("INSERT INTO ks.tbl(pk, ck, v) VALUES(?, ?, ?)").get();

        auto& db = e.local_db();
        auto& tbl = db.find_column_family("ks", "tbl");
        auto s = tbl.schema();

        // One partition with enough rows that a row_limit of 4 forces the
        // scan to span multiple pages.
        auto dk = tests::generate_partition_key(tbl.schema());
        const auto pk = cql3::raw_value::make_value(managed_bytes(*dk.key().begin(*s)));
        const auto v = cql3::raw_value::make_value(int32_type->decompose(0));
        for (int32_t ck = 0; ck < 100; ++ck) {
            e.execute_prepared(insert_id, {pk, cql3::raw_value::make_value(int32_type->decompose(ck)), v}).get();
        }

        auto sched_groups = get_scheduling_groups().get();

        // First page: fresh read, small row limit so the reader is saved
        // for the next page.
        query::read_command cmd1(
                s->id(),
                s->version(),
                s->full_slice(),
                query::max_result_size(std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max()),
                query::tombstone_limit::max,
                query::row_limit(4),
                query::partition_limit::max,
                gc_clock::now(),
                std::nullopt,
                query_id::create_random_id(),
                query::is_first_page::yes);

        // Second page: same query id, continues after clustering key 3.
        auto cmd2 = cmd1;
        auto cr = query::clustering_range::make_starting_with({clustering_key::from_single_value(*s, int32_type->decompose(3)), false});
        cmd2.slice = partition_slice_builder(*s).set_specific_ranges(query::specific_ranges(dk.key(), {cr})).build();
        cmd2.is_first_page = query::is_first_page::no;

        auto pr = dht::partition_range::make_starting_with({dk, true});
        auto prs = dht::partition_range_vector{pr};

        // Runs a page of the scan from within the given scheduling group;
        // the group determines which reader concurrency semaphore is used.
        auto read_page = [&] (scheduling_group sg, const query::read_command& cmd) {
            with_scheduling_group(sg, [&] {
                return query_data_on_all_shards(e.db(), s, cmd, prs, query::result_options::only_result(), {}, db::no_timeout);
            }).get();
        };

        // Page 1 in the default group, page 2 in the statement group: the
        // saved reader belongs to a different semaphore, which must be
        // detected and reported (and the reader dropped cleanly).
        read_page(default_scheduling_group(), cmd1);
        BOOST_REQUIRE_EXCEPTION(read_page(sched_groups.statement_scheduling_group, cmd2), std::runtime_error,
                testing::exception_predicate::message_contains("semaphore mismatch detected, dropping reader"));
    });
}
|
|
|
|
// Test `upgrade_sstables` on all keyspaces (including the system keyspace).
// Refs: #9494 (https://github.com/scylladb/scylla/issues/9494)
SEASTAR_TEST_CASE(upgrade_sstables) {
    return do_with_cql_env_thread([] (cql_test_env& e) {
        // Run the upgrade on every shard, over every table of every
        // keyspace; the test passes if no shard throws.
        e.db().invoke_on_all([] (replica::database& db) -> future<> {
            auto& cm = db.get_compaction_manager();
            for (auto& [ks_name, ks] : db.get_keyspaces()) {
                const auto& erm = ks.get_static_effective_replication_map();
                // Owned ranges are computed once per keyspace and shared by
                // all of its tables' upgrade jobs.
                auto owned_ranges_ptr = compaction::make_owned_ranges_ptr(co_await db.get_keyspace_local_ranges(erm));
                for (auto& [cf_name, schema] : ks.metadata()->cf_meta_data()) {
                    auto& t = db.find_column_family(schema->id());
                    // Rewrite sstables even if they are already at the
                    // current format version.
                    constexpr bool exclude_current_version = false;
                    co_await t.parallel_foreach_compaction_group_view([&] (compaction::compaction_group_view& ts) {
                        return cm.perform_sstable_upgrade(owned_ranges_ptr, ts, exclude_current_version, tasks::task_info{});
                    });
                }
            }
        }).get();
    });
}
|
|
|
|
// Verify per-service-level reader concurrency semaphores:
// 1. the total user reader memory is split between the semaphores
//    proportionally to the service-level shares (within a small rounding
//    delta, and without leaking or creating memory overall), and
// 2. a read executed under a given service level touches only that service
//    level's semaphore.
SEASTAR_THREAD_TEST_CASE(per_service_level_reader_concurrency_semaphore_test) {
    cql_test_config cfg;
    do_with_cql_env_thread([] (cql_test_env& e) {
        const size_t num_service_levels = 3;
        const size_t num_keys_to_insert = 10;
        const size_t num_individual_reads_to_test = 50;
        auto& db = e.local_db();
        database_test_wrapper dbt(db);
        size_t total_memory = dbt.get_total_user_reader_concurrency_semaphore_memory();
        sharded<qos::service_level_controller>& sl_controller = e.service_level_controller_service();
        std::array<sstring, num_service_levels> sl_names;
        qos::service_level_options slo;
        size_t expected_total_weight = 200; // 200 from `sl:driver`
        // Weight (shares) assigned to the i-th service level created below.
        auto index_to_weight = [] (size_t i) -> size_t {
            return (i + 1)*100;
        };

        // make the default service level take as little memory as possible
        slo.shares.emplace<int32_t>(1);
        expected_total_weight += 1;
        sl_controller.local().add_service_level(qos::service_level_controller::default_service_level_name, slo).get();

        // Just to make the code more readable.
        // Runs under the given service level so that
        // db.get_reader_concurrency_semaphore() resolves to that level's semaphore.
        auto get_reader_concurrency_semaphore_for_sl = [&] (sstring sl_name) -> reader_concurrency_semaphore& {
            return *sl_controller.local().with_service_level(sl_name, noncopyable_function<reader_concurrency_semaphore*()>([&] {
                return &db.get_reader_concurrency_semaphore();
            })).get();
        };

        // Create the service levels one by one and, after each addition,
        // check that memory is re-distributed across all existing semaphores.
        for (unsigned i = 0; i < num_service_levels; i++) {
            sstring sl_name = format("sl{}", i);
            slo.shares.emplace<int32_t>(index_to_weight(i));
            sl_controller.local().add_service_level(sl_name, slo).get();
            expected_total_weight += index_to_weight(i);
            // Make sure that the total weight is tracked correctly in the semaphore group
            BOOST_REQUIRE_EQUAL(expected_total_weight, dbt.get_total_user_reader_concurrency_semaphore_weight());
            sl_names[i] = sl_name;
            size_t total_distributed_memory = 0;
            // Include `sl:driver` in computations
            total_distributed_memory += get_reader_concurrency_semaphore_for_sl("driver").available_resources().memory;
            for (unsigned j = 0 ; j <= i ; j++) {
                reader_concurrency_semaphore& sem = get_reader_concurrency_semaphore_for_sl(sl_names[j]);
                // Make sure that all semaphores that has been created until now - have the right amount of available memory
                // after the operation has ended.
                // We allow for a small delta of up to num_service_levels. This allows an off-by-one for each semaphore,
                // the remainder being added to one of the semaphores.
                // We make sure this didn't leak/create memory by checking the total below.
                const auto delta = std::abs(ssize_t((index_to_weight(j) * total_memory) / expected_total_weight) - sem.available_resources().memory);
                BOOST_REQUIRE_LE(delta, num_service_levels);
                total_distributed_memory += sem.available_resources().memory;
            }
            total_distributed_memory += get_reader_concurrency_semaphore_for_sl(qos::service_level_controller::default_service_level_name).available_resources().memory;
            // No memory leaked or created by the redistribution.
            BOOST_REQUIRE_EQUAL(total_distributed_memory, total_memory);
        }

        // Capture the stats of every service-level semaphore, keyed by name.
        auto get_semaphores_stats_snapshot = [&] () {
            std::unordered_map<sstring, reader_concurrency_semaphore::stats> snapshot;
            for (auto&& sl_name : sl_names) {
                snapshot[sl_name] = get_reader_concurrency_semaphore_for_sl(sl_name).get_stats();
            }
            return snapshot;
        };
        e.execute_cql("CREATE TABLE tbl (a int, b int, PRIMARY KEY (a));").get();

        for (unsigned i = 0; i < num_keys_to_insert; i++) {
            for (unsigned j = 0; j < num_keys_to_insert; j++) {
                e.execute_cql(format("INSERT INTO tbl(a, b) VALUES ({}, {});", i, j)).get();
            }
        }

        // Run reads under randomly chosen service levels and compare the
        // per-semaphore stats before and after each read.
        for (unsigned i = 0; i < num_individual_reads_to_test; i++) {
            int random_service_level = tests::random::get_int(num_service_levels - 1);
            auto snapshot_before = get_semaphores_stats_snapshot();

            sl_controller.local().with_service_level(sl_names[random_service_level], noncopyable_function<future<>()> ([&] {
                return e.execute_cql("SELECT * FROM tbl;").discard_result();
            })).get();
            auto snapshot_after = get_semaphores_stats_snapshot();
            for (auto& [sl_name, stats] : snapshot_before) {
                // Make sure that the only semaphore that experienced any activity (at least measured activity) is
                // the semaphore that belongs to the current service level.
                BOOST_REQUIRE((stats == snapshot_after[sl_name] && sl_name != sl_names[random_service_level]) ||
                        (stats != snapshot_after[sl_name] && sl_name == sl_names[random_service_level]));
            }
        }
    }, std::move(cfg)).get();
}
|
|
|
|
// Verify that sstables moved to the quarantine state are still picked up
// when the table is re-populated from disk on a fresh environment: all rows
// (including those in the quarantined sstable) must remain readable.
SEASTAR_TEST_CASE(populate_from_quarantine_works) {
    auto tmpdir_for_data = make_lw_shared<tmpdir>();
    auto db_cfg_ptr = make_shared<db::config>();
    db_cfg_ptr->data_file_directories(std::vector<sstring>({ tmpdir_for_data->path().string() }));
    locator::host_id host_id;

    // populate tmpdir_for_data and
    // move a random sstable to quarantine
    co_await do_with_some_data({"cf"}, [&host_id] (cql_test_env& e) -> future<> {
        // Remember the host id so the second environment below can reuse it.
        host_id = e.local_db().get_token_metadata().get_my_id();
        auto& db = e.db();
        // Flush everywhere so there is at least one sstable on some shard.
        co_await db.invoke_on_all([] (replica::database& db) {
            auto& cf = db.find_column_family("ks", "cf");
            return cf.flush();
        });
        // Probe shards starting from a random one (the modulo below wraps
        // the index back into range) until one of them had an sstable to
        // quarantine.
        auto shard = tests::random::get_int<unsigned>(0, smp::count);
        auto found = false;
        for (unsigned i = 0; i < smp::count && !found; i++) {
            found = co_await db.invoke_on((shard + i) % smp::count, [] (replica::database& db) -> future<bool> {
                auto& cf = db.find_column_family("ks", "cf");
                bool found = false;
                co_await cf.parallel_foreach_compaction_group_view([&] (compaction::compaction_group_view& ts) -> future<> {
                    auto sstables = co_await in_strategy_sstables(ts);
                    if (sstables.empty()) {
                        co_return;
                    }
                    // Pick a random sstable of this group and quarantine it.
                    auto idx = tests::random::get_int<size_t>(0, sstables.size() - 1);
                    testlog.debug("Moving sstable #{} out of {} to quarantine", idx, sstables.size());
                    auto sst = sstables[idx];
                    co_await sst->change_state(sstables::sstable_state::quarantine);
                    found |= true;
                });
                co_return found;
            });
        }
        BOOST_REQUIRE(found);
    }, false, db_cfg_ptr);

    // reload the table from tmpdir_for_data and
    // verify that all rows are still there
    size_t row_count = 0;
    cql_test_config test_config(db_cfg_ptr);
    test_config.host_id = host_id;
    co_await do_with_cql_env([&row_count] (cql_test_env& e) -> future<> {
        auto res = co_await e.execute_cql("select * from ks.cf;");
        auto rows = dynamic_pointer_cast<cql_transport::messages::result_message::rows>(res);
        BOOST_REQUIRE(rows);
        row_count = rows->rs().result_set().size();
    }, std::move(test_config));
    // All rows written by do_with_some_data() must survive the reload.
    BOOST_REQUIRE_EQUAL(row_count, 6);
}
|
|
|
|
// Verify that taking a snapshot also includes sstables that are in the
// quarantine state, not only the main in-strategy sstable set.
SEASTAR_TEST_CASE(snapshot_with_quarantine_works) {
    return do_with_some_data({"cf"}, [] (cql_test_env& e) -> future<> {
        auto& db = e.db();
        // Flush on all shards so there are sstables to snapshot.
        co_await db.invoke_on_all([] (replica::database& db) {
            auto& cf = db.find_column_family("ks", "cf");
            return cf.flush();
        });

        // Files the snapshot directory must contain, accumulated below.
        std::set<sstring> expected = {
            "manifest.json",
        };

        // move a random sstable to quarantine
        auto shard = tests::random::get_int<unsigned>(0, smp::count);
        auto found = false;
        for (unsigned i = 0; i < smp::count; i++) {
            co_await db.invoke_on((shard + i) % smp::count, [&] (replica::database& db) -> future<> {
                auto& cf = db.find_column_family("ks", "cf");
                co_await cf.parallel_foreach_compaction_group_view([&] (compaction::compaction_group_view& ts) -> future<> {
                    auto sstables = co_await in_strategy_sstables(ts);
                    if (sstables.empty()) {
                        co_return;
                    }
                    // collect all expected sstable data files
                    for (auto sst : sstables) {
                        expected.insert(sst->component_basename(sstables::component_type::Data));
                    }
                    // Quarantine only one sstable in total: the first
                    // non-empty group flips `found` and does the move.
                    if (std::exchange(found, true)) {
                        co_return;
                    }
                    auto idx = tests::random::get_int<size_t>(0, sstables.size() - 1);
                    auto sst = sstables[idx];
                    co_await sst->change_state(sstables::sstable_state::quarantine);
                });
            });
        }
        BOOST_REQUIRE(found);

        co_await take_snapshot(e, "ks", "cf", "test", db::snapshot_options{.skip_flush = true});

        testlog.debug("Expected: {}", expected);

        // snapshot triggered a flush and wrote the data down.
        BOOST_REQUIRE_GT(expected.size(), 1);

        auto& cf = db.local().find_column_family("ks", "cf");

        auto in_snap_dir = co_await collect_files(table_dir(cf) / sstables::snapshots_dir / "test");
        // all files were copied and manifest was generated
        BOOST_REQUIRE(std::includes(in_snap_dir.begin(), in_snap_dir.end(), expected.begin(), expected.end()));
    });
}
|
|
|
|
// Insert a querier for a table into the querier cache *while* the table is
// being dropped, and verify the drop still purges every cache entry that
// belongs to the dropped table.
SEASTAR_TEST_CASE(database_drop_column_family_clears_querier_cache) {
    return do_with_cql_env_thread([] (cql_test_env& e) {
        e.execute_cql("create table ks.cf (k text, v int, primary key (k));").get();
        auto& db = e.local_db();
        auto& tbl = db.find_column_family("ks", "cf");

        // Hold a read-in-progress guard so the drop below cannot complete
        // until we release it (see op.reset() further down).
        auto op = std::optional(tbl.read_in_progress());
        auto s = tbl.schema();
        auto q = replica::querier(
                tbl.as_mutation_source(),
                tbl.schema(),
                database_test_wrapper(db).get_user_read_concurrency_semaphore().make_tracking_only_permit(s, "test", db::no_timeout, {}),
                query::full_partition_range,
                s->full_slice(),
                nullptr,
                tombstone_gc_state::no_gc());

        // Start the drop; it remains pending while `op` is alive.
        auto f = replica::database::legacy_drop_table_on_all_shards(e.db(), e.get_system_keyspace(), "ks", "cf");

        // we add a querier to the querier cache while the drop is ongoing
        auto& qc = db.get_querier_cache();
        qc.insert_data_querier(query_id::create_random_id(), std::move(q), nullptr);
        BOOST_REQUIRE_EQUAL(qc.get_stats().population, 1);

        op.reset(); // this should allow the drop to finish
        f.get();

        // the drop should have cleaned up all entries belonging to that table
        BOOST_REQUIRE_EQUAL(qc.get_stats().population, 0);
    });
}
|
|
|
|
// Drop a populated table with `with_snapshot=true` and verify that its
// directory survives the drop exactly when the `auto_snapshot` config
// option is enabled.
static future<> test_drop_table_with_auto_snapshot(bool auto_snapshot) {
    const sstring ks_name = "ks";
    const sstring table_name = format("table_with_auto_snapshot_{}", auto_snapshot ? "enabled" : "disabled");

    auto data_dir = make_lw_shared<tmpdir>();
    auto cfg = make_shared<db::config>();
    cfg->data_file_directories(std::vector<sstring>({ data_dir->path().string() }));
    cfg->auto_snapshot(auto_snapshot);

    co_await do_with_some_data({table_name}, [&] (cql_test_env& e) -> future<> {
        auto& tbl = e.local_db().find_column_family(ks_name, table_name);
        const auto table_path = table_dir(tbl).native();

        // `with_snapshot=true` permits the automatic snapshot, subject to
        // the auto_snapshot configuration set above. The table directory is
        // therefore expected to remain on disk iff auto_snapshot is enabled.
        co_await replica::database::legacy_drop_table_on_all_shards(e.db(), e.get_system_keyspace(), ks_name, table_name, true);
        const bool dir_exists = co_await file_exists(table_path);
        BOOST_REQUIRE_EQUAL(dir_exists, auto_snapshot);
    }, false, cfg);
}
|
|
|
|
// auto_snapshot enabled: the table directory must still exist after the drop.
SEASTAR_TEST_CASE(drop_table_with_auto_snapshot_enabled) {
    return test_drop_table_with_auto_snapshot(true);
}
|
|
|
|
// auto_snapshot disabled: the table directory must be gone after the drop.
SEASTAR_TEST_CASE(drop_table_with_auto_snapshot_disabled) {
    return test_drop_table_with_auto_snapshot(false);
}
|
|
|
|
// Drop a table with `with_snapshot=false`: no automatic snapshot is allowed,
// so the table directory must be removed unconditionally.
SEASTAR_TEST_CASE(drop_table_with_no_snapshot) {
    sstring ks_name = "ks";
    sstring table_name = "table_with_no_snapshot";

    co_await do_with_some_data({table_name}, [&] (cql_test_env& e) -> future<> {
        const auto table_path = table_dir(e.local_db().find_column_family(ks_name, table_name)).native();

        // Passing `with_snapshot=false` disallows auto_snapshot, so the drop
        // must delete the table directory.
        co_await replica::database::legacy_drop_table_on_all_shards(e.db(), e.get_system_keyspace(), ks_name, table_name, false);
        const bool dir_exists = co_await file_exists(table_path);
        BOOST_REQUIRE_EQUAL(dir_exists, false);
    });
}
|
|
|
|
// Drop a table that has a user-created (explicit) snapshot: the table
// directory must be kept even with `with_snapshot=false`, regardless of the
// auto_snapshot configuration.
SEASTAR_TEST_CASE(drop_table_with_explicit_snapshot) {
    sstring ks_name = "ks";
    sstring table_name = "table_with_explicit_snapshot";

    co_await do_with_some_data({table_name}, [&] (cql_test_env& e) -> future<> {
        auto snapshot_tag = format("test-{}", db_clock::now().time_since_epoch().count());
        co_await take_snapshot(e, ks_name, table_name, snapshot_tag);
        const auto table_path = table_dir(e.local_db().find_column_family(ks_name, table_name)).native();

        // The explicit snapshot above must keep the directory alive across
        // the drop, even though `with_snapshot` is false here.
        co_await replica::database::legacy_drop_table_on_all_shards(e.db(), e.get_system_keyspace(), ks_name, table_name, false);
        const bool dir_exists = co_await file_exists(table_path);
        BOOST_REQUIRE_EQUAL(dir_exists, true);
    });
}
|
|
|
|
// The mutation-dump output schema generation must be deterministic:
// generating it twice from the same underlying schema yields the same
// schema id and the same schema version.
SEASTAR_TEST_CASE(mutation_dump_generated_schema_deterministic_id_version) {
    simple_schema ss;
    auto first = replica::mutation_dump::generate_output_schema_from_underlying_schema(ss.schema());
    auto second = replica::mutation_dump::generate_output_schema_from_underlying_schema(ss.schema());

    BOOST_REQUIRE_EQUAL(first->id(), second->id());
    BOOST_REQUIRE_EQUAL(first->version(), second->version());

    return make_ready_future<>();
}
|
|
|
|
// Re-enabling a compaction manager after draining it must work on every shard.
SEASTAR_TEST_CASE(enable_drained_compaction_manager) {
    return do_with_cql_env_thread([] (cql_test_env& e) {
        e.db().invoke_on_all([] (replica::database& db) -> future<> {
            auto& compaction_mgr = db.get_compaction_manager();
            co_await compaction_mgr.drain();
            compaction_mgr.enable();
        }).get();
    });
}
|
|
|
|
// Create many sstables, move a random subset into the quarantine state on
// each shard, then verify drop_quarantined_sstables() removes every
// quarantined sstable.
SEASTAR_TEST_CASE(test_drop_quarantined_sstables) {
    return do_with_cql_env_thread([] (cql_test_env& e) {
        e.execute_cql("create table cf (p text PRIMARY KEY, c int)").get();
        // Flush after each insert to produce many small sstables to pick from.
        for (int i = 0; i < 100; i++) {
            e.execute_cql(format("insert into cf (p, c) values ('key{}', {})", i * i, i)).get();
            e.db().invoke_on_all([] (replica::database& db) {
                auto& cf = db.find_column_family("ks", "cf");
                return cf.flush();
            }).get();
        }

        auto initial_sstable_count = e.db().map_reduce0(
            [] (replica::database& db) -> future<size_t> {
                auto& cf = db.find_column_family("ks", "cf");
                co_return cf.sstables_count();
            },
            0,
            std::plus<size_t>{}
        ).get();
        BOOST_REQUIRE_GT(initial_sstable_count, 0);

        // Quarantine a random prefix of each non-empty group's sstables.
        // Compaction is disabled for the group while doing so, so the chosen
        // sstables cannot be compacted away from under us.
        auto quarantined_count = e.db().map_reduce0(
            [] (replica::database& _db) -> future<size_t> {
                auto& cf = _db.find_column_family("ks", "cf");
                size_t quarantined_on_shard = 0;
                auto& cm = cf.get_compaction_manager();
                co_await cf.parallel_foreach_compaction_group_view([&] (compaction::compaction_group_view& ts) -> future<> {
                    return cm.run_with_compaction_disabled(ts, [&] () -> future<> {
                        auto sstables = co_await in_strategy_sstables(ts);
                        if (sstables.empty()) {
                            co_return;
                        }
                        // At least one, at most ~20% + 1 of the group's sstables.
                        auto quarantine_n = 1 + tests::random::get_int<size_t>(sstables.size() / 5);
                        quarantined_on_shard += quarantine_n;
                        for (size_t i = 0; i < quarantine_n; i++) {
                            co_await sstables[i]->change_state(sstables::sstable_state::quarantine);
                        }
                    });
                });
                co_return quarantined_on_shard;
            },
            size_t(0),
            std::plus<size_t>{}
        ).get();
        BOOST_REQUIRE_GT(quarantined_count, 0);

        e.db().invoke_on_all([] (replica::database& db) {
            auto& cf = db.find_column_family("ks", "cf");
            return cf.drop_quarantined_sstables();
        }).get();

        // After the drop, no sstable on any shard may still be quarantined.
        auto remaining_quarantined = e.db().map_reduce0(
            [] (replica::database& db) -> future<size_t> {
                auto& cf = db.find_column_family("ks", "cf");
                auto& sstables = *cf.get_sstables();
                co_return std::count_if(sstables.begin(), sstables.end(), [] (shared_sstable sst) {
                    return sst->is_quarantined();
                });
            },
            size_t(0),
            std::plus<size_t>{}
        ).get();
        BOOST_REQUIRE_EQUAL(remaining_quarantined, 0);
    });
}
|
|
|
|
// Exercise shared_tombstone_gc_state::snapshot(): the snapshot must freeze
// the repair times, the group0 refresh time and the query time at the moment
// it is taken, while gc-before computed against the live state reflects
// updates made after the snapshot.
SEASTAR_THREAD_TEST_CASE(test_tombstone_gc_state_snapshot) {
    // One schema per tombstone_gc mode under test.
    auto table_gc_mode_timeout = schema_builder("test", "table_gc_mode_timeout")
            .with_column("pk", utf8_type, column_kind::partition_key)
            .with_tombstone_gc_options(tombstone_gc_options({ {"mode", "timeout"} }))
            .set_gc_grace_seconds(10)
            .build();
    auto table_gc_mode_disabled = schema_builder("test", "table_gc_mode_disabled")
            .with_column("pk", utf8_type, column_kind::partition_key)
            .with_tombstone_gc_options(tombstone_gc_options({ {"mode", "disabled"} }))
            .build();
    auto table_gc_mode_immediate = schema_builder("test", "table_gc_mode_immediate")
            .with_column("pk", utf8_type, column_kind::partition_key)
            .with_tombstone_gc_options(tombstone_gc_options({ {"mode", "immediate"} }))
            .build();
    // Three repair-mode tables with distinct propagation delays:
    // repair1 is repaired both before and after the snapshot, repair2 only
    // before (its history is dropped later), repair3 only after.
    auto table_gc_mode_repair1 = schema_builder("test", "table_gc_mode_repair1")
            .with_column("pk", utf8_type, column_kind::partition_key)
            .with_tombstone_gc_options(tombstone_gc_options({ {"mode", "repair"}, {"propagation_delay_in_seconds", "188"} }))
            .build();
    auto table_gc_mode_repair2 = schema_builder("test", "table_gc_mode_repair2")
            .with_column("pk", utf8_type, column_kind::partition_key)
            .with_tombstone_gc_options(tombstone_gc_options({ {"mode", "repair"}, {"propagation_delay_in_seconds", "288"} }))
            .build();
    auto table_gc_mode_repair3 = schema_builder("test", "table_gc_mode_repair3")
            .with_column("pk", utf8_type, column_kind::partition_key)
            .with_tombstone_gc_options(tombstone_gc_options({ {"mode", "repair"}, {"propagation_delay_in_seconds", "388"} }))
            .build();

    // Mark the group0 table via a schema initializer; being a group0 table
    // is set on the builder with set_is_group0_table() rather than via a
    // regular schema option.
    schema_builder::register_schema_initializer([] (schema_builder& builder) {
        if (builder.ks_name() == "test" && builder.cf_name() == "table_gc_mode_group0") {
            builder.set_is_group0_table();
        }
    });
    auto table_gc_mode_group0 = schema_builder("test", "table_gc_mode_group0")
            .with_column("pk", utf8_type, column_kind::partition_key)
            .build();

    BOOST_REQUIRE(table_gc_mode_group0->static_props().is_group0_table);

    // One pk to rule them all, all schemas have the same partition key, so we
    // can reuse a single key for this test.
    const auto pk = partition_key::from_single_value(*table_gc_mode_timeout, utf8_type->decompose(data_value("pk")));
    const auto dk = dht::decorate_key(*table_gc_mode_timeout, pk);

    const auto repair_range = dht::token_range::make(dht::first_token(), dk.token());

    shared_tombstone_gc_state shared_state;

    const auto first_repair_time = gc_clock::now() - gc_clock::duration(std::chrono::hours(6));

    // Pre-snapshot state: repair1 and repair2 repaired at first_repair_time.
    shared_state.update_repair_time(table_gc_mode_repair1->id(), repair_range, first_repair_time);
    shared_state.update_repair_time(table_gc_mode_repair2->id(), repair_range, first_repair_time);

    shared_state.update_group0_refresh_time(first_repair_time);

    auto snapshot = shared_state.snapshot();
    // The snapshot captures its query time at creation.
    BOOST_REQUIRE_LE(gc_clock::now() - snapshot.query_time(), gc_clock::duration(std::chrono::seconds(1)));

    // Advance gc clock and change the gc state to simulate a later point in time.
    // Then check that gc-before against the shared-state yields the current
    // state, while gc-before against the snapshot yields the before state.

    const auto now = gc_clock::now() + gc_clock::duration(std::chrono::hours(6));
    const auto gc_state = tombstone_gc_state(shared_state).with_commitlog_check_disabled();

    const auto second_repair_time = gc_clock::now() + gc_clock::duration(std::chrono::hours(3));

    // Post-snapshot state: repair1 re-repaired, repair2's history dropped,
    // repair3 repaired for the first time.
    shared_state.update_repair_time(table_gc_mode_repair1->id(), repair_range, second_repair_time);
    shared_state.drop_repair_history_for_table(table_gc_mode_repair2->id());
    shared_state.update_repair_time(table_gc_mode_repair3->id(), repair_range, second_repair_time);

    shared_state.update_group0_refresh_time(second_repair_time);

    // timeout mode: gc-before = reference time - gc_grace_seconds.
    BOOST_REQUIRE_EQUAL(gc_state.get_gc_before_for_key(table_gc_mode_timeout, dk, now), now - table_gc_mode_timeout->gc_grace_seconds());
    BOOST_REQUIRE_EQUAL(snapshot.get_gc_before_for_key(table_gc_mode_timeout, dk, false), snapshot.query_time() - table_gc_mode_timeout->gc_grace_seconds());

    // disabled mode: nothing is ever purgeable.
    BOOST_REQUIRE_EQUAL(gc_state.get_gc_before_for_key(table_gc_mode_disabled, dk, now), gc_clock::time_point::min());
    BOOST_REQUIRE_EQUAL(snapshot.get_gc_before_for_key(table_gc_mode_disabled, dk, false), gc_clock::time_point::min());

    // immediate mode: everything up to the reference time is purgeable.
    BOOST_REQUIRE_EQUAL(gc_state.get_gc_before_for_key(table_gc_mode_immediate, dk, now), now);
    BOOST_REQUIRE_EQUAL(snapshot.get_gc_before_for_key(table_gc_mode_immediate, dk, false), snapshot.query_time());

    // repair mode, live state: reflects the post-snapshot updates.
    BOOST_REQUIRE_EQUAL(gc_state.get_gc_before_for_key(table_gc_mode_repair1, dk, now), second_repair_time - table_gc_mode_repair1->tombstone_gc_options().propagation_delay_in_seconds());
    BOOST_REQUIRE_EQUAL(gc_state.get_gc_before_for_key(table_gc_mode_repair2, dk, now), gc_clock::time_point::min());
    BOOST_REQUIRE_EQUAL(gc_state.get_gc_before_for_key(table_gc_mode_repair3, dk, now), second_repair_time - table_gc_mode_repair3->tombstone_gc_options().propagation_delay_in_seconds());
    // repair mode, snapshot: still reflects the pre-snapshot repair history.
    BOOST_REQUIRE_EQUAL(snapshot.get_gc_before_for_key(table_gc_mode_repair1, dk, false), first_repair_time - table_gc_mode_repair1->tombstone_gc_options().propagation_delay_in_seconds());
    BOOST_REQUIRE_EQUAL(snapshot.get_gc_before_for_key(table_gc_mode_repair2, dk, false), first_repair_time - table_gc_mode_repair2->tombstone_gc_options().propagation_delay_in_seconds());
    BOOST_REQUIRE_EQUAL(snapshot.get_gc_before_for_key(table_gc_mode_repair3, dk, false), gc_clock::time_point::min());

    // group0 table: gc-before follows the group0 refresh time.
    BOOST_REQUIRE_EQUAL(gc_state.get_gc_before_for_key(table_gc_mode_group0, dk, now), second_repair_time);
    BOOST_REQUIRE_EQUAL(snapshot.get_gc_before_for_key(table_gc_mode_group0, dk, false), first_repair_time);
}
|
|
|
|
// Verify the RF=1 handling in the tombstone gc state: a repair-mode table
// marked RF=1 gets gc-before == reference time (everything purgeable), while
// a repair-mode table with no repair history gets gc-before == min (nothing
// purgeable). A snapshot keeps the RF classification it was taken with, even
// after the live classification flips.
SEASTAR_THREAD_TEST_CASE(test_tombstone_gc_state_snapshot_rf_one_tables) {
    auto table_gc_mode_repair1 = schema_builder("test", "table_gc_mode_repair1")
            .with_column("pk", utf8_type, column_kind::partition_key)
            .with_tombstone_gc_options(tombstone_gc_options({ {"mode", "repair"}, {"propagation_delay_in_seconds", "188"} }))
            .build();

    auto table_gc_mode_repair2 = schema_builder("test", "table_gc_mode_repair2")
            .with_column("pk", utf8_type, column_kind::partition_key)
            .with_tombstone_gc_options(tombstone_gc_options({ {"mode", "repair"}, {"propagation_delay_in_seconds", "188"} }))
            .build();

    // One pk to rule them all, all schemas have the same partition key, so we
    // can reuse a single key for this test.
    const auto pk = partition_key::from_single_value(*table_gc_mode_repair1, utf8_type->decompose(data_value("pk")));
    const auto dk = dht::decorate_key(*table_gc_mode_repair1, pk);

    shared_tombstone_gc_state shared_state;
    // Only repair1 is initially classified as RF=1.
    shared_state.set_table_rf_one(table_gc_mode_repair1->id());

    const auto now = gc_clock::now();
    const auto gc_state = tombstone_gc_state(shared_state).with_commitlog_check_disabled();

    // RF=1 table: purgeable up to `now`; no-history repair table: nothing purgeable.
    BOOST_REQUIRE_EQUAL(gc_state.get_gc_before_for_key(table_gc_mode_repair1, dk, now), now);
    BOOST_REQUIRE_EQUAL(gc_state.get_gc_before_for_key(table_gc_mode_repair2, dk, now), gc_clock::time_point::min());

    auto snapshot = shared_state.snapshot();

    // Taking the snapshot must not affect the live state.
    BOOST_REQUIRE_EQUAL(gc_state.get_gc_before_for_key(table_gc_mode_repair1, dk, now), now);
    BOOST_REQUIRE_EQUAL(gc_state.get_gc_before_for_key(table_gc_mode_repair2, dk, now), gc_clock::time_point::min());

    // The snapshot agrees with the classification at snapshot time.
    BOOST_REQUIRE_EQUAL(snapshot.get_gc_before_for_key(table_gc_mode_repair1, dk, false), snapshot.query_time());
    BOOST_REQUIRE_EQUAL(snapshot.get_gc_before_for_key(table_gc_mode_repair2, dk, false), gc_clock::time_point::min());

    // Flip the RF classification of both tables in the live state.
    shared_state.set_table_rf_n(table_gc_mode_repair1->id());
    shared_state.set_table_rf_one(table_gc_mode_repair2->id());

    // Live state follows the new classification...
    BOOST_REQUIRE_EQUAL(gc_state.get_gc_before_for_key(table_gc_mode_repair1, dk, now), gc_clock::time_point::min());
    BOOST_REQUIRE_EQUAL(gc_state.get_gc_before_for_key(table_gc_mode_repair2, dk, now), now);

    // ...while the snapshot still reflects the classification it captured.
    BOOST_REQUIRE_EQUAL(snapshot.get_gc_before_for_key(table_gc_mode_repair1, dk, false), snapshot.query_time());
    BOOST_REQUIRE_EQUAL(snapshot.get_gc_before_for_key(table_gc_mode_repair2, dk, false), gc_clock::time_point::min());
}
|
|
|
|
// Exercise max_purgeable::combine(): for every tombstone that one of the
// inputs refuses to purge, the combined value must refuse it as well
// (modulo the expiry-threshold weakening documented inline below).
SEASTAR_TEST_CASE(test_max_purgeable_combine) {
    // Ordered time points used as tombstone deletion times and expiry thresholds.
    const gc_clock::time_point t_pre_treshold = gc_clock::now();
    const gc_clock::time_point t1 = t_pre_treshold + std::chrono::seconds(10);
    const gc_clock::time_point t2 = t1 + std::chrono::seconds(10);
    const gc_clock::time_point t_post_treshold = t2 + std::chrono::seconds(10);

    // Check a single tombstone against an input `mp` and, when the input
    // refuses to purge it, also against the combined value.
    auto check_tombstone = [] (const max_purgeable& mp, const max_purgeable& combined, tombstone t, bool expected_can_purge, std::source_location loc = std::source_location::current()) {
        testlog.trace("check_tombstone({}, {}, {}, {}) @ {}:{}", mp, combined, t, expected_can_purge, loc.file_name(), loc.line());
        BOOST_REQUIRE_EQUAL(expected_can_purge, mp.can_purge(t).can_purge);
        if (!expected_can_purge) {
            // The combined max_purgeable can be weaker than this input, if
            // the other input had no expiry threshold.
            BOOST_REQUIRE(!combined.can_purge(t).can_purge);
        }
    };
    // Probe tombstones around the input's timestamp (and around its expiry
    // threshold when it has one) and check consistency with the combination.
    auto check_tombstones = [&] (const max_purgeable& mp, const max_purgeable& combined) {
        if (mp) {
            check_tombstone(mp, combined, {mp.timestamp() - 1, t_post_treshold}, true);
            check_tombstone(mp, combined, {mp.timestamp() + 1, t_post_treshold}, false);
            if (mp.expiry_threshold()) {
                // Tombstones deleted before the threshold are purgeable
                // regardless of timestamp.
                check_tombstone(mp, combined, {mp.timestamp() - 1, t_pre_treshold}, true);
                check_tombstone(mp, combined, {mp.timestamp() + 1, t_pre_treshold}, true);
            }
        } else {
            // A disengaged max_purgeable allows purging any tombstone.
            check_tombstone(mp, combined, {1, t_post_treshold}, true);
        }
    };

    // Combine `a` with `b`, require the expected result, then cross-check
    // the combination against both inputs' own verdicts.
    auto check = [&] (const max_purgeable& a, const max_purgeable& b, const max_purgeable& expected, std::source_location loc = std::source_location::current()) {
        auto combined = a;
        combined.combine(b);

        testlog.debug("combine({}, {}) => {} == {} @ {}:{}", a, b, combined, expected, loc.file_name(), loc.line());

        BOOST_REQUIRE(a || b);
        BOOST_REQUIRE_EQUAL(combined, expected);

        check_tombstones(a, combined);
        check_tombstones(b, combined);
    };

    // Plain timestamps: the smaller timestamp wins; a disengaged input
    // yields the other input.
    check({}, max_purgeable{100}, max_purgeable{100});
    check(max_purgeable{100}, {}, max_purgeable{100});
    check(max_purgeable{10}, max_purgeable{100}, max_purgeable{10});

    const auto ts_mt = max_purgeable::timestamp_source::memtable_possibly_shadowing_data;
    const auto ts_sst = max_purgeable::timestamp_source::other_sstables_possibly_shadowing_data;

    // Timestamp + source: the source of the winning timestamp is kept.
    check({}, max_purgeable{100, ts_mt}, max_purgeable{100, ts_mt});
    check(max_purgeable{100, ts_mt}, {}, max_purgeable{100, ts_mt});
    check(max_purgeable{10, ts_sst}, max_purgeable{100, ts_mt}, max_purgeable{10, ts_sst});

    // Inputs with and without an expiry threshold, in all combinations.
    check({}, max_purgeable{100, t1, ts_mt}, max_purgeable{100, t1, ts_mt});
    check(max_purgeable{10, ts_mt}, max_purgeable{100, t1, ts_sst}, max_purgeable{10, ts_mt});
    check(max_purgeable{100, ts_mt}, max_purgeable{10, t1, ts_sst}, max_purgeable{10, ts_sst});
    check(max_purgeable{10, t1, ts_mt}, max_purgeable{100, t2, ts_sst}, max_purgeable{10, t1, ts_mt});
    check(max_purgeable{100, t1, ts_mt}, max_purgeable{10, t2, ts_sst}, max_purgeable{10, t1, ts_sst});

    return make_ready_future<>();
}
|
|
|
|
// Exercise max_purgeable::can_purge(): a tombstone is purgeable when its
// timestamp is below the max_purgeable timestamp, or when its deletion time
// precedes the max_purgeable expiry threshold; the reported timestamp source
// always matches the max_purgeable's own source.
SEASTAR_TEST_CASE(test_max_purgeable_can_purge) {
    const gc_clock::time_point before_threshold = gc_clock::now();
    const gc_clock::time_point threshold = before_threshold + std::chrono::seconds(10);
    const gc_clock::time_point after_threshold = threshold + std::chrono::seconds(10);

    const auto src_memtable = max_purgeable::timestamp_source::memtable_possibly_shadowing_data;
    const auto src_sstables = max_purgeable::timestamp_source::other_sstables_possibly_shadowing_data;

    auto verify = [] (const max_purgeable& mp, tombstone t, bool expected_can_gc) {
        const auto res = mp.can_purge(t);
        BOOST_REQUIRE_EQUAL(res.can_purge, expected_can_gc);
        BOOST_REQUIRE_EQUAL(res.timestamp_source, mp.source());
    };

    // A disengaged max_purgeable permits purging anything.
    verify({}, {100, threshold}, true);

    // Timestamp-only: purgeable strictly below the max_purgeable timestamp.
    verify(max_purgeable{10, src_memtable}, tombstone{100, after_threshold}, false);
    verify(max_purgeable{100, src_sstables}, tombstone{100, after_threshold}, false);
    verify(max_purgeable{200, src_sstables}, tombstone{100, after_threshold}, true);

    // With an expiry threshold, tombstones deleted after it behave the same.
    verify(max_purgeable{10, threshold, src_sstables}, tombstone{100, after_threshold}, false);
    verify(max_purgeable{100, threshold, src_sstables}, tombstone{100, after_threshold}, false);
    verify(max_purgeable{200, threshold, src_sstables}, tombstone{100, after_threshold}, true);

    // Tombstones deleted before the threshold are purgeable regardless of timestamp.
    verify(max_purgeable{10, threshold, src_sstables}, tombstone{100, before_threshold}, true);
    verify(max_purgeable{100, threshold, src_sstables}, tombstone{100, before_threshold}, true);
    verify(max_purgeable{200, threshold, src_sstables}, tombstone{100, before_threshold}, true);

    return make_ready_future<>();
}
|
|
|
|
// Verify that tombstones which have become purgeable (repair-mode tombstone
// gc with a repair time in the future) are garbage-collected on the read
// path: both the single-shard mutation query and the all-shards query must
// return an empty result for the deleted row.
SEASTAR_TEST_CASE(test_query_tombstone_gc) {
    return do_with_cql_env_thread([] (cql_test_env& env) {
        const auto keyspace_name = get_name();
        const auto table_name = "tbl";

        // Can use tablets and RF=1 after #21623 is fixed.
        env.execute_cql(std::format("CREATE KEYSPACE {} WITH"
                " replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}} AND"
                " tablets = {{'enabled': 'false'}}", keyspace_name)).get();
        env.execute_cql(std::format("CREATE TABLE {}.{} (pk int, ck int, v int, PRIMARY KEY (pk, ck))"
                " WITH compaction = {{'class': 'NullCompactionStrategy'}}"
                " AND tombstone_gc = {{'mode': 'repair', 'propagation_delay_in_seconds': 0}}", keyspace_name, table_name)).get();

        auto& db = env.local_db();
        auto& tbl = db.find_column_family(keyspace_name, table_name);
        const auto schema = tbl.schema();
        const auto tid = schema->id();

        const auto pk_value = 1;
        const auto pk = partition_key::from_exploded(*schema, {data_value(pk_value).serialize_nonnull()});
        const auto dk = dht::decorate_key(*schema, pk);
        const auto key_shard = tbl.shard_for_reads(dk.token());

        // Fix: the pk in the DELETE was hard-coded to 1 while pk_value was
        // passed as an unused format argument; use the placeholder so the
        // deleted row always matches the key queried below.
        env.execute_cql(format("DELETE FROM {}.{} WHERE pk = {} AND ck = 1", keyspace_name, table_name, pk_value)).get();

        // Flush the commitlog on the key's shard so the tombstone is not
        // held back by the commitlog check.
        env.db().invoke_on(key_shard, [] (replica::database& db) {
            return db.flush_commitlog();
        }).get();

        // Pretend the whole ring was repaired one hour in the future, making
        // the tombstone written above purgeable (propagation delay is 0).
        const auto repair_range = dht::token_range::make(dht::first_token(), dht::last_token());
        const auto repair_time = gc_clock::now() + gc_clock::duration(std::chrono::hours(1));
        env.db().invoke_on_all([tid, &repair_range, &repair_time] (replica::database& db) {
            auto& tbl = db.find_column_family(tid);
            tbl.get_compaction_manager().get_shared_tombstone_gc_state().update_repair_time(tid, repair_range, repair_time);
        }).get();

        testlog.info("repair_time: {}", repair_time);

        // Bypass the cache so the read goes to the sstables/memtables.
        auto slice = partition_slice_builder(*schema, schema->full_slice())
                .with_option<query::partition_slice::option::bypass_cache>()
                .build();
        const auto cmd = query::read_command(schema->id(), schema->version(), slice, db.get_query_max_result_size(), query::tombstone_limit::max);
        const auto pr = dht::partition_range::make_singular(dk);

        // Single-shard read path: the purged tombstone must not produce a partition.
        env.db().invoke_on(key_shard, [tid, &cmd, &pr] (replica::database& db) -> future<> {
            auto& tbl = db.find_column_family(tid);
            const auto schema = tbl.schema();
            auto permit = co_await db.obtain_reader_permit(tbl, "read", db::no_timeout, {});
            auto accounter = co_await db.get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, query::short_read::no);

            const auto res = co_await tbl.mutation_query(schema, std::move(permit), cmd, pr, {}, std::move(accounter), db::no_timeout);
            BOOST_CHECK_EQUAL(res.partitions().size(), 0);
        }).get();

        // The all-shards read path must agree.
        {
            const auto res = replica::query_mutations_on_all_shards(env.db(), schema, cmd, {pr}, {}, db::no_timeout).get();
            BOOST_CHECK_EQUAL(std::get<0>(res)->partitions().size(), 0);
        }

        return make_ready_future<>();
    });
}
|
|
|
|
// Exercises tombstone_gc_state across every tombstone-gc mode, checking that
// the per-key and per-range gc_before computations both return the value
// expected for the configured mode.
SEASTAR_TEST_CASE(test_tombstone_gc_state_gc_mode) {
    const auto probe_range = dht::token_range::make(dht::token(100), dht::token(200));

    // Assert that the key-based and range-based gc_before queries both yield
    // exactly `expected`, and that the range result covers the entire range.
    auto verify = [&] (tombstone_gc_state gc_state, schema_ptr s, dht::decorated_key key, gc_clock::time_point query_time,
            gc_clock::time_point expected, std::source_location loc = std::source_location::current()) {
        testlog.info("check() @ {}:{}", loc.file_name(), loc.line());

        BOOST_REQUIRE_EQUAL(gc_state.get_gc_before_for_key(s, key, query_time), expected);

        const auto range_res = gc_state.get_gc_before_for_range(s, probe_range, query_time);
        BOOST_REQUIRE_EQUAL(range_res.min_gc_before, expected);
        BOOST_REQUIRE_EQUAL(range_res.max_gc_before, expected);
        BOOST_REQUIRE_EQUAL(range_res.knows_entire_range, true);
    };

    shared_tombstone_gc_state shared_state;

    for (const auto mode : {tombstone_gc_mode::timeout, tombstone_gc_mode::disabled, tombstone_gc_mode::immediate, tombstone_gc_mode::repair}) {
        auto schema = schema_builder("ks", "tbl")
                .with_column("pk", int32_type, column_kind::partition_key)
                .with_tombstone_gc_options(tombstone_gc_options({{"mode", fmt::to_string(mode)}}))
                .build();

        // Record a repair of the full token range, dated 6 hours in the past,
        // so repair-mode has something to base gc_before on.
        const auto repair_time = gc_clock::now() - gc_clock::duration(std::chrono::hours(6));
        const auto repair_range = dht::token_range::make(dht::first_token(), dht::last_token());
        shared_state.update_repair_time(schema->id(), repair_range, repair_time);

        const auto now = gc_clock::now();

        const auto dk = dht::decorate_key(*schema, partition_key::from_single_value(*schema, serialized(1)));

        // The no_gc()/gc_all() factory states override whatever mode the
        // schema configures.
        verify(tombstone_gc_state::no_gc(), schema, dk, now, gc_clock::time_point::min());
        verify(tombstone_gc_state::gc_all(), schema, dk, now, gc_clock::time_point::max());

        // A regular state honors the schema's configured gc mode.
        auto expected = gc_clock::time_point::min();
        switch (mode) {
        case tombstone_gc_mode::timeout:
            expected = now - std::chrono::seconds(schema->gc_grace_seconds());
            break;
        case tombstone_gc_mode::disabled:
            expected = gc_clock::time_point::min();
            break;
        case tombstone_gc_mode::immediate:
            expected = now;
            break;
        case tombstone_gc_mode::repair:
            expected = repair_time - schema->tombstone_gc_options().propagation_delay_in_seconds();
            break;
        }
        verify(tombstone_gc_state(shared_state, false), schema, dk, now, expected);
    }

    return make_ready_future<>();
}
|
|
|
|
// Verifies that a table in an RF=1 keyspace gets 'immediate' tombstone-gc
// semantics, and that the shared RF=1 bookkeeping follows ALTER KEYSPACE
// (RF changes in both directions) and DROP TABLE.
SEASTAR_TEST_CASE(test_tombstone_gc_rf_one) {
    return do_with_cql_env_thread([] (cql_test_env& env) {
        const std::string ks = get_name();
        const std::string tbl = "tbl";

        env.execute_cql(std::format("CREATE KEYSPACE {} WITH"
                " replication = {{'class': 'NetworkTopologyStrategy', 'datacenter1': 1}} AND"
                " tablets = {{'enabled': 'false'}}", ks)).get();
        env.execute_cql(std::format("CREATE TABLE {}.{} (pk int PRIMARY KEY)"
                " WITH compaction = {{'class': 'NullCompactionStrategy'}}", ks, tbl)).get();

        auto& table = env.local_db().find_column_family(ks, tbl);
        const auto schema = table.schema();

        auto& gc_state = table.get_compaction_manager().get_shared_tombstone_gc_state();

        // The keyspace was just created with RF=1.
        BOOST_REQUIRE(gc_state.is_table_rf_one(schema->id()));

        auto tombstone_gc = table.get_tombstone_gc_state();

        const auto dk = dht::decorate_key(*schema, partition_key::from_single_value(*schema, serialized(1)));

        const auto now = gc_clock::now();

        // With RF=1, tombstone-gc should act as if configured in 'immediate' mode.
        BOOST_REQUIRE_EQUAL(tombstone_gc.get_gc_before_for_key(schema, dk, now), now);

        // Bump the RF above 1.
        env.execute_cql(std::format("ALTER KEYSPACE {}"
                " WITH replication = {{'class': 'NetworkTopologyStrategy', 'datacenter1': 2}}",
                ks)).get();

        BOOST_REQUIRE(!gc_state.is_table_rf_one(schema->id()));

        // After changing RF to > 1, tombstone-gc should revert to regular
        // repair-mode behaviour. Since there was no repair yet, this should
        // return min timepoint (no GC).
        BOOST_REQUIRE_EQUAL(tombstone_gc.get_gc_before_for_key(schema, dk, now), gc_clock::time_point::min());

        // Altering back to RF=1 should re-add the table to the rf=1 table list.
        env.execute_cql(std::format("ALTER KEYSPACE {}"
                " WITH replication = {{'class': 'NetworkTopologyStrategy', 'datacenter1': 1}}",
                ks)).get();
        BOOST_REQUIRE(gc_state.is_table_rf_one(schema->id()));

        // Dropping the table should clean up (removing the table from the rf=1 list).
        env.execute_cql(std::format("DROP TABLE {}.{}", ks, tbl)).get();
        BOOST_REQUIRE(!gc_state.is_table_rf_one(schema->id()));
    });
}
|
|
|
|
// A LocalStrategy keyspace is never repaired, so a table created in one must
// default to the 'timeout' tombstone-gc mode rather than repair-based gc.
SEASTAR_TEST_CASE(test_default_tombstone_gc_local_replication) {
    return do_with_cql_env_thread([] (cql_test_env& env) {
        const std::string ks = get_name();
        const std::string tbl = "tbl";

        env.execute_cql(std::format("CREATE KEYSPACE {} WITH replication = {{'class': 'LocalStrategy'}}", ks)).get();
        env.execute_cql(std::format("CREATE TABLE {}.{} (pk int PRIMARY KEY)"
                " WITH compaction = {{'class': 'NullCompactionStrategy'}}", ks, tbl)).get();

        const auto schema = env.local_db().find_column_family(ks, tbl).schema();

        BOOST_REQUIRE_EQUAL(schema->tombstone_gc_options().mode(), tombstone_gc_mode::timeout);
    });
}
|
|
|
|
// Reproduces the scenario where a flush is requested while a previous flush
// of the same table is still in flight and the active memtable is empty:
// the second flush must wait behind the outstanding one rather than
// completing immediately.
SEASTAR_TEST_CASE(test_flush_empty_table_waits_on_outstanding_flush) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
    testlog.debug("Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev).\n");
    return make_ready_future();
#endif
    return do_with_some_data_in_thread({"cf"}, [] (cql_test_env& e) {
        // Only shards whose active memtable holds data can exercise the
        // scenario; reduce with logical-or so we require at least one did.
        auto exercised = e.db().map_reduce0([&] (replica::database& db) -> future<bool> {
            auto& table = db.find_column_family("ks", "cf");

            if (!table.needs_flush()) {
                // Nothing to flush on this shard.
                co_return false;
            }

            // Make the first flush block inside memtable-to-sstable writing.
            utils::get_local_injector().enable("flush_memtable_to_sstable_wait");

            auto first_flush = table.flush();
            if (first_flush.available()) {
                testlog.error("Table flush completed too early");
                BOOST_REQUIRE(!first_flush.available());
            }

            if (!table.needs_flush()) {
                testlog.error("Expecting needs_flush when waiting in flush_memtable_to_sstable_wait");
                BOOST_REQUIRE(table.needs_flush());
            }

            // Flush again while the active memtable is empty.
            // This one is expected to wait on the ongoing flush.
            auto second_flush = table.flush();

            // While the first flush is blocked on the injection, the second
            // must be queued behind it.
            if (first_flush.available()) {
                testlog.error("First table flush expected to be blocked on injected wait");
                BOOST_REQUIRE(!first_flush.available());
            }
            if (second_flush.available()) {
                testlog.error("Second table flush expected to be blocked behind first flush");
                BOOST_REQUIRE(!second_flush.available());
            }
            if (!table.needs_flush()) {
                testlog.error("Expecting needs_flush when waiting in second flush");
                BOOST_REQUIRE(table.needs_flush());
            }

            // Release the injected wait and let both flushes complete.
            utils::get_local_injector().receive_message("flush_memtable_to_sstable_wait");
            co_await std::move(first_flush);
            co_await std::move(second_flush);

            if (table.needs_flush()) {
                testlog.error("Table is not expected to need flush after flush completed");
                BOOST_REQUIRE(!table.needs_flush());
            }

            co_return true;
        }, false, std::logical_or<bool>()).get();
        BOOST_REQUIRE(exercised);
    });
}
|
|
|
|
struct scoped_execption_log_level {
|
|
scoped_execption_log_level() {
|
|
smp::invoke_on_all([] {
|
|
global_logger_registry().set_logger_level("exception", log_level::debug);
|
|
}).get();
|
|
}
|
|
~scoped_execption_log_level() {
|
|
smp::invoke_on_all([] {
|
|
global_logger_registry().set_logger_level("exception", log_level::info);
|
|
}).get();
|
|
}
|
|
};
|
|
|
|
// Verifies that replica-side read timeouts are reported to the client as
// read_timeout_exceptions WITHOUT any C++ exceptions being thrown/caught
// internally along the way (the per-reactor cxx_exceptions counter must not
// move). Covers two timeout sites: while reading from disk (via error
// injection) and while queued on the reader concurrency semaphore.
SEASTAR_TEST_CASE(replica_read_timeout_no_exception) {
    cql_test_config cfg;
    // NOTE(review): presumably a factor of 0.0 disables preemptive aborting
    // of queued reads so they genuinely reach the timeout path -- confirm
    // against the config option's documentation.
    cfg.db_config->reader_concurrency_semaphore_preemptive_abort_factor.set(0.0);
    // Very short read timeout so the 1000 reads below time out quickly;
    // writes keep a generous timeout so setup statements succeed.
    const auto read_timeout = 10ms;
    const auto write_timeout = 10s;
    cfg.query_timeout.emplace(timeout_config{
        .read_timeout = read_timeout,
        .write_timeout = write_timeout,
        .range_read_timeout = read_timeout,
        .counter_write_timeout = write_timeout,
        .truncate_timeout = write_timeout,
        .cas_timeout = write_timeout,
        .other_timeout = read_timeout});

    return do_with_cql_env_thread([] (cql_test_env& e) {
        const sstring ks_name = get_name();
        const sstring tbl_name = "tbl";

        // Disable tablets because we want to exercise the legacy range-scan path too.
        // With tablets, only the table::query() path is used. Vnodes cover both.
        e.execute_cql(format("CREATE KEYSPACE {} WITH"
                " replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND"
                " tablets = {{'enabled': 'false'}}", ks_name)).get();

        e.execute_cql(format("CREATE TABLE {}.{} (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH compaction = {{'class': 'NullCompactionStrategy'}}", ks_name, tbl_name)).get();

        // Write three rows spread over two sstables plus the memtable, so
        // reads have actual on-disk data to touch.
        e.execute_cql(format("INSERT INTO {}.{} (pk, ck, v) VALUES (1, 0, 0)", ks_name, tbl_name)).get();
        replica::database::flush_table_on_all_shards(e.db(), ks_name, tbl_name).get();

        e.execute_cql(format("INSERT INTO {}.{} (pk, ck, v) VALUES (1, 1, 1)", ks_name, tbl_name)).get();
        replica::database::flush_table_on_all_shards(e.db(), ks_name, tbl_name).get();

        e.execute_cql(format("INSERT INTO {}.{} (pk, ck, v) VALUES (1, 2, 2)", ks_name, tbl_name)).get();

        // One successful read, so the auth cache is populated.
        e.execute_cql(format("SELECT * FROM {}.{} WHERE pk = 1", ks_name, tbl_name)).get();

        // Sum the reactor's C++ exception counter across all shards.
        auto get_cxx_exceptions = [&e] {
            return e.db().map_reduce0([] (replica::database&) { return engine().cxx_exceptions(); }, 0, std::plus<size_t>()).get();
        };

        // Two prepared statements: a full range scan and a single-partition
        // read, both bypassing the cache so they hit the replica read path.
        auto full_scan_id = e.prepare(format("SELECT * FROM {}.{} BYPASS CACHE", ks_name, tbl_name)).get();
        auto partition_scan_id = e.prepare(format("SELECT * FROM {}.{} WHERE pk = 1 BYPASS CACHE", ks_name, tbl_name)).get();

        // Run num_reads concurrent reads, require that every one of them
        // fails with read_timeout_exception, and that the process raised no
        // C++ exceptions while producing those failures.
        auto execute_test = [&] (const char* test_name, bool scan) {
            testlog.info("Executing test: {} scan={}", test_name, scan);

            const auto cxx_exceptions_before = get_cxx_exceptions();
            const size_t num_reads = 1000;

            // Raise the "exception" logger to debug for the duration of the
            // reads, so any internally-thrown exception would be visible in
            // the log.
            scoped_execption_log_level exception_log_level_scope;

            const auto query = scan ? full_scan_id : partition_scan_id;

            testlog.info("Executing {} {}", num_reads, scan ? "full scans" : "partition scans");

            std::vector<future<>> futures;
            for (size_t i = 0; i < num_reads; ++i) {
                futures.emplace_back(e.execute_prepared(query, {}).discard_result());
            }

            // Wait for all futures and check that they all fail with a timeout exception -- without throwing any exception in the process.
            while (!futures.empty()) {
                auto f = std::move(futures.back());
                futures.pop_back();

                f.wait();
                BOOST_REQUIRE(f.failed());
                auto ex = f.get_exception();
                // try_catch inspects the exception_ptr without rethrowing,
                // so this check itself doesn't bump the exception counter.
                BOOST_REQUIRE(try_catch<exceptions::read_timeout_exception>(ex));
            }
            const auto cxx_exceptions_after = get_cxx_exceptions();

            testlog.info("Exceptions: before={}, after={}", cxx_exceptions_before, cxx_exceptions_after);

            BOOST_REQUIRE_EQUAL(cxx_exceptions_after, cxx_exceptions_before);
        };

        // Test 1: execute reads that reach the disk but time out while reading from the disk.
        // Ensure no exceptions are thrown, but the reads fail with a read timeout exception.
        // (Only possible in builds with error injection compiled in.)
        if constexpr (std::is_same_v<utils::error_injection_type, utils::error_injection<true>>) {
            utils::get_local_injector().enable("sstables_mx_reader_fill_buffer_timeout", false, {{"table", format("{}.{}", ks_name, tbl_name)}});
            execute_test("disk reads", false);
            execute_test("disk reads", true);
            utils::get_local_injector().disable("sstables_mx_reader_fill_buffer_timeout");
        }

        // Test 2: execute reads that get queued on the semaphore and time out while waiting for the permit.
        // Ensure no exceptions are thrown, but the reads fail with a read timeout exception.
        {
            // Take all semaphore resources and add an active permit
            // Ensures that all new subsequent reads will be queued.
            std::vector<foreign_ptr<std::unique_ptr<reader_permit>>> dummy_permits;
            for (shard_id shard = 0; shard < smp::count; ++shard) {
                dummy_permits.emplace_back();
            }
            e.db().invoke_on_all([&] (replica::database& db) -> future<> {
                auto& sem = db.get_reader_concurrency_semaphore();
                // Shrink the semaphore to a single unit of each resource,
                // then hold it with a dummy permit on every shard.
                sem.set_resources({1, 1});
                dummy_permits[this_shard_id()] = std::make_unique<reader_permit>(co_await sem.obtain_permit(
                        db.find_column_family(ks_name, tbl_name).schema(),
                        "dummy",
                        1,
                        db::no_timeout,
                        {}));
            }).get();

            execute_test("queued reads", false);
            execute_test("queued reads", true);
        }
    }, cfg);
}
|
|
|
|
BOOST_AUTO_TEST_SUITE_END()
|