Files
scylladb/test/cluster/test_tablets.py
Dimitrios Symonidis 80b74d7df2 tablet options: Add max_tablet_count tablet option to enforce tablet count upper bounds
Introduced a new max_tablet_count tablet option that caps the maximum number of tablets a table can have. This feature is designed primarily for backup and restore workflows.
During backup, when load balancing is disabled for snapshot consistency, the current tablet count is recorded in the backup manifest.
During restore, max_tablet_count is set to this recorded value, ensuring the restored table's tablet count never exceeds the original snapshot's tablet distribution.
This guarantee enables efficient file-based SSTable streaming during restore, as each SSTable remains fully contained within a single tablet boundary.

Closes scylladb/scylladb#28450
2026-03-03 11:19:24 +03:00

1704 lines
89 KiB
Python

#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import uuid
from cassandra.protocol import ConfigurationException, InvalidRequest, SyntaxException
from cassandra.query import SimpleStatement, ConsistencyLevel
from test.cluster.tasks.task_manager_client import TaskManagerClient
from test.cluster.test_tablets2 import safe_rolling_restart
from test.pylib.internal_types import ServerInfo
from test.pylib.manager_client import ManagerClient
from test.pylib.repair import create_table_insert_data_for_repair
from test.pylib.rest_client import HTTPError, read_barrier
from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas
from test.pylib.util import unique_name, wait_for, wait_for_first_completed
from test.cluster.util import wait_for_cql_and_get_hosts, create_new_test_keyspace, new_test_keyspace, reconnect_driver, \
get_topology_coordinator, parse_replication_options, get_replication, get_replica_count, find_server_by_host_id
from contextlib import nullcontext as does_not_raise
import time
import pytest
import logging
import asyncio
import re
import requests
import random
import os
import glob
import shutil
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Glob pattern for sstable component file names.
# The glob below is designed to match the version-generation-format-component.extension format, e.g.
# da-3gqu_1hke_4919c2kfgur9y2bm77-bti-Data.db
# me-1-big-TOC.txt
sstable_filename_glob = "??-*-???-*.*"
@pytest.mark.asyncio
async def test_tablet_replication_factor_enough_nodes(manager: ManagerClient):
    """Check that CREATE TABLE is rejected while the keyspace RF exceeds the node count.

    On a two-node cluster, creating a table in a keyspace with RF=3 must raise
    ConfigurationException; once the RF is lowered to 2 the same statement succeeds.
    """
    # This test verifies that Scylla rejects creating a table if there are too few token-owning nodes.
    # That means that a keyspace must already be in place, but that's impossible with RF-rack-valid
    # keyspaces being enforced. We could go over this constraint by creating 3 nodes and then
    # decommissioning one of them before attempting to create a table, but if we decide to constrain
    # decommission later on, this test will have to be modified again. Let's simply disable the option.
    node_config = {
        'enable_user_defined_functions': False,
        'tablets_mode_for_new_keyspaces': 'enabled',
        'rf_rack_valid_keyspaces': False,
    }
    await manager.servers_add(2, config=node_config)

    cql = manager.get_cql()
    local_rows = await cql.run_async("SELECT data_center FROM system.local")
    this_dc = local_rows[0].data_center

    def replication_clause(rf):
        # Replication options placing `rf` replicas in the local datacenter.
        return f"replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': {rf}}}"

    async with new_test_keyspace(manager, f"WITH {replication_clause(3)}") as ks:
        create_table = f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);"

        # RF=3 on two nodes: table creation must be refused.
        with pytest.raises(ConfigurationException, match=f"Datacenter {this_dc} doesn't have enough token-owning nodes"):
            await cql.run_async(create_table)

        # With RF lowered to the number of nodes the very same statement goes through.
        await cql.run_async(f"ALTER KEYSPACE {ks} WITH {replication_clause(2)}")
        await cql.run_async(create_table)
@pytest.mark.asyncio
async def test_tablet_scaling_option_is_respected(manager: ManagerClient):
    """Check that `tablets_initial_scale_factor` drives the initial tablet count of a new table."""
    # 32 is high enough to ensure we demand more tablets than the default choice.
    scaling_config = {'tablets_mode_for_new_keyspaces': 'enabled', 'tablets_initial_scale_factor': 32}
    nodes = await manager.servers_add(1, config=scaling_config, cmdline=['--smp', '2'])

    cql = manager.get_cql()
    schema_statements = (
        f"CREATE KEYSPACE test WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}",
        "CREATE TABLE test.test (pk int PRIMARY KEY, c int);",
    )
    for statement in schema_statements:
        await cql.run_async(statement)

    # NOTE(review): 64 presumably equals scale factor (32) x shards (2) - confirm.
    tablet_count = len(await get_all_tablet_replicas(manager, nodes[0], 'test', 'test'))
    assert tablet_count == 64
@pytest.mark.asyncio
async def test_tablet_cannot_decommision_below_replication_factor(manager: ManagerClient):
    """Check that a decommission which would leave too few nodes for the RF is rejected.

    Four nodes (two in rack r1, one each in r2 and r3) host an RF=3 table.
    The first decommission must succeed, the second must fail with HTTPError,
    and all the data must still be readable at CL=THREE afterwards.
    """
    logger.info("Bootstrapping cluster")
    cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
    rack_layout = ["r1", "r1", "r2", "r3"]
    servers = await manager.servers_add(
        4,
        config=cfg,
        property_file=[{"dc": "dc1", "rack": rack} for rack in rack_layout],
    )

    logger.info("Creating table")
    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        logger.info("Populating table")
        keys = range(256)
        inserts = [cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys]
        await asyncio.gather(*inserts)

        logger.info("Decommission some node")
        first_node, second_node = servers[0], servers[1]
        await manager.decommission_node(first_node.server_id)

        # The second decommission is the one expected to be refused.
        logger.info("Decommission another node")
        with pytest.raises(HTTPError, match="Decommission failed"):
            await manager.decommission_node(second_node.server_id)

        # Three nodes should still provide CL=3
        logger.info("Checking table")
        select_all = SimpleStatement(f"SELECT * FROM {ks}.test;", consistency_level=ConsistencyLevel.THREE)
        rows = await cql.run_async(select_all)
        assert len(rows) == len(keys)
        assert all(row.c == row.pk for row in rows)
@pytest.mark.asyncio
async def test_reshape_with_tablets(manager: ManagerClient):
    """Check that a restart reshapes each tablet's sstables down to a single sstable.

    With autocompaction disabled, the table is written and flushed `loop_count`
    times so that every tablet accumulates `loop_count` sstables. After the
    server restarts, the reshape log message must appear and only one sstable
    per tablet may remain.
    """
    logger.info("Bootstrapping cluster")
    cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
    server = (await manager.servers_add(1, config=cfg, cmdline=['--smp', '1', '--logger-log-level', 'compaction=debug']))[0]

    logger.info("Creating table")
    cql = manager.get_cql()
    number_of_tablets = 2
    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} and tablets = {{'initial': {number_of_tablets} }}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        # Keep the flushed sstables around so that there is something to reshape on restart.
        logger.info("Disabling autocompaction for the table")
        await manager.api.disable_autocompaction(server.ip_addr, ks, "test")

        logger.info("Populating table")
        loop_count = 32
        for _ in range(loop_count):
            await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in range(64)])
            # One flush per round, i.e. one new sstable per tablet per round.
            await manager.api.keyspace_flush(server.ip_addr, ks, "test")

        # After populating the table, expect loop_count number of sstables per tablet
        sstable_info = await manager.api.get_sstable_info(server.ip_addr, ks, "test")
        assert len(sstable_info[0]['sstables']) == number_of_tablets * loop_count

        log = await manager.server_open_log(server.server_id)
        mark = await log.mark()

        # Restart the server and verify that the sstables have been reshaped down to one sstable per tablet
        logger.info("Restart the server")
        await manager.server_restart(server.server_id)
        await reconnect_driver(manager)

        # The expected sstable count in the log line is derived from loop_count
        # so the two cannot drift apart.
        await log.wait_for(f"Reshape {ks}.test .* Reshaped {loop_count} sstables to .*", from_mark=mark, timeout=30)
        sstable_info = await manager.api.get_sstable_info(server.ip_addr, ks, "test")
        assert len(sstable_info[0]['sstables']) == number_of_tablets
@pytest.mark.parametrize("direction", ["up", "down", "none"])
@pytest.mark.asyncio
async def test_tablet_rf_change(manager: ManagerClient, direction):
    """Check that ALTER KEYSPACE re-allocates tablet replicas when the rack list changes.

    The keyspace's rack list is grown ("up"), shrunk ("down") or left as is
    ("none"). The number of replicas of every tablet of the base table and of
    its materialized view is checked before and after the ALTER. For "up" the
    test additionally verifies, via MUTATION_FRAGMENTS(), that sampled
    partitions are present on exactly `rf_to` hosts.
    """
    cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
    servers = await manager.servers_add(2, config=cfg, auto_rack_dc="dc1")
    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    res = await cql.run_async("SELECT data_center FROM system.local")
    this_dc = res[0].data_center

    # direction -> (rack list before, rack list after, RF before, RF after)
    rf_changes = {
        'up': ("['rack1']", "['rack1', 'rack2']", 1, 2),
        'down': ("['rack1', 'rack2']", "['rack1']", 2, 1),
        'none': ("['rack1']", "['rack1']", 1, 1),
    }
    rf_from_opt, rf_to_opt, rf_from, rf_to = rf_changes[direction]

    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': {rf_from_opt}}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.test_mv AS SELECT pk FROM {ks}.test WHERE pk IS NOT NULL PRIMARY KEY (pk)")

        logger.info("Populating table")
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in range(128)])

        async def check_allocated_replica(expected: int):
            # Every tablet of both the base table and the view must have `expected` replicas.
            replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
            replicas = replicas + await get_all_tablet_replicas(manager, servers[0], ks, 'test_mv', is_view=True)
            for r in replicas:
                logger.info(f"{r.replicas}")
                assert len(r.replicas) == expected

        logger.info(f"Checking {rf_from} allocated replicas")
        await check_allocated_replica(rf_from)

        logger.info(f"Altering RF {rf_from} -> {rf_to}")
        await cql.run_async(f"ALTER KEYSPACE {ks} WITH replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': {rf_to_opt}}}")

        logger.info(f"Checking {rf_to} re-allocated replicas")
        await check_allocated_replica(rf_to)

        if direction != 'up':
            # Don't check fragments for down/none changes, scylla crashes when checking nodes
            # that (validly) miss the replica, see scylladb/scylladb#18786
            return

        # Sampled partition key -> set of host ids on which the partition was found.
        fragments = { pk: set() for pk in random.sample(range(128), 17) }
        for s in servers:
            host_id = await manager.get_host_id(s.server_id)
            host = await wait_for_cql_and_get_hosts(cql, [s], time.time() + 30)
            await read_barrier(manager.api, s.ip_addr) # scylladb/scylladb#18199
            for k in fragments:
                res = await cql.run_async(f"SELECT partition_region FROM MUTATION_FRAGMENTS({ks}.test) WHERE pk={k}", host=host[0])
                for fragment in res:
                    if fragment.partition_region == 0: # partition start
                        fragments[k].add(host_id)

        logger.info("Checking fragments")
        for k in fragments:
            assert len(fragments[k]) == rf_to, f"Found mutations for {k} key on {fragments[k]} hosts, but expected only {rf_to} of them"
@pytest.mark.asyncio
async def test_tablet_mutation_fragments_unowned_partition(manager: ManagerClient):
    """Check that MUTATION_FRAGMENTS() queries handle the case when a partition
    not owned by the node is attempted to be read.

    Every partition is queried on every one of the three nodes while RF is 2;
    the test only requires these queries to complete without raising.
    """
    cfg = {
        'enable_user_defined_functions': False,
        'tablets_mode_for_new_keyspaces': 'enabled',
    }
    rack_layout = ["r1", "r1", "r2"]
    servers = await manager.servers_add(
        3,
        config=cfg,
        property_file=[{"dc": "dc1", "rack": rack} for rack in rack_layout],
    )

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        logger.info("Populating table")
        partition_keys = range(4)
        writes = [cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in partition_keys]
        await asyncio.gather(*writes)

        for node in servers:
            await manager.get_host_id(node.server_id)
            node_hosts = await wait_for_cql_and_get_hosts(cql, [node], time.time() + 30)
            target = node_hosts[0]
            # Pin each query to this node, whether or not it holds the partition.
            for k in partition_keys:
                await cql.run_async(f"SELECT partition_region FROM MUTATION_FRAGMENTS({ks}.test) WHERE pk={k}", host=target)
# The test checks that the describe_ring, range_to_endpoint_map and tokens_endpoint
# APIs return information that's consistent with system.tablets contents
@pytest.mark.parametrize("endpoint", ["describe_ring", "range_to_endpoint", "tokens_endpoint"])
@pytest.mark.asyncio
async def test_tablets_api_consistency(manager: ManagerClient, endpoint):
    """Check that a ring-describing REST endpoint agrees with the tablet replicas.

    A table pinned to a fixed tablet count is created on a six-node, three-rack
    cluster. The token ranges and nodes reported by the selected `endpoint` are
    then compared, range by range, with the result of get_all_tablet_replicas().
    """
    servers = []
    for rack in ('rack1', 'rack2', 'rack3'):
        servers += await manager.servers_add(2, property_file={'dc': 'dc1', 'rack': rack})
    await manager.disable_tablet_balancing()
    # host id -> IP address, used to translate tablet replicas into API-style addresses
    hosts = { await manager.get_host_id(s.server_id): s.ip_addr for s in servers }
    cql = manager.get_cql()
    expected_tablet_count = 4
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH TABLETS = {{'min_tablet_count': {expected_tablet_count}, 'max_tablet_count': {expected_tablet_count}}};")

        def columnar(lst):
            # Render one item per line for logging. The separator is kept out of the
            # f-string replacement field: a backslash there is a SyntaxError before Python 3.12.
            separator = "\n "
            return separator + separator.join(f'{x}' for x in lst)

        replicas = await get_all_tablet_replicas(manager, servers[0], ks, "test")
        logger.info(f'system.tablets: {columnar(replicas)}')
        # replicas contains a list of objects, one per tablet, so in the end len(replicas) returns the current number of tablets.
        # with load balancing disabled and min_tablet_count being equal to max_tablet_count,
        # we expect the same number of tablets as the min/max_tablet_count value
        assert len(replicas) == expected_tablet_count, f"Expected {expected_tablet_count} tablets, got {len(replicas)}"

        # Normalize every endpoint's answer to a list of {'token', 'nodes' | 'node'} dicts.
        if endpoint == 'describe_ring':
            ring_info = await manager.api.describe_ring(servers[0].ip_addr, ks, "test")
            logger.info(f'api.describe_ring: {columnar(ring_info)}')
            api_data = [ { 'token': x['end_token'], 'nodes': set(x['endpoints']) } for x in ring_info ]
        elif endpoint == 'range_to_endpoint':
            rte_info = await manager.api.range_to_endpoint_map(servers[0].ip_addr, ks, "test")
            rte_info.sort(key = lambda x: int(x['key'][1])) # sort by end token
            logger.info(f'api.range_to_endpoint_map: {columnar(rte_info)}')
            api_data = [ { 'token': x['key'][1], 'nodes': set(x['value']) } for x in rte_info ]
        elif endpoint == 'tokens_endpoint':
            te_info = await manager.api.tokens_endpoint(servers[0].ip_addr, ks, "test")
            logger.info(f'api.tokens_endpoint: {columnar(te_info)}')
            api_data = [ { 'token': x['key'], 'node': x['value'] } for x in te_info ]
        else:
            raise RuntimeError('invalid endpoint parameter')

        assert len(replicas) == len(api_data), f"{endpoint} returned wrong number of ranges"
        for x, rep in zip(api_data, replicas):
            assert x['token'] == f'{rep.last_token}'
            nodes = {hosts[r[0]] for r in rep.replicas}
            if 'nodes' in x:
                # The endpoint reports the full replica set of the range.
                assert x['nodes'] == nodes
            elif 'node' in x:
                # The endpoint reports a single node, which must be one of the replicas.
                assert x['node'] in nodes
            else:
                raise RuntimeError('api_data is badly prepared')
# ALTER KEYSPACE cannot change the replication factor by more than 1 at a time.
# That provides us with a guarantee that the old and the new QUORUM overlap.
# In this test, we verify that in a simple scenario with one DC. We explicitly disable
# enforcing RF-rack-valid keyspaces to be able to perform more flexible alterations.
@pytest.mark.asyncio
async def test_singledc_alter_tablets_rf(manager: ManagerClient):
    """Check that ALTER KEYSPACE accepts RF changes of at most 1 and rejects larger ones.

    Starting from RF=1 in a single DC, a sequence of single-step (or no-op)
    changes must succeed, after which every attempt to move the RF by 2 or
    more must raise InvalidRequest.
    """
    await manager.server_add(
        config={"rf_rack_valid_keyspaces": "false", "enable_tablets": "true"},
        property_file={"dc": "dc1", "rack": "r1"},
    )
    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1}") as ks:
        async def change_rf(rf):
            await cql.run_async(f"ALTER KEYSPACE {ks} WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': {rf}}}")

        # 1 -> 2 -> 3: increasing by 1 is OK; 3 -> 3: same RF is OK;
        # 3 -> 4: increasing by 1 is still OK; 4 -> 3: decreasing by 1 is OK.
        accepted_steps = (2, 3, 3, 4, 3)
        for target_rf in accepted_steps:
            await change_rf(target_rf)

        # The RF is 3 at this point. None of the following may be applied:
        # 5 (+2), 1 (-2), 10 (more than +2), 0 (more than -2).
        rejected_targets = (5, 1, 10, 0)
        for target_rf in rejected_targets:
            with pytest.raises(InvalidRequest):
                await change_rf(target_rf)
# ALTER tablets KS cannot change RF of any DC by more than 1 at a time.
# In a multi-dc environment, we can create replicas in a DC that didn't have replicas before,
# but the above requirement should still be honoured, because we'd be changing RF from 0 to N in the new DC.
# Reproduces https://github.com/scylladb/scylladb/issues/20039#issuecomment-2271365060
# See also `test_singledc_alter_tablets_rf` above for basic scenarios tested
@pytest.mark.asyncio
async def test_multidc_alter_tablets_rf(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
    """Check the "one DC, one step at a time" rule for RF changes across two DCs.

    Alterations that change the RF of more than one DC, or of any DC by more
    than 1, must raise InvalidRequest. Single-step changes, including adding
    the first replica to a DC and removing the last one, must succeed.
    """
    config = {"endpoint_snitch": "GossipingPropertyFileSnitch", "tablets_mode_for_new_keyspaces": "enabled"}

    logger.info("Creating a new cluster of 2 nodes in 1st DC and 2 nodes in 2nd DC")
    # we have to have at least 2 nodes in each DC if we want to try setting RF to 2 in each DC
    for dc in ("dc1", "dc2"):
        await manager.servers_add(2, config=config, auto_rack_dc=dc)

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1}") as ks:
        async def alter_replication(options: str):
            # `options` is the per-DC part of the replication map, e.g. "'dc1': 1, 'dc2': 0".
            await cql.run_async(f"alter keyspace {ks} with replication = {{'class': 'NetworkTopologyStrategy', {options}}}")

        async def expect_rejected(options: str):
            with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
                await alter_replication(options)

        # need to create a table to not change only the schema, but also tablets replicas
        await cql.run_async(f"create table {ks}.t (pk int primary key)")

        # changing RF of dc2 from 0 to 2 should fail
        await expect_rejected("'dc1': 1, 'dc2': ['rack1', 'rack2']")

        # changing RF of dc2 from 0 to 1 should succeed
        await alter_replication("'dc1': 1, 'dc2': ['rack1']")

        # ensure that RFs of both DCs are equal to 1 now, i.e. that adding dc2 left dc1 unchanged
        repl = get_replication(cql, ks)
        logger.info(f"repl = {repl}")
        assert len(repl['dc1']) == 1
        assert repl['dc2'] == ['rack1']

        # incrementing RF of 2 DCs at once should NOT succeed, because it'd leave 2 pending tablets replicas
        await expect_rejected("'dc1': ['rack1', 'rack2'], 'dc2': ['rack1', 'rack2']")
        # as above, but decrementing
        await expect_rejected("'dc1': 0, 'dc2': 0")
        # as above, but decrement 1 RF and increment the other
        await expect_rejected("'dc1': ['rack1','rack2'], 'dc2': 0")
        # as above, but RFs are swapped
        await expect_rejected("'dc1': 0, 'dc2': ['rack1', 'rack2']")

        # check that we can remove all replicas from dc2 by changing RF from 1 to 0
        await alter_replication("'dc1': 1, 'dc2': 0")
        # check that we can remove all replicas from the cluster, i.e. change RF of dc1 from 1 to 0 as well:
        await alter_replication("'dc1': 0")
@pytest.mark.asyncio
async def test_alter_tablets_rf_dc_drop(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
    """Check that a DC's replicas can only be dropped explicitly (RF set to 0).

    Omitting dc2 from the replication map while it still has replicas must
    raise ConfigurationException and leave the replication unchanged. After
    dc2 is explicitly set to 0 it may be omitted from later alterations, and
    a non-replication ALTER must not affect the replication either.
    """
    config = {"endpoint_snitch": "GossipingPropertyFileSnitch", "tablets_mode_for_new_keyspaces": "enabled"}

    logger.info("Creating a new cluster of 2 nodes in 1st DC and 2 nodes in 2nd DC")
    # we have to have at least 2 nodes in each DC if we want to try setting RF to 2 in each DC
    for dc in ("dc1", "dc2"):
        await manager.servers_add(2, config=config, auto_rack_dc=dc)

    cql = manager.get_cql()

    async def check_rf(ks: str, expected_dc1_rf: int, expected_dc2_rf: int):
        # Assert the per-DC replica counts stored in system_schema.keyspaces and
        # return the parsed replication options. An expected RF of 0 means the
        # DC must be absent from the options altogether.
        rows = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
        repl = parse_replication_options(rows[0].replication)
        logger.info(f"repl = {repl}")
        for dc, expected_rf in (('dc1', expected_dc1_rf), ('dc2', expected_dc2_rf)):
            if expected_rf > 0:
                assert get_replica_count(repl[dc]) == expected_rf
            else:
                assert dc not in repl
        return repl

    async with new_test_keyspace(manager, "with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
        async def alter_replication(options: str):
            await cql.run_async(f"alter keyspace {ks} with replication = {{'class': 'NetworkTopologyStrategy', {options}}}")

        await cql.run_async(f"create table {ks}.t (pk int primary key)")
        await check_rf(ks=ks, expected_dc1_rf=1, expected_dc2_rf=1)

        # Leaving dc2 out while it still holds replicas is an implicit drop and must be refused.
        with pytest.raises(ConfigurationException, match="Attempted to implicitly drop replicas in datacenter dc2. If this is the desired behavior, set replication factor to 0 in dc2 explicitly."):
            await alter_replication("'dc1': ['rack1', 'rack2']")
        repl = await check_rf(ks=ks, expected_dc1_rf=1, expected_dc2_rf=1)

        # Drop dc2 explicitly while keeping dc1 exactly as it currently is.
        dc1_rf = repl['dc1']
        await alter_replication(f"'dc1': {dc1_rf}, 'dc2': 0")
        await check_rf(ks=ks, expected_dc1_rf=1, expected_dc2_rf=0)

        # With dc2 already at 0, omitting it is accepted.
        await alter_replication("'dc1': ['rack1', 'rack2']")
        await check_rf(ks=ks, expected_dc1_rf=2, expected_dc2_rf=0)

        # An ALTER that doesn't mention replication must leave it untouched.
        await cql.run_async(f"alter keyspace {ks} with durable_writes = true")
        await check_rf(ks=ks, expected_dc1_rf=2, expected_dc2_rf=0)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
    """Check conversion of keyspaces from a numeric RF to an explicit rack list.

    While the "create_with_numeric" injection is enabled, six keyspaces are
    created and verified to store a numeric RF. After the injection is
    disabled, each keyspace is altered towards rack lists: ks1 and ks2 must
    convert and place tablet replicas on the listed racks, ks3 and ks4 must be
    rejected with ConfigurationException, and ks5 and ks6 must accept a mix of
    rack list and numeric RF across DCs.
    """
    async def get_replication_options(ks: str):
        res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
        return parse_replication_options(res[0].replication_v2 or res[0].replication)

    injection = "create_with_numeric"
    config = {"tablets_mode_for_new_keyspaces": "enabled", "error_injections_at_startup": [injection]}
    servers = [await manager.server_add(config=config, property_file={'dc': 'dc1', 'rack': 'rack1a'}),
               await manager.server_add(config=config, property_file={'dc': 'dc1', 'rack': 'rack1b'}),
               await manager.server_add(config=config, property_file={'dc': 'dc2', 'rack': 'rack2a'}),
               await manager.server_add(config=config, property_file={'dc': 'dc2', 'rack': 'rack2b'})]
    host_ids = [await manager.get_host_id(s.server_id) for s in servers]
    cql = manager.get_cql()

    # keyspace -> (per-DC replication options used at creation, expected stored RF per DC)
    numeric_keyspaces = {
        "ks1": ("'dc1': 1", {'dc1': '1'}),
        "ks2": ("'dc1': 1, 'dc2': 2", {'dc1': '1', 'dc2': '2'}),
        "ks3": ("'dc1': 1", {'dc1': '1'}),
        "ks4": ("'dc1': 1", {'dc1': '1'}),
        "ks5": ("'dc1': 2, 'dc2': 2", {'dc1': '2', 'dc2': '2'}),
        "ks6": ("'dc1': 2", {'dc1': '2'}),
    }
    for ks, (options, expected_rf) in numeric_keyspaces.items():
        await cql.run_async(f"create keyspace {ks} with replication = {{'class': 'NetworkTopologyStrategy', {options}}} and tablets = {{'initial': 4}};")
        await cql.run_async(f"create table {ks}.t (pk int primary key);")
        repl = await get_replication_options(ks)
        for dc, rf in expected_rf.items():
            # The RF must still be stored in its numeric form.
            assert repl[dc] == rf

    for s in servers:
        await manager.api.disable_injection(s.ip_addr, injection)

    # ks1: a single replica moves to the one listed rack, i.e. onto the rack1b node.
    await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")
    repl = await get_replication_options("ks1")
    assert repl['dc1'] == ['rack1b']
    tablet_replicas = await get_all_tablet_replicas(manager, servers[0], "ks1", "t")
    assert len(tablet_replicas) == 4
    for r in tablet_replicas:
        assert len(r.replicas) == 1
        assert r.replicas[0][0] == host_ids[1]

    # ks2: both DCs are converted at once; replicas must live exactly on the listed racks' nodes.
    await cql.run_async("alter keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a'], 'dc2' : ['rack2a', 'rack2b']};")
    repl = await get_replication_options("ks2")
    assert repl['dc1'] == ['rack1a']
    assert len(repl['dc2']) == 2
    assert 'rack2a' in repl['dc2'] and 'rack2b' in repl['dc2']
    tablet_replicas = await get_all_tablet_replicas(manager, servers[0], "ks2", "t")
    assert len(tablet_replicas) == 4
    ks_replicas = {host_ids[0], host_ids[2], host_ids[3]}
    for r in tablet_replicas:
        assert len(r.replicas) == 3
        replica_hosts = {replica[0] for replica in r.replicas}
        assert replica_hosts == ks_replicas

    # ks3: the alteration also adds replicas in dc2 and must be rejected.
    with pytest.raises(ConfigurationException):
        await cql.run_async("alter keyspace ks3 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a'], 'dc2' : ['rack2a']};")

    # ks4: the rack list has more racks than the current numeric RF and must be rejected.
    with pytest.raises(ConfigurationException):
        await cql.run_async("alter keyspace ks4 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a', 'rack1b']};")

    # ks5: only dc1 is converted, dc2 keeps its numeric RF.
    await cql.run_async("alter keyspace ks5 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a', 'rack1b'], 'dc2' : 2};")
    repl = await get_replication_options("ks5")
    assert len(repl['dc1']) == 2
    assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1']
    assert repl['dc2'] == '2'

    # ks6: dc1 stays numeric while dc2 is added as a one-rack list.
    await cql.run_async("alter keyspace ks6 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : 2, 'dc2' : ['rack2a']};")
    repl = await get_replication_options("ks6")
    assert repl['dc1'] == '2'
    assert len(repl['dc2']) == 1
    assert repl['dc2'][0] == 'rack2a'
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
    """Exercise the --enforce-rack-list option on a cluster that has a numeric-RF keyspace.

    The test checks that:
      * a node fails to start with --enforce-rack-list while ks1 still uses a numeric RF;
      * once ks1 is converted to a rack list, the remaining nodes can be
        rolling-restarted with the option enabled;
      * with the option enabled, tablet keyspaces created with a numeric RF
        are stored with rack lists, and altering ks1 back to a numeric RF fails.
    """
    async def get_replication_options(ks: str):
        res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
        return parse_replication_options(res[0].replication_v2 or res[0].replication)

    injection = "create_with_numeric"
    config = {"tablets_mode_for_new_keyspaces": "enabled", "error_injections_at_startup": [injection]}
    servers = [await manager.server_add(config=config, cmdline=['--smp=2'], property_file={'dc': 'dc1', 'rack': 'rack1a'}),
               await manager.server_add(config=config, cmdline=['--smp=2'], property_file={'dc': 'dc1', 'rack': 'rack1b'}),
               await manager.server_add(config=config, cmdline=['--smp=2'], property_file={'dc': 'dc2', 'rack': 'rack2a'}),
               await manager.server_add(config=config, cmdline=['--smp=2'], property_file={'dc': 'dc2', 'rack': 'rack2b'})]
    cql = manager.get_cql()

    await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
    await cql.run_async("create table ks1.t (pk int primary key);")
    repl = await get_replication_options("ks1")
    assert repl['dc1'] == '1'

    # A keyspace with tablets disabled; it is altered at the very end of the test.
    await cql.run_async("CREATE KEYSPACE ksv WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2} AND tablets = {'enabled': false}")

    for s in servers:
        await manager.api.disable_injection(s.ip_addr, injection)

    await manager.server_stop_gracefully(servers[-1].server_id)

    # Starting the last node with --enforce-rack-list is expected to fail at this
    # point; the node is then stopped and removed so the test can carry on without it.
    failed = False
    try:
        await manager.server_start(server_id=servers[-1].server_id, wait_others=3, connect_driver=True, cmdline_options_override=["--enforce-rack-list", "true", "--smp", "2"])
    except Exception:
        failed = True
        await manager.server_stop_gracefully(servers[-1].server_id)
        await manager.remove_node(servers[0].server_id, servers[-1].server_id)
    assert failed

    servers = servers[0:-1]

    await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")
    repl = await get_replication_options("ks1")
    assert repl['dc1'] == ['rack1b']

    logger.info("Rolling restart")
    await manager.rolling_restart(servers, wait_for_cql=True, cmdline_options_override=["--enforce-rack-list", "true", "--error-injections-at-startup", "[]", "--smp", "2"])

    # A numeric RF of 2 in dc1 must now be stored as a list of both dc1 racks.
    await cql.run_async(f"create keyspace ks2 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};")
    repl = await get_replication_options("ks2")
    assert len(repl['dc1']) == 2
    assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1']

    # 'replication_factor': 1 must yield a one-rack list per DC; dc2 only has rack2a left.
    await cql.run_async(f"create keyspace ks3 with replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} and tablets = {{'initial': 4}};")
    repl = await get_replication_options("ks3")
    assert len(repl['dc1']) == 1
    assert len(repl['dc2']) == 1
    assert 'rack1a' in repl['dc1'] or 'rack1b' in repl['dc1']
    assert 'rack2a' in repl['dc2']

    # Creating a keyspace with a numeric RF and no explicit tablets option must still work.
    await cql.run_async("create keyspace ksv2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1};")

    # Going back from a rack list to a numeric RF must be refused.
    with pytest.raises(Exception):
        await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2};")

    await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b'], 'dc2': 1};")
    repl = await get_replication_options("ks1")
    assert len(repl['dc1']) == 1
    assert repl['dc1'][0] == 'rack1b'
    assert len(repl['dc2']) == 1
    assert repl['dc2'][0] == 'rack2a'

    # dc1 - removed; dc2 - the same value
    await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 0, 'dc2': 1};")

    # Bring a rack2b node back, then add a dc2 replica to the keyspace with tablets disabled.
    await manager.server_add(config=config, cmdline=["--enforce-rack-list", "true", "--smp", "2"], property_file={'dc': 'dc2', 'rack': 'rack2b'})
    await cql.run_async("alter keyspace ksv with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 1};")
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
    """Check that aborting a numeric-RF-to-rack-list conversion leaves the keyspace unchanged.

    The ALTER KEYSPACE is started in the background. Once the coordinator logs
    that it entered make_rack_list_colocation_plan, the corresponding
    "keyspace_rf_change" task is aborted through the task manager. The task
    must end up "failed", the ALTER statement must raise, and ks1 must still
    have its numeric RF.
    """
    async def get_replication_options(ks: str):
        res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
        return parse_replication_options(res[0].replication_v2 or res[0].replication)

    numeric_injection = "create_with_numeric"
    colocation_injection = "wait_with_rack_list_colocation"
    config = {"tablets_mode_for_new_keyspaces": "enabled", "error_injections_at_startup": [numeric_injection, colocation_injection]}
    cmdline = [
        '--logger-log-level', 'load_balancer=debug',
        '--smp=2',
    ]
    servers = [await manager.server_add(config=config, cmdline=cmdline, property_file={'dc': 'dc1', 'rack': 'rack1a'}),
               await manager.server_add(config=config, cmdline=cmdline, property_file={'dc': 'dc1', 'rack': 'rack1b'})]
    host_ids = [await manager.get_host_id(s.server_id) for s in servers]
    cql = manager.get_cql()

    await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
    await cql.run_async("create table ks1.t (pk int primary key);")
    repl = await get_replication_options("ks1")
    assert repl['dc1'] == '1'

    # Only the numeric-RF injection is switched off; the colocation injection stays enabled.
    for s in servers:
        await manager.api.disable_injection(s.ip_addr, numeric_injection)

    coord = await get_topology_coordinator(manager)
    coord_serv = await find_server_by_host_id(manager, servers, coord)
    s1_log = await manager.server_open_log(coord_serv.server_id)
    s1_mark = await s1_log.mark()

    async def alter_keyspace():
        await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")

    # Run the ALTER in the background so that it can be aborted while in flight.
    alter_task = asyncio.create_task(alter_keyspace())
    await s1_log.wait_for('In make_rack_list_colocation_plan', from_mark=s1_mark)

    task_manager_client = TaskManagerClient(manager.api)
    tasks = await task_manager_client.list_tasks(servers[0].ip_addr, "global_topology_requests")
    rf_change_tasks = [t for t in tasks if t.type == "keyspace_rf_change"]
    assert len(rf_change_tasks) == 1
    task_id = rf_change_tasks[0].task_id

    await task_manager_client.abort_task(servers[0].ip_addr, task_id)
    task = await task_manager_client.wait_for_task(servers[0].ip_addr, task_id)
    assert task.state == "failed"

    # The aborted request must surface as a failure of the ALTER statement itself.
    with pytest.raises(Exception):
        await alter_task

    # The abort must not have changed the replication options.
    repl = await get_replication_options("ks1")
    assert repl['dc1'] == '1'
# Reproducer for https://github.com/scylladb/scylladb/issues/18110
# Check that an existing cached reader is cleaned up when the tablet it reads
# from is migrated away.
@pytest.mark.asyncio
async def test_saved_readers_tablet_migration(manager: ManagerClient, build_mode):
    """
    Check that a reader saved in the querier cache is evicted when its tablet is migrated away.

    1) Start two nodes and create a table with a single tablet and a single replica
    2) Populate one partition and read only its first page, which is expected to leave a cached reader behind
    3) Move the tablet to the other node
    4) The querier cache population must be zero on every node afterwards
    """
    cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
    if build_mode != "release":
        # Presumably stretches the querier cache TTL so that the cached reader cannot expire
        # on its own during the test; the injection is only configured outside of release mode.
        cfg['error_injections_at_startup'] = [{'name': 'querier-cache-ttl-seconds', 'value': 999999999}]

    servers = await manager.servers_add(2, config=cfg)

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()

    async with new_test_keyspace(manager, "WITH"
                                          " replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}"
                                          " and tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int, ck int, c int, PRIMARY KEY (pk, ck));")

        logger.info("Populating table")
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, ck, c) VALUES (0, {k}, 0);") for k in range(128)])

        # fetch_size is smaller than the partition, so only the first page is read here.
        statement = SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 0", fetch_size=10)
        cql.execute(statement)

        def get_querier_cache_population(server):
            """Return the querier cache population reported by `server`, summed over all samples.

            The metrics endpoint may expose more than one sample of the metric (presumably
            one per shard), so every matching line is added up instead of reading only the
            first one. A missing metric fails the test explicitly.
            """
            metrics = requests.get(f"http://{server.ip_addr}:9180/metrics").text
            pattern = re.compile("^scylla_database_querier_cache_population")
            samples = [int(float(metric.split()[1]))
                       for metric in metrics.split('\n')
                       if pattern.match(metric) is not None]
            assert samples, f"querier cache population metric not found on {server.ip_addr}"
            return sum(samples)

        # The paged read above must have left a cached reader on one of the nodes.
        assert any(get_querier_cache_population(server) > 0 for server in servers)

        table_id = await cql.run_async(f"SELECT id FROM system_schema.tables WHERE keyspace_name = '{ks}' AND table_name = 'test'")
        table_id = table_id[0].id

        tablet_infos = await cql.run_async(f"SELECT last_token, replicas FROM system.tablets WHERE table_id = {table_id}")
        tablet_infos = list(tablet_infos)

        assert len(tablet_infos) == 1
        tablet_info = tablet_infos[0]
        assert len(tablet_info.replicas) == 1

        hosts = {await manager.get_host_id(server.server_id) for server in servers}
        logger.info(f"HOSTS: {hosts}")
        source_host, source_shard = tablet_info.replicas[0]

        # The remaining host is the migration target; the shard number is kept.
        hosts.remove(str(source_host))
        target_host, target_shard = list(hosts)[0], source_shard

        await manager.api.move_tablet(
            node_ip=servers[0].ip_addr,
            ks=ks,
            table="test",
            src_host=source_host,
            src_shard=source_shard,
            dst_host=target_host,
            dst_shard=target_shard,
            token=tablet_info.last_token)

        # The tablet move should have evicted the cached reader.
        assert all(get_querier_cache_population(server) == 0 for server in servers)
# Reproducer for https://github.com/scylladb/scylladb/issues/19052
# 1) table A has N tablets and views
# 2) migration starts for a tablet of A from node 1 to 2.
# 3) migration is at write_both_read_old stage
# 4) coordinator will push writes to both nodes
# 5) A has a view, so writes to it will also result in reads (table::push_view_replica_updates())
# 6) tablet's update_effective_replication_map() does not refresh the tablet sstable set (for the new tablet migrating in)
# 7) so the read in step 5 is not able to find the sstable set for the tablet migrating in
@pytest.mark.parametrize("with_cache", ['false', 'true'])
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_read_of_pending_replica_during_migration(manager: ManagerClient, with_cache):
    """
    Check that a base table with a view can be written and read while its tablet is being migrated.

    1) Start one node, create a single-tablet table with a materialized view, then add a second node
    2) Start moving the tablet to the second node and pause streaming there through the
       "stream_mutation_fragments" injection
    3) While the migration is paused, write to the base table and read it back
    4) Release the injection, wait for the migration to finish and read once more

    `with_cache` is passed to --enable-cache, so the scenario runs with the cache disabled and enabled.
    """
    logger.info("Bootstrapping cluster")
    cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
    cmdline = [
        '--logger-log-level', 'storage_service=debug',
        '--logger-log-level', 'raft_topology=debug',
        '--enable-cache', with_cache,
    ]
    servers = [await manager.server_add(cmdline=cmdline, config=cfg)]

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        # The continuation lines stay flush-left so that the statement text is unchanged.
        await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv1 AS \
SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL \
PRIMARY KEY (c, pk);")

        servers.append(await manager.server_add(cmdline=cmdline, config=cfg))

        key = 7 # Whatever
        tablet_token = 0 # Doesn't matter since there is one tablet
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, 0)")
        rows = await cql.run_async(f"SELECT pk from {ks}.test")
        assert len(list(rows)) == 1

        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)

        s1_host_id = await manager.get_host_id(servers[1].server_id)
        dst_shard = 0

        await manager.api.enable_injection(servers[1].ip_addr, "stream_mutation_fragments", one_shot=True)
        s1_log = await manager.server_open_log(servers[1].server_id)
        s1_mark = await s1_log.mark()

        # Drop cache to remove dummy entry indicating that underlying mutation source is empty
        await manager.api.drop_sstable_caches(servers[1].ip_addr)

        # The move runs in the background because it is paused by the injection until released below.
        migration_task = asyncio.create_task(
            manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token))

        await s1_log.wait_for('stream_mutation_fragments: waiting', from_mark=s1_mark)
        s1_mark = await s1_log.mark()

        # Overwrite the same key while the migration is paused, then read it back.
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, 1)")

        rows = await cql.run_async(f"SELECT pk from {ks}.test")
        assert len(list(rows)) == 1

        # Release abandoned streaming
        await manager.api.message_injection(servers[1].ip_addr, "stream_mutation_fragments")
        await s1_log.wait_for('stream_mutation_fragments: done', from_mark=s1_mark)

        logger.info("Waiting for migration to finish")
        await migration_task
        logger.info("Migration done")

        rows = await cql.run_async(f"SELECT pk from {ks}.test")
        assert len(list(rows)) == 1
# This test checks that the --enable-tablets option and the TABLETS parameters of the CQL CREATE KEYSPACE
# statement are mutually consistent, following the "least surprising behavior" principle. See comments inside
# the test code for more details.
@pytest.mark.parametrize("with_tablets", [True, False])
@pytest.mark.parametrize("replication_strategy", ["NetworkTopologyStrategy", "SimpleStrategy", "EverywhereStrategy", "LocalStrategy"])
@pytest.mark.asyncio
async def test_keyspace_creation_cql_vs_config_sanity(manager: ManagerClient, with_tablets, replication_strategy):
    """
    Check how the tablets config setting and the CQL TABLETS option combine on CREATE KEYSPACE.

    Three cases are covered for every replication strategy: no TABLETS option at all,
    TABLETS explicitly enabled, and TABLETS explicitly disabled.
    """
    await manager.server_add(config={'tablets_mode_for_new_keyspaces': 'enabled' if with_tablets else 'disabled'})
    cql = manager.get_cql()

    def scylla_keyspace_row(name):
        # One row from system_schema.scylla_keyspaces for the keyspace, or None when there is no row.
        return cql.execute(f"SELECT initial_tablets FROM system_schema.scylla_keyspaces WHERE keyspace_name = '{name}'").one()

    # Tablets are only possible when the replication strategy is NetworkTopology
    tablets_possible = replication_strategy == 'NetworkTopologyStrategy'
    tablets_enabled_by_default = tablets_possible and with_tablets

    # Case 1: a plain CREATE KEYSPACE without any tablets parameters. Tablets should be
    # activated whenever that is possible and enabled in the config.
    async with new_test_keyspace(manager, f"WITH replication = {{'class': '{replication_strategy}', 'replication_factor': 1}}") as ks:
        row = scylla_keyspace_row(ks)
        if not tablets_enabled_by_default:
            assert row is None
        else:
            assert row.initial_tablets == 0

    # Case 2: an explicit CQL request to enable tablets can only be satisfied when tablets
    # are possible, and tablets must then be activated.
    expectation = does_not_raise() if tablets_possible else pytest.raises(ConfigurationException)
    with expectation:
        ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': '{replication_strategy}', 'replication_factor': 1}} AND TABLETS = {{'enabled': true}}")
        row = scylla_keyspace_row(ks)
        assert row.initial_tablets == 0
        await cql.run_async(f"drop keyspace {ks}")

    # Case 3: explicitly disabling tablets in CQL gives a vnode-based keyspace,
    # whether or not tablets are enabled in the config.
    async with new_test_keyspace(manager, f"WITH replication = {{'class': '{replication_strategy}', 'replication_factor': 1}} AND TABLETS = {{'enabled': false}}") as ks:
        row = scylla_keyspace_row(ks)
        assert row is None
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_streaming_with_unbuilt_view(manager: ManagerClient):
    """
    Reproducer for https://github.com/scylladb/scylladb/issues/21564

    Scenario:
    1) Create a table with 1 initial tablet and populate it
    2) Create a view on the table while an error injection pauses the view building worker
    3) Migrate the tablet from node 1 to node 2
    4) After the migration, the view must be built and hold the same number of rows as the base table
    """
    logger.info("Starting Node 1")
    node_cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
    node_cmdline = [
        '--logger-log-level', 'storage_service=debug',
        '--logger-log-level', 'raft_topology=debug',
        '--logger-log-level', 'view_building_coordinator=debug',
        '--logger-log-level', 'view_building_worker=debug',
    ]
    servers = [await manager.server_add(cmdline=node_cmdline, config=node_cfg)]
    await manager.disable_tablet_balancing()

    logger.info("Create table, populate it and flush the table to disk")
    cql = manager.get_cql()
    ks_opts = "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};"
    async with new_test_keyspace(manager, ks_opts) as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        row_count = 64
        inserts = [cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k%3});") for k in range(row_count)]
        await asyncio.gather(*inserts)
        await manager.api.keyspace_flush(servers[0].ip_addr, ks, "test")

        logger.info("Starting Node 2")
        second_node = await manager.server_add(cmdline=node_cmdline, config=node_cfg)
        servers.append(second_node)
        target_host_id = await manager.get_host_id(servers[1].server_id)

        logger.info("Inject error to make view building worker pause before processing the sstable")
        await manager.api.enable_injection(servers[0].ip_addr, "view_building_worker_pause_before_consume", one_shot=True)

        logger.info("Create view")
        # The continuation lines stay flush-left so that the statement text is unchanged.
        await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv1 AS \
SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL \
PRIMARY KEY (c, pk);")

        logger.info("Migrate the tablet to node 2")
        token = 0 # Any token works, the table has a single tablet
        src_host, src_shard = (await get_tablet_replica(manager, servers[0], ks, 'test', token))[:2]
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", src_host, src_shard, target_host_id, 0, token)
        logger.info("Migration done")

        async def view_is_built():
            # None tells wait_for to keep polling.
            built = await cql.run_async(f"SELECT * FROM system.built_views WHERE keyspace_name='{ks}' AND view_name='mv1'")
            return True if len(built) == 1 else None

        await wait_for(view_is_built, time.time() + 30)

        # The base table must still hold every inserted row
        base_rows = await cql.run_async(f"SELECT pk from {ks}.test")
        assert len(list(base_rows)) == row_count

        # ... and so must the view
        view_rows = await cql.run_async(f"SELECT c from {ks}.mv1")
        assert len(list(view_rows)) == row_count
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_streaming_with_staged_sstables(manager: ManagerClient):
    """
    Reproducer for https://github.com/scylladb/scylladb/issues/19149
    1) Create a table with 1 initial tablet and populate it
    2) Create a view on the table while view building is paused by an error injection
    3) Inject error to prevent processing of new sstables in view generator
    4) Create an sstable, move it into upload directory of test table and start upload
    5) Start migration of the tablet from node 1 to 2
    6) Once migration completes, the view should have the correct number of rows
    """
    logger.info("Starting Node 1")
    cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
    cmdline = [
        '--logger-log-level', 'storage_service=debug',
        '--logger-log-level', 'raft_topology=debug',
        '--logger-log-level', 'view_building_coordinator=debug',
        '--logger-log-level', 'view_building_worker=debug',
    ]
    servers = [await manager.server_add(cmdline=cmdline, config=cfg)]
    await manager.disable_tablet_balancing()

    logger.info("Create the test table, populate few rows and flush to disk")
    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k%3});") for k in range(64)])
        await manager.api.keyspace_flush(servers[0].ip_addr, ks, "test")

        logger.info("Create view")
        # Pause view building
        await manager.api.enable_injection(servers[0].ip_addr, "view_building_worker_pause_before_consume", one_shot=True)
        # The continuation lines stay flush-left so that the statement text is unchanged.
        await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv1 AS \
SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL \
PRIMARY KEY (c, pk);")

        # Check if view building was started
        async def check_mv_status():
            # The view created above is named mv1, so that is the name to look up.
            mv_status = await cql.run_async(f"SELECT status FROM system.view_build_status_v2 WHERE keyspace_name='{ks}' AND view_name='mv1'")
            if len(mv_status) == 1 and mv_status[0].status == "STARTED":
                return True
            # Return None, not False, while the status is not there yet: the other
            # predicates handed to wait_for in this file use None to keep polling.
            return None
        await wait_for(check_mv_status, time.time() + 30)

        logger.info("Generate an sstable and move it to upload directory of test table")
        # create an sstable using a dummy table
        await cql.run_async(f"CREATE TABLE {ks}.dummy (pk int PRIMARY KEY, c int);")
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.dummy (pk, c) VALUES ({k}, {k%3});") for k in range(64, 128)])
        await manager.api.keyspace_flush(servers[0].ip_addr, ks, "dummy")
        node_workdir = await manager.server_get_workdir(servers[0].server_id)
        dummy_table_dir = glob.glob(os.path.join(node_workdir, "data", ks, "dummy-*"))[0]
        test_table_upload_dir = glob.glob(os.path.join(node_workdir, "data", ks, "test-*", "upload"))[0]
        for src_path in glob.glob(os.path.join(dummy_table_dir, sstable_filename_glob)):
            dst_path = os.path.join(test_table_upload_dir, os.path.basename(src_path))
            os.rename(src_path, dst_path)
        await cql.run_async(f"DROP TABLE {ks}.dummy;")

        logger.info("Starting Node 2")
        servers.append(await manager.server_add(cmdline=cmdline, config=cfg))
        s1_host_id = await manager.get_host_id(servers[1].server_id)

        logger.info("Inject error to prevent view generator from processing staged sstables")
        injection_name = "view_update_generator_consume_staging_sstable"
        await manager.api.enable_injection(servers[0].ip_addr, injection_name, one_shot=True)

        logger.info("Load the sstables from upload directory")
        await manager.api.load_new_sstables(servers[0].ip_addr, ks, "test")

        # The table now has both staged and unstaged sstables.
        # Verify that tablet migration handles them both without causing any base-view inconsistencies.
        logger.info("Migrate the tablet to node 2")
        tablet_token = 0 # Doesn't matter since there is one tablet
        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, 0, tablet_token)
        logger.info("Migration done")

        async def check_view_is_built():
            result = await cql.run_async(f"SELECT * FROM system.built_views WHERE keyspace_name='{ks}' AND view_name='mv1'")
            if len(result) == 1:
                return True
            else:
                return None
        await wait_for(check_view_is_built, time.time() + 60)

        # 64 rows written directly plus 64 rows loaded from the uploaded sstables
        expected_num_of_rows = 128
        # Verify the table has expected number of rows
        rows = await cql.run_async(f"SELECT pk from {ks}.test")
        assert len(list(rows)) == expected_num_of_rows
        # Verify that the view has the expected number of rows
        rows = await cql.run_async(f"SELECT c from {ks}.mv1")
        assert len(list(rows)) == expected_num_of_rows
@pytest.mark.asyncio
async def test_orphaned_sstables_on_startup(manager: ManagerClient):
    """
    Reproducer for https://github.com/scylladb/scylladb/issues/18038
    1) Start a node (node1)
    2) Create a table with 1 initial tablet and populate it
    3) Start another node (node2)
    4) Migrate the existing tablet from node1 to node2
    5) Stop node1
    6) Copy the sstables from node2 to node1
    7) Attempting to start node1 should fail as it now has an 'orphaned' sstable
    """
    logger.info("Starting Node 1")
    cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
    cmdline = [
        '--logger-log-level', 'storage_service=debug',
        '--logger-log-level', 'raft_topology=debug',
    ]
    servers = [await manager.server_add(cmdline=cmdline, config=cfg)]
    await manager.disable_tablet_balancing()

    logger.info("Create the test table, populate few rows and flush to disk")
    cql = manager.get_cql()
    # A single initial tablet, as described in step 2: the migration below moves only the
    # tablet that owns tablet_token and relies on it being the table's only tablet.
    # The keyspace is not dropped at the end because node1 is expected to fail to start.
    ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}")
    await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
    await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k%3});") for k in range(256)])
    await manager.api.keyspace_flush(servers[0].ip_addr, ks, "test")
    node0_workdir = await manager.server_get_workdir(servers[0].server_id)
    node0_table_dir = glob.glob(os.path.join(node0_workdir, "data", ks, "test-*"))[0]

    logger.info("Start Node 2")
    servers.append(await manager.server_add(cmdline=cmdline, config=cfg))
    await manager.disable_tablet_balancing()
    node1_workdir = await manager.server_get_workdir(servers[1].server_id)
    node1_table_dir = glob.glob(os.path.join(node1_workdir, "data", ks, "test-*"))[0]
    s1_host_id = await manager.get_host_id(servers[1].server_id)

    logger.info("Migrate the tablet from node1 to node2")
    tablet_token = 0 # Doesn't matter since there is one tablet
    replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
    await manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, 0, tablet_token)
    logger.info("Migration done")

    logger.info("Stop node1 and copy the sstables from node2")
    await manager.server_stop(servers[0].server_id)
    for src_path in glob.glob(os.path.join(node1_table_dir, sstable_filename_glob)):
        dst_path = os.path.join(node0_table_dir, os.path.basename(src_path))
        shutil.copy(src_path, dst_path)

    # try starting the server again
    logger.info("Start node1 with the orphaned sstables and expect it to fail")
    # Error thrown is of format : "Unable to load SSTable {sstable_name} : Storage wasn't found for tablet {tablet_id} of table {ks}.test"
    await manager.server_start(servers[0].server_id, expected_error="Storage wasn't found for tablet", expected_crash=True)
@pytest.mark.asyncio
@pytest.mark.parametrize("with_zero_token_node", [False, True])
async def test_remove_failure_with_no_normal_token_owners_in_dc(manager: ManagerClient, with_zero_token_node: bool):
    """
    Reproducer for #21826
    Verify that a node cannot be removed with tablets when
    there are not enough nodes in a datacenter to satisfy the configured replication factor,
    even when there is a zero-token node in the same datacenter and in another datacenter,
    and when there is another down node in the datacenter, leaving no normal token owners.

    After the failed removenode, the same node is replaced by a new one while the
    other down node is passed as an ignored dead node.
    """
    servers: dict[str, list[ServerInfo]] = dict()
    servers['dc1'] = await manager.servers_add(servers_num=2, property_file=[
        {'dc': 'dc1', 'rack': 'rack1_1'},
        {'dc': 'dc1', 'rack': 'rack1_2'}])
    # if testing with no zero-token-node, add an additional node to dc2 to maintain raft quorum
    extra_node = 0 if with_zero_token_node else 1
    servers['dc2'] = await manager.servers_add(servers_num=2 + extra_node, property_file={'dc': 'dc2', 'rack': 'rack2'})
    if with_zero_token_node:
        # Nodes started with join_ring=False are the zero-token nodes of this scenario.
        servers['dc1'].append(await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc1', 'rack': 'rack1_1'}))
        servers['dc3'] = [await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc3', 'rack': 'rack3'})]

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = { 'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 1 } AND tablets = { 'initial': 1 }") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        node_to_remove = servers['dc1'][0]
        # The second dc1 token owner is only stopped and then ignored; it is never removed or replaced here.
        other_down_node = servers['dc1'][1]
        other_down_host_id = await manager.get_host_id(other_down_node.server_id)
        initiator_node = servers['dc2'][0]

        # Stop both token owners in dc1 to leave no token owners in the datacenter
        await manager.server_stop_gracefully(node_to_remove.server_id)
        await manager.server_stop_gracefully(other_down_node.server_id)

        logger.info("Attempting removenode - expected to fail")
        with pytest.raises(Exception, match=r"Removenode failed"):
            await manager.remove_node(initiator_node.server_id, server_id=node_to_remove.server_id, ignore_dead=[other_down_host_id])

        # The replace targets the node whose removal just failed, so that is the node to log.
        logger.info(f"Replacing {node_to_remove} with a new node")
        replace_cfg = ReplaceConfig(replaced_id=node_to_remove.server_id, reuse_ip_addr = False, use_host_id=True, wait_replaced_dead=True,
                                    ignore_dead_nodes=[other_down_host_id])
        await manager.server_add(replace_cfg=replace_cfg, property_file=node_to_remove.property_file())
@pytest.mark.asyncio
async def test_excludenode(manager: ManagerClient):
    """
    Recovery scenario built around marking a dead node as excluded with excludenode.

    1. The cluster has 3 racks: 1 node in rack1, 2 nodes in rack2 and 1 node in rack3.
    2. The keyspace is initially replicated to rack1 and rack2.
    3. One node in rack2 is brought down, which should cause unavailability.
    4. The node is marked as excluded using excludenode, which unblocks the next ALTER.
    5. rack3 is added to the RF of the keyspace. This wouldn't succeed without excluding the node.
    6. The downed node is removed successfully while it still has tablets on it, which is
       why rack2 needs two nodes.
    """
    servers = await manager.servers_add(servers_num=3, auto_rack_dc='dc1')
    # Second node for rack2
    await manager.server_add(property_file={'dc': 'dc1', 'rack': 'rack2'})

    cql = manager.get_cql()
    ks_opts = ("WITH replication = { 'class': 'NetworkTopologyStrategy', "
               "'dc1': ['rack1', 'rack2']} AND tablets = { 'initial': 8 }")
    async with new_test_keyspace(manager, ks_opts) as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        live_node, node_to_remove = servers[0], servers[1]

        # A node that is still alive cannot be excluded
        with pytest.raises(Exception, match="Cannot mark host .* as excluded because it's alive"):
            alive_host_id = await manager.get_host_id(node_to_remove.server_id)
            await manager.api.exclude_node(live_node.ip_addr, hosts=[alive_host_id])

        # Neither can a host id that is not part of the cluster
        with pytest.raises(Exception, match=".* does not belong to this cluster"):
            unknown_host_id = str(uuid.uuid4())
            await manager.api.exclude_node(live_node.ip_addr, hosts=[unknown_host_id])

        await manager.server_stop(node_to_remove.server_id)
        await manager.others_not_see_server(node_to_remove.ip_addr)

        dead_host_id = await manager.get_host_id(node_to_remove.server_id)
        await manager.api.exclude_node(live_node.ip_addr, hosts=[dead_host_id])

        # With the rack2 node down, tablets must still be rebuildable in a new rack
        await cql.run_async(f"ALTER KEYSPACE {ks} WITH REPLICATION = {{ 'class': 'NetworkTopologyStrategy', 'dc1': ['rack1', 'rack2', 'rack3']}}")

        # removenode must succeed for the excluded node
        await manager.remove_node(live_node.server_id, server_id=node_to_remove.server_id)
@pytest.mark.asyncio
@pytest.mark.parametrize("with_zero_token_node", [False, True])
async def test_remove_failure_then_replace(manager: ManagerClient, with_zero_token_node: bool):
    """
    Removenode must fail with tablets when a datacenter is left with too few nodes
    to satisfy the configured replication factor, even if zero-token nodes exist in
    that datacenter and in another one. Afterwards the same node must be replaceable.
    """
    dc1_racks = [
        {'dc': 'dc1', 'rack': 'rack1_1'},
        {'dc': 'dc1', 'rack': 'rack1_2'}]
    servers: dict[str, list[ServerInfo]] = dict()
    servers['dc1'] = await manager.servers_add(servers_num=2, property_file=dc1_racks)
    servers['dc2'] = await manager.servers_add(servers_num=2, property_file={'dc': 'dc2', 'rack': 'rack2'})
    if with_zero_token_node:
        # Nodes started with join_ring=False are the zero-token nodes: one in dc1, one in its own dc3
        zero_token_dc1 = await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc1', 'rack': 'rack1_1'})
        servers['dc1'].append(zero_token_dc1)
        zero_token_dc3 = await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc3', 'rack': 'rack3'})
        servers['dc3'] = [zero_token_dc3]

    cql = manager.get_cql()
    ks_opts = "WITH replication = { 'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 1 } AND tablets = { 'initial': 1 }"
    async with new_test_keyspace(manager, ks_opts) as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        initiator_node = servers['dc2'][0]
        node_to_remove = servers['dc1'][0]
        await manager.server_stop_gracefully(node_to_remove.server_id)

        logger.info("Attempting removenode - expected to fail")
        await manager.remove_node(initiator_node.server_id,
                                  server_id=node_to_remove.server_id,
                                  expected_error="Removenode failed")

        logger.info(f"Replacing {node_to_remove} with a new node")
        replace_cfg = ReplaceConfig(replaced_id=node_to_remove.server_id,
                                    reuse_ip_addr=False,
                                    use_host_id=True,
                                    wait_replaced_dead=True)
        await manager.server_add(replace_cfg=replace_cfg, property_file=node_to_remove.property_file())
@pytest.mark.asyncio
@pytest.mark.nightly
@pytest.mark.parametrize("with_zero_token_node", [False, True])
async def test_replace_with_no_normal_token_owners_in_dc(manager: ManagerClient, with_zero_token_node: bool):
    """
    Nodes can be replaced with tablets even when there are not enough nodes in a
    datacenter to satisfy the configured replication factor, with and without
    zero-token nodes in the same datacenter and in another datacenter, and when
    another node of the datacenter is down too, leaving no normal token owners,
    as long as other datacenters can be used to rebuild the data.
    """
    dc1_racks = [
        {'dc': 'dc1', 'rack': 'rack1_1'},
        {'dc': 'dc1', 'rack': 'rack1_2'}]
    servers: dict[str, list[ServerInfo]] = dict()
    servers['dc1'] = await manager.servers_add(servers_num=2, property_file=dc1_racks)
    # without a zero-token node, one more dc2 node is added to maintain raft quorum
    dc2_size = 2 if with_zero_token_node else 3
    servers['dc2'] = await manager.servers_add(servers_num=dc2_size, property_file={'dc': 'dc2', 'rack': 'rack2'})
    if with_zero_token_node:
        zero_token_dc1 = await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc1', 'rack': 'rack1_1'})
        servers['dc1'].append(zero_token_dc1)
        zero_token_dc3 = await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc3', 'rack': 'rack3'})
        servers['dc3'] = [zero_token_dc3]

    cql = manager.get_cql()
    ks_opts = "WITH replication = { 'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 1 } AND tablets = { 'initial': 1 }"
    async with new_test_keyspace(manager, ks_opts) as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        # Write every key with CL=ALL
        insert = cql.prepare(f"INSERT INTO {ks}.test (pk, c) VALUES (?, ?)")
        insert.consistency_level = ConsistencyLevel.ALL
        keys = range(256)
        writes = [cql.run_async(insert, [k, k]) for k in keys]
        await asyncio.gather(*writes)

        first_node, second_node = servers['dc1'][0:2]
        second_host_id = await manager.get_host_id(second_node.server_id)

        # Stop both token owners in dc1 to leave no token owners in the datacenter
        for node in (first_node, second_node):
            await manager.server_stop_gracefully(node.server_id)

        # The second node is still down during the first replace, so it is passed as ignored
        logger.info(f"Replacing {first_node} with a new node")
        replace_cfg = ReplaceConfig(replaced_id=first_node.server_id, reuse_ip_addr=False, use_host_id=True,
                                    wait_replaced_dead=True, ignore_dead_nodes=[second_host_id])
        await manager.server_add(replace_cfg=replace_cfg, property_file=first_node.property_file())

        logger.info(f"Replacing {second_node} with a new node")
        replace_cfg = ReplaceConfig(replaced_id=second_node.server_id, reuse_ip_addr=False, use_host_id=True,
                                    wait_replaced_dead=True)
        await manager.server_add(replace_cfg=replace_cfg, property_file=second_node.property_file())

        logger.info("Verifying data")
        # dc2 is stopped before the CL=ONE read below
        for node in servers['dc2']:
            await manager.server_stop_gracefully(node.server_id)
        read_all = SimpleStatement(f"SELECT * FROM {ks}.test;", consistency_level=ConsistencyLevel.ONE)
        rows = await cql.run_async(read_all)
        assert len(rows) == len(keys)
        for row in rows:
            assert row.c == row.pk

        # dc2 is started again so that the keyspace can be dropped
        restarts = [manager.server_start(node.server_id) for node in servers['dc2']]
        await asyncio.gather(*restarts)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_drop_keyspace_while_split(manager: ManagerClient):
    """
    Reproducer for: https://github.com/scylladb/scylladb/issues/22431

    Checks that the split ready compaction groups are correctly created on a
    shard that has several storage groups for the same table, while a DROP
    KEYSPACE runs concurrently with the split.
    """
    logger.info("Bootstrapping cluster")
    node_cmdline = ['--target-tablet-size-in-bytes', '8192', '--smp', '2']
    node_config = {'tablet_load_stats_refresh_interval_in_seconds': 1}
    node = await manager.server_add(config=node_config, cmdline=node_cmdline)
    node_log = await manager.server_open_log(node.server_id)

    cql = manager.get_cql()
    await wait_for_cql_and_get_hosts(cql, [node], time.time() + 60)

    await manager.disable_tablet_balancing()

    # 4 initial tablets on 2 shards, so every shard gets at least 2 tablets (and storage groups)
    ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 4};")
    await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

    await manager.api.disable_autocompaction(node.ip_addr, ks)

    inserts = [cql.run_async(f'INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});') for k in range(2048)]
    await asyncio.gather(*inserts)
    await manager.api.flush_keyspace(node.ip_addr, ks)

    for injection in ('truncate_compaction_disabled_wait', 'split_storage_groups_wait'):
        await manager.api.enable_injection(node.ip_addr, injection, one_shot=False)

    # with balancing enabled again, the load balancer should emit a tablet split
    await manager.enable_tablet_balancing()

    # the compaction groups get created and the split begins
    await node_log.wait_for('split_storage_groups_wait: wait')

    # kick off the DROP and wait until it has disabled compaction
    drop_ks_task = cql.run_async(f'DROP KEYSPACE {ks};')
    await node_log.wait_for('truncate_compaction_disabled_wait: wait')

    # let the split continue first
    await manager.api.message_injection(node.ip_addr, "split_storage_groups_wait")

    # then let the drop continue and wait for it to complete
    await manager.api.message_injection(node.ip_addr, "truncate_compaction_disabled_wait")
    await drop_ks_task
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_drop_with_tablet_migration_cleanup(manager: ManagerClient):
    """
    Reproducer for https://github.com/scylladb/scylladb/issues/25706

    1) Start a single node with 2 shards and create a populated, flushed table
    2) Start moving the tablet to the other shard of the same node and pause the
       cleanup of the leaving replica before its compaction groups are stopped
    3) Start DROP TABLE and wait until truncate has disabled compaction
    4) Release the migration cleanup, then the drop, and wait for the drop to complete
    """
    logger.info('Bootstrapping cluster')
    cfg = { 'enable_tablets': True }
    cmdline = ['--smp', '2' ]
    server = await manager.server_add(cmdline=cmdline, config=cfg)

    cql = manager.get_cql()

    # We don't want the load balancer to migrate tablets during the test
    await manager.disable_tablet_balancing()

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
        # Create the table, insert data and flush
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': 1}};")
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in range(100)])
        await manager.api.flush_keyspace(server.ip_addr, ks)

        # Get the current location of the tablet
        token = 0
        replica = await get_tablet_replica(manager, server, ks, "test", token)

        await manager.api.enable_injection(server.ip_addr, "wait_before_stop_compaction_groups", one_shot=True)
        await manager.api.enable_injection(server.ip_addr, "truncate_compaction_disabled_wait", one_shot=True)
        slog = await manager.server_open_log(server.server_id)
        smark = await slog.mark()

        # Start migrating the tablet to the other shard of the same node.
        # replica is (host, shard) and the node runs with --smp 2, so the destination
        # is whichever of shards 0 and 1 the tablet is not currently on.
        dst_shard = 0 if replica[1] == 1 else 1
        migration_task = asyncio.create_task(
            manager.api.move_tablet(server.ip_addr, ks, "test", replica[0], replica[1], replica[0], dst_shard, token))

        # Wait until the leaving replica is about to be cleaned up.
        # storage_group's gate has been closed, but the compaction groups have not yet been stopped and disabled
        await slog.wait_for("wait_before_stop_compaction_groups: wait", from_mark=smark)

        # Start dropping the table
        drop_future = cql.run_async(f"DROP TABLE {ks}.test;")

        # Wait for truncate to complete disabling compaction
        await slog.wait_for("truncate_compaction_disabled_wait: wait", from_mark=smark)

        # Release the migration's tablet cleanup
        await manager.api.message_injection(server.ip_addr, "wait_before_stop_compaction_groups")

        # Release drop/truncate
        await manager.api.message_injection(server.ip_addr, "truncate_compaction_disabled_wait")
        await drop_future
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_two_tablets_concurrent_repair_and_migration(manager: ManagerClient):
    """Repair one tablet while a different tablet of the same table is moved to another shard.

    The repair is held by an error injection; once any node logs that the
    repair started, the other tablet is migrated and the repair is released.
    """
    injection = "repair_shard_repair_task_impl_do_repair_ranges"
    servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager)
    await manager.disable_tablet_balancing()

    tablets = await get_all_tablet_replicas(manager, servers[0], ks, "test")
    tablets.sort(key=lambda t: t.last_token)
    assert len(tablets) >= 3

    # Two distinct tablets: one gets repaired, the other gets migrated.
    repair_replicas = tablets[1]
    migration_replicas = tablets[0]

    logs = []
    for srv in servers:
        logs.append(await manager.server_open_log(srv.server_id))
    marks = []
    for srv_log in logs:
        marks.append(await srv_log.mark())

    async def repair_task():
        # Arm the injection on every node before kicking off the repair.
        for srv in servers:
            await manager.api.enable_injection(srv.ip_addr, injection, one_shot=True)
        await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", repair_replicas.last_token)

    async def migration_task():
        waiters = [srv_log.wait_for('Started to repair', from_mark=from_mark)
                   for srv_log, from_mark in zip(logs, marks)]
        await wait_for_first_completed(waiters)

        # Intra-node move: same host, the other one of shards 0/1.
        src_host = migration_replicas.replicas[0][0]
        src_shard = migration_replicas.replicas[0][1]
        dst_shard = 0 if src_shard != 0 else 1
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", src_host, src_shard, src_host, dst_shard,
                                      migration_replicas.last_token)

        # Let the parked repair continue, then clear the injection everywhere.
        for srv in servers:
            await manager.api.message_injection(srv.ip_addr, injection)
        for srv in servers:
            await manager.api.disable_injection(srv.ip_addr, injection)

    await asyncio.gather(repair_task(), migration_task())
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_split_finalization_with_migrations(manager: ManagerClient):
    """
    Reproducer for https://github.com/scylladb/scylladb/issues/21762
        1) Start a cluster with two nodes with error injected to prevent resize finalisation
        2) Create and populate `test` table
        3) Trigger a split in the table by increasing `min_tablet_count`
        4) Wait for the table `test` to reach split finalization stage
        5) Create and populate another table `blocker`
        6) Disable tablet balancing and move all tablets of `test` and `blocker` table from node 2 to node 1
        7) Enable tablet balancing and disable error injection to allow migration and finalisation to proceed.
        8) Expect finalization in `test` to be preferred over migrations in both tables
    """
    logger.info("Starting Cluster")
    cfg = {
        'enable_user_defined_functions': False,
        'enable_tablets': True,
        'error_injections_at_startup': [
            # initially disable transitioning into tablet_resize_finalization topology state
            'tablet_split_finalization_postpone',
        ],
        'tablet_load_stats_refresh_interval_in_seconds': 1,
    }
    cmdline = [
        '--logger-log-level', 'raft_topology=debug',
        '--logger-log-level', 'load_balancer=debug',
    ]
    servers = await manager.servers_add(2, cmdline=cmdline, config=cfg)

    cql = manager.get_cql()

    async def fill_and_flush(table, row_count):
        # Insert `row_count` rows into test.<table> and flush that table on node 1.
        await asyncio.gather(*[cql.run_async(f"INSERT INTO test.{table} (pk, c) VALUES ({k}, {k%3});") for k in range(row_count)])
        await manager.api.keyspace_flush(servers[0].ip_addr, "test", table)

    async def lookup_table_id(table):
        # Fetch the schema id of test.<table>.
        rows = await cql.run_async(f"SELECT id FROM system_schema.tables WHERE keyspace_name = 'test' AND table_name = '{table}'")
        return rows[0].id

    logger.info("Create and populate test table")
    await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 4};")
    await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
    await manager.api.disable_autocompaction(servers[0].ip_addr, "test")
    await fill_and_flush("test", 64)
    test_table_id = await lookup_table_id("test")

    logger.info("Trigger split in table")
    await cql.run_async("ALTER TABLE test.test WITH tablets = {'min_tablet_count': 8};")

    # Wait for splits to finalise; they don't execute yet as they are prevented by the error injection
    logger.info("Wait for tablets to split")
    log = await manager.server_open_log(servers[0].server_id)
    await log.wait_for(f"Finalizing resize decision for table {test_table_id} as all replicas agree on sequence number 1")

    logger.info("Create and populate `blocker` table")
    await cql.run_async("CREATE TABLE test.blocker (pk int PRIMARY KEY, c int);")
    await fill_and_flush("blocker", 128)
    blocker_table_id = await lookup_table_id("blocker")

    s0_host_id = await manager.get_host_id(servers[0].server_id)
    for cf in ["test", "blocker"]:
        logger.info(f"Move all tablets of test.{cf} from Node 2 to Node 1")
        await manager.disable_tablet_balancing()
        s1_replicas = await get_all_tablet_replicas(manager, servers[1], "test", cf)
        # Queue one move per tablet (to shard 0 of node 1) and run them together.
        migration_tasks = []
        for tablet in s1_replicas:
            src_host = tablet.replicas[0][0]
            src_shard = tablet.replicas[0][1]
            migration_tasks.append(
                manager.api.move_tablet(servers[0].ip_addr, "test", cf,
                                        src_host, src_shard,
                                        s0_host_id, 0, tablet.last_token))
        await asyncio.gather(*migration_tasks)

    logger.info("Re-enable tablet balancing; it should be blocked by pending split finalization")
    await manager.enable_tablet_balancing()
    mark, _ = await log.wait_for("Setting tablet balancing to true")

    logger.info("Unblock resize finalisation and verify that the finalisation is preferred over migrations")
    await manager.api.disable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone")
    split_finalization_mark, _ = await log.wait_for("Finished tablet resize finalization", from_mark=mark)
    for table_id in (test_table_id, blocker_table_id):
        migration_mark, _ = await log.wait_for(f"Will set tablet {table_id}:\\d+ stage to write_both_read_old", from_mark=mark)
        # The finalization log line must precede the first migration of each table.
        assert split_finalization_mark < migration_mark, f"Tablet migration of {table_id} was scheduled before resize finalization"

    # ensure all migrations complete
    logger.info("Waiting for migrations to complete")
    await log.wait_for("Tablet load balancer did not make any plan", from_mark=migration_mark)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_two_tablets_concurrent_repair_and_migration_repair_writer_level(manager: ManagerClient):
    """Repair table `test2` while a tablet of table `test` is moved to another shard.

    Rows are written while the first server is down during a rolling restart.
    The repair is then held by an injection, and the migration starts once
    any node logs a `repair_writer` line for the keyspace.
    """
    injection = "repair_writer_impl_create_writer_wait"
    cmdline = [
        '--logger-log-level', 'repair=debug',
        '--hinted-handoff-enabled', '0',
    ]
    servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, cmdline=cmdline)

    await cql.run_async(f"CREATE TABLE {ks}.test2 (pk int PRIMARY KEY, c int) WITH tombstone_gc = {{'mode':'repair'}};")
    await manager.disable_tablet_balancing()

    async def insert_with_down(down_server):
        # Write the same 10 rows to both tables, one table after the other.
        for tbl in ("test", "test2"):
            await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.{tbl} (pk, c) VALUES ({k}, {k + 1});") for k in range(10)])

    cql = await safe_rolling_restart(manager, [servers[0]], with_down=insert_with_down)

    await wait_for_cql_and_get_hosts(manager.get_cql(), servers, time.time() + 30)

    tablets = await get_all_tablet_replicas(manager, servers[1], ks, "test")
    migration_replicas = tablets[0]

    logs = []
    for srv in servers:
        logs.append(await manager.server_open_log(srv.server_id))
    marks = []
    for srv_log in logs:
        marks.append(await srv_log.mark())

    async def repair_task():
        # Arm the injection on every node before starting the repair.
        for srv in servers:
            await manager.api.enable_injection(srv.ip_addr, injection, one_shot=True)
        await manager.api.tablet_repair(servers[0].ip_addr, ks, "test2", token="all")

    async def migration_task():
        waiters = [srv_log.wait_for(f'repair_writer: keyspace={ks}', from_mark=from_mark)
                   for srv_log, from_mark in zip(logs, marks)]
        await wait_for_first_completed(waiters)

        # Intra-node move: same host, the other one of shards 0/1.
        src_host = migration_replicas.replicas[0][0]
        src_shard = migration_replicas.replicas[0][1]
        dst_shard = 0 if src_shard != 0 else 1
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", src_host, src_shard, src_host, dst_shard,
                                      migration_replicas.last_token)

        # Release the parked repair, then clear the injection everywhere.
        for srv in servers:
            await manager.api.message_injection(srv.ip_addr, injection)
        for srv in servers:
            await manager.api.disable_injection(srv.ip_addr, injection)

    await asyncio.gather(repair_task(), migration_task())
async def check_tablet_rebuild_with_repair(manager: ManagerClient, fail: bool):
    """Add a replica to a single-tablet table and verify the outcome of the rebuild.

    Boots three nodes (racks r1, r1, r2), creates an RF=2 table with one
    tablet, and adds a replica on the node that does not hold one yet.

    :param manager: cluster manager used to drive the test cluster
    :param fail: when True, nodes start with the `rebuild_repair_stage_fail`
                 injection and the tablet is expected to keep 2 replicas;
                 otherwise it is expected to end up with 3.
    """
    logger.info("Bootstrapping cluster")
    cfg = {'enable_user_defined_functions': False, 'enable_tablets': True}
    if fail:
        cfg['error_injections_at_startup'] = ['rebuild_repair_stage_fail']

    host_ids = []
    servers = []
    for rack in ("r1", "r1", "r2"):
        srv = await manager.server_add(config=cfg, property_file={"dc": "dc1", "rack": rack})
        servers.append(srv)
        host_ids.append(await manager.get_host_id(srv.server_id))

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        keys = range(256)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])

        replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        logger.info(f"Tablet is on [{replicas}]")
        assert len(replicas) == 1 and len(replicas[0].replicas) == 2

        # Keep only the host ids and pick the first node that holds no replica.
        replicas = [r[0] for r in replicas[0].replicas]
        free_hosts = [h for h in host_ids if h not in replicas]
        assert free_hosts, "Cannot find node without replica"
        new_replica = (free_hosts[0], 0)

        logs = [await manager.server_open_log(srv.server_id) for srv in servers]

        logger.info(f"Adding replica to tablet, host {new_replica[0]}")
        await manager.api.add_tablet_replica(servers[0].ip_addr, ks, "test", new_replica[0], new_replica[1], 0)

        # Exactly one node must have logged the transition to rebuild_repair.
        stage_hits = 0
        for srv_log in logs:
            stage_hits += len(await srv_log.grep(r'.*Will set tablet .* stage to rebuild_repair.*'))
        assert stage_hits == 1

        replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        logger.info(f"Tablet is now on [{replicas}]")
        assert len(replicas) == 1
        replicas = [r[0] for r in replicas[0].replicas]
        expected_replicas = 2 if fail else 3
        assert len(replicas) == expected_replicas

        for h, srv in zip(host_ids, servers):
            host = await wait_for_cql_and_get_hosts(cql, [srv], time.time() + 30)
            if h != host_ids[0]:
                await read_barrier(manager.api, host[0].address)  # host-0 did the barrier in get_all_tablet_replicas above
            res = await cql.run_async(f"SELECT COUNT(*) FROM MUTATION_FRAGMENTS({ks}.test)", host=host[0])
            logger.info(f"Host {h} reports {res} as mutation fragments count")
            # A node must hold data if and only if it is a replica of the tablet.
            holds_data = res[0].count != 0
            assert holds_data == (h in replicas)
@pytest.mark.asyncio
async def test_tablet_rebuild(manager: ManagerClient):
    """Run the tablet rebuild scenario without the failure injection."""
    await check_tablet_rebuild_with_repair(manager, fail=False)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_rebuild_failure(manager: ManagerClient):
    """Run the tablet rebuild scenario with the failure injection enabled."""
    await check_tablet_rebuild_with_repair(manager, fail=True)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_repair_with_invalid_session_id(manager: ManagerClient):
    """Run a tablet repair with the random-session injection armed on all nodes
    and check that at least one node logs a "Session not found" runtime error.
    """
    injection = "handle_tablet_migration_repair_random_session"
    token = -1
    servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager)

    logs = []
    for srv in servers:
        logs.append(await manager.server_open_log(srv.server_id))
    marks = []
    for srv_log in logs:
        marks.append(await srv_log.mark())

    for srv in servers:
        await manager.api.enable_injection(srv.ip_addr, injection, one_shot=True)

    await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)

    # Count matching log lines written after the marks, across all nodes.
    error_lines = 0
    for srv_log, from_mark in zip(logs, marks):
        error_lines += len(await srv_log.grep(r"std::runtime_error \(Session not found", from_mark=from_mark))
    assert error_lines > 0
@pytest.mark.asyncio
async def test_moving_replica_to_replica(manager: ManagerClient):
    """
    Verify that trying to move a tablet replica to a node that is already
    a replica is prevented with an appropriate error message.
    """
    ks = "ks"
    table = "my_table"

    # For convenience when moving tablets.
    cmdline = ["--smp=1"]
    s1, s2 = await manager.servers_add(2, cmdline=cmdline, auto_rack_dc="dc1")

    await manager.disable_tablet_balancing()

    host_id1 = await manager.get_host_id(s1.server_id)
    host_id2 = await manager.get_host_id(s2.server_id)

    cql, _ = await manager.get_ready_cql([s1, s2])

    await cql.run_async(f"CREATE KEYSPACE {ks} WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} "
                        f"AND tablets = {{'enabled': true, 'initial': 1}}")
    await cql.run_async(f"CREATE TABLE {ks}.{table} (pk int, ck int, v int, PRIMARY KEY (pk, ck))")
    await cql.run_async(f"INSERT INTO {ks}.{table} (pk, ck, v) VALUES (1, 1, 1)")

    # Doesn't matter. There's only one tablet.
    tablet_token = 0

    # With RF=2 on two nodes both hosts already hold a replica, so the move must be rejected.
    move_args = dict(
        node_ip=s1.ip_addr,
        ks=ks,
        table=table,
        src_host=host_id1,
        src_shard=0,
        dst_host=host_id2,
        dst_shard=0,
        token=tablet_token,
    )
    with pytest.raises(Exception, match=rf"Tablet .* has replica on {host_id2}"):
        await manager.api.move_tablet(**move_args)
@pytest.mark.asyncio
async def test_moving_replica_within_single_rack(manager: ManagerClient):
    """
    Verify that it's possible to move a tablet from a replica node to a node
    that's not a replica.
    """
    ks = "ks"
    table = "my_table"

    # For convenience when moving tablets.
    cmdline = ["--smp=1"]
    s1, s2 = await manager.servers_add(2, cmdline=cmdline, property_file={"dc": "dc1", "rack": "r1"})

    await manager.disable_tablet_balancing()

    host_id1 = await manager.get_host_id(s1.server_id)
    host_id2 = await manager.get_host_id(s2.server_id)

    cql, _ = await manager.get_ready_cql([s1, s2])

    await cql.run_async(f"CREATE KEYSPACE {ks} WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} "
                        f"AND tablets = {{'enabled': true, 'initial': 1}}")
    await cql.run_async(f"CREATE TABLE {ks}.{table} (pk int, ck int, v int, PRIMARY KEY (pk, ck))")
    await cql.run_async(f"INSERT INTO {ks}.{table} (pk, ck, v) VALUES (1, 1, 1)")

    # Doesn't matter. There's only one tablet.
    tablet_token = 0

    # Find which node currently owns the single replica; that node is the
    # source of the move and the other node is the destination.
    owner_host, _ = await get_tablet_replica(manager, s1, ks, table, tablet_token)
    if owner_host != host_id1:
        src_server, src_host, dst_host = s2, host_id2, host_id1
    else:
        src_server, src_host, dst_host = s1, host_id1, host_id2

    await manager.api.move_tablet(
        node_ip=src_server.ip_addr,
        ks=ks,
        table=table,
        src_host=src_host,
        src_shard=0,
        dst_host=dst_host,
        dst_shard=0,
        token=tablet_token)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_disabling_balancing_preempts_balancer(manager: ManagerClient):
    """Check that disabling tablet balancing returns while the balancer is kept busy.

    Two injections are enabled on the first server; after a table is created
    and the log shows 'Initiating tablet', balancing is disabled.
    """
    servers = await manager.servers_add(2, auto_rack_dc="dc1")
    coord_srv = servers[0]

    for injection in ("tablet_allocator_shuffle", "tablet_keep_repairing"):
        await manager.api.enable_injection(coord_srv.ip_addr, injection, one_shot=False)

    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy'}}") as ks:
        cql = manager.get_cql()
        coord_log = await manager.server_open_log(coord_srv.server_id)
        start_mark = await coord_log.mark()

        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        await coord_log.wait_for('Initiating tablet', from_mark=start_mark)

        # Should preempt balancing
        await manager.disable_tablet_balancing()
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_table_creation_wakes_up_balancer(manager: ManagerClient):
    """
    Reproduces both https://github.com/scylladb/scylladb/issues/25163 and https://github.com/scylladb/scylladb/issues/27958
    Scenario:
    1. Start a cluster
    2. Block the topology coordinator right before it goes to sleep
    3. Create a table, which should wake up the coordinator
    4. Verify that the coordinator didn't go to sleep
    """
    sleep_injection = 'wait-before-topology-coordinator-goes-to-sleep'
    event_injection = 'wait-after-topology-coordinator-gets-event'

    server = await manager.server_add()
    log = await manager.server_open_log(server.server_id)
    cql = manager.get_cql()

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 8}") as ks:
        # Block coordinator right before going to sleep
        # We use node bootstrap as an operation which is going to be trapped on exit, but it's arbitrary.
        mark = await log.mark()
        await manager.api.enable_injection(server.ip_addr, sleep_injection, one_shot=True)
        await manager.server_add()
        await log.wait_for(f'{sleep_injection}: wait', from_mark=mark)

        # Create a table, which should prevent the coordinator from sleeping
        await manager.api.enable_injection(server.ip_addr, event_injection, one_shot=True)
        mark = await log.mark()
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        # Verify it didn't go to sleep. If it did, the wait for wakeup would be 30s on average,
        # up to stats refresh period, which is 60s. So use a small timeout.
        await manager.api.message_injection(server.ip_addr, sleep_injection)
        await log.wait_for(f'{event_injection}: wait', from_mark=mark, timeout=5)