Files
scylladb/test/cluster/test_ip_mappings.py
Andrei Chekun cc5ac75d73 test.py: remove deprecated skip_mode decorator
Finishing the deprecation of the skip_mode function in favor of
pytest.mark.skip_mode. This PR only cleans up and migrates the leftover tests
that still use the old style of skip_mode.

Closes scylladb/scylladb#28299
2026-01-25 18:17:27 +02:00

148 lines
7.8 KiB
Python

# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
from test.pylib.manager_client import ManagerClient
import pytest
import logging
from uuid import UUID
from test.pylib.rest_client import inject_error_one_shot, read_barrier
from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.util import gather_safely
from test.cluster.util import disable_schema_agreement_wait, new_test_keyspace, reconnect_driver
from cassandra.cluster import ConsistencyLevel, SimpleStatement
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
# Consistent with the other test in this file: this test relies on error
# injection, which is not compiled into release-mode builds.
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_broken_bootstrap(manager: ManagerClient):
    """
    Test that a cluster recovers after a bootstrap is broken mid-way.

    Crashes the coordinator (server_a) via the
    'crash-before-bootstrapping-node-added' injection while server_b is
    bootstrapping, then stops both nodes, restarts server_a alone and
    verifies that reads and writes still work.
    """
    server_a = await manager.server_add()
    server_b = await manager.server_add(start=False)
    async with new_test_keyspace(manager, "WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}") as ks:
        table = f"{ks}.test"
        await manager.cql.run_async(f"CREATE TABLE {table} (a int PRIMARY KEY, b int)")
        for i in range(100):
            await manager.cql.run_async(f"INSERT INTO {table} (a, b) VALUES ({i}, {i})")
        await inject_error_one_shot(manager.api, server_a.ip_addr, "crash-before-bootstrapping-node-added")
        # Timeout fast since we do not expect the operation to complete
        # because the coordinator is dead by now due to the error injection
        # above. pytest.raises is used instead of try/fail/except so that a
        # test-framework outcome exception can never be swallowed by accident.
        with pytest.raises(Exception):
            await manager.server_start(server_b.server_id, timeout=5)
        await gather_safely(*(manager.server_stop(srv.server_id) for srv in [server_a, server_b]))
        await manager.server_start(server_a.server_id)
        # The driver lost all its contact points while the cluster was down.
        await manager.driver_connect()
        for i in range(100):
            await manager.cql.run_async(f"INSERT INTO {table} (a, b) VALUES ({i}, {i})")
            response = await manager.cql.run_async(f"SELECT * FROM {table} WHERE a = {i}")
            assert response[0].b == i
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
@pytest.mark.parametrize('reuse_ip', [False, True])
async def test_full_shutdown_during_replace(manager: ManagerClient, reuse_ip: bool):
    """
    Test that shutting down all live nodes during replace doesn't cause the IP address of the replacing node to be
    missing on restart. After restarts, the test performs requests with the replacing node as a pending replica, which
    would crash the request coordinators if the IP address was missing.

    Regression test for scylla-enterprise#5686. Note that in the reported issue `nodetool status` performed during
    stuck replace caused a crash, but the problem turned out to be more general.
    """
    logger.info('Adding servers')
    leader = await manager.server_add(property_file={'dc': 'dc1', 'rack': 'rack0'})
    follower, dead_server = await manager.servers_add(2, auto_rack_dc='dc1')
    live_servers = [leader, follower]
    logger.info('Waiting for driver')
    cql, [host] = await manager.get_ready_cql([follower])
    with disable_schema_agreement_wait(cql):  # This makes the test a bit faster.
        async with new_test_keyspace(manager, """WITH REPLICATION = {'class': 'NetworkTopologyStrategy',
                'replication_factor': 3} AND tablets = {'enabled': false}""", host) as ks:
            table = f'{ks}.test'
            await cql.run_async(f'CREATE TABLE {table} (a int PRIMARY KEY, b int)', host=host)
            logger.info(f'Stopping {dead_server}')
            await manager.server_stop_gracefully(dead_server.server_id)
            # Pause the topology coordinator (on `leader`) in the write_both_read_old
            # state so the upcoming replace gets stuck before completing.
            await manager.api.enable_injection(
                leader.ip_addr, 'topology_coordinator/write_both_read_old/before_global_token_metadata_barrier', True)
            replace_cfg = ReplaceConfig(replaced_id=dead_server.server_id, reuse_ip_addr=reuse_ip, use_host_id=False)
            logger.info(f'Adding the server that will replace {dead_server}')
            replacing_server = await manager.server_add(
                replace_cfg, property_file=dead_server.property_file(), start=False)
            logger.info(f'Trying to replace {dead_server} with {replacing_server}')
            # The start will not finish while the coordinator is paused, so run it as a
            # background task instead of awaiting it.
            replacing_task = asyncio.create_task(manager.server_start(replacing_server.server_id))
            logger.info('Waiting for the topology coordinator to pause in write_both_read_old')
            leader_log = await manager.server_open_log(leader.server_id)
            await leader_log.wait_for(
                'topology_coordinator/write_both_read_old/before_global_token_metadata_barrier: waiting for message')
            # Full cluster shutdown while the replace is stuck -- the scenario from
            # scylla-enterprise#5686.
            logger.info(f'Stopping {live_servers + [replacing_server]}')
            await gather_safely(*(manager.server_stop(srv.server_id) for srv in live_servers + [replacing_server]))
            replacing_task.cancel()
            # Keep the coordinator paused at the same point after restart too, so the
            # replacing node remains a pending replica for the requests below.
            for srv in live_servers:
                await manager.server_update_config(srv.server_id, 'error_injections_at_startup',
                                                   ['topology_coordinator/write_both_read_old/before_global_token_metadata_barrier'])
            logger.info(f'Starting {live_servers}')
            await gather_safely(*(manager.server_start(srv.server_id) for srv in live_servers))
            # This is needed to ensure that the replacing node will be a pending replica for the requests below.
            logger.info(f'Waiting for {live_servers} to apply the latest topology state')
            await gather_safely(*[read_barrier(manager.api, srv.ip_addr) for srv in live_servers])
            await reconnect_driver(manager)
            logger.info('Waiting for driver')
            cql, hosts = await manager.get_ready_cql(live_servers)
            # These requests would crash their coordinators if the replacing node's IP
            # address were missing -- the symptom this test guards against.
            logger.info(f'Sending requests to {table} with {replacing_server} as a pending replica')
            for i in range(100):
                await cql.run_async(SimpleStatement(f'INSERT INTO {table} (a, b) VALUES ({i}, {i})',
                                                    consistency_level=ConsistencyLevel.ONE))
                response = await manager.cql.run_async(f"SELECT * FROM {table} WHERE a = {i}")
                assert response[0].b == i
            # Verify that `/storage_service/tokens/{endpoint}` returns no tokens for IP of the node being replaced iff
            # reuse_ip is True. The goal here is to check that the API endpoint resolves the provided IP address to the
            # host ID of the replacing node when reuse_ip is True. There is nothing special about this endpoint; we
            # can use any endpoint that calls `gossiper::get_host_id(inet_address endpoint)`. When reuse_ip is False, we
            # expect non-empty tokens as a sanity check.
            logger.info(f'Checking tokens of {dead_server.ip_addr}')
            for srv in live_servers:
                tokens = await manager.api.get_tokens(srv.ip_addr, dead_server.ip_addr)
                assert (len(tokens) == 0) == reuse_ip
            # Unpause the coordinator on every live node so the stuck replace can be
            # retried below.
            for srv in live_servers:
                await manager.api.message_injection(
                    srv.ip_addr, 'topology_coordinator/write_both_read_old/before_global_token_metadata_barrier')
            logger.info(f'Retrying replace of {dead_server}')
            new_server = await manager.server_add(replace_cfg, property_file=dead_server.property_file())
            live_servers.append(new_server)
            # Sanity check: each node's system.peers lists exactly the other live nodes.
            # NOTE(review): `hosts` contains only the two pre-existing nodes, so zip()
            # truncates and new_server's own peers view is not checked -- confirm this
            # is intentional.
            logger.info(f'Checking peers on {live_servers}')
            host_ids = [await manager.get_host_id(srv.server_id) for srv in live_servers]
            for srv, host in zip(live_servers, hosts):
                result = await cql.run_async('SELECT peer, host_id FROM system.peers', host=host)
                peers = {(row.peer, row.host_id) for row in result}
                expected = {(other.ip_addr, UUID(id)) for other, id in zip(live_servers, host_ids) if other != srv}
                assert peers == expected