diff --git a/test/storage/test_out_of_space_prevention.py b/test/storage/test_out_of_space_prevention.py index 5e29ab8c26..83e94622ef 100644 --- a/test/storage/test_out_of_space_prevention.py +++ b/test/storage/test_out_of_space_prevention.py @@ -16,8 +16,8 @@ from cassandra.query import SimpleStatement from typing import Callable from test.cluster.conftest import skip_mode -from test.cluster.util import get_topology_coordinator, find_server_by_host_id, new_test_keyspace, new_test_table -from test.pylib.manager_client import ManagerClient +from test.cluster.util import get_topology_coordinator, find_server_by_host_id, new_test_keyspace, new_test_table, reconnect_driver +from test.pylib.manager_client import ManagerClient, wait_for_cql_and_get_hosts from test.pylib.tablets import get_tablet_count from test.pylib.util import Host from test.storage.conftest import space_limited_servers @@ -81,6 +81,7 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca logger.info("Create a big file on the target node to reach critical disk utilization level") disk_info = psutil.disk_usage(workdir) with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used): + mark, _ = await log.wait_for("Reached the critical disk utilization level", from_mark=mark) for _ in range(2): mark, _ = await log.wait_for("database - Set critical disk utilization mode: true", from_mark=mark) @@ -91,8 +92,9 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca logger.info("Restart the node") mark = await log.mark() await manager.server_restart(servers[0].server_id) - await manager.driver_connect() - cql = manager.get_cql() + mark, _ = await log.wait_for("Reached the critical disk utilization level", from_mark=mark) + cql = await reconnect_driver(manager) + await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) for _ in range(2): mark, _ = await log.wait_for("database - Set critical disk utilization mode: true", from_mark=mark) @@ -104,6 +106,7 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca await validate_data_existence(cql, hosts[1:], [hosts[0]], cf, 1) logger.info("With blob file removed, wait for DB to drop below the critical disk utilization level") + mark, _ = await log.wait_for("Dropped below the critical disk utilization level", from_mark=mark) for _ in range(2): mark, _ = await log.wait_for("database - Set critical disk utilization mode: false", from_mark=mark) @@ -112,7 +115,7 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca @pytest.mark.asyncio -async def test_autotoogle_compaction(manager: ManagerClient, volumes_factory: Callable) -> None: +async def test_autotoggle_compaction(manager: ManagerClient, volumes_factory: Callable) -> None: cmdline = [*global_cmdline, "--logger-log-level", "compaction=debug"] async with space_limited_servers(manager, volumes_factory, ["100M"]*3, cmdline=cmdline) as servers: @@ -136,15 +139,20 @@ async def test_autotoogle_compaction(manager: ManagerClient, volumes_factory: Ca logger.info("Create a big file on the target node to reach critical disk utilization level") disk_info = psutil.disk_usage(workdir) with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used): + mark, _ = await log.wait_for("Reached the critical disk utilization level", from_mark=mark) for _ in range(2): mark, _ = await log.wait_for("compaction_manager - Drained", from_mark=mark) logger.info("Restart the node") + mark = await log.mark() await manager.server_restart(servers[0].server_id) + await reconnect_driver(manager) + mark, _ = await log.wait_for("Reached the critical disk utilization level", from_mark=mark) for _ in range(2): mark, _ = await log.wait_for("compaction_manager - Drained", from_mark=mark) logger.info("With blob file removed, wait for DB to drop below the critical disk utilization level") + mark, _ = await log.wait_for("Dropped below the critical disk utilization level", from_mark=mark) for _ in range(2): mark, _ = await log.wait_for("compaction_manager - Enabled", from_mark=mark) @@ -177,7 +185,8 @@ async def test_reject_split_compaction(manager: ManagerClient, volumes_factory: logger.info("Create a big file on the target node to reach critical disk utilization level") disk_info = psutil.disk_usage(workdir) with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used): - await log.wait_for(f"Split task .* for table {cf} .* stopped, reason: Compaction for {cf} was stopped due to: drain") + mark, _ = await log.wait_for("Reached the critical disk utilization level", from_mark=mark) + await log.wait_for(f"Split task .* for table {cf} .* stopped, reason: Compaction for {cf} was stopped due to: drain", from_mark=mark) @pytest.mark.asyncio @@ -202,6 +211,7 @@ async def test_split_compaction_not_triggered(manager: ManagerClient, volumes_fa logger.info("Create a big file on the target node to reach critical disk utilization level") disk_info = psutil.disk_usage(workdir) with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used): + s1_mark, _ = await s1_log.wait_for("Reached the critical disk utilization level", from_mark=s1_mark) for _ in range(2): s1_mark, _ = await s1_log.wait_for("compaction_manager - Drained", from_mark=s1_mark) @@ -236,10 +246,13 @@ async def test_tablet_repair(manager: ManagerClient, volumes_factory: Callable) await manager.server_stop_gracefully(servers[0].server_id) await manager.server_wipe_sstables(servers[0].server_id, ks, table) await manager.server_start(servers[0].server_id) + cql = await reconnect_driver(manager) + await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) logger.info("Create a big file on the target node to reach critical disk utilization level") disk_info = psutil.disk_usage(workdir) with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used): + mark, _ = await log.wait_for("Reached the critical disk utilization level", from_mark=mark) for _ in range(2): mark, _ = await log.wait_for("repair - Drained", from_mark=mark) @@ -270,11 +283,13 @@ async def test_tablet_repair(manager: ManagerClient, volumes_factory: Callable) logger.info("Restart the node") mark = await log.mark() await manager.server_restart(servers[0].server_id, wait_others=2) - await manager.driver_connect() + await reconnect_driver(manager) + mark, _ = await log.wait_for("Reached the critical disk utilization level", from_mark=mark) for _ in range(2): mark, _ = await log.wait_for("repair - Drained", from_mark=mark) logger.info("With blob file removed, wait for the tablet repair to succeed") + mark, _ = await log.wait_for("Dropped below the critical disk utilization level", from_mark=mark) await manager.api.wait_task(servers[0].ip_addr, task_id) @@ -319,6 +334,7 @@ async def test_autotoogle_reject_incoming_migrations(manager: ManagerClient, vol disk_info = psutil.disk_usage(workdir) with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used): + mark, _ = await log.wait_for("Reached the critical disk utilization level", from_mark=mark) for _ in range(2): mark, _ = await log.wait_for("database - Set critical disk utilization mode: true", from_mark=mark) @@ -329,6 +345,7 @@ async def test_autotoogle_reject_incoming_migrations(manager: ManagerClient, vol mark, _ = await log.wait_for("Streaming for tablet migration .* failed", from_mark=mark) logger.info("With blob file removed, wait for DB to drop below the critical disk utilization level") + mark, _ = await log.wait_for("Dropped below the critical disk utilization level", from_mark=mark) for _ in range(2): mark, _ = await log.wait_for("database - Set critical disk utilization mode: false", from_mark=mark) @@ -377,6 +394,7 @@ async def test_node_restart_while_tablet_split(manager: ManagerClient, volumes_f logger.info("Create a big file on the target node to reach critical disk utilization level") disk_info = psutil.disk_usage(workdir) with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used): + mark, _ = await log.wait_for("Reached the critical disk utilization level", from_mark=mark) for _ in range(2): mark, _ = await log.wait_for("compaction_manager - Drained", from_mark=mark) @@ -389,7 +407,11 @@ async def test_node_restart_while_tablet_split(manager: ManagerClient, volumes_f await cql.run_async(f"ALTER TABLE {cf} WITH tablets = {{'min_tablet_count': 2}};") await coord_log.wait_for(f"Generating resize decision for table {table_id} of type split") + mark = await log.mark() await manager.server_restart(servers[0].server_id, wait_others=2) + cql = await reconnect_driver(manager) + await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) + mark, _ = await log.wait_for("Reached the critical disk utilization level", from_mark=mark) logger.info("Check if tablet split happened") await assert_resize_task_info(table_id, lambda response: len(response) == 1 and response[0].resize_task_info is not None) @@ -398,6 +420,7 @@ async def test_node_restart_while_tablet_split(manager: ManagerClient, volumes_f assert await log.grep(f"compaction.*Split {cf}", from_mark=mark) == [] logger.info("With blob file removed, wait for DB to drop below the critical disk utilization level") + mark, _ = await log.wait_for("Dropped below the critical disk utilization level", from_mark=mark) for _ in range(2): mark, _ = await log.wait_for("compaction_manager - Enabled", from_mark=mark) mark, _ = await log.wait_for(f"Detected tablet split for table {cf}, increasing from 1 to 2 tablets", from_mark=mark)