scylladb/test/cluster/test_replace.py
Marcin Maliszkiewicz e414b2b0b9 test/cluster: scale failure_detector_timeout_in_ms by build mode

Six cluster test files override failure_detector_timeout_in_ms to 2000ms
for faster failure detection. In debug and sanitize builds, this causes
flaky node join failures. The following log analysis shows how.
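
For reference, the hardcoded override being replaced looks like this
(a minimal sketch; exact call sites vary across the six files):

  servers = await manager.servers_add(
      3, config={'failure_detector_timeout_in_ms': 2000})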

The coordinator (server 614, IP 127.2.115.3) accepts the joining node
(server 615, host_id 53b01f0b, IP 127.2.115.2) into group0:

  20:10:57,049 [shard 0] raft_group0 - server 614 entered
    'join group0' transition state for 53b01f0b

The joining node begins receiving the raft snapshot 100ms later:

  20:10:57,150 [shard 0] raft_group0 - transfer snapshot from 9fa48539

It then spends ~280ms applying schema changes -- creating 6 keyspaces
and 12+ tables from the snapshot:

  20:10:57,511 [shard 0] migration_manager - Creating keyspace
    system_auth_v2
  ...
  20:10:57,788 [shard 0] migration_manager - Creating
    system_auth_v2.role_members

Meanwhile, the coordinator's failure detector pings the joining node.
Under debug+ASan load, the ping RPC only fails with a timeout ~4.6
seconds after the join began:

  20:11:01,643 [shard 0] direct_failure_detector - unexpected exception
    when pinging 53b01f0b: seastar::rpc::timeout_error
    (rpc call timed out)

25ms later, the coordinator marks the joining node DOWN and removes it
from group0:

  20:11:01,668 [shard 0] raft_group0 - failure_detector_loop:
    Mark node 53b01f0b as DOWN
  20:11:01,717 [shard 0] raft_group0 - bootstrap: failed to accept
    53b01f0b

The joining node was still retrying the snapshot transfer at that point:

  20:11:01,745 [shard 0] raft_group0 - transfer snapshot from 9fa48539

It then receives the ban notification and aborts:

  20:11:01,844 [shard 0] raft_group0 - received notification of being
    banned from the cluster
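
Taken together: the detector allows only failure_detector_timeout_in_ms
of ping silence before declaring a node dead, and under debug+ASan the
join simply cannot finish inside 2000ms. A simplified model of the loop
behind the log lines above (not Scylla's actual C++
direct_failure_detector; ping/mark_down are stand-ins):

  import asyncio
  import time

  async def failure_detector_loop(ping, mark_down, timeout_ms=2000):
      # Mark the node DOWN once no ping has succeeded for timeout_ms.
      last_success = time.monotonic()
      while True:
          try:
              await ping()  # under debug+ASan this took ~4.6s to fail
              last_success = time.monotonic()
          except Exception:
              pass  # a timed-out ping just leaves last_success stale
          if (time.monotonic() - last_success) * 1000 > timeout_ms:
              mark_down()
              return
          await asyncio.sleep(0.1)  # ping interval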

Replace the hardcoded 2000ms with the failure_detector_timeout fixture
from conftest.py, which scales the timeout by MODES_TIMEOUT_FACTOR:
3x for debug/sanitize (6000ms), 2x for dev (4000ms), 1x for release
(2000ms).
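
A minimal sketch of what that fixture amounts to (the real definition
lives in conftest.py; treating the build mode as a plain string
parameter here is an assumption):

  import pytest

  MODES_TIMEOUT_FACTOR = {'debug': 3, 'sanitize': 3, 'dev': 2, 'release': 1}

  @pytest.fixture
  def failure_detector_timeout(build_mode: str) -> int:
      # 2000ms base scaled per build mode: 6000ms for debug/sanitize,
      # 4000ms for dev, 2000ms for release.
      return 2000 * MODES_TIMEOUT_FACTOR[build_mode]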

Test measurements (before -> after fix):

  debug mode:
  test_replace_with_same_ip_twice           24.02s ->  25.02s
  test_banned_node_notification            217.22s -> 221.72s
  test_kill_coordinator_during_op          116.11s -> 127.13s
  test_node_failure_during_tablet_migration
    [streaming-source]                     183.25s -> 192.69s
  test_replace (4 tests)        skipped in debug (skip_in_debug)
  test_raft_replace_ignore_nodes  skipped in debug (run_in_dev only)

  dev mode:
  test_replace_different_ip                 10.51s ->  11.50s
  test_replace_different_ip_using_host_id   10.01s ->  12.01s
  test_replace_reuse_ip                     10.51s ->  12.03s
  test_replace_reuse_ip_using_host_id       13.01s ->  12.01s
  test_raft_replace_ignore_nodes            19.52s ->  19.52s

#
# Copyright (C) 2022-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
#
"""
Test replacing node in different scenarios
"""
import time
from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.manager_client import ManagerClient
from test.cluster.util import wait_for_token_ring_and_group0_consistency, wait_for_cql_and_get_hosts, wait_for

import pytest
import logging
import asyncio
from cassandra.query import SimpleStatement, ConsistencyLevel

from test.pylib.random_tables import RandomTables, Column, TextType

logger = logging.getLogger(__name__)


@pytest.mark.asyncio
async def test_replace_different_ip(manager: ManagerClient, failure_detector_timeout) -> None:
    """Replace an existing node with a new node using a different IP address"""
    servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
    logger.info(f"cluster started, servers {servers}")
    logger.info(f"replacing server {servers[0]}")
    await manager.server_stop(servers[0].server_id)
    replaced_server = servers[0]
    replace_cfg = ReplaceConfig(replaced_id=replaced_server.server_id, reuse_ip_addr=False, use_host_id=False)
    new_server = await manager.server_add(replace_cfg)
    cql = manager.get_cql()
    servers = await manager.running_servers()
    all_ips = set([s.rpc_address for s in servers])
    logger.info(f"new server {new_server} started, all ips {all_ips}, "
                "waiting for token ring and group0 consistency")
    await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
    for s in servers:
        peers_to_see = all_ips - {s.rpc_address}

        logger.info(f'waiting for cql and get hosts for {s}')
        h = (await wait_for_cql_and_get_hosts(cql, [s], time.time() + 60))[0]

        logger.info(f"waiting for {s} to see its peers {peers_to_see}")

        async def check_peers_and_gossiper():
            # Each server must see all of its peers in system.peers, and the
            # replaced node's IP must be gone from the gossiper entirely
            # (neither in the alive nor in the down endpoints).
            peers = set([r.peer for r in await cql.run_async("select peer from system.peers", host=h)])
            remaining = peers_to_see - peers
            if remaining:
                logger.info(f"server {h} doesn't see its peers, all_ips {all_ips}, peers_to_see {peers_to_see}, remaining {remaining}, continue waiting")
                return None

            alive_eps = await manager.api.get_alive_endpoints(s.ip_addr)
            if replaced_server.ip_addr in alive_eps:
                logger.info(f"server {h}, replaced ip {replaced_server.ip_addr} is contained in alive eps {alive_eps}, continue waiting")
                return None

            down_eps = await manager.api.get_down_endpoints(s.ip_addr)
            if replaced_server.ip_addr in down_eps:
                logger.info(f"server {h}, replaced ip {replaced_server.ip_addr} is contained in down eps {down_eps}, continue waiting")
                return None

            return True

        await wait_for(check_peers_and_gossiper, time.time() + 60)
        logger.info(f"server {s} system.peers and gossiper state is valid")


@pytest.mark.asyncio
async def test_replace_different_ip_using_host_id(manager: ManagerClient, failure_detector_timeout) -> None:
    """Replace an existing node with a new node reusing the replaced node's host id"""
    servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
    await manager.server_stop(servers[0].server_id)
    replace_cfg = ReplaceConfig(replaced_id=servers[0].server_id, reuse_ip_addr=False, use_host_id=True)
    await manager.server_add(replace_cfg)
    await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)


@pytest.mark.asyncio
async def test_replace_reuse_ip(request, manager: ManagerClient, failure_detector_timeout) -> None:
    """Replace an existing node with a new node using the same IP address"""
    servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout}, auto_rack_dc="dc1")
    host2 = (await wait_for_cql_and_get_hosts(manager.get_cql(), [servers[2]], time.time() + 60))[0]

    logger.info("creating test table")
    random_tables = RandomTables(request.node.name, manager, "ks", 3)
    await random_tables.add_table(name='test_table', pks=1, columns=[
        Column(name="key", ctype=TextType),
        Column(name="value", ctype=TextType)
    ])

    await manager.server_stop_gracefully(servers[0].server_id)
    replace_cfg = ReplaceConfig(replaced_id=servers[0].server_id, reuse_ip_addr=True, use_host_id=False)
    replace_future = asyncio.create_task(manager.server_add(replace_cfg, property_file=servers[0].property_file()))

    # Keep issuing QUORUM writes while the replacing node bootstraps;
    # they must keep succeeding throughout the replace.
    start_time = time.time()
    next_id = 0
    logger.info("running write requests in a loop while the replacing node is starting")
    expected_data = []
    while not replace_future.done():
        i = next_id
        next_id += 1
        k = f'key_{i}'
        v = f'value_{i}'
        expected_data.append((k, v))
        await manager.get_cql().run_async(SimpleStatement("insert into ks.test_table(key, value) values (%s, %s)",
                                                          consistency_level=ConsistencyLevel.QUORUM),
                                          parameters=[k, v],
                                          host=host2)
    finish_time = time.time()
    s = await replace_future
    logger.info(f"done, writes count {next_id}, took {finish_time - start_time} seconds")

    # make sure that after we start snapshot transfer we no longer have stale writes
    log = await manager.server_open_log(s.server_id)
    m, _ = await log.wait_for("group0_raft_sm - transfer snapshot from ")
    errs = await log.grep("storage_proxy - Failed to apply mutation from", from_mark=m)
    assert len(errs) == 0

    # Every write issued during the replace must be readable afterwards.
    result_set = await manager.get_cql().run_async(SimpleStatement("select * from ks.test_table",
                                                                   consistency_level=ConsistencyLevel.QUORUM),
                                                   host=host2, all_pages=True)
    read_data = [(row.key, row.value) for row in result_set]
    expected_data.sort()
    read_data.sort()
    logger.info(f"expected data:\n{expected_data}\nread_data:\n{read_data}")
    assert read_data == expected_data
    logger.info("the data is correct")

    await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
    await manager.server_sees_other_server(servers[1].ip_addr, servers[0].ip_addr)
    await manager.server_sees_other_server(servers[2].ip_addr, servers[0].ip_addr)


@pytest.mark.asyncio
async def test_replace_reuse_ip_using_host_id(manager: ManagerClient, failure_detector_timeout) -> None:
    """Replace an existing node with a new node using the same IP address and same host id"""
    servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': failure_detector_timeout})
    await manager.server_stop(servers[0].server_id)
    replace_cfg = ReplaceConfig(replaced_id=servers[0].server_id, reuse_ip_addr=True, use_host_id=True)
    await manager.server_add(replace_cfg)
    await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)