From 07758bce68e6c044ffc4a75415aefafd0cc6fc42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dawid=20M=C4=99drek?= Date: Tue, 10 Feb 2026 16:41:42 +0100 Subject: [PATCH] test: topology_custom: Await sync points asynchronously There's a dedicated HTTP API for communicating with the cluster, so let's use it instead of yet another custom solution. (cherry picked from commit c5239edf2ab1d02068e8573c9de6556a56689414) --- test/topology_custom/test_hints.py | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/test/topology_custom/test_hints.py b/test/topology_custom/test_hints.py index 377d262f0d..7f93ffee68 100644 --- a/test/topology_custom/test_hints.py +++ b/test/topology_custom/test_hints.py @@ -7,14 +7,13 @@ import asyncio import pytest import time import logging -import requests import re from cassandra.cluster import NoHostAvailable # type: ignore from cassandra.policies import WhiteListRoundRobinPolicy from cassandra.query import SimpleStatement, ConsistencyLevel -from test.pylib.internal_types import IPAddress, ServerInfo +from test.pylib.internal_types import IPAddress from test.pylib.manager_client import ManagerClient from test.pylib.rest_client import ScyllaMetricsClient, TCPRESTClient, inject_error from test.pylib.tablets import get_tablet_replicas @@ -34,13 +33,13 @@ async def create_sync_point(client: TCPRESTClient, server_ip: IPAddress) -> str: response = await client.post_json("/hinted_handoff/sync_point", host=server_ip, port=10_000) return response -def await_sync_point(node: ServerInfo, sync_point: str, timeout: int) -> bool: +async def await_sync_point(client: TCPRESTClient, server_ip: IPAddress, sync_point: str, timeout: int) -> bool: params = { "id": sync_point, "timeout": str(timeout) } - response = requests.get(f"http://{node.ip_addr}:10000/hinted_handoff/sync_point", params=params).json() + response = await client.get_json("/hinted_handoff/sync_point", host=server_ip, port=10_000, params=params) match response: case "IN_PROGRESS": return False @@ -148,12 +147,12 @@ async def test_sync_point(manager: ManagerClient): await manager.server_start(node2.server_id) await manager.server_sees_other_server(node1.ip_addr, node2.ip_addr) - assert not await_sync_point(node1, sync_point1, 30) + assert not (await await_sync_point(manager.api.client, node1.ip_addr, sync_point1, 30)) await manager.server_start(node3.server_id) await manager.server_sees_other_server(node1.ip_addr, node3.ip_addr) - assert await_sync_point(node1, sync_point1, 30) + assert await await_sync_point(manager.api.client, node1.ip_addr, sync_point1, 30) @pytest.mark.asyncio @@ -226,7 +225,8 @@ async def test_hints_consistency_during_decommission(manager: ManagerClient): await manager.api.disable_injection(srv.ip_addr, "hinted_handoff_pause_hint_replay") logger.info("Wait until hints are replayed from nodes 1 and 2") - await asyncio.gather(*(asyncio.to_thread(await_sync_point, srv, pt, timeout=30) for srv, pt in zip((server1, server2), sync_points))) + await asyncio.gather(*(await_sync_point(manager.api.client, srv.ip_addr, pt, timeout=30) + for srv, pt in zip((server1, server2), sync_points))) # Unpause streaming and let decommission finish logger.info("Unpause streaming") @@ -264,12 +264,9 @@ async def test_draining_hints(manager: ManagerClient): sync_point = await create_sync_point(manager.api.client, s1.ip_addr) await manager.server_start(s2.server_id) - async def wait(): - assert await_sync_point(s1, sync_point, 60) - async with asyncio.TaskGroup() as tg: _ = tg.create_task(manager.decommission_node(s1.server_id, timeout=60)) - _ = tg.create_task(wait()) + _ = tg.create_task(await_sync_point(manager.api.client, s1.ip_addr, sync_point, 60)) @pytest.mark.asyncio @skip_mode("release", "error injections are not supported in release mode") @@ -313,7 +310,7 @@ async def test_canceling_hint_draining(manager: ManagerClient): await s1_log.wait_for(f"Draining starts for {host_id2}", s1_mark) # Make sure draining finishes successfully. - assert await_sync_point(s1, sync_point, 60) + assert await await_sync_point(manager.api.client, s1.ip_addr, sync_point, 60) await s1_log.wait_for(f"Removed hint directory for {host_id2}") @pytest.mark.asyncio @@ -367,7 +364,7 @@ async def test_hint_to_pending(manager: ManagerClient): await wait_for(migration_reached_streaming, time.time() + 60) await manager.api.disable_injection(servers[0].ip_addr, "hinted_handoff_pause_hint_replay") - assert await_sync_point(servers[0], sync_point, 30) + assert await await_sync_point(manager.api.client, servers[0].ip_addr, sync_point, 30) await manager.api.message_injection(servers[0].ip_addr, "pause_after_streaming_tablet") done, pending = await asyncio.wait([tablet_migration])