Files
scylladb/test/cluster/test_ip_mappings.py
Patryk Jędrzejczak ba7f314cdc test: test_full_shutdown_during_replace: retry replace after the replacing node is removed from gossip
The test is currently flaky with `reuse_ip = True`. The issue is that the
test retries replace before the first replace is rolled back and the
first replacing node is removed from gossip. The second replacing node
can see the entry of the first replacing node in gossip. This entry has
a newer generation than the entry of the node being replaced, and both
replacing nodes have the same IP as the node being replaced. Therefore,
the second replacing node incorrectly considers this entry as the entry
of the node being replaced. This entry is missing rack and DC, so the
second replace fails with
```
ERROR 2026-02-24 21:19:03,420 [shard 0:main] init - Startup failed:
std::runtime_error (Cannot replace node
8762a9d2-3b30-4e66-83a1-98d16c5dd007/127.61.127.1 with a node on
a different data center or rack.
Current location=UNKNOWN_DC/UNKNOWN_RACK, new location=dc1/rack2)
```

Fixes SCYLLADB-805

Closes scylladb/scylladb#28829
2026-03-02 10:26:57 +02:00

157 lines
8.2 KiB
Python

# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
from test.pylib.manager_client import ManagerClient
import pytest
import logging
from uuid import UUID
from test.pylib.rest_client import inject_error_one_shot, read_barrier
from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.util import gather_safely
from test.cluster.util import disable_schema_agreement_wait, new_test_keyspace, reconnect_driver
from cassandra.cluster import ConsistencyLevel, SimpleStatement
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_broken_bootstrap(manager: ManagerClient):
    """Crash the bootstrap coordinator mid-join via error injection, then verify
    that after stopping every node and restarting the surviving one, the cluster
    still serves both writes and reads."""
    seed = await manager.server_add()
    joiner = await manager.server_add(start=False)
    async with new_test_keyspace(manager, "WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}") as ks:
        table = f"{ks}.test"
        await manager.cql.run_async(f"CREATE TABLE {table} (a int PRIMARY KEY, b int)")
        for i in range(100):
            await manager.cql.run_async(f"INSERT INTO {table} (a, b) VALUES ({i}, {i})")
        # Arrange for the coordinator to crash once, right before the joining
        # node would be added to the cluster.
        await inject_error_one_shot(manager.api, seed.ip_addr, "crash-before-bootstrapping-node-added")
        try:
            # Timeout fast since we do not expect the operation to complete
            # because the coordinator is dead by now due to the error injection
            # above
            await manager.server_start(joiner.server_id, timeout=5)
        except Exception:
            pass
        else:
            pytest.fail("Expected server_add to fail")
        # Full shutdown of both nodes, then bring back only the original one.
        await gather_safely(*(manager.server_stop(srv.server_id) for srv in (seed, joiner)))
        await manager.server_start(seed.server_id)
        await manager.driver_connect()
        # The cluster must still accept writes and return the written values.
        for i in range(100):
            await manager.cql.run_async(f"INSERT INTO {table} (a, b) VALUES ({i}, {i})")
            response = await manager.cql.run_async(f"SELECT * FROM {table} WHERE a = {i}")
            assert response[0].b == i
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
@pytest.mark.parametrize('reuse_ip', [False, True])
async def test_full_shutdown_during_replace(manager: ManagerClient, reuse_ip: bool):
    """
    Test that shutting down all live nodes during replace doesn't cause the IP address of the replacing node to be
    missing on restart. After restarts, the test performs requests with the replacing node as a pending replica, which
    would crash the request coordinators if the IP address was missing.
    Regression test for scylla-enterprise#5686. Note that in the reported issue `nodetool status` performed during
    stuck replace caused a crash, but the problem turned out to be more general.
    """
    logger.info('Adding servers')
    leader = await manager.server_add(property_file={'dc': 'dc1', 'rack': 'rack0'})
    follower, dead_server = await manager.servers_add(2, auto_rack_dc='dc1')
    live_servers = [leader, follower]
    logger.info('Waiting for driver')
    cql, [host] = await manager.get_ready_cql([follower])
    with disable_schema_agreement_wait(cql):  # This makes the test a bit faster.
        async with new_test_keyspace(manager, """WITH REPLICATION = {'class': 'NetworkTopologyStrategy',
                                                 'replication_factor': 3} AND tablets = {'enabled': false}""", host) as ks:
            table = f'{ks}.test'
            await cql.run_async(f'CREATE TABLE {table} (a int PRIMARY KEY, b int)', host=host)
            logger.info(f'Stopping {dead_server}')
            await manager.server_stop_gracefully(dead_server.server_id)
            # Make the topology coordinator pause in the write_both_read_old state
            # of the upcoming replace until a message injection releases it.
            await manager.api.enable_injection(
                leader.ip_addr, 'topology_coordinator/write_both_read_old/before_global_token_metadata_barrier', True)
            replace_cfg = ReplaceConfig(replaced_id=dead_server.server_id, reuse_ip_addr=reuse_ip, use_host_id=False)
            logger.info(f'Adding the server that will replace {dead_server}')
            replacing_server = await manager.server_add(
                replace_cfg, property_file=dead_server.property_file(), start=False)
            logger.info(f'Trying to replace {dead_server} with {replacing_server}')
            # Started as a background task: the replace is expected to get stuck
            # at the injected pause, not to complete.
            replacing_task = asyncio.create_task(manager.server_start(replacing_server.server_id))
            logger.info('Waiting for the topology coordinator to pause in write_both_read_old')
            leader_log = await manager.server_open_log(leader.server_id)
            await leader_log.wait_for(
                'topology_coordinator/write_both_read_old/before_global_token_metadata_barrier: waiting for message')
            # Captured now so we can later wait for this host id to be force-removed from gossip.
            replacing_host_id = await manager.get_host_id(replacing_server.server_id)
            logger.info(f'Stopping {live_servers + [replacing_server]}')
            # Full cluster shutdown while the replace is mid-transition.
            await gather_safely(*(manager.server_stop(srv.server_id) for srv in live_servers + [replacing_server]))
            replacing_task.cancel()
            # Keep the coordinator paused in write_both_read_old after restart as well,
            # so the replacing node stays a pending replica for the requests below.
            for srv in live_servers:
                await manager.server_update_config(srv.server_id, 'error_injections_at_startup',
                                                   ['topology_coordinator/write_both_read_old/before_global_token_metadata_barrier'])
            logger.info(f'Starting {live_servers}')
            await gather_safely(*(manager.server_start(srv.server_id) for srv in live_servers))
            # This is needed to ensure that the replacing node will be a pending replica for the requests below.
            logger.info(f'Waiting for {live_servers} to apply the latest topology state')
            await gather_safely(*[read_barrier(manager.api, srv.ip_addr) for srv in live_servers])
            await reconnect_driver(manager)
            logger.info('Waiting for driver')
            cql, hosts = await manager.get_ready_cql(live_servers)
            logger.info(f'Sending requests to {table} with {replacing_server} as a pending replica')
            # These would crash the request coordinators if the replacing node's
            # IP mapping were missing after the restart (the regression under test).
            for i in range(100):
                await cql.run_async(SimpleStatement(f'INSERT INTO {table} (a, b) VALUES ({i}, {i})',
                                                    consistency_level=ConsistencyLevel.ONE))
                response = await manager.cql.run_async(f"SELECT * FROM {table} WHERE a = {i}")
                assert response[0].b == i
            # Verify that `/storage_service/tokens/{endpoint}` returns no tokens for IP of the node being replaced iff
            # reuse_ip is True. The goal here is to check that the API endpoint resolves the provided IP address to the
            # host ID of the replacing node when reuse_ip is True. There is nothing special about this endpoint; we
            # can use any endpoint that calls `gossiper::get_host_id(inet_address endpoint)`. When reuse_ip is False, we
            # expect non-empty tokens as a sanity check.
            logger.info(f'Checking tokens of {dead_server.ip_addr}')
            for srv in live_servers:
                tokens = await manager.api.get_tokens(srv.ip_addr, dead_server.ip_addr)
                assert (len(tokens) == 0) == reuse_ip
            # Release the paused coordinators; the stuck replace then gets rolled back.
            for srv in live_servers:
                await manager.api.message_injection(
                    srv.ip_addr, 'topology_coordinator/write_both_read_old/before_global_token_metadata_barrier')
            logs = [await manager.server_open_log(srv.server_id) for srv in live_servers]
            logger.info(f'Waiting for {replacing_server} to be removed from gossip after replace rollback')
            # Retrying before the first replacing node is removed from gossip made the
            # reuse_ip=True case flaky: with the same IP, the second replacing node could
            # mistake the first one's gossip entry (missing rack/DC) for the entry of the
            # node being replaced and fail with a DC/rack mismatch.
            await gather_safely(*[
                log.wait_for(f'gossip - Finished to force remove node {replacing_host_id}')
                for log in logs
            ])
            logger.info(f'Retrying replace of {dead_server}')
            new_server = await manager.server_add(replace_cfg, property_file=dead_server.property_file())
            live_servers.append(new_server)
            logger.info(f'Checking peers on {live_servers}')
            host_ids = [await manager.get_host_id(srv.server_id) for srv in live_servers]
            # NOTE(review): `hosts` was captured before `new_server` was appended, so this
            # zip stops after the two original live servers and the new server's own
            # system.peers is not checked — confirm this is intended.
            for srv, host in zip(live_servers, hosts):
                result = await cql.run_async('SELECT peer, host_id FROM system.peers', host=host)
                peers = {(row.peer, row.host_id) for row in result}
                expected = {(other.ip_addr, UUID(id)) for other, id in zip(live_servers, host_ids) if other != srv}
                assert peers == expected