Files
scylladb/test/cluster/test_change_ip.py
Cezar Moise 7c8ab3d3d3 test.py: add pid to ServerInfo
Adding pid info to servers allows matching coredumps with servers

Other improvements:
- When replacing just some fields of ServerInfo, use `_replace` instead of
building a new object. This way it is agnostic to changes to the Object
- When building ServerInfo from a list, the types defined for its fields are
not enforced, so ServerInfo(*list) works fine and does not need to be changed if
fields are added or removed.
2025-12-12 15:11:03 +02:00

151 lines
8.0 KiB
Python

#
# Copyright (C) 2022-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
"""
Test clusters can restart fine after an IP address change.
"""
import logging
import time
import pytest
import uuid
from cassandra.cluster import NoHostAvailable # type: ignore # pylint: disable=no-name-in-module
from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module
from test.pylib.internal_types import ServerInfo
from test.pylib.random_tables import Column, IntType, TextType
from test.pylib.util import wait_for_cql_and_get_hosts, wait_for
from test.cluster.util import reconnect_driver
logger = logging.getLogger(__name__)
pytestmark = pytest.mark.prepare_3_racks_cluster
@pytest.mark.asyncio
async def test_change_two(manager, random_tables, build_mode):
"""Stop two nodes, change their IPs and start, check the cluster is
functional"""
servers = await manager.running_servers()
host_ids = {s.server_id: await manager.get_host_id(s.server_id) for s in servers}
table = await random_tables.add_table(name='t1', pks=1, columns=[
Column("pk", IntType),
Column('int_c', IntType)
])
logger.info(f"Servers {servers}, gracefully stopping servers {servers[1]} and {servers[2]} to change ips")
await manager.server_stop_gracefully(servers[1].server_id)
await manager.server_stop_gracefully(servers[2].server_id)
s1_new_ip = await manager.server_change_ip(servers[1].server_id)
s2_new_ip = await manager.server_change_ip(servers[2].server_id)
logger.info(f"s1 new ip {s1_new_ip}, s2 new ip {s2_new_ip}")
async def wait_proper_ips(alive_servers: list[ServerInfo]):
hosts = {h.address: h for h in await wait_for_cql_and_get_hosts(manager.get_cql(), alive_servers, time.time() + 60)}
expected_alive_ips = {s.rpc_address for s in alive_servers}
expected_down_ips = {s.rpc_address for s in servers if s not in alive_servers}
expected_host_id_map = {host_ids[s.server_id]: s.rpc_address for s in servers}
logger.info(f"wait for proper ips, servers {servers}, alive servers {alive_servers}, expected host id map {expected_host_id_map}")
for server in alive_servers:
ip = server.rpc_address
host = hosts[ip]
host_id = host_ids[server.server_id]
expected_peers = [(uuid.UUID(host_ids[s.server_id]), s.rpc_address) for s in servers if s.server_id != server.server_id]
expected_peers.sort()
logger.info(f"waiting for {ip}/{host_id} to see the proper ips, expected peers {expected_peers}")
async def see_proper_ips():
local_rows = await manager.get_cql().run_async("select host_id, listen_address, rpc_address, broadcast_address from system.local", host=host)
if len(local_rows) != 1:
logger.info(f"len(system.local) == {len(local_rows)} != 1")
return None
local_row = local_rows[0]
local_vals = (local_row.host_id, local_row.listen_address, local_row.rpc_address, local_row.broadcast_address)
local_expected_vals = (uuid.UUID(host_id), ip, ip, ip)
if local_vals != local_expected_vals:
logger.info(f"local vals {local_vals} doesn't match expected vals {local_expected_vals}")
return None
current_peers = [(r.host_id, r.peer) for r in await manager.get_cql().run_async("select peer, host_id from system.peers", host=host)]
current_peers.sort()
if current_peers != expected_peers:
logger.info(f"host {host} current peers {current_peers} doesn't match the expected_peers {expected_peers}, continue waiting")
return None
alive_eps = set(await manager.api.get_alive_endpoints(ip))
if alive_eps != expected_alive_ips:
logger.info(f"host {host} alive endpoints {alive_eps} doesn't match the expected alive ips {expected_alive_ips}, continue waiting")
return None
down_eps = set(await manager.api.get_down_endpoints(ip))
if down_eps != expected_down_ips:
logger.info(f"host {host} down endpoints {down_eps} doesn't match the expected down ips {expected_down_ips}, continue waiting")
return None
actual_host_id_map = {x['value']: x['key'] for x in await manager.api.get_host_id_map(host.address)}
if expected_host_id_map != actual_host_id_map:
logger.info(f"host {host} id map {actual_host_id_map} doesn't match the expected host id map {expected_host_id_map}, continue waiting")
return None
return True
# FIXME: This is a workaround for the scylladb/python-driver#295 issue.
# We ignore the exception and keep retrying the operation.
# Can be removed once the issue is fixed.
async def safe_see_proper_ips():
try:
return await see_proper_ips()
except NoHostAvailable as e:
logger.info(f"see_proper_ips failed: {e}")
return None
await wait_for(safe_see_proper_ips, time.time() + 60)
# We're checking the crash scenario here - the servers[0] crashes just after
# saving s1_new_ip but before removing s1_old_ip. After its restart we should
# see s1_new_ip.
if build_mode != 'release':
await manager.api.enable_injection(servers[0].ip_addr, 'crash-before-prev-ip-removed', one_shot=True)
# There is a code in raft_ip_address_updater::on_endpoint_change which
# calls gossiper.force_remove_endpoint for an endpoint if it sees
# that the current generation of host_id -> ip mapping in raft_address_map
# is greater than the generation of the endpoint.
# We need to inject a delay ip-change-raft-sync-delay
# before old IP is removed. Otherwise, servers[0] removes the old
# IP-s before they are send back to servers[1] and servers[2],
# and the mentioned above code is not exercised by this test.
await manager.api.enable_injection(servers[0].ip_addr, 'ip-change-raft-sync-delay', one_shot=False)
# sleep_before_start_gossiping injections are needed to reproduce #22777
await manager.server_update_config(servers[1].server_id, "error_injections_at_startup", ['sleep_before_start_gossiping'])
await manager.server_update_config(servers[2].server_id, "error_injections_at_startup", ['sleep_before_start_gossiping'])
await manager.server_start(servers[1].server_id)
servers[1] = servers[1]._replace(ip_addr=s1_new_ip, rpc_address=s1_new_ip)
if build_mode != 'release':
s0_logs = await manager.server_open_log(servers[0].server_id)
await s0_logs.wait_for('crash-before-prev-ip-removed hit, killing the node')
await manager.server_stop(servers[0].server_id)
await manager.server_start(servers[0].server_id)
await manager.api.enable_injection(servers[0].ip_addr, 'ip-change-raft-sync-delay', one_shot=False)
await reconnect_driver(manager)
await wait_proper_ips([servers[0], servers[1]])
await manager.server_start(servers[2].server_id)
servers[2] = servers[2]._replace(ip_addr=s2_new_ip, rpc_address=s2_new_ip)
await reconnect_driver(manager)
await wait_proper_ips([servers[0], servers[1], servers[2]])
await table.add_column(column=Column("str_c", TextType))
await random_tables.verify_schema()
host0 = (await wait_for_cql_and_get_hosts(manager.get_cql(), [servers[0]], time.time() + 60))[0]
await manager.get_cql().run_async(f"USE {random_tables.keyspace}")
await manager.get_cql().run_async("insert into t1(pk, int_c, str_c) values (1, 2, 'test-val')", host=host0)
rows = list(await manager.get_cql().run_async("select * from t1", host=host0))
assert len(rows) == 1
row = rows[0]
assert row.pk == 1
assert row.int_c == 2
assert row.str_c == 'test-val'