/* * Copyright (C) 2017-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #pragma once #include #include "sstables/sstables.hh" #include "sstables/shared_sstable.hh" #include "sstables/index_reader.hh" #include "sstables/writer.hh" #include "compaction/compaction_manager.hh" #include "replica/memtable-sstable.hh" #include "test/lib/test_services.hh" #include "test/lib/sstable_test_env.hh" #include "test/lib/reader_concurrency_semaphore.hh" #include "db_clock.hh" #include using namespace sstables; using namespace std::chrono_literals; using validate = bool_class; // Must be called in a seastar thread. sstables::shared_sstable make_sstable_containing(std::function sst_factory, lw_shared_ptr mt); sstables::shared_sstable make_sstable_containing(sstables::shared_sstable sst, lw_shared_ptr mt); sstables::shared_sstable make_sstable_containing(std::function sst_factory, utils::chunked_vector muts, validate do_validate = validate::yes); sstables::shared_sstable make_sstable_containing(sstables::shared_sstable sst, utils::chunked_vector muts, validate do_validate = validate::yes); namespace sstables { using sstable_ptr = shared_sstable; class test { protected: sstable_ptr _sst; public: test(sstable_ptr s) : _sst(s) {} summary& _summary() { return _sst->_components->summary; } std::unique_ptr make_index_reader(reader_permit permit) { return std::make_unique(_sst, std::move(permit)); } struct index_entry { sstables::key sstables_key; partition_key key; uint64_t promoted_index_size; key_view get_key() const { return sstables_key; } }; future> read_indexes(reader_permit permit) { std::vector entries; auto s = _sst->get_schema(); auto ir = make_index_reader(std::move(permit)); std::exception_ptr err = nullptr; try { while (!ir->eof()) { co_await ir->read_partition_data(); // In general the index might not be able to return the key, // but this helper is only used for tests of indexes which are able to do that. auto pk = ir->get_partition_key().value(); entries.emplace_back(index_entry{sstables::key::from_partition_key(*s, pk), pk, ir->get_promoted_index_size()}); co_await ir->advance_to_next_partition(); } } catch (...) { err = std::current_exception(); } co_await ir->close(); if (err) { co_return coroutine::exception(std::move(err)); } co_return entries; } future<> read_statistics() { return _sst->read_statistics(); } future<> read_summary() noexcept { return _sst->read_summary(); } future<> read_toc() { return _sst->read_toc(); } future<> read_filter() { return _sst->read_filter(); } future read_summary_entry(size_t i) { return _sst->read_summary_entry(i); } auto& get_components() { return _sst->_recognized_components; } void set_data_file_size(uint64_t size) { _sst->_data_file_size = size; } void set_data_file_write_time(db_clock::time_point wtime) { _sst->_data_file_write_time = wtime; } void set_run_identifier(sstables::run_id identifier) { _sst->_run_identifier = identifier; } future store(sstring dir, sstables::generation_type generation) { _sst->_generation = generation; co_await _sst->_storage->change_dir_for_test(dir); _sst->_recognized_components.erase(component_type::Index); _sst->_recognized_components.erase(component_type::Data); co_await seastar::async([sst = _sst] { sst->open_sstable("test"); sst->write_statistics(); sst->write_compression(); sst->write_filter(); sst->write_summary(); sst->seal_sstable(false).get(); }); co_return _sst; } // Used to create synthetic sstables for testing leveled compaction strategy. void set_values_for_leveled_strategy(uint64_t fake_data_size, uint32_t sstable_level, int64_t max_timestamp, const partition_key& first_key, const partition_key& last_key) { // Create a synthetic stats metadata stats_metadata stats = {}; // leveled strategy sorts sstables by age using max_timestamp, let's set it to 0. stats.max_timestamp = max_timestamp; stats.sstable_level = sstable_level; set_values(first_key, last_key, std::move(stats), fake_data_size); } void set_values(const partition_key& first_key, const partition_key& last_key, stats_metadata stats, uint64_t data_file_size = 1) { _sst->_data_file_size = data_file_size; _sst->_index_file_size = std::max(1UL, uint64_t(data_file_size * 0.1)); _sst->_metadata_size_on_disk = std::max(1UL, uint64_t(data_file_size * 0.01)); // scylla component must be present for a sstable to be considered fully expired. _sst->_recognized_components.insert(component_type::Scylla); _sst->_components->statistics.contents[metadata_type::Stats] = std::make_unique(std::move(stats)); _sst->_first = dht::decorate_key(*_sst->_schema, first_key); _sst->_last = dht::decorate_key(*_sst->_schema, last_key); _sst->_components->statistics.contents[metadata_type::Compaction] = std::make_unique(); _sst->_run_identifier = run_id::create_random_id(); _sst->_shards.push_back(this_shard_id()); } void set_first_and_last_keys(const dht::decorated_key& first_key, const dht::decorated_key& last_key) { _sst->_first = first_key; _sst->_last = last_key; } void rewrite_toc_without_component(component_type component) { SCYLLA_ASSERT(component != component_type::TOC); _sst->_recognized_components.erase(component); remove_file(fmt::to_string(_sst->toc_filename())).get(); _sst->_storage->open(*_sst); _sst->seal_sstable(false).get(); } future<> remove_component(component_type c) { return remove_file(fmt::to_string(_sst->filename(c))); } fs::path filename(component_type c) const { return fs::path(fmt::to_string(_sst->filename(c))); } void set_shards(std::vector shards) { _sst->_shards = std::move(shards); } static future<> create_links(const sstable& sst, const sstring& dir) { return sst._storage->create_links(sst, std::filesystem::path(dir)); } future<> move_to_new_dir(sstring new_dir, generation_type new_generation) { co_await _sst->_storage->move(*_sst, std::move(new_dir), new_generation, nullptr); _sst->_generation = std::move(new_generation); } void create_bloom_filter(uint64_t estimated_partitions, double max_false_pos_prob = 0.1) { _sst->_components->filter = utils::i_filter::get_filter(estimated_partitions, max_false_pos_prob, utils::filter_format::m_format); _sst->_total_reclaimable_memory.reset(); } void write_filter() { _sst->_recognized_components.insert(component_type::Filter); _sst->write_filter(); } size_t total_reclaimable_memory_size() const { return _sst->total_reclaimable_memory_size(); } size_t reclaim_memory_from_components() { return _sst->reclaim_memory_from_components(); } void reload_reclaimed_components() { _sst->reload_reclaimed_components().get(); } const utils::filter_ptr& get_filter() const { return _sst->_components->filter; } void set_digest(std::optional digest) { _sst->_components->digest = digest; } }; inline auto replacer_fn_no_op() { return [](compaction::compaction_completion_desc desc) -> void {}; } template requires requires (AsyncAction aa, sstables::sstable::version_types& c) { { aa(c) } -> std::same_as>; } inline future<> for_each_sstable_version(AsyncAction action) { return seastar::do_for_each(all_sstable_versions, std::move(action)); } } // namespace sstables using can_purge_tombstones = compaction::compaction_manager::can_purge_tombstones; future<> run_compaction_task(test_env&, sstables::run_id output_run_id, compaction::compaction_group_view& table_s, noncopyable_function (compaction::compaction_data&)> job); future compact_sstables(test_env& env, compaction::compaction_descriptor descriptor, table_for_tests t, std::function creator, compaction::compaction_sstable_replacer_fn replacer = sstables::replacer_fn_no_op(), can_purge_tombstones can_purge = can_purge_tombstones::yes); shared_sstable make_sstable_easy(test_env& env, mutation_reader rd, sstable_writer_config cfg, sstables::generation_type gen, const sstable::version_types version = sstables::get_highest_sstable_version(), int expected_partition = 1, db_clock::time_point = db_clock::now()); shared_sstable make_sstable_easy(test_env& env, lw_shared_ptr mt, sstable_writer_config cfg, sstables::generation_type gen, const sstable::version_types v = sstables::get_highest_sstable_version(), int estimated_partitions = 1, db_clock::time_point = db_clock::now()); inline shared_sstable make_sstable_easy(test_env& env, mutation_reader rd, sstable_writer_config cfg, const sstable::version_types version = sstables::get_highest_sstable_version(), int expected_partition = 1) { return make_sstable_easy(env, std::move(rd), std::move(cfg), env.new_generation(), version, expected_partition); } inline shared_sstable make_sstable_easy(test_env& env, lw_shared_ptr mt, sstable_writer_config cfg, const sstable::version_types version = sstables::get_highest_sstable_version(), int estimated_partitions = 1, db_clock::time_point query_time = db_clock::now()) { return make_sstable_easy(env, std::move(mt), std::move(cfg), env.new_generation(), version, estimated_partitions, query_time); } lw_shared_ptr make_memtable(schema_ptr s, const utils::chunked_vector& muts); std::vector active_memtables(replica::table& t);