Into a single database_apply one. Add three parameters: * ks_name and cf_name — filter the tables to be affected; * what — what to do: throw or wait. This leads to a smaller code footprint and improved filtering by table name, at the cost of some extra error-injection parameters in the tests.
167 lines
9.4 KiB
Python
167 lines
9.4 KiB
Python
#
|
|
# Copyright (C) 2023-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
from test.pylib.manager_client import ManagerClient
|
|
from test.cluster.util import new_test_keyspace, get_topology_version
|
|
from cassandra import WriteFailure
|
|
import pytest
|
|
import logging
|
|
import asyncio
|
|
|
|
# Logger shared by every test in this module, named after the module itself.
logger = logging.getLogger(name=__name__)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_no_cleanup_when_unnecessary(manager: ManagerClient):
|
|
"""The test runs two bootstraps and checks that there is no cleanup in between.
|
|
Then it runs a decommission and checks that cleanup runs automatically and then
|
|
it runs one more decommission and checks that no cleanup runs again.
|
|
Second part checks manual cleanup triggering. It adds a node. Triggers cleanup
|
|
through the REST API, checks that is runs, decommissions a node and check that the
|
|
cleanup did not run again.
|
|
"""
|
|
logger.info("start first server")
|
|
await manager.server_add(property_file={"dc": "dc1", "rack": "rack1"})
|
|
|
|
logger.info("start another two servers")
|
|
servers = await manager.running_servers()
|
|
logs = [await manager.server_open_log(srv.server_id) for srv in servers]
|
|
marks = [await log.mark() for log in logs]
|
|
await manager.server_add(property_file={"dc": "dc1", "rack": "rack2"})
|
|
await manager.server_add(property_file={"dc": "dc1", "rack": "rack3"})
|
|
matches = [await log.grep("raft_topology - start vnodes_cleanup", from_mark=mark) for log, mark in zip(logs, marks)]
|
|
assert sum(len(x) for x in matches) == 0
|
|
|
|
servers = await manager.running_servers()
|
|
host_id_2 = await manager.get_host_id(servers[2].server_id)
|
|
logger.info(f"decommission {servers[2].server_id}")
|
|
logs = [await manager.server_open_log(srv.server_id) for srv in servers]
|
|
marks = [await log.mark() for log in logs]
|
|
await manager.decommission_node(servers[2].server_id)
|
|
matches = [await log.grep("raft_topology - start vnodes_cleanup", from_mark=mark)
|
|
for log, mark in zip(logs, marks)]
|
|
assert sum(len(x) for x in matches) == 2
|
|
coordinator_log_matches = await logs[0].grep(
|
|
f"vnodes cleanup required by 'leave' of the node {host_id_2}: running global_token_metadata_barrier",
|
|
from_mark=marks[0])
|
|
assert len(coordinator_log_matches) == 1
|
|
|
|
servers = await manager.running_servers()
|
|
logger.info(f"decommission {servers[1].server_id}")
|
|
logs = [await manager.server_open_log(srv.server_id) for srv in servers]
|
|
marks = [await log.mark() for log in logs]
|
|
await manager.decommission_node(servers[1].server_id)
|
|
matches = [await log.grep("raft_topology - start vnodes_cleanup", from_mark=mark) for log, mark in zip(logs, marks)]
|
|
assert sum(len(x) for x in matches) == 0
|
|
|
|
logger.info("add another server")
|
|
await manager.server_add(property_file={"dc": "dc1", "rack": "rack4"})
|
|
servers = await manager.running_servers()
|
|
logs = [await manager.server_open_log(srv.server_id) for srv in servers]
|
|
marks = [await log.mark() for log in logs]
|
|
await manager.api.cleanup_all(servers[0].ip_addr)
|
|
matches = [await log.grep("raft_topology - start vnodes_cleanup", from_mark=mark) for log, mark in zip(logs, marks)]
|
|
assert sum(len(x) for x in matches) == 1
|
|
|
|
logger.info(f"decommission {servers[1].server_id}")
|
|
marks = [await log.mark() for log in logs]
|
|
await manager.decommission_node(servers[1].server_id)
|
|
matches = [await log.grep("raft_topology - start vnodes_cleanup", from_mark=mark) for log, mark in zip(logs, marks)]
|
|
assert sum(len(x) for x in matches) == 0
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.skip_mode(mode='debug', reason='dev is enough')
|
|
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
|
async def test_cleanup_waits_for_stale_writes(manager: ManagerClient):
|
|
"""Scenario:
|
|
* Start two nodes, a vnodes-based table with an rf=2
|
|
* Run insert while bootstrapping another node, suspend this insert in database_apply injection
|
|
* Bootstrap succeeds, capture the final topology version
|
|
* Start decommission -> triggers global barrier, which we fail on another injection
|
|
* This failure is not fatal, the cleanup procedure continues and blocks on waiting for the stale write
|
|
* We release the database_apply injection, cleanup succeeds, write fails with 'stale topology exception'
|
|
"""
|
|
|
|
config = {'tablets_mode_for_new_keyspaces': 'disabled'}
|
|
|
|
logger.info("start first server")
|
|
servers = [await manager.server_add(property_file={"dc": "dc1", "rack": "rack1"}, config=config)]
|
|
servers += [await manager.server_add(property_file={"dc": "dc1", "rack": "rack2"}, config=config)]
|
|
|
|
(cql, hosts) = await manager.get_ready_cql(servers)
|
|
log0 = await manager.server_open_log(servers[0].server_id)
|
|
log1 = await manager.server_open_log(servers[1].server_id)
|
|
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}") as ks:
|
|
logger.info("Create table my_test_table")
|
|
await cql.run_async(f"CREATE TABLE {ks}.my_test_table (pk int PRIMARY KEY, c int);")
|
|
|
|
# Have a bootstrapping node hang in write_both_read_new
|
|
logger.info("Add third node")
|
|
servers += [await manager.server_add(
|
|
property_file={"dc": "dc1", "rack": "rack3"},
|
|
config=config,
|
|
start=False
|
|
)]
|
|
logger.info("Enable 'topology_coordinator/write_both_read_new/after_barrier' injection")
|
|
await manager.api.enable_injection(servers[0].ip_addr,
|
|
'topology_coordinator/write_both_read_new/after_barrier',
|
|
True)
|
|
logger.info("Start bootstrapping the third node")
|
|
bootstrap_task = asyncio.create_task(manager.server_start(servers[2].server_id))
|
|
logger.info("Waiting for topology_coordinator/write_both_read_new/after_barrier")
|
|
await log0.wait_for("topology_coordinator/write_both_read_new/after_barrier: waiting for message")
|
|
|
|
# Have a write request with write_both_read_new version stuck on both nodes:
|
|
# - On the first node, this exercises the coordinator fencing code path.
|
|
# - On the second node, this exercises the replica code path.
|
|
logger.info("Enable 'database_apply' injection")
|
|
for s in servers[:-1]:
|
|
await manager.api.enable_injection(s.ip_addr, 'database_apply',
|
|
False, parameters={'ks_name': ks, 'cf_name': 'my_test_table', 'what': 'wait'})
|
|
logger.info("Start write")
|
|
write_task = cql.run_async(f"INSERT INTO {ks}.my_test_table (pk, c) VALUES (1, 1)", host=hosts[0])
|
|
logger.info("Waiting for database_apply")
|
|
await log0.wait_for("database_apply: wait")
|
|
await log1.wait_for("database_apply: wait")
|
|
|
|
# Finish bootstrapping the node
|
|
logger.info("Trigger topology_coordinator/write_both_read_new/after_barrier")
|
|
await manager.api.message_injection(servers[0].ip_addr, "topology_coordinator/write_both_read_new/after_barrier")
|
|
await bootstrap_task
|
|
version_after_node2_bootstrap = await get_topology_version(cql, hosts[0])
|
|
host1_id = await manager.get_host_id(servers[1].server_id)
|
|
|
|
# Have a cleanup started by decommission and failed on global barrier wait for the stale write
|
|
logger.info("Enable 'raft_topology_barrier_and_drain_fail_before' and 'raft_topology_barrier_fail' injections")
|
|
await manager.api.enable_injection(servers[0].ip_addr, 'raft_topology_barrier_and_drain_fail_before', True)
|
|
await manager.api.enable_injection(servers[0].ip_addr, 'raft_topology_barrier_fail', True)
|
|
await manager.api.enable_injection(servers[1].ip_addr, 'raft_topology_barrier_and_drain_fail_before', True)
|
|
logger.info("Start decommission the new node")
|
|
decommission_task = asyncio.create_task(
|
|
manager.decommission_node(servers[1].server_id))
|
|
logger.info("Waiting for global_token_metadata_barrier to fail")
|
|
await log0.wait_for(f"vnodes cleanup required by 'leave' of the node {host1_id}: global_token_metadata_barrier threw an error", timeout=15)
|
|
await log0.wait_for(f"update_fence_version: new fence_version {version_after_node2_bootstrap} is set, prev fence_version {version_after_node2_bootstrap - 1}, pending stale writes 1", timeout=15)
|
|
await log0.wait_for("vnodes_cleanup: wait for stale pending writes", timeout=15)
|
|
flush_matches = await log0.grep("vnodes_cleanup: flush_all_tables")
|
|
assert len(flush_matches) == 0
|
|
await log1.wait_for(f"update_fence_version: new fence_version {version_after_node2_bootstrap} is set, prev fence_version {version_after_node2_bootstrap - 1}, pending stale writes 1", timeout=15)
|
|
await log1.wait_for("vnodes_cleanup: wait for stale pending writes", timeout=15)
|
|
flush_matches = await log0.grep("vnodes_cleanup: flush_all_tables")
|
|
assert len(flush_matches) == 0
|
|
|
|
# Release the write -- the cleanup process should resume and the decommission succeed
|
|
await manager.api.message_injection(servers[0].ip_addr, "database_apply")
|
|
await log0.wait_for("vnodes_cleanup: flush_all_tables", timeout=15)
|
|
await manager.api.message_injection(servers[1].ip_addr, "database_apply")
|
|
await log1.wait_for("vnodes_cleanup: flush_all_tables", timeout=15)
|
|
|
|
await decommission_task
|
|
|
|
with pytest.raises(WriteFailure, match="stale topology exception"):
|
|
await write_task
|