diff --git a/db/batchlog_manager.cc b/db/batchlog_manager.cc index c391adb022..9421a756ba 100644 --- a/db/batchlog_manager.cc +++ b/db/batchlog_manager.cc @@ -77,9 +77,11 @@ future db::batchlog_manager::do_batch_log_replay(post_ }); }); } - co_await bm.container().invoke_on_all([last_replay] (auto& bm) { - bm._last_replay = last_replay; - }); + if (all_replayed == all_batches_replayed::yes) { + co_await bm.container().invoke_on_all([last_replay] (auto& bm) { + bm._last_replay = last_replay; + }); + } blogger.debug("Batchlog replay on shard {}: done", dest); co_return all_replayed; }); @@ -188,6 +190,7 @@ future db::batchlog_manager::replay_all_failed_batches if (utils::get_local_injector().is_enabled("skip_batch_replay")) { blogger.debug("Skipping batch replay due to skip_batch_replay injection"); + all_replayed = all_batches_replayed::no; co_return stop_iteration::no; } diff --git a/test/cluster/test_batchlog_manager.py b/test/cluster/test_batchlog_manager.py index ababad2a8d..3c4ee70e5e 100644 --- a/test/cluster/test_batchlog_manager.py +++ b/test/cluster/test_batchlog_manager.py @@ -295,7 +295,8 @@ async def test_drop_mutations_for_dropped_table(manager: ManagerClient) -> None: @pytest.mark.asyncio @skip_mode("release", "error injections are not supported in release mode") -async def test_batchlog_replay_failure_during_repair(manager: ManagerClient) -> None: +@pytest.mark.parametrize("repair_cache", [True, False]) +async def test_batchlog_replay_failure_during_repair(manager: ManagerClient, repair_cache: bool) -> None: """ We want to verify that repair_time will not be updated if batchlog replay fails. @@ -315,7 +316,7 @@ async def test_batchlog_replay_failure_during_repair(manager: ManagerClient) -> cmdline=['--enable-cache', '0', "--hinted-handoff-enabled", "0", "--logger-log-level", "batchlog_manager=trace:repair=debug", - "--repair-hints-batchlog-flush-cache-time-in-ms", "0"] + "--repair-hints-batchlog-flush-cache-time-in-ms", "1000000" if repair_cache else "0"] config = {"error_injections_at_startup": ["short_batchlog_manager_replay_interval"], "write_request_timeout_in_ms": 2000, 'group0_tombstone_gc_refresh_interval_in_ms': 1000, @@ -390,6 +391,9 @@ async def test_batchlog_replay_failure_during_repair(manager: ManagerClient) -> # be triggered from now on (if it's present). await disable_injection("skip_batch_replay") + await s1_log.wait_for("Batchlog replay on shard 0: done", timeout=60, from_mark=s1_mark) + await s1_log.wait_for("Batchlog replay on shard 1: done", timeout=60, from_mark=s1_mark) + await manager.api.repair(s1.ip_addr, ks, "my_table") await manager.api.flush_keyspace(s1.ip_addr, ks)