test/storage: harden out-of-space prevention tests around restart and disk-utilization transitions

The tests in test_out_of_space_prevention.py are flaky. Three issues contribute:

1. After creating/removing the blob file that simulates disk pressure,
   the tests immediately checked derived state (e.g., "compaction_manager
   - Drained") without first confirming the disk space monitor had detected
   the utilization change. Fix: explicitly wait for "Reached the critical
   disk utilization level" / "Dropped below the critical disk utilization
   level" right after creating/removing the blob file, before checking
   downstream effects.

2. Several tests called `manager.driver_connect()` or omitted reconnection
   entirely after `server_restart()` / `server_start()`. The pre-existing
   driver session can silently reconnect multiple times, causing subsequent
   CQL queries to fail. Fix: call `reconnect_driver()` after every node restart.
   Additionally, call `wait_for_cql_and_get_hosts()` where CQL is used afterward,
   to ensure all connection pools are established.

3. Some log assertions used marks captured before a restart, so they could
   match pre-restart messages or miss messages emitted in the correct post-restart
   window. Fix: refresh marks at the right points.

Apart from that, the patch fixes a typo: autotoogle -> autotoggle.

Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-655

Closes scylladb/scylladb#28626

(cherry picked from commit 826fd5d6c3)

Closes scylladb/scylladb#28967

Closes scylladb/scylladb#29197
This commit is contained in:
Łukasz Paszkowski
2026-02-12 08:56:00 +01:00
committed by Avi Kivity
parent 5bdd6ca036
commit e5bd2f8679

View File

@@ -16,8 +16,8 @@ from cassandra.query import SimpleStatement
from typing import Callable
from test.cluster.conftest import skip_mode
from test.cluster.util import get_topology_coordinator, find_server_by_host_id, new_test_keyspace, new_test_table
from test.pylib.manager_client import ManagerClient
from test.cluster.util import get_topology_coordinator, find_server_by_host_id, new_test_keyspace, new_test_table, reconnect_driver
from test.pylib.manager_client import ManagerClient, wait_for_cql_and_get_hosts
from test.pylib.tablets import get_tablet_count
from test.pylib.util import Host
from test.storage.conftest import space_limited_servers
@@ -81,6 +81,7 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
mark, _ = await log.wait_for("Reached the critical disk utilization level", from_mark=mark)
for _ in range(2):
mark, _ = await log.wait_for("database - Set critical disk utilization mode: true", from_mark=mark)
@@ -91,8 +92,9 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
logger.info("Restart the node")
mark = await log.mark()
await manager.server_restart(servers[0].server_id)
await manager.driver_connect()
cql = manager.get_cql()
mark, _ = await log.wait_for("Reached the critical disk utilization level", from_mark=mark)
cql = await reconnect_driver(manager)
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
for _ in range(2):
mark, _ = await log.wait_for("database - Set critical disk utilization mode: true", from_mark=mark)
@@ -104,6 +106,7 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
await validate_data_existence(cql, hosts[1:], [hosts[0]], cf, 1)
logger.info("With blob file removed, wait for DB to drop below the critical disk utilization level")
mark, _ = await log.wait_for("Dropped below the critical disk utilization level", from_mark=mark)
for _ in range(2):
mark, _ = await log.wait_for("database - Set critical disk utilization mode: false", from_mark=mark)
@@ -112,7 +115,7 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
@pytest.mark.asyncio
async def test_autotoogle_compaction(manager: ManagerClient, volumes_factory: Callable) -> None:
async def test_autotoggle_compaction(manager: ManagerClient, volumes_factory: Callable) -> None:
cmdline = [*global_cmdline,
"--logger-log-level", "compaction=debug"]
async with space_limited_servers(manager, volumes_factory, ["100M"]*3, cmdline=cmdline) as servers:
@@ -136,15 +139,20 @@ async def test_autotoogle_compaction(manager: ManagerClient, volumes_factory: Ca
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
mark, _ = await log.wait_for("Reached the critical disk utilization level", from_mark=mark)
for _ in range(2):
mark, _ = await log.wait_for("compaction_manager - Drained", from_mark=mark)
logger.info("Restart the node")
mark = await log.mark()
await manager.server_restart(servers[0].server_id)
await reconnect_driver(manager)
mark, _ = await log.wait_for("Reached the critical disk utilization level", from_mark=mark)
for _ in range(2):
mark, _ = await log.wait_for("compaction_manager - Drained", from_mark=mark)
logger.info("With blob file removed, wait for DB to drop below the critical disk utilization level")
mark, _ = await log.wait_for("Dropped below the critical disk utilization level", from_mark=mark)
for _ in range(2):
mark, _ = await log.wait_for("compaction_manager - Enabled", from_mark=mark)
@@ -177,7 +185,8 @@ async def test_reject_split_compaction(manager: ManagerClient, volumes_factory:
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
await log.wait_for(f"Split task .* for table {cf} .* stopped, reason: Compaction for {cf} was stopped due to: drain")
mark, _ = await log.wait_for("Reached the critical disk utilization level", from_mark=mark)
await log.wait_for(f"Split task .* for table {cf} .* stopped, reason: Compaction for {cf} was stopped due to: drain", from_mark=mark)
@pytest.mark.asyncio
@@ -202,6 +211,7 @@ async def test_split_compaction_not_triggered(manager: ManagerClient, volumes_fa
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
s1_mark, _ = await s1_log.wait_for("Reached the critical disk utilization level", from_mark=s1_mark)
for _ in range(2):
s1_mark, _ = await s1_log.wait_for("compaction_manager - Drained", from_mark=s1_mark)
@@ -236,10 +246,13 @@ async def test_tablet_repair(manager: ManagerClient, volumes_factory: Callable)
await manager.server_stop_gracefully(servers[0].server_id)
await manager.server_wipe_sstables(servers[0].server_id, ks, table)
await manager.server_start(servers[0].server_id)
cql = await reconnect_driver(manager)
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
mark, _ = await log.wait_for("Reached the critical disk utilization level", from_mark=mark)
for _ in range(2):
mark, _ = await log.wait_for("repair - Drained", from_mark=mark)
@@ -270,11 +283,13 @@ async def test_tablet_repair(manager: ManagerClient, volumes_factory: Callable)
logger.info("Restart the node")
mark = await log.mark()
await manager.server_restart(servers[0].server_id, wait_others=2)
await manager.driver_connect()
await reconnect_driver(manager)
mark, _ = await log.wait_for("Reached the critical disk utilization level", from_mark=mark)
for _ in range(2):
mark, _ = await log.wait_for("repair - Drained", from_mark=mark)
logger.info("With blob file removed, wait for the tablet repair to succeed")
mark, _ = await log.wait_for("Dropped below the critical disk utilization level", from_mark=mark)
await manager.api.wait_task(servers[0].ip_addr, task_id)
@@ -319,6 +334,7 @@ async def test_autotoogle_reject_incoming_migrations(manager: ManagerClient, vol
disk_info = psutil.disk_usage(workdir)
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
mark, _ = await log.wait_for("Reached the critical disk utilization level", from_mark=mark)
for _ in range(2):
mark, _ = await log.wait_for("database - Set critical disk utilization mode: true", from_mark=mark)
@@ -329,6 +345,7 @@ async def test_autotoogle_reject_incoming_migrations(manager: ManagerClient, vol
mark, _ = await log.wait_for("Streaming for tablet migration .* failed", from_mark=mark)
logger.info("With blob file removed, wait for DB to drop below the critical disk utilization level")
mark, _ = await log.wait_for("Dropped below the critical disk utilization level", from_mark=mark)
for _ in range(2):
mark, _ = await log.wait_for("database - Set critical disk utilization mode: false", from_mark=mark)
@@ -377,6 +394,7 @@ async def test_node_restart_while_tablet_split(manager: ManagerClient, volumes_f
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
mark, _ = await log.wait_for("Reached the critical disk utilization level", from_mark=mark)
for _ in range(2):
mark, _ = await log.wait_for("compaction_manager - Drained", from_mark=mark)
@@ -389,7 +407,11 @@ async def test_node_restart_while_tablet_split(manager: ManagerClient, volumes_f
await cql.run_async(f"ALTER TABLE {cf} WITH tablets = {{'min_tablet_count': 2}};")
await coord_log.wait_for(f"Generating resize decision for table {table_id} of type split")
mark = await log.mark()
await manager.server_restart(servers[0].server_id, wait_others=2)
cql = await reconnect_driver(manager)
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
mark, _ = await log.wait_for("Reached the critical disk utilization level", from_mark=mark)
logger.info("Check if tablet split happened")
await assert_resize_task_info(table_id, lambda response: len(response) == 1 and response[0].resize_task_info is not None)
@@ -398,6 +420,7 @@ async def test_node_restart_while_tablet_split(manager: ManagerClient, volumes_f
assert await log.grep(f"compaction.*Split {cf}", from_mark=mark) == []
logger.info("With blob file removed, wait for DB to drop below the critical disk utilization level")
mark, _ = await log.wait_for("Dropped below the critical disk utilization level", from_mark=mark)
for _ in range(2):
mark, _ = await log.wait_for("compaction_manager - Enabled", from_mark=mark)
mark, _ = await log.wait_for(f"Detected tablet split for table {cf}, increasing from 1 to 2 tablets", from_mark=mark)