repair/replica: Fix race window where post-repair data is wrongly promoted to repaired

During incremental repair, each tablet replica holds three SSTable views:
UNREPAIRED, REPAIRING, and REPAIRED.  The repair lifecycle is:

  1. Replicas snapshot unrepaired SSTables and mark them REPAIRING.
  2. Row-level repair streams missing rows between replicas.
  3. mark_sstable_as_repaired() runs on all replicas, rewriting the
     SSTables with repaired_at = sstables_repaired_at + 1 (e.g. N+1).
  4. The coordinator atomically commits sstables_repaired_at=N+1 and
     the end_repair stage to Raft, then broadcasts
     repair_update_compaction_ctrl which calls clear_being_repaired().

The bug lives in the window between steps 3 and 4.  After step 3, each
replica has on-disk SSTables with repaired_at=N+1, but sstables_repaired_at
in Raft is still N.  The classifier therefore sees:

  is_repaired(N, sst{repaired_at=N+1}) == false
  sst->being_repaired == null   (lost on restart, or not yet set)

and puts them in the UNREPAIRED view.  If a new write arrives and is
flushed (repaired_at=0), STCS minor compaction can fire immediately and
merge the two SSTables.  The output gets repaired_at = max(N+1, 0) = N+1
because compaction preserves the maximum repaired_at of its inputs.

Once step 4 commits sstables_repaired_at=N+1, the compacted output is
classified REPAIRED on the affected replica even though it contains data
that was never part of the repair scan.  Other replicas, which did not
experience this compaction, classify the same rows as UNREPAIRED.  This
divergence is never healed by future repairs because the repaired set is
considered authoritative.  The result is data resurrection: deleted rows
can reappear after the next compaction that merges unrepaired data with the
wrongly-promoted repaired SSTable.

The fix has two layers:

Layer 1 (in-memory, fast path): mark_sstable_as_repaired() now also calls
mark_as_being_repaired(session) on the new SSTables it writes.  This keeps
them in the REPAIRING view from the moment they are created until
repair_update_compaction_ctrl clears the flag after step 4, covering the
race window in the normal (no-restart) case.

Layer 2 (durable, restart-safe): a new is_being_repaired() helper on
tablet_storage_group_manager detects the race window even after a node
restart, when being_repaired has been lost from memory.  It checks:

  sst.repaired_at == sstables_repaired_at + 1
  AND tablet transition kind == tablet_transition_kind::repair

Both conditions survive restarts: repaired_at is on-disk in SSTable
metadata, and the tablet transition is persisted in Raft.  Once the
coordinator commits sstables_repaired_at=N+1 (step 4), is_repaired()
returns true and the SSTable naturally moves to the REPAIRED view.

The classifier in make_repair_sstable_classifier_func() is updated to call
is_being_repaired(sst, sstables_repaired_at) in place of the previous
sst->being_repaired.uuid().is_null() check.

A new test, test_incremental_repair_race_window_promotes_unrepaired_data,
reproduces the bug by:
  - Running repair round 1 to establish sstables_repaired_at=1.
  - Injecting delay_end_repair_update to hold the race window open.
  - Running repair round 2 so all replicas complete mark_sstable_as_repaired
    (repaired_at=2) but the coordinator has not yet committed step 4.
  - Writing post-repair keys to all replicas and flushing servers[1] to
    create an SSTable with repaired_at=0 on disk.
  - Restarting servers[1] so being_repaired is lost from memory.
  - Waiting for autocompaction to merge the two SSTables on servers[1].
  - Asserting that the merged SSTable contains post-repair keys (the bug)
    and that servers[0] and servers[2] do not see those keys as repaired.

NOTE FOR MAINTAINER: Copilot initially only implemented Layer 1 (the
in-memory being_repaired guard), missing the restart scenario entirely.
I pointed out that being_repaired is lost on restart and guided Copilot
to add the durable Layer 2 check.  I also polished the implementation:
moving is_being_repaired into tablet_storage_group_manager so it can
reuse the already-held _tablet_map (avoiding an ERM lookup and try/catch),
passing sstables_repaired_at in from the classifier to avoid re-reading it,
and using compaction_group_for_sstable inside the function rather than
threading a tablet_id parameter through the classifier.

Fixes https://scylladb.atlassian.net/browse/SCYLLADB-1239.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

Closes scylladb/scylladb#29244

(cherry picked from commit 16e387d5f9)
This commit is contained in:
Raphael S. Carvalho
2026-03-25 21:11:45 -03:00
committed by scylladbbot
parent da53b8798f
commit cc3dcc4ba8
3 changed files with 300 additions and 3 deletions

View File

@@ -2171,6 +2171,7 @@ public:
public:
future<> mark_sstable_as_repaired() {
auto& sstables = _repair_writer->get_sstable_list_to_mark_as_repaired();
<<<<<<< HEAD
if (_incremental_repair_meta.sst_set || !sstables.empty()) {
co_await seastar::async([&] {
auto do_mark_sstable_as_repaired = [&] (const sstables::shared_sstable& sst, const sstring& type) {
@@ -2188,6 +2189,96 @@ public:
for (auto& sst : sstables) {
seastar::thread::maybe_yield();
do_mark_sstable_as_repaired(sst, "repair_produced");
||||||| parent of 16e387d5f9 (repair/replica: Fix race window where post-repair data is wrongly promoted to repaired)
if (!_incremental_repair_meta.sst_set && !sstables.empty()) {
co_return;
}
auto& table = _db.local().find_column_family(_schema->id());
auto& cm = table.get_compaction_manager();
int64_t repaired_at = _incremental_repair_meta.sstables_repaired_at + 1;
auto modifier = [repaired_at] (sstables::sstable& new_sst) {
new_sst.update_repaired_at(repaired_at);
};
std::unordered_map<compaction::compaction_group_view*, std::vector<sstables::shared_sstable>> sstables_by_group;
auto add_sstable = [&] (const sstables::shared_sstable& sst) {
if (sst->should_update_repaired_at(repaired_at)) {
auto& view = table.compaction_group_view_for_sstable(sst);
sstables_by_group[&view].push_back(sst);
}
};
if (_incremental_repair_meta.sst_set) {
_incremental_repair_meta.sst_set->for_each_sstable(add_sstable);
}
for (auto& sst : sstables) {
add_sstable(sst);
}
for (auto& [view, ssts] : sstables_by_group) {
for (auto& sst : ssts) {
rlogger.info("Marking sstable={} repaired_at={} being_repaired={} for incremental repair",
sst->toc_filename(), repaired_at, sst->being_repaired);
}
auto rewritten_sstables = co_await cm.perform_component_rewrite(*view, tasks::task_info{}, std::move(ssts),
sstables::component_type::Statistics, modifier);
// remove the old sstables from incremental repair meta and add the new ones
for (auto& ss : rewritten_sstables) {
bool erased = _incremental_repair_meta.sst_set->erase(ss.first);
if (erased) {
_incremental_repair_meta.sst_set->insert(ss.second);
=======
if (!_incremental_repair_meta.sst_set && !sstables.empty()) {
co_return;
}
auto& table = _db.local().find_column_family(_schema->id());
auto& cm = table.get_compaction_manager();
int64_t repaired_at = _incremental_repair_meta.sstables_repaired_at + 1;
// Keep the new sstables marked as being_repaired until repair_update_compaction_ctrl
// is called (after sstables_repaired_at is committed to Raft). This is an additional
// in-memory guard; the classifier itself also protects these sstables via the
// repaired_at > sstables_repaired_at check.
auto modifier = [repaired_at, session = _frozen_topology_guard] (sstables::sstable& new_sst) {
new_sst.update_repaired_at(repaired_at);
new_sst.mark_as_being_repaired(session);
};
std::unordered_map<compaction::compaction_group_view*, std::vector<sstables::shared_sstable>> sstables_by_group;
auto add_sstable = [&] (const sstables::shared_sstable& sst) {
if (sst->should_update_repaired_at(repaired_at)) {
auto& view = table.compaction_group_view_for_sstable(sst);
sstables_by_group[&view].push_back(sst);
}
};
if (_incremental_repair_meta.sst_set) {
_incremental_repair_meta.sst_set->for_each_sstable(add_sstable);
}
for (auto& sst : sstables) {
add_sstable(sst);
}
for (auto& [view, ssts] : sstables_by_group) {
for (auto& sst : ssts) {
rlogger.info("Marking sstable={} repaired_at={} being_repaired={} for incremental repair",
sst->toc_filename(), repaired_at, sst->being_repaired);
}
auto rewritten_sstables = co_await cm.perform_component_rewrite(*view, tasks::task_info{}, std::move(ssts),
sstables::component_type::Statistics, modifier);
// remove the old sstables from incremental repair meta and add the new ones
for (auto& ss : rewritten_sstables) {
bool erased = _incremental_repair_meta.sst_set->erase(ss.first);
if (erased) {
_incremental_repair_meta.sst_set->insert(ss.second);
>>>>>>> 16e387d5f9 (repair/replica: Fix race window where post-repair data is wrongly promoted to repaired)
}
});
}

View File

@@ -842,14 +842,31 @@ private:
return { idx, side };
}
// Returns true if the sstable is currently being repaired. Checks the in-memory
// being_repaired flag first, then falls back to a durable check: if the sstable's
// repaired_at equals sstables_repaired_at+1 and the tablet is undergoing repair
// (i.e. tablet_transition_kind::repair), the sstable belongs to the current repair
// round but sstables_repaired_at+1 hasn't been committed to Raft yet (race window).
bool is_being_repaired(const sstables::shared_sstable& sst, int64_t sstables_repaired_at) const noexcept {
if (!sst->being_repaired.uuid().is_null()) {
return true;
}
auto repaired_at = sst->get_stats_metadata().repaired_at;
if (repaired_at != sstables_repaired_at + 1) {
return false;
}
auto& cg = compaction_group_for_sstable(sst);
auto trinfo = tablet_map().get_tablet_transition_info(locator::tablet_id(cg.group_id()));
return trinfo && trinfo->transition == locator::tablet_transition_kind::repair;
}
repair_classifier_func make_repair_sstable_classifier_func() const {
// FIXME: implement it for incremental repair!
return [] (const sstables::shared_sstable& sst, int64_t sstables_repaired_at) {
return [this] (const sstables::shared_sstable& sst, int64_t sstables_repaired_at) {
bool is_repaired = repair::is_repaired(sstables_repaired_at, sst);
if (is_repaired) {
return repair_sstable_classification::repaired;
} else {
if (!sst->being_repaired.uuid().is_null()) {
if (is_being_repaired(sst, sstables_repaired_at)) {
return repair_sstable_classification::repairing;
} else {
return repair_sstable_classification::unrepaired;

View File

@@ -205,8 +205,16 @@ async def trigger_tablet_merge(manager, servers, logs):
await s1_log.wait_for('Detected tablet merge for table', from_mark=s1_mark)
await inject_error_off(manager, "tablet_force_tablet_count_decrease", servers)
<<<<<<< HEAD
async def preapre_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdline = []):
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, nr_keys=nr_keys, cmdline=cmdline)
||||||| parent of 16e387d5f9 (repair/replica: Fix race window where post-repair data is wrongly promoted to repaired)
async def prepare_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdline = []):
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, nr_keys=nr_keys, cmdline=cmdline)
=======
async def prepare_cluster_for_incremental_repair(manager, nr_keys = 100 , cmdline = [], tablets = 8):
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, nr_keys=nr_keys, cmdline=cmdline, tablets=tablets)
>>>>>>> 16e387d5f9 (repair/replica: Fix race window where post-repair data is wrongly promoted to repaired)
repaired_keys = set(range(0, nr_keys))
unrepaired_keys = set()
current_key = nr_keys
@@ -899,3 +907,184 @@ async def test_tablet_incremental_repair_table_drop_compaction_group_gone(manage
await manager.api.message_injection(s.ip_addr, "wait_after_prepare_sstables_for_incremental_repair")
await drop_future
# Reproducer for the race window bug in incremental repair where minor compaction
# promotes unrepaired data into the repaired sstable set.
#
# Root cause: after mark_sstable_as_repaired() writes new sstables with repaired_at=N+1
# on all replicas, there is a window before the coordinator commits sstables_repaired_at=N+1
# to Raft. During this window is_repaired() still uses the old threshold N, so
# repaired_at=N+1 does not satisfy repaired_at <= N and the sstables are misclassified as
# UNREPAIRED. Minor compaction can then merge them with a genuinely unrepaired sstable
# (repaired_at=0). Because compaction propagates max(repaired_at), the output carries
# repaired_at=N+1. Once sstables_repaired_at advances to N+1 the merged sstable is
# classified REPAIRED even though it contains post-repair data that was never part of the
# repair scan. Replicas that did not compact during this window keep that post-repair data
# in UNREPAIRED sstables. Future incremental repairs skip the REPAIRED sstable on the
# affected replica but process the UNREPAIRED sstable on the others, so the classification
# divergence is never corrected. In tombstone scenarios this enables premature tombstone GC
# on the affected replica leading to data resurrection.
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: ManagerClient):
cmdline = ['--hinted-handoff-enabled', '0']
servers, cql, hosts, ks, table_id, logs, _, _, current_key, token = \
await prepare_cluster_for_incremental_repair(manager, nr_keys=10, cmdline=cmdline, tablets=2)
# Lower min_threshold to 2 so STCS fires as soon as two sstables appear in the
# UNREPAIRED compaction view, making the race easy to trigger deterministically.
await cql.run_async(
f"ALTER TABLE {ks}.test WITH compaction = "
f"{{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 2, 'max_threshold': 4}}"
)
# Disable autocompaction everywhere so we control exactly when compaction runs.
for s in servers:
await manager.api.disable_autocompaction(s.ip_addr, ks, 'test')
scylla_path = await manager.server_get_exe(servers[0].server_id)
# Repair 1: establishes sstables_repaired_at=1 on all nodes.
# Keys 0-9 (inserted by preapre_cluster_for_incremental_repair) end up in
# S0'(repaired_at=1) on all nodes.
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
# Insert keys 10-19 and flush on all nodes → S1(repaired_at=0).
# These will be the subject of repair 2.
repair2_keys = list(range(current_key, current_key + 10))
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in repair2_keys])
for s in servers:
await manager.api.flush_keyspace(s.ip_addr, ks)
current_key += 10
coord = await get_topology_coordinator(manager)
coord_serv = await find_server_by_host_id(manager, servers, coord)
coord_log = await manager.server_open_log(coord_serv.server_id)
coord_mark = await coord_log.mark()
# Hold the race window open: prevent the coordinator from committing
# end_repair + sstables_repaired_at=2 to Raft.
await manager.api.enable_injection(coord_serv.ip_addr, "delay_end_repair_update", one_shot=False)
repair_response = await manager.api.tablet_repair(
servers[0].ip_addr, ks, "test", token,
await_completion=False, incremental_mode="incremental"
)
task_id = repair_response['tablet_task_id']
# "Finished tablet repair" is logged once per tablet after mark_sstable_as_repaired()
# has completed on all replicas for that tablet. With tablets=2 the coordinator logs
# this message twice (once per tablet). We must wait for BOTH before writing
# post-repair keys; waiting for only the first leaves the second tablet's repair in
# progress, which can flush the memtable and mark newly-flushed sstables as repaired,
# contaminating servers[0] and servers[2] with post-repair data in repaired sstables.
# After both tablets complete, S1 is fully rewritten as S1'(repaired_at=2,
# being_repaired=null) on every replica, but sstables_repaired_at in system.tablets is
# still 1, so is_repaired(1, S1'{repaired_at=2}) == false and S1' lands in the
# UNREPAIRED compaction view on every replica. The race window is now open.
pos, _ = await coord_log.wait_for("Finished tablet repair", from_mark=coord_mark)
await coord_log.wait_for("Finished tablet repair", from_mark=pos)
# --- Race window is open ---
# Write post-repair keys 20-29. All nodes receive the writes into their memtables
# (RF=3, hinted handoff disabled).
post_repair_keys = list(range(current_key, current_key + 10))
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k})") for k in post_repair_keys])
current_key += 10
# Flush servers[1] BEFORE the restart so E(repaired_at=0, keys 20-29) lands on disk.
# At this point servers[1] holds on disk:
# S1' repaired_at=2 being_repaired=session_id (keys 10-19, from mark_sstable_as_repaired)
# E repaired_at=0 being_repaired=null (keys 20-29, genuine post-repair data)
# servers[0] and servers[2] still have keys 20-29 only in their memtables.
target = servers[1]
await manager.api.flush_keyspace(target.ip_addr, ks)
# Restart servers[1]. being_repaired is in-memory and is lost on restart.
# After restart both S1' and E are loaded from disk with being_repaired=null.
# Without the classifier fix: is_repaired(sstables_repaired_at=1, S1'{repaired_at=2})
# is false and being_repaired is null, so S1' lands in the UNREPAIRED view where
# autocompaction is active. STCS (min_threshold=2) immediately merges S1' and E into
# F(repaired_at=max(2,0)=2, keys 10-29), wrongly promoting E into the REPAIRED set.
# With the classifier fix: S1' has repaired_at==sstables_repaired_at+1 and the tablet
# is still in the `repair` stage, so it is classified REPAIRING (compaction disabled),
# and the merge never happens.
await manager.server_stop_gracefully(target.server_id)
await manager.server_start(target.server_id)
await manager.servers_see_each_other(servers)
# Poll until compaction has produced F(repaired_at=2) containing post-repair keys,
# confirming that the bug was triggered (S1' and E merged during the race window).
deadline = time.time() + 60
compaction_ran = False
while time.time() < deadline:
for sst in await get_sstables_for_server(manager, target, ks):
if get_repaired_at_from_sst(sst, scylla_path) == 2:
if set(get_keys_from_sst(sst, scylla_path)) & set(post_repair_keys):
compaction_ran = True
logger.info(f"Post-restart compaction produced F(repaired_at=2) with post-repair keys: {sst}")
break
if compaction_ran:
break
await asyncio.sleep(1)
# --- Release the race window ---
await manager.api.disable_injection(coord_serv.ip_addr, "delay_end_repair_update")
await manager.api.wait_task(servers[0].ip_addr, task_id)
if not compaction_ran:
logger.warning("Compaction did not merge S1' and E after restart during the race window; "
"the bug was not triggered. Skipping assertion.")
return
# Flush servers[0] and servers[2] AFTER the race window closes so their post-repair
# keys land in G(repaired_at=0): correctly classified as UNREPAIRED.
for s in [servers[0], servers[2]]:
await manager.api.flush_keyspace(s.ip_addr, ks)
# Stop all servers so sstable files on disk are stable.
for s in servers:
await manager.server_stop_gracefully(s.server_id)
post_repair_key_set = set(post_repair_keys)
async def keys_in_repaired_sstables(server) -> set:
"""Return the set of keys found in any sstable with repaired_at > 0 on this server."""
result = set()
for sst in await get_sstables_for_server(manager, server, ks):
ra = get_repaired_at_from_sst(sst, scylla_path)
if ra is not None and ra > 0:
result.update(get_keys_from_sst(sst, scylla_path))
return result
repaired_keys_0 = await keys_in_repaired_sstables(servers[0])
repaired_keys_1 = await keys_in_repaired_sstables(servers[1])
repaired_keys_2 = await keys_in_repaired_sstables(servers[2])
logger.info(f"Post-repair keys in repaired sstables: "
f"servers[0]={len(repaired_keys_0 & post_repair_key_set)}, "
f"servers[1]={len(repaired_keys_1 & post_repair_key_set)}, "
f"servers[2]={len(repaired_keys_2 & post_repair_key_set)}")
# servers[0] and servers[2] flushed post-repair keys after the race window closed,
# so those keys are in G(repaired_at=0) → correctly UNREPAIRED.
assert not (repaired_keys_0 & post_repair_key_set), \
f"servers[0] should not have post-repair keys in repaired sstables, " \
f"got: {repaired_keys_0 & post_repair_key_set}"
assert not (repaired_keys_2 & post_repair_key_set), \
f"servers[2] should not have post-repair keys in repaired sstables, " \
f"got: {repaired_keys_2 & post_repair_key_set}"
# BUG: servers[1] restarted during the race window, losing its in-memory being_repaired
# markers. S1'(repaired_at=2) and E(repaired_at=0) both landed in the UNREPAIRED
# compaction view and were merged into F(repaired_at=2) by autocompaction. After
# sstables_repaired_at advances to 2, F is classified REPAIRED even though it contains
# post-repair data that was never part of the repair scan. This diverges from servers[0]
# and servers[2] which keep those keys UNREPAIRED, enabling premature tombstone GC and
# data resurrection.
wrongly_promoted = repaired_keys_1 & post_repair_key_set
assert not wrongly_promoted, \
f"BUG: {len(wrongly_promoted)} post-repair keys were wrongly promoted to REPAIRED " \
f"on servers[1] after restart lost the being_repaired markers during the race window. " \
f"They are UNREPAIRED on servers[0] and servers[2] (classification divergence). " \
f"Wrongly promoted (first 10): {sorted(wrongly_promoted)[:10]}"