tombstone_gc: Add optional replay_position to repair_history

Adds an optional replay_position to the (transient) replay history,
as well as provides this (when available) to gc_min_time callback
checks.
The idea is that when set (not zero rp), this can be used to
limit the commitlog segments from which we check timestamps,
potentially filtering out older ones.
Of couse, whomever sets the attribute in the history must
ensure that data below this point cannot be replayed.

Note: this does not affect the persisted repair history,
only node-local, transient data.
This commit is contained in:
Calle Wilund
2026-05-12 10:23:42 +02:00
parent b82d16d6c8
commit f2f76f6445
6 changed files with 49 additions and 27 deletions

View File

@@ -2054,12 +2054,12 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
});
cm.invoke_on_all([&](compaction::compaction_manager& cm) {
cm.get_shared_tombstone_gc_state().set_gc_time_min_source([&db](const table_id& id) {
cm.get_shared_tombstone_gc_state().set_gc_time_min_source([&db](const table_id& id, const db::replay_position& rp) {
auto t = db.local().get_tables_metadata().get_table_if_exists(id);
if (t && t->ready_for_writes()) {
auto* cl = t->commitlog();
if (cl) {
return cl->min_gc_time(id);
return cl->min_gc_time(id, rp);
}
}
return gc_clock::time_point::max();

View File

@@ -4527,7 +4527,7 @@ SEASTAR_TEST_CASE(test_populating_cache_with_expired_and_nonexpired_tombstones)
schema_ptr s = t.schema();
// emulate commitlog behaivor
t.get_compaction_manager().get_shared_tombstone_gc_state().set_gc_time_min_source([s](const table_id& id) {
t.get_compaction_manager().get_shared_tombstone_gc_state().set_gc_time_min_source([s](const table_id& id, auto&) {
return gc_clock::now() - (std::chrono::seconds(s->gc_grace_seconds().count() + 600));
});
@@ -4722,7 +4722,7 @@ SEASTAR_TEST_CASE(test_cache_compacts_expired_tombstones_on_read) {
tombstone_gc_state gc_state(gc_shared_state);
// emulate commitlog behaivor
gc_shared_state.set_gc_time_min_source([&s](const table_id& id) {
gc_shared_state.set_gc_time_min_source([&s](const table_id& id, auto&) {
return gc_clock::now() - (std::chrono::seconds(s->gc_grace_seconds().count() + 600));
});

View File

@@ -1030,12 +1030,12 @@ private:
});
_cm.invoke_on_all([&](compaction::compaction_manager& cm) {
cm.get_shared_tombstone_gc_state().set_gc_time_min_source([this](const table_id& id) {
cm.get_shared_tombstone_gc_state().set_gc_time_min_source([this](const table_id& id, const db::replay_position& rp) {
auto t = _db.local().get_tables_metadata().get_table_if_exists(id);
if (t && t->ready_for_writes()) {
auto* cl = t->commitlog();
if (cl) {
return cl->min_gc_time(id);
return cl->min_gc_time(id, rp);
}
}
return gc_clock::time_point::max();

View File

@@ -6,7 +6,21 @@
#include "tombstone_gc.hh"
#include <boost/icl/interval_map.hpp>
using repair_history_map = boost::icl::interval_map<dht::token, gc_clock::time_point, boost::icl::partial_absorber, std::less, boost::icl::inplace_max>;
/**
* Holds a repair history entry for a given token range.
* Timestamp is time of last repair, replay_position is an
* optional flush mark for the table in question which is
* also marked in the commitlog cleanup table. I.e. the
* lowest known position for which a replay might occur.
*/
struct repair_history_entry {
gc_clock::time_point timestamp;
db::replay_position replay_position;
auto operator<=>(const repair_history_entry&) const noexcept = default;
};
using repair_history_map = boost::icl::interval_map<dht::token, repair_history_entry, boost::icl::partial_absorber, std::less, boost::icl::inplace_max>;
class repair_history_map_ptr {
lw_shared_ptr<repair_history_map> _ptr;

View File

@@ -96,6 +96,8 @@ tombstone_gc_state::get_gc_before_for_range_result tombstone_gc_state::get_gc_be
auto max_repair_timestamp = gc_clock::time_point::min();
int hits = 0;
knows_entire_range = false;
db::replay_position min_rp;
if (_shared_state && _shared_state->is_table_rf_one(s->id())) {
// We don't have repair history, but the table is RF=1 so we return the same as tombstone_gc_mode::immediate would.
auto t = check_min(s, query_time);
@@ -109,8 +111,9 @@ tombstone_gc_state::get_gc_before_for_range_result tombstone_gc_state::get_gc_be
bool contains_all = false;
for (const auto& [i, s] = m->equal_range(interval); auto& x : std::ranges::subrange(i, s)) {
auto r = locator::token_metadata::interval_to_range(x.first);
min = std::min(x.second, min);
max = std::max(x.second, max);
min = std::min(x.second.timestamp, min);
max = std::max(x.second.timestamp, max);
min_rp = min_rp.valid() ? std::min(min_rp, x.second.replay_position) : min_rp;
if (++hits == 1 && r.contains(range, dht::token_comparator{})) {
contains_all = true;
}
@@ -123,8 +126,8 @@ tombstone_gc_state::get_gc_before_for_range_result tombstone_gc_state::get_gc_be
min_repair_timestamp = min;
max_repair_timestamp = max;
}
min_gc_before = check_min(s, saturating_subtract(min_repair_timestamp, propagation_delay));
max_gc_before = check_min(s, saturating_subtract(max_repair_timestamp, propagation_delay));
min_gc_before = check_min(s, saturating_subtract(min_repair_timestamp, propagation_delay), min_rp);
max_gc_before = check_min(s, saturating_subtract(max_repair_timestamp, propagation_delay), min_rp);
};
dblog.trace("Get gc_before for ks={}, table={}, range={}, mode=repair, min_repair_timestamp={}, max_repair_timestamp={}, propagation_delay={}, min_gc_before={}, max_gc_before={}, hits={}, knows_entire_range={}",
s->ks_name(), s->cf_name(), range, min_repair_timestamp, max_repair_timestamp, propagation_delay.count(), min_gc_before, max_gc_before, hits, knows_entire_range);
@@ -141,9 +144,9 @@ bool tombstone_gc_state::cheap_to_get_gc_before(const schema& s) const noexcept
return s.tombstone_gc_options().mode() != tombstone_gc_mode::repair;
}
gc_clock::time_point tombstone_gc_state::check_min(schema_ptr s, gc_clock::time_point t) const {
gc_clock::time_point tombstone_gc_state::check_min(schema_ptr s, gc_clock::time_point t, const db::replay_position& rp) const {
if (_check_commitlog && _shared_state && t != gc_clock::time_point::min()) {
return std::min(t, _shared_state->get_gc_min_time(s->id()));
return std::min(t, _shared_state->get_gc_min_time(s->id(), rp));
}
return t;
}
@@ -180,6 +183,7 @@ gc_clock::time_point tombstone_gc_state::get_gc_before_for_key(schema_ptr s, con
const std::chrono::seconds& propagation_delay = options.propagation_delay_in_seconds();
auto gc_before = gc_clock::time_point::min();
auto repair_timestamp = gc_clock::time_point::min();
db::replay_position rp;
if (_shared_state && _shared_state->is_table_rf_one(s->id())) {
gc_before = query_time;
} else if (auto m = get_repair_history_for_table(s->id()); m) {
@@ -187,11 +191,12 @@ gc_clock::time_point tombstone_gc_state::get_gc_before_for_key(schema_ptr s, con
if (it == m->end()) {
gc_before = gc_clock::time_point::min();
} else {
repair_timestamp = it->second;
repair_timestamp = it->second.timestamp;
rp = it->second.replay_position;
gc_before = saturating_subtract(repair_timestamp, propagation_delay);
}
}
gc_before = check_min(s, gc_before);
gc_before = check_min(s, gc_before, rp);
dblog.trace("Get gc_before for ks={}, table={}, dk={}, mode=repair, repair_timestamp={}, propagation_delay={}, gc_before={}",
s->ks_name(), s->cf_name(), dk, repair_timestamp, propagation_delay.count(), gc_before);
return gc_before;
@@ -225,19 +230,19 @@ const per_table_history_maps& shared_tombstone_gc_state::get_reconcile_history_m
return *_reconcile_history_maps;
}
static void do_update_repair_time(per_table_history_maps& reconcile_history_maps, table_id id, const dht::token_range& range, gc_clock::time_point repair_time) {
static void do_update_repair_time(per_table_history_maps& reconcile_history_maps, table_id id, const dht::token_range& range, gc_clock::time_point repair_time, db::replay_position rp) {
auto [it, inserted] = reconcile_history_maps.try_emplace(id, lw_shared_ptr<repair_history_map>(nullptr));
if (inserted || !it->second) { // check for failed past update, leaving behind nullptr
it->second = seastar::make_lw_shared<repair_history_map>();
} else {
it->second = seastar::make_lw_shared<repair_history_map>(*it->second);
}
*it->second += std::make_pair(locator::token_metadata::range_to_interval(range), repair_time);
*it->second += std::make_pair(locator::token_metadata::range_to_interval(range), repair_history_entry{ repair_time, rp });
}
void shared_tombstone_gc_state::update_repair_time(table_id id, const dht::token_range& range, gc_clock::time_point repair_time) {
mutate_repair_history([id, &range, repair_time] (per_table_history_maps& maps) {
do_update_repair_time(maps, id, range, repair_time);
void shared_tombstone_gc_state::update_repair_time(table_id id, const dht::token_range& range, gc_clock::time_point repair_time, opt_rp rp) {
mutate_repair_history([id, &range, repair_time, rp] (per_table_history_maps& maps) {
do_update_repair_time(maps, id, range, repair_time, rp.value_or({}));
});
}
@@ -254,7 +259,7 @@ void shared_tombstone_gc_state::batch_update_repair_time(table_id id, std::span<
it->second = seastar::make_lw_shared<repair_history_map>(*it->second);
}
for (const auto& [range, repair_time] : updates) {
*it->second += std::make_pair(locator::token_metadata::range_to_interval(range), repair_time);
*it->second += std::make_pair(locator::token_metadata::range_to_interval(range), repair_history_entry{repair_time, db::replay_position{}});
}
});
}
@@ -275,7 +280,7 @@ future<> shared_tombstone_gc_state::flush_pending_repair_time_update(replica::da
for (auto& update : x.second) {
co_await coroutine::maybe_yield();
if (update.shard == this_shard_id()) {
do_update_repair_time(*shard_maps, table, update.range, update.time);
do_update_repair_time(*shard_maps, table, update.range, update.time, {});
dblog.debug("Flush pending repair time for tombstone gc: table={} range={} repair_time={}",
table, update.range, update.time);
}

View File

@@ -9,8 +9,10 @@
#pragma once
#include <span>
#include <tuple>
#include <seastar/core/shared_ptr.hh>
#include "gc_clock.hh"
#include "db/commitlog/replay_position.hh"
#include "dht/token.hh"
#include "locator/abstract_replication_strategy.hh"
#include "locator/token_metadata.hh"
@@ -47,7 +49,7 @@ using per_table_history_maps = std::unordered_map<table_id, repair_history_map_p
class tombstone_gc_options;
using gc_time_min_source = std::function<gc_clock::time_point(const table_id&)>;
using gc_time_min_source = std::function<gc_clock::time_point(const table_id&, const db::replay_position&)>;
struct range_repair_time {
dht::token_range range;
@@ -91,8 +93,8 @@ public:
_gc_min_source = std::move(src);
}
gc_clock::time_point get_gc_min_time(const table_id& tid) const noexcept {
return _gc_min_source ? _gc_min_source(tid) : gc_clock::time_point::max();
gc_clock::time_point get_gc_min_time(const table_id& tid, const db::replay_position& rp = {}) const noexcept {
return _gc_min_source ? _gc_min_source(tid, rp) : gc_clock::time_point::max();
}
void set_table_rf_one(table_id id) {
@@ -108,7 +110,8 @@ public:
return _rf_one_tables.contains(id);
}
void update_repair_time(table_id id, const dht::token_range& range, gc_clock::time_point repair_time);
using opt_rp = std::optional<db::replay_position>;
void update_repair_time(table_id id, const dht::token_range& range, gc_clock::time_point repair_time, opt_rp = {});
// A single (range, repair_time) pair used by batch_update_repair_time.
using repair_time_update = std::pair<dht::token_range, gc_clock::time_point>;
@@ -148,7 +151,7 @@ private:
bool _check_commitlog{true};
private:
[[nodiscard]] gc_clock::time_point check_min(schema_ptr, gc_clock::time_point) const;
[[nodiscard]] gc_clock::time_point check_min(schema_ptr, gc_clock::time_point, const db::replay_position& = {}) const;
[[nodiscard]] repair_history_map_ptr get_repair_history_for_table(const table_id& id) const;