mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-19 16:15:07 +00:00
In the present scenario, there are issues in the left_token_ring transition state execution in the decommissioning path. In case of concurrent mutation race conditions, we enter left_token_ring more than once, and if we enter left_token_ring a second time, we try to barrier the decommissioned node, which at this point is no longer possible. That is what causes the errors. This PR resolves the issue by adding a check at the start of left_token_ring that determines whether the first topology state update — the one that marks the request as done — has already completed. If it has, it is confirmed that this is the second time the flow is entering left_token_ring, and the steps preceding the request status update should be skipped. In such cases, all the remaining steps are skipped and the topology node status update (which threw an error in the previous attempt) is executed directly. The node's removal status from group 0 is also checked, and the remove operation is retried if it failed last time. Although these changes are made with regard to the decommission operation behavior in the `left_token_ring` transition state, since the PR does not interfere with the core logic, it should not derail any rollback-specific logic. The changes just prevent some non-idempotent operations from re-occurring in case of failures. The rest of the core logic remains intact. A test is also added to confirm that this works correctly. Fixes: scylladb/scylladb#20865 Backport is not needed, since this is not a super critical bug fix. Closes scylladb/scylladb#26717
220 lines
10 KiB
Python
220 lines
10 KiB
Python
#
|
|
# Copyright (C) 2022-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
"""
|
|
Test consistency of schema changes with topology changes.
|
|
"""
|
|
import logging
|
|
import asyncio
|
|
import random
|
|
import time
|
|
from test import pylib
|
|
from test.cluster.conftest import skip_mode
|
|
from test.pylib.util import wait_for
|
|
from test.pylib.manager_client import ManagerClient
|
|
from test.pylib.random_tables import RandomTables
|
|
from test.pylib.scylla_cluster import gather_safely
|
|
from test.cluster.util import check_token_ring_and_group0_consistency, \
|
|
wait_for_token_ring_and_group0_consistency
|
|
import pytest
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
pytestmark = pytest.mark.prepare_3_racks_cluster
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_remove_node_add_column(manager: ManagerClient, random_tables: RandomTables):
    """Add a node, remove an original node, add a column"""
    initial = await manager.running_servers()
    tbl = await random_tables.add_table(ncolumns=5)
    await manager.server_add(property_file=initial[1].property_file())
    # Stop [1] cleanly, then have [0] initiate its removal from the cluster.
    await manager.server_stop_gracefully(initial[1].server_id)
    await manager.remove_node(initial[0].server_id, initial[1].server_id)
    await check_token_ring_and_group0_consistency(manager)
    # A schema change must still propagate after the topology change.
    await tbl.add_column()
    await random_tables.verify_schema()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_decommission_node_add_column(manager: ManagerClient, random_tables: RandomTables):
    """Bootstrap a new node, decommission one of the original nodes while
    racing it against handle_state_normal (regression test for #11780),
    then add a column and verify the schema."""
    table = await random_tables.add_table(ncolumns=5)
    servers = await manager.running_servers()
    decommission_target = servers[1]
    # The sleep injections significantly increase the probability of reproducing #11780:
    # 1. bootstrapped_server finishes bootstrapping and enters NORMAL state
    # 2. decommission_target starts storage_service::handle_state_normal(bootstrapped_server),
    #    enters sleep before calling storage_service::notify_joined
    # 3. we start decommission on decommission_target
    # 4. decommission_target sends node_ops_verb with decommission_prepare request to bootstrapped_server
    # 5. bootstrapped_server receives the RPC and enters sleep
    # 6. decommission_target handle_state_normal wakes up,
    #    calls storage_service::notify_joined which drops some RPC clients
    # 7. If #11780 is not fixed, this will fail the node_ops_verb RPC, causing decommission to fail
    await manager.api.enable_injection(
        decommission_target.ip_addr, 'storage_service_notify_joined_sleep', one_shot=True)
    bootstrapped_server = await manager.server_add(property_file=decommission_target.property_file())
    async def no_joining_nodes():
        # Condition for wait_for: true once decommission_target no longer
        # reports any joining nodes.
        joining_nodes = await manager.api.get_joining_nodes(decommission_target.ip_addr)
        return not joining_nodes
    # Wait until decommission_target thinks that bootstrapped_server is NORMAL
    # note: when this wait finishes, we're usually in the middle of storage_service::handle_state_normal
    await wait_for(no_joining_nodes, time.time() + 30, period=.1)
    await manager.api.enable_injection(
        bootstrapped_server.ip_addr, 'storage_service_decommission_prepare_handler_sleep', one_shot=True)
    await manager.decommission_node(decommission_target.server_id)
    await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
    # Schema changes must still succeed after the decommission completed.
    await table.add_column()
    await random_tables.verify_schema()
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip(reason="Wait for @slow attribute, #11713")
async def test_remove_node_with_concurrent_ddl(manager: ManagerClient, random_tables: RandomTables):
    """Stress removenode by running it in a loop concurrently with a DDL
    fiber that repeatedly creates, verifies and drops tables.

    The DDL fiber runs until `stopped` is set; the removenode loop bails
    out early if the DDL fiber has failed.
    """
    stopped = False
    ddl_failed = False

    async def do_ddl() -> None:
        """Run create/verify/drop cycles until `stopped` is set; on any
        failure record it in `ddl_failed` and re-raise."""
        nonlocal ddl_failed
        iteration = 0
        while not stopped:
            logger.debug(f'ddl, iteration {iteration} started')
            try:
                # If the node was removed, the driver may retry "create table" on another node,
                # but the request might have already been completed.
                # The same applies to drop_table.

                await random_tables.add_tables(5, 5, if_not_exists=True)
                await random_tables.verify_schema()
                while len(random_tables.tables) > 0:
                    await random_tables.drop_table(random_tables.tables[-1], if_exists=True)
                logger.debug(f'ddl, iteration {iteration} finished')
            except BaseException:
                # Explicit `except BaseException:` instead of a bare `except:`
                # (E722): same semantics — cancellation is still logged and
                # reflected in `ddl_failed` before being re-raised.
                logger.exception(f'ddl, iteration {iteration} failed')
                ddl_failed = True
                raise
            iteration += 1

    async def do_remove_node() -> None:
        """Repeatedly pick a random (initiator, target) pair, stop the target,
        remove it via the initiator, then bootstrap a fresh replacement."""
        for i in range(10):
            logger.debug(f'do_remove_node [{i}], iteration started')
            if ddl_failed:
                logger.debug(f'do_remove_node [{i}], ddl failed, exiting')
                break
            server_infos = await manager.running_servers()
            host_ids = await asyncio.gather(*(manager.get_host_id(s.server_id) for s in server_infos))
            initiator_index, target_index = random.sample(range(len(server_infos)), 2)
            initiator_info = server_infos[initiator_index]
            target_info = server_infos[target_index]
            target_host_id = host_ids[target_index]
            logger.info(f'do_remove_node [{i}], running remove_node, '
                        f'initiator server [{initiator_info.ip_addr}], target ip [{target_info.ip_addr}], '
                        f'target host id [{target_host_id}]')
            # The initiator must know the target's host id before removenode can start.
            await manager.wait_for_host_known(initiator_info.ip_addr, target_host_id)
            logger.info(f'do_remove_node [{i}], stopping target server [{target_info.ip_addr}], host_id [{target_host_id}]')
            await manager.server_stop_gracefully(target_info.server_id)
            logger.info(f'do_remove_node [{i}], invoking remove_node')
            await manager.remove_node(initiator_info.server_id, target_info.server_id, [target_info.ip_addr])
            # TODO: check that group 0 no longer contains the removed node (#12153)
            logger.info(f'do_remove_node [{i}], remove_node done')
            new_server_ip = await manager.server_add()
            logger.info(f'do_remove_node [{i}], server_add [{new_server_ip}] done')
            logger.info(f'do_remove_node [{i}], iteration finished')

    ddl_task = asyncio.create_task(do_ddl())
    try:
        await do_remove_node()
    finally:
        logger.debug("do_remove_node finished, waiting for ddl fiber")
        stopped = True
        # Propagates any exception raised by the DDL fiber.
        await ddl_task
        logger.debug("ddl fiber done, finished")
|
|
|
|
@pytest.mark.asyncio
async def test_rebuild_node(manager: ManagerClient, random_tables: RandomTables):
    """Rebuild one node and verify cluster metadata stays consistent."""
    all_servers = await manager.running_servers()
    first = all_servers[0]
    await manager.rebuild_node(first.server_id)
    await check_token_ring_and_group0_consistency(manager)
|
|
|
|
@pytest.mark.asyncio
async def test_concurrent_removenode_two_initiators_one_dead_node(manager: ManagerClient):
    """Two different initiators concurrently request removal of the same
    dead node; the concurrent requests must not both succeed."""
    servers = await manager.running_servers()
    assert len(servers) >= 3

    dead = servers[2]
    await manager.server_stop_gracefully(dead.server_id)

    removals = [manager.remove_node(servers[0].server_id, dead.server_id),
                manager.remove_node(servers[1].server_id, dead.server_id)]
    try:
        await asyncio.gather(*removals)
    except Exception as e:
        # Expected path: one of the two requests is rejected.
        logger.info(f"exception raised due to concurrent remove node requests: {e}")
    else:
        raise Exception("concurrent removenode request should result in a failure, but unexpectedly succeeded")
|
|
|
|
@pytest.mark.asyncio
async def test_concurrent_removenode_one_initiator_two_dead_nodes(manager: ManagerClient):
    """
    A single initiator performs remove node operations concurrently
    for two distinct dead nodes.

    """
    servers = await manager.running_servers()
    await manager.servers_add(2, property_file=servers[0].property_file())
    servers = await manager.running_servers()
    assert len(servers) >= 5

    dead_a, dead_b = servers[1], servers[2]
    await asyncio.gather(manager.server_stop_gracefully(dead_b.server_id),
                         manager.server_stop_gracefully(dead_a.server_id))

    # Both dead nodes must be ignored by each removenode operation.
    ignore_nodes = [await manager.get_host_id(dead_a.server_id),
                    await manager.get_host_id(dead_b.server_id)]
    await asyncio.gather(manager.remove_node(servers[0].server_id, dead_b.server_id, ignore_dead=ignore_nodes),
                         manager.remove_node(servers[0].server_id, dead_a.server_id, ignore_dead=ignore_nodes))
|
|
|
|
@pytest.mark.asyncio
async def test_concurrent_removenode_two_initiators_two_dead_nodes(manager: ManagerClient):
    """
    Remove node operations for two distinct dead nodes are performed
    concurrently, each request originating from a separate initiating
    node.

    """
    servers = await manager.running_servers()
    await manager.servers_add(2, property_file=servers[0].property_file())
    servers = await manager.running_servers()
    assert len(servers) >= 5

    dead_a, dead_b = servers[1], servers[2]
    await asyncio.gather(manager.server_stop_gracefully(dead_b.server_id),
                         manager.server_stop_gracefully(dead_a.server_id))

    # Both dead nodes must be ignored by each removenode operation.
    ignore_nodes = [await manager.get_host_id(dead_a.server_id),
                    await manager.get_host_id(dead_b.server_id)]
    await asyncio.gather(manager.remove_node(servers[0].server_id, dead_b.server_id, ignore_dead=ignore_nodes),
                         manager.remove_node(servers[3].server_id, dead_a.server_id, ignore_dead=ignore_nodes))
|
|
|
|
@pytest.mark.asyncio
@skip_mode('release', 'error injection is not supported in release mode')
async def test_decommission_left_token_ring_retry(manager: ManagerClient):
    """
    Decommission a node while an injected failure in update_topology_state
    forces the left_token_ring transition handler to be retried, then check
    that no node logged a failed barrier.

    """
    servers = await manager.running_servers()
    await manager.servers_add(1, property_file=servers[0].property_file())
    servers = await manager.running_servers()
    assert len(servers) >= 4

    # Make every node throw once while finishing the left_token_ring transition.
    injections = [manager.api.enable_injection(s.ip_addr, "finish_left_token_ring_transition_throw", one_shot=True)
                  for s in servers]
    await gather_safely(*injections)

    await manager.decommission_node(servers[3].server_id)

    # No surviving node should have attempted (and failed) a barrier against
    # the already-decommissioned node during the retried transition.
    servers = await manager.running_servers()
    total_matches = 0
    for srv in servers:
        log = await manager.server_open_log(srv.server_id)
        found = await log.grep("raft_topology - transition_state::left_token_ring, raft_topology_cmd::command::barrier failed")
        total_matches += len(found)
    assert total_matches == 0