This is a refactoring commit. We need to load the cluster version for a host in several places, so extract a helper for this.
2229 lines
104 KiB
Python
2229 lines
104 KiB
Python
#
|
|
# Copyright (C) 2023-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
from typing import Any
|
|
from cassandra.query import ConsistencyLevel, SimpleStatement
|
|
from cassandra.policies import FallthroughRetryPolicy
|
|
|
|
from test.pylib.internal_types import HostID, ServerInfo, ServerNum
|
|
from test.pylib.manager_client import ManagerClient
|
|
from test.pylib.rest_client import inject_error_one_shot, HTTPError, read_barrier
|
|
from test.pylib.util import wait_for_cql_and_get_hosts, unique_name, wait_for
|
|
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas, get_tablet_count, TabletReplicas
|
|
from test.cluster.util import reconnect_driver, create_new_test_keyspace, new_test_keyspace, get_topology_version
|
|
from test.cqlpy.cassandra_tests.validation.entities.secondary_index_test import dotestCreateAndDropIndex
|
|
|
|
import pytest
|
|
import asyncio
|
|
import logging
|
|
import time
|
|
import random
|
|
import os
|
|
import glob
|
|
from collections import defaultdict
|
|
from collections.abc import Iterable
|
|
from contextlib import asynccontextmanager
|
|
import itertools
|
|
import re
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def inject_error_one_shot_on(manager, error_name, servers):
    """Arm the one-shot error injection `error_name` on each server in `servers`.

    One inject_error_one_shot() request is created per server (addressed by its
    `ip_addr`) and all of them are awaited together.
    """
    pending = (inject_error_one_shot(manager.api, node.ip_addr, error_name) for node in servers)
    await asyncio.gather(*pending)
|
|
|
|
|
|
async def inject_error_on(manager, error_name, servers):
    """Enable the error injection `error_name` on each server in `servers`.

    The third argument passed to enable_injection() is False; other call sites in
    this file pass it as `one_shot=...`, so this presumably leaves the injection
    enabled until it is disabled explicitly (confirm against the REST client).
    All requests are awaited together.
    """
    pending = (manager.api.enable_injection(node.ip_addr, error_name, False) for node in servers)
    await asyncio.gather(*pending)
|
|
|
|
async def disable_injection_on(manager, error_name, servers):
    """Disable the error injection `error_name` on each server in `servers`.

    One disable_injection() request is created per server (addressed by its
    `ip_addr`) and all of them are awaited together.
    """
    pending = (manager.api.disable_injection(node.ip_addr, error_name) for node in servers)
    await asyncio.gather(*pending)
|
|
|
|
async def safe_server_stop_gracefully(manager, server_id, timeout: float = 60, reconnect: bool = False):
    """Close the driver, then gracefully stop the server `server_id`.

    `timeout` is forwarded to manager.server_stop_gracefully().

    Returns the session produced by reconnect_driver() when `reconnect` is true,
    otherwise None.
    """
    # The driver is closed up front to avoid reconnections if scylla fails to update gossiper state on shutdown.
    # It's a problem until https://github.com/scylladb/scylladb/issues/15356 is fixed.
    manager.driver_close()
    await manager.server_stop_gracefully(server_id, timeout)
    if not reconnect:
        return None
    return await reconnect_driver(manager)
|
|
|
|
async def safe_rolling_restart(manager, servers, with_down):
    """Rolling-restart `servers` and return a freshly reconnected driver session.

    `servers` and `with_down` are forwarded unchanged to manager.rolling_restart().
    """
    await manager.rolling_restart(servers, with_down)
    # https://github.com/scylladb/python-driver/issues/230 is not fixed yet, so for sake of CI stability,
    # the driver is reconnected once the rolling restart is done.
    return await reconnect_driver(manager)
|
|
|
|
async def wait_for_valid_load_stats(cql, table_id, timeout=120):
    """Wait until system.tablet_sizes has rows for `table_id` and none of them lists missing replicas.

    The table is polled every 0.2 seconds.

    Parameters:
        cql: session object exposing an awaitable run_async(query).
        table_id: value interpolated verbatim into the WHERE clause of the query.
        timeout: maximum time to wait, in seconds.

    Raises:
        AssertionError: if the condition is not met within `timeout` seconds.
    """
    # A monotonic clock keeps the timeout unaffected by wall-clock adjustments.
    started = time.monotonic()
    query = f"SELECT * FROM system.tablet_sizes WHERE table_id = {table_id};"
    while True:
        rows = list(await cql.run_async(query))
        missing_cnt = sum(1 for r in rows if len(r.missing_replicas) > 0)

        # At least one row must exist; an empty result means stats are not there yet.
        if rows and missing_cnt == 0:
            return

        assert time.monotonic() - started < timeout, "Timed out while waiting for valid load_stats"

        await asyncio.sleep(0.2)
|
|
|
|
@pytest.mark.asyncio
async def test_tablet_metadata_propagates_with_schema_changes_in_snapshot_mode(manager: ManagerClient):
    """Check that a node which was down during schema and tablet changes can use the table after it rejoins.

    The first server is stopped before a tablet-backed keyspace and table are created, and the
    'raft_server_force_snapshot' injection is armed on the remaining servers so that (per the comment
    below) the stopped server later catches up from a snapshot rather than from the raft log.
    The table is written and read back before the server is restarted, after it is restarted, and
    once more after a rolling restart of all servers.
    """

    logger.info("Bootstrapping cluster")
    # Verbose logging for the subsystems involved, to ease debugging of failures.
    cmdline = [
        '--logger-log-level', 'storage_proxy=trace',
        '--logger-log-level', 'cql_server=trace',
        '--logger-log-level', 'query_processor=trace',
        '--logger-log-level', 'gossip=trace',
        '--logger-log-level', 'storage_service=trace',
        '--logger-log-level', 'raft_topology=trace',
        '--logger-log-level', 'messaging_service=trace',
        '--logger-log-level', 'rpc=trace',
    ]
    servers = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc="dc1")

    s0 = servers[0].server_id
    not_s0 = servers[1:]

    # s0 should miss schema and tablet changes
    cql = await safe_server_stop_gracefully(manager, s0, reconnect=True)

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 100}") as ks:
        # force s0 to catch up later from the snapshot and not the raft log
        await inject_error_one_shot_on(manager, 'raft_server_force_snapshot', not_s0)
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        keys = range(10)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, 1);") for k in keys])

        # Round 1: s0 is still down; the data must be readable from the remaining servers.
        rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
        assert len(list(rows)) == len(keys)
        for r in rows:
            assert r.c == 1

        # Bring s0 back and connect the driver through it.
        manager.driver_close()
        await manager.server_start(s0, wait_others=2)
        await manager.driver_connect(server=servers[0])
        cql = manager.get_cql()
        await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 60)

        # Trigger a schema change to invoke schema agreement waiting to make sure that s0 has the latest schema
        # NOTE(review): the name bound by `as test_dummy` is never read; only the side effect of
        # creating the keyspace matters here.
        async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as test_dummy:
            # Round 2: overwrite and read back after s0 rejoined.
            # NOTE(review): the 'whitelist' execution profile presumably pins requests to the server
            # passed to driver_connect() above -- confirm against ManagerClient.
            await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, 2);", execution_profile='whitelist')
                                   for k in keys])

            rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
            assert len(rows) == len(keys)
            for r in rows:
                assert r.c == 2

            # Raise verbosity of the "conn_messages" logger for the duration of the rolling restart.
            conn_logger = logging.getLogger("conn_messages")
            conn_logger.setLevel(logging.DEBUG)
            try:
                # Check that after rolling restart the tablet metadata is still there
                await manager.rolling_restart(servers)

                cql = await reconnect_driver(manager)

                await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 60)

                # Round 3: overwrite and read back after the rolling restart.
                await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, 3);", execution_profile='whitelist')
                                       for k in keys])

                rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
                assert len(rows) == len(keys)
                for r in rows:
                    assert r.c == 3
            finally:
                # Note: this sets the level to INFO unconditionally rather than restoring the prior level.
                conn_logger.setLevel(logging.INFO)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_scans(manager: ManagerClient):
    """Check that count(*) and a full scan over a tablet-backed table on a 3-node cluster see every inserted row."""
    logger.info("Bootstrapping cluster")
    await manager.servers_add(3)

    session = manager.get_cql()
    ks_options = "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 8}"
    async with new_test_keyspace(manager, ks_options) as ks:
        await session.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        # Every row stores its own key in column c, so the scan result can be validated row by row.
        pks = range(100)
        inserts = [session.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({pk}, {pk});") for pk in pks]
        await asyncio.gather(*inserts)

        counted = await session.run_async(f"SELECT count(*) FROM {ks}.test;")
        assert counted[0].count == len(pks)

        scanned = await session.run_async(f"SELECT * FROM {ks}.test;")
        assert len(scanned) == len(pks)
        for row in scanned:
            assert row.c == row.pk
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_table_drop_with_auto_snapshot(manager: ManagerClient):
    """Repeatedly drop and recreate a tablet-backed keyspace on nodes started with auto_snapshot enabled.

    The 'tablet_allocator_shuffle' injection is enabled on all nodes while the
    drop / create / insert cycle runs three times; the keyspace is dropped at the end.
    """
    logger.info("Bootstrapping cluster")
    nodes = await manager.servers_add(3, config={'auto_snapshot': True})

    session = manager.get_cql()

    # Increases the chance of tablet migration concurrent with schema change
    await inject_error_on(manager, "tablet_allocator_shuffle", nodes)

    # One iteration of the schema-change cycle, executed strictly in this order.
    cycle = (
        "DROP KEYSPACE IF EXISTS test;",
        "CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 8 };",
        "CREATE TABLE IF NOT EXISTS test.tbl_sample_kv (id int, value text, PRIMARY KEY (id));",
        "INSERT INTO test.tbl_sample_kv (id, value) VALUES (1, 'ala');",
    )
    for _ in range(3):
        for statement in cycle:
            await session.run_async(statement)

    await session.run_async("DROP KEYSPACE test;")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_topology_changes(manager: ManagerClient):
    """Check that table contents stay intact while nodes are added and one is decommissioned.

    A table with 256 rows is populated on a 3-node cluster, the 'tablet_allocator_shuffle'
    injection is enabled on the initial nodes, and the full table contents are re-verified
    after each topology change (two node additions, one decommission).
    """
    logger.info("Bootstrapping cluster")
    servers = await manager.servers_add(3)

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 32}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        logger.info("Populating table")

        keys = range(256)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
        # Baseline scan result which every later scan must reproduce exactly.
        expected_rows = await cql.run_async(f"SELECT * FROM {ks}.test;")

        async def check():
            """Scan the table and compare it with the baseline taken right after population."""
            logger.info("Checking table")
            rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
            assert rows == expected_rows
            assert len(rows) == len(keys)
            for r in rows:
                assert r.c == r.pk

        await inject_error_on(manager, "tablet_allocator_shuffle", servers)

        logger.info("Adding new server")
        await manager.server_add()

        await check()

        logger.info("Adding new server")
        await manager.server_add()

        await check()
        # Give load balancer some time to do work.
        # asyncio.sleep() is used instead of time.sleep() so the event loop is not blocked
        # while waiting.
        await asyncio.sleep(5)
        await check()

        await manager.decommission_node(servers[0].server_id)

        await check()
|
|
|
|
async def get_two_servers_to_move_tablet(manager: ManagerClient):
    """
    Set up a two-node cluster holding a single-tablet table `test` with one row.

    Both nodes are started with '--enable-file-stream false' and tablet balancing is disabled.

    Returns the tuple (servers, cql, s0_host_id, s1_host_id, replica, tablet_token, dst_shard, ks).
    The first server in the servers list is the source node to move the tablet from, the second
    one is the dest node; the two host ids are returned in the same order. `replica` is the
    tablet replica as reported by get_tablet_replica().
    """
    logger.info("Bootstrapping cluster")
    node_cmdline = ['--enable-file-stream', 'false']
    first_node = await manager.server_add(cmdline=node_cmdline)
    servers = [first_node]

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};")
    await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

    # The second node joins only after the table exists.
    servers.append(await manager.server_add(cmdline=node_cmdline))

    key = 7  # Whatever
    tablet_token = 0  # Doesn't matter since there is one tablet
    await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, 0)")
    found = await cql.run_async(f"SELECT pk from {ks}.test")
    assert len(list(found)) == 1

    replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
    logger.info(f'{replica=}')

    host_ids = [await manager.get_host_id(node.server_id) for node in servers]

    # Make sure servers[0] is the source node to move the tablet from
    if replica[0] != host_ids[0]:
        servers.reverse()
        host_ids.reverse()
    src_host_id, dst_host_id = host_ids

    dst_shard = 0

    return (servers, cql, src_host_id, dst_host_id, replica, tablet_token, dst_shard, ks)
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_streaming_rx_error_no_failed_message_with_fail_stream_plan(manager: ManagerClient):
    """Tablet migration must complete even though streaming hits an injected receive error.

    'stream_session_ignore_failed_message' is armed on both nodes and
    'stream_mutation_fragments_rx_error' on the destination. The test waits for the
    destination to log a failed stream session, disables the rx-error injection, lets the
    migration finish, and then checks that truncated data does not reappear and that the
    tablet can be moved back.
    """
    (servers, cql, s0_host_id, s1_host_id, replica,
     tablet_token, dst_shard, ks) = await get_two_servers_to_move_tablet(manager)
    src_node = servers[0]
    dst_node = servers[1]

    # Arm the injections one by one, in this order.
    one_shot_injections = (
        (src_node, "stream_session_ignore_failed_message"),
        (dst_node, "stream_session_ignore_failed_message"),
        (dst_node, "stream_mutation_fragments_rx_error"),
    )
    for node, injection in one_shot_injections:
        await manager.api.enable_injection(node.ip_addr, injection, one_shot=True)

    dst_log = await manager.server_open_log(dst_node.server_id)
    dst_mark = await dst_log.mark()

    migration_task = asyncio.create_task(
        manager.api.move_tablet(src_node.ip_addr, ks, "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token, timeout=30))

    await dst_log.wait_for('stream_manager: Failed stream_session for stream_plan', from_mark=dst_mark)
    dst_mark = await dst_log.mark()

    await manager.api.disable_injection(dst_node.ip_addr, "stream_mutation_fragments_rx_error")

    logger.info("Waiting for migration to finish")
    await migration_task
    logger.info("Migration done")

    async def count_rows():
        """Return the number of rows currently visible in the test table."""
        found = await cql.run_async(f"SELECT pk from {ks}.test")
        return len(list(found))

    # Sanity test
    assert await count_rows() == 1

    await cql.run_async(f"TRUNCATE {ks}.test")
    assert await count_rows() == 0

    # Verify that there is no data resurrection
    assert await count_rows() == 0

    # Verify that moving the tablet back works
    await manager.api.move_tablet(src_node.ip_addr, ks, "test", s1_host_id, dst_shard, replica[0], replica[1], tablet_token)
    assert await count_rows() == 0
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_streaming_rx_error_no_failed_message_no_fail_stream_plan_hang(manager: ManagerClient):
    """Tablet migration is expected to time out when the stream-plan failure is skipped on both nodes.

    On top of the injections that drop the "failed" message and force a receive error on the
    destination, 'stream_mutation_fragments_skip_fail_stream_plan' is armed on both nodes.
    The move_tablet request (10 second timeout) must then end with TimeoutError.
    """
    servers, cql, s0_host_id, s1_host_id, replica, tablet_token, dst_shard, ks = await get_two_servers_to_move_tablet(manager)

    await manager.api.enable_injection(servers[0].ip_addr, "stream_session_ignore_failed_message", one_shot=True)
    await manager.api.enable_injection(servers[1].ip_addr, "stream_session_ignore_failed_message", one_shot=True)
    await manager.api.enable_injection(servers[1].ip_addr, "stream_mutation_fragments_rx_error", one_shot=True)

    await manager.api.enable_injection(servers[0].ip_addr, "stream_mutation_fragments_skip_fail_stream_plan", one_shot=True)
    await manager.api.enable_injection(servers[1].ip_addr, "stream_mutation_fragments_skip_fail_stream_plan", one_shot=True)

    # NOTE(review): the log handle and mark are not consulted below; the calls are kept
    # so the sequence of manager requests stays as it was.
    s1_log = await manager.server_open_log(servers[1].server_id)
    s1_mark = await s1_log.mark()

    migration_task = asyncio.create_task(
        manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token, timeout=10))

    logger.info("Waiting for migration to finish")
    # The move tablet is not supposed to finish. pytest.raises() fails the test with a clear
    # message if it does, and unlike `assert False` it is not stripped under `python -O`.
    with pytest.raises(TimeoutError):
        await migration_task
    logger.info("Migration timeout as expected")
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_streaming_is_guarded_by_topology_guard(manager: ManagerClient):
    """Check that a streaming writer left behind by a failed attempt does not resurrect truncated data.

    Scenario, as driven by the steps below:
      1. A single-tablet table with one row lives on the first node; a second node is added.
      2. The tablet is moved to the second node while the 'stream_mutation_fragments' injection
         holds the receiving side in a waiting state.
      3. The connection between the nodes is dropped, the migration completes anyway, and the
         table is truncated.
      4. The held writer is released; the table must still be empty, and moving the tablet
         back must work.
    """
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_service=trace',
        '--logger-log-level', 'raft_topology=trace',
        '--enable-file-stream', 'false',
    ]
    servers = [await manager.server_add(cmdline=cmdline)]

    # Keep the balancer from moving the tablet on its own; the test drives the migration explicitly.
    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        # The second node is added after the table was created.
        servers.append(await manager.server_add(cmdline=cmdline))

        key = 7 # Whatever
        tablet_token = 0 # Doesn't matter since there is one tablet
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, 0)")
        rows = await cql.run_async(f"SELECT pk from {ks}.test")
        assert len(list(rows)) == 1

        # replica[0] / replica[1] are passed to move_tablet() below as the source host and shard.
        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)

        # NOTE(review): s0_host_id is fetched but not used further in this test.
        s0_host_id = await manager.get_host_id(servers[0].server_id)
        s1_host_id = await manager.get_host_id(servers[1].server_id)
        dst_shard = 0

        await manager.api.enable_injection(servers[1].ip_addr, "stream_mutation_fragments", one_shot=True)
        s1_log = await manager.server_open_log(servers[1].server_id)
        s1_mark = await s1_log.mark()

        # Run the migration in the background so the test can interfere with it while it is in flight.
        migration_task = asyncio.create_task(
            manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token))

        # Wait for the replica-side writer of streaming to reach a place where it already
        # received writes from the leaving replica but haven't applied them yet.
        # Once the writer reaches this place, it will wait for the message_injection() call below before proceeding.
        # The place we block the writer in should not hold to erm or topology_guard because that will block the migration
        # below and prevent test from proceeding.
        await s1_log.wait_for('stream_mutation_fragments: waiting', from_mark=s1_mark)
        s1_mark = await s1_log.mark()

        # Should cause streaming to fail and be retried while leaving behind the replica-side writer.
        await manager.api.inject_disconnect(servers[0].ip_addr, servers[1].ip_addr)

        logger.info("Waiting for migration to finish")
        await migration_task
        logger.info("Migration done")

        # Sanity test
        rows = await cql.run_async(f"SELECT pk from {ks}.test")
        assert len(list(rows)) == 1

        await cql.run_async(f"TRUNCATE {ks}.test")
        rows = await cql.run_async(f"SELECT pk from {ks}.test")
        assert len(list(rows)) == 0

        # Release abandoned streaming
        await manager.api.message_injection(servers[1].ip_addr, "stream_mutation_fragments")
        await s1_log.wait_for('stream_mutation_fragments: done', from_mark=s1_mark)

        # Verify that there is no data resurrection
        rows = await cql.run_async(f"SELECT pk from {ks}.test")
        assert len(list(rows)) == 0

        # Verify that moving the tablet back works
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", s1_host_id, dst_shard, replica[0], replica[1], tablet_token)
        rows = await cql.run_async(f"SELECT pk from {ks}.test")
        assert len(list(rows)) == 0
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_table_dropped_during_streaming(manager: ManagerClient):
    """
    Drops a table while its tablet is in the streaming phase of a migration and checks
    that load balancing recovers: the state machine must not get stuck, which is verified
    by successfully migrating the tablet of a second table afterwards.
    """

    logger.info("Bootstrapping cluster")
    servers = [await manager.server_add()]

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    ks_options = "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}"
    async with new_test_keyspace(manager, ks_options) as ks:
        for table in ("test", "test2"):
            await cql.run_async(f"CREATE TABLE {ks}.{table} (pk int PRIMARY KEY, c int);")

        servers.append(await manager.server_add())
        src_node, dst_node = servers[0], servers[1]

        logger.info("Populating tables")
        key = 7  # Whatever
        value = 3  # Whatever
        tablet_token = 0  # Doesn't matter since there is one tablet
        for table in ("test", "test2"):
            await cql.run_async(f"INSERT INTO {ks}.{table} (pk, c) VALUES ({key}, {value})")
        for table in ("test", "test2"):
            found = await cql.run_async(f"SELECT pk from {ks}.{table}")
            assert len(list(found)) == 1

        replica = await get_tablet_replica(manager, src_node, ks, 'test', tablet_token)

        await manager.api.enable_injection(dst_node.ip_addr, "stream_mutation_fragments", one_shot=True)
        dst_log = await manager.server_open_log(dst_node.server_id)
        dst_mark = await dst_log.mark()

        logger.info("Starting tablet migration")
        s1_host_id = await manager.get_host_id(dst_node.server_id)
        migration_task = asyncio.create_task(
            manager.api.move_tablet(src_node.ip_addr, ks, "test", replica[0], replica[1], s1_host_id, 0, tablet_token))

        # Wait for the replica-side writer of streaming to reach a place where it already
        # received writes from the leaving replica but haven't applied them yet.
        # Once the writer reaches this place, it will wait for the message_injection() call below before proceeding.
        # We want to drop the table while streaming is deep in the process, where it will attempt to apply writes
        # to the dropped table.
        await dst_log.wait_for('stream_mutation_fragments: waiting', from_mark=dst_mark)

        # Streaming blocks table drop, so we can't wait here.
        drop_task = cql.run_async(f"DROP TABLE {ks}.test")

        # Release streaming as late as possible to increase probability of drop causing problems.
        await dst_log.wait_for('Dropping', from_mark=dst_mark)

        # Unblock streaming
        await manager.api.message_injection(dst_node.ip_addr, "stream_mutation_fragments")
        await drop_task

        logger.info("Waiting for migration to finish")
        try:
            await migration_task
        except HTTPError as err:
            # The only tolerated failure is the one reporting that the tablet map is gone.
            assert 'Tablet map not found' in err.message

        logger.info("Verifying that moving the other tablet works")
        replica = await get_tablet_replica(manager, src_node, ks, 'test2', tablet_token)
        s0_host_id = await manager.get_host_id(src_node.server_id)
        assert replica[0] == s0_host_id
        await manager.api.move_tablet(src_node.ip_addr, ks, "test2", replica[0], replica[1], s1_host_id, 0, tablet_token)

        logger.info("Verifying tablet replica")
        replica = await get_tablet_replica(manager, src_node, ks, 'test2', tablet_token)
        assert replica == (s1_host_id, 0)
|
|
|
|
@pytest.mark.asyncio
async def test_tablet_cleanup(manager: ManagerClient):
    """Check that data cleaned up on the source of a tablet migration is not resurrected later.

    Half of the tablets are moved to a second node, whose sstables for the table are then
    wiped while it is stopped. The row count observed after the loss must stay the same
    after the tablets are migrated back and after the first node is killed and restarted.
    """
    cmdline = ['--smp=2', '--commitlog-sync=batch']

    logger.info("Start first node")
    servers = [await manager.server_add(cmdline=cmdline)]
    await manager.disable_tablet_balancing()

    logger.info("Populate table")
    cql = manager.get_cql()
    n_tablets = 32
    n_partitions = 1000
    await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
    await manager.servers_see_each_other(servers)
    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': {n_tablets}}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY);")
        # Insert exactly n_partitions rows so the count assertions below stay in sync with the data.
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk) VALUES ({k});") for k in range(n_partitions)])

        logger.info("Start second node")
        servers.append(await manager.server_add())

        s0_host_id = await manager.get_host_id(servers[0].server_id)
        s1_host_id = await manager.get_host_id(servers[1].server_id)

        logger.info("Read system.tablets")
        tablet_replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        assert len(tablet_replicas) == n_tablets

        # Randomly select half of all tablets.
        sample = random.sample(tablet_replicas, n_tablets // 2)
        moved_tokens = [x.last_token for x in sample]
        moved_src = [x.replicas[0] for x in sample]
        # Each destination is a (host id, shard) pair on the second node with a random shard.
        moved_dst = [(s1_host_id, random.choice([0, 1])) for _ in sample]

        # Migrate the selected tablets to second node.
        logger.info("Migrate half of all tablets to second node")
        for t, s, d in zip(moved_tokens, moved_src, moved_dst):
            await manager.api.move_tablet(servers[0].ip_addr, ks, "test", *s, *d, t)

        # Sanity check. All data we inserted should be still there.
        assert n_partitions == (await cql.run_async(f"SELECT COUNT(*) FROM {ks}.test"))[0].count

        # Wipe data on second node.
        logger.info("Wipe data on second node")
        await manager.server_stop_gracefully(servers[1].server_id, timeout=120)
        await manager.server_wipe_sstables(servers[1].server_id, ks, "test")
        await manager.server_start(servers[1].server_id)
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
        await manager.servers_see_each_other(servers)
        partitions_after_loss = (await cql.run_async(f"SELECT COUNT(*) FROM {ks}.test"))[0].count
        assert partitions_after_loss < n_partitions

        # Migrate all tablets back to their original position.
        # Check that this doesn't resurrect cleaned data.
        logger.info("Migrate the migrated tablets back")
        for t, s, d in zip(moved_tokens, moved_dst, moved_src):
            await manager.api.move_tablet(servers[0].ip_addr, ks, "test", *s, *d, t)
        assert partitions_after_loss == (await cql.run_async(f"SELECT COUNT(*) FROM {ks}.test"))[0].count

        # Kill and restart first node.
        # Check that this doesn't resurrect cleaned data.
        logger.info("Brutally restart first node")
        await manager.server_stop(servers[0].server_id)
        await manager.server_start(servers[0].server_id)
        hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
        await manager.servers_see_each_other(servers)
        assert partitions_after_loss == (await cql.run_async(f"SELECT COUNT(*) FROM {ks}.test"))[0].count

        # Bonus: check that commitlog_cleanups doesn't have any garbage after restart.
        assert 0 == (await cql.run_async("SELECT COUNT(*) FROM system.commitlog_cleanups", host=hosts[0]))[0].count
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_cleanup_failure(manager: ManagerClient):
    """Check that tablet cleanup is retried after an injected failure and leaves no sstables behind.

    The 'tablet_cleanup_failure' one-shot injection is armed on the first node before its
    only tablet is moved to a newly added node. The first node's log must show the failed
    cleanup, a later successful cleanup, and the end of the migration; afterwards all rows
    must still be readable and the table directory on the first node must hold no
    *-Data.db files.
    """
    cmdline = ['--smp=1']

    servers = [await manager.server_add(cmdline=cmdline)]

    # Disable load balancing, so that after the move_tablet API the load balancer does
    # not attempt to migrate the tablet back to the first node.
    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    n_tablets = 1
    n_partitions = 1000
    await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
    await manager.servers_see_each_other(servers)
    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': {n_tablets}}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY);")
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk) VALUES ({k});") for k in range(n_partitions)])

        # Only the first node exists at this point, so the injection is armed on the future source node.
        await inject_error_one_shot_on(manager, "tablet_cleanup_failure", servers)

        s0_log = await manager.server_open_log(servers[0].server_id)
        s0_mark = await s0_log.mark()

        servers.append(await manager.server_add())

        tablet_token = 0
        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
        s1_host_id = await manager.get_host_id(servers[1].server_id)
        dst_shard = 0
        migration_task = asyncio.create_task(
            manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token))

        logger.info("Waiting for injected cleanup failure...")
        await s0_log.wait_for('Cleanup failed for tablet', from_mark=s0_mark)

        logger.info("Waiting for cleanup success on retry...")
        await s0_log.wait_for(f'Cleaned up tablet .* of table {ks}.test successfully.', from_mark=s0_mark)

        # The message used to repeat the previous one although this step waits for a different event.
        logger.info("Waiting for tablet migration to finish...")
        await s0_log.wait_for('updating topology state: Finished tablet migration', from_mark=s0_mark)

        logger.info("Waiting for migration task...")
        await migration_task

        assert n_partitions == (await cql.run_async(f"SELECT COUNT(*) FROM {ks}.test"))[0].count

        node_workdir = await manager.server_get_workdir(servers[0].server_id)
        # The table directory name carries a suffix after "test-", hence the glob.
        table_dir = glob.glob(os.path.join(node_workdir, "data", ks, "test-*"))[0]
        logger.info(f"Table dir: {table_dir}")
        ssts = glob.glob(os.path.join(table_dir, "*-Data.db"))
        logger.info("Guarantee source node of migration left no sstables undeleted")
        assert len(ssts) == 0
|
|
|
|
@pytest.mark.asyncio
async def test_tablet_resharding(manager: ManagerClient):
    """Check that a node refuses to start with fewer shards once it hosts tablet-enabled tables.

    A single node is started with --smp=3 and a 32-tablet table is populated. The node is
    then stopped, its command line is changed to --smp=2, and the start is expected to fail
    with the "invalid replica shard" error.
    """
    cmdline = ['--smp=3']
    # A previously defined `config` dict was never passed to servers_add() and has been removed.
    servers = await manager.servers_add(1, cmdline=cmdline)
    server = servers[0]

    logger.info("Populate table")
    cql = manager.get_cql()
    n_tablets = 32
    n_partitions = 1000
    ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': {n_tablets}}}")
    await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY);")
    await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk) VALUES ({k});") for k in range(n_partitions)])

    await manager.server_stop_gracefully(server.server_id, timeout=120)
    await manager.server_update_cmdline(server.server_id, ['--smp=2'])

    await manager.server_start(
        server.server_id,
        expected_error="Detected a tablet with invalid replica shard, reducing shard count with tablet-enabled tables is not yet supported. Replace the node instead.")
|
|
|
|
@pytest.mark.parametrize("injection_error", ["foreach_compaction_group_wait", "major_compaction_wait"])
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_split(manager: ManagerClient, injection_error: str):
    """Check that tablet split works while a keyspace compaction is held at an injection point.

    Parameters:
        injection_error: name of the injection at which the compaction started by
            keyspace_compaction() is made to wait; it is released at the end of the test.

    The table starts with one tablet and a 1024-byte target tablet size. After a second node
    is added and balancing is re-enabled, the first node's log must report a detected split,
    the tablet count must exceed 1, and the table contents must stay intact throughout.
    """
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_service=debug',
        '--logger-log-level', 'table=debug',
        '--target-tablet-size-in-bytes', '1024',
    ]
    servers = [await manager.server_add(config={
        'tablet_load_stats_refresh_interval_in_seconds': 1
    }, cmdline=cmdline)]

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        # enough to trigger multiple splits with max size of 1024 bytes.
        keys = range(256)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])

        async def check():
            """Scan the table through the manager's current session and validate every row."""
            logger.info("Checking table")
            cql = manager.get_cql()
            rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
            assert len(rows) == len(keys)
            for r in rows:
                assert r.c == r.pk

        await check()

        await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        assert tablet_count == 1

        logger.info("Adding new server")
        servers.append(await manager.server_add(cmdline=cmdline))

        # Increases the chance of tablet migration concurrent with split
        await inject_error_one_shot_on(manager, "tablet_allocator_shuffle", servers)
        await inject_error_on(manager, "tablet_load_stats_refresh_before_rebalancing", servers)

        # Log of the first node, where the compaction is started and the split is reported.
        s0_log = await manager.server_open_log(servers[0].server_id)
        s0_mark = await s0_log.mark()

        await manager.api.enable_injection(servers[0].ip_addr, injection_error, one_shot=True)
        compaction_task = asyncio.create_task(manager.api.keyspace_compaction(servers[0].ip_addr, ks))
        await s0_log.wait_for(f"{injection_error}: waiting", from_mark=s0_mark)

        # Now there's a split and migration need, so they'll potentially run concurrently.
        await manager.enable_tablet_balancing()

        await check()
        # Give load balancer some time to do work.
        # asyncio.sleep() is used instead of time.sleep() so the event loop, which also drives
        # compaction_task, is not blocked while waiting.
        await asyncio.sleep(5)

        await s0_log.wait_for('Detected tablet split for table', from_mark=s0_mark)

        await check()

        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        assert tablet_count > 1

        # Let the held compaction continue and wait for it to complete.
        await manager.api.message_injection(servers[0].ip_addr, injection_error)
        await s0_log.wait_for(f"{injection_error}: released", from_mark=s0_mark)
        await compaction_task
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_correctness_of_tablet_split_finalization_after_restart(manager: ManagerClient):
    """Restart a node while split finalization is postponed, then let the split finish.

    Two servers are started; the second one has the ``delay_split_compaction``
    injection enabled at startup. A two-tablet table is populated and flushed,
    balancing is enabled with ``tablet_split_finalization_postpone`` active on
    the first server, and once the first server logs that it is finalizing the
    resize decision the second server is restarted. After the postpone injection
    is lifted the test waits for the split to be detected, checks that the
    tablet count grew above 2 and that every row is still readable.
    """
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_service=debug',
        '--logger-log-level', 'table=debug',
        '--target-tablet-size-in-bytes', '1024',
    ]
    servers = [await manager.server_add(config={
        'tablet_load_stats_refresh_interval_in_seconds': 1
    }, cmdline=cmdline)]

    await manager.disable_tablet_balancing()

    servers.append(await manager.server_add(config={
        'error_injections_at_startup': ['delay_split_compaction']
    }, cmdline=cmdline))

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH compaction = {{'class': 'NullCompactionStrategy'}};")

        # enough to trigger multiple splits with max size of 1024 bytes.
        keys = range(256)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])

        async def check():
            """Assert that all inserted rows are present and that c == pk for each."""
            logger.info("Checking table")
            cql = manager.get_cql()
            rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
            assert len(rows) == len(keys)
            for r in rows:
                assert r.c == r.pk

        await check()

        for server in servers:
            await manager.api.flush_keyspace(server.ip_addr, ks)

        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        assert tablet_count == 2

        await manager.api.enable_injection(servers[0].ip_addr, "tablet_load_stats_refresh_before_rebalancing", one_shot=False)

        s1_log = await manager.server_open_log(servers[0].server_id)
        s1_mark = await s1_log.mark()

        await manager.api.enable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone", one_shot=False)
        await manager.enable_tablet_balancing()

        await s1_log.wait_for('Finalizing resize decision for table', from_mark=s1_mark)

        # Delays refresh of tablet stats, so balancer works with whichever it got last.
        await manager.api.disable_injection(servers[0].ip_addr, "tablet_load_stats_refresh_before_rebalancing")
        await manager.server_update_config(servers[0].server_id, 'tablet_load_stats_refresh_interval_in_seconds', 60)
        # Non-blocking pause: time.sleep() here would freeze the whole event loop.
        await asyncio.sleep(1)
        await manager.disable_tablet_balancing()

        await manager.server_stop_gracefully(servers[1].server_id, timeout=120)
        await manager.server_start(servers[1].server_id)
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
        await manager.servers_see_each_other(servers)

        await manager.api.disable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone")
        await manager.enable_tablet_balancing()

        await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)

        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        assert tablet_count > 2

        await check()
|
|
|
|
@pytest.mark.parametrize("injection_error", ["foreach_compaction_group_wait", "major_compaction_wait"])
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_concurrent_tablet_migration_and_major(manager: ManagerClient, injection_error):
    """Migrate a tablet away while a major compaction is paused on an injection point.

    The compaction is started as a background task on the first node and held
    at ``injection_error``; the only tablet is then moved to shard 0 of a
    freshly added node, the injection is released and the data is re-checked.
    For ``major_compaction_wait`` the first node's log must also report that
    the compaction was stopped due to tablet cleanup.
    """
    logger.info("Bootstrapping cluster")
    node_cmdline = []
    nodes = [await manager.server_add(cmdline=node_cmdline)]

    await manager.disable_tablet_balancing()

    session = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await session.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        keys = range(256)
        inserts = [session.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys]
        await asyncio.gather(*inserts)

        async def verify_rows():
            # Every key must be readable and carry the value it was written with.
            logger.info("Checking table")
            fresh_session = manager.get_cql()
            fetched = await fresh_session.run_async(f"SELECT * FROM {ks}.test;")
            assert len(fetched) == len(keys)
            for row in fetched:
                assert row.c == row.pk

        await verify_rows()

        first_node_ip = nodes[0].ip_addr
        await manager.api.flush_keyspace(first_node_ip, ks)

        logger.info("Adding new server")
        nodes.append(await manager.server_add(cmdline=node_cmdline))
        new_node_host_id = await manager.get_host_id(nodes[1].server_id)

        first_node_log = await manager.server_open_log(nodes[0].server_id)
        log_mark = await first_node_log.mark()

        await manager.api.enable_injection(first_node_ip, injection_error, one_shot=True)
        logger.info("Started major compaction")
        compaction_task = asyncio.create_task(manager.api.keyspace_compaction(first_node_ip, ks))
        await first_node_log.wait_for(f"{injection_error}: waiting", from_mark=log_mark)

        replicas_by_tablet = await get_all_tablet_replicas(manager, nodes[0], ks, 'test')

        tablet = replicas_by_tablet[0]
        logger.info("Migrating tablet")
        # Source is the tablet's first replica; destination is shard 0 of the new node.
        await manager.api.move_tablet(first_node_ip, ks, "test", *tablet.replicas[0], new_node_host_id, 0, tablet.last_token)

        await manager.api.message_injection(first_node_ip, injection_error)
        await first_node_log.wait_for(f"{injection_error}: released", from_mark=log_mark)
        await compaction_task

        if injection_error == "major_compaction_wait":
            logger.info("Check that major was successfully aborted on migration")
            await first_node_log.wait_for(f"Compaction for {ks}/test was stopped due to: tablet cleanup", from_mark=log_mark)

        await verify_rows()
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_concurrent_table_drop_and_major(manager: ManagerClient):
    """Drop a table while a major compaction on it is paused on an injection point.

    A major compaction is started as a background task and held at the
    ``major_compaction_wait`` injection. The table is dropped, the injection
    is released, and the test waits for the server log to report that ongoing
    compactions for the table ended due to table removal.
    """
    logger.info("Bootstrapping cluster")
    injection_error = "major_compaction_wait"
    cmdline = ['--logger-log-level', 'compaction_manager=debug',]
    servers = [await manager.server_add(cmdline=cmdline)]

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        keys = range(256)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])

        await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        s1_log = await manager.server_open_log(servers[0].server_id)
        s1_mark = await s1_log.mark()

        await manager.api.enable_injection(servers[0].ip_addr, injection_error, one_shot=True)
        logger.info("Started major compaction")
        compaction_task = asyncio.create_task(manager.api.keyspace_compaction(servers[0].ip_addr, ks))
        await s1_log.wait_for(f"{injection_error}: waiting", from_mark=s1_mark)

        logger.info("Dropping table")
        await cql.run_async(f"DROP TABLE {ks}.test")

        await manager.api.message_injection(servers[0].ip_addr, injection_error)
        await s1_log.wait_for(f"{injection_error}: released", from_mark=s1_mark)
        await compaction_task

        # The injection point is fixed for this test, so the log check always applies.
        logger.info("Check that major was successfully aborted on table drop")
        # The pattern contains ".*": wait_for is given a regular expression here.
        await s1_log.wait_for(f"ongoing compactions for table {ks}.test .* due to table removal", from_mark=s1_mark)
|
|
|
|
async def assert_tablet_count_metric_value_for_shards(manager: ManagerClient, server: ServerInfo, expected_count_per_shard: list[int]):
    """Assert that the ``scylla_tablets_count`` metric matches the expectation on every shard.

    Metrics are queried once from ``server``. ``expected_count_per_shard[i]``
    is the tablet count expected for the shard labelled ``str(i)``.

    Raises:
        AssertionError: if a shard reports a different count; the message
            names the shard and both values.
    """
    tablet_count_metric_name = "scylla_tablets_count"
    metrics = await manager.metrics.query(server.ip_addr)
    for shard_id, expected_tablet_count in enumerate(expected_count_per_shard):
        tablet_count = metrics.get(tablet_count_metric_name, {'shard': str(shard_id)})
        assert int(tablet_count) == expected_tablet_count, \
            f"{tablet_count_metric_name} on {server.ip_addr} shard {shard_id}: got {tablet_count}, expected {expected_tablet_count}"
|
|
|
|
async def get_tablet_tokens_from_host_on_shard(manager: ManagerClient, server: ServerInfo, keyspace_name: str, table_name: str, shard: int) -> list[int]:
    """Return the last tokens of the table's tablets that have a replica on ``server`` at ``shard``.

    A tablet's ``last_token`` is included once per replica entry that matches
    both the server's host id and the requested shard.
    """
    wanted_replica_host = await manager.get_host_id(server.server_id)
    all_tablets = await get_all_tablet_replicas(manager, server, keyspace_name, table_name)
    return [
        tablet.last_token
        for tablet in all_tablets
        for replica_host, replica_shard in tablet.replicas
        if replica_host == wanted_replica_host and replica_shard == shard
    ]
|
|
|
|
async def get_tablet_count_per_shard_for_host(manager: ManagerClient, server: ServerInfo, full_tables: dict[str, list[str]], shards_count: int = 2) -> list[int]:
    """Single-server convenience wrapper around ``get_tablet_count_per_shard_for_hosts``.

    Returns the per-shard tablet replica counts of ``server`` for the tables
    listed in ``full_tables`` (keyspace name -> table names).
    """
    counts_by_server = await get_tablet_count_per_shard_for_hosts(
        manager, [server], full_tables, shards_count)
    return counts_by_server[server.server_id]
|
|
|
|
async def get_tablet_count_per_shard_for_hosts(manager: ManagerClient, servers: Iterable[ServerInfo], full_tables: dict[str, list[str]], shards_per_node: int = 2) -> dict[ServerNum, list[int]]:
    """Count tablet replicas per shard for each of the given servers.

    Args:
        manager: cluster manager used to resolve host ids and read tablet replicas.
        servers: servers to report on; the first one is also the node that is
            queried for tablet replicas.
        full_tables: mapping of keyspace name to the table names to include.
        shards_per_node: length of the per-server counter list.

    Returns:
        Mapping of server id to a list whose element ``i`` is the number of
        replicas found on shard ``i`` of that server. Replicas on hosts that
        are not in ``servers`` are ignored. An empty ``servers`` yields an
        empty mapping.
    """
    result: dict[ServerNum, list[int]] = {}
    hosts: dict[HostID, ServerNum] = {}
    query_server = None

    for server in servers:
        if query_server is None:
            query_server = server
        result[server.server_id] = [0] * shards_per_node
        hosts[await manager.get_host_id(server.server_id)] = server.server_id

    # Nothing to count, and no server available to query for tablet replicas.
    if query_server is None:
        return result

    for keyspace, tables in full_tables.items():
        for table in tables:
            table_tablets = await get_all_tablet_replicas(manager, query_server, keyspace, table)
            for tablet_replica in table_tablets:
                for host_id, shard_id in tablet_replica.replicas:
                    if host_id in hosts:
                        result[hosts[host_id]][shard_id] += 1

    return result
|
|
|
|
def get_shard_that_has_tablets(tablet_count_per_shard: list[int]) -> int:
    """Return the index of the first shard with a positive tablet count.

    Returns -1 when no shard has any tablets, including for an empty list.
    """
    return next(
        (shard_id for shard_id, count in enumerate(tablet_count_per_shard) if count > 0),
        -1,
    )
|
|
|
|
@pytest.mark.asyncio
async def test_tablet_count_metric_per_shard(manager: ManagerClient):
    """Check that the per-shard tablet count metric follows table creation, drop and tablet moves.

    After each schema change the expected per-shard counts are derived from the
    tablet replica map and compared against the metric on both servers. Finally,
    all tablets from one populated shard are moved to the last shard of the
    other server and the metric is checked again on both sides.
    """
    # Given two running servers
    shards_count = 4
    # Moved tablets land on the last shard of the destination server.
    dst_shard = shards_count - 1
    cmdline = [f'--smp={shards_count}']
    servers = await manager.servers_add(2, cmdline=cmdline)

    # And given disabled load balancing
    await manager.disable_tablet_balancing()

    # When two tables are created
    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:

        async def verify_metrics(table_names: list[str]) -> tuple[list[int], list[int]]:
            """Check the metric on both servers; return the expected counts for servers[0] and servers[1]."""
            tables = {ks: table_names}
            expected = []
            for server in (servers[0], servers[1]):
                per_shard = await get_tablet_count_per_shard_for_host(manager, server, tables, shards_count)
                await assert_tablet_count_metric_value_for_shards(manager, server, per_shard)
                expected.append(per_shard)
            return expected[0], expected[1]

        await cql.run_async(f"CREATE TABLE {ks}.mytable1 (col1 timestamp, col2 text, col3 blob, PRIMARY KEY (col1));")
        await cql.run_async(f"CREATE TABLE {ks}.mytable2 (col1 timestamp, col2 text, col3 blob, PRIMARY KEY (col1));")

        # Then tablet count metric for each shard depicts the actual state
        await verify_metrics(["mytable1", "mytable2"])

        # When third table is created
        await cql.run_async(f"CREATE TABLE {ks}.mytable3 (col1 timestamp, col2 text, col3 blob, PRIMARY KEY (col1));")

        # Then tablet count metric for each shard depicts the actual state
        await verify_metrics(["mytable1", "mytable2", "mytable3"])

        # When one of tables is dropped
        await cql.run_async(f"DROP TABLE {ks}.mytable2;")

        # Then tablet count metric for each shard depicts the actual state
        expected_count_per_shard_for_host_0, expected_count_per_shard_for_host_1 = \
            await verify_metrics(["mytable1", "mytable3"])

        # And when moving tablets from one shard of src_host to (dest_host, dst_shard)
        shard_id_to_move = get_shard_that_has_tablets(expected_count_per_shard_for_host_0)
        if shard_id_to_move != -1:
            src_server, dest_server = servers[0], servers[1]
            src_expected_count_per_shard = expected_count_per_shard_for_host_0
            dest_expected_count_per_shard = expected_count_per_shard_for_host_1
        else:
            # servers[0] holds no tablets at all, so take a shard from servers[1] instead.
            shard_id_to_move = get_shard_that_has_tablets(expected_count_per_shard_for_host_1)
            src_server, dest_server = servers[1], servers[0]
            src_expected_count_per_shard = expected_count_per_shard_for_host_1
            dest_expected_count_per_shard = expected_count_per_shard_for_host_0

        tokens_on_shard_to_move = {
            "mytable1": await get_tablet_tokens_from_host_on_shard(manager, src_server, ks, "mytable1", shard_id_to_move),
            "mytable3": await get_tablet_tokens_from_host_on_shard(manager, src_server, ks, "mytable3", shard_id_to_move),
        }

        count_of_tokens_on_src_shard_to_move = len(tokens_on_shard_to_move["mytable1"]) + len(tokens_on_shard_to_move["mytable3"])
        assert count_of_tokens_on_src_shard_to_move > 0

        src_host_id = await manager.get_host_id(src_server.server_id)
        dest_host_id = await manager.get_host_id(dest_server.server_id)
        for table_name, tokens in tokens_on_shard_to_move.items():
            for token in tokens:
                await manager.api.move_tablet(node_ip=src_server.ip_addr, ks=ks, table=table_name, src_host=src_host_id, src_shard=shard_id_to_move, dst_host=dest_host_id, dst_shard=dst_shard, token=token)

        # And when ensuring that local tablet metadata on the queried node reflects the finalized tablet movement
        await read_barrier(manager.api, servers[0].ip_addr)
        await read_barrier(manager.api, servers[1].ip_addr)

        # Then tablet count metric is adjusted to depict that situation on src_host - all tablets from selected shard have been moved
        src_expected_count_per_shard[shard_id_to_move] = 0
        await assert_tablet_count_metric_value_for_shards(manager, src_server, src_expected_count_per_shard)

        # And then tablet count metric is increased on dest_host - tablets have been moved to dst_shard
        dest_expected_count_per_shard[dst_shard] += count_of_tokens_on_src_shard_to_move
        await assert_tablet_count_metric_value_for_shards(manager, dest_server, dest_expected_count_per_shard)
|
|
|
|
@pytest.mark.parametrize("primary_replica_only", [False, True])
async def test_tablet_load_and_stream(manager: ManagerClient, primary_replica_only):
    """Load sstables of one table into another table's upload dir while tablets may migrate.

    A table in ``ks`` is populated and flushed, and a second empty table is
    created in ``ks2`` with a different initial tablet count. With the server
    stopped, the sstable files of the first table are moved into the upload
    directory of the second. After restart a new server is added, balancing is
    enabled and ``load_new_sstables`` is invoked on ``ks2``. The test then
    checks that all rows are readable from ``ks2`` and that the moved files no
    longer exist in the upload directory.
    """
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_service=debug',
        '--logger-log-level', 'table=debug',
        '--smp', '1',
    ]
    servers = [await manager.server_add(cmdline=cmdline)]

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()

    async def create_table(tablet_count: int) -> str:
        """Create a fresh keyspace with ``tablet_count`` initial tablets and a ``test`` table in it."""
        # Creates multiple tablets in the same shard
        ks_name = await create_new_test_keyspace(
            cql,
            "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}"
            f" AND tablets = {{ 'initial': {tablet_count} }};")
        await cql.run_async(f"CREATE TABLE {ks_name}.test (pk int PRIMARY KEY, c int);")
        return ks_name

    ks = await create_table(5)  # 5 is rounded up to next power-of-two

    # Populate tablets
    keys = range(256)
    await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])

    async def check(ks_name: str):
        """Assert that ``ks_name``.test holds every inserted key with c == pk."""
        logger.info("Checking table")
        cql = manager.get_cql()
        rows = await cql.run_async(f"SELECT * FROM {ks_name}.test BYPASS CACHE;")
        assert len(rows) == len(keys)
        for r in rows:
            assert r.c == r.pk

    await manager.api.flush_keyspace(servers[0].ip_addr, ks)
    await check(ks)

    node_workdir = await manager.server_get_workdir(servers[0].server_id)

    ks2 = await create_table(16)

    cql = await safe_server_stop_gracefully(manager, servers[0].server_id)

    table_dir = glob.glob(os.path.join(node_workdir, "data", ks, "test-*"))[0]
    logger.info(f"Table dir: {table_dir}")

    dst_table_dir = glob.glob(os.path.join(node_workdir, "data", ks2, "test-*"))[0]
    logger.info(f"Dst table dir: {dst_table_dir}")

    def move_sstables_to_upload(table_dir: str, dst_table_dir: str):
        """Move every sstable component file from ``table_dir`` to ``dst_table_dir``/upload.

        Returns the list of destination paths of the moved files.
        """
        logger.info("Moving sstables to upload dir of destination table")
        table_upload_dir = os.path.join(dst_table_dir, "upload")
        moved_files = []
        for sst in glob.glob(os.path.join(table_dir, "*-Data.db")):
            # All component files of one sstable share the prefix that precedes "-Data.db".
            for src_path in glob.glob(os.path.join(table_dir, sst.removesuffix("-Data.db") + "*")):
                dst_path = os.path.join(table_upload_dir, os.path.basename(src_path))
                logger.info(f"Moving sstable file {src_path} to {dst_path}")
                os.rename(src_path, dst_path)
                moved_files.append(dst_path)
        return moved_files

    moved_sstable_files = move_sstables_to_upload(table_dir, dst_table_dir)

    await manager.server_start(servers[0].server_id)
    cql = manager.get_cql()
    await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    # The source table lost its sstables, so it must read as empty now.
    rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;")
    assert len(rows) == 0

    await manager.disable_tablet_balancing()

    logger.info("Adding new server")
    servers.append(await manager.server_add(cmdline=cmdline))

    # Trigger concurrent migration and load-and-stream

    await manager.enable_tablet_balancing()

    await manager.api.load_new_sstables(servers[0].ip_addr, ks2, "test", primary_replica_only)

    # Non-blocking pause: time.sleep() here would freeze the whole event loop.
    await asyncio.sleep(1)

    await check(ks2)

    logger.info("Checking that streamed SSTables are deleted from upload directory")
    remaining_files = []
    for file_path in moved_sstable_files:
        if os.path.exists(file_path):
            logger.info(f"SSTable file still exists: {file_path}")
            remaining_files.append(file_path)

    if remaining_files:
        raise AssertionError(f"SSTable files were not deleted after load_and_stream: {remaining_files}")

    logger.info("All SSTable files successfully deleted after streaming")

    await asyncio.gather(*[cql.run_async(f"drop keyspace {i}") for i in [ks, ks2]])
|
|
|
|
@pytest.mark.asyncio
async def test_storage_service_api_uneven_ownership_keyspace_and_table_params_used(manager: ManagerClient):
    """Check the ownership API for a single-tablet table on a two-node cluster.

    With one tablet and replication factor 1, the ownership returned for the
    table must be 0.0 for one node and 1.0 for the other (which node gets
    which is unspecified), and each node must be listed exactly once.
    """
    # Given two running servers
    shards_count = 4
    cmdline = [f'--smp={shards_count}']
    servers = await manager.servers_add(2, cmdline=cmdline)

    # When table is created with initial tablets set to 1
    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.mytable1 (col1 timestamp, col2 text, col3 blob, PRIMARY KEY (col1));")

        # And when ownership for this table is queried
        actual_ownerships = await manager.api.get_ownership(servers[0].ip_addr, ks, "mytable1")

        # Then ensure that returned ownerships is 0.0 and 1.0 (which node gets 0.0 and 1.0 is unspecified)
        expected_ips = {servers[0].ip_addr, servers[1].ip_addr}
        expected_ownerships = [0.0, 1.0]
        delta = 0.0001
        already_verified = set()

        # Sort numerically: the values are converted with float() before comparison,
        # so order them by the same converted value rather than by the raw entry.
        sorted_actual_ownerships = sorted(actual_ownerships, key=lambda e: float(e["value"]))
        assert len(sorted_actual_ownerships) == len(expected_ownerships)

        for entry, expected_ownership in zip(sorted_actual_ownerships, expected_ownerships):
            actual_ip = entry["key"]
            actual_ownership = float(entry["value"])

            assert actual_ip in expected_ips
            assert actual_ip not in already_verified
            assert actual_ownership == pytest.approx(expected_ownership, abs=delta)

            already_verified.add(actual_ip)
|
|
|
|
@pytest.mark.asyncio
async def test_tablet_storage_freeing(manager: ManagerClient):
    """Check that migrating a tablet away shrinks the table's disk usage on the source node.

    A two-tablet table is filled with 1000 partitions of a 10000-character
    payload and flushed. One tablet is moved to a second node, after which the
    sstable disk usage on the first node must lie between 33% and 66% of the
    size measured before the move.
    """
    logger.info("Start first node")
    nodes = [await manager.server_add()]
    await manager.disable_tablet_balancing()
    session = manager.get_cql()
    await wait_for_cql_and_get_hosts(session, nodes, time.time() + 60)

    logger.info("Create a table with two tablets and populate it with a moderate amount of data.")
    n_tablets = 2
    n_partitions = 1000
    ks_options = f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': {n_tablets}}}"
    async with new_test_keyspace(manager, ks_options) as ks:
        await session.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, v text) WITH compression = {{'sstable_compression': ''}};")
        insert_stmt = session.prepare(f"INSERT INTO {ks}.test (pk, v) VALUES (?, ?);")
        payload = "a"*10000

        # Write in bounded batches so at most max_concurrency inserts are in flight.
        max_concurrency = 100
        for key_batch in itertools.batched(range(n_partitions), max_concurrency):
            pending = [session.run_async(insert_stmt, [key, payload]) for key in key_batch]
            await asyncio.gather(*pending)
        await manager.api.keyspace_flush(nodes[0].ip_addr, ks)

        logger.info("Start second node.")
        nodes.append(await manager.server_add())
        second_node_host_id = await manager.get_host_id(nodes[1].server_id)

        logger.info("Check the table's disk usage on first node.")
        size_before = await manager.server_get_sstables_disk_usage(nodes[0].server_id, ks, "test")
        assert size_before > n_partitions * len(payload)

        logger.info("Read system.tablets.")
        replicas_by_tablet = await get_all_tablet_replicas(manager, nodes[0], ks, 'test')
        assert len(replicas_by_tablet) == n_tablets

        logger.info("Migrate one of the two tablets from the first node to the second node.")
        tablet = replicas_by_tablet[0]
        # Source is the tablet's first replica; destination is shard 0 of the second node.
        await manager.api.move_tablet(nodes[0].ip_addr, ks, "test", *tablet.replicas[0], second_node_host_id, 0, tablet.last_token)

        logger.info("Verify that the table's disk usage on first node shrunk by about half.")
        size_after = await manager.server_get_sstables_disk_usage(nodes[0].server_id, ks, "test")
        assert size_before * 0.33 < size_after < size_before * 0.66
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_schema_change_during_cleanup(manager: ManagerClient):
    """Alter a table's schema while tablet cleanup after a migration is in progress.

    The only tablet of a populated table is migrated to a second node as a
    background task, with the ``delay_tablet_compaction_groups_cleanup``
    injection enabled on both nodes. Once the first node logs that it is
    initiating tablet cleanup, an ALTER TABLE is issued and the migration is
    awaited to completion.
    """
    logger.info("Start first node")
    servers = [await manager.server_add()]
    await manager.disable_tablet_balancing()
    cql = manager.get_cql()
    await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        logger.info("Populating table")

        keys = range(256)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])

        s1_log = await manager.server_open_log(servers[0].server_id)
        s1_mark = await s1_log.mark()

        logger.info("Start second node.")
        servers.append(await manager.server_add())
        s1_host_id = await manager.get_host_id(servers[1].server_id)

        await inject_error_on(manager, "delay_tablet_compaction_groups_cleanup", servers)

        logger.info("Read system.tablets.")
        tablet_replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        assert len(tablet_replicas) == 1

        logger.info("Migrating one tablet to another node.")
        t = tablet_replicas[0]
        # Runs in the background so the schema change below can overlap with the cleanup.
        migration_task = asyncio.create_task(
            manager.api.move_tablet(servers[0].ip_addr, ks, "test", *t.replicas[0], *(s1_host_id, 0), t.last_token))

        logger.info("Waiting for log")
        await s1_log.wait_for('Initiating tablet cleanup of', from_mark=s1_mark, timeout=120)
        # Non-blocking pause: time.sleep() would freeze the event loop and with it
        # the background migration task.
        await asyncio.sleep(1)
        await cql.run_async(f"ALTER TABLE {ks}.test WITH gc_grace_seconds = 0;")
        await migration_task
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tombstone_gc_correctness_during_tablet_split(manager: ManagerClient):
    """Check that deleted data is not resurrected when a tablet split overlaps tombstone GC.

    Two sstables are produced for a table with ``gc_grace_seconds=0``: one with
    live rows and one with tombstones that delete them. A split is then driven
    step by step through the ``split_sstable_rewrite`` injection, a major
    compaction is forced in between, and after the split is detected the table
    must still read as empty.
    """
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_service=debug',
        '--logger-log-level', 'table=debug',
        '--target-tablet-size-in-bytes', '5000',
    ]
    servers = [await manager.server_add(config={
        'tablet_load_stats_refresh_interval_in_seconds': 1
    }, cmdline=cmdline)]

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH gc_grace_seconds=0;")

        await manager.api.disable_autocompaction(servers[0].ip_addr, ks)

        keys = range(100)

        logger.info("Generating sstable with shadowed data")
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
        await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        logger.info("Generating another sstable with tombstones")
        await asyncio.gather(*[cql.run_async(f"DELETE FROM {ks}.test WHERE pk={k};") for k in keys])
        await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        async def assert_empty_table():
            """Assert that a cache-bypassing full scan of the table returns no rows."""
            cql = manager.get_cql()
            rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;")
            assert len(rows) == 0

        await assert_empty_table()

        await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        assert tablet_count == 1

        await manager.api.enable_injection(servers[0].ip_addr, "tablet_load_stats_refresh_before_rebalancing", one_shot=False)
        await manager.api.enable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone", one_shot=False)

        s1_log = await manager.server_open_log(servers[0].server_id)
        s1_mark = await s1_log.mark()

        # Waits for tombstones to be expired.
        # Non-blocking pause: time.sleep() here would freeze the whole event loop.
        await asyncio.sleep(1)

        await manager.api.enable_injection(servers[0].ip_addr, "split_sstable_rewrite", one_shot=False)

        logger.info("Enable balancing so split will be emitted")
        await manager.enable_tablet_balancing()

        logger.info("Waits for split of sstable containing expired tombstones")
        await s1_log.wait_for("split_sstable_rewrite: waiting", from_mark=s1_mark)
        # Advance the mark so later waits only match log lines written after this point.
        s1_mark = await s1_log.mark()
        await manager.api.message_injection(servers[0].ip_addr, "split_sstable_rewrite")
        await s1_log.wait_for("split_sstable_rewrite: released", from_mark=s1_mark)

        logger.info("Pause split of sstable containing deleted data")
        await s1_log.wait_for("split_sstable_rewrite: waiting", from_mark=s1_mark)
        s1_mark = await s1_log.mark()

        logger.info("Force compaction of split sstable containing expired tombstone")
        await manager.api.stop_compaction(servers[0].ip_addr, "SPLIT")
        await manager.api.keyspace_compaction(servers[0].ip_addr, ks)

        await s1_log.wait_for("split_sstable_rewrite: released", from_mark=s1_mark)

        await manager.api.disable_injection(servers[0].ip_addr, "split_sstable_rewrite")

        await manager.api.disable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone")
        await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)

        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        assert tablet_count > 1

        logger.info("Verify data is not resurrected")
        await assert_empty_table()
|
|
|
|
async def create_cluster(manager: ManagerClient, num_dcs: int, num_racks: int, nodes_per_rack: int, config: dict[str, Any] | None = None) -> dict[ServerNum, ServerInfo]:
    """Start ``nodes_per_rack`` servers in every rack of every datacenter.

    Datacenters are named ``dc1..dc<num_dcs>`` and racks ``rack1..rack<num_racks>``;
    rack names repeat across datacenters.

    Args:
        manager: cluster manager used to add the servers.
        num_dcs: number of datacenters to create.
        num_racks: number of racks per datacenter.
        nodes_per_rack: number of servers added to each rack.
        config: optional configuration passed through to every ``servers_add`` call.

    Returns:
        All created servers, keyed by server id.
    """
    logger.debug(f"Creating cluster: num_dcs={num_dcs} num_racks={num_racks} nodes_per_rack={nodes_per_rack}")
    servers: dict[ServerNum, ServerInfo] = {}
    for dc in range(1, num_dcs + 1):
        for rack in range(1, num_racks + 1):
            rack_servers = await manager.servers_add(nodes_per_rack, config=config, property_file={"dc": f"dc{dc}", "rack": f"rack{rack}"})
            servers.update((s.server_id, s) for s in rack_servers)
    logger.debug(f"Created servers={list(servers.values())}")
    return servers
|
|
|
|
|
|
class TestContext:
    """Plain record describing a populated test table.

    Holds the keyspace and table names together with the replication factor,
    initial tablet count and number of keys the table was created with.
    """

    # The name starts with "Test", so pytest would try to collect this class and
    # warn about its __init__; mark it explicitly as not being a test.
    __test__ = False

    def __init__(self, ks: str, table: str, rf: int, initial_tablets: int, num_keys: int):
        self.ks = ks
        self.table = table
        self.rf = rf
        self.initial_tablets = initial_tablets
        self.num_keys = num_keys
|
|
|
|
|
|
@asynccontextmanager
async def create_and_populate_table(manager: ManagerClient, rf: int = 3, initial_tablets: int = 64, num_keys: int = 0):
    """Async context manager that yields a populated table and drops its keyspace on exit.

    Args:
        manager: cluster manager providing the CQL session.
        rf: replication factor of the new keyspace.
        initial_tablets: initial tablet count of the new keyspace.
        num_keys: number of rows to insert; a falsy value means ``initial_tablets * 4``.

    Yields:
        A ``TestContext`` describing the created keyspace and table.
    """
    table = unique_name()
    if not num_keys:
        num_keys = initial_tablets * 4

    cql = manager.get_cql()
    # Create the keyspace before entering the try block: if creation fails there
    # is nothing to drop, and the failure must not be masked by a cleanup error.
    ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} AND tablets = {{'initial': {initial_tablets}}}")
    logger.info(f"Creating table and populating data {ks}.{table}: rf={rf} initial_tablets={initial_tablets} num_keys={num_keys}")
    try:
        await cql.run_async(f"CREATE TABLE {ks}.{table} (pk int PRIMARY KEY, c int)")
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.{table} (pk, c) VALUES ({k}, 1);") for k in range(num_keys)])
        yield TestContext(ks, table, rf, initial_tablets, num_keys)
    finally:
        await cql.run_async(f"DROP KEYSPACE {ks}")
|
|
|
|
|
|
def get_expected_replicas_per_server(live_servers: Iterable[ServerInfo], dead_servers: Iterable[ServerInfo], initial_tablets: int, rf: int) -> dict[ServerNum, float]:
    """Compute how many tablet replicas each server is expected to hold.

    The ``initial_tablets * rf`` replicas are divided equally between the racks
    that contain live servers, and each rack's share is divided equally between
    its live servers. Every dead server is expected to hold 0 replicas.
    """
    live_ids_by_rack: defaultdict[str, set[ServerNum]] = defaultdict(set)
    for server in live_servers:
        live_ids_by_rack[server.rack].add(server.server_id)

    per_rack_share = (initial_tablets * rf) / len(live_ids_by_rack)

    expected: dict[ServerNum, float] = {}
    for rack_members in live_ids_by_rack.values():
        per_node_share = per_rack_share / len(rack_members)
        expected.update(dict.fromkeys(rack_members, per_node_share))
    for server in dead_servers:
        expected[server.server_id] = 0
    return expected
|
|
|
|
|
|
def verify_replicas_per_server(desc: str, expected_replicas_per_server: dict[ServerNum, float], tablet_count: dict[ServerNum, list[int]], initial_tablets: int, rf: int, shards_per_node: int = 2):
    """Assert that the observed per-shard replica counts match the expected distribution.

    Each shard's count must be within 1 of its server's expected replicas
    divided by ``shards_per_node``, and all counts together must add up to
    ``initial_tablets * rf``. ``desc`` only labels the debug log line.
    """
    logger.debug(f"{desc}: expected_replicas_per_server={expected_replicas_per_server} tablet_count={tablet_count}")
    seen_replicas = 0
    for server_id, shard_counts in tablet_count.items():
        per_shard_target = expected_replicas_per_server[server_id] / shards_per_node
        for shard_replicas in shard_counts:
            seen_replicas += shard_replicas
            assert -1 <= shard_replicas - per_shard_target <= 1
    assert seen_replicas == initial_tablets * rf
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_decommission_rack_basic(manager: ManagerClient):
    """
    Decommission every node of one rack while the remaining
    racks are still sufficient for the replication factor,
    and verify the tablet replica distribution before and after.
    """
    logger.info("Bootstrapping multi-rack cluster")
    num_racks = 3
    nodes_per_rack = 2
    rf = num_racks - 1

    # We need to disable this option to be able to create a keyspace. This can be ditched
    # once we've implemented scylladb/scylladb#23426 and we can add new racks with the option enabled.
    # Then we can create `rf` nodes, create the keyspace, and add another node.
    config = {"rf_rack_valid_keyspaces": False}

    all_servers = await create_cluster(manager, 1, num_racks, nodes_per_rack, config)
    async with create_and_populate_table(manager, rf=rf) as ctx:
        logger.info("Verify tablet replicas distribution")
        tables = {ctx.ks: [ctx.table]}
        expected_before = get_expected_replicas_per_server(all_servers.values(), [], ctx.initial_tablets, ctx.rf)
        counts_before = await get_tablet_count_per_shard_for_hosts(manager, all_servers.values(), tables)
        verify_replicas_per_server("Before decommission", expected_before, counts_before, ctx.initial_tablets, ctx.rf)

        # The last rack is the one being removed.
        rack_to_remove = f"rack{num_racks}"
        logger.debug(f"Decommissioning rack={rack_to_remove}")
        live_servers: dict[ServerNum, ServerInfo] = {}
        dead_servers: dict[ServerNum, ServerInfo] = {}
        for server_num, server in all_servers.items():
            if server.rack != rack_to_remove:
                live_servers[server_num] = server
                continue
            logger.debug(f"Decommissioning server={server}")
            await manager.decommission_node(server.server_id)
            dead_servers[server_num] = server

        logger.info("Verify tablet replicas distribution")
        expected_after = get_expected_replicas_per_server(live_servers.values(), dead_servers.values(), ctx.initial_tablets, ctx.rf)
        counts_after = await get_tablet_count_per_shard_for_hosts(manager, live_servers.values(), tables)
        verify_replicas_per_server("After decommission", expected_after, counts_after, ctx.initial_tablets, ctx.rf)
|
|
|
|
@pytest.mark.asyncio
async def test_decommission_rack_after_adding_new_rack(manager: ManagerClient):
    """
    Add a rack of new nodes to a populated cluster and then
    decommission one of the original racks.
    """
    logger.info("Bootstrapping multi-rack cluster")
    original_rack_count = 3
    rack_count = original_rack_count + 1
    rack_size = 2
    replication_factor = original_rack_count

    # We can't add a new rack if we create a keyspace.
    # Once scylladb/scylladb#23426 has been implemented, this can be ditched.
    cluster_config = {"rf_rack_valid_keyspaces": False}

    initial_servers = await create_cluster(manager, 1, original_rack_count, rack_size, cluster_config)
    async with create_and_populate_table(manager, rf=replication_factor) as ctx:
        logger.debug("Temporarily disable tablet load balancing")
        await manager.disable_tablet_balancing()

        logger.info("Add a new rack")
        added_rack = f"rack{rack_count}"
        new_rack_servers = await manager.servers_add(rack_size, config=cluster_config, property_file={"dc": "dc1", "rack": added_rack})
        # Build a fresh list so that initial_servers itself is left untouched
        all_servers: list[ServerInfo] = [*initial_servers.values(), *new_rack_servers]

        # Balancing is disabled, so the new rack is expected to hold nothing yet
        logger.info("Verify tablet replicas distribution")
        tables = {ctx.ks: [ctx.table]}
        expected = get_expected_replicas_per_server(initial_servers.values(), new_rack_servers, ctx.initial_tablets, ctx.rf)
        observed = await get_tablet_count_per_shard_for_hosts(manager, all_servers, tables)
        verify_replicas_per_server("Before decommission", expected, observed, ctx.initial_tablets, ctx.rf)

        logger.debug("Reenable tablet load balancing")
        await manager.enable_tablet_balancing()

        # The last of the original racks is the one being removed
        rack_to_remove = f"rack{original_rack_count}"
        dead_servers: dict[ServerNum, ServerInfo] = {
            srv.server_id: srv for srv in all_servers if srv.rack == rack_to_remove
        }
        live_servers: dict[ServerNum, ServerInfo] = {
            srv.server_id: srv for srv in all_servers if srv.rack != rack_to_remove
        }

        logger.debug(f"Decommissioning rack={rack_to_remove}")
        for srv in dead_servers.values():
            logger.debug(f"Decommissioning server={srv}")
            await manager.decommission_node(srv.server_id)

        logger.info("Verify tablet replicas distribution")
        expected = get_expected_replicas_per_server(live_servers.values(), dead_servers.values(), ctx.initial_tablets, ctx.rf)
        observed = await get_tablet_count_per_shard_for_hosts(manager, all_servers, tables)
        verify_replicas_per_server("After decommission", expected, observed, ctx.initial_tablets, ctx.rf)
|
|
|
|
@pytest.mark.asyncio
async def test_decommission_not_enough_racks(manager: ManagerClient):
    """
    Check that decommissioning a whole rack fails when the remaining
    racks are too few to satisfy the replication factor, even though
    the number of remaining nodes would be sufficient.
    Reproduces https://github.com/scylladb/scylladb/issues/19475
    """
    logger.info("Bootstrapping multi-rack cluster")
    rack_count = 3
    rack_size = 2
    replication_factor = rack_count

    all_servers = await create_cluster(manager, 1, rack_count, rack_size)
    async with create_and_populate_table(manager, rf=replication_factor) as ctx:
        logger.info("Verify tablet replicas distribution")
        tables = {ctx.ks: [ctx.table]}
        expected = get_expected_replicas_per_server(all_servers.values(), [], ctx.initial_tablets, ctx.rf)
        observed = await get_tablet_count_per_shard_for_hosts(manager, all_servers.values(), tables)
        verify_replicas_per_server("Before decommission", expected, observed, ctx.initial_tablets, ctx.rf)

        live_servers: dict[ServerNum, ServerInfo] = {}
        dead_servers: dict[ServerNum, ServerInfo] = {}
        rack_to_remove = f"rack{rack_count}"
        attempts = 0
        for srv in all_servers.values():
            # Servers of the other racks are simply left alone
            if srv.rack != rack_to_remove:
                live_servers[srv.server_id] = srv
                continue

            logger.debug(f"Decommissioning server={srv}")
            attempts += 1
            # Only the decommission of the rack's last node is expected to fail
            expected_error = None
            if attempts == rack_size:
                expected_error = "Decommission failed"
            await manager.decommission_node(srv.server_id, expected_error=expected_error)

            # A node whose decommission was expected to fail stays alive
            bucket = live_servers if expected_error else dead_servers
            bucket[srv.server_id] = srv

        logger.info("Verify tablet replicas distribution")
        expected = get_expected_replicas_per_server(live_servers.values(), dead_servers.values(), ctx.initial_tablets, ctx.rf)
        observed = await get_tablet_count_per_shard_for_hosts(manager, all_servers.values(), tables)
        verify_replicas_per_server("After decommission", expected, observed, ctx.initial_tablets, ctx.rf)
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_cleanup_vs_snapshot_race(manager: ManagerClient):
    """
    Take a snapshot of a table right after tablet cleanup failed on the node.

    The test creates a one-tablet table on a single-shard node, arms the
    one-shot "tablet_cleanup_failure_post_deletion" injection, adds a second
    node and starts migrating the tablet to it. Once the first node logs
    'Cleanup failed for tablet', a snapshot of the keyspace is taken on it.
    """
    cmdline = ['--smp=1']

    servers = [await manager.server_add(cmdline=cmdline)]
    # Keep the balancer out of the way; the migration below is driven manually
    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    n_tablets = 1
    n_partitions = 1000
    await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
    await manager.servers_see_each_other(servers)
    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': {n_tablets}}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY);")
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk) VALUES ({k});") for k in range(n_partitions)])

        # Armed before the second node joins, so only the first node gets the injection
        await inject_error_one_shot_on(manager, "tablet_cleanup_failure_post_deletion", servers)

        # Remember the log position so that only messages logged from now on are matched
        s0_log = await manager.server_open_log(servers[0].server_id)
        s0_mark = await s0_log.mark()

        servers.append(await manager.server_add())

        tablet_token = 0
        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
        s1_host_id = await manager.get_host_id(servers[1].server_id)
        dst_shard = 0
        # Run the migration in the background; the test continues as soon as the
        # injected cleanup failure shows up in the log.
        # NOTE(review): migration_task is never awaited or cancelled in this test - confirm this is intended.
        migration_task = asyncio.create_task(
            manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token))

        logger.info("Waiting for injected cleanup failure...")
        await s0_log.wait_for('Cleanup failed for tablet', from_mark=s0_mark)

        # The snapshot is taken on the node where cleanup has just failed
        await manager.api.take_snapshot(servers[0].ip_addr, ks, "test_snapshot")
|
|
|
|
# Reproduces assert failure when truncating table, either triggered by DROP TABLE or TRUNCATE.
|
|
# See: https://github.com/scylladb/scylladb/issues/18059
|
|
# It's achieved by migrating a tablet away that contains the highest replay position of a shard,
|
|
# so when drop/truncate happens, the highest replay position will be greater than all the data
|
|
# found in the table (includes data in memtable).
|
|
@pytest.mark.asyncio
@pytest.mark.parametrize("operation", ['DROP TABLE', 'TRUNCATE'])
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_drop_table_and_truncate_after_migration(manager: ManagerClient, operation):
    """
    Run DROP TABLE / TRUNCATE after one of the tablets that lived on
    shard 0 has been moved to shard 1 of the same node.
    """
    node = await manager.server_add(cmdline=['--smp=2'], config={'auto_snapshot': True})
    servers = [node]

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND TABLETS = {{'initial': 4}}")
    await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

    await manager.api.disable_autocompaction(servers[0].ip_addr, ks)

    inserts = [cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in range(100)]
    await asyncio.gather(*inserts)

    # Pick the tablets whose first replica sits on shard 0
    all_tablets = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
    shard0_tablets = [tablet for tablet in all_tablets if tablet.replicas[0][1] == 0]
    assert len(shard0_tablets) == 2

    tablet_to_move = shard0_tablets[0]

    host_id = await manager.get_host_id(servers[0].server_id)

    logger.info("Migrating 1st tablet to shard 1")
    # Intra-node migration: same host, shard 0 -> shard 1
    await manager.api.move_tablet(servers[0].ip_addr, ks, "test", host_id, 0, host_id, 1, tablet_to_move.last_token)

    await manager.api.enable_injection(servers[0].ip_addr, "truncate_disable_compaction_delay", one_shot=True)

    logger.info(f"Running {operation} {ks}.test")
    await cql.run_async(f"{operation} {ks}.test")
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.nightly
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_truncate_during_topology_change(manager: ManagerClient):
    """Truncate a populated table while a fourth node is bootstrapping."""

    # Start 3 node cluster
    tablets_cfg = { 'enable_tablets': True }
    servers = await manager.servers_add(3, config=tablets_cfg, auto_rack_dc="dc1")
    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (k int PRIMARY KEY, v int)")

        logger.info("Populating table")
        insert_stmt = cql.prepare(f"INSERT INTO {ks}.test (k, v) VALUES (?, ?)")
        insert_stmt.consistency_level = ConsistencyLevel.QUORUM
        writes = [cql.run_async(insert_stmt, [k, k]) for k in range(10000)]
        await asyncio.gather(*writes)
        await manager.api.keyspace_flush(servers[0].ip_addr, ks, "test")

        async def delayed_truncate():
            # Give the new node some time to get into bootstrap first
            await asyncio.sleep(10)
            logger.info("Executing truncate during bootstrap")
            truncate_stmt = SimpleStatement(f"TRUNCATE {ks}.test USING TIMEOUT 4m", retry_policy=FallthroughRetryPolicy())
            await cql.run_async(truncate_stmt)

        truncate_task = asyncio.create_task(delayed_truncate())
        logger.info("Adding fourth node")
        # The startup injection delays the bootstrap so that the truncate overlaps with it
        bootstrap_cfg = {'error_injections_at_startup': ['delay_bootstrap_120s'], 'enable_tablets': True}
        new_server = await manager.server_add(config=bootstrap_cfg,
                                              property_file=servers[0].property_file())
        await truncate_task

        # Wait for bootstrap completion
        await wait_for_cql_and_get_hosts(cql, servers + [new_server], time.time() + 60)

        count_rows = await cql.run_async(f"SELECT COUNT(*) FROM {ks}.test")
        assert count_rows[0].count == 0, "Table should be empty after truncation"
|
|
|
|
# Reproducer for https://github.com/scylladb/scylladb/issues/22040.
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_concurrent_schema_change_with_compaction_completion(manager: ManagerClient):
    """
    Run schema changes (index create/drop, compaction strategy changes)
    together with compactions on a two-shard node that has the
    "sstable_list_builder_delay" injection enabled.
    """
    cmdline = ['--smp=2']
    servers = [await manager.server_add(cmdline=cmdline)]

    await manager.api.enable_injection(servers[0].ip_addr, "sstable_list_builder_delay", one_shot=False)

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
        table = f"{ks}.test"
        await cql.run_async(f"CREATE TABLE {table} (a int PRIMARY KEY, b int);")

        stop_compaction = False
        async def background_compaction():
            # Keep compacting the keyspace until the test flips the flag
            while not stop_compaction:
                await manager.api.keyspace_compaction(servers[0].ip_addr, ks)

        compaction_task = asyncio.create_task(background_compaction())

        # NOTE(review): dotestCreateAndDropIndex is called without await; if it is a
        # synchronous function it blocks the event loop and the background compaction
        # task cannot make progress until this loop is over - confirm.
        try:
            for _ in range(5):
                dotestCreateAndDropIndex(cql, table, "CamelCase", False)
                dotestCreateAndDropIndex(cql, table, "CamelCase2", True)
        finally:
            # Always stop the background task, even if the schema changes failed,
            # so it does not keep running while the keyspace is being dropped.
            stop_compaction = True
            await compaction_task

        async def force_minor_compaction():
            for _ in range(4):
                # The write has to complete before the flush, otherwise the flush
                # may find nothing to write out.
                await cql.run_async(f"INSERT INTO {ks}.test (a, b) VALUES (1, 1);")
                await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        await cql.run_async(f"ALTER TABLE {table} WITH compaction = {{ 'class' : 'TimeWindowCompactionStrategy' }};")
        await force_minor_compaction()
        await cql.run_async(f"ALTER TABLE {table} WITH compaction = {{ 'class' : 'IncrementalCompactionStrategy' }};")
        await force_minor_compaction()
|
|
|
|
# This is a test and reproducer for https://github.com/scylladb/scylladb/issues/24153
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_split_correctness_on_tablet_count_change(manager: ManagerClient):
    """
    Change the tablet count of a table while a split is parked on an injection.

    A split to 4 tablets is requested and held at the
    "splitting_mutation_writer_switch_wait" injection. While it waits, the
    table's min_tablet_count is lowered to 1 and the test polls until the
    table has a single tablet. Only then the two paused injections are released.
    """
    logger.info('Bootstrapping cluster')
    cfg = { 'enable_tablets': True,
            'tablet_load_stats_refresh_interval_in_seconds': 1
            }
    cmdline = [
        '--logger-log-level', 'load_balancer=debug',
        '--logger-log-level', 'debug_error_injection=debug',
        '--smp', '1', # single cpu is needed to prevent intra-node migration which interacts badly with injection splitting_mutation_writer_switch_wait.
    ]
    server = await manager.server_add(cmdline=cmdline, config=cfg)

    logger.info(f'server_id = {server.server_id}')

    cql = manager.get_cql()

    # Balancing stays off until the data and the split request are in place
    await manager.disable_tablet_balancing()

    initial_tablets = 2

    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")

        await manager.api.disable_autocompaction(server.ip_addr, ks, 'test')

        # insert data
        pks = range(256)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in pks])

        # flush the table
        await manager.api.flush_keyspace(server.ip_addr, ks)

        # force split on the test table
        expected_tablet_count = 4
        await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {expected_tablet_count}}}")

        # Remember the log position so that only messages logged from now on are matched
        log = await manager.server_open_log(server.server_id)
        log_mark = await log.mark()

        # Arm the injection before balancing is re-enabled
        await manager.api.enable_injection(server.ip_addr, "splitting_mutation_writer_switch_wait", one_shot=True)
        await manager.enable_tablet_balancing()

        await log.wait_for('Emitting resize decision of type split', from_mark=log_mark)
        await log.wait_for('splitting_mutation_writer_switch_wait: waiting', from_mark=log_mark)

        # needs to skip update of repaired_at on merge, since it stops split task.
        await manager.api.enable_injection(server.ip_addr, "skip_update_repaired_at_for_merge", one_shot=False)
        await manager.api.enable_injection(server.ip_addr, "merge_completion_fiber", one_shot=True)

        # Lower the requested tablet count while the split is still waiting on the injection
        expected_tablet_count = 1
        await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {expected_tablet_count}}}")

        # wait for merge to complete
        actual_tablet_count = 0
        started = time.time()
        while expected_tablet_count != actual_tablet_count:
            actual_tablet_count = await get_tablet_count(manager, server, ks, 'test')
            logger.debug(f'actual/expected tablet count: {actual_tablet_count}/{expected_tablet_count}')

            # Poll every 100ms and give up after two minutes
            assert time.time() - started < 120, 'Timeout while waiting for tablet merge'
            await asyncio.sleep(.1)

        logger.info(f'Merged test table; new number of tablets: {expected_tablet_count}')

        # Release the waiting split first, then the merge completion fiber
        await manager.api.message_injection(server.ip_addr, "splitting_mutation_writer_switch_wait")
        await asyncio.sleep(.1)
        await manager.api.message_injection(server.ip_addr, "merge_completion_fiber")
|
|
|
|
# Reproducer for https://github.com/scylladb/scylladb/issues/26041.
|
|
@pytest.mark.parametrize("primary_replica_only", [False, True])
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_load_and_stream_and_split_synchronization(manager: ManagerClient, primary_replica_only):
    """
    Run load-and-stream of uploaded sstables while a tablet split is finalized.

    The table's sstables are moved into its upload directory while the node
    is down. After restart a split is requested and paused at the
    "tablet_resize_finalization_post_barrier" injection, load_new_sstables is
    started in the background, and the two injections are released one after
    another. At the end all rows written at the beginning must be readable.
    """
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_service=debug',
        '--logger-log-level', 'table=debug',
    ]
    servers = [await manager.server_add(config={
        'tablet_load_stats_refresh_interval_in_seconds': 1
    }, cmdline=cmdline)]
    server = servers[0]

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()

    initial_tablets = 1

    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")

        keys = range(100)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])

        async def check(ks_name: str):
            # Every inserted row must be present, with c equal to pk as written above
            logger.info("Checking table")
            # Fetch the session again rather than reusing the enclosing cql variable
            cql = manager.get_cql()
            rows = await cql.run_async(f"SELECT * FROM {ks_name}.test BYPASS CACHE;")
            assert len(rows) == len(keys)
            for r in rows:
                assert r.c == r.pk

        await manager.api.flush_keyspace(servers[0].ip_addr, ks)
        await check(ks)

        node_workdir = await manager.server_get_workdir(servers[0].server_id)

        cql = await safe_server_stop_gracefully(manager, servers[0].server_id)

        # Takes the first directory matching "test-*" under the keyspace's data directory
        table_dir = glob.glob(os.path.join(node_workdir, "data", ks, "test-*"))[0]
        logger.info(f"Table dir: {table_dir}")

        def move_sstables_to_upload(table_dir: str):
            # For every *-Data.db file, move all files sharing its prefix into "upload"
            logger.info("Moving sstables to upload dir")
            table_upload_dir = os.path.join(table_dir, "upload")
            for sst in glob.glob(os.path.join(table_dir, "*-Data.db")):
                for src_path in glob.glob(os.path.join(table_dir, sst.removesuffix("-Data.db") + "*")):
                    dst_path = os.path.join(table_upload_dir, os.path.basename(src_path))
                    logger.info(f"Moving sstable file {src_path} to {dst_path}")
                    os.rename(src_path, dst_path)

        move_sstables_to_upload(table_dir)

        await manager.server_start(servers[0].server_id)
        cql = manager.get_cql()
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        # The sstables were moved away, so the table must look empty after the restart
        rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;")
        assert len(rows) == 0

        await manager.disable_tablet_balancing()

        await manager.api.enable_injection(servers[0].ip_addr, "tablet_resize_finalization_post_barrier", one_shot=True)

        # Remember the log position so that only messages logged from now on are matched
        s1_log = await manager.server_open_log(servers[0].server_id)
        s1_mark = await s1_log.mark()

        await manager.enable_tablet_balancing()

        # Request a split by doubling the minimum tablet count
        await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {initial_tablets * 2}}}")

        await s1_log.wait_for(f"tablet_resize_finalization_post_barrier: waiting", from_mark=s1_mark)

        await manager.api.enable_injection(servers[0].ip_addr, "stream_mutation_fragments", one_shot=True)

        # Start load-and-stream in the background while split finalization is paused
        load_and_stream_task = asyncio.create_task(manager.api.load_new_sstables(servers[0].ip_addr, ks, "test", primary_replica_only))
        await s1_log.wait_for(f"Loading new SSTables for keyspace", from_mark=s1_mark)

        # Let the split finalization continue and wait until the split is detected
        await manager.api.message_injection(server.ip_addr, "tablet_resize_finalization_post_barrier")
        await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)

        # Release streaming only after it has reached its injection point
        await s1_log.wait_for(f"stream_mutation_fragments: waiting", from_mark=s1_mark)
        await manager.api.message_injection(server.ip_addr, "stream_mutation_fragments")

        await load_and_stream_task

        await check(ks)
|
|
|
|
async def test_update_load_stats_after_rebuild(manager: ManagerClient):
    """
    Check that system.tablet_sizes lists both replicas of a tablet right
    after increasing the replication factor from one rack to two, even
    though the periodic load_stats refresh has been pushed out to an hour.
    """
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'raft_topology=debug',
    ]

    config = { 'tablet_load_stats_refresh_interval_in_seconds': 1 }
    servers = await manager.servers_add(2, config=config, cmdline=cmdline, property_file=[
        {"dc": "dc1", "rack": "rack1"},
        {"dc": "dc1", "rack": "rack2"},
    ])

    s0_host_id = await manager.get_host_id(servers[0].server_id)
    s1_host_id = await manager.get_host_id(servers[1].server_id)

    cql = manager.get_cql()

    async def get_tablet_sizes(table):
        return await cql.run_async(f"SELECT * FROM system.tablet_sizes WHERE table_id = {table}")

    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': ['rack1']}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': 1}};")

        table_id = await manager.get_table_or_view_id(ks, 'test')

        # Wait for the coordinator to refresh load_stats.
        # Poll with a short sleep and an overall timeout, so a missing refresh
        # fails the test instead of spinning forever.
        started = time.time()
        while True:
            rows = await get_tablet_sizes(table_id)
            if len(rows) == 1 and len(rows[0].replicas) == 1:
                replica_host = [str(u) for u in rows[0].replicas.keys()][0]
                if s0_host_id == replica_host:
                    break

            assert time.time() - started < 120, 'Timeout while waiting for load_stats refresh'
            await asyncio.sleep(.1)

        # Increase load_stats refresh so that it does not race with the
        # topology coordinator updating load_stats during end_migration
        await manager.server_update_config(servers[0].server_id, 'tablet_load_stats_refresh_interval_in_seconds', 3600)

        # Wait for the current load_stats refresh interval to elapse
        await asyncio.sleep(2)

        # Increase RF to trigger a tablet rebuild
        await cql.run_async(f"ALTER KEYSPACE {ks} WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': ['rack1', 'rack2']}}")

        # Check that the new tablet size was added to the host in rack2
        rows = await get_tablet_sizes(table_id)
        replica_hosts = {str(u) for u in rows[0].replicas.keys()}

        assert len(replica_hosts) == 2
        assert s0_host_id in replica_hosts and s1_host_id in replica_hosts, "Tablet size was added to load_stats after rebuild"
|
|
|
|
async def test_update_load_stats_after_migration(manager: ManagerClient):
    """
    Check that, right after a tablet is moved between two nodes of the same
    rack, system.tablet_sizes lists the tablet under the destination host and
    no longer under the source host, even though the periodic load_stats
    refresh has been pushed out to an hour.
    """
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'raft_topology=debug',
    ]

    config = { 'tablet_load_stats_refresh_interval_in_seconds': 1 }
    servers = await manager.servers_add(2, config=config, cmdline=cmdline, property_file=[
        {"dc": "dc1", "rack": "rack1"},
        {"dc": "dc1", "rack": "rack1"},
    ])

    s0_host_id = await manager.get_host_id(servers[0].server_id)
    s1_host_id = await manager.get_host_id(servers[1].server_id)

    cql = manager.get_cql()

    async def get_tablet_sizes(table):
        return await cql.run_async(f"SELECT * FROM system.tablet_sizes WHERE table_id = {table}")

    # Disable load balancing to avoid the balancer moving the tablet from a node with less to a node with more
    # available disk space. Otherwise, the move_tablet API can fail (if the tablet is already in transition) or
    # be a no-op (in case the tablet has already been migrated)
    await manager.disable_tablet_balancing()

    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': ['rack1']}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': 1}};")

        table_id = await manager.get_table_or_view_id(ks, 'test')

        # Wait for the coordinator to refresh load_stats.
        # Poll with a short sleep and an overall timeout, so a missing refresh
        # fails the test instead of spinning forever.
        started = time.time()
        while True:
            rows = await get_tablet_sizes(table_id)
            if len(rows) == 1:
                replica_hosts = {str(u) for u in rows[0].replicas.keys()}
                if s0_host_id in replica_hosts or s1_host_id in replica_hosts:
                    break

            assert time.time() - started < 120, 'Timeout while waiting for load_stats refresh'
            await asyncio.sleep(.1)

        # Increase load_stats refresh so that it does not race with the
        # topology coordinator updating load_stats during end_migration
        await manager.server_update_config(servers[0].server_id, 'tablet_load_stats_refresh_interval_in_seconds', 3600)

        # Wait for the current load_stats refresh interval to elapse
        await asyncio.sleep(2)

        # Migrate a tablet between nodes in rack1
        replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        leaving_replica = replicas[0].replicas[0]
        # The destination is whichever of the two hosts does not hold the tablet, shard 0
        pending_replica = (s0_host_id if leaving_replica[0] == s1_host_id else s1_host_id, 0)
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", *leaving_replica, *pending_replica, 0)

        # Check that the new tablet size was moved from leaving to pending host
        rows = await get_tablet_sizes(table_id)
        replica_hosts = {str(u) for u in rows[0].replicas.keys()}

        logger.info(f'replica_hosts: {replica_hosts}')
        assert leaving_replica[0] not in replica_hosts, "Leaving replica tablet size is not in load_stats any more"
        assert pending_replica[0] in replica_hosts, "Pending replica tablet size is in load_stats"
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode('release', 'error injections are not supported in release mode')
async def test_crash_on_missing_table_from_load_stats(manager: ManagerClient):
    """
    Drop a table while one of the two nodes is down and wait for the next
    load_stats refresh to be logged by the remaining node.
    """
    logger.info('Bootstrapping cluster')
    node_config = {
        'enable_tablets': True,
        'tablet_load_stats_refresh_interval_in_seconds': 1,
    }
    node_cmdline = [
        '--logger-log-level', 'load_balancer=debug',
        '--logger-log-level', 'raft_topology=debug',
        '--smp', '2',
    ]
    # Both nodes live in the same dc and rack
    node_locations = [{"dc": "dc1", "rack": "rack1"} for _ in range(2)]
    servers = await manager.servers_add(2, config=node_config, cmdline=node_cmdline, property_file=node_locations)
    first_node = servers[0]
    second_node = servers[1]

    cql = manager.get_cql()

    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int)")

        # Make sure load_stats has been refreshed and that the coordinator has cached load_stats
        table_id = await manager.get_table_or_view_id(ks, 'test')
        await wait_for_valid_load_stats(cql, table_id)

        # Stop the non-coordinator node
        await manager.server_stop_gracefully(second_node.server_id)

        # Drop the table; this leaves the table size in the cached load_stats on the coordinator
        await cql.run_async(f"DROP TABLE {ks}.test")

        # Wait for the next load_stats refresh
        coordinator_log = await manager.server_open_log(first_node.server_id)
        mark_after_drop = await coordinator_log.mark()
        await coordinator_log.wait_for('raft topology: Refreshed table load stats for all DC', from_mark=mark_after_drop)
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablets_barrier_waits_for_replica_erms(manager: ManagerClient):
    """
    The test verifies that tablet replicas hold ERMS while processing requests,
    and that the tablet's global barrier waits for all replicas to acknowledge it.
    To do this, the test starts a read request and makes it hang on the
    `replica_query_wait` injection, then initiates tablet migration. Finally, it
    checks that the tablet's global barrier waits for the replica handling that
    request to complete.
    """

    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_service=debug',
        '--logger-log-level', 'raft_topology=debug',
        '--range-request-timeout-in-ms', '1000', # shorten time coordinator abandon the request, releasing erm
        '--read-request-timeout-in-ms', '1000',
        '--abort-on-internal-error', 'true',
    ]
    servers = [await manager.server_add(cmdline=cmdline)]

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        # The second node joins after the table exists, so the tablet lives on the first node
        servers.append(await manager.server_add(cmdline=cmdline))

        hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        key = 7 # Whatever
        tablet_token = 0 # Doesn't matter since there is one tablet
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, 0)")
        rows = await cql.run_async(f"SELECT pk from {ks}.test")
        assert len(list(rows)) == 1

        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)

        s1_host_id = await manager.get_host_id(servers[1].server_id)
        dst_shard = 0

        s0_log = await manager.server_open_log(servers[0].server_id)
        s0_mark = await s0_log.mark()

        await manager.api.enable_injection(servers[0].ip_addr, "replica_query_wait", one_shot=False, parameters={"table": "test"})

        # Send the read through the second node; it is deliberately not awaited here,
        # as it is expected to hang on the injection on the first node
        replica_query = cql.run_async(f"SELECT * from {ks}.test where pk={key} BYPASS CACHE", host=hosts[1])
        await s0_log.wait_for('replica_query_wait: waiting', from_mark=s0_mark)

        version_before_move = await get_topology_version(cql, hosts[0])

        # Move the mark forward so only messages from the migration are matched
        s0_mark = await s0_log.mark()
        migration_task = asyncio.create_task(
            manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token))

        # migration should proceed once replica query times out on coordinator, causing it to be abandoned
        new_version = version_before_move + 1
        await s0_log.wait_for(re.escape(
            f"Got raft_topology_cmd::barrier_and_drain, version {new_version}, "
            f"current version {new_version}, "
            f"stale versions (version: use_count): {{{version_before_move}: 1}}"),
            from_mark=s0_mark)

        await manager.api.message_injection(servers[0].ip_addr, "replica_query_wait")
        await manager.api.disable_injection(servers[0].ip_addr, "replica_query_wait")

        # swallow exception of timed out read.
        # Only regular exceptions are swallowed; a bare `except:` would also hide
        # task cancellation and KeyboardInterrupt.
        try:
            await replica_query
        except Exception as e:
            logger.debug(f"Ignoring failure of the hanging read: {e}")

        logger.info("Waiting for migration to finish")
        await migration_task
        logger.info("Migration done")

        rows = await cql.run_async(f"SELECT pk from {ks}.test")
        assert len(list(rows)) == 1
|
|
|
|
# This is a test and reproducer for https://github.com/scylladb/scylladb/issues/26041
|
|
@pytest.mark.asyncio
@pytest.mark.parametrize("repair_before_split", [False, True])
@pytest.mark.skip_mode('release', 'error injections are not supported in release mode')
async def test_split_and_incremental_repair_synchronization(manager: ManagerClient, repair_before_split: bool):
    """Run a tablet split together with an incremental tablet repair, then restart a node.

    A two-node cluster hosts a table with RF=2 and two initial tablets. The
    split to four tablets is requested while its finalization is held back by
    the ``tablet_resize_finalization_postpone`` injection on ``servers[0]``.

    :param manager: cluster manager fixture used to drive servers, REST API and CQL.
    :param repair_before_split: when True, the incremental repair is started
        first and parked on the ``incremental_repair_prepare_wait`` injection
        on both nodes while the split is prepared; when False, the split is
        prepared first and the repair is run afterwards.

    In both variants the split is only allowed to finalize after the repair
    has completed. Once the tablet count reaches the expected value,
    ``servers[0]`` is restarted and the test waits for the cluster to be
    reachable again.
    """
    logger.info('Bootstrapping cluster')
    cfg = {
        'enable_tablets': True,
        'tablet_load_stats_refresh_interval_in_seconds': 1,
    }
    cmdline = [
        '--logger-log-level', 'load_balancer=debug',
        '--logger-log-level', 'debug_error_injection=debug',
        '--logger-log-level', 'compaction=debug',
    ]
    servers = await manager.servers_add(2, cmdline=cmdline, config=cfg, auto_rack_dc="dc1")

    cql = manager.get_cql()

    # Keep the balancer off while the table is populated; it is re-enabled
    # below, right before the split/repair interleaving starts.
    await manager.disable_tablet_balancing()

    initial_tablets = 2

    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 2}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")

        # insert data
        pks = range(256)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in pks])

        # flush the table
        for server in servers:
            await manager.api.flush_keyspace(server.ip_addr, ks)

        # Log marks are taken after the flush so that the waits below only
        # match messages produced from this point on.
        s0_log = await manager.server_open_log(servers[0].server_id)
        s0_mark = await s0_log.mark()
        s1_log = await manager.server_open_log(servers[1].server_id)
        s1_mark = await s1_log.mark()
        expected_tablet_count = 4 # expected tablet count post split

        async def run_split_prepare():
            """Request the split and wait until servers[0] logs that it is finalizing it."""
            # Postpone finalization so the split stays pending until the
            # injection is disabled further down.
            await manager.api.enable_injection(servers[0].ip_addr, 'tablet_resize_finalization_postpone', one_shot=False)

            # force split on the test table
            await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {expected_tablet_count}}}")

            await s0_log.wait_for('Finalizing resize decision for table', from_mark=s0_mark)

        async def generate_repair_work():
            """Write a second batch of rows at CL=ONE while `database_apply` is injected on servers[0]."""
            insert_stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, c) VALUES (?, ?)")
            insert_stmt.consistency_level = ConsistencyLevel.ONE

            # NOTE(review): presumably the injection makes servers[0] miss these
            # writes so that the replicas diverge and repair has data to sync -
            # confirm against the `database_apply` injection point.
            await manager.api.enable_injection(servers[0].ip_addr, "database_apply", one_shot=False)
            pks = range(256, 512)
            await asyncio.gather(*[cql.run_async(insert_stmt, (k, k)) for k in pks])
            await manager.api.disable_injection(servers[0].ip_addr, "database_apply")

        token = 'all'

        await manager.enable_tablet_balancing()

        if repair_before_split:
            await generate_repair_work()
            for server in servers:
                await manager.api.enable_injection(server.ip_addr, "incremental_repair_prepare_wait", one_shot=True)
            repair_task = asyncio.create_task(manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental'))
            # Both nodes must be parked on the injection before the split is prepared.
            await s0_log.wait_for('incremental_repair_prepare_wait: waiting', from_mark=s0_mark)
            await s1_log.wait_for('incremental_repair_prepare_wait: waiting', from_mark=s1_mark)

            await run_split_prepare()

            # Release the parked repair on every node and let it run to completion.
            for server in servers:
                await manager.api.message_injection(server.ip_addr, "incremental_repair_prepare_wait")
            await repair_task
        else:
            await run_split_prepare()
            await generate_repair_work()
            await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')

        # Repair is done in both variants; let the postponed split finalize.
        await manager.api.disable_injection(servers[0].ip_addr, "tablet_resize_finalization_postpone")

        async def finished_splitting():
            tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
            # `or None` turns a False result into None, the value this
            # predicate hands back to wait_for while the split is not done.
            return tablet_count >= expected_tablet_count or None
        # Give enough time for split to happen in debug mode
        await wait_for(finished_splitting, time.time() + 120)

        await manager.server_stop(servers[0].server_id)
        await manager.server_start(servers[0].server_id)
        # Only the wait matters here; the returned host list is not used.
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
        await manager.servers_see_each_other(servers)
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.skip_mode('release', 'error injections are not supported in release mode')
async def test_split_and_intranode_synchronization(manager: ManagerClient):
    """Run a tablet split together with an intra-node migration of the same tablet.

    A single node started with ``--smp 2`` hosts a table with one tablet. The
    test makes sure the tablet replica sits on shard 1, requests a split with
    finalization postponed, waits for the split-ready acknowledgement in the
    log, and then migrates the replica back to shard 0. After intra-node
    streaming has finished, the running SPLIT compaction is stopped, the
    migration is awaited, and finalization is released. The test then waits
    for the tablet count to reach the expected post-split value.

    :param manager: cluster manager fixture used to drive the server, REST API and CQL.
    """
    logger.info('Bootstrapping cluster')
    cfg = {
        'enable_tablets': True,
        'tablet_load_stats_refresh_interval_in_seconds': 1,
    }
    cmdline = [
        '--logger-log-level', 'load_balancer=debug',
        '--logger-log-level', 'debug_error_injection=debug',
        '--logger-log-level', 'compaction=debug',
        '--smp', '2',
    ]
    servers = await manager.servers_add(1, cmdline=cmdline, config=cfg)
    server = servers[0]

    cql = manager.get_cql()

    # Keep the balancer off while the table is populated and the replica is
    # placed on the desired shard; it is re-enabled before the split.
    await manager.disable_tablet_balancing()

    initial_tablets = 1

    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")

        # insert data
        pks = range(256)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in pks])

        # flush the table
        await manager.api.flush_keyspace(server.ip_addr, ks)

        log = await manager.server_open_log(server.server_id)
        mark = await log.mark()

        tablet_token = 0 # Doesn't matter since there is one tablet
        # replica[0] is used below as the host and replica[1] as the shard.
        replica = await get_tablet_replica(manager, server, ks, 'test', tablet_token)

        src_shard = replica[1]

        # if tablet replica is at shard 0, move it to shard 1.
        if src_shard == 0:
            dst_shard = 1
            await manager.api.move_tablet(server.ip_addr, ks, "test", replica[0], src_shard, replica[0], dst_shard, tablet_token)

        await manager.enable_tablet_balancing()

        # Postpone finalization so the split stays pending during the migration.
        await manager.api.enable_injection(server.ip_addr, 'tablet_resize_finalization_postpone', one_shot=False)
        await manager.api.enable_injection(server.ip_addr, "split_sstable_force_stop_exception", one_shot=False)

        # force split on the test table
        expected_tablet_count = initial_tablets * 2
        await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {expected_tablet_count}}}")

        # Check that shard 0 ACKed split.
        mark, _ = await log.wait_for('Setting split ready sequence number to', from_mark=mark)

        # Move tablet replica back to shard 0, where split was already ACKed.
        src_shard = 1
        dst_shard = 0
        migration_task = asyncio.create_task(manager.api.move_tablet(server.ip_addr, ks, "test", replica[0], src_shard, replica[0], dst_shard, tablet_token))

        mark, _ = await log.wait_for("Finished intra-node streaming of tablet", from_mark=mark)

        # Streaming is done but the migration task has not been awaited yet;
        # stop the running SPLIT compaction at this point.
        await manager.api.stop_compaction(server.ip_addr, "SPLIT")

        await migration_task

        await manager.api.disable_injection(server.ip_addr, "tablet_resize_finalization_postpone")

        async def finished_splitting():
            tablet_count = await get_tablet_count(manager, server, ks, 'test')
            # `or None` turns a False result into None, the value this
            # predicate hands back to wait_for while the split is not done.
            return tablet_count >= expected_tablet_count or None
        # Give enough time for split to happen in debug mode
        await wait_for(finished_splitting, time.time() + 120)