Introduced a new max_tablet_count tablet option that caps the maximum number of tablets a table can have. This feature is designed primarily for backup and restore workflows. During backup, when load balancing is disabled for snapshot consistency, the current tablet count is recorded in the backup manifest. During restore, max_tablet_count is set to this recorded value, ensuring the restored table's tablet count never exceeds the original snapshot's tablet distribution. This guarantee enables efficient file-based SSTable streaming during restore, as each SSTable remains fully contained within a single tablet boundary. Closes scylladb/scylladb#28450
1704 lines
89 KiB
Python
1704 lines
89 KiB
Python
#
|
|
# Copyright (C) 2024-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
import uuid
|
|
|
|
from cassandra.protocol import ConfigurationException, InvalidRequest, SyntaxException
|
|
from cassandra.query import SimpleStatement, ConsistencyLevel
|
|
from test.cluster.tasks.task_manager_client import TaskManagerClient
|
|
from test.cluster.test_tablets2 import safe_rolling_restart
|
|
from test.pylib.internal_types import ServerInfo
|
|
from test.pylib.manager_client import ManagerClient
|
|
from test.pylib.repair import create_table_insert_data_for_repair
|
|
from test.pylib.rest_client import HTTPError, read_barrier
|
|
from test.pylib.scylla_cluster import ReplaceConfig
|
|
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas
|
|
from test.pylib.util import unique_name, wait_for, wait_for_first_completed
|
|
from test.cluster.util import wait_for_cql_and_get_hosts, create_new_test_keyspace, new_test_keyspace, reconnect_driver, \
|
|
get_topology_coordinator, parse_replication_options, get_replication, get_replica_count, find_server_by_host_id
|
|
from contextlib import nullcontext as does_not_raise
|
|
import time
|
|
import pytest
|
|
import logging
|
|
import asyncio
|
|
import re
|
|
import requests
|
|
import random
|
|
import os
|
|
import glob
|
|
import shutil
|
|
|
|
# Module-level logger, named after this module; the tests below use it for progress messages.
logger = logging.getLogger(__name__)
|
|
|
|
# The glob below is designed to match the version-generation-format-component.extension format, e.g.
|
|
# da-3gqu_1hke_4919c2kfgur9y2bm77-bti-Data.db
|
|
# me-1-big-TOC.txt
|
|
# Pattern pieces, left to right: two-character version, generation, three-character
# format, then "component.extension" -- each separated by a dash.
sstable_filename_glob = "??-*-???-*.*"
|
|
|
|
@pytest.mark.asyncio
async def test_tablet_replication_factor_enough_nodes(manager: ManagerClient):
    """Table creation is rejected while the keyspace RF exceeds the node count.

    On a two-node cluster a keyspace is declared with RF=3 in the local DC and
    CREATE TABLE must fail with ConfigurationException. After the RF is lowered
    to 2, the very same CREATE TABLE statement has to succeed.
    """
    # The keyspace has to exist before the table is created, which is impossible
    # while RF-rack-valid keyspaces are enforced. Starting with three nodes and
    # decommissioning one would work around that, but the test would need another
    # rewrite if decommission ever gets constrained -- so the option is disabled.
    node_config = {
        'enable_user_defined_functions': False,
        'tablets_mode_for_new_keyspaces': 'enabled',
        'rf_rack_valid_keyspaces': False,
    }
    await manager.servers_add(2, config=node_config)

    cql = manager.get_cql()
    local_rows = await cql.run_async("SELECT data_center FROM system.local")
    this_dc = local_rows[0].data_center

    def replication(rf):
        # Replication clause for the local DC with the requested factor.
        return f"replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': {rf}}}"

    async with new_test_keyspace(manager, f"WITH {replication(3)}") as ks:
        create_table = f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);"

        not_enough_nodes = f"Datacenter {this_dc} doesn't have enough token-owning nodes"
        with pytest.raises(ConfigurationException, match=not_enough_nodes):
            await cql.run_async(create_table)

        # With RF lowered to the number of nodes the table can be created.
        await cql.run_async(f"ALTER KEYSPACE {ks} WITH {replication(2)}")
        await cql.run_async(create_table)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_tablet_scaling_option_is_respected(manager: ManagerClient):
    """A table created under tablets_initial_scale_factor=32 on a 2-shard node gets 64 tablets."""
    # 32 is high enough to ensure we demand more tablets than the default choice.
    node_config = {'tablets_mode_for_new_keyspaces': 'enabled', 'tablets_initial_scale_factor': 32}
    node = (await manager.servers_add(1, config=node_config, cmdline=['--smp', '2']))[0]

    cql = manager.get_cql()
    schema_statements = (
        "CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}",
        "CREATE TABLE test.test (pk int PRIMARY KEY, c int);",
    )
    for statement in schema_statements:
        await cql.run_async(statement)

    # NOTE(review): 64 presumably equals scale factor (32) x shards (2) -- confirm.
    expected_tablet_count = 64
    tablet_replicas = await get_all_tablet_replicas(manager, node, 'test', 'test')
    assert len(tablet_replicas) == expected_tablet_count
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_tablet_cannot_decommision_below_replication_factor(manager: ManagerClient):
    """Decommission must be refused once it would leave fewer nodes than the RF.

    Four nodes (two in rack r1, one each in r2 and r3) host an RF=3 keyspace.
    Decommissioning the first node succeeds; decommissioning a second one is
    expected to fail, and all rows must still be readable at CL=THREE.
    """
    logger.info("Bootstrapping cluster")
    node_config = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
    rack_layout = [{"dc": "dc1", "rack": rack} for rack in ("r1", "r1", "r2", "r3")]
    servers = await manager.servers_add(4, config=node_config, property_file=rack_layout)

    logger.info("Creating table")
    cql = manager.get_cql()
    ks_options = "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}"
    async with new_test_keyspace(manager, ks_options) as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        logger.info("Populating table")
        keys = range(256)
        inserts = (cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys)
        await asyncio.gather(*inserts)

        first_node, second_node = servers[0], servers[1]

        logger.info("Decommission some node")
        await manager.decommission_node(first_node.server_id)

        logger.info("Decommission another node")
        with pytest.raises(HTTPError, match="Decommission failed"):
            await manager.decommission_node(second_node.server_id)

        # Three nodes should still provide CL=3
        logger.info("Checking table")
        select_all = SimpleStatement(f"SELECT * FROM {ks}.test;", consistency_level=ConsistencyLevel.THREE)
        rows = await cql.run_async(select_all)
        assert len(rows) == len(keys)
        for row in rows:
            assert row.c == row.pk
|
|
|
|
@pytest.mark.asyncio
async def test_reshape_with_tablets(manager: ManagerClient):
    """Check that sstables are reshaped to one per tablet when the node restarts.

    With autocompaction disabled, the table is written and flushed
    ``loop_count`` times, leaving ``loop_count`` sstables in each of the
    ``number_of_tablets`` tablets. After a restart the server log must report
    the reshape and the sstable count must drop to ``number_of_tablets``.
    """
    logger.info("Bootstrapping cluster")
    cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
    server = (await manager.servers_add(1, config=cfg, cmdline=['--smp', '1', '--logger-log-level', 'compaction=debug']))[0]

    logger.info("Creating table")
    cql = manager.get_cql()
    number_of_tablets = 2
    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} and tablets = {{'initial': {number_of_tablets} }}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        logger.info("Disabling autocompaction for the table")
        await manager.api.disable_autocompaction(server.ip_addr, ks, "test")

        logger.info("Populating table")
        loop_count = 32
        for _ in range(loop_count):
            # Every round rewrites the same 64 keys and flushes, so each round
            # adds one more sstable per tablet.
            await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in range(64)])
            await manager.api.keyspace_flush(server.ip_addr, ks, "test")
        # After populating the table, expect loop_count number of sstables per tablet
        sstable_info = await manager.api.get_sstable_info(server.ip_addr, ks, "test")
        assert len(sstable_info[0]['sstables']) == number_of_tablets * loop_count

        log = await manager.server_open_log(server.server_id)
        mark = await log.mark()

        # Restart the server and verify that the sstables have been reshaped down to one sstable per tablet
        logger.info("Restart the server")
        await manager.server_restart(server.server_id)
        await reconnect_driver(manager)

        # The expected sstable count in the log line follows loop_count rather
        # than repeating the literal, so the two cannot drift apart.
        await log.wait_for(f"Reshape {ks}.test .* Reshaped {loop_count} sstables to .*", from_mark=mark, timeout=30)
        sstable_info = await manager.api.get_sstable_info(server.ip_addr, ks, "test")
        assert len(sstable_info[0]['sstables']) == number_of_tablets
|
|
|
|
|
|
@pytest.mark.parametrize("direction", ["up", "down", "none"])
@pytest.mark.asyncio
async def test_tablet_rf_change(manager: ManagerClient, direction):
    """Check that ALTER KEYSPACE re-allocates tablet replicas when the RF changes.

    A two-node cluster hosts a table and a materialized view. The keyspace
    replication is altered from one rack list to another ("up": 1 -> 2 racks,
    "down": 2 -> 1, "none": 1 -> 1) and every tablet of both the table and the
    view must end up with the new number of replicas. For the "up" case the
    test additionally samples keys and verifies, through MUTATION_FRAGMENTS(),
    that each sampled partition is present on exactly ``rf_to`` hosts.
    """
    cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
    servers = await manager.servers_add(2, config=cfg, auto_rack_dc="dc1")
    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    res = await cql.run_async("SELECT data_center FROM system.local")
    this_dc = res[0].data_center

    # direction -> (rack list before, rack list after, RF before, RF after).
    # A direction missing from the table fails right here with KeyError instead
    # of surfacing later as an unbound local variable.
    transitions = {
        'up': ("['rack1']", "['rack1', 'rack2']", 1, 2),
        'down': ("['rack1', 'rack2']", "['rack1']", 2, 1),
        'none': ("['rack1']", "['rack1']", 1, 1),
    }
    rf_from_opt, rf_to_opt, rf_from, rf_to = transitions[direction]

    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': {rf_from_opt}}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.test_mv AS SELECT pk FROM {ks}.test WHERE pk IS NOT NULL PRIMARY KEY (pk)")

        logger.info("Populating table")
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in range(128)])

        async def check_allocated_replica(expected: int):
            # Every tablet of the base table and of the view must have `expected` replicas.
            replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
            replicas = replicas + await get_all_tablet_replicas(manager, servers[0], ks, 'test_mv', is_view=True)
            for r in replicas:
                logger.info(f"{r.replicas}")
                assert len(r.replicas) == expected

        logger.info(f"Checking {rf_from} allocated replicas")
        await check_allocated_replica(rf_from)

        logger.info(f"Altering RF {rf_from} -> {rf_to}")
        await cql.run_async(f"ALTER KEYSPACE {ks} WITH replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': {rf_to_opt}}}")

        logger.info(f"Checking {rf_to} re-allocated replicas")
        await check_allocated_replica(rf_to)

        if direction != 'up':
            # Fragments are checked only for the 'up' change. After a down/none
            # change the final RF is 1 on a two-node cluster, so one node (validly)
            # misses the replica, and scylla crashes when such a node is checked,
            # see scylladb/scylladb#18786
            return

        # pk -> set of host ids on which a partition-start fragment was found.
        fragments = { pk: set() for pk in random.sample(range(128), 17) }
        for s in servers:
            host_id = await manager.get_host_id(s.server_id)
            host = await wait_for_cql_and_get_hosts(cql, [s], time.time() + 30)
            await read_barrier(manager.api, s.ip_addr)  # scylladb/scylladb#18199
            for k in fragments:
                res = await cql.run_async(f"SELECT partition_region FROM MUTATION_FRAGMENTS({ks}.test) WHERE pk={k}", host=host[0])
                for fragment in res:
                    if fragment.partition_region == 0:  # partition start
                        fragments[k].add(host_id)
        logger.info("Checking fragments")
        for k in fragments:
            assert len(fragments[k]) == rf_to, f"Found mutations for {k} key on {fragments[k]} hosts, but expected only {rf_to} of them"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_tablet_mutation_fragments_unowned_partition(manager: ManagerClient):
    """MUTATION_FRAGMENTS() reads must cope with partitions the queried node does not own.

    Four partitions are written to an RF=2 keyspace on a three-node cluster,
    then every node is asked for the fragments of every partition. The test
    only requires that none of these queries raises.
    """
    node_config = {
        'enable_user_defined_functions': False,
        'tablets_mode_for_new_keyspaces': 'enabled',
    }
    rack_layout = [{"dc": "dc1", "rack": rack} for rack in ("r1", "r1", "r2")]
    servers = await manager.servers_add(3, config=node_config, property_file=rack_layout)

    cql = manager.get_cql()
    partition_keys = range(4)

    ks_options = "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}"
    async with new_test_keyspace(manager, ks_options) as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        logger.info("Populating table")
        inserts = (cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in partition_keys)
        await asyncio.gather(*inserts)

        for server in servers:
            # The host id is fetched exactly as before although its value is not used.
            await manager.get_host_id(server.server_id)
            target_hosts = await wait_for_cql_and_get_hosts(cql, [server], time.time() + 30)
            for k in partition_keys:
                query = f"SELECT partition_region FROM MUTATION_FRAGMENTS({ks}.test) WHERE pk={k}"
                await cql.run_async(query, host=target_hosts[0])
|
|
|
|
|
|
# Verifies that the describe_ring, range_to_endpoint_map and tokens_endpoint
# REST APIs return information that's consistent with system.tablets contents
@pytest.mark.parametrize("endpoint", ["describe_ring", "range_to_endpoint", "tokens_endpoint"])
@pytest.mark.asyncio
async def test_tablets_api_consistency(manager: ManagerClient, endpoint):
    """Compare one token/endpoint REST API against the tablet replicas of a table.

    Six nodes are spread over three racks (two per rack) and a table is created
    with min_tablet_count == max_tablet_count. The selected API must return one
    entry per tablet, whose token equals the tablet's last token and whose
    node(s) match the tablet's replicas.
    """
    servers = []
    for rack in ('rack1', 'rack2', 'rack3'):
        servers += await manager.servers_add(2, property_file={'dc': f'dc1', 'rack': rack})
    await manager.disable_tablet_balancing()

    # host id -> ip address, used to translate tablet replicas into API endpoints.
    hosts = {}
    for srv in servers:
        hosts[await manager.get_host_id(srv.server_id)] = srv.ip_addr

    cql = manager.get_cql()
    api_node = servers[0].ip_addr
    expected_tablet_count = 4
    ks_options = "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}"
    async with new_test_keyspace(manager, ks_options) as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH TABLETS = {{'min_tablet_count': {expected_tablet_count}, 'max_tablet_count': {expected_tablet_count}}};")

        def columnar(lst):
            # One item per line, each line (including the first) prefixed by a newline and a space.
            separator = "\n "
            return separator + separator.join(f'{x}' for x in lst)

        replicas = await get_all_tablet_replicas(manager, servers[0], ks, "test")
        logger.info(f'system.tablets: {columnar(replicas)}')
        # `replicas` holds one object per tablet, so its length is the current tablet count.
        # Load balancing is disabled and min_tablet_count equals max_tablet_count,
        # hence the count is expected to be exactly that value.
        tablet_count = len(replicas)
        assert tablet_count == expected_tablet_count, f"Expected {expected_tablet_count} tablets, got {len(replicas)}"

        if endpoint == 'describe_ring':
            ring_info = await manager.api.describe_ring(api_node, ks, "test")
            logger.info(f'api.describe_ring: {columnar(ring_info)}')
            api_data = [{'token': entry['end_token'], 'nodes': set(entry['endpoints'])} for entry in ring_info]
        elif endpoint == 'range_to_endpoint':
            rte_info = await manager.api.range_to_endpoint_map(api_node, ks, "test")

            def end_token(entry):
                return int(entry['key'][1])

            rte_info.sort(key=end_token)  # sort by end token
            logger.info(f'api.range_to_endpoint_map: {columnar(rte_info)}')
            api_data = [{'token': entry['key'][1], 'nodes': set(entry['value'])} for entry in rte_info]
        elif endpoint == 'tokens_endpoint':
            te_info = await manager.api.tokens_endpoint(api_node, ks, "test")
            logger.info(f'api.tokens_endpoint: {columnar(te_info)}')
            api_data = [{'token': entry['key'], 'node': entry['value']} for entry in te_info]
        else:
            raise RuntimeError('invalid endpoint parameter')

        assert len(replicas) == len(api_data), f"{endpoint} returned wrong number of ranges"
        for api_entry, tablet in zip(api_data, replicas):
            assert api_entry['token'] == f'{tablet.last_token}'
            replica_addrs = {hosts[replica[0]] for replica in tablet.replicas}
            if 'nodes' in api_entry:
                # Range-based APIs list every endpoint of the range.
                assert api_entry['nodes'] == replica_addrs
            elif 'node' in api_entry:
                # tokens_endpoint reports a single node, which must be one of the replicas.
                assert api_entry['node'] in replica_addrs
            else:
                raise RuntimeError('api_data is badly prepared')
|
|
|
|
|
|
# ALTER KEYSPACE may move the replication factor by at most 1 per statement,
# which guarantees that the old and the new QUORUM overlap.
# This test verifies that in a simple single-DC scenario. Enforcement of
# RF-rack-valid keyspaces is explicitly disabled to allow more flexible alterations.
@pytest.mark.asyncio
async def test_singledc_alter_tablets_rf(manager: ManagerClient):
    """Walk the RF of a single-DC keyspace through allowed and rejected steps."""
    await manager.server_add(config={"rf_rack_valid_keyspaces": "false", "enable_tablets": "true"}, property_file={"dc": "dc1", "rack": "r1"})
    cql = manager.get_cql()

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1}") as ks:
        async def change_rf(rf):
            await cql.run_async(f"ALTER KEYSPACE {ks} WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': {rf}}}")

        # Starting from RF=1, each of these steps must be accepted, in this order:
        #   2 - increase by 1
        #   3 - increase by 1 again
        #   3 - same RF, a no-op that must not cause problems
        #   4 - increase by 1 once more
        #   3 - decrease by 1
        accepted_steps = (2, 3, 3, 4, 3)
        for rf in accepted_steps:
            await change_rf(rf)

        # With the RF now at 3, each of these must be rejected:
        #    5 - increase by 2
        #    1 - decrease by 2
        #   10 - increase by more than 2
        #    0 - decrease by more than 2
        rejected_steps = (5, 1, 10, 0)
        for rf in rejected_steps:
            with pytest.raises(InvalidRequest):
                await change_rf(rf)
|
|
|
|
|
|
# ALTER of a tablets keyspace cannot change the RF of any DC by more than 1 at a time.
# In a multi-DC environment replicas may be created in a DC that had none before,
# but the rule above still applies, since that changes the RF of the new DC from 0 to N.
# Reproduces https://github.com/scylladb/scylladb/issues/20039#issuecomment-2271365060
# See also `test_singledc_alter_tablets_rf` above for basic scenarios tested
@pytest.mark.asyncio
async def test_multidc_alter_tablets_rf(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
    """Exercise accepted and rejected RF alterations on a two-DC cluster."""
    config = {"endpoint_snitch": "GossipingPropertyFileSnitch", "tablets_mode_for_new_keyspaces": "enabled"}

    logger.info("Creating a new cluster of 2 nodes in 1st DC and 2 nodes in 2nd DC")
    # at least 2 nodes per DC are needed in order to try RF=2 in each DC
    for dc in ("dc1", "dc2"):
        await manager.servers_add(2, config=config, auto_rack_dc=dc)

    cql = manager.get_cql()
    too_big_a_change = "Only one DC's RF can be changed at a time and not by more than 1"

    async with new_test_keyspace(manager, "with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1}") as ks:
        async def alter_replication(dc_options):
            # `dc_options` is the per-DC part of the replication map, spliced in verbatim.
            await cql.run_async(f"alter keyspace {ks} with replication = {{'class': 'NetworkTopologyStrategy', {dc_options}}}")

        async def expect_rejected(dc_options):
            with pytest.raises(InvalidRequest, match=too_big_a_change):
                await alter_replication(dc_options)

        # a table is needed so that the alteration changes tablet replicas, not just the schema
        await cql.run_async(f"create table {ks}.t (pk int primary key)")

        # changing RF of dc2 from 0 to 2 should fail
        await expect_rejected("'dc1': 1, 'dc2': ['rack1', 'rack2']")

        # changing RF of dc2 from 0 to 1 should succeed
        await alter_replication("'dc1': 1, 'dc2': ['rack1']")
        # both DCs are now expected to have exactly one replica
        repl = get_replication(cql, ks)
        logger.info(f"repl = {repl}")
        assert len(repl['dc1']) == 1
        assert repl['dc2'] == ['rack1']

        rejected_alterations = (
            # incrementing RF of 2 DCs at once should NOT succeed, because it'd leave 2 pending tablets replicas
            "'dc1': ['rack1', 'rack2'], 'dc2': ['rack1', 'rack2']",
            # as above, but decrementing
            "'dc1': 0, 'dc2': 0",
            # as above, but decrement 1 RF and increment the other
            "'dc1': ['rack1','rack2'], 'dc2': 0",
            # as above, but RFs are swapped
            "'dc1': 0, 'dc2': ['rack1', 'rack2']",
        )
        for dc_options in rejected_alterations:
            await expect_rejected(dc_options)

        # check that we can remove all replicas from dc2 by changing RF from 1 to 0
        await alter_replication("'dc1': 1, 'dc2': 0")
        # check that we can remove all replicas from the cluster, i.e. change RF of dc1 from 1 to 0 as well:
        await alter_replication("'dc1': 0")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_alter_tablets_rf_dc_drop(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
    """Replicas of a DC may only be dropped explicitly (RF set to 0), never by omission.

    Omitting dc2 from the replication map must be rejected and leave the
    keyspace unchanged. Once dc2 was explicitly set to 0, statements that do
    not mention dc2 at all are accepted.
    """
    config = {"endpoint_snitch": "GossipingPropertyFileSnitch", "tablets_mode_for_new_keyspaces": "enabled"}

    logger.info("Creating a new cluster of 2 nodes in 1st DC and 2 nodes in 2nd DC")
    # at least 2 nodes per DC are needed in order to try RF=2 in each DC
    for dc in ("dc1", "dc2"):
        await manager.servers_add(2, config=config, auto_rack_dc=dc)

    async def check_rf(ks: str, expected_dc1_rf: int, expected_dc2_rf: int):
        # Reads the stored replication options and verifies the per-DC replica
        # count; an expected RF of 0 means the DC must be absent from the options.
        rows = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
        repl = parse_replication_options(rows[0].replication)
        logger.info(f"repl = {repl}")
        for dc, expected_rf in (('dc1', expected_dc1_rf), ('dc2', expected_dc2_rf)):
            if expected_rf > 0:
                assert get_replica_count(repl[dc]) == expected_rf
            else:
                assert dc not in repl
        return repl

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
        await cql.run_async(f"create table {ks}.t (pk int primary key)")

        await check_rf(ks=ks, expected_dc1_rf=1, expected_dc2_rf=1)

        # The same statement is rejected first (it would silently drop dc2) and
        # accepted later, once dc2 has been removed explicitly.
        grow_dc1_only = f"alter keyspace {ks} with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': ['rack1', 'rack2']}}"

        implicit_drop = "Attempted to implicitly drop replicas in datacenter dc2. If this is the desired behavior, set replication factor to 0 in dc2 explicitly."
        with pytest.raises(ConfigurationException, match=implicit_drop):
            await cql.run_async(grow_dc1_only)
        repl = await check_rf(ks=ks, expected_dc1_rf=1, expected_dc2_rf=1)

        # Keep dc1 exactly as stored and drop dc2 explicitly.
        dc1_rf = repl['dc1']
        await cql.run_async(f"alter keyspace {ks} with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': {dc1_rf}, 'dc2': 0}}")
        await check_rf(ks=ks, expected_dc1_rf=1, expected_dc2_rf=0)

        await cql.run_async(grow_dc1_only)
        await check_rf(ks=ks, expected_dc1_rf=2, expected_dc2_rf=0)

        # An alteration that does not touch replication must leave it intact.
        await cql.run_async(f"alter keyspace {ks} with durable_writes = true")
        await check_rf(ks=ks, expected_dc1_rf=2, expected_dc2_rf=0)
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
    """Convert keyspaces created with a numeric RF to rack-list replication.

    While the "create_with_numeric" injection is active, six keyspaces are
    created and their stored replication options are verified to be numeric.
    After the injection is disabled, ALTER KEYSPACE statements switch (some
    of) the DCs to rack lists; the test checks the stored options, the tablet
    replica placement for ks1/ks2, and that the alterations of ks3 and ks4
    are rejected with ConfigurationException.
    """
    async def get_replication_options(ks: str):
        # Stored replication options of `ks`; replication_v2 wins when it is set.
        res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
        repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
        return repl

    async def create_numeric_keyspace(ks: str, dc_rfs: dict[str, int]) -> None:
        # Creates `ks` with the given numeric per-DC RFs plus a table `t`, then
        # verifies that the stored options are the numeric values (as strings).
        dc_options = ", ".join(f"'{dc}': {rf}" for dc, rf in dc_rfs.items())
        await cql.run_async(f"create keyspace {ks} with replication = {{'class': 'NetworkTopologyStrategy', {dc_options}}} and tablets = {{'initial': 4}};")
        await cql.run_async(f"create table {ks}.t (pk int primary key);")
        repl = await get_replication_options(ks)
        for dc, rf in dc_rfs.items():
            assert repl[dc] == str(rf)

    injection = "create_with_numeric"
    config = {"tablets_mode_for_new_keyspaces": "enabled", "error_injections_at_startup": [injection]}

    servers = [await manager.server_add(config=config, property_file={'dc': 'dc1', 'rack': 'rack1a'}),
               await manager.server_add(config=config, property_file={'dc': 'dc1', 'rack': 'rack1b'}),
               await manager.server_add(config=config, property_file={'dc': 'dc2', 'rack': 'rack2a'}),
               await manager.server_add(config=config, property_file={'dc': 'dc2', 'rack': 'rack2b'})]

    host_ids = [await manager.get_host_id(s.server_id) for s in servers]

    cql = manager.get_cql()

    await create_numeric_keyspace("ks1", {'dc1': 1})
    await create_numeric_keyspace("ks2", {'dc1': 1, 'dc2': 2})
    await create_numeric_keyspace("ks3", {'dc1': 1})
    await create_numeric_keyspace("ks4", {'dc1': 1})
    await create_numeric_keyspace("ks5", {'dc1': 2, 'dc2': 2})
    await create_numeric_keyspace("ks6", {'dc1': 2})

    for s in servers:
        await manager.api.disable_injection(s.ip_addr, injection)

    # ks1: a single replica that must end up on the node in rack1b.
    await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")
    repl = await get_replication_options("ks1")
    assert repl['dc1'] == ['rack1b']

    tablet_replicas = await get_all_tablet_replicas(manager, servers[0], "ks1", "t")
    assert len(tablet_replicas) == 4
    for r in tablet_replicas:
        assert len(r.replicas) == 1
        assert r.replicas[0][0] == host_ids[1]

    # ks2: both DCs converted in one statement.
    await cql.run_async("alter keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a'], 'dc2' : ['rack2a', 'rack2b']};")
    repl = await get_replication_options("ks2")
    assert repl['dc1'] == ['rack1a']
    assert len(repl['dc2']) == 2
    assert 'rack2a' in repl['dc2'] and 'rack2b' in repl['dc2']

    tablet_replicas = await get_all_tablet_replicas(manager, servers[0], "ks2", "t")
    assert len(tablet_replicas) == 4
    # Nodes in rack1a, rack2a and rack2b.
    ks_replicas = {host_ids[0], host_ids[2], host_ids[3]}
    for r in tablet_replicas:
        assert len(r.replicas) == 3
        # Exactly the expected hosts: none missing, none extra.
        assert {replica[0] for replica in r.replicas} == ks_replicas

    # ks3 and ks4: these alterations have to be rejected.
    # NOTE(review): presumably because the replica count would change together
    # with the conversion -- confirm against the server-side check.
    with pytest.raises(ConfigurationException):
        await cql.run_async("alter keyspace ks3 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a'], 'dc2' : ['rack2a']};")

    with pytest.raises(ConfigurationException):
        await cql.run_async("alter keyspace ks4 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a', 'rack1b']};")

    # ks5: only dc1 is converted, dc2 stays numeric.
    await cql.run_async("alter keyspace ks5 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a', 'rack1b'], 'dc2' : 2};")
    repl = await get_replication_options("ks5")
    assert len(repl['dc1']) == 2
    assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1']
    assert repl['dc2'] == '2'

    # ks6: dc1 stays numeric while dc2 is added as a rack list.
    await cql.run_async("alter keyspace ks6 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : 2, 'dc2' : ['rack2a']};")
    repl = await get_replication_options("ks6")
    assert repl['dc1'] == '2'
    assert len(repl['dc2']) == 1
    assert repl['dc2'][0] == 'rack2a'
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_enforce_rack_list_option(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
    """Exercise the --enforce-rack-list option on a cluster with a numeric-RF keyspace.

    The test goes through these steps:
      1. Under the "create_with_numeric" injection, create a tablets keyspace
         (ks1) with a numeric RF and a keyspace with tablets disabled (ksv).
      2. Restarting the last node with --enforce-rack-list must fail; that node
         is then stopped and removed from the cluster.
      3. After ks1 is converted to a rack list, a rolling restart with the
         option enabled must succeed.
      4. With the option on, new tablets keyspaces created with a numeric RF
         are stored as rack lists, altering ks1 back to a numeric RF is
         rejected, and keyspaces without explicit tablets options (ksv2) or
         with tablets disabled (ksv) can still be created/altered numerically.
    """
    async def get_replication_options(ks: str):
        # Stored replication options of `ks`; replication_v2 wins when it is set.
        res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
        repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
        return repl

    injection = "create_with_numeric"
    config = {"tablets_mode_for_new_keyspaces": "enabled", "error_injections_at_startup": [injection]}

    servers = [await manager.server_add(config=config, cmdline=['--smp=2'], property_file={'dc': 'dc1', 'rack': 'rack1a'}),
               await manager.server_add(config=config, cmdline=['--smp=2'], property_file={'dc': 'dc1', 'rack': 'rack1b'}),
               await manager.server_add(config=config, cmdline=['--smp=2'], property_file={'dc': 'dc2', 'rack': 'rack2a'}),
               await manager.server_add(config=config, cmdline=['--smp=2'], property_file={'dc': 'dc2', 'rack': 'rack2b'})]

    cql = manager.get_cql()

    await cql.run_async("create keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};")
    await cql.run_async("create table ks1.t (pk int primary key);")
    repl = await get_replication_options("ks1")
    assert repl['dc1'] == '1'

    await cql.run_async("CREATE KEYSPACE ksv WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2} AND tablets = {'enabled': false}")

    for s in servers:
        await manager.api.disable_injection(s.ip_addr, injection)

    # Starting a node with the option enabled is expected to fail at this point.
    # NOTE(review): presumably because ks1 still uses a numeric RF -- confirm.
    await manager.server_stop_gracefully(servers[-1].server_id)
    with pytest.raises(Exception):
        await manager.server_start(server_id=servers[-1].server_id, wait_others=3, connect_driver=True, cmdline_options_override=["--enforce-rack-list", "true", "--smp", "2"])
    # Clean up after the failed start: stop the node and remove it from the cluster.
    await manager.server_stop_gracefully(servers[-1].server_id)
    await manager.remove_node(servers[0].server_id, servers[-1].server_id)

    servers = servers[0:-1]

    await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")
    repl = await get_replication_options("ks1")
    assert repl['dc1'] == ['rack1b']

    logger.info("Rolling restart")
    await manager.rolling_restart(servers, wait_for_cql=True, cmdline_options_override=["--enforce-rack-list", "true", "--error-injections-at-startup", "[]", "--smp", "2"])

    # A numeric RF given at creation time is stored as a rack list now.
    await cql.run_async("create keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2} and tablets = {'initial': 4};")
    repl = await get_replication_options("ks2")
    assert len(repl['dc1']) == 2
    assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1']

    await cql.run_async("create keyspace ks3 with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} and tablets = {'initial': 4};")
    repl = await get_replication_options("ks3")
    assert len(repl['dc1']) == 1
    assert len(repl['dc2']) == 1
    assert 'rack1a' in repl['dc1'] or 'rack1b' in repl['dc1']
    # rack2a is the only rack left in dc2 after the node removal above.
    assert 'rack2a' in repl['dc2']

    await cql.run_async("create keyspace ksv2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1};")

    # Going back from a rack list to a numeric RF must be rejected.
    with pytest.raises(Exception):
        await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2};")

    await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b'], 'dc2': 1};")
    repl = await get_replication_options("ks1")
    assert len(repl['dc1']) == 1
    assert repl['dc1'][0] == 'rack1b'
    assert len(repl['dc2']) == 1
    assert repl['dc2'][0] == 'rack2a'

    # dc1 - removed; dc2 - the same value
    await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 0, 'dc2': 1};")

    await manager.server_add(config=config, cmdline=["--enforce-rack-list", "true", "--smp", "2"], property_file={'dc': 'dc2', 'rack': 'rack2b'})

    await cql.run_async("alter keyspace ksv with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 1};")
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_numeric_rf_to_rack_list_conversion_abort(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
    """Abort an in-flight numeric-RF to rack-list ALTER KEYSPACE and check nothing changed.

    1) Start two nodes in dc1 (rack1a and rack1b) with the "create_with_numeric" and
       "wait_with_rack_list_colocation" error injections enabled at startup.
    2) Create ks1 with a numeric replication factor and verify it is reported as '1'.
    3) Start altering ks1 to the rack list ['rack1b'] in the background and wait until the
       topology coordinator logs 'In make_rack_list_colocation_plan'.
    4) Abort the single "keyspace_rf_change" task and wait until it is reported as "failed".
    5) Verify that the ALTER statement raised and that the replication options are unchanged.
    """
    async def get_replication_options(ks: str):
        # `cql` is assigned in the enclosing scope before this helper is first called.
        res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
        return parse_replication_options(res[0].replication_v2 or res[0].replication)

    numeric_injection = "create_with_numeric"
    colocation_injection = "wait_with_rack_list_colocation"
    config = {"tablets_mode_for_new_keyspaces": "enabled", "error_injections_at_startup": [numeric_injection, colocation_injection]}
    cmdline = [
        '--logger-log-level', 'load_balancer=debug',
        '--smp=2',
    ]

    servers = [await manager.server_add(config=config, cmdline=cmdline, property_file={'dc': 'dc1', 'rack': 'rack1a'}),
               await manager.server_add(config=config, cmdline=cmdline, property_file={'dc': 'dc1', 'rack': 'rack1b'})]

    cql = manager.get_cql()

    await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
    await cql.run_async("create table ks1.t (pk int primary key);")
    repl = await get_replication_options("ks1")
    assert repl['dc1'] == '1'

    # Turn the numeric injection off on every node, one node at a time.
    for s in servers:
        await manager.api.disable_injection(s.ip_addr, numeric_injection)

    # Follow the log of the topology coordinator, where the colocation plan is logged.
    coord = await get_topology_coordinator(manager)
    coord_serv = await find_server_by_host_id(manager, servers, coord)
    s1_log = await manager.server_open_log(coord_serv.server_id)
    s1_mark = await s1_log.mark()

    async def alter_keyspace():
        await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")

    # Run the ALTER in the background so that it can be aborted while it is in progress.
    alter_task = asyncio.create_task(alter_keyspace())

    await s1_log.wait_for('In make_rack_list_colocation_plan', from_mark=s1_mark)

    task_manager_client = TaskManagerClient(manager.api)
    tasks = await task_manager_client.list_tasks(servers[0].ip_addr, "global_topology_requests")
    rf_change_tasks = [t for t in tasks if t.type == "keyspace_rf_change"]
    assert len(rf_change_tasks) == 1
    task_id = rf_change_tasks[0].task_id
    await task_manager_client.abort_task(servers[0].ip_addr, task_id)

    task = await task_manager_client.wait_for_task(servers[0].ip_addr, task_id)
    assert task.state == "failed"

    # The aborted request must surface as an error of the ALTER statement itself.
    with pytest.raises(Exception):
        await alter_task

    # The keyspace must still use the original numeric replication factor.
    repl = await get_replication_options("ks1")
    assert repl['dc1'] == '1'
|
|
|
|
# Reproducer for https://github.com/scylladb/scylladb/issues/18110
# Check that an existing cached read will be cleaned up when the tablet it reads
# from is migrated away.
@pytest.mark.asyncio
async def test_saved_readers_tablet_migration(manager: ManagerClient, build_mode):
    """Verify that moving a tablet leaves no entries in the querier cache.

    A paged read (fetch_size=10 over 128 rows) is started and left unfinished, the
    querier cache population metric is checked to be non-zero on at least one node,
    the only tablet is moved to the other node, and the metric is then expected to
    be zero on every node.
    """
    cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}

    # Error injections are not available in release mode, so the cache TTL is only
    # overridden in the other build modes.
    if build_mode != "release":
        cfg['error_injections_at_startup'] = [{'name': 'querier-cache-ttl-seconds', 'value': 999999999}]

    servers = await manager.servers_add(2, config=cfg)
    await manager.disable_tablet_balancing()

    cql = manager.get_cql()

    async with new_test_keyspace(manager, "WITH"
                                 " replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}"
                                 " and tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int, ck int, c int, PRIMARY KEY (pk, ck));")

        logger.info("Populating table")
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, ck, c) VALUES (0, {k}, 0);") for k in range(128)])

        # Only the first page is fetched; the remaining pages are never requested.
        statement = SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 0", fetch_size=10)
        cql.execute(statement)

        # The metric name must be followed by a label set or by whitespace, so that a
        # longer metric name sharing this prefix is not picked up by accident.
        population_pattern = re.compile(r"^scylla_database_querier_cache_population[{\s]")

        def get_querier_cache_population(server) -> int:
            """Return the querier cache population reported by `server`.

            All samples of the metric are summed, so an entry is noticed no matter
            which sample reports it (presumably there is one sample per shard - TODO confirm).
            Fails with a clear message when the metric is not exported at all, instead of
            returning None and breaking the comparisons made by the caller.
            """
            metrics = requests.get(f"http://{server.ip_addr}:9180/metrics").text
            samples = [int(float(line.split()[1]))
                       for line in metrics.split('\n')
                       if population_pattern.match(line) is not None]
            assert samples, f"scylla_database_querier_cache_population not found in the metrics of {server.ip_addr}"
            return sum(samples)

        assert any(get_querier_cache_population(server) > 0 for server in servers)

        table_id = await cql.run_async(f"SELECT id FROM system_schema.tables WHERE keyspace_name = '{ks}' AND table_name = 'test'")
        table_id = table_id[0].id

        tablet_infos = await cql.run_async(f"SELECT last_token, replicas FROM system.tablets WHERE table_id = {table_id}")
        tablet_infos = list(tablet_infos)

        assert len(tablet_infos) == 1
        tablet_info = tablet_infos[0]
        assert len(tablet_info.replicas) == 1

        hosts = {await manager.get_host_id(server.server_id) for server in servers}
        logger.info(f"HOSTS: {hosts}")
        source_host, source_shard = tablet_info.replicas[0]

        # With two nodes, the node left after removing the source is the migration target.
        hosts.remove(str(source_host))
        target_host, target_shard = list(hosts)[0], source_shard

        await manager.api.move_tablet(
            node_ip=servers[0].ip_addr,
            ks=ks,
            table="test",
            src_host=source_host,
            src_shard=source_shard,
            dst_host=target_host,
            dst_shard=target_shard,
            token=tablet_info.last_token)

        # The tablet move should have evicted the cached reader.
        assert all(get_querier_cache_population(server) == 0 for server in servers)
|
|
|
|
# Reproducer for https://github.com/scylladb/scylladb/issues/19052
#   1) table A has N tablets and views
#   2) migration starts for a tablet of A from node 1 to 2.
#   3) migration is at write_both_read_old stage
#   4) coordinator will push writes to both nodes
#   5) A has view, so writes to it will also result in reads (table::push_view_replica_updates())
#   6) tablet's update_effective_replication_map() is not refreshing tablet sstable set (for new tablet migrating in)
#   7) so the read in step 5 is not able to find the sstable set for the tablet migrating in
@pytest.mark.parametrize("with_cache", ['false', 'true'])
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_read_of_pending_replica_during_migration(manager: ManagerClient, with_cache):
    """Write to a table with a view while its only tablet is being streamed to a new node.

    The "stream_mutation_fragments" injection holds the streaming on the destination
    node; a write and a read are issued while it is held, then the streaming is released
    and the table is expected to still contain exactly one row.
    `with_cache` is passed verbatim as the value of --enable-cache.
    """
    logger.info("Bootstrapping cluster")
    cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
    cmdline = [
        '--logger-log-level', 'storage_service=debug',
        '--logger-log-level', 'raft_topology=debug',
        '--enable-cache', with_cache,
    ]
    servers = [await manager.server_add(cmdline=cmdline, config=cfg)]

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv1 AS "
                            f"SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL "
                            "PRIMARY KEY (c, pk);")

        # The second node is added only now, so the tablet starts out on the first node.
        servers.append(await manager.server_add(cmdline=cmdline, config=cfg))

        key = 7  # Whatever
        tablet_token = 0  # Doesn't matter since there is one tablet
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, 0)")
        rows = await cql.run_async(f"SELECT pk from {ks}.test")
        assert len(list(rows)) == 1

        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)

        s1_host_id = await manager.get_host_id(servers[1].server_id)
        dst_shard = 0

        await manager.api.enable_injection(servers[1].ip_addr, "stream_mutation_fragments", one_shot=True)
        s1_log = await manager.server_open_log(servers[1].server_id)
        s1_mark = await s1_log.mark()

        # Drop cache to remove dummy entry indicating that underlying mutation source is empty
        await manager.api.drop_sstable_caches(servers[1].ip_addr)

        # Run the migration in the background: it stays blocked on the injection until released.
        migration_task = asyncio.create_task(
            manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token))

        await s1_log.wait_for('stream_mutation_fragments: waiting', from_mark=s1_mark)
        s1_mark = await s1_log.mark()

        # Overwrite the same key while streaming is held; the row count must stay at one.
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, 1)")
        rows = await cql.run_async(f"SELECT pk from {ks}.test")
        assert len(list(rows)) == 1

        # Release abandoned streaming
        await manager.api.message_injection(servers[1].ip_addr, "stream_mutation_fragments")
        await s1_log.wait_for('stream_mutation_fragments: done', from_mark=s1_mark)

        logger.info("Waiting for migration to finish")
        await migration_task
        logger.info("Migration done")

        rows = await cql.run_async(f"SELECT pk from {ks}.test")
        assert len(list(rows)) == 1
|
|
|
|
|
|
# This test checks that the tablets_mode_for_new_keyspaces config option and the TABLETS
# parameters of the CQL CREATE KEYSPACE statement interact according to the "least
# surprising behavior" principle. See the comments inside the test code for more details.
@pytest.mark.parametrize("with_tablets", [True, False])
@pytest.mark.parametrize("replication_strategy", ["NetworkTopologyStrategy", "SimpleStrategy", "EverywhereStrategy", "LocalStrategy"])
@pytest.mark.asyncio
async def test_keyspace_creation_cql_vs_config_sanity(manager: ManagerClient, with_tablets, replication_strategy):
    tablets_mode = 'enabled' if with_tablets else 'disabled'
    await manager.server_add(config={'tablets_mode_for_new_keyspaces': tablets_mode})
    cql = manager.get_cql()

    def initial_tablets_row(keyspace):
        # Row of system_schema.scylla_keyspaces for the keyspace, or None when there is no such row.
        return cql.execute(f"SELECT initial_tablets FROM system_schema.scylla_keyspaces WHERE keyspace_name = '{keyspace}'").one()

    replication = f"WITH replication = {{'class': '{replication_strategy}', 'replication_factor': 1}}"

    # Only NetworkTopologyStrategy is able to use tablets
    tablets_possible = (replication_strategy == 'NetworkTopologyStrategy')
    tablets_enabled_by_default = tablets_possible and with_tablets

    # Step 1: a keyspace must be creatable with a plain CQL statement that carries no
    # tablets parameters. Tablets should be activated whenever that is possible.
    async with new_test_keyspace(manager, replication) as ks:
        row = initial_tablets_row(ks)
        if not tablets_enabled_by_default:
            assert row is None
        else:
            assert row.initial_tablets == 0

    # Step 2: an explicit CQL request to enable tablets can only be satisfied when tablets
    # are possible, and in that case tablets must be activated.
    expectation = does_not_raise() if tablets_possible else pytest.raises(ConfigurationException)
    with expectation:
        ks = await create_new_test_keyspace(cql, f"{replication} AND TABLETS = {{'enabled': true}}")
        row = initial_tablets_row(ks)
        assert row.initial_tablets == 0
        await cql.run_async(f"drop keyspace {ks}")

    # Step 3: explicitly disabling tablets in CQL yields a vnode-based keyspace regardless
    # of whether tablets are enabled in the config.
    async with new_test_keyspace(manager, f"{replication} AND TABLETS = {{'enabled': false}}") as ks:
        row = initial_tablets_row(ks)
        assert row is None
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_streaming_with_unbuilt_view(manager: ManagerClient):
    """
    Reproducer for https://github.com/scylladb/scylladb/issues/21564
        1) Create a table with 1 initial tablet, populate it and flush it to disk
        2) Add a second node and enable the injection that pauses the view building worker
        3) Create a view on the table
        4) Migrate the tablet from node 1 to node 2
        5) Once the view is reported as built, the table and the view should both have
           the expected number of rows
    """
    logger.info("Starting Node 1")
    cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
    debug_loggers = ['storage_service', 'raft_topology', 'view_building_coordinator', 'view_building_worker']
    cmdline = []
    for logger_name in debug_loggers:
        cmdline += ['--logger-log-level', f'{logger_name}=debug']
    servers = [await manager.server_add(cmdline=cmdline, config=cfg)]
    await manager.disable_tablet_balancing()

    logger.info("Create table, populate it and flush the table to disk")
    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        row_count = 64
        inserts = [cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k%3});") for k in range(row_count)]
        await asyncio.gather(*inserts)
        await manager.api.keyspace_flush(servers[0].ip_addr, ks, "test")

        logger.info("Starting Node 2")
        servers.append(await manager.server_add(cmdline=cmdline, config=cfg))
        s1_host_id = await manager.get_host_id(servers[1].server_id)

        logger.info("Inject error to make view building worker pause before processing the sstable")
        await manager.api.enable_injection(servers[0].ip_addr, "view_building_worker_pause_before_consume", one_shot=True)

        logger.info("Create view")
        await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv1 AS "
                            f"SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL "
                            "PRIMARY KEY (c, pk);")

        logger.info("Migrate the tablet to node 2")
        tablet_token = 0  # Any token works: the table has a single tablet
        src_host, src_shard = (await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token))[:2]
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", src_host, src_shard, s1_host_id, 0, tablet_token)
        logger.info("Migration done")

        async def check_table():
            # True once mv1 is listed in system.built_views; None asks wait_for to try again.
            built = await cql.run_async(f"SELECT * FROM system.built_views WHERE keyspace_name='{ks}' AND view_name='mv1'")
            return True if len(built) == 1 else None
        await wait_for(check_table, time.time() + 30)

        # The base table must hold every inserted row
        base_rows = await cql.run_async(f"SELECT pk from {ks}.test")
        assert len(list(base_rows)) == row_count
        # ... and so must the view
        view_rows = await cql.run_async(f"SELECT c from {ks}.mv1")
        assert len(list(view_rows)) == row_count
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_streaming_with_staged_sstables(manager: ManagerClient):
    """
    Reproducer for https://github.com/scylladb/scylladb/issues/19149
        1) Create a table with 1 initial tablet and populate it
        2) Create a view on the table while view building is paused by an error injection
        3) Create an sstable using a dummy table and move it into the upload directory of the test table
        4) Inject error to prevent processing of new sstables in view generator
        5) Load the sstables from the upload directory
        6) Migrate the tablet from node 1 to 2
        7) Once migration completes, the view should have the correct number of rows
    """
    logger.info("Starting Node 1")
    cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
    cmdline = [
        '--logger-log-level', 'storage_service=debug',
        '--logger-log-level', 'raft_topology=debug',
        '--logger-log-level', 'view_building_coordinator=debug',
        '--logger-log-level', 'view_building_worker=debug',
    ]
    servers = [await manager.server_add(cmdline=cmdline, config=cfg)]
    await manager.disable_tablet_balancing()

    logger.info("Create the test table, populate few rows and flush to disk")
    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k%3});") for k in range(64)])
        await manager.api.keyspace_flush(servers[0].ip_addr, ks, "test")

        logger.info("Create view")
        # Pause view building
        await manager.api.enable_injection(servers[0].ip_addr, "view_building_worker_pause_before_consume", one_shot=True)
        await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv1 AS "
                            f"SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL "
                            "PRIMARY KEY (c, pk);")

        # Check if view building was started
        async def check_mv_status():
            # The view created above is named mv1, so that is the name to look up.
            mv_status = await cql.run_async(f"SELECT status FROM system.view_build_status_v2 WHERE keyspace_name='{ks}' AND view_name='mv1'")
            # Return None (not False) while the status is not there yet, like the other
            # predicates passed to wait_for in this file.
            # NOTE(review): assumes wait_for retries only on None - confirm against test.pylib.util.
            if len(mv_status) == 1 and mv_status[0].status == "STARTED":
                return True
            return None
        await wait_for(check_mv_status, time.time() + 30)

        logger.info("Generate an sstable and move it to upload directory of test table")
        # create an sstable using a dummy table
        await cql.run_async(f"CREATE TABLE {ks}.dummy (pk int PRIMARY KEY, c int);")
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.dummy (pk, c) VALUES ({k}, {k%3});") for k in range(64, 128)])
        await manager.api.keyspace_flush(servers[0].ip_addr, ks, "dummy")
        node_workdir = await manager.server_get_workdir(servers[0].server_id)
        dummy_table_dir = glob.glob(os.path.join(node_workdir, "data", ks, "dummy-*"))[0]
        test_table_upload_dir = glob.glob(os.path.join(node_workdir, "data", ks, "test-*", "upload"))[0]
        for src_path in glob.glob(os.path.join(dummy_table_dir, sstable_filename_glob)):
            dst_path = os.path.join(test_table_upload_dir, os.path.basename(src_path))
            os.rename(src_path, dst_path)
        await cql.run_async(f"DROP TABLE {ks}.dummy;")

        logger.info("Starting Node 2")
        servers.append(await manager.server_add(cmdline=cmdline, config=cfg))
        s1_host_id = await manager.get_host_id(servers[1].server_id)

        logger.info("Inject error to prevent view generator from processing staged sstables")
        injection_name = "view_update_generator_consume_staging_sstable"
        await manager.api.enable_injection(servers[0].ip_addr, injection_name, one_shot=True)

        logger.info("Load the sstables from upload directory")
        await manager.api.load_new_sstables(servers[0].ip_addr, ks, "test")

        # The table now has both staged and unstaged sstables.
        # Verify that tablet migration handles them both without causing any base-view inconsistencies.
        logger.info("Migrate the tablet to node 2")
        tablet_token = 0  # Doesn't matter since there is one tablet
        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, 0, tablet_token)
        logger.info("Migration done")

        async def check_view_is_built():
            result = await cql.run_async(f"SELECT * FROM system.built_views WHERE keyspace_name='{ks}' AND view_name='mv1'")
            if len(result) == 1:
                return True
            else:
                return None
        await wait_for(check_view_is_built, time.time() + 60)

        # 64 rows inserted directly plus 64 rows loaded from the dummy table's sstables
        expected_num_of_rows = 128
        # Verify the table has expected number of rows
        rows = await cql.run_async(f"SELECT pk from {ks}.test")
        assert len(list(rows)) == expected_num_of_rows
        # Verify that the view has the expected number of rows
        rows = await cql.run_async(f"SELECT c from {ks}.mv1")
        assert len(list(rows)) == expected_num_of_rows
|
|
|
|
@pytest.mark.asyncio
async def test_orphaned_sstables_on_startup(manager: ManagerClient):
    """
    Reproducer for https://github.com/scylladb/scylladb/issues/18038
        1) Start a node (node1)
        2) Create a table with 2 initial tablets, populate it and flush it to disk
        3) Start another node (node2)
        4) Migrate the tablet owning token 0 from node1 to node2
        5) Stop node1
        6) Copy the sstables from node2 to node1
        7) Attempting to start node1 should fail as it now has an 'orphaned' sstable
    """
    logger.info("Starting Node 1")
    cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
    cmdline = []
    for logger_name in ('storage_service', 'raft_topology'):
        cmdline.extend(['--logger-log-level', f'{logger_name}=debug'])
    servers = [await manager.server_add(cmdline=cmdline, config=cfg)]
    await manager.disable_tablet_balancing()

    logger.info("Create the test table, populate few rows and flush to disk")
    cql = manager.get_cql()
    ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}")

    def find_table_dir(workdir):
        # Data directory of the test table below the given node work directory.
        return glob.glob(os.path.join(workdir, "data", ks, "test-*"))[0]

    await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
    inserts = [cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k%3});") for k in range(256)]
    await asyncio.gather(*inserts)
    await manager.api.keyspace_flush(servers[0].ip_addr, ks, "test")
    node0_table_dir = find_table_dir(await manager.server_get_workdir(servers[0].server_id))

    logger.info("Start Node 2")
    servers.append(await manager.server_add(cmdline=cmdline, config=cfg))
    await manager.disable_tablet_balancing()
    node1_table_dir = find_table_dir(await manager.server_get_workdir(servers[1].server_id))
    s1_host_id = await manager.get_host_id(servers[1].server_id)

    logger.info("Migrate the tablet from node1 to node2")
    tablet_token = 0  # Selects the tablet that is migrated
    src_host, src_shard = (await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token))[:2]
    await manager.api.move_tablet(servers[0].ip_addr, ks, "test", src_host, src_shard, s1_host_id, 0, tablet_token)
    logger.info("Migration done")

    logger.info("Stop node1 and copy the sstables from node2")
    await manager.server_stop(servers[0].server_id)
    for sstable_path in glob.glob(os.path.join(node1_table_dir, sstable_filename_glob)):
        shutil.copy(sstable_path, os.path.join(node0_table_dir, os.path.basename(sstable_path)))

    # Bring node1 back up; it is expected to refuse to start.
    logger.info("Start node1 with the orphaned sstables and expect it to fail")
    # The error has the format: "Unable to load SSTable {sstable_name} : Storage wasn't found for tablet {tablet_id} of table {ks}.test"
    await manager.server_start(servers[0].server_id, expected_error="Storage wasn't found for tablet", expected_crash=True)
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.parametrize("with_zero_token_node", [False, True])
async def test_remove_failure_with_no_normal_token_owners_in_dc(manager: ManagerClient, with_zero_token_node: bool):
    """
    Reproducer for #21826
    Verify that a node cannot be removed with tablets when
    there are not enough nodes in a datacenter to satisfy the configured replication factor,
    even when there is a zero-token node in the same datacenter and in another datacenter,
    and when there is another down node in the datacenter, leaving no normal token owners.
    After the failed removenode, the same node is replaced with a new one while the other
    down node is passed as an ignored dead node.
    """
    servers: dict[str, list[ServerInfo]] = dict()
    servers['dc1'] = await manager.servers_add(servers_num=2, property_file=[
        {'dc': 'dc1', 'rack': 'rack1_1'},
        {'dc': 'dc1', 'rack': 'rack1_2'}])
    # if testing with no zero-token-node, add an additional node to dc2 to maintain raft quorum
    extra_node = 0 if with_zero_token_node else 1
    servers['dc2'] = await manager.servers_add(servers_num=2 + extra_node, property_file={'dc': 'dc2', 'rack': 'rack2'})
    if with_zero_token_node:
        servers['dc1'].append(await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc1', 'rack': 'rack1_1'}))
        servers['dc3'] = [await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc3', 'rack': 'rack3'})]

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = { 'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 1 } AND tablets = { 'initial': 1 }") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        node_to_remove = servers['dc1'][0]
        node_to_replace = servers['dc1'][1]
        # Host id of the second dc1 node; it is passed below as a dead node to ignore.
        replaced_host_id = await manager.get_host_id(node_to_replace.server_id)
        initiator_node = servers['dc2'][0]

        # Stop both token owners in dc1 to leave no token owners in the datacenter
        await manager.server_stop_gracefully(node_to_remove.server_id)
        await manager.server_stop_gracefully(node_to_replace.server_id)

        logger.info("Attempting removenode - expected to fail")

        with pytest.raises(Exception, match=r"Removenode failed"):
            await manager.remove_node(initiator_node.server_id, server_id=node_to_remove.server_id, ignore_dead=[replaced_host_id])

        # The node being replaced is node_to_remove (see replaced_id below), so that is
        # the node the log message has to name.
        logger.info(f"Replacing {node_to_remove} with a new node")
        replace_cfg = ReplaceConfig(replaced_id=node_to_remove.server_id, reuse_ip_addr = False, use_host_id=True, wait_replaced_dead=True,
                                    ignore_dead_nodes=[replaced_host_id])
        await manager.server_add(replace_cfg=replace_cfg, property_file=node_to_remove.property_file())
|
|
|
|
@pytest.mark.asyncio
async def test_excludenode(manager: ManagerClient):
    """
    Exercises a recovery scenario in which a down node is marked as excluded with excludenode.

    1. The cluster has 3 racks: 1 node in rack1, 2 nodes in rack2, and 1 node in rack3.
    2. Initially the keyspace is replicated to rack1 and rack2.
    3. One node in rack2 is brought down, which should cause unavailability.
    4. The down node is marked as excluded using excludenode, which unblocks the next ALTER.
    5. rack3 is added to the RF of the keyspace. Without excluding the node this wouldn't succeed.
    6. The down node is removed successfully while there are still tablets on it. This is
       why two nodes are needed in rack2.
    """
    servers = await manager.servers_add(servers_num=3, auto_rack_dc='dc1')
    await manager.server_add(property_file={'dc': 'dc1', 'rack': 'rack2'})

    cql = manager.get_cql()
    keyspace_opts = ("WITH replication = { 'class': 'NetworkTopologyStrategy', "
                     "'dc1': ['rack1', 'rack2']} AND tablets = { 'initial': 8 }")
    async with new_test_keyspace(manager, keyspace_opts) as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        live_node, node_to_remove = servers[0], servers[1]

        # A node that is still alive must be rejected
        with pytest.raises(Exception, match="Cannot mark host .* as excluded because it's alive"):
            alive_host_id = await manager.get_host_id(node_to_remove.server_id)
            await manager.api.exclude_node(live_node.ip_addr, hosts=[alive_host_id])

        # A host id which is not part of the cluster must be rejected as well
        with pytest.raises(Exception, match=".* does not belong to this cluster"):
            unknown_host_id = str(uuid.uuid4())
            await manager.api.exclude_node(live_node.ip_addr, hosts=[unknown_host_id])

        await manager.server_stop(node_to_remove.server_id)
        await manager.others_not_see_server(node_to_remove.ip_addr)
        down_host_id = await manager.get_host_id(node_to_remove.server_id)
        await manager.api.exclude_node(live_node.ip_addr, hosts=[down_host_id])

        # Tablets can be rebuilt in a new rack while the rack2 node is down.
        await cql.run_async(f"ALTER KEYSPACE {ks} WITH REPLICATION = {{ 'class': 'NetworkTopologyStrategy', 'dc1': ['rack1', 'rack2', 'rack3']}}")

        # removenode succeeds for the excluded node
        await manager.remove_node(live_node.server_id, server_id=node_to_remove.server_id)
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.parametrize("with_zero_token_node", [False, True])
async def test_remove_failure_then_replace(manager: ManagerClient, with_zero_token_node: bool):
    """
    Check that removing a node with tablets fails when the datacenter is left with
    too few nodes to satisfy the configured replication factor, even if there is a
    zero-token node in the same datacenter and in another datacenter.
    Then check that the same node can be replaced successfully.
    """
    dc1_racks = [{'dc': 'dc1', 'rack': 'rack1_1'},
                 {'dc': 'dc1', 'rack': 'rack1_2'}]
    servers: dict[str, list[ServerInfo]] = {}
    servers['dc1'] = await manager.servers_add(servers_num=2, property_file=dc1_racks)
    servers['dc2'] = await manager.servers_add(servers_num=2, property_file={'dc': 'dc2', 'rack': 'rack2'})
    if with_zero_token_node:
        # Zero-token nodes: one in dc1 and one in a datacenter of its own
        dc1_zero_token = await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc1', 'rack': 'rack1_1'})
        servers['dc1'].append(dc1_zero_token)
        dc3_zero_token = await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc3', 'rack': 'rack3'})
        servers['dc3'] = [dc3_zero_token]

    cql = manager.get_cql()
    keyspace_opts = "WITH replication = { 'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 1 } AND tablets = { 'initial': 1 }"
    async with new_test_keyspace(manager, keyspace_opts) as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        initiator_node = servers['dc2'][0]
        node_to_remove = servers['dc1'][0]

        await manager.server_stop_gracefully(node_to_remove.server_id)

        logger.info("Attempting removenode - expected to fail")
        await manager.remove_node(
            initiator_node.server_id,
            server_id=node_to_remove.server_id,
            expected_error="Removenode failed")

        logger.info(f"Replacing {node_to_remove} with a new node")
        replace_cfg = ReplaceConfig(
            replaced_id=node_to_remove.server_id,
            reuse_ip_addr=False,
            use_host_id=True,
            wait_replaced_dead=True)
        await manager.server_add(replace_cfg=replace_cfg, property_file=node_to_remove.property_file())
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.nightly
@pytest.mark.parametrize("with_zero_token_node", [False, True])
async def test_replace_with_no_normal_token_owners_in_dc(manager: ManagerClient, with_zero_token_node: bool):
    """
    Check that nodes can be replaced successfully with tablets
    even when there are not enough nodes in a datacenter to satisfy the configured replication factor,
    with and without zero-token nodes in the same datacenter and in another datacenter,
    and when another node in the datacenter is down, leaving no normal token owners,
    as long as other datacenters can be used to rebuild the data.
    """
    dc1_racks = [{'dc': 'dc1', 'rack': 'rack1_1'},
                 {'dc': 'dc1', 'rack': 'rack1_2'}]
    servers: dict[str, list[ServerInfo]] = {}
    servers['dc1'] = await manager.servers_add(servers_num=2, property_file=dc1_racks)
    # Without a zero-token node, dc2 gets one more node so that raft quorum is maintained
    dc2_size = 2 if with_zero_token_node else 3
    servers['dc2'] = await manager.servers_add(servers_num=dc2_size, property_file={'dc': 'dc2', 'rack': 'rack2'})
    if with_zero_token_node:
        dc1_zero_token = await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc1', 'rack': 'rack1_1'})
        servers['dc1'].append(dc1_zero_token)
        dc3_zero_token = await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc3', 'rack': 'rack3'})
        servers['dc3'] = [dc3_zero_token]

    cql = manager.get_cql()
    keyspace_opts = "WITH replication = { 'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 1 } AND tablets = { 'initial': 1 }"
    async with new_test_keyspace(manager, keyspace_opts) as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        # Every row is written with c == pk at consistency level ALL
        stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, c) VALUES (?, ?)")
        stmt.consistency_level = ConsistencyLevel.ALL
        keys = range(256)
        await asyncio.gather(*[cql.run_async(stmt, [k, k]) for k in keys])

        first_node, second_node = servers['dc1'][0:2]
        replaced_host_id = await manager.get_host_id(second_node.server_id)

        # Stop both token owners in dc1 to leave no token owners in the datacenter
        for node in (first_node, second_node):
            await manager.server_stop_gracefully(node.server_id)

        # The first replace has to ignore the second node, which is still down
        logger.info(f"Replacing {first_node} with a new node")
        replace_cfg = ReplaceConfig(
            replaced_id=first_node.server_id,
            reuse_ip_addr=False,
            use_host_id=True,
            wait_replaced_dead=True,
            ignore_dead_nodes=[replaced_host_id])
        await manager.server_add(replace_cfg=replace_cfg, property_file=first_node.property_file())

        logger.info(f"Replacing {second_node} with a new node")
        replace_cfg = ReplaceConfig(
            replaced_id=second_node.server_id,
            reuse_ip_addr=False,
            use_host_id=True,
            wait_replaced_dead=True)
        await manager.server_add(replace_cfg=replace_cfg, property_file=second_node.property_file())

        # Read with the dc2 nodes stopped
        logger.info("Verifying data")
        for node in servers['dc2']:
            await manager.server_stop_gracefully(node.server_id)
        query = SimpleStatement(f"SELECT * FROM {ks}.test;", consistency_level=ConsistencyLevel.ONE)
        rows = await cql.run_async(query)
        assert len(rows) == len(keys)
        for row in rows:
            assert row.c == row.pk

        # Bring dc2 back so that the keyspace can be dropped
        restarts = [manager.server_start(node.server_id) for node in servers['dc2']]
        await asyncio.gather(*restarts)
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_drop_keyspace_while_split(manager: ManagerClient):
    """
    Reproducer for: https://github.com/scylladb/scylladb/issues/22431

    This tests if the split ready compaction groups are correctly created
    on a shard with several storage groups for the same table.

    A tablet split is paused at the `split_storage_groups_wait` injection,
    DROP KEYSPACE is started and paused at `truncate_compaction_disabled_wait`,
    then the split is released first and the drop second.
    """
    logger.info("Bootstrapping cluster")
    server = await manager.server_add(
        config={'tablet_load_stats_refresh_interval_in_seconds': 1},
        cmdline=['--target-tablet-size-in-bytes', '8192', '--smp', '2'])
    node_ip = server.ip_addr

    node_log = await manager.server_open_log(server.server_id)

    cql = manager.get_cql()
    await wait_for_cql_and_get_hosts(cql, [server], time.time() + 60)

    await manager.disable_tablet_balancing()

    # create a table so that it has at least 2 tablets (and storage groups) per shard
    ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 4};")
    await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

    await manager.api.disable_autocompaction(node_ip, ks)

    inserts = [cql.run_async(f'INSERT INTO {ks}.test (pk, c) VALUES ({pk}, {pk});') for pk in range(2048)]
    await asyncio.gather(*inserts)
    await manager.api.flush_keyspace(node_ip, ks)

    # Arm both pause points before the balancer is allowed to act.
    for injection in ('truncate_compaction_disabled_wait', 'split_storage_groups_wait'):
        await manager.api.enable_injection(node_ip, injection, one_shot=False)

    # enable the load balancer which should emit a tablet split
    await manager.enable_tablet_balancing()

    # wait for compaction groups to be created and split to begin
    await node_log.wait_for('split_storage_groups_wait: wait')

    # start a DROP and wait for it to disable compaction
    pending_drop = cql.run_async(f'DROP KEYSPACE {ks};')
    await node_log.wait_for('truncate_compaction_disabled_wait: wait')

    # release split
    await manager.api.message_injection(node_ip, "split_storage_groups_wait")

    # release drop and wait for it to complete
    await manager.api.message_injection(node_ip, "truncate_compaction_disabled_wait")
    await pending_drop
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_drop_with_tablet_migration_cleanup(manager: ManagerClient):
    """
    Reproducer for https://github.com/scylladb/scylladb/issues/25706

    Runs DROP TABLE concurrently with the cleanup stage of an intra-node
    tablet migration on a single two-shard node:

    1. Create and flush a table, then start moving its tablet to the other shard.
    2. Pause the migration at `wait_before_stop_compaction_groups`.
    3. Start DROP TABLE and pause it at `truncate_compaction_disabled_wait`.
    4. Release the migration cleanup first, then the drop, and wait for the drop.
    """
    logger.info('Bootstrapping cluster')
    cfg = {'enable_tablets': True}
    cmdline = ['--smp', '2']
    server = await manager.server_add(cmdline=cmdline, config=cfg)

    cql = manager.get_cql()

    # We don't want the load balancer to migrate tablets during the test
    await manager.disable_tablet_balancing()

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
        # Create the table, insert data and flush
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': 1}};")
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in range(100)])
        await manager.api.flush_keyspace(server.ip_addr, ks)

        # Get the current location of the tablet: replica is (host_id, shard)
        token = 0
        replica = await get_tablet_replica(manager, server, ks, "test", token)

        await manager.api.enable_injection(server.ip_addr, "wait_before_stop_compaction_groups", one_shot=True)
        await manager.api.enable_injection(server.ip_addr, "truncate_compaction_disabled_wait", one_shot=True)

        slog = await manager.server_open_log(server.server_id)
        smark = await slog.mark()

        # Start migrating the tablet to the *other* shard of the same node.
        # The node runs with --smp 2, so the destination is whichever of
        # shards 0/1 the tablet is not on. The decision must look at the
        # shard (replica[1]), not the host id (replica[0]); otherwise the
        # destination can equal the source and no cleanup would be exercised.
        src_host = replica[0]
        src_shard = replica[1]
        dst_shard = 0 if src_shard != 0 else 1
        migration_task = asyncio.create_task(
            manager.api.move_tablet(server.ip_addr, ks, "test", src_host, src_shard, src_host, dst_shard, token))

        # Wait until the leaving replica is about to be cleaned up.
        # storage_group's gate has been closed, but the compaction groups have not yet been stopped and disabled
        await slog.wait_for("wait_before_stop_compaction_groups: wait", from_mark=smark)

        # Start dropping the table
        drop_future = cql.run_async(f"DROP TABLE {ks}.test;")

        # Wait for truncate to complete disabling compaction
        await slog.wait_for("truncate_compaction_disabled_wait: wait", from_mark=smark)

        # Release the migration's tablet cleanup
        await manager.api.message_injection(server.ip_addr, "wait_before_stop_compaction_groups")

        # Release drop/truncate
        await manager.api.message_injection(server.ip_addr, "truncate_compaction_disabled_wait")
        await drop_future
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_two_tablets_concurrent_repair_and_migration(manager: ManagerClient):
    """
    Run a tablet repair and an intra-node migration of a different tablet at the same time.

    The repair of the second tablet (by token order) is held by the
    `repair_shard_repair_task_impl_do_repair_ranges` injection. Once any node
    logs 'Started to repair', the first tablet is moved to another shard of
    its node, and only then is the repair released.
    """
    injection = "repair_shard_repair_task_impl_do_repair_ranges"
    servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager)
    await manager.disable_tablet_balancing()

    tablets = await get_all_tablet_replicas(manager, servers[0], ks, "test")
    tablets.sort(key=lambda tablet: tablet.last_token)
    assert len(tablets) >= 3
    tablet_to_migrate = tablets[0]
    tablet_to_repair = tablets[1]

    logs = [await manager.server_open_log(srv.server_id) for srv in servers]
    marks = [await srv_log.mark() for srv_log in logs]

    async def repair_task():
        # Arm the pause point everywhere, then start the repair that will hit it.
        for srv in servers:
            await manager.api.enable_injection(srv.ip_addr, injection, one_shot=True)
        await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", tablet_to_repair.last_token)

    async def migration_task():
        repair_started = [srv_log.wait_for('Started to repair', from_mark=mark) for srv_log, mark in zip(logs, marks)]
        await wait_for_first_completed(repair_started)

        # Move within the same host, to a shard different from the current one.
        source = tablet_to_migrate.replicas[0]
        src_host = source[0]
        src_shard = source[1]
        dst_shard = 0 if src_shard != 0 else 1
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", src_host, src_shard, src_host, dst_shard, tablet_to_migrate.last_token)

        # Let the paused repair continue, then clear the injection.
        for srv in servers:
            await manager.api.message_injection(srv.ip_addr, injection)
        for srv in servers:
            await manager.api.disable_injection(srv.ip_addr, injection)

    await asyncio.gather(repair_task(), migration_task())
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_split_finalization_with_migrations(manager: ManagerClient):
    """
    Reproducer for https://github.com/scylladb/scylladb/issues/21762

    Steps:
        1) Start a cluster with two nodes with an error injected to prevent resize finalization
        2) Create and populate the `test` table
        3) Trigger a split in the table by increasing `min_tablet_count`
        4) Wait for the table `test` to reach the split finalization stage
        5) Create and populate another table, `blocker`
        6) Disable tablet balancing and move all tablets of the `test` and `blocker` tables from node 2 to node 1
        7) Enable tablet balancing and disable the error injection to allow migration and finalization to proceed
        8) Expect finalization in `test` to be preferred over migrations in both tables
    """
    logger.info("Starting Cluster")
    startup_injections = [
        # initially disable transitioning into tablet_resize_finalization topology state
        'tablet_split_finalization_postpone',
    ]
    cfg = {
        'enable_user_defined_functions': False,
        'enable_tablets': True,
        'error_injections_at_startup': startup_injections,
        'tablet_load_stats_refresh_interval_in_seconds': 1,
    }
    cmdline = [
        '--logger-log-level', 'raft_topology=debug',
        '--logger-log-level', 'load_balancer=debug',
    ]
    servers = await manager.servers_add(2, cmdline=cmdline, config=cfg)
    node1 = servers[0]
    node2 = servers[1]

    logger.info("Create and populate test table")
    cql = manager.get_cql()
    await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 4};")
    await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
    await manager.api.disable_autocompaction(node1.ip_addr, "test")
    test_inserts = [cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k%3});") for k in range(64)]
    await asyncio.gather(*test_inserts)
    await manager.api.keyspace_flush(node1.ip_addr, "test", "test")
    test_rows = await cql.run_async("SELECT id FROM system_schema.tables WHERE keyspace_name = 'test' AND table_name = 'test'")
    test_table_id = test_rows[0].id

    logger.info("Trigger split in table")
    await cql.run_async("ALTER TABLE test.test WITH tablets = {'min_tablet_count': 8};")

    # Wait for splits to finalise; they don't execute yet as they are prevented by the error injection
    logger.info("Wait for tablets to split")
    log = await manager.server_open_log(node1.server_id)
    await log.wait_for(f"Finalizing resize decision for table {test_table_id} as all replicas agree on sequence number 1")

    logger.info("Create and populate `blocker` table")
    await cql.run_async("CREATE TABLE test.blocker (pk int PRIMARY KEY, c int);")
    blocker_inserts = [cql.run_async(f"INSERT INTO test.blocker (pk, c) VALUES ({k}, {k%3});") for k in range(128)]
    await asyncio.gather(*blocker_inserts)
    await manager.api.keyspace_flush(node1.ip_addr, "test", "blocker")
    blocker_rows = await cql.run_async("SELECT id FROM system_schema.tables WHERE keyspace_name = 'test' AND table_name = 'blocker'")
    blocker_table_id = blocker_rows[0].id

    node1_host_id = await manager.get_host_id(node1.server_id)
    for cf in ["test", "blocker"]:
        logger.info(f"Move all tablets of test.{cf} from Node 2 to Node 1")
        await manager.disable_tablet_balancing()
        tablets = await get_all_tablet_replicas(manager, node2, "test", cf)
        # Every tablet's first replica is sent to shard 0 of node 1; the moves run concurrently.
        moves = []
        for tablet in tablets:
            src_host, src_shard = tablet.replicas[0][0], tablet.replicas[0][1]
            moves.append(manager.api.move_tablet(node1.ip_addr, "test", cf,
                                                 src_host, src_shard,
                                                 node1_host_id, 0, tablet.last_token))
        await asyncio.gather(*moves)

    logger.info("Re-enable tablet balancing; it should be blocked by pending split finalization")
    await manager.enable_tablet_balancing()
    mark, _ = await log.wait_for("Setting tablet balancing to true")

    logger.info("Unblock resize finalisation and verify that the finalisation is preferred over migrations")
    await manager.api.disable_injection(node1.ip_addr, "tablet_split_finalization_postpone")
    split_finalization_mark, _ = await log.wait_for("Finished tablet resize finalization", from_mark=mark)
    for table_id in [test_table_id, blocker_table_id]:
        migration_pattern = f"Will set tablet {table_id}:\\d+ stage to write_both_read_old"
        migration_mark, _ = await log.wait_for(migration_pattern, from_mark=mark)
        assert split_finalization_mark < migration_mark, f"Tablet migration of {table_id} was scheduled before resize finalization"

    # ensure all migrations complete
    logger.info("Waiting for migrations to complete")
    await log.wait_for("Tablet load balancer did not make any plan", from_mark=migration_mark)
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_two_tablets_concurrent_repair_and_migration_repair_writer_level(manager: ManagerClient):
    """
    Migrate a tablet of one table while a repair of another table is paused in the repair writer.

    Rows are written to `test` and `test2` while the first server is down
    during a rolling restart, with hinted handoff disabled on the command
    line. A repair of `test2` is then held by the
    `repair_writer_impl_create_writer_wait` injection; once a node logs the
    repair_writer message for the keyspace, a tablet of `test` is moved to
    another shard of its node and the repair is released.
    """
    injection = "repair_writer_impl_create_writer_wait"
    cmdline = [
        '--logger-log-level', 'repair=debug',
        '--hinted-handoff-enabled', '0',
    ]
    servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, cmdline=cmdline)

    await cql.run_async(f"CREATE TABLE {ks}.test2 (pk int PRIMARY KEY, c int) WITH tombstone_gc = {{'mode':'repair'}};")

    await manager.disable_tablet_balancing()

    async def insert_with_down(down_server):
        # Write to both tables, one table after the other, while `down_server` is down.
        for tbl in ("test", "test2"):
            writes = [cql.run_async(f"INSERT INTO {ks}.{tbl} (pk, c) VALUES ({k}, {k + 1});") for k in range(10)]
            await asyncio.gather(*writes)

    cql = await safe_rolling_restart(manager, [servers[0]], with_down=insert_with_down)

    await wait_for_cql_and_get_hosts(manager.get_cql(), servers, time.time() + 30)

    tablets = await get_all_tablet_replicas(manager, servers[1], ks, "test")
    tablet_to_migrate = tablets[0]

    logs = [await manager.server_open_log(srv.server_id) for srv in servers]
    marks = [await srv_log.mark() for srv_log in logs]

    async def repair_task():
        # Arm the pause point everywhere, then start the repair that will hit it.
        for srv in servers:
            await manager.api.enable_injection(srv.ip_addr, injection, one_shot=True)
        await manager.api.tablet_repair(servers[0].ip_addr, ks, "test2", token="all")

    async def migration_task():
        writer_seen = [srv_log.wait_for(f'repair_writer: keyspace={ks}', from_mark=mark) for srv_log, mark in zip(logs, marks)]
        await wait_for_first_completed(writer_seen)

        # Move within the same host, to a shard different from the current one.
        source = tablet_to_migrate.replicas[0]
        src_host = source[0]
        src_shard = source[1]
        dst_shard = 0 if src_shard != 0 else 1
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", src_host, src_shard, src_host, dst_shard, tablet_to_migrate.last_token)

        # Let the paused repair continue, then clear the injection.
        for srv in servers:
            await manager.api.message_injection(srv.ip_addr, injection)
        for srv in servers:
            await manager.api.disable_injection(srv.ip_addr, injection)

    await asyncio.gather(repair_task(), migration_task())
|
|
|
|
async def check_tablet_rebuild_with_repair(manager: ManagerClient, fail: bool):
    """
    Add a third replica to a single RF=2 tablet and check the outcome.

    Three nodes are started in dc1 (two in rack r1, one in r2). A replica is
    added on whichever node does not hold the tablet yet, and exactly one
    'stage to rebuild_repair' transition is expected across the node logs.

    :param fail: when True, nodes start with the `rebuild_repair_stage_fail`
                 injection and the tablet is expected to keep two replicas;
                 otherwise it is expected to end up with three.

    Finally every node is checked: nodes listed as replicas must report a
    non-zero mutation fragment count for the table, the others zero.
    """
    logger.info("Bootstrapping cluster")
    cfg = {'enable_user_defined_functions': False, 'enable_tablets': True}
    if fail:
        cfg['error_injections_at_startup'] = ['rebuild_repair_stage_fail']
    host_ids = []
    servers = []

    async def make_server(rack: str):
        srv = await manager.server_add(config=cfg, property_file={"dc": "dc1", "rack": rack})
        servers.append(srv)
        host_ids.append(await manager.get_host_id(srv.server_id))

    for rack in ("r1", "r1", "r2"):
        await make_server(rack)

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        inserts = [cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in range(256)]
        await asyncio.gather(*inserts)

        replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        logger.info(f"Tablet is on [{replicas}]")
        assert len(replicas) == 1 and len(replicas[0].replicas) == 2
        replicas = [r[0] for r in replicas[0].replicas]

        # Pick the first host that does not hold the tablet yet; the new replica goes to its shard 0.
        free_hosts = [h for h in host_ids if h not in replicas]
        assert free_hosts, "Cannot find node without replica"
        new_replica = (free_hosts[0], 0)

        logs = [await manager.server_open_log(srv.server_id) for srv in servers]

        logger.info(f"Adding replica to tablet, host {new_replica[0]}")
        await manager.api.add_tablet_replica(servers[0].ip_addr, ks, "test", new_replica[0], new_replica[1], 0)

        rebuild_repair_hits = 0
        for srv_log in logs:
            rebuild_repair_hits += len(await srv_log.grep(r'.*Will set tablet .* stage to rebuild_repair.*'))
        assert rebuild_repair_hits == 1

        replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        logger.info(f"Tablet is now on [{replicas}]")
        assert len(replicas) == 1
        replicas = [r[0] for r in replicas[0].replicas]

        if fail:
            assert len(replicas) == 2
        else:
            assert len(replicas) == 3

        for h, srv in zip(host_ids, servers):
            host = await wait_for_cql_and_get_hosts(cql, [srv], time.time() + 30)
            if h != host_ids[0]:
                await read_barrier(manager.api, host[0].address) # host-0 did the barrier in get_all_tablet_replicas above
            res = await cql.run_async(f"SELECT COUNT(*) FROM MUTATION_FRAGMENTS({ks}.test)", host=host[0])
            logger.info(f"Host {h} reports {res} as mutation fragments count")
            if h not in replicas:
                assert res[0].count == 0
            else:
                assert res[0].count != 0
|
|
|
|
@pytest.mark.asyncio
async def test_tablet_rebuild(manager: ManagerClient):
    """Run the tablet rebuild scenario without the failure injection."""
    await check_tablet_rebuild_with_repair(manager, fail=False)
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_rebuild_failure(manager: ManagerClient):
    """Run the tablet rebuild scenario with the failure injection enabled."""
    await check_tablet_rebuild_with_repair(manager, fail=True)
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_repair_with_invalid_session_id(manager: ManagerClient):
    """
    Run a tablet repair with the `handle_tablet_migration_repair_random_session`
    injection enabled on every node and check that at least one node logs a
    'Session not found' runtime error.
    """
    injection = "handle_tablet_migration_repair_random_session"
    token = -1
    servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager)

    logs = [await manager.server_open_log(srv.server_id) for srv in servers]
    marks = [await srv_log.mark() for srv_log in logs]

    for srv in servers:
        await manager.api.enable_injection(srv.ip_addr, injection, one_shot=True)
    await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)

    # Only look at what was logged after the marks taken above.
    total_matches = 0
    for srv_log, mark in zip(logs, marks):
        found = await srv_log.grep(r"std::runtime_error \(Session not found", from_mark=mark)
        total_matches += len(found)
    assert total_matches > 0
|
|
|
|
@pytest.mark.asyncio
async def test_moving_replica_to_replica(manager: ManagerClient):
    """
    Verify that trying to move a tablet replica to a node that is already
    a replica is prevented with an appropriate error message.

    The keyspace has RF=2 on a two-node datacenter, so both nodes hold the
    single tablet and any move between them targets an existing replica.
    """
    ks = "ks"
    table = "my_table"

    # For convenience when moving tablets.
    single_shard = ["--smp=1"]
    s1, s2 = await manager.servers_add(2, cmdline=single_shard, auto_rack_dc="dc1")
    await manager.disable_tablet_balancing()

    host_id1 = await manager.get_host_id(s1.server_id)
    host_id2 = await manager.get_host_id(s2.server_id)

    cql, _ = await manager.get_ready_cql([s1, s2])

    create_ks = (f"CREATE KEYSPACE {ks} WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} "
                 f"AND tablets = {{'enabled': true, 'initial': 1}}")
    await cql.run_async(create_ks)
    await cql.run_async(f"CREATE TABLE {ks}.{table} (pk int, ck int, v int, PRIMARY KEY (pk, ck))")
    await cql.run_async(f"INSERT INTO {ks}.{table} (pk, ck, v) VALUES (1, 1, 1)")

    # Doesn't matter. There's only one tablet.
    tablet_token = 0

    move_request = dict(
        node_ip=s1.ip_addr,
        ks=ks,
        table=table,
        src_host=host_id1,
        src_shard=0,
        dst_host=host_id2,
        dst_shard=0,
        token=tablet_token,
    )
    expected_error = rf"Tablet .* has replica on {host_id2}"
    with pytest.raises(Exception, match=expected_error):
        await manager.api.move_tablet(**move_request)
|
|
|
|
@pytest.mark.asyncio
async def test_moving_replica_within_single_rack(manager: ManagerClient):
    """
    Verify that it's possible to move a tablet from a replica node to a node
    that's not a replica.

    Both nodes are in the same rack and the keyspace has RF=1, so exactly one
    of them holds the single tablet; the move goes from that node to the other.
    """
    ks = "ks"
    table = "my_table"

    # For convenience when moving tablets.
    single_shard = ["--smp=1"]
    s1, s2 = await manager.servers_add(2, cmdline=single_shard, property_file={"dc": "dc1", "rack": "r1"})
    await manager.disable_tablet_balancing()

    host_id1 = await manager.get_host_id(s1.server_id)
    host_id2 = await manager.get_host_id(s2.server_id)

    cql, _ = await manager.get_ready_cql([s1, s2])

    create_ks = (f"CREATE KEYSPACE {ks} WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} "
                 f"AND tablets = {{'enabled': true, 'initial': 1}}")
    await cql.run_async(create_ks)
    await cql.run_async(f"CREATE TABLE {ks}.{table} (pk int, ck int, v int, PRIMARY KEY (pk, ck))")
    await cql.run_async(f"INSERT INTO {ks}.{table} (pk, ck, v) VALUES (1, 1, 1)")

    # Doesn't matter. There's only one tablet.
    tablet_token = 0
    owner_host_id, _ = await get_tablet_replica(manager, s1, ks, table, tablet_token)

    # Make (s1, host_id1) always denote the node that currently holds the tablet.
    if owner_host_id != host_id1:
        s1, s2 = s2, s1
        host_id1, host_id2 = host_id2, host_id1

    move_request = dict(
        node_ip=s1.ip_addr,
        ks=ks,
        table=table,
        src_host=host_id1,
        src_shard=0,
        dst_host=host_id2,
        dst_shard=0,
        token=tablet_token,
    )
    await manager.api.move_tablet(**move_request)
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_disabling_balancing_preempts_balancer(manager: ManagerClient):
    """
    Check that tablet balancing can be disabled while the balancer is active.

    The `tablet_allocator_shuffle` and `tablet_keep_repairing` injections are
    enabled on the first server (presumably to keep the balancer constantly
    producing work - confirm against the injection sites). After a table is
    created and the log shows 'Initiating tablet', the call to disable
    balancing must still complete.
    """
    servers = await manager.servers_add(2, auto_rack_dc="dc1")
    coord_srv = servers[0]
    for injection in ("tablet_allocator_shuffle", "tablet_keep_repairing"):
        await manager.api.enable_injection(coord_srv.ip_addr, injection, one_shot=False)

    ks_opts = "WITH replication = {'class': 'NetworkTopologyStrategy'}"
    async with new_test_keyspace(manager, ks_opts) as ks:
        cql = manager.get_cql()
        coord_log = await manager.server_open_log(coord_srv.server_id)
        start_mark = await coord_log.mark()

        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        await coord_log.wait_for('Initiating tablet', from_mark=start_mark)

        # Should preempt balancing
        await manager.disable_tablet_balancing()
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_table_creation_wakes_up_balancer(manager: ManagerClient):
    """
    Reproduces both https://github.com/scylladb/scylladb/issues/25163 and https://github.com/scylladb/scylladb/issues/27958

    Scenario:
    1. Start a cluster
    2. Block the topology coordinator right before it goes to sleep
    3. Create a table, which should wake up the coordinator
    4. Verify that the coordinator didn't go to sleep
    """
    sleep_injection = 'wait-before-topology-coordinator-goes-to-sleep'
    event_injection = 'wait-after-topology-coordinator-gets-event'

    server = await manager.server_add()
    node_ip = server.ip_addr
    log = await manager.server_open_log(server.server_id)
    cql = manager.get_cql()

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 8}") as ks:
        # Block coordinator right before going to sleep
        # We use node bootstrap as an operation which is going to be trapped on exit, but it's arbitrary.
        before_bootstrap = await log.mark()
        await manager.api.enable_injection(node_ip, sleep_injection, one_shot=True)
        await manager.server_add()
        await log.wait_for('wait-before-topology-coordinator-goes-to-sleep: wait', from_mark=before_bootstrap)

        # Create a table, which should prevent the coordinator from sleeping
        await manager.api.enable_injection(node_ip, event_injection, one_shot=True)
        before_create = await log.mark()
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        # Verify it didn't go to sleep. If it did, the wait for wakeup would be 30s on average,
        # up to stats refresh period, which is 60s. So use a small timeout.
        await manager.api.message_injection(node_ip, sleep_injection)
        await log.wait_for('wait-after-topology-coordinator-gets-event: wait', from_mark=before_create, timeout=5)
|