/* * Copyright (C) 2019-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #pragma once #include #include #include #include #include "data_dictionary/storage_options.hh" #include "db/large_data_handler.hh" #include "db/corrupt_data_handler.hh" #include "sstables/version.hh" #include "sstables/sstable_directory.hh" #include "compaction/compaction_manager.hh" #include "test/lib/tmpdir.hh" #include "test/lib/test_services.hh" #include "test/lib/log.hh" namespace compaction { class compaction_group_view; class compaction_task_executor; } namespace sstables { class test_env_sstables_manager : public sstables_manager { using sstables_manager::sstables_manager; std::optional _promoted_index_block_size; bool _correct_pi_block_width = true; public: virtual sstable_writer_config configure_writer(sstring origin = "test") const override { auto ret = sstables_manager::configure_writer(std::move(origin)); if (_promoted_index_block_size) { ret.promoted_index_block_size = *_promoted_index_block_size; } ret.correct_pi_block_width = _correct_pi_block_width; return ret; } void set_promoted_index_block_size(size_t promoted_index_block_size) { _promoted_index_block_size = promoted_index_block_size; } void set_correct_pi_block_width(bool value) { _correct_pi_block_width = value; } void increment_total_reclaimable_memory_and_maybe_reclaim(sstable *sst) { sstables_manager::increment_total_reclaimable_memory(sst); } size_t get_total_memory_reclaimed() { return _total_memory_reclaimed; } size_t get_total_reclaimable_memory() { return _total_reclaimable_memory; } void remove_sst_from_reclaimed(sstable* sst) { _reclaimed.erase(*sst); } auto& get_active_list() { return _active; } auto& get_reclaimed_set() { return _reclaimed; } }; class test_env_compaction_manager { tasks::task_manager _tm; compaction::compaction_manager _cm; public: test_env_compaction_manager() : _cm(_tm, compaction::compaction_manager::for_testing_tag{}) {} compaction::compaction_manager& get_compaction_manager() { return _cm; } void propagate_replacement(compaction::compaction_group_view& table_s, const std::vector& removed, const std::vector& added); future<> perform_compaction(shared_ptr task); }; struct test_env_config { db::large_data_handler* large_data_handler = nullptr; db::corrupt_data_handler* corrupt_data_handler = nullptr; data_dictionary::storage_options storage; // will be local by default size_t available_memory = memory::stats().total_memory(); }; data_dictionary::storage_options make_test_object_storage_options(std::string_view type); class test_env { struct impl; std::unique_ptr _impl; public: void maybe_start_compaction_manager(bool enable = true); explicit test_env(test_env_config cfg, sstable_compressor_factory&, sstables::storage_manager* sstm = nullptr, tmpdir* tmp = nullptr); ~test_env(); test_env(test_env&&) noexcept; future<> stop(); sstables::generation_type new_generation() noexcept; shared_sstable make_sstable(schema_ptr schema, sstring dir, sstables::generation_type generation, sstable::version_types v = sstables::get_highest_sstable_version(), sstable::format_types f = sstable::format_types::big, size_t buffer_size = default_sstable_buffer_size, db_clock::time_point now = db_clock::now()); shared_sstable make_sstable(schema_ptr schema, sstring dir, sstable::version_types v = sstables::get_highest_sstable_version()); shared_sstable make_sstable(schema_ptr schema, sstables::generation_type generation, sstable::version_types v = sstables::get_highest_sstable_version(), sstable::format_types f = sstable::format_types::big, size_t buffer_size = default_sstable_buffer_size, db_clock::time_point now = db_clock::now()); shared_sstable make_sstable(schema_ptr schema, sstable::version_types v = sstables::get_highest_sstable_version()); std::function make_sst_factory(schema_ptr s); std::function make_sst_factory(schema_ptr s, sstable::version_types version); struct sst_not_found : public std::runtime_error { sst_not_found(const sstring& dir, sstables::generation_type generation) : std::runtime_error(format("no versions of sstable generation {} found in {}", generation, dir)) {} }; // reusable_sst() opens the requested sstable for reading only (sstables are // immutable, so an existing sstable cannot be opened for writing). // It returns a future because opening requires reading from disk, and // therefore may block. The future value is a shared sstable - a reference- // counting pointer to an sstable - allowing for the returned handle to // be passed around until no longer needed. future reusable_sst(schema_ptr schema, sstring dir, sstables::generation_type generation, sstable::version_types version, sstable::format_types f = sstable::format_types::big, sstable_open_config cfg = { .load_first_and_last_position_metadata = true }); future reusable_sst(schema_ptr schema, sstring dir, sstables::generation_type::int_t gen_value, sstable::version_types version, sstable::format_types f = sstable::format_types::big); future reusable_sst(schema_ptr schema, sstables::generation_type generation, sstable::version_types version, sstable::format_types f = sstable::format_types::big); future reusable_sst(schema_ptr schema, shared_sstable sst); future reusable_sst(shared_sstable sst); // looks up the sstable in the given dir future reusable_sst(schema_ptr schema, sstring dir, sstables::generation_type generation = sstables::generation_type{1}); future reusable_sst(schema_ptr schema, sstables::generation_type generation); test_env_sstables_manager& manager(); test_env_compaction_manager& test_compaction_manager(); reader_concurrency_semaphore& semaphore(); db::config& db_config(); tmpdir& tempdir() noexcept; data_dictionary::storage_options get_storage_options() const noexcept; reader_permit make_reader_permit(const schema_ptr &s, const char* n, db::timeout_clock::time_point timeout); reader_permit make_reader_permit(db::timeout_clock::time_point timeout = db::no_timeout); replica::table::config make_table_config(); static future<> do_with_async(noncopyable_function func, test_env_config cfg = {}); static future<> do_with_sharded_async(noncopyable_function&)> func); template static future do_with_async_returning(noncopyable_function func) { return seastar::async([func = std::move(func)] { auto scf = make_sstable_compressor_factory_for_tests_in_thread(); test_env env({}, *scf); auto stop = defer([&] { env.stop().get(); }); return func(env); }); } table_for_tests make_table_for_tests(schema_ptr s, sstring dir); table_for_tests make_table_for_tests(schema_ptr s = nullptr); // Must run in a thread. sstables::sstable_set make_sstable_set(compaction::compaction_strategy& cs, schema_ptr s); void request_abort(); }; } // namespace sstables