Files
scylladb/test/boost/compaction_group_test.cc
Botond Dénes 81e214237f Merge 'Add digests for all sstable components in scylla metadata' from Taras Veretilnyk
This pull request adds support for calculation and storing CRC32 digests for all SSTable components.
This change replaces plain file_writer with crc32_digest_file_writer for all SSTable components that should be checksummed. The resulting component digests are stored in the sstable structure
and later persisted to disk as part of the Scylla metadata component during writer::consume_end_of_stream.
Several test cases were introduced to verify the expected behaviour.

Additionally, this PR adds new rewrite component mechanism for safe sstable component rewriting.
Previously, rewriting an sstable component (e.g., via rewrite_statistics) created a temporary file that was renamed to the final name after sealing. This allowed crash recovery by simply removing the temporary file on startup.

However, with component digests stored in scylla_metadata (#20100),
replacing a component like Statistics requires atomically updating both the component
and scylla_metadata with the new digest - impossible with POSIX rename.

The new mechanism creates a clone sstable with a fresh generation:

- Hard-links all components from the source except the component being rewritten and scylla_metadata
- Copies original sstable components pointer and recognized components from the source
- Invokes a modifier callback to adjust the new sstable before rewriting
- Writes the modified component along with updated scylla_metadata containing the new digest
- Seals the new sstable with a temporary TOC
- Replaces the old sstable atomically, the same way as it is done in compaction

This is built on the rewrite_sstables compaction framework to support batch operations (e.g., following incremental repair).
In case of any failure during the process, the sstable will be automatically deleted on node startup,
since its TOC is still the temporary one.

Backport is not required, it is a new feature

Fixes https://github.com/scylladb/scylladb/issues/20100, https://github.com/scylladb/scylladb/issues/27453

Closes scylladb/scylladb#28338

* github.com:scylladb/scylladb:
  docs: document components_digests subcomponent and trailing digest in Scylla.db
  sstable_compaction_test: Add tests for perform_component_rewrite
  sstable_test: add verification testcases of SSTable components digests persistence
  sstables: store digest of all sstable components in scylla metadata
  sstables: replace rewrite_statistics with new rewrite component mechanism
  sstables: add new rewrite component mechanism for safe sstable component rewriting
  compaction: add compaction_group_view method to specify sstable version
  sstables: add null_data_sink and serialized_checksum for checksum-only calculation
  sstables: extract default write open flags into a constant
  sstables: Add write_simple_with_digest for component checksumming
  sstables: Extract file writer closing logic into separate methods
  sstables: Implement CRC32 digest-only writer
2026-03-10 16:02:53 +02:00

274 lines
14 KiB
C++

/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <seastar/core/sstring.hh>
#include <seastar/core/future-util.hh>
#include <seastar/core/aligned_buffer.hh>
#include <seastar/util/closeable.hh>
#include "test/lib/scylla_test_case.hh"
#include "test/lib/test_services.hh"
#include "test/lib/reader_concurrency_semaphore.hh"
#include "test/lib/sstable_utils.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/key_utils.hh"
#include "schema/schema.hh"
#include "schema/schema_builder.hh"
#include "sstables/sstables.hh"
#include "sstables/compress.hh"
#include "compaction/compaction.hh"
#include "compaction/compaction_manager.hh"
#include "replica/compaction_group.hh"
using namespace sstables;
// Builds an sstable out of 100 generated partition keys, keeping only the
// partitions whose token passes token_filter. Each kept partition carries a
// single clustered cell holding a monotonically increasing integer.
static sstables::shared_sstable generate_sstable(schema_ptr s, std::function<shared_sstable()> sst_gen, noncopyable_function<bool(dht::token)> token_filter) {
    // Produces one mutation per key. The counter is thread-local static so
    // cell values keep increasing across every sstable built on this shard.
    auto new_mutation = [&] (const dht::decorated_key& dk) {
        static thread_local int32_t value = 1;
        mutation ret(s, dk);
        auto ckey = clustering_key::from_exploded(*s, {int32_type->decompose(value++)});
        ret.set_clustered_cell(ckey, bytes("value"), data_value(int32_t(value)), api::timestamp_clock::now().time_since_epoch().count());
        return ret;
    };
    auto partition_keys = tests::generate_partition_keys(100, s);
    utils::chunked_vector<mutation> mutations;
    mutations.reserve(partition_keys.size());
    for (const auto& dk : partition_keys) {
        if (!token_filter(dk.token())) {
            continue;
        }
        mutations.push_back(new_mutation(dk));
    }
    return make_sstable_containing(sst_gen, std::move(mutations));
}
// Convenience wrapper: builds an sstable spanning the full token range, so
// any classifier with more than one group will consider it in need of a split.
static sstables::shared_sstable sstable_that_needs_split(schema_ptr s, std::function<shared_sstable()> sst_gen) {
    auto accept_all_tokens = [] (dht::token) { return true; };
    return generate_sstable(std::move(s), std::move(sst_gen), std::move(accept_all_tokens));
}
// Minimal test double for compaction::compaction_group_view: a single
// compaction group covering the entire token range, backed by in-memory
// sstable sets and a caller-supplied sstable factory. The constructor
// registers the group with the table's compaction manager; stop() must be
// awaited to deregister it before destruction.
class single_compaction_group : public compaction::compaction_group_view {
private:
schema_ptr _schema;
sstables::sstables_manager& _sst_man;
// Backing stores for main_sstable_set()/maintenance_sstable_set(); snapshots
// are handed out as copies, so later mutations don't affect earlier readers.
sstables::sstable_set _main_set;
sstables::sstable_set _maintenance_set;
std::vector<sstables::shared_sstable> _compacted_undeleted_sstables;
// mutable because get_compaction_strategy() is const yet returns a
// non-const reference (matching the base-class signature).
mutable compaction::compaction_strategy _compaction_strategy;
compaction::compaction_strategy_state _compaction_strategy_state;
tombstone_gc_state _tombstone_gc_state;
compaction::compaction_backlog_tracker _backlog_tracker;
condition_variable _staging_done_condition;
// Invoked by make_sstable() to produce empty output sstables for compaction.
std::function<shared_sstable()> _sstable_factory;
mutable tests::reader_concurrency_semaphore_wrapper _semaphore;
public:
single_compaction_group(table_for_tests& t, sstables::sstables_manager& sst_man, std::function<shared_sstable()> sstable_factory)
: _schema(t.schema())
, _sst_man(sst_man)
, _main_set(sstables::make_partitioned_sstable_set(_schema, token_range()))
, _maintenance_set(sstables::make_partitioned_sstable_set(_schema, token_range()))
, _compaction_strategy(compaction::make_compaction_strategy(_schema->compaction_strategy(), _schema->compaction_strategy_options()))
, _compaction_strategy_state(compaction::compaction_strategy_state::make(_compaction_strategy))
, _tombstone_gc_state(tombstone_gc_state::for_tests())
, _backlog_tracker(_compaction_strategy.make_backlog_tracker())
, _sstable_factory(std::move(sstable_factory))
{
t->get_compaction_manager().add(*this);
}
// Deregisters this group from the compaction manager; must complete before
// the object goes away.
future<> stop(table_for_tests& t) {
return t->get_compaction_manager().remove(*this);
}
// Applies a delta to the main set: removals are processed first, then
// additions (the order used when reflecting a compaction result).
void rebuild_main_set(std::vector<shared_sstable> to_add, std::vector<shared_sstable> to_remove) {
for (auto& sst : to_remove) {
_main_set.erase(sst);
}
for (auto& sst : to_add) {
_main_set.insert(sst);
}
}
// This single group owns the whole token range.
virtual dht::token_range token_range() const noexcept override { return dht::token_range::make(dht::first_token(), dht::last_token()); }
virtual const schema_ptr& schema() const noexcept override { return _schema; }
virtual unsigned min_compaction_threshold() const noexcept override { return _schema->min_compaction_threshold(); }
virtual bool compaction_enforce_min_threshold() const noexcept override { return false; }
// Each call snapshots the set by copy into a freshly allocated const set.
virtual future<lw_shared_ptr<const sstables::sstable_set>> main_sstable_set() const override { co_return make_lw_shared<const sstables::sstable_set>(_main_set); }
virtual future<lw_shared_ptr<const sstables::sstable_set>> maintenance_sstable_set() const override { co_return make_lw_shared<const sstables::sstable_set>(_maintenance_set); }
virtual lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc() const override { return make_lw_shared<const sstables::sstable_set>(_main_set); }
// No sstable is ever considered fully expired in this fixture.
virtual std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point compaction_time) const override { return {}; }
virtual const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const noexcept override { return _compacted_undeleted_sstables; }
virtual compaction::compaction_strategy& get_compaction_strategy() const noexcept override { return _compaction_strategy; }
virtual compaction::compaction_strategy_state& get_compaction_strategy_state() noexcept override { return _compaction_strategy_state; }
virtual reader_permit make_compaction_reader_permit() const override { return _semaphore.make_permit(); }
virtual sstables::sstables_manager& get_sstables_manager() noexcept override { return _sst_man; }
// Output sstables come from the injected factory; the requested state and
// version arguments are ignored by this test double.
virtual sstables::shared_sstable make_sstable(sstables::sstable_state) const override { return _sstable_factory(); }
virtual sstables::shared_sstable make_sstable(sstables::sstable_state, sstables::sstable_version_types) const override { return _sstable_factory(); }
virtual sstables::sstable_writer_config configure_writer(sstring origin) const override { return _sst_man.configure_writer(std::move(origin)); }
// There are no memtables behind this fixture: report the minimum possible
// timestamps and claim no key is present in a memtable.
virtual api::timestamp_type min_memtable_timestamp() const override { return api::min_timestamp; }
virtual api::timestamp_type min_memtable_live_timestamp() const override { return api::min_timestamp; }
virtual api::timestamp_type min_memtable_live_row_marker_timestamp() const override { return api::min_timestamp; }
virtual bool memtable_has_key(const dht::decorated_key& key) const override { return false; }
// Reflects a finished compaction into the main set so that tests can
// inspect the post-compaction state through main_sstable_set().
virtual future<> on_compaction_completion(compaction::compaction_completion_desc desc, sstables::offstrategy offstrategy) override {
testlog.info("Adding {} sstable(s), removing {} sstables", desc.new_sstables.size(), desc.old_sstables.size());
rebuild_main_set(desc.new_sstables, desc.old_sstables);
return make_ready_future<>();
}
virtual bool is_auto_compaction_disabled_by_user() const noexcept override { return false; }
virtual bool tombstone_gc_enabled() const noexcept override { return false; }
virtual tombstone_gc_state get_tombstone_gc_state() const noexcept override { return _tombstone_gc_state; }
virtual compaction::compaction_backlog_tracker& get_backlog_tracker() override { return _backlog_tracker; }
virtual const std::string get_group_id() const noexcept override { return "0"; }
virtual seastar::condition_variable& get_staging_done_condition() noexcept override { return _staging_done_condition; }
// Split/repair related hooks return neutral values, as this fixture models
// exactly one group — presumably never consulted here; TODO confirm.
dht::token_range get_token_range_after_split(const dht::token& t) const noexcept override { return dht::token_range(); }
int64_t get_sstables_repaired_at() const noexcept override { return 0; }
};
// Verifies split compaction: sstables whose data spans more than one
// compaction group are rewritten into one output sstable per group, while
// sstables already confined to a single group are left untouched (and
// bypass compaction entirely).
SEASTAR_TEST_CASE(basic_compaction_group_splitting_test) {
return test_env::do_with_async([] (test_env& env) {
auto builder = schema_builder("tests", "compaction_group_splitting")
.with_column("id", utf8_type, column_kind::partition_key)
.with_column("cl", int32_type, column_kind::clustering_key)
.with_column("value", int32_type);
auto s = builder.build();
auto t = env.make_table_for_tests(s);
auto close_table = deferred_stop(t);
t->start();
auto sst_factory = env.make_sst_factory(s);
// Maps a token to its compaction group id; this configuration yields two
// group ids, 0 and 1 (see expected_ids below).
auto classifier = [] (dht::token t) -> mutation_writer::token_group_id {
return dht::compaction_group_of(1, t);
};
// An sstable needs splitting iff its first and last keys land in
// different groups.
auto sstable_needs_split = [&] (const sstables::shared_sstable& sst) {
return classifier(sst->get_first_decorated_key().token()) != classifier(sst->get_last_decorated_key().token());
};
// Runs split compaction over ssts and checks that exactly expected_output
// sstables remain, none of which still needs splitting; validate() is then
// invoked on each survivor.
auto run_test = [&] (std::vector<sstables::shared_sstable> ssts, size_t expected_output, noncopyable_function<void(const sstables::shared_sstable&)> validate) {
auto compaction_group = std::make_unique<single_compaction_group>(t, env.manager(), sst_factory);
compaction_group->rebuild_main_set(ssts, {});
auto& cm = t->get_compaction_manager();
auto expected_compaction_size = std::ranges::fold_left(ssts | std::views::transform([&] (auto& sst) {
// sstables that don't need splitting have compaction bypassed, so they
// contribute nothing to the compaction's start size.
return sstable_needs_split(sst) ? sst->bytes_on_disk() : size_t(0);
}), int64_t(0), std::plus{});
auto ret = cm.perform_split_compaction(*compaction_group, compaction::compaction_type_options::split{classifier}, tasks::task_info{}).get();
BOOST_REQUIRE_EQUAL(ret->start_size, expected_compaction_size);
BOOST_REQUIRE(compaction_group->main_sstable_set().get()->size() == expected_output);
compaction_group->main_sstable_set().get()->for_each_sstable([&] (const sstables::shared_sstable& sst) {
BOOST_REQUIRE(!sstable_needs_split(sst));
validate(sst);
});
compaction_group->stop(t).get();
};
// sstable that needs split case will generate 2 sstables, one for left, another for right.
{
auto input = sstable_that_needs_split(s, sst_factory);
std::unordered_set<mutation_writer::token_group_id> expected_ids { 0, 1 };
run_test({ input }, 2, [&] (const sstables::shared_sstable& sst) {
// Each group id must appear exactly once among the outputs.
BOOST_REQUIRE(expected_ids.erase(classifier(sst->get_first_decorated_key().token())) == 1);
});
BOOST_REQUIRE(expected_ids.empty());
}
// sstable that doesn't need split won't actually be compacted: the very
// same sstable (same generation) must survive untouched.
{
auto input = generate_sstable(s, sst_factory, [&] (dht::token t) { return classifier(t) == 0; });
run_test({ input }, 1, [&] (const sstables::shared_sstable& sst) {
BOOST_REQUIRE(sst->generation() == input->generation());
BOOST_REQUIRE_EQUAL(0, classifier(sst->get_first_decorated_key().token()));
});
}
// combination of both cases: the spanning sstable splits into two, the
// confined one is kept as-is, for 3 outputs total.
{
auto input1 = sstable_that_needs_split(s, sst_factory);
auto input2 = generate_sstable(s, sst_factory, [&] (dht::token t) { return classifier(t) == 0; });
bool found_input2 = false;
run_test({ input1, input2 }, 3, [&] (const sstables::shared_sstable& sst) {
found_input2 |= sst->generation() == input2->generation();
});
BOOST_REQUIRE(found_input2);
}
});
}
// Opens a reader over the whole partition range with the full slice of the
// given schema. The reader keeps both sst and s alive for its lifetime.
static mutation_reader sstable_reader(shared_sstable sst, schema_ptr s, reader_permit permit) {
    auto source = sst->as_mutation_source();
    return source.make_mutation_reader(s, std::move(permit), query::full_partition_range, s->full_slice());
}
// Verifies that compaction never mixes data across repair classification
// groups (unrepaired / repairing / repaired): after several rounds of
// compact_all_sstables(), every resulting sstable must contain tokens of a
// single classification only.
SEASTAR_TEST_CASE(compactions_dont_cross_group_boundary_test) {
return test_env::do_with_async([] (test_env& env) {
auto builder = schema_builder("tests", "compactions_dont_cross_group_boundary")
.with_column("id", utf8_type, column_kind::partition_key)
.with_column("cl", int32_type, column_kind::clustering_key)
.with_column("value", int32_type);
auto s = builder.build();
auto t = env.make_table_for_tests(s);
auto close_table = deferred_stop(t);
t->start();
// Disable auto compaction to allow us to trigger compaction manually later.
t->disable_auto_compaction().get();
// Partition the token space into three disjoint classes by raw value mod 3.
auto is_unrepaired = [] (dht::token t) { return t.raw() % 3 == 0; };
auto is_repairing = [] (dht::token t) { return t.raw() % 3 == 1; };
auto is_repaired = [] (dht::token t) { return t.raw() % 3 == 2; };
auto sst_factory = env.make_sst_factory(s);
// Adds 4 sstables to the table, each holding only tokens matching filter.
auto generate_sstables = [&] (std::function<bool(dht::token)> filter) {
for (int i = 0; i < 4; i++) {
t->add_sstable_and_update_cache(generate_sstable(s, sst_factory, filter)).get();
}
};
generate_sstables(is_unrepaired);
generate_sstables(is_repairing);
generate_sstables(is_repaired);
// Token-level classification; falls through to "repaired" for tokens that
// are neither unrepaired nor repairing.
auto repair_token_classifier = [&] (dht::token t) -> replica::repair_sstable_classification {
if (is_unrepaired(t)) {
return replica::repair_sstable_classification::unrepaired;
} else if (is_repairing(t)) {
return replica::repair_sstable_classification::repairing;
}
return replica::repair_sstable_classification::repaired;
};
// Classifies a whole sstable by its first key's token; sound here because
// each input sstable was generated from a single class.
auto repair_sstable_classifier = [&] (const sstables::shared_sstable& sst, int64_t sstables_repaired_at) -> replica::repair_sstable_classification {
return repair_token_classifier(sst->get_first_decorated_key().token());
};
t.set_repair_sstable_classifier(repair_sstable_classifier);
// Repeated full-table compactions would mix classes if group boundaries
// were not respected.
for (int i = 0; i < 4; i++) {
t->compact_all_sstables({}).get();
}
// Reads every partition of sst and requires its token classification to
// match the classification of the sstable as a whole.
auto validate_sstable = [&] (const sstables::shared_sstable& sst) {
auto reader = sstable_reader(sst, s, env.make_reader_permit()); // reader holds sst and s alive.
auto close_reader = deferred_close(reader);
auto expected_classification = repair_sstable_classifier(sst, 0);
while (auto m = read_mutation_from_mutation_reader(reader).get()) {
BOOST_REQUIRE(repair_token_classifier(m->decorated_key().token()) == expected_classification);
}
};
auto all_sstables = t->get_sstables();
for (auto& sst : *all_sstables) {
validate_sstable(sst);
}
});
}