Files
scylladb/test/boost/compaction_group_test.cc
Raphael S. Carvalho 474e962e01 compaction: Restrict tombstone GC sstable set to repaired sstables for tombstone_gc=repair mode
When tombstone_gc=repair, the repaired compaction view's sstable_set_for_tombstone_gc()
previously returned all sstables across all three views (unrepaired, repairing, repaired).
This is correct but unnecessarily expensive: for base tables under tombstone_gc=repair,
the unrepaired and repairing sets are never the source of a GC-blocking shadow.

The key ordering guarantee that makes this safe is:
- topology_coordinator sends send_tablet_repair RPC and waits for it to complete.
  Inside that RPC, mark_sstable_as_repaired() runs on all replicas, moving D from
  repairing → repaired (repaired_at stamped on disk).
- Only after the RPC returns does the coordinator commit repair_time + sstables_repaired_at
  to Raft.
- gc_before = repair_time - propagation_delay only advances once that Raft commit applies.

Therefore, when a tombstone T in the repaired set first becomes GC-eligible (its
deletion_time < gc_before), any data D it shadows is already in the repaired set on
every replica. This holds because:
- The memtable is flushed before the repairing snapshot is taken (take_storage_snapshot
  calls sg->flush()), capturing all data present at repair time.
- Hints and batchlog are flushed before the snapshot, ensuring remotely-hinted writes
  arrive before the snapshot boundary.
- Legitimate unrepaired data has timestamps close to 'now', always newer than any
  GC-eligible tombstone (USING TIMESTAMP to write backdated data is user error / UB).

Excluding the repairing and unrepaired sets from the GC shadow check cannot cause any
tombstone to be wrongly collected. The memtable check is also skipped for the same
reason: memtable data is either newer than the GC-eligible tombstone, or was flushed
into the repairing/repaired set before gc_before advanced.

Materialized view safety:
The optimization IS also applied to materialized view tables. Two paths could inject
D_view into the MV's unrepaired set after MV repair: view hints and staging via the
view-update-generator. Both are safe:

(1) View hints: flush_hints() creates a sync point covering BOTH _hints_manager (base
mutations) AND _hints_for_views_manager (view mutations). It waits until ALL pending view
hints — including D_view entries queued in _hints_for_views_manager while the target MV
replica was down — have been replayed to the target node before take_storage_snapshot() is
called. D_view therefore lands in the MV's repairing sstable and is promoted to repaired.
When a repaired compaction then checks for shadows it finds D_view in the repaired set,
keeping T_mv non-purgeable.

(2) View-update-generator staging path: Base table repair can write a missing D_base to a
replica via a staging sstable. The view-update-generator processes the staging sstable
ASYNCHRONOUSLY: it may fire arbitrarily later, even after MV repair has committed
repair_time and T_mv has been GC'd from the repaired set. However, the staging processor
calls stream_view_replica_updates() which performs a READ-BEFORE-WRITE via
as_mutation_source_excluding_staging(): it reads the CURRENT base table state before
building the view update. If T_base was written to the base table (as it always is before
the base replica can be repaired and the MV tombstone can become GC-eligible), the
view_update_builder sees T_base as the existing partition tombstone. D_base's row marker
(ts_d < ts_t) is expired by T_base, so the view update is a no-op: D_view is never
dispatched to the MV replica. No resurrection can occur regardless of how long staging is
delayed.

One remaining edge case is T_base being purged BEFORE staging fires (leaving D_base as
the sole survivor, so stream_view_replica_updates would dispatch D_view). This is blocked
by an additional invariant: for tablet-based tables, the repair writer stamps repaired_at
on staging sstables (repair_writer_impl::create_writer sets mark_as_repaired = true and
perform_component_rewrite writes repaired_at = sstables_repaired_at + 1 on every staging
sstable). After base repair commits sstables_repaired_at to Raft, the staging sstable
satisfies is_repaired(sstables_repaired_at, staging_sst) and therefore appears in
make_repaired_sstable_set(). Any subsequent base repair that advances sstables_repaired_at
further still includes the staging sstable (its repaired_at ≤ new sstables_repaired_at).
D_base in the staging sstable thus shadows T_base in every repaired compaction's shadow
check, keeping T_base non-purgeable as long as D_base remains in staging.

A base table hint also cannot bypass this. A base hint is replayed as a base mutation. The
resulting view update is generated synchronously on the base replica and sent to the MV
replica via _hints_for_views_manager (path 1 above), not via staging.

USING TIMESTAMP with timestamps predating (gc_before + propagation_delay) is explicitly
UB and excluded from the safety argument.

For tombstone_gc modes other than repair (timeout, immediate, disabled), the invariant
does not hold even for base tables, so the full storage-group set is returned.

Implementation:
- Add compaction_group::is_repaired_view(v): pointer comparison against _repaired_view.
- Add compaction_group::make_repaired_sstable_set(): iterates _main_sstables and inserts
  only sstables classified as repaired (repair::is_repaired(sstables_repaired_at, sst)).
- Add storage_group::make_repaired_sstable_set(): collects repaired sstables across all
  compaction groups in the storage group.
- Add table::make_repaired_sstable_set_for_tombstone_gc(): collects repaired sstables from
  all compaction groups across all storage groups (needed for multi-tablet tables).
- Add compaction_group_view::skip_memtable_for_tombstone_gc(): returns true iff the
  repaired-only optimization is active; used by get_max_purgeable_timestamp() in
  compaction.cc to bypass the memtable shadow check.
- is_tombstone_gc_repaired_only() private helper gates both methods: requires
  is_repaired_view(this) && tombstone_gc_mode == repair. No is_view() exclusion.
- Add error injection "view_update_generator_pause_before_processing" in
  process_staging_sstables() to support testing the staging-delay scenario.
- New test test_tombstone_gc_mv_optimization_safe_via_hints: stops servers[2], writes
  D_base + T_base (view hints queued for servers[2]'s MV replica), restarts, runs MV
  tablet repair (flush_hints delivers D_view + T_mv before snapshot), triggers repaired
  compaction, and asserts the MV row is NOT visible — T_mv preserved because D_view
  landed in the repaired set via the hints-before-snapshot path.
- New test test_tombstone_gc_mv_safe_staging_processor_delay: runs base repair before
  writing T_base so D_base is staged on servers[0] via row-sync; blocks the
  view-update-generator with an error injection; writes T_base + T_mv; runs MV repair
  (fast path, T_mv GC-eligible); triggers repaired compaction (T_mv purged — no D_view
  in repaired set); asserts no resurrection; releases injection; waits for staging to
  complete; asserts no resurrection after a second flush+compaction. Demonstrates that
  the read-before-write in stream_view_replica_updates() makes the optimization safe even
  when staging fires after T_mv has been GC'd.

The expected gain is fewer bloom filter checks and memtable key lookups during repaired
compactions: the unrepaired set is typically the largest (it holds all recent writes),
yet under tombstone_gc=repair it never influences GC decisions.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-20 16:59:09 -03:00


/*
 * Copyright (C) 2023-present ScyllaDB
 */
/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
 */
#include <seastar/core/sstring.hh>
#include <seastar/core/future-util.hh>
#include <seastar/core/aligned_buffer.hh>
#include <seastar/util/closeable.hh>
#include "test/lib/scylla_test_case.hh"
#include "test/lib/test_services.hh"
#include "test/lib/reader_concurrency_semaphore.hh"
#include "test/lib/sstable_utils.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/key_utils.hh"
#include "schema/schema.hh"
#include "schema/schema_builder.hh"
#include "sstables/sstables.hh"
#include "sstables/compress.hh"
#include "compaction/compaction.hh"
#include "compaction/compaction_manager.hh"
#include "replica/compaction_group.hh"
using namespace sstables;
static sstables::shared_sstable generate_sstable(schema_ptr s, std::function<shared_sstable()> sst_gen, noncopyable_function<bool(dht::token)> token_filter) {
    auto make_insert = [&] (const dht::decorated_key& key) {
        static thread_local int32_t value = 1;
        mutation m(s, key);
        auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(value++)});
        m.set_clustered_cell(c_key, bytes("value"), data_value(int32_t(value)), api::timestamp_clock::now().time_since_epoch().count());
        return m;
    };
    auto keys = tests::generate_partition_keys(100, s);
    utils::chunked_vector<mutation> muts;
    muts.reserve(keys.size());
    for (auto& k : keys) {
        if (token_filter(k.token())) {
            muts.push_back(make_insert(k));
        }
    }
    return make_sstable_containing(sst_gen, std::move(muts));
}
static sstables::shared_sstable sstable_that_needs_split(schema_ptr s, std::function<shared_sstable()> sst_gen) {
    return generate_sstable(std::move(s), std::move(sst_gen), [] (dht::token) { return true; });
}
class single_compaction_group : public compaction::compaction_group_view {
private:
    schema_ptr _schema;
    sstables::sstables_manager& _sst_man;
    sstables::sstable_set _main_set;
    sstables::sstable_set _maintenance_set;
    std::vector<sstables::shared_sstable> _compacted_undeleted_sstables;
    mutable compaction::compaction_strategy _compaction_strategy;
    compaction::compaction_strategy_state _compaction_strategy_state;
    tombstone_gc_state _tombstone_gc_state;
    compaction::compaction_backlog_tracker _backlog_tracker;
    condition_variable _staging_done_condition;
    std::function<shared_sstable()> _sstable_factory;
    mutable tests::reader_concurrency_semaphore_wrapper _semaphore;
public:
    single_compaction_group(table_for_tests& t, sstables::sstables_manager& sst_man, std::function<shared_sstable()> sstable_factory)
        : _schema(t.schema())
        , _sst_man(sst_man)
        , _main_set(sstables::make_partitioned_sstable_set(_schema, token_range()))
        , _maintenance_set(sstables::make_partitioned_sstable_set(_schema, token_range()))
        , _compaction_strategy(compaction::make_compaction_strategy(_schema->compaction_strategy(), _schema->compaction_strategy_options()))
        , _compaction_strategy_state(compaction::compaction_strategy_state::make(_compaction_strategy))
        , _tombstone_gc_state(tombstone_gc_state::for_tests())
        , _backlog_tracker(_compaction_strategy.make_backlog_tracker())
        , _sstable_factory(std::move(sstable_factory))
    {
        t->get_compaction_manager().add(*this);
    }
    future<> stop(table_for_tests& t) {
        return t->get_compaction_manager().remove(*this);
    }
    void rebuild_main_set(std::vector<shared_sstable> to_add, std::vector<shared_sstable> to_remove) {
        for (auto& sst : to_remove) {
            _main_set.erase(sst);
        }
        for (auto& sst : to_add) {
            _main_set.insert(sst);
        }
    }
    virtual dht::token_range token_range() const noexcept override { return dht::token_range::make(dht::first_token(), dht::last_token()); }
    virtual const schema_ptr& schema() const noexcept override { return _schema; }
    virtual unsigned min_compaction_threshold() const noexcept override { return _schema->min_compaction_threshold(); }
    virtual bool compaction_enforce_min_threshold() const noexcept override { return false; }
    virtual future<lw_shared_ptr<const sstables::sstable_set>> main_sstable_set() const override { co_return make_lw_shared<const sstables::sstable_set>(_main_set); }
    virtual future<lw_shared_ptr<const sstables::sstable_set>> maintenance_sstable_set() const override { co_return make_lw_shared<const sstables::sstable_set>(_maintenance_set); }
    virtual lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc() const override { return make_lw_shared<const sstables::sstable_set>(_main_set); }
    virtual bool skip_memtable_for_tombstone_gc() const noexcept override { return false; }
    virtual std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point compaction_time) const override { return {}; }
    virtual const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const noexcept override { return _compacted_undeleted_sstables; }
    virtual compaction::compaction_strategy& get_compaction_strategy() const noexcept override { return _compaction_strategy; }
    virtual compaction::compaction_strategy_state& get_compaction_strategy_state() noexcept override { return _compaction_strategy_state; }
    virtual reader_permit make_compaction_reader_permit() const override { return _semaphore.make_permit(); }
    virtual sstables::sstables_manager& get_sstables_manager() noexcept override { return _sst_man; }
    virtual sstables::shared_sstable make_sstable(sstables::sstable_state) const override { return _sstable_factory(); }
    virtual sstables::shared_sstable make_sstable(sstables::sstable_state, sstables::sstable_version_types) const override { return _sstable_factory(); }
    virtual sstables::sstable_writer_config configure_writer(sstring origin) const override { return _sst_man.configure_writer(std::move(origin)); }
    virtual api::timestamp_type min_memtable_timestamp() const override { return api::min_timestamp; }
    virtual api::timestamp_type min_memtable_live_timestamp() const override { return api::min_timestamp; }
    virtual api::timestamp_type min_memtable_live_row_marker_timestamp() const override { return api::min_timestamp; }
    virtual bool memtable_has_key(const dht::decorated_key& key) const override { return false; }
    virtual future<> on_compaction_completion(compaction::compaction_completion_desc desc, sstables::offstrategy offstrategy) override {
        testlog.info("Adding {} sstable(s), removing {} sstables", desc.new_sstables.size(), desc.old_sstables.size());
        rebuild_main_set(desc.new_sstables, desc.old_sstables);
        return make_ready_future<>();
    }
    virtual bool is_auto_compaction_disabled_by_user() const noexcept override { return false; }
    virtual bool tombstone_gc_enabled() const noexcept override { return false; }
    virtual tombstone_gc_state get_tombstone_gc_state() const noexcept override { return _tombstone_gc_state; }
    virtual compaction::compaction_backlog_tracker& get_backlog_tracker() override { return _backlog_tracker; }
    virtual const std::string get_group_id() const noexcept override { return "0"; }
    virtual seastar::condition_variable& get_staging_done_condition() noexcept override { return _staging_done_condition; }
    dht::token_range get_token_range_after_split(const dht::token& t) const noexcept override { return dht::token_range(); }
    int64_t get_sstables_repaired_at() const noexcept override { return 0; }
};
SEASTAR_TEST_CASE(basic_compaction_group_splitting_test) {
    return test_env::do_with_async([] (test_env& env) {
        auto builder = schema_builder("tests", "compaction_group_splitting")
            .with_column("id", utf8_type, column_kind::partition_key)
            .with_column("cl", int32_type, column_kind::clustering_key)
            .with_column("value", int32_type);
        auto s = builder.build();
        auto t = env.make_table_for_tests(s);
        auto close_table = deferred_stop(t);
        t->start();
        auto sst_factory = env.make_sst_factory(s);
        auto classifier = [] (dht::token t) -> mutation_writer::token_group_id {
            return dht::compaction_group_of(1, t);
        };
        auto sstable_needs_split = [&] (const sstables::shared_sstable& sst) {
            return classifier(sst->get_first_decorated_key().token()) != classifier(sst->get_last_decorated_key().token());
        };
        auto run_test = [&] (std::vector<sstables::shared_sstable> ssts, size_t expected_output, noncopyable_function<void(const sstables::shared_sstable&)> validate) {
            auto compaction_group = std::make_unique<single_compaction_group>(t, env.manager(), sst_factory);
            compaction_group->rebuild_main_set(ssts, {});
            auto& cm = t->get_compaction_manager();
            auto expected_compaction_size = std::ranges::fold_left(ssts | std::views::transform([&] (auto& sst) {
                // sstables that don't need split will have compaction bypassed.
                return sstable_needs_split(sst) ? sst->bytes_on_disk() : size_t(0);
            }), int64_t(0), std::plus{});
            auto ret = cm.perform_split_compaction(*compaction_group, compaction::compaction_type_options::split{classifier}, tasks::task_info{}).get();
            BOOST_REQUIRE_EQUAL(ret->start_size, expected_compaction_size);
            BOOST_REQUIRE(compaction_group->main_sstable_set().get()->size() == expected_output);
            compaction_group->main_sstable_set().get()->for_each_sstable([&] (const sstables::shared_sstable& sst) {
                BOOST_REQUIRE(!sstable_needs_split(sst));
                validate(sst);
            });
            compaction_group->stop(t).get();
        };
        // an sstable that needs split will generate 2 sstables: one for the left group, another for the right.
        {
            auto input = sstable_that_needs_split(s, sst_factory);
            std::unordered_set<mutation_writer::token_group_id> expected_ids { 0, 1 };
            run_test({ input }, 2, [&] (const sstables::shared_sstable& sst) {
                BOOST_REQUIRE(expected_ids.erase(classifier(sst->get_first_decorated_key().token())) == 1);
            });
            BOOST_REQUIRE(expected_ids.empty());
        }
        // an sstable that doesn't need split won't actually be compacted
        {
            auto input = generate_sstable(s, sst_factory, [&] (dht::token t) { return classifier(t) == 0; });
            run_test({ input }, 1, [&] (const sstables::shared_sstable& sst) {
                BOOST_REQUIRE(sst->generation() == input->generation());
                BOOST_REQUIRE_EQUAL(0, classifier(sst->get_first_decorated_key().token()));
            });
        }
        // combination of both cases
        {
            auto input1 = sstable_that_needs_split(s, sst_factory);
            auto input2 = generate_sstable(s, sst_factory, [&] (dht::token t) { return classifier(t) == 0; });
            bool found_input2 = false;
            run_test({ input1, input2 }, 3, [&] (const sstables::shared_sstable& sst) {
                found_input2 |= sst->generation() == input2->generation();
            });
            BOOST_REQUIRE(found_input2);
        }
    });
}
static mutation_reader sstable_reader(shared_sstable sst, schema_ptr s, reader_permit permit) {
    return sst->as_mutation_source().make_mutation_reader(s, std::move(permit), query::full_partition_range, s->full_slice());
}
SEASTAR_TEST_CASE(compactions_dont_cross_group_boundary_test) {
    return test_env::do_with_async([] (test_env& env) {
        auto builder = schema_builder("tests", "compactions_dont_cross_group_boundary")
            .with_column("id", utf8_type, column_kind::partition_key)
            .with_column("cl", int32_type, column_kind::clustering_key)
            .with_column("value", int32_type);
        auto s = builder.build();
        auto t = env.make_table_for_tests(s);
        auto close_table = deferred_stop(t);
        t->start();
        // Disable auto compaction to allow us to trigger compaction manually later.
        t->disable_auto_compaction().get();
        auto is_unrepaired = [] (dht::token t) { return t.raw() % 3 == 0; };
        auto is_repairing = [] (dht::token t) { return t.raw() % 3 == 1; };
        auto is_repaired = [] (dht::token t) { return t.raw() % 3 == 2; };
        auto sst_factory = env.make_sst_factory(s);
        auto generate_sstables = [&] (std::function<bool(dht::token)> filter) {
            for (int i = 0; i < 4; i++) {
                t->add_sstable_and_update_cache(generate_sstable(s, sst_factory, filter)).get();
            }
        };
        generate_sstables(is_unrepaired);
        generate_sstables(is_repairing);
        generate_sstables(is_repaired);
        auto repair_token_classifier = [&] (dht::token t) -> replica::repair_sstable_classification {
            if (is_unrepaired(t)) {
                return replica::repair_sstable_classification::unrepaired;
            } else if (is_repairing(t)) {
                return replica::repair_sstable_classification::repairing;
            }
            return replica::repair_sstable_classification::repaired;
        };
        auto repair_sstable_classifier = [&] (const sstables::shared_sstable& sst, int64_t sstables_repaired_at) -> replica::repair_sstable_classification {
            return repair_token_classifier(sst->get_first_decorated_key().token());
        };
        t.set_repair_sstable_classifier(repair_sstable_classifier);
        for (int i = 0; i < 4; i++) {
            t->compact_all_sstables({}).get();
        }
        auto validate_sstable = [&] (const sstables::shared_sstable& sst) {
            auto reader = sstable_reader(sst, s, env.make_reader_permit()); // reader holds sst and s alive.
            auto close_reader = deferred_close(reader);
            auto expected_classification = repair_sstable_classifier(sst, 0);
            while (auto m = read_mutation_from_mutation_reader(reader).get()) {
                BOOST_REQUIRE(repair_token_classifier(m->decorated_key().token()) == expected_classification);
            }
        };
        auto all_sstables = t->get_sstables();
        for (auto& sst : *all_sstables) {
            validate_sstable(sst);
        }
    });
}