#
# Copyright (C) 2023-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from test.pylib.manager_client import ManagerClient
from test.cluster.conftest import skip_mode
from test.cluster.util import new_test_keyspace
from cassandra import WriteFailure
import pytest
import logging
import asyncio

logger = logging.getLogger(__name__)


@pytest.mark.asyncio
async def test_no_cleanup_when_unnecessary(manager: ManagerClient):
    """The test runs two bootstraps and checks that there is no cleanup in between.
    Then it runs a decommission, checks that cleanup runs automatically, runs one
    more decommission, and checks that no cleanup runs again.
    The second part checks manual cleanup triggering: it adds a node, triggers
    cleanup through the REST API, checks that it runs, decommissions a node, and
    checks that the cleanup does not run again.
    """
logger.info("start first server")
|
|
await manager.server_add(property_file={"dc": "dc1", "rack": "rack1"})
|
|
|
|
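    # Each step below saves a log mark per node first and then greps only from
    # that mark, so the assertions count cleanup runs caused by that step alone.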
logger.info("start another two servers")
|
|
servers = await manager.running_servers()
|
|
logs = [await manager.server_open_log(srv.server_id) for srv in servers]
|
|
marks = [await log.mark() for log in logs]
|
|
await manager.server_add(property_file={"dc": "dc1", "rack": "rack2"})
|
|
await manager.server_add(property_file={"dc": "dc1", "rack": "rack3"})
|
|
matches = [await log.grep("raft_topology - start vnodes_cleanup", from_mark=mark) for log, mark in zip(logs, marks)]
|
|
assert sum(len(x) for x in matches) == 0
|
|
|
|
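    # After a decommission the remaining nodes hold data they no longer own, so
    # cleanup is expected to run automatically on the two surviving nodes (the
    # 2 matches), and the coordinator should log exactly one 'leave' cleanup
    # trigger for the departing host.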
    servers = await manager.running_servers()
    host_id_2 = await manager.get_host_id(servers[2].server_id)
    logger.info(f"decommission {servers[2].server_id}")
    logs = [await manager.server_open_log(srv.server_id) for srv in servers]
    marks = [await log.mark() for log in logs]
    await manager.decommission_node(servers[2].server_id)
    matches = [await log.grep("raft_topology - start vnodes_cleanup", from_mark=mark)
               for log, mark in zip(logs, marks)]
    assert sum(len(x) for x in matches) == 2
    coordinator_log_matches = await logs[0].grep(
        f"vnodes cleanup required by 'leave' of the node {host_id_2}: running global_token_metadata_barrier",
        from_mark=marks[0])
    assert len(coordinator_log_matches) == 1

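    # The cleanup above already removed the ranges the nodes lost, so the next
    # decommission has nothing stale to clean and must not trigger another run.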
    servers = await manager.running_servers()
    logger.info(f"decommission {servers[1].server_id}")
    logs = [await manager.server_open_log(srv.server_id) for srv in servers]
    marks = [await log.mark() for log in logs]
    await manager.decommission_node(servers[1].server_id)
    matches = [await log.grep("raft_topology - start vnodes_cleanup", from_mark=mark) for log, mark in zip(logs, marks)]
    assert sum(len(x) for x in matches) == 0

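    # Second part: trigger cleanup manually through the REST API. Only the node
    # the request is sent to is expected to log a cleanup run.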
logger.info("add another server")
|
|
await manager.server_add(property_file={"dc": "dc1", "rack": "rack4"})
|
|
servers = await manager.running_servers()
|
|
logs = [await manager.server_open_log(srv.server_id) for srv in servers]
|
|
marks = [await log.mark() for log in logs]
|
|
await manager.api.cleanup_all(servers[0].ip_addr)
|
|
matches = [await log.grep("raft_topology - start vnodes_cleanup", from_mark=mark) for log, mark in zip(logs, marks)]
|
|
assert sum(len(x) for x in matches) == 1
|
|
|
|
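    # The manual cleanup already brought the node up to date, so the following
    # decommission should not start an automatic cleanup again.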
logger.info(f"decommission {servers[1].server_id}")
|
|
marks = [await log.mark() for log in logs]
|
|
await manager.decommission_node(servers[1].server_id)
|
|
matches = [await log.grep("raft_topology - start vnodes_cleanup", from_mark=mark) for log, mark in zip(logs, marks)]
|
|
assert sum(len(x) for x in matches) == 0
|
|
|
|
|
|
@pytest.mark.asyncio
@skip_mode('debug', 'dev is enough')
@skip_mode('release', 'error injections are not supported in release mode')
async def test_cleanup_waits_for_stale_writes(manager: ManagerClient):
    """Scenario:
    * Start two nodes and create a vnodes-based table with RF=2
    * Run an insert while bootstrapping another node; suspend this insert in the database_apply_wait injection
    * Bootstrap succeeds; capture the final topology version
    * Start a decommission -> this triggers a global barrier, which we fail with another injection
    * The failure is not fatal; the cleanup procedure continues and blocks waiting for the stale write
    * We release the database_apply_wait injection; cleanup succeeds and the write fails with 'stale topology exception'
    """

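    # The scenario is vnodes-specific, so tablets are disabled for the
    # keyspaces this test creates.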
    config = {'tablets_mode_for_new_keyspaces': 'disabled'}

logger.info("start first server")
|
|
servers = [await manager.server_add(property_file={"dc": "dc1", "rack": "rack1"}, config=config)]
|
|
servers += [await manager.server_add(property_file={"dc": "dc1", "rack": "rack2"}, config=config)]
|
|
|
|
(cql, hosts) = await manager.get_ready_cql(servers)
|
|
log0 = await manager.server_open_log(servers[0].server_id)
|
|
log1 = await manager.server_open_log(servers[1].server_id)
|
|
|
|
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}") as ks:
        logger.info("Create table my_test_table")
        await cql.run_async(f"CREATE TABLE {ks}.my_test_table (pk int PRIMARY KEY, c int);")

        # Have a bootstrapping node hang in write_both_read_new
        logger.info("Add third node")
        servers += [await manager.server_add(
            property_file={"dc": "dc1", "rack": "rack3"},
            config=config,
            start=False
        )]
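        # The node is added with start=False so that the injection below can be
        # armed on the coordinator before the bootstrap actually begins.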
logger.info("Enable 'topology_coordinator/write_both_read_new/after_barrier' injection")
|
|
await manager.api.enable_injection(servers[0].ip_addr,
|
|
'topology_coordinator/write_both_read_new/after_barrier',
|
|
True)
|
|
logger.info("Start bootstrapping the third node")
|
|
bootstrap_task = asyncio.create_task(manager.server_start(servers[2].server_id))
|
|
logger.info("Waiting for topology_coordinator/write_both_read_new/after_barrier")
|
|
await log0.wait_for("topology_coordinator/write_both_read_new/after_barrier: waiting for message")
|
|
|
|
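        # At this point the bootstrap is parked in write_both_read_new and will
        # stay there until message_injection() releases it below.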
        # Have a write request with write_both_read_new version stuck on both nodes:
        # - On the first node, this exercises the coordinator fencing code path.
        # - On the second node, this exercises the replica code path.
        logger.info("Enable 'database_apply_wait' injection")
        for s in servers[:-1]:
            await manager.api.enable_injection(s.ip_addr, 'database_apply_wait',
                                               False, parameters={'cf_name': 'my_test_table'})
        logger.info("Start write")
        write_task = cql.run_async(f"INSERT INTO {ks}.my_test_table (pk, c) VALUES (1, 1)", host=hosts[0])
        logger.info("Waiting for database_apply_wait")
        await log0.wait_for("database_apply_wait: wait")
        await log1.wait_for("database_apply_wait: wait")

        # Finish bootstrapping the node
        logger.info("Trigger topology_coordinator/write_both_read_new/after_barrier")
        await manager.api.message_injection(servers[0].ip_addr, "topology_coordinator/write_both_read_new/after_barrier")
        await bootstrap_task
        rows = await cql.run_async(
            "select version from system.topology where key = 'topology'",
            host=hosts[0])
        version_after_node2_bootstrap = rows[0].version
        host1_id = await manager.get_host_id(servers[1].server_id)
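        # version_after_node2_bootstrap is the topology version established by
        # the bootstrap; the update_fence_version log lines checked below are
        # expected to reference exactly this version once the decommission
        # bumps the fence.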
        # Have a cleanup, started by the decommission and failed on the global
        # barrier, wait for the stale write
        logger.info("Enable 'raft_topology_barrier_and_drain_fail_before' and 'raft_topology_barrier_fail' injections")
        await manager.api.enable_injection(servers[0].ip_addr, 'raft_topology_barrier_and_drain_fail_before', True)
        await manager.api.enable_injection(servers[0].ip_addr, 'raft_topology_barrier_fail', True)
        await manager.api.enable_injection(servers[1].ip_addr, 'raft_topology_barrier_and_drain_fail_before', True)
        logger.info("Start decommissioning the second node")
        decommission_task = asyncio.create_task(
            manager.decommission_node(servers[1].server_id))
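        # The injected barrier failure is not fatal: the cleanup path is
        # expected to keep going and block on the stale pending write instead
        # of flushing right away.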
logger.info("Waiting for global_token_metadata_barrier to fail")
|
|
await log0.wait_for(f"vnodes cleanup required by 'leave' of the node {host1_id}: global_token_metadata_barrier threw an error", timeout=15)
|
|
await log0.wait_for(f"update_fence_version: new fence_version {version_after_node2_bootstrap} is set, prev fence_version {version_after_node2_bootstrap - 1}, pending stale writes 1", timeout=15)
|
|
await log0.wait_for("vnodes_cleanup: wait for stale pending writes", timeout=15)
|
|
flush_matches = await log0.grep("vnodes_cleanup: flush_all_tables")
|
|
assert len(flush_matches) == 0
|
|
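        # The replica side must show the same fencing and the same wait before
        # any flush happens.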
        await log1.wait_for(f"update_fence_version: new fence_version {version_after_node2_bootstrap} is set, prev fence_version {version_after_node2_bootstrap - 1}, pending stale writes 1", timeout=15)
        await log1.wait_for("vnodes_cleanup: wait for stale pending writes", timeout=15)
        flush_matches = await log1.grep("vnodes_cleanup: flush_all_tables")
        assert len(flush_matches) == 0

        # Release the write -- the cleanup process should resume and the decommission should succeed
        await manager.api.message_injection(servers[0].ip_addr, "database_apply_wait")
        await log0.wait_for("vnodes_cleanup: flush_all_tables", timeout=15)
        await manager.api.message_injection(servers[1].ip_addr, "database_apply_wait")
        await log1.wait_for("vnodes_cleanup: flush_all_tables", timeout=15)

        await decommission_task

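        # The stuck write was admitted under the pre-decommission topology
        # version; now that the fence version has moved past it, it is expected
        # to fail with the 'stale topology exception' rather than succeed.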
        with pytest.raises(WriteFailure, match="stale topology exception"):
            await write_task