Rework `ScyllaLogFile.wait_for()` method to make it easier to add required methods to ScyllaNode class of ccm-like shim. Also, added `ScyllaLogFile.grep_for_errors()` method and reworked `ScyllaLogFile.grep()`
140 lines
6.9 KiB
Python
140 lines
6.9 KiB
Python
#
|
|
# Copyright (C) 2022-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
"""
|
|
Test replacing node in different scenarios
|
|
"""
|
|
import time
|
|
from test.pylib.scylla_cluster import ReplaceConfig
|
|
from test.pylib.manager_client import ManagerClient
|
|
from test.cluster.util import wait_for_token_ring_and_group0_consistency, wait_for_cql_and_get_hosts, wait_for
|
|
import pytest
|
|
import logging
|
|
import asyncio
|
|
from cassandra.query import SimpleStatement, ConsistencyLevel
|
|
from test.pylib.random_tables import RandomTables, Column, TextType
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_replace_different_ip(manager: ManagerClient) -> None:
|
|
"""Replace an existing node with new node using a different IP address"""
|
|
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
|
|
logger.info(f"cluster started, servers {servers}")
|
|
|
|
logger.info(f"replacing server {servers[0]}")
|
|
await manager.server_stop(servers[0].server_id)
|
|
replaced_server = servers[0]
|
|
replace_cfg = ReplaceConfig(replaced_id = replaced_server.server_id, reuse_ip_addr = False, use_host_id = False)
|
|
new_server = await manager.server_add(replace_cfg)
|
|
cql = manager.get_cql()
|
|
servers = await manager.running_servers()
|
|
all_ips = set([s.rpc_address for s in servers])
|
|
logger.info(f"new server {new_server} started, all ips {all_ips}, "
|
|
"waiting for token ring and group0 consistency")
|
|
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
|
|
|
|
for s in servers:
|
|
peers_to_see = all_ips - {s.rpc_address}
|
|
|
|
logger.info(f'waiting for cql and get hosts for {s}')
|
|
h = (await wait_for_cql_and_get_hosts(cql, [s], time.time() + 60))[0]
|
|
|
|
logger.info(f"waiting for {s} to see its peers {peers_to_see}")
|
|
async def check_peers_and_gossiper():
|
|
peers = set([r.peer for r in await cql.run_async("select peer from system.peers", host=h)])
|
|
remaining = peers_to_see - peers
|
|
if remaining:
|
|
logger.info(f"server {h} doesn't see its peers, all_ips {all_ips}, peers_to_see {peers_to_see}, remaining {remaining}, continue waiting")
|
|
return None
|
|
|
|
alive_eps = await manager.api.get_alive_endpoints(s.ip_addr)
|
|
if replaced_server.ip_addr in alive_eps:
|
|
logger.info(f"server {h}, replaced ip {replaced_server.ip_addr} is contained in alive eps {alive_eps}, continue waiting")
|
|
return None
|
|
|
|
down_eps = await manager.api.get_down_endpoints(s.ip_addr)
|
|
if replaced_server.ip_addr in down_eps:
|
|
logger.info(f"server {h}, replaced ip {replaced_server.ip_addr} is contained in down eps {down_eps}, continue waiting")
|
|
return None
|
|
|
|
return True
|
|
await wait_for(check_peers_and_gossiper, time.time() + 60)
|
|
logger.info(f"server {s} system.peers and gossiper state is valid")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_replace_different_ip_using_host_id(manager: ManagerClient) -> None:
|
|
"""Replace an existing node with new node reusing the replaced node host id"""
|
|
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
|
|
await manager.server_stop(servers[0].server_id)
|
|
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = True)
|
|
await manager.server_add(replace_cfg)
|
|
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_replace_reuse_ip(request, manager: ManagerClient) -> None:
|
|
"""Replace an existing node with new node using the same IP address"""
|
|
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000}, auto_rack_dc="dc1")
|
|
host2 = (await wait_for_cql_and_get_hosts(manager.get_cql(), [servers[2]], time.time() + 60))[0]
|
|
|
|
logger.info(f"creating test table")
|
|
random_tables = RandomTables(request.node.name, manager, "ks", 3)
|
|
await random_tables.add_table(name='test_table', pks=1, columns=[
|
|
Column(name="key", ctype=TextType),
|
|
Column(name="value", ctype=TextType)
|
|
])
|
|
|
|
await manager.server_stop_gracefully(servers[0].server_id)
|
|
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = True, use_host_id = False)
|
|
replace_future = asyncio.create_task(manager.server_add(replace_cfg, property_file=servers[0].property_file()))
|
|
start_time = time.time()
|
|
next_id = 0
|
|
logger.info(f"running write requests in a loop while the replacing node is starting")
|
|
expected_data = []
|
|
while not replace_future.done():
|
|
i = next_id
|
|
next_id += 1
|
|
k = f'key_{i}'
|
|
v = f'value_{i}'
|
|
expected_data.append((k, v))
|
|
await manager.get_cql().run_async(SimpleStatement("insert into ks.test_table(key, value) values (%s, %s)",
|
|
consistency_level=ConsistencyLevel.QUORUM),
|
|
parameters=[k, v],
|
|
host=host2)
|
|
finish_time = time.time()
|
|
s = await replace_future
|
|
logger.info(f"done, writes count {next_id}, took {finish_time - start_time} seconds")
|
|
|
|
# make sure that after we start snapshot transfer we no longer have stale writes
|
|
log = await manager.server_open_log(s.server_id)
|
|
m, _ = await log.wait_for("group0_raft_sm - transfer snapshot from ")
|
|
errs = await log.grep("storage_proxy - Failed to apply mutation from", from_mark=m)
|
|
assert len(errs) == 0
|
|
|
|
result_set = await manager.get_cql().run_async(SimpleStatement("select * from ks.test_table",
|
|
consistency_level=ConsistencyLevel.QUORUM),
|
|
host=host2, all_pages=True)
|
|
read_data = [(row.key, row.value) for row in result_set]
|
|
expected_data.sort()
|
|
read_data.sort()
|
|
logger.info(f"expected data:\n{expected_data}\nread_data:\n{read_data}")
|
|
assert read_data == expected_data
|
|
logger.info("the data is correct")
|
|
|
|
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
|
|
await manager.server_sees_other_server(servers[1].ip_addr, servers[0].ip_addr)
|
|
await manager.server_sees_other_server(servers[2].ip_addr, servers[0].ip_addr)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_replace_reuse_ip_using_host_id(manager: ManagerClient) -> None:
|
|
"""Replace an existing node with new node using the same IP address and same host id"""
|
|
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
|
|
await manager.server_stop(servers[0].server_id)
|
|
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = True, use_host_id = True)
|
|
await manager.server_add(replace_cfg)
|
|
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
|