Some files in compaction/ have using namespace {compaction,sstables}
clauses, some even in headers. This is considered bad practice and
muddies the namespace use. Remove them.
531 lines
26 KiB
C++
531 lines
26 KiB
C++
/*
|
|
* Copyright (C) 2019 ScyllaDB
|
|
*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include <boost/test/unit_test.hpp>
|
|
#include <boost/range/iterator_range_core.hpp>
|
|
#include <memory>
|
|
#include <utility>
|
|
|
|
#include <seastar/core/sstring.hh>
|
|
#include <seastar/core/future-util.hh>
|
|
#include <seastar/core/do_with.hh>
|
|
#include <seastar/core/sharded.hh>
|
|
#include <seastar/testing/test_case.hh>
|
|
#include <seastar/testing/thread_test_case.hh>
|
|
#include "sstables/sstables.hh"
|
|
#include "compaction/incremental_compaction_strategy.hh"
|
|
#include "schema/schema.hh"
|
|
#include "replica/database.hh"
|
|
#include "compaction/compaction_manager.hh"
|
|
#include "sstable_test.hh"
|
|
#include "sstables/metadata_collector.hh"
|
|
#include "test/lib/tmpdir.hh"
|
|
#include "replica/cell_locking.hh"
|
|
#include "test/lib/mutation_reader_assertions.hh"
|
|
#include "test/lib/key_utils.hh"
|
|
#include "service/storage_proxy.hh"
|
|
#include "test/lib/sstable_run_based_compaction_strategy_for_tests.hh"
|
|
#include "dht/i_partitioner.hh"
|
|
#include "dht/murmur3_partitioner.hh"
|
|
#include "db/large_data_handler.hh"
|
|
#include "db/config.hh"
|
|
#include "repair/incremental.hh"
|
|
|
|
#include "test/lib/sstable_utils.hh"
|
|
#include "test/lib/test_services.hh"
|
|
|
|
using namespace sstables;
|
|
|
|
static mutation_reader sstable_reader(reader_permit permit, shared_sstable sst, schema_ptr s) {
|
|
return sst->as_mutation_source().make_mutation_reader(s, std::move(permit), query::full_partition_range, s->full_slice());
|
|
|
|
}
|
|
|
|
class strategy_control_for_test : public compaction::strategy_control {
|
|
bool _has_ongoing_compaction;
|
|
public:
|
|
explicit strategy_control_for_test(bool has_ongoing_compaction) noexcept : _has_ongoing_compaction(has_ongoing_compaction) {}
|
|
|
|
bool has_ongoing_compaction(compaction::compaction_group_view& table_s) const noexcept override {
|
|
return _has_ongoing_compaction;
|
|
}
|
|
virtual future<std::vector<sstables::shared_sstable>> candidates(compaction::compaction_group_view& t) const override {
|
|
auto main_set = co_await t.main_sstable_set();
|
|
co_return boost::copy_range<std::vector<sstables::shared_sstable>>(*main_set->all());
|
|
}
|
|
virtual future<std::vector<sstables::frozen_sstable_run>> candidates_as_runs(compaction::compaction_group_view& t) const override {
|
|
auto main_set = co_await t.main_sstable_set();
|
|
co_return main_set->all_sstable_runs();
|
|
}
|
|
};
|
|
|
|
static std::unique_ptr<compaction::strategy_control> make_strategy_control_for_test(bool has_ongoing_compaction) {
|
|
return std::make_unique<strategy_control_for_test>(has_ongoing_compaction);
|
|
}
|
|
|
|
SEASTAR_TEST_CASE(incremental_compaction_test) {
|
|
return sstables::test_env::do_with_async([&] (sstables::test_env& env) {
|
|
auto builder = schema_builder("tests", "incremental_compaction_test")
|
|
.with_column("id", utf8_type, column_kind::partition_key)
|
|
.with_column("value", int32_type)
|
|
.with_partitioner("org.apache.cassandra.dht.Murmur3Partitioner")
|
|
.with_sharder(smp::count, 0);
|
|
auto s = builder.build();
|
|
|
|
auto tmp = make_lw_shared<tmpdir>();
|
|
auto sst_gen = [&env, s, tmp] () mutable {
|
|
auto sst = env.make_sstable(s, tmp->path().string(), env.new_generation(), sstable_version_types::md, big);
|
|
return sst;
|
|
};
|
|
|
|
table_for_tests cf = env.make_table_for_tests(s, tmp->path().string());
|
|
auto close_cf = deferred_stop(cf);
|
|
cf->set_compaction_strategy(compaction::compaction_strategy_type::size_tiered);
|
|
auto compact = [&, s] (std::vector<shared_sstable> all, auto replacer) -> std::vector<shared_sstable> {
|
|
auto desc = compaction::compaction_descriptor(std::move(all), 1, 0);
|
|
desc.enable_garbage_collection(cf->get_sstable_set());
|
|
return compact_sstables(env, std::move(desc), cf, sst_gen, replacer).get().new_sstables;
|
|
};
|
|
auto make_insert = [&] (auto p) {
|
|
auto key = p.key();
|
|
mutation m(s, key);
|
|
m.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), 1 /* ts */);
|
|
BOOST_REQUIRE(m.decorated_key().token() == p.token());
|
|
return m;
|
|
};
|
|
|
|
auto tokens = tests::generate_partition_keys(16, s, local_shard_only::yes, tests::key_size{8, 8});
|
|
std::unordered_set<shared_sstable> sstables;
|
|
std::vector<utils::observer<sstable&>> observers;
|
|
sstables::sstable_run_based_compaction_strategy_for_tests cs;
|
|
|
|
auto do_replace = [&] (const std::vector<shared_sstable>& old_sstables, const std::vector<shared_sstable>& new_sstables) {
|
|
for (auto& old_sst : old_sstables) {
|
|
BOOST_REQUIRE(sstables.count(old_sst));
|
|
sstables.erase(old_sst);
|
|
}
|
|
for (auto& new_sst : new_sstables) {
|
|
BOOST_REQUIRE(!sstables.count(new_sst));
|
|
sstables.insert(new_sst);
|
|
}
|
|
column_family_test(cf).rebuild_sstable_list(cf.as_compaction_group_view(), new_sstables, old_sstables).get();
|
|
env.test_compaction_manager().propagate_replacement(cf.as_compaction_group_view(), old_sstables, new_sstables);
|
|
};
|
|
|
|
auto do_incremental_replace = [&] (auto old_sstables, auto new_sstables, auto& expected_sst, auto& closed_sstables_tracker) {
|
|
// that's because each sstable will contain only 1 mutation.
|
|
BOOST_REQUIRE(old_sstables.size() == 1);
|
|
BOOST_REQUIRE(new_sstables.size() == 1);
|
|
auto old_sstable = old_sstables.front();
|
|
// check that sstable replacement follows token order
|
|
BOOST_REQUIRE(*expected_sst == old_sstable->generation());
|
|
expected_sst++;
|
|
// check that previously released sstables were already closed
|
|
BOOST_REQUIRE(*closed_sstables_tracker == old_sstable->generation());
|
|
|
|
do_replace(old_sstables, new_sstables);
|
|
|
|
observers.push_back(old_sstable->add_on_closed_handler([&] (sstable& sst) {
|
|
BOOST_TEST_MESSAGE(fmt::format("Closing sstable of generation {}", sst.generation()));
|
|
closed_sstables_tracker++;
|
|
}));
|
|
|
|
BOOST_TEST_MESSAGE(fmt::format("Removing sstable of generation {}, refcnt: {}", old_sstables.front()->generation(), old_sstables.front().use_count()));
|
|
};
|
|
|
|
auto do_compaction = [&] (size_t expected_input, size_t expected_output) -> std::vector<shared_sstable> {
|
|
auto control = make_strategy_control_for_test(false);
|
|
auto desc = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control).get();
|
|
|
|
// nothing to compact, move on.
|
|
if (desc.sstables.empty()) {
|
|
return {};
|
|
}
|
|
std::unordered_set<sstables::run_id> run_ids;
|
|
bool incremental_enabled = std::any_of(desc.sstables.begin(), desc.sstables.end(), [&run_ids] (shared_sstable& sst) {
|
|
return !run_ids.insert(sst->run_identifier()).second;
|
|
});
|
|
|
|
BOOST_REQUIRE(desc.sstables.size() == expected_input);
|
|
auto sstable_run = desc.sstables
|
|
| std::views::transform([] (auto& sst) { return sst->generation(); })
|
|
| std::ranges::to<std::set>();
|
|
auto expected_sst = sstable_run.begin();
|
|
auto closed_sstables_tracker = sstable_run.begin();
|
|
auto replacer = [&] (compaction::compaction_completion_desc ccd) {
|
|
BOOST_REQUIRE(expected_sst != sstable_run.end());
|
|
if (incremental_enabled) {
|
|
do_incremental_replace(std::move(ccd.old_sstables), std::move(ccd.new_sstables), expected_sst, closed_sstables_tracker);
|
|
} else {
|
|
do_replace(std::move(ccd.old_sstables), std::move(ccd.new_sstables));
|
|
expected_sst = sstable_run.end();
|
|
}
|
|
};
|
|
|
|
auto result = compact(std::move(desc.sstables), replacer);
|
|
|
|
BOOST_REQUIRE_EQUAL(expected_output, result.size());
|
|
BOOST_REQUIRE(expected_sst == sstable_run.end());
|
|
return result;
|
|
};
|
|
|
|
// Generate 4 sstable runs composed of 4 fragments each after 4 compactions.
|
|
// All fragments non-overlapping.
|
|
for (auto i = 0U; i < tokens.size(); i++) {
|
|
auto sst = make_sstable_containing(sst_gen, { make_insert(tokens[i]) });
|
|
sst->set_sstable_level(1);
|
|
BOOST_REQUIRE(sst->get_sstable_level() == 1);
|
|
column_family_test(cf).add_sstable(sst).get();
|
|
sstables.insert(std::move(sst));
|
|
do_compaction(4, 4);
|
|
}
|
|
BOOST_REQUIRE(sstables.size() == 16);
|
|
|
|
// Generate 1 sstable run from 4 sstables runs of similar size
|
|
auto result = do_compaction(16, 16);
|
|
BOOST_REQUIRE(result.size() == 16);
|
|
for (auto i = 0U; i < tokens.size(); i++) {
|
|
assert_that(sstable_reader(env.semaphore().make_tracking_only_permit(s, "test reader", db::no_timeout, tracing::trace_state_ptr()), result[i], s))
|
|
.produces(make_insert(tokens[i]))
|
|
.produces_end_of_stream();
|
|
}
|
|
});
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(incremental_compaction_sag_test) {
|
|
auto builder = schema_builder("tests", "incremental_compaction_test")
|
|
.with_column("id", utf8_type, column_kind::partition_key)
|
|
.with_column("value", int32_type);
|
|
auto s = builder.build();
|
|
|
|
struct sag_test {
|
|
test_env& _env;
|
|
mutable table_for_tests _cf;
|
|
compaction::incremental_compaction_strategy _ics;
|
|
const unsigned min_threshold = 4;
|
|
const size_t data_set_size = 1'000'000'000;
|
|
|
|
static compaction::incremental_compaction_strategy make_ics(double space_amplification_goal) {
|
|
std::map<sstring, sstring> options;
|
|
options.emplace(sstring("space_amplification_goal"), sstring(std::to_string(space_amplification_goal)));
|
|
return compaction::incremental_compaction_strategy(options);
|
|
}
|
|
static replica::column_family::config make_table_config(test_env& env) {
|
|
auto config = env.make_table_config();
|
|
config.compaction_enforce_min_threshold = true;
|
|
return config;
|
|
}
|
|
|
|
sag_test(test_env& env, schema_ptr s, double space_amplification_goal)
|
|
: _env(env)
|
|
, _cf(env.make_table_for_tests(s))
|
|
, _ics(make_ics(space_amplification_goal))
|
|
{
|
|
}
|
|
|
|
double space_amplification() const {
|
|
auto sstables = _cf->get_sstables();
|
|
auto total = std::ranges::fold_left(*sstables | std::views::transform(std::mem_fn(&sstable::data_size)), uint64_t(0), std::plus{});
|
|
return double(total) / data_set_size;
|
|
}
|
|
|
|
shared_sstable make_sstable_with_size(size_t sstable_data_size) {
|
|
auto sst = _env.make_sstable(_cf->schema(), "/nowhere/in/particular", _env.new_generation(), sstable_version_types::md, big);
|
|
auto keys = tests::generate_partition_keys(2, _cf->schema(), local_shard_only::yes);
|
|
sstables::test(sst).set_values(keys[0].key(), keys[1].key(), stats_metadata{}, sstable_data_size);
|
|
return sst;
|
|
}
|
|
|
|
void populate(double target_space_amplification) {
|
|
auto add_sstable = [this] (unsigned sst_data_size) {
|
|
auto sst = make_sstable_with_size(sst_data_size);
|
|
column_family_test(_cf).add_sstable(sst).get();
|
|
};
|
|
|
|
add_sstable(data_set_size);
|
|
while (space_amplification() < target_space_amplification) {
|
|
add_sstable(data_set_size / min_threshold);
|
|
}
|
|
}
|
|
|
|
void run() {
|
|
auto& table_s = _cf.as_compaction_group_view();
|
|
auto control = make_strategy_control_for_test(false);
|
|
for (;;) {
|
|
auto desc = _ics.get_sstables_for_compaction(table_s, *control).get();
|
|
// no more jobs, bailing out...
|
|
if (desc.sstables.empty()) {
|
|
break;
|
|
}
|
|
auto total = std::ranges::fold_left(desc.sstables | std::views::transform(std::mem_fn(&sstable::data_size)), uint64_t(0), std::plus{});
|
|
std::vector<shared_sstable> new_ssts = { make_sstable_with_size(std::min(total, data_set_size)) };
|
|
column_family_test(_cf).rebuild_sstable_list(table_s, new_ssts, desc.sstables).get();
|
|
}
|
|
}
|
|
|
|
future<> stop() {
|
|
return _cf.stop();
|
|
}
|
|
};
|
|
|
|
using SAG = double;
|
|
using TABLE_INITIAL_SA = double;
|
|
|
|
auto with_sag_test = [&] (SAG sag, TABLE_INITIAL_SA initial_sa) {
|
|
test_env::do_with_async([&] (test_env& env) {
|
|
sag_test test(env, s, sag);
|
|
test.populate(initial_sa);
|
|
BOOST_REQUIRE(test.space_amplification() >= initial_sa);
|
|
test.run();
|
|
BOOST_REQUIRE(test.space_amplification() <= sag);
|
|
test.stop().get();
|
|
}).get();
|
|
};
|
|
|
|
with_sag_test(SAG(1.25), TABLE_INITIAL_SA(1.5));
|
|
with_sag_test(SAG(2), TABLE_INITIAL_SA(1.5));
|
|
with_sag_test(SAG(1.5), TABLE_INITIAL_SA(1.75));
|
|
with_sag_test(SAG(1.01), TABLE_INITIAL_SA(1.5));
|
|
with_sag_test(SAG(1.5), TABLE_INITIAL_SA(1));
|
|
}
|
|
|
|
SEASTAR_TEST_CASE(basic_garbage_collection_test) {
|
|
return test_env::do_with_async([] (test_env& env) {
|
|
auto tmp = tmpdir();
|
|
auto s = schema_builder("ks", "cf")
|
|
.with_column("p1", utf8_type, column_kind::partition_key)
|
|
.with_column("c1", utf8_type, column_kind::clustering_key)
|
|
.with_column("r1", utf8_type)
|
|
.build();
|
|
|
|
static constexpr float expired = 0.33;
|
|
// we want number of expired keys to be ~ 1.5*sstables::TOMBSTONE_HISTOGRAM_BIN_SIZE so as to
|
|
// test ability of histogram to return a good estimation after merging keys.
|
|
static int total_keys = std::ceil(sstables::TOMBSTONE_HISTOGRAM_BIN_SIZE/expired)*1.5;
|
|
|
|
auto make_insert = [&] (bytes k, uint32_t ttl, uint32_t expiration_time) {
|
|
auto key = partition_key::from_exploded(*s, {k});
|
|
mutation m(s, key);
|
|
auto c_key = clustering_key::from_exploded(*s, {to_bytes("c1")});
|
|
auto live_cell = atomic_cell::make_live(*utf8_type, 0, bytes("a"), gc_clock::time_point(gc_clock::duration(expiration_time)), gc_clock::duration(ttl));
|
|
m.set_clustered_cell(c_key, *s->get_column_definition("r1"), std::move(live_cell));
|
|
return m;
|
|
};
|
|
utils::chunked_vector<mutation> mutations;
|
|
mutations.reserve(total_keys);
|
|
|
|
auto expired_keys = total_keys*expired;
|
|
auto now = gc_clock::now();
|
|
for (auto i = 0; i < expired_keys; i++) {
|
|
// generate expiration time at different time points or only a few entries would be created in histogram
|
|
auto expiration_time = (now - gc_clock::duration(DEFAULT_GC_GRACE_SECONDS*2+i)).time_since_epoch().count();
|
|
mutations.push_back(make_insert(to_bytes("expired_key" + to_sstring(i)), 1, expiration_time));
|
|
}
|
|
auto remaining = total_keys-expired_keys;
|
|
auto expiration_time = (now + gc_clock::duration(3600)).time_since_epoch().count();
|
|
for (auto i = 0; i < remaining; i++) {
|
|
mutations.push_back(make_insert(to_bytes("key" + to_sstring(i)), 3600, expiration_time));
|
|
}
|
|
|
|
table_for_tests cf = env.make_table_for_tests(s);
|
|
auto close_cf = deferred_stop(cf);
|
|
|
|
auto creator = [&] {
|
|
auto sst = env.make_sstable(s, tmp.path().string(), env.new_generation(), sstables::get_highest_sstable_version(), big);
|
|
return sst;
|
|
};
|
|
auto sst = make_sstable_containing(creator, std::move(mutations));
|
|
column_family_test(cf).add_sstable(sst).get();
|
|
|
|
const auto& stats = sst->get_stats_metadata();
|
|
BOOST_REQUIRE(stats.estimated_tombstone_drop_time.bin.size() == sstables::TOMBSTONE_HISTOGRAM_BIN_SIZE);
|
|
// Asserts that two keys are equal to within a positive delta
|
|
sstable_run run;
|
|
// FIXME: can we ignore return value of insert()?
|
|
(void)run.insert(sst);
|
|
BOOST_REQUIRE(std::fabs(run.estimate_droppable_tombstone_ratio(now, cf.as_compaction_group_view().get_tombstone_gc_state(), cf.schema()) - expired) <= 0.1);
|
|
|
|
auto cd = compaction::compaction_descriptor({ sst });
|
|
cd.enable_garbage_collection(cf->get_sstable_set());
|
|
auto info = compact_sstables(env, std::move(cd), cf, creator).get();
|
|
auto uncompacted_size = sst->data_size();
|
|
BOOST_REQUIRE(info.new_sstables.size() == 1);
|
|
BOOST_REQUIRE(info.new_sstables.front()->estimate_droppable_tombstone_ratio(now, cf.as_compaction_group_view().get_tombstone_gc_state(), cf.schema()) == 0.0f);
|
|
BOOST_REQUIRE_CLOSE(info.new_sstables.front()->data_size(), uncompacted_size*(1-expired), 5);
|
|
auto control = make_strategy_control_for_test(false);
|
|
|
|
// sstable satisfying conditions will be included
|
|
{
|
|
std::map<sstring, sstring> options;
|
|
options.emplace("tombstone_threshold", "0.3f");
|
|
// that's needed because sstable with droppable data should be old enough.
|
|
options.emplace("tombstone_compaction_interval", "1");
|
|
sleep(2s).get();
|
|
auto cs = compaction::make_compaction_strategy(compaction::compaction_strategy_type::incremental, options);
|
|
auto descriptor = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control).get();
|
|
BOOST_REQUIRE(descriptor.sstables.size() == 1);
|
|
BOOST_REQUIRE(descriptor.sstables.front() == sst);
|
|
}
|
|
|
|
// sstable with droppable ratio of 0.3 won't be included due to threshold
|
|
{
|
|
std::map<sstring, sstring> options;
|
|
options.emplace("tombstone_threshold", "0.5f");
|
|
auto cs = compaction::make_compaction_strategy(compaction::compaction_strategy_type::incremental, options);
|
|
auto descriptor = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control).get();
|
|
BOOST_REQUIRE(descriptor.sstables.size() == 0);
|
|
}
|
|
// sstable which was recently created won't be included due to min interval
|
|
{
|
|
std::map<sstring, sstring> options;
|
|
options.emplace("tombstone_compaction_interval", "3600");
|
|
auto cs = compaction::make_compaction_strategy(compaction::compaction_strategy_type::incremental, options);
|
|
sstables::test(sst).set_data_file_write_time(db_clock::now());
|
|
auto descriptor = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control).get();
|
|
BOOST_REQUIRE(descriptor.sstables.size() == 0);
|
|
}
|
|
// sstable which should not be included because of droppable ratio of 0.3, will actually be included
|
|
// because the droppable ratio check has been disabled with unchecked_tombstone_compaction set to true
|
|
{
|
|
std::map<sstring, sstring> options;
|
|
options.emplace("tombstone_compaction_interval", "3600");
|
|
options.emplace("tombstone_threshold", "0.5f");
|
|
options.emplace("unchecked_tombstone_compaction", "true");
|
|
auto cs = compaction::make_compaction_strategy(compaction::compaction_strategy_type::incremental, options);
|
|
sstables::test(sst).set_data_file_write_time(db_clock::now() - std::chrono::seconds(7200));
|
|
auto descriptor = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control).get();
|
|
BOOST_REQUIRE(descriptor.sstables.size() == 1);
|
|
}
|
|
});
|
|
}
|
|
|
|
SEASTAR_TEST_CASE(ics_reshape_test) {
|
|
static constexpr unsigned disjoint_sstable_count = 256;
|
|
|
|
return test_env::do_with_async([] (test_env& env) {
|
|
auto builder = schema_builder("tests", "ics_reshape_test")
|
|
.with_column("id", utf8_type, column_kind::partition_key)
|
|
.with_column("cl", ::timestamp_type, column_kind::clustering_key)
|
|
.with_column("value", int32_type);
|
|
builder.set_compaction_strategy(compaction::compaction_strategy_type::incremental);
|
|
constexpr unsigned target_sstable_size_in_mb = 1000;
|
|
std::map <sstring, sstring> opts = {
|
|
{"sstable_size_in_mb", to_sstring(target_sstable_size_in_mb)},
|
|
};
|
|
auto cs = compaction::make_compaction_strategy(compaction::compaction_strategy_type::incremental, opts);
|
|
builder.set_compaction_strategy_options(std::move(opts));
|
|
auto s = builder.build();
|
|
|
|
auto tokens = tests::generate_partition_keys(disjoint_sstable_count, s, local_shard_only::yes);
|
|
|
|
auto make_row = [&](unsigned token_idx) {
|
|
auto key = tokens[token_idx].key();
|
|
|
|
mutation m(s, key);
|
|
auto value = 1;
|
|
auto next_ts = 1;
|
|
auto c_key = clustering_key::from_exploded(*s, {::timestamp_type->decompose(next_ts)});
|
|
m.set_clustered_cell(c_key, bytes("value"), data_value(int32_t(value)), next_ts);
|
|
return m;
|
|
};
|
|
|
|
auto tmp = tmpdir();
|
|
|
|
auto sst_gen = [&env, s, &tmp]() {
|
|
return env.make_sstable(s, tmp.path().string(), env.new_generation(), sstables::sstable::version_types::md, big);
|
|
};
|
|
|
|
{
|
|
unsigned sstable_count = s->max_compaction_threshold() * 2;
|
|
|
|
std::vector<sstables::shared_sstable> sstables;
|
|
sstables.reserve(sstable_count);
|
|
for (unsigned i = 0; i < sstable_count; i++) {
|
|
auto sst = make_sstable_containing(sst_gen, {make_row(0)});
|
|
sstables.push_back(std::move(sst));
|
|
}
|
|
|
|
auto ret = cs.get_reshaping_job(sstables, s, compaction::reshape_config{.mode = compaction::reshape_mode::strict});
|
|
BOOST_REQUIRE(ret.sstables.size() == unsigned(s->max_compaction_threshold()));
|
|
BOOST_REQUIRE(ret.max_sstable_bytes == target_sstable_size_in_mb*1024*1024);
|
|
}
|
|
|
|
{
|
|
// create set of 256 disjoint ssts and expect that stcs reshape allows them all to be compacted at once
|
|
|
|
std::vector<sstables::shared_sstable> sstables;
|
|
sstables.reserve(disjoint_sstable_count);
|
|
for (unsigned i = 0; i < disjoint_sstable_count; i++) {
|
|
auto sst = make_sstable_containing(sst_gen, {make_row(i)});
|
|
sstables.push_back(std::move(sst));
|
|
}
|
|
|
|
BOOST_REQUIRE(cs.get_reshaping_job(sstables, s, compaction::reshape_config{.mode = compaction::reshape_mode::strict}).sstables.size() == disjoint_sstable_count);
|
|
}
|
|
|
|
{
|
|
// create a single run of 256 sstables and expect that reshape will say there's nothing to do.
|
|
|
|
run_id sstable_run_id = run_id::create_random_id();
|
|
std::vector<sstables::shared_sstable> sstables;
|
|
sstables.reserve(disjoint_sstable_count);
|
|
for (unsigned i = 0; i < disjoint_sstable_count; i++) {
|
|
auto sst = make_sstable_containing(sst_gen, {make_row(i)});
|
|
sstables::test(sst).set_run_identifier(sstable_run_id);
|
|
sstables.push_back(std::move(sst));
|
|
}
|
|
|
|
BOOST_REQUIRE(cs.get_reshaping_job(sstables, s, compaction::reshape_config{.mode = compaction::reshape_mode::strict}).sstables.size() == 0);
|
|
}
|
|
|
|
{
|
|
// create set of 256 overlapping ssts and expect that stcs reshape allows only 32 to be compacted at once
|
|
|
|
std::vector<sstables::shared_sstable> sstables;
|
|
sstables.reserve(disjoint_sstable_count);
|
|
for (unsigned i = 0; i < disjoint_sstable_count; i++) {
|
|
auto sst = make_sstable_containing(sst_gen, {make_row(0)});
|
|
sstables.push_back(std::move(sst));
|
|
}
|
|
|
|
BOOST_REQUIRE(cs.get_reshaping_job(sstables, s, compaction::reshape_config{.mode = compaction::reshape_mode::strict}).sstables.size() == uint64_t(s->max_compaction_threshold()));
|
|
}
|
|
});
|
|
}
|
|
|
|
SEASTAR_TEST_CASE(gc_tombstone_with_grace_seconds_test) {
|
|
return test_env::do_with_async([](test_env &env) {
|
|
auto gc_grace_seconds = 5;
|
|
auto schema = schema_builder("tests", "gc_tombstone_with_grace_seconds_test")
|
|
.with_column("id", utf8_type, column_kind::partition_key)
|
|
.with_column("value", byte_type)
|
|
.set_gc_grace_seconds(gc_grace_seconds).build();
|
|
auto sst_factory = env.make_sst_factory(schema);
|
|
|
|
auto now = gc_clock::now();
|
|
// set the expiration time to (now - gc_grace_seconds), so that the tombstone is GC'able when compaction is run
|
|
auto expiration_time = (now - gc_clock::duration(gc_grace_seconds)).time_since_epoch().count();
|
|
mutation mut(schema, tests::generate_partition_key(schema, local_shard_only::yes));
|
|
auto live_cell = atomic_cell::make_live(*byte_type, 0, to_bytes("a"), gc_clock::time_point(gc_clock::duration(expiration_time)), gc_clock::duration(1));
|
|
mut.set_clustered_cell(clustering_key::make_empty(), *schema->get_column_definition("value"), std::move(live_cell));
|
|
auto sst = make_sstable_containing(env.make_sst_factory(schema), {mut});
|
|
|
|
table_for_tests cf = env.make_table_for_tests(schema);
|
|
auto close_cf = deferred_stop(cf);
|
|
column_family_test(cf).add_sstable(sst).get();
|
|
|
|
std::map<sstring, sstring> options;
|
|
// reduce tombstone_compaction_interval to make droppable data old enough for GC.
|
|
options.emplace("tombstone_compaction_interval", "1");
|
|
forward_jump_clocks(std::chrono::seconds{1});
|
|
auto control = make_strategy_control_for_test(false);
|
|
auto cs = compaction::make_compaction_strategy(compaction::compaction_strategy_type::incremental, options);
|
|
auto descriptor = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control).get();
|
|
BOOST_REQUIRE_EQUAL(descriptor.sstables.size(), 1);
|
|
BOOST_REQUIRE_EQUAL(descriptor.sstables.front(), sst);
|
|
});
|
|
}
|