Files
scylladb/test/cluster/test_topology_remove_decom.py
Abhinav Jha ab0e0eab90 raft topology: skip non-idempotent steps in decommission path to avoid problems during races
In the present scenario, there are issues in the left_token_ring transition-state
execution in the decommissioning path. Under concurrent mutation race
conditions, we enter left_token_ring more than once, and if
we enter left_token_ring a second time, we try to barrier the decommissioned
node, which at that point is no longer possible. That is what causes the errors.

This PR resolves the issue by adding a check right at the start of
left_token_ring to determine whether the first topology state update, which
marks the request as done, has already completed. If it has, it is confirmed
that this is the second time the flow is entering left_token_ring, and the
steps preceding the request status update should be skipped. In that case, all
the remaining steps are skipped and the topology node status update (which
threw an error in the previous attempt) is executed directly. The node's
removal status in group0 is also checked, and the remove operation is retried
if it failed last time.

Although these changes are made with regard to the decommission operation's
behavior in the `left_token_ring` transition state, the PR does not
interfere with the core logic, so it should not derail any rollback-specific
logic. The changes just prevent some non-idempotent operations from
re-occurring in case of failures. The rest of the core logic remains intact.

A test is also added to confirm that this behavior works as intended.

Fixes: scylladb/scylladb#20865

Backport is not needed, since this is not a super critical bug fix.

Closes scylladb/scylladb#26717
2025-11-07 10:07:49 +01:00

220 lines
10 KiB
Python

#
# Copyright (C) 2022-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
"""
Test consistency of schema changes with topology changes.
"""
import logging
import asyncio
import random
import time
from test import pylib
from test.cluster.conftest import skip_mode
from test.pylib.util import wait_for
from test.pylib.manager_client import ManagerClient
from test.pylib.random_tables import RandomTables
from test.pylib.scylla_cluster import gather_safely
from test.cluster.util import check_token_ring_and_group0_consistency, \
wait_for_token_ring_and_group0_consistency
import pytest
logger = logging.getLogger(__name__)
pytestmark = pytest.mark.prepare_3_racks_cluster
@pytest.mark.asyncio
async def test_remove_node_add_column(manager: ManagerClient, random_tables: RandomTables):
    """Grow the cluster by one node, remove one of the original nodes,
    then verify that a schema change (adding a column) still propagates."""
    running = await manager.running_servers()
    table = await random_tables.add_table(ncolumns=5)
    await manager.server_add(property_file=running[1].property_file())
    # removenode operates on a dead node, so stop the victim first.
    await manager.server_stop_gracefully(running[1].server_id)
    await manager.remove_node(running[0].server_id, running[1].server_id)
    await check_token_ring_and_group0_consistency(manager)
    # A schema change after the topology change must still succeed and converge.
    await table.add_column()
    await random_tables.verify_schema()
@pytest.mark.asyncio
async def test_decommission_node_add_column(manager: ManagerClient, random_tables: RandomTables):
    """Bootstrap a new node, decommission one of the original nodes, add a column.

    Regression test for #11780: notify_joined dropping RPC clients while a
    decommission-prepare RPC is in flight must not fail the decommission.
    """
    table = await random_tables.add_table(ncolumns=5)
    servers = await manager.running_servers()
    decommission_target = servers[1]
    # The sleep injections significantly increase the probability of reproducing #11780:
    # 1. bootstrapped_server finishes bootstrapping and enters NORMAL state
    # 2. decommission_target starts storage_service::handle_state_normal(bootstrapped_server),
    #    enters sleep before calling storage_service::notify_joined
    # 3. we start decommission on decommission_target
    # 4. decommission_target sends node_ops_verb with decommission_prepare request to bootstrapped_server
    # 5. bootstrapped_server receives the RPC and enters sleep
    # 6. decommission_target handle_state_normal wakes up,
    #    calls storage_service::notify_joined which drops some RPC clients
    # 7. If #11780 is not fixed, this will fail the node_ops_verb RPC, causing decommission to fail
    await manager.api.enable_injection(
        decommission_target.ip_addr, 'storage_service_notify_joined_sleep', one_shot=True)
    bootstrapped_server = await manager.server_add(property_file=decommission_target.property_file())

    async def no_joining_nodes():
        # True once decommission_target no longer reports any joining nodes.
        joining_nodes = await manager.api.get_joining_nodes(decommission_target.ip_addr)
        return not joining_nodes

    # Wait until decommission_target thinks that bootstrapped_server is NORMAL
    # note: when this wait finishes, we're usually in the middle of storage_service::handle_state_normal
    await wait_for(no_joining_nodes, time.time() + 30, period=.1)
    await manager.api.enable_injection(
        bootstrapped_server.ip_addr, 'storage_service_decommission_prepare_handler_sleep', one_shot=True)
    await manager.decommission_node(decommission_target.server_id)
    await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
    await table.add_column()
    await random_tables.verify_schema()
@pytest.mark.asyncio
@pytest.mark.skip(reason="Wait for @slow attribute, #11713")
async def test_remove_node_with_concurrent_ddl(manager: ManagerClient, random_tables: RandomTables):
    """Stress removenode concurrently with a DDL workload.

    A background fiber continuously creates, verifies and drops tables while
    the main fiber repeatedly stops a random node, removes it through another
    random node, and bootstraps a replacement. Either fiber failing fails the
    test.
    """
    stopped = False
    ddl_failed = False

    async def do_ddl() -> None:
        """Create/verify/drop tables in a loop until `stopped` is set."""
        nonlocal ddl_failed
        iteration = 0
        while not stopped:
            logger.debug(f'ddl, iteration {iteration} started')
            try:
                # If the node was removed, the driver may retry "create table" on another node,
                # but the request might have already been completed.
                # The same applies to drop_table.
                await random_tables.add_tables(5, 5, if_not_exists=True)
                await random_tables.verify_schema()
                while len(random_tables.tables) > 0:
                    await random_tables.drop_table(random_tables.tables[-1], if_exists=True)
                logger.debug(f'ddl, iteration {iteration} finished')
            except BaseException:
                # BaseException instead of a bare `except:` (same semantics,
                # lint-clean) so task cancellation is also logged; set the flag
                # so do_remove_node stops early, then re-raise to fail the test.
                logger.exception(f'ddl, iteration {iteration} failed')
                ddl_failed = True
                raise
            iteration += 1

    async def do_remove_node() -> None:
        """Repeatedly remove a random node via a random initiator, then add a replacement."""
        for i in range(10):
            logger.debug(f'do_remove_node [{i}], iteration started')
            if ddl_failed:
                logger.debug(f'do_remove_node [{i}], ddl failed, exiting')
                break
            server_infos = await manager.running_servers()
            host_ids = await asyncio.gather(*(manager.get_host_id(s.server_id) for s in server_infos))
            initiator_index, target_index = random.sample(range(len(server_infos)), 2)
            initiator_info = server_infos[initiator_index]
            target_info = server_infos[target_index]
            target_host_id = host_ids[target_index]
            logger.info(f'do_remove_node [{i}], running remove_node, '
                        f'initiator server [{initiator_info.ip_addr}], target ip [{target_info.ip_addr}], '
                        f'target host id [{target_host_id}]')
            # The initiator must know the target's host id before it can remove it.
            await manager.wait_for_host_known(initiator_info.ip_addr, target_host_id)
            logger.info(f'do_remove_node [{i}], stopping target server [{target_info.ip_addr}], host_id [{target_host_id}]')
            await manager.server_stop_gracefully(target_info.server_id)
            logger.info(f'do_remove_node [{i}], invoking remove_node')
            await manager.remove_node(initiator_info.server_id, target_info.server_id, [target_info.ip_addr])
            # TODO: check that group 0 no longer contains the removed node (#12153)
            logger.info(f'do_remove_node [{i}], remove_node done')
            new_server_ip = await manager.server_add()
            logger.info(f'do_remove_node [{i}], server_add [{new_server_ip}] done')
            logger.info(f'do_remove_node [{i}], iteration finished')

    ddl_task = asyncio.create_task(do_ddl())
    try:
        await do_remove_node()
    finally:
        logger.debug("do_remove_node finished, waiting for ddl fiber")
        stopped = True
        # Propagates the DDL fiber's exception, if any.
        await ddl_task
        logger.debug("ddl fiber done, finished")
@pytest.mark.asyncio
async def test_rebuild_node(manager: ManagerClient, random_tables: RandomTables):
    """Rebuild one node and check token-ring/group0 consistency afterwards."""
    all_servers = await manager.running_servers()
    await manager.rebuild_node(all_servers[0].server_id)
    await check_token_ring_and_group0_consistency(manager)
@pytest.mark.asyncio
async def test_concurrent_removenode_two_initiators_one_dead_node(manager: ManagerClient):
    """Two initiators must not both succeed at removing the same dead node concurrently."""
    nodes = await manager.running_servers()
    assert len(nodes) >= 3
    victim = nodes[2]
    await manager.server_stop_gracefully(victim.server_id)
    try:
        # Race two removenode requests for the same victim from different initiators.
        await asyncio.gather(
            manager.remove_node(nodes[0].server_id, victim.server_id),
            manager.remove_node(nodes[1].server_id, victim.server_id))
    except Exception as e:
        logger.info(f"exception raised due to concurrent remove node requests: {e}")
    else:
        raise Exception("concurrent removenode request should result in a failure, but unexpectedly succeeded")
@pytest.mark.asyncio
async def test_concurrent_removenode_one_initiator_two_dead_nodes(manager: ManagerClient):
    """
    Tests that a single initiator can run removenode
    concurrently for two distinct dead nodes.
    """
    nodes = await manager.running_servers()
    await manager.servers_add(2, property_file=nodes[0].property_file())
    nodes = await manager.running_servers()
    assert len(nodes) >= 5
    dead_one, dead_two = nodes[1], nodes[2]
    # Take down both victims in parallel.
    await asyncio.gather(manager.server_stop_gracefully(dead_two.server_id),
                         manager.server_stop_gracefully(dead_one.server_id))
    ignore_nodes = [await manager.get_host_id(dead_one.server_id),
                    await manager.get_host_id(dead_two.server_id)]
    # Both removals are initiated from the same node and must both succeed.
    await asyncio.gather(
        manager.remove_node(nodes[0].server_id, dead_two.server_id, ignore_dead=ignore_nodes),
        manager.remove_node(nodes[0].server_id, dead_one.server_id, ignore_dead=ignore_nodes))
@pytest.mark.asyncio
async def test_concurrent_removenode_two_initiators_two_dead_nodes(manager: ManagerClient):
    """
    Tests that removenode can run concurrently for two distinct dead
    nodes when the requests originate from separate initiating nodes.
    """
    nodes = await manager.running_servers()
    await manager.servers_add(2, property_file=nodes[0].property_file())
    nodes = await manager.running_servers()
    assert len(nodes) >= 5
    dead_one, dead_two = nodes[1], nodes[2]
    # Take down both victims in parallel.
    await asyncio.gather(manager.server_stop_gracefully(dead_two.server_id),
                         manager.server_stop_gracefully(dead_one.server_id))
    ignore_nodes = [await manager.get_host_id(dead_one.server_id),
                    await manager.get_host_id(dead_two.server_id)]
    # Each removal goes through a different initiator; both must succeed.
    await asyncio.gather(
        manager.remove_node(nodes[0].server_id, dead_two.server_id, ignore_dead=ignore_nodes),
        manager.remove_node(nodes[3].server_id, dead_one.server_id, ignore_dead=ignore_nodes))
@pytest.mark.asyncio
@skip_mode('release', 'error injection is not supported in release mode')
async def test_decommission_left_token_ring_retry(manager: ManagerClient):
    """
    Decommission a node while an injected failure forces the
    left_token_ring handler to be retried; the retry must not attempt
    to barrier the already-decommissioned node.
    """
    servers = await manager.running_servers()
    await manager.servers_add(1, property_file=servers[0].property_file())
    servers = await manager.running_servers()
    assert len(servers) >= 4
    # Arm a one-shot injection on every node so whichever node is the
    # topology coordinator throws at the end of the transition.
    await gather_safely(*(manager.api.enable_injection(
        s.ip_addr, "finish_left_token_ring_transition_throw", one_shot=True) for s in servers))
    await manager.decommission_node(servers[3].server_id)
    servers = await manager.running_servers()
    # The retried handler must not have tried to barrier the decommissioned
    # node: the failure message must be absent from every remaining node's log.
    for srv in servers:
        log = await manager.server_open_log(srv.server_id)
        hits = await log.grep("raft_topology - transition_state::left_token_ring, raft_topology_cmd::command::barrier failed")
        assert not hits