Files
scylladb/test/cluster/test_batchlog_manager.py
Andrei Chekun cc5ac75d73 test.py: remove deprecated skip_mode decorator
Finishing the deprecation of the skip_mode function in favor of
pytest.mark.skip_mode. This PR is only cleaning and migrating leftover tests
that are still used and old way of skip_mode.

Closes scylladb/scylladb#28299
2026-01-25 18:17:27 +02:00

418 lines
20 KiB
Python

#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import pytest
import logging
import time
from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for
from test.cluster.util import new_test_keyspace, reconnect_driver, wait_for_cql_and_get_hosts
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_batchlog_replay_while_a_node_is_down(manager: ManagerClient) -> None:
""" Test that batchlog replay handles the case when a node is down while replaying a batch.
Reproduces issue #24599.
1. Create a cluster with 3 nodes.
2. Write a batch and inject an error to fail it before it's removed from the batchlog, so it
needs to be replayed.
3. Stop server 1.
4. Server 0 tries to replay the batch. it sends the mutation to all replicas, but one of them is down,
so it should fail.
5. Bring server 1 back up.
6. Verify that the batch is replayed and removed from the batchlog eventually.
"""
cmdline=['--logger-log-level', 'batchlog_manager=trace']
config = {'error_injections_at_startup': ['short_batchlog_manager_replay_interval'], 'write_request_timeout_in_ms': 2000}
servers = await manager.servers_add(3, config=config, cmdline=cmdline, auto_rack_dc="dc1")
cql, hosts = await manager.get_ready_cql(servers)
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v int, PRIMARY KEY (key, c))")
await asyncio.gather(*[manager.api.enable_injection(s.ip_addr, "storage_proxy_fail_remove_from_batchlog", one_shot=False) for s in servers])
# make sure the batch is replayed only after the server is stopped
await asyncio.gather(*[manager.api.enable_injection(s.ip_addr, "skip_batch_replay", one_shot=False) for s in servers])
s0_log = await manager.server_open_log(servers[0].server_id)
try:
await cql.run_async(f"BEGIN BATCH INSERT INTO {ks}.tab (key, c, v) VALUES (0,0,0); INSERT INTO {ks}.tab (key, c, v) VALUES (1,1,1); APPLY BATCH")
except Exception as e:
# injected error is expected
logger.error(f"Error executing batch: {e}")
await asyncio.gather(*[manager.api.disable_injection(s.ip_addr, "storage_proxy_fail_remove_from_batchlog") for s in servers])
await manager.server_stop(servers[1].server_id)
batchlog_row_count = (await cql.run_async("SELECT COUNT(*) FROM system.batchlog_v2", host=hosts[0]))[0].count
assert batchlog_row_count > 0
await asyncio.gather(*[manager.api.disable_injection(s.ip_addr, "skip_batch_replay") for s in servers if s != servers[1]])
# The batch is replayed while server 1 is down
await s0_log.wait_for('Replaying batch', timeout=60)
await asyncio.sleep(1)
# Bring server 1 back up and verify that eventually the batch is replayed and removed from the batchlog
await manager.server_start(servers[1].server_id)
s0_mark = await s0_log.mark()
await s0_log.wait_for('Finished replayAllFailedBatches', timeout=60, from_mark=s0_mark)
async def batchlog_empty() -> bool:
batchlog_row_count = (await cql.run_async("SELECT COUNT(*) FROM system.batchlog_v2", host=hosts[0]))[0].count
if batchlog_row_count == 0:
return True
await wait_for(batchlog_empty, time.time() + 60)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_batchlog_replay_aborted_on_shutdown(manager: ManagerClient) -> None:
""" Similar to the previous test, but also verifies that the batchlog replay is aborted on shutdown,
and node shutdown is not stuck.
1. Create a cluster with 3 nodes.
2. Write a batch and inject an error to fail it before it's removed from the batchlog, so it
needs to be replayed.
3. Stop server 1.
4. Server 0 tries to replay the batch. it sends the mutation to all replicas, but one of them is down,
so it should fail.
5. Shut down server 0 gracefully, which should abort the batchlog replay which is in progress.
6. Bring server 0 and server 1 back up.
6. Verify that the batch is replayed and removed from the batchlog eventually.
"""
cmdline=['--logger-log-level', 'batchlog_manager=trace']
config = {'error_injections_at_startup': ['short_batchlog_manager_replay_interval'], 'write_request_timeout_in_ms': 2000}
servers = await manager.servers_add(3, config=config, cmdline=cmdline, auto_rack_dc="dc1")
cql, hosts = await manager.get_ready_cql(servers)
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v int, PRIMARY KEY (key, c))")
await asyncio.gather(*[manager.api.enable_injection(s.ip_addr, "storage_proxy_fail_remove_from_batchlog", one_shot=False) for s in servers])
# make sure the batch is replayed only after the server is stopped
await asyncio.gather(*[manager.api.enable_injection(s.ip_addr, "skip_batch_replay", one_shot=False) for s in servers])
s0_log = await manager.server_open_log(servers[0].server_id)
try:
await cql.run_async(f"BEGIN BATCH INSERT INTO {ks}.tab (key, c, v) VALUES (0,0,0); INSERT INTO {ks}.tab (key, c, v) VALUES (1,1,1); APPLY BATCH")
except Exception as e:
# injected error is expected
logger.error(f"Error executing batch: {e}")
await asyncio.gather(*[manager.api.disable_injection(s.ip_addr, "storage_proxy_fail_remove_from_batchlog") for s in servers])
await manager.server_stop(servers[1].server_id)
await asyncio.gather(*[manager.api.disable_injection(s.ip_addr, "skip_batch_replay") for s in servers if s != servers[1]])
batchlog_row_count = (await cql.run_async("SELECT COUNT(*) FROM system.batchlog_v2", host=hosts[0]))[0].count
assert batchlog_row_count > 0
# The batch is replayed while server 1 is down
await s0_log.wait_for('Replaying batch', timeout=60)
await asyncio.sleep(1)
# verify shutdown is not stuck
await manager.server_stop_gracefully(servers[0].server_id)
await manager.server_start(servers[0].server_id)
await manager.server_start(servers[1].server_id)
cql = await reconnect_driver(manager)
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
async def batchlog_empty() -> bool:
batchlog_row_count = (await cql.run_async("SELECT COUNT(*) FROM system.batchlog_v2", host=hosts[0]))[0].count
if batchlog_row_count == 0:
return True
await wait_for(batchlog_empty, time.time() + 60)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_batchlog_replay_includes_cdc(manager: ManagerClient) -> None:
""" Test that when a batch is replayed from the batchlog, it includes CDC mutations.
1. Create a cluster with a single node.
2. Create a table with CDC enabled.
3. Write a batch and inject an error to fail it after it's written to the batchlog but before the mutation is applied.
4. Wait for the batch to be replayed.
5. Verify that the data is written to the base table.
6. Verify that CDC mutations are also applied and visible in the CDC log table.
"""
cmdline = ['--logger-log-level', 'batchlog_manager=trace']
config = {'error_injections_at_startup': ['short_batchlog_manager_replay_interval'], 'write_request_timeout_in_ms': 2000}
servers = await manager.servers_add(1, config=config, cmdline=cmdline)
cql, hosts = await manager.get_ready_cql(servers)
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'enabled': false}") as ks:
# Create table with CDC enabled
await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v int, PRIMARY KEY (key, c)) WITH cdc = {{'enabled': true}}")
# Enable error injection to make the batch fail after writing to batchlog
await manager.api.enable_injection(servers[0].ip_addr, "storage_proxy_fail_remove_from_batchlog", one_shot=False)
# Execute a batch that will fail due to injection but be written to batchlog
try:
await cql.run_async(
"BEGIN BATCH " +
f"INSERT INTO {ks}.tab(key, c, v) VALUES (10, 20, 30); " +
f"INSERT INTO {ks}.tab(key, c, v) VALUES (40, 50, 60); " +
"APPLY BATCH"
)
except Exception as e:
logger.info(f"Expected error executing batch: {e}")
await manager.api.disable_injection(servers[0].ip_addr, "storage_proxy_fail_remove_from_batchlog")
# Wait for data to appear in the base table
async def data_written():
result1 = await cql.run_async(f"SELECT * FROM {ks}.tab WHERE key = 10 AND c = 20")
result2 = await cql.run_async(f"SELECT * FROM {ks}.tab WHERE key = 40 AND c = 50")
if len(result1) > 0 and len(result2) > 0:
return True
await wait_for(data_written, time.time() + 60)
# Check that CDC log table exists and has the CDC mutations
cdc_table_name = f"{ks}.tab_scylla_cdc_log"
# Wait for CDC mutations to be visible
async def cdc_data_present():
result1 = await cql.run_async(f"SELECT * FROM {cdc_table_name} WHERE key = 10 ALLOW FILTERING")
result2 = await cql.run_async(f"SELECT * FROM {cdc_table_name} WHERE key = 40 ALLOW FILTERING")
if len(result1) > 0 and len(result2) > 0:
return True
await wait_for(cdc_data_present, time.time() + 60)
result1 = await cql.run_async(f"SELECT * FROM {cdc_table_name} WHERE key = 10 ALLOW FILTERING")
assert len(result1) == 1, f"Expected 1 CDC mutation for key 10, got {len(result1)}"
result2 = await cql.run_async(f"SELECT * FROM {cdc_table_name} WHERE key = 40 ALLOW FILTERING")
assert len(result2) == 1, f"Expected 1 CDC mutation for key 40, got {len(result2)}"
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_drop_mutations_for_dropped_table(manager: ManagerClient) -> None:
"""
This test is an adjusted version of `test_batchlog_replay_while_a_node_is_down`.
We want to verify that batchlog replay is aborted when the corresponding table has been dropped.
This test reproduces scylladb/scylladb#24806.
1. Create a cluster with 2 nodes, a keyspace, and a table.
2. Enable error injections. We need to ensure that:
- The mutations are not removed from the batchlog.
- The mutations are not replayed from the batchlog before we drop the table.
3. Write a batch. Because of step 2, the mutations will stay in the batchlog.
4. Drop the table.
5. Resume batchlog replay.
6. Wait for the replay to finish. Verify that the batchlog is empty.
Note: This test will most likely work even with a 1-node cluster, but let's
use 2 nodes to make sure we're dealing with a realistic scenario.
"""
cmdline=["--logger-log-level", "batchlog_manager=trace"]
config = {"error_injections_at_startup": ["short_batchlog_manager_replay_interval"],
"write_request_timeout_in_ms": 2000}
servers = await manager.servers_add(2, config=config, cmdline=cmdline, auto_rack_dc="dc1")
s1, _ = servers
cql, hosts = await manager.get_ready_cql(servers)
host1, _ = hosts
async def get_batchlog_row_count():
rows = await cql.run_async("SELECT COUNT(*) FROM system.batchlog_v2", host=host1)
row = rows[0]
assert hasattr(row, "count")
result = row.count
logger.debug(f"Batchlow row count={result}")
return result
async def enable_injection(injection: str) -> None:
await asyncio.gather(*[manager.api.enable_injection(s.ip_addr, injection, one_shot=False) for s in servers])
async def disable_injection(injection: str) -> None:
await asyncio.gather(*[manager.api.disable_injection(s.ip_addr, injection) for s in servers])
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck))")
# Make sure the mutations stay in the batchlog.
await enable_injection("storage_proxy_fail_remove_from_batchlog")
# Make sure the mutations are not replayed too early (i.e. before dropping the table).
await enable_injection("skip_batch_replay")
s1_log = await manager.server_open_log(s1.server_id)
try:
await cql.run_async(f"BEGIN BATCH INSERT INTO {ks}.my_table (pk, ck, v) VALUES (0,0,0);"
f"INSERT INTO {ks}.my_table (pk, ck, v) VALUES (1,1,1); APPLY BATCH")
except Exception as e:
# Injected error is expected.
logger.error(f"Error executing batch: {e}")
# Once the mutations are in the batchlog, waiting to be replayed, we can disable this.
await disable_injection("storage_proxy_fail_remove_from_batchlog")
batchlog_row_count = await get_batchlog_row_count()
assert batchlog_row_count > 0
await cql.run_async(f"DROP TABLE {ks}.my_table")
s1_mark = await s1_log.mark()
# Once the table is dropped, we can resume the replay. The bug can
# be triggered from now on (if it's present).
await disable_injection("skip_batch_replay")
# We don't need these, but let's keep them just so we know the replay
# is really going on and the mutations don't just disappear.
await s1_log.wait_for("Replaying batch", timeout=60, from_mark=s1_mark)
await s1_log.wait_for("Finished replayAllFailedBatches", timeout=60, from_mark=s1_mark)
async def batchlog_empty() -> bool:
batchlog_row_count = await get_batchlog_row_count()
return True if batchlog_row_count == 0 else None
await wait_for(batchlog_empty, time.time() + 60)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
@pytest.mark.parametrize("repair_cache", [True, False])
async def test_batchlog_replay_failure_during_repair(manager: ManagerClient, repair_cache: bool) -> None:
"""
We want to verify that repair_time will not be updated if batchlog replay fails.
This test reproduces scylladb/scylladb#24415.
1. Create a cluster with 2 nodes, a keyspace, and a table.
2. Do some preparations to ensure that mutations won't be removed from the batchlog.
3. Write a batch.
4. Delete a row (with the key that is modified in the batch).
5. Enable injections to make batchlog replay fail.
6. Run repair on the table.
7. Compact the keyspace.
8. Disable injection and wait for the replay to finish.
9. Verify that the row is deleted and there is no data resurrection.
"""
write_timeout_ms = 2000
cmdline=['--enable-cache', '0',
"--hinted-handoff-enabled", "0",
"--logger-log-level", "batchlog_manager=trace:repair=debug",
"--repair-hints-batchlog-flush-cache-time-in-ms", "1000000" if repair_cache else "0"]
config = {"error_injections_at_startup": ["short_batchlog_manager_replay_interval"],
"write_request_timeout_in_ms": write_timeout_ms,
'group0_tombstone_gc_refresh_interval_in_ms': 1000,
'tablets_mode_for_new_keyspaces': 'disabled'
}
servers = await manager.servers_add(2, config=config, cmdline=cmdline, auto_rack_dc="dc1")
s1, s2 = servers
cql, hosts = await manager.get_ready_cql(servers)
host1, _ = hosts
async def get_batchlog_row_count():
rows = await cql.run_async("SELECT COUNT(*) FROM system.batchlog_v2", host=host1)
row = rows[0]
assert hasattr(row, "count")
result = row.count
logger.debug(f"Batchlow row count={result}")
return result
async def enable_injection(injection: str) -> None:
await asyncio.gather(*[manager.api.enable_injection(s.ip_addr, injection, one_shot=False) for s in servers])
async def disable_injection(injection: str) -> None:
await asyncio.gather(*[manager.api.disable_injection(s.ip_addr, injection) for s in servers])
async def batchlog_empty() -> bool:
batchlog_row_count = await get_batchlog_row_count()
return True if batchlog_row_count == 0 else None
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.my_table (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH tombstone_gc = {{'mode': 'repair', 'propagation_delay_in_seconds': '1'}} AND compaction = {{ 'class' : 'NullCompactionStrategy' }}")
await cql.run_async(f"BEGIN BATCH INSERT INTO {ks}.my_table (pk, ck, v) VALUES (0,0,0);"
f"INSERT INTO {ks}.my_table (pk, ck, v) VALUES (1,1,1); APPLY BATCH")
await manager.api.flush_keyspace(s1.ip_addr, ks)
await manager.api.flush_keyspace(s2.ip_addr, ks)
await wait_for(batchlog_empty, time.time() + 60)
# Make sure the mutations stay in the batchlog.
await enable_injection("storage_proxy_fail_remove_from_batchlog")
# Make sure the mutations are not replayed too early (i.e. before dropping the table).
await enable_injection("skip_batch_replay")
s1_log = await manager.server_open_log(s1.server_id)
try:
await cql.run_async(f"BEGIN BATCH INSERT INTO {ks}.my_table (pk, ck, v) VALUES (0,0,1);"
f"INSERT INTO {ks}.my_table (pk, ck, v) VALUES (1,1,2); APPLY BATCH")
except Exception as e:
# Injected error is expected.
logger.error(f"Error executing batch: {e}")
await cql.run_async(f"DELETE FROM {ks}.my_table WHERE pk=0 AND ck=0")
await asyncio.sleep((write_timeout_ms // 1000) * 2 + 1)
batchlog_row_count = await get_batchlog_row_count()
assert batchlog_row_count > 0
s1_log = await manager.server_open_log(s1.server_id)
s1_mark = await s1_log.mark()
# Once the mutations are in the batchlog, waiting to be replayed, we can disable this.
await disable_injection("storage_proxy_fail_remove_from_batchlog")
await enable_injection("storage_proxy_fail_replay_batch")
# Once the table is dropped, we can resume the replay. The bug can
# be triggered from now on (if it's present).
await disable_injection("skip_batch_replay")
await s1_log.wait_for("Batchlog replay on shard 0: done", timeout=60, from_mark=s1_mark)
await s1_log.wait_for("Batchlog replay on shard 1: done", timeout=60, from_mark=s1_mark)
await manager.api.repair(s1.ip_addr, ks, "my_table")
await manager.api.flush_keyspace(s1.ip_addr, ks)
await manager.api.flush_keyspace(s2.ip_addr, ks)
await manager.api.keyspace_compaction(s1.ip_addr, ks)
await manager.api.keyspace_compaction(s2.ip_addr, ks)
await disable_injection("storage_proxy_fail_replay_batch")
await s1_log.wait_for("Replaying batch", timeout=60, from_mark=s1_mark)
await s1_log.wait_for("Finished replayAllFailedBatches", timeout=60, from_mark=s1_mark)
await wait_for(batchlog_empty, time.time() + 60)
batchlog_row_count = await get_batchlog_row_count()
assert batchlog_row_count == 0
res = await cql.run_async(f"SELECT * FROM {ks}.my_table WHERE pk=0 BYPASS CACHE")
assert len(res) == 0