diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index 56fcfecc06..8922cefd6d 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -488,7 +488,7 @@ public: future> list_descriptors(sstring dir) const; future> get_segments_to_replay() const; - gc_clock::time_point min_gc_time(const cf_id_type&) const; + gc_clock::time_point min_gc_time(const cf_id_type&, const db::replay_position&) const; flush_handler_id add_flush_handler(flush_handler h) { auto id = ++_flush_ids; @@ -2053,9 +2053,12 @@ future> db::commitlog::segment_manager::get_segments_to_rep co_return segments_to_replay; } -gc_clock::time_point db::commitlog::segment_manager::min_gc_time(const cf_id_type& id) const { +gc_clock::time_point db::commitlog::segment_manager::min_gc_time(const cf_id_type& id, const db::replay_position& rp) const { auto res = gc_clock::time_point::max(); for (auto& s : _segments) { + if (rp.valid() && replay_position(s->_desc.id, s->position()) <= rp) { + continue; + } res = std::min(res, s->min_time(id)); } return res; @@ -3956,8 +3959,8 @@ future> db::commitlog::list_existing_segments(const sstring }); } -gc_clock::time_point db::commitlog::min_gc_time(const cf_id_type& id) const { - return _segment_manager->min_gc_time(id); +gc_clock::time_point db::commitlog::min_gc_time(const cf_id_type& id, const db::replay_position& rp) const { + return _segment_manager->min_gc_time(id, rp); } db::replay_position db::commitlog::min_position() const { diff --git a/db/commitlog/commitlog.hh b/db/commitlog/commitlog.hh index 745c89bd4a..4cb76d11eb 100644 --- a/db/commitlog/commitlog.hh +++ b/db/commitlog/commitlog.hh @@ -375,7 +375,11 @@ public: future> list_existing_segments() const; future> list_existing_segments(const sstring& dir) const; - gc_clock::time_point min_gc_time(const cf_id_type&) const; + /** + * Gets the recorded min timestamp for the given id. Optionally filter by + * replay position, i.e. skip segments that have top position <= rp_filter + */ + gc_clock::time_point min_gc_time(const cf_id_type&, const db::replay_position& rp_filter = {}) const; // Return the lowest possible replay position across all existing or future commitlog segments. // In other words, only positions greater or equal to min_position() can diff --git a/db/commitlog/replay_position.hh b/db/commitlog/replay_position.hh index 7f368cf8e8..53bbbd0ecd 100644 --- a/db/commitlog/replay_position.hh +++ b/db/commitlog/replay_position.hh @@ -55,6 +55,10 @@ struct replay_position { template auto describe_type(sstables::sstable_version_types v, Describer f) { return f(id, pos); } + + bool valid() const { + return id != 0 && pos != 0; + } }; class commitlog; diff --git a/db/schema_applier.cc b/db/schema_applier.cc index 8a5d4e6770..ee8b7947b9 100644 --- a/db/schema_applier.cc +++ b/db/schema_applier.cc @@ -1103,7 +1103,7 @@ future<> schema_applier::finalize_tables_and_views() { if (_tablet_hint) { auto& db = sharded_db.local(); co_await db.get_compaction_manager().get_shared_tombstone_gc_state(). - flush_pending_repair_time_update(db); + flush_pending_repair_time_update(sharded_db, _sys_ks); _ss.local().wake_up_topology_state_machine(); } diff --git a/main.cc b/main.cc index 2b4717ee62..088fcb27a6 100644 --- a/main.cc +++ b/main.cc @@ -2054,21 +2054,16 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl }); cm.invoke_on_all([&](compaction::compaction_manager& cm) { - auto cl = db.local().commitlog(); - auto scl = db.local().schema_commitlog(); - if (cl && scl) { - cm.get_shared_tombstone_gc_state().set_gc_time_min_source([cl, scl](const table_id& id) { - return std::min(cl->min_gc_time(id), scl->min_gc_time(id)); - }); - } else if (cl) { - cm.get_shared_tombstone_gc_state().set_gc_time_min_source([cl](const table_id& id) { - return cl->min_gc_time(id); - }); - } else if (scl) { - cm.get_shared_tombstone_gc_state().set_gc_time_min_source([scl](const table_id& id) { - return scl->min_gc_time(id); - }); - } + cm.get_shared_tombstone_gc_state().set_gc_time_min_source([&db](const table_id& id, const db::replay_position& rp) { + auto t = db.local().get_tables_metadata().get_table_if_exists(id); + if (t && t->ready_for_writes()) { + auto* cl = t->commitlog(); + if (cl) { + return cl->min_gc_time(id, rp); + } + } + return gc_clock::time_point::max(); + }); }).get(); checkpoint(stop_signal, "loading tablet metadata"); diff --git a/repair/row_level.cc b/repair/row_level.cc index d9c2ed3869..2207875bc0 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -2606,9 +2606,15 @@ future repair_service::repair_update_system if (!is_valid_range) { throw std::runtime_error(format("repair[{}]: range {} is not in the format of (start, end]", req.repair_uuid, req.range)); } - co_await db.invoke_on_all([&req] (replica::database& local_db) { + co_await db.invoke_on_all([&] (replica::database& local_db) -> future<> { + db::replay_position high_rp; + auto t = local_db.get_tables_metadata().get_table_if_exists(req.table_uuid); + if (t) { + high_rp = t->highest_flushed_replay_position(); + co_await _sys_ks.local().save_commitlog_cleanup_record(req.table_uuid, req.range, high_rp); + } auto& gc_state = local_db.get_compaction_manager().get_shared_tombstone_gc_state(); - return gc_state.update_repair_time(req.table_uuid, req.range, req.repair_time); + co_return gc_state.update_repair_time(req.table_uuid, req.range, req.repair_time, high_rp); }); db::system_keyspace::repair_history_entry ent; ent.id = req.repair_uuid; diff --git a/replica/database.hh b/replica/database.hh index c76e88ab19..844ec94269 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1444,6 +1444,10 @@ public: future get_compaction_reenablers_and_lock_holders_for_repair(replica::database& db, const service::frozen_topology_guard& guard, dht::token_range range); future estimated_partitions_in_range(dht::token_range tr) const; + + bool ready_for_writes() const { + return !_readonly; + } }; lw_shared_ptr make_tablet_sstable_set(schema_ptr, const storage_group_manager& sgm, const locator::tablet_map&); diff --git a/service/storage_service.cc b/service/storage_service.cc index 6928a4572a..746479fe67 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2332,7 +2332,7 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt }); co_await change.destroy(); co_await _db.local().get_compaction_manager().get_shared_tombstone_gc_state(). - flush_pending_repair_time_update(_db.local()); + flush_pending_repair_time_update(_db, _sys_ks); } future<> storage_service::stop() { diff --git a/test.py b/test.py index 050df469ca..2ff29a632a 100755 --- a/test.py +++ b/test.py @@ -366,6 +366,8 @@ def run_pytest(options: argparse.Namespace) -> int: args.append('--save-log-on-success') if options.markers: args.append(f'-m={options.markers}') + if options.log_level: + args.append(f'--log-level={options.log_level}') args.extend(files_to_run) exit_code = pytest.main(args=args) diff --git a/test/boost/row_cache_test.cc b/test/boost/row_cache_test.cc index 6515d8d2e0..d84dab72ef 100644 --- a/test/boost/row_cache_test.cc +++ b/test/boost/row_cache_test.cc @@ -4527,7 +4527,7 @@ SEASTAR_TEST_CASE(test_populating_cache_with_expired_and_nonexpired_tombstones) schema_ptr s = t.schema(); // emulate commitlog behaivor - t.get_compaction_manager().get_shared_tombstone_gc_state().set_gc_time_min_source([s](const table_id& id) { + t.get_compaction_manager().get_shared_tombstone_gc_state().set_gc_time_min_source([s](const table_id& id, auto&) { return gc_clock::now() - (std::chrono::seconds(s->gc_grace_seconds().count() + 600)); }); @@ -4722,7 +4722,7 @@ SEASTAR_TEST_CASE(test_cache_compacts_expired_tombstones_on_read) { tombstone_gc_state gc_state(gc_shared_state); // emulate commitlog behaivor - gc_shared_state.set_gc_time_min_source([&s](const table_id& id) { + gc_shared_state.set_gc_time_min_source([&s](const table_id& id, auto&) { return gc_clock::now() - (std::chrono::seconds(s->gc_grace_seconds().count() + 600)); }); diff --git a/test/cluster/test_commitlog_segment_data_resurrection.py b/test/cluster/test_commitlog_segment_data_resurrection.py index 8c988beea7..c0d06ecafc 100644 --- a/test/cluster/test_commitlog_segment_data_resurrection.py +++ b/test/cluster/test_commitlog_segment_data_resurrection.py @@ -11,6 +11,12 @@ import os import logging import glob import json +import asyncio +import time +from functools import partial +from datetime import datetime, timezone +from test.cluster.util import new_test_keyspace +from test.pylib.util import wait_for, wait_for_cql_and_get_hosts logger = logging.getLogger(__name__) @@ -129,3 +135,164 @@ async def test_pinned_cl_segment_doesnt_resurrect_data(manager: ManagerClient): cql = manager.cql assert len(list(cql.execute(f"SELECT * FROM {tbl2} WHERE pk = {pk1}"))) == 0 + + +@pytest.mark.asyncio +async def test_pinned_cl_segment_doesnt_resurrect_data_but_repair_ensures_tombstone_gc(manager: ManagerClient): + """ + """ + cfg = { + "commitlog_sync": "batch", + "commitlog_segment_size_in_mb": 1, + "enable_cache": False, + "hinted_handoff_enabled": False, + "repair_hints_batchlog_flush_cache_time_in_ms": 0, + } + servers = await manager.servers_add(3, config=cfg, property_file=[ + {"dc": "dc1", "rack": "r1"}, + {"dc": "dc1", "rack": "r2"}, + {"dc": "dc1", "rack": "r3"}] + ) + + cql = manager.cql + + hosts = [(await wait_for_cql_and_get_hosts(cql, [s], time.time() + 60))[0] + for s in servers] + + async def get_segments_num(server): + metrics_res = await manager.metrics.query(server.ip_addr) + return int(metrics_res.get("scylla_commitlog_segments")) + + async def get_segments_nums(): + return [await get_segments_num(s) for s in servers] + + def less_than_by(after, before, off = 0): + return all(x < (y + off) for x, y in zip(before, after)) + + async with new_test_keyspace(manager, "with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks1, \ + new_test_keyspace(manager, "with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks2: + tbl1 = f"{ks1}.tbl1" + tbl2 = f"{ks2}.tbl2" + await cql.run_async(f"create table {tbl1} (pk int, ck int, primary key(pk, ck)) WITH tombstone_gc = {{'mode': 'repair' }}") + await cql.run_async(f"create table {tbl2} (pk int, ck int, v text, primary key(pk, ck)) WITH tombstone_gc = {{'mode': 'repair', 'propagation_delay_in_seconds': '0'}}") + + insert_id_tbl1 = cql.prepare(f"INSERT INTO {tbl1} (pk, ck) VALUES (?, ?)") + insert_id_tbl2 = cql.prepare(f"INSERT INTO {tbl2} (pk, ck, v) VALUES (?, ?, ?)") + pk1 = 0 + pk2 = 1 + ck = 0 + value = "v" * 1024 + + segments_before_writes = await get_segments_nums() + segments_after_writes = segments_before_writes + + logger.debug("Have %s segments before writing data", segments_after_writes) + + logger.debug("Filling segment with mixed data from %s and %s", tbl2, tbl2) + + # Ensure at least one segment with writes from both tables + while less_than_by(segments_before_writes, segments_after_writes, 1): + cql.execute(insert_id_tbl1, (pk1, ck)) + cql.execute(insert_id_tbl2, (pk1, ck, value)) + ck = ck + 1 + segments_after_writes = await get_segments_nums() + + logger.debug("Filling segment(s) with %s only", tbl2) + + while less_than_by(segments_before_writes, segments_after_writes, 3): + cql.execute(insert_id_tbl2, (pk1, ck, value)) + ck = ck + 1 + segments_after_writes = await get_segments_nums() + + cql.execute(f"DELETE FROM {tbl2} WHERE pk = {pk1}") + + # We need to make sure the segment in which the above delete landed in + # is full, otherwise the memtable flush will not be able to destroy it. + logger.debug("Filling another segment with %s (pk=%s)", tbl2, pk2) + + while less_than_by(segments_before_writes, segments_after_writes, 4): + cql.execute(insert_id_tbl2, (pk2, ck, value)) + ck = ck + 1 + segments_after_writes = await get_segments_nums() + + logger.debug("Wrote %s rows, now have %s segments", ck, segments_after_writes) + logger.debug("Flush %s", tbl2) + + async def flush_ks(server): + await manager.api.keyspace_flush(node_ip=server.ip_addr, keyspace=ks2, table="tbl2") + + async def compact_ks(server): + await manager.api.keyspace_compaction(node_ip=server.ip_addr, keyspace=ks2, table="tbl2") + + await asyncio.gather(*[flush_ks(s) for s in servers]) + + segments_after = await get_segments_nums() + logger.debug("After flush, now have %s segments", segments_after) + + assert len(list(cql.execute(f"SELECT * FROM {tbl1} WHERE pk = {pk1}"))) > 0 + assert len(list(cql.execute(f"SELECT * FROM {tbl2} WHERE pk = {pk1}"))) == 0 + + tombstone_mark = datetime.now(timezone.utc) + + def get_tombstone(row): + if row.metadata is None: + return None + metadata = json.loads(row.metadata) + return metadata.get("tombstone") + + async def list_tombstones(tombstone_mark, host): + res = list(cql.execute(f"SELECT * FROM MUTATION_FRAGMENTS({tbl2})", host=host)) + tombstones = [] + for row in res: + tombstone = get_tombstone(row) + if tombstone and datetime.fromtimestamp(float(tombstone["timestamp"])/1_000_000, timezone.utc) < tombstone_mark: + tombstones.append(tombstone) + return tombstones + + async def list_all_tombstones(tombstone_mark): + tombstones_per_host = await asyncio.gather(*[list_tombstones(tombstone_mark, host) + for host in hosts]) + all_tombstones = [] + for tombstones in tombstones_per_host: + all_tombstones += tombstones + return all_tombstones + + async def tombstone_gc_completed(tombstone_mark): + # flush and compact the keyspace + await asyncio.gather(*[flush_ks(s) for s in servers]) + await asyncio.gather(*[compact_ks(s) for s in servers]) + + all_tombstones = await list_all_tombstones(tombstone_mark) + logger.debug(all_tombstones) + tombstones_count_total = len(all_tombstones) + if tombstones_count_total != 0: + return None + return True + + # should usually run much faster than 30s, but left some margin to avoid flakiness + async def verify_tombstone_gc(tombstone_mark, timeout=30): + deadline = time.time() + timeout + await wait_for(partial(tombstone_gc_completed, tombstone_mark), deadline) + + + tombstones = await list_all_tombstones(tombstone_mark) + + assert len(tombstones) > 0, "there should be tombstones at this point" + + # wait for 2 sec to let the current tombstones fully expire + #await asyncio.sleep(2) + await manager.api.repair(servers[0].ip_addr, ks2, "tbl2") + + # now we should be able to get to a state where all tombstones are gone. + await verify_tombstone_gc(tombstone_mark) + + logger.debug("Kill + restart the nodes") + + await asyncio.gather(*[manager.server_stop(s.server_id, False) for s in servers]) + await asyncio.gather(*[manager.server_start(s.server_id) for s in servers]) + + manager.driver_close() + await manager.driver_connect() + cql = manager.cql + + assert len(list(cql.execute(f"SELECT * FROM {tbl2} WHERE pk = {pk1}"))) == 0 diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index e2ac86f7c8..550ac89659 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -1031,21 +1031,16 @@ private: }); _cm.invoke_on_all([&](compaction::compaction_manager& cm) { - auto cl = _db.local().commitlog(); - auto scl = _db.local().schema_commitlog(); - if (cl && scl) { - cm.get_shared_tombstone_gc_state().set_gc_time_min_source([cl, scl](const table_id& id) { - return std::min(cl->min_gc_time(id), scl->min_gc_time(id)); - }); - } else if (cl) { - cm.get_shared_tombstone_gc_state().set_gc_time_min_source([cl](const table_id& id) { - return cl->min_gc_time(id); - }); - } else if (scl) { - cm.get_shared_tombstone_gc_state().set_gc_time_min_source([scl](const table_id& id) { - return scl->min_gc_time(id); - }); - } + cm.get_shared_tombstone_gc_state().set_gc_time_min_source([this](const table_id& id, const db::replay_position& rp) { + auto t = _db.local().get_tables_metadata().get_table_if_exists(id); + if (t && t->ready_for_writes()) { + auto* cl = t->commitlog(); + if (cl) { + return cl->min_gc_time(id, rp); + } + } + return gc_clock::time_point::max(); + }); }).get(); replica::distributed_loader::init_non_system_keyspaces(_db, _proxy, _sys_ks).get(); diff --git a/tombstone_gc-internals.hh b/tombstone_gc-internals.hh index 0b75282e6a..e0b9418cac 100644 --- a/tombstone_gc-internals.hh +++ b/tombstone_gc-internals.hh @@ -6,7 +6,21 @@ #include "tombstone_gc.hh" #include -using repair_history_map = boost::icl::interval_map; +/** + * Holds a repair history entry for a given token range. + * Timestamp is time of last repair, replay_position is an + * optional flush mark for the table in question which is + * also marked in the commitlog cleanup table. I.e. the + * lowest known position for which a replay might occur. + */ +struct repair_history_entry { + gc_clock::time_point timestamp; + db::replay_position replay_position; + + auto operator<=>(const repair_history_entry&) const noexcept = default; +}; + +using repair_history_map = boost::icl::interval_map; class repair_history_map_ptr { lw_shared_ptr _ptr; diff --git a/tombstone_gc.cc b/tombstone_gc.cc index ee438ad7ff..a072dbd49b 100644 --- a/tombstone_gc.cc +++ b/tombstone_gc.cc @@ -14,6 +14,7 @@ #include "tombstone_gc-internals.hh" #include "locator/token_metadata.hh" #include "exceptions/exceptions.hh" +#include "db/system_keyspace.hh" #include "locator/abstract_replication_strategy.hh" #include "replica/database.hh" #include "data_dictionary/data_dictionary.hh" @@ -96,6 +97,8 @@ tombstone_gc_state::get_gc_before_for_range_result tombstone_gc_state::get_gc_be auto max_repair_timestamp = gc_clock::time_point::min(); int hits = 0; knows_entire_range = false; + db::replay_position min_rp; + if (_shared_state && _shared_state->is_table_rf_one(s->id())) { // We don't have repair history, but the table is RF=1 so we return the same as tombstone_gc_mode::immediate would. auto t = check_min(s, query_time); @@ -109,8 +112,9 @@ tombstone_gc_state::get_gc_before_for_range_result tombstone_gc_state::get_gc_be bool contains_all = false; for (const auto& [i, s] = m->equal_range(interval); auto& x : std::ranges::subrange(i, s)) { auto r = locator::token_metadata::interval_to_range(x.first); - min = std::min(x.second, min); - max = std::max(x.second, max); + min = std::min(x.second.timestamp, min); + max = std::max(x.second.timestamp, max); + min_rp = min_rp.valid() ? std::min(min_rp, x.second.replay_position) : min_rp; if (++hits == 1 && r.contains(range, dht::token_comparator{})) { contains_all = true; } @@ -123,8 +127,8 @@ tombstone_gc_state::get_gc_before_for_range_result tombstone_gc_state::get_gc_be min_repair_timestamp = min; max_repair_timestamp = max; } - min_gc_before = check_min(s, saturating_subtract(min_repair_timestamp, propagation_delay)); - max_gc_before = check_min(s, saturating_subtract(max_repair_timestamp, propagation_delay)); + min_gc_before = check_min(s, saturating_subtract(min_repair_timestamp, propagation_delay), min_rp); + max_gc_before = check_min(s, saturating_subtract(max_repair_timestamp, propagation_delay), min_rp); }; dblog.trace("Get gc_before for ks={}, table={}, range={}, mode=repair, min_repair_timestamp={}, max_repair_timestamp={}, propagation_delay={}, min_gc_before={}, max_gc_before={}, hits={}, knows_entire_range={}", s->ks_name(), s->cf_name(), range, min_repair_timestamp, max_repair_timestamp, propagation_delay.count(), min_gc_before, max_gc_before, hits, knows_entire_range); @@ -141,9 +145,9 @@ bool tombstone_gc_state::cheap_to_get_gc_before(const schema& s) const noexcept return s.tombstone_gc_options().mode() != tombstone_gc_mode::repair; } -gc_clock::time_point tombstone_gc_state::check_min(schema_ptr s, gc_clock::time_point t) const { +gc_clock::time_point tombstone_gc_state::check_min(schema_ptr s, gc_clock::time_point t, const db::replay_position& rp) const { if (_check_commitlog && _shared_state && t != gc_clock::time_point::min()) { - return std::min(t, _shared_state->get_gc_min_time(s->id())); + return std::min(t, _shared_state->get_gc_min_time(s->id(), rp)); } return t; } @@ -180,6 +184,7 @@ gc_clock::time_point tombstone_gc_state::get_gc_before_for_key(schema_ptr s, con const std::chrono::seconds& propagation_delay = options.propagation_delay_in_seconds(); auto gc_before = gc_clock::time_point::min(); auto repair_timestamp = gc_clock::time_point::min(); + db::replay_position rp; if (_shared_state && _shared_state->is_table_rf_one(s->id())) { gc_before = query_time; } else if (auto m = get_repair_history_for_table(s->id()); m) { @@ -187,11 +192,12 @@ gc_clock::time_point tombstone_gc_state::get_gc_before_for_key(schema_ptr s, con if (it == m->end()) { gc_before = gc_clock::time_point::min(); } else { - repair_timestamp = it->second; + repair_timestamp = it->second.timestamp; + rp = it->second.replay_position; gc_before = saturating_subtract(repair_timestamp, propagation_delay); } } - gc_before = check_min(s, gc_before); + gc_before = check_min(s, gc_before, rp); dblog.trace("Get gc_before for ks={}, table={}, dk={}, mode=repair, repair_timestamp={}, propagation_delay={}, gc_before={}", s->ks_name(), s->cf_name(), dk, repair_timestamp, propagation_delay.count(), gc_before); return gc_before; @@ -225,19 +231,19 @@ const per_table_history_maps& shared_tombstone_gc_state::get_reconcile_history_m return *_reconcile_history_maps; } -static void do_update_repair_time(per_table_history_maps& reconcile_history_maps, table_id id, const dht::token_range& range, gc_clock::time_point repair_time) { +static void do_update_repair_time(per_table_history_maps& reconcile_history_maps, table_id id, const dht::token_range& range, gc_clock::time_point repair_time, db::replay_position rp) { auto [it, inserted] = reconcile_history_maps.try_emplace(id, lw_shared_ptr(nullptr)); if (inserted || !it->second) { // check for failed past update, leaving behind nullptr it->second = seastar::make_lw_shared(); } else { it->second = seastar::make_lw_shared(*it->second); } - *it->second += std::make_pair(locator::token_metadata::range_to_interval(range), repair_time); + *it->second += std::make_pair(locator::token_metadata::range_to_interval(range), repair_history_entry{ repair_time, rp }); } -void shared_tombstone_gc_state::update_repair_time(table_id id, const dht::token_range& range, gc_clock::time_point repair_time) { - mutate_repair_history([id, &range, repair_time] (per_table_history_maps& maps) { - do_update_repair_time(maps, id, range, repair_time); +void shared_tombstone_gc_state::update_repair_time(table_id id, const dht::token_range& range, gc_clock::time_point repair_time, opt_rp rp) { + mutate_repair_history([id, &range, repair_time, rp] (per_table_history_maps& maps) { + do_update_repair_time(maps, id, range, repair_time, rp.value_or({})); }); } @@ -254,7 +260,7 @@ void shared_tombstone_gc_state::batch_update_repair_time(table_id id, std::span< it->second = seastar::make_lw_shared(*it->second); } for (const auto& [range, repair_time] : updates) { - *it->second += std::make_pair(locator::token_metadata::range_to_interval(range), repair_time); + *it->second += std::make_pair(locator::token_metadata::range_to_interval(range), repair_history_entry{repair_time, db::replay_position{}}); } }); } @@ -264,18 +270,27 @@ void shared_tombstone_gc_state::insert_pending_repair_time_update(table_id id, _pending_updates[id].push_back(range_repair_time{range, repair_time, shard}); } -future<> shared_tombstone_gc_state::flush_pending_repair_time_update(replica::database& db) { +future<> shared_tombstone_gc_state::flush_pending_repair_time_update(sharded& db, sharded& sys_ks) { auto pending_updates = std::exchange(_pending_updates, {}); - co_await db.container().invoke_on_all([&pending_updates] (replica::database &localdb) -> future<> { + co_await sys_ks.invoke_on_all([&pending_updates, &db] (db::system_keyspace& local_sys_ks) -> future<> { + auto& localdb = db.local(); auto& shared_gc_state = localdb.get_compaction_manager().get_shared_tombstone_gc_state(); auto shard_maps = make_lw_shared(shared_gc_state.get_reconcile_history_maps()); for (auto& x : pending_updates) { auto& table = x.first; + db::replay_position high_rp; + auto t = localdb.get_tables_metadata().get_table_if_exists(table); + if (t) { + high_rp = t->highest_flushed_replay_position(); + } for (auto& update : x.second) { co_await coroutine::maybe_yield(); if (update.shard == this_shard_id()) { - do_update_repair_time(*shard_maps, table, update.range, update.time); + if (high_rp.valid()) { + co_await local_sys_ks.save_commitlog_cleanup_record(table, update.range, high_rp); + } + do_update_repair_time(*shard_maps, table, update.range, update.time, high_rp); dblog.debug("Flush pending repair time for tombstone gc: table={} range={} repair_time={}", table, update.range, update.time); } diff --git a/tombstone_gc.hh b/tombstone_gc.hh index 6d4e834013..419c2ab294 100644 --- a/tombstone_gc.hh +++ b/tombstone_gc.hh @@ -9,8 +9,10 @@ #pragma once #include +#include #include #include "gc_clock.hh" +#include "db/commitlog/replay_position.hh" #include "dht/token.hh" #include "locator/abstract_replication_strategy.hh" #include "locator/token_metadata.hh" @@ -47,7 +49,7 @@ using per_table_history_maps = std::unordered_map; +using gc_time_min_source = std::function; struct range_repair_time { dht::token_range range; @@ -91,8 +93,8 @@ public: _gc_min_source = std::move(src); } - gc_clock::time_point get_gc_min_time(const table_id& tid) const noexcept { - return _gc_min_source ? _gc_min_source(tid) : gc_clock::time_point::max(); + gc_clock::time_point get_gc_min_time(const table_id& tid, const db::replay_position& rp = {}) const noexcept { + return _gc_min_source ? _gc_min_source(tid, rp) : gc_clock::time_point::max(); } void set_table_rf_one(table_id id) { @@ -108,7 +110,8 @@ public: return _rf_one_tables.contains(id); } - void update_repair_time(table_id id, const dht::token_range& range, gc_clock::time_point repair_time); + using opt_rp = std::optional; + void update_repair_time(table_id id, const dht::token_range& range, gc_clock::time_point repair_time, opt_rp = {}); // A single (range, repair_time) pair used by batch_update_repair_time. using repair_time_update = std::pair; @@ -122,7 +125,7 @@ public: void drop_repair_history_for_table(const table_id& id); void insert_pending_repair_time_update(table_id id, const dht::token_range& range, gc_clock::time_point repair_time, shard_id shard); - future<> flush_pending_repair_time_update(replica::database& db); + future<> flush_pending_repair_time_update(sharded&, sharded&); tombstone_gc_state_snapshot snapshot() const noexcept; }; @@ -148,7 +151,7 @@ private: bool _check_commitlog{true}; private: - [[nodiscard]] gc_clock::time_point check_min(schema_ptr, gc_clock::time_point) const; + [[nodiscard]] gc_clock::time_point check_min(schema_ptr, gc_clock::time_point, const db::replay_position& = {}) const; [[nodiscard]] repair_history_map_ptr get_repair_history_for_table(const table_id& id) const;