From f2f76f64450d5c34244fab6ab1900ca601f66548 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Tue, 12 May 2026 10:23:42 +0200 Subject: [PATCH] tombstone_gc: Add optional replay_position to repair_history Adds an optional replay_position to the (transient) repair history, and provides it (when available) to gc_min_time callback checks. The idea is that when set (not zero rp), this can be used to limit the commitlog segments from which we check timestamps, potentially filtering out older ones. Of course, whoever sets the attribute in the history must ensure that data below this point cannot be replayed. Note: this does not affect the persisted repair history, only node-local, transient data. --- main.cc | 4 ++-- test/boost/row_cache_test.cc | 4 ++-- test/lib/cql_test_env.cc | 4 ++-- tombstone_gc-internals.hh | 16 +++++++++++++++- tombstone_gc.cc | 35 ++++++++++++++++++++--------------- tombstone_gc.hh | 13 ++++++++----- 6 files changed, 49 insertions(+), 27 deletions(-) diff --git a/main.cc b/main.cc index 156d3b2bba..73f743b20c 100644 --- a/main.cc +++ b/main.cc @@ -2054,12 +2054,12 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl }); cm.invoke_on_all([&](compaction::compaction_manager& cm) { - cm.get_shared_tombstone_gc_state().set_gc_time_min_source([&db](const table_id& id) { + cm.get_shared_tombstone_gc_state().set_gc_time_min_source([&db](const table_id& id, const db::replay_position& rp) { auto t = db.local().get_tables_metadata().get_table_if_exists(id); if (t && t->ready_for_writes()) { auto* cl = t->commitlog(); if (cl) { - return cl->min_gc_time(id); + return cl->min_gc_time(id, rp); } } return gc_clock::time_point::max(); diff --git a/test/boost/row_cache_test.cc b/test/boost/row_cache_test.cc index efd8e032ac..ee4eeb3587 100644 --- a/test/boost/row_cache_test.cc +++ b/test/boost/row_cache_test.cc @@ -4527,7 +4527,7 @@ SEASTAR_TEST_CASE(test_populating_cache_with_expired_and_nonexpired_tombstones) schema_ptr s 
= t.schema(); // emulate commitlog behaivor - t.get_compaction_manager().get_shared_tombstone_gc_state().set_gc_time_min_source([s](const table_id& id) { + t.get_compaction_manager().get_shared_tombstone_gc_state().set_gc_time_min_source([s](const table_id& id, auto&) { return gc_clock::now() - (std::chrono::seconds(s->gc_grace_seconds().count() + 600)); }); @@ -4722,7 +4722,7 @@ SEASTAR_TEST_CASE(test_cache_compacts_expired_tombstones_on_read) { tombstone_gc_state gc_state(gc_shared_state); // emulate commitlog behaivor - gc_shared_state.set_gc_time_min_source([&s](const table_id& id) { + gc_shared_state.set_gc_time_min_source([&s](const table_id& id, auto&) { return gc_clock::now() - (std::chrono::seconds(s->gc_grace_seconds().count() + 600)); }); diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index a8f67baf13..514db2be1d 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -1030,12 +1030,12 @@ private: }); _cm.invoke_on_all([&](compaction::compaction_manager& cm) { - cm.get_shared_tombstone_gc_state().set_gc_time_min_source([this](const table_id& id) { + cm.get_shared_tombstone_gc_state().set_gc_time_min_source([this](const table_id& id, const db::replay_position& rp) { auto t = _db.local().get_tables_metadata().get_table_if_exists(id); if (t && t->ready_for_writes()) { auto* cl = t->commitlog(); if (cl) { - return cl->min_gc_time(id); + return cl->min_gc_time(id, rp); } } return gc_clock::time_point::max(); diff --git a/tombstone_gc-internals.hh b/tombstone_gc-internals.hh index 0b75282e6a..e0b9418cac 100644 --- a/tombstone_gc-internals.hh +++ b/tombstone_gc-internals.hh @@ -6,7 +6,21 @@ #include "tombstone_gc.hh" #include -using repair_history_map = boost::icl::interval_map; +/** + * Holds a repair history entry for a given token range. + * Timestamp is time of last repair, replay_position is an + * optional flush mark for the table in question which is + * also marked in the commitlog cleanup table. I.e. 
the + * lowest known position for which a replay might occur. + */ +struct repair_history_entry { + gc_clock::time_point timestamp; + db::replay_position replay_position; + + auto operator<=>(const repair_history_entry&) const noexcept = default; +}; + +using repair_history_map = boost::icl::interval_map; class repair_history_map_ptr { lw_shared_ptr _ptr; diff --git a/tombstone_gc.cc b/tombstone_gc.cc index ee438ad7ff..7ed58e30a9 100644 --- a/tombstone_gc.cc +++ b/tombstone_gc.cc @@ -96,6 +96,8 @@ tombstone_gc_state::get_gc_before_for_range_result tombstone_gc_state::get_gc_be auto max_repair_timestamp = gc_clock::time_point::min(); int hits = 0; knows_entire_range = false; + db::replay_position min_rp; + if (_shared_state && _shared_state->is_table_rf_one(s->id())) { // We don't have repair history, but the table is RF=1 so we return the same as tombstone_gc_mode::immediate would. auto t = check_min(s, query_time); @@ -109,8 +111,9 @@ tombstone_gc_state::get_gc_before_for_range_result tombstone_gc_state::get_gc_be bool contains_all = false; for (const auto& [i, s] = m->equal_range(interval); auto& x : std::ranges::subrange(i, s)) { auto r = locator::token_metadata::interval_to_range(x.first); - min = std::min(x.second, min); - max = std::max(x.second, max); + min = std::min(x.second.timestamp, min); + max = std::max(x.second.timestamp, max); + min_rp = min_rp.valid() ? 
std::min(min_rp, x.second.replay_position) : min_rp; if (++hits == 1 && r.contains(range, dht::token_comparator{})) { contains_all = true; } @@ -123,8 +126,8 @@ tombstone_gc_state::get_gc_before_for_range_result tombstone_gc_state::get_gc_be min_repair_timestamp = min; max_repair_timestamp = max; } - min_gc_before = check_min(s, saturating_subtract(min_repair_timestamp, propagation_delay)); - max_gc_before = check_min(s, saturating_subtract(max_repair_timestamp, propagation_delay)); + min_gc_before = check_min(s, saturating_subtract(min_repair_timestamp, propagation_delay), min_rp); + max_gc_before = check_min(s, saturating_subtract(max_repair_timestamp, propagation_delay), min_rp); }; dblog.trace("Get gc_before for ks={}, table={}, range={}, mode=repair, min_repair_timestamp={}, max_repair_timestamp={}, propagation_delay={}, min_gc_before={}, max_gc_before={}, hits={}, knows_entire_range={}", s->ks_name(), s->cf_name(), range, min_repair_timestamp, max_repair_timestamp, propagation_delay.count(), min_gc_before, max_gc_before, hits, knows_entire_range); @@ -141,9 +144,9 @@ bool tombstone_gc_state::cheap_to_get_gc_before(const schema& s) const noexcept return s.tombstone_gc_options().mode() != tombstone_gc_mode::repair; } -gc_clock::time_point tombstone_gc_state::check_min(schema_ptr s, gc_clock::time_point t) const { +gc_clock::time_point tombstone_gc_state::check_min(schema_ptr s, gc_clock::time_point t, const db::replay_position& rp) const { if (_check_commitlog && _shared_state && t != gc_clock::time_point::min()) { - return std::min(t, _shared_state->get_gc_min_time(s->id())); + return std::min(t, _shared_state->get_gc_min_time(s->id(), rp)); } return t; } @@ -180,6 +183,7 @@ gc_clock::time_point tombstone_gc_state::get_gc_before_for_key(schema_ptr s, con const std::chrono::seconds& propagation_delay = options.propagation_delay_in_seconds(); auto gc_before = gc_clock::time_point::min(); auto repair_timestamp = gc_clock::time_point::min(); + db::replay_position 
rp; if (_shared_state && _shared_state->is_table_rf_one(s->id())) { gc_before = query_time; } else if (auto m = get_repair_history_for_table(s->id()); m) { @@ -187,11 +191,12 @@ gc_clock::time_point tombstone_gc_state::get_gc_before_for_key(schema_ptr s, con if (it == m->end()) { gc_before = gc_clock::time_point::min(); } else { - repair_timestamp = it->second; + repair_timestamp = it->second.timestamp; + rp = it->second.replay_position; gc_before = saturating_subtract(repair_timestamp, propagation_delay); } } - gc_before = check_min(s, gc_before); + gc_before = check_min(s, gc_before, rp); dblog.trace("Get gc_before for ks={}, table={}, dk={}, mode=repair, repair_timestamp={}, propagation_delay={}, gc_before={}", s->ks_name(), s->cf_name(), dk, repair_timestamp, propagation_delay.count(), gc_before); return gc_before; @@ -225,19 +230,19 @@ const per_table_history_maps& shared_tombstone_gc_state::get_reconcile_history_m return *_reconcile_history_maps; } -static void do_update_repair_time(per_table_history_maps& reconcile_history_maps, table_id id, const dht::token_range& range, gc_clock::time_point repair_time) { +static void do_update_repair_time(per_table_history_maps& reconcile_history_maps, table_id id, const dht::token_range& range, gc_clock::time_point repair_time, db::replay_position rp) { auto [it, inserted] = reconcile_history_maps.try_emplace(id, lw_shared_ptr(nullptr)); if (inserted || !it->second) { // check for failed past update, leaving behind nullptr it->second = seastar::make_lw_shared(); } else { it->second = seastar::make_lw_shared(*it->second); } - *it->second += std::make_pair(locator::token_metadata::range_to_interval(range), repair_time); + *it->second += std::make_pair(locator::token_metadata::range_to_interval(range), repair_history_entry{ repair_time, rp }); } -void shared_tombstone_gc_state::update_repair_time(table_id id, const dht::token_range& range, gc_clock::time_point repair_time) { - mutate_repair_history([id, &range, repair_time] 
(per_table_history_maps& maps) { - do_update_repair_time(maps, id, range, repair_time); +void shared_tombstone_gc_state::update_repair_time(table_id id, const dht::token_range& range, gc_clock::time_point repair_time, opt_rp rp) { + mutate_repair_history([id, &range, repair_time, rp] (per_table_history_maps& maps) { + do_update_repair_time(maps, id, range, repair_time, rp.value_or(db::replay_position{})); /* no rp given -> default (zero) position, i.e. "not set"; type spelled out because value_or({}) needs the LWG 3886 default template argument */ }); } @@ -254,7 +259,7 @@ void shared_tombstone_gc_state::batch_update_repair_time(table_id id, std::span< it->second = seastar::make_lw_shared(*it->second); } for (const auto& [range, repair_time] : updates) { - *it->second += std::make_pair(locator::token_metadata::range_to_interval(range), repair_time); + *it->second += std::make_pair(locator::token_metadata::range_to_interval(range), repair_history_entry{repair_time, db::replay_position{}}); } }); } @@ -275,7 +280,7 @@ future<> shared_tombstone_gc_state::flush_pending_repair_time_update(replica::da for (auto& update : x.second) { co_await coroutine::maybe_yield(); if (update.shard == this_shard_id()) { - do_update_repair_time(*shard_maps, table, update.range, update.time); + do_update_repair_time(*shard_maps, table, update.range, update.time, {}); dblog.debug("Flush pending repair time for tombstone gc: table={} range={} repair_time={}", table, update.range, update.time); } diff --git a/tombstone_gc.hh b/tombstone_gc.hh index 6d4e834013..12f2874505 100644 --- a/tombstone_gc.hh +++ b/tombstone_gc.hh @@ -9,8 +9,10 @@ #pragma once #include +#include #include #include "gc_clock.hh" +#include "db/commitlog/replay_position.hh" #include "dht/token.hh" #include "locator/abstract_replication_strategy.hh" #include "locator/token_metadata.hh" @@ -47,7 +49,7 @@ using per_table_history_maps = std::unordered_map; +using gc_time_min_source = std::function; struct range_repair_time { dht::token_range range; @@ -91,8 +93,8 @@ public: _gc_min_source = std::move(src); } - gc_clock::time_point get_gc_min_time(const table_id& tid) const noexcept { - 
return _gc_min_source ? _gc_min_source(tid) : gc_clock::time_point::max(); + gc_clock::time_point get_gc_min_time(const table_id& tid, const db::replay_position& rp = {}) const noexcept { + return _gc_min_source ? _gc_min_source(tid, rp) : gc_clock::time_point::max(); } void set_table_rf_one(table_id id) { @@ -108,7 +110,8 @@ public: return _rf_one_tables.contains(id); } - void update_repair_time(table_id id, const dht::token_range& range, gc_clock::time_point repair_time); + using opt_rp = std::optional; + void update_repair_time(table_id id, const dht::token_range& range, gc_clock::time_point repair_time, opt_rp = {}); // A single (range, repair_time) pair used by batch_update_repair_time. using repair_time_update = std::pair; @@ -148,7 +151,7 @@ private: bool _check_commitlog{true}; private: - [[nodiscard]] gc_clock::time_point check_min(schema_ptr, gc_clock::time_point) const; + [[nodiscard]] gc_clock::time_point check_min(schema_ptr, gc_clock::time_point, const db::replay_position& = {}) const; [[nodiscard]] repair_history_map_ptr get_repair_history_for_table(const table_id& id) const;