When reads arrive, they have to wait for admission on the reader concurrency semaphore. If the node is overloaded, the reads will be queued. They can time out while in the queue, but will not time out once admitted. Once the shard is sufficiently loaded, it is possible that most queued reads will time out, because the average time it takes for a queued read to be admitted is around that of the timeout. If a read times out, any work we already did, or are about to do, on it is wasted effort. Therefore, the patch tries to prevent this by checking whether an admitted read has a chance to complete in time, and aborting it if not. It uses the following criterion: if the read's remaining time <= the read's timeout when it arrived at the semaphore * the live-updateable preemptive_abort_factor, the read is rejected and the next one from the wait list is considered. Fixes https://github.com/scylladb/scylladb/issues/14909 Fixes: SCYLLADB-353 Backport is not needed. Better to first observe its impact. Closes scylladb/scylladb#21649 * github.com:scylladb/scylladb: reader_concurrency_semaphore: Check during admission if read may timeout permit_reader::impl: Replace break with return after evicting inactive permit on timeout reader_concurrency_semaphore: Add preemptive_abort_factor to constructors config: Add parameters to control reads' preemptive_abort_factor permit_reader: Add a new state: preemptive_aborted reader_concurrency_semaphore: validate waiters counter when dequeueing a waiting permit reader_concurrency_semaphore: Remove cpu_concurrency's default value
2267 lines
109 KiB
C++
2267 lines
109 KiB
C++
/*
|
|
* Copyright (C) 2016-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
|
|
#include <boost/test/tools/old/interface.hpp>
|
|
#include <seastar/core/seastar.hh>
|
|
#include <seastar/core/shard_id.hh>
|
|
#include <seastar/core/smp.hh>
|
|
#include <seastar/core/thread.hh>
|
|
#include <seastar/core/coroutine.hh>
|
|
#include <seastar/core/bitops.hh>
|
|
#include <seastar/util/file.hh>
|
|
|
|
#undef SEASTAR_TESTING_MAIN
|
|
#include <seastar/testing/test_case.hh>
|
|
#include <seastar/testing/thread_test_case.hh>
|
|
#include <utility>
|
|
#include <fmt/ranges.h>
|
|
#include <fmt/std.h>
|
|
|
|
#include "test/lib/cql_test_env.hh"
|
|
#include "test/lib/result_set_assertions.hh"
|
|
#include "test/lib/log.hh"
|
|
#include "test/lib/random_utils.hh"
|
|
#include "test/lib/simple_schema.hh"
|
|
#include "test/lib/test_utils.hh"
|
|
#include "test/lib/key_utils.hh"
|
|
|
|
#include "replica/database.hh"
|
|
#include "utils/assert.hh"
|
|
#include "utils/lister.hh"
|
|
#include "utils/rjson.hh"
|
|
#include "partition_slice_builder.hh"
|
|
#include "mutation/frozen_mutation.hh"
|
|
#include "test/lib/mutation_source_test.hh"
|
|
#include "schema/schema_builder.hh"
|
|
#include "service/migration_manager.hh"
|
|
#include "sstables/sstables.hh"
|
|
#include "sstables/generation_type.hh"
|
|
#include "sstables/sstable_version.hh"
|
|
#include "db/config.hh"
|
|
#include "db/commitlog/commitlog_replayer.hh"
|
|
#include "db/commitlog/commitlog.hh"
|
|
#include "test/lib/tmpdir.hh"
|
|
#include "db/data_listeners.hh"
|
|
#include "replica/multishard_query.hh"
|
|
#include "mutation_query.hh"
|
|
#include "transport/messages/result_message.hh"
|
|
#include "compaction/compaction_manager.hh"
|
|
#include "db/snapshot-ctl.hh"
|
|
#include "db/system_keyspace.hh"
|
|
#include "db/view/view_builder.hh"
|
|
#include "replica/mutation_dump.hh"
|
|
|
|
using namespace std::chrono_literals;
|
|
using namespace sstables;
|
|
using namespace tests;
|
|
|
|
class database_test_wrapper {
|
|
replica::database& _db;
|
|
public:
|
|
explicit database_test_wrapper(replica::database& db) : _db(db) { }
|
|
|
|
reader_concurrency_semaphore& get_user_read_concurrency_semaphore() {
|
|
return _db.read_concurrency_sem();
|
|
}
|
|
reader_concurrency_semaphore& get_streaming_read_concurrency_semaphore() {
|
|
return _db._streaming_concurrency_sem;
|
|
}
|
|
reader_concurrency_semaphore& get_system_read_concurrency_semaphore() {
|
|
return _db._system_read_concurrency_sem;
|
|
}
|
|
|
|
size_t get_total_user_reader_concurrency_semaphore_memory() {
|
|
return _db._reader_concurrency_semaphores_group._total_memory;
|
|
}
|
|
|
|
size_t get_total_user_reader_concurrency_semaphore_weight() {
|
|
return _db._reader_concurrency_semaphores_group._total_weight;
|
|
}
|
|
};
|
|
|
|
// Applies `m` to table `uuid` on the shard chosen by the table's
// shard_for_reads() for the mutation's token. `fs` and `timeout` are forwarded
// to replica::database::apply(); when `do_flush` is set, that shard's table is
// flushed after the write.
static future<> apply_mutation(sharded<replica::database>& sharded_db, table_id uuid, const mutation& m, bool do_flush = false,
        db::commitlog::force_sync fs = db::commitlog::force_sync::no, db::timeout_clock::time_point timeout = db::no_timeout) {
    auto& local_table = sharded_db.local().find_column_family(uuid);
    auto owner = local_table.shard_for_reads(m.token());
    // The mutation is frozen on the calling shard; the frozen copy is what travels to `owner`.
    return sharded_db.invoke_on(owner, [uuid, frozen = freeze(m), do_flush, fs, timeout] (replica::database& db) {
        auto& owner_table = db.find_column_family(uuid);
        return db.apply(owner_table.schema(), frozen, tracing::trace_state_ptr(), fs, timeout).then([do_flush, &owner_table] {
            if (!do_flush) {
                return make_ready_future<>();
            }
            return owner_table.flush();
        });
    });
}
|
|
|
|
BOOST_AUTO_TEST_SUITE(database_test)

// Writes 1000 keys and checks that every shard returns the keys it owns. Then it
// truncates the table (auto_snapshot is disabled) and checks that no shard
// returns any partition, both right after the truncate and after replaying the
// existing commitlog segments.
SEASTAR_TEST_CASE(test_safety_after_truncate) {
    auto cfg = make_shared<db::config>();
    cfg->auto_snapshot.set(false);
    return do_with_cql_env_thread([](cql_test_env& e) {
        e.execute_cql("create table ks.cf (k text, v int, primary key (k));").get();
        auto& db = e.local_db();
        sstring ks_name = "ks";
        sstring cf_name = "cf";
        auto s = db.find_schema(ks_name, cf_name);
        auto&& table = db.find_column_family(s);
        auto uuid = s->id();

        // Number of written keys owned by each shard, and singular ranges covering them.
        std::vector<size_t> keys_per_shard;
        std::vector<dht::partition_range_vector> pranges_per_shard;
        keys_per_shard.resize(smp::count);
        pranges_per_shard.resize(smp::count);
        for (uint32_t i = 1; i <= 1000; ++i) {
            auto pkey = partition_key::from_single_value(*s, to_bytes(fmt::format("key{}", i)));
            mutation m(s, pkey);
            m.set_clustered_cell(clustering_key_prefix::make_empty(), "v", int32_t(42), {});
            auto shard = table.shard_for_reads(m.token());
            keys_per_shard[shard]++;
            pranges_per_shard[shard].emplace_back(dht::partition_range::make_singular(dht::decorate_key(*s, std::move(pkey))));
            apply_mutation(e.db(), uuid, m).get();
        }

        // Queries every shard for its own ranges; shard N must return
        // expected_sizes[N] partitions.
        auto assert_query_result = [&] (const std::vector<size_t>& expected_sizes) {
            auto max_size = std::numeric_limits<size_t>::max();
            auto cmd = query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size),
                    query::tombstone_limit::max, query::row_limit(1000));
            e.db().invoke_on_all([&] (replica::database& db) -> future<> {
                auto shard = this_shard_id();
                auto s = db.find_schema(uuid);
                // Only the result is needed; the second tuple element (cache temperature) is ignored.
                auto result = std::get<0>(co_await db.query(s, cmd, query::result_options::only_result(), pranges_per_shard[shard], nullptr, db::no_timeout));
                assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(expected_sizes[shard]);
            }).get();
        };

        assert_query_result(keys_per_shard);

        replica::database::truncate_table_on_all_shards(e.db(), e.get_system_keyspace(), ks_name, cf_name).get();

        // After the truncate no shard may return any partition.
        keys_per_shard.assign(keys_per_shard.size(), 0);
        assert_query_result(keys_per_shard);

        // Replaying the existing commitlog segments must not bring the truncated data back.
        e.db().invoke_on_all([&] (replica::database& db) -> future<> {
            auto cl = db.commitlog();
            auto rp = co_await db::commitlog_replayer::create_replayer(e.db(), e.get_system_keyspace());
            auto paths = co_await cl->list_existing_segments();
            co_await rp.recover(paths, db::commitlog::descriptor::FILENAME_PREFIX);
        }).get();

        assert_query_result(keys_per_shard);
    }, cfg);
}
|
|
|
|
// Reproducer for:
|
|
// https://github.com/scylladb/scylla/issues/10421
|
|
// https://github.com/scylladb/scylla/issues/10423
|
|
SEASTAR_TEST_CASE(test_truncate_without_snapshot_during_writes) {
|
|
auto cfg = make_shared<db::config>();
|
|
cfg->auto_snapshot.set(false);
|
|
return do_with_cql_env_thread([] (cql_test_env& e) {
|
|
sstring ks_name = "ks";
|
|
sstring cf_name = "cf";
|
|
e.execute_cql(fmt::format("create table {}.{} (k text, v int, primary key (k));", ks_name, cf_name)).get();
|
|
auto& db = e.local_db();
|
|
auto uuid = db.find_uuid(ks_name, cf_name);
|
|
auto s = db.find_column_family(uuid).schema();
|
|
int count = 0;
|
|
|
|
auto insert_data = [&] (uint32_t begin, uint32_t end) {
|
|
return parallel_for_each(std::views::iota(begin, end), [&] (auto i) {
|
|
auto pkey = partition_key::from_single_value(*s, to_bytes(fmt::format("key-{}", tests::random::get_int<uint64_t>())));
|
|
mutation m(s, pkey);
|
|
m.set_clustered_cell(clustering_key_prefix::make_empty(), "v", int32_t(42), {});
|
|
return apply_mutation(e.db(), uuid, m, true /* do_flush */).finally([&] {
|
|
++count;
|
|
});
|
|
});
|
|
};
|
|
|
|
uint32_t num_keys = 1000;
|
|
|
|
auto f0 = insert_data(0, num_keys);
|
|
auto f1 = do_until([&] { return std::cmp_greater_equal(count, num_keys); }, [&, ts = db_clock::now()] {
|
|
return replica::database::truncate_table_on_all_shards(e.db(), e.get_system_keyspace(), "ks", "cf", ts, false /* with_snapshot */).then([] {
|
|
return yield();
|
|
});
|
|
});
|
|
f0.get();
|
|
f1.get();
|
|
}, cfg);
|
|
}
|
|
|
|
// Reproducer for:
|
|
// https://github.com/scylladb/scylla/issues/21719
|
|
SEASTAR_TEST_CASE(test_truncate_saves_replay_position) {
|
|
auto cfg = make_shared<db::config>();
|
|
cfg->auto_snapshot.set(false);
|
|
return do_with_cql_env_thread([] (cql_test_env& e) {
|
|
BOOST_REQUIRE_GT(smp::count, 1);
|
|
const sstring ks_name = "ks";
|
|
const sstring cf_name = "cf";
|
|
e.execute_cql(fmt::format("CREATE TABLE {}.{} (k TEXT PRIMARY KEY, v INT);", ks_name, cf_name)).get();
|
|
const table_id uuid = e.local_db().find_uuid(ks_name, cf_name);
|
|
|
|
replica::database::truncate_table_on_all_shards(e.db(), e.get_system_keyspace(), ks_name, cf_name, db_clock::now(), false /* with_snapshot */).get();
|
|
|
|
auto res = e.execute_cql(fmt::format("SELECT * FROM system.truncated WHERE table_uuid = {}", uuid)).get();
|
|
auto rows = dynamic_pointer_cast<cql_transport::messages::result_message::rows>(res);
|
|
BOOST_REQUIRE(rows);
|
|
auto row_count = rows->rs().result_set().size();
|
|
BOOST_REQUIRE_EQUAL(row_count, smp::count);
|
|
}, cfg);
|
|
}
|
|
|
|
// Checks that row and partition limits are honoured per shard.
// Keys 1 .. 3*smp::count get only a partition tombstone, so they have nothing
// live to return. Keys 3*smp::count+1 .. 8*smp::count each get one live cell.
// Every shard is queried for all keys it owns and must return
// min(live keys on the shard, limit) partitions.
SEASTAR_TEST_CASE(test_querying_with_limits) {
    return do_with_cql_env_thread([](cql_test_env& e) {
        e.execute_cql("create table ks.cf (k text, v int, primary key (k));").get();
        auto& db = e.local_db();
        auto s = db.find_schema("ks", "cf");
        auto&& table = db.find_column_family(s);
        auto uuid = s->id();
        // Per shard: number of live keys, and singular ranges of every key it owns.
        std::vector<size_t> keys_per_shard;
        std::vector<dht::partition_range_vector> pranges_per_shard;
        keys_per_shard.resize(smp::count);
        pranges_per_shard.resize(smp::count);

        // Deleted partitions: they are queried but never part of the result.
        for (uint32_t i = 1; i <= 3 * smp::count; ++i) {
            auto pkey = partition_key::from_single_value(*s, to_bytes(format("key{:d}", i)));
            mutation m(s, pkey);
            m.partition().apply(tombstone(api::timestamp_type(1), gc_clock::now()));
            apply_mutation(e.db(), uuid, m).get();
            auto shard = table.shard_for_reads(m.token());
            pranges_per_shard[shard].emplace_back(dht::partition_range::make_singular(dht::decorate_key(*s, std::move(pkey))));
        }

        // Live partitions. Start right after the last deleted key. Reusing key
        // 3*smp::count would write a cell at timestamp 1, which that key's
        // partition tombstone (also at timestamp 1) shadows. The key would still
        // be counted as live, and its range would be queried twice.
        for (uint32_t i = 3 * smp::count + 1; i <= 8 * smp::count; ++i) {
            auto pkey = partition_key::from_single_value(*s, to_bytes(format("key{:d}", i)));
            mutation m(s, pkey);
            m.set_clustered_cell(clustering_key_prefix::make_empty(), "v", int32_t(42), 1);
            apply_mutation(e.db(), uuid, m).get();
            auto shard = table.shard_for_reads(m.token());
            keys_per_shard[shard]++;
            pranges_per_shard[shard].emplace_back(dht::partition_range::make_singular(dht::decorate_key(*s, std::move(pkey))));
        }

        // Runs `cmd` on every shard against that shard's ranges and expects
        // min(live keys on the shard, limit) partitions back.
        auto assert_limited_query = [&] (const query::read_command& cmd, size_t limit) {
            e.db().invoke_on_all([&] (replica::database& db) -> future<> {
                auto shard = this_shard_id();
                auto s = db.find_schema(uuid);
                auto result = std::get<0>(co_await db.query(s, cmd, query::result_options::only_result(), pranges_per_shard[shard], nullptr, db::no_timeout));
                auto expected_size = std::min<size_t>(keys_per_shard[shard], limit);
                assert_that(query::result_set::from_raw_result(s, cmd.slice, *result)).has_size(expected_size);
            }).get();
        };

        auto max_size = std::numeric_limits<size_t>::max();

        // Row limit of 3.
        assert_limited_query(query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size),
                query::tombstone_limit::max, query::row_limit(3)), 3);

        // Partition limit of 5.
        assert_limited_query(query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size),
                query::tombstone_limit::max, query::row_limit(query::max_rows), query::partition_limit(5)), 5);

        // Partition limit of 3.
        assert_limited_query(query::read_command(s->id(), s->version(), partition_slice_builder(*s).build(), query::max_result_size(max_size),
                query::tombstone_limit::max, query::row_limit(query::max_rows), query::partition_limit(3)), 3);
    });
}
|
|
|
|
// Runs a mutation-source test suite against a table that lives in the database.
// For each populate request the function does the following:
//  - drops any existing table with the schema's name,
//  - creates it again through group0,
//  - applies the mutations on their owning shards,
//  - flushes the table and invalidates its row cache.
// It then returns a mutation_source that reads from that table.
static void test_database(void (*run_tests)(populate_fn_ex, bool)) {
    do_with_cql_env_thread([run_tests] (cql_test_env& e) {
        auto populate = [&] (schema_ptr s, const utils::chunked_vector<mutation>& partitions, gc_clock::time_point) -> mutation_source {
            auto& mm = e.migration_manager().local();

            // Drop a leftover table of the same name. find_column_family() throws
            // no_such_column_family when there is none, which skips the drop.
            try {
                auto drop_guard = mm.start_group0_operation().get();
                auto drop_ts = drop_guard.write_timestamp();
                e.local_db().find_column_family(s->ks_name(), s->cf_name());
                mm.announce(service::prepare_column_family_drop_announcement(mm.get_storage_proxy(), s->ks_name(), s->cf_name(), drop_ts).get(), std::move(drop_guard), "").get();
            } catch (const replica::no_such_column_family&) {
                // expected
            }

            auto create_guard = mm.start_group0_operation().get();
            auto create_ts = create_guard.write_timestamp();
            mm.announce(service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s, create_ts).get(), std::move(create_guard), "").get();

            replica::column_family& cf = e.local_db().find_column_family(s);
            const auto table_uuid = cf.schema()->id();
            for (const auto& m : partitions) {
                apply_mutation(e.db(), table_uuid, m).get();
            }
            cf.flush().get();
            cf.get_row_cache().invalidate(row_cache::external_updater([] {})).get();

            return mutation_source([&cf] (schema_ptr s,
                    reader_permit permit,
                    const dht::partition_range& range,
                    const query::partition_slice& slice,
                    tracing::trace_state_ptr trace_state,
                    streamed_mutation::forwarding fwd,
                    mutation_reader::forwarding fwd_mr) {
                return cf.make_mutation_reader(s, std::move(permit), range, slice, std::move(trace_state), fwd, fwd_mr);
            });
        };
        run_tests(populate, true);
    }).get();
}
|
|
|
|
// Each case below hands one group of the generic mutation-source tests to
// test_database(), which populates a table, flushes it and invalidates its cache.

SEASTAR_THREAD_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_source_plain_basic) {
    test_database(&run_mutation_source_tests_plain_basic);
}

SEASTAR_THREAD_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_source_plain_reader_conversion) {
    test_database(&run_mutation_source_tests_plain_reader_conversion);
}

SEASTAR_THREAD_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_source_plain_fragments_monotonic) {
    test_database(&run_mutation_source_tests_plain_fragments_monotonic);
}

SEASTAR_THREAD_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_source_plain_read_back) {
    test_database(&run_mutation_source_tests_plain_read_back);
}

SEASTAR_THREAD_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_source_reverse_basic) {
    test_database(&run_mutation_source_tests_reverse_basic);
}

SEASTAR_THREAD_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_source_reverse_reader_conversion) {
    test_database(&run_mutation_source_tests_reverse_reader_conversion);
}

SEASTAR_THREAD_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_source_reverse_fragments_monotonic) {
    test_database(&run_mutation_source_tests_reverse_fragments_monotonic);
}

SEASTAR_THREAD_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_source_reverse_read_back) {
    test_database(&run_mutation_source_tests_reverse_read_back);
}
|
|
|
|
static void require_exist(const sstring& filename, bool should) {
|
|
auto exists = file_exists(filename).get();
|
|
BOOST_REQUIRE_EQUAL(exists, should);
|
|
}
|
|
|
|
static void touch_dir(const sstring& dirname) {
|
|
recursive_touch_directory(dirname).get();
|
|
require_exist(dirname, true);
|
|
}
|
|
|
|
static void touch_file(const sstring& filename) {
|
|
tests::touch_file(filename).get();
|
|
require_exist(filename, true);
|
|
}
|
|
|
|
// Leftover incomplete-sstable state in a table directory must be gone once the
// database has started on that data directory. This covers:
//  - temporary sstable directories,
//  - a TemporaryTOC inside one of them,
//  - an sstable whose TemporaryTOC, TemporaryHashes and Data components sit
//    directly in the table directory.
SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_incomplete_sstables) {
    using sst = sstables::sstable;

    tmpdir data_dir;
    auto db_cfg_ptr = make_shared<db::config>();
    db_cfg_ptr->data_file_directories({data_dir.path().string()}, db::config::config_source::CommandLine);

    // Create incomplete sstables in test data directory
    sstring ks = "system";
    sstring cf = "peers-37f71aca7dc2383ba70672528af04d4f";
    sstring sst_dir = (data_dir.path() / std::string_view(ks) / std::string_view(cf)).string();

    // Path of component `type` of generation 4, placed directly in sst_dir.
    auto gen4_component = [&sst_dir, &ks, &cf] (component_type type) {
        return sst::filename(sst_dir, ks, cf, sstables::get_highest_sstable_version(), generation_from_value(4), sst::format_types::big, type);
    };

    auto temp_sst_dir_2 = fmt::format("{}/{}{}", sst_dir, generation_from_value(2), tempdir_extension);
    touch_dir(temp_sst_dir_2);

    auto temp_sst_dir_3 = fmt::format("{}/{}{}", sst_dir, generation_from_value(3), tempdir_extension);
    touch_dir(temp_sst_dir_3);
    touch_file(sst::filename(temp_sst_dir_3, ks, cf, sstables::get_highest_sstable_version(), generation_from_value(3), sst::format_types::big, component_type::TemporaryTOC));

    touch_file(gen4_component(component_type::TemporaryTOC));
    // Reproducer for #scylladb/scylladb#26393
    touch_file(gen4_component(component_type::TemporaryHashes));
    touch_file(gen4_component(component_type::Data));

    auto test_config = cql_test_config(db_cfg_ptr);
    test_config.clean_data_dir_before_test = false;
    do_with_cql_env_thread([&temp_sst_dir_2, &temp_sst_dir_3, &gen4_component] (cql_test_env&) {
        require_exist(temp_sst_dir_2, false);
        require_exist(temp_sst_dir_3, false);

        require_exist(gen4_component(component_type::TemporaryTOC), false);
        require_exist(gen4_component(component_type::TemporaryHashes), false);
        require_exist(gen4_component(component_type::Data), false);
    }, test_config).get();
}
|
|
|
|
// On startup the table's pending_delete directory must be processed:
//  - temporary (*.log.tmp) logs are removed;
//  - regular (*.log) logs are replayed, deleting the sstables they list, and
//    are then removed themselves.
SEASTAR_THREAD_TEST_CASE(test_distributed_loader_with_pending_delete) {
    using sst = sstables::sstable;

    tmpdir data_dir;
    auto db_cfg_ptr = make_shared<db::config>();
    auto& db_cfg = *db_cfg_ptr;

    db_cfg.data_file_directories({data_dir.path().string()}, db::config::config_source::CommandLine);

    // Create incomplete sstables in test data directory
    sstring ks = "system";
    sstring cf = "peers-37f71aca7dc2383ba70672528af04d4f";
    sstring sst_dir = (data_dir.path() / std::string_view(ks) / std::string_view(cf)).string();
    sstring pending_delete_dir = sst_dir + "/" + sstables::pending_delete_dir;

    // Writes `text` into `file_name` (created or truncated) and asserts that the file exists.
    auto write_file = [] (const sstring& file_name, const sstring& text) {
        auto f = open_file_dma(file_name, open_flags::wo | open_flags::create | open_flags::truncate).get();
        auto os = make_file_output_stream(f, file_output_stream_options{}).get();
        os.write(text).get();
        os.flush().get();
        os.close().get();
        require_exist(file_name, true);
    };

    // Base name (no directory) of component `ctype` of sstable generation `gen`.
    auto component_basename = [&ks, &cf] (sstables::generation_type gen, component_type ctype) {
        return sst::component_basename(ks, cf, sstables::get_highest_sstable_version(), gen, sst::format_types::big, ctype);
    };

    // Full path of component `ctype` of sstable generation `gen` inside sst_dir.
    auto gen_filename = [&sst_dir, &ks, &cf] (sstables::generation_type gen, component_type ctype) {
        return sst::filename(sst_dir, ks, cf, sstables::get_highest_sstable_version(), gen, sst::format_types::big, ctype);
    };

    touch_dir(pending_delete_dir);

    // Empty log file
    touch_file(pending_delete_dir + "/sstables-0-0.log");

    // Empty temporary log file
    touch_file(pending_delete_dir + "/sstables-1-1.log.tmp");

    const sstring toc_text = "TOC.txt\nData.db\n";

    sstables::sstable_generation_generator gen_generator;
    std::vector<sstables::generation_type> gen;
    constexpr size_t num_gens = 9;
    std::generate_n(std::back_inserter(gen), num_gens, [&] {
        return gen_generator();
    });

    // Regular log file with single entry
    write_file(gen_filename(gen[2], component_type::TOC), toc_text);
    touch_file(gen_filename(gen[2], component_type::Data));
    write_file(pending_delete_dir + "/sstables-2-2.log",
               component_basename(gen[2], component_type::TOC) + "\n");

    // Temporary log file with single entry
    write_file(pending_delete_dir + "/sstables-3-3.log.tmp",
               component_basename(gen[3], component_type::TOC) + "\n");

    // Regular log file with multiple entries
    write_file(gen_filename(gen[4], component_type::TOC), toc_text);
    touch_file(gen_filename(gen[4], component_type::Data));
    write_file(gen_filename(gen[5], component_type::TOC), toc_text);
    touch_file(gen_filename(gen[5], component_type::Data));
    write_file(pending_delete_dir + "/sstables-4-5.log",
               component_basename(gen[4], component_type::TOC) + "\n" +
               component_basename(gen[5], component_type::TOC) + "\n");

    // Regular log file with multiple entries and some deleted sstables
    write_file(gen_filename(gen[6], component_type::TemporaryTOC), toc_text);
    touch_file(gen_filename(gen[6], component_type::Data));
    write_file(gen_filename(gen[7], component_type::TemporaryTOC), toc_text);
    write_file(pending_delete_dir + "/sstables-6-8.log",
               component_basename(gen[6], component_type::TOC) + "\n" +
               component_basename(gen[7], component_type::TOC) + "\n" +
               component_basename(gen[8], component_type::TOC) + "\n");

    auto test_config = cql_test_config(db_cfg_ptr);
    test_config.clean_data_dir_before_test = false;
    do_with_cql_env_thread([&] (cql_test_env& e) {
        // Empty log file: there is nothing to replay, but the log itself must be removed.
        require_exist(pending_delete_dir + "/sstables-0-0.log", false);

        // Empty temporary log file
        require_exist(pending_delete_dir + "/sstables-1-1.log.tmp", false);

        // Regular log file with single entry
        require_exist(gen_filename(gen[2], component_type::TOC), false);
        require_exist(gen_filename(gen[2], component_type::Data), false);
        require_exist(pending_delete_dir + "/sstables-2-2.log", false);

        // Temporary log file with single entry
        require_exist(pending_delete_dir + "/sstables-3-3.log.tmp", false);

        // Regular log file with multiple entries
        require_exist(gen_filename(gen[4], component_type::TOC), false);
        require_exist(gen_filename(gen[4], component_type::Data), false);
        require_exist(gen_filename(gen[5], component_type::TOC), false);
        require_exist(gen_filename(gen[5], component_type::Data), false);
        require_exist(pending_delete_dir + "/sstables-4-5.log", false);

        // Regular log file with multiple entries and some deleted sstables
        require_exist(gen_filename(gen[6], component_type::TemporaryTOC), false);
        require_exist(gen_filename(gen[6], component_type::Data), false);
        require_exist(gen_filename(gen[7], component_type::TemporaryTOC), false);
        require_exist(pending_delete_dir + "/sstables-6-8.log", false);
    }, test_config).get();
}
|
|
|
|
// Snapshot tests and their helpers
|
|
// \param func: function to be called back, in a seastar thread.
|
|
future<> do_with_some_data_in_thread(std::vector<sstring> cf_names, std::function<void (cql_test_env&)> func, bool create_mvs = false, shared_ptr<db::config> db_cfg_ptr = {}, size_t num_keys = 2) {
|
|
return seastar::async([cf_names = std::move(cf_names), func = std::move(func), create_mvs, db_cfg_ptr = std::move(db_cfg_ptr), num_keys] () mutable {
|
|
lw_shared_ptr<tmpdir> tmpdir_for_data;
|
|
if (!db_cfg_ptr) {
|
|
tmpdir_for_data = make_lw_shared<tmpdir>();
|
|
db_cfg_ptr = make_shared<db::config>();
|
|
db_cfg_ptr->data_file_directories(std::vector<sstring>({ tmpdir_for_data->path().string() }));
|
|
}
|
|
do_with_cql_env_thread([cf_names = std::move(cf_names), func = std::move(func), create_mvs, num_keys] (cql_test_env& e) {
|
|
for (const auto& cf_name : cf_names) {
|
|
e.create_table([&cf_name] (std::string_view ks_name) {
|
|
return *schema_builder(ks_name, cf_name)
|
|
.with_column("p1", utf8_type, column_kind::partition_key)
|
|
.with_column("c1", int32_type, column_kind::clustering_key)
|
|
.with_column("c2", int32_type, column_kind::clustering_key)
|
|
.with_column("r1", int32_type)
|
|
.build();
|
|
}).get();
|
|
auto stmt = e.prepare(fmt::format("insert into {} (p1, c1, c2, r1) values (?, ?, ?, ?)", cf_name)).get();
|
|
auto make_key = [] (int64_t k) {
|
|
std::string s = fmt::format("key{}", k);
|
|
return cql3::raw_value::make_value(utf8_type->decompose(s));
|
|
};
|
|
auto make_val = [] (int64_t x) {
|
|
return cql3::raw_value::make_value(int32_type->decompose(int32_t{x}));
|
|
};
|
|
for (size_t i = 0; i < num_keys; ++i) {
|
|
auto key = tests::random::get_int<int32_t>(1, 1000000);
|
|
e.execute_prepared(stmt, {make_key(key), make_val(key), make_val(key + 1), make_val(key + 2)}).get();
|
|
e.execute_prepared(stmt, {make_key(key), make_val(key + 1), make_val(key + 1), make_val(key + 2)}).get();
|
|
e.execute_prepared(stmt, {make_key(key), make_val(key + 2), make_val(key + 1), make_val(key + 2)}).get();
|
|
}
|
|
|
|
if (create_mvs) {
|
|
auto f1 = e.local_view_builder().wait_until_built("ks", seastar::format("view_{}", cf_name));
|
|
e.execute_cql(seastar::format("create materialized view view_{0} as select * from {0} where p1 is not null and c1 is not null and c2 is "
|
|
"not null primary key (p1, c1, c2)",
|
|
cf_name))
|
|
.get();
|
|
f1.get();
|
|
|
|
auto f2 = e.local_view_builder().wait_until_built("ks", "index_cf_index");
|
|
e.execute_cql(seastar::format("CREATE INDEX index_{0} ON {0} (r1);", cf_name)).get();
|
|
f2.get();
|
|
}
|
|
}
|
|
|
|
func(e);
|
|
}, db_cfg_ptr).get();
|
|
});
|
|
}
|
|
|
|
// Future-returning flavour of do_with_some_data_in_thread(): `func` returns a
// future, which is waited on inside the seastar thread.
future<> do_with_some_data(std::vector<sstring> cf_names, std::function<future<> (cql_test_env&)> func, bool create_mvs = false, shared_ptr<db::config> db_cfg_ptr = {}) {
    // `func` lives in this coroutine's frame, which stays alive across the co_await below.
    auto run_in_thread = [&func] (cql_test_env& env) {
        func(env).get();
    };
    co_await do_with_some_data_in_thread(cf_names, run_in_thread, create_mvs, db_cfg_ptr);
}
|
|
|
|
// Takes snapshot `snapshot_name` of table `ks_name`.`cf_name` on all shards.
// On failure it logs the table, snapshot name and skip_flush option, then
// rethrows the exception.
future<> take_snapshot(cql_test_env& e, sstring ks_name = "ks", sstring cf_name = "cf", sstring snapshot_name = "test", db::snapshot_options opts = {}) {
    try {
        auto& sharded_db = e.db();
        const auto table_uuid = sharded_db.local().find_uuid(ks_name, cf_name);
        co_await replica::database::snapshot_table_on_all_shards(sharded_db, table_uuid, snapshot_name, opts);
    } catch (...) {
        testlog.error("Could not take snapshot for {}.{} snapshot_name={} skip_flush={}: {}",
                ks_name, cf_name, snapshot_name, opts.skip_flush, std::current_exception());
        throw;
    }
}
|
|
|
|
// Returns the names of the regular-file entries that directory_lister reports for `path`.
future<std::set<sstring>> collect_files(fs::path path) {
    std::set<sstring> names;
    directory_lister dir_lister(path, lister::dir_entry_types::of<directory_entry_type::regular>());
    for (auto entry = co_await dir_lister.get(); entry; entry = co_await dir_lister.get()) {
        names.insert(entry->name);
    }
    co_return names;
}
|
|
|
|
static bool is_component(const sstring& fname, const sstring& suffix) {
|
|
return fname.ends_with(suffix);
|
|
}
|
|
|
|
static std::set<sstring> collect_sstables(const std::set<sstring>& all_files, const sstring& suffix) {
|
|
// Verify manifest against the files in the snapshots dir
|
|
auto pred = [&suffix] (const sstring& fname) {
|
|
return is_component(fname, suffix);
|
|
};
|
|
return std::ranges::filter_view(all_files, pred) | std::ranges::to<std::set<sstring>>();
|
|
}
|
|
|
|
// Validate that the manifest.json lists exactly the SSTables present in the snapshot directory
|
|
static future<> validate_manifest(const locator::topology& topology, const fs::path& snapshot_dir, const std::set<sstring>& in_snapshot_dir, gc_clock::time_point min_time, bool tablets_enabled) {
|
|
sstring suffix = "-TOC.txt";
|
|
auto sstables_in_snapshot = collect_sstables(in_snapshot_dir, suffix);
|
|
std::set<sstring> sstables_in_manifest;
|
|
std::set<sstring> non_sstables_in_manifest;
|
|
|
|
auto manifest_str = co_await util::read_entire_file_contiguous(snapshot_dir / "manifest.json");
|
|
testlog.debug("manifest.json: {}", manifest_str);
|
|
auto manifest_json = rjson::parse(manifest_str);
|
|
BOOST_REQUIRE(manifest_json.IsObject());
|
|
|
|
BOOST_REQUIRE(manifest_json.HasMember("manifest"));
|
|
auto& manifest_info = manifest_json["manifest"];
|
|
BOOST_REQUIRE(manifest_info.IsObject());
|
|
BOOST_REQUIRE(manifest_info.HasMember("version"));
|
|
auto& manifest_version = manifest_info["version"];
|
|
BOOST_REQUIRE(manifest_version.IsString());
|
|
BOOST_REQUIRE_EQUAL(manifest_version.GetString(), "1.0");
|
|
BOOST_REQUIRE(manifest_info.HasMember("scope"));
|
|
auto& manifest_scope = manifest_info["scope"];
|
|
BOOST_REQUIRE(manifest_scope.IsString());
|
|
BOOST_REQUIRE_EQUAL(manifest_scope.GetString(), "node");
|
|
|
|
BOOST_REQUIRE(manifest_json.HasMember("node"));
|
|
auto& node_info = manifest_json["node"];
|
|
BOOST_REQUIRE(node_info.IsObject());
|
|
BOOST_REQUIRE(node_info.HasMember("host_id"));
|
|
auto& host_id_json = node_info["host_id"];
|
|
BOOST_REQUIRE(host_id_json.IsString());
|
|
auto id = utils::UUID(host_id_json.GetString());
|
|
BOOST_REQUIRE_EQUAL(id, topology.my_host_id().uuid());
|
|
BOOST_REQUIRE(node_info.HasMember("datacenter"));
|
|
auto& datacenter = node_info["datacenter"];
|
|
BOOST_REQUIRE(datacenter.IsString());
|
|
BOOST_REQUIRE_EQUAL(datacenter.GetString(), topology.get_location().dc);
|
|
BOOST_REQUIRE(node_info.HasMember("rack"));
|
|
auto& rack = node_info["rack"];
|
|
BOOST_REQUIRE(rack.IsString());
|
|
BOOST_REQUIRE_EQUAL(rack.GetString(), topology.get_location().rack);
|
|
|
|
BOOST_REQUIRE(manifest_json.HasMember("snapshot"));
|
|
auto& manifest_snapshot = manifest_json["snapshot"];
|
|
BOOST_REQUIRE(manifest_snapshot.IsObject());
|
|
BOOST_REQUIRE(manifest_snapshot.HasMember("name"));
|
|
auto& snapshot_name = manifest_snapshot["name"];
|
|
BOOST_REQUIRE(snapshot_name.IsString());
|
|
BOOST_REQUIRE_EQUAL(snapshot_dir.filename(), snapshot_name.GetString());
|
|
BOOST_REQUIRE(manifest_snapshot.HasMember("created_at"));
|
|
auto& created_at = manifest_snapshot["created_at"];
|
|
BOOST_REQUIRE(created_at.IsNumber());
|
|
time_t created_at_seconds = created_at.GetInt64();
|
|
BOOST_REQUIRE_GE(created_at_seconds, min_time.time_since_epoch().count());
|
|
BOOST_REQUIRE_LT(created_at_seconds, min_time.time_since_epoch().count() + 60);
|
|
if (manifest_snapshot.HasMember("expires_at")) {
|
|
BOOST_REQUIRE(created_at_seconds > 0);
|
|
auto& expires_at = manifest_snapshot["expires_at"];
|
|
BOOST_REQUIRE(expires_at.IsNumber());
|
|
BOOST_REQUIRE_GE(expires_at.GetInt64(), created_at_seconds);
|
|
}
|
|
|
|
BOOST_REQUIRE(manifest_json.HasMember("table"));
|
|
auto& manifest_table = manifest_json["table"];
|
|
BOOST_REQUIRE(manifest_table.IsObject());
|
|
BOOST_REQUIRE(manifest_table.HasMember("keyspace_name"));
|
|
auto& manifest_table_ks_name = manifest_table["keyspace_name"];
|
|
BOOST_REQUIRE(manifest_table_ks_name.IsString());
|
|
BOOST_REQUIRE_EQUAL(snapshot_dir.parent_path().parent_path().parent_path().filename().native(), manifest_table_ks_name.GetString());
|
|
auto& manifest_table_table_name = manifest_table["table_name"];
|
|
BOOST_REQUIRE(manifest_table_table_name.IsString());
|
|
BOOST_REQUIRE(snapshot_dir.parent_path().parent_path().filename().native().starts_with(manifest_table_table_name.GetString()));
|
|
std::optional<sstring> tablets_type;
|
|
if (manifest_table.HasMember("tablets_type")) {
|
|
auto& tablets_type_json = manifest_table["tablets_type"];
|
|
BOOST_REQUIRE(tablets_type_json.IsString());
|
|
tablets_type = tablets_type_json.GetString();
|
|
}
|
|
if (tablets_enabled) {
|
|
BOOST_REQUIRE(tablets_type.has_value());
|
|
BOOST_REQUIRE_EQUAL(*tablets_type, "powof2");
|
|
BOOST_REQUIRE(manifest_table.HasMember("tablet_count"));
|
|
auto& tablet_count_json = manifest_table["tablet_count"];
|
|
BOOST_REQUIRE(tablet_count_json.IsNumber());
|
|
uint64_t tablet_count = tablet_count_json.GetInt64();
|
|
BOOST_REQUIRE_EQUAL(tablet_count, 1 << log2ceil(tablet_count));
|
|
} else {
|
|
if (tablets_type) {
|
|
BOOST_REQUIRE_EQUAL(*tablets_type, "none");
|
|
}
|
|
if (manifest_table.HasMember("tablet_count")) {
|
|
auto& tablet_count = manifest_table["tablet_count"];
|
|
if (!tablet_count.IsNull()) {
|
|
BOOST_REQUIRE(tablet_count.IsNumber());
|
|
BOOST_REQUIRE_EQUAL(tablet_count.GetInt64(), 0);
|
|
}
|
|
}
|
|
}
|
|
|
|
if (manifest_json.HasMember("sstables")) {
|
|
auto& sstables = manifest_json["sstables"];
|
|
BOOST_REQUIRE(sstables.IsArray());
|
|
for (auto& sst_json : sstables.GetArray()) {
|
|
BOOST_REQUIRE(sst_json.IsObject());
|
|
|
|
auto& id = sst_json["id"];
|
|
BOOST_REQUIRE(id.IsString());
|
|
auto uuid = utils::UUID(id.GetString());
|
|
BOOST_REQUIRE(!uuid.is_null());
|
|
|
|
auto& toc_name = sst_json["toc_name"];
|
|
BOOST_REQUIRE(toc_name.IsString());
|
|
BOOST_REQUIRE(is_component(toc_name.GetString(), suffix));
|
|
sstables_in_manifest.insert(toc_name.GetString());
|
|
|
|
auto& data_size = sst_json["data_size"];
|
|
BOOST_REQUIRE(data_size.IsNumber());
|
|
auto& index_size = sst_json["index_size"];
|
|
BOOST_REQUIRE(index_size.IsNumber());
|
|
|
|
if (sst_json.HasMember("first_token")) {
|
|
auto& first_token = sst_json["first_token"];
|
|
BOOST_REQUIRE(first_token.IsNumber());
|
|
|
|
BOOST_REQUIRE(sst_json.HasMember("last_token"));
|
|
auto& last_token = sst_json["last_token"];
|
|
BOOST_REQUIRE(last_token.IsNumber());
|
|
BOOST_REQUIRE_LE(first_token.GetInt64(), last_token.GetInt64());
|
|
} else {
|
|
BOOST_REQUIRE(!sst_json.HasMember("last_token"));
|
|
}
|
|
}
|
|
}
|
|
|
|
if (manifest_json.HasMember("files")) {
|
|
auto& manifest_files = manifest_json["files"];
|
|
BOOST_REQUIRE(manifest_files.IsArray());
|
|
for (auto& f : manifest_files.GetArray()) {
|
|
non_sstables_in_manifest.insert(f.GetString());
|
|
}
|
|
}
|
|
|
|
BOOST_REQUIRE_EQUAL(sstables_in_manifest, sstables_in_snapshot);
|
|
BOOST_REQUIRE_EQUAL(non_sstables_in_manifest, std::set<sstring>{});
|
|
}
|
|
|
|
// Take snapshot "test" of ks.<table_name>, then check three things:
//   * the table directory has SSTable files (the snapshot flushed);
//   * the snapshot directory holds every table file plus manifest.json and schema.cql;
//   * the manifest validates against the local topology.
// create_mvs is forwarded to do_with_some_data_in_thread(). tablets_enabled
// selects the tablets mode for new keyspaces.
static future<> snapshot_works(const std::string& table_name, bool create_mvs, bool tablets_enabled = false) {
    auto cfg = make_shared<db::config>();
    if (tablets_enabled) {
        cfg->tablets_mode_for_new_keyspaces(db::tablets_mode_t::mode::enabled);
    } else {
        cfg->tablets_mode_for_new_keyspaces(db::tablets_mode_t::mode::disabled);
    }

    return do_with_some_data_in_thread({"cf"}, [table_name] (cql_test_env& e) {
        const auto min_time = gc_clock::now();
        take_snapshot(e, "ks", table_name).get();

        auto& tbl = e.local_db().find_column_family("ks", table_name);
        const auto tdir = table_dir(tbl);
        const auto snapshot_dir = tdir / sstables::snapshots_dir / "test";

        auto expected_files = collect_files(tdir).get();
        // The snapshot triggered a flush, so the data was written to disk.
        BOOST_REQUIRE_GE(expected_files.size(), 9);

        const auto in_snapshot_dir = collect_files(snapshot_dir).get();

        // Every table file was copied, and the manifest and schema were generated.
        expected_files.insert("manifest.json");
        expected_files.insert("schema.cql");
        BOOST_REQUIRE_EQUAL(expected_files, in_snapshot_dir);

        const auto& topology = e.local_db().get_token_metadata().get_topology();
        validate_manifest(topology, snapshot_dir, in_snapshot_dir, min_time, tbl.uses_tablets()).get();
    }, create_mvs, cfg, 100);
}
|
|
|
|
SEASTAR_TEST_CASE(table_snapshot_works) {
    // Base table "cf", with create_mvs set and tablets disabled.
    return snapshot_works("cf", /* create_mvs */ true, /* tablets_enabled */ false);
}
|
|
|
|
SEASTAR_TEST_CASE(table_snapshot_works_with_tablets) {
    // FIXME: do_with_some_data does not work with views and tablets yet
    return snapshot_works("cf", /* create_mvs */ false, /* tablets_enabled */ true);
}
|
|
|
|
SEASTAR_TEST_CASE(view_snapshot_works) {
    // Snapshot of "view_cf"; presumably the view created when create_mvs is set.
    return snapshot_works("view_cf", /* create_mvs */ true, /* tablets_enabled */ false);
}
|
|
|
|
SEASTAR_TEST_CASE(index_snapshot_works) {
    // Snapshot of the backing table of secondary index "index_cf".
    const auto index_table = ::secondary_index::index_table_name("index_cf");
    return snapshot_works(index_table, /* create_mvs */ true, /* tablets_enabled */ false);
}
|
|
|
|
SEASTAR_TEST_CASE(snapshot_skip_flush_works) {
    return do_with_some_data_in_thread({"cf"}, [] (cql_test_env& e) {
        // Take the snapshot with skip_flush set. The table directory must stay
        // empty, and the snapshot must hold only the manifest and the schema.
        db::snapshot_options no_flush{.skip_flush = true};
        take_snapshot(e, "ks", "cf", "test", no_flush).get();

        auto& tbl = e.local_db().find_column_family("ks", "cf");
        const auto tdir = table_dir(tbl);

        // Snapshot did not trigger a flush.
        const auto table_files = collect_files(tdir).get();
        BOOST_REQUIRE(table_files.empty());

        const auto snapshot_files = collect_files(tdir / sstables::snapshots_dir / "test").get();
        BOOST_REQUIRE_EQUAL(snapshot_files, std::set<sstring>({"manifest.json", "schema.cql"}));
    });
}
|
|
|
|
// Take snapshot "test" of ks.cf and check its reported sizes. Initially
// live == 0 and total > 0. After all files in the table directory are deleted,
// total == live and equals the original total.
SEASTAR_TEST_CASE(snapshot_list_okay) {
    return do_with_some_data_in_thread({"cf"}, [] (cql_test_env& e) {
        auto& cf = e.local_db().find_column_family("ks", "cf");
        take_snapshot(e).get();

        auto details = cf.get_snapshot_details().get();
        BOOST_REQUIRE_EQUAL(details.size(), 1);

        // at() rather than operator[]: a missing "test" entry must fail loudly
        // instead of default-inserting an empty record that masks the problem.
        auto sd = details.at("test");
        BOOST_REQUIRE_EQUAL(sd.live, 0);
        BOOST_REQUIRE_GT(sd.total, 0);

        auto table_directory = table_dir(cf);
        for (const auto& f : collect_files(table_directory).get()) {
            fs::remove(table_directory / f);
        }

        auto sd_post_deletion = cf.get_snapshot_details().get().at("test");

        BOOST_REQUIRE_EQUAL(sd_post_deletion.total, sd_post_deletion.live);
        BOOST_REQUIRE_EQUAL(sd.total, sd_post_deletion.live);
    });
}
|
|
|
|
SEASTAR_TEST_CASE(snapshot_list_contains_dropped_tables) {
    return do_with_some_data_in_thread({"cf1", "cf2", "cf3", "cf4"}, [] (cql_test_env& e) {
        auto list_snapshots = [&e] {
            return e.local_db().get_snapshot_details().get();
        };

        // After dropping cf1, exactly one snapshot (of one table) is listed,
        // with total == live > 0.
        e.execute_cql("DROP TABLE ks.cf1;").get();
        auto details = list_snapshots();
        BOOST_REQUIRE_EQUAL(details.size(), 1);
        {
            const auto& tables = details.begin()->second;
            BOOST_REQUIRE_EQUAL(tables.size(), 1);
            const auto& sd = tables.front().details;
            BOOST_REQUIRE_GT(sd.live, 0);
            BOOST_REQUIRE_EQUAL(sd.total, sd.live);
        }

        // Two explicit snapshots of tables that still exist.
        take_snapshot(e, "ks", "cf2", "test2").get();
        take_snapshot(e, "ks", "cf3", "test3").get();
        details = list_snapshots();
        BOOST_REQUIRE_EQUAL(details.size(), 3);

        // Dropping cf4 adds one more listed snapshot.
        e.execute_cql("DROP TABLE ks.cf4;").get();
        details = list_snapshots();
        BOOST_REQUIRE_EQUAL(details.size(), 4);

        // test2/test3 report live == 0 with a non-zero total. The remaining
        // (dropped-table) snapshots report total == live > 0.
        for (const auto& [snapshot_name, tables] : details) {
            BOOST_REQUIRE_EQUAL(tables.size(), 1);
            const auto& sd = tables.front().details;
            const bool is_explicit = snapshot_name == "test2" || snapshot_name == "test3";
            if (is_explicit) {
                BOOST_REQUIRE_EQUAL(sd.live, 0);
                BOOST_REQUIRE_GT(sd.total, 0);
            } else {
                BOOST_REQUIRE_GT(sd.live, 0);
                BOOST_REQUIRE_EQUAL(sd.total, sd.live);
            }
        }
    });
}
|
|
|
|
SEASTAR_TEST_CASE(snapshot_list_inexistent) {
    return do_with_some_data_in_thread({"cf"}, [] (cql_test_env& e) {
        // No snapshot was taken, so the table must report none.
        auto& tbl = e.local_db().find_column_family("ks", "cf");
        const auto snapshots = tbl.get_snapshot_details().get();
        BOOST_REQUIRE_EQUAL(snapshots.size(), 0);
    });
}
|
|
|
|
|
|
// Take snapshot "test" of ks.cf, clear it, and check that its directory is gone.
SEASTAR_TEST_CASE(clear_snapshot) {
    return do_with_some_data_in_thread({"cf"}, [] (cql_test_env& e) {
        take_snapshot(e).get();
        auto& cf = e.local_db().find_column_family("ks", "cf");
        const auto snapshot_dir = table_dir(cf) / sstables::snapshots_dir / "test";

        const unsigned count = collect_files(snapshot_dir).get().size();
        BOOST_REQUIRE_GT(count, 1u); // expect more than the manifest alone

        // An empty table name clears tag "test" in every table of keyspace "ks".
        e.local_db().clear_snapshot("test", {"ks"}, "").get();

        BOOST_REQUIRE_EQUAL(fs::exists(snapshot_dir), false);
    });
}
|
|
|
|
// Check clear_snapshot() semantics on ks.cf with several snapshots:
//   * an unknown tag is a no-op;
//   * a single tag removes only that snapshot;
//   * an empty tag removes all snapshots;
//   * snapshots of a dropped table survive the drop, and clearing them removes
//     the dropped table's directory.
SEASTAR_TEST_CASE(clear_multiple_snapshots) {
    sstring ks_name = "ks";
    sstring table_name = "cf";
    constexpr int num_snapshots = 2;

    auto snapshot_name = [] (int idx) {
        return format("test-snapshot-{}", idx);
    };

    co_await do_with_some_data_in_thread({table_name}, [&] (cql_test_env& e) {
        auto& t = e.local_db().find_column_family(ks_name, table_name);
        auto tdir = table_dir(t);
        auto snapshots_dir = tdir / sstables::snapshots_dir;

        for (int i = 0; i < num_snapshots; i++) {
            testlog.debug("Taking snapshot {} on {}.{}", snapshot_name(i), ks_name, table_name);
            take_snapshot(e, ks_name, table_name, snapshot_name(i)).get();
        }

        for (int i = 0; i < num_snapshots; i++) {
            testlog.debug("Verifying {}", snapshots_dir / snapshot_name(i));
            unsigned count = collect_files(snapshots_dir / snapshot_name(i)).get().size();
            BOOST_REQUIRE_GT(count, 1u); // expect more than the manifest alone
        }

        // non-existent tag
        testlog.debug("Clearing bogus tag");
        e.local_db().clear_snapshot("bogus", {ks_name}, table_name).get();
        for (int i = 0; i < num_snapshots; i++) {
            BOOST_REQUIRE_EQUAL(fs::exists(snapshots_dir / snapshot_name(i)), true);
        }

        // clear single tag
        testlog.debug("Clearing snapshot={} of {}.{}", snapshot_name(0), ks_name, table_name);
        e.local_db().clear_snapshot(snapshot_name(0), {ks_name}, table_name).get();
        BOOST_REQUIRE_EQUAL(fs::exists(snapshots_dir / snapshot_name(0)), false);
        for (int i = 1; i < num_snapshots; i++) {
            BOOST_REQUIRE_EQUAL(fs::exists(snapshots_dir / snapshot_name(i)), true);
        }

        // clear all tags (all tables)
        testlog.debug("Clearing all snapshots in {}", ks_name);
        e.local_db().clear_snapshot("", {ks_name}, "").get();
        for (int i = 0; i < num_snapshots; i++) {
            BOOST_REQUIRE_EQUAL(fs::exists(snapshots_dir / snapshot_name(i)), false);
        }

        testlog.debug("Taking an extra {} of {}.{}", snapshot_name(num_snapshots), ks_name, table_name);
        take_snapshot(e, ks_name, table_name, snapshot_name(num_snapshots)).get();

        // existing snapshots expected to remain after dropping the table
        testlog.debug("Dropping table {}.{}", ks_name, table_name);
        replica::database::legacy_drop_table_on_all_shards(e.db(), e.get_system_keyspace(), ks_name, table_name).get();
        BOOST_REQUIRE_EQUAL(fs::exists(snapshots_dir / snapshot_name(num_snapshots)), true);

        // clear all tags
        testlog.debug("Clearing all snapshots in {}.{} after it had been dropped", ks_name, table_name);
        e.local_db().clear_snapshot("", {ks_name}, table_name).get();

        // After all snapshots had been cleared, the dropped table directory is
        // expected to be removed. Checked only through the test framework so a
        // failure is reported as a test failure rather than a hard assertion.
        BOOST_REQUIRE_EQUAL(fs::exists(tdir), false);
    });
}
|
|
|
|
SEASTAR_TEST_CASE(clear_nonexistent_snapshot) {
    // Clearing a tag that was never taken must neither crash nor throw.
    return do_with_some_data_in_thread({"cf"}, [] (cql_test_env& e) {
        auto& local_db = e.local_db();
        local_db.clear_snapshot("test", {"ks"}, "").get();
    });
}
|
|
|
|
// Check that snapshot_ctl::get_snapshot_details() reports the same keyspace,
// table and sizes as the table's own get_snapshot_details(). The comparison
// is done both right after the snapshot and after the table's files have been
// deleted.
SEASTAR_TEST_CASE(test_snapshot_ctl_details) {
    return do_with_some_data_in_thread({"cf"}, [] (cql_test_env& e) {
        sharded<db::snapshot_ctl> sc;
        sc.start(std::ref(e.db()), std::ref(e.get_task_manager()), std::ref(e.get_sstorage_manager()), db::snapshot_ctl::config{}).get();
        auto stop_sc = deferred_stop(sc);

        auto& cf = e.local_db().find_column_family("ks", "cf");
        take_snapshot(e).get();

        auto details = cf.get_snapshot_details().get();
        BOOST_REQUIRE_EQUAL(details.size(), 1);

        // at() rather than operator[]: a missing "test" entry must fail loudly
        // instead of default-inserting an empty record.
        auto sd = details.at("test");
        BOOST_REQUIRE_EQUAL(sd.live, 0);
        BOOST_REQUIRE_GT(sd.total, 0);

        auto sc_details = sc.local().get_snapshot_details().get();
        BOOST_REQUIRE_EQUAL(sc_details.size(), 1);

        // Copied (not referenced) because sc_details is reassigned below.
        auto sc_sd_vec = sc_details.at("test");
        BOOST_REQUIRE_EQUAL(sc_sd_vec.size(), 1);
        const auto& sc_sd = sc_sd_vec.front();
        BOOST_REQUIRE_EQUAL(sc_sd.ks, "ks");
        BOOST_REQUIRE_EQUAL(sc_sd.cf, "cf");
        BOOST_REQUIRE_EQUAL(sc_sd.details.live, sd.live);
        BOOST_REQUIRE_EQUAL(sc_sd.details.total, sd.total);

        auto table_directory = table_dir(cf);
        for (const auto& f : collect_files(table_directory).get()) {
            fs::remove(table_directory / f);
        }

        auto sd_post_deletion = cf.get_snapshot_details().get().at("test");

        BOOST_REQUIRE_EQUAL(sd_post_deletion.total, sd_post_deletion.live);
        BOOST_REQUIRE_EQUAL(sd.total, sd_post_deletion.live);

        sc_details = sc.local().get_snapshot_details().get();
        auto sc_sd_post_deletion_vec = sc_details.at("test");
        BOOST_REQUIRE_EQUAL(sc_sd_post_deletion_vec.size(), 1);
        const auto& sc_sd_post_deletion = sc_sd_post_deletion_vec.front();
        BOOST_REQUIRE_EQUAL(sc_sd_post_deletion.ks, "ks");
        BOOST_REQUIRE_EQUAL(sc_sd_post_deletion.cf, "cf");
        BOOST_REQUIRE_EQUAL(sc_sd_post_deletion.details.live, sd_post_deletion.live);
        BOOST_REQUIRE_EQUAL(sc_sd_post_deletion.details.total, sd_post_deletion.total);
    });
}
|
|
|
|
// Check that snapshot_ctl::true_snapshots_size() equals the table snapshot's
// "live" size, both right after the snapshot and after the table's files have
// been deleted.
SEASTAR_TEST_CASE(test_snapshot_ctl_true_snapshots_size) {
    return do_with_some_data_in_thread({"cf"}, [] (cql_test_env& e) {
        sharded<db::snapshot_ctl> sc;
        sc.start(std::ref(e.db()), std::ref(e.get_task_manager()), std::ref(e.get_sstorage_manager()), db::snapshot_ctl::config{}).get();
        auto stop_sc = deferred_stop(sc);

        auto& cf = e.local_db().find_column_family("ks", "cf");
        take_snapshot(e).get();

        auto details = cf.get_snapshot_details().get();
        BOOST_REQUIRE_EQUAL(details.size(), 1);

        // at() rather than operator[]: a missing "test" entry must fail loudly
        // instead of default-inserting an empty record.
        auto sd = details.at("test");
        BOOST_REQUIRE_EQUAL(sd.live, 0);
        BOOST_REQUIRE_GT(sd.total, 0);

        auto sc_live_size = sc.local().true_snapshots_size().get();
        BOOST_REQUIRE_EQUAL(sc_live_size, sd.live);

        auto table_directory = table_dir(cf);
        for (const auto& f : collect_files(table_directory).get()) {
            fs::remove(table_directory / f);
        }

        auto sd_post_deletion = cf.get_snapshot_details().get().at("test");

        BOOST_REQUIRE_EQUAL(sd_post_deletion.total, sd_post_deletion.live);
        BOOST_REQUIRE_EQUAL(sd.total, sd_post_deletion.live);

        sc_live_size = sc.local().true_snapshots_size().get();
        BOOST_REQUIRE_EQUAL(sc_live_size, sd_post_deletion.live);
    });
}
|
|
|
|
// toppartitions_query caused a lw_shared_ptr to cross shards when moving results, #5104
|
|
SEASTAR_TEST_CASE(toppartitions_cross_shard_schema_ptr) {
    return do_with_cql_env_thread([] (cql_test_env& e) {
        e.execute_cql("CREATE TABLE ks.tab (id int PRIMARY KEY)").get();

        db::toppartitions_query top_query(e.db(), {{"ks", "tab"}}, {}, 1s, 100, 100);
        top_query.scatter().get();

        // Write many distinct keys so that the writes land on more than one shard.
        auto insert = e.prepare("INSERT INTO ks.tab(id) VALUES(?)").get();
        for (int key = 0; key != 100; ++key) {
            auto key_value = cql3::raw_value::make_value(int32_type->decompose(key));
            e.execute_prepared(insert, {std::move(key_value)}).get();
        }

        // Gathering moves results across shards; this triggered the bug in debug mode.
        top_query.gather().get();
    });
}
|
|
|
|
// Check that every query path honors query::max_result_size.
// With short reads disallowed, a limit below twice the raw value bytes must
// make the query throw. Otherwise the query must succeed with a non-empty result.
SEASTAR_THREAD_TEST_CASE(read_max_size) {
    do_with_cql_env_thread([] (cql_test_env& e) {
        e.execute_cql("CREATE TABLE test (pk text, ck int, v text, PRIMARY KEY (pk, ck));").get();
        auto id = e.prepare("INSERT INTO test (pk, ck, v) VALUES (?, ?, ?);").get();

        auto& db = e.local_db();
        auto& tab = db.find_column_family("ks", "test");
        auto s = tab.schema();

        auto pk = tests::generate_partition_key(s);
        const auto cql3_pk = cql3::raw_value::make_value(pk.key().explode().front());

        // One partition holding num_rows rows, each with a 1 KiB value.
        const auto value = sstring(1024, 'a');
        const auto raw_value = utf8_type->decompose(data_value(value));
        const auto cql3_value = cql3::raw_value::make_value(raw_value);

        const int num_rows = 1024;

        for (int i = 0; i != num_rows; ++i) {
            const auto cql3_ck = cql3::raw_value::make_value(int32_type->decompose(data_value(i)));
            e.execute_prepared(id, {cql3_pk, cql3_ck, cql3_value}).get();
        }

        const auto partition_ranges = std::vector<dht::partition_range>{query::full_partition_range};

        // The query paths under test, each returning the size of its result.
        const std::vector<std::pair<sstring, std::function<future<size_t>(schema_ptr, const query::read_command&)>>> query_methods{
                {"query_mutations()", [&db, &partition_ranges] (schema_ptr s, const query::read_command& cmd) -> future<size_t> {
                    return db.query_mutations(s, cmd, partition_ranges.front(), {}, db::no_timeout).then(
                            [] (const std::tuple<reconcilable_result, cache_temperature>& res) {
                        return std::get<0>(res).memory_usage();
                    });
                }},
                {"query()", [&db, &partition_ranges] (schema_ptr s, const query::read_command& cmd) -> future<size_t> {
                    return db.query(s, cmd, query::result_options::only_result(), partition_ranges, {}, db::no_timeout).then(
                            [] (const std::tuple<lw_shared_ptr<query::result>, cache_temperature>& res) {
                        return size_t(std::get<0>(res)->buf().size());
                    });
                }},
                {"query_mutations_on_all_shards()", [&e, &partition_ranges] (schema_ptr s, const query::read_command& cmd) -> future<size_t> {
                    return query_mutations_on_all_shards(e.db(), s, cmd, partition_ranges, {}, db::no_timeout).then(
                            [] (const std::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>& res) {
                        return std::get<0>(res)->memory_usage();
                    });
                }}
        };

        // Bind by const reference: each element holds an sstring and a
        // std::function, which would otherwise be copied on every iteration.
        for (const auto& [query_method_name, query_method] : query_methods) {
            for (auto allow_short_read : {true, false}) {
                for (auto max_size : {1024u, 1024u * 1024u, 1024u * 1024u * 1024u}) {
                    const auto should_throw = max_size < (num_rows * value.size() * 2) && !allow_short_read;
                    testlog.info("checking: query_method={}, allow_short_read={}, max_size={}, should_throw={}", query_method_name, allow_short_read, max_size, should_throw);
                    auto slice = s->full_slice();
                    if (allow_short_read) {
                        slice.options.set<query::partition_slice::option::allow_short_read>();
                    } else {
                        slice.options.remove<query::partition_slice::option::allow_short_read>();
                    }
                    query::read_command cmd(s->id(), s->version(), slice, query::max_result_size(max_size), query::tombstone_limit::max);
                    try {
                        auto size = query_method(s, cmd).get();
                        // Just to ensure we are not interpreting empty results as success.
                        BOOST_REQUIRE(size != 0);
                        if (should_throw) {
                            BOOST_FAIL("Expected exception, but none was thrown.");
                        } else {
                            testlog.trace("No exception thrown, as expected.");
                        }
                    } catch (const std::runtime_error& ex) {
                        // Named ex so the test environment `e` is not shadowed.
                        if (should_throw) {
                            testlog.trace("Exception thrown, as expected: {}", ex.what());
                        } else {
                            BOOST_FAIL(fmt::format("Expected no exception, but caught: {}", ex.what()));
                        }
                    }
                }
            }
        }
    }).get();
}
|
|
|
|
// Check that mutation queries, which are stopped when the memory consumed by
// their results reaches the local/global limit, are aborted instead of being
// silently terminated when this happens.
|
|
SEASTAR_THREAD_TEST_CASE(unpaged_mutation_read_global_limit) {
    auto cfg = cql_test_config{};
    cfg.dbcfg.emplace();
    // The memory available to the result memory limiter (global limit) is
    // configured based on the available memory, so give a small amount to
    // the "node", so we don't have to work with large amount of data.
    cfg.dbcfg->available_memory = 2 * 1024 * 1024;
    do_with_cql_env_thread([] (cql_test_env& e) {
        e.execute_cql("CREATE TABLE test (pk text, ck int, v text, PRIMARY KEY (pk, ck));").get();
        auto id = e.prepare("INSERT INTO test (pk, ck, v) VALUES (?, ?, ?);").get();

        auto& db = e.local_db();
        auto& tab = db.find_column_family("ks", "test");
        auto s = tab.schema();

        auto pk = tests::generate_partition_key(s);
        const auto cql3_pk = cql3::raw_value::make_value(pk.key().explode().front());

        const auto value = sstring(1024, 'a');
        const auto raw_value = utf8_type->decompose(data_value(value));
        const auto cql3_value = cql3::raw_value::make_value(raw_value);

        const int num_rows = 1024;
        // The per-query limit is far above the ~1 MiB of data written below,
        // so the read is expected to be stopped by the global limit instead.
        const auto max_size = 1024u * 1024u * 1024u;

        for (int i = 0; i != num_rows; ++i) {
            const auto cql3_ck = cql3::raw_value::make_value(int32_type->decompose(data_value(i)));
            e.execute_prepared(id, {cql3_pk, cql3_ck, cql3_value}).get();
        }

        const auto partition_ranges = std::vector<dht::partition_range>{query::full_partition_range};

        // The mutation query paths under test, each returning the size of its result.
        const std::vector<std::pair<sstring, std::function<future<size_t>(schema_ptr, const query::read_command&)>>> query_methods{
                {"query_mutations()", [&db, &partition_ranges] (schema_ptr s, const query::read_command& cmd) -> future<size_t> {
                    return db.query_mutations(s, cmd, partition_ranges.front(), {}, db::no_timeout).then(
                            [] (const std::tuple<reconcilable_result, cache_temperature>& res) {
                        return std::get<0>(res).memory_usage();
                    });
                }},
                {"query_mutations_on_all_shards()", [&e, &partition_ranges] (schema_ptr s, const query::read_command& cmd) -> future<size_t> {
                    return query_mutations_on_all_shards(e.db(), s, cmd, partition_ranges, {}, db::no_timeout).then(
                            [] (const std::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature>& res) {
                        return std::get<0>(res)->memory_usage();
                    });
                }}
        };

        // Bind by const reference: each element holds an sstring and a
        // std::function, which would otherwise be copied on every iteration.
        for (const auto& [query_method_name, query_method] : query_methods) {
            testlog.info("checking: query_method={}", query_method_name);
            auto slice = s->full_slice();
            slice.options.remove<query::partition_slice::option::allow_short_read>();
            query::read_command cmd(s->id(), s->version(), slice, query::max_result_size(max_size), query::tombstone_limit::max);
            try {
                auto size = query_method(s, cmd).get();
                // Just to ensure we are not interpreting empty results as success.
                BOOST_REQUIRE(size != 0);
                BOOST_FAIL("Expected exception, but none was thrown.");
            } catch (const std::runtime_error& ex) {
                // Named ex so the test environment `e` is not shadowed.
                testlog.trace("Exception thrown, as expected: {}", ex.what());
            }
        }
    }, std::move(cfg)).get();
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(reader_concurrency_semaphore_selection_test) {
    cql_test_config cfg;

    // A scheduling group the database was not configured with.
    scheduling_group unknown_scheduling_group = create_scheduling_group("unknown", 800).get();
    auto cleanup_unknown_scheduling_group = defer([&unknown_scheduling_group] {
        destroy_scheduling_group(unknown_scheduling_group).get();
    });

    using semaphore_getter = std::function<reader_concurrency_semaphore&(database_test_wrapper&)>;
    const auto user_sem = std::mem_fn(&database_test_wrapper::get_user_read_concurrency_semaphore);
    const auto system_sem = std::mem_fn(&database_test_wrapper::get_system_read_concurrency_semaphore);
    const auto streaming_sem = std::mem_fn(&database_test_wrapper::get_streaming_read_concurrency_semaphore);

    auto sched_groups = get_scheduling_groups().get();

    // The semaphore a read running in each scheduling group is expected to use.
    const std::vector<std::pair<scheduling_group, semaphore_getter>> expectations{
        {default_scheduling_group(), system_sem},
        {sched_groups.compaction_scheduling_group, system_sem},
        {sched_groups.memory_compaction_scheduling_group, system_sem},
        {sched_groups.streaming_scheduling_group, streaming_sem},
        {sched_groups.statement_scheduling_group, user_sem},
        {sched_groups.memtable_scheduling_group, system_sem},
        {sched_groups.memtable_to_cache_scheduling_group, system_sem},
        {sched_groups.gossip_scheduling_group, system_sem},
        {unknown_scheduling_group, user_sem},
    };

    do_with_cql_env_thread([&expectations] (cql_test_env& e) {
        auto& db = e.local_db();
        database_test_wrapper tdb(db);
        for (const auto& [sched_group, expected_sem_getter] : expectations) {
            // Structured bindings are re-bound explicitly for the capture list.
            with_scheduling_group(sched_group, [&db, sched_group = sched_group, &tdb, &expected_sem_getter = expected_sem_getter] {
                auto& expected = expected_sem_getter(tdb);
                auto& actual = db.get_reader_concurrency_semaphore();
                if (&actual != &expected) {
                    BOOST_FAIL(fmt::format("Unexpected semaphore for scheduling group {}, expected {}, got {}", sched_group.name(), expected.name(), actual.name()));
                }
            }).get();
        }
    }, std::move(cfg)).get();
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(max_result_size_for_query_selection_test) {
    cql_test_config cfg;

    cfg.db_config->max_memory_for_unlimited_query_soft_limit(1 * 1024 * 1024, utils::config_file::config_source::CommandLine);
    cfg.db_config->max_memory_for_unlimited_query_hard_limit(2 * 1024 * 1024, utils::config_file::config_source::CommandLine);

    // A scheduling group the database was not configured with.
    scheduling_group unknown_scheduling_group = create_scheduling_group("unknown", 800).get();
    auto cleanup_unknown_scheduling_group = defer([&unknown_scheduling_group] {
        destroy_scheduling_group(unknown_scheduling_group).get();
    });

    // User reads get the configured unlimited-query soft/hard limits. System
    // and maintenance reads get unlimited soft/hard limits.
    const auto user_limit = query::max_result_size(
            cfg.db_config->max_memory_for_unlimited_query_soft_limit(),
            cfg.db_config->max_memory_for_unlimited_query_hard_limit(),
            query::result_memory_limiter::maximum_result_size);
    const auto system_limit = query::max_result_size(
            query::result_memory_limiter::unlimited_result_size,
            query::result_memory_limiter::unlimited_result_size,
            query::result_memory_limiter::maximum_result_size);
    const auto maintenance_limit = system_limit;

    auto sched_groups = get_scheduling_groups().get();

    // The max result size a query running in each scheduling group is expected to get.
    const std::vector<std::pair<scheduling_group, query::max_result_size>> expectations{
        {default_scheduling_group(), system_limit},
        {sched_groups.compaction_scheduling_group, system_limit},
        {sched_groups.memory_compaction_scheduling_group, system_limit},
        {sched_groups.streaming_scheduling_group, maintenance_limit},
        {sched_groups.statement_scheduling_group, user_limit},
        {sched_groups.memtable_scheduling_group, system_limit},
        {sched_groups.memtable_to_cache_scheduling_group, system_limit},
        {sched_groups.gossip_scheduling_group, system_limit},
        {unknown_scheduling_group, user_limit},
    };

    do_with_cql_env_thread([&expectations] (cql_test_env& e) {
        auto& db = e.local_db();
        database_test_wrapper tdb(db);
        for (const auto& [sched_group, expected_max_size] : expectations) {
            // Structured bindings are re-bound explicitly for the capture list.
            with_scheduling_group(sched_group, [&db, sched_group = sched_group, expected_max_size = expected_max_size] {
                const auto actual = db.get_query_max_result_size();
                if (actual != expected_max_size) {
                    BOOST_FAIL(fmt::format("Unexpected max_size for scheduling group {}, expected {{{}, {}}}, got {{{}, {}}}",
                                sched_group.name(),
                                expected_max_size.soft_limit,
                                expected_max_size.hard_limit,
                                actual.soft_limit,
                                actual.hard_limit));
                }
            }).get();
        }
    }, std::move(cfg)).get();
}
|
|
|
|
// Check that during a multi-page range scan:
// * semaphore mismatch is detected
// * code is exception safe w.r.t. the mismatch exception, e.g. readers are closed properly
|
|
SEASTAR_TEST_CASE(multipage_range_scan_semaphore_mismatch) {
    return do_with_cql_env_thread([] (cql_test_env& e) {
        // Disable abort-on-internal-error for the duration of the test, restoring
        // the previous setting on exit. This lets the mismatch surface as the
        // exception checked by BOOST_REQUIRE_EXCEPTION below.
        const auto saved_abort_setting = set_abort_on_internal_error(false);
        auto restore_abort_setting = defer([saved_abort_setting] {
            set_abort_on_internal_error(saved_abort_setting);
        });
        e.execute_cql("CREATE TABLE ks.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck));").get();

        auto insert_id = e.prepare("INSERT INTO ks.tbl(pk, ck, v) VALUES(?, ?, ?)").get();

        auto& local_db = e.local_db();
        auto& tbl = local_db.find_column_family("ks", "tbl");
        auto s = tbl.schema();

        // Populate a single partition with rows ck = 0..99.
        auto dk = tests::generate_partition_key(tbl.schema());
        const auto pk_value = cql3::raw_value::make_value(managed_bytes(*dk.key().begin(*s)));
        const auto v_value = cql3::raw_value::make_value(int32_type->decompose(0));
        for (int32_t ck = 0; ck < 100; ++ck) {
            e.execute_prepared(insert_id, {pk_value, cql3::raw_value::make_value(int32_type->decompose(ck)), v_value}).get();
        }

        auto sched_groups = get_scheduling_groups().get();

        // First page: at most 4 rows.
        query::read_command first_page(
                s->id(),
                s->version(),
                s->full_slice(),
                query::max_result_size(std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max()),
                query::tombstone_limit::max,
                query::row_limit(4),
                query::partition_limit::max,
                gc_clock::now(),
                std::nullopt,
                query_id::create_random_id(),
                query::is_first_page::yes);

        // Second page: same command (and query id), continuing after ck = 3.
        auto second_page = first_page;
        auto after_ck3 = query::clustering_range::make_starting_with({clustering_key::from_single_value(*s, int32_type->decompose(3)), false});
        second_page.slice = partition_slice_builder(*s).set_specific_ranges(query::specific_ranges(dk.key(), {after_ck3})).build();
        second_page.is_first_page = query::is_first_page::no;

        auto start_range = dht::partition_range::make_starting_with({dk, true});
        auto ranges = dht::partition_range_vector{start_range};

        auto read_page_in = [&] (scheduling_group sg, const query::read_command& cmd) {
            with_scheduling_group(sg, [&] {
                return query_data_on_all_shards(e.db(), s, cmd, ranges, query::result_options::only_result(), {}, db::no_timeout);
            }).get();
        };

        // The pages are read in different scheduling groups; the second read
        // is expected to detect the semaphore mismatch and fail.
        read_page_in(default_scheduling_group(), first_page);
        BOOST_REQUIRE_EXCEPTION(read_page_in(sched_groups.statement_scheduling_group, second_page), std::runtime_error,
                testing::exception_predicate::message_contains("semaphore mismatch detected, dropping reader"));
    });
}
|
|
|
|
// Test `upgrade_sstables` on all keyspaces (including the system keyspace).
|
|
// Refs: #9494 (https://github.com/scylladb/scylla/issues/9494)
|
|
SEASTAR_TEST_CASE(upgrade_sstables) {
|
|
return do_with_cql_env_thread([] (cql_test_env& e) {
|
|
e.db().invoke_on_all([] (replica::database& db) -> future<> {
|
|
auto& cm = db.get_compaction_manager();
|
|
for (auto& [ks_name, ks] : db.get_keyspaces()) {
|
|
const auto& erm = ks.get_static_effective_replication_map();
|
|
auto owned_ranges_ptr = compaction::make_owned_ranges_ptr(co_await db.get_keyspace_local_ranges(erm));
|
|
for (auto& [cf_name, schema] : ks.metadata()->cf_meta_data()) {
|
|
auto& t = db.find_column_family(schema->id());
|
|
constexpr bool exclude_current_version = false;
|
|
co_await t.parallel_foreach_compaction_group_view([&] (compaction::compaction_group_view& ts) {
|
|
return cm.perform_sstable_upgrade(owned_ranges_ptr, ts, exclude_current_version, tasks::task_info{});
|
|
});
|
|
}
|
|
}
|
|
}).get();
|
|
});
|
|
}
|
|
|
|
// Each service level gets its own reader concurrency semaphore. Check that:
// * memory is split between the semaphores in proportion to their shares, and
// * a read only touches the semaphore of the service level it runs under.
SEASTAR_THREAD_TEST_CASE(per_service_level_reader_concurrency_semaphore_test) {
    cql_test_config cfg;
    do_with_cql_env_thread([] (cql_test_env& e) {
        constexpr size_t num_service_levels = 3;
        constexpr size_t num_keys_to_insert = 10;
        constexpr size_t num_individual_reads_to_test = 50;

        auto& db = e.local_db();
        database_test_wrapper dbt(db);
        const size_t total_memory = dbt.get_total_user_reader_concurrency_semaphore_memory();
        sharded<qos::service_level_controller>& sl_controller = e.service_level_controller_service();
        std::array<sstring, num_service_levels> sl_names;
        qos::service_level_options slo;

        // `sl:driver` contributes a weight of 200.
        size_t expected_total_weight = 200;
        // Service level #idx gets shares (idx + 1) * 100.
        const auto weight_of = [] (size_t idx) -> size_t {
            return (idx + 1) * 100;
        };

        // Give the default service level the smallest possible share, so that
        // it takes as little memory as possible.
        slo.shares.emplace<int32_t>(1);
        expected_total_weight += 1;
        sl_controller.local().add_service_level(qos::service_level_controller::default_service_level_name, slo).get();

        // Returns the reader concurrency semaphore used under `sl_name`.
        auto semaphore_of = [&] (sstring sl_name) -> reader_concurrency_semaphore& {
            return *sl_controller.local().with_service_level(sl_name, noncopyable_function<reader_concurrency_semaphore*()>([&] {
                return &db.get_reader_concurrency_semaphore();
            })).get();
        };

        for (unsigned i = 0; i < num_service_levels; ++i) {
            sstring sl_name = format("sl{}", i);
            slo.shares.emplace<int32_t>(weight_of(i));
            sl_controller.local().add_service_level(sl_name, slo).get();
            expected_total_weight += weight_of(i);
            // The semaphore group must track the total weight correctly.
            BOOST_REQUIRE_EQUAL(expected_total_weight, dbt.get_total_user_reader_concurrency_semaphore_weight());
            sl_names[i] = sl_name;

            // Sum the memory available to `sl:driver`, every service level
            // created so far, and the default service level.
            size_t total_distributed_memory = semaphore_of("driver").available_resources().memory;
            for (unsigned j = 0; j <= i; ++j) {
                reader_concurrency_semaphore& sem = semaphore_of(sl_names[j]);
                // Every semaphore created so far must hold its weighted share of
                // the total memory. An off-by-one per semaphore (up to
                // num_service_levels in total) is tolerated, because the rounding
                // remainder is added to one of the semaphores. The total check
                // below ensures no memory was leaked or created.
                const auto expected_share = ssize_t((weight_of(j) * total_memory) / expected_total_weight);
                const auto delta = std::abs(expected_share - sem.available_resources().memory);
                BOOST_REQUIRE_LE(delta, num_service_levels);
                total_distributed_memory += sem.available_resources().memory;
            }
            total_distributed_memory += semaphore_of(qos::service_level_controller::default_service_level_name).available_resources().memory;
            BOOST_REQUIRE_EQUAL(total_distributed_memory, total_memory);
        }

        // Captures the stats of the semaphore of each non-default service level.
        auto snapshot_stats = [&] () {
            std::unordered_map<sstring, reader_concurrency_semaphore::stats> snapshot;
            for (const auto& name : sl_names) {
                snapshot[name] = semaphore_of(name).get_stats();
            }
            return snapshot;
        };

        e.execute_cql("CREATE TABLE tbl (a int, b int, PRIMARY KEY (a));").get();
        for (unsigned a = 0; a < num_keys_to_insert; ++a) {
            for (unsigned b = 0; b < num_keys_to_insert; ++b) {
                e.execute_cql(format("INSERT INTO tbl(a, b) VALUES ({}, {});", a, b)).get();
            }
        }

        for (unsigned iteration = 0; iteration < num_individual_reads_to_test; ++iteration) {
            const int chosen = tests::random::get_int(num_service_levels - 1);
            const sstring& chosen_sl = sl_names[chosen];
            auto before = snapshot_stats();

            sl_controller.local().with_service_level(chosen_sl, noncopyable_function<future<>()> ([&] {
                return e.execute_cql("SELECT * FROM tbl;").discard_result();
            })).get();

            auto after = snapshot_stats();
            // Only the semaphore of the chosen service level may show any
            // (measured) activity; every other semaphore must be untouched.
            for (auto& [name, stats] : before) {
                BOOST_REQUIRE((stats == after[name] && name != chosen_sl) ||
                        (stats != after[name] && name == chosen_sl));
            }
        }
    }, std::move(cfg)).get();
}
|
|
|
|
// Move a random sstable to quarantine, restart on the same data directory, and
// check that all rows are still readable, i.e. quarantined sstables are loaded
// when the table is populated.
SEASTAR_TEST_CASE(populate_from_quarantine_works) {
    auto tmpdir_for_data = make_lw_shared<tmpdir>();
    auto db_cfg_ptr = make_shared<db::config>();
    db_cfg_ptr->data_file_directories(std::vector<sstring>({ tmpdir_for_data->path().string() }));
    locator::host_id host_id;

    // Populate tmpdir_for_data and move a random sstable to quarantine.
    co_await do_with_some_data({"cf"}, [&host_id] (cql_test_env& e) -> future<> {
        // Remember the host id so that the second environment below reuses it.
        host_id = e.local_db().get_token_metadata().get_my_id();
        auto& db = e.db();
        co_await db.invoke_on_all([] (replica::database& db) {
            auto& cf = db.find_column_family("ks", "cf");
            return cf.flush();
        });
        // Start at a random shard and walk the shards until one of them has
        // an sstable to quarantine. tests::random::get_int() includes both
        // bounds, so the upper bound must be smp::count - 1.
        auto shard = tests::random::get_int<unsigned>(0, smp::count - 1);
        auto found = false;
        for (unsigned i = 0; i < smp::count && !found; i++) {
            found = co_await db.invoke_on((shard + i) % smp::count, [] (replica::database& db) -> future<bool> {
                auto& cf = db.find_column_family("ks", "cf");
                bool quarantined = false;
                co_await cf.parallel_foreach_compaction_group_view([&] (compaction::compaction_group_view& ts) -> future<> {
                    auto sstables = co_await in_strategy_sstables(ts);
                    if (sstables.empty()) {
                        co_return;
                    }
                    auto idx = tests::random::get_int<size_t>(0, sstables.size() - 1);
                    testlog.debug("Moving sstable #{} out of {} to quarantine", idx, sstables.size());
                    auto sst = sstables[idx];
                    co_await sst->change_state(sstables::sstable_state::quarantine);
                    quarantined = true;
                });
                co_return quarantined;
            });
        }
        BOOST_REQUIRE(found);
    }, false, db_cfg_ptr);

    // Reload the table from tmpdir_for_data and verify that all rows are still
    // there, including those stored in the quarantined sstable.
    size_t row_count = 0;
    cql_test_config test_config(db_cfg_ptr);
    test_config.host_id = host_id;
    co_await do_with_cql_env([&row_count] (cql_test_env& e) -> future<> {
        auto res = co_await e.execute_cql("select * from ks.cf;");
        auto rows = dynamic_pointer_cast<cql_transport::messages::result_message::rows>(res);
        BOOST_REQUIRE(rows);
        row_count = rows->rs().result_set().size();
    }, std::move(test_config));
    // NOTE(review): 6 is presumably the number of rows do_with_some_data()
    // inserts into "cf" -- confirm against that helper.
    BOOST_REQUIRE_EQUAL(row_count, 6);
}
|
|
|
|
// Take a snapshot of a table with a quarantined sstable and check that the
// snapshot contains the manifest plus the data file of every sstable, the
// quarantined one included.
SEASTAR_TEST_CASE(snapshot_with_quarantine_works) {
    return do_with_some_data({"cf"}, [] (cql_test_env& e) -> future<> {
        auto& db = e.db();
        co_await db.invoke_on_all([] (replica::database& db) {
            auto& cf = db.find_column_family("ks", "cf");
            return cf.flush();
        });

        std::set<sstring> expected = {
            "manifest.json",
        };

        // Visit every shard, starting at a random one: collect the data file
        // names of all sstables, and move one random sstable (from the first
        // non-empty compaction group view visited) to quarantine.
        // tests::random::get_int() includes both bounds, so the upper bound
        // must be smp::count - 1.
        auto shard = tests::random::get_int<unsigned>(0, smp::count - 1);
        auto found = false;
        for (unsigned i = 0; i < smp::count; i++) {
            co_await db.invoke_on((shard + i) % smp::count, [&] (replica::database& db) -> future<> {
                auto& cf = db.find_column_family("ks", "cf");
                co_await cf.parallel_foreach_compaction_group_view([&] (compaction::compaction_group_view& ts) -> future<> {
                    auto sstables = co_await in_strategy_sstables(ts);
                    if (sstables.empty()) {
                        co_return;
                    }
                    // Collect all expected sstable data files.
                    for (auto sst : sstables) {
                        expected.insert(sst->component_basename(sstables::component_type::Data));
                    }
                    // Quarantine only one sstable overall.
                    if (std::exchange(found, true)) {
                        co_return;
                    }
                    auto idx = tests::random::get_int<size_t>(0, sstables.size() - 1);
                    auto sst = sstables[idx];
                    co_await sst->change_state(sstables::sstable_state::quarantine);
                });
            });
        }
        BOOST_REQUIRE(found);

        co_await take_snapshot(e, "ks", "cf", "test", db::snapshot_options{.skip_flush = true});

        testlog.debug("Expected: {}", expected);

        // The snapshot itself skipped the flush, but the explicit flush above
        // wrote the data down, so at least one sstable data file is expected
        // besides the manifest.
        BOOST_REQUIRE_GT(expected.size(), 1);

        auto& cf = db.local().find_column_family("ks", "cf");

        auto in_snap_dir = co_await collect_files(table_dir(cf) / sstables::snapshots_dir / "test");
        // All data files (the quarantined one included) were copied and the
        // manifest was generated.
        BOOST_REQUIRE(std::includes(in_snap_dir.begin(), in_snap_dir.end(), expected.begin(), expected.end()));
    });
}
|
|
|
|
// Dropping a table must evict its entries from the querier cache, including a
// querier that is inserted while the drop is still in progress.
SEASTAR_TEST_CASE(database_drop_column_family_clears_querier_cache) {
    return do_with_cql_env_thread([] (cql_test_env& e) {
        e.execute_cql("create table ks.cf (k text, v int, primary key (k));").get();
        auto& db = e.local_db();
        auto& cf = db.find_column_family("ks", "cf");

        // Keep a read registered as in progress on the table; the drop below is
        // expected to wait for it before finishing.
        auto in_flight_read = std::optional(cf.read_in_progress());
        auto s = cf.schema();
        auto querier = replica::querier(
                cf.as_mutation_source(),
                cf.schema(),
                database_test_wrapper(db).get_user_read_concurrency_semaphore().make_tracking_only_permit(s, "test", db::no_timeout, {}),
                query::full_partition_range,
                s->full_slice(),
                nullptr,
                tombstone_gc_state(nullptr));

        auto drop_done = replica::database::legacy_drop_table_on_all_shards(e.db(), e.get_system_keyspace(), "ks", "cf");

        // Cache a querier for the table while the drop is ongoing.
        auto& cache = db.get_querier_cache();
        cache.insert_data_querier(query_id::create_random_id(), std::move(querier), nullptr);
        BOOST_REQUIRE_EQUAL(cache.get_stats().population, 1);

        // Releasing the read should allow the drop to complete.
        in_flight_read.reset();
        drop_done.get();

        // The drop must have cleaned up all entries belonging to the table.
        BOOST_REQUIRE_EQUAL(cache.get_stats().population, 0);
    });
}
|
|
|
|
// Create a table with some data under a configuration where `auto_snapshot`
// is set to the given value, drop it with `with_snapshot=true`, and check
// that the table directory survives the drop exactly when auto_snapshot is
// enabled.
static future<> test_drop_table_with_auto_snapshot(bool auto_snapshot) {
    sstring ks_name = "ks";
    sstring table_name = format("table_with_auto_snapshot_{}", auto_snapshot ? "enabled" : "disabled");
    auto tmpdir_for_data = make_lw_shared<tmpdir>();
    auto db_cfg_ptr = make_shared<db::config>();
    db_cfg_ptr->data_file_directories(std::vector<sstring>({ tmpdir_for_data->path().string() }));
    db_cfg_ptr->auto_snapshot(auto_snapshot);

    co_await do_with_some_data({table_name}, [&] (cql_test_env& e) -> future<> {
        auto cf_dir = table_dir(e.local_db().find_column_family(ks_name, table_name)).native();

        // Pass `with_snapshot=true` to legacy_drop_table_on_all_shards so that
        // auto_snapshot (as configured above) is allowed to take effect. The
        // table directory should therefore exist after the drop if and only if
        // auto_snapshot is enabled in the configuration.
        co_await replica::database::legacy_drop_table_on_all_shards(e.db(), e.get_system_keyspace(), ks_name, table_name, true);
        auto cf_dir_exists = co_await file_exists(cf_dir);
        BOOST_REQUIRE_EQUAL(cf_dir_exists, auto_snapshot);
    }, false, db_cfg_ptr);
}
|
|
|
|
// With auto_snapshot enabled, the table directory must survive the drop.
SEASTAR_TEST_CASE(drop_table_with_auto_snapshot_enabled) {
    constexpr bool auto_snapshot_enabled = true;
    return test_drop_table_with_auto_snapshot(auto_snapshot_enabled);
}
|
|
|
|
// With auto_snapshot disabled, the table directory must be gone after the drop.
SEASTAR_TEST_CASE(drop_table_with_auto_snapshot_disabled) {
    constexpr bool auto_snapshot_enabled = false;
    return test_drop_table_with_auto_snapshot(auto_snapshot_enabled);
}
|
|
|
|
// Dropping a table with `with_snapshot=false` must not leave its directory behind.
SEASTAR_TEST_CASE(drop_table_with_no_snapshot) {
    sstring ks_name = "ks";
    sstring table_name = "table_with_no_snapshot";

    co_await do_with_some_data({table_name}, [&] (cql_test_env& e) -> future<> {
        auto dir = table_dir(e.local_db().find_column_family(ks_name, table_name)).native();

        // `with_snapshot=false` disallows auto_snapshot, so the table directory
        // is expected to be removed by the drop.
        co_await replica::database::legacy_drop_table_on_all_shards(e.db(), e.get_system_keyspace(), ks_name, table_name, false);
        const bool dir_exists = co_await file_exists(dir);
        BOOST_REQUIRE_EQUAL(dir_exists, false);
    });
}
|
|
|
|
// An explicit snapshot must keep the table directory around after a drop, even
// when the drop itself is told not to snapshot.
SEASTAR_TEST_CASE(drop_table_with_explicit_snapshot) {
    sstring ks_name = "ks";
    sstring table_name = "table_with_explicit_snapshot";

    co_await do_with_some_data({table_name}, [&] (cql_test_env& e) -> future<> {
        // Tag the snapshot with the current time to make the tag unique.
        auto tag = format("test-{}", db_clock::now().time_since_epoch().count());
        co_await take_snapshot(e, ks_name, table_name, tag);
        auto dir = table_dir(e.local_db().find_column_family(ks_name, table_name)).native();

        // Drop with `with_snapshot=false`: because of the explicit snapshot,
        // the directory must still be kept regardless of the with_snapshot
        // parameter and the auto_snapshot config.
        co_await replica::database::legacy_drop_table_on_all_shards(e.db(), e.get_system_keyspace(), ks_name, table_name, false);
        const bool dir_exists = co_await file_exists(dir);
        BOOST_REQUIRE_EQUAL(dir_exists, true);
    });
}
|
|
|
|
// Generating the mutation_dump output schema twice from the same underlying
// schema must produce the same id and the same version.
SEASTAR_TEST_CASE(mutation_dump_generated_schema_deterministic_id_version) {
    simple_schema underlying;
    const auto first = replica::mutation_dump::generate_output_schema_from_underlying_schema(underlying.schema());
    const auto second = replica::mutation_dump::generate_output_schema_from_underlying_schema(underlying.schema());

    BOOST_REQUIRE_EQUAL(first->id(), second->id());
    BOOST_REQUIRE_EQUAL(first->version(), second->version());

    return make_ready_future<>();
}
|
|
|
|
// Drain the compaction manager on every shard and then enable it again; the
// test fails if either step fails.
SEASTAR_TEST_CASE(enable_drained_compaction_manager) {
    return do_with_cql_env_thread([] (cql_test_env& e) {
        auto drain_then_enable = [] (replica::database& db) -> future<> {
            auto& compaction_mgr = db.get_compaction_manager();
            co_await compaction_mgr.drain();
            compaction_mgr.enable();
        };
        e.db().invoke_on_all(drain_then_enable).get();
    });
}
|
|
|
|
// Quarantine a random subset of sstables on every shard, call
// drop_quarantined_sstables(), and check that no quarantined sstable remains.
SEASTAR_TEST_CASE(test_drop_quarantined_sstables) {
    return do_with_cql_env_thread([] (cql_test_env& e) {
        e.execute_cql("create table cf (p text PRIMARY KEY, c int)").get();
        // Flush after every insert so that many small sstables are created.
        for (int i = 0; i < 100; i++) {
            e.execute_cql(format("insert into cf (p, c) values ('key{}', {})", i * i, i)).get();
            e.db().invoke_on_all([] (replica::database& db) {
                auto& cf = db.find_column_family("ks", "cf");
                return cf.flush();
            }).get();
        }

        // Use a size_t initial value, matching the size_t reducer and the other
        // map_reduce0() calls below; the result type of map_reduce0() follows
        // the initial value, so a plain `0` would narrow the sum to int.
        auto initial_sstable_count = e.db().map_reduce0(
            [] (replica::database& db) -> future<size_t> {
                auto& cf = db.find_column_family("ks", "cf");
                co_return cf.sstables_count();
            },
            size_t(0),
            std::plus<size_t>{}
        ).get();
        BOOST_REQUIRE_GT(initial_sstable_count, 0);

        // With compaction disabled, quarantine between 1 and 1 + size/5
        // sstables of every non-empty compaction group view, and count them.
        auto quarantined_count = e.db().map_reduce0(
            [] (replica::database& _db) -> future<size_t> {
                auto& cf = _db.find_column_family("ks", "cf");
                size_t quarantined_on_shard = 0;
                auto& cm = cf.get_compaction_manager();
                co_await cf.parallel_foreach_compaction_group_view([&] (compaction::compaction_group_view& ts) -> future<> {
                    return cm.run_with_compaction_disabled(ts, [&] () -> future<> {
                        auto sstables = co_await in_strategy_sstables(ts);
                        if (sstables.empty()) {
                            co_return;
                        }
                        auto quarantine_n = 1 + tests::random::get_int<size_t>(sstables.size() / 5);
                        quarantined_on_shard += quarantine_n;
                        for (size_t i = 0; i < quarantine_n; i++) {
                            co_await sstables[i]->change_state(sstables::sstable_state::quarantine);
                        }
                    });
                });
                co_return quarantined_on_shard;
            },
            size_t(0),
            std::plus<size_t>{}
        ).get();
        BOOST_REQUIRE_GT(quarantined_count, 0);

        e.db().invoke_on_all([] (replica::database& db) {
            auto& cf = db.find_column_family("ks", "cf");
            return cf.drop_quarantined_sstables();
        }).get();

        // No sstable of the table may still be quarantined on any shard.
        auto remaining_quarantined = e.db().map_reduce0(
            [] (replica::database& db) -> future<size_t> {
                auto& cf = db.find_column_family("ks", "cf");
                auto& sstables = *cf.get_sstables();
                co_return std::count_if(sstables.begin(), sstables.end(), [] (const shared_sstable& sst) {
                    return sst->is_quarantined();
                });
            },
            size_t(0),
            std::plus<size_t>{}
        ).get();
        BOOST_REQUIRE_EQUAL(remaining_quarantined, 0);
    });
}
|
|
|
|
// A snapshot of shared_tombstone_gc_state must keep answering gc-before queries
// with the state as of the snapshot. A tombstone_gc_state built on the shared
// state must observe later updates.
SEASTAR_THREAD_TEST_CASE(test_tombstone_gc_state_snapshot) {
    auto timeout_schema = schema_builder("test", "table_gc_mode_timeout")
            .with_column("pk", utf8_type, column_kind::partition_key)
            .with_tombstone_gc_options(tombstone_gc_options({ {"mode", "timeout"} }))
            .set_gc_grace_seconds(10)
            .build();
    auto disabled_schema = schema_builder("test", "table_gc_mode_disabled")
            .with_column("pk", utf8_type, column_kind::partition_key)
            .with_tombstone_gc_options(tombstone_gc_options({ {"mode", "disabled"} }))
            .build();
    auto immediate_schema = schema_builder("test", "table_gc_mode_immediate")
            .with_column("pk", utf8_type, column_kind::partition_key)
            .with_tombstone_gc_options(tombstone_gc_options({ {"mode", "immediate"} }))
            .build();
    // Three repair-mode tables, each with a different propagation delay.
    auto repair1_schema = schema_builder("test", "table_gc_mode_repair1")
            .with_column("pk", utf8_type, column_kind::partition_key)
            .with_tombstone_gc_options(tombstone_gc_options({ {"mode", "repair"}, {"propagation_delay_in_seconds", "188"} }))
            .build();
    auto repair2_schema = schema_builder("test", "table_gc_mode_repair2")
            .with_column("pk", utf8_type, column_kind::partition_key)
            .with_tombstone_gc_options(tombstone_gc_options({ {"mode", "repair"}, {"propagation_delay_in_seconds", "288"} }))
            .build();
    auto repair3_schema = schema_builder("test", "table_gc_mode_repair3")
            .with_column("pk", utf8_type, column_kind::partition_key)
            .with_tombstone_gc_options(tombstone_gc_options({ {"mode", "repair"}, {"propagation_delay_in_seconds", "388"} }))
            .build();

    // The initializer must be registered before the group0 schema is built:
    // it marks test.table_gc_mode_group0 as a group0 table.
    schema_builder::register_schema_initializer([] (schema_builder& builder) {
        if (builder.ks_name() == "test" && builder.cf_name() == "table_gc_mode_group0") {
            builder.set_is_group0_table(true);
        }
    });
    auto group0_schema = schema_builder("test", "table_gc_mode_group0")
            .with_column("pk", utf8_type, column_kind::partition_key)
            .build();
    BOOST_REQUIRE(group0_schema->static_props().is_group0_table);

    // Every schema above has the same partition key layout, so a single key
    // can be reused for all of them.
    const auto pk = partition_key::from_single_value(*timeout_schema, utf8_type->decompose(data_value("pk")));
    const auto dk = dht::decorate_key(*timeout_schema, pk);
    // The repaired range ends at the test key's token.
    const auto repair_range = dht::token_range::make(dht::first_token(), dk.token());

    shared_tombstone_gc_state shared_gc_state;

    // Initial state: repair1 and repair2 were repaired 6h ago, which is also the
    // last group0 refresh time.
    const auto first_repair_time = gc_clock::now() - gc_clock::duration(std::chrono::hours(6));
    shared_gc_state.update_repair_time(repair1_schema->id(), repair_range, first_repair_time);
    shared_gc_state.update_repair_time(repair2_schema->id(), repair_range, first_repair_time);
    shared_gc_state.update_group0_refresh_time(first_repair_time);

    // Capture the initial state; the snapshot's query time must be ~now.
    auto before_snapshot = shared_gc_state.snapshot();
    BOOST_REQUIRE_LE(gc_clock::now() - before_snapshot.query_time(), gc_clock::duration(std::chrono::seconds(1)));

    // Move gc clock forward and change the gc state to simulate a later point
    // in time. gc-before computed against the shared state must reflect the new
    // state, while gc-before computed against the snapshot must still reflect
    // the old one.
    const auto now = gc_clock::now() + gc_clock::duration(std::chrono::hours(6));
    const auto live_gc_state = tombstone_gc_state(shared_gc_state).with_commitlog_check_disabled();

    // Later state: repair1 and repair3 were repaired 3h in the future,
    // repair2's history was dropped, and group0 was refreshed.
    const auto second_repair_time = gc_clock::now() + gc_clock::duration(std::chrono::hours(3));
    shared_gc_state.update_repair_time(repair1_schema->id(), repair_range, second_repair_time);
    shared_gc_state.drop_repair_history_for_table(repair2_schema->id());
    shared_gc_state.update_repair_time(repair3_schema->id(), repair_range, second_repair_time);
    shared_gc_state.update_group0_refresh_time(second_repair_time);

    // timeout mode: gc_grace_seconds before the query time.
    BOOST_REQUIRE_EQUAL(live_gc_state.get_gc_before_for_key(timeout_schema, dk, now), now - timeout_schema->gc_grace_seconds());
    BOOST_REQUIRE_EQUAL(before_snapshot.get_gc_before_for_key(timeout_schema, dk, false), before_snapshot.query_time() - timeout_schema->gc_grace_seconds());

    // disabled mode: the minimal time point, in both cases.
    BOOST_REQUIRE_EQUAL(live_gc_state.get_gc_before_for_key(disabled_schema, dk, now), gc_clock::time_point::min());
    BOOST_REQUIRE_EQUAL(before_snapshot.get_gc_before_for_key(disabled_schema, dk, false), gc_clock::time_point::min());

    // immediate mode: the query time itself.
    BOOST_REQUIRE_EQUAL(live_gc_state.get_gc_before_for_key(immediate_schema, dk, now), now);
    BOOST_REQUIRE_EQUAL(before_snapshot.get_gc_before_for_key(immediate_schema, dk, false), before_snapshot.query_time());

    // repair mode: the live state sees the second repair (and no history for
    // repair2); the snapshot sees the first repair (and no history for repair3).
    BOOST_REQUIRE_EQUAL(live_gc_state.get_gc_before_for_key(repair1_schema, dk, now), second_repair_time - repair1_schema->tombstone_gc_options().propagation_delay_in_seconds());
    BOOST_REQUIRE_EQUAL(live_gc_state.get_gc_before_for_key(repair2_schema, dk, now), gc_clock::time_point::min());
    BOOST_REQUIRE_EQUAL(live_gc_state.get_gc_before_for_key(repair3_schema, dk, now), second_repair_time - repair3_schema->tombstone_gc_options().propagation_delay_in_seconds());
    BOOST_REQUIRE_EQUAL(before_snapshot.get_gc_before_for_key(repair1_schema, dk, false), first_repair_time - repair1_schema->tombstone_gc_options().propagation_delay_in_seconds());
    BOOST_REQUIRE_EQUAL(before_snapshot.get_gc_before_for_key(repair2_schema, dk, false), first_repair_time - repair2_schema->tombstone_gc_options().propagation_delay_in_seconds());
    BOOST_REQUIRE_EQUAL(before_snapshot.get_gc_before_for_key(repair3_schema, dk, false), gc_clock::time_point::min());

    // group0 table: follows the group0 refresh time.
    BOOST_REQUIRE_EQUAL(live_gc_state.get_gc_before_for_key(group0_schema, dk, now), second_repair_time);
    BOOST_REQUIRE_EQUAL(before_snapshot.get_gc_before_for_key(group0_schema, dk, false), first_repair_time);
}
|
|
|
|
// Check max_purgeable::combine() against expected results. Also check that
// every tombstone one of the inputs refuses to purge is refused by the
// combined value too.
SEASTAR_TEST_CASE(test_max_purgeable_combine) {
    const gc_clock::time_point before_threshold = gc_clock::now();
    const gc_clock::time_point t1 = before_threshold + std::chrono::seconds(10);
    const gc_clock::time_point t2 = t1 + std::chrono::seconds(10);
    const gc_clock::time_point after_threshold = t2 + std::chrono::seconds(10);

    // Checks mp.can_purge(t) against the expectation. When `mp` refuses to
    // purge `t`, `combined` must refuse as well.
    auto check_tombstone = [] (const max_purgeable& mp, const max_purgeable& combined, tombstone t, bool expected_can_purge, std::source_location loc = std::source_location::current()) {
        testlog.trace("check_tombstone({}, {}, {}, {}) @ {}:{}", mp, combined, t, expected_can_purge, loc.file_name(), loc.line());
        BOOST_REQUIRE_EQUAL(expected_can_purge, mp.can_purge(t).can_purge);
        if (expected_can_purge) {
            return;
        }
        // Only the negative case is checked against `combined`: the combined
        // max_purgeable can be weaker than this input if the other input had
        // no expiry threshold.
        BOOST_REQUIRE(!combined.can_purge(t).can_purge);
    };

    // Probes `mp` (and `combined`) with tombstones just below and above its
    // timestamp, on both sides of the expiry threshold.
    auto check_tombstones = [&] (const max_purgeable& mp, const max_purgeable& combined) {
        if (!mp) {
            // An empty max_purgeable is expected to allow purging.
            check_tombstone(mp, combined, {1, after_threshold}, true);
            return;
        }
        check_tombstone(mp, combined, {mp.timestamp() - 1, after_threshold}, true);
        check_tombstone(mp, combined, {mp.timestamp() + 1, after_threshold}, false);
        if (mp.expiry_threshold()) {
            // Tombstones with a deletion time before the expiry threshold are
            // expected to be purgeable regardless of their timestamp.
            check_tombstone(mp, combined, {mp.timestamp() - 1, before_threshold}, true);
            check_tombstone(mp, combined, {mp.timestamp() + 1, before_threshold}, true);
        }
    };

    // Combines `a` with `b`, compares against `expected`, then probes both inputs.
    auto check = [&] (const max_purgeable& a, const max_purgeable& b, const max_purgeable& expected, std::source_location loc = std::source_location::current()) {
        auto combined = a;
        combined.combine(b);

        testlog.debug("combine({}, {}) => {} == {} @ {}:{}", a, b, combined, expected, loc.file_name(), loc.line());

        BOOST_REQUIRE(a || b);
        BOOST_REQUIRE_EQUAL(combined, expected);

        check_tombstones(a, combined);
        check_tombstones(b, combined);
    };

    // Timestamp only: the smaller timestamp wins.
    check({}, max_purgeable{100}, max_purgeable{100});
    check(max_purgeable{100}, {}, max_purgeable{100});
    check(max_purgeable{10}, max_purgeable{100}, max_purgeable{10});

    const auto from_memtable = max_purgeable::timestamp_source::memtable_possibly_shadowing_data;
    const auto from_sstables = max_purgeable::timestamp_source::other_sstables_possibly_shadowing_data;

    // With a timestamp source: the source travels with the winning timestamp.
    check({}, max_purgeable{100, from_memtable}, max_purgeable{100, from_memtable});
    check(max_purgeable{100, from_memtable}, {}, max_purgeable{100, from_memtable});
    check(max_purgeable{10, from_sstables}, max_purgeable{100, from_memtable}, max_purgeable{10, from_sstables});

    // With expiry thresholds.
    check({}, max_purgeable{100, t1, from_memtable}, max_purgeable{100, t1, from_memtable});
    check(max_purgeable{10, from_memtable}, max_purgeable{100, t1, from_sstables}, max_purgeable{10, from_memtable});
    check(max_purgeable{100, from_memtable}, max_purgeable{10, t1, from_sstables}, max_purgeable{10, from_sstables});
    check(max_purgeable{10, t1, from_memtable}, max_purgeable{100, t2, from_sstables}, max_purgeable{10, t1, from_memtable});
    check(max_purgeable{100, t1, from_memtable}, max_purgeable{10, t2, from_sstables}, max_purgeable{10, t1, from_sstables});

    return make_ready_future<>();
}
|
|
|
|
// Check max_purgeable::can_purge() for tombstones below/at/above the purgeable
// timestamp, with and without an expiry threshold. The reported timestamp
// source must always be the max_purgeable's own.
SEASTAR_TEST_CASE(test_max_purgeable_can_purge) {
    const gc_clock::time_point before_threshold = gc_clock::now();
    const gc_clock::time_point t1 = before_threshold + std::chrono::seconds(10);
    const gc_clock::time_point after_threshold = t1 + std::chrono::seconds(10);

    const auto from_memtable = max_purgeable::timestamp_source::memtable_possibly_shadowing_data;
    const auto from_sstables = max_purgeable::timestamp_source::other_sstables_possibly_shadowing_data;

    auto check = [] (const max_purgeable& mp, tombstone t, bool expected_can_gc) {
        const auto verdict = mp.can_purge(t);
        BOOST_REQUIRE_EQUAL(verdict.can_purge, expected_can_gc);
        BOOST_REQUIRE_EQUAL(verdict.timestamp_source, mp.source());
    };

    // An empty max_purgeable allows purging.
    check({}, {100, t1}, true);

    // No expiry threshold: purgeable only if the tombstone is strictly older.
    check(max_purgeable{10, from_memtable}, tombstone{100, after_threshold}, false);
    check(max_purgeable{100, from_sstables}, tombstone{100, after_threshold}, false);
    check(max_purgeable{200, from_sstables}, tombstone{100, after_threshold}, true);

    // Tombstone deleted after the expiry threshold: the timestamp decides.
    check(max_purgeable{10, t1, from_sstables}, tombstone{100, after_threshold}, false);
    check(max_purgeable{100, t1, from_sstables}, tombstone{100, after_threshold}, false);
    check(max_purgeable{200, t1, from_sstables}, tombstone{100, after_threshold}, true);

    // Tombstone deleted before the expiry threshold: always purgeable.
    check(max_purgeable{10, t1, from_sstables}, tombstone{100, before_threshold}, true);
    check(max_purgeable{100, t1, from_sstables}, tombstone{100, before_threshold}, true);
    check(max_purgeable{200, t1, from_sstables}, tombstone{100, before_threshold}, true);

    return make_ready_future<>();
}
|
|
|
|
// Delete a row in a repair-mode table with zero propagation delay, mark the
// whole ring as repaired in the future, and check that queries drop the
// tombstone. Two paths are checked: a direct table mutation query on the
// owning shard, and query_mutations_on_all_shards(). Neither may return any
// partition.
SEASTAR_TEST_CASE(test_query_tombstone_gc) {
    return do_with_cql_env_thread([] (cql_test_env& env) {
        const auto keyspace_name = get_name();
        const auto table_name = "tbl";

        // Can use tablets and RF=1 after #21623 is fixed.
        env.execute_cql(std::format("CREATE KEYSPACE {} WITH"
                " replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}} AND"
                " tablets = {{'enabled': 'false'}}", keyspace_name)).get();
        env.execute_cql(std::format("CREATE TABLE {}.{} (pk int, ck int, v int, PRIMARY KEY (pk, ck))"
                " WITH compaction = {{'class': 'NullCompactionStrategy'}}"
                " AND tombstone_gc = {{'mode': 'repair', 'propagation_delay_in_seconds': 0}}", keyspace_name, table_name)).get();

        auto& db = env.local_db();
        auto& tbl = db.find_column_family(keyspace_name, table_name);
        const auto schema = tbl.schema();
        const auto tid = schema->id();

        const auto pk_value = 1;
        const auto pk = partition_key::from_exploded(*schema, {data_value(pk_value).serialize_nonnull()});
        const auto dk = dht::decorate_key(*schema, pk);
        const auto key_shard = tbl.shard_for_reads(dk.token());

        // Write a row tombstone into the partition identified by pk_value; the
        // format string now actually uses pk_value instead of a hard-coded key.
        env.execute_cql(format("DELETE FROM {}.{} WHERE pk = {} AND ck = 1", keyspace_name, table_name, pk_value)).get();

        // NOTE(review): presumably flushed so that the commitlog does not hold
        // back purging of the tombstone -- confirm.
        env.db().invoke_on(key_shard, [] (replica::database& db) {
            return db.flush_commitlog();
        }).get();

        // Record a repair of the entire ring one hour in the future on all shards.
        const auto repair_range = dht::token_range::make(dht::first_token(), dht::last_token());
        const auto repair_time = gc_clock::now() + gc_clock::duration(std::chrono::hours(1));
        env.db().invoke_on_all([tid, &repair_range, &repair_time] (replica::database& db) {
            auto& tbl = db.find_column_family(tid);
            tbl.get_compaction_manager().get_shared_tombstone_gc_state().update_repair_time(tid, repair_range, repair_time);
        }).get();

        testlog.info("repair_time: {}", repair_time);

        auto slice = partition_slice_builder(*schema, schema->full_slice())
            .with_option<query::partition_slice::option::bypass_cache>()
            .build();
        const auto cmd = query::read_command(schema->id(), schema->version(), slice, db.get_query_max_result_size(), query::tombstone_limit::max);
        const auto pr = dht::partition_range::make_singular(dk);

        // Direct mutation query on the shard owning the key.
        env.db().invoke_on(key_shard, [tid, &cmd, &pr] (replica::database& db) -> future<> {
            auto& tbl = db.find_column_family(tid);
            const auto schema = tbl.schema();
            auto permit = co_await db.obtain_reader_permit(tbl, "read", db::no_timeout, {});
            auto accounter = co_await db.get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, query::short_read::no);

            const auto res = co_await tbl.mutation_query(schema, std::move(permit), cmd, pr, {}, std::move(accounter), db::no_timeout);
            BOOST_CHECK_EQUAL(res.partitions().size(), 0);
        }).get();

        // Multi-shard mutation query.
        {
            const auto res = replica::query_mutations_on_all_shards(env.db(), schema, cmd, {pr}, {}, db::no_timeout).get();
            BOOST_CHECK_EQUAL(std::get<0>(res)->partitions().size(), 0);
        }
    });
}
|
|
|
|
SEASTAR_TEST_CASE(test_tombstone_gc_state_gc_mode) {
    // Sub-range used for the range-based gc_before query. The repair time
    // registered below covers the whole token ring, so this range is fully known.
    const auto compact_range = dht::token_range::make(dht::token(100), dht::token(200));

    // Asserts that `gc_state` reports `want` as the gc_before point both for the
    // single key `dk` and, as both min and max, for `compact_range`.
    auto expect_gc_before = [&] (tombstone_gc_state gc_state, schema_ptr s, dht::decorated_key dk, gc_clock::time_point now,
            gc_clock::time_point want, std::source_location loc = std::source_location::current()) {
        testlog.info("check() @ {}:{}", loc.file_name(), loc.line());

        const auto key_gc_before = gc_state.get_gc_before_for_key(s, dk, now);
        BOOST_REQUIRE_EQUAL(key_gc_before, want);

        const auto range_res = gc_state.get_gc_before_for_range(s, compact_range, now);
        BOOST_REQUIRE_EQUAL(range_res.min_gc_before, want);
        BOOST_REQUIRE_EQUAL(range_res.max_gc_before, want);
        BOOST_REQUIRE_EQUAL(range_res.knows_entire_range, true);
    };

    shared_tombstone_gc_state shared_state;

    for (const auto mode : {tombstone_gc_mode::timeout, tombstone_gc_mode::disabled, tombstone_gc_mode::immediate, tombstone_gc_mode::repair}) {
        auto s = schema_builder("ks", "tbl")
                .with_column("pk", int32_type, column_kind::partition_key)
                .with_tombstone_gc_options(tombstone_gc_options({{"mode", fmt::to_string(mode)}}))
                .build();

        // Pretend the whole ring was repaired six hours ago.
        const auto full_ring = dht::token_range::make(dht::first_token(), dht::last_token());
        const auto last_repair = gc_clock::now() - gc_clock::duration(std::chrono::hours(6));
        shared_state.update_repair_time(s->id(), full_ring, last_repair);

        const auto now = gc_clock::now();

        const auto dk = dht::decorate_key(*s, partition_key::from_single_value(*s, serialized(1)));

        // The no_gc() and gc_all() states ignore the schema's gc mode entirely.
        expect_gc_before(tombstone_gc_state::no_gc(), s, dk, now, gc_clock::time_point::min());
        expect_gc_before(tombstone_gc_state::gc_all(), s, dk, now, gc_clock::time_point::max());

        // A state backed by shared_state follows the mode configured on the schema.
        switch (mode) {
        case tombstone_gc_mode::timeout:
            expect_gc_before(tombstone_gc_state(shared_state, false), s, dk, now, now - std::chrono::seconds(s->gc_grace_seconds()));
            break;
        case tombstone_gc_mode::disabled:
            expect_gc_before(tombstone_gc_state(shared_state, false), s, dk, now, gc_clock::time_point::min());
            break;
        case tombstone_gc_mode::immediate:
            expect_gc_before(tombstone_gc_state(shared_state, false), s, dk, now, now);
            break;
        case tombstone_gc_mode::repair:
            expect_gc_before(tombstone_gc_state(shared_state, false), s, dk, now, last_repair - s->tombstone_gc_options().propagation_delay_in_seconds());
            break;
        }
    }

    return make_ready_future<>();
}
|
|
|
|
// Verifies that flushing a table whose active memtable is empty does not
// complete while an earlier flush of the same table is still in progress:
// the second flush must wait behind the first one. The first flush is held
// in place with the "flush_memtable_to_sstable_wait" error injection.
SEASTAR_TEST_CASE(test_flush_empty_table_waits_on_outstanding_flush) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
    testlog.debug("Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev).\n");
    return make_ready_future();
#endif
    return do_with_some_data_in_thread({"cf"}, [] (cql_test_env& e) {
        // Runs on every shard. Shards with nothing to flush for ks.cf report
        // `false`; the reduction tells whether at least one shard ran the check.
        // The coroutine lambda deliberately captures nothing, so no captured
        // state can dangle while the coroutine is suspended.
        auto found = e.db().map_reduce0([] (replica::database& db) -> future<bool> {
            auto& cf = db.find_column_family("ks", "cf");

            if (!cf.needs_flush()) {
                co_return false;
            }

            // Block the first flush inside flush_memtable_to_sstable until we
            // send the injection a message.
            utils::get_local_injector().enable("flush_memtable_to_sstable_wait");

            auto flushed_0 = cf.flush();
            if (flushed_0.available()) {
                testlog.error("Table flush completed too early");
                BOOST_REQUIRE(!flushed_0.available());
            }

            if (!cf.needs_flush()) {
                testlog.error("Expecting needs_flush when waiting in flush_memtable_to_sstable_wait");
                BOOST_REQUIRE(cf.needs_flush());
            }

            // Now flush again when the active memtable is empty.
            // Expect that this waits on the ongoing flush
            auto flushed_1 = cf.flush();

            // While flush_0 is blocked, flush_1 should be blocked behind it
            if (flushed_0.available()) {
                testlog.error("First table flush expected to be blocked on injected wait");
                BOOST_REQUIRE(!flushed_0.available());
            }
            if (flushed_1.available()) {
                testlog.error("Second table flush expected to be blocked behind first flush");
                BOOST_REQUIRE(!flushed_1.available());
            }
            if (!cf.needs_flush()) {
                testlog.error("Expecting needs_flush when waiting in second flush");
                BOOST_REQUIRE(cf.needs_flush());
            }

            // Release the blocked flush and wait for both flushes to finish.
            utils::get_local_injector().receive_message("flush_memtable_to_sstable_wait");
            co_await std::move(flushed_0);
            co_await std::move(flushed_1);

            // Do not leave the injection armed for later flushes on this shard
            // (e.g. during environment teardown or in subsequent tests).
            utils::get_local_injector().disable("flush_memtable_to_sstable_wait");

            if (cf.needs_flush()) {
                testlog.error("Table is not expected to need flush after flush completed");
                BOOST_REQUIRE(!cf.needs_flush());
            }

            co_return true;
        }, false, std::logical_or<bool>()).get();
        BOOST_REQUIRE(found);
    });
}
|
|
|
|
struct scoped_execption_log_level {
|
|
scoped_execption_log_level() {
|
|
smp::invoke_on_all([] {
|
|
global_logger_registry().set_logger_level("exception", log_level::debug);
|
|
}).get();
|
|
}
|
|
~scoped_execption_log_level() {
|
|
smp::invoke_on_all([] {
|
|
global_logger_registry().set_logger_level("exception", log_level::info);
|
|
}).get();
|
|
}
|
|
};
|
|
|
|
// Checks that replica-side reads which time out (either while reading from
// disk or while queued on the reader concurrency semaphore) fail with
// read_timeout_exception without throwing any C++ exception on the way.
SEASTAR_TEST_CASE(replica_read_timeout_no_exception) {
    cql_test_config cfg;
    // A preemptive abort factor of 0 keeps the admission-time "may this read
    // still finish in time?" rejection out of the picture, so the reads below
    // go through the regular timeout path.
    cfg.db_config->reader_concurrency_semaphore_preemptive_abort_factor.set(0.0);

    // Reads get a tiny timeout; writes get plenty of time so setup never times out.
    const auto short_timeout = 10ms;
    const auto long_timeout = 10s;
    cfg.query_timeout.emplace(timeout_config{
        .read_timeout = short_timeout,
        .write_timeout = long_timeout,
        .range_read_timeout = short_timeout,
        .counter_write_timeout = long_timeout,
        .truncate_timeout = long_timeout,
        .cas_timeout = long_timeout,
        .other_timeout = short_timeout});

    return do_with_cql_env_thread([] (cql_test_env& e) {
        const sstring ks_name = get_name();
        const sstring tbl_name = "tbl";

        // Disable tablets because we want to exercise the legacy range-scan path too.
        // With tablets, only the table::query() path is used. Vnodes cover both.
        e.execute_cql(format("CREATE KEYSPACE {} WITH"
                " replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND"
                " tablets = {{'enabled': 'false'}}", ks_name)).get();
        e.execute_cql(format("CREATE TABLE {}.{} (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH compaction = {{'class': 'NullCompactionStrategy'}}", ks_name, tbl_name)).get();

        // Rows ck=0 and ck=1 are each flushed on their own; ck=2 stays in the memtable.
        for (int ck = 0; ck < 3; ++ck) {
            e.execute_cql(format("INSERT INTO {}.{} (pk, ck, v) VALUES (1, {}, {})", ks_name, tbl_name, ck, ck)).get();
            if (ck < 2) {
                replica::database::flush_table_on_all_shards(e.db(), ks_name, tbl_name).get();
            }
        }

        // One successful read, so the auth cache is populated.
        e.execute_cql(format("SELECT * FROM {}.{} WHERE pk = 1", ks_name, tbl_name)).get();

        // Total number of C++ exceptions thrown so far, summed over all shards.
        auto count_cxx_exceptions = [&e] {
            return e.db().map_reduce0([] (replica::database&) { return engine().cxx_exceptions(); }, 0, std::plus<size_t>()).get();
        };

        auto full_scan_id = e.prepare(format("SELECT * FROM {}.{} BYPASS CACHE", ks_name, tbl_name)).get();
        auto partition_scan_id = e.prepare(format("SELECT * FROM {}.{} WHERE pk = 1 BYPASS CACHE", ks_name, tbl_name)).get();

        // Fires a batch of concurrent reads, requires each to fail with
        // read_timeout_exception and requires the exception counter to be unchanged.
        auto execute_test = [&] (const char* test_name, bool scan) {
            testlog.info("Executing test: {} scan={}", test_name, scan);

            const auto exceptions_at_start = count_cxx_exceptions();
            const size_t num_reads = 1000;

            scoped_execption_log_level exception_log_level_scope;

            const auto stmt_id = scan ? full_scan_id : partition_scan_id;

            testlog.info("Executing {} {}", num_reads, scan ? "full scans" : "partition scans");

            std::vector<future<>> pending;
            pending.reserve(num_reads);
            for (size_t i = 0; i < num_reads; ++i) {
                pending.emplace_back(e.execute_prepared(stmt_id, {}).discard_result());
            }

            // Drain newest-first: every read must fail with a timeout.
            for (auto it = pending.rbegin(); it != pending.rend(); ++it) {
                it->wait();
                BOOST_REQUIRE(it->failed());
                auto ex = it->get_exception();
                BOOST_REQUIRE(try_catch<exceptions::read_timeout_exception>(ex));
            }

            const auto exceptions_at_end = count_cxx_exceptions();
            testlog.info("Exceptions: before={}, after={}", exceptions_at_start, exceptions_at_end);
            BOOST_REQUIRE_EQUAL(exceptions_at_end, exceptions_at_start);
        };

        // Test 1: execute reads that reach the disk but time out while reading from the disk.
        // Ensure no exceptions are thrown, but the reads fail with a read timeout exception.
        if constexpr (std::is_same_v<utils::error_injection_type, utils::error_injection<true>>) {
            utils::get_local_injector().enable("sstables_mx_reader_fill_buffer_timeout", false, {{"table", format("{}.{}", ks_name, tbl_name)}});
            execute_test("disk reads", false);
            execute_test("disk reads", true);
            utils::get_local_injector().disable("sstables_mx_reader_fill_buffer_timeout");
        }

        // Test 2: execute reads that get queued on the semaphore and time out while waiting for the permit.
        // Ensure no exceptions are thrown, but the reads fail with a read timeout exception.
        {
            // Shrink each shard's semaphore to {1, 1} resources and hold them with a
            // dummy permit, so every subsequent read has to wait in the queue.
            std::vector<foreign_ptr<std::unique_ptr<reader_permit>>> dummy_permits(smp::count);
            e.db().invoke_on_all([&] (replica::database& db) -> future<> {
                auto& sem = db.get_reader_concurrency_semaphore();
                sem.set_resources({1, 1});
                auto permit = co_await sem.obtain_permit(
                        db.find_column_family(ks_name, tbl_name).schema(),
                        "dummy",
                        1,
                        db::no_timeout,
                        {});
                dummy_permits[this_shard_id()] = std::make_unique<reader_permit>(std::move(permit));
            }).get();

            execute_test("queued reads", false);
            execute_test("queued reads", true);
        }
    }, cfg);
}
|
|
|
|
BOOST_AUTO_TEST_SUITE_END()
|