Files
scylladb/test/cluster/test_long_join.py
Emil Maskovsky f8c297ca27 test: improve async execution in test_long_join
Replace list comprehensions with asyncio.gather() to await the injection
API calls in a fully concurrent manner.
2025-09-08 17:14:37 +02:00

65 lines
2.7 KiB
Python

#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import time
import pytest
import logging
import asyncio
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import HTTPError
from test.pylib.util import wait_for
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_long_join(manager: ManagerClient) -> None:
    """Verify that a node can join even when expiring entries are dropped
    between the moment the join request is placed and the moment it is
    processed.

    Steps:
      1. Add the first server and enable the one-shot
         'topology_coordinator_pause_before_processing_backlog' injection on it.
      2. Add a second server without starting it, start it in a background
         task, and wait until the first server sees it.
      3. Enable the one-shot 'handle_node_transition_drop_expiring' injection
         on the first server, send a message to the first injection, and
         wait for the background start task to finish.
    """
    pause_injection = 'topology_coordinator_pause_before_processing_backlog'
    drop_injection = "handle_node_transition_drop_expiring"

    first = await manager.server_add()
    await manager.api.enable_injection(
        first.ip_addr, pause_injection, one_shot=True
    )

    # The joining node is started in the background so the test can keep
    # driving the injections on the first server while the join is in flight.
    joiner = await manager.server_add(start=False)
    start_task = asyncio.create_task(manager.server_start(joiner.server_id))

    await manager.server_sees_other_server(
        first.ip_addr, joiner.ip_addr, interval=300
    )

    await manager.api.enable_injection(
        first.ip_addr, drop_injection, one_shot=True
    )
    # NOTE(review): presumably this message releases the paused injection
    # enabled above - confirm against the injection API.
    await manager.api.message_injection(first.ip_addr, pause_injection)

    # Any failure of the background start surfaces here.
    await asyncio.gather(start_task)
@pytest.mark.asyncio
async def test_long_join_drop_entries_on_bootstrapping(manager: ManagerClient) -> None:
    """Verify that a node can join even when expiring entries are dropped on
    the joining node itself between the moment the join request is placed
    and the moment it is processed.

    Steps:
      1. Add two servers and enable the one-shot
         'topology_coordinator_pause_before_processing_backlog' injection on
         both of them.
      2. Add a third server (not started) configured with the
         'pre_server_start_drop_expiring' startup injection, and start it in
         a background task.
      3. Wait for the third server's gossiper API to respond, then wait until
         all three servers see each other.
      4. Enable the one-shot 'join_node_response_drop_expiring' injection on
         the third server, send a message to the first injection on the two
         original servers, and wait for the background start task to finish.
    """
    pause_injection = 'topology_coordinator_pause_before_processing_backlog'

    cluster = await manager.servers_add(2)
    enable_calls = [
        manager.api.enable_injection(node.ip_addr, pause_injection, one_shot=True)
        for node in cluster
    ]
    await asyncio.gather(*enable_calls)

    startup_config = {
        'error_injections_at_startup': ['pre_server_start_drop_expiring']
    }
    joiner = await manager.server_add(start=False, config=startup_config)
    start_task = asyncio.create_task(manager.server_start(joiner.server_id))

    joiner_log = await manager.server_open_log(joiner.server_id)
    await joiner_log.wait_for("init - starting gossiper")

    # The gossiper API only becomes available some time after the module has
    # started, so poll until a request to it succeeds.
    async def gossiper_api_ready():
        try:
            await manager.api.get_alive_endpoints(joiner.ip_addr)
        except HTTPError as err:
            # Only a 404 whose message contains 'Not found' is treated as
            # "API not ready yet"; every other HTTP error is propagated.
            api_not_ready = err.code == 404 and 'Not found' in err.message
            if not api_not_ready:
                raise
            # Returning None here is the "not ready" result handed to wait_for.
            return None
        return True

    deadline = time.time() + 60
    await wait_for(gossiper_api_ready, deadline)

    cluster.append(joiner)
    await manager.servers_see_each_other(cluster, interval=300)

    await manager.api.enable_injection(
        joiner.ip_addr, 'join_node_response_drop_expiring', one_shot=True
    )

    # Message the injection only on the two original servers; the joining
    # node, appended last, is excluded by the slice.
    message_calls = [
        manager.api.message_injection(node.ip_addr, pause_injection)
        for node in cluster[:-1]
    ]
    await asyncio.gather(*message_calls)

    # Any failure of the background start surfaces here.
    await asyncio.gather(start_task)