Currently, the test assumes that when 'topology_coordinator_pause_before_processing_backlog: waiting' is logged, the task for decommission must be there. This was based on the assumption that topology coordinator is idle and decommission request wakes it up. But if the server is slow enough, it may still be running the load balancer in reaction to table creation, and block on that injection point before decommission request was added. Fix by waiting for the task to appear rather than the injection. Fixes SCYLLADB-715
531 lines
24 KiB
Python
531 lines
24 KiB
Python
#
|
|
# Copyright (C) 2025-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
import asyncio
|
|
import logging
|
|
import time
|
|
|
|
import pytest
|
|
|
|
from test.cluster.tasks.task_manager_client import TaskManagerClient
|
|
from test.cluster.util import get_coordinator_host, new_test_keyspace, ensure_group0_leader_on
|
|
from test.pylib.internal_types import ServerInfo, IPAddress
|
|
from test.pylib.manager_client import ManagerClient
|
|
from test.pylib.scylla_cluster import gather_safely
|
|
from test.pylib.tablets import get_replica_count_by_host
|
|
from test.pylib.util import wait_for
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def count_requests_queued(manager: ManagerClient, coord_srv: ServerInfo, request_type: str) -> int:
|
|
tasks = await manager.api.get_tasks(coord_srv.ip_addr, 'node_ops')
|
|
logger.info(f'tasks: {tasks}')
|
|
return len(list(filter(lambda t: t['type'] == request_type, tasks)))
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.skip_mode('release', 'error injections are not supported in release mode')
|
|
async def test_tablets_are_drained_in_parallel(manager: ManagerClient):
|
|
"""
|
|
Verifies that when we start decommissioning two nodes at the same time,
|
|
migrations draining tablets from both nodes are interleaved, indicating that
|
|
the decommissions are indeed progressing in parallel and not sequentially.
|
|
"""
|
|
|
|
cmdline = [
|
|
'--logger-log-level', 'load_balancer=debug',
|
|
]
|
|
|
|
servers = await manager.servers_add(4, cmdline=cmdline, property_file=[
|
|
{"dc": "dc1", "rack": "rack1"},
|
|
{"dc": "dc1", "rack": "rack2"},
|
|
{"dc": "dc1", "rack": "rack1"},
|
|
{"dc": "dc1", "rack": "rack2"},
|
|
])
|
|
await ensure_group0_leader_on(manager, servers[0])
|
|
|
|
cql = manager.get_cql()
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy',"
|
|
" 'dc1': ['rack1', 'rack2']} AND tablets = {'initial': 32};") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.tab (pk int PRIMARY KEY);")
|
|
|
|
coord_srv = await get_coordinator_host(manager)
|
|
log = await manager.server_open_log(coord_srv.server_id) # group0 leader
|
|
mark = await log.mark()
|
|
await manager.api.enable_injection(coord_srv.ip_addr, "topology_coordinator_pause_before_processing_backlog", one_shot=True)
|
|
|
|
decomm_task1 = asyncio.create_task(manager.decommission_node(servers[2].server_id)) # rack1
|
|
decomm_task2 = asyncio.create_task(manager.decommission_node(servers[3].server_id)) # rack2
|
|
|
|
mark, _ = await log.wait_for('topology_coordinator_pause_before_processing_backlog: waiting', from_mark=mark)
|
|
|
|
# Pause topology until all requests are queued to workaround for group0 concurrent modification
|
|
# preventing second request from being queued due to fast migrations on behalf of the first requests.
|
|
# This problem is more pronounced in test environment where migrations are instant.
|
|
logger.info("Waiting for requests to be queued")
|
|
async def requests_queued():
|
|
count = await count_requests_queued(manager, coord_srv, 'decommission')
|
|
return count if count == 2 else None
|
|
await wait_for(requests_queued, time.time() + 60)
|
|
await manager.api.message_injection(coord_srv.ip_addr, "topology_coordinator_pause_before_processing_backlog")
|
|
|
|
decomm_hostid_1 = await manager.get_host_id(servers[2].server_id)
|
|
decomm_hostid_2 = await manager.get_host_id(servers[3].server_id)
|
|
|
|
# Verify that migrations are interleaved, which indicates parallel decommission.
|
|
mark, _ = await log.wait_for(f"Initiating tablet cleanup of (.*) on {decomm_hostid_1}", from_mark=mark)
|
|
mark, _ = await log.wait_for(f"Initiating tablet cleanup of (.*) on {decomm_hostid_2}", from_mark=mark)
|
|
mark, _ = await log.wait_for(f"Initiating tablet cleanup of (.*) on {decomm_hostid_1}", from_mark=mark)
|
|
mark, _ = await log.wait_for(f"Initiating tablet cleanup of (.*) on {decomm_hostid_2}", from_mark=mark)
|
|
|
|
await decomm_task1
|
|
await decomm_task2
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize("same_rack", [False, True])
|
|
@pytest.mark.skip_mode('release', 'error injections are not supported in release mode')
|
|
async def test_tablets_are_rebuilt_in_parallel(manager: ManagerClient, same_rack):
|
|
"""
|
|
Verifies that when we start removing two nodes at the same time,
|
|
tablet replica rebuilds for both nodes are interleaved, indicating that
|
|
the removals are indeed progressing in parallel and not sequentially.
|
|
"""
|
|
|
|
cmdline = [
|
|
'--logger-log-level', 'load_balancer=debug',
|
|
]
|
|
|
|
# same_rack == True
|
|
# rack1: 2 servers
|
|
# rack2: 3 servers (will have 2 removed)
|
|
#
|
|
# same_rack == False
|
|
# rack1: 3 servers (will have 1 removed)
|
|
# rack2: 2 servers (will have 1 removed)
|
|
|
|
servers = await manager.servers_add(5, cmdline=cmdline, property_file=[
|
|
{"dc": "dc1", "rack": "rack1"},
|
|
{"dc": "dc1", "rack": "rack1"},
|
|
{"dc": "dc1", "rack": "rack2"},
|
|
{"dc": "dc1", "rack": "rack2" if same_rack else "rack1"}, # will be removed
|
|
{"dc": "dc1", "rack": "rack2"}, # will be removed
|
|
])
|
|
await ensure_group0_leader_on(manager, servers[0])
|
|
coord_srv = servers[0]
|
|
|
|
cql = manager.get_cql()
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy',"
|
|
" 'dc1': ['rack1', 'rack2']} AND tablets = {'initial': 32};") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.tab (pk int PRIMARY KEY);")
|
|
|
|
servers_to_remove = [servers[3], servers[4]]
|
|
host_ids_to_remove = await gather_safely(*(manager.get_host_id(s.server_id) for s in servers_to_remove))
|
|
|
|
logger.info("Stopping servers to be removed")
|
|
|
|
await gather_safely(*(manager.server_stop_gracefully(srv.server_id) for srv in servers_to_remove))
|
|
|
|
for srv in servers_to_remove:
|
|
await manager.server_not_sees_other_server(coord_srv.ip_addr, srv.ip_addr)
|
|
|
|
log = await manager.server_open_log(coord_srv.server_id)
|
|
mark = await log.mark()
|
|
await manager.api.enable_injection(coord_srv.ip_addr, "topology_coordinator_pause_before_processing_backlog", one_shot=True)
|
|
|
|
logger.info("Removing servers")
|
|
await manager.api.exclude_node(coord_srv.ip_addr, host_ids_to_remove)
|
|
tasks = [asyncio.create_task(manager.remove_node(coord_srv.server_id, srv.server_id)) for srv in servers_to_remove]
|
|
|
|
mark, _ = await log.wait_for('topology_coordinator_pause_before_processing_backlog: waiting', from_mark=mark)
|
|
|
|
# Pause topology until all requests are queued to workaround for group0 concurrent modification
|
|
# preventing second request from being queued due to fast migrations on behalf of the first requests.
|
|
# This problem is more pronounced in test environment where migrations are instant.
|
|
logger.info("Waiting for requests to be queued")
|
|
async def requests_queued():
|
|
count = await count_requests_queued(manager, coord_srv, 'remove node')
|
|
return count if count == 2 else None
|
|
await wait_for(requests_queued, time.time() + 60)
|
|
await manager.api.message_injection(coord_srv.ip_addr, "topology_coordinator_pause_before_processing_backlog")
|
|
|
|
# Verify that migrations are interleaved.
|
|
mark, _ = await log.wait_for(f"Tablet cleanup of (.*) on {host_ids_to_remove[0]}", from_mark=mark)
|
|
mark, _ = await log.wait_for(f"Tablet cleanup of (.*) on {host_ids_to_remove[1]}", from_mark=mark)
|
|
mark, _ = await log.wait_for(f"Tablet cleanup of (.*) on {host_ids_to_remove[0]}", from_mark=mark)
|
|
mark, _ = await log.wait_for(f"Tablet cleanup of (.*) on {host_ids_to_remove[1]}", from_mark=mark)
|
|
|
|
await gather_safely(*tasks)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.skip_mode('release', 'error injections are not supported in release mode')
|
|
async def test_decommission_can_be_canceled(manager: ManagerClient):
|
|
"""
|
|
Verifies that decommission can be canceled when it's still in the phase of migrating tablets.
|
|
"""
|
|
|
|
cmdline = [
|
|
'--logger-log-level', 'load_balancer=debug',
|
|
]
|
|
|
|
servers = await manager.servers_add(2, cmdline=cmdline, property_file={"dc": "dc1", "rack": "rack1"})
|
|
await ensure_group0_leader_on(manager, servers[0])
|
|
coord_serv = servers[0]
|
|
|
|
cql = manager.get_cql()
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy',"
|
|
" 'dc1': ['rack1']} AND tablets = {'initial': 32};") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.tab (pk int PRIMARY KEY);")
|
|
|
|
coord_log = await manager.server_open_log(coord_serv.server_id) # group0 leader
|
|
mark = await coord_log.mark()
|
|
await manager.api.enable_injection(coord_serv.ip_addr, "topology_coordinator_pause_before_processing_backlog", one_shot=True)
|
|
|
|
decomm_task = asyncio.create_task(manager.decommission_node(servers[1].server_id))
|
|
decomm_hostid = await manager.get_host_id(servers[1].server_id)
|
|
|
|
tm = TaskManagerClient(manager.api)
|
|
task = await tm.wait_task_appears(servers[0].ip_addr, 'node_ops', task_type='decommission', entity=decomm_hostid)
|
|
logger.info(f'task: {task}')
|
|
task_id = task.task_id
|
|
logger.info(f'decommission task: {task_id}')
|
|
|
|
await coord_log.wait_for('topology_coordinator_pause_before_processing_backlog: waiting', from_mark=mark)
|
|
|
|
# Aborting in a pending or paused state should be immediate even if migrations are ongoing.
|
|
await manager.api.abort_task(servers[0].ip_addr, task_id)
|
|
await manager.api.wait_task(servers[0].ip_addr, task_id)
|
|
|
|
tasks = await manager.api.get_tasks(servers[0].ip_addr, 'node_ops')
|
|
logger.info(f'tasks: {tasks}')
|
|
|
|
await manager.api.message_injection(coord_serv.ip_addr, "topology_coordinator_pause_before_processing_backlog")
|
|
|
|
with pytest.raises(Exception, match="aborted on user request"):
|
|
await decomm_task
|
|
|
|
# Verify that decommissioned node is still serving tablets and not drained.
|
|
await manager.api.quiesce_topology(servers[0].ip_addr)
|
|
load = await get_replica_count_by_host(manager, servers[0], ks, "tab")
|
|
assert load[decomm_hostid] > 0
|
|
|
|
logger.info('Verify start_time is preserved by abort')
|
|
task2 = await manager.api.get_task_status(servers[0].ip_addr, task_id)
|
|
assert task2['start_time'] == task.start_time
|
|
|
|
logger.info('Verify aborting during paused state')
|
|
|
|
mark = await coord_log.mark()
|
|
await manager.api.enable_injection(coord_serv.ip_addr, "wait_after_tablet_cleanup", one_shot=True)
|
|
decomm_task = asyncio.create_task(manager.decommission_node(servers[1].server_id))
|
|
await coord_log.wait_for('Waiting after tablet cleanup', from_mark=mark)
|
|
task = await tm.wait_task_appears(servers[0].ip_addr, 'node_ops', task_type='decommission', entity=decomm_hostid)
|
|
task_id = task.task_id
|
|
await manager.api.abort_task(servers[0].ip_addr, task_id)
|
|
await manager.api.wait_task(servers[0].ip_addr, task_id)
|
|
await manager.api.message_injection(coord_serv.ip_addr, "wait_after_tablet_cleanup")
|
|
|
|
with pytest.raises(Exception, match="aborted on user request"):
|
|
await decomm_task
|
|
|
|
# Verify decommission can be retried and completes successfully.
|
|
await manager.decommission_node(servers[1].server_id)
|
|
load = await get_replica_count_by_host(manager, servers[0], ks, "tab")
|
|
assert load[decomm_hostid] == 0
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.skip_mode('release', 'error injections are not supported in release mode')
|
|
async def test_decommission_is_rejected_when_another_one_is_still_pending(manager: ManagerClient):
|
|
"""
|
|
Verify that when there is pending decommission, the next one is already validated
|
|
taking into account that the node will be removed by the first one.
|
|
"""
|
|
|
|
cmdline = [
|
|
'--logger-log-level', 'load_balancer=debug',
|
|
]
|
|
servers = await manager.servers_add(3, cmdline=cmdline, property_file=[
|
|
{"dc": "dc1", "rack": "rack1"},
|
|
{"dc": "dc1", "rack": "rack2"},
|
|
{"dc": "dc1", "rack": "rack2"},
|
|
])
|
|
|
|
await ensure_group0_leader_on(manager, servers[0])
|
|
coord_serv = servers[0]
|
|
|
|
await manager.disable_tablet_balancing()
|
|
|
|
cql = manager.get_cql()
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy',"
|
|
" 'dc1': ['rack1', 'rack2']} AND tablets = {'initial': 32};") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.tab (pk int PRIMARY KEY);")
|
|
|
|
coord_log = await manager.server_open_log(coord_serv.server_id) # group0 leader
|
|
mark = await coord_log.mark()
|
|
await manager.api.enable_injection(coord_serv.ip_addr, "wait_after_tablet_cleanup", one_shot=True)
|
|
|
|
decomm_task = asyncio.create_task(manager.decommission_node(servers[1].server_id))
|
|
|
|
# Trap the first decommission in the tablet draining stage
|
|
await coord_log.wait_for('Waiting after tablet cleanup', from_mark=mark)
|
|
|
|
# Second decommission should fail, because if both decommissions succeeded, the rack would be empty.
|
|
with pytest.raises(Exception, match="node decommission rejected: "):
|
|
await manager.decommission_node(servers[2].server_id)
|
|
|
|
await manager.api.message_injection(coord_serv.ip_addr, "wait_after_tablet_cleanup")
|
|
# The first one should succeed.
|
|
await decomm_task
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.skip_mode('release', 'error injections are not supported in release mode')
|
|
async def test_remove_is_canceled_if_there_is_node_down(manager: ManagerClient):
|
|
"""
|
|
Verifies that request is canceled if the vnode part would fail due to node being down.
|
|
Preserves the old behavior of serial decommission, where topology coordinator
|
|
decides to cancel all pending requests in get_next_task() if none of them can proceed.
|
|
"""
|
|
|
|
cmdline = [
|
|
'--logger-log-level', 'load_balancer=debug',
|
|
]
|
|
servers = await manager.servers_add(5, cmdline=cmdline, property_file=[
|
|
{"dc": "dc1", "rack": "rack1"},
|
|
{"dc": "dc1", "rack": "rack1"},
|
|
{"dc": "dc1", "rack": "rack2"},
|
|
{"dc": "dc1", "rack": "rack2"},
|
|
{"dc": "dc1", "rack": "rack2"},
|
|
])
|
|
|
|
await ensure_group0_leader_on(manager, servers[0])
|
|
coord_serv = servers[0]
|
|
|
|
await manager.disable_tablet_balancing()
|
|
|
|
cql = manager.get_cql()
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy',"
|
|
" 'dc1': ['rack1', 'rack2']} AND tablets = {'initial': 32};") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.tab (pk int PRIMARY KEY);")
|
|
|
|
await manager.server_stop_gracefully(servers[3].server_id)
|
|
await manager.server_stop_gracefully(servers[2].server_id)
|
|
|
|
with pytest.raises(Exception, match="Canceled. Dead nodes: "):
|
|
await manager.remove_node(coord_serv.server_id, servers[2].server_id)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.skip_mode('release', 'error injections are not supported in release mode')
|
|
async def test_decommission_start_time_is_stable(manager: ManagerClient):
|
|
"""
|
|
Verifies that task's start time is constant across the whole operation.
|
|
"""
|
|
|
|
servers = await manager.servers_add(2, property_file={"dc": "dc1", "rack": "rack1"})
|
|
await ensure_group0_leader_on(manager, servers[0])
|
|
|
|
cql = manager.get_cql()
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy',"
|
|
" 'dc1': ['rack1']} AND tablets = {'initial': 32};") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.tab (pk int PRIMARY KEY);")
|
|
|
|
coord_serv = servers[0]
|
|
coord_log = await manager.server_open_log(coord_serv.server_id) # group0 leader
|
|
mark = await coord_log.mark()
|
|
await manager.api.enable_injection(coord_serv.ip_addr, "topology_coordinator_pause_after_node_transition", one_shot=True)
|
|
|
|
decomm_task = asyncio.create_task(manager.decommission_node(servers[1].server_id))
|
|
decomm_hostid = await manager.get_host_id(servers[1].server_id)
|
|
|
|
await coord_log.wait_for('topology_coordinator_pause_after_node_transition: waiting for message', from_mark=mark)
|
|
|
|
tasks = await manager.api.get_tasks(servers[0].ip_addr, 'node_ops')
|
|
logger.info(f'tasks: {tasks}')
|
|
task = next(t for t in tasks if t['type'] == 'decommission')
|
|
logger.info(f'decommission task: {task}')
|
|
assert task['entity'] == decomm_hostid
|
|
|
|
await manager.api.message_injection(coord_serv.ip_addr, "topology_coordinator_pause_after_node_transition")
|
|
await decomm_task
|
|
|
|
tasks = await manager.api.get_tasks(servers[0].ip_addr, 'node_ops')
|
|
logger.info(f'tasks: {tasks}')
|
|
task2 = next(t for t in tasks if t['type'] == 'decommission')
|
|
assert task2['start_time'] == task['start_time']
|
|
|
|
await manager.api.quiesce_topology(servers[0].ip_addr)
|
|
|
|
tasks = await manager.api.get_tasks(servers[0].ip_addr, 'node_ops')
|
|
logger.info(f'tasks: {tasks}')
|
|
task2 = next(t for t in tasks if t['type'] == 'decommission')
|
|
assert task2['start_time'] == task['start_time']
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.skip_mode('release', 'error injections are not supported in release mode')
|
|
async def test_decommission_can_not_be_canceled_once_running(manager: ManagerClient):
|
|
"""
|
|
Verifies that attempt to abort decommission is rejected if it's already in the vnode transition phase.
|
|
"""
|
|
|
|
servers = await manager.servers_add(2, property_file={"dc": "dc1", "rack": "rack1"})
|
|
await ensure_group0_leader_on(manager, servers[0])
|
|
|
|
cql = manager.get_cql()
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy',"
|
|
" 'dc1': ['rack1']} AND tablets = {'initial': 32};") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.tab (pk int PRIMARY KEY);")
|
|
|
|
coord_serv = servers[0]
|
|
coord_log = await manager.server_open_log(coord_serv.server_id) # group0 leader
|
|
mark = await coord_log.mark()
|
|
|
|
# Could be any injection in the middle of decommission without group0 guard held.
|
|
await manager.api.enable_injection(coord_serv.ip_addr, "in_left_token_ring_transition", one_shot=True)
|
|
|
|
decomm_task = asyncio.create_task(manager.decommission_node(servers[1].server_id))
|
|
decomm_hostid = await manager.get_host_id(servers[1].server_id)
|
|
|
|
await coord_log.wait_for('in_left_token_ring_transition: waiting', from_mark=mark)
|
|
|
|
tasks = await manager.api.get_tasks(servers[0].ip_addr, 'node_ops')
|
|
logger.info(f'tasks: {tasks}')
|
|
task_to_cancel = next(t['task_id'] for t in tasks if t['type'] == 'decommission')
|
|
logger.info(f'decommission task: {task_to_cancel}')
|
|
|
|
with pytest.raises(Exception, match="cannot be aborted"):
|
|
await manager.api.abort_task(servers[0].ip_addr, task_to_cancel)
|
|
|
|
status = await manager.api.get_task_status(servers[0].ip_addr, task_to_cancel)
|
|
logger.info(f'status: {status}')
|
|
assert status['state'] == 'running'
|
|
|
|
await manager.api.message_injection(coord_serv.ip_addr, "in_left_token_ring_transition")
|
|
await decomm_task
|
|
|
|
load = await get_replica_count_by_host(manager, servers[0], ks, "tab")
|
|
assert load[decomm_hostid] == 0
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.skip_mode('release', 'error injections are not supported in release mode')
|
|
async def test_decommission_fails_if_capacity_is_gone_during_draining(manager: ManagerClient):
|
|
"""
|
|
Verifies the scenario of drain not being able to progress because there is no viable
|
|
replica to take over the tablets being drained. Decommission should fail in this case
|
|
and not hang forever.
|
|
"""
|
|
|
|
cmdline = [
|
|
'--logger-log-level', 'load_balancer=debug',
|
|
]
|
|
|
|
servers = await manager.servers_add(3, cmdline=cmdline, property_file=[
|
|
{"dc": "dc1", "rack": "rack1"},
|
|
{"dc": "dc1", "rack": "rack2"},
|
|
{"dc": "dc1", "rack": "rack2"},
|
|
])
|
|
await ensure_group0_leader_on(manager, servers[0])
|
|
|
|
cql = manager.get_cql()
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy',"
|
|
" 'dc1': ['rack2']} AND tablets = {'initial': 32};") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.tab (pk int PRIMARY KEY);")
|
|
|
|
coord_serv = servers[0]
|
|
coord_log = await manager.server_open_log(coord_serv.server_id) # group0 leader
|
|
|
|
mark = await coord_log.mark()
|
|
await manager.api.enable_injection(coord_serv.ip_addr, "wait_after_tablet_cleanup", one_shot=True)
|
|
|
|
decomm_task = asyncio.create_task(manager.decommission_node(servers[2].server_id))
|
|
|
|
await coord_log.wait_for('Waiting after tablet cleanup', from_mark=mark)
|
|
|
|
srv_to_remove = servers[1]
|
|
await manager.server_stop_gracefully(srv_to_remove.server_id)
|
|
await manager.server_not_sees_other_server(coord_serv.ip_addr, srv_to_remove.ip_addr)
|
|
await manager.api.exclude_node(coord_serv.ip_addr, [await manager.get_host_id(srv_to_remove.server_id)])
|
|
|
|
await manager.api.message_injection(coord_serv.ip_addr, "wait_after_tablet_cleanup")
|
|
|
|
with pytest.raises(Exception, match="Decommission failed"):
|
|
await decomm_task
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.skip_mode('release', 'error injections are not supported in release mode')
|
|
async def test_node_lost_during_decommission_drain(manager: ManagerClient):
|
|
"""
|
|
Verifies the scenario when decommissioned node is lost and marked as excluded during draining.
|
|
In such case, drain will complete and do the rebuilds. But decommission should fail
|
|
since we cannot tell the user that we decommissioned the node without data loss.
|
|
The user will have to do removenode on it.
|
|
"""
|
|
|
|
cmdline = [
|
|
'--logger-log-level', 'load_balancer=debug',
|
|
]
|
|
|
|
servers = await manager.servers_add(2, cmdline=cmdline, property_file=[
|
|
{"dc": "dc1", "rack": "rack1"},
|
|
{"dc": "dc1", "rack": "rack1"},
|
|
])
|
|
await ensure_group0_leader_on(manager, servers[0])
|
|
|
|
cql = manager.get_cql()
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy',"
|
|
" 'dc1': ['rack1']} AND tablets = {'initial': 32};") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.tab (pk int PRIMARY KEY);")
|
|
|
|
coord_serv = servers[0]
|
|
coord_log = await manager.server_open_log(coord_serv.server_id) # group0 leader
|
|
|
|
mark = await coord_log.mark()
|
|
|
|
await manager.api.quiesce_topology(coord_serv.ip_addr)
|
|
|
|
await manager.api.enable_injection(coord_serv.ip_addr, "migration_streaming_wait", one_shot=True)
|
|
|
|
srv_to_remove = servers[1]
|
|
nodetool_task = asyncio.create_task(manager.decommission_node(srv_to_remove.server_id))
|
|
|
|
mark, _ = await coord_log.wait_for('migration_streaming_wait: start', from_mark=mark)
|
|
await manager.api.enable_injection(coord_serv.ip_addr, "topology_coordinator_pause_before_processing_backlog", one_shot=True)
|
|
await manager.api.message_injection(coord_serv.ip_addr, "migration_streaming_wait")
|
|
|
|
tm = TaskManagerClient(manager.api)
|
|
task = await tm.wait_task_appears(servers[0].ip_addr, 'node_ops', task_type='decommission')
|
|
decomm_task = task.task_id
|
|
logger.info(f'decommission task: {decomm_task}')
|
|
|
|
await coord_log.wait_for('topology_coordinator_pause_before_processing_backlog: waiting', from_mark=mark)
|
|
|
|
await manager.server_stop(srv_to_remove.server_id)
|
|
await manager.server_not_sees_other_server(coord_serv.ip_addr, srv_to_remove.ip_addr)
|
|
await manager.api.exclude_node(coord_serv.ip_addr, [await manager.get_host_id(srv_to_remove.server_id)])
|
|
|
|
await manager.api.message_injection(coord_serv.ip_addr, "topology_coordinator_pause_before_processing_backlog")
|
|
|
|
# Node is down, connection was dropped
|
|
with pytest.raises(Exception):
|
|
await nodetool_task
|
|
|
|
await manager.api.wait_task(servers[0].ip_addr, decomm_task)
|
|
|
|
# Verify that decommission failed.
|
|
status = await manager.api.get_task_status(servers[0].ip_addr, decomm_task)
|
|
logger.info(f"status: {status}")
|
|
assert status['state'] == 'failed'
|
|
|
|
# No tablet rebuilds must be started.
|
|
assert not await coord_log.grep("Initiating tablet.*rebuild")
|
|
|
|
await manager.remove_node(coord_serv.server_id, srv_to_remove.server_id)
|