Files
scylladb/test/cluster/test_truncate_with_tablets.py
Gleb Natapov a9e99d1d3c topology coordinator: allow running multiple global commands in parallel
Now that we have a global request queue, do not check that there is a
global request before adding another one. Amend the truncation test that
expects it explicitly and add another one that checks that two truncates
can be submitted in parallel.
2025-06-11 11:29:33 +03:00

350 lines
16 KiB
Python

#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from cassandra.query import SimpleStatement, ConsistencyLevel
from cassandra.protocol import InvalidRequest
from cassandra.cluster import TruncateError
from cassandra.policies import FallthroughRetryPolicy
from test.pylib.manager_client import ManagerClient
from test.cluster.conftest import skip_mode
from test.cluster.util import get_topology_coordinator, new_test_keyspace
from test.pylib.tablets import get_all_tablet_replicas
from test.pylib.util import wait_for_cql_and_get_hosts
import time
import pytest
import logging
import asyncio
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_truncate_while_migration(manager: ManagerClient):
    """A TRUNCATE issued while a tablet is being migrated must still empty the table."""
    logger.info('Bootstrapping cluster')
    config = {'tablets_mode_for_new_keyspaces': 'enabled',
              'error_injections_at_startup': ['migration_streaming_wait']}
    nodes = [await manager.server_add(config=config)]
    cql = manager.get_cql()
    # Keyspace with two initial tablets so the balancer has something to move.
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
        await cql.run_async(f'CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);')
        await asyncio.gather(*[cql.run_async(f'INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});') for k in range(1024)])
        # A second node makes the tablet load balancer migrate one tablet to it.
        nodes.append(await manager.server_add(config=config))
        joining = nodes[1]
        joining_log = await manager.server_open_log(joining.server_id)
        # Block until streaming for the migration has begun on the new node,
        # then release the injection so the migration can proceed.
        await joining_log.wait_for('migration_streaming_wait: start')
        await manager.api.message_injection(joining.ip_addr, 'migration_streaming_wait')
        # Truncate while the tablet is still in flight.
        await cql.run_async(f'TRUNCATE TABLE {ks}.test')
        # Let the migration finish before verifying.
        await joining_log.wait_for('raft_topology - Streaming for tablet migration of.*successful')
        # The table must be empty on all replicas.
        res = await cql.run_async(SimpleStatement(f'SELECT COUNT(*) FROM {ks}.test', consistency_level=ConsistencyLevel.ALL))
        assert res[0].count == 0
async def get_raft_leader_and_log(manager: ManagerClient, servers):
    """Find the current topology (raft) coordinator among ``servers``.

    Args:
        manager: client used to query the cluster.
        servers: the servers to search the coordinator among.

    Returns:
        A ``(server, log)`` tuple: the coordinator's ServerInfo and its
        opened server log.

    Raises:
        AssertionError: if the coordinator is not one of ``servers``.
            (Previously this surfaced as an unhelpful NameError because
            ``raft_leader`` was left unbound.)
    """
    raft_leader_host_id = await get_topology_coordinator(manager)
    raft_leader = None
    for s in servers:
        if raft_leader_host_id == await manager.get_host_id(s.server_id):
            raft_leader = s
            break
    # Fail loudly with context instead of crashing on an unbound variable below.
    assert raft_leader is not None, \
        f'Topology coordinator {raft_leader_host_id} not found among given servers'
    raft_leader_log = await manager.server_open_log(raft_leader.server_id)
    return (raft_leader, raft_leader_log)
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_truncate_with_concurrent_drop(manager: ManagerClient):
    """A DROP TABLE racing with a pending TRUNCATE must fail the truncate with InvalidRequest."""
    logger.info('Bootstrapping cluster')
    config = {'tablets_mode_for_new_keyspaces': 'enabled',
              'error_injections_at_startup': ['truncate_table_wait']}
    nodes = []
    for _ in range(3):
        nodes.append(await manager.server_add(config=config))
    cql = manager.get_cql()
    hosts = await wait_for_cql_and_get_hosts(cql, nodes, time.time() + 60)
    # Keyspace with two initial tablets; populate the table with some rows.
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
        await cql.run_async(f'CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);')
        await asyncio.gather(*[cql.run_async(f'INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});') for k in range(1024)])
        (raft_leader, raft_leader_log) = await get_raft_leader_and_log(manager, nodes)
        # Route TRUNCATE and DROP through the two non-coordinator nodes.
        # hosts[] is aligned with nodes[] — presumably guaranteed by
        # wait_for_cql_and_get_hosts; the original code relied on this too.
        non_leader_hosts = [h for srv, h in zip(nodes, hosts) if srv != raft_leader]
        assert len(non_leader_hosts) == 2, 'Unable to determine raft leader'
        trunc_host, drop_host = non_leader_hosts
        # Start a TRUNCATE in the background.
        trunc_future = cql.run_async(f'TRUNCATE TABLE {ks}.test', host=trunc_host)
        # Wait for the topology coordinator to reach the point where it is
        # about to start sending the truncate RPCs.
        await raft_leader_log.wait_for('truncate_table_wait: waiting for message')
        # Drop the table out from under the pending truncate.
        await cql.run_async(f'DROP TABLE {ks}.test', host=drop_host)
        # Release the truncate in the topology coordinator.
        await manager.api.message_injection(raft_leader.ip_addr, 'truncate_table_wait')
        # The truncate must now fail because the table no longer exists.
        with pytest.raises(InvalidRequest, match='unconfigured table test'):
            await trunc_future
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_truncate_while_node_restart(manager: ManagerClient):
    """TRUNCATE must complete even if a replica-holding node restarts mid-operation.

    A node that owns a tablet replica (and is not the topology coordinator) is
    stopped, a TRUNCATE is started while it is down, the node is restarted,
    and the truncate is expected to finish and leave the table empty.
    """
    logger.info('Bootstrapping cluster')
    cfg = { 'tablets_mode_for_new_keyspaces': 'enabled' }
    servers = []
    servers.append(await manager.server_add(config=cfg))
    servers.append(await manager.server_add(config=cfg))
    servers.append(await manager.server_add(config=cfg))
    cql = manager.get_cql()
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
    # Create a keyspace with tablets and initial_tablets == 2, then insert data
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
        await cql.run_async(f'CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);')
        keys = range(1024)
        await asyncio.gather(*[cql.run_async(f'INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});') for k in keys])
        # Only the leader is needed here; the log handle is unused.
        (raft_leader, _) = await get_raft_leader_and_log(manager, servers)
        # Decide which node to restart; select a node with a replica but not the raft leader
        tablet_replicas = await get_all_tablet_replicas(manager, raft_leader, ks, 'test')
        replica_hosts = [tr.replicas[0][0] for tr in tablet_replicas]
        restart_node = None
        for s in servers:
            if s != raft_leader:
                host_id = await manager.get_host_id(s.server_id)
                if host_id in replica_hosts:
                    restart_node = s
                    break
        # Fail with a clear message instead of an unbound-variable NameError
        # if no suitable node was found.
        assert restart_node is not None, 'no non-coordinator node holds a tablet replica'
        # Shutdown the node containing a replica
        await manager.server_stop_gracefully(restart_node.server_id)
        # Start truncating in the background
        trunc_future = cql.run_async(f'TRUNCATE TABLE {ks}.test', host=hosts[0])
        # Restart the node
        await manager.server_start(restart_node.server_id)
        # Wait for truncate to complete
        await trunc_future
        # Check if truncate was successful
        row = await cql.run_async(SimpleStatement(f'SELECT COUNT(*) FROM {ks}.test', consistency_level=ConsistencyLevel.ALL))
        assert row[0].count == 0
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_truncate_with_coordinator_crash(manager: ManagerClient):
    """TRUNCATE must survive the topology coordinator crashing after clearing the session ID."""
    logger.info('Bootstrapping cluster')
    config = {'tablets_mode_for_new_keyspaces': 'enabled'}
    nodes = []
    for _ in range(2):
        nodes.append(await manager.server_add(config=config))
    cql = manager.get_cql()
    hosts = await wait_for_cql_and_get_hosts(cql, nodes, time.time() + 60)
    # Keyspace with two initial tablets; populate the table with some rows.
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
        await cql.run_async(f'CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);')
        await asyncio.gather(*[cql.run_async(f'INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});') for k in range(1024)])
        (raft_leader, raft_leader_log) = await get_raft_leader_and_log(manager, nodes)
        # Send the TRUNCATE through the node that is not the coordinator.
        trunc_host = hosts[1] if raft_leader == nodes[0] else hosts[0]
        # Crash the coordinator right after truncate has cleared the session ID.
        await manager.api.enable_injection(raft_leader.ip_addr, 'truncate_crash_after_session_clear', one_shot=False)
        # Kick off the TRUNCATE in the background.
        trunc_future = cql.run_async(f'TRUNCATE TABLE {ks}.test', host=trunc_host)
        # Wait until the injection fires, then stop and restart the crashed node.
        await raft_leader_log.wait_for('truncate_crash_after_session_clear hit, killing the node')
        await manager.server_stop(raft_leader.server_id)
        await manager.server_start(raft_leader.server_id)
        # The truncate must still complete ...
        await trunc_future
        # ... and leave the table empty on all replicas.
        res = await cql.run_async(SimpleStatement(f'SELECT COUNT(*) FROM {ks}.test', consistency_level=ConsistencyLevel.ALL))
        assert res[0].count == 0
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_truncate_while_truncate_already_waiting(manager: ManagerClient):
    """A second TRUNCATE on a table whose first TRUNCATE timed out must still succeed."""
    logger.info('Bootstrapping cluster')
    config = {'tablets_mode_for_new_keyspaces': 'enabled',
              'error_injections_at_startup': ['migration_streaming_wait']}
    nodes = [await manager.server_add(config=config)]
    cql = manager.get_cql()
    # Keyspace with two initial tablets; populate the table with some rows.
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
        await cql.run_async(f'CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);')
        await asyncio.gather(*[cql.run_async(f'INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});') for k in range(1024)])
        # Adding a node makes the load balancer migrate one tablet to it.
        nodes.append(await manager.server_add(config=config))
        hosts = await wait_for_cql_and_get_hosts(cql, nodes, time.time() + 60)
        joining_log = await manager.server_open_log(nodes[1].server_id)
        # Block until tablet streaming has started on the new node.
        await joining_log.wait_for('migration_streaming_wait: start')
        # This truncate quickly times out, but its fiber remains alive on the
        # coordinator. FallthroughRetryPolicy prevents automatic driver retries
        # (hence no second attempt hides the timeout).
        with pytest.raises(TruncateError, match=f'Timeout during TRUNCATE TABLE of {ks}.test'):
            await cql.run_async(SimpleStatement(f'TRUNCATE TABLE {ks}.test USING TIMEOUT 100ms', retry_policy=FallthroughRetryPolicy()))
        # A second truncate on the same table joins the one still waiting.
        truncate_future = cql.run_async(f'TRUNCATE TABLE {ks}.test', host=hosts[1])
        # Unblock streaming so the migration (and the truncates) can finish.
        await manager.api.message_injection(nodes[1].ip_addr, 'migration_streaming_wait')
        await truncate_future
        # The table must be empty on all replicas.
        res = await cql.run_async(SimpleStatement(f'SELECT COUNT(*) FROM {ks}.test', consistency_level=ConsistencyLevel.ALL))
        assert res[0].count == 0
# Reproduces https://github.com/scylladb/scylladb/issues/23771.
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_replay_position_check_during_truncate(manager: ManagerClient):
    """New writes flushed while a TRUNCATE is paused must not break the truncate.

    Pauses TRUNCATE inside the database truncate path via error injection,
    inserts and flushes new data for the same table, then releases the
    truncate and verifies it completes without error.
    """
    logger.info("Bootstrapping cluster")
    cfg = { 'auto_snapshot': True }
    cmdline = ['--smp=1']
    servers = await manager.servers_add(1, cmdline=cmdline, config=cfg)
    server = servers[0]
    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        keys = range(10)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
        # Pause the truncate inside the database layer so we can race
        # writes and a flush against it.
        await manager.api.enable_injection(server.ip_addr, "database_truncate_wait", True)
        s1_log = await manager.server_open_log(server.server_id)
        s1_mark = await s1_log.mark()
        truncate_task = cql.run_async(f"TRUNCATE {ks}.test")
        await s1_log.wait_for("database_truncate_wait: waiting", from_mark=s1_mark)
        # Insert and flush new data while the truncate is parked.
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
        await manager.api.flush_keyspace(server.ip_addr, ks)
        # Release the truncate and make sure it completes without errors.
        await manager.api.message_injection(server.ip_addr, "database_truncate_wait")
        await s1_log.wait_for("database_truncate_wait: message received", from_mark=s1_mark)
        await truncate_task
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_parallel_truncate(manager: ManagerClient):
    """Two TRUNCATEs on different tables may be submitted and completed in parallel."""
    logger.info('Bootstrapping cluster')
    config = {'tablets_mode_for_new_keyspaces': 'enabled',
              'error_injections_at_startup': ['migration_streaming_wait']}
    nodes = [await manager.server_add(config=config)]
    cql = manager.get_cql()
    # Keyspace with two initial tablets; populate both tables with some rows.
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
        await cql.run_async(f'CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);')
        await cql.run_async(f'CREATE TABLE {ks}.test1 (pk int PRIMARY KEY, c int);')
        for table in ('test', 'test1'):
            await asyncio.gather(*[cql.run_async(f'INSERT INTO {ks}.{table} (pk, c) VALUES ({k}, {k});') for k in range(1024)])
        # Adding a node makes the load balancer migrate one tablet to it.
        nodes.append(await manager.server_add(config=config))
        # Wait for CQL connectivity to both nodes (the host list is not needed).
        await wait_for_cql_and_get_hosts(cql, nodes, time.time() + 60)
        joining_log = await manager.server_open_log(nodes[1].server_id)
        # Block until tablet streaming has started on the new node.
        await joining_log.wait_for('migration_streaming_wait: start')
        # Submit both truncates while the migration keeps the topology busy.
        tf1 = cql.run_async(SimpleStatement(f'TRUNCATE TABLE {ks}.test', retry_policy=FallthroughRetryPolicy()))
        tf2 = cql.run_async(SimpleStatement(f'TRUNCATE TABLE {ks}.test1', retry_policy=FallthroughRetryPolicy()))
        # Unblock streaming and wait for both parallel truncates to complete.
        await manager.api.message_injection(nodes[1].ip_addr, 'migration_streaming_wait')
        await tf1
        await tf2
        # Both tables must be empty on all replicas.
        for table in ('test', 'test1'):
            res = await cql.run_async(SimpleStatement(f'SELECT COUNT(*) FROM {ks}.{table}', consistency_level=ConsistencyLevel.ALL))
            assert res[0].count == 0