Replace list comprehensions with asyncio.gather() to await the injection API calls in fully concurrent manner.
65 lines
2.7 KiB
Python
65 lines
2.7 KiB
Python
#
|
|
# Copyright (C) 2024-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
|
|
import time
|
|
import pytest
|
|
import logging
|
|
import asyncio
|
|
|
|
from test.pylib.manager_client import ManagerClient
|
|
from test.pylib.rest_client import HTTPError
|
|
from test.pylib.util import wait_for
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_long_join(manager: ManagerClient) -> None:
|
|
"""The test checks that join works even if expiring entries are dropped
|
|
between placement of the join request and its processing"""
|
|
s1 = await manager.server_add()
|
|
inj = 'topology_coordinator_pause_before_processing_backlog'
|
|
await manager.api.enable_injection(s1.ip_addr, inj, one_shot=True)
|
|
s2 = await manager.server_add(start=False)
|
|
task = asyncio.create_task(manager.server_start(s2.server_id))
|
|
await manager.server_sees_other_server(s1.ip_addr, s2.ip_addr, interval=300)
|
|
await manager.api.enable_injection(s1.ip_addr, "handle_node_transition_drop_expiring", one_shot=True)
|
|
await manager.api.message_injection(s1.ip_addr, inj)
|
|
await asyncio.gather(task)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_long_join_drop_entries_on_bootstrapping(manager: ManagerClient) -> None:
|
|
"""The test checks that join works even if expiring entries are dropped
|
|
on the joining node between placement of the join request and its processing"""
|
|
servers = await manager.servers_add(2)
|
|
inj = 'topology_coordinator_pause_before_processing_backlog'
|
|
await asyncio.gather(*(manager.api.enable_injection(s.ip_addr, inj, one_shot=True) for s in servers))
|
|
s = await manager.server_add(start=False, config={
|
|
'error_injections_at_startup': ['pre_server_start_drop_expiring']
|
|
})
|
|
task = asyncio.create_task(manager.server_start(s.server_id))
|
|
log = await manager.server_open_log(s.server_id)
|
|
await log.wait_for("init - starting gossiper")
|
|
|
|
# the gossiper API initializes at some point after the module startup
|
|
# - we need to wait for the API to be ready
|
|
|
|
async def gossiper_api_ready():
|
|
try:
|
|
await manager.api.get_alive_endpoints(s.ip_addr)
|
|
return True
|
|
except HTTPError as e:
|
|
# we just filter out the specific error which means the API is not ready
|
|
if e.code != 404 or 'Not found' not in e.message:
|
|
raise
|
|
|
|
await wait_for(gossiper_api_ready, time.time() + 60)
|
|
|
|
servers.append(s)
|
|
await manager.servers_see_each_other(servers, interval=300)
|
|
await manager.api.enable_injection(s.ip_addr, 'join_node_response_drop_expiring', one_shot=True)
|
|
await asyncio.gather(*(manager.api.message_injection(s.ip_addr, inj) for s in servers[:-1]))
|
|
await asyncio.gather(task)
|