Files
scylladb/test/lib/sstable_utils.cc
Raphael S. Carvalho d73ffe7220 sstables: Temporarily disable loading of first and last position metadata
It's known that reading large cells in reverse causes large allocations.
Source: https://github.com/scylladb/scylladb/issues/11642

The loading is preliminary work for splitting large partitions into
fragments composing a run, and for later being able to read such a
run in an efficient way using the position metadata.

The splitting is not turned on yet, anywhere. Therefore, we can
temporarily disable the loading, as a way to avoid regressions in
stable versions. Large allocations can cause stalls due to foreground
memory eviction kicking in.
The default values for position metadata say that first and last
position include all clustering rows, but they aren't used anywhere
other than by sstable_run to determine if a run is disjoint at
clustering level, but given that no splitting is done yet, it
does not really matter.

Unit tests relying on position metadata were adjusted to enable
the loading, such that they can still pass.

Fixes #11642.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>

Closes #12979
2023-02-24 12:14:18 +02:00

192 lines
7.5 KiB
C++

/*
* Copyright (C) 2017-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "test/lib/sstable_utils.hh"
#include "replica/database.hh"
#include "replica/memtable-sstable.hh"
#include "dht/i_partitioner.hh"
#include "dht/murmur3_partitioner.hh"
#include <boost/range/irange.hpp>
#include <boost/range/adaptor/reversed.hpp>
#include "sstables/version.hh"
#include "test/lib/flat_mutation_reader_assertions.hh"
#include "test/lib/reader_concurrency_semaphore.hh"
#include <seastar/core/reactor.hh>
#include <seastar/core/seastar.hh>
#include <seastar/core/coroutine.hh>
using namespace sstables;
using namespace std::chrono_literals;
// Builds an sstable (created via sst_factory) containing all of the given
// mutations, then validates that reading the sstable back yields exactly the
// merged content of those mutations.
// Must run in a seastar thread (blocks with .get() and calls thread::yield()).
// Precondition: muts is non-empty; all mutations share muts[0]'s schema.
sstables::shared_sstable make_sstable_containing(std::function<sstables::shared_sstable()> sst_factory, std::vector<mutation> muts) {
    tests::reader_concurrency_semaphore_wrapper semaphore;
    auto sst = sst_factory();
    schema_ptr s = muts[0].schema();
    auto mt = make_lw_shared<replica::memtable>(s);
    std::size_t i{0};
    for (auto&& m : muts) {
        mt->apply(m);
        ++i;
        // Give the reactor some time to breathe
        if(i == 10) {
            seastar::thread::yield();
            i = 0;
        }
    }
    write_memtable_to_sstable_for_test(*mt, sst).get();
    // Loading of first/last position metadata is disabled by default (see
    // scylladb issue #11642); tests relying on it enable it explicitly here.
    sstable_open_config cfg { .load_first_and_last_position_metadata = true };
    sst->open_data(cfg).get();
    // Merge mutations that share a decorated key so the expected read result
    // is one combined mutation per partition.
    std::set<mutation, mutation_decorated_key_less_comparator> merged;
    for (auto&& m : muts) {
        auto it = merged.find(m);
        if (it == merged.end()) {
            merged.insert(std::move(m));
        } else {
            // Key already present: extract the node, fold the new mutation
            // into the stored one, and re-insert.
            auto old = merged.extract(it);
            old.value().apply(std::move(m));
            merged.insert(std::move(old));
        }
    }
    // validate the sstable
    auto rd = assert_that(sst->as_mutation_source().make_reader_v2(s, semaphore.make_permit()));
    for (auto&& m : merged) {
        rd.produces(m);
    }
    rd.produces_end_of_stream();
    return sst;
}
// Convenience wrapper: applies the given mutations to a fresh memtable and
// flushes it into a single sstable (generation 1) under `dir`.
// Must run in a seastar thread (make_sstable_easy blocks on futures).
shared_sstable make_sstable(sstables::test_env& env, schema_ptr s, sstring dir, std::vector<mutation> mutations,
        sstable_writer_config cfg, sstables::sstable::version_types version, gc_clock::time_point query_time) {
    auto memtable = make_lw_shared<replica::memtable>(s);
    for (const auto& mut : mutations) {
        memtable->apply(mut);
    }
    return make_sstable_easy(env, fs::path(dir), memtable, cfg, 1, version, mutations.size(), query_time);
}
// Writes the contents of the given reader into a new "big"-format sstable at
// `path`, loads it, and returns the ready-to-use sstable.
// Must run in a seastar thread (blocks with .get()).
shared_sstable make_sstable_easy(test_env& env, const fs::path& path, flat_mutation_reader_v2 rd, sstable_writer_config cfg,
        int64_t generation, const sstables::sstable::version_types version, int expected_partition) {
    auto schema = rd.schema();
    auto sst = env.make_sstable(schema, path.string(), generation, version, sstable_format_types::big);
    encoding_stats stats{};
    sst->write_components(std::move(rd), expected_partition, schema, cfg, stats).get();
    sst->load().get();
    return sst;
}
// Flushes a memtable into a new "big"-format sstable at `path`, loads it, and
// returns the ready-to-use sstable.
// Must run in a seastar thread (blocks with .get()).
shared_sstable make_sstable_easy(test_env& env, const fs::path& path, lw_shared_ptr<replica::memtable> mt, sstable_writer_config cfg,
        unsigned long gen, const sstable::version_types v, int estimated_partitions, gc_clock::time_point query_time) {
    auto schema = mt->schema();
    auto sst = env.make_sstable(schema, path.string(), gen, v, sstable_format_types::big, default_sstable_buffer_size, query_time);
    auto reader = mt->make_flat_reader(schema, env.make_reader_permit());
    sst->write_components(std::move(reader), estimated_partitions, schema, cfg, mt->get_encoding_stats()).get();
    sst->load().get();
    return sst;
}
// Runs the compaction described by `descriptor` through a test compaction
// manager and returns the compaction result.
// `creator` produces output sstables; `replacer` swaps compacted inputs for
// outputs; `can_purge` enables tombstone garbage collection against the
// table's main sstable set.
future<compaction_result> compact_sstables(compaction_manager& cm, sstables::compaction_descriptor descriptor, table_state& table_s, std::function<shared_sstable()> creator, compaction_sstable_replacer_fn replacer,
        can_purge_tombstones can_purge) {
    // Adapt the zero-argument creator to the shard-taking signature the
    // descriptor expects; the shard id is ignored here.
    descriptor.creator = [creator = std::move(creator)] (shard_id dummy) mutable {
        return creator();
    };
    descriptor.replacer = std::move(replacer);
    if (can_purge) {
        descriptor.enable_garbage_collection(table_s.main_sstable_set());
    }
    auto cmt = compaction_manager_test(cm);
    sstables::compaction_result ret;
    // Run the compaction as a registered task; the job captures `ret` by
    // reference to hand the result out of the continuation chain.
    co_await cmt.run(descriptor.run_identifier, table_s, [&] (sstables::compaction_data& cdata) {
        return sstables::compact_sstables(std::move(descriptor), cdata, table_s).then([&] (sstables::compaction_result res) {
            ret = std::move(res);
        });
    });
    co_return ret;
}
// Builds the on-disk filename of the TOC component for a "big"-format sstable
// of `schema` with the given generation and version.
static sstring toc_filename(const sstring& dir, schema_ptr schema, unsigned int generation, sstable_version_types v) {
    const auto gen = generation_from_value(generation);
    return sstable::filename(dir, schema->ks_name(), schema->cf_name(), v, gen,
            sstable_format_types::big, component_type::TOC);
}
// Opens an existing sstable whose version is unknown: probes the known
// sstable versions in reverse enumeration order and reuses the first one
// whose TOC component exists on disk.
// Throws sst_not_found if no version of the sstable is present.
future<shared_sstable> test_env::reusable_sst(schema_ptr schema, sstring dir, unsigned long generation) {
    for (const auto& v : boost::adaptors::reverse(all_sstable_versions)) {
        auto toc = toc_filename(dir, schema, generation, v);
        if (co_await file_exists(toc)) {
            co_return co_await reusable_sst(schema, dir, generation, v);
        }
    }
    throw sst_not_found(dir, generation);
}
// Constructs the wrapped compaction manager with the for-testing tag; when
// `enabled`, the manager is enabled immediately.
compaction_manager_for_testing::wrapped_compaction_manager::wrapped_compaction_manager(bool enabled)
    : cm(tm, compaction_manager::for_testing_tag{})
{
    if (enabled) {
        cm.enable();
    }
}
// Must run in a seastar thread
compaction_manager_for_testing::wrapped_compaction_manager::~wrapped_compaction_manager() {
    // Synchronously stop the compaction manager (blocks on the future).
    cm.stop().get();
}
// A compaction-manager task that runs an arbitrary test-provided job in place
// of a real compaction, letting tests drive the task machinery directly.
class compaction_manager::compaction_manager_test_task : public compaction_manager::task {
    sstables::run_id _run_id;  // run id assigned to the test compaction
    noncopyable_function<future<> (sstables::compaction_data&)> _job;  // job to execute as the "compaction"
public:
    compaction_manager_test_task(compaction_manager& cm, table_state& table_s, sstables::run_id run_id, noncopyable_function<future<> (sstables::compaction_data&)> job)
        : compaction_manager::task(cm, &table_s, sstables::compaction_type::Compaction, "Test compaction")
        , _run_id(run_id)
        , _job(std::move(job))
    { }
protected:
    // Sets up a new compaction under _run_id, runs the job, and reports no
    // compaction statistics (std::nullopt).
    virtual future<compaction_manager::compaction_stats_opt> do_run() override {
        setup_new_compaction(_run_id);
        return _job(_compaction_data).then([] {
            return make_ready_future<compaction_stats_opt>(std::nullopt);
        });
    }
};
// Runs `job` as a tracked compaction task with the given output run id. The
// task is registered with the compaction manager for the duration of the run
// (registration also keeps the task alive) and deregistered when the job
// completes, whether it succeeds or fails.
future<> compaction_manager_test::run(sstables::run_id output_run_id, table_state& table_s, noncopyable_function<future<> (sstables::compaction_data&)> job) {
    auto task = make_shared<compaction_manager::compaction_manager_test_task>(_cm, table_s, output_run_id, std::move(job));
    auto& cdata = register_compaction(task);
    return task->run().discard_result().finally([this, &cdata] {
        deregister_compaction(cdata);
    });
}
// Adds `task` to the compaction manager's task list and returns a reference
// to its compaction_data (valid while the task remains registered).
sstables::compaction_data& compaction_manager_test::register_compaction(shared_ptr<compaction_manager::task> task) {
    // Grab the reference before moving the shared_ptr into the task list;
    // moving avoids an extra atomic refcount bump compared to copying.
    auto& cdata = task->compaction_data();
    testlog.debug("compaction_manager_test: register_compaction uuid={}: {}", cdata.compaction_uuid, *task);
    _cm._tasks.push_back(std::move(task));
    return cdata;
}
// Removes the task matching `c` (by compaction uuid) from the compaction
// manager's task list; logs an error if no such task is registered.
void compaction_manager_test::deregister_compaction(const sstables::compaction_data& c) {
    auto matches_uuid = [&c] (const auto& task) { return task->compaction_data().compaction_uuid == c.compaction_uuid; };
    auto it = boost::find_if(_cm._tasks, matches_uuid);
    if (it == _cm._tasks.end()) {
        testlog.error("compaction_manager_test: deregister_compaction uuid={}: task not found", c.compaction_uuid);
        return;
    }
    auto task = *it;
    testlog.debug("compaction_manager_test: deregister_compaction uuid={}: {}", c.compaction_uuid, *task);
    _cm._tasks.erase(it);
}