/*
 * Copyright (C) 2019-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#include "test/lib/scylla_tests_cmdline_options.hh"
#include "test/lib/test_services.hh"
#include "test/lib/sstable_test_env.hh"
#include "test/lib/cql_test_env.hh"
#include "test/lib/test_utils.hh"
#include "init.hh"
#include "db/config.hh"
#include "db/object_storage_endpoint_param.hh"
#include "db/large_data_handler.hh"
#include "db/corrupt_data_handler.hh"
#include "dht/i_partitioner.hh"
#include "gms/feature_service.hh"
#include "repair/row_level.hh"
#include "replica/compaction_group.hh"
#include "utils/assert.hh"
#include "utils/overloaded_functor.hh"
// NOTE(review): the names of the four angle-bracket includes below, and the template
// arguments throughout this file, were lost in the copy under review and have been
// restored from what the surrounding code uses -- confirm against version control.
#include <boost/program_options.hpp>
#include <iostream>
#include <seastar/core/thread.hh>
#include <seastar/util/defer.hh>
#include <cstdlib>
#include <cstring>
#include <new>
#include <utility>
#include "sstables/generation_type.hh"

static const sstring some_keyspace("ks");
static const sstring some_column_family("cf");

// compaction_group_view implementation backing table_for_tests.
// Most calls are forwarded to the replica::table held in table_for_tests::data;
// the backlog tracker, strategy state, tombstone-gc state and staging condition
// are owned by this object.
class table_for_tests::compaction_group_view : public compaction::compaction_group_view {
    table_for_tests::data& _data;
    sstables::sstables_manager& _sstables_manager;
    std::vector<sstables::shared_sstable> _compacted_undeleted; // never populated in this file
    tombstone_gc_state _tombstone_gc_state;
    mutable compaction::compaction_backlog_tracker _backlog_tracker;
    compaction::compaction_strategy_state _compaction_strategy_state;
    std::string _group_id;
    seastar::condition_variable _staging_condition;
private:
    replica::table& table() const noexcept {
        return *_data.cf;
    }
public:
    // data.cf must already be set: the initializers below query the table's compaction strategy.
    explicit compaction_group_view(table_for_tests::data& data, sstables::sstables_manager& sstables_manager)
            : _data(data)
            , _sstables_manager(sstables_manager)
            , _tombstone_gc_state(nullptr)
            , _backlog_tracker(get_compaction_strategy().make_backlog_tracker())
            , _compaction_strategy_state(compaction::compaction_strategy_state::make(get_compaction_strategy()))
            , _group_id("table_for_tests::compaction_group_view")
    {
    }
    // The range spanning dht::first_token() to dht::last_token().
    dht::token_range token_range() const noexcept override {
        return dht::token_range::make(dht::first_token(), dht::last_token());
    }
    const schema_ptr& schema() const noexcept override {
        return table().schema();
    }
    unsigned min_compaction_threshold() const noexcept override {
        return schema()->min_compaction_threshold();
    }
    // Always enforced for test tables.
    bool compaction_enforce_min_threshold() const noexcept override {
        return true;
    }
    future<lw_shared_ptr<const sstables::sstable_set>> main_sstable_set() const override {
        co_return co_await table().try_get_compaction_group_view_with_static_sharding().main_sstable_set();
    }
    future<lw_shared_ptr<const sstables::sstable_set>> maintenance_sstable_set() const override {
        co_return co_await table().try_get_compaction_group_view_with_static_sharding().maintenance_sstable_set();
    }
    lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc() const override {
        return table().try_get_compaction_group_with_static_sharding()->main_sstables();
    }
    std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point query_time) const override {
        return compaction::get_fully_expired_sstables(*this, sstables, query_time);
    }
    const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const noexcept override {
        return _compacted_undeleted;
    }
    compaction::compaction_strategy& get_compaction_strategy() const noexcept override {
        return table().get_compaction_strategy();
    }
    compaction::compaction_strategy_state& get_compaction_strategy_state() noexcept override {
        return _compaction_strategy_state;
    }
    // Tracking-only permit taken from the table's compaction concurrency semaphore.
    reader_permit make_compaction_reader_permit() const override {
        return table().compaction_concurrency_semaphore().make_tracking_only_permit(schema(), "table_for_tests::compaction_group_view", db::no_timeout, {});
    }
    sstables::sstables_manager& get_sstables_manager() noexcept override {
        return _sstables_manager;
    }
    sstables::shared_sstable make_sstable() const override {
        return table().make_sstable();
    }
    sstables::sstable_writer_config configure_writer(sstring origin) const override {
        return _sstables_manager.configure_writer(std::move(origin));
    }
    api::timestamp_type min_memtable_timestamp() const override {
        return table().min_memtable_timestamp();
    }
    api::timestamp_type min_memtable_live_timestamp() const override {
        return table().min_memtable_live_timestamp();
    }
    api::timestamp_type min_memtable_live_row_marker_timestamp() const override {
        return table().min_memtable_live_row_marker_timestamp();
    }
    // Always reports "not present"; the table's memtables are not consulted.
    bool memtable_has_key(const dht::decorated_key& key) const override {
        return false;
    }
    future<> on_compaction_completion(compaction::compaction_completion_desc desc, sstables::offstrategy offstrategy) override {
        return table().try_get_compaction_group_view_with_static_sharding().on_compaction_completion(std::move(desc), offstrategy);
    }
    bool is_auto_compaction_disabled_by_user() const noexcept override {
        return table().is_auto_compaction_disabled_by_user();
    }
    bool tombstone_gc_enabled() const noexcept override {
        return table().tombstone_gc_enabled();
    }
    const tombstone_gc_state& get_tombstone_gc_state() const noexcept override {
        return _tombstone_gc_state;
    }
    compaction::compaction_backlog_tracker& get_backlog_tracker() override {
        return _backlog_tracker;
    }
    const std::string get_group_id() const noexcept override {
        return _group_id;
    }
    seastar::condition_variable& get_staging_done_condition() noexcept override {
        return _staging_condition;
    }
    dht::token_range get_token_range_after_split(const dht::token& t) const noexcept override {
        return table().get_token_range_after_split(t);
    }
    // Fixed at 0 for test tables.
    int64_t get_sstables_repaired_at() const noexcept override {
        return 0;
    }
};

table_for_tests::data::data()
{ }

table_for_tests::data::~data() {}

// Schema "ks"."cf" with a single utf8 partition key column "p1".
schema_ptr table_for_tests::make_default_schema() {
    return schema_builder(some_keyspace, some_column_family)
        .with_column(utf8_type->decompose("p1"), utf8_type, column_kind::partition_key)
        .build();
}

// Builds a replica::table for `s` (or for the default schema when `s` is null),
// marks it ready for writes and registers its compaction_group_view with `cm`.
table_for_tests::table_for_tests(sstables::sstables_manager& sstables_manager, compaction::compaction_manager& cm, schema_ptr s, replica::table::config cfg, data_dictionary::storage_options storage)
    : _data(make_lw_shared<data>())
{
    cfg.cf_stats = &_data->cf_stats;
    _data->s = s ? s : make_default_schema();
    _data->cf = make_lw_shared<replica::table>(_data->s, std::move(cfg), make_lw_shared<data_dictionary::storage_options>(storage), cm, sstables_manager, _data->cl_stats, sstables_manager.get_cache_tracker(), nullptr);
    _data->cf->mark_ready_for_writes(nullptr);
    // The view reads _data->cf in its constructor, so it must be created after the table.
    _data->table_s = std::make_unique<compaction_group_view>(*_data, sstables_manager);
    cm.add(*_data->table_s);
    _data->storage = std::move(storage);
}

compaction::compaction_group_view& table_for_tests::as_compaction_group_view() noexcept {
    return *_data->table_s;
}

// Unregisters the view from the compaction manager, then stops the table.
future<> table_for_tests::stop() {
    // Local copy of the shared pointer; presumably keeps the state alive across the co_awaits.
    auto data = _data;
    co_await data->cf->get_compaction_manager().remove(*data->table_s);
    co_await data->cf->stop();
}

void table_for_tests::set_tombstone_gc_enabled(bool tombstone_gc_enabled) noexcept {
    _data->cf->set_tombstone_gc_enabled(tombstone_gc_enabled);
}

// Installs the same classifier on every compaction group of the table.
void table_for_tests::set_repair_sstable_classifier(replica::repair_classifier_func repair_sstable_classifier) {
    _data->cf->for_each_compaction_group([&] (replica::compaction_group& cg) {
        cg.set_repair_sstable_classifier(repair_sstable_classifier);
    });
}

namespace sstables {

// Translates storage options into object-storage endpoint parameters.
// Local storage yields an empty list. S3 and GS endpoints are configured from the
// S3_SERVER_PORT_FOR_TEST, AWS_DEFAULT_REGION and GS_CREDENTIALS_FILE environment variables.
std::vector<db::object_storage_endpoint_param> make_storage_options_config(const data_dictionary::storage_options& so) {
    std::vector<db::object_storage_endpoint_param> endpoints;
    std::visit(overloaded_functor {
        [] (const data_dictionary::storage_options::local& loc) mutable -> void {
        },
        [&endpoints] (const data_dictionary::storage_options::object_storage& os) mutable -> void {
            if (os.type == data_dictionary::storage_options::S3_NAME) {
                endpoints.emplace_back(os.endpoint,
                    s3::endpoint_config {
                        .port = std::stoul(tests::getenv_safe("S3_SERVER_PORT_FOR_TEST")),
                        // HTTPS is selected whenever a region is set in the environment.
                        .use_https = ::getenv("AWS_DEFAULT_REGION") != nullptr,
                        .region = tests::getenv_or_default("AWS_DEFAULT_REGION", "local"),
                    });
            }
            if (os.type == data_dictionary::storage_options::GS_NAME) {
                endpoints.emplace_back(db::object_storage_endpoint_param::gs_storage{
                    .endpoint = os.endpoint,
                    .credentials_file = tests::getenv_or_default("GS_CREDENTIALS_FILE", "none")
                });
            }
        }
    }, so.value);
    return endpoints;
}

// Creates a db::config whose only data directory is `temp_dir` and whose
// object-storage endpoints are derived from `so`.
std::unique_ptr<db::config> make_db_config(sstring temp_dir, const data_dictionary::storage_options so) {
    auto cfg = std::make_unique<db::config>();
    cfg->data_file_directories.set({ temp_dir });
    cfg->object_storage_endpoints(make_storage_options_config(so));
    return cfg;
}

// State behind test_env. Members are initialized in declaration order, and later
// members (notably `mgr`) are constructed from earlier ones, so the order matters.
struct test_env::impl {
    std::optional<tmpdir> local_dir; // engaged only when no external tmpdir was supplied
    tmpdir& dir;                     // either *local_dir or the caller's tmpdir
    std::unique_ptr<db::config> db_config;
    directory_semaphore dir_sem;
    ::cache_tracker cache_tracker;
    gms::feature_service feature_service;
    db::nop_large_data_handler nop_ld_handler;
    db::nop_corrupt_data_handler nop_cd_handler;
    sstable_compressor_factory& scf;
    test_env_sstables_manager mgr;
    std::unique_ptr<test_env_compaction_manager> cmgr; // created by maybe_start_compaction_manager()
    reader_concurrency_semaphore semaphore;
    sstables::sstable_generation_generator gen;
    data_dictionary::storage_options storage;
    abort_source abort;

    impl(test_env_config cfg, sstable_compressor_factory&, sstables::storage_manager* sstm, tmpdir* tdir);
    impl(impl&&) = delete;
    impl(const impl&) = delete;

    sstables::generation_type new_generation() noexcept {
        return gen();
    }
};

// When `tdir` is null a private temporary directory is created and used instead.
// Handlers left null in `cfg` fall back to the no-op handlers owned by this object.
test_env::impl::impl(test_env_config cfg, sstable_compressor_factory& scfarg, sstables::storage_manager* sstm, tmpdir* tdir)
    : local_dir(tdir == nullptr ? std::optional<tmpdir>(std::in_place) : std::optional<tmpdir>(std::nullopt))
    , dir(tdir == nullptr ? local_dir.value() : *tdir)
    , db_config(make_db_config(dir.path().native(), cfg.storage))
    , dir_sem(1)
    , feature_service({get_disabled_features_from_db_config(*db_config)})
    , nop_cd_handler(db::corrupt_data_handler::register_metrics::no)
    , scf(scfarg)
    , mgr(
        "test_env",
        cfg.large_data_handler == nullptr ? nop_ld_handler : *cfg.large_data_handler,
        cfg.corrupt_data_handler == nullptr ? nop_cd_handler : *cfg.corrupt_data_handler,
        sstables::sstables_manager::config{
            .available_memory = cfg.available_memory,
            .enable_sstable_key_validation = db_config->enable_sstable_key_validation(),
            .memory_reclaim_threshold = db_config->components_memory_reclaim_threshold,
            .data_file_directories = db_config->data_file_directories(),
            .format = db_config->sstable_format,
        },
        feature_service,
        cache_tracker,
        dir_sem,
        // A random host id is generated once and returned on every call.
        [host_id = locator::host_id::create_random_id()]{ return host_id; },
        scf,
        abort,
        {}, // extensions
        current_scheduling_group(),
        sstm)
    , semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::test_env", reader_concurrency_semaphore::register_metrics::no)
    , storage(std::move(cfg.storage)) // db_config above has already read cfg.storage
{ }

test_env::test_env(test_env_config cfg, sstable_compressor_factory& scf, sstables::storage_manager* sstm, tmpdir* tmp)
    : _impl(std::make_unique<impl>(std::move(cfg), scf, sstm, tmp))
{
}

test_env::test_env(test_env&&) noexcept = default;

test_env::~test_env() = default;

// Creates the compaction manager on first use; `enable` is honored only on that first call.
void test_env::maybe_start_compaction_manager(bool enable) {
    if (!_impl->cmgr) {
        _impl->cmgr = std::make_unique<test_env_compaction_manager>();
        if (enable) {
            _impl->cmgr->get_compaction_manager().enable();
        }
    }
}

// Shuts down the compaction manager (if one was created), then the sstables manager
// and the reader concurrency semaphore.
future<> test_env::stop() {
    if (_impl->cmgr) {
        if (_impl->cmgr->get_compaction_manager().is_running()) {
            co_await _impl->cmgr->get_compaction_manager().stop();
        } else {
            // Not running: only its task manager module is stopped.
            co_await _impl->cmgr->get_compaction_manager().get_task_manager_module().stop();
        }
    }
    co_await _impl->mgr.close();
    co_await _impl->semaphore.stop();
}

// In-memory sstables_registry keyed by (owner table, generation).
// Updating or deleting a missing entry throws std::runtime_error.
class mock_sstables_registry : public sstables::sstables_registry {
    struct entry {
        sstring status;
        sstables::sstable_state state;
        sstables::entry_descriptor desc;
    };
    std::map<std::pair<table_id, sstables::generation_type>, entry> _entries;
public:
    // An existing entry with the same key is left untouched (std::map::emplace semantics).
    virtual future<> create_entry(table_id owner, sstring status, sstable_state state, sstables::entry_descriptor desc) override {
        _entries.emplace(std::make_pair(owner, desc.generation), entry { status, state, desc });
        co_return;
    }
    virtual future<> update_entry_status(table_id owner, sstables::generation_type gen, sstring status) override {
        auto it = _entries.find(std::make_pair(owner, gen));
        if (it != _entries.end()) {
            it->second.status = status;
        } else {
            throw std::runtime_error("update_entry_status: not found");
        }
        co_return;
    }
    virtual future<> update_entry_state(table_id owner, sstables::generation_type gen, sstables::sstable_state state) override {
        auto it = _entries.find(std::make_pair(owner, gen));
        if (it != _entries.end()) {
            it->second.state = state;
        } else {
            throw std::runtime_error("update_entry_state: not found");
        }
        co_return;
    }
    virtual future<> delete_entry(table_id owner, sstables::generation_type gen) override {
        auto it = _entries.find(std::make_pair(owner, gen));
        if (it != _entries.end()) {
            _entries.erase(it);
        } else {
            throw std::runtime_error("delete_entry: not found");
        }
        co_return;
    }
    // Feeds every entry owned by `owner` to `consumer`, one at a time.
    virtual future<> sstables_registry_list(table_id owner, entry_consumer consumer) override {
        for (auto& [loc_and_gen, e] : _entries) {
            if (loc_and_gen.first == owner) {
                co_await consumer(e.status, e.state, e.desc);
            }
        }
    }
};

// Runs `func` in a seastar thread against a freshly built test_env, stopping the
// environment afterwards. For non-local storage a sharded storage_manager is started
// and a mock sstables registry is plugged into the sstables manager for the duration.
future<> test_env::do_with_async(noncopyable_function<void (test_env&)> func, test_env_config cfg) {
    if (!cfg.storage.is_local_type()) {
        auto db_cfg = make_shared<db::config>();
        db_cfg->experimental_features({db::experimental_features_t::feature::KEYSPACE_STORAGE_OPTIONS});
        db_cfg->object_storage_endpoints(make_storage_options_config(cfg.storage));
        return seastar::async([func = std::move(func), cfg = std::move(cfg), db_cfg = std::move(db_cfg)] () mutable {
            sharded<sstables::storage_manager> sstm;
            sstm.start(std::ref(*db_cfg), sstables::storage_manager::config{}).get();
            auto stop_sstm = defer([&] { sstm.stop().get(); });
            auto scf = make_sstable_compressor_factory_for_tests_in_thread();
            test_env env(std::move(cfg), *scf, &sstm.local());
            // Deferred actions run in reverse order: unplug registry, stop env, stop sstm.
            auto close_env = defer([&] { env.stop().get(); });
            env.manager().plug_sstables_registry(std::make_unique<mock_sstables_registry>());
            auto unplu = defer([&env] { env.manager().unplug_sstables_registry(); });
            func(env);
        });
    }

    return seastar::async([func = std::move(func), cfg = std::move(cfg)] () mutable {
        auto scf = make_sstable_compressor_factory_for_tests_in_thread();
        test_env env(std::move(cfg), *scf);
        auto close_env = defer([&] { env.stop().get(); });
        func(env);
    });
}

sstables::generation_type test_env::new_generation() noexcept {
    return _impl->new_generation();
}

// Creates an sstable object in `normal` state. For local storage the sstable lives
// in `dir`; for object storage `dir` is ignored and the location is the schema id.
shared_sstable test_env::make_sstable(schema_ptr schema, sstring dir, sstables::generation_type generation,
        sstable::version_types v, sstable::format_types f,
        size_t buffer_size, db_clock::time_point now) {
    // FIXME -- most of the callers work with _impl->dir's path, so
    // test_env can initialize the .dir/.prefix only once, when constructed
    auto storage = _impl->storage;
    std::visit(overloaded_functor {
        [&dir] (data_dictionary::storage_options::local& o) { o.dir = dir; },
        [&schema] (data_dictionary::storage_options::object_storage& o) { o.location = schema->id(); },
    }, storage.value);
    return _impl->mgr.make_sstable(std::move(schema), storage, generation, sstables::sstable_state::normal, v, f, now, default_io_error_handler_gen(), buffer_size);
}

// Same as above with a freshly allocated generation.
shared_sstable test_env::make_sstable(schema_ptr schema, sstring dir, sstable::version_types v) {
    return make_sstable(std::move(schema), std::move(dir), new_generation(), std::move(v));
}

// Same as the full overload, placed in the environment's temporary directory.
shared_sstable test_env::make_sstable(schema_ptr schema, sstables::generation_type generation,
        sstable::version_types v, sstable::format_types f,
        size_t buffer_size, db_clock::time_point now) {
    return make_sstable(std::move(schema), _impl->dir.path().native(), generation, std::move(v), std::move(f), buffer_size, now);
}

shared_sstable test_env::make_sstable(schema_ptr schema, sstable::version_types v) {
    return make_sstable(std::move(schema), _impl->dir.path().native(), std::move(v));
}

// Returns a factory producing a new sstable (with a new generation) on every call.
// The factory captures `this`, so it must not outlive the test_env.
std::function<shared_sstable()> test_env::make_sst_factory(schema_ptr s) {
    return [this, s = std::move(s)] {
        return make_sstable(s, new_generation());
    };
}

std::function<shared_sstable()> test_env::make_sst_factory(schema_ptr s, sstable::version_types version) {
    return [this, s = std::move(s), version] {
        return make_sstable(s, new_generation(), version);
    };
}

// Creates the sstable object for an existing sstable and loads it.
future<shared_sstable> test_env::reusable_sst(schema_ptr schema, sstring dir, sstables::generation_type generation,
        sstable::version_types version, sstable::format_types f, sstable_open_config cfg) {
    auto sst = make_sstable(std::move(schema), dir, generation, version, f);
    return sst->load(sst->get_schema()->get_sharder(), cfg).then([sst = std::move(sst)] {
        return make_ready_future<shared_sstable>(std::move(sst));
    });
}

future<shared_sstable> test_env::reusable_sst(schema_ptr schema, sstring dir, sstables::generation_type::int_t gen_value,
        sstable::version_types version, sstable::format_types f) {
    return reusable_sst(std::move(schema), std::move(dir), sstables::generation_type(gen_value), version, f);
}

future<shared_sstable> test_env::reusable_sst(schema_ptr schema, sstables::generation_type generation,
        sstable::version_types version, sstable::format_types f) {
    return reusable_sst(std::move(schema), _impl->dir.path().native(), std::move(generation), std::move(version), std::move(f));
}

// Reopens `sst` from its storage prefix, generation and version, using `schema`.
future<shared_sstable> test_env::reusable_sst(schema_ptr schema, shared_sstable sst) {
    return reusable_sst(std::move(schema), sst->get_storage().prefix(), sst->generation(), sst->get_version());
}

future<shared_sstable> test_env::reusable_sst(shared_sstable sst) {
    return reusable_sst(sst->get_schema(), sst);
}

future<shared_sstable> test_env::reusable_sst(schema_ptr schema, sstables::generation_type generation) {
    return reusable_sst(std::move(schema), _impl->dir.path().native(), generation);
}

test_env_sstables_manager& test_env::manager() {
    return _impl->mgr;
}

// Requires maybe_start_compaction_manager() to have been called: dereferences cmgr unchecked.
test_env_compaction_manager& test_env::test_compaction_manager() {
    return *_impl->cmgr;
}

reader_concurrency_semaphore& test_env::semaphore() {
    return _impl->semaphore;
}

db::config& test_env::db_config() {
    return *_impl->db_config;
}

tmpdir& test_env::tempdir() noexcept {
    return _impl->dir;
}

data_dictionary::storage_options test_env::get_storage_options() const noexcept {
    return _impl->storage;
}

reader_permit test_env::make_reader_permit(const schema_ptr &s, const char* n, db::timeout_clock::time_point timeout) {
    return _impl->semaphore.make_tracking_only_permit(s, n, timeout, {});
}

// Permit with no schema, named "test".
reader_permit test_env::make_reader_permit(db::timeout_clock::time_point timeout) {
    return _impl->semaphore.make_tracking_only_permit(nullptr, "test", timeout, {});
}

replica::table::config test_env::make_table_config() {
    return replica::table::config{.compaction_concurrency_semaphore = &_impl->semaphore};
}

// Runs `func` in a seastar thread against a sharded test_env with default config;
// all shards share one temporary directory.
future<> test_env::do_with_sharded_async(noncopyable_function<void (sharded<test_env>&)> func) {
    return seastar::async([func = std::move(func)] {
        tmpdir tdir;
        sharded<test_env> env;
        auto scf = make_sstable_compressor_factory_for_tests_in_thread();
        env.start(test_env_config{}, std::ref(*scf), nullptr, &tdir).get();
        auto stop = defer([&] { env.stop().get(); });
        func(env);
    });
}

// Builds a table_for_tests with commitlog disabled, creating the compaction manager
// if needed. For object storage `dir` is ignored and the location is the schema id.
table_for_tests test_env::make_table_for_tests(schema_ptr s, sstring dir) {
    maybe_start_compaction_manager();
    auto cfg = make_table_config();
    cfg.enable_commitlog = false;
    auto storage = _impl->storage;
    std::visit(overloaded_functor {
        [&dir] (data_dictionary::storage_options::local& o) { o.dir = dir; },
        // Dereferences `s`, so a null schema is not supported with object storage.
        [&s] (data_dictionary::storage_options::object_storage& o) { o.location = s->id(); },
    }, storage.value);
    return table_for_tests(manager(), _impl->cmgr->get_compaction_manager(), s, std::move(cfg), std::move(storage));
}

table_for_tests test_env::make_table_for_tests(schema_ptr s) {
    return make_table_for_tests(std::move(s), _impl->dir.path().native());
}

// Creates an sstable_set via a temporary table_for_tests, which is stopped on return.
sstables::sstable_set test_env::make_sstable_set(compaction::compaction_strategy& cs, schema_ptr s) {
    auto t = make_table_for_tests(s);
    auto close_t = deferred_stop(t);
    return cs.make_sstable_set(t.as_compaction_group_view());
}

void test_env::request_abort() {
    _impl->abort.request_abort();
}

// Object-storage options taken from the <type>_BUCKET_FOR_TEST and
// <type>_SERVER_ADDRESS_FOR_TEST environment variables.
data_dictionary::storage_options make_test_object_storage_options(std::string_view type) {
    data_dictionary::storage_options ret;
    auto t = std::string(type);
    ret.value = data_dictionary::storage_options::object_storage {
        .bucket = tests::getenv_safe(t + "_BUCKET_FOR_TEST"),
        .endpoint = tests::getenv_safe(t + "_SERVER_ADDRESS_FOR_TEST"),
        .type = t
    };
    return ret;
}

// Path of the TOC component of a "big"-format sstable.
static sstring toc_filename(const sstring& dir, schema_ptr schema, sstables::generation_type generation, sstable_version_types v) {
    return sstable::filename(dir, schema->ks_name(), schema->cf_name(), v, generation, sstable_format_types::big, component_type::TOC);
}

// Version-agnostic variant: walks all_sstable_versions in reverse and loads the sstable
// under the first version whose TOC file exists. Throws sst_not_found if none does.
future<shared_sstable> test_env::reusable_sst(schema_ptr schema, sstring dir, sstables::generation_type generation) {
    for (auto v : std::views::reverse(all_sstable_versions)) {
        if (co_await file_exists(toc_filename(dir, schema, generation, v))) {
            co_return co_await reusable_sst(schema, dir, generation, v);
        }
    }
    throw sst_not_found(dir, generation);
}

void test_env_compaction_manager::propagate_replacement(compaction::compaction_group_view& table_s, const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added) {
    _cm.propagate_replacement(table_s, removed, added);
}

// Test version of compaction_manager::perform_compaction<>()
// Registers `task` in the manager's task list, runs it, and on every exit path
// unlinks it again and resets its state to `none`.
future<> test_env_compaction_manager::perform_compaction(shared_ptr<compaction::compaction_task_executor> task) {
    _cm._tasks.push_back(*task);
    auto unregister_task = defer([task] {
        if (!task->is_linked()) {
            testlog.error("compaction_manager_test: deregister_compaction uuid={}: task not found", task->compaction_data().compaction_uuid);
        }
        task->unlink();
        task->switch_state(compaction::compaction_task_executor::state::none);
    });
    co_await task->run_compaction();
}

}

// Releases a list produced by rebuild_arg_list_without(). A null `argv` is a no-op.
static void free_arg_list(int argc, char** argv) {
    if (!argv) {
        return;
    }
    for (auto i = 0; i < argc; i++) {
        if (argv[i]) {
            free(argv[i]);
        }
    }
    free(argv);
}

// Returns a malloc-allocated copy of argv[0..argc) without any argument equal to
// `filter_out`. When `exclude_positional_arg` is set, the option's value is dropped
// as well, whether given as the next argument or attached as "<filter_out>=value".
// The result is null-terminated (result[new_argc] == nullptr) and must be released
// with free_arg_list(). Throws std::bad_alloc if memory cannot be obtained.
static std::pair<int, char**> rebuild_arg_list_without(int argc, char** argv, const char* filter_out, bool exclude_positional_arg = false) {
    // One extra zeroed slot so the list ends with a null pointer like a regular argv.
    const size_t slots = static_cast<size_t>(argc > 0 ? argc : 0) + 1;
    char** new_argv = static_cast<char**>(std::calloc(slots, sizeof(char*)));
    if (!new_argv) {
        throw std::bad_alloc();
    }
    const size_t filter_len = std::strlen(filter_out);
    int new_argc = 0;
    bool exclude_next_arg = false;
    for (auto i = 0; i < argc; i++) {
        if (std::exchange(exclude_next_arg, false)) {
            continue;
        }
        if (std::strcmp(argv[i], filter_out) == 0) {
            // if arg filtered out has positional arg, that has to be excluded too.
            exclude_next_arg = exclude_positional_arg;
            continue;
        }
        // "--option=value": the value travels in the same token, so drop the token whole.
        if (exclude_positional_arg && std::strncmp(argv[i], filter_out, filter_len) == 0 && argv[i][filter_len] == '=') {
            continue;
        }
        const size_t len = std::strlen(argv[i]) + 1;
        char* copy = static_cast<char*>(std::malloc(len));
        if (!copy) {
            free_arg_list(new_argc, new_argv);
            throw std::bad_alloc();
        }
        std::memcpy(copy, argv[i], len);
        new_argv[new_argc] = copy;
        new_argc++;
    }
    return std::make_pair(new_argc, new_argv);
}

// Frees the argument list handed out by process_cmdline_options(), if any.
scylla_tests_cmdline_options_processor::~scylla_tests_cmdline_options_processor() {
    if (_new_argv) {
        free_arg_list(_new_argc, _new_argv);
    }
}

// Parses the test-only options out of the command line.
// Returns the original (argc, argv) when --help is given or when
// --x-log2-compaction-groups is absent or zero. Otherwise returns a rebuilt list
// with that option (and its value) removed; the rebuilt list is owned by this
// object and stays valid until the next call or until the object is destroyed.
std::pair<int, char**> scylla_tests_cmdline_options_processor::process_cmdline_options(int argc, char** argv) {
    namespace po = boost::program_options;

    // Removes -- (intended to separate boost suite args from seastar ones) which confuses boost::program_options.
    auto [new_argc, new_argv] = rebuild_arg_list_without(argc, argv, "--");
    auto _ = defer([argc = new_argc, argv = new_argv] {
        free_arg_list(argc, argv);
    });

    po::options_description desc("Scylla tests additional options");
    desc.add_options()
        ("help", "Produces help message")
        ("x-log2-compaction-groups", po::value<unsigned>()->default_value(0), "Controls static number of compaction groups per table per shard. For X groups, set the option to log (base 2) of X. Example: Value of 3 implies 8 groups.");
    po::variables_map vm;

    // Unregistered options are tolerated: they belong to boost.test / seastar.
    po::parsed_options parsed = po::command_line_parser(new_argc, new_argv).
        options(desc).
        allow_unregistered().
        run();

    po::store(parsed, vm);
    po::notify(vm);

    if (vm.count("help")) {
        std::cout << desc << std::endl;
        return std::make_pair(argc, argv);
    }

    unsigned x_log2_compaction_groups = vm["x-log2-compaction-groups"].as<unsigned>();
    if (x_log2_compaction_groups) {
        std::cout << "Setting x_log2_compaction_groups to " << x_log2_compaction_groups << std::endl;
        // TODO: perhaps we can later map it into initial_tablets.
        auto [filtered_argc, filtered_argv] = rebuild_arg_list_without(argc, argv, "--x-log2-compaction-groups", true);
        // Store the list in the members (rather than in locals that shadow them) so
        // the destructor releases it; a list from an earlier call is released first.
        if (_new_argv) {
            free_arg_list(_new_argc, _new_argv);
        }
        _new_argc = filtered_argc;
        _new_argv = filtered_argv;
        return std::make_pair(_new_argc, _new_argv);
    }

    return std::make_pair(argc, argv);
}