diff --git a/main.cc b/main.cc index 156d3b2bba..73f743b20c 100644 --- a/main.cc +++ b/main.cc @@ -2054,12 +2054,12 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl }); cm.invoke_on_all([&](compaction::compaction_manager& cm) { - cm.get_shared_tombstone_gc_state().set_gc_time_min_source([&db](const table_id& id) { + cm.get_shared_tombstone_gc_state().set_gc_time_min_source([&db](const table_id& id, const db::replay_position& rp) { auto t = db.local().get_tables_metadata().get_table_if_exists(id); if (t && t->ready_for_writes()) { auto* cl = t->commitlog(); if (cl) { - return cl->min_gc_time(id); + return cl->min_gc_time(id, rp); } } return gc_clock::time_point::max(); diff --git a/test/boost/row_cache_test.cc b/test/boost/row_cache_test.cc index efd8e032ac..ee4eeb3587 100644 --- a/test/boost/row_cache_test.cc +++ b/test/boost/row_cache_test.cc @@ -4527,7 +4527,7 @@ SEASTAR_TEST_CASE(test_populating_cache_with_expired_and_nonexpired_tombstones) schema_ptr s = t.schema(); // emulate commitlog behaivor - t.get_compaction_manager().get_shared_tombstone_gc_state().set_gc_time_min_source([s](const table_id& id) { + t.get_compaction_manager().get_shared_tombstone_gc_state().set_gc_time_min_source([s](const table_id& id, auto&) { return gc_clock::now() - (std::chrono::seconds(s->gc_grace_seconds().count() + 600)); }); @@ -4722,7 +4722,7 @@ SEASTAR_TEST_CASE(test_cache_compacts_expired_tombstones_on_read) { tombstone_gc_state gc_state(gc_shared_state); // emulate commitlog behaivor - gc_shared_state.set_gc_time_min_source([&s](const table_id& id) { + gc_shared_state.set_gc_time_min_source([&s](const table_id& id, auto&) { return gc_clock::now() - (std::chrono::seconds(s->gc_grace_seconds().count() + 600)); }); diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index a8f67baf13..514db2be1d 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -1030,12 +1030,12 @@ private: }); _cm.invoke_on_all([&](compaction::compaction_manager& cm) { - cm.get_shared_tombstone_gc_state().set_gc_time_min_source([this](const table_id& id) { + cm.get_shared_tombstone_gc_state().set_gc_time_min_source([this](const table_id& id, const db::replay_position& rp) { auto t = _db.local().get_tables_metadata().get_table_if_exists(id); if (t && t->ready_for_writes()) { auto* cl = t->commitlog(); if (cl) { - return cl->min_gc_time(id); + return cl->min_gc_time(id, rp); } } return gc_clock::time_point::max(); diff --git a/tombstone_gc-internals.hh b/tombstone_gc-internals.hh index 0b75282e6a..e0b9418cac 100644 --- a/tombstone_gc-internals.hh +++ b/tombstone_gc-internals.hh @@ -6,7 +6,21 @@ #include "tombstone_gc.hh" #include -using repair_history_map = boost::icl::interval_map; +/** + * Holds a repair history entry for a given token range. + * Timestamp is time of last repair, replay_position is an + * optional flush mark for the table in question which is + * also marked in the commitlog cleanup table. I.e. the + * lowest known position for which a replay might occur. + */ +struct repair_history_entry { + gc_clock::time_point timestamp; + db::replay_position replay_position; + + auto operator<=>(const repair_history_entry&) const noexcept = default; +}; + +using repair_history_map = boost::icl::interval_map; class repair_history_map_ptr { lw_shared_ptr _ptr; diff --git a/tombstone_gc.cc b/tombstone_gc.cc index ee438ad7ff..7ed58e30a9 100644 --- a/tombstone_gc.cc +++ b/tombstone_gc.cc @@ -96,6 +96,8 @@ tombstone_gc_state::get_gc_before_for_range_result tombstone_gc_state::get_gc_be auto max_repair_timestamp = gc_clock::time_point::min(); int hits = 0; knows_entire_range = false; + db::replay_position min_rp; + if (_shared_state && _shared_state->is_table_rf_one(s->id())) { // We don't have repair history, but the table is RF=1 so we return the same as tombstone_gc_mode::immediate would. auto t = check_min(s, query_time); @@ -109,8 +111,9 @@ tombstone_gc_state::get_gc_before_for_range_result tombstone_gc_state::get_gc_be bool contains_all = false; for (const auto& [i, s] = m->equal_range(interval); auto& x : std::ranges::subrange(i, s)) { auto r = locator::token_metadata::interval_to_range(x.first); - min = std::min(x.second, min); - max = std::max(x.second, max); + min = std::min(x.second.timestamp, min); + max = std::max(x.second.timestamp, max); + min_rp = min_rp.valid() ? std::min(min_rp, x.second.replay_position) : min_rp; if (++hits == 1 && r.contains(range, dht::token_comparator{})) { contains_all = true; } @@ -123,8 +126,8 @@ tombstone_gc_state::get_gc_before_for_range_result tombstone_gc_state::get_gc_be min_repair_timestamp = min; max_repair_timestamp = max; } - min_gc_before = check_min(s, saturating_subtract(min_repair_timestamp, propagation_delay)); - max_gc_before = check_min(s, saturating_subtract(max_repair_timestamp, propagation_delay)); + min_gc_before = check_min(s, saturating_subtract(min_repair_timestamp, propagation_delay), min_rp); + max_gc_before = check_min(s, saturating_subtract(max_repair_timestamp, propagation_delay), min_rp); }; dblog.trace("Get gc_before for ks={}, table={}, range={}, mode=repair, min_repair_timestamp={}, max_repair_timestamp={}, propagation_delay={}, min_gc_before={}, max_gc_before={}, hits={}, knows_entire_range={}", s->ks_name(), s->cf_name(), range, min_repair_timestamp, max_repair_timestamp, propagation_delay.count(), min_gc_before, max_gc_before, hits, knows_entire_range); @@ -141,9 +144,9 @@ bool tombstone_gc_state::cheap_to_get_gc_before(const schema& s) const noexcept return s.tombstone_gc_options().mode() != tombstone_gc_mode::repair; } -gc_clock::time_point tombstone_gc_state::check_min(schema_ptr s, gc_clock::time_point t) const { +gc_clock::time_point tombstone_gc_state::check_min(schema_ptr s, gc_clock::time_point t, const db::replay_position& rp) const { if (_check_commitlog && _shared_state && t != gc_clock::time_point::min()) { - return std::min(t, _shared_state->get_gc_min_time(s->id())); + return std::min(t, _shared_state->get_gc_min_time(s->id(), rp)); } return t; } @@ -180,6 +183,7 @@ gc_clock::time_point tombstone_gc_state::get_gc_before_for_key(schema_ptr s, con const std::chrono::seconds& propagation_delay = options.propagation_delay_in_seconds(); auto gc_before = gc_clock::time_point::min(); auto repair_timestamp = gc_clock::time_point::min(); + db::replay_position rp; if (_shared_state && _shared_state->is_table_rf_one(s->id())) { gc_before = query_time; } else if (auto m = get_repair_history_for_table(s->id()); m) { @@ -187,11 +191,12 @@ gc_clock::time_point tombstone_gc_state::get_gc_before_for_key(schema_ptr s, con if (it == m->end()) { gc_before = gc_clock::time_point::min(); } else { - repair_timestamp = it->second; + repair_timestamp = it->second.timestamp; + rp = it->second.replay_position; gc_before = saturating_subtract(repair_timestamp, propagation_delay); } } - gc_before = check_min(s, gc_before); + gc_before = check_min(s, gc_before, rp); dblog.trace("Get gc_before for ks={}, table={}, dk={}, mode=repair, repair_timestamp={}, propagation_delay={}, gc_before={}", s->ks_name(), s->cf_name(), dk, repair_timestamp, propagation_delay.count(), gc_before); return gc_before; @@ -225,19 +230,19 @@ const per_table_history_maps& shared_tombstone_gc_state::get_reconcile_history_m return *_reconcile_history_maps; } -static void do_update_repair_time(per_table_history_maps& reconcile_history_maps, table_id id, const dht::token_range& range, gc_clock::time_point repair_time) { +static void do_update_repair_time(per_table_history_maps& reconcile_history_maps, table_id id, const dht::token_range& range, gc_clock::time_point repair_time, db::replay_position rp) { auto [it, inserted] = reconcile_history_maps.try_emplace(id, lw_shared_ptr(nullptr)); if (inserted || !it->second) { // check for failed past update, leaving behind nullptr it->second = seastar::make_lw_shared(); } else { it->second = seastar::make_lw_shared(*it->second); } - *it->second += std::make_pair(locator::token_metadata::range_to_interval(range), repair_time); + *it->second += std::make_pair(locator::token_metadata::range_to_interval(range), repair_history_entry{ repair_time, rp }); } -void shared_tombstone_gc_state::update_repair_time(table_id id, const dht::token_range& range, gc_clock::time_point repair_time) { - mutate_repair_history([id, &range, repair_time] (per_table_history_maps& maps) { - do_update_repair_time(maps, id, range, repair_time); +void shared_tombstone_gc_state::update_repair_time(table_id id, const dht::token_range& range, gc_clock::time_point repair_time, opt_rp rp) { + mutate_repair_history([id, &range, repair_time, rp] (per_table_history_maps& maps) { + do_update_repair_time(maps, id, range, repair_time, rp.value_or({})); }); } @@ -254,7 +259,7 @@ void shared_tombstone_gc_state::batch_update_repair_time(table_id id, std::span< it->second = seastar::make_lw_shared(*it->second); } for (const auto& [range, repair_time] : updates) { - *it->second += std::make_pair(locator::token_metadata::range_to_interval(range), repair_time); + *it->second += std::make_pair(locator::token_metadata::range_to_interval(range), repair_history_entry{repair_time, db::replay_position{}}); } }); } @@ -275,7 +280,7 @@ future<> shared_tombstone_gc_state::flush_pending_repair_time_update(replica::da for (auto& update : x.second) { co_await coroutine::maybe_yield(); if (update.shard == this_shard_id()) { - do_update_repair_time(*shard_maps, table, update.range, update.time); + do_update_repair_time(*shard_maps, table, update.range, update.time, {}); dblog.debug("Flush pending repair time for tombstone gc: table={} range={} repair_time={}", table, update.range, update.time); } diff --git a/tombstone_gc.hh b/tombstone_gc.hh index 6d4e834013..12f2874505 100644 --- a/tombstone_gc.hh +++ b/tombstone_gc.hh @@ -9,8 +9,10 @@ #pragma once #include +#include #include #include "gc_clock.hh" +#include "db/commitlog/replay_position.hh" #include "dht/token.hh" #include "locator/abstract_replication_strategy.hh" #include "locator/token_metadata.hh" @@ -47,7 +49,7 @@ using per_table_history_maps = std::unordered_map; +using gc_time_min_source = std::function; struct range_repair_time { dht::token_range range; @@ -91,8 +93,8 @@ public: _gc_min_source = std::move(src); } - gc_clock::time_point get_gc_min_time(const table_id& tid) const noexcept { - return _gc_min_source ? _gc_min_source(tid) : gc_clock::time_point::max(); + gc_clock::time_point get_gc_min_time(const table_id& tid, const db::replay_position& rp = {}) const noexcept { + return _gc_min_source ? _gc_min_source(tid, rp) : gc_clock::time_point::max(); } void set_table_rf_one(table_id id) { @@ -108,7 +110,8 @@ public: return _rf_one_tables.contains(id); } - void update_repair_time(table_id id, const dht::token_range& range, gc_clock::time_point repair_time); + using opt_rp = std::optional; + void update_repair_time(table_id id, const dht::token_range& range, gc_clock::time_point repair_time, opt_rp = {}); // A single (range, repair_time) pair used by batch_update_repair_time. using repair_time_update = std::pair; @@ -148,7 +151,7 @@ private: bool _check_commitlog{true}; private: - [[nodiscard]] gc_clock::time_point check_min(schema_ptr, gc_clock::time_point) const; + [[nodiscard]] gc_clock::time_point check_min(schema_ptr, gc_clock::time_point, const db::replay_position& = {}) const; [[nodiscard]] repair_history_map_ptr get_repair_history_for_table(const table_id& id) const;