1619 lines
85 KiB
Python
1619 lines
85 KiB
Python
#
|
|
# Copyright (C) 2024-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
import uuid
|
|
|
|
from cassandra.protocol import ConfigurationException, InvalidRequest, SyntaxException
|
|
from cassandra.query import SimpleStatement, ConsistencyLevel
|
|
from test.cluster.test_tablets2 import safe_rolling_restart
|
|
from test.pylib.internal_types import ServerInfo
|
|
from test.pylib.manager_client import ManagerClient
|
|
from test.pylib.repair import create_table_insert_data_for_repair
|
|
from test.pylib.rest_client import HTTPError, read_barrier
|
|
from test.pylib.scylla_cluster import ReplaceConfig
|
|
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas
|
|
from test.pylib.util import unique_name, wait_for, wait_for_first_completed
|
|
from test.cluster.conftest import skip_mode
|
|
from test.cluster.util import wait_for_cql_and_get_hosts, create_new_test_keyspace, new_test_keyspace, reconnect_driver, \
|
|
get_topology_coordinator, parse_replication_options, get_replication, get_replica_count
|
|
from contextlib import nullcontext as does_not_raise
|
|
import time
|
|
import pytest
|
|
import logging
|
|
import asyncio
|
|
import re
|
|
import requests
|
|
import random
|
|
import os
|
|
import glob
|
|
import shutil
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# The glob below is designed to match the version-generation-format-component.extension format, e.g.
|
|
# da-3gqu_1hke_4919c2kfgur9y2bm77-bti-Data.db
|
|
# me-1-big-TOC.txt
|
|
sstable_filename_glob = "??-*-???-*.*"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_tablet_replication_factor_enough_nodes(manager: ManagerClient):
|
|
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
|
|
# This test verifies that Scylla rejects creating a table if there are too few token-owning nodes.
|
|
# That means that a keyspace must already be in place, but that's impossible with RF-rack-valid
|
|
# keyspaces being enforced. We could go over this constraint by creating 3 nodes and then
|
|
# decommissioning one of them before attempting to create a table, but if we decide to constraint
|
|
# decommission later on, this test will have to be modified again. Let's simply disable the option.
|
|
cfg = cfg | {'rf_rack_valid_keyspaces': False}
|
|
servers = await manager.servers_add(2, config=cfg)
|
|
|
|
cql = manager.get_cql()
|
|
res = await cql.run_async("SELECT data_center FROM system.local")
|
|
this_dc = res[0].data_center
|
|
|
|
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': 3}}") as ks:
|
|
with pytest.raises(ConfigurationException, match=f"Datacenter {this_dc} doesn't have enough token-owning nodes"):
|
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
|
|
|
await cql.run_async(f"ALTER KEYSPACE {ks} WITH replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': 2}}")
|
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_tablet_scaling_option_is_respected(manager: ManagerClient):
|
|
# 32 is high enough to ensure we demand more tablets than the default choice.
|
|
cfg = {'tablets_mode_for_new_keyspaces': 'enabled', 'tablets_initial_scale_factor': 32}
|
|
servers = await manager.servers_add(1, config=cfg, cmdline=['--smp', '2'])
|
|
|
|
cql = manager.get_cql()
|
|
|
|
await cql.run_async(f"CREATE KEYSPACE test WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}")
|
|
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
|
|
|
|
tablets = await get_all_tablet_replicas(manager, servers[0], 'test', 'test')
|
|
assert len(tablets) == 64
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_tablet_cannot_decommision_below_replication_factor(manager: ManagerClient):
|
|
logger.info("Bootstrapping cluster")
|
|
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
|
|
servers = await manager.servers_add(4, config=cfg, property_file=[
|
|
{"dc": "dc1", "rack": "r1"},
|
|
{"dc": "dc1", "rack": "r1"},
|
|
{"dc": "dc1", "rack": "r2"},
|
|
{"dc": "dc1", "rack": "r3"}
|
|
])
|
|
|
|
logger.info("Creating table")
|
|
cql = manager.get_cql()
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
|
|
|
logger.info("Populating table")
|
|
keys = range(256)
|
|
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
|
|
|
|
logger.info("Decommission some node")
|
|
await manager.decommission_node(servers[0].server_id)
|
|
|
|
with pytest.raises(HTTPError, match="Decommission failed"):
|
|
logger.info("Decommission another node")
|
|
await manager.decommission_node(servers[1].server_id)
|
|
|
|
# Three nodes should still provide CL=3
|
|
logger.info("Checking table")
|
|
query = SimpleStatement(f"SELECT * FROM {ks}.test;", consistency_level=ConsistencyLevel.THREE)
|
|
rows = await cql.run_async(query)
|
|
assert len(rows) == len(keys)
|
|
for r in rows:
|
|
assert r.c == r.pk
|
|
|
|
async def test_reshape_with_tablets(manager: ManagerClient):
|
|
logger.info("Bootstrapping cluster")
|
|
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
|
|
server = (await manager.servers_add(1, config=cfg, cmdline=['--smp', '1', '--logger-log-level', 'compaction=debug']))[0]
|
|
|
|
logger.info("Creating table")
|
|
cql = manager.get_cql()
|
|
number_of_tablets = 2
|
|
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} and tablets = {{'initial': {number_of_tablets} }}") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
|
|
|
logger.info("Disabling autocompaction for the table")
|
|
await manager.api.disable_autocompaction(server.ip_addr, ks, "test")
|
|
|
|
logger.info("Populating table")
|
|
loop_count = 32
|
|
for _ in range(loop_count):
|
|
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in range(64)])
|
|
await manager.api.keyspace_flush(server.ip_addr, ks, "test")
|
|
# After populating the table, expect loop_count number of sstables per tablet
|
|
sstable_info = await manager.api.get_sstable_info(server.ip_addr, ks, "test")
|
|
assert len(sstable_info[0]['sstables']) == number_of_tablets * loop_count
|
|
|
|
log = await manager.server_open_log(server.server_id)
|
|
mark = await log.mark()
|
|
|
|
# Restart the server and verify that the sstables have been reshaped down to one sstable per tablet
|
|
logger.info("Restart the server")
|
|
await manager.server_restart(server.server_id)
|
|
await reconnect_driver(manager)
|
|
|
|
await log.wait_for(f"Reshape {ks}.test .* Reshaped 32 sstables to .*", from_mark=mark, timeout=30)
|
|
sstable_info = await manager.api.get_sstable_info(server.ip_addr, ks, "test")
|
|
assert len(sstable_info[0]['sstables']) == number_of_tablets
|
|
|
|
|
|
@pytest.mark.parametrize("direction", ["up", "down", "none"])
|
|
@pytest.mark.asyncio
|
|
async def test_tablet_rf_change(manager: ManagerClient, direction):
|
|
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
|
|
servers = await manager.servers_add(2, config=cfg, auto_rack_dc="dc1")
|
|
for s in servers:
|
|
await manager.api.disable_tablet_balancing(s.ip_addr)
|
|
|
|
cql = manager.get_cql()
|
|
res = await cql.run_async("SELECT data_center FROM system.local")
|
|
this_dc = res[0].data_center
|
|
|
|
if direction == 'up':
|
|
rf_from_opt = "['rack1']"
|
|
rf_to_opt = "['rack1', 'rack2']"
|
|
rf_from = 1
|
|
rf_to = 2
|
|
if direction == 'down':
|
|
rf_from_opt = "['rack1', 'rack2']"
|
|
rf_to_opt = "['rack1']"
|
|
rf_from = 2
|
|
rf_to = 1
|
|
if direction == 'none':
|
|
rf_from_opt = "['rack1']"
|
|
rf_to_opt = "['rack1']"
|
|
rf_from = 1
|
|
rf_to = 1
|
|
|
|
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': {rf_from_opt}}}") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
|
await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.test_mv AS SELECT pk FROM {ks}.test WHERE pk IS NOT NULL PRIMARY KEY (pk)")
|
|
|
|
logger.info("Populating table")
|
|
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in range(128)])
|
|
|
|
async def check_allocated_replica(expected: int):
|
|
replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
|
|
replicas = replicas + await get_all_tablet_replicas(manager, servers[0], ks, 'test_mv', is_view=True)
|
|
for r in replicas:
|
|
logger.info(f"{r.replicas}")
|
|
assert len(r.replicas) == expected
|
|
|
|
logger.info(f"Checking {rf_from} allocated replicas")
|
|
await check_allocated_replica(rf_from)
|
|
|
|
logger.info(f"Altering RF {rf_from} -> {rf_to}")
|
|
await cql.run_async(f"ALTER KEYSPACE {ks} WITH replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': {rf_to_opt}}}")
|
|
|
|
logger.info(f"Checking {rf_to} re-allocated replicas")
|
|
await check_allocated_replica(rf_to)
|
|
|
|
if direction != 'up':
|
|
# Don't check fragments for up/none changes, scylla crashes when checking nodes
|
|
# that (validly) miss the replica, see scylladb/scylladb#18786
|
|
return
|
|
|
|
fragments = { pk: set() for pk in random.sample(range(128), 17) }
|
|
for s in servers:
|
|
host_id = await manager.get_host_id(s.server_id)
|
|
host = await wait_for_cql_and_get_hosts(cql, [s], time.time() + 30)
|
|
await read_barrier(manager.api, s.ip_addr) # scylladb/scylladb#18199
|
|
for k in fragments:
|
|
res = await cql.run_async(f"SELECT partition_region FROM MUTATION_FRAGMENTS({ks}.test) WHERE pk={k}", host=host[0])
|
|
for fragment in res:
|
|
if fragment.partition_region == 0: # partition start
|
|
fragments[k].add(host_id)
|
|
logger.info("Checking fragments")
|
|
for k in fragments:
|
|
assert len(fragments[k]) == rf_to, f"Found mutations for {k} key on {fragments[k]} hosts, but expected only {rf_to} of them"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_tablet_mutation_fragments_unowned_partition(manager: ManagerClient):
|
|
"""Check that MUTATION_FRAGMENTS() queries handle the case when a partition
|
|
not owned by the node is attempted to be read."""
|
|
cfg = {'enable_user_defined_functions': False,
|
|
'tablets_mode_for_new_keyspaces': 'enabled' }
|
|
servers = await manager.servers_add(3, config=cfg, property_file=[
|
|
{"dc": "dc1", "rack": "r1"},
|
|
{"dc": "dc1", "rack": "r1"},
|
|
{"dc": "dc1", "rack": "r2"}
|
|
])
|
|
|
|
cql = manager.get_cql()
|
|
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
|
|
|
logger.info("Populating table")
|
|
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in range(4)])
|
|
|
|
for s in servers:
|
|
host_id = await manager.get_host_id(s.server_id)
|
|
host = await wait_for_cql_and_get_hosts(cql, [s], time.time() + 30)
|
|
for k in range(4):
|
|
await cql.run_async(f"SELECT partition_region FROM MUTATION_FRAGMENTS({ks}.test) WHERE pk={k}", host=host[0])
|
|
|
|
|
|
# The test checks that describe_ring and range_to_address_map API return
|
|
# information that's consistent with system.tablets contents
|
|
@pytest.mark.parametrize("endpoint", ["describe_ring", "range_to_endpoint", "tokens_endpoint"])
|
|
@pytest.mark.asyncio
|
|
async def test_tablets_api_consistency(manager: ManagerClient, endpoint):
|
|
servers = []
|
|
servers += await manager.servers_add(2, property_file={'dc': f'dc1', 'rack': 'rack1'})
|
|
servers += await manager.servers_add(2, property_file={'dc': f'dc1', 'rack': 'rack2'})
|
|
servers += await manager.servers_add(2, property_file={'dc': f'dc1', 'rack': 'rack3'})
|
|
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
|
hosts = { await manager.get_host_id(s.server_id): s.ip_addr for s in servers }
|
|
cql = manager.get_cql()
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH TABLETS = {{'min_tablet_count': 4}};")
|
|
|
|
def columnar(lst):
|
|
return f"\n {'\n '.join([f'{x}' for x in lst])}"
|
|
|
|
replicas = await get_all_tablet_replicas(manager, servers[0], ks, "test")
|
|
logger.info(f'system.tablets: {columnar(replicas)}')
|
|
|
|
if endpoint == 'describe_ring':
|
|
ring_info = await manager.api.describe_ring(servers[0].ip_addr, ks, "test")
|
|
logger.info(f'api.describe_ring: {columnar(ring_info)}')
|
|
api_data = [ { 'token': x['end_token'], 'nodes': set(x['endpoints']) } for x in ring_info ]
|
|
elif endpoint == 'range_to_endpoint':
|
|
rte_info = await manager.api.range_to_endpoint_map(servers[0].ip_addr, ks, "test")
|
|
rte_info.sort(key = lambda x: int(x['key'][1])) # sort by end token
|
|
logger.info(f'api.range_to_endpoint_map: {columnar(rte_info)}')
|
|
api_data = [ { 'token': x['key'][1], 'nodes': set(x['value']) } for x in rte_info ]
|
|
elif endpoint == 'tokens_endpoint':
|
|
te_info = await manager.api.tokens_endpoint(servers[0].ip_addr, ks, "test")
|
|
logger.info(f'api.tokens_endpoint: {columnar(te_info)}')
|
|
api_data = [ { 'token': x['key'], 'node': x['value'] } for x in te_info ]
|
|
else:
|
|
raise RuntimeError('invalid endpoint parameter')
|
|
|
|
assert len(replicas) == len(api_data), f"{endpoint} returned wrong number of ranges"
|
|
for x, rep in zip(api_data, replicas):
|
|
assert x['token'] == f'{rep.last_token}'
|
|
nodes = set([hosts[r[0]] for r in rep.replicas])
|
|
if 'nodes' in x:
|
|
assert x['nodes'] == nodes
|
|
elif 'node' in x:
|
|
assert x['node'] in nodes
|
|
else:
|
|
raise RuntimeError('api_data is badly prepared')
|
|
|
|
|
|
# ALTER KEYSPACE cannot change the replication factor by more than 1 at a time.
|
|
# That provides us with a guarantee that the old and the new QUORUM overlap.
|
|
# In this test, we verify that in a simple scenario with one DC. We explicitly disable
|
|
# enforcing RF-rack-valid keyspaces to be able to perform more flexible alterations.
|
|
@pytest.mark.asyncio
|
|
async def test_singledc_alter_tablets_rf(manager: ManagerClient):
|
|
await manager.server_add(config={"rf_rack_valid_keyspaces": "false", "enable_tablets": "true"}, property_file={"dc": "dc1", "rack": "r1"})
|
|
cql = manager.get_cql()
|
|
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1}") as ks:
|
|
async def change_rf(rf):
|
|
await cql.run_async(f"ALTER KEYSPACE {ks} WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': {rf}}}")
|
|
|
|
await change_rf(2) # Increasing the RF by 1 should be OK.
|
|
await change_rf(3) # Increasing the RF by 1 again should also be OK.
|
|
await change_rf(3) # Setting the same RF shouldn't cause any problems.
|
|
await change_rf(4) # Increasing the RF by 1 again should still be OK.
|
|
await change_rf(3) # Decreasing the RF by 1 should be OK.
|
|
|
|
with pytest.raises(InvalidRequest):
|
|
await change_rf(5) # Trying to increase the RF by 2 should fail.
|
|
with pytest.raises(InvalidRequest):
|
|
await change_rf(1) # Trying to decrease the RF by 2 should fail.
|
|
with pytest.raises(InvalidRequest):
|
|
await change_rf(10) # Trying to increase the RF by more than 2 should fail.
|
|
with pytest.raises(InvalidRequest):
|
|
await change_rf(0) # Trying to decrease the RF by more than 2 should fail.
|
|
|
|
|
|
# ALTER tablets KS cannot change RF of any DC by more than 1 at a time.
|
|
# In a multi-dc environment, we can create replicas in a DC that didn't have replicas before,
|
|
# but the above requirement should still be honoured, because we'd be changing RF from 0 to N in the new DC.
|
|
# Reproduces https://github.com/scylladb/scylladb/issues/20039#issuecomment-2271365060
|
|
# See also `test_singledc_alter_tablets_rf` above for basic scenarios tested
|
|
@pytest.mark.asyncio
|
|
async def test_multidc_alter_tablets_rf(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
|
|
config = {"endpoint_snitch": "GossipingPropertyFileSnitch", "tablets_mode_for_new_keyspaces": "enabled"}
|
|
|
|
logger.info("Creating a new cluster of 2 nodes in 1st DC and 2 nodes in 2nd DC")
|
|
# we have to have at least 2 nodes in each DC if we want to try setting RF to 2 in each DC
|
|
await manager.servers_add(2, config=config, auto_rack_dc="dc1")
|
|
await manager.servers_add(2, config=config, auto_rack_dc="dc2")
|
|
|
|
cql = manager.get_cql()
|
|
async with new_test_keyspace(manager, "with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1}") as ks:
|
|
# need to create a table to not change only the schema, but also tablets replicas
|
|
await cql.run_async(f"create table {ks}.t (pk int primary key)")
|
|
with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
|
|
# changing RF of dc2 from 0 to 2 should fail
|
|
await cql.run_async(f"alter keyspace {ks} with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': ['rack1', 'rack2']}}")
|
|
|
|
# changing RF of dc2 from 0 to 1 should succeed
|
|
await cql.run_async(f"alter keyspace {ks} with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': ['rack1']}}")
|
|
# ensure that RFs of both DCs are equal to 1 now, i.e. that omitting dc1 in above command didn't change it
|
|
repl = get_replication(cql, ks)
|
|
logger.info(f"repl = {repl}")
|
|
assert len(repl['dc1']) == 1
|
|
assert repl['dc2'] == ['rack1']
|
|
|
|
# incrementing RF of 2 DCs at once should NOT succeed, because it'd leave 2 pending tablets replicas
|
|
with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
|
|
await cql.run_async(f"alter keyspace {ks} with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': ['rack1', 'rack2'], 'dc2': ['rack1', 'rack2']}}")
|
|
# as above, but decrementing
|
|
with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
|
|
await cql.run_async(f"alter keyspace {ks} with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 0, 'dc2': 0}}")
|
|
# as above, but decrement 1 RF and increment the other
|
|
with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
|
|
await cql.run_async(f"alter keyspace {ks} with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': ['rack1','rack2'], 'dc2': 0}}")
|
|
# as above, but RFs are swapped
|
|
with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
|
|
await cql.run_async(f"alter keyspace {ks} with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 0, 'dc2': ['rack1', 'rack2']}}")
|
|
|
|
# check that we can remove all replicas from dc2 by changing RF from 1 to 0
|
|
await cql.run_async(f"alter keyspace {ks} with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 0}}")
|
|
# check that we can remove all replicas from the cluster, i.e. change RF of dc1 from 1 to 0 as well:
|
|
await cql.run_async(f"alter keyspace {ks} with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 0}}")
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_alter_tablets_rf_dc_drop(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
|
|
config = {"endpoint_snitch": "GossipingPropertyFileSnitch", "tablets_mode_for_new_keyspaces": "enabled"}
|
|
|
|
logger.info("Creating a new cluster of 2 nodes in 1st DC and 2 nodes in 2nd DC")
|
|
# we have to have at least 2 nodes in each DC if we want to try setting RF to 2 in each DC
|
|
await manager.servers_add(2, config=config, auto_rack_dc="dc1")
|
|
await manager.servers_add(2, config=config, auto_rack_dc="dc2")
|
|
|
|
async def check_rf(ks: str, expected_dc1_rf: int, expected_dc2_rf: int):
|
|
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
|
|
repl = parse_replication_options(res[0].replication)
|
|
logger.info(f"repl = {repl}")
|
|
assert get_replica_count(repl['dc1']) == expected_dc1_rf if expected_dc1_rf > 0 else 'dc1' not in repl
|
|
assert get_replica_count(repl['dc2']) == expected_dc2_rf if expected_dc2_rf > 0 else 'dc2' not in repl
|
|
return repl
|
|
|
|
cql = manager.get_cql()
|
|
async with new_test_keyspace(manager, "with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
|
|
await cql.run_async(f"create table {ks}.t (pk int primary key)")
|
|
|
|
await check_rf(ks=ks, expected_dc1_rf=1, expected_dc2_rf=1)
|
|
|
|
with pytest.raises(ConfigurationException, match="Attempted to implicitly drop replicas in datacenter dc2. If this is the desired behavior, set replication factor to 0 in dc2 explicitly."):
|
|
await cql.run_async(f"alter keyspace {ks} with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': ['rack1', 'rack2']}}")
|
|
repl = await check_rf(ks=ks, expected_dc1_rf=1, expected_dc2_rf=1)
|
|
|
|
dc1_rf = repl['dc1']
|
|
await cql.run_async(f"alter keyspace {ks} with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': {dc1_rf}, 'dc2': 0}}")
|
|
await check_rf(ks=ks, expected_dc1_rf=1, expected_dc2_rf=0)
|
|
|
|
await cql.run_async(f"alter keyspace {ks} with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': ['rack1', 'rack2']}}")
|
|
await check_rf(ks=ks, expected_dc1_rf=2, expected_dc2_rf=0)
|
|
|
|
await cql.run_async(f"alter keyspace {ks} with durable_writes = true")
|
|
await check_rf(ks=ks, expected_dc1_rf=2, expected_dc2_rf=0)
|
|
|
|
@pytest.mark.asyncio
|
|
@skip_mode('release', 'error injections are not supported in release mode')
|
|
async def test_numeric_rf_to_rack_list_conversion(request: pytest.FixtureRequest, manager: ManagerClient) -> None:
|
|
async def get_replication_options(ks: str):
|
|
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
|
|
repl = parse_replication_options(res[0].replication_v2 or res[0].replication)
|
|
return repl
|
|
|
|
injection = "create_with_numeric"
|
|
config = {"tablets_mode_for_new_keyspaces": "enabled", "error_injections_at_startup": [injection]}
|
|
|
|
servers = [await manager.server_add(config=config, property_file={'dc': 'dc1', 'rack': 'rack1a'}),
|
|
await manager.server_add(config=config, property_file={'dc': 'dc1', 'rack': 'rack1b'}),
|
|
await manager.server_add(config=config, property_file={'dc': 'dc2', 'rack': 'rack2a'}),
|
|
await manager.server_add(config=config, property_file={'dc': 'dc2', 'rack': 'rack2b'})]
|
|
|
|
host_ids = [await manager.get_host_id(s.server_id) for s in servers]
|
|
|
|
cql = manager.get_cql()
|
|
|
|
await cql.run_async(f"create keyspace ks1 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} and tablets = {{'initial': 4}};")
|
|
await cql.run_async("create table ks1.t (pk int primary key);")
|
|
repl = await get_replication_options("ks1")
|
|
assert repl['dc1'] == '1'
|
|
|
|
await cql.run_async("create keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 2} and tablets = {'initial': 4};")
|
|
await cql.run_async("create table ks2.t (pk int primary key);")
|
|
repl = await get_replication_options("ks2")
|
|
assert repl['dc1'] == '1'
|
|
assert repl['dc2'] == '2'
|
|
|
|
await cql.run_async("create keyspace ks3 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};")
|
|
await cql.run_async("create table ks3.t (pk int primary key);")
|
|
repl = await get_replication_options("ks3")
|
|
assert repl['dc1'] == '1'
|
|
|
|
await cql.run_async("create keyspace ks4 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1} and tablets = {'initial': 4};")
|
|
await cql.run_async("create table ks4.t (pk int primary key);")
|
|
repl = await get_replication_options("ks4")
|
|
assert repl['dc1'] == '1'
|
|
|
|
await cql.run_async(f"create keyspace ks5 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 2}} and tablets = {{'initial': 4}};")
|
|
await cql.run_async("create table ks5.t (pk int primary key);")
|
|
repl = await get_replication_options("ks5")
|
|
assert repl['dc1'] == '2'
|
|
assert repl['dc2'] == '2'
|
|
|
|
await cql.run_async(f"create keyspace ks6 with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} and tablets = {{'initial': 4}};")
|
|
await cql.run_async("create table ks6.t (pk int primary key);")
|
|
repl = await get_replication_options("ks6")
|
|
assert repl['dc1'] == '2'
|
|
|
|
[await manager.api.disable_injection(s.ip_addr, injection) for s in servers]
|
|
|
|
await cql.run_async("alter keyspace ks1 with replication = {'class': 'NetworkTopologyStrategy', 'dc1': ['rack1b']};")
|
|
repl = await get_replication_options("ks1")
|
|
assert repl['dc1'] == ['rack1b']
|
|
|
|
tablet_replicas = await get_all_tablet_replicas(manager, servers[0], "ks1", "t")
|
|
assert len(tablet_replicas) == 4
|
|
for r in tablet_replicas:
|
|
assert len(r.replicas) == 1
|
|
assert r.replicas[0][0] == host_ids[1]
|
|
|
|
await cql.run_async("alter keyspace ks2 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a'], 'dc2' : ['rack2a', 'rack2b']};")
|
|
repl = await get_replication_options("ks2")
|
|
assert repl['dc1'] == ['rack1a']
|
|
assert len(repl['dc2']) == 2
|
|
assert 'rack2a' in repl['dc2'] and 'rack2b' in repl['dc2']
|
|
|
|
tablet_replicas = await get_all_tablet_replicas(manager, servers[0], "ks2", "t")
|
|
assert len(tablet_replicas) == 4
|
|
for r in tablet_replicas:
|
|
assert len(r.replicas) == 3
|
|
ks_replicas = [host_ids[0], host_ids[2], host_ids[3]]
|
|
assert all(replica[0] in ks_replicas for replica in r.replicas)
|
|
assert all(host_id in [r[0] for r in r.replicas] for host_id in ks_replicas)
|
|
|
|
try:
|
|
await cql.run_async("alter keyspace ks3 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a'], 'dc2' : ['rack2a']};")
|
|
assert False
|
|
except ConfigurationException:
|
|
pass
|
|
|
|
try:
|
|
await cql.run_async("alter keyspace ks4 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a', 'rack1b']};")
|
|
assert False
|
|
except ConfigurationException:
|
|
pass
|
|
|
|
await cql.run_async("alter keyspace ks5 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : ['rack1a', 'rack1b'], 'dc2' : 2};")
|
|
repl = await get_replication_options("ks5")
|
|
assert len(repl['dc1']) == 2
|
|
assert 'rack1a' in repl['dc1'] and 'rack1b' in repl['dc1']
|
|
assert repl['dc2'] == '2'
|
|
|
|
await cql.run_async("alter keyspace ks6 with replication = {'class': 'NetworkTopologyStrategy', 'dc1' : 2, 'dc2' : ['rack2a']};")
|
|
repl = await get_replication_options("ks6")
|
|
assert repl['dc1'] == '2'
|
|
assert len(repl['dc2']) == 1
|
|
assert repl['dc2'][0] == 'rack2a'
|
|
|
|
# Reproducer for https://github.com/scylladb/scylladb/issues/18110
|
|
# Check that an existing cached read, will be cleaned up when the tablet it reads
|
|
# from is migrated away.
|
|
@pytest.mark.asyncio
|
|
async def test_saved_readers_tablet_migration(manager: ManagerClient, build_mode):
|
|
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
|
|
|
|
if build_mode != "release":
|
|
cfg['error_injections_at_startup'] = [{'name': 'querier-cache-ttl-seconds', 'value': 999999999}]
|
|
|
|
servers = await manager.servers_add(2, config=cfg)
|
|
|
|
cql = manager.get_cql()
|
|
|
|
async with new_test_keyspace(manager, "WITH"
|
|
" replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}"
|
|
" and tablets = {'initial': 1}") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int, ck int, c int, PRIMARY KEY (pk, ck));")
|
|
|
|
logger.info("Populating table")
|
|
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, ck, c) VALUES (0, {k}, 0);") for k in range(128)])
|
|
|
|
statement = SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 0", fetch_size=10)
|
|
cql.execute(statement)
|
|
|
|
def get_querier_cache_population(server):
|
|
metrics = requests.get(f"http://{server.ip_addr}:9180/metrics").text
|
|
pattern = re.compile("^scylla_database_querier_cache_population")
|
|
for metric in metrics.split('\n'):
|
|
if pattern.match(metric) is not None:
|
|
return int(float(metric.split()[1]))
|
|
|
|
assert any(map(lambda x: x > 0, [get_querier_cache_population(server) for server in servers]))
|
|
|
|
table_id = await cql.run_async(f"SELECT id FROM system_schema.tables WHERE keyspace_name = '{ks}' AND table_name = 'test'")
|
|
table_id = table_id[0].id
|
|
|
|
tablet_infos = await cql.run_async(f"SELECT last_token, replicas FROM system.tablets WHERE table_id = {table_id}")
|
|
tablet_infos = list(tablet_infos)
|
|
|
|
assert len(tablet_infos) == 1
|
|
tablet_info = tablet_infos[0]
|
|
assert len(tablet_info.replicas) == 1
|
|
|
|
hosts = {await manager.get_host_id(server.server_id) for server in servers}
|
|
print(f"HOSTS: {hosts}")
|
|
source_host, source_shard = tablet_info.replicas[0]
|
|
|
|
hosts.remove(str(source_host))
|
|
target_host, target_shard = list(hosts)[0], source_shard
|
|
|
|
await manager.api.move_tablet(
|
|
node_ip=servers[0].ip_addr,
|
|
ks=ks,
|
|
table="test",
|
|
src_host=source_host,
|
|
src_shard=source_shard,
|
|
dst_host=target_host,
|
|
dst_shard=target_shard,
|
|
token=tablet_info.last_token)
|
|
|
|
# The tablet move should have evicted the cached reader.
|
|
assert all(map(lambda x: x == 0, [get_querier_cache_population(server) for server in servers]))
|
|
|
|
# Reproducer for https://github.com/scylladb/scylladb/issues/19052
|
|
# 1) table A has N tablets and views
|
|
# 2) migration starts for a tablet of A from node 1 to 2.
|
|
# 3) migration is at write_both_read_old stage
|
|
# 4) coordinator will push writes to both nodes
|
|
# 5) A has view, so writes to it will also result in reads (table::push_view_replica_updates())
|
|
# 6) tablet's update_effective_replication_map() is not refreshing tablet sstable set (for new tablet migrating in)
|
|
# 7) so read on step 5 is not being able to find sstable set for tablet migrating in
|
|
@pytest.mark.parametrize("with_cache", ['false', 'true'])
|
|
@pytest.mark.asyncio
|
|
@skip_mode('release', 'error injections are not supported in release mode')
|
|
async def test_read_of_pending_replica_during_migration(manager: ManagerClient, with_cache):
|
|
logger.info("Bootstrapping cluster")
|
|
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
|
|
cmdline = [
|
|
'--logger-log-level', 'storage_service=debug',
|
|
'--logger-log-level', 'raft_topology=debug',
|
|
'--enable-cache', with_cache,
|
|
]
|
|
servers = [await manager.server_add(cmdline=cmdline, config=cfg)]
|
|
|
|
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
|
|
|
cql = manager.get_cql()
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
|
await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv1 AS \
|
|
SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL \
|
|
PRIMARY KEY (c, pk);")
|
|
|
|
servers.append(await manager.server_add(cmdline=cmdline, config=cfg))
|
|
|
|
key = 7 # Whatever
|
|
tablet_token = 0 # Doesn't matter since there is one tablet
|
|
await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, 0)")
|
|
rows = await cql.run_async(f"SELECT pk from {ks}.test")
|
|
assert len(list(rows)) == 1
|
|
|
|
replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
|
|
|
|
s0_host_id = await manager.get_host_id(servers[0].server_id)
|
|
s1_host_id = await manager.get_host_id(servers[1].server_id)
|
|
dst_shard = 0
|
|
|
|
await manager.api.enable_injection(servers[1].ip_addr, "stream_mutation_fragments", one_shot=True)
|
|
s1_log = await manager.server_open_log(servers[1].server_id)
|
|
s1_mark = await s1_log.mark()
|
|
|
|
# Drop cache to remove dummy entry indicating that underlying mutation source is empty
|
|
await manager.api.drop_sstable_caches(servers[1].ip_addr)
|
|
|
|
migration_task = asyncio.create_task(
|
|
manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token))
|
|
|
|
await s1_log.wait_for('stream_mutation_fragments: waiting', from_mark=s1_mark)
|
|
s1_mark = await s1_log.mark()
|
|
|
|
await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, 1)")
|
|
rows = await cql.run_async(f"SELECT pk from {ks}.test")
|
|
assert len(list(rows)) == 1
|
|
|
|
# Release abandoned streaming
|
|
await manager.api.message_injection(servers[1].ip_addr, "stream_mutation_fragments")
|
|
await s1_log.wait_for('stream_mutation_fragments: done', from_mark=s1_mark)
|
|
|
|
logger.info("Waiting for migration to finish")
|
|
await migration_task
|
|
logger.info("Migration done")
|
|
|
|
rows = await cql.run_async(f"SELECT pk from {ks}.test")
|
|
assert len(list(rows)) == 1
|
|
|
|
|
|
# Reproducer for https://github.com/scylladb/scylladb/issues/20073
|
|
@pytest.mark.asyncio
|
|
@skip_mode('release', 'error injections are not supported in release mode')
|
|
async def test_explicit_tablet_movement_during_decommission(manager: ManagerClient):
|
|
logger.info("Bootstrapping cluster")
|
|
cfg = {'enable_user_defined_functions': False, 'enable_tablets': True}
|
|
cmdline = [
|
|
'--logger-log-level', 'storage_service=debug',
|
|
'--logger-log-level', 'raft_topology=debug',
|
|
]
|
|
|
|
# Launch the cluster with two nodes.
|
|
server_tasks = [asyncio.create_task(manager.server_add(cmdline=cmdline, config=cfg)) for _ in range(2)]
|
|
servers = [await task for task in server_tasks]
|
|
|
|
# Disable the load balancer so that it does not move tablets behind our back mid-test. This does not disable automatic tablet movement in response to
|
|
# decommission, but we'll block the latter by injecting a wait-for-message.
|
|
#
|
|
# Load balancing being enabled or disabled is a cluster-global property; we can use any node to toggle it.
|
|
logger.info("Disabling load balancing")
|
|
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
|
|
|
logger.info("Populating tablet")
|
|
# Create a table with just one partition and RF=1, so we have exactly one tablet.
|
|
cql = manager.get_cql()
|
|
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};")
|
|
await cql.run_async("CREATE TABLE test.tabmv_decomm (pk int PRIMARY KEY);")
|
|
await cql.run_async("INSERT INTO test.tabmv_decomm (pk) VALUES (0)")
|
|
rows = await cql.run_async("SELECT pk FROM test.tabmv_decomm")
|
|
assert len(list(rows)) == 1
|
|
|
|
logger.info("Identifying source, destination, coordinator and non-coordinator nodes")
|
|
# Get the sole replica (see RF=1 above) for the sole tablet. (The token value is irrelevant due to there being only one tablet.) We can ask either one of
|
|
# the nodes.
|
|
token = 0
|
|
source_task = asyncio.create_task(get_tablet_replica(manager, servers[0], 'test', 'tabmv_decomm', token))
|
|
|
|
# Get the IDs of both nodes.
|
|
node_id_tasks = [asyncio.create_task(manager.get_host_id(srv.server_id)) for srv in servers]
|
|
|
|
# Get the ID of the topology coordinator.
|
|
crd_task = asyncio.create_task(get_topology_coordinator(manager))
|
|
|
|
# Open the logs of both servers.
|
|
log_tasks = [asyncio.create_task(manager.server_open_log(srv.server_id)) for srv in servers]
|
|
|
|
# Collect results (completion order doesn't matter).
|
|
src_node_id, src_shard = await source_task
|
|
node_ids = [await task for task in node_id_tasks]
|
|
crd_id = await crd_task
|
|
logs = [await task for task in log_tasks]
|
|
|
|
# The destination node is the node that is not the source node. We always use shard#0 on the destination.
|
|
src = node_ids.index(src_node_id)
|
|
dst = 1 - src
|
|
dst_shard = 0
|
|
|
|
# The coordinator node is one of the two nodes. The non-coordinator node is the other node.
|
|
crd = node_ids.index(crd_id)
|
|
ncr = 1 - crd
|
|
|
|
# Four variations are possible:
|
|
#
|
|
# source destination coordinator non-coordinator
|
|
# ------ ----------- ----------- ---------------
|
|
# node#0 node#1 node#0 node#1
|
|
# node#0 node#1 node#1 node#0
|
|
# node#1 node#0 node#0 node#1
|
|
# node#1 node#0 node#1 node#0
|
|
logger.info(f"src id={servers[src].server_id} ip={servers[src].ip_addr} node={node_ids[src]} shard={src_shard}")
|
|
logger.info(f"dst id={servers[dst].server_id} ip={servers[dst].ip_addr} node={node_ids[dst]} shard={dst_shard}")
|
|
logger.info(f"crd id={servers[crd].server_id} ip={servers[crd].ip_addr} node={node_ids[crd]}")
|
|
logger.info(f"ncr id={servers[ncr].server_id} ip={servers[ncr].ip_addr} node={node_ids[ncr]}")
|
|
|
|
logger.info("Decommissioning src")
|
|
# Inject a wait-for-message into the topology coordinator. We're going to block decommission right after entering the "tablet draining" transition state.
|
|
await manager.api.enable_injection(servers[crd].ip_addr, "suspend_decommission", one_shot=True)
|
|
|
|
# Initiate decommissioning the source node, and wait until the coordinator reaches "tablet draining".
|
|
crd_log_mark = await logs[crd].mark()
|
|
decomm_task = asyncio.create_task(manager.decommission_node(servers[src].server_id))
|
|
await logs[crd].wait_for('entered `tablet draining` transition state', from_mark=crd_log_mark)
|
|
|
|
logger.info("Moving tablet from src to dst")
|
|
# Move the tablet from the source node to the destination node. Ask the non-coordinator node to do it, as the coordinator node is suspended. Wait until the
|
|
# storage service on the non-coordinator node confirms it has seen the topology state machine as busy, and that it has kept the transition state intact.
|
|
ncr_log_mark = await logs[ncr].mark()
|
|
move_task = asyncio.create_task(manager.api.move_tablet(servers[ncr].ip_addr, "test", "tabmv_decomm",
|
|
node_ids[src], src_shard, node_ids[dst], dst_shard, token))
|
|
await logs[ncr].wait_for(r'transit_tablet\([^)]+\): topology busy, keeping transition state', from_mark=ncr_log_mark)
|
|
|
|
logger.info("Completing decommissioning and tablet movement")
|
|
# Resume decommissioning.
|
|
await manager.api.message_injection(servers[crd].ip_addr, "suspend_decommission")
|
|
|
|
# Complete both the decommissioning and the explicit tablet movement (completion order does not matter).
|
|
#
|
|
# Completion of "decomm_task" shows that the decommission flow doesn't get stuck.
|
|
await decomm_task
|
|
await move_task
|
|
|
|
|
|
# This test checks that --enable-tablets option and the TABLETS parameters of the CQL CREATE KEYSPACE
|
|
# statemement are mutually correct from the "the least surprising behavior" concept. See comments inside
|
|
# the test code for more details.
|
|
@pytest.mark.parametrize("with_tablets", [True, False])
|
|
@pytest.mark.parametrize("replication_strategy", ["NetworkTopologyStrategy", "SimpleStrategy", "EverywhereStrategy", "LocalStrategy"])
|
|
@pytest.mark.asyncio
|
|
async def test_keyspace_creation_cql_vs_config_sanity(manager: ManagerClient, with_tablets, replication_strategy):
|
|
cfg = {'tablets_mode_for_new_keyspaces': 'enabled' if with_tablets else 'disabled'}
|
|
server = await manager.server_add(config=cfg)
|
|
cql = manager.get_cql()
|
|
|
|
# Tablets are only possible when the replication strategy is NetworkTopology
|
|
tablets_possible = (replication_strategy == 'NetworkTopologyStrategy')
|
|
tablets_enabled_by_default = tablets_possible and with_tablets
|
|
|
|
# First, check if a kesypace is able to be created with default CQL statement that
|
|
# doesn't contain tablets parameters. When possible, tablets should be activated
|
|
async with new_test_keyspace(manager, f"WITH replication = {{'class': '{replication_strategy}', 'replication_factor': 1}}") as ks:
|
|
res = cql.execute(f"SELECT initial_tablets FROM system_schema.scylla_keyspaces WHERE keyspace_name = '{ks}'").one()
|
|
if tablets_enabled_by_default:
|
|
assert res.initial_tablets == 0
|
|
else:
|
|
assert res is None
|
|
|
|
# Next, check that explicit CQL request for enabling tablets can only be satisfied when
|
|
# tablets are possible. Tablets must be activated in this case
|
|
if tablets_possible:
|
|
expectation = does_not_raise()
|
|
else:
|
|
expectation = pytest.raises(ConfigurationException)
|
|
with expectation:
|
|
ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': '{replication_strategy}', 'replication_factor': 1}} AND TABLETS = {{'enabled': true}}")
|
|
res = cql.execute(f"SELECT initial_tablets FROM system_schema.scylla_keyspaces WHERE keyspace_name = '{ks}'").one()
|
|
assert res.initial_tablets == 0
|
|
await cql.run_async(f"drop keyspace {ks}")
|
|
|
|
# Finally, check that explicitly disabling tablets in CQL results in vnode-based keyspace
|
|
# whenever tablets are enabled or not in config
|
|
async with new_test_keyspace(manager, f"WITH replication = {{'class': '{replication_strategy}', 'replication_factor': 1}} AND TABLETS = {{'enabled': false}}") as ks:
|
|
res = cql.execute(f"SELECT initial_tablets FROM system_schema.scylla_keyspaces WHERE keyspace_name = '{ks}'").one()
|
|
assert res is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_tablets_and_gossip_topology_changes_are_incompatible(manager: ManagerClient):
|
|
cfg = {"tablets_mode_for_new_keyspaces": "enabled", "force_gossip_topology_changes": True}
|
|
with pytest.raises(Exception, match="Failed to add server"):
|
|
await manager.server_add(config=cfg)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_tablets_disabled_with_gossip_topology_changes(manager: ManagerClient):
|
|
cfg = {"tablets_mode_for_new_keyspaces": "disabled", "force_gossip_topology_changes": True}
|
|
await manager.server_add(config=cfg)
|
|
cql = manager.get_cql()
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks_name:
|
|
res = cql.execute(f"SELECT * FROM system_schema.scylla_keyspaces WHERE keyspace_name = '{ks_name}'").one()
|
|
logger.info(res)
|
|
|
|
for enabled in ["false", "true"]:
|
|
expected = r"Error from server: code=2000 \[Syntax error in CQL query\] message=\"line 1:126 no viable alternative at input 'tablets'\""
|
|
with pytest.raises(SyntaxException, match=expected):
|
|
ks_name = unique_name()
|
|
await cql.run_async(f"CREATE KEYSPACE {ks_name} WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets {{'enabled': {enabled}}};")
|
|
|
|
@pytest.mark.asyncio
|
|
@skip_mode('release', 'error injections are not supported in release mode')
|
|
async def test_tablet_streaming_with_unbuilt_view(manager: ManagerClient):
|
|
"""
|
|
Reproducer for https://github.com/scylladb/scylladb/issues/21564
|
|
1) Create a table with 1 initial tablet and populate it
|
|
2) Create a view on the table but prevent the generator from processing it using error injection
|
|
3) Start migration of the tablet from node 1 to 2
|
|
4) Once migration completes, the view should have the correct number of rows
|
|
"""
|
|
logger.info("Starting Node 1")
|
|
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
|
|
cmdline = [
|
|
'--logger-log-level', 'storage_service=debug',
|
|
'--logger-log-level', 'raft_topology=debug',
|
|
'--logger-log-level', 'view_building_coordinator=debug',
|
|
'--logger-log-level', 'view_building_worker=debug',
|
|
]
|
|
servers = [await manager.server_add(cmdline=cmdline, config=cfg)]
|
|
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
|
|
|
logger.info("Create table, populate it and flush the table to disk")
|
|
cql = manager.get_cql()
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
|
num_of_rows = 64
|
|
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k%3});") for k in range(num_of_rows)])
|
|
await manager.api.keyspace_flush(servers[0].ip_addr, ks, "test")
|
|
|
|
logger.info("Starting Node 2")
|
|
servers.append(await manager.server_add(cmdline=cmdline, config=cfg))
|
|
s1_host_id = await manager.get_host_id(servers[1].server_id)
|
|
|
|
logger.info("Inject error to make view building worker pause before processing the sstable")
|
|
injection_name = "view_building_worker_pause_before_consume"
|
|
await manager.api.enable_injection(servers[0].ip_addr, injection_name, one_shot=True)
|
|
|
|
logger.info("Create view")
|
|
await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv1 AS \
|
|
SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL \
|
|
PRIMARY KEY (c, pk);")
|
|
|
|
logger.info("Migrate the tablet to node 2")
|
|
tablet_token = 0 # Doesn't matter since there is one tablet
|
|
replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
|
|
await manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, 0, tablet_token)
|
|
logger.info("Migration done")
|
|
|
|
async def check_table():
|
|
result = await cql.run_async(f"SELECT * FROM system.built_views WHERE keyspace_name='{ks}' AND view_name='mv1'")
|
|
if len(result) == 1:
|
|
return True
|
|
else:
|
|
return None
|
|
await wait_for(check_table, time.time() + 30)
|
|
|
|
# Verify the table has expected number of rows
|
|
rows = await cql.run_async(f"SELECT pk from {ks}.test")
|
|
assert len(list(rows)) == num_of_rows
|
|
# Verify that the view has the expected number of rows
|
|
rows = await cql.run_async(f"SELECT c from {ks}.mv1")
|
|
assert len(list(rows)) == num_of_rows
|
|
|
|
@pytest.mark.asyncio
|
|
@skip_mode('release', 'error injections are not supported in release mode')
|
|
async def test_tablet_streaming_with_staged_sstables(manager: ManagerClient):
|
|
"""
|
|
Reproducer for https://github.com/scylladb/scylladb/issues/19149
|
|
1) Create a table with 1 initial tablet and populate it
|
|
2) Create a view on the table but prevent the generator
|
|
3) Inject error to prevent processing of new sstables in view generator
|
|
4) Create an sstable, move it into upload directory of test table and start upload
|
|
5) Start migration of the tablet from node 1 to 2
|
|
6) Once migration completes, the view should have the correct number of rows
|
|
"""
|
|
logger.info("Starting Node 1")
|
|
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
|
|
cmdline = [
|
|
'--logger-log-level', 'storage_service=debug',
|
|
'--logger-log-level', 'raft_topology=debug',
|
|
'--logger-log-level', 'view_building_coordinator=debug',
|
|
'--logger-log-level', 'view_building_worker=debug',
|
|
]
|
|
servers = [await manager.server_add(cmdline=cmdline, config=cfg)]
|
|
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
|
|
|
logger.info("Create the test table, populate few rows and flush to disk")
|
|
cql = manager.get_cql()
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
|
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k%3});") for k in range(64)])
|
|
await manager.api.keyspace_flush(servers[0].ip_addr, ks, "test")
|
|
|
|
logger.info("Create view")
|
|
# Pause view building
|
|
await manager.api.enable_injection(servers[0].ip_addr, "view_building_worker_pause_before_consume", one_shot=True)
|
|
await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv1 AS \
|
|
SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL \
|
|
PRIMARY KEY (c, pk);")
|
|
|
|
# Check if view building was started
|
|
async def check_mv_status():
|
|
mv_status = await cql.run_async(f"SELECT status FROM system.view_build_status_v2 WHERE keyspace_name='{ks}' AND view_name='mv'")
|
|
return len(mv_status) == 1 and mv_status[0].status == "STARTED"
|
|
await wait_for(check_mv_status, time.time() + 30)
|
|
|
|
logger.info("Generate an sstable and move it to upload directory of test table")
|
|
# create an sstable using a dummy table
|
|
await cql.run_async(f"CREATE TABLE {ks}.dummy (pk int PRIMARY KEY, c int);")
|
|
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.dummy (pk, c) VALUES ({k}, {k%3});") for k in range(64, 128)])
|
|
await manager.api.keyspace_flush(servers[0].ip_addr, ks, "dummy")
|
|
node_workdir = await manager.server_get_workdir(servers[0].server_id)
|
|
dummy_table_dir = glob.glob(os.path.join(node_workdir, "data", ks, "dummy-*"))[0]
|
|
test_table_upload_dir = glob.glob(os.path.join(node_workdir, "data", ks, "test-*", "upload"))[0]
|
|
for src_path in glob.glob(os.path.join(dummy_table_dir, sstable_filename_glob)):
|
|
dst_path = os.path.join(test_table_upload_dir, os.path.basename(src_path))
|
|
os.rename(src_path, dst_path)
|
|
await cql.run_async(f"DROP TABLE {ks}.dummy;")
|
|
|
|
logger.info("Starting Node 2")
|
|
servers.append(await manager.server_add(cmdline=cmdline, config=cfg))
|
|
s1_host_id = await manager.get_host_id(servers[1].server_id)
|
|
|
|
logger.info("Inject error to prevent view generator from processing staged sstables")
|
|
injection_name = "view_update_generator_consume_staging_sstable"
|
|
await manager.api.enable_injection(servers[0].ip_addr, injection_name, one_shot=True)
|
|
|
|
logger.info("Load the sstables from upload directory")
|
|
await manager.api.load_new_sstables(servers[0].ip_addr, ks, "test")
|
|
|
|
# The table now has both staged and unstaged sstables.
|
|
# Verify that tablet migration handles them both without causing any base-view inconsistencies.
|
|
logger.info("Migrate the tablet to node 2")
|
|
tablet_token = 0 # Doesn't matter since there is one tablet
|
|
replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
|
|
await manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, 0, tablet_token)
|
|
logger.info("Migration done")
|
|
|
|
async def check_view_is_built():
|
|
result = await cql.run_async(f"SELECT * FROM system.built_views WHERE keyspace_name='{ks}' AND view_name='mv1'")
|
|
if len(result) == 1:
|
|
return True
|
|
else:
|
|
return None
|
|
await wait_for(check_view_is_built, time.time() + 60)
|
|
|
|
expected_num_of_rows = 128
|
|
# Verify the table has expected number of rows
|
|
rows = await cql.run_async(f"SELECT pk from {ks}.test")
|
|
assert len(list(rows)) == expected_num_of_rows
|
|
# Verify that the view has the expected number of rows
|
|
rows = await cql.run_async(f"SELECT c from {ks}.mv1")
|
|
assert len(list(rows)) == expected_num_of_rows
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_orphaned_sstables_on_startup(manager: ManagerClient):
|
|
"""
|
|
Reproducer for https://github.com/scylladb/scylladb/issues/18038
|
|
1) Start a node (node1)
|
|
2) Create a table with 1 initial tablet and populate it
|
|
3) Start another node (node2)
|
|
4) Migrate the existing tablet from node1 to node2
|
|
5) Stop node1
|
|
6) Copy the sstables from node2 to node1
|
|
7) Attempting to start node1 should fail as it now has an 'orphaned' sstable
|
|
"""
|
|
logger.info("Starting Node 1")
|
|
cfg = {'enable_user_defined_functions': False, 'tablets_mode_for_new_keyspaces': 'enabled'}
|
|
cmdline = [
|
|
'--logger-log-level', 'storage_service=debug',
|
|
'--logger-log-level', 'raft_topology=debug',
|
|
]
|
|
servers = [await manager.server_add(cmdline=cmdline, config=cfg)]
|
|
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
|
|
|
logger.info("Create the test table, populate few rows and flush to disk")
|
|
cql = manager.get_cql()
|
|
ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}")
|
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
|
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k%3});") for k in range(256)])
|
|
await manager.api.keyspace_flush(servers[0].ip_addr, ks, "test")
|
|
node0_workdir = await manager.server_get_workdir(servers[0].server_id)
|
|
node0_table_dir = glob.glob(os.path.join(node0_workdir, "data", ks, "test-*"))[0]
|
|
|
|
logger.info("Start Node 2")
|
|
servers.append(await manager.server_add(cmdline=cmdline, config=cfg))
|
|
await manager.api.disable_tablet_balancing(servers[1].ip_addr)
|
|
node1_workdir = await manager.server_get_workdir(servers[1].server_id)
|
|
node1_table_dir = glob.glob(os.path.join(node1_workdir, "data", ks, "test-*"))[0]
|
|
s1_host_id = await manager.get_host_id(servers[1].server_id)
|
|
|
|
logger.info("Migrate the tablet from node1 to node2")
|
|
tablet_token = 0 # Doesn't matter since there is one tablet
|
|
replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
|
|
await manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, 0, tablet_token)
|
|
logger.info("Migration done")
|
|
|
|
logger.info("Stop node1 and copy the sstables from node2")
|
|
await manager.server_stop(servers[0].server_id)
|
|
for src_path in glob.glob(os.path.join(node1_table_dir, sstable_filename_glob)):
|
|
dst_path = os.path.join(node0_table_dir, os.path.basename(src_path))
|
|
shutil.copy(src_path, dst_path)
|
|
|
|
# try starting the server again
|
|
logger.info("Start node1 with the orphaned sstables and expect it to fail")
|
|
# Error thrown is of format : "Unable to load SSTable {sstable_name} : Storage wasn't found for tablet {tablet_id} of table {ks}.test"
|
|
await manager.server_start(servers[0].server_id, expected_error="Storage wasn't found for tablet")
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize("with_zero_token_node", [False, True])
|
|
async def test_remove_failure_with_no_normal_token_owners_in_dc(manager: ManagerClient, with_zero_token_node: bool):
|
|
"""
|
|
Reproducer for #21826
|
|
Verify that a node cannot be removed with tablets when
|
|
there are not enough nodes in a datacenter to satisfy the configured replication factor,
|
|
even when there is a zero-token node in the same datacenter and in another datacenter,
|
|
and when there is another down node in the datacenter, leaving no normal token owners.
|
|
"""
|
|
servers: dict[str, list[ServerInfo]] = dict()
|
|
servers['dc1'] = await manager.servers_add(servers_num=2, property_file=[
|
|
{'dc': 'dc1', 'rack': 'rack1_1'},
|
|
{'dc': 'dc1', 'rack': 'rack1_2'}])
|
|
# if testing with no zero-token-node, add an additional node to dc2 to maintain raft quorum
|
|
extra_node = 0 if with_zero_token_node else 1
|
|
servers['dc2'] = await manager.servers_add(servers_num=2 + extra_node, property_file={'dc': 'dc2', 'rack': 'rack2'})
|
|
if with_zero_token_node:
|
|
servers['dc1'].append(await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc1', 'rack': 'rack1_1'}))
|
|
servers['dc3'] = [await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc3', 'rack': 'rack3'})]
|
|
|
|
cql = manager.get_cql()
|
|
async with new_test_keyspace(manager, "WITH replication = { 'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 1 } AND tablets = { 'initial': 1 }") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
|
|
|
node_to_remove = servers['dc1'][0]
|
|
node_to_replace = servers['dc1'][1]
|
|
replaced_host_id = await manager.get_host_id(node_to_replace.server_id)
|
|
initiator_node = servers['dc2'][0]
|
|
|
|
# Stop both token owners in dc1 to leave no token owners in the datacenter
|
|
await manager.server_stop_gracefully(node_to_remove.server_id)
|
|
await manager.server_stop_gracefully(node_to_replace.server_id)
|
|
|
|
logger.info("Attempting removenode - expected to fail")
|
|
await manager.remove_node(initiator_node.server_id, server_id=node_to_remove.server_id, ignore_dead=[replaced_host_id],
|
|
expected_error="Removenode failed. See earlier errors (Rolled back: Failed to drain tablets: std::runtime_error (There are nodes with tablets to drain")
|
|
|
|
logger.info(f"Replacing {node_to_replace} with a new node")
|
|
replace_cfg = ReplaceConfig(replaced_id=node_to_remove.server_id, reuse_ip_addr = False, use_host_id=True, wait_replaced_dead=True)
|
|
await manager.server_add(replace_cfg=replace_cfg, property_file=node_to_remove.property_file())
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_excludenode(manager: ManagerClient):
|
|
"""
|
|
Verifies recovery scenario involving marking the node as excluded using excludenode.
|
|
|
|
1. Create a cluster with 3 racks, 1 node in rack1, 2 nodes in rack2, and 1 in rack3.
|
|
2. The keyspace is initially replicated to rack1 and rack2.
|
|
3. We down one node in rack2, which should cause unavailability.
|
|
4. We mark the node as excluded using excludenode. This unblocks the next ALTER
|
|
5. We add rack3 to RF of the keyspace. This wouldn't succeed without marking the node as excluded.
|
|
6. We verify that downed node can be removed successfully, while there are still tablets on it. That's
|
|
why we need two nodes in rack2.
|
|
"""
|
|
servers = await manager.servers_add(servers_num=3, auto_rack_dc='dc1')
|
|
await manager.server_add(property_file={'dc': 'dc1', 'rack': 'rack2'})
|
|
|
|
cql = manager.get_cql()
|
|
async with new_test_keyspace(manager, "WITH replication = { 'class': 'NetworkTopologyStrategy', "
|
|
"'dc1': ['rack1', 'rack2']} AND tablets = { 'initial': 8 }") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
|
|
|
live_node = servers[0]
|
|
node_to_remove = servers[1]
|
|
with pytest.raises(Exception, match="Cannot mark host .* as excluded because it's alive"):
|
|
await manager.api.exclude_node(live_node.ip_addr, hosts=[await manager.get_host_id(node_to_remove.server_id)])
|
|
|
|
with pytest.raises(Exception, match=".* does not belong to this cluster"):
|
|
await manager.api.exclude_node(live_node.ip_addr, hosts=[str(uuid.uuid4())])
|
|
|
|
await manager.server_stop(node_to_remove.server_id)
|
|
await manager.others_not_see_server(node_to_remove.ip_addr)
|
|
await manager.api.exclude_node(live_node.ip_addr, hosts=[await manager.get_host_id(node_to_remove.server_id)])
|
|
|
|
# Check that tablets can be rebuilt in a new rack with rack2 down.
|
|
await cql.run_async(f"ALTER KEYSPACE {ks} WITH REPLICATION = {{ 'class': 'NetworkTopologyStrategy', 'dc1': ['rack1', 'rack2', 'rack3']}}")
|
|
|
|
# Check that removenode succeeds on the node which is excluded
|
|
await manager.remove_node(live_node.server_id, server_id=node_to_remove.server_id)
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.parametrize("with_zero_token_node", [False, True])
|
|
async def test_remove_failure_then_replace(manager: ManagerClient, with_zero_token_node: bool):
|
|
"""
|
|
Verify that a node cannot be removed with tablets when
|
|
there are not enough nodes in a datacenter to satisfy the configured replication factor,
|
|
even when there is a zero-token node in the same datacenter and in another datacenter.
|
|
And then verify that that node can be replaced successfully.
|
|
"""
|
|
servers: dict[str, list[ServerInfo]] = dict()
|
|
servers['dc1'] = await manager.servers_add(servers_num=2, property_file=[
|
|
{'dc': 'dc1', 'rack': 'rack1_1'},
|
|
{'dc': 'dc1', 'rack': 'rack1_2'}])
|
|
servers['dc2'] = await manager.servers_add(servers_num=2, property_file={'dc': 'dc2', 'rack': 'rack2'})
|
|
if with_zero_token_node:
|
|
servers['dc1'].append(await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc1', 'rack': 'rack1_1'}))
|
|
servers['dc3'] = [await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc3', 'rack': 'rack3'})]
|
|
|
|
cql = manager.get_cql()
|
|
async with new_test_keyspace(manager, "WITH replication = { 'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 1 } AND tablets = { 'initial': 1 }") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
|
|
|
node_to_remove = servers['dc1'][0]
|
|
initiator_node = servers['dc2'][0]
|
|
|
|
await manager.server_stop_gracefully(node_to_remove.server_id)
|
|
|
|
logger.info("Attempting removenode - expected to fail")
|
|
await manager.remove_node(initiator_node.server_id, server_id=node_to_remove.server_id,
|
|
expected_error="Removenode failed")
|
|
|
|
logger.info(f"Replacing {node_to_remove} with a new node")
|
|
replace_cfg = ReplaceConfig(replaced_id=node_to_remove.server_id, reuse_ip_addr = False, use_host_id=True, wait_replaced_dead=True)
|
|
await manager.server_add(replace_cfg=replace_cfg, property_file=node_to_remove.property_file())
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.nightly
|
|
@pytest.mark.parametrize("with_zero_token_node", [False, True])
|
|
async def test_replace_with_no_normal_token_owners_in_dc(manager: ManagerClient, with_zero_token_node: bool):
|
|
"""
|
|
Verify that nodes can be successfully replaced with tablets when
|
|
even when there are not enough nodes in a datacenter to satisfy the configured replication factor,
|
|
with and without zero-token nodes in the same datacenter and in another datacenter,
|
|
and when there is another down node in the datacenter, leaving no normal token owners,
|
|
but other datacenters can be used to rebuild the data.
|
|
"""
|
|
servers: dict[str, list[ServerInfo]] = dict()
|
|
servers['dc1'] = await manager.servers_add(servers_num=2, property_file=[
|
|
{'dc': 'dc1', 'rack': 'rack1_1'},
|
|
{'dc': 'dc1', 'rack': 'rack1_2'}])
|
|
# if testing with no zero-token-node, add an additional node to dc2 to maintain raft quorum
|
|
extra_node = 0 if with_zero_token_node else 1
|
|
servers['dc2'] = await manager.servers_add(servers_num=2 + extra_node, property_file={'dc': 'dc2', 'rack': 'rack2'})
|
|
if with_zero_token_node:
|
|
servers['dc1'].append(await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc1', 'rack': 'rack1_1'}))
|
|
servers['dc3'] = [await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc3', 'rack': 'rack3'})]
|
|
|
|
cql = manager.get_cql()
|
|
async with new_test_keyspace(manager, "WITH replication = { 'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 1 } AND tablets = { 'initial': 1 }") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
|
|
|
stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, c) VALUES (?, ?)")
|
|
stmt.consistency_level = ConsistencyLevel.ALL
|
|
keys = range(256)
|
|
await asyncio.gather(*[cql.run_async(stmt, [k, k]) for k in keys])
|
|
|
|
nodes_to_replace = servers['dc1'][0:2]
|
|
replaced_host_id = await manager.get_host_id(nodes_to_replace[1].server_id)
|
|
|
|
# Stop both token owners in dc1 to leave no token owners in the datacenter
|
|
for node in nodes_to_replace:
|
|
await manager.server_stop_gracefully(node.server_id)
|
|
|
|
logger.info(f"Replacing {nodes_to_replace[0]} with a new node")
|
|
replace_cfg = ReplaceConfig(replaced_id=nodes_to_replace[0].server_id, reuse_ip_addr = False, use_host_id=True, wait_replaced_dead=True,
|
|
ignore_dead_nodes=[replaced_host_id])
|
|
await manager.server_add(replace_cfg=replace_cfg, property_file=nodes_to_replace[0].property_file())
|
|
|
|
logger.info(f"Replacing {nodes_to_replace[1]} with a new node")
|
|
replace_cfg = ReplaceConfig(replaced_id=nodes_to_replace[1].server_id, reuse_ip_addr = False, use_host_id=True, wait_replaced_dead=True)
|
|
await manager.server_add(replace_cfg=replace_cfg, property_file=nodes_to_replace[1].property_file())
|
|
|
|
logger.info("Verifying data")
|
|
for node in servers['dc2']:
|
|
await manager.server_stop_gracefully(node.server_id)
|
|
query = SimpleStatement(f"SELECT * FROM {ks}.test;", consistency_level=ConsistencyLevel.ONE)
|
|
rows = await cql.run_async(query)
|
|
assert len(rows) == len(keys)
|
|
for r in rows:
|
|
assert r.c == r.pk
|
|
|
|
# For dropping the keyspace
|
|
await asyncio.gather(*[manager.server_start(node.server_id) for node in servers['dc2']])
|
|
|
|
@pytest.mark.asyncio
|
|
@skip_mode('release', 'error injections are not supported in release mode')
|
|
async def test_drop_keyspace_while_split(manager: ManagerClient):
|
|
|
|
# Reproducer for: https://github.com/scylladb/scylladb/issues/22431
|
|
# This tests if the split ready compaction groups are correctly created
|
|
# on a shard with several storage groups for the same table
|
|
|
|
logger.info("Bootstrapping cluster")
|
|
cmdline = [ '--target-tablet-size-in-bytes', '8192',
|
|
'--smp', '2' ]
|
|
config = { 'tablet_load_stats_refresh_interval_in_seconds': 1 }
|
|
servers = [await manager.server_add(config=config, cmdline=cmdline)]
|
|
|
|
s0_log = await manager.server_open_log(servers[0].server_id)
|
|
|
|
cql = manager.get_cql()
|
|
await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 60)
|
|
|
|
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
|
|
|
# create a table so that it has at least 2 tablets (and storage groups) per shard
|
|
ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 4};")
|
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
|
|
|
await manager.api.disable_autocompaction(servers[0].ip_addr, ks)
|
|
|
|
keys = range(2048)
|
|
await asyncio.gather(*[cql.run_async(f'INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});') for k in keys])
|
|
await manager.api.flush_keyspace(servers[0].ip_addr, ks)
|
|
|
|
await manager.api.enable_injection(servers[0].ip_addr, 'truncate_compaction_disabled_wait', one_shot=False)
|
|
await manager.api.enable_injection(servers[0].ip_addr, 'split_storage_groups_wait', one_shot=False)
|
|
|
|
# enable the load balancer which should emmit a tablet split
|
|
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
|
|
|
# wait for compaction groups to be created and split to begin
|
|
await s0_log.wait_for('split_storage_groups_wait: wait')
|
|
|
|
# start a DROP and wait for it to disable compaction
|
|
drop_ks_task = cql.run_async(f'DROP KEYSPACE {ks};')
|
|
await s0_log.wait_for('truncate_compaction_disabled_wait: wait')
|
|
|
|
# release split
|
|
await manager.api.message_injection(servers[0].ip_addr, "split_storage_groups_wait")
|
|
|
|
# release drop and wait for it to complete
|
|
await manager.api.message_injection(servers[0].ip_addr, "truncate_compaction_disabled_wait")
|
|
await drop_ks_task
|
|
|
|
@pytest.mark.asyncio
|
|
@skip_mode('release', 'error injections are not supported in release mode')
|
|
async def test_drop_with_tablet_migration_cleanup(manager: ManagerClient):
|
|
|
|
# Reproducer for https://github.com/scylladb/scylladb/issues/25706
|
|
|
|
logger.info('Bootstrapping cluster')
|
|
cfg = { 'enable_tablets': True }
|
|
cmdline = ['--smp', '2' ]
|
|
server = await manager.server_add(cmdline=cmdline, config=cfg)
|
|
|
|
cql = manager.get_cql()
|
|
|
|
# We don't want the load balancer to migrate tablets during the test
|
|
await manager.api.disable_tablet_balancing(server.ip_addr)
|
|
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
|
|
# Create the table, insert data and flush
|
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': 1}};")
|
|
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in range(100)])
|
|
await manager.api.flush_keyspace(server.ip_addr, ks)
|
|
|
|
# Get the current location of the tablet
|
|
token = 0
|
|
replica = await get_tablet_replica(manager, server, ks, "test", token)
|
|
|
|
await manager.api.enable_injection(server.ip_addr, "wait_before_stop_compaction_groups", one_shot=True)
|
|
await manager.api.enable_injection(server.ip_addr, "truncate_compaction_disabled_wait", one_shot=True)
|
|
|
|
slog = await manager.server_open_log(server.server_id)
|
|
smark = await slog.mark()
|
|
|
|
# Start migrating the tablet
|
|
dst_shard = 1 if replica[0] == 0 else 1
|
|
migration_task = asyncio.create_task(
|
|
manager.api.move_tablet(server.ip_addr, ks, "test", replica[0], replica[1], replica[0], dst_shard, token))
|
|
|
|
# Wait until the leaving replica is about to be cleaned up.
|
|
# storage_group's gate has been closed, but the compaction groups have not yet been stopped and disabled
|
|
await slog.wait_for("wait_before_stop_compaction_groups: wait", from_mark=smark)
|
|
|
|
# Start dropping the table
|
|
drop_future = cql.run_async(f"DROP TABLE {ks}.test;")
|
|
|
|
# Wait for truncate to complete disabling compaction
|
|
await slog.wait_for("truncate_compaction_disabled_wait: wait", from_mark=smark)
|
|
|
|
# Release the migration's tablet cleanup
|
|
await manager.api.message_injection(server.ip_addr, "wait_before_stop_compaction_groups")
|
|
|
|
# Release drop/truncate
|
|
await manager.api.message_injection(server.ip_addr, "truncate_compaction_disabled_wait")
|
|
await drop_future
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@skip_mode('release', 'error injections are not supported in release mode')
|
|
async def test_two_tablets_concurrent_repair_and_migration(manager: ManagerClient):
|
|
injection = "repair_shard_repair_task_impl_do_repair_ranges"
|
|
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager)
|
|
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
|
|
|
all_replicas = await get_all_tablet_replicas(manager, servers[0], ks, "test")
|
|
all_replicas.sort(key=lambda x: x.last_token)
|
|
assert len(all_replicas) >= 3
|
|
repair_replicas = all_replicas[1]
|
|
migration_replicas = all_replicas[0]
|
|
|
|
logs = [await manager.server_open_log(s.server_id) for s in servers]
|
|
marks = [await log.mark() for log in logs]
|
|
|
|
async def repair_task():
|
|
[await manager.api.enable_injection(s.ip_addr, injection, one_shot=True) for s in servers]
|
|
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", repair_replicas.last_token)
|
|
|
|
async def migration_task():
|
|
await wait_for_first_completed([log.wait_for('Started to repair', from_mark=mark) for log, mark in zip(logs, marks)])
|
|
await manager.api.move_tablet(servers[0].ip_addr, ks, "test", migration_replicas.replicas[0][0], migration_replicas.replicas[0][1], migration_replicas.replicas[0][0], 0 if migration_replicas.replicas[0][1] != 0 else 1, migration_replicas.last_token)
|
|
[await manager.api.message_injection(s.ip_addr, injection) for s in servers]
|
|
[await manager.api.disable_injection(s.ip_addr, injection) for s in servers]
|
|
|
|
await asyncio.gather(repair_task(), migration_task())
|
|
|
|
@pytest.mark.asyncio
|
|
@skip_mode('release', 'error injections are not supported in release mode')
|
|
async def test_tablet_split_finalization_with_migrations(manager: ManagerClient):
|
|
"""
|
|
Reproducer for https://github.com/scylladb/scylladb/issues/21762
|
|
1) Start a cluster with two nodes with error injected to prevent resize finalisatio
|
|
2) Create and populate `test` table
|
|
3) Trigger a split in the table by increasing `min_tablet_count`
|
|
4) Wait for the table `test` to reach split finalization stage
|
|
5) Create and populate another table `blocker`
|
|
6) Disable tablet balancing and move all tablets of `test` and `blocker` table from node 2 to node 1
|
|
7) Enable tablet balancing and disable error injection to allow migration and finalisation to proceed.
|
|
8) Expect finalization in `test` to be preferred over migrations in both tables
|
|
"""
|
|
logger.info("Starting Cluster")
|
|
cfg = {
|
|
'enable_user_defined_functions': False, 'enable_tablets': True,
|
|
'error_injections_at_startup': [
|
|
# intially disable transitioning into tablet_resize_finalization topology state
|
|
'tablet_split_finalization_postpone',
|
|
],
|
|
'tablet_load_stats_refresh_interval_in_seconds': 1
|
|
}
|
|
cmdline = [
|
|
'--logger-log-level', 'raft_topology=debug',
|
|
'--logger-log-level', 'load_balancer=debug',
|
|
]
|
|
servers = await manager.servers_add(2, cmdline=cmdline, config=cfg)
|
|
|
|
logger.info("Create and populate test table")
|
|
cql = manager.get_cql()
|
|
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 4};")
|
|
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
|
|
await manager.api.disable_autocompaction(servers[0].ip_addr, "test")
|
|
await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k%3});") for k in range(64)])
|
|
await manager.api.keyspace_flush(servers[0].ip_addr, "test", "test")
|
|
test_table_id = (await cql.run_async("SELECT id FROM system_schema.tables WHERE keyspace_name = 'test' AND table_name = 'test'"))[0].id
|
|
|
|
logger.info("Trigger split in table")
|
|
await cql.run_async("ALTER TABLE test.test WITH tablets = {'min_tablet_count': 8};")
|
|
|
|
# Wait for splits to finalise; they don't execute yet as they are prevented by the error injection
|
|
logger.info("Wait for tablets to split")
|
|
log = await manager.server_open_log(servers[0].server_id)
|
|
await log.wait_for(f"Finalizing resize decision for table {test_table_id} as all replicas agree on sequence number 1")
|
|
|
|
logger.info("Create and populate `blocker` table")
|
|
await cql.run_async("CREATE TABLE test.blocker (pk int PRIMARY KEY, c int);")
|
|
await asyncio.gather(*[cql.run_async(f"INSERT INTO test.blocker (pk, c) VALUES ({k}, {k%3});") for k in range(128)])
|
|
await manager.api.keyspace_flush(servers[0].ip_addr, "test", "blocker")
|
|
blocker_table_id = (await cql.run_async("SELECT id FROM system_schema.tables WHERE keyspace_name = 'test' AND table_name = 'blocker'"))[0].id
|
|
|
|
s0_host_id = await manager.get_host_id(servers[0].server_id)
|
|
for cf in ["test", "blocker"]:
|
|
logger.info(f"Move all tablets of test.{cf} from Node 2 to Node 1")
|
|
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
|
s1_replicas = await get_all_tablet_replicas(manager, servers[1], "test", cf)
|
|
migration_tasks = [
|
|
manager.api.move_tablet(servers[0].ip_addr, "test", cf,
|
|
tablet.replicas[0][0], tablet.replicas[0][1],
|
|
s0_host_id, 0, tablet.last_token)
|
|
for tablet in s1_replicas
|
|
]
|
|
await asyncio.gather(*migration_tasks)
|
|
|
|
logger.info("Re-enable tablet balancing; it should be blocked by pending split finalization")
|
|
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
|
mark, _ = await log.wait_for("Setting tablet balancing to true")
|
|
|
|
logger.info("Unblock resize finalisation and verify that the finalisation is preferred over migrations")
|
|
await manager.api.disable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone")
|
|
split_finalization_mark, _ = await log.wait_for("Finished tablet resize finalization", from_mark=mark)
|
|
for table_id in [test_table_id, blocker_table_id]:
|
|
migration_mark, _ = await log.wait_for(f"Will set tablet {table_id}:\\d+ stage to write_both_read_old", from_mark=mark)
|
|
assert split_finalization_mark < migration_mark, f"Tablet migration of {table_id} was scheduled before resize finalization"
|
|
|
|
# ensure all migrations complete
|
|
logger.info("Waiting for migrations to complete")
|
|
await log.wait_for("Tablet load balancer did not make any plan", from_mark=migration_mark)
|
|
|
|
@pytest.mark.asyncio
|
|
@skip_mode('release', 'error injections are not supported in release mode')
|
|
async def test_two_tablets_concurrent_repair_and_migration_repair_writer_level(manager: ManagerClient):
|
|
injection = "repair_writer_impl_create_writer_wait"
|
|
cmdline = [
|
|
'--logger-log-level', 'repair=debug',
|
|
'--hinted-handoff-enabled', '0',
|
|
]
|
|
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager, cmdline=cmdline)
|
|
|
|
await cql.run_async(f"CREATE TABLE {ks}.test2 (pk int PRIMARY KEY, c int) WITH tombstone_gc = {{'mode':'repair'}};")
|
|
|
|
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
|
|
|
async def insert_with_down(down_server):
|
|
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k + 1});") for k in range(10)])
|
|
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test2 (pk, c) VALUES ({k}, {k + 1});") for k in range(10)])
|
|
|
|
cql = await safe_rolling_restart(manager, [servers[0]], with_down=insert_with_down)
|
|
|
|
await wait_for_cql_and_get_hosts(manager.get_cql(), servers, time.time() + 30)
|
|
|
|
all_replicas = await get_all_tablet_replicas(manager, servers[1], ks, "test")
|
|
migration_replicas = all_replicas[0]
|
|
|
|
logs = [await manager.server_open_log(s.server_id) for s in servers]
|
|
marks = [await log.mark() for log in logs]
|
|
|
|
async def repair_task():
|
|
[await manager.api.enable_injection(s.ip_addr, injection, one_shot=True) for s in servers]
|
|
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test2", token="all")
|
|
|
|
async def migration_task():
|
|
await wait_for_first_completed([log.wait_for(f'repair_writer: keyspace={ks}', from_mark=mark) for log, mark in zip(logs, marks)])
|
|
await manager.api.move_tablet(servers[0].ip_addr, ks, "test", migration_replicas.replicas[0][0], migration_replicas.replicas[0][1], migration_replicas.replicas[0][0], 0 if migration_replicas.replicas[0][1] != 0 else 1, migration_replicas.last_token)
|
|
[await manager.api.message_injection(s.ip_addr, injection) for s in servers]
|
|
[await manager.api.disable_injection(s.ip_addr, injection) for s in servers]
|
|
|
|
await asyncio.gather(repair_task(), migration_task())
|
|
|
|
async def check_tablet_rebuild_with_repair(manager: ManagerClient, fail: bool):
|
|
logger.info("Bootstrapping cluster")
|
|
cfg = {'enable_user_defined_functions': False, 'enable_tablets': True}
|
|
if fail:
|
|
cfg['error_injections_at_startup'] = ['rebuild_repair_stage_fail']
|
|
host_ids = []
|
|
servers = []
|
|
|
|
async def make_server(rack: str):
|
|
s = await manager.server_add(config=cfg, property_file={"dc": "dc1", "rack": rack})
|
|
servers.append(s)
|
|
host_ids.append(await manager.get_host_id(s.server_id))
|
|
await manager.api.disable_tablet_balancing(s.ip_addr)
|
|
|
|
await make_server("r1")
|
|
await make_server("r1")
|
|
await make_server("r2")
|
|
|
|
cql = manager.get_cql()
|
|
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2} AND tablets = {'initial': 1}") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
|
keys = range(256)
|
|
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
|
|
|
|
replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
|
|
logger.info(f"Tablet is on [{replicas}]")
|
|
assert len(replicas) == 1 and len(replicas[0].replicas) == 2
|
|
replicas = [ r[0] for r in replicas[0].replicas ]
|
|
for h in host_ids:
|
|
if h not in replicas:
|
|
new_replica = (h, 0)
|
|
break
|
|
else:
|
|
assert False, "Cannot find node without replica"
|
|
|
|
logs = []
|
|
for s in servers:
|
|
logs.append(await manager.server_open_log(s.server_id))
|
|
|
|
logger.info(f"Adding replica to tablet, host {new_replica[0]}")
|
|
await manager.api.add_tablet_replica(servers[0].ip_addr, ks, "test", new_replica[0], new_replica[1], 0)
|
|
|
|
assert sum([len(await log.grep(rf'.*Will set tablet .* stage to rebuild_repair.*')) for log in logs]) == 1
|
|
|
|
replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
|
|
logger.info(f"Tablet is now on [{replicas}]")
|
|
assert len(replicas) == 1
|
|
replicas = [ r[0] for r in replicas[0].replicas ]
|
|
|
|
assert len(replicas) == 2 if fail else len(replicas) == 3
|
|
|
|
for h, s in zip(host_ids, servers):
|
|
host = await wait_for_cql_and_get_hosts(cql, [s], time.time() + 30)
|
|
if h != host_ids[0]:
|
|
await read_barrier(manager.api, host[0].address) # host-0 did the barrier in get_all_tablet_replicas above
|
|
res = await cql.run_async(f"SELECT COUNT(*) FROM MUTATION_FRAGMENTS({ks}.test)", host=host[0])
|
|
logger.info(f"Host {h} reports {res} as mutation fragments count")
|
|
if h in replicas:
|
|
assert res[0].count != 0
|
|
else:
|
|
assert res[0].count == 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_tablet_rebuild(manager: ManagerClient):
|
|
await check_tablet_rebuild_with_repair(manager, False)
|
|
|
|
@pytest.mark.asyncio
|
|
@skip_mode('release', 'error injections are not supported in release mode')
|
|
async def test_tablet_rebuild_failure(manager: ManagerClient):
|
|
await check_tablet_rebuild_with_repair(manager, True)
|
|
|
|
@pytest.mark.asyncio
|
|
@skip_mode('release', 'error injections are not supported in release mode')
|
|
async def test_repair_with_invalid_session_id(manager: ManagerClient):
|
|
injection = "handle_tablet_migration_repair_random_session"
|
|
token = -1
|
|
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(manager)
|
|
|
|
logs = [await manager.server_open_log(s.server_id) for s in servers]
|
|
marks = [await log.mark() for log in logs]
|
|
|
|
[await manager.api.enable_injection(s.ip_addr, injection, one_shot=True) for s in servers]
|
|
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token)
|
|
|
|
matches = [await log.grep("std::runtime_error \(Session not found", from_mark=mark) for log, mark in zip(logs, marks)]
|
|
assert sum(len(x) for x in matches) > 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_moving_replica_to_replica(manager: ManagerClient):
|
|
"""
|
|
Verify that trying to move a tablet replica to a node that is already
|
|
a replica is prevented with an appropriate error message.
|
|
"""
|
|
|
|
ks = "ks"
|
|
table = "my_table"
|
|
|
|
# For convenience when moving tablets.
|
|
cmdline = ["--smp=1"]
|
|
s1, s2 = await manager.servers_add(2, cmdline=cmdline, auto_rack_dc="dc1")
|
|
|
|
host_id1 = await manager.get_host_id(s1.server_id)
|
|
host_id2 = await manager.get_host_id(s2.server_id)
|
|
|
|
cql, _ = await manager.get_ready_cql([s1, s2])
|
|
|
|
await cql.run_async(f"CREATE KEYSPACE {ks} WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2}} "
|
|
f"AND tablets = {{'enabled': true, 'initial': 1}}")
|
|
await cql.run_async(f"CREATE TABLE {ks}.{table} (pk int, ck int, v int, PRIMARY KEY (pk, ck))")
|
|
await cql.run_async(f"INSERT INTO {ks}.{table} (pk, ck, v) VALUES (1, 1, 1)")
|
|
|
|
# Doesn't matter. There's only one tablet.
|
|
tablet_token = 0
|
|
|
|
with pytest.raises(Exception, match=rf"Tablet .* has replica on {host_id2}"):
|
|
await manager.api.move_tablet(
|
|
node_ip=s1.ip_addr,
|
|
ks=ks,
|
|
table=table,
|
|
src_host=host_id1,
|
|
src_shard=0,
|
|
dst_host=host_id2,
|
|
dst_shard=0,
|
|
token=tablet_token)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_moving_replica_within_single_rack(manager: ManagerClient):
|
|
"""
|
|
Verify that it's possible to move a tablet from a replica node to a node
|
|
that's not a replica.
|
|
"""
|
|
|
|
ks = "ks"
|
|
table = "my_table"
|
|
|
|
# For convenience when moving tablets.
|
|
cmdline = ["--smp=1"]
|
|
s1, s2 = await manager.servers_add(2, cmdline=cmdline, property_file={"dc": "dc1", "rack": "r1"})
|
|
|
|
host_id1 = await manager.get_host_id(s1.server_id)
|
|
host_id2 = await manager.get_host_id(s2.server_id)
|
|
|
|
cql, _ = await manager.get_ready_cql([s1, s2])
|
|
|
|
await cql.run_async(f"CREATE KEYSPACE {ks} WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1}} "
|
|
f"AND tablets = {{'enabled': true, 'initial': 1}}")
|
|
await cql.run_async(f"CREATE TABLE {ks}.{table} (pk int, ck int, v int, PRIMARY KEY (pk, ck))")
|
|
await cql.run_async(f"INSERT INTO {ks}.{table} (pk, ck, v) VALUES (1, 1, 1)")
|
|
|
|
# Doesn't matter. There's only one tablet.
|
|
tablet_token = 0
|
|
host_id, _ = await get_tablet_replica(manager, s1, ks, table, tablet_token)
|
|
|
|
if host_id != host_id1:
|
|
s1, s2 = s2, s1
|
|
host_id1, host_id2 = host_id2, host_id1
|
|
|
|
await manager.api.move_tablet(
|
|
node_ip=s1.ip_addr,
|
|
ks=ks,
|
|
table=table,
|
|
src_host=host_id1,
|
|
src_shard=0,
|
|
dst_host=host_id2,
|
|
dst_shard=0,
|
|
token=tablet_token)
|