test: topology_custom: Await sync points asynchronously

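The test framework already provides a dedicated asynchronous REST client
(`TCPRESTClient`) for communicating with the cluster, so let's use it to
await sync points instead of a custom helper built on blocking
`requests.get` calls.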

(cherry picked from commit c5239edf2a)
This commit is contained in:
Dawid Mędrek
2026-02-10 16:41:42 +01:00
parent 45577072fa
commit 07758bce68

View File

@@ -7,14 +7,13 @@ import asyncio
import pytest
import time
import logging
import requests
import re
from cassandra.cluster import NoHostAvailable # type: ignore
from cassandra.policies import WhiteListRoundRobinPolicy
from cassandra.query import SimpleStatement, ConsistencyLevel
from test.pylib.internal_types import IPAddress, ServerInfo
from test.pylib.internal_types import IPAddress
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import ScyllaMetricsClient, TCPRESTClient, inject_error
from test.pylib.tablets import get_tablet_replicas
@@ -34,13 +33,13 @@ async def create_sync_point(client: TCPRESTClient, server_ip: IPAddress) -> str:
response = await client.post_json("/hinted_handoff/sync_point", host=server_ip, port=10_000)
return response
def await_sync_point(node: ServerInfo, sync_point: str, timeout: int) -> bool:
async def await_sync_point(client: TCPRESTClient, server_ip: IPAddress, sync_point: str, timeout: int) -> bool:
params = {
"id": sync_point,
"timeout": str(timeout)
}
response = requests.get(f"http://{node.ip_addr}:10000/hinted_handoff/sync_point", params=params).json()
response = await client.get_json("/hinted_handoff/sync_point", host=server_ip, port=10_000, params=params)
match response:
case "IN_PROGRESS":
return False
@@ -148,12 +147,12 @@ async def test_sync_point(manager: ManagerClient):
await manager.server_start(node2.server_id)
await manager.server_sees_other_server(node1.ip_addr, node2.ip_addr)
assert not await_sync_point(node1, sync_point1, 30)
assert not (await await_sync_point(manager.api.client, node1.ip_addr, sync_point1, 30))
await manager.server_start(node3.server_id)
await manager.server_sees_other_server(node1.ip_addr, node3.ip_addr)
assert await_sync_point(node1, sync_point1, 30)
assert await await_sync_point(manager.api.client, node1.ip_addr, sync_point1, 30)
@pytest.mark.asyncio
@@ -226,7 +225,8 @@ async def test_hints_consistency_during_decommission(manager: ManagerClient):
await manager.api.disable_injection(srv.ip_addr, "hinted_handoff_pause_hint_replay")
logger.info("Wait until hints are replayed from nodes 1 and 2")
await asyncio.gather(*(asyncio.to_thread(await_sync_point, srv, pt, timeout=30) for srv, pt in zip((server1, server2), sync_points)))
await asyncio.gather(*(await_sync_point(manager.api.client, srv.ip_addr, pt, timeout=30)
for srv, pt in zip((server1, server2), sync_points)))
# Unpause streaming and let decommission finish
logger.info("Unpause streaming")
@@ -264,12 +264,9 @@ async def test_draining_hints(manager: ManagerClient):
sync_point = await create_sync_point(manager.api.client, s1.ip_addr)
await manager.server_start(s2.server_id)
async def wait():
assert await_sync_point(s1, sync_point, 60)
async with asyncio.TaskGroup() as tg:
_ = tg.create_task(manager.decommission_node(s1.server_id, timeout=60))
_ = tg.create_task(wait())
_ = tg.create_task(await_sync_point(manager.api.client, s1.ip_addr, sync_point, 60))
@pytest.mark.asyncio
@skip_mode("release", "error injections are not supported in release mode")
@@ -313,7 +310,7 @@ async def test_canceling_hint_draining(manager: ManagerClient):
await s1_log.wait_for(f"Draining starts for {host_id2}", s1_mark)
# Make sure draining finishes successfully.
assert await_sync_point(s1, sync_point, 60)
assert await await_sync_point(manager.api.client, s1.ip_addr, sync_point, 60)
await s1_log.wait_for(f"Removed hint directory for {host_id2}")
@pytest.mark.asyncio
@@ -367,7 +364,7 @@ async def test_hint_to_pending(manager: ManagerClient):
await wait_for(migration_reached_streaming, time.time() + 60)
await manager.api.disable_injection(servers[0].ip_addr, "hinted_handoff_pause_hint_replay")
assert await_sync_point(servers[0], sync_point, 30)
assert await await_sync_point(manager.api.client, servers[0].ip_addr, sync_point, 30)
await manager.api.message_injection(servers[0].ip_addr, "pause_after_streaming_tablet")
done, pending = await asyncio.wait([tablet_migration])