This pull request adds support for calculating and storing CRC32 digests for all SSTable components. This change replaces the plain file_writer with crc32_digest_file_writer for all SSTable components that should be checksummed. The resulting component digests are stored in the sstable structure and later persisted to disk as part of the Scylla metadata component during writer::consume_end_of_stream. Several test cases were introduced to verify the expected behaviour. Additionally, this PR adds a new rewrite component mechanism for safe sstable component rewriting. Previously, rewriting an sstable component (e.g., via rewrite_statistics) created a temporary file that was renamed to the final name after sealing. This allowed crash recovery by simply removing the temporary file on startup. However, with component digests stored in scylla_metadata (#20100), replacing a component like Statistics requires atomically updating both the component and scylla_metadata with the new digest - impossible with POSIX rename. The new mechanism creates a clone sstable with a fresh generation: - Hard-links all components from the source except the component being rewritten and scylla_metadata - Copies the original sstable components pointer and recognized components from the source - Invokes a modifier callback to adjust the new sstable before rewriting - Writes the modified component along with updated scylla_metadata containing the new digest - Seals the new sstable with a temporary TOC - Replaces the old sstable atomically, the same way as it is done in compaction This is built on the rewrite_sstables compaction framework to support batch operations (e.g., following incremental repair). If any failure occurs during the whole process, the sstable will be automatically deleted on node startup because the temporary TOC persists.
Backport is not required, it is a new feature Fixes https://github.com/scylladb/scylladb/issues/20100, https://github.com/scylladb/scylladb/issues/27453 Closes scylladb/scylladb#28338 * github.com:scylladb/scylladb: docs: document components_digests subcomponent and trailing digest in Scylla.db sstable_compaction_test: Add tests for perform_component_rewrite sstable_test: add verification testcases of SSTable components digests persistance sstables: store digest of all sstable components in scylla metadata sstables: replace rewrite_statistics with new rewrite component mechanism sstables: add new rewrite component mechanism for safe sstable component rewriting compaction: add compaction_group_view method to specify sstable version sstables: add null_data_sink and serialized_checksum for checksum-only calculation sstables: extract default write open flags into a constant sstables: Add write_simple_with_digest for component checksumming sstables: Extract file writer closing logic into separate methods sstables: Implement CRC32 digest-only writer
662 lines
26 KiB
C++
662 lines
26 KiB
C++
/*
|
|
* Copyright (C) 2019-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include "test/lib/scylla_tests_cmdline_options.hh"
#include "test/lib/test_services.hh"
#include "test/lib/sstable_test_env.hh"
#include "test/lib/cql_test_env.hh"
#include "test/lib/test_utils.hh"
#include "init.hh"
#include "db/config.hh"
#include "db/object_storage_endpoint_param.hh"
#include "db/large_data_handler.hh"
#include "db/corrupt_data_handler.hh"
#include "dht/i_partitioner.hh"
#include "gms/feature_service.hh"
#include "repair/row_level.hh"
#include "replica/compaction_group.hh"
#include "utils/assert.hh"
#include "utils/overloaded_functor.hh"
#include <boost/program_options.hpp>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <new>
#include <fmt/ranges.h>
#include <seastar/util/defer.hh>
#include "sstables/generation_type.hh"
|
|
|
|
// Keyspace and table names used by table_for_tests::make_default_schema().
static const sstring some_keyspace = sstring("ks");
static const sstring some_column_family = sstring("cf");
|
|
|
|
// compaction_group_view implementation used by table_for_tests.
//
// Most queries are forwarded to the replica::table owned by the shared
// table_for_tests::data; the remaining state (backlog tracker, strategy state,
// tombstone GC state, group id, staging condition) is kept in this object.
class table_for_tests::compaction_group_view : public compaction::compaction_group_view {
    // NOTE: members are initialised in declaration order; _backlog_tracker and
    // _compaction_strategy_state are built through get_compaction_strategy(),
    // which reads _data, so _data must stay first.
    table_for_tests::data& _data;
    sstables::sstables_manager& _sstables_manager;
    std::vector<sstables::shared_sstable> _compacted_undeleted;
    tombstone_gc_state _tombstone_gc_state;
    mutable compaction::compaction_backlog_tracker _backlog_tracker;
    compaction::compaction_strategy_state _compaction_strategy_state;
    std::string _group_id;
    seastar::condition_variable _staging_condition;

private:
    // The table every forwarding method below delegates to.
    replica::table& table() const noexcept { return *_data.cf; }

public:
    explicit compaction_group_view(table_for_tests::data& data, sstables::sstables_manager& sstables_manager)
        : _data(data)
        , _sstables_manager(sstables_manager)
        , _tombstone_gc_state(tombstone_gc_state::for_tests())
        , _backlog_tracker(get_compaction_strategy().make_backlog_tracker())
        , _compaction_strategy_state(compaction::compaction_strategy_state::make(get_compaction_strategy()))
        , _group_id("table_for_tests::compaction_group_view")
    { }

    // Covers the range from dht::first_token() to dht::last_token().
    dht::token_range token_range() const noexcept override {
        return dht::token_range::make(dht::first_token(), dht::last_token());
    }

    const schema_ptr& schema() const noexcept override { return table().schema(); }

    unsigned min_compaction_threshold() const noexcept override { return schema()->min_compaction_threshold(); }

    // The minimum threshold is always enforced in this test view.
    bool compaction_enforce_min_threshold() const noexcept override { return true; }

    future<lw_shared_ptr<const sstables::sstable_set>> main_sstable_set() const override {
        co_return co_await table().try_get_compaction_group_view_with_static_sharding().main_sstable_set();
    }

    future<lw_shared_ptr<const sstables::sstable_set>> maintenance_sstable_set() const override {
        co_return co_await table().try_get_compaction_group_view_with_static_sharding().maintenance_sstable_set();
    }

    lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc() const override {
        return table().try_get_compaction_group_with_static_sharding()->main_sstables();
    }

    std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point query_time) const override {
        return compaction::get_fully_expired_sstables(*this, sstables, query_time);
    }

    // Nothing in this class adds to _compacted_undeleted.
    const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const noexcept override {
        return _compacted_undeleted;
    }

    compaction::compaction_strategy& get_compaction_strategy() const noexcept override {
        return table().get_compaction_strategy();
    }

    compaction::compaction_strategy_state& get_compaction_strategy_state() noexcept override {
        return _compaction_strategy_state;
    }

    reader_permit make_compaction_reader_permit() const override {
        return table().compaction_concurrency_semaphore().make_tracking_only_permit(schema(), "table_for_tests::compaction_group_view", db::no_timeout, {});
    }

    sstables::sstables_manager& get_sstables_manager() noexcept override { return _sstables_manager; }

    // The requested state is ignored; the table's default make_sstable() is used.
    sstables::shared_sstable make_sstable(sstables::sstable_state) const override {
        return table().make_sstable();
    }

    // The requested state is ignored; sstables are always created in the normal state.
    sstables::shared_sstable make_sstable(sstables::sstable_state, sstables::sstable_version_types version) const override {
        return table().make_sstable(sstables::sstable_state::normal, version);
    }

    sstables::sstable_writer_config configure_writer(sstring origin) const override {
        return _sstables_manager.configure_writer(std::move(origin));
    }

    api::timestamp_type min_memtable_timestamp() const override { return table().min_memtable_timestamp(); }

    api::timestamp_type min_memtable_live_timestamp() const override { return table().min_memtable_live_timestamp(); }

    api::timestamp_type min_memtable_live_row_marker_timestamp() const override {
        return table().min_memtable_live_row_marker_timestamp();
    }

    // Always reports that no memtable holds the key.
    bool memtable_has_key(const dht::decorated_key& key) const override { return false; }

    future<> on_compaction_completion(compaction::compaction_completion_desc desc, sstables::offstrategy offstrategy) override {
        return table().try_get_compaction_group_view_with_static_sharding().on_compaction_completion(std::move(desc), offstrategy);
    }

    bool is_auto_compaction_disabled_by_user() const noexcept override {
        return table().is_auto_compaction_disabled_by_user();
    }

    bool tombstone_gc_enabled() const noexcept override { return table().tombstone_gc_enabled(); }

    tombstone_gc_state get_tombstone_gc_state() const noexcept override { return _tombstone_gc_state; }

    compaction::compaction_backlog_tracker& get_backlog_tracker() override { return _backlog_tracker; }

    const std::string get_group_id() const noexcept override { return _group_id; }

    seastar::condition_variable& get_staging_done_condition() noexcept override { return _staging_condition; }

    dht::token_range get_token_range_after_split(const dht::token& t) const noexcept override {
        return table().get_token_range_after_split(t);
    }

    // Always reports 0.
    int64_t get_sstables_repaired_at() const noexcept override { return 0; }
};
|
|
|
|
// Both special members are defined out of line in this translation unit.
table_for_tests::data::data() = default;

table_for_tests::data::~data() = default;
|
|
|
|
// Builds the fallback schema: table "ks"."cf" with a single utf8 partition key column "p1".
schema_ptr table_for_tests::make_default_schema() {
    auto builder = schema_builder(some_keyspace, some_column_family);
    builder.with_column(utf8_type->decompose("p1"), utf8_type, column_kind::partition_key);
    return builder.build();
}
|
|
|
|
// Creates the table under test and registers its compaction group view with `cm`.
//
// When `s` is null the default schema is used. `cfg.cf_stats` is pointed at the
// stats object owned by the shared data before the table is constructed.
table_for_tests::table_for_tests(sstables::sstables_manager& sstables_manager, compaction::compaction_manager& cm, schema_ptr s, replica::table::config cfg, data_dictionary::storage_options storage)
    : _data(make_lw_shared<data>())
{
    cfg.cf_stats = &_data->cf_stats;

    if (s) {
        _data->s = s;
    } else {
        _data->s = make_default_schema();
    }

    // The table receives its own copy of the storage options; the original is kept in _data below.
    _data->cf = make_lw_shared<replica::column_family>(
            _data->s,
            std::move(cfg),
            make_lw_shared<replica::storage_options>(storage),
            cm,
            sstables_manager,
            _data->cl_stats,
            sstables_manager.get_cache_tracker(),
            nullptr);
    _data->cf->mark_ready_for_writes(nullptr);

    _data->table_s = std::make_unique<compaction_group_view>(*_data, sstables_manager);
    cm.add(*_data->table_s);

    _data->storage = std::move(storage);
}
|
|
|
|
// Exposes the compaction group view created in the constructor.
compaction::compaction_group_view& table_for_tests::as_compaction_group_view() noexcept {
    auto& view = *_data->table_s;
    return view;
}
|
|
|
|
// Removes the compaction group view from the compaction manager, then stops the table.
future<> table_for_tests::stop() {
    // Hold our own reference to the shared data for the duration of the coroutine.
    auto keep_alive = _data;
    auto& view = *keep_alive->table_s;
    co_await keep_alive->cf->get_compaction_manager().remove(view);
    co_await keep_alive->cf->stop();
}
|
|
|
|
// Forwards the tombstone GC switch to the underlying table.
void table_for_tests::set_tombstone_gc_enabled(bool tombstone_gc_enabled) noexcept {
    auto& cf = *_data->cf;
    cf.set_tombstone_gc_enabled(tombstone_gc_enabled);
}
|
|
|
|
// Installs the given classifier on every compaction group of the table.
void table_for_tests::set_repair_sstable_classifier(replica::repair_classifier_func repair_sstable_classifier) {
    auto install = [&repair_sstable_classifier] (replica::compaction_group& group) {
        group.set_repair_sstable_classifier(repair_sstable_classifier);
    };
    _data->cf->for_each_compaction_group(install);
}
|
|
|
|
namespace sstables {
|
|
|
|
// Derives the object-storage endpoint list matching the given storage options.
//
// Local storage yields an empty list. Object storage yields one entry for the
// S3 or GS type, with connection details taken from environment variables
// (S3_SERVER_PORT_FOR_TEST, AWS_DEFAULT_REGION, GS_CREDENTIALS_FILE).
std::vector<db::object_storage_endpoint_param> make_storage_options_config(const data_dictionary::storage_options& so) {
    std::vector<db::object_storage_endpoint_param> result;

    auto on_local = [] (const data_dictionary::storage_options::local&) -> void {
        // Nothing to configure for local storage.
    };

    auto on_object_storage = [&result] (const data_dictionary::storage_options::object_storage& os) -> void {
        if (os.type == data_dictionary::storage_options::S3_NAME) {
            result.emplace_back(os.endpoint,
                s3::endpoint_config {
                    .port = std::stoul(tests::getenv_safe("S3_SERVER_PORT_FOR_TEST")),
                    // HTTPS is used exactly when AWS_DEFAULT_REGION is set in the environment.
                    .use_https = ::getenv("AWS_DEFAULT_REGION") != nullptr,
                    .region = tests::getenv_or_default("AWS_DEFAULT_REGION", "local"),
                });
        }
        if (os.type == data_dictionary::storage_options::GS_NAME) {
            result.emplace_back(db::object_storage_endpoint_param::gs_storage{
                .endpoint = os.endpoint,
                .credentials_file = tests::getenv_or_default("GS_CREDENTIALS_FILE", "none")
            });
        }
    };

    std::visit(overloaded_functor { on_local, on_object_storage }, so.value);
    return result;
}
|
|
|
|
// Creates a db::config whose only data directory is `temp_dir` and whose
// object-storage endpoints are derived from `so`.
std::unique_ptr<db::config> make_db_config(sstring temp_dir, const data_dictionary::storage_options so) {
    auto config = std::make_unique<db::config>();
    config->data_file_directories.set({ temp_dir });
    auto endpoints = make_storage_options_config(so);
    config->object_storage_endpoints(std::move(endpoints));
    return config;
}
|
|
|
|
// Private state of test_env.
//
// NOTE: the declaration order below is also the initialisation order relied on
// by the constructor (e.g. `dir` may refer to `local_dir`, `mgr` refers to
// several of the members declared before it); do not reorder.
struct test_env::impl {
    // Engaged only when no external tmpdir was supplied to the constructor.
    std::optional<tmpdir> local_dir;
    // Either the external tmpdir or local_dir's value.
    tmpdir& dir;
    std::unique_ptr<db::config> db_config;
    directory_semaphore dir_sem;
    ::cache_tracker cache_tracker;
    gms::feature_service feature_service;
    // Fallback handlers, used when the config does not provide its own.
    db::nop_large_data_handler nop_ld_handler;
    db::nop_corrupt_data_handler nop_cd_handler;
    sstable_compressor_factory& scf;
    test_env_sstables_manager mgr;
    // Left null by the constructor; created by test_env::maybe_start_compaction_manager().
    std::unique_ptr<test_env_compaction_manager> cmgr;
    reader_concurrency_semaphore semaphore;
    sstables::sstable_generation_generator gen;
    data_dictionary::storage_options storage;
    abort_source abort;

    impl(test_env_config cfg, sstable_compressor_factory&, sstables::storage_manager* sstm, tmpdir* tdir);
    impl(impl&&) = delete;
    impl(const impl&) = delete;

    // Returns the next value produced by the generation generator.
    sstables::generation_type new_generation() noexcept { return gen(); }
};
|
|
|
|
test_env::impl::impl(test_env_config cfg, sstable_compressor_factory& scfarg, sstables::storage_manager* sstm, tmpdir* tdir)
|
|
: local_dir(tdir == nullptr ? std::optional<tmpdir>(std::in_place) : std::optional<tmpdir>(std::nullopt))
|
|
, dir(tdir == nullptr ? local_dir.value() : *tdir)
|
|
, db_config(make_db_config(dir.path().native(), cfg.storage))
|
|
, dir_sem(1)
|
|
, feature_service({get_disabled_features_from_db_config(*db_config)})
|
|
, nop_cd_handler(db::corrupt_data_handler::register_metrics::no)
|
|
, scf(scfarg)
|
|
, mgr(
|
|
"test_env",
|
|
cfg.large_data_handler == nullptr ? nop_ld_handler : *cfg.large_data_handler,
|
|
cfg.corrupt_data_handler == nullptr ? nop_cd_handler : *cfg.corrupt_data_handler,
|
|
sstables::sstables_manager::config{
|
|
.available_memory = cfg.available_memory,
|
|
.enable_sstable_key_validation = db_config->enable_sstable_key_validation(),
|
|
.memory_reclaim_threshold = db_config->components_memory_reclaim_threshold,
|
|
.data_file_directories = db_config->data_file_directories(),
|
|
.format = db_config->sstable_format,
|
|
},
|
|
feature_service,
|
|
cache_tracker,
|
|
dir_sem,
|
|
[host_id = locator::host_id::create_random_id()]{ return host_id; },
|
|
scf,
|
|
abort,
|
|
{}, // extensions
|
|
current_scheduling_group(),
|
|
sstm)
|
|
, semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::test_env", reader_concurrency_semaphore::register_metrics::no)
|
|
, storage(std::move(cfg.storage))
|
|
{
|
|
}
|
|
|
|
// Forwards all arguments to the private implementation object.
test_env::test_env(test_env_config cfg, sstable_compressor_factory& scf, sstables::storage_manager* sstm, tmpdir* tmp)
    : _impl(std::make_unique<impl>(std::move(cfg), scf, sstm, tmp))
{ }
|
|
|
|
// Defaulted out of line, in the translation unit where test_env::impl is defined.
test_env::test_env(test_env&&) noexcept = default;

test_env::~test_env() = default;
|
|
|
|
void test_env::maybe_start_compaction_manager(bool enable) {
|
|
if (!_impl->cmgr) {
|
|
_impl->cmgr = std::make_unique<test_env_compaction_manager>();
|
|
if (enable) {
|
|
_impl->cmgr->get_compaction_manager().enable();
|
|
}
|
|
}
|
|
}
|
|
|
|
// Shuts the environment down: compaction manager (if one was created), then the
// sstables manager, then the reader concurrency semaphore.
future<> test_env::stop() {
    if (_impl->cmgr) {
        auto& cm = _impl->cmgr->get_compaction_manager();
        if (!cm.is_running()) {
            // The manager is not running, so only its task manager module is stopped.
            co_await cm.get_task_manager_module().stop();
        } else {
            co_await cm.stop();
        }
    }
    co_await _impl->mgr.close();
    co_await _impl->semaphore.stop();
}
|
|
|
|
class mock_sstables_registry : public sstables::sstables_registry {
|
|
struct entry {
|
|
sstring status;
|
|
sstables::sstable_state state;
|
|
sstables::entry_descriptor desc;
|
|
};
|
|
std::map<std::pair<table_id, generation_type>, entry> _entries;
|
|
public:
|
|
virtual future<> create_entry(table_id owner, sstring status, sstable_state state, sstables::entry_descriptor desc) override {
|
|
_entries.emplace(std::make_pair(owner, desc.generation), entry { status, state, desc });
|
|
co_return;
|
|
};
|
|
virtual future<> update_entry_status(table_id owner, sstables::generation_type gen, sstring status) override {
|
|
auto it = _entries.find(std::make_pair(owner, gen));
|
|
if (it != _entries.end()) {
|
|
it->second.status = status;
|
|
} else {
|
|
throw std::runtime_error("update_entry_status: not found");
|
|
}
|
|
co_return;
|
|
}
|
|
virtual future<> update_entry_state(table_id owner, sstables::generation_type gen, sstables::sstable_state state) override {
|
|
auto it = _entries.find(std::make_pair(owner, gen));
|
|
if (it != _entries.end()) {
|
|
it->second.state = state;
|
|
} else {
|
|
throw std::runtime_error("update_entry_state: not found");
|
|
}
|
|
co_return;
|
|
}
|
|
virtual future<> delete_entry(table_id owner, sstables::generation_type gen) override {
|
|
auto it = _entries.find(std::make_pair(owner, gen));
|
|
if (it != _entries.end()) {
|
|
_entries.erase(it);
|
|
} else {
|
|
throw std::runtime_error("delete_entry: not found");
|
|
}
|
|
co_return;
|
|
}
|
|
virtual future<> sstables_registry_list(table_id owner, entry_consumer consumer) override {
|
|
for (auto& [loc_and_gen, e] : _entries) {
|
|
if (loc_and_gen.first == owner) {
|
|
co_await consumer(e.status, e.state, e.desc);
|
|
}
|
|
}
|
|
}
|
|
};
|
|
|
|
// Runs `func` against a freshly built test_env inside a seastar thread.
//
// With local storage only the environment itself is created. Otherwise a
// sharded storage_manager is started first and a mock sstables registry is
// plugged into the environment's manager for the duration of `func`.
// All cleanup is done by deferred actions, which run in reverse order of creation.
future<> test_env::do_with_async(noncopyable_function<void (test_env&)> func, test_env_config cfg) {
    tests::adjust_rlimit();

    if (cfg.storage.is_local_type()) {
        return seastar::async([func = std::move(func), cfg = std::move(cfg)] () mutable {
            auto scf = make_sstable_compressor_factory_for_tests_in_thread();
            test_env env(std::move(cfg), *scf);
            auto close_env = defer([&] { env.stop().get(); });
            func(env);
        });
    }

    auto db_cfg = make_shared<db::config>();
    db_cfg->experimental_features({db::experimental_features_t::feature::KEYSPACE_STORAGE_OPTIONS});
    db_cfg->object_storage_endpoints(make_storage_options_config(cfg.storage));

    return seastar::async([func = std::move(func), cfg = std::move(cfg), db_cfg = std::move(db_cfg)] () mutable {
        sharded<sstables::storage_manager> sstm;
        sstm.start(std::ref(*db_cfg), sstables::storage_manager::config{}).get();
        auto stop_sstm = defer([&] { sstm.stop().get(); });

        auto scf = make_sstable_compressor_factory_for_tests_in_thread();
        test_env env(std::move(cfg), *scf, &sstm.local());
        auto close_env = defer([&] { env.stop().get(); });

        env.manager().plug_sstables_registry(std::make_unique<mock_sstables_registry>());
        auto unplug_registry = defer([&env] { env.manager().unplug_sstables_registry(); });

        func(env);
    });
}
|
|
|
|
// Hands out the next sstable generation from the environment's generator.
sstables::generation_type test_env::new_generation() noexcept {
    auto& state = *_impl;
    return state.new_generation();
}
|
|
|
|
// Creates (without loading) an sstable object in the normal state.
//
// The environment's storage options are copied and then specialised: for local
// storage the directory becomes `dir`, for object storage the location becomes
// the schema id.
shared_sstable test_env::make_sstable(schema_ptr schema, sstring dir, sstables::generation_type generation,
        sstable::version_types v, sstable::format_types f,
        size_t buffer_size, db_clock::time_point now) {
    // FIXME -- most of the callers work with _impl->dir's path, so
    // test_env can initialize the .dir/.prefix only once, when constructed
    auto opts = _impl->storage;

    auto use_dir = [&dir] (data_dictionary::storage_options::local& o) { o.dir = dir; };
    auto use_schema_id = [&schema] (data_dictionary::storage_options::object_storage& o) { o.location = schema->id(); };
    std::visit(overloaded_functor { use_dir, use_schema_id }, opts.value);

    return _impl->mgr.make_sstable(std::move(schema), opts, generation, sstables::sstable_state::normal, v, f, now, default_io_error_handler_gen(), buffer_size);
}
|
|
|
|
// Same as the full overload, with a freshly allocated generation.
shared_sstable test_env::make_sstable(schema_ptr schema, sstring dir, sstable::version_types v) {
    auto generation = new_generation();
    return make_sstable(std::move(schema), std::move(dir), generation, v);
}
|
|
|
|
// Same as the full overload, placing the sstable in the environment's own directory.
shared_sstable test_env::make_sstable(schema_ptr schema, sstables::generation_type generation,
        sstable::version_types v, sstable::format_types f,
        size_t buffer_size, db_clock::time_point now) {
    sstring env_dir = _impl->dir.path().native();
    return make_sstable(std::move(schema), std::move(env_dir), generation, v, f, buffer_size, now);
}
|
|
|
|
// Creates an sstable of version `v` in the environment's own directory.
shared_sstable test_env::make_sstable(schema_ptr schema, sstable::version_types v) {
    sstring env_dir = _impl->dir.path().native();
    return make_sstable(std::move(schema), std::move(env_dir), v);
}
|
|
|
|
std::function<shared_sstable()>
|
|
test_env::make_sst_factory(schema_ptr s) {
|
|
return [this, s = std::move(s)] {
|
|
return make_sstable(s, new_generation());
|
|
};
|
|
}
|
|
|
|
std::function<shared_sstable()>
|
|
test_env::make_sst_factory(schema_ptr s, sstable::version_types version) {
|
|
return [this, s = std::move(s), version] {
|
|
return make_sstable(s, new_generation(), version);
|
|
};
|
|
}
|
|
|
|
// Creates the sstable object for an existing on-storage sstable and loads it.
// The returned future resolves to the sstable once load() has completed.
future<shared_sstable> test_env::reusable_sst(schema_ptr schema, sstring dir, sstables::generation_type generation,
        sstable::version_types version, sstable::format_types f, sstable_open_config cfg) {
    auto sst = make_sstable(std::move(schema), dir, generation, version, f);
    // load() is started before `sst` is moved into the continuation.
    auto loaded = sst->load(sst->get_schema()->get_sharder(), cfg);
    return std::move(loaded).then([sst = std::move(sst)] {
        return make_ready_future<shared_sstable>(sst);
    });
}
|
|
|
|
// Convenience overload taking the generation as a raw integer value.
future<shared_sstable> test_env::reusable_sst(schema_ptr schema, sstring dir, sstables::generation_type::int_t gen_value,
        sstable::version_types version, sstable::format_types f) {
    auto generation = sstables::generation_type(gen_value);
    return reusable_sst(std::move(schema), std::move(dir), generation, version, f);
}
|
|
|
|
// Loads an sstable from the environment's own directory.
future<shared_sstable> test_env::reusable_sst(schema_ptr schema, sstables::generation_type generation,
        sstable::version_types version, sstable::format_types f) {
    sstring env_dir = _impl->dir.path().native();
    return reusable_sst(std::move(schema), std::move(env_dir), generation, version, f);
}
|
|
|
|
// Re-opens `sst` (same storage prefix, generation and version) using `schema`.
future<shared_sstable> test_env::reusable_sst(schema_ptr schema, shared_sstable sst) {
    return reusable_sst(
            std::move(schema),
            sst->get_storage().prefix(),
            sst->generation(),
            sst->get_version());
}
|
|
|
|
// Re-opens `sst` using its own schema.
future<shared_sstable> test_env::reusable_sst(shared_sstable sst) {
    auto schema = sst->get_schema();
    return reusable_sst(schema, sst);
}
|
|
|
|
// Loads the given generation from the environment's own directory, probing for its version.
future<shared_sstable> test_env::reusable_sst(schema_ptr schema, sstables::generation_type generation) {
    sstring env_dir = _impl->dir.path().native();
    return reusable_sst(std::move(schema), std::move(env_dir), generation);
}
|
|
|
|
test_env_sstables_manager&
|
|
test_env::manager() {
|
|
return _impl->mgr;
|
|
}
|
|
|
|
test_env_compaction_manager&
|
|
test_env::test_compaction_manager() {
|
|
return *_impl->cmgr;
|
|
}
|
|
|
|
reader_concurrency_semaphore&
|
|
test_env::semaphore() {
|
|
return _impl->semaphore;
|
|
}
|
|
|
|
db::config&
|
|
test_env::db_config() {
|
|
return *_impl->db_config;
|
|
}
|
|
|
|
tmpdir&
|
|
test_env::tempdir() noexcept {
|
|
return _impl->dir;
|
|
}
|
|
|
|
data_dictionary::storage_options
|
|
test_env::get_storage_options() const noexcept {
|
|
return _impl->storage;
|
|
}
|
|
|
|
// Creates a tracking-only permit on the environment's semaphore for schema `s`, named `n`.
reader_permit test_env::make_reader_permit(const schema_ptr &s, const char* n, db::timeout_clock::time_point timeout) {
    auto& sem = _impl->semaphore;
    return sem.make_tracking_only_permit(s, n, timeout, {});
}

// Creates a tracking-only permit named "test" with no schema attached.
reader_permit test_env::make_reader_permit(db::timeout_clock::time_point timeout) {
    auto& sem = _impl->semaphore;
    return sem.make_tracking_only_permit(nullptr, "test", timeout, {});
}
|
|
|
|
// Returns a table config whose compaction concurrency semaphore is the
// environment's semaphore; all other fields keep their defaults.
replica::table::config test_env::make_table_config() {
    auto* sem = &_impl->semaphore;
    return replica::table::config{.compaction_concurrency_semaphore = sem};
}
|
|
|
|
// Runs `func` in a seastar thread against a sharded test_env. All instances
// are given the same temporary directory, and a default test_env_config.
future<> test_env::do_with_sharded_async(noncopyable_function<void (sharded<test_env>&)> func) {
    return seastar::async([func = std::move(func)] {
        tmpdir shared_dir;
        sharded<test_env> envs;
        auto scf = make_sstable_compressor_factory_for_tests_in_thread();
        envs.start(test_env_config{}, std::ref(*scf), nullptr, &shared_dir).get();
        auto stop_envs = defer([&] { envs.stop().get(); });
        func(envs);
    });
}
|
|
|
|
// Builds a table_for_tests for schema `s`, starting the compaction manager if needed.
//
// Commitlog is disabled in the table config. The environment's storage options
// are copied and specialised: local storage uses `dir`, object storage uses
// the schema id as its location.
table_for_tests test_env::make_table_for_tests(schema_ptr s, sstring dir) {
    maybe_start_compaction_manager();

    auto table_cfg = make_table_config();
    table_cfg.enable_commitlog = false;

    auto opts = _impl->storage;
    auto use_dir = [&dir] (data_dictionary::storage_options::local& o) { o.dir = dir; };
    auto use_schema_id = [&s] (data_dictionary::storage_options::object_storage& o) { o.location = s->id(); };
    std::visit(overloaded_functor { use_dir, use_schema_id }, opts.value);

    return table_for_tests(manager(), _impl->cmgr->get_compaction_manager(), s, std::move(table_cfg), std::move(opts));
}
|
|
|
|
// Builds a table_for_tests located in the environment's own directory.
table_for_tests test_env::make_table_for_tests(schema_ptr s) {
    sstring env_dir = _impl->dir.path().native();
    return make_table_for_tests(std::move(s), std::move(env_dir));
}
|
|
|
|
// Asks `cs` to create an sstable set for a temporary table built from `s`.
// The temporary table is stopped when this function returns.
sstables::sstable_set test_env::make_sstable_set(compaction::compaction_strategy& cs, schema_ptr s) {
    auto tmp_table = make_table_for_tests(s);
    auto stop_tmp_table = deferred_stop(tmp_table);
    auto& view = tmp_table.as_compaction_group_view();
    return cs.make_sstable_set(view);
}
|
|
|
|
// Triggers the environment's abort source (the one handed to the sstables manager).
void test_env::request_abort() {
    auto& source = _impl->abort;
    source.request_abort();
}
|
|
|
|
// Builds object-storage options of the given type. Bucket and endpoint are read
// from the environment variables <type>_BUCKET_FOR_TEST and
// <type>_SERVER_ADDRESS_FOR_TEST.
data_dictionary::storage_options make_test_object_storage_options(std::string_view type) {
    const std::string type_name(type);
    data_dictionary::storage_options options;
    options.value = data_dictionary::storage_options::object_storage {
        .bucket = tests::getenv_safe(type_name + "_BUCKET_FOR_TEST"),
        .endpoint = tests::getenv_safe(type_name + "_SERVER_ADDRESS_FOR_TEST"),
        .type = type_name
    };
    return options;
}
|
|
|
|
// Name of the TOC component file for the given sstable identity, in "big" format.
static sstring toc_filename(const sstring& dir, schema_ptr schema, sstables::generation_type generation, sstable_version_types v) {
    const auto& keyspace = schema->ks_name();
    const auto& table = schema->cf_name();
    return sstable::filename(dir, keyspace, table, v, generation,
            sstable_format_types::big, component_type::TOC);
}
|
|
|
|
// Loads the sstable with the given generation from `dir` without knowing its version.
//
// Versions are probed in reverse order of all_sstable_versions; the first one
// whose TOC file exists is loaded. Throws sst_not_found if no TOC file exists.
future<shared_sstable> test_env::reusable_sst(schema_ptr schema, sstring dir, sstables::generation_type generation) {
    for (auto version : std::views::reverse(all_sstable_versions)) {
        const bool toc_present = co_await file_exists(toc_filename(dir, schema, generation, version));
        if (!toc_present) {
            continue;
        }
        co_return co_await reusable_sst(schema, dir, generation, version);
    }
    throw sst_not_found(dir, generation);
}
|
|
|
|
// Forwards the sstable replacement notification to the wrapped compaction manager.
void test_env_compaction_manager::propagate_replacement(compaction::compaction_group_view& table_s, const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added) {
    auto& cm = _cm;
    cm.propagate_replacement(table_s, removed, added);
}
|
|
|
|
// Test version of compaction_manager::perform_compaction<>()
|
|
future<> test_env_compaction_manager::perform_compaction(shared_ptr<compaction::compaction_task_executor> task) {
|
|
_cm._tasks.push_back(*task);
|
|
auto unregister_task = defer([task] {
|
|
if (!task->is_linked()) {
|
|
testlog.error("compaction_manager_test: deregister_compaction uuid={}: task not found", task->compaction_data().compaction_uuid);
|
|
}
|
|
task->unlink();
|
|
task->switch_state(compaction::compaction_task_executor::state::none);
|
|
});
|
|
co_await task->run_compaction();
|
|
}
|
|
|
|
}
|
|
|
|
// Builds a heap-allocated copy of `argv` with every argument equal to
// `filter_out` removed.
//
// @param argc                   number of entries in `argv`
// @param argv                   argument vector to copy; not modified
// @param filter_out             exact argument text to drop
// @param exclude_positional_arg when true, the argument following each dropped
//                               one (its value) is dropped as well
// @return {new_argc, new_argv}. `new_argv` and each of its strings are
//         allocated with the C allocator and must be released with
//         free_arg_list(). The vector is always non-null and null-terminated
//         (`new_argv[new_argc] == nullptr`), even when `argc` is 0.
// @throws std::bad_alloc if an allocation fails; nothing is leaked in that case.
static std::pair<int, char**> rebuild_arg_list_without(int argc, char** argv, const char* filter_out, bool exclude_positional_arg = false) {
    // One extra slot keeps the result null-terminated and avoids a zero-sized
    // allocation (which may legitimately return a null pointer).
    const size_t slots = static_cast<size_t>(argc > 0 ? argc : 0) + 1;
    char** new_argv = static_cast<char**>(std::calloc(slots, sizeof(char*)));
    if (!new_argv) {
        throw std::bad_alloc();
    }

    int new_argc = 0;
    bool exclude_next_arg = false;
    for (int i = 0; i < argc; i++) {
        if (std::exchange(exclude_next_arg, false)) {
            continue;
        }
        if (std::strcmp(argv[i], filter_out) == 0) {
            // if arg filtered out has positional arg, that has to be excluded too.
            exclude_next_arg = exclude_positional_arg;
            continue;
        }

        const size_t len = std::strlen(argv[i]) + 1;
        char* copy = static_cast<char*>(std::malloc(len));
        if (!copy) {
            // Release everything built so far before reporting the failure.
            for (int j = 0; j < new_argc; j++) {
                std::free(new_argv[j]);
            }
            std::free(new_argv);
            throw std::bad_alloc();
        }
        std::memcpy(copy, argv[i], len);
        new_argv[new_argc++] = copy;
    }
    return std::make_pair(new_argc, new_argv);
}
|
|
|
|
// Releases an argument vector produced by rebuild_arg_list_without():
// the first `argc` strings, then the vector itself.
static void free_arg_list(int argc, char** argv) {
    int idx = 0;
    while (idx < argc) {
        // free() accepts null pointers, so empty slots need no special handling.
        free(argv[idx]);
        ++idx;
    }
    free(argv);
}
|
|
|
|
// Frees the rebuilt argument list, if one is held.
scylla_tests_cmdline_options_processor::~scylla_tests_cmdline_options_processor() {
    if (!_new_argv) {
        return;
    }
    free_arg_list(_new_argc, _new_argv);
}
|
|
|
|
std::pair<int, char**> scylla_tests_cmdline_options_processor::process_cmdline_options(int argc, char** argv) {
|
|
namespace po = boost::program_options;
|
|
|
|
// Removes -- (intended to separate boost suite args from seastar ones) which confuses boost::program_options.
|
|
auto [new_argc, new_argv] = rebuild_arg_list_without(argc, argv, "--");
|
|
auto _ = defer([argc = new_argc, argv = new_argv] {
|
|
free_arg_list(argc, argv);
|
|
});
|
|
|
|
po::options_description desc("Scylla tests additional options");
|
|
desc.add_options()
|
|
("help", "Produces help message");
|
|
po::variables_map vm;
|
|
|
|
po::parsed_options parsed = po::command_line_parser(new_argc, new_argv).
|
|
options(desc).
|
|
allow_unregistered().
|
|
run();
|
|
|
|
po::store(parsed, vm);
|
|
po::notify(vm);
|
|
|
|
if (vm.count("help")) {
|
|
std::cout << desc << std::endl;
|
|
return std::make_pair(argc, argv);
|
|
}
|
|
|
|
return std::make_pair(argc, argv);
|
|
}
|