From bf445d97e5375a20e8b93aaf5f91e2b584b69303 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Tue, 12 May 2026 10:14:22 +0200 Subject: [PATCH 1/8] db::replay_position: Add attribute valid() Just to reduce the rp == db::replay_position() statements... --- db/commitlog/replay_position.hh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/db/commitlog/replay_position.hh b/db/commitlog/replay_position.hh index 7f368cf8e8..53bbbd0ecd 100644 --- a/db/commitlog/replay_position.hh +++ b/db/commitlog/replay_position.hh @@ -55,6 +55,10 @@ struct replay_position { template auto describe_type(sstables::sstable_version_types v, Describer f) { return f(id, pos); } + + bool valid() const { + return id != 0 && pos != 0; + } }; class commitlog; From eef16c096c803b405ba23307e55596d95e1892a7 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Tue, 12 May 2026 10:15:24 +0200 Subject: [PATCH 2/8] db::commitlog: Add optional replay_position sieve to min_gc_time Allows filtering out timestamps by replay position (i.e. 
skip some segments) --- db/commitlog/commitlog.cc | 11 +++++++---- db/commitlog/commitlog.hh | 6 +++++- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index ee2155c0f0..8be025b261 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -488,7 +488,7 @@ public: future> list_descriptors(sstring dir) const; future> get_segments_to_replay() const; - gc_clock::time_point min_gc_time(const cf_id_type&) const; + gc_clock::time_point min_gc_time(const cf_id_type&, const db::replay_position&) const; flush_handler_id add_flush_handler(flush_handler h) { auto id = ++_flush_ids; @@ -2053,9 +2053,12 @@ future> db::commitlog::segment_manager::get_segments_to_rep co_return segments_to_replay; } -gc_clock::time_point db::commitlog::segment_manager::min_gc_time(const cf_id_type& id) const { +gc_clock::time_point db::commitlog::segment_manager::min_gc_time(const cf_id_type& id, const db::replay_position& rp) const { auto res = gc_clock::time_point::max(); for (auto& s : _segments) { + if (rp.valid() && replay_position(s->_desc.id, s->position()) <= rp) { + continue; + } res = std::min(res, s->min_time(id)); } return res; @@ -3956,8 +3959,8 @@ future> db::commitlog::list_existing_segments(const sstring }); } -gc_clock::time_point db::commitlog::min_gc_time(const cf_id_type& id) const { - return _segment_manager->min_gc_time(id); +gc_clock::time_point db::commitlog::min_gc_time(const cf_id_type& id, const db::replay_position& rp) const { + return _segment_manager->min_gc_time(id, rp); } db::replay_position db::commitlog::min_position() const { diff --git a/db/commitlog/commitlog.hh b/db/commitlog/commitlog.hh index 745c89bd4a..4cb76d11eb 100644 --- a/db/commitlog/commitlog.hh +++ b/db/commitlog/commitlog.hh @@ -375,7 +375,11 @@ public: future> list_existing_segments() const; future> list_existing_segments(const sstring& dir) const; - gc_clock::time_point min_gc_time(const cf_id_type&) const; + /** 
+ * Gets the recorded min timestamp for the given id. Optionally filter by + * replay position, i.e. skip segments that have top position <= rp_filter + */ + gc_clock::time_point min_gc_time(const cf_id_type&, const db::replay_position& rp_filter = {}) const; // Return the lowest possible replay position across all existing or future commitlog segments. // In other words, only positions greater or equal to min_position() can From be79d08925c34b260967a359350847a3f1dc301e Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Tue, 12 May 2026 10:21:04 +0200 Subject: [PATCH 3/8] table: Add ready_for_writes attribute Allows checking whether "commitlog()" can be called. --- replica/database.hh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/replica/database.hh b/replica/database.hh index 43c1692e0f..f95dc3b74b 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1430,6 +1430,10 @@ public: future get_compaction_reenablers_and_lock_holders_for_repair(replica::database& db, const service::frozen_topology_guard& guard, dht::token_range range); future estimated_partitions_in_range(dht::token_range tr) const; + + bool ready_for_writes() const { + return !_readonly; + } }; lw_shared_ptr make_tablet_sstable_set(schema_ptr, const storage_group_manager& sgm, const locator::tablet_map&); From b82d16d6c89d60bb0d42605ab57b7a29b84eb3a9 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Tue, 12 May 2026 10:19:10 +0200 Subject: [PATCH 4/8] main/cql_test_env: Make tombstone gc_time_min_source use just table CL Simplify the logic + reduces the amount of segments searched on each call --- main.cc | 25 ++++++++++--------------- test/lib/cql_test_env.cc | 25 ++++++++++--------------- 2 files changed, 20 insertions(+), 30 deletions(-) diff --git a/main.cc b/main.cc index e71f964cea..156d3b2bba 100644 --- a/main.cc +++ b/main.cc @@ -2054,21 +2054,16 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl }); 
cm.invoke_on_all([&](compaction::compaction_manager& cm) { - auto cl = db.local().commitlog(); - auto scl = db.local().schema_commitlog(); - if (cl && scl) { - cm.get_shared_tombstone_gc_state().set_gc_time_min_source([cl, scl](const table_id& id) { - return std::min(cl->min_gc_time(id), scl->min_gc_time(id)); - }); - } else if (cl) { - cm.get_shared_tombstone_gc_state().set_gc_time_min_source([cl](const table_id& id) { - return cl->min_gc_time(id); - }); - } else if (scl) { - cm.get_shared_tombstone_gc_state().set_gc_time_min_source([scl](const table_id& id) { - return scl->min_gc_time(id); - }); - } + cm.get_shared_tombstone_gc_state().set_gc_time_min_source([&db](const table_id& id) { + auto t = db.local().get_tables_metadata().get_table_if_exists(id); + if (t && t->ready_for_writes()) { + auto* cl = t->commitlog(); + if (cl) { + return cl->min_gc_time(id); + } + } + return gc_clock::time_point::max(); + }); }).get(); checkpoint(stop_signal, "loading tablet metadata"); diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index de2ab7ff21..a8f67baf13 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -1030,21 +1030,16 @@ private: }); _cm.invoke_on_all([&](compaction::compaction_manager& cm) { - auto cl = _db.local().commitlog(); - auto scl = _db.local().schema_commitlog(); - if (cl && scl) { - cm.get_shared_tombstone_gc_state().set_gc_time_min_source([cl, scl](const table_id& id) { - return std::min(cl->min_gc_time(id), scl->min_gc_time(id)); - }); - } else if (cl) { - cm.get_shared_tombstone_gc_state().set_gc_time_min_source([cl](const table_id& id) { - return cl->min_gc_time(id); - }); - } else if (scl) { - cm.get_shared_tombstone_gc_state().set_gc_time_min_source([scl](const table_id& id) { - return scl->min_gc_time(id); - }); - } + cm.get_shared_tombstone_gc_state().set_gc_time_min_source([this](const table_id& id) { + auto t = _db.local().get_tables_metadata().get_table_if_exists(id); + if (t && t->ready_for_writes()) { + 
auto* cl = t->commitlog(); + if (cl) { + return cl->min_gc_time(id); + } + } + return gc_clock::time_point::max(); + }); }).get(); replica::distributed_loader::init_non_system_keyspaces(_db, _proxy, _sys_ks).get(); From f2f76f64450d5c34244fab6ab1900ca601f66548 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Tue, 12 May 2026 10:23:42 +0200 Subject: [PATCH 5/8] tombstone_gc: Add optional replay_position to repair_history Adds an optional replay_position to the (transient) repair history, and provides it (when available) to the gc_min_time callback checks. The idea is that when set (non-zero rp), this can be used to limit the commitlog segments from which we check timestamps, potentially filtering out older ones. Of course, whoever sets the attribute in the history must ensure that data below this point cannot be replayed. Note: this does not affect the persisted repair history, only node-local, transient data. --- main.cc | 4 ++-- test/boost/row_cache_test.cc | 4 ++-- test/lib/cql_test_env.cc | 4 ++-- tombstone_gc-internals.hh | 16 +++++++++++++++- tombstone_gc.cc | 35 ++++++++++++++++++++--------------- tombstone_gc.hh | 13 ++++++++----- 6 files changed, 49 insertions(+), 27 deletions(-) diff --git a/main.cc b/main.cc index 156d3b2bba..73f743b20c 100644 --- a/main.cc +++ b/main.cc @@ -2054,12 +2054,12 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl }); cm.invoke_on_all([&](compaction::compaction_manager& cm) { - cm.get_shared_tombstone_gc_state().set_gc_time_min_source([&db](const table_id& id) { + cm.get_shared_tombstone_gc_state().set_gc_time_min_source([&db](const table_id& id, const db::replay_position& rp) { auto t = db.local().get_tables_metadata().get_table_if_exists(id); if (t && t->ready_for_writes()) { auto* cl = t->commitlog(); if (cl) { - return cl->min_gc_time(id); + return cl->min_gc_time(id, rp); } } return gc_clock::time_point::max(); diff --git a/test/boost/row_cache_test.cc b/test/boost/row_cache_test.cc 
index efd8e032ac..ee4eeb3587 100644 --- a/test/boost/row_cache_test.cc +++ b/test/boost/row_cache_test.cc @@ -4527,7 +4527,7 @@ SEASTAR_TEST_CASE(test_populating_cache_with_expired_and_nonexpired_tombstones) schema_ptr s = t.schema(); // emulate commitlog behaivor - t.get_compaction_manager().get_shared_tombstone_gc_state().set_gc_time_min_source([s](const table_id& id) { + t.get_compaction_manager().get_shared_tombstone_gc_state().set_gc_time_min_source([s](const table_id& id, auto&) { return gc_clock::now() - (std::chrono::seconds(s->gc_grace_seconds().count() + 600)); }); @@ -4722,7 +4722,7 @@ SEASTAR_TEST_CASE(test_cache_compacts_expired_tombstones_on_read) { tombstone_gc_state gc_state(gc_shared_state); // emulate commitlog behaivor - gc_shared_state.set_gc_time_min_source([&s](const table_id& id) { + gc_shared_state.set_gc_time_min_source([&s](const table_id& id, auto&) { return gc_clock::now() - (std::chrono::seconds(s->gc_grace_seconds().count() + 600)); }); diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index a8f67baf13..514db2be1d 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -1030,12 +1030,12 @@ private: }); _cm.invoke_on_all([&](compaction::compaction_manager& cm) { - cm.get_shared_tombstone_gc_state().set_gc_time_min_source([this](const table_id& id) { + cm.get_shared_tombstone_gc_state().set_gc_time_min_source([this](const table_id& id, const db::replay_position& rp) { auto t = _db.local().get_tables_metadata().get_table_if_exists(id); if (t && t->ready_for_writes()) { auto* cl = t->commitlog(); if (cl) { - return cl->min_gc_time(id); + return cl->min_gc_time(id, rp); } } return gc_clock::time_point::max(); diff --git a/tombstone_gc-internals.hh b/tombstone_gc-internals.hh index 0b75282e6a..e0b9418cac 100644 --- a/tombstone_gc-internals.hh +++ b/tombstone_gc-internals.hh @@ -6,7 +6,21 @@ #include "tombstone_gc.hh" #include -using repair_history_map = boost::icl::interval_map; +/** + * Holds a repair history 
entry for a given token range. + * Timestamp is time of last repair, replay_position is an + * optional flush mark for the table in question which is + * also marked in the commitlog cleanup table. I.e. the + * lowest known position for which a replay might occur. + */ +struct repair_history_entry { + gc_clock::time_point timestamp; + db::replay_position replay_position; + + auto operator<=>(const repair_history_entry&) const noexcept = default; +}; + +using repair_history_map = boost::icl::interval_map; class repair_history_map_ptr { lw_shared_ptr _ptr; diff --git a/tombstone_gc.cc b/tombstone_gc.cc index ee438ad7ff..7ed58e30a9 100644 --- a/tombstone_gc.cc +++ b/tombstone_gc.cc @@ -96,6 +96,8 @@ tombstone_gc_state::get_gc_before_for_range_result tombstone_gc_state::get_gc_be auto max_repair_timestamp = gc_clock::time_point::min(); int hits = 0; knows_entire_range = false; + db::replay_position min_rp; + if (_shared_state && _shared_state->is_table_rf_one(s->id())) { // We don't have repair history, but the table is RF=1 so we return the same as tombstone_gc_mode::immediate would. auto t = check_min(s, query_time); @@ -109,8 +111,9 @@ tombstone_gc_state::get_gc_before_for_range_result tombstone_gc_state::get_gc_be bool contains_all = false; for (const auto& [i, s] = m->equal_range(interval); auto& x : std::ranges::subrange(i, s)) { auto r = locator::token_metadata::interval_to_range(x.first); - min = std::min(x.second, min); - max = std::max(x.second, max); + min = std::min(x.second.timestamp, min); + max = std::max(x.second.timestamp, max); + min_rp = min_rp.valid() ? 
std::min(min_rp, x.second.replay_position) : min_rp; if (++hits == 1 && r.contains(range, dht::token_comparator{})) { contains_all = true; } @@ -123,8 +126,8 @@ tombstone_gc_state::get_gc_before_for_range_result tombstone_gc_state::get_gc_be min_repair_timestamp = min; max_repair_timestamp = max; } - min_gc_before = check_min(s, saturating_subtract(min_repair_timestamp, propagation_delay)); - max_gc_before = check_min(s, saturating_subtract(max_repair_timestamp, propagation_delay)); + min_gc_before = check_min(s, saturating_subtract(min_repair_timestamp, propagation_delay), min_rp); + max_gc_before = check_min(s, saturating_subtract(max_repair_timestamp, propagation_delay), min_rp); }; dblog.trace("Get gc_before for ks={}, table={}, range={}, mode=repair, min_repair_timestamp={}, max_repair_timestamp={}, propagation_delay={}, min_gc_before={}, max_gc_before={}, hits={}, knows_entire_range={}", s->ks_name(), s->cf_name(), range, min_repair_timestamp, max_repair_timestamp, propagation_delay.count(), min_gc_before, max_gc_before, hits, knows_entire_range); @@ -141,9 +144,9 @@ bool tombstone_gc_state::cheap_to_get_gc_before(const schema& s) const noexcept return s.tombstone_gc_options().mode() != tombstone_gc_mode::repair; } -gc_clock::time_point tombstone_gc_state::check_min(schema_ptr s, gc_clock::time_point t) const { +gc_clock::time_point tombstone_gc_state::check_min(schema_ptr s, gc_clock::time_point t, const db::replay_position& rp) const { if (_check_commitlog && _shared_state && t != gc_clock::time_point::min()) { - return std::min(t, _shared_state->get_gc_min_time(s->id())); + return std::min(t, _shared_state->get_gc_min_time(s->id(), rp)); } return t; } @@ -180,6 +183,7 @@ gc_clock::time_point tombstone_gc_state::get_gc_before_for_key(schema_ptr s, con const std::chrono::seconds& propagation_delay = options.propagation_delay_in_seconds(); auto gc_before = gc_clock::time_point::min(); auto repair_timestamp = gc_clock::time_point::min(); + db::replay_position 
rp; if (_shared_state && _shared_state->is_table_rf_one(s->id())) { gc_before = query_time; } else if (auto m = get_repair_history_for_table(s->id()); m) { @@ -187,11 +191,12 @@ gc_clock::time_point tombstone_gc_state::get_gc_before_for_key(schema_ptr s, con if (it == m->end()) { gc_before = gc_clock::time_point::min(); } else { - repair_timestamp = it->second; + repair_timestamp = it->second.timestamp; + rp = it->second.replay_position; gc_before = saturating_subtract(repair_timestamp, propagation_delay); } } - gc_before = check_min(s, gc_before); + gc_before = check_min(s, gc_before, rp); dblog.trace("Get gc_before for ks={}, table={}, dk={}, mode=repair, repair_timestamp={}, propagation_delay={}, gc_before={}", s->ks_name(), s->cf_name(), dk, repair_timestamp, propagation_delay.count(), gc_before); return gc_before; @@ -225,19 +230,19 @@ const per_table_history_maps& shared_tombstone_gc_state::get_reconcile_history_m return *_reconcile_history_maps; } -static void do_update_repair_time(per_table_history_maps& reconcile_history_maps, table_id id, const dht::token_range& range, gc_clock::time_point repair_time) { +static void do_update_repair_time(per_table_history_maps& reconcile_history_maps, table_id id, const dht::token_range& range, gc_clock::time_point repair_time, db::replay_position rp) { auto [it, inserted] = reconcile_history_maps.try_emplace(id, lw_shared_ptr(nullptr)); if (inserted || !it->second) { // check for failed past update, leaving behind nullptr it->second = seastar::make_lw_shared(); } else { it->second = seastar::make_lw_shared(*it->second); } - *it->second += std::make_pair(locator::token_metadata::range_to_interval(range), repair_time); + *it->second += std::make_pair(locator::token_metadata::range_to_interval(range), repair_history_entry{ repair_time, rp }); } -void shared_tombstone_gc_state::update_repair_time(table_id id, const dht::token_range& range, gc_clock::time_point repair_time) { - mutate_repair_history([id, &range, repair_time] 
(per_table_history_maps& maps) { - do_update_repair_time(maps, id, range, repair_time); +void shared_tombstone_gc_state::update_repair_time(table_id id, const dht::token_range& range, gc_clock::time_point repair_time, opt_rp rp) { + mutate_repair_history([id, &range, repair_time, rp] (per_table_history_maps& maps) { + do_update_repair_time(maps, id, range, repair_time, rp.value_or({})); }); } @@ -254,7 +259,7 @@ void shared_tombstone_gc_state::batch_update_repair_time(table_id id, std::span< it->second = seastar::make_lw_shared(*it->second); } for (const auto& [range, repair_time] : updates) { - *it->second += std::make_pair(locator::token_metadata::range_to_interval(range), repair_time); + *it->second += std::make_pair(locator::token_metadata::range_to_interval(range), repair_history_entry{repair_time, db::replay_position{}}); } }); } @@ -275,7 +280,7 @@ future<> shared_tombstone_gc_state::flush_pending_repair_time_update(replica::da for (auto& update : x.second) { co_await coroutine::maybe_yield(); if (update.shard == this_shard_id()) { - do_update_repair_time(*shard_maps, table, update.range, update.time); + do_update_repair_time(*shard_maps, table, update.range, update.time, {}); dblog.debug("Flush pending repair time for tombstone gc: table={} range={} repair_time={}", table, update.range, update.time); } diff --git a/tombstone_gc.hh b/tombstone_gc.hh index 6d4e834013..12f2874505 100644 --- a/tombstone_gc.hh +++ b/tombstone_gc.hh @@ -9,8 +9,10 @@ #pragma once #include +#include #include #include "gc_clock.hh" +#include "db/commitlog/replay_position.hh" #include "dht/token.hh" #include "locator/abstract_replication_strategy.hh" #include "locator/token_metadata.hh" @@ -47,7 +49,7 @@ using per_table_history_maps = std::unordered_map; +using gc_time_min_source = std::function; struct range_repair_time { dht::token_range range; @@ -91,8 +93,8 @@ public: _gc_min_source = std::move(src); } - gc_clock::time_point get_gc_min_time(const table_id& tid) const noexcept { - 
return _gc_min_source ? _gc_min_source(tid) : gc_clock::time_point::max(); + gc_clock::time_point get_gc_min_time(const table_id& tid, const db::replay_position& rp = {}) const noexcept { + return _gc_min_source ? _gc_min_source(tid, rp) : gc_clock::time_point::max(); } void set_table_rf_one(table_id id) { @@ -108,7 +110,8 @@ public: return _rf_one_tables.contains(id); } - void update_repair_time(table_id id, const dht::token_range& range, gc_clock::time_point repair_time); + using opt_rp = std::optional; + void update_repair_time(table_id id, const dht::token_range& range, gc_clock::time_point repair_time, opt_rp = {}); // A single (range, repair_time) pair used by batch_update_repair_time. using repair_time_update = std::pair; @@ -148,7 +151,7 @@ private: bool _check_commitlog{true}; private: - [[nodiscard]] gc_clock::time_point check_min(schema_ptr, gc_clock::time_point) const; + [[nodiscard]] gc_clock::time_point check_min(schema_ptr, gc_clock::time_point, const db::replay_position& = {}) const; [[nodiscard]] repair_history_map_ptr get_repair_history_for_table(const table_id& id) const; From 47524d8d17b19bd525631373945d5a73d5a567fb Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Tue, 12 May 2026 10:29:55 +0200 Subject: [PATCH 6/8] repair: Set commitlog cleanup record and store replay pos in history Fixes: SCYLLADB-1416 When updating repair history, also set a commitlog cleanup record preventing replay of data lower than the current flush position, and add said flush position to the local repair history. This will allow the tombstone gc repair case to potentially filter out some commitlog data, thus reducing the retention window for tombstones. 
--- db/schema_applier.cc | 2 +- repair/row_level.cc | 10 ++++++++-- service/storage_service.cc | 2 +- tombstone_gc.cc | 16 +++++++++++++--- tombstone_gc.hh | 2 +- 5 files changed, 24 insertions(+), 8 deletions(-) diff --git a/db/schema_applier.cc b/db/schema_applier.cc index deb9849106..0d470b286a 100644 --- a/db/schema_applier.cc +++ b/db/schema_applier.cc @@ -1156,7 +1156,7 @@ future<> schema_applier::finalize_tables_and_views() { if (_tablet_hint) { auto& db = sharded_db.local(); co_await db.get_compaction_manager().get_shared_tombstone_gc_state(). - flush_pending_repair_time_update(db); + flush_pending_repair_time_update(sharded_db, _sys_ks); _ss.local().wake_up_topology_state_machine(); } diff --git a/repair/row_level.cc b/repair/row_level.cc index 9ae9d5a73c..7d3468c79b 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -2606,9 +2606,15 @@ future repair_service::repair_update_system if (!is_valid_range) { throw std::runtime_error(format("repair[{}]: range {} is not in the format of (start, end]", req.repair_uuid, req.range)); } - co_await db.invoke_on_all([&req] (replica::database& local_db) { + co_await db.invoke_on_all([&] (replica::database& local_db) -> future<> { + db::replay_position high_rp; + auto t = local_db.get_tables_metadata().get_table_if_exists(req.table_uuid); + if (t) { + high_rp = t->highest_flushed_replay_position(); + co_await _sys_ks.local().save_commitlog_cleanup_record(req.table_uuid, req.range, high_rp); + } auto& gc_state = local_db.get_compaction_manager().get_shared_tombstone_gc_state(); - return gc_state.update_repair_time(req.table_uuid, req.range, req.repair_time); + co_return gc_state.update_repair_time(req.table_uuid, req.range, req.repair_time, high_rp); }); db::system_keyspace::repair_history_entry ent; ent.id = req.repair_uuid; diff --git a/service/storage_service.cc b/service/storage_service.cc index ce665e1726..c7c87864db 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2335,7 +2335,7 
@@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt }); co_await change.destroy(); co_await _db.local().get_compaction_manager().get_shared_tombstone_gc_state(). - flush_pending_repair_time_update(_db.local()); + flush_pending_repair_time_update(_db, _sys_ks); } future<> storage_service::stop() { diff --git a/tombstone_gc.cc b/tombstone_gc.cc index 7ed58e30a9..a072dbd49b 100644 --- a/tombstone_gc.cc +++ b/tombstone_gc.cc @@ -14,6 +14,7 @@ #include "tombstone_gc-internals.hh" #include "locator/token_metadata.hh" #include "exceptions/exceptions.hh" +#include "db/system_keyspace.hh" #include "locator/abstract_replication_strategy.hh" #include "replica/database.hh" #include "data_dictionary/data_dictionary.hh" @@ -269,18 +270,27 @@ void shared_tombstone_gc_state::insert_pending_repair_time_update(table_id id, _pending_updates[id].push_back(range_repair_time{range, repair_time, shard}); } -future<> shared_tombstone_gc_state::flush_pending_repair_time_update(replica::database& db) { +future<> shared_tombstone_gc_state::flush_pending_repair_time_update(sharded& db, sharded& sys_ks) { auto pending_updates = std::exchange(_pending_updates, {}); - co_await db.container().invoke_on_all([&pending_updates] (replica::database &localdb) -> future<> { + co_await sys_ks.invoke_on_all([&pending_updates, &db] (db::system_keyspace& local_sys_ks) -> future<> { + auto& localdb = db.local(); auto& shared_gc_state = localdb.get_compaction_manager().get_shared_tombstone_gc_state(); auto shard_maps = make_lw_shared(shared_gc_state.get_reconcile_history_maps()); for (auto& x : pending_updates) { auto& table = x.first; + db::replay_position high_rp; + auto t = localdb.get_tables_metadata().get_table_if_exists(table); + if (t) { + high_rp = t->highest_flushed_replay_position(); + } for (auto& update : x.second) { co_await coroutine::maybe_yield(); if (update.shard == this_shard_id()) { - do_update_repair_time(*shard_maps, table, update.range, update.time, {}); + 
if (high_rp.valid()) { + co_await local_sys_ks.save_commitlog_cleanup_record(table, update.range, high_rp); + } + do_update_repair_time(*shard_maps, table, update.range, update.time, high_rp); dblog.debug("Flush pending repair time for tombstone gc: table={} range={} repair_time={}", table, update.range, update.time); } diff --git a/tombstone_gc.hh b/tombstone_gc.hh index 12f2874505..419c2ab294 100644 --- a/tombstone_gc.hh +++ b/tombstone_gc.hh @@ -125,7 +125,7 @@ public: void drop_repair_history_for_table(const table_id& id); void insert_pending_repair_time_update(table_id id, const dht::token_range& range, gc_clock::time_point repair_time, shard_id shard); - future<> flush_pending_repair_time_update(replica::database& db); + future<> flush_pending_repair_time_update(sharded&, sharded&); tombstone_gc_state_snapshot snapshot() const noexcept; }; From f0eadcdd6416d30013731ecf2674e83360beff7d Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Tue, 19 May 2026 15:08:32 +0200 Subject: [PATCH 7/8] test.py: Make --log-level work again. Was regressed away. Just apply the values to pytest invocation --- test.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test.py b/test.py index 2824f659bf..8258da1059 100755 --- a/test.py +++ b/test.py @@ -369,6 +369,8 @@ def run_pytest(options: argparse.Namespace) -> int: args.append('--save-log-on-success') if options.markers: args.append(f'-m={options.markers}') + if options.log_level: + args.append(f'--log-level={options.log_level}') args.extend(files_to_run) exit_code = pytest.main(args=args) From 43cab2a33214e21cb14e774d5d3ac5bfc76ce540 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Tue, 19 May 2026 15:10:05 +0200 Subject: [PATCH 8/8] test_...data_resurrection: Add test case for repair CL truncation Tests that we can remove tombstones even while live commitlog segments hold data for a table, iff repair added to CL truncation log. 
--- ...est_commitlog_segment_data_resurrection.py | 167 ++++++++++++++++++ 1 file changed, 167 insertions(+) diff --git a/test/cluster/test_commitlog_segment_data_resurrection.py b/test/cluster/test_commitlog_segment_data_resurrection.py index 8c988beea7..c0d06ecafc 100644 --- a/test/cluster/test_commitlog_segment_data_resurrection.py +++ b/test/cluster/test_commitlog_segment_data_resurrection.py @@ -11,6 +11,12 @@ import os import logging import glob import json +import asyncio +import time +from functools import partial +from datetime import datetime, timezone +from test.cluster.util import new_test_keyspace +from test.pylib.util import wait_for, wait_for_cql_and_get_hosts logger = logging.getLogger(__name__) @@ -129,3 +135,164 @@ async def test_pinned_cl_segment_doesnt_resurrect_data(manager: ManagerClient): cql = manager.cql assert len(list(cql.execute(f"SELECT * FROM {tbl2} WHERE pk = {pk1}"))) == 0 + + +@pytest.mark.asyncio +async def test_pinned_cl_segment_doesnt_resurrect_data_but_repair_ensures_tombstone_gc(manager: ManagerClient): + """ + """ + cfg = { + "commitlog_sync": "batch", + "commitlog_segment_size_in_mb": 1, + "enable_cache": False, + "hinted_handoff_enabled": False, + "repair_hints_batchlog_flush_cache_time_in_ms": 0, + } + servers = await manager.servers_add(3, config=cfg, property_file=[ + {"dc": "dc1", "rack": "r1"}, + {"dc": "dc1", "rack": "r2"}, + {"dc": "dc1", "rack": "r3"}] + ) + + cql = manager.cql + + hosts = [(await wait_for_cql_and_get_hosts(cql, [s], time.time() + 60))[0] + for s in servers] + + async def get_segments_num(server): + metrics_res = await manager.metrics.query(server.ip_addr) + return int(metrics_res.get("scylla_commitlog_segments")) + + async def get_segments_nums(): + return [await get_segments_num(s) for s in servers] + + def less_than_by(after, before, off = 0): + return all(x < (y + off) for x, y in zip(before, after)) + + async with new_test_keyspace(manager, "with replication = {'class': 
'NetworkTopologyStrategy', 'replication_factor': 3}") as ks1, \ + new_test_keyspace(manager, "with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks2: + tbl1 = f"{ks1}.tbl1" + tbl2 = f"{ks2}.tbl2" + await cql.run_async(f"create table {tbl1} (pk int, ck int, primary key(pk, ck)) WITH tombstone_gc = {{'mode': 'repair' }}") + await cql.run_async(f"create table {tbl2} (pk int, ck int, v text, primary key(pk, ck)) WITH tombstone_gc = {{'mode': 'repair', 'propagation_delay_in_seconds': '0'}}") + + insert_id_tbl1 = cql.prepare(f"INSERT INTO {tbl1} (pk, ck) VALUES (?, ?)") + insert_id_tbl2 = cql.prepare(f"INSERT INTO {tbl2} (pk, ck, v) VALUES (?, ?, ?)") + pk1 = 0 + pk2 = 1 + ck = 0 + value = "v" * 1024 + + segments_before_writes = await get_segments_nums() + segments_after_writes = segments_before_writes + + logger.debug("Have %s segments before writing data", segments_after_writes) + + logger.debug("Filling segment with mixed data from %s and %s", tbl2, tbl2) + + # Ensure at least one segment with writes from both tables + while less_than_by(segments_before_writes, segments_after_writes, 1): + cql.execute(insert_id_tbl1, (pk1, ck)) + cql.execute(insert_id_tbl2, (pk1, ck, value)) + ck = ck + 1 + segments_after_writes = await get_segments_nums() + + logger.debug("Filling segment(s) with %s only", tbl2) + + while less_than_by(segments_before_writes, segments_after_writes, 3): + cql.execute(insert_id_tbl2, (pk1, ck, value)) + ck = ck + 1 + segments_after_writes = await get_segments_nums() + + cql.execute(f"DELETE FROM {tbl2} WHERE pk = {pk1}") + + # We need to make sure the segment in which the above delete landed in + # is full, otherwise the memtable flush will not be able to destroy it. 
+ logger.debug("Filling another segment with %s (pk=%s)", tbl2, pk2) + + while less_than_by(segments_before_writes, segments_after_writes, 4): + cql.execute(insert_id_tbl2, (pk2, ck, value)) + ck = ck + 1 + segments_after_writes = await get_segments_nums() + + logger.debug("Wrote %s rows, now have %s segments", ck, segments_after_writes) + logger.debug("Flush %s", tbl2) + + async def flush_ks(server): + await manager.api.keyspace_flush(node_ip=server.ip_addr, keyspace=ks2, table="tbl2") + + async def compact_ks(server): + await manager.api.keyspace_compaction(node_ip=server.ip_addr, keyspace=ks2, table="tbl2") + + await asyncio.gather(*[flush_ks(s) for s in servers]) + + segments_after = await get_segments_nums() + logger.debug("After flush, now have %s segments", segments_after) + + assert len(list(cql.execute(f"SELECT * FROM {tbl1} WHERE pk = {pk1}"))) > 0 + assert len(list(cql.execute(f"SELECT * FROM {tbl2} WHERE pk = {pk1}"))) == 0 + + tombstone_mark = datetime.now(timezone.utc) + + def get_tombstone(row): + if row.metadata is None: + return None + metadata = json.loads(row.metadata) + return metadata.get("tombstone") + + async def list_tombstones(tombstone_mark, host): + res = list(cql.execute(f"SELECT * FROM MUTATION_FRAGMENTS({tbl2})", host=host)) + tombstones = [] + for row in res: + tombstone = get_tombstone(row) + if tombstone and datetime.fromtimestamp(float(tombstone["timestamp"])/1_000_000, timezone.utc) < tombstone_mark: + tombstones.append(tombstone) + return tombstones + + async def list_all_tombstones(tombstone_mark): + tombstones_per_host = await asyncio.gather(*[list_tombstones(tombstone_mark, host) + for host in hosts]) + all_tombstones = [] + for tombstones in tombstones_per_host: + all_tombstones += tombstones + return all_tombstones + + async def tombstone_gc_completed(tombstone_mark): + # flush and compact the keyspace + await asyncio.gather(*[flush_ks(s) for s in servers]) + await asyncio.gather(*[compact_ks(s) for s in servers]) + + 
all_tombstones = await list_all_tombstones(tombstone_mark) + logger.debug(all_tombstones) + tombstones_count_total = len(all_tombstones) + if tombstones_count_total != 0: + return None + return True + + # should usually run much faster than 30s, but left some margin to avoid flakiness + async def verify_tombstone_gc(tombstone_mark, timeout=30): + deadline = time.time() + timeout + await wait_for(partial(tombstone_gc_completed, tombstone_mark), deadline) + + + tombstones = await list_all_tombstones(tombstone_mark) + + assert len(tombstones) > 0, "there should be tombstones at this point" + + # wait for 2 sec to let the current tombstones fully expire + #await asyncio.sleep(2) + await manager.api.repair(servers[0].ip_addr, ks2, "tbl2") + + # now we should be able to get to a state where all tombstones are gone. + await verify_tombstone_gc(tombstone_mark) + + logger.debug("Kill + restart the nodes") + + await asyncio.gather(*[manager.server_stop(s.server_id, False) for s in servers]) + await asyncio.gather(*[manager.server_start(s.server_id) for s in servers]) + + manager.driver_close() + await manager.driver_connect() + cql = manager.cql + + assert len(list(cql.execute(f"SELECT * FROM {tbl2} WHERE pk = {pk1}"))) == 0