scylladb/test/cluster/test_tablets2.py
Raphael S. Carvalho 3143134968 test: avoid split/major compaction deadlock in tablet split test
Run keyspace compaction asynchronously in
`test_tombstone_gc_correctness_during_tablet_split` and only await it
after `split_sstable_rewrite` is disabled.

The problem is that `keyspace_compaction()` starts with a flush, and that
flush can take around five seconds. During that window the split
compaction is stopped before major compaction is retried. The stop aborts
the in-flight major compaction attempt, then the split proceeds far enough
to enter the `split_sstable_rewrite` injection point.

At that point the test used to wait synchronously for major compaction to
finish, but major compaction cannot finish yet: when it retries, it needs
the same semaphore that is still effectively tied up behind the blocked
split rewrite. So the test waits for major compaction, while the split
waits for the injection to be released, and the code that would release
that injection never runs.

Starting major compaction as a task breaks that cycle. The test can first
disable `split_sstable_rewrite`, let the split get out of the way, and
only then wait for major compaction to complete.
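Schematically, the fixed sequence is (condensed from the test body; the real
test also waits for the `split_sstable_rewrite: released` log line before
awaiting the task):

    # Start major compaction in the background instead of awaiting it here.
    compaction_task = asyncio.create_task(
        manager.api.keyspace_compaction(servers[0].ip_addr, ks))
    # Let the blocked split rewrite proceed...
    await manager.api.disable_injection(servers[0].ip_addr, "split_sstable_rewrite")
    # ...and only then wait for major compaction to complete.
    await compaction_task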

Fixes https://scylladb.atlassian.net/browse/SCYLLADB-827.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>

Closes scylladb/scylladb#29066
2026-03-19 11:12:21 +02:00


#
# Copyright (C) 2023-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from typing import Any
from cassandra.query import ConsistencyLevel, SimpleStatement
from cassandra.policies import FallthroughRetryPolicy
from test.pylib.internal_types import HostID, ServerInfo, ServerNum
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error_one_shot, HTTPError, read_barrier
from test.pylib.util import wait_for_cql_and_get_hosts, unique_name, wait_for
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas, get_tablet_count, TabletReplicas
from test.cluster.util import reconnect_driver, create_new_test_keyspace, new_test_keyspace, get_topology_version
from test.cqlpy.cassandra_tests.validation.entities.secondary_index_test import dotestCreateAndDropIndex
import pytest
import asyncio
import logging
import time
import random
import os
import glob
from collections import defaultdict
from collections.abc import Iterable
from contextlib import asynccontextmanager
import itertools
import re
logger = logging.getLogger(__name__)
async def inject_error_one_shot_on(manager, error_name, servers):
errs = [inject_error_one_shot(manager.api, s.ip_addr, error_name) for s in servers]
await asyncio.gather(*errs)
async def inject_error_on(manager, error_name, servers):
errs = [manager.api.enable_injection(s.ip_addr, error_name, False) for s in servers]
await asyncio.gather(*errs)
async def disable_injection_on(manager, error_name, servers):
errs = [manager.api.disable_injection(s.ip_addr, error_name) for s in servers]
await asyncio.gather(*errs)
async def safe_server_stop_gracefully(manager, server_id, timeout: float = 60, reconnect: bool = False):
# Explicitly close the driver to avoid reconnections if scylla fails to update gossiper state on shutdown.
# It's a problem until https://github.com/scylladb/scylladb/issues/15356 is fixed.
manager.driver_close()
await manager.server_stop_gracefully(server_id, timeout)
cql = None
if reconnect:
cql = await reconnect_driver(manager)
return cql
async def safe_rolling_restart(manager, servers, with_down):
# https://github.com/scylladb/python-driver/issues/230 is not fixed yet, so for the sake of CI stability,
# driver must be reconnected after rolling restart of servers.
await manager.rolling_restart(servers, with_down)
cql = await reconnect_driver(manager)
return cql
async def wait_for_valid_load_stats(cql, table_id, timeout=120):
started = time.time()
# Wait until the given table has no missing tablet sizes
while True:
missing_cnt = 0
found_cnt = 0
for r in await cql.run_async(f"SELECT * FROM system.tablet_sizes WHERE table_id = {table_id};"):
found_cnt += 1
if len(r.missing_replicas) > 0:
missing_cnt += 1
if missing_cnt == 0 and found_cnt > 0:
break
assert time.time() - started < timeout, "Timed out while waiting for valid load_stats"
await asyncio.sleep(0.2)
@pytest.mark.asyncio
async def test_tablet_metadata_propagates_with_schema_changes_in_snapshot_mode(manager: ManagerClient):
"""Test that you can create a table and insert and query data"""
logger.info("Bootstrapping cluster")
cmdline = [
'--logger-log-level', 'storage_proxy=trace',
'--logger-log-level', 'cql_server=trace',
'--logger-log-level', 'query_processor=trace',
'--logger-log-level', 'gossip=trace',
'--logger-log-level', 'storage_service=trace',
'--logger-log-level', 'raft_topology=trace',
'--logger-log-level', 'messaging_service=trace',
'--logger-log-level', 'rpc=trace',
]
servers = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc="dc1")
s0 = servers[0].server_id
not_s0 = servers[1:]
# s0 should miss schema and tablet changes
cql = await safe_server_stop_gracefully(manager, s0, reconnect=True)
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 100}") as ks:
# force s0 to catch up later from the snapshot and not the raft log
await inject_error_one_shot_on(manager, 'raft_server_force_snapshot', not_s0)
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
keys = range(10)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, 1);") for k in keys])
rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
assert len(list(rows)) == len(keys)
for r in rows:
assert r.c == 1
manager.driver_close()
await manager.server_start(s0, wait_others=2)
await manager.driver_connect(server=servers[0])
cql = manager.get_cql()
await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 60)
# Trigger a schema change to invoke schema agreement waiting to make sure that s0 has the latest schema
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as test_dummy:
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, 2);", execution_profile='whitelist')
for k in keys])
rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
assert len(rows) == len(keys)
for r in rows:
assert r.c == 2
conn_logger = logging.getLogger("conn_messages")
conn_logger.setLevel(logging.DEBUG)
try:
# Check that after rolling restart the tablet metadata is still there
await manager.rolling_restart(servers)
cql = await reconnect_driver(manager)
await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 60)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, 3);", execution_profile='whitelist')
for k in keys])
rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
assert len(rows) == len(keys)
for r in rows:
assert r.c == 3
finally:
conn_logger.setLevel(logging.INFO)
@pytest.mark.asyncio
async def test_scans(manager: ManagerClient):
logger.info("Bootstrapping cluster")
servers = await manager.servers_add(3)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 8}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
keys = range(100)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
rows = await cql.run_async(f"SELECT count(*) FROM {ks}.test;")
assert rows[0].count == len(keys)
rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
assert len(rows) == len(keys)
for r in rows:
assert r.c == r.pk
@pytest.mark.asyncio
async def test_table_drop_with_auto_snapshot(manager: ManagerClient):
logger.info("Bootstrapping cluster")
cfg = { 'auto_snapshot': True }
servers = await manager.servers_add(3, config = cfg)
cql = manager.get_cql()
# Increases the chance of tablet migration concurrent with schema change
await inject_error_on(manager, "tablet_allocator_shuffle", servers)
for i in range(3):
await cql.run_async("DROP KEYSPACE IF EXISTS test;")
await cql.run_async("CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 8 };")
await cql.run_async("CREATE TABLE IF NOT EXISTS test.tbl_sample_kv (id int, value text, PRIMARY KEY (id));")
await cql.run_async("INSERT INTO test.tbl_sample_kv (id, value) VALUES (1, 'ala');")
await cql.run_async("DROP KEYSPACE test;")
@pytest.mark.asyncio
async def test_topology_changes(manager: ManagerClient):
logger.info("Bootstrapping cluster")
servers = await manager.servers_add(3)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 32}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
logger.info("Populating table")
keys = range(256)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
expected_rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
async def check():
logger.info("Checking table")
rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
assert rows == expected_rows
assert len(rows) == len(keys)
for r in rows:
assert r.c == r.pk
await inject_error_on(manager, "tablet_allocator_shuffle", servers)
logger.info("Adding new server")
await manager.server_add()
await check()
logger.info("Adding new server")
await manager.server_add()
await check()
await asyncio.sleep(5) # Give the load balancer some time to do work
await check()
await manager.decommission_node(servers[0].server_id)
await check()
async def get_two_servers_to_move_tablet(manager: ManagerClient):
"""
The first server in the returned list is the source node to move the tablet from; the second server is the destination node.
"""
logger.info("Bootstrapping cluster")
cmdline = ['--enable-file-stream', 'false']
servers = [await manager.server_add(cmdline=cmdline)]
await manager.disable_tablet_balancing()
cql = manager.get_cql()
ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};")
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
servers.append(await manager.server_add(cmdline=cmdline))
key = 7 # Whatever
tablet_token = 0 # Doesn't matter since there is one tablet
await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, 0)")
rows = await cql.run_async(f"SELECT pk from {ks}.test")
assert len(list(rows)) == 1
replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
logger.info(f'{replica=}')
s0_host_id = await manager.get_host_id(servers[0].server_id)
s1_host_id = await manager.get_host_id(servers[1].server_id)
# Make sure servers[0] is the source node to move the tablet from
if replica[0] != s0_host_id:
servers.reverse()
s0_host_id, s1_host_id = s1_host_id, s0_host_id
dst_shard = 0
return (servers, cql, s0_host_id, s1_host_id, replica, tablet_token, dst_shard, ks)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_streaming_rx_error_no_failed_message_with_fail_stream_plan(manager: ManagerClient):
servers, cql, s0_host_id, s1_host_id, replica, tablet_token, dst_shard, ks = await get_two_servers_to_move_tablet(manager)
await manager.api.enable_injection(servers[0].ip_addr, "stream_session_ignore_failed_message", one_shot=True)
await manager.api.enable_injection(servers[1].ip_addr, "stream_session_ignore_failed_message", one_shot=True)
await manager.api.enable_injection(servers[1].ip_addr, "stream_mutation_fragments_rx_error", one_shot=True)
s1_log = await manager.server_open_log(servers[1].server_id)
s1_mark = await s1_log.mark()
migration_task = asyncio.create_task(
manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token, timeout=30))
await s1_log.wait_for('stream_manager: Failed stream_session for stream_plan', from_mark=s1_mark)
s1_mark = await s1_log.mark()
await manager.api.disable_injection(servers[1].ip_addr, "stream_mutation_fragments_rx_error")
logger.info("Waiting for migration to finish")
await migration_task
logger.info("Migration done")
# Sanity test
rows = await cql.run_async(f"SELECT pk from {ks}.test")
assert len(list(rows)) == 1
await cql.run_async(f"TRUNCATE {ks}.test")
rows = await cql.run_async(f"SELECT pk from {ks}.test")
assert len(list(rows)) == 0
# Verify that there is no data resurrection
rows = await cql.run_async(f"SELECT pk from {ks}.test")
assert len(list(rows)) == 0
# Verify that moving the tablet back works
await manager.api.move_tablet(servers[0].ip_addr, ks, "test", s1_host_id, dst_shard, replica[0], replica[1], tablet_token)
rows = await cql.run_async(f"SELECT pk from {ks}.test")
assert len(list(rows)) == 0
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_streaming_rx_error_no_failed_message_no_fail_stream_plan_hang(manager: ManagerClient):
servers, cql, s0_host_id, s1_host_id, replica, tablet_token, dst_shard, ks = await get_two_servers_to_move_tablet(manager)
await manager.api.enable_injection(servers[0].ip_addr, "stream_session_ignore_failed_message", one_shot=True)
await manager.api.enable_injection(servers[1].ip_addr, "stream_session_ignore_failed_message", one_shot=True)
await manager.api.enable_injection(servers[1].ip_addr, "stream_mutation_fragments_rx_error", one_shot=True)
await manager.api.enable_injection(servers[0].ip_addr, "stream_mutation_fragments_skip_fail_stream_plan", one_shot=True)
await manager.api.enable_injection(servers[1].ip_addr, "stream_mutation_fragments_skip_fail_stream_plan", one_shot=True)
s1_log = await manager.server_open_log(servers[1].server_id)
s1_mark = await s1_log.mark()
migration_task = asyncio.create_task(
manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token, timeout=10))
try:
logger.info("Waiting for migration to finish")
await migration_task
assert False # The tablet move is not supposed to finish
except TimeoutError:
logger.info("Migration timeout as expected")
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_streaming_is_guarded_by_topology_guard(manager: ManagerClient):
logger.info("Bootstrapping cluster")
cmdline = [
'--logger-log-level', 'storage_service=trace',
'--logger-log-level', 'raft_topology=trace',
'--enable-file-stream', 'false',
]
servers = [await manager.server_add(cmdline=cmdline)]
await manager.disable_tablet_balancing()
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
servers.append(await manager.server_add(cmdline=cmdline))
key = 7 # Whatever
tablet_token = 0 # Doesn't matter since there is one tablet
await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, 0)")
rows = await cql.run_async(f"SELECT pk from {ks}.test")
assert len(list(rows)) == 1
replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
s0_host_id = await manager.get_host_id(servers[0].server_id)
s1_host_id = await manager.get_host_id(servers[1].server_id)
dst_shard = 0
await manager.api.enable_injection(servers[1].ip_addr, "stream_mutation_fragments", one_shot=True)
s1_log = await manager.server_open_log(servers[1].server_id)
s1_mark = await s1_log.mark()
migration_task = asyncio.create_task(
manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token))
# Wait for the replica-side writer of streaming to reach a place where it has already
# received writes from the leaving replica but hasn't applied them yet.
# Once the writer reaches this place, it will wait for the message_injection() call below before proceeding.
# The place where we block the writer must not hold the erm or topology_guard, because that would block
# the migration below and prevent the test from proceeding.
await s1_log.wait_for('stream_mutation_fragments: waiting', from_mark=s1_mark)
s1_mark = await s1_log.mark()
# Should cause streaming to fail and be retried while leaving behind the replica-side writer.
await manager.api.inject_disconnect(servers[0].ip_addr, servers[1].ip_addr)
logger.info("Waiting for migration to finish")
await migration_task
logger.info("Migration done")
# Sanity test
rows = await cql.run_async(f"SELECT pk from {ks}.test")
assert len(list(rows)) == 1
await cql.run_async(f"TRUNCATE {ks}.test")
rows = await cql.run_async(f"SELECT pk from {ks}.test")
assert len(list(rows)) == 0
# Release abandoned streaming
await manager.api.message_injection(servers[1].ip_addr, "stream_mutation_fragments")
await s1_log.wait_for('stream_mutation_fragments: done', from_mark=s1_mark)
# Verify that there is no data resurrection
rows = await cql.run_async(f"SELECT pk from {ks}.test")
assert len(list(rows)) == 0
# Verify that moving the tablet back works
await manager.api.move_tablet(servers[0].ip_addr, ks, "test", s1_host_id, dst_shard, replica[0], replica[1], tablet_token)
rows = await cql.run_async(f"SELECT pk from {ks}.test")
assert len(list(rows)) == 0
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_table_dropped_during_streaming(manager: ManagerClient):
"""
Verifies that load balancing recovers when table is dropped during streaming phase of tablet migration.
Recovering means that state machine is not stuck and later migrations can proceed.
"""
logger.info("Bootstrapping cluster")
servers = [await manager.server_add()]
await manager.disable_tablet_balancing()
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
await cql.run_async(f"CREATE TABLE {ks}.test2 (pk int PRIMARY KEY, c int);")
servers.append(await manager.server_add())
logger.info("Populating tables")
key = 7 # Whatever
value = 3 # Whatever
tablet_token = 0 # Doesn't matter since there is one tablet
await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, {value})")
await cql.run_async(f"INSERT INTO {ks}.test2 (pk, c) VALUES ({key}, {value})")
rows = await cql.run_async(f"SELECT pk from {ks}.test")
assert len(list(rows)) == 1
rows = await cql.run_async(f"SELECT pk from {ks}.test2")
assert len(list(rows)) == 1
replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
await manager.api.enable_injection(servers[1].ip_addr, "stream_mutation_fragments", one_shot=True)
s1_log = await manager.server_open_log(servers[1].server_id)
s1_mark = await s1_log.mark()
logger.info("Starting tablet migration")
s1_host_id = await manager.get_host_id(servers[1].server_id)
migration_task = asyncio.create_task(
manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, 0, tablet_token))
# Wait for the replica-side writer of streaming to reach a place where it has already
# received writes from the leaving replica but hasn't applied them yet.
# Once the writer reaches this place, it will wait for the message_injection() call below before proceeding.
# We want to drop the table while streaming is deep in the process, so that it will attempt to apply writes
# to the dropped table.
await s1_log.wait_for('stream_mutation_fragments: waiting', from_mark=s1_mark)
# Streaming blocks the table drop, so we can't await it here.
drop_task = cql.run_async(f"DROP TABLE {ks}.test")
# Release streaming as late as possible to increase the probability of the drop causing problems.
await s1_log.wait_for('Dropping', from_mark=s1_mark)
# Unblock streaming
await manager.api.message_injection(servers[1].ip_addr, "stream_mutation_fragments")
await drop_task
logger.info("Waiting for migration to finish")
try:
await migration_task
except HTTPError as e:
assert 'Tablet map not found' in e.message
logger.info("Verifying that moving the other tablet works")
replica = await get_tablet_replica(manager, servers[0], ks, 'test2', tablet_token)
s0_host_id = await manager.get_host_id(servers[0].server_id)
assert replica[0] == s0_host_id
await manager.api.move_tablet(servers[0].ip_addr, ks, "test2", replica[0], replica[1], s1_host_id, 0, tablet_token)
logger.info("Verifying tablet replica")
replica = await get_tablet_replica(manager, servers[0], ks, 'test2', tablet_token)
assert replica == (s1_host_id, 0)
@pytest.mark.asyncio
async def test_tablet_cleanup(manager: ManagerClient):
cmdline = ['--smp=2', '--commitlog-sync=batch']
logger.info("Start first node")
servers = [await manager.server_add(cmdline=cmdline)]
await manager.disable_tablet_balancing()
logger.info("Populate table")
cql = manager.get_cql()
n_tablets = 32
n_partitions = 1000
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
await manager.servers_see_each_other(servers)
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': {n_tablets}}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY);")
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk) VALUES ({k});") for k in range(n_partitions)])
logger.info("Start second node")
servers.append(await manager.server_add())
s0_host_id = await manager.get_host_id(servers[0].server_id)
s1_host_id = await manager.get_host_id(servers[1].server_id)
logger.info("Read system.tablets")
tablet_replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
assert len(tablet_replicas) == n_tablets
# Randomly select half of all tablets.
sample = random.sample(tablet_replicas, n_tablets // 2)
moved_tokens = [x.last_token for x in sample]
moved_src = [x.replicas[0] for x in sample]
moved_dst = [(s1_host_id, random.choice([0, 1])) for _ in sample]
# Migrate the selected tablets to second node.
logger.info("Migrate half of all tablets to second node")
for t, s, d in zip(moved_tokens, moved_src, moved_dst):
await manager.api.move_tablet(servers[0].ip_addr, ks, "test", *s, *d, t)
# Sanity check. All data we inserted should be still there.
assert n_partitions == (await cql.run_async(f"SELECT COUNT(*) FROM {ks}.test"))[0].count
# Wipe data on second node.
logger.info("Wipe data on second node")
await manager.server_stop_gracefully(servers[1].server_id, timeout=120)
await manager.server_wipe_sstables(servers[1].server_id, ks, "test")
await manager.server_start(servers[1].server_id)
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
await manager.servers_see_each_other(servers)
partitions_after_loss = (await cql.run_async(f"SELECT COUNT(*) FROM {ks}.test"))[0].count
assert partitions_after_loss < n_partitions
# Migrate all tablets back to their original position.
# Check that this doesn't resurrect cleaned data.
logger.info("Migrate the migrated tablets back")
for t, s, d in zip(moved_tokens, moved_dst, moved_src):
await manager.api.move_tablet(servers[0].ip_addr, ks, "test", *s, *d, t)
assert partitions_after_loss == (await cql.run_async(f"SELECT COUNT(*) FROM {ks}.test"))[0].count
# Kill and restart first node.
# Check that this doesn't resurrect cleaned data.
logger.info("Brutally restart first node")
await manager.server_stop(servers[0].server_id)
await manager.server_start(servers[0].server_id)
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
await manager.servers_see_each_other(servers)
assert partitions_after_loss == (await cql.run_async(f"SELECT COUNT(*) FROM {ks}.test"))[0].count
# Bonus: check that commitlog_cleanups doesn't have any garbage after restart.
assert 0 == (await cql.run_async("SELECT COUNT(*) FROM system.commitlog_cleanups", host=hosts[0]))[0].count
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_cleanup_failure(manager: ManagerClient):
cmdline = ['--smp=1']
servers = [await manager.server_add(cmdline=cmdline)]
# Disable load balancing, so that after the move_tablet API the load balancer does
# not attempt to migrate the tablet back to the first node.
await manager.disable_tablet_balancing()
cql = manager.get_cql()
n_tablets = 1
n_partitions = 1000
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
await manager.servers_see_each_other(servers)
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': {n_tablets}}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY);")
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk) VALUES ({k});") for k in range(n_partitions)])
await inject_error_one_shot_on(manager, "tablet_cleanup_failure", servers)
s0_log = await manager.server_open_log(servers[0].server_id)
s0_mark = await s0_log.mark()
servers.append(await manager.server_add())
tablet_token = 0
replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
s1_host_id = await manager.get_host_id(servers[1].server_id)
dst_shard = 0
migration_task = asyncio.create_task(
manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token))
logger.info("Waiting for injected cleanup failure...")
await s0_log.wait_for('Cleanup failed for tablet', from_mark=s0_mark)
logger.info("Waiting for cleanup success on retry...")
await s0_log.wait_for(f'Cleaned up tablet .* of table {ks}.test successfully.', from_mark=s0_mark)
logger.info("Waiting for cleanup success on retry...")
await s0_log.wait_for('updating topology state: Finished tablet migration', from_mark=s0_mark)
logger.info("Waiting for migration task...")
await migration_task
assert n_partitions == (await cql.run_async(f"SELECT COUNT(*) FROM {ks}.test"))[0].count
node_workdir = await manager.server_get_workdir(servers[0].server_id)
table_dir = glob.glob(os.path.join(node_workdir, "data", ks, "test-*"))[0]
logger.info(f"Table dir: {table_dir}")
ssts = glob.glob(os.path.join(table_dir, "*-Data.db"))
logger.info("Guarantee source node of migration left no sstables undeleted")
assert len(ssts) == 0
@pytest.mark.asyncio
async def test_tablet_resharding(manager: ManagerClient):
cmdline = ['--smp=3']
config = {'tablets_mode_for_new_keyspaces': 'enabled'}
servers = await manager.servers_add(1, cmdline=cmdline, config=config)
server = servers[0]
logger.info("Populate table")
cql = manager.get_cql()
n_tablets = 32
n_partitions = 1000
ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': {n_tablets}}}")
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY);")
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk) VALUES ({k});") for k in range(n_partitions)])
await manager.server_stop_gracefully(server.server_id, timeout=120)
await manager.server_update_cmdline(server.server_id, ['--smp=2'])
await manager.server_start(
server.server_id,
expected_error="Detected a tablet with invalid replica shard, reducing shard count with tablet-enabled tables is not yet supported. Replace the node instead.")
@pytest.mark.parametrize("injection_error", ["foreach_compaction_group_wait", "major_compaction_wait"])
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_split(manager: ManagerClient, injection_error: str):
logger.info("Bootstrapping cluster")
cmdline = [
'--logger-log-level', 'storage_service=debug',
'--logger-log-level', 'table=debug',
'--target-tablet-size-in-bytes', '1024',
]
servers = [await manager.server_add(config={
'tablet_load_stats_refresh_interval_in_seconds': 1
}, cmdline=cmdline)]
await manager.disable_tablet_balancing()
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
# Enough data to trigger multiple splits with a target tablet size of 1024 bytes.
keys = range(256)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
async def check():
logger.info("Checking table")
cql = manager.get_cql()
rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
assert len(rows) == len(keys)
for r in rows:
assert r.c == r.pk
await check()
await manager.api.flush_keyspace(servers[0].ip_addr, ks)
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
assert tablet_count == 1
logger.info("Adding new server")
servers.append(await manager.server_add(cmdline=cmdline))
# Increases the chance of tablet migration concurrent with split
await inject_error_one_shot_on(manager, "tablet_allocator_shuffle", servers)
await inject_error_on(manager, "tablet_load_stats_refresh_before_rebalancing", servers)
s1_log = await manager.server_open_log(servers[0].server_id)
s1_mark = await s1_log.mark()
await manager.api.enable_injection(servers[0].ip_addr, injection_error, one_shot=True)
compaction_task = asyncio.create_task(manager.api.keyspace_compaction(servers[0].ip_addr, ks))
await s1_log.wait_for(f"{injection_error}: waiting", from_mark=s1_mark)
# Now there is both a split and a migration pending, so they will potentially run concurrently.
await manager.enable_tablet_balancing()
await check()
await asyncio.sleep(5) # Give the load balancer some time to do work
await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)
await check()
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
assert tablet_count > 1
await manager.api.message_injection(servers[0].ip_addr, injection_error)
await s1_log.wait_for(f"{injection_error}: released", from_mark=s1_mark)
await compaction_task
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_correctness_of_tablet_split_finalization_after_restart(manager: ManagerClient):
logger.info("Bootstrapping cluster")
cmdline = [
'--logger-log-level', 'storage_service=debug',
'--logger-log-level', 'table=debug',
'--target-tablet-size-in-bytes', '1024',
]
servers = [await manager.server_add(config={
'tablet_load_stats_refresh_interval_in_seconds': 1
}, cmdline=cmdline)]
await manager.disable_tablet_balancing()
servers.append(await manager.server_add(config={
'error_injections_at_startup': ['delay_split_compaction']
}, cmdline=cmdline))
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH compaction = {{'class': 'NullCompactionStrategy'}};")
# Enough data to trigger multiple splits with a target tablet size of 1024 bytes.
keys = range(256)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
async def check():
logger.info("Checking table")
cql = manager.get_cql()
rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
assert len(rows) == len(keys)
for r in rows:
assert r.c == r.pk
await check()
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, ks)
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
assert tablet_count == 2
await manager.api.enable_injection(servers[0].ip_addr, "tablet_load_stats_refresh_before_rebalancing", one_shot=False)
s1_log = await manager.server_open_log(servers[0].server_id)
s1_mark = await s1_log.mark()
await manager.api.enable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone", one_shot=False)
await manager.enable_tablet_balancing()
await s1_log.wait_for('Finalizing resize decision for table', from_mark=s1_mark)
# Delay the refresh of tablet stats, so the balancer keeps working with whatever stats it fetched last.
await manager.api.disable_injection(servers[0].ip_addr, "tablet_load_stats_refresh_before_rebalancing")
await manager.server_update_config(servers[0].server_id, 'tablet_load_stats_refresh_interval_in_seconds', 60)
await asyncio.sleep(1)
await manager.disable_tablet_balancing()
await manager.server_stop_gracefully(servers[1].server_id, timeout=120)
await manager.server_start(servers[1].server_id)
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
await manager.servers_see_each_other(servers)
await manager.api.disable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone")
await manager.enable_tablet_balancing()
await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
assert tablet_count > 2
await check()
@pytest.mark.parametrize("injection_error", ["foreach_compaction_group_wait", "major_compaction_wait"])
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_concurrent_tablet_migration_and_major(manager: ManagerClient, injection_error):
logger.info("Bootstrapping cluster")
cmdline = []
servers = [await manager.server_add(cmdline=cmdline)]
await manager.disable_tablet_balancing()
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
keys = range(256)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
async def check():
logger.info("Checking table")
cql = manager.get_cql()
rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
assert len(rows) == len(keys)
for r in rows:
assert r.c == r.pk
await check()
await manager.api.flush_keyspace(servers[0].ip_addr, ks)
logger.info("Adding new server")
servers.append(await manager.server_add(cmdline=cmdline))
s1_host_id = await manager.get_host_id(servers[1].server_id)
s1_log = await manager.server_open_log(servers[0].server_id)
s1_mark = await s1_log.mark()
await manager.api.enable_injection(servers[0].ip_addr, injection_error, one_shot=True)
logger.info("Started major compaction")
compaction_task = asyncio.create_task(manager.api.keyspace_compaction(servers[0].ip_addr, ks))
await s1_log.wait_for(f"{injection_error}: waiting", from_mark=s1_mark)
tablet_replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
t = tablet_replicas[0]
logger.info("Migrating tablet")
await manager.api.move_tablet(servers[0].ip_addr, ks, "test", *t.replicas[0], *(s1_host_id, 0), t.last_token)
await manager.api.message_injection(servers[0].ip_addr, injection_error)
await s1_log.wait_for(f"{injection_error}: released", from_mark=s1_mark)
await compaction_task
if injection_error == "major_compaction_wait":
logger.info("Check that major was successfully aborted on migration")
await s1_log.wait_for(f"Compaction for {ks}/test was stopped due to: tablet cleanup", from_mark=s1_mark)
await check()
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_concurrent_table_drop_and_major(manager: ManagerClient):
logger.info("Bootstrapping cluster")
injection_error = "major_compaction_wait"
cmdline = ['--logger-log-level', 'compaction_manager=debug',]
servers = [await manager.server_add(cmdline=cmdline)]
await manager.disable_tablet_balancing()
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
keys = range(256)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
await manager.api.flush_keyspace(servers[0].ip_addr, ks)
s1_log = await manager.server_open_log(servers[0].server_id)
s1_mark = await s1_log.mark()
await manager.api.enable_injection(servers[0].ip_addr, injection_error, one_shot=True)
logger.info("Started major compaction")
compaction_task = asyncio.create_task(manager.api.keyspace_compaction(servers[0].ip_addr, ks))
await s1_log.wait_for(f"{injection_error}: waiting", from_mark=s1_mark)
logger.info("Dropping table")
await cql.run_async(f"DROP TABLE {ks}.test")
await manager.api.message_injection(servers[0].ip_addr, injection_error)
await s1_log.wait_for(f"{injection_error}: released", from_mark=s1_mark)
await compaction_task
logger.info("Check that major was successfully aborted on table drop")
await s1_log.wait_for(f"ongoing compactions for table {ks}.test .* due to table removal", from_mark=s1_mark)
async def assert_tablet_count_metric_value_for_shards(manager: ManagerClient, server: ServerInfo, expected_count_per_shard: list[int]):
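# Fetch the node's metrics and check that the scylla_tablets_count gauge of every shard matches the expected per-shard count.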
tablet_count_metric_name = "scylla_tablets_count"
metrics = await manager.metrics.query(server.ip_addr)
for shard_id, expected_tablet_count in enumerate(expected_count_per_shard):
tablet_count = metrics.get(tablet_count_metric_name, {'shard': str(shard_id)})
assert int(tablet_count) == expected_tablet_count
async def get_tablet_tokens_from_host_on_shard(manager: ManagerClient, server: ServerInfo, keyspace_name: str, table_name: str, shard: int) -> list[int]:
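# Collect the last tokens of all tablets of the given table that have a replica on this server's given shard.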
host = await manager.get_host_id(server.server_id)
table_tablets = await get_all_tablet_replicas(manager, server, keyspace_name, table_name)
tokens = []
for tablet_replica in table_tablets:
for host_id, shard_id in tablet_replica.replicas:
if host_id == host and shard_id == shard:
tokens.append(tablet_replica.last_token)
return tokens
async def get_tablet_count_per_shard_for_host(manager: ManagerClient, server: ServerInfo, full_tables: dict[str, list[str]], shards_count: int = 2) -> list[int]:
dict_result = await get_tablet_count_per_shard_for_hosts(manager, [server], full_tables, shards_count)
return dict_result[server.server_id]
async def get_tablet_count_per_shard_for_hosts(manager: ManagerClient, servers: Iterable[ServerInfo], full_tables: dict[str, list[str]], shards_per_node: int = 2) -> dict[ServerNum, list[int]]:
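# Build a per-server list of tablet replica counts per shard by walking the replicas of every given table; tablet metadata is read through the first server in the list.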
result = dict[ServerNum, list[int]]()
hosts = dict[HostID, ServerNum]()
server1 = None
for server in servers:
if not server1:
server1 = server
result[server.server_id] = [0] * shards_per_node
hosts[await manager.get_host_id(server.server_id)] = server.server_id
for keyspace, tables in full_tables.items():
for table in tables:
table_tablets = await get_all_tablet_replicas(manager, server1, keyspace, table)
for tablet_replica in table_tablets:
for host_id, shard_id in tablet_replica.replicas:
if host_id in hosts:
result[hosts[host_id]][shard_id] += 1
return result
def get_shard_that_has_tablets(tablet_count_per_shard: list[int]) -> int:
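# Return the first shard that holds at least one tablet replica, or -1 if no shard does.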
for shard_id, count in enumerate(tablet_count_per_shard):
if count > 0:
return shard_id
return -1
@pytest.mark.asyncio
async def test_tablet_count_metric_per_shard(manager: ManagerClient):
# Given two running servers
shards_count = 4
cmdline = ['--smp=4']
servers = await manager.servers_add(2, cmdline=cmdline)
# And given disabled load balancing
await manager.disable_tablet_balancing()
# When two tables are created
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.mytable1 (col1 timestamp, col2 text, col3 blob, PRIMARY KEY (col1));")
await cql.run_async(f"CREATE TABLE {ks}.mytable2 (col1 timestamp, col2 text, col3 blob, PRIMARY KEY (col1));")
# Then tablet count metric for each shard depicts the actual state
tables = { ks: ["mytable1", "mytable2"] }
expected_count_per_shard_for_host_0 = await get_tablet_count_per_shard_for_host(manager, servers[0], tables, shards_count)
await assert_tablet_count_metric_value_for_shards(manager, servers[0], expected_count_per_shard_for_host_0)
expected_count_per_shard_for_host_1 = await get_tablet_count_per_shard_for_host(manager, servers[1], tables, shards_count)
await assert_tablet_count_metric_value_for_shards(manager, servers[1], expected_count_per_shard_for_host_1)
# When third table is created
await cql.run_async(f"CREATE TABLE {ks}.mytable3 (col1 timestamp, col2 text, col3 blob, PRIMARY KEY (col1));")
# Then tablet count metric for each shard depicts the actual state
tables = { ks: ["mytable1", "mytable2", "mytable3"] }
expected_count_per_shard_for_host_0 = await get_tablet_count_per_shard_for_host(manager, servers[0], tables, shards_count)
await assert_tablet_count_metric_value_for_shards(manager, servers[0], expected_count_per_shard_for_host_0)
expected_count_per_shard_for_host_1 = await get_tablet_count_per_shard_for_host(manager, servers[1], tables, shards_count)
await assert_tablet_count_metric_value_for_shards(manager, servers[1], expected_count_per_shard_for_host_1)
# When one of tables is dropped
await cql.run_async(f"DROP TABLE {ks}.mytable2;")
# Then tablet count metric for each shard depicts the actual state
tables = { ks: ["mytable1", "mytable3"] }
expected_count_per_shard_for_host_0 = await get_tablet_count_per_shard_for_host(manager, servers[0], tables, shards_count)
await assert_tablet_count_metric_value_for_shards(manager, servers[0], expected_count_per_shard_for_host_0)
expected_count_per_shard_for_host_1 = await get_tablet_count_per_shard_for_host(manager, servers[1], tables, shards_count)
await assert_tablet_count_metric_value_for_shards(manager, servers[1], expected_count_per_shard_for_host_1)
# And when moving tablets from one shard of src_host to (dest_host, shard_3)
shard_id_to_move = get_shard_that_has_tablets(expected_count_per_shard_for_host_0)
if shard_id_to_move != -1:
src_server = servers[0]
dest_server = servers[1]
src_expected_count_per_shard = expected_count_per_shard_for_host_0
dest_expected_count_per_shard = expected_count_per_shard_for_host_1
else:
shard_id_to_move = get_shard_that_has_tablets(expected_count_per_shard_for_host_1)
src_server = servers[1]
dest_server = servers[0]
src_expected_count_per_shard = expected_count_per_shard_for_host_1
dest_expected_count_per_shard = expected_count_per_shard_for_host_0
tokens_on_shard_to_move = {
"mytable1" : await get_tablet_tokens_from_host_on_shard(manager, src_server, ks, "mytable1", shard_id_to_move),
"mytable3" : await get_tablet_tokens_from_host_on_shard(manager, src_server, ks, "mytable3", shard_id_to_move)
}
count_of_tokens_on_src_shard_to_move = len(tokens_on_shard_to_move["mytable1"]) + len(tokens_on_shard_to_move["mytable3"])
assert count_of_tokens_on_src_shard_to_move > 0
src_host_id = await manager.get_host_id(src_server.server_id)
dest_host_id = await manager.get_host_id(dest_server.server_id)
for table_name, tokens in tokens_on_shard_to_move.items():
for token in tokens:
await manager.api.move_tablet(node_ip=src_server.ip_addr, ks=ks, table=table_name, src_host=src_host_id, src_shard=shard_id_to_move, dst_host=dest_host_id, dst_shard=3, token=token)
# And when ensuring that local tablet metadata on the queried node reflects the finalized tablet movement
await read_barrier(manager.api, servers[0].ip_addr)
await read_barrier(manager.api, servers[1].ip_addr)
# Then tablet count metric is adjusted to depict that situation on src_host - all tablets from selected shard have been moved
src_expected_count_per_shard[shard_id_to_move] = 0
await assert_tablet_count_metric_value_for_shards(manager, src_server, src_expected_count_per_shard)
# And then tablet count metric is increased on dest_host - tablets have been moved to shard_3
dest_expected_count_per_shard[3] += count_of_tokens_on_src_shard_to_move
await assert_tablet_count_metric_value_for_shards(manager, dest_server, dest_expected_count_per_shard)
@pytest.mark.parametrize("primary_replica_only", [False, True])
async def test_tablet_load_and_stream(manager: ManagerClient, primary_replica_only):
logger.info("Bootstrapping cluster")
cmdline = [
'--logger-log-level', 'storage_service=debug',
'--logger-log-level', 'table=debug',
'--smp', '1',
]
servers = [await manager.server_add(cmdline=cmdline)]
await manager.disable_tablet_balancing()
cql = manager.get_cql()
async def create_table(tablet_count: int) -> str:
# Creates multiple tablets in the same shard
ks_name = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}" \
f" AND tablets = {{ 'initial': {tablet_count} }};")
await cql.run_async(f"CREATE TABLE {ks_name}.test (pk int PRIMARY KEY, c int);")
return ks_name
ks = await create_table(5) # 5 is rounded up to next power-of-two
# Populate tablets
keys = range(256)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
async def check(ks_name: str):
logger.info("Checking table")
cql = manager.get_cql()
rows = await cql.run_async(f"SELECT * FROM {ks_name}.test BYPASS CACHE;")
assert len(rows) == len(keys)
for r in rows:
assert r.c == r.pk
await manager.api.flush_keyspace(servers[0].ip_addr, ks)
await check(ks)
node_workdir = await manager.server_get_workdir(servers[0].server_id)
ks2 = await create_table(16)
cql = await safe_server_stop_gracefully(manager, servers[0].server_id)
table_dir = glob.glob(os.path.join(node_workdir, "data", ks, "test-*"))[0]
logger.info(f"Table dir: {table_dir}")
dst_table_dir = glob.glob(os.path.join(node_workdir, "data", ks2, "test-*"))[0]
logger.info(f"Dst table dir: {dst_table_dir}")
def move_sstables_to_upload(table_dir: str, dst_table_dir: str):
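# Move every component file of each sstable (Data, Index, and the rest sharing the same prefix) into the destination table's upload dir, returning the new paths.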
logger.info("Moving sstables to upload dir of destination table")
table_upload_dir = os.path.join(dst_table_dir, "upload")
moved_files = []
for sst in glob.glob(os.path.join(table_dir, "*-Data.db")):
for src_path in glob.glob(os.path.join(table_dir, sst.removesuffix("-Data.db") + "*")):
dst_path = os.path.join(table_upload_dir, os.path.basename(src_path))
logger.info(f"Moving sstable file {src_path} to {dst_path}")
os.rename(src_path, dst_path)
moved_files.append(dst_path)
return moved_files
moved_sstable_files = move_sstables_to_upload(table_dir, dst_table_dir)
await manager.server_start(servers[0].server_id)
cql = manager.get_cql()
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;")
assert len(rows) == 0
await manager.disable_tablet_balancing()
logger.info("Adding new server")
servers.append(await manager.server_add(cmdline=cmdline))
# Trigger concurrent migration and load-and-stream
await manager.enable_tablet_balancing()
await manager.api.load_new_sstables(servers[0].ip_addr, ks2, "test", primary_replica_only)
await asyncio.sleep(1)
await check(ks2)
logger.info("Checking that streamed SSTables are deleted from upload directory")
remaining_files = []
for file_path in moved_sstable_files:
if os.path.exists(file_path):
logger.info(f"SSTable file still exists: {file_path}")
remaining_files.append(file_path)
if remaining_files:
raise AssertionError(f"SSTable files were not deleted after load_and_stream: {remaining_files}")
logger.info("All SSTable files successfully deleted after streaming")
await asyncio.gather(*[cql.run_async(f"drop keyspace {i}") for i in [ks, ks2]])
@pytest.mark.asyncio
async def test_storage_service_api_uneven_ownership_keyspace_and_table_params_used(manager: ManagerClient):
# Given two running servers
shards_count = 4
cmdline = ['--smp=4']
servers = await manager.servers_add(2, cmdline=cmdline)
# When table is created with initial tablets set to 1
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.mytable1 (col1 timestamp, col2 text, col3 blob, PRIMARY KEY (col1));")
# And when ownership for this table is queried
actual_ownerships = await manager.api.get_ownership(servers[0].ip_addr, ks, "mytable1")
# Then ensure that the returned ownerships are 0.0 and 1.0 (which node gets which value is unspecified)
expected_ips = {servers[0].ip_addr, servers[1].ip_addr}
expected_ownerships = [0.0, 1.0]
delta = 0.0001
already_verified = set()
sorted_actual_ownerships = sorted(actual_ownerships, key=lambda e: e["value"])
assert len(sorted_actual_ownerships) == len(expected_ownerships)
for i, entry in enumerate(sorted_actual_ownerships):
actual_ip = entry["key"]
actual_ownership = float(entry["value"])
assert actual_ip in expected_ips
assert actual_ip not in already_verified
assert actual_ownership == pytest.approx(expected_ownerships[i], abs=delta)
already_verified.add(actual_ip)
@pytest.mark.asyncio
async def test_tablet_storage_freeing(manager: ManagerClient):
logger.info("Start first node")
servers = [await manager.server_add()]
await manager.disable_tablet_balancing()
cql = manager.get_cql()
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
logger.info("Create a table with two tablets and populate it with a moderate amount of data.")
n_tablets = 2
n_partitions = 1000
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': {n_tablets}}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, v text) WITH compression = {{'sstable_compression': ''}};")
insert_stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, v) VALUES (?, ?);")
payload = "a"*10000
max_concurrency = 100
for batch in itertools.batched(range(n_partitions), max_concurrency):
await asyncio.gather(*[cql.run_async(insert_stmt, [k, payload]) for k in batch])
await manager.api.keyspace_flush(servers[0].ip_addr, ks)
logger.info("Start second node.")
servers.append(await manager.server_add())
s1_host_id = await manager.get_host_id(servers[1].server_id)
logger.info("Check the table's disk usage on first node.")
size_before = await manager.server_get_sstables_disk_usage(servers[0].server_id, ks, "test")
assert size_before > n_partitions * len(payload)
logger.info("Read system.tablets.")
tablet_replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
assert len(tablet_replicas) == n_tablets
logger.info("Migrate one of the two tablets from the first node to the second node.")
t = tablet_replicas[0]
await manager.api.move_tablet(servers[0].ip_addr, ks, "test", *t.replicas[0], *(s1_host_id, 0), t.last_token)
logger.info("Verify that the table's disk usage on first node shrunk by about half.")
size_after = await manager.server_get_sstables_disk_usage(servers[0].server_id, ks, "test")
assert size_before * 0.33 < size_after < size_before * 0.66
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_schema_change_during_cleanup(manager: ManagerClient):
logger.info("Start first node")
servers = [await manager.server_add()]
await manager.disable_tablet_balancing()
cql = manager.get_cql()
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
logger.info("Populating table")
keys = range(256)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
s1_log = await manager.server_open_log(servers[0].server_id)
s1_mark = await s1_log.mark()
logger.info("Start second node.")
servers.append(await manager.server_add())
s1_host_id = await manager.get_host_id(servers[1].server_id)
await inject_error_on(manager, "delay_tablet_compaction_groups_cleanup", servers)
logger.info("Read system.tablets.")
tablet_replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
assert len(tablet_replicas) == 1
logger.info("Migrating one tablet to another node.")
t = tablet_replicas[0]
migration_task = asyncio.create_task(
manager.api.move_tablet(servers[0].ip_addr, ks, "test", *t.replicas[0], *(s1_host_id, 0), t.last_token))
logger.info("Waiting for log")
await s1_log.wait_for('Initiating tablet cleanup of', from_mark=s1_mark, timeout=120)
await asyncio.sleep(1)
await cql.run_async(f"ALTER TABLE {ks}.test WITH gc_grace_seconds = 0;")
await migration_task
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tombstone_gc_correctness_during_tablet_split(manager: ManagerClient):
logger.info("Bootstrapping cluster")
cmdline = [
'--logger-log-level', 'storage_service=debug',
'--logger-log-level', 'table=debug',
'--target-tablet-size-in-bytes', '5000',
]
servers = [await manager.server_add(config={
'tablet_load_stats_refresh_interval_in_seconds': 1
}, cmdline=cmdline)]
await manager.disable_tablet_balancing()
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH gc_grace_seconds=0;")
await manager.api.disable_autocompaction(servers[0].ip_addr, ks)
keys = range(100)
logger.info("Generating sstable with shadowed data")
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
await manager.api.flush_keyspace(servers[0].ip_addr, ks)
logger.info("Generating another sstable with tombstones")
await asyncio.gather(*[cql.run_async(f"DELETE FROM {ks}.test WHERE pk={k};") for k in keys])
await manager.api.flush_keyspace(servers[0].ip_addr, ks)
async def assert_empty_table():
cql = manager.get_cql()
rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;")
assert len(rows) == 0
await assert_empty_table()
await manager.api.flush_keyspace(servers[0].ip_addr, ks)
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
assert tablet_count == 1
await manager.api.enable_injection(servers[0].ip_addr, "tablet_load_stats_refresh_before_rebalancing", one_shot=False)
await manager.api.enable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone", one_shot=False)
s1_log = await manager.server_open_log(servers[0].server_id)
s1_mark = await s1_log.mark()
# Wait for the tombstones to expire (gc_grace_seconds is 0).
await asyncio.sleep(1)
await manager.api.enable_injection(servers[0].ip_addr, "split_sstable_rewrite", one_shot=False)
logger.info("Enable balancing so split will be emitted")
await manager.enable_tablet_balancing()
logger.info("Waits for split of sstable containing expired tombstones")
await s1_log.wait_for(f"split_sstable_rewrite: waiting", from_mark=s1_mark)
s1_mark = await s1_log.mark()
await manager.api.message_injection(servers[0].ip_addr, "split_sstable_rewrite")
await s1_log.wait_for(f"split_sstable_rewrite: released", from_mark=s1_mark)
logger.info("Pause split of sstable containing deleted data")
await s1_log.wait_for(f"split_sstable_rewrite: waiting", from_mark=s1_mark)
s1_mark = await s1_log.mark()
logger.info("Force compaction of split sstable containing expired tombstone")
await manager.api.stop_compaction(servers[0].ip_addr, "SPLIT")
compaction_task = asyncio.create_task(manager.api.keyspace_compaction(servers[0].ip_addr, ks))
await manager.api.disable_injection(servers[0].ip_addr, "split_sstable_rewrite")
await s1_log.wait_for(f"split_sstable_rewrite: released", from_mark=s1_mark)
await compaction_task
await manager.api.disable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone")
await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
assert tablet_count > 1
logger.info("Verify data is not resurrected")
await assert_empty_table()
async def create_cluster(manager: ManagerClient, num_dcs: int, num_racks: int, nodes_per_rack: int, config: dict[str, Any] | None = None) -> dict[ServerNum, ServerInfo]:
logger.debug(f"Creating cluster: num_dcs={num_dcs} num_racks={num_racks} nodes_per_rack={nodes_per_rack}")
servers: dict[ServerNum, ServerInfo] = dict()
for dc in range(1, num_dcs + 1):
for rack in range(1, num_racks + 1):
rack_servers = await manager.servers_add(nodes_per_rack, config=config, property_file={"dc": f"dc{dc}", "rack": f"rack{rack}"})
for s in rack_servers:
servers[s.server_id] = s
logger.debug(f"Created servers={list(servers.values())}")
return servers
class Context:
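    """Describes a test table created by create_and_populate_table."""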
def __init__(self, ks: str, table: str, rf: int, initial_tablets: int, num_keys: int):
self.ks = ks
self.table = table
self.rf = rf
self.initial_tablets = initial_tablets
self.num_keys = num_keys
@asynccontextmanager
async def create_and_populate_table(manager: ManagerClient, rf: int = 3, initial_tablets: int = 64, num_keys: int = 0):
ks = ""
table = unique_name()
if not num_keys:
num_keys = initial_tablets * 4
logger.info(f"Creating table and populating data {ks}.{table}: rf={rf} initial_tablets={initial_tablets} num_keys={num_keys}")
cql = manager.get_cql()
try:
ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} AND tablets = {{'initial': {initial_tablets}}}")
await cql.run_async(f"CREATE TABLE {ks}.{table} (pk int PRIMARY KEY, c int)")
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.{table} (pk, c) VALUES ({k}, 1);") for k in range(num_keys)])
yield Context(ks, table, rf, initial_tablets, num_keys)
    finally:
        if ks:
            await cql.run_async(f"DROP KEYSPACE {ks}")
def get_expected_replicas_per_server(live_servers: Iterable[ServerInfo], dead_servers: Iterable[ServerInfo], initial_tablets: int, rf: int) -> dict[ServerNum, float]:
total_replicas = initial_tablets * rf
nodes_per_rack: defaultdict[str, set[ServerNum]] = defaultdict(set)
for s in live_servers:
nodes_per_rack[s.rack].add(s.server_id)
    replicas_per_rack = total_replicas / len(nodes_per_rack)
result: dict[ServerNum, float] = dict()
for rack_servers in nodes_per_rack.values():
replicas_per_node = replicas_per_rack / len(rack_servers)
for id in rack_servers:
result[id] = replicas_per_node
for s in dead_servers:
result[s.server_id] = 0
return result
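# For example (hypothetical numbers): with initial_tablets=64 at rf=3 spread
# over 3 racks of 2 live nodes each, total_replicas = 64 * 3 = 192,
# replicas_per_rack = 192 / 3 = 64.0, and each live server is expected to hold
# 64.0 / 2 = 32.0 replicas, while dead servers are expected to hold 0.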
def verify_replicas_per_server(desc: str, expected_replicas_per_server: dict[ServerNum, float], tablet_count: dict[ServerNum, list[int]], initial_tablets: int, rf: int, shards_per_node: int = 2):
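    """Assert that each shard holds its balanced share of replicas (within 1) and that the replicas add up to initial_tablets * rf."""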
logger.debug(f"{desc}: expected_replicas_per_server={expected_replicas_per_server} tablet_count={tablet_count}")
total = 0
for server_id, host_replicas in tablet_count.items():
expected_replicas_per_shard = expected_replicas_per_server[server_id] / shards_per_node
for i in host_replicas:
total += i
assert abs(i - expected_replicas_per_shard) <= 1
assert total == initial_tablets * rf
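# A minimal sketch of the check above with hypothetical values (it is not used
# by the tests): two single-shard servers holding 4 tablets at rf=1, perfectly
# balanced at 2 replicas per server.
def _verify_replicas_example():
    expected = {ServerNum(1): 2.0, ServerNum(2): 2.0}
    counts = {ServerNum(1): [2], ServerNum(2): [2]}
    verify_replicas_per_server("example", expected, counts, initial_tablets=4, rf=1, shards_per_node=1)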
@pytest.mark.asyncio
async def test_decommission_rack_basic(manager: ManagerClient):
"""
Test decommissioning of all nodes in a rack
when there are enough remaining racks to satisfy
the replication factor.
"""
logger.info("Bootstrapping multi-rack cluster")
num_racks = 3
nodes_per_rack = 2
rf = num_racks - 1
# We need to disable this option to be able to create a keyspace. This can be ditched
# once we've implemented scylladb/scylladb#23426 and we can add new racks with the option enabled.
# Then we can create `rf` nodes, create the keyspace, and add another node.
config = {"rf_rack_valid_keyspaces": False}
all_servers = await create_cluster(manager, 1, num_racks, nodes_per_rack, config)
async with create_and_populate_table(manager, rf=rf) as ctx:
logger.info("Verify tablet replicas distribution")
tables = {ctx.ks: [ctx.table]}
expected_replicas_per_server = get_expected_replicas_per_server(all_servers.values(), [], ctx.initial_tablets, ctx.rf)
tablet_count = await get_tablet_count_per_shard_for_hosts(manager, all_servers.values(), tables)
verify_replicas_per_server("Before decommission", expected_replicas_per_server, tablet_count, ctx.initial_tablets, ctx.rf)
decommision_rack = f"rack{num_racks}"
logger.debug(f"Decommissioning rack={decommision_rack}")
live_servers: dict[ServerNum, ServerInfo] = dict()
dead_servers: dict[ServerNum, ServerInfo] = dict()
for id, s in all_servers.items():
            if s.rack == decommission_rack:
logger.debug(f"Decommissioning server={s}")
await manager.decommission_node(s.server_id)
dead_servers[id] = s
else:
live_servers[id] = s
logger.info("Verify tablet replicas distribution")
expected_replicas_per_server = get_expected_replicas_per_server(live_servers.values(), dead_servers.values(), ctx.initial_tablets, ctx.rf)
tablet_count = await get_tablet_count_per_shard_for_hosts(manager, live_servers.values(), tables)
verify_replicas_per_server("After decommission", expected_replicas_per_server, tablet_count, ctx.initial_tablets, ctx.rf)
@pytest.mark.asyncio
async def test_decommission_rack_after_adding_new_rack(manager: ManagerClient):
"""
    Test decommissioning a rack after a rack with new nodes is added.
"""
logger.info("Bootstrapping multi-rack cluster")
initial_num_racks = 3
num_racks = initial_num_racks + 1
nodes_per_rack = 2
rf = initial_num_racks
    # With this option enabled, we can't add a new rack once a keyspace has been created.
    # Once scylladb/scylladb#23426 has been implemented, this can be ditched.
config = {"rf_rack_valid_keyspaces": False}
initial_servers = await create_cluster(manager, 1, initial_num_racks, nodes_per_rack, config)
async with create_and_populate_table(manager, rf=rf) as ctx:
logger.debug("Temporarily disable tablet load balancing")
await manager.disable_tablet_balancing()
logger.info("Add a new rack")
new_rack = f"rack{num_racks}"
# copy initial_servers into all_servers, don't just assign it (by reference)
all_servers: list[ServerInfo] = list(initial_servers.values())
new_rack_servers = await manager.servers_add(nodes_per_rack, config=config, property_file={"dc": "dc1", "rack": new_rack})
all_servers.extend(new_rack_servers)
logger.info("Verify tablet replicas distribution")
tables = {ctx.ks: [ctx.table]}
expected_replicas_per_server = get_expected_replicas_per_server(initial_servers.values(), new_rack_servers, ctx.initial_tablets, ctx.rf)
tablet_count = await get_tablet_count_per_shard_for_hosts(manager, all_servers, tables)
verify_replicas_per_server("Before decommission", expected_replicas_per_server, tablet_count, ctx.initial_tablets, ctx.rf)
logger.debug("Reenable tablet load balancing")
await manager.enable_tablet_balancing()
live_servers: dict[ServerNum, ServerInfo] = dict()
dead_servers: dict[ServerNum, ServerInfo] = dict()
decommision_rack = f"rack{initial_num_racks}"
logger.debug(f"Decommissioning rack={decommision_rack}")
for s in all_servers:
            if s.rack == decommission_rack:
logger.debug(f"Decommissioning server={s}")
await manager.decommission_node(s.server_id)
dead_servers[s.server_id] = s
else:
live_servers[s.server_id] = s
logger.info("Verify tablet replicas distribution")
expected_replicas_per_server = get_expected_replicas_per_server(live_servers.values(), dead_servers.values(), ctx.initial_tablets, ctx.rf)
tablet_count = await get_tablet_count_per_shard_for_hosts(manager, all_servers, tables)
verify_replicas_per_server("After decommission", expected_replicas_per_server, tablet_count, ctx.initial_tablets, ctx.rf)
@pytest.mark.asyncio
async def test_decommission_not_enough_racks(manager: ManagerClient):
"""
    Test that decommissioning a rack fails if the number of racks is
    insufficient to satisfy the replication factor, even if the number of
    nodes is sufficient.
Reproduces https://github.com/scylladb/scylladb/issues/19475
"""
logger.info("Bootstrapping multi-rack cluster")
num_racks = 3
nodes_per_rack = 2
rf = num_racks
all_servers = await create_cluster(manager, 1, num_racks, nodes_per_rack)
async with create_and_populate_table(manager, rf=rf) as ctx:
logger.info("Verify tablet replicas distribution")
tables = {ctx.ks: [ctx.table]}
expected_replicas_per_server = get_expected_replicas_per_server(all_servers.values(), [], ctx.initial_tablets, ctx.rf)
tablet_count = await get_tablet_count_per_shard_for_hosts(manager, all_servers.values(), tables)
verify_replicas_per_server("Before decommission", expected_replicas_per_server, tablet_count, ctx.initial_tablets, ctx.rf)
live_servers: dict[ServerNum, ServerInfo] = dict()
dead_servers: dict[ServerNum, ServerInfo] = dict()
decommision_rack = f"rack{num_racks}"
decommision_count = 0
for s in all_servers.values():
if s.rack == decommision_rack:
logger.debug(f"Decommissioning server={s}")
decommision_count += 1
expected_error = "Decommission failed" if decommision_count == nodes_per_rack else None
await manager.decommission_node(s.server_id, expected_error=expected_error)
if not expected_error:
dead_servers[s.server_id] = s
else:
live_servers[s.server_id] = s
else:
live_servers[s.server_id] = s
logger.info("Verify tablet replicas distribution")
expected_replicas_per_server = get_expected_replicas_per_server(live_servers.values(), dead_servers.values(), ctx.initial_tablets, ctx.rf)
tablet_count = await get_tablet_count_per_shard_for_hosts(manager, all_servers.values(), tables)
verify_replicas_per_server("After decommission", expected_replicas_per_server, tablet_count, ctx.initial_tablets, ctx.rf)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_cleanup_vs_snapshot_race(manager: ManagerClient):
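    """
    Start a tablet migration, make cleanup fail after sstable deletion (one-shot
    tablet_cleanup_failure_post_deletion injection), then take a snapshot while
    the tablet is in that state.
    """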
cmdline = ['--smp=1']
servers = [await manager.server_add(cmdline=cmdline)]
await manager.disable_tablet_balancing()
cql = manager.get_cql()
n_tablets = 1
n_partitions = 1000
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
await manager.servers_see_each_other(servers)
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': {n_tablets}}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY);")
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk) VALUES ({k});") for k in range(n_partitions)])
await inject_error_one_shot_on(manager, "tablet_cleanup_failure_post_deletion", servers)
s0_log = await manager.server_open_log(servers[0].server_id)
s0_mark = await s0_log.mark()
servers.append(await manager.server_add())
tablet_token = 0
replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
s1_host_id = await manager.get_host_id(servers[1].server_id)
dst_shard = 0
migration_task = asyncio.create_task(
manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token))
logger.info("Waiting for injected cleanup failure...")
await s0_log.wait_for('Cleanup failed for tablet', from_mark=s0_mark)
        await manager.api.take_snapshot(servers[0].ip_addr, ks, "test_snapshot")
        # The cleanup failure was one-shot, so the migration should eventually
        # complete; await it so any unexpected error is surfaced.
        await migration_task
# Reproduces assert failure when truncating table, either triggered by DROP TABLE or TRUNCATE.
# See: https://github.com/scylladb/scylladb/issues/18059
# It's achieved by migrating away a tablet that contains the highest replay position of a shard,
# so when drop/truncate happens, the highest replay position will be greater than that of all the
# data found in the table (including data in the memtable).
@pytest.mark.asyncio
@pytest.mark.parametrize("operation", ['DROP TABLE', 'TRUNCATE'])
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_drop_table_and_truncate_after_migration(manager: ManagerClient, operation):
cmdline = [ '--smp=2' ]
cfg = { 'auto_snapshot': True }
servers = [await manager.server_add(cmdline=cmdline, config=cfg)]
await manager.disable_tablet_balancing()
cql = manager.get_cql()
ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND TABLETS = {{'initial': 4}}")
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
await manager.api.disable_autocompaction(servers[0].ip_addr, ks)
keys = range(100)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
tablet_replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
tablet_replicas_in_s0 = list[TabletReplicas]()
for replica in tablet_replicas:
if replica.replicas[0][1] == 0:
tablet_replicas_in_s0.append(replica)
assert len(tablet_replicas_in_s0) == 2
target_tablet = tablet_replicas_in_s0[0]
s0_host_id = await manager.get_host_id(servers[0].server_id)
logger.info("Migrating 1st tablet to shard 1")
await manager.api.move_tablet(servers[0].ip_addr, ks, "test", *(s0_host_id, 0), *(s0_host_id, 1), target_tablet.last_token)
await manager.api.enable_injection(servers[0].ip_addr, "truncate_disable_compaction_delay", one_shot=True)
logger.info(f"Running {operation} {ks}.test")
await cql.run_async(f"{operation} {ks}.test")
@pytest.mark.asyncio
@pytest.mark.nightly
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_truncate_during_topology_change(manager: ManagerClient):
"""Test truncate operation during topology change."""
# Start 3 node cluster
servers = await manager.servers_add(3, config = { 'enable_tablets': True }, auto_rack_dc="dc1")
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (k int PRIMARY KEY, v int)")
logger.info("Populating table")
stmt = cql.prepare(f"INSERT INTO {ks}.test (k, v) VALUES (?, ?)")
stmt.consistency_level = ConsistencyLevel.QUORUM
await asyncio.gather(*(cql.run_async(stmt, [k, k]) for k in range(10000)))
await manager.api.keyspace_flush(servers[0].ip_addr, ks, "test")
async def truncate_table():
await asyncio.sleep(10)
logger.info("Executing truncate during bootstrap")
await cql.run_async(SimpleStatement(f"TRUNCATE {ks}.test USING TIMEOUT 4m", retry_policy=FallthroughRetryPolicy()))
truncate_task = asyncio.create_task(truncate_table())
logger.info("Adding fourth node")
new_server = await manager.server_add(config={'error_injections_at_startup': ['delay_bootstrap_120s'], 'enable_tablets': True},
property_file=servers[0].property_file())
await truncate_task
# Wait for bootstrap completion
await wait_for_cql_and_get_hosts(cql, servers + [new_server], time.time() + 60)
rows = await cql.run_async(f"SELECT COUNT(*) FROM {ks}.test")
assert rows[0].count == 0, "Table should be empty after truncation"
# Reproducer for https://github.com/scylladb/scylladb/issues/22040.
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_concurrent_schema_change_with_compaction_completion(manager: ManagerClient):
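    """
    Run index create/drop and compaction strategy changes concurrently with
    major and minor compactions, with the sstable_list_builder_delay injection
    enabled, to exercise schema changes racing with compaction completion.
    """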
cmdline = ['--smp=2']
servers = [await manager.server_add(cmdline=cmdline)]
await manager.api.enable_injection(servers[0].ip_addr, "sstable_list_builder_delay", one_shot=False)
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
table = f"{ks}.test"
await cql.run_async(f"CREATE TABLE {table} (a int PRIMARY KEY, b int);")
stop_compaction = False
async def background_compaction():
            while not stop_compaction:
await manager.api.keyspace_compaction(servers[0].ip_addr, ks)
compaction_task = asyncio.create_task(background_compaction())
        for _ in range(5):
dotestCreateAndDropIndex(cql, table, "CamelCase", False)
dotestCreateAndDropIndex(cql, table, "CamelCase2", True)
stop_compaction = True
await compaction_task
async def force_minor_compaction():
for i in range(4):
cql.run_async(f"INSERT INTO {ks}.test (a, b) VALUES (1, 1);")
await manager.api.flush_keyspace(servers[0].ip_addr, ks)
await cql.run_async(f"ALTER TABLE {table} WITH compaction = {{ 'class' : 'TimeWindowCompactionStrategy' }};")
await force_minor_compaction()
await cql.run_async(f"ALTER TABLE {table} WITH compaction = {{ 'class' : 'IncrementalCompactionStrategy' }};")
await force_minor_compaction()
# This is a test and reproducer for https://github.com/scylladb/scylladb/issues/24153
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_split_correctness_on_tablet_count_change(manager: ManagerClient):
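    """
    Pause an in-progress split with the splitting_mutation_writer_switch_wait
    injection, shrink min_tablet_count so the table is merged back to a single
    tablet, and only release the paused split writer once the merge completes.
    """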
logger.info('Bootstrapping cluster')
cfg = { 'enable_tablets': True,
'tablet_load_stats_refresh_interval_in_seconds': 1
}
cmdline = [
'--logger-log-level', 'load_balancer=debug',
'--logger-log-level', 'debug_error_injection=debug',
'--smp', '1', # single cpu is needed to prevent intra-node migration which interacts badly with injection splitting_mutation_writer_switch_wait.
]
server = await manager.server_add(cmdline=cmdline, config=cfg)
logger.info(f'server_id = {server.server_id}')
cql = manager.get_cql()
await manager.disable_tablet_balancing()
initial_tablets = 2
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")
await manager.api.disable_autocompaction(server.ip_addr, ks, 'test')
# insert data
pks = range(256)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in pks])
# flush the table
await manager.api.flush_keyspace(server.ip_addr, ks)
# force split on the test table
expected_tablet_count = 4
await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {expected_tablet_count}}}")
log = await manager.server_open_log(server.server_id)
log_mark = await log.mark()
await manager.api.enable_injection(server.ip_addr, "splitting_mutation_writer_switch_wait", one_shot=True)
await manager.enable_tablet_balancing()
await log.wait_for('Emitting resize decision of type split', from_mark=log_mark)
await log.wait_for('splitting_mutation_writer_switch_wait: waiting', from_mark=log_mark)
        # We need to skip the update of repaired_at on merge, since it stops the split task.
await manager.api.enable_injection(server.ip_addr, "skip_update_repaired_at_for_merge", one_shot=False)
await manager.api.enable_injection(server.ip_addr, "merge_completion_fiber", one_shot=True)
expected_tablet_count = 1
await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {expected_tablet_count}}}")
# wait for merge to complete
actual_tablet_count = 0
started = time.time()
while expected_tablet_count != actual_tablet_count:
actual_tablet_count = await get_tablet_count(manager, server, ks, 'test')
logger.debug(f'actual/expected tablet count: {actual_tablet_count}/{expected_tablet_count}')
assert time.time() - started < 120, 'Timeout while waiting for tablet merge'
await asyncio.sleep(.1)
logger.info(f'Merged test table; new number of tablets: {expected_tablet_count}')
await manager.api.message_injection(server.ip_addr, "splitting_mutation_writer_switch_wait")
await asyncio.sleep(.1)
await manager.api.message_injection(server.ip_addr, "merge_completion_fiber")
# Reproducer for https://github.com/scylladb/scylladb/issues/26041.
@pytest.mark.parametrize("primary_replica_only", [False, True])
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_load_and_stream_and_split_synchronization(manager: ManagerClient, primary_replica_only):
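    """
    Run load-and-stream of sstables placed in the upload directory concurrently
    with tablet split finalization and verify that the streamed data survives
    the split.
    """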
logger.info("Bootstrapping cluster")
cmdline = [
'--logger-log-level', 'storage_service=debug',
'--logger-log-level', 'table=debug',
]
servers = [await manager.server_add(config={
'tablet_load_stats_refresh_interval_in_seconds': 1
}, cmdline=cmdline)]
server = servers[0]
await manager.disable_tablet_balancing()
cql = manager.get_cql()
initial_tablets = 1
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")
keys = range(100)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
async def check(ks_name: str):
logger.info("Checking table")
cql = manager.get_cql()
rows = await cql.run_async(f"SELECT * FROM {ks_name}.test BYPASS CACHE;")
assert len(rows) == len(keys)
for r in rows:
assert r.c == r.pk
await manager.api.flush_keyspace(servers[0].ip_addr, ks)
await check(ks)
node_workdir = await manager.server_get_workdir(servers[0].server_id)
        await safe_server_stop_gracefully(manager, servers[0].server_id)
table_dir = glob.glob(os.path.join(node_workdir, "data", ks, "test-*"))[0]
logger.info(f"Table dir: {table_dir}")
def move_sstables_to_upload(table_dir: str):
logger.info("Moving sstables to upload dir")
table_upload_dir = os.path.join(table_dir, "upload")
for sst in glob.glob(os.path.join(table_dir, "*-Data.db")):
for src_path in glob.glob(os.path.join(table_dir, sst.removesuffix("-Data.db") + "*")):
dst_path = os.path.join(table_upload_dir, os.path.basename(src_path))
logger.info(f"Moving sstable file {src_path} to {dst_path}")
os.rename(src_path, dst_path)
move_sstables_to_upload(table_dir)
await manager.server_start(servers[0].server_id)
cql = manager.get_cql()
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;")
assert len(rows) == 0
await manager.disable_tablet_balancing()
await manager.api.enable_injection(servers[0].ip_addr, "tablet_resize_finalization_post_barrier", one_shot=True)
s1_log = await manager.server_open_log(servers[0].server_id)
s1_mark = await s1_log.mark()
await manager.enable_tablet_balancing()
await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {initial_tablets * 2}}}")
await s1_log.wait_for(f"tablet_resize_finalization_post_barrier: waiting", from_mark=s1_mark)
await manager.api.enable_injection(servers[0].ip_addr, "stream_mutation_fragments", one_shot=True)
load_and_stream_task = asyncio.create_task(manager.api.load_new_sstables(servers[0].ip_addr, ks, "test", primary_replica_only))
await s1_log.wait_for(f"Loading new SSTables for keyspace", from_mark=s1_mark)
await manager.api.message_injection(server.ip_addr, "tablet_resize_finalization_post_barrier")
await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)
await s1_log.wait_for(f"stream_mutation_fragments: waiting", from_mark=s1_mark)
await manager.api.message_injection(server.ip_addr, "stream_mutation_fragments")
await load_and_stream_task
await check(ks)
@pytest.mark.asyncio
async def test_update_load_stats_after_rebuild(manager: ManagerClient):
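    """
    Verify that the topology coordinator's cached load_stats is updated with
    the new replica when increasing the RF triggers a tablet rebuild on another
    rack.
    """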
logger.info("Bootstrapping cluster")
cmdline = [
'--logger-log-level', 'raft_topology=debug',
]
config = { 'tablet_load_stats_refresh_interval_in_seconds': 1 }
servers = await manager.servers_add(2, config=config, cmdline=cmdline, property_file=[
{"dc": "dc1", "rack": "rack1"},
{"dc": "dc1", "rack": "rack2"},
])
s0_host_id = await manager.get_host_id(servers[0].server_id)
s1_host_id = await manager.get_host_id(servers[1].server_id)
cql = manager.get_cql()
async def get_tablet_sizes(table):
return await cql.run_async(f"SELECT * FROM system.tablet_sizes WHERE table_id = {table}")
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': ['rack1']}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': 1}};")
table_id = await manager.get_table_or_view_id(ks, 'test')
# Wait for the coordinator to refresh load_stats
while True:
rows = await get_tablet_sizes(table_id)
if len(rows) == 1 and len(rows[0].replicas) == 1:
replica_host = [str(u) for u in rows[0].replicas.keys()][0]
if s0_host_id == replica_host:
break
# Increase load_stats refresh so that it does not race with the
# topology coordinator updating load_stats during end_migration
await manager.server_update_config(servers[0].server_id, 'tablet_load_stats_refresh_interval_in_seconds', 3600)
# Wait for the current load_stats refresh interval to elapse
await asyncio.sleep(2)
# Increase RF to trigger a tablet rebuild
await cql.run_async(f"ALTER KEYSPACE {ks} WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': ['rack1', 'rack2']}}")
        # Check that the new tablet size was added to the host in rack2
rows = await get_tablet_sizes(table_id)
replica_hosts = set([str(u) for u in rows[0].replicas.keys()])
assert len(replica_hosts) == 2
assert s0_host_id in replica_hosts and s1_host_id in replica_hosts, "Tablet size was added to load_stats after rebuild"
@pytest.mark.asyncio
async def test_update_load_stats_after_migration(manager: ManagerClient):
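    """
    Verify that the topology coordinator's cached load_stats follows a tablet
    that is migrated between two nodes in the same rack.
    """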
logger.info("Bootstrapping cluster")
cmdline = [
'--logger-log-level', 'raft_topology=debug',
]
config = { 'tablet_load_stats_refresh_interval_in_seconds': 1 }
servers = await manager.servers_add(2, config=config, cmdline=cmdline, property_file=[
{"dc": "dc1", "rack": "rack1"},
{"dc": "dc1", "rack": "rack1"},
])
s0_host_id = await manager.get_host_id(servers[0].server_id)
s1_host_id = await manager.get_host_id(servers[1].server_id)
cql = manager.get_cql()
async def get_tablet_sizes(table):
return await cql.run_async(f"SELECT * FROM system.tablet_sizes WHERE table_id = {table}")
    # Disable load balancing to avoid the balancer moving the tablet from a node with less to a node with more
    # available disk space. Otherwise, the move_tablet API can fail (if the tablet is already in transition) or
    # be a no-op (in case the tablet has already been migrated).
await manager.disable_tablet_balancing()
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': ['rack1']}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': 1}};")
table_id = await manager.get_table_or_view_id(ks, 'test')
# Wait for the coordinator to refresh load_stats
while True:
rows = await get_tablet_sizes(table_id)
if len(rows) == 1:
replica_hosts = set([str(u) for u in rows[0].replicas.keys()])
if s0_host_id in replica_hosts or s1_host_id in replica_hosts:
break
# Increase load_stats refresh so that it does not race with the
# topology coordinator updating load_stats during end_migration
await manager.server_update_config(servers[0].server_id, 'tablet_load_stats_refresh_interval_in_seconds', 3600)
# Wait for the current load_stats refresh interval to elapse
await asyncio.sleep(2)
# Migrate a tablet between nodes in rack1
replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
leaving_replica = replicas[0].replicas[0]
pending_replica = (s0_host_id if leaving_replica[0] == s1_host_id else s1_host_id, 0)
await manager.api.move_tablet(servers[0].ip_addr, ks, "test", *leaving_replica, *pending_replica, 0)
# Check that the new tablet size was moved from leaving to pending host
rows = await get_tablet_sizes(table_id)
replica_hosts = set([str(u) for u in rows[0].replicas.keys()])
logger.info(f'replica_hosts: {replica_hosts}')
        assert leaving_replica[0] not in replica_hosts, "Leaving replica tablet size is still in load_stats"
        assert pending_replica[0] in replica_hosts, "Pending replica tablet size is missing from load_stats"
@pytest.mark.asyncio
@pytest.mark.skip_mode('release', 'error injections are not supported in release mode')
async def test_crash_on_missing_table_from_load_stats(manager: ManagerClient):
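    """
    Drop a table while the non-coordinator node is down, leaving the dropped
    table's size in the coordinator's cached load_stats, and verify that the
    next load_stats refresh completes without crashing.
    """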
logger.info('Bootstrapping cluster')
cfg = { 'enable_tablets': True,
'tablet_load_stats_refresh_interval_in_seconds': 1
}
cmdline = [
'--logger-log-level', 'load_balancer=debug',
'--logger-log-level', 'raft_topology=debug',
'--smp', '2',
]
servers = await manager.servers_add(2, config=cfg, cmdline=cmdline, property_file=[
{"dc": "dc1", "rack": "rack1"},
{"dc": "dc1", "rack": "rack1"},
])
cql = manager.get_cql()
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int)")
# Make sure load_stats has been refreshed and that the coordinator has cached load_stats
table_id = await manager.get_table_or_view_id(ks, 'test')
await wait_for_valid_load_stats(cql, table_id)
# Kill the non-coordinator node
await manager.server_stop_gracefully(servers[1].server_id)
# Drop the table; this leaves the table size in the cached load_stats on the coordinator
await cql.run_async(f"DROP TABLE {ks}.test")
# Wait for the next load_stats refresh
s0_log = await manager.server_open_log(servers[0].server_id)
s0_mark = await s0_log.mark()
await s0_log.wait_for('raft topology: Refreshed table load stats for all DC', from_mark=s0_mark)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablets_barrier_waits_for_replica_erms(manager: ManagerClient):
"""
    The test verifies that tablet replicas hold ERMs while processing requests,
and that the tablet's global barrier waits for all replicas to acknowledge it.
To do this, the test starts a read request and makes it hang on the
`replica_query_wait` injection, then initiates tablet migration. Finally, it
checks that the tablet's global barrier waits for the replica handling that
request to complete.
"""
logger.info("Bootstrapping cluster")
cmdline = [
'--logger-log-level', 'storage_service=debug',
'--logger-log-level', 'raft_topology=debug',
        '--range-request-timeout-in-ms', '1000', # shorten the time until the coordinator abandons the request, releasing the erm
'--read-request-timeout-in-ms', '1000',
'--abort-on-internal-error', 'true',
]
servers = [await manager.server_add(cmdline=cmdline)]
await manager.disable_tablet_balancing()
cql = manager.get_cql()
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
servers.append(await manager.server_add(cmdline=cmdline))
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
key = 7 # Whatever
tablet_token = 0 # Doesn't matter since there is one tablet
await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, 0)")
rows = await cql.run_async(f"SELECT pk from {ks}.test")
assert len(list(rows)) == 1
replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
s1_host_id = await manager.get_host_id(servers[1].server_id)
dst_shard = 0
s0_log = await manager.server_open_log(servers[0].server_id)
s0_mark = await s0_log.mark()
await manager.api.enable_injection(servers[0].ip_addr, "replica_query_wait", one_shot=False, parameters={"table": "test"})
replica_query = cql.run_async(f"SELECT * from {ks}.test where pk={key} BYPASS CACHE", host=hosts[1])
await s0_log.wait_for('replica_query_wait: waiting', from_mark=s0_mark)
version_before_move = await get_topology_version(cql, hosts[0])
s0_mark = await s0_log.mark()
migration_task = asyncio.create_task(
manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token))
        # migration should proceed once the replica query times out on the coordinator, causing it to be abandoned
new_version = version_before_move + 1
await s0_log.wait_for(re.escape(
f"Got raft_topology_cmd::barrier_and_drain, version {new_version}, "
f"current version {new_version}, "
f"stale versions (version: use_count): {{{version_before_move}: 1}}"),
from_mark=s0_mark)
await manager.api.message_injection(servers[0].ip_addr, "replica_query_wait")
await manager.api.disable_injection(servers[0].ip_addr, "replica_query_wait")
        # swallow the exception of the timed-out read.
        try:
            await replica_query
        except Exception:
            pass
logger.info("Waiting for migration to finish")
await migration_task
logger.info("Migration done")
rows = await cql.run_async(f"SELECT pk from {ks}.test")
assert len(list(rows)) == 1
# This is a test and reproducer for https://github.com/scylladb/scylladb/issues/26041
@pytest.mark.asyncio
@pytest.mark.parametrize("repair_before_split", [False, True])
@pytest.mark.skip_mode('release', 'error injections are not supported in release mode')
async def test_split_and_incremental_repair_synchronization(manager: ManagerClient, repair_before_split: bool):
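    """
    Verify that tablet split and incremental repair are synchronized, both when
    the repair starts before the split is prepared and when it starts after.
    """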
logger.info('Bootstrapping cluster')
cfg = { 'enable_tablets': True,
'tablet_load_stats_refresh_interval_in_seconds': 1
}
cmdline = [
'--logger-log-level', 'load_balancer=debug',
'--logger-log-level', 'debug_error_injection=debug',
'--logger-log-level', 'compaction=debug',
]
servers = await manager.servers_add(2, cmdline=cmdline, config=cfg, auto_rack_dc="dc1")
cql = manager.get_cql()
await manager.disable_tablet_balancing()
initial_tablets = 2
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 2}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")
# insert data
pks = range(256)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in pks])
# flush the table
for server in servers:
await manager.api.flush_keyspace(server.ip_addr, ks)
s0_log = await manager.server_open_log(servers[0].server_id)
s0_mark = await s0_log.mark()
s1_log = await manager.server_open_log(servers[1].server_id)
s1_mark = await s1_log.mark()
expected_tablet_count = 4 # expected tablet count post split
async def run_split_prepare():
await manager.api.enable_injection(servers[0].ip_addr, 'tablet_resize_finalization_postpone', one_shot=False)
# force split on the test table
await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {expected_tablet_count}}}")
await s0_log.wait_for('Finalizing resize decision for table', from_mark=s0_mark)
async def generate_repair_work():
insert_stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, c) VALUES (?, ?)")
insert_stmt.consistency_level = ConsistencyLevel.ONE
await manager.api.enable_injection(servers[0].ip_addr, "database_apply", one_shot=False, parameters={"ks_name": ks, "cf_name": "test", "what": "throw"})
pks = range(256, 512)
await asyncio.gather(*[cql.run_async(insert_stmt, (k, k)) for k in pks])
await manager.api.disable_injection(servers[0].ip_addr, "database_apply")
token = 'all'
await manager.enable_tablet_balancing()
if repair_before_split:
await generate_repair_work()
for server in servers:
await manager.api.enable_injection(server.ip_addr, "incremental_repair_prepare_wait", one_shot=True)
repair_task = asyncio.create_task(manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental'))
await s0_log.wait_for('incremental_repair_prepare_wait: waiting', from_mark=s0_mark)
await s1_log.wait_for('incremental_repair_prepare_wait: waiting', from_mark=s1_mark)
await run_split_prepare()
for server in servers:
await manager.api.message_injection(server.ip_addr, "incremental_repair_prepare_wait")
await repair_task
else:
await run_split_prepare()
await generate_repair_work()
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
await manager.api.disable_injection(servers[0].ip_addr, "tablet_resize_finalization_postpone")
async def finished_splitting():
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
return tablet_count >= expected_tablet_count or None
# Give enough time for split to happen in debug mode
await wait_for(finished_splitting, time.time() + 120)
await manager.server_stop(servers[0].server_id)
await manager.server_start(servers[0].server_id)
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
await manager.servers_see_each_other(servers)
@pytest.mark.asyncio
@pytest.mark.skip_mode('release', 'error injections are not supported in release mode')
async def test_split_and_intranode_synchronization(manager: ManagerClient):
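    """
    Verify that tablet split synchronizes with intra-node migration: a tablet
    replica is moved back to a shard that has already ACKed the split, and the
    split must still complete.
    """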
logger.info('Bootstrapping cluster')
cfg = { 'enable_tablets': True,
'tablet_load_stats_refresh_interval_in_seconds': 1
}
cmdline = [
'--logger-log-level', 'load_balancer=debug',
'--logger-log-level', 'debug_error_injection=debug',
'--logger-log-level', 'compaction=debug',
'--smp', '2',
]
servers = await manager.servers_add(1, cmdline=cmdline, config=cfg)
server = servers[0]
cql = manager.get_cql()
await manager.disable_tablet_balancing()
initial_tablets = 1
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")
# insert data
pks = range(256)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in pks])
# flush the table
await manager.api.flush_keyspace(server.ip_addr, ks)
log = await manager.server_open_log(server.server_id)
mark = await log.mark()
tablet_token = 0 # Doesn't matter since there is one tablet
replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
host_id = await manager.get_host_id(server.server_id)
src_shard = replica[1]
# if tablet replica is at shard 0, move it to shard 1.
if src_shard == 0:
dst_shard = 1
await manager.api.move_tablet(server.ip_addr, ks, "test", replica[0], src_shard, replica[0], dst_shard, tablet_token)
await manager.enable_tablet_balancing()
await manager.api.enable_injection(server.ip_addr, 'tablet_resize_finalization_postpone', one_shot=False)
await manager.api.enable_injection(server.ip_addr, "split_sstable_force_stop_exception", one_shot=False)
# force split on the test table
expected_tablet_count = initial_tablets * 2
await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {expected_tablet_count}}}")
# Check that shard 0 ACKed split.
mark, _ = await log.wait_for('Setting split ready sequence number to', from_mark=mark)
# Move tablet replica back to shard 0, where split was already ACKed.
src_shard = 1
dst_shard = 0
migration_task = asyncio.create_task(manager.api.move_tablet(server.ip_addr, ks, "test", replica[0], src_shard, replica[0], dst_shard, tablet_token))
mark, _ = await log.wait_for("Finished intra-node streaming of tablet", from_mark=mark)
await manager.api.stop_compaction(server.ip_addr, "SPLIT")
await migration_task
await manager.api.disable_injection(server.ip_addr, "tablet_resize_finalization_postpone")
async def finished_splitting():
tablet_count = await get_tablet_count(manager, server, ks, 'test')
return tablet_count >= expected_tablet_count or None
# Give enough time for split to happen in debug mode
await wait_for(finished_splitting, time.time() + 120)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_split_stopped_on_shutdown(manager: ManagerClient):
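    """
    Verify that an in-progress tablet split is stopped cleanly on graceful
    shutdown and completes after the server is restarted.
    """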
logger.info('Bootstrapping cluster')
cfg = { 'enable_tablets': True,
'tablet_load_stats_refresh_interval_in_seconds': 1
}
cmdline = [
'--logger-log-level', 'debug_error_injection=debug',
'--smp', '1',
]
server = await manager.server_add(cmdline=cmdline, config=cfg)
logger.info(f'server_id = {server.server_id}')
cql = manager.get_cql()
await manager.disable_tablet_balancing()
initial_tablets = 2
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")
await manager.api.disable_autocompaction(server.ip_addr, ks, 'test')
# insert data
pks = range(256)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in pks])
# flush the table
await manager.api.flush_keyspace(server.ip_addr, ks)
# force split on the test table
expected_tablet_count = 4
await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {expected_tablet_count}}}")
log = await manager.server_open_log(server.server_id)
log_mark = await log.mark()
await manager.api.enable_injection(server.ip_addr, "splitting_mutation_writer_switch_wait", one_shot=True)
await manager.api.enable_injection(server.ip_addr, "storage_service_drain_wait", one_shot=True)
await manager.enable_tablet_balancing()
await log.wait_for('Emitting resize decision of type split', from_mark=log_mark)
await log.wait_for('splitting_mutation_writer_switch_wait: waiting', from_mark=log_mark)
log_mark = await log.mark()
shutdown_task = asyncio.create_task(manager.server_stop_gracefully(server.server_id))
        await log.wait_for('Stopping.*ongoing compactions', from_mark=log_mark)
await manager.api.message_injection(server.ip_addr, "splitting_mutation_writer_switch_wait")
await log.wait_for('storage_service_drain_wait: waiting', from_mark=log_mark)
await log.wait_for('Failed to complete splitting of table', from_mark=log_mark)
await manager.api.message_injection(server.ip_addr, "storage_service_drain_wait")
await shutdown_task
errors = await log.grep_for_errors(from_mark=log_mark)
assert errors == []
await manager.server_start(server.server_id)
await wait_for_cql_and_get_hosts(cql, [server], time.time() + 60)
await log.wait_for('Detected tablet split for table', from_mark=log_mark)
tablet_count = await get_tablet_count(manager, server, ks, 'test')
assert tablet_count >= expected_tablet_count