Files
scylladb/test/cluster/test_change_ip.py
Dawid Mędrek dbb8835fdf test/cluster: Adjust simple tests to RF-rack-validity
We adjust all of the simple cases of cluster tests so they work
with `rf_rack_valid_keyspaces: true`. It boils down to assigning
nodes to multiple racks. For most of the changes, we do that by:

* Using `pytest.mark.prepare_3_racks_cluster` instead of
  `pytest.mark.prepare_3_nodes_cluster`.
* Using an additional argument -- `auto_rack_dc` -- when calling
  `ManagerClient::servers_add()`.

In some cases, we need to assign the racks manually, which may be
less obvious; however, in every such situation the tests did not rely
on that assignment, so it does not affect them or what they verify.
2025-05-10 16:30:18 +02:00

151 lines
8.1 KiB
Python

#
# Copyright (C) 2022-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
"""
Test clusters can restart fine after an IP address change.
"""
import logging
import time
import pytest
import uuid
from cassandra.cluster import NoHostAvailable # type: ignore # pylint: disable=no-name-in-module
from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module
from test.pylib.internal_types import ServerInfo
from test.pylib.random_tables import Column, IntType, TextType
from test.pylib.util import wait_for_cql_and_get_hosts, wait_for
from test.cluster.util import reconnect_driver
logger = logging.getLogger(__name__)
pytestmark = pytest.mark.prepare_3_racks_cluster
@pytest.mark.asyncio
async def test_change_two(manager, random_tables, build_mode):
    """Stop two nodes, change their IPs and start, check the cluster is
    functional.

    Verifies that every live node converges on the new addresses in
    system.local, system.peers, the gossip alive/down endpoint sets and the
    host-id map, and that schema changes plus reads/writes still work
    afterwards.  In non-release builds it additionally exercises the crash
    scenario where servers[0] dies after saving the new IP but before
    removing the old one.
    """
    servers = await manager.running_servers()
    # Snapshot host ids up front: they stay stable across IP changes and let
    # us match servers against the host-id-keyed views (system.peers, the
    # host id map) after the addresses change.
    host_ids = {s.server_id: await manager.get_host_id(s.server_id) for s in servers}
    table = await random_tables.add_table(name='t1', pks=1, columns=[
        Column("pk", IntType),
        Column('int_c', IntType)
    ])
    logger.info(f"Servers {servers}, gracefully stopping servers {servers[1]} and {servers[2]} to change ips")
    await manager.server_stop_gracefully(servers[1].server_id)
    await manager.server_stop_gracefully(servers[2].server_id)
    s1_new_ip = await manager.server_change_ip(servers[1].server_id)
    s2_new_ip = await manager.server_change_ip(servers[2].server_id)
    logger.info(f"s1 new ip {s1_new_ip}, s2 new ip {s2_new_ip}")

    async def wait_proper_ips(alive_servers: list[ServerInfo]) -> None:
        """Wait (up to 60s per server) until every server in alive_servers
        reports the expected addresses in system.local and system.peers, the
        expected gossip alive/down endpoint sets, and the expected host id
        map.  Servers not listed in alive_servers are expected to be DOWN."""
        hosts = {h.address: h for h in await wait_for_cql_and_get_hosts(manager.get_cql(), alive_servers, time.time() + 60)}
        expected_alive_ips = {s.rpc_address for s in alive_servers}
        expected_down_ips = {s.rpc_address for s in servers if s not in alive_servers}
        expected_host_id_map = {host_ids[s.server_id]: s.rpc_address for s in servers}
        logger.info(f"wait for proper ips, servers {servers}, alive servers {alive_servers}, expected host id map {expected_host_id_map}")
        for server in alive_servers:
            ip = server.rpc_address
            host = hosts[ip]
            host_id = host_ids[server.server_id]
            # Expected peers are everyone except the queried server itself;
            # sorted so the comparison below is order-insensitive.
            expected_peers = [(uuid.UUID(host_ids[s.server_id]), s.rpc_address) for s in servers if s.server_id != server.server_id]
            expected_peers.sort()
            logger.info(f"waiting for {ip}/{host_id} to see the proper ips, expected peers {expected_peers}")

            async def see_proper_ips():
                # Returns True when all checks pass; returns None (so that
                # wait_for keeps retrying) on any mismatch.
                local_rows = await manager.get_cql().run_async("select host_id, listen_address, rpc_address, broadcast_address from system.local", host=host)
                if len(local_rows) != 1:
                    logger.info(f"len(system.local) == {len(local_rows)} != 1")
                    return None
                local_row = local_rows[0]
                local_vals = (local_row.host_id, local_row.listen_address, local_row.rpc_address, local_row.broadcast_address)
                # All three address columns must already show the new IP.
                local_expected_vals = (uuid.UUID(host_id), ip, ip, ip)
                if local_vals != local_expected_vals:
                    logger.info(f"local vals {local_vals} doesn't match expected vals {local_expected_vals}")
                    return None
                current_peers = [(r.host_id, r.peer) for r in await manager.get_cql().run_async("select peer, host_id from system.peers", host=host)]
                current_peers.sort()
                if current_peers != expected_peers:
                    logger.info(f"host {host} current peers {current_peers} doesn't match the expected_peers {expected_peers}, continue waiting")
                    return None
                alive_eps = set(await manager.api.get_alive_endpoints(ip))
                if alive_eps != expected_alive_ips:
                    logger.info(f"host {host} alive endpoints {alive_eps} doesn't match the expected alive ips {expected_alive_ips}, continue waiting")
                    return None
                down_eps = set(await manager.api.get_down_endpoints(ip))
                if down_eps != expected_down_ips:
                    logger.info(f"host {host} down endpoints {down_eps} doesn't match the expected down ips {expected_down_ips}, continue waiting")
                    return None
                actual_host_id_map = {x['value']: x['key'] for x in await manager.api.get_host_id_map(host.address)}
                if expected_host_id_map != actual_host_id_map:
                    logger.info(f"host {host} id map {actual_host_id_map} doesn't match the expected host id map {expected_host_id_map}, continue waiting")
                    return None
                return True

            # FIXME: This is a workaround for the scylladb/python-driver#295 issue.
            # We ignore the exception and keep retrying the operation.
            # Can be removed once the issue is fixed.
            async def safe_see_proper_ips():
                try:
                    return await see_proper_ips()
                except NoHostAvailable as e:
                    logger.info(f"see_proper_ips failed: {e}")
                    return None

            await wait_for(safe_see_proper_ips, time.time() + 60)

    # We're checking the crash scenario here - the servers[0] crashes just after
    # saving s1_new_ip but before removing s1_old_ip. After its restart we should
    # see s1_new_ip.  (Error injections are only compiled into non-release builds.)
    if build_mode != 'release':
        await manager.api.enable_injection(servers[0].ip_addr, 'crash-before-prev-ip-removed', one_shot=True)
    # There is code in raft_ip_address_updater::on_endpoint_change which
    # calls gossiper.force_remove_endpoint for an endpoint if it sees
    # that the current generation of host_id -> ip mapping in raft_address_map
    # is greater than the generation of the endpoint.
    # We need to inject a delay ip-change-raft-sync-delay
    # before the old IP is removed. Otherwise, servers[0] removes the old
    # IPs before they are sent back to servers[1] and servers[2],
    # and the code mentioned above is not exercised by this test.
    await manager.api.enable_injection(servers[0].ip_addr, 'ip-change-raft-sync-delay', one_shot=False)
    # sleep_before_start_gossiping injections are needed to reproduce #22777
    await manager.server_update_config(servers[1].server_id, "error_injections_at_startup", ['sleep_before_start_gossiping'])
    await manager.server_update_config(servers[2].server_id, "error_injections_at_startup", ['sleep_before_start_gossiping'])
    await manager.server_start(servers[1].server_id)
    # Reflect the address change in our local view of the cluster topology.
    servers[1] = ServerInfo(servers[1].server_id, s1_new_ip, s1_new_ip, servers[1].datacenter, servers[1].rack)
    if build_mode != 'release':
        # Wait for the injected crash on servers[0] to fire, then restart it;
        # after the restart it must come up seeing s1_new_ip.
        s0_logs = await manager.server_open_log(servers[0].server_id)
        await s0_logs.wait_for('crash-before-prev-ip-removed hit, killing the node')
        await manager.server_stop(servers[0].server_id)
        await manager.server_start(servers[0].server_id)
        # Injections do not survive a restart, so re-arm the sync delay.
        await manager.api.enable_injection(servers[0].ip_addr, 'ip-change-raft-sync-delay', one_shot=False)
    # The driver's contact points are stale after the IP change - reconnect
    # before querying.
    await reconnect_driver(manager)
    await wait_proper_ips([servers[0], servers[1]])
    await manager.server_start(servers[2].server_id)
    servers[2] = ServerInfo(servers[2].server_id, s2_new_ip, s2_new_ip, servers[2].datacenter, servers[2].rack)
    await reconnect_driver(manager)
    await wait_proper_ips([servers[0], servers[1], servers[2]])
    # Finally, confirm the cluster is functional: schema changes propagate...
    await table.add_column(column=Column("str_c", TextType))
    await random_tables.verify_schema()
    host0 = (await wait_for_cql_and_get_hosts(manager.get_cql(), [servers[0]], time.time() + 60))[0]
    await manager.get_cql().run_async(f"USE {random_tables.keyspace}")
    # ...and a write through servers[0] can be read back intact.
    await manager.get_cql().run_async("insert into t1(pk, int_c, str_c) values (1, 2, 'test-val')", host=host0)
    rows = list(await manager.get_cql().run_async("select * from t1", host=host0))
    assert len(rows) == 1
    row = rows[0]
    assert row.pk == 1
    assert row.int_c == 2
    assert row.str_c == 'test-val'