This compaction group testing is useless because the machinery it relied on was removed. It was useful in the early tablet days, when we wanted to test compaction groups directly. Today, groups are stressed and tested by every tablet test. I see a ~40% reduction in run time after this patch, since database_test is one of the most (if not the most) time-consuming tests in the boost suite. Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com> Closes scylladb/scylladb#28324
658 lines
26 KiB
C++
658 lines
26 KiB
C++
/*
|
|
* Copyright (C) 2019-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include "test/lib/scylla_tests_cmdline_options.hh"
|
|
#include "test/lib/test_services.hh"
|
|
#include "test/lib/sstable_test_env.hh"
|
|
#include "test/lib/cql_test_env.hh"
|
|
#include "test/lib/test_utils.hh"
|
|
#include "init.hh"
|
|
#include "db/config.hh"
|
|
#include "db/object_storage_endpoint_param.hh"
|
|
#include "db/large_data_handler.hh"
|
|
#include "db/corrupt_data_handler.hh"
|
|
#include "dht/i_partitioner.hh"
|
|
#include "gms/feature_service.hh"
|
|
#include "repair/row_level.hh"
|
|
#include "replica/compaction_group.hh"
|
|
#include "utils/assert.hh"
|
|
#include "utils/overloaded_functor.hh"
|
|
#include <boost/program_options.hpp>
|
|
#include <iostream>
|
|
#include <fmt/ranges.h>
|
|
#include <seastar/util/defer.hh>
|
|
#include "sstables/generation_type.hh"
|
|
|
|
// Keyspace name passed to schema_builder by table_for_tests::make_default_schema().
static const sstring some_keyspace("ks");
|
|
// Table name passed to schema_builder by table_for_tests::make_default_schema().
static const sstring some_column_family("cf");
|
|
|
|
// compaction::compaction_group_view implementation used by table_for_tests.
//
// Most queries are forwarded to the replica::table stored in
// table_for_tests::data; the backlog tracker, compaction strategy state,
// tombstone GC state, group id and staging condition are owned by this object.
class table_for_tests::compaction_group_view : public compaction::compaction_group_view {
    // NOTE: members are initialized in declaration order; the constructor
    // relies on _data being set before get_compaction_strategy() is called.
    table_for_tests::data& _data;
    sstables::sstables_manager& _sstables_manager;
    std::vector<sstables::shared_sstable> _compacted_undeleted;
    tombstone_gc_state _tombstone_gc_state;
    mutable compaction::compaction_backlog_tracker _backlog_tracker;
    compaction::compaction_strategy_state _compaction_strategy_state;
    std::string _group_id;
    seastar::condition_variable _staging_condition;

private:
    // The table all forwarding members below operate on.
    replica::table& table() const noexcept {
        return *_data.cf;
    }

public:
    explicit compaction_group_view(table_for_tests::data& data, sstables::sstables_manager& sstables_manager)
        : _data(data)
        , _sstables_manager(sstables_manager)
        , _tombstone_gc_state(nullptr)
        , _backlog_tracker(get_compaction_strategy().make_backlog_tracker())
        , _compaction_strategy_state(compaction::compaction_strategy_state::make(get_compaction_strategy()))
        , _group_id("table_for_tests::compaction_group_view")
    {
    }

    // Covers the range from dht::first_token() to dht::last_token().
    dht::token_range token_range() const noexcept override {
        return dht::token_range::make(dht::first_token(), dht::last_token());
    }

    const schema_ptr& schema() const noexcept override {
        return table().schema();
    }

    unsigned min_compaction_threshold() const noexcept override {
        return schema()->min_compaction_threshold();
    }

    // Always enforced for this test view.
    bool compaction_enforce_min_threshold() const noexcept override {
        return true;
    }

    future<lw_shared_ptr<const sstables::sstable_set>> main_sstable_set() const override {
        auto&& view = table().try_get_compaction_group_view_with_static_sharding();
        co_return co_await view.main_sstable_set();
    }

    future<lw_shared_ptr<const sstables::sstable_set>> maintenance_sstable_set() const override {
        auto&& view = table().try_get_compaction_group_view_with_static_sharding();
        co_return co_await view.maintenance_sstable_set();
    }

    lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc() const override {
        return table().try_get_compaction_group_with_static_sharding()->main_sstables();
    }

    std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point query_time) const override {
        return compaction::get_fully_expired_sstables(*this, sstables, query_time);
    }

    const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const noexcept override {
        return _compacted_undeleted;
    }

    compaction::compaction_strategy& get_compaction_strategy() const noexcept override {
        return table().get_compaction_strategy();
    }

    compaction::compaction_strategy_state& get_compaction_strategy_state() noexcept override {
        return _compaction_strategy_state;
    }

    // Tracking-only permit taken from the table's compaction concurrency semaphore.
    reader_permit make_compaction_reader_permit() const override {
        auto& sem = table().compaction_concurrency_semaphore();
        return sem.make_tracking_only_permit(schema(), "table_for_tests::compaction_group_view", db::no_timeout, {});
    }

    sstables::sstables_manager& get_sstables_manager() noexcept override {
        return _sstables_manager;
    }

    // The requested sstable_state is ignored; the table decides how to create the sstable.
    sstables::shared_sstable make_sstable(sstables::sstable_state) const override {
        return table().make_sstable();
    }

    sstables::sstable_writer_config configure_writer(sstring origin) const override {
        return _sstables_manager.configure_writer(std::move(origin));
    }

    api::timestamp_type min_memtable_timestamp() const override {
        return table().min_memtable_timestamp();
    }

    api::timestamp_type min_memtable_live_timestamp() const override {
        return table().min_memtable_live_timestamp();
    }

    api::timestamp_type min_memtable_live_row_marker_timestamp() const override {
        return table().min_memtable_live_row_marker_timestamp();
    }

    // This test view always reports the key as absent from memtables.
    bool memtable_has_key(const dht::decorated_key& key) const override {
        return false;
    }

    future<> on_compaction_completion(compaction::compaction_completion_desc desc, sstables::offstrategy offstrategy) override {
        auto&& view = table().try_get_compaction_group_view_with_static_sharding();
        return view.on_compaction_completion(std::move(desc), offstrategy);
    }

    bool is_auto_compaction_disabled_by_user() const noexcept override {
        return table().is_auto_compaction_disabled_by_user();
    }

    bool tombstone_gc_enabled() const noexcept override {
        return table().tombstone_gc_enabled();
    }

    const tombstone_gc_state& get_tombstone_gc_state() const noexcept override {
        return _tombstone_gc_state;
    }

    compaction::compaction_backlog_tracker& get_backlog_tracker() override {
        return _backlog_tracker;
    }

    const std::string get_group_id() const noexcept override {
        return _group_id;
    }

    seastar::condition_variable& get_staging_done_condition() noexcept override {
        return _staging_condition;
    }

    dht::token_range get_token_range_after_split(const dht::token& t) const noexcept override {
        return table().get_token_range_after_split(t);
    }

    // This test view always reports 0.
    int64_t get_sstables_repaired_at() const noexcept override {
        return 0;
    }
};
|
|
|
|
// Defined out of line, in the translation unit where compaction_group_view is a complete type.
table_for_tests::data::data() = default;
|
|
|
|
// Defined out of line, in the translation unit where compaction_group_view is a complete type.
table_for_tests::data::~data() = default;
|
|
|
|
// Builds the schema used when a test does not supply one: table "ks"."cf"
// with a single utf8 partition key column named "p1".
schema_ptr table_for_tests::make_default_schema() {
    schema_builder builder(some_keyspace, some_column_family);
    builder.with_column(utf8_type->decompose("p1"), utf8_type, column_kind::partition_key);
    return builder.build();
}
|
|
|
|
// Creates the shared test state: a replica::column_family for schema `s`
// (or the default schema when `s` is null), marks it ready for writes and
// registers its compaction_group_view with the compaction manager `cm`.
table_for_tests::table_for_tests(sstables::sstables_manager& sstables_manager, compaction::compaction_manager& cm, schema_ptr s, replica::table::config cfg, data_dictionary::storage_options storage)
    : _data(make_lw_shared<data>())
{
    auto& state = *_data;

    cfg.cf_stats = &state.cf_stats;

    if (s) {
        state.s = s;
    } else {
        state.s = make_default_schema();
    }

    // The table receives its own copy of the storage options; the original
    // is moved into the shared state at the end.
    auto table_storage = make_lw_shared<replica::storage_options>(storage);
    state.cf = make_lw_shared<replica::column_family>(
            state.s,
            std::move(cfg),
            std::move(table_storage),
            cm,
            sstables_manager,
            state.cl_stats,
            sstables_manager.get_cache_tracker(),
            nullptr);
    state.cf->mark_ready_for_writes(nullptr);

    state.table_s = std::make_unique<compaction_group_view>(state, sstables_manager);
    cm.add(*state.table_s);

    state.storage = std::move(storage);
}
|
|
|
|
// Returns the compaction_group_view created by the constructor.
compaction::compaction_group_view& table_for_tests::as_compaction_group_view() noexcept {
    auto& state = *_data;
    return *state.table_s;
}
|
|
|
|
// Unregisters the view from the compaction manager, then stops the table.
future<> table_for_tests::stop() {
    // Work on a copy of the shared pointer (presumably so the state stays
    // alive across the suspension points even if *this goes away).
    auto state = _data;
    auto& cm = state->cf->get_compaction_manager();
    co_await cm.remove(*state->table_s);
    co_await state->cf->stop();
}
|
|
|
|
// Forwards the tombstone GC switch to the underlying table.
void table_for_tests::set_tombstone_gc_enabled(bool tombstone_gc_enabled) noexcept {
    auto& table = *_data->cf;
    table.set_tombstone_gc_enabled(tombstone_gc_enabled);
}
|
|
|
|
// Installs `repair_sstable_classifier` on every compaction group of the table.
void table_for_tests::set_repair_sstable_classifier(replica::repair_classifier_func repair_sstable_classifier) {
    auto install = [&repair_sstable_classifier] (replica::compaction_group& group) {
        group.set_repair_sstable_classifier(repair_sstable_classifier);
    };
    _data->cf->for_each_compaction_group(install);
}
|
|
|
|
namespace sstables {
|
|
|
|
// Translates storage options into the object storage endpoint list for db::config.
//
// Local storage yields an empty list. For S3 and GS object storage one
// endpoint is produced, with connection details read from the environment
// (S3_SERVER_PORT_FOR_TEST, AWS_DEFAULT_REGION, GS_CREDENTIALS_FILE).
std::vector<db::object_storage_endpoint_param> make_storage_options_config(const data_dictionary::storage_options& so) {
    std::vector<db::object_storage_endpoint_param> result;

    auto on_local = [] (const data_dictionary::storage_options::local&) -> void {
        // Nothing to configure for local storage.
    };

    auto on_object_storage = [&result] (const data_dictionary::storage_options::object_storage& os) -> void {
        if (os.type == data_dictionary::storage_options::S3_NAME) {
            result.emplace_back(os.endpoint,
                s3::endpoint_config {
                    .port = std::stoul(tests::getenv_safe("S3_SERVER_PORT_FOR_TEST")),
                    // HTTPS is used only when AWS_DEFAULT_REGION is present in the environment.
                    .use_https = ::getenv("AWS_DEFAULT_REGION") != nullptr,
                    .region = tests::getenv_or_default("AWS_DEFAULT_REGION", "local"),
                });
        }
        if (os.type == data_dictionary::storage_options::GS_NAME) {
            result.emplace_back(db::object_storage_endpoint_param::gs_storage{
                .endpoint = os.endpoint,
                .credentials_file = tests::getenv_or_default("GS_CREDENTIALS_FILE", "none")
            });
        }
    };

    std::visit(overloaded_functor { on_local, on_object_storage }, so.value);
    return result;
}
|
|
|
|
// Creates a db::config whose only data directory is `temp_dir` and whose
// object storage endpoints are derived from `so`.
std::unique_ptr<db::config> make_db_config(sstring temp_dir, const data_dictionary::storage_options so) {
    auto config = std::make_unique<db::config>();
    config->data_file_directories.set({ temp_dir });
    config->object_storage_endpoints(make_storage_options_config(so));
    return config;
}
|
|
|
|
// Private state of test_env.
//
// NOTE: the declaration order is significant, because the constructor's
// initializers use earlier members (e.g. `dir` refers to `local_dir`,
// `mgr` is built from `db_config`, `feature_service`, `cache_tracker`,
// `dir_sem`, `scf` and `abort`).
struct test_env::impl {
    // Engaged only when no external tmpdir was passed to the constructor.
    std::optional<tmpdir> local_dir;
    // Either the external tmpdir or `local_dir`.
    tmpdir& dir;
    std::unique_ptr<db::config> db_config;
    directory_semaphore dir_sem;
    ::cache_tracker cache_tracker;
    gms::feature_service feature_service;
    // Fallback handlers, used when the test_env_config provides none.
    db::nop_large_data_handler nop_ld_handler;
    db::nop_corrupt_data_handler nop_cd_handler;
    sstable_compressor_factory& scf;
    test_env_sstables_manager mgr;
    // Created on demand by test_env::maybe_start_compaction_manager().
    std::unique_ptr<test_env_compaction_manager> cmgr;
    reader_concurrency_semaphore semaphore;
    sstables::sstable_generation_generator gen;
    data_dictionary::storage_options storage;
    abort_source abort;

    impl(test_env_config cfg, sstable_compressor_factory&, sstables::storage_manager* sstm, tmpdir* tdir);
    impl(impl&&) = delete;
    impl(const impl&) = delete;

    // Returns the next generation produced by `gen`.
    sstables::generation_type new_generation() noexcept {
        return gen();
    }
};
|
|
|
|
test_env::impl::impl(test_env_config cfg, sstable_compressor_factory& scfarg, sstables::storage_manager* sstm, tmpdir* tdir)
|
|
: local_dir(tdir == nullptr ? std::optional<tmpdir>(std::in_place) : std::optional<tmpdir>(std::nullopt))
|
|
, dir(tdir == nullptr ? local_dir.value() : *tdir)
|
|
, db_config(make_db_config(dir.path().native(), cfg.storage))
|
|
, dir_sem(1)
|
|
, feature_service({get_disabled_features_from_db_config(*db_config)})
|
|
, nop_cd_handler(db::corrupt_data_handler::register_metrics::no)
|
|
, scf(scfarg)
|
|
, mgr(
|
|
"test_env",
|
|
cfg.large_data_handler == nullptr ? nop_ld_handler : *cfg.large_data_handler,
|
|
cfg.corrupt_data_handler == nullptr ? nop_cd_handler : *cfg.corrupt_data_handler,
|
|
sstables::sstables_manager::config{
|
|
.available_memory = cfg.available_memory,
|
|
.enable_sstable_key_validation = db_config->enable_sstable_key_validation(),
|
|
.memory_reclaim_threshold = db_config->components_memory_reclaim_threshold,
|
|
.data_file_directories = db_config->data_file_directories(),
|
|
.format = db_config->sstable_format,
|
|
},
|
|
feature_service,
|
|
cache_tracker,
|
|
dir_sem,
|
|
[host_id = locator::host_id::create_random_id()]{ return host_id; },
|
|
scf,
|
|
abort,
|
|
{}, // extensions
|
|
current_scheduling_group(),
|
|
sstm)
|
|
, semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::test_env", reader_concurrency_semaphore::register_metrics::no)
|
|
, storage(std::move(cfg.storage))
|
|
{
|
|
}
|
|
|
|
test_env::test_env(test_env_config cfg, sstable_compressor_factory& scf, sstables::storage_manager* sstm, tmpdir* tmp)
|
|
: _impl(std::make_unique<impl>(std::move(cfg), scf, sstm, tmp))
|
|
{
|
|
}
|
|
|
|
// Defaulted in this translation unit, where test_env::impl is a complete type.
test_env::test_env(test_env&&) noexcept = default;
|
|
|
|
// Defaulted in this translation unit, where test_env::impl is a complete type.
test_env::~test_env() = default;
|
|
|
|
void test_env::maybe_start_compaction_manager(bool enable) {
|
|
if (!_impl->cmgr) {
|
|
_impl->cmgr = std::make_unique<test_env_compaction_manager>();
|
|
if (enable) {
|
|
_impl->cmgr->get_compaction_manager().enable();
|
|
}
|
|
}
|
|
}
|
|
|
|
// Shuts the environment down: the compaction manager (if one was created),
// then the sstables manager, then the reader concurrency semaphore.
future<> test_env::stop() {
    if (_impl->cmgr) {
        auto& cm = _impl->cmgr->get_compaction_manager();
        if (!cm.is_running()) {
            // The manager is not running; only its task manager module is stopped.
            co_await cm.get_task_manager_module().stop();
        } else {
            co_await cm.stop();
        }
    }
    co_await _impl->mgr.close();
    co_await _impl->semaphore.stop();
}
|
|
|
|
class mock_sstables_registry : public sstables::sstables_registry {
|
|
struct entry {
|
|
sstring status;
|
|
sstables::sstable_state state;
|
|
sstables::entry_descriptor desc;
|
|
};
|
|
std::map<std::pair<table_id, generation_type>, entry> _entries;
|
|
public:
|
|
virtual future<> create_entry(table_id owner, sstring status, sstable_state state, sstables::entry_descriptor desc) override {
|
|
_entries.emplace(std::make_pair(owner, desc.generation), entry { status, state, desc });
|
|
co_return;
|
|
};
|
|
virtual future<> update_entry_status(table_id owner, sstables::generation_type gen, sstring status) override {
|
|
auto it = _entries.find(std::make_pair(owner, gen));
|
|
if (it != _entries.end()) {
|
|
it->second.status = status;
|
|
} else {
|
|
throw std::runtime_error("update_entry_status: not found");
|
|
}
|
|
co_return;
|
|
}
|
|
virtual future<> update_entry_state(table_id owner, sstables::generation_type gen, sstables::sstable_state state) override {
|
|
auto it = _entries.find(std::make_pair(owner, gen));
|
|
if (it != _entries.end()) {
|
|
it->second.state = state;
|
|
} else {
|
|
throw std::runtime_error("update_entry_state: not found");
|
|
}
|
|
co_return;
|
|
}
|
|
virtual future<> delete_entry(table_id owner, sstables::generation_type gen) override {
|
|
auto it = _entries.find(std::make_pair(owner, gen));
|
|
if (it != _entries.end()) {
|
|
_entries.erase(it);
|
|
} else {
|
|
throw std::runtime_error("delete_entry: not found");
|
|
}
|
|
co_return;
|
|
}
|
|
virtual future<> sstables_registry_list(table_id owner, entry_consumer consumer) override {
|
|
for (auto& [loc_and_gen, e] : _entries) {
|
|
if (loc_and_gen.first == owner) {
|
|
co_await consumer(e.status, e.state, e.desc);
|
|
}
|
|
}
|
|
}
|
|
};
|
|
|
|
// Runs `func` inside a seastar thread against a freshly built test_env,
// stopping the environment afterwards.
//
// For non-local storage a sharded storage_manager is started first and a
// mock sstables registry is plugged into the environment for the duration
// of `func`.
future<> test_env::do_with_async(noncopyable_function<void (test_env&)> func, test_env_config cfg) {
    if (cfg.storage.is_local_type()) {
        return seastar::async([func = std::move(func), cfg = std::move(cfg)] () mutable {
            auto scf = make_sstable_compressor_factory_for_tests_in_thread();
            test_env env(std::move(cfg), *scf);
            auto close_env = defer([&] { env.stop().get(); });
            func(env);
        });
    }

    auto db_cfg = make_shared<db::config>();
    db_cfg->experimental_features({db::experimental_features_t::feature::KEYSPACE_STORAGE_OPTIONS});
    db_cfg->object_storage_endpoints(make_storage_options_config(cfg.storage));

    return seastar::async([func = std::move(func), cfg = std::move(cfg), db_cfg = std::move(db_cfg)] () mutable {
        // The deferred actions below run in reverse order of declaration.
        sharded<sstables::storage_manager> sstm;
        sstm.start(std::ref(*db_cfg), sstables::storage_manager::config{}).get();
        auto stop_sstm = defer([&] { sstm.stop().get(); });

        auto scf = make_sstable_compressor_factory_for_tests_in_thread();
        test_env env(std::move(cfg), *scf, &sstm.local());
        auto close_env = defer([&] { env.stop().get(); });

        env.manager().plug_sstables_registry(std::make_unique<mock_sstables_registry>());
        auto unplug_registry = defer([&env] { env.manager().unplug_sstables_registry(); });

        func(env);
    });
}
|
|
|
|
// Returns the next sstable generation from the environment's generator.
sstables::generation_type test_env::new_generation() noexcept {
    auto& state = *_impl;
    return state.new_generation();
}
|
|
|
|
// Creates an sstable object in the `normal` state through the environment's
// sstables manager. A copy of the environment's storage options is adjusted
// first: local storage points at `dir`, object storage at the schema's id.
shared_sstable test_env::make_sstable(schema_ptr schema, sstring dir, sstables::generation_type generation,
        sstable::version_types v, sstable::format_types f,
        size_t buffer_size, db_clock::time_point now) {
    // FIXME -- most of the callers work with _impl->dir's path, so
    // test_env can initialize the .dir/.prefix only once, when constructed
    auto storage = _impl->storage;

    auto use_dir = [&dir] (data_dictionary::storage_options::local& o) { o.dir = dir; };
    auto use_schema_id = [&schema] (data_dictionary::storage_options::object_storage& o) { o.location = schema->id(); };
    std::visit(overloaded_functor { use_dir, use_schema_id }, storage.value);

    return _impl->mgr.make_sstable(
            std::move(schema),
            storage,
            generation,
            sstables::sstable_state::normal,
            v,
            f,
            now,
            default_io_error_handler_gen(),
            buffer_size);
}
|
|
|
|
// Same as the full overload, using a freshly allocated generation.
shared_sstable test_env::make_sstable(schema_ptr schema, sstring dir, sstable::version_types v) {
    auto generation = new_generation();
    return make_sstable(std::move(schema), std::move(dir), std::move(generation), v);
}
|
|
|
|
// Same as the full overload, placing the sstable in the environment's own directory.
shared_sstable test_env::make_sstable(schema_ptr schema, sstables::generation_type generation,
        sstable::version_types v, sstable::format_types f,
        size_t buffer_size, db_clock::time_point now) {
    return make_sstable(
            std::move(schema),
            _impl->dir.path().native(),
            generation,
            v,
            f,
            buffer_size,
            now);
}
|
|
|
|
// Creates an sstable of version `v` in the environment's own directory.
shared_sstable test_env::make_sstable(schema_ptr schema, sstable::version_types v) {
    return make_sstable(
            std::move(schema),
            _impl->dir.path().native(),
            v);
}
|
|
|
|
std::function<shared_sstable()>
|
|
test_env::make_sst_factory(schema_ptr s) {
|
|
return [this, s = std::move(s)] {
|
|
return make_sstable(s, new_generation());
|
|
};
|
|
}
|
|
|
|
std::function<shared_sstable()>
|
|
test_env::make_sst_factory(schema_ptr s, sstable::version_types version) {
|
|
return [this, s = std::move(s), version] {
|
|
return make_sstable(s, new_generation(), version);
|
|
};
|
|
}
|
|
|
|
// Creates the sstable object for an existing sstable and loads it, resolving
// to the loaded sstable.
future<shared_sstable> test_env::reusable_sst(schema_ptr schema, sstring dir, sstables::generation_type generation,
        sstable::version_types version, sstable::format_types f, sstable_open_config cfg) {
    auto sst = make_sstable(std::move(schema), dir, generation, version, f);
    // The continuation owns the sstable so that it outlives the load.
    return sst->load(sst->get_schema()->get_sharder(), cfg).then([loaded = std::move(sst)] {
        return make_ready_future<shared_sstable>(std::move(loaded));
    });
}
|
|
|
|
// Convenience overload taking the generation as a plain integer value.
future<shared_sstable> test_env::reusable_sst(schema_ptr schema, sstring dir, sstables::generation_type::int_t gen_value,
        sstable::version_types version, sstable::format_types f) {
    return reusable_sst(
            std::move(schema),
            std::move(dir),
            sstables::generation_type(gen_value),
            version,
            f);
}
|
|
|
|
// Loads an existing sstable from the environment's own directory.
future<shared_sstable> test_env::reusable_sst(schema_ptr schema, sstables::generation_type generation,
        sstable::version_types version, sstable::format_types f) {
    return reusable_sst(
            std::move(schema),
            _impl->dir.path().native(),
            std::move(generation),
            version,
            f);
}
|
|
|
|
// Re-opens `sst` under `schema`, using the storage prefix, generation and
// version of the given sstable.
future<shared_sstable> test_env::reusable_sst(schema_ptr schema, shared_sstable sst) {
    return reusable_sst(
            std::move(schema),
            sst->get_storage().prefix(),
            sst->generation(),
            sst->get_version());
}
|
|
|
|
// Re-opens `sst` under its own schema.
future<shared_sstable> test_env::reusable_sst(shared_sstable sst) {
    auto schema = sst->get_schema();
    return reusable_sst(schema, sst);
}
|
|
|
|
// Loads the sstable with the given generation from the environment's own directory.
future<shared_sstable> test_env::reusable_sst(schema_ptr schema, sstables::generation_type generation) {
    return reusable_sst(
            std::move(schema),
            _impl->dir.path().native(),
            generation);
}
|
|
|
|
// Accessor for the environment's sstables manager.
test_env_sstables_manager& test_env::manager() {
    auto& state = *_impl;
    return state.mgr;
}
|
|
|
|
// Accessor for the test compaction manager. The pointer is dereferenced
// unconditionally, so the manager must have been created beforehand.
test_env_compaction_manager& test_env::test_compaction_manager() {
    auto& state = *_impl;
    return *state.cmgr;
}
|
|
|
|
// Accessor for the environment's reader concurrency semaphore.
reader_concurrency_semaphore& test_env::semaphore() {
    auto& state = *_impl;
    return state.semaphore;
}
|
|
|
|
// Accessor for the db::config created for this environment.
db::config& test_env::db_config() {
    auto& state = *_impl;
    return *state.db_config;
}
|
|
|
|
// Accessor for the directory the environment works in.
tmpdir& test_env::tempdir() noexcept {
    auto& state = *_impl;
    return state.dir;
}
|
|
|
|
// Returns a copy of the storage options the environment was configured with.
data_dictionary::storage_options test_env::get_storage_options() const noexcept {
    const auto& state = *_impl;
    return state.storage;
}
|
|
|
|
// Creates a tracking-only permit named `n` for schema `s` on the environment's semaphore.
reader_permit test_env::make_reader_permit(const schema_ptr &s, const char* n, db::timeout_clock::time_point timeout) {
    auto& sem = _impl->semaphore;
    return sem.make_tracking_only_permit(s, n, timeout, {});
}
|
|
|
|
// Creates a tracking-only permit named "test" with no schema attached.
reader_permit test_env::make_reader_permit(db::timeout_clock::time_point timeout) {
    auto& sem = _impl->semaphore;
    return sem.make_tracking_only_permit(nullptr, "test", timeout, {});
}
|
|
|
|
// Returns a default table config whose compaction concurrency semaphore is
// the environment's semaphore.
replica::table::config test_env::make_table_config() {
    replica::table::config cfg{};
    cfg.compaction_concurrency_semaphore = &_impl->semaphore;
    return cfg;
}
|
|
|
|
// Runs `func` inside a seastar thread against a sharded test_env whose
// instances all share one temporary directory; the environment is stopped
// afterwards.
future<> test_env::do_with_sharded_async(noncopyable_function<void (sharded<test_env>&)> func) {
    return seastar::async([func = std::move(func)] {
        tmpdir shared_dir;
        sharded<test_env> envs;
        auto scf = make_sstable_compressor_factory_for_tests_in_thread();
        envs.start(test_env_config{}, std::ref(*scf), nullptr, &shared_dir).get();
        auto stop_envs = defer([&] { envs.stop().get(); });
        func(envs);
    });
}
|
|
|
|
// Creates a table_for_tests for schema `s`, making sure a compaction manager
// exists. Commitlog is disabled; local storage points at `dir`, object
// storage at the schema's id.
table_for_tests test_env::make_table_for_tests(schema_ptr s, sstring dir) {
    maybe_start_compaction_manager();

    auto cfg = make_table_config();
    cfg.enable_commitlog = false;

    auto storage = _impl->storage;
    auto use_dir = [&dir] (data_dictionary::storage_options::local& o) { o.dir = dir; };
    auto use_schema_id = [&s] (data_dictionary::storage_options::object_storage& o) { o.location = s->id(); };
    std::visit(overloaded_functor { use_dir, use_schema_id }, storage.value);

    return table_for_tests(
            manager(),
            _impl->cmgr->get_compaction_manager(),
            s,
            std::move(cfg),
            std::move(storage));
}
|
|
|
|
// Creates a table_for_tests rooted at the environment's own directory.
table_for_tests test_env::make_table_for_tests(schema_ptr s) {
    return make_table_for_tests(
            std::move(s),
            _impl->dir.path().native());
}
|
|
|
|
// Builds an sstable_set for strategy `cs` using a temporary table_for_tests,
// which is stopped when this function returns.
sstables::sstable_set test_env::make_sstable_set(compaction::compaction_strategy& cs, schema_ptr s) {
    auto table = make_table_for_tests(s);
    auto stop_table = deferred_stop(table);
    return cs.make_sstable_set(table.as_compaction_group_view());
}
|
|
|
|
// Triggers the environment's abort_source.
void test_env::request_abort() {
    auto& state = *_impl;
    state.abort.request_abort();
}
|
|
|
|
// Builds object storage options of the given `type`, reading the bucket from
// <type>_BUCKET_FOR_TEST and the endpoint from <type>_SERVER_ADDRESS_FOR_TEST.
data_dictionary::storage_options make_test_object_storage_options(std::string_view type) {
    const std::string type_name(type);

    data_dictionary::storage_options opts;
    opts.value = data_dictionary::storage_options::object_storage {
        .bucket = tests::getenv_safe(type_name + "_BUCKET_FOR_TEST"),
        .endpoint = tests::getenv_safe(type_name + "_SERVER_ADDRESS_FOR_TEST"),
        .type = type_name
    };
    return opts;
}
|
|
|
|
// Path of the TOC component of a `big`-format sstable of version `v` for the
// given schema and generation under `dir`.
static sstring toc_filename(const sstring& dir, schema_ptr schema, sstables::generation_type generation, sstable_version_types v) {
    const auto& keyspace = schema->ks_name();
    const auto& table = schema->cf_name();
    return sstable::filename(dir, keyspace, table, v, generation,
            sstable_format_types::big, component_type::TOC);
}
|
|
|
|
// Loads the sstable with the given generation from `dir`, probing the known
// sstable versions in reverse order of all_sstable_versions and using the
// first one whose TOC file exists. Fails with sst_not_found otherwise.
future<shared_sstable> test_env::reusable_sst(schema_ptr schema, sstring dir, sstables::generation_type generation) {
    for (auto version : std::views::reverse(all_sstable_versions)) {
        const auto toc = toc_filename(dir, schema, generation, version);
        if (!co_await file_exists(toc)) {
            continue;
        }
        co_return co_await reusable_sst(schema, dir, generation, version);
    }
    throw sst_not_found(dir, generation);
}
|
|
|
|
// Forwards the sstable replacement notification to the wrapped compaction manager.
void test_env_compaction_manager::propagate_replacement(compaction::compaction_group_view& table_s, const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added) {
    auto& cm = _cm;
    cm.propagate_replacement(table_s, removed, added);
}
|
|
|
|
// Test version of compaction_manager::perform_compaction<>()
|
|
future<> test_env_compaction_manager::perform_compaction(shared_ptr<compaction::compaction_task_executor> task) {
|
|
_cm._tasks.push_back(*task);
|
|
auto unregister_task = defer([task] {
|
|
if (!task->is_linked()) {
|
|
testlog.error("compaction_manager_test: deregister_compaction uuid={}: task not found", task->compaction_data().compaction_uuid);
|
|
}
|
|
task->unlink();
|
|
task->switch_state(compaction::compaction_task_executor::state::none);
|
|
});
|
|
co_await task->run_compaction();
|
|
}
|
|
|
|
}
|
|
|
|
// Returns a freshly malloc'ed copy of argv[0..argc) with every argument equal
// to `filter_out` removed. When `exclude_positional_arg` is set, the argument
// following each removed one is dropped as well.
//
// The returned array has `argc` slots; unused trailing slots are null. The
// caller owns the array and every string in it.
static std::pair<int, char**> rebuild_arg_list_without(int argc, char** argv, const char* filter_out, bool exclude_positional_arg = false) {
    char** kept = static_cast<char**>(malloc(argc * sizeof(char*)));
    std::memset(kept, 0, argc * sizeof(char*));

    int kept_count = 0;
    bool skip_this_one = false;
    for (int idx = 0; idx < argc; ++idx) {
        if (skip_this_one) {
            skip_this_one = false;
            continue;
        }
        const char* current = argv[idx];
        if (strcmp(current, filter_out) == 0) {
            // if arg filtered out has positional arg, that has to be excluded too.
            skip_this_one = exclude_positional_arg;
            continue;
        }
        char* copy = static_cast<char*>(malloc(strlen(current) + 1));
        std::strcpy(copy, current);
        kept[kept_count] = copy;
        ++kept_count;
    }
    return std::make_pair(kept_count, kept);
}
|
|
|
|
// Releases an argument list produced by rebuild_arg_list_without(): each of
// the `argc` slots (null slots included) and then the array itself.
static void free_arg_list(int argc, char** argv) {
    int idx = 0;
    while (idx < argc) {
        // free() accepts null pointers, so empty slots need no special casing.
        free(argv[idx]);
        ++idx;
    }
    free(argv);
}
|
|
|
|
// Frees the rebuilt argument list, if one is held.
scylla_tests_cmdline_options_processor::~scylla_tests_cmdline_options_processor() {
    if (!_new_argv) {
        return;
    }
    free_arg_list(_new_argc, _new_argv);
}
|
|
|
|
std::pair<int, char**> scylla_tests_cmdline_options_processor::process_cmdline_options(int argc, char** argv) {
|
|
namespace po = boost::program_options;
|
|
|
|
// Removes -- (intended to separate boost suite args from seastar ones) which confuses boost::program_options.
|
|
auto [new_argc, new_argv] = rebuild_arg_list_without(argc, argv, "--");
|
|
auto _ = defer([argc = new_argc, argv = new_argv] {
|
|
free_arg_list(argc, argv);
|
|
});
|
|
|
|
po::options_description desc("Scylla tests additional options");
|
|
desc.add_options()
|
|
("help", "Produces help message");
|
|
po::variables_map vm;
|
|
|
|
po::parsed_options parsed = po::command_line_parser(new_argc, new_argv).
|
|
options(desc).
|
|
allow_unregistered().
|
|
run();
|
|
|
|
po::store(parsed, vm);
|
|
po::notify(vm);
|
|
|
|
if (vm.count("help")) {
|
|
std::cout << desc << std::endl;
|
|
return std::make_pair(argc, argv);
|
|
}
|
|
|
|
return std::make_pair(argc, argv);
|
|
}
|