diff --git a/repair/row_level.cc b/repair/row_level.cc
index bc84759411..1f8c06700e 100644
--- a/repair/row_level.cc
+++ b/repair/row_level.cc
@@ -2171,47 +2171,54 @@ public:
 public:
     future<> mark_sstable_as_repaired() {
         auto& sstables = _repair_writer->get_sstable_list_to_mark_as_repaired();
         if (!_incremental_repair_meta.sst_set && !sstables.empty()) {
             co_return;
         }
 
         auto& table = _db.local().find_column_family(_schema->id());
         auto& cm = table.get_compaction_manager();
         int64_t repaired_at = _incremental_repair_meta.sstables_repaired_at + 1;
 
-        auto modifier = [repaired_at] (sstables::sstable& new_sst) {
+        // Keep the new sstables marked as being_repaired until repair_update_compaction_ctrl
+        // is called (after sstables_repaired_at is committed to Raft). This is an additional
+        // in-memory guard; the classifier itself also protects these sstables via the
+        // repaired_at > sstables_repaired_at check.
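+        // Note: being_repaired lives only in memory and is lost if the replica restarts
+        // inside the window; the classifier's durable repaired_at check covers that case.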
+        auto modifier = [repaired_at, session = _frozen_topology_guard] (sstables::sstable& new_sst) {
             new_sst.update_repaired_at(repaired_at);
+            new_sst.mark_as_being_repaired(session);
         };
 
         std::unordered_map<compaction::table_state*, std::vector<sstables::shared_sstable>> sstables_by_group;
         auto add_sstable = [&] (const sstables::shared_sstable& sst) {
             if (sst->should_update_repaired_at(repaired_at)) {
                 auto& view = table.compaction_group_view_for_sstable(sst);
                 sstables_by_group[&view].push_back(sst);
             }
         };
 
         if (_incremental_repair_meta.sst_set) {
             _incremental_repair_meta.sst_set->for_each_sstable(add_sstable);
         }
 
         for (auto& sst : sstables) {
             add_sstable(sst);
         }
 
         for (auto& [view, ssts] : sstables_by_group) {
             for (auto& sst : ssts) {
                 rlogger.info("Marking sstable={} repaired_at={} being_repaired={} for incremental repair",
                         sst->toc_filename(), repaired_at, sst->being_repaired);
             }
             auto rewritten_sstables = co_await cm.perform_component_rewrite(*view, tasks::task_info{}, std::move(ssts),
                     sstables::component_type::Statistics, modifier);
 
             // remove the old sstables from incremental repair meta and add the new ones
             for (auto& ss : rewritten_sstables) {
                 bool erased = _incremental_repair_meta.sst_set->erase(ss.first);
                 if (erased) {
                     _incremental_repair_meta.sst_set->insert(ss.second);
                 }
            });
        }
diff --git a/replica/table.cc b/replica/table.cc
index 366449e039..d6242f92be 100644
--- a/replica/table.cc
+++ b/replica/table.cc
@@ -842,14 +842,31 @@ private:
         return { idx, side };
     }
 
+    // Returns true if the sstable is currently being repaired. Checks the in-memory
+    // being_repaired flag first, then falls back to a durable check: if the sstable's
+    // repaired_at equals sstables_repaired_at+1 and the tablet is undergoing repair
+    // (i.e. tablet_transition_kind::repair), the sstable belongs to the current repair
+    // round but sstables_repaired_at+1 hasn't been committed to Raft yet (race window).
+    bool is_being_repaired(const sstables::shared_sstable& sst, int64_t sstables_repaired_at) const noexcept {
+        if (!sst->being_repaired.uuid().is_null()) {
+            return true;
+        }
+        auto repaired_at = sst->get_stats_metadata().repaired_at;
+        if (repaired_at != sstables_repaired_at + 1) {
+            return false;
+        }
+        auto& cg = compaction_group_for_sstable(sst);
+        auto trinfo = tablet_map().get_tablet_transition_info(locator::tablet_id(cg.group_id()));
+        return trinfo && trinfo->transition == locator::tablet_transition_kind::repair;
+    }
+
     repair_classifier_func make_repair_sstable_classifier_func() const {
-        // FIXME: implement it for incremental repair!
-        return [] (const sstables::shared_sstable& sst, int64_t sstables_repaired_at) {
+        return [this] (const sstables::shared_sstable& sst, int64_t sstables_repaired_at) {
             bool is_repaired = repair::is_repaired(sstables_repaired_at, sst);
             if (is_repaired) {
                 return repair_sstable_classification::repaired;
             } else {
-                if (!sst->being_repaired.uuid().is_null()) {
+                if (is_being_repaired(sst, sstables_repaired_at)) {
                     return repair_sstable_classification::repairing;
                 } else {
                     return repair_sstable_classification::unrepaired;
diff --git a/test/cluster/test_incremental_repair.py b/test/cluster/test_incremental_repair.py
index 1dede2ad88..b67d0b761d 100644
--- a/test/cluster/test_incremental_repair.py
+++ b/test/cluster/test_incremental_repair.py
@@ -205,8 +205,8 @@ async def trigger_tablet_merge(manager, servers, logs):
     await s1_log.wait_for('Detected tablet merge for table', from_mark=s1_mark)
     await inject_error_off(manager, "tablet_force_tablet_count_decrease", servers)
 
-async def prepare_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdline = []):
-    servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, nr_keys=nr_keys, cmdline=cmdline)
+async def prepare_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdline = [], tablets = 8):
+    servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, nr_keys=nr_keys, cmdline=cmdline, tablets=tablets)
     repaired_keys = set(range(0, nr_keys))
     unrepaired_keys = set()
     current_key = nr_keys
@@ -899,3 +899,196 @@ async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manage
     await manager.api.message_injection(s.ip_addr, "wait_after_prepare_sstables_for_incremental_repair")
     await drop_future
 
+# Reproducer for the race window bug in incremental repair where minor compaction
+# promotes unrepaired data into the repaired sstable set.
+#
+# Root cause: after mark_sstable_as_repaired() writes new sstables with repaired_at=N+1
+# on all replicas, there is a window before the coordinator commits sstables_repaired_at=N+1
+# to Raft. During this window is_repaired() still uses the old threshold N, so
+# repaired_at=N+1 does not satisfy repaired_at <= N and the sstables are misclassified as
+# UNREPAIRED. Minor compaction can then merge them with a genuinely unrepaired sstable
+# (repaired_at=0). Because compaction propagates max(repaired_at), the output carries
+# repaired_at=N+1. Once sstables_repaired_at advances to N+1 the merged sstable is
+# classified REPAIRED even though it contains post-repair data that was never part of the
+# repair scan. Replicas that did not compact during this window keep that post-repair data
+# in UNREPAIRED sstables. Future incremental repairs skip the REPAIRED sstable on the
+# affected replica but process the UNREPAIRED sstable on the others, so the classification
+# divergence is never corrected. In tombstone scenarios this enables premature tombstone GC
+# on the affected replica, leading to data resurrection.
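+#
+# Timeline (N = sstables_repaired_at before this repair round):
+#   t0: repair scans S1(repaired_at=0) on all replicas
+#   t1: mark_sstable_as_repaired() rewrites S1 -> S1'(repaired_at=N+1) on every replica
+#   t2: race window opens: sstables_repaired_at=N+1 is not yet committed to Raft
+#   t3: one replica restarts, losing its in-memory being_repaired markers
+#   t4: minor compaction merges S1' with post-repair E(repaired_at=0) into F(repaired_at=N+1)
+#   t5: sstables_repaired_at=N+1 commits; F is now wrongly classified REPAIRED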
+@pytest.mark.asyncio
+@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
+async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
+    cmdline = ['--hinted-handoff-enabled', '0']
+    servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
+        await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
+
+    # Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
+    # UNREPAIRED compaction view, making the race easy to trigger deterministically.
+    await cql.run_async(
+        f"ALTER TABLE {ks}.test WITH compaction = "
+        f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
+    )
+
+    # Disable autocompaction everywhere so we control exactly when compaction runs.
+    for s in servers:
+        await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
+
+    scylla_path = await manager.server_get_exe(servers[0].server_id)
+
+    # Repair 1: establishes sstables_repaired_at=1 on all nodes.
+    # Keys 0-9 (inserted by prepare_cluster_for_incremental_repair) end up in
+    # S0'(repaired_at=1) on all nodes.
+    await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
+
+    # Insert keys 10-19 and flush on all nodes → S1(repaired_at=0).
+    # These will be the subject of repair 2.
+    repair2_keys = list(range(current_key, current_key + 10))
+    await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
+    for s in servers:
+        await manager.api.flush_keyspace(s.ip_addr, ks)
+    current_key += 10
+
+    coord = await get_topology_coordinator(manager)
+    coord_serv = await find_server_by_host_id(manager, servers, coord)
+    coord_log = await manager.server_open_log(coord_serv.server_id)
+    coord_mark = await coord_log.mark()
+
+    # Hold the race window open: prevent the coordinator from committing
+    # end_repair + sstables_repaired_at=2 to Raft.
+    await manager.api.enable_injection(coord_serv.ip_addr, "delay_end_repair_update", one_shot=False)
+
+    repair_response = await manager.api.tablet_repair(
+        servers[0].ip_addr, ks, "test", token,
+        await_completion=False, incremental_mode="incremental"
+    )
+    task_id = repair_response['tablet_task_id']
+
+    # "Finished tablet repair" is logged once per tablet after mark_sstable_as_repaired()
+    # has completed on all replicas for that tablet. With tablets=2 the coordinator logs
+    # this message twice (once per tablet). We must wait for BOTH before writing
+    # post-repair keys; waiting for only the first leaves the second tablet's repair in
+    # progress, which can flush the memtable and mark newly-flushed sstables as repaired,
+    # contaminating servers[0] and servers[2] with post-repair data in repaired sstables.
+    # After both tablets complete, S1 is fully rewritten as S1'(repaired_at=2,
+    # being_repaired=null) on every replica, but sstables_repaired_at in system.tablets is
+    # still 1, so is_repaired(1, S1'{repaired_at=2}) == false and S1' lands in the
+    # UNREPAIRED compaction view on every replica. The race window is now open.
+    pos, _ = await coord_log.wait_for("Finished tablet repair", from_mark=coord_mark)
+    await coord_log.wait_for("Finished tablet repair", from_mark=pos)
+
+    # --- Race window is open ---
+    # Write post-repair keys 20-29. All nodes receive the writes into their memtables
+    # (RF=3, hinted handoff disabled).
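+    # These are the keys that must stay UNREPAIRED on every replica: the assertions at
+    # the end of the test treat any of them found in a repaired_at>0 sstable as the bug.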
+    post_repair_keys = list(range(current_key, current_key + 10))
+    await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in post_repair_keys])
+    current_key += 10
+
+    # Flush servers[1] BEFORE the restart so E(repaired_at=0, keys 20-29) lands on disk.
+    # At this point servers[1] holds on disk:
+    #   S1'  repaired_at=2  being_repaired=session_id  (keys 10-19, from mark_sstable_as_repaired)
+    #   E    repaired_at=0  being_repaired=null        (keys 20-29, genuine post-repair data)
+    # servers[0] and servers[2] still have keys 20-29 only in their memtables.
+    target = servers[1]
+    await manager.api.flush_keyspace(target.ip_addr, ks)
+
+    # Restart servers[1]. being_repaired is in-memory and is lost on restart.
+    # After restart both S1' and E are loaded from disk with being_repaired=null.
+    # Without the classifier fix: is_repaired(sstables_repaired_at=1, S1'{repaired_at=2})
+    # is false and being_repaired is null, so S1' lands in the UNREPAIRED view where
+    # autocompaction is active. STCS (min_threshold=2) immediately merges S1' and E into
+    # F(repaired_at=max(2,0)=2, keys 10-29), wrongly promoting E into the REPAIRED set.
+    # With the classifier fix: S1' has repaired_at==sstables_repaired_at+1 and the tablet
+    # is still in the `repair` stage, so it is classified REPAIRING (compaction disabled),
+    # and the merge never happens.
+    await manager.server_stop_gracefully(target.server_id)
+    await manager.server_start(target.server_id)
+    await manager.servers_see_each_other(servers)
+
+    # Poll until compaction has produced F(repaired_at=2) containing post-repair keys,
+    # confirming that the bug was triggered (S1' and E merged during the race window).
+    deadline = time.time() + 60
+    compaction_ran = False
+    while time.time() < deadline:
+        for sst in await get_sstables_for_server(manager, target, ks):
+            if get_repaired_at_from_sst(sst, scylla_path) == 2:
+                if set(get_keys_from_sst(sst, scylla_path)) & set(post_repair_keys):
+                    compaction_ran = True
+                    logger.info(f"Post-restart compaction produced F(repaired_at=2) with post-repair keys: {sst}")
+                    break
+        if compaction_ran:
+            break
+        await asyncio.sleep(1)
+
+    # --- Release the race window ---
+    await manager.api.disable_injection(coord_serv.ip_addr, "delay_end_repair_update")
+    await manager.api.wait_task(servers[0].ip_addr, task_id)
+
+    if not compaction_ran:
+        logger.warning("Compaction did not merge S1' and E after restart during the race window; "
+                       "the bug was not triggered. Skipping assertion.")
+        return
+
+    # Flush servers[0] and servers[2] AFTER the race window closes so their post-repair
+    # keys land in G(repaired_at=0): correctly classified as UNREPAIRED.
+    for s in [servers[0], servers[2]]:
+        await manager.api.flush_keyspace(s.ip_addr, ks)
+
+    # Stop all servers so sstable files on disk are stable.
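+    # The sstable inspection helpers below (get_repaired_at_from_sst, get_keys_from_sst)
+    # read the on-disk files via the scylla binary at scylla_path, so the nodes must be
+    # down while they run.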
+    for s in servers:
+        await manager.server_stop_gracefully(s.server_id)
+
+    post_repair_key_set = set(post_repair_keys)
+
+    async def keys_in_repaired_sstables(server) -> set:
+        """Return the set of keys found in any sstable with repaired_at > 0 on this server."""
+        result = set()
+        for sst in await get_sstables_for_server(manager, server, ks):
+            ra = get_repaired_at_from_sst(sst, scylla_path)
+            if ra is not None and ra > 0:
+                result.update(get_keys_from_sst(sst, scylla_path))
+        return result
+
+    repaired_keys_0 = await keys_in_repaired_sstables(servers[0])
+    repaired_keys_1 = await keys_in_repaired_sstables(servers[1])
+    repaired_keys_2 = await keys_in_repaired_sstables(servers[2])
+
+    logger.info(f"Post-repair keys in repaired sstables: "
+                f"servers[0]={len(repaired_keys_0 & post_repair_key_set)}, "
+                f"servers[1]={len(repaired_keys_1 & post_repair_key_set)}, "
+                f"servers[2]={len(repaired_keys_2 & post_repair_key_set)}")
+
+    # servers[0] and servers[2] flushed post-repair keys after the race window closed,
+    # so those keys are in G(repaired_at=0) → correctly UNREPAIRED.
+    assert not (repaired_keys_0 & post_repair_key_set), \
+        f"servers[0] should not have post-repair keys in repaired sstables, " \
+        f"got: {repaired_keys_0 & post_repair_key_set}"
+    assert not (repaired_keys_2 & post_repair_key_set), \
+        f"servers[2] should not have post-repair keys in repaired sstables, " \
+        f"got: {repaired_keys_2 & post_repair_key_set}"
+
+    # BUG: servers[1] restarted during the race window, losing its in-memory being_repaired
+    # markers. S1'(repaired_at=2) and E(repaired_at=0) both landed in the UNREPAIRED
+    # compaction view and were merged into F(repaired_at=2) by autocompaction. After
+    # sstables_repaired_at advances to 2, F is classified REPAIRED even though it contains
+    # post-repair data that was never part of the repair scan. This diverges from servers[0]
+    # and servers[2], which keep those keys UNREPAIRED, enabling premature tombstone GC and
+    # data resurrection.
+    wrongly_promoted = repaired_keys_1 & post_repair_key_set
+    assert not wrongly_promoted, \
+        f"BUG: {len(wrongly_promoted)} post-repair keys were wrongly promoted to REPAIRED " \
+        f"on servers[1] after restart lost the being_repaired markers during the race window. " \
+        f"They are UNREPAIRED on servers[0] and servers[2] (classification divergence). " \
+        f"Wrongly promoted (first 10): {sorted(wrongly_promoted)[:10]}"