Introduce a utility helper to set the first and last decorated keys on an SSTable. This is intended for testing purposes, making it easier to construct SSTables with defined boundaries in unit tests.
267 lines
10 KiB
C++
267 lines
10 KiB
C++
/*
|
|
* Copyright (C) 2017-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include <optional>
|
|
|
|
#include "sstables/sstables.hh"
|
|
#include "sstables/shared_sstable.hh"
|
|
#include "sstables/index_reader.hh"
|
|
#include "sstables/writer.hh"
|
|
#include "compaction/compaction_manager.hh"
|
|
#include "replica/memtable-sstable.hh"
|
|
#include "test/lib/test_services.hh"
|
|
#include "test/lib/sstable_test_env.hh"
|
|
#include "test/lib/reader_concurrency_semaphore.hh"
|
|
#include "db_clock.hh"
|
|
#include <seastar/core/coroutine.hh>
|
|
|
|
using namespace sstables;
|
|
using namespace std::chrono_literals;
|
|
|
|
using validate = bool_class<struct validate_tag>;
|
|
// Must be called in a seastar thread.
|
|
sstables::shared_sstable make_sstable_containing(std::function<sstables::shared_sstable()> sst_factory, lw_shared_ptr<replica::memtable> mt);
|
|
sstables::shared_sstable make_sstable_containing(sstables::shared_sstable sst, lw_shared_ptr<replica::memtable> mt);
|
|
sstables::shared_sstable make_sstable_containing(std::function<sstables::shared_sstable()> sst_factory, utils::chunked_vector<mutation> muts, validate do_validate = validate::yes);
|
|
sstables::shared_sstable make_sstable_containing(sstables::shared_sstable sst, utils::chunked_vector<mutation> muts, validate do_validate = validate::yes);
|
|
|
|
namespace sstables {
|
|
|
|
using sstable_ptr = shared_sstable;
|
|
|
|
class test {
|
|
protected:
|
|
sstable_ptr _sst;
|
|
public:
|
|
|
|
test(sstable_ptr s) : _sst(s) {}
|
|
|
|
summary& _summary() {
|
|
return _sst->_components->summary;
|
|
}
|
|
|
|
std::unique_ptr<index_reader> make_index_reader(reader_permit permit) {
|
|
return std::make_unique<index_reader>(_sst, std::move(permit));
|
|
}
|
|
|
|
struct index_entry {
|
|
sstables::key sstables_key;
|
|
partition_key key;
|
|
uint64_t promoted_index_size;
|
|
|
|
key_view get_key() const {
|
|
return sstables_key;
|
|
}
|
|
};
|
|
|
|
future<std::vector<index_entry>> read_indexes(reader_permit permit) {
|
|
std::vector<index_entry> entries;
|
|
auto s = _sst->get_schema();
|
|
auto ir = make_index_reader(std::move(permit));
|
|
std::exception_ptr err = nullptr;
|
|
try {
|
|
while (!ir->eof()) {
|
|
co_await ir->read_partition_data();
|
|
// In general the index might not be able to return the key,
|
|
// but this helper is only used for tests of indexes which are able to do that.
|
|
auto pk = ir->get_partition_key().value();
|
|
entries.emplace_back(index_entry{sstables::key::from_partition_key(*s, pk),
|
|
pk, ir->get_promoted_index_size()});
|
|
co_await ir->advance_to_next_partition();
|
|
}
|
|
} catch (...) {
|
|
err = std::current_exception();
|
|
}
|
|
co_await ir->close();
|
|
if (err) {
|
|
co_return coroutine::exception(std::move(err));
|
|
}
|
|
co_return entries;
|
|
}
|
|
|
|
future<> read_statistics() {
|
|
return _sst->read_statistics();
|
|
}
|
|
|
|
future<> read_summary() noexcept {
|
|
return _sst->read_summary();
|
|
}
|
|
|
|
future<> read_toc() {
|
|
return _sst->read_toc();
|
|
}
|
|
|
|
future<> read_filter() {
|
|
return _sst->read_filter();
|
|
}
|
|
|
|
future<summary_entry&> read_summary_entry(size_t i) {
|
|
return _sst->read_summary_entry(i);
|
|
}
|
|
|
|
auto& get_components() {
|
|
return _sst->_recognized_components;
|
|
}
|
|
|
|
void set_data_file_size(uint64_t size) {
|
|
_sst->_data_file_size = size;
|
|
}
|
|
|
|
void set_data_file_write_time(db_clock::time_point wtime) {
|
|
_sst->_data_file_write_time = wtime;
|
|
}
|
|
|
|
void set_run_identifier(sstables::run_id identifier) {
|
|
_sst->_run_identifier = identifier;
|
|
}
|
|
|
|
future<sstable_ptr> store(sstring dir, sstables::generation_type generation) {
|
|
_sst->_generation = generation;
|
|
co_await _sst->_storage->change_dir_for_test(dir);
|
|
_sst->_recognized_components.erase(component_type::Index);
|
|
_sst->_recognized_components.erase(component_type::Data);
|
|
co_await seastar::async([sst = _sst] {
|
|
sst->open_sstable("test");
|
|
sst->write_statistics();
|
|
sst->write_compression();
|
|
sst->write_filter();
|
|
sst->write_summary();
|
|
sst->seal_sstable(false).get();
|
|
});
|
|
co_return _sst;
|
|
}
|
|
|
|
// Used to create synthetic sstables for testing leveled compaction strategy.
|
|
void set_values_for_leveled_strategy(uint64_t fake_data_size, uint32_t sstable_level, int64_t max_timestamp, const partition_key& first_key, const partition_key& last_key) {
|
|
// Create a synthetic stats metadata
|
|
stats_metadata stats = {};
|
|
// leveled strategy sorts sstables by age using max_timestamp, let's set it to 0.
|
|
stats.max_timestamp = max_timestamp;
|
|
stats.sstable_level = sstable_level;
|
|
|
|
set_values(first_key, last_key, std::move(stats), fake_data_size);
|
|
}
|
|
|
|
void set_values(const partition_key& first_key, const partition_key& last_key, stats_metadata stats, uint64_t data_file_size = 1) {
|
|
_sst->_data_file_size = data_file_size;
|
|
_sst->_index_file_size = std::max(1UL, uint64_t(data_file_size * 0.1));
|
|
_sst->_metadata_size_on_disk = std::max(1UL, uint64_t(data_file_size * 0.01));
|
|
// scylla component must be present for a sstable to be considered fully expired.
|
|
_sst->_recognized_components.insert(component_type::Scylla);
|
|
_sst->_components->statistics.contents[metadata_type::Stats] = std::make_unique<stats_metadata>(std::move(stats));
|
|
_sst->_first = dht::decorate_key(*_sst->_schema, first_key);
|
|
_sst->_last = dht::decorate_key(*_sst->_schema, last_key);
|
|
_sst->_components->statistics.contents[metadata_type::Compaction] = std::make_unique<compaction_metadata>();
|
|
_sst->_run_identifier = run_id::create_random_id();
|
|
_sst->_shards.push_back(this_shard_id());
|
|
}
|
|
|
|
void set_first_and_last_keys(const dht::decorated_key& first_key, const dht::decorated_key& last_key) {
|
|
_sst->_first = first_key;
|
|
_sst->_last = last_key;
|
|
}
|
|
|
|
void rewrite_toc_without_component(component_type component) {
|
|
SCYLLA_ASSERT(component != component_type::TOC);
|
|
_sst->_recognized_components.erase(component);
|
|
remove_file(fmt::to_string(_sst->toc_filename())).get();
|
|
_sst->_storage->open(*_sst);
|
|
_sst->seal_sstable(false).get();
|
|
}
|
|
|
|
future<> remove_component(component_type c) {
|
|
return remove_file(fmt::to_string(_sst->filename(c)));
|
|
}
|
|
|
|
fs::path filename(component_type c) const {
|
|
return fs::path(fmt::to_string(_sst->filename(c)));
|
|
}
|
|
|
|
void set_shards(std::vector<unsigned> shards) {
|
|
_sst->_shards = std::move(shards);
|
|
}
|
|
|
|
static future<> create_links(const sstable& sst, const sstring& dir) {
|
|
return sst._storage->create_links(sst, std::filesystem::path(dir));
|
|
}
|
|
|
|
future<> move_to_new_dir(sstring new_dir, generation_type new_generation) {
|
|
co_await _sst->_storage->move(*_sst, std::move(new_dir), new_generation, nullptr);
|
|
_sst->_generation = std::move(new_generation);
|
|
}
|
|
|
|
void create_bloom_filter(uint64_t estimated_partitions, double max_false_pos_prob = 0.1) {
|
|
_sst->_components->filter = utils::i_filter::get_filter(estimated_partitions, max_false_pos_prob, utils::filter_format::m_format);
|
|
_sst->_total_reclaimable_memory.reset();
|
|
}
|
|
|
|
void write_filter() {
|
|
_sst->_recognized_components.insert(component_type::Filter);
|
|
_sst->write_filter();
|
|
}
|
|
|
|
size_t total_reclaimable_memory_size() const {
|
|
return _sst->total_reclaimable_memory_size();
|
|
}
|
|
|
|
size_t reclaim_memory_from_components() {
|
|
return _sst->reclaim_memory_from_components();
|
|
}
|
|
|
|
void reload_reclaimed_components() {
|
|
_sst->reload_reclaimed_components().get();
|
|
}
|
|
|
|
const utils::filter_ptr& get_filter() const {
|
|
return _sst->_components->filter;
|
|
}
|
|
|
|
void set_digest(std::optional<uint32_t> digest) {
|
|
_sst->_components->digest = digest;
|
|
}
|
|
};
|
|
|
|
inline auto replacer_fn_no_op() {
|
|
return [](compaction::compaction_completion_desc desc) -> void {};
|
|
}
|
|
|
|
template<typename AsyncAction>
|
|
requires requires (AsyncAction aa, sstables::sstable::version_types& c) { { aa(c) } -> std::same_as<future<>>; }
|
|
inline
|
|
future<> for_each_sstable_version(AsyncAction action) {
|
|
return seastar::do_for_each(all_sstable_versions, std::move(action));
|
|
}
|
|
|
|
} // namespace sstables
|
|
|
|
using can_purge_tombstones = compaction::compaction_manager::can_purge_tombstones;
|
|
future<> run_compaction_task(test_env&, sstables::run_id output_run_id, compaction::compaction_group_view& table_s, noncopyable_function<future<> (compaction::compaction_data&)> job);
|
|
future<compaction::compaction_result> compact_sstables(test_env& env, compaction::compaction_descriptor descriptor, table_for_tests t,
|
|
std::function<shared_sstable()> creator, compaction::compaction_sstable_replacer_fn replacer = sstables::replacer_fn_no_op(),
|
|
can_purge_tombstones can_purge = can_purge_tombstones::yes);
|
|
|
|
// make_sstable_easy() overloads that take an explicit generation; only the
// declarations are visible here. The content comes from a mutation_reader or
// from a memtable. The trailing db_clock::time_point defaults to
// db_clock::now().
shared_sstable make_sstable_easy(
        test_env& env,
        mutation_reader rd,
        sstable_writer_config cfg,
        sstables::generation_type gen,
        const sstable::version_types version = sstables::get_highest_sstable_version(),
        int expected_partition = 1,
        db_clock::time_point = db_clock::now());

shared_sstable make_sstable_easy(
        test_env& env,
        lw_shared_ptr<replica::memtable> mt,
        sstable_writer_config cfg,
        sstables::generation_type gen,
        const sstable::version_types v = sstables::get_highest_sstable_version(),
        int estimated_partitions = 1,
        db_clock::time_point = db_clock::now());
|
|
|
|
|
|
// Convenience overload: forwards to the generation-taking mutation_reader
// overload, drawing the generation from env.new_generation().
//
// `query_time` is passed on as that overload's trailing db_clock::time_point
// argument. It defaults to db_clock::now(), the same default the target
// overload declares, and mirrors the parameter of the memtable-based
// convenience overload, so calls that omit it behave as before.
inline shared_sstable make_sstable_easy(test_env& env, mutation_reader rd, sstable_writer_config cfg,
        const sstable::version_types version = sstables::get_highest_sstable_version(), int expected_partition = 1,
        db_clock::time_point query_time = db_clock::now()) {
    return make_sstable_easy(env, std::move(rd), std::move(cfg), env.new_generation(), version, expected_partition, query_time);
}
|
|
// Convenience overload: forwards to the generation-taking memtable overload,
// using a generation obtained from env.new_generation(). All other arguments,
// including `query_time`, are passed through unchanged.
inline shared_sstable make_sstable_easy(
        test_env& env,
        lw_shared_ptr<replica::memtable> mt,
        sstable_writer_config cfg,
        const sstable::version_types version = sstables::get_highest_sstable_version(),
        int estimated_partitions = 1,
        db_clock::time_point query_time = db_clock::now()) {
    auto fresh_generation = env.new_generation();
    return make_sstable_easy(env, std::move(mt), std::move(cfg), std::move(fresh_generation),
            version, estimated_partitions, query_time);
}
|
|
|
|
lw_shared_ptr<replica::memtable> make_memtable(schema_ptr s, const utils::chunked_vector<mutation>& muts);
|
|
std::vector<replica::memtable*> active_memtables(replica::table& t);
|