Run keyspace compaction asynchronously in `test_tombstone_gc_correctness_during_tablet_split` and only await it after `split_sstable_rewrite` is disabled.

The problem is that `keyspace_compaction()` starts with a flush, and that flush can take around five seconds. During that window, the split stops ongoing compaction before major compaction can be retried: the stop aborts the in-flight major compaction attempt, and the split then proceeds far enough to enter the `split_sstable_rewrite` injection point. At that point the test used to wait synchronously for major compaction to finish, but major compaction cannot finish yet: when it retries, it needs the same semaphore that is still effectively tied up behind the blocked split rewrite. So the test waits for major compaction, the split waits for the injection to be released, and the code that would release the injection never runs.

Starting major compaction as a task breaks that cycle: the test can first disable `split_sstable_rewrite`, let the split get out of the way, and only then wait for major compaction to complete.

Fixes https://scylladb.atlassian.net/browse/SCYLLADB-827

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>

Closes scylladb/scylladb#29066
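In code, the fix is roughly the following pattern (a minimal sketch; `server` and `ks` stand in for the test's actual node and keyspace, and the real call sites live in `test_tombstone_gc_correctness_during_tablet_split` below):

    # Before (deadlock-prone): blocks while the split is parked on the injection.
    #   await manager.api.keyspace_compaction(server.ip_addr, ks)

    # After: start major compaction in the background,
    compaction_task = asyncio.create_task(manager.api.keyspace_compaction(server.ip_addr, ks))
    # release the split so it stops pinning the compaction semaphore,
    await manager.api.disable_injection(server.ip_addr, "split_sstable_rewrite")
    # and only then wait for major compaction to complete.
    await compaction_task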
#
# Copyright (C) 2023-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from typing import Any
from cassandra.query import ConsistencyLevel, SimpleStatement
from cassandra.policies import FallthroughRetryPolicy

from test.pylib.internal_types import HostID, ServerInfo, ServerNum
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error_one_shot, HTTPError, read_barrier
from test.pylib.util import wait_for_cql_and_get_hosts, unique_name, wait_for
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas, get_tablet_count, TabletReplicas
from test.cluster.util import reconnect_driver, create_new_test_keyspace, new_test_keyspace, get_topology_version
from test.cqlpy.cassandra_tests.validation.entities.secondary_index_test import dotestCreateAndDropIndex

import pytest
import asyncio
import logging
import time
import random
import os
import glob
from collections import defaultdict
from collections.abc import Iterable
from contextlib import asynccontextmanager
import itertools
import re

logger = logging.getLogger(__name__)


async def inject_error_one_shot_on(manager, error_name, servers):
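    """Enable a one-shot error injection named `error_name` on every server in `servers`."""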
    errs = [inject_error_one_shot(manager.api, s.ip_addr, error_name) for s in servers]
    await asyncio.gather(*errs)


async def inject_error_on(manager, error_name, servers):
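    """Enable a persistent (non-one-shot) error injection named `error_name` on every server in `servers`."""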
    errs = [manager.api.enable_injection(s.ip_addr, error_name, False) for s in servers]
    await asyncio.gather(*errs)

async def disable_injection_on(manager, error_name, servers):
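    """Disable the error injection named `error_name` on every server in `servers`."""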
    errs = [manager.api.disable_injection(s.ip_addr, error_name) for s in servers]
    await asyncio.gather(*errs)

async def safe_server_stop_gracefully(manager, server_id, timeout: float = 60, reconnect: bool = False):
    # Explicitly close the driver to avoid reconnections if scylla fails to update gossiper state on shutdown.
    # It's a problem until https://github.com/scylladb/scylladb/issues/15356 is fixed.
    manager.driver_close()
    await manager.server_stop_gracefully(server_id, timeout)
    cql = None
    if reconnect:
        cql = await reconnect_driver(manager)
    return cql

async def safe_rolling_restart(manager, servers, with_down):
    # https://github.com/scylladb/python-driver/issues/230 is not fixed yet, so for the sake of CI stability,
    # the driver must be reconnected after a rolling restart of the servers.
    await manager.rolling_restart(servers, with_down)
    cql = await reconnect_driver(manager)
    return cql

async def wait_for_valid_load_stats(cql, table_id, timeout=120):
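    """Poll system.tablet_sizes until the given table reports no missing tablet sizes, failing after `timeout` seconds."""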
    started = time.time()
    # Wait until the given table has no missing tablet sizes
    while True:
        missing_cnt = 0
        found_cnt = 0
        for r in await cql.run_async(f"SELECT * FROM system.tablet_sizes WHERE table_id = {table_id};"):
            found_cnt += 1
            if len(r.missing_replicas) > 0:
                missing_cnt += 1

        if missing_cnt == 0 and found_cnt > 0:
            break

        assert time.time() - started < timeout, "Timed out while waiting for valid load_stats"

        await asyncio.sleep(0.2)

@pytest.mark.asyncio
async def test_tablet_metadata_propagates_with_schema_changes_in_snapshot_mode(manager: ManagerClient):
    """Test that tablet metadata survives schema changes when a node catches up from a Raft snapshot."""

    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_proxy=trace',
        '--logger-log-level', 'cql_server=trace',
        '--logger-log-level', 'query_processor=trace',
        '--logger-log-level', 'gossip=trace',
        '--logger-log-level', 'storage_service=trace',
        '--logger-log-level', 'raft_topology=trace',
        '--logger-log-level', 'messaging_service=trace',
        '--logger-log-level', 'rpc=trace',
    ]
    servers = await manager.servers_add(3, cmdline=cmdline, auto_rack_dc="dc1")

    s0 = servers[0].server_id
    not_s0 = servers[1:]

    # s0 should miss schema and tablet changes
    cql = await safe_server_stop_gracefully(manager, s0, reconnect=True)

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 100}") as ks:
        # force s0 to catch up later from the snapshot and not the raft log
        await inject_error_one_shot_on(manager, 'raft_server_force_snapshot', not_s0)
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        keys = range(10)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, 1);") for k in keys])

        rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
        assert len(list(rows)) == len(keys)
        for r in rows:
            assert r.c == 1

        manager.driver_close()
        await manager.server_start(s0, wait_others=2)
        await manager.driver_connect(server=servers[0])
        cql = manager.get_cql()
        await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 60)

        # Trigger a schema change to invoke schema agreement waiting to make sure that s0 has the latest schema
        async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as test_dummy:
            await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, 2);", execution_profile='whitelist')
                                   for k in keys])

            rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
            assert len(rows) == len(keys)
            for r in rows:
                assert r.c == 2

            conn_logger = logging.getLogger("conn_messages")
            conn_logger.setLevel(logging.DEBUG)
            try:
                # Check that after rolling restart the tablet metadata is still there
                await manager.rolling_restart(servers)

                cql = await reconnect_driver(manager)

                await wait_for_cql_and_get_hosts(cql, [servers[0]], time.time() + 60)

                await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, 3);", execution_profile='whitelist')
                                       for k in keys])

                rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
                assert len(rows) == len(keys)
                for r in rows:
                    assert r.c == 3
            finally:
                conn_logger.setLevel(logging.INFO)


@pytest.mark.asyncio
async def test_scans(manager: ManagerClient):
    logger.info("Bootstrapping cluster")
    servers = await manager.servers_add(3)

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 8}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        keys = range(100)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])

        rows = await cql.run_async(f"SELECT count(*) FROM {ks}.test;")
        assert rows[0].count == len(keys)

        rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
        assert len(rows) == len(keys)
        for r in rows:
            assert r.c == r.pk


@pytest.mark.asyncio
async def test_table_drop_with_auto_snapshot(manager: ManagerClient):
    logger.info("Bootstrapping cluster")
    cfg = {'auto_snapshot': True}
    servers = await manager.servers_add(3, config=cfg)

    cql = manager.get_cql()

    # Increases the chance of tablet migration concurrent with schema change
    await inject_error_on(manager, "tablet_allocator_shuffle", servers)

    for i in range(3):
        await cql.run_async("DROP KEYSPACE IF EXISTS test;")
        await cql.run_async("CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 8 };")
        await cql.run_async("CREATE TABLE IF NOT EXISTS test.tbl_sample_kv (id int, value text, PRIMARY KEY (id));")
        await cql.run_async("INSERT INTO test.tbl_sample_kv (id, value) VALUES (1, 'ala');")

    await cql.run_async("DROP KEYSPACE test;")


@pytest.mark.asyncio
async def test_topology_changes(manager: ManagerClient):
    logger.info("Bootstrapping cluster")
    servers = await manager.servers_add(3)

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 32}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        logger.info("Populating table")

        keys = range(256)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
        expected_rows = await cql.run_async(f"SELECT * FROM {ks}.test;")

        async def check():
            logger.info("Checking table")
            rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
            assert rows == expected_rows
            assert len(rows) == len(keys)
            for r in rows:
                assert r.c == r.pk

        await inject_error_on(manager, "tablet_allocator_shuffle", servers)

        logger.info("Adding new server")
        await manager.server_add()

        await check()

        logger.info("Adding new server")
        await manager.server_add()

        await check()
        await asyncio.sleep(5)  # Give load balancer some time to do work
        await check()

        await manager.decommission_node(servers[0].server_id)

        await check()

async def get_two_servers_to_move_tablet(manager: ManagerClient):
    """
    The first server in the servers list is the source node to move the tablet from. The second server is the dest node.
    """
    logger.info("Bootstrapping cluster")
    cmdline = ['--enable-file-stream', 'false']
    servers = [await manager.server_add(cmdline=cmdline)]

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};")
    await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

    servers.append(await manager.server_add(cmdline=cmdline))

    key = 7  # Whatever
    tablet_token = 0  # Doesn't matter since there is one tablet
    await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, 0)")
    rows = await cql.run_async(f"SELECT pk from {ks}.test")
    assert len(list(rows)) == 1

    replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
    logger.info(f'{replica=}')

    s0_host_id = await manager.get_host_id(servers[0].server_id)
    s1_host_id = await manager.get_host_id(servers[1].server_id)

    # Make sure servers[0] is the source node to move the tablet from
    if replica[0] != s0_host_id:
        servers.reverse()
        s0_host_id, s1_host_id = s1_host_id, s0_host_id

    dst_shard = 0

    return (servers, cql, s0_host_id, s1_host_id, replica, tablet_token, dst_shard, ks)

@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_streaming_rx_error_no_failed_message_with_fail_stream_plan(manager: ManagerClient):
    servers, cql, s0_host_id, s1_host_id, replica, tablet_token, dst_shard, ks = await get_two_servers_to_move_tablet(manager)

    await manager.api.enable_injection(servers[0].ip_addr, "stream_session_ignore_failed_message", one_shot=True)
    await manager.api.enable_injection(servers[1].ip_addr, "stream_session_ignore_failed_message", one_shot=True)
    await manager.api.enable_injection(servers[1].ip_addr, "stream_mutation_fragments_rx_error", one_shot=True)

    s1_log = await manager.server_open_log(servers[1].server_id)
    s1_mark = await s1_log.mark()

    migration_task = asyncio.create_task(
        manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token, timeout=30))

    await s1_log.wait_for('stream_manager: Failed stream_session for stream_plan', from_mark=s1_mark)
    s1_mark = await s1_log.mark()

    await manager.api.disable_injection(servers[1].ip_addr, "stream_mutation_fragments_rx_error")

    logger.info("Waiting for migration to finish")
    await migration_task
    logger.info("Migration done")

    # Sanity test
    rows = await cql.run_async(f"SELECT pk from {ks}.test")
    assert len(list(rows)) == 1

    await cql.run_async(f"TRUNCATE {ks}.test")
    rows = await cql.run_async(f"SELECT pk from {ks}.test")
    assert len(list(rows)) == 0

    # Verify that there is no data resurrection
    rows = await cql.run_async(f"SELECT pk from {ks}.test")
    assert len(list(rows)) == 0

    # Verify that moving the tablet back works
    await manager.api.move_tablet(servers[0].ip_addr, ks, "test", s1_host_id, dst_shard, replica[0], replica[1], tablet_token)
    rows = await cql.run_async(f"SELECT pk from {ks}.test")
    assert len(list(rows)) == 0

@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_streaming_rx_error_no_failed_message_no_fail_stream_plan_hang(manager: ManagerClient):
    servers, cql, s0_host_id, s1_host_id, replica, tablet_token, dst_shard, ks = await get_two_servers_to_move_tablet(manager)

    await manager.api.enable_injection(servers[0].ip_addr, "stream_session_ignore_failed_message", one_shot=True)
    await manager.api.enable_injection(servers[1].ip_addr, "stream_session_ignore_failed_message", one_shot=True)
    await manager.api.enable_injection(servers[1].ip_addr, "stream_mutation_fragments_rx_error", one_shot=True)

    await manager.api.enable_injection(servers[0].ip_addr, "stream_mutation_fragments_skip_fail_stream_plan", one_shot=True)
    await manager.api.enable_injection(servers[1].ip_addr, "stream_mutation_fragments_skip_fail_stream_plan", one_shot=True)

    s1_log = await manager.server_open_log(servers[1].server_id)
    s1_mark = await s1_log.mark()

    migration_task = asyncio.create_task(
        manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token, timeout=10))

    try:
        logger.info("Waiting for migration to finish")
        await migration_task
        assert False  # The move tablet is not supposed to finish
    except TimeoutError:
        logger.info("Migration timeout as expected")

@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_streaming_is_guarded_by_topology_guard(manager: ManagerClient):
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_service=trace',
        '--logger-log-level', 'raft_topology=trace',
        '--enable-file-stream', 'false',
    ]
    servers = [await manager.server_add(cmdline=cmdline)]

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        servers.append(await manager.server_add(cmdline=cmdline))

        key = 7  # Whatever
        tablet_token = 0  # Doesn't matter since there is one tablet
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, 0)")
        rows = await cql.run_async(f"SELECT pk from {ks}.test")
        assert len(list(rows)) == 1

        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)

        s0_host_id = await manager.get_host_id(servers[0].server_id)
        s1_host_id = await manager.get_host_id(servers[1].server_id)
        dst_shard = 0

        await manager.api.enable_injection(servers[1].ip_addr, "stream_mutation_fragments", one_shot=True)
        s1_log = await manager.server_open_log(servers[1].server_id)
        s1_mark = await s1_log.mark()

        migration_task = asyncio.create_task(
            manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token))

        # Wait for the replica-side writer of streaming to reach a place where it has already
        # received writes from the leaving replica but hasn't applied them yet.
        # Once the writer reaches this place, it will wait for the message_injection() call below before proceeding.
        # The place we block the writer in should not hold on to the erm or the topology_guard, because that would
        # block the migration below and prevent the test from proceeding.
        await s1_log.wait_for('stream_mutation_fragments: waiting', from_mark=s1_mark)
        s1_mark = await s1_log.mark()

        # Should cause streaming to fail and be retried while leaving behind the replica-side writer.
        await manager.api.inject_disconnect(servers[0].ip_addr, servers[1].ip_addr)

        logger.info("Waiting for migration to finish")
        await migration_task
        logger.info("Migration done")

        # Sanity test
        rows = await cql.run_async(f"SELECT pk from {ks}.test")
        assert len(list(rows)) == 1

        await cql.run_async(f"TRUNCATE {ks}.test")
        rows = await cql.run_async(f"SELECT pk from {ks}.test")
        assert len(list(rows)) == 0

        # Release abandoned streaming
        await manager.api.message_injection(servers[1].ip_addr, "stream_mutation_fragments")
        await s1_log.wait_for('stream_mutation_fragments: done', from_mark=s1_mark)

        # Verify that there is no data resurrection
        rows = await cql.run_async(f"SELECT pk from {ks}.test")
        assert len(list(rows)) == 0

        # Verify that moving the tablet back works
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", s1_host_id, dst_shard, replica[0], replica[1], tablet_token)
        rows = await cql.run_async(f"SELECT pk from {ks}.test")
        assert len(list(rows)) == 0


@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_table_dropped_during_streaming(manager: ManagerClient):
    """
    Verifies that load balancing recovers when the table is dropped during the streaming phase of tablet migration.
    Recovering means that the state machine is not stuck and later migrations can proceed.
    """

    logger.info("Bootstrapping cluster")
    servers = [await manager.server_add()]

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
        await cql.run_async(f"CREATE TABLE {ks}.test2 (pk int PRIMARY KEY, c int);")

        servers.append(await manager.server_add())

        logger.info("Populating tables")
        key = 7  # Whatever
        value = 3  # Whatever
        tablet_token = 0  # Doesn't matter since there is one tablet
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, {value})")
        await cql.run_async(f"INSERT INTO {ks}.test2 (pk, c) VALUES ({key}, {value})")
        rows = await cql.run_async(f"SELECT pk from {ks}.test")
        assert len(list(rows)) == 1
        rows = await cql.run_async(f"SELECT pk from {ks}.test2")
        assert len(list(rows)) == 1

        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)

        await manager.api.enable_injection(servers[1].ip_addr, "stream_mutation_fragments", one_shot=True)
        s1_log = await manager.server_open_log(servers[1].server_id)
        s1_mark = await s1_log.mark()

        logger.info("Starting tablet migration")
        s1_host_id = await manager.get_host_id(servers[1].server_id)
        migration_task = asyncio.create_task(
            manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, 0, tablet_token))

        # Wait for the replica-side writer of streaming to reach a place where it has already
        # received writes from the leaving replica but hasn't applied them yet.
        # Once the writer reaches this place, it will wait for the message_injection() call below before proceeding.
        # We want to drop the table while streaming is deep in the process, where it will attempt to apply writes
        # to the dropped table.
        await s1_log.wait_for('stream_mutation_fragments: waiting', from_mark=s1_mark)

        # Streaming blocks table drop, so we can't wait here.
        drop_task = cql.run_async(f"DROP TABLE {ks}.test")

        # Release streaming as late as possible to increase the probability of the drop causing problems.
        await s1_log.wait_for('Dropping', from_mark=s1_mark)

        # Unblock streaming
        await manager.api.message_injection(servers[1].ip_addr, "stream_mutation_fragments")
        await drop_task

        logger.info("Waiting for migration to finish")
        try:
            await migration_task
        except HTTPError as e:
            assert 'Tablet map not found' in e.message

        logger.info("Verifying that moving the other tablet works")
        replica = await get_tablet_replica(manager, servers[0], ks, 'test2', tablet_token)
        s0_host_id = await manager.get_host_id(servers[0].server_id)
        assert replica[0] == s0_host_id
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test2", replica[0], replica[1], s1_host_id, 0, tablet_token)

        logger.info("Verifying tablet replica")
        replica = await get_tablet_replica(manager, servers[0], ks, 'test2', tablet_token)
        assert replica == (s1_host_id, 0)

@pytest.mark.asyncio
async def test_tablet_cleanup(manager: ManagerClient):
    cmdline = ['--smp=2', '--commitlog-sync=batch']

    logger.info("Start first node")
    servers = [await manager.server_add(cmdline=cmdline)]
    await manager.disable_tablet_balancing()

    logger.info("Populate table")
    cql = manager.get_cql()
    n_tablets = 32
    n_partitions = 1000
    await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
    await manager.servers_see_each_other(servers)
    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': {n_tablets}}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY);")
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk) VALUES ({k});") for k in range(n_partitions)])

        logger.info("Start second node")
        servers.append(await manager.server_add())

        s0_host_id = await manager.get_host_id(servers[0].server_id)
        s1_host_id = await manager.get_host_id(servers[1].server_id)

        logger.info("Read system.tablets")
        tablet_replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        assert len(tablet_replicas) == n_tablets

        # Randomly select half of all tablets.
        sample = random.sample(tablet_replicas, n_tablets // 2)
        moved_tokens = [x.last_token for x in sample]
        moved_src = [x.replicas[0] for x in sample]
        moved_dst = [(s1_host_id, random.choice([0, 1])) for _ in sample]

        # Migrate the selected tablets to the second node.
        logger.info("Migrate half of all tablets to second node")
        for t, s, d in zip(moved_tokens, moved_src, moved_dst):
            await manager.api.move_tablet(servers[0].ip_addr, ks, "test", *s, *d, t)

        # Sanity check. All data we inserted should still be there.
        assert n_partitions == (await cql.run_async(f"SELECT COUNT(*) FROM {ks}.test"))[0].count

        # Wipe data on the second node.
        logger.info("Wipe data on second node")
        await manager.server_stop_gracefully(servers[1].server_id, timeout=120)
        await manager.server_wipe_sstables(servers[1].server_id, ks, "test")
        await manager.server_start(servers[1].server_id)
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
        await manager.servers_see_each_other(servers)
        partitions_after_loss = (await cql.run_async(f"SELECT COUNT(*) FROM {ks}.test"))[0].count
        assert partitions_after_loss < n_partitions

        # Migrate all tablets back to their original position.
        # Check that this doesn't resurrect cleaned data.
        logger.info("Migrate the migrated tablets back")
        for t, s, d in zip(moved_tokens, moved_dst, moved_src):
            await manager.api.move_tablet(servers[0].ip_addr, ks, "test", *s, *d, t)
        assert partitions_after_loss == (await cql.run_async(f"SELECT COUNT(*) FROM {ks}.test"))[0].count

        # Kill and restart the first node.
        # Check that this doesn't resurrect cleaned data.
        logger.info("Brutally restart first node")
        await manager.server_stop(servers[0].server_id)
        await manager.server_start(servers[0].server_id)
        hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
        await manager.servers_see_each_other(servers)
        assert partitions_after_loss == (await cql.run_async(f"SELECT COUNT(*) FROM {ks}.test"))[0].count

        # Bonus: check that commitlog_cleanups doesn't have any garbage after restart.
        assert 0 == (await cql.run_async("SELECT COUNT(*) FROM system.commitlog_cleanups", host=hosts[0]))[0].count

@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_cleanup_failure(manager: ManagerClient):
    cmdline = ['--smp=1']

    servers = [await manager.server_add(cmdline=cmdline)]

    # Disable load balancing, so that after the move_tablet API call the load balancer does
    # not attempt to migrate the tablet back to the first node.
    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    n_tablets = 1
    n_partitions = 1000
    await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
    await manager.servers_see_each_other(servers)
    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': {n_tablets}}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY);")
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk) VALUES ({k});") for k in range(n_partitions)])

        await inject_error_one_shot_on(manager, "tablet_cleanup_failure", servers)

        s0_log = await manager.server_open_log(servers[0].server_id)
        s0_mark = await s0_log.mark()

        servers.append(await manager.server_add())

        tablet_token = 0
        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
        s1_host_id = await manager.get_host_id(servers[1].server_id)
        dst_shard = 0
        migration_task = asyncio.create_task(
            manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token))

        logger.info("Waiting for injected cleanup failure...")
        await s0_log.wait_for('Cleanup failed for tablet', from_mark=s0_mark)

        logger.info("Waiting for cleanup success on retry...")
        await s0_log.wait_for(f'Cleaned up tablet .* of table {ks}.test successfully.', from_mark=s0_mark)

        logger.info("Waiting for migration to be finalized in topology...")
        await s0_log.wait_for('updating topology state: Finished tablet migration', from_mark=s0_mark)

        logger.info("Waiting for migration task...")
        await migration_task

        assert n_partitions == (await cql.run_async(f"SELECT COUNT(*) FROM {ks}.test"))[0].count

        node_workdir = await manager.server_get_workdir(servers[0].server_id)
        table_dir = glob.glob(os.path.join(node_workdir, "data", ks, "test-*"))[0]
        logger.info(f"Table dir: {table_dir}")
        ssts = glob.glob(os.path.join(table_dir, "*-Data.db"))
        logger.info("Guarantee source node of migration left no sstables undeleted")
        assert len(ssts) == 0

@pytest.mark.asyncio
async def test_tablet_resharding(manager: ManagerClient):
    cmdline = ['--smp=3']
    config = {'tablets_mode_for_new_keyspaces': 'enabled'}
    servers = await manager.servers_add(1, cmdline=cmdline, config=config)
    server = servers[0]

    logger.info("Populate table")
    cql = manager.get_cql()
    n_tablets = 32
    n_partitions = 1000
    ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': {n_tablets}}}")
    await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY);")
    await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk) VALUES ({k});") for k in range(n_partitions)])

    await manager.server_stop_gracefully(server.server_id, timeout=120)
    await manager.server_update_cmdline(server.server_id, ['--smp=2'])

    await manager.server_start(
        server.server_id,
        expected_error="Detected a tablet with invalid replica shard, reducing shard count with tablet-enabled tables is not yet supported. Replace the node instead.")

@pytest.mark.parametrize("injection_error", ["foreach_compaction_group_wait", "major_compaction_wait"])
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_split(manager: ManagerClient, injection_error: str):
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_service=debug',
        '--logger-log-level', 'table=debug',
        '--target-tablet-size-in-bytes', '1024',
    ]
    servers = [await manager.server_add(config={
        'tablet_load_stats_refresh_interval_in_seconds': 1
    }, cmdline=cmdline)]

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        # enough to trigger multiple splits with max size of 1024 bytes.
        keys = range(256)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])

        async def check():
            logger.info("Checking table")
            cql = manager.get_cql()
            rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
            assert len(rows) == len(keys)
            for r in rows:
                assert r.c == r.pk

        await check()

        await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        assert tablet_count == 1

        logger.info("Adding new server")
        servers.append(await manager.server_add(cmdline=cmdline))

        # Increases the chance of tablet migration concurrent with split
        await inject_error_one_shot_on(manager, "tablet_allocator_shuffle", servers)
        await inject_error_on(manager, "tablet_load_stats_refresh_before_rebalancing", servers)

        s1_log = await manager.server_open_log(servers[0].server_id)
        s1_mark = await s1_log.mark()

        await manager.api.enable_injection(servers[0].ip_addr, injection_error, one_shot=True)
        compaction_task = asyncio.create_task(manager.api.keyspace_compaction(servers[0].ip_addr, ks))
        await s1_log.wait_for(f"{injection_error}: waiting", from_mark=s1_mark)

        # Now there's a split and migration need, so they'll potentially run concurrently.
        await manager.enable_tablet_balancing()

        await check()
        await asyncio.sleep(5)  # Give load balancer some time to do work

        await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)

        await check()

        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        assert tablet_count > 1

        await manager.api.message_injection(servers[0].ip_addr, injection_error)
        await s1_log.wait_for(f"{injection_error}: released", from_mark=s1_mark)
        await compaction_task

@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_correctness_of_tablet_split_finalization_after_restart(manager: ManagerClient):
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_service=debug',
        '--logger-log-level', 'table=debug',
        '--target-tablet-size-in-bytes', '1024',
    ]
    servers = [await manager.server_add(config={
        'tablet_load_stats_refresh_interval_in_seconds': 1
    }, cmdline=cmdline)]

    await manager.disable_tablet_balancing()

    servers.append(await manager.server_add(config={
        'error_injections_at_startup': ['delay_split_compaction']
    }, cmdline=cmdline))

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH compaction = {{'class': 'NullCompactionStrategy'}};")

        # enough to trigger multiple splits with max size of 1024 bytes.
        keys = range(256)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])

        async def check():
            logger.info("Checking table")
            cql = manager.get_cql()
            rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
            assert len(rows) == len(keys)
            for r in rows:
                assert r.c == r.pk

        await check()

        for server in servers:
            await manager.api.flush_keyspace(server.ip_addr, ks)

        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        assert tablet_count == 2

        await manager.api.enable_injection(servers[0].ip_addr, "tablet_load_stats_refresh_before_rebalancing", one_shot=False)

        s1_log = await manager.server_open_log(servers[0].server_id)
        s1_mark = await s1_log.mark()

        await manager.api.enable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone", one_shot=False)
        await manager.enable_tablet_balancing()

        await s1_log.wait_for('Finalizing resize decision for table', from_mark=s1_mark)

        # Delays refresh of tablet stats, so the balancer works with whichever stats it got last.
        await manager.api.disable_injection(servers[0].ip_addr, "tablet_load_stats_refresh_before_rebalancing")
        await manager.server_update_config(servers[0].server_id, 'tablet_load_stats_refresh_interval_in_seconds', 60)
        await asyncio.sleep(1)
        await manager.disable_tablet_balancing()

        await manager.server_stop_gracefully(servers[1].server_id, timeout=120)
        await manager.server_start(servers[1].server_id)
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
        await manager.servers_see_each_other(servers)

        await manager.api.disable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone")
        await manager.enable_tablet_balancing()

        await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)

        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        assert tablet_count > 2

        await check()

@pytest.mark.parametrize("injection_error", ["foreach_compaction_group_wait", "major_compaction_wait"])
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_concurrent_tablet_migration_and_major(manager: ManagerClient, injection_error):
    logger.info("Bootstrapping cluster")
    cmdline = []
    servers = [await manager.server_add(cmdline=cmdline)]

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        keys = range(256)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])

        async def check():
            logger.info("Checking table")
            cql = manager.get_cql()
            rows = await cql.run_async(f"SELECT * FROM {ks}.test;")
            assert len(rows) == len(keys)
            for r in rows:
                assert r.c == r.pk

        await check()

        await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        logger.info("Adding new server")
        servers.append(await manager.server_add(cmdline=cmdline))
        s1_host_id = await manager.get_host_id(servers[1].server_id)

        s1_log = await manager.server_open_log(servers[0].server_id)
        s1_mark = await s1_log.mark()

        await manager.api.enable_injection(servers[0].ip_addr, injection_error, one_shot=True)
        logger.info("Starting major compaction")
        compaction_task = asyncio.create_task(manager.api.keyspace_compaction(servers[0].ip_addr, ks))
        await s1_log.wait_for(f"{injection_error}: waiting", from_mark=s1_mark)

        tablet_replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')

        t = tablet_replicas[0]
        logger.info("Migrating tablet")
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", *t.replicas[0], *(s1_host_id, 0), t.last_token)

        await manager.api.message_injection(servers[0].ip_addr, injection_error)
        await s1_log.wait_for(f"{injection_error}: released", from_mark=s1_mark)
        await compaction_task

        if injection_error == "major_compaction_wait":
            logger.info("Check that major was successfully aborted on migration")
            await s1_log.wait_for(f"Compaction for {ks}/test was stopped due to: tablet cleanup", from_mark=s1_mark)

        await check()

@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_concurrent_table_drop_and_major(manager: ManagerClient):
    logger.info("Bootstrapping cluster")
    injection_error = "major_compaction_wait"
    cmdline = ['--logger-log-level', 'compaction_manager=debug',]
    servers = [await manager.server_add(cmdline=cmdline)]

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        keys = range(256)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])

        await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        s1_log = await manager.server_open_log(servers[0].server_id)
        s1_mark = await s1_log.mark()

        await manager.api.enable_injection(servers[0].ip_addr, injection_error, one_shot=True)
        logger.info("Starting major compaction")
        compaction_task = asyncio.create_task(manager.api.keyspace_compaction(servers[0].ip_addr, ks))
        await s1_log.wait_for(f"{injection_error}: waiting", from_mark=s1_mark)

        logger.info("Dropping table")
        await cql.run_async(f"DROP TABLE {ks}.test")

        await manager.api.message_injection(servers[0].ip_addr, injection_error)
        await s1_log.wait_for(f"{injection_error}: released", from_mark=s1_mark)
        await compaction_task

        if injection_error == "major_compaction_wait":
            logger.info("Check that major was successfully aborted on table drop")
            await s1_log.wait_for(f"ongoing compactions for table {ks}.test .* due to table removal", from_mark=s1_mark)

async def assert_tablet_count_metric_value_for_shards(manager: ManagerClient, server: ServerInfo, expected_count_per_shard: list[int]):
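    """Assert that the scylla_tablets_count metric on each shard of `server` matches `expected_count_per_shard`."""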
    tablet_count_metric_name = "scylla_tablets_count"
    metrics = await manager.metrics.query(server.ip_addr)
    for shard_id in range(0, len(expected_count_per_shard)):
        expected_tablet_count = expected_count_per_shard[shard_id]
        tablet_count = metrics.get(tablet_count_metric_name, {'shard': str(shard_id)})
        assert int(tablet_count) == expected_tablet_count

async def get_tablet_tokens_from_host_on_shard(manager: ManagerClient, server: ServerInfo, keyspace_name: str, table_name: str, shard: int) -> list[int]:
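    """Return the last tokens of all tablets of the given table that have a replica on `server` at the given `shard`."""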
    host = await manager.get_host_id(server.server_id)
    table_tablets = await get_all_tablet_replicas(manager, server, keyspace_name, table_name)
    tokens = []
    for tablet_replica in table_tablets:
        for host_id, shard_id in tablet_replica.replicas:
            if host_id == host and shard_id == shard:
                tokens.append(tablet_replica.last_token)
    return tokens

async def get_tablet_count_per_shard_for_host(manager: ManagerClient, server: ServerInfo, full_tables: dict[str, list[str]], shards_count: int = 2) -> list[int]:
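    """Return per-shard tablet counts on a single server for all tables in `full_tables`."""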
    dict_result = await get_tablet_count_per_shard_for_hosts(manager, [server], full_tables, shards_count)
    return dict_result[server.server_id]

async def get_tablet_count_per_shard_for_hosts(manager: ManagerClient, servers: Iterable[ServerInfo], full_tables: dict[str, list[str]], shards_per_node: int = 2) -> dict[ServerNum, list[int]]:
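    """Return a map from server id to per-shard tablet counts for all tables in `full_tables` (a keyspace -> table names mapping)."""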
    result = dict[ServerNum, list[int]]()
    hosts = dict[HostID, ServerNum]()
    server1 = None

    for server in servers:
        if not server1:
            server1 = server
        result[server.server_id] = [0] * shards_per_node
        hosts[await manager.get_host_id(server.server_id)] = server.server_id

    for keyspace, tables in full_tables.items():
        for table in tables:
            table_tablets = await get_all_tablet_replicas(manager, server1, keyspace, table)
            for tablet_replica in table_tablets:
                for host_id, shard_id in tablet_replica.replicas:
                    if host_id in hosts:
                        result[hosts[host_id]][shard_id] += 1

    return result

def get_shard_that_has_tablets(tablet_count_per_shard: list[int]) -> int:
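    """Return the first shard id holding at least one tablet, or -1 if there is none."""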
    for shard_id in range(0, len(tablet_count_per_shard)):
        if tablet_count_per_shard[shard_id] > 0:
            return shard_id
    return -1

@pytest.mark.asyncio
async def test_tablet_count_metric_per_shard(manager: ManagerClient):
    # Given two running servers
    shards_count = 4
    cmdline = ['--smp=4']
    servers = await manager.servers_add(2, cmdline=cmdline)

    # And given disabled load balancing
    await manager.disable_tablet_balancing()

    # When two tables are created
    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.mytable1 (col1 timestamp, col2 text, col3 blob, PRIMARY KEY (col1));")
        await cql.run_async(f"CREATE TABLE {ks}.mytable2 (col1 timestamp, col2 text, col3 blob, PRIMARY KEY (col1));")

        # Then the tablet count metric for each shard depicts the actual state
        tables = { ks: ["mytable1", "mytable2"] }
        expected_count_per_shard_for_host_0 = await get_tablet_count_per_shard_for_host(manager, servers[0], tables, shards_count)
        await assert_tablet_count_metric_value_for_shards(manager, servers[0], expected_count_per_shard_for_host_0)

        expected_count_per_shard_for_host_1 = await get_tablet_count_per_shard_for_host(manager, servers[1], tables, shards_count)
        await assert_tablet_count_metric_value_for_shards(manager, servers[1], expected_count_per_shard_for_host_1)

        # When a third table is created
        await cql.run_async(f"CREATE TABLE {ks}.mytable3 (col1 timestamp, col2 text, col3 blob, PRIMARY KEY (col1));")

        # Then the tablet count metric for each shard depicts the actual state
        tables = { ks: ["mytable1", "mytable2", "mytable3"] }
        expected_count_per_shard_for_host_0 = await get_tablet_count_per_shard_for_host(manager, servers[0], tables, shards_count)
        await assert_tablet_count_metric_value_for_shards(manager, servers[0], expected_count_per_shard_for_host_0)

        expected_count_per_shard_for_host_1 = await get_tablet_count_per_shard_for_host(manager, servers[1], tables, shards_count)
        await assert_tablet_count_metric_value_for_shards(manager, servers[1], expected_count_per_shard_for_host_1)

        # When one of the tables is dropped
        await cql.run_async(f"DROP TABLE {ks}.mytable2;")

        # Then the tablet count metric for each shard depicts the actual state
        tables = { ks: ["mytable1", "mytable3"] }
        expected_count_per_shard_for_host_0 = await get_tablet_count_per_shard_for_host(manager, servers[0], tables, shards_count)
        await assert_tablet_count_metric_value_for_shards(manager, servers[0], expected_count_per_shard_for_host_0)

        expected_count_per_shard_for_host_1 = await get_tablet_count_per_shard_for_host(manager, servers[1], tables, shards_count)
        await assert_tablet_count_metric_value_for_shards(manager, servers[1], expected_count_per_shard_for_host_1)

        # And when moving tablets from one shard of src_host to (dest_host, shard_3)
        shard_id_to_move = get_shard_that_has_tablets(expected_count_per_shard_for_host_0)
        if shard_id_to_move != -1:
            src_server = servers[0]
            dest_server = servers[1]
            src_expected_count_per_shard = expected_count_per_shard_for_host_0
            dest_expected_count_per_shard = expected_count_per_shard_for_host_1
        else:
            shard_id_to_move = get_shard_that_has_tablets(expected_count_per_shard_for_host_1)
            src_server = servers[1]
            dest_server = servers[0]
            src_expected_count_per_shard = expected_count_per_shard_for_host_1
            dest_expected_count_per_shard = expected_count_per_shard_for_host_0

        tokens_on_shard_to_move = {
            "mytable1": await get_tablet_tokens_from_host_on_shard(manager, src_server, ks, "mytable1", shard_id_to_move),
            "mytable3": await get_tablet_tokens_from_host_on_shard(manager, src_server, ks, "mytable3", shard_id_to_move)
        }

        count_of_tokens_on_src_shard_to_move = len(tokens_on_shard_to_move["mytable1"]) + len(tokens_on_shard_to_move["mytable3"])
        assert count_of_tokens_on_src_shard_to_move > 0

        src_host_id = await manager.get_host_id(src_server.server_id)
        dest_host_id = await manager.get_host_id(dest_server.server_id)
        for table_name, tokens in tokens_on_shard_to_move.items():
            for token in tokens:
                await manager.api.move_tablet(node_ip=src_server.ip_addr, ks=ks, table=table_name, src_host=src_host_id, src_shard=shard_id_to_move, dst_host=dest_host_id, dst_shard=3, token=token)

        # And when ensuring that local tablet metadata on the queried node reflects the finalized tablet movement
        await read_barrier(manager.api, servers[0].ip_addr)
        await read_barrier(manager.api, servers[1].ip_addr)

        # Then the tablet count metric is adjusted to depict that situation on src_host - all tablets from the selected shard have been moved
        src_expected_count_per_shard[shard_id_to_move] = 0
        await assert_tablet_count_metric_value_for_shards(manager, src_server, src_expected_count_per_shard)

        # And then the tablet count metric is increased on dest_host - tablets have been moved to shard_3
        dest_expected_count_per_shard[3] += count_of_tokens_on_src_shard_to_move
        await assert_tablet_count_metric_value_for_shards(manager, dest_server, dest_expected_count_per_shard)

@pytest.mark.asyncio
@pytest.mark.parametrize("primary_replica_only", [False, True])
async def test_tablet_load_and_stream(manager: ManagerClient, primary_replica_only):
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_service=debug',
        '--logger-log-level', 'table=debug',
        '--smp', '1',
    ]
    servers = [await manager.server_add(cmdline=cmdline)]

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()

    async def create_table(tablet_count: int) -> str:
        # Creates multiple tablets in the same shard
        ks_name = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}" \
                                                      f" AND tablets = {{ 'initial': {tablet_count} }};")
        await cql.run_async(f"CREATE TABLE {ks_name}.test (pk int PRIMARY KEY, c int);")
        return ks_name

    ks = await create_table(5)  # 5 is rounded up to the next power of two

    # Populate tablets
    keys = range(256)
    await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])

    async def check(ks_name: str):
        logger.info("Checking table")
        cql = manager.get_cql()
        rows = await cql.run_async(f"SELECT * FROM {ks_name}.test BYPASS CACHE;")
        assert len(rows) == len(keys)
        for r in rows:
            assert r.c == r.pk

    await manager.api.flush_keyspace(servers[0].ip_addr, ks)
    await check(ks)

    node_workdir = await manager.server_get_workdir(servers[0].server_id)

    ks2 = await create_table(16)

    cql = await safe_server_stop_gracefully(manager, servers[0].server_id)

    table_dir = glob.glob(os.path.join(node_workdir, "data", ks, "test-*"))[0]
    logger.info(f"Table dir: {table_dir}")

    dst_table_dir = glob.glob(os.path.join(node_workdir, "data", ks2, "test-*"))[0]
    logger.info(f"Dst table dir: {dst_table_dir}")

    def move_sstables_to_upload(table_dir: str, dst_table_dir: str):
        logger.info("Moving sstables to upload dir of destination table")
        table_upload_dir = os.path.join(dst_table_dir, "upload")
        moved_files = []
        for sst in glob.glob(os.path.join(table_dir, "*-Data.db")):
            for src_path in glob.glob(os.path.join(table_dir, sst.removesuffix("-Data.db") + "*")):
                dst_path = os.path.join(table_upload_dir, os.path.basename(src_path))
                logger.info(f"Moving sstable file {src_path} to {dst_path}")
                os.rename(src_path, dst_path)
                moved_files.append(dst_path)
        return moved_files

    moved_sstable_files = move_sstables_to_upload(table_dir, dst_table_dir)

    await manager.server_start(servers[0].server_id)
    cql = manager.get_cql()
    await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;")
    assert len(rows) == 0

    await manager.disable_tablet_balancing()

    logger.info("Adding new server")
    servers.append(await manager.server_add(cmdline=cmdline))

    # Trigger concurrent migration and load-and-stream

    await manager.enable_tablet_balancing()

    await manager.api.load_new_sstables(servers[0].ip_addr, ks2, "test", primary_replica_only)

    await asyncio.sleep(1)

    await check(ks2)

    logger.info("Checking that streamed SSTables are deleted from upload directory")
    remaining_files = []
    for file_path in moved_sstable_files:
        if os.path.exists(file_path):
            logger.info(f"SSTable file still exists: {file_path}")
            remaining_files.append(file_path)

    if remaining_files:
        raise AssertionError(f"SSTable files were not deleted after load_and_stream: {remaining_files}")

    logger.info("All SSTable files successfully deleted after streaming")

    await asyncio.gather(*[cql.run_async(f"drop keyspace {i}") for i in [ks, ks2]])

@pytest.mark.asyncio
async def test_storage_service_api_uneven_ownership_keyspace_and_table_params_used(manager: ManagerClient):
    # Given two running servers
    shards_count = 4
    cmdline = ['--smp=4']
    servers = await manager.servers_add(2, cmdline=cmdline)

    # When a table is created with initial tablets set to 1
    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.mytable1 (col1 timestamp, col2 text, col3 blob, PRIMARY KEY (col1));")

        # And when ownership for this table is queried
        actual_ownerships = await manager.api.get_ownership(servers[0].ip_addr, ks, "mytable1")

        # Then ensure that the returned ownerships are 0.0 and 1.0 (which node gets 0.0 and which gets 1.0 is unspecified)
        expected_ips = {servers[0].ip_addr, servers[1].ip_addr}
        expected_ownerships = [0.0, 1.0]
        delta = 0.0001
        already_verified = set()

        sorted_actual_ownerships = sorted(actual_ownerships, key=lambda e: float(e["value"]))
        assert len(sorted_actual_ownerships) == len(expected_ownerships)

        for i in range(0, len(sorted_actual_ownerships)):
            entry = sorted_actual_ownerships[i]
            actual_ip = entry["key"]
            actual_ownership = float(entry["value"])

            assert actual_ip in expected_ips
            assert actual_ip not in already_verified
            assert actual_ownership == pytest.approx(expected_ownerships[i], abs=delta)

            already_verified.add(actual_ip)

@pytest.mark.asyncio
async def test_tablet_storage_freeing(manager: ManagerClient):
    logger.info("Start first node")
    servers = [await manager.server_add()]
    await manager.disable_tablet_balancing()
    cql = manager.get_cql()
    await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    logger.info("Create a table with two tablets and populate it with a moderate amount of data.")
    n_tablets = 2
    n_partitions = 1000
    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': {n_tablets}}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, v text) WITH compression = {{'sstable_compression': ''}};")
        insert_stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, v) VALUES (?, ?);")
        payload = "a" * 10000

        max_concurrency = 100
        for batch in itertools.batched(range(n_partitions), max_concurrency):
            await asyncio.gather(*[cql.run_async(insert_stmt, [k, payload]) for k in batch])
        await manager.api.keyspace_flush(servers[0].ip_addr, ks)

        logger.info("Start second node.")
        servers.append(await manager.server_add())
        s1_host_id = await manager.get_host_id(servers[1].server_id)

        logger.info("Check the table's disk usage on first node.")
        size_before = await manager.server_get_sstables_disk_usage(servers[0].server_id, ks, "test")
        assert size_before > n_partitions * len(payload)

        logger.info("Read system.tablets.")
        tablet_replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        assert len(tablet_replicas) == n_tablets

        logger.info("Migrate one of the two tablets from the first node to the second node.")
        t = tablet_replicas[0]
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", *t.replicas[0], *(s1_host_id, 0), t.last_token)

        logger.info("Verify that the table's disk usage on first node shrunk by about half.")
        size_after = await manager.server_get_sstables_disk_usage(servers[0].server_id, ks, "test")
        assert size_before * 0.33 < size_after < size_before * 0.66

@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_schema_change_during_cleanup(manager: ManagerClient):
    logger.info("Start first node")
    servers = [await manager.server_add()]
    await manager.disable_tablet_balancing()
    cql = manager.get_cql()
    await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        logger.info("Populating table")

        keys = range(256)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])

        s1_log = await manager.server_open_log(servers[0].server_id)
        s1_mark = await s1_log.mark()

        logger.info("Start second node.")
        servers.append(await manager.server_add())
        s1_host_id = await manager.get_host_id(servers[1].server_id)

        await inject_error_on(manager, "delay_tablet_compaction_groups_cleanup", servers)

        logger.info("Read system.tablets.")
        tablet_replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        assert len(tablet_replicas) == 1

        logger.info("Migrating one tablet to another node.")
        t = tablet_replicas[0]
        migration_task = asyncio.create_task(
            manager.api.move_tablet(servers[0].ip_addr, ks, "test", *t.replicas[0], *(s1_host_id, 0), t.last_token))

        logger.info("Waiting for log")
        await s1_log.wait_for('Initiating tablet cleanup of', from_mark=s1_mark, timeout=120)
        # Let cleanup get underway before changing the schema; use asyncio.sleep
        # so the event loop (and the migration task) keeps running.
        await asyncio.sleep(1)
        await cql.run_async(f"ALTER TABLE {ks}.test WITH gc_grace_seconds = 0;")
        await migration_task

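# Tombstones that expire while a tablet split is rewriting sstables must not
# allow shadowed data to be resurrected once the split finalizes.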
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tombstone_gc_correctness_during_tablet_split(manager: ManagerClient):
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_service=debug',
        '--logger-log-level', 'table=debug',
        '--target-tablet-size-in-bytes', '5000',
    ]
    servers = [await manager.server_add(config={
        'tablet_load_stats_refresh_interval_in_seconds': 1
    }, cmdline=cmdline)]

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH gc_grace_seconds=0;")

        await manager.api.disable_autocompaction(servers[0].ip_addr, ks)

        keys = range(100)

        logger.info("Generating sstable with shadowed data")
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
        await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        logger.info("Generating another sstable with tombstones")
        await asyncio.gather(*[cql.run_async(f"DELETE FROM {ks}.test WHERE pk={k};") for k in keys])
        await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        async def assert_empty_table():
            cql = manager.get_cql()
            rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;")
            assert len(rows) == 0

        await assert_empty_table()

        await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        assert tablet_count == 1

        await manager.api.enable_injection(servers[0].ip_addr, "tablet_load_stats_refresh_before_rebalancing", one_shot=False)
        await manager.api.enable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone", one_shot=False)

        s1_log = await manager.server_open_log(servers[0].server_id)
        s1_mark = await s1_log.mark()

        # Wait for the tombstones to expire (gc_grace_seconds is 0).
        await asyncio.sleep(1)

        await manager.api.enable_injection(servers[0].ip_addr, "split_sstable_rewrite", one_shot=False)

        logger.info("Enable balancing so split will be emitted")
        await manager.enable_tablet_balancing()

        logger.info("Waiting for split of sstable containing expired tombstones")
        await s1_log.wait_for("split_sstable_rewrite: waiting", from_mark=s1_mark)
        s1_mark = await s1_log.mark()
        await manager.api.message_injection(servers[0].ip_addr, "split_sstable_rewrite")
        await s1_log.wait_for("split_sstable_rewrite: released", from_mark=s1_mark)

        logger.info("Pause split of sstable containing deleted data")
        await s1_log.wait_for("split_sstable_rewrite: waiting", from_mark=s1_mark)
        s1_mark = await s1_log.mark()

        logger.info("Force compaction of split sstable containing expired tombstone")
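        # Stop the ongoing split compaction and kick major compaction off as a
        # background task. It is awaited only after the split_sstable_rewrite
        # injection is disabled below; awaiting it here would block forever,
        # since major compaction cannot make progress while the split rewrite
        # is still parked on that injection.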
        await manager.api.stop_compaction(servers[0].ip_addr, "SPLIT")
        compaction_task = asyncio.create_task(manager.api.keyspace_compaction(servers[0].ip_addr, ks))

        await manager.api.disable_injection(servers[0].ip_addr, "split_sstable_rewrite")
        await s1_log.wait_for("split_sstable_rewrite: released", from_mark=s1_mark)
        await compaction_task

        await manager.api.disable_injection(servers[0].ip_addr, "tablet_split_finalization_postpone")
        await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)

        tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
        assert tablet_count > 1

        logger.info("Verify data is not resurrected")
        await assert_empty_table()

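# Helper: builds a cluster of num_dcs DCs, each with num_racks racks holding
# nodes_per_rack nodes, and returns the created servers keyed by server_id.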
async def create_cluster(manager: ManagerClient, num_dcs: int, num_racks: int, nodes_per_rack: int, config: dict[str, Any] = None) -> dict[ServerNum, ServerInfo]:
    logger.debug(f"Creating cluster: num_dcs={num_dcs} num_racks={num_racks} nodes_per_rack={nodes_per_rack}")
    servers: dict[ServerNum, ServerInfo] = dict()
    for dc in range(1, num_dcs + 1):
        for rack in range(1, num_racks + 1):
            rack_servers = await manager.servers_add(nodes_per_rack, config=config, property_file={"dc": f"dc{dc}", "rack": f"rack{rack}"})
            for s in rack_servers:
                servers[s.server_id] = s
    logger.debug(f"Created servers={list(servers.values())}")
    return servers


class Context:
    def __init__(self, ks: str, table: str, rf: int, initial_tablets: int, num_keys: int):
        self.ks = ks
        self.table = table
        self.rf = rf
        self.initial_tablets = initial_tablets
        self.num_keys = num_keys

@asynccontextmanager
async def create_and_populate_table(manager: ManagerClient, rf: int = 3, initial_tablets: int = 64, num_keys: int = 0):
    ks = ""
    table = unique_name()
    if not num_keys:
        num_keys = initial_tablets * 4

    logger.info(f"Creating table and populating data {ks}.{table}: rf={rf} initial_tablets={initial_tablets} num_keys={num_keys}")

    cql = manager.get_cql()
    try:
        ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}} AND tablets = {{'initial': {initial_tablets}}}")
        await cql.run_async(f"CREATE TABLE {ks}.{table} (pk int PRIMARY KEY, c int)")
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.{table} (pk, c) VALUES ({k}, 1);") for k in range(num_keys)])
        yield Context(ks, table, rf, initial_tablets, num_keys)
    finally:
        # Only drop the keyspace if it was actually created.
        if ks:
            await cql.run_async(f"DROP KEYSPACE {ks}")

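# Expected distribution: replicas are spread evenly across racks, then evenly
# across the nodes within each rack; decommissioned (dead) servers hold none.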
def get_expected_replicas_per_server(live_servers: Iterable[ServerInfo], dead_servers: Iterable[ServerInfo], initial_tablets: int, rf: int) -> dict[ServerNum, float]:
    total_replicas = initial_tablets * rf
    nodes_per_rack: defaultdict[str, set[ServerNum]] = defaultdict(set)
    for s in live_servers:
        nodes_per_rack[s.rack].add(s.server_id)
    replicas_per_rack = total_replicas / len(nodes_per_rack.keys())
    result: dict[ServerNum, float] = dict()
    for rack_servers in nodes_per_rack.values():
        replicas_per_node = replicas_per_rack / len(rack_servers)
        for server_id in rack_servers:
            result[server_id] = replicas_per_node
    for s in dead_servers:
        result[s.server_id] = 0
    return result


def verify_replicas_per_server(desc: str, expected_replicas_per_server: dict[ServerNum, float], tablet_count: dict[ServerNum, list[int]], initial_tablets: int, rf: int, shards_per_node: int = 2):
    logger.debug(f"{desc}: expected_replicas_per_server={expected_replicas_per_server} tablet_count={tablet_count}")
    total = 0
    for server_id, host_replicas in tablet_count.items():
        expected_replicas_per_shard = expected_replicas_per_server[server_id] / shards_per_node
        for i in host_replicas:
            total += i
            assert abs(i - expected_replicas_per_shard) <= 1
    assert total == initial_tablets * rf

@pytest.mark.asyncio
async def test_decommission_rack_basic(manager: ManagerClient):
    """
    Test decommissioning of all nodes in a rack
    when there are enough remaining racks to satisfy
    the replication factor.
    """
    logger.info("Bootstrapping multi-rack cluster")
    num_racks = 3
    nodes_per_rack = 2
    rf = num_racks - 1

    # We need to disable this option to be able to create a keyspace. This can be ditched
    # once we've implemented scylladb/scylladb#23426 and we can add new racks with the option enabled.
    # Then we can create `rf` nodes, create the keyspace, and add another node.
    config = {"rf_rack_valid_keyspaces": False}

    all_servers = await create_cluster(manager, 1, num_racks, nodes_per_rack, config)
    async with create_and_populate_table(manager, rf=rf) as ctx:
        logger.info("Verify tablet replicas distribution")
        tables = {ctx.ks: [ctx.table]}
        expected_replicas_per_server = get_expected_replicas_per_server(all_servers.values(), [], ctx.initial_tablets, ctx.rf)
        tablet_count = await get_tablet_count_per_shard_for_hosts(manager, all_servers.values(), tables)
        verify_replicas_per_server("Before decommission", expected_replicas_per_server, tablet_count, ctx.initial_tablets, ctx.rf)

        decommission_rack = f"rack{num_racks}"
        logger.debug(f"Decommissioning rack={decommission_rack}")
        live_servers: dict[ServerNum, ServerInfo] = dict()
        dead_servers: dict[ServerNum, ServerInfo] = dict()
        for server_id, s in all_servers.items():
            if s.rack == decommission_rack:
                logger.debug(f"Decommissioning server={s}")
                await manager.decommission_node(s.server_id)
                dead_servers[server_id] = s
            else:
                live_servers[server_id] = s

        logger.info("Verify tablet replicas distribution")
        expected_replicas_per_server = get_expected_replicas_per_server(live_servers.values(), dead_servers.values(), ctx.initial_tablets, ctx.rf)
        tablet_count = await get_tablet_count_per_shard_for_hosts(manager, live_servers.values(), tables)
        verify_replicas_per_server("After decommission", expected_replicas_per_server, tablet_count, ctx.initial_tablets, ctx.rf)

@pytest.mark.asyncio
async def test_decommission_rack_after_adding_new_rack(manager: ManagerClient):
    """
    Test decommissioning a rack, after a rack with new nodes is added
    """
    logger.info("Bootstrapping multi-rack cluster")
    initial_num_racks = 3
    num_racks = initial_num_racks + 1
    nodes_per_rack = 2
    rf = initial_num_racks

    # We can't add a new rack if we create a keyspace.
    # Once scylladb/scylladb#23426 has been implemented, this can be ditched.
    config = {"rf_rack_valid_keyspaces": False}

    initial_servers = await create_cluster(manager, 1, initial_num_racks, nodes_per_rack, config)
    async with create_and_populate_table(manager, rf=rf) as ctx:
        logger.debug("Temporarily disable tablet load balancing")
        await manager.disable_tablet_balancing()

        logger.info("Add a new rack")
        new_rack = f"rack{num_racks}"
        # copy initial_servers into all_servers, don't just assign it (by reference)
        all_servers: list[ServerInfo] = list(initial_servers.values())
        new_rack_servers = await manager.servers_add(nodes_per_rack, config=config, property_file={"dc": "dc1", "rack": new_rack})
        all_servers.extend(new_rack_servers)

        logger.info("Verify tablet replicas distribution")
        tables = {ctx.ks: [ctx.table]}
        # Balancing is disabled, so the newly added rack is expected to hold no replicas yet.
        expected_replicas_per_server = get_expected_replicas_per_server(initial_servers.values(), new_rack_servers, ctx.initial_tablets, ctx.rf)
        tablet_count = await get_tablet_count_per_shard_for_hosts(manager, all_servers, tables)
        verify_replicas_per_server("Before decommission", expected_replicas_per_server, tablet_count, ctx.initial_tablets, ctx.rf)

        logger.debug("Reenable tablet load balancing")
        await manager.enable_tablet_balancing()

        live_servers: dict[ServerNum, ServerInfo] = dict()
        dead_servers: dict[ServerNum, ServerInfo] = dict()
        decommission_rack = f"rack{initial_num_racks}"
        logger.debug(f"Decommissioning rack={decommission_rack}")
        for s in all_servers:
            if s.rack == decommission_rack:
                logger.debug(f"Decommissioning server={s}")
                await manager.decommission_node(s.server_id)
                dead_servers[s.server_id] = s
            else:
                live_servers[s.server_id] = s

        logger.info("Verify tablet replicas distribution")
        expected_replicas_per_server = get_expected_replicas_per_server(live_servers.values(), dead_servers.values(), ctx.initial_tablets, ctx.rf)
        tablet_count = await get_tablet_count_per_shard_for_hosts(manager, all_servers, tables)
        verify_replicas_per_server("After decommission", expected_replicas_per_server, tablet_count, ctx.initial_tablets, ctx.rf)

@pytest.mark.asyncio
async def test_decommission_not_enough_racks(manager: ManagerClient):
    """
    Test that decommissioning a rack fails if the number of racks is
    insufficient to satisfy the replication factor, even if the number of
    nodes is sufficient.
    Reproduces https://github.com/scylladb/scylladb/issues/19475
    """
    logger.info("Bootstrapping multi-rack cluster")
    num_racks = 3
    nodes_per_rack = 2
    rf = num_racks

    all_servers = await create_cluster(manager, 1, num_racks, nodes_per_rack)
    async with create_and_populate_table(manager, rf=rf) as ctx:
        logger.info("Verify tablet replicas distribution")
        tables = {ctx.ks: [ctx.table]}
        expected_replicas_per_server = get_expected_replicas_per_server(all_servers.values(), [], ctx.initial_tablets, ctx.rf)
        tablet_count = await get_tablet_count_per_shard_for_hosts(manager, all_servers.values(), tables)
        verify_replicas_per_server("Before decommission", expected_replicas_per_server, tablet_count, ctx.initial_tablets, ctx.rf)

        live_servers: dict[ServerNum, ServerInfo] = dict()
        dead_servers: dict[ServerNum, ServerInfo] = dict()
        decommission_rack = f"rack{num_racks}"
        decommission_count = 0
        for s in all_servers.values():
            if s.rack == decommission_rack:
                logger.debug(f"Decommissioning server={s}")
                decommission_count += 1
                # Decommissioning the last node of the rack would leave fewer
                # racks than the replication factor requires, so it must fail.
                expected_error = "Decommission failed" if decommission_count == nodes_per_rack else None
                await manager.decommission_node(s.server_id, expected_error=expected_error)
                if not expected_error:
                    dead_servers[s.server_id] = s
                else:
                    live_servers[s.server_id] = s
            else:
                live_servers[s.server_id] = s

        logger.info("Verify tablet replicas distribution")
        expected_replicas_per_server = get_expected_replicas_per_server(live_servers.values(), dead_servers.values(), ctx.initial_tablets, ctx.rf)
        tablet_count = await get_tablet_count_per_shard_for_hosts(manager, all_servers.values(), tables)
        verify_replicas_per_server("After decommission", expected_replicas_per_server, tablet_count, ctx.initial_tablets, ctx.rf)

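# Reproduces a race between a tablet cleanup failure (injected after sstable
# deletion) and taking a snapshot of the table.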
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_cleanup_vs_snapshot_race(manager: ManagerClient):
    cmdline = ['--smp=1']

    servers = [await manager.server_add(cmdline=cmdline)]
    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    n_tablets = 1
    n_partitions = 1000
    await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
    await manager.servers_see_each_other(servers)
    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': {n_tablets}}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY);")
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk) VALUES ({k});") for k in range(n_partitions)])

        await inject_error_one_shot_on(manager, "tablet_cleanup_failure_post_deletion", servers)

        s0_log = await manager.server_open_log(servers[0].server_id)
        s0_mark = await s0_log.mark()

        servers.append(await manager.server_add())

        tablet_token = 0
        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
        s1_host_id = await manager.get_host_id(servers[1].server_id)
        dst_shard = 0
        migration_task = asyncio.create_task(
            manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token))

        logger.info("Waiting for injected cleanup failure...")
        await s0_log.wait_for('Cleanup failed for tablet', from_mark=s0_mark)

        await manager.api.take_snapshot(servers[0].ip_addr, ks, "test_snapshot")

        # The injected failure was one-shot, so cleanup is retried and the
        # migration eventually completes.
        await migration_task

# Reproduces assert failure when truncating table, either triggered by DROP TABLE or TRUNCATE.
# See: https://github.com/scylladb/scylladb/issues/18059
# It's achieved by migrating a tablet away that contains the highest replay position of a shard,
# so when drop/truncate happens, the highest replay position will be greater than all the data
# found in the table (includes data in memtable).
@pytest.mark.asyncio
@pytest.mark.parametrize("operation", ['DROP TABLE', 'TRUNCATE'])
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_drop_table_and_truncate_after_migration(manager: ManagerClient, operation):
    cmdline = ['--smp=2']
    cfg = {'auto_snapshot': True}
    servers = [await manager.server_add(cmdline=cmdline, config=cfg)]

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND TABLETS = {{'initial': 4}}")
    await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

    await manager.api.disable_autocompaction(servers[0].ip_addr, ks)

    keys = range(100)
    await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])

    tablet_replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
    tablet_replicas_in_s0 = list[TabletReplicas]()

    # Collect the tablets whose replica lives on shard 0.
    for replica in tablet_replicas:
        if replica.replicas[0][1] == 0:
            tablet_replicas_in_s0.append(replica)

    assert len(tablet_replicas_in_s0) == 2

    target_tablet = tablet_replicas_in_s0[0]

    s0_host_id = await manager.get_host_id(servers[0].server_id)

    logger.info("Migrating 1st tablet to shard 1")
    await manager.api.move_tablet(servers[0].ip_addr, ks, "test", *(s0_host_id, 0), *(s0_host_id, 1), target_tablet.last_token)

    await manager.api.enable_injection(servers[0].ip_addr, "truncate_disable_compaction_delay", one_shot=True)

    logger.info(f"Running {operation} {ks}.test")
    await cql.run_async(f"{operation} {ks}.test")

@pytest.mark.asyncio
@pytest.mark.nightly
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_truncate_during_topology_change(manager: ManagerClient):
    """Test truncate operation during topology change."""

    # Start 3 node cluster
    servers = await manager.servers_add(3, config={'enable_tablets': True}, auto_rack_dc="dc1")
    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (k int PRIMARY KEY, v int)")

        logger.info("Populating table")
        stmt = cql.prepare(f"INSERT INTO {ks}.test (k, v) VALUES (?, ?)")
        stmt.consistency_level = ConsistencyLevel.QUORUM
        await asyncio.gather(*(cql.run_async(stmt, [k, k]) for k in range(10000)))
        await manager.api.keyspace_flush(servers[0].ip_addr, ks, "test")

        async def truncate_table():
            # Give the bootstrap below time to start before truncating.
            await asyncio.sleep(10)
            logger.info("Executing truncate during bootstrap")
            await cql.run_async(SimpleStatement(f"TRUNCATE {ks}.test USING TIMEOUT 4m", retry_policy=FallthroughRetryPolicy()))

        truncate_task = asyncio.create_task(truncate_table())
        logger.info("Adding fourth node")
        new_server = await manager.server_add(config={'error_injections_at_startup': ['delay_bootstrap_120s'], 'enable_tablets': True},
                                              property_file=servers[0].property_file())
        await truncate_task

        # Wait for bootstrap completion
        await wait_for_cql_and_get_hosts(cql, servers + [new_server], time.time() + 60)

        rows = await cql.run_async(f"SELECT COUNT(*) FROM {ks}.test")
        assert rows[0].count == 0, "Table should be empty after truncation"

# Reproducer for https://github.com/scylladb/scylladb/issues/22040.
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_concurrent_schema_change_with_compaction_completion(manager: ManagerClient):
    cmdline = ['--smp=2']
    servers = [await manager.server_add(cmdline=cmdline)]

    await manager.api.enable_injection(servers[0].ip_addr, "sstable_list_builder_delay", one_shot=False)

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
        table = f"{ks}.test"
        await cql.run_async(f"CREATE TABLE {table} (a int PRIMARY KEY, b int);")

        stop_compaction = False

        async def background_compaction():
            # Keep major compactions running until the schema changes below are done.
            while not stop_compaction:
                await manager.api.keyspace_compaction(servers[0].ip_addr, ks)

        compaction_task = asyncio.create_task(background_compaction())

        for _ in range(5):
            dotestCreateAndDropIndex(cql, table, "CamelCase", False)
            dotestCreateAndDropIndex(cql, table, "CamelCase2", True)

        stop_compaction = True
        await compaction_task

        async def force_minor_compaction():
            for _ in range(4):
                await cql.run_async(f"INSERT INTO {ks}.test (a, b) VALUES (1, 1);")
                await manager.api.flush_keyspace(servers[0].ip_addr, ks)

        await cql.run_async(f"ALTER TABLE {table} WITH compaction = {{ 'class' : 'TimeWindowCompactionStrategy' }};")
        await force_minor_compaction()
        await cql.run_async(f"ALTER TABLE {table} WITH compaction = {{ 'class' : 'IncrementalCompactionStrategy' }};")
        await force_minor_compaction()

# This is a test and reproducer for https://github.com/scylladb/scylladb/issues/24153
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_split_correctness_on_tablet_count_change(manager: ManagerClient):
    logger.info('Bootstrapping cluster')
    cfg = { 'enable_tablets': True,
            'tablet_load_stats_refresh_interval_in_seconds': 1
          }
    cmdline = [
        '--logger-log-level', 'load_balancer=debug',
        '--logger-log-level', 'debug_error_injection=debug',
        '--smp', '1',  # single cpu is needed to prevent intra-node migration which interacts badly with injection splitting_mutation_writer_switch_wait.
    ]
    server = await manager.server_add(cmdline=cmdline, config=cfg)

    logger.info(f'server_id = {server.server_id}')

    cql = manager.get_cql()

    await manager.disable_tablet_balancing()

    initial_tablets = 2

    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")

        await manager.api.disable_autocompaction(server.ip_addr, ks, 'test')

        # insert data
        pks = range(256)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in pks])

        # flush the table
        await manager.api.flush_keyspace(server.ip_addr, ks)

        # force split on the test table
        expected_tablet_count = 4
        await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {expected_tablet_count}}}")

        log = await manager.server_open_log(server.server_id)
        log_mark = await log.mark()

        await manager.api.enable_injection(server.ip_addr, "splitting_mutation_writer_switch_wait", one_shot=True)
        await manager.enable_tablet_balancing()

        await log.wait_for('Emitting resize decision of type split', from_mark=log_mark)
        await log.wait_for('splitting_mutation_writer_switch_wait: waiting', from_mark=log_mark)

        # needs to skip update of repaired_at on merge, since it stops split task.
        await manager.api.enable_injection(server.ip_addr, "skip_update_repaired_at_for_merge", one_shot=False)
        await manager.api.enable_injection(server.ip_addr, "merge_completion_fiber", one_shot=True)

        expected_tablet_count = 1
        await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {expected_tablet_count}}}")

        # wait for merge to complete
        actual_tablet_count = 0
        started = time.time()
        while expected_tablet_count != actual_tablet_count:
            actual_tablet_count = await get_tablet_count(manager, server, ks, 'test')
            logger.debug(f'actual/expected tablet count: {actual_tablet_count}/{expected_tablet_count}')

            assert time.time() - started < 120, 'Timeout while waiting for tablet merge'
            await asyncio.sleep(0.1)

        logger.info(f'Merged test table; new number of tablets: {expected_tablet_count}')

        await manager.api.message_injection(server.ip_addr, "splitting_mutation_writer_switch_wait")
        await asyncio.sleep(0.1)
        await manager.api.message_injection(server.ip_addr, "merge_completion_fiber")

# Reproducer for https://github.com/scylladb/scylladb/issues/26041.
@pytest.mark.asyncio
@pytest.mark.parametrize("primary_replica_only", [False, True])
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_load_and_stream_and_split_synchronization(manager: ManagerClient, primary_replica_only):
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_service=debug',
        '--logger-log-level', 'table=debug',
    ]
    servers = [await manager.server_add(config={
        'tablet_load_stats_refresh_interval_in_seconds': 1
    }, cmdline=cmdline)]
    server = servers[0]

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()

    initial_tablets = 1

    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")

        keys = range(100)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])

        async def check(ks_name: str):
            logger.info("Checking table")
            cql = manager.get_cql()
            rows = await cql.run_async(f"SELECT * FROM {ks_name}.test BYPASS CACHE;")
            assert len(rows) == len(keys)
            for r in rows:
                assert r.c == r.pk

        await manager.api.flush_keyspace(servers[0].ip_addr, ks)
        await check(ks)

        node_workdir = await manager.server_get_workdir(servers[0].server_id)

        await safe_server_stop_gracefully(manager, servers[0].server_id)

        table_dir = glob.glob(os.path.join(node_workdir, "data", ks, "test-*"))[0]
        logger.info(f"Table dir: {table_dir}")

        def move_sstables_to_upload(table_dir: str):
            logger.info("Moving sstables to upload dir")
            table_upload_dir = os.path.join(table_dir, "upload")
            for sst in glob.glob(os.path.join(table_dir, "*-Data.db")):
                for src_path in glob.glob(os.path.join(table_dir, sst.removesuffix("-Data.db") + "*")):
                    dst_path = os.path.join(table_upload_dir, os.path.basename(src_path))
                    logger.info(f"Moving sstable file {src_path} to {dst_path}")
                    os.rename(src_path, dst_path)

        move_sstables_to_upload(table_dir)

        await manager.server_start(servers[0].server_id)
        cql = manager.get_cql()
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        # All sstables were moved to the upload dir, so the table starts out empty.
        rows = await cql.run_async(f"SELECT * FROM {ks}.test BYPASS CACHE;")
        assert len(rows) == 0

        await manager.disable_tablet_balancing()

        await manager.api.enable_injection(servers[0].ip_addr, "tablet_resize_finalization_post_barrier", one_shot=True)

        s1_log = await manager.server_open_log(servers[0].server_id)
        s1_mark = await s1_log.mark()

        await manager.enable_tablet_balancing()

        await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {initial_tablets * 2}}}")

        await s1_log.wait_for("tablet_resize_finalization_post_barrier: waiting", from_mark=s1_mark)

        await manager.api.enable_injection(servers[0].ip_addr, "stream_mutation_fragments", one_shot=True)

        load_and_stream_task = asyncio.create_task(manager.api.load_new_sstables(servers[0].ip_addr, ks, "test", primary_replica_only))
        await s1_log.wait_for("Loading new SSTables for keyspace", from_mark=s1_mark)

        await manager.api.message_injection(server.ip_addr, "tablet_resize_finalization_post_barrier")
        await s1_log.wait_for('Detected tablet split for table', from_mark=s1_mark)

        await s1_log.wait_for("stream_mutation_fragments: waiting", from_mark=s1_mark)
        await manager.api.message_injection(server.ip_addr, "stream_mutation_fragments")

        await load_and_stream_task

        await check(ks)

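# Verifies that the coordinator's cached load_stats picks up a tablet replica
# created by rebuild (triggered here by extending the RF to a second rack).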
@pytest.mark.asyncio
async def test_update_load_stats_after_rebuild(manager: ManagerClient):
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'raft_topology=debug',
    ]

    config = { 'tablet_load_stats_refresh_interval_in_seconds': 1 }
    servers = await manager.servers_add(2, config=config, cmdline=cmdline, property_file=[
        {"dc": "dc1", "rack": "rack1"},
        {"dc": "dc1", "rack": "rack2"},
    ])

    s0_host_id = await manager.get_host_id(servers[0].server_id)
    s1_host_id = await manager.get_host_id(servers[1].server_id)

    cql = manager.get_cql()

    async def get_tablet_sizes(table):
        return await cql.run_async(f"SELECT * FROM system.tablet_sizes WHERE table_id = {table}")

    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': ['rack1']}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': 1}};")

        table_id = await manager.get_table_or_view_id(ks, 'test')

        # Wait for the coordinator to refresh load_stats
        while True:
            rows = await get_tablet_sizes(table_id)
            if len(rows) == 1 and len(rows[0].replicas) == 1:
                replica_host = [str(u) for u in rows[0].replicas.keys()][0]
                if s0_host_id == replica_host:
                    break
            await asyncio.sleep(0.1)

        # Increase load_stats refresh so that it does not race with the
        # topology coordinator updating load_stats during end_migration
        await manager.server_update_config(servers[0].server_id, 'tablet_load_stats_refresh_interval_in_seconds', 3600)

        # Wait for the current load_stats refresh interval to elapse
        await asyncio.sleep(2)

        # Increase RF to trigger a tablet rebuild
        await cql.run_async(f"ALTER KEYSPACE {ks} WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': ['rack1', 'rack2']}}")

        # Check that the new tablet size was added to the host in rack2
        rows = await get_tablet_sizes(table_id)
        replica_hosts = set([str(u) for u in rows[0].replicas.keys()])

        assert len(replica_hosts) == 2
        assert s0_host_id in replica_hosts and s1_host_id in replica_hosts, "Tablet size should be added to load_stats after rebuild"

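# Same idea as above, but for migration: the coordinator's cached load_stats
# must move the tablet size from the leaving replica to the pending one.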
@pytest.mark.asyncio
async def test_update_load_stats_after_migration(manager: ManagerClient):
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'raft_topology=debug',
    ]

    config = { 'tablet_load_stats_refresh_interval_in_seconds': 1 }
    servers = await manager.servers_add(2, config=config, cmdline=cmdline, property_file=[
        {"dc": "dc1", "rack": "rack1"},
        {"dc": "dc1", "rack": "rack1"},
    ])

    s0_host_id = await manager.get_host_id(servers[0].server_id)
    s1_host_id = await manager.get_host_id(servers[1].server_id)

    cql = manager.get_cql()

    async def get_tablet_sizes(table):
        return await cql.run_async(f"SELECT * FROM system.tablet_sizes WHERE table_id = {table}")

    # Disable load balancing to avoid the balancer moving the tablet from a node with less to a node with more
    # available disk space. Otherwise, the move_tablet API can fail (if the tablet is already in transition) or
    # be a no-op (in case the tablet has already been migrated)
    await manager.disable_tablet_balancing()

    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': ['rack1']}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': 1}};")

        table_id = await manager.get_table_or_view_id(ks, 'test')

        # Wait for the coordinator to refresh load_stats
        while True:
            rows = await get_tablet_sizes(table_id)
            if len(rows) == 1:
                replica_hosts = set([str(u) for u in rows[0].replicas.keys()])
                if s0_host_id in replica_hosts or s1_host_id in replica_hosts:
                    break
            await asyncio.sleep(0.1)

        # Increase load_stats refresh so that it does not race with the
        # topology coordinator updating load_stats during end_migration
        await manager.server_update_config(servers[0].server_id, 'tablet_load_stats_refresh_interval_in_seconds', 3600)

        # Wait for the current load_stats refresh interval to elapse
        await asyncio.sleep(2)

        # Migrate a tablet between nodes in rack1
        replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
        leaving_replica = replicas[0].replicas[0]
        pending_replica = (s0_host_id if leaving_replica[0] == s1_host_id else s1_host_id, 0)
        await manager.api.move_tablet(servers[0].ip_addr, ks, "test", *leaving_replica, *pending_replica, 0)

        # Check that the tablet size moved from the leaving to the pending host
        rows = await get_tablet_sizes(table_id)
        replica_hosts = set([str(u) for u in rows[0].replicas.keys()])

        logger.info(f'replica_hosts: {replica_hosts}')
        assert leaving_replica[0] not in replica_hosts, "Leaving replica tablet size should not be in load_stats any more"
        assert pending_replica[0] in replica_hosts, "Pending replica tablet size should be in load_stats"

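# The topology coordinator must tolerate a table that is still present in its
# cached load_stats but has already been dropped.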
@pytest.mark.asyncio
@pytest.mark.skip_mode('release', 'error injections are not supported in release mode')
async def test_crash_on_missing_table_from_load_stats(manager: ManagerClient):
    logger.info('Bootstrapping cluster')
    cfg = { 'enable_tablets': True,
            'tablet_load_stats_refresh_interval_in_seconds': 1
          }
    cmdline = [
        '--logger-log-level', 'load_balancer=debug',
        '--logger-log-level', 'raft_topology=debug',
        '--smp', '2',
    ]
    servers = await manager.servers_add(2, config=cfg, cmdline=cmdline, property_file=[
        {"dc": "dc1", "rack": "rack1"},
        {"dc": "dc1", "rack": "rack1"},
    ])

    cql = manager.get_cql()

    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int)")

        # Make sure load_stats has been refreshed and that the coordinator has cached load_stats
        table_id = await manager.get_table_or_view_id(ks, 'test')
        await wait_for_valid_load_stats(cql, table_id)

        # Kill the non-coordinator node
        await manager.server_stop_gracefully(servers[1].server_id)

        # Drop the table; this leaves the table size in the cached load_stats on the coordinator
        await cql.run_async(f"DROP TABLE {ks}.test")

        # Wait for the next load_stats refresh
        s0_log = await manager.server_open_log(servers[0].server_id)
        s0_mark = await s0_log.mark()
        await s0_log.wait_for('raft topology: Refreshed table load stats for all DC', from_mark=s0_mark)

@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablets_barrier_waits_for_replica_erms(manager: ManagerClient):
    """
    The test verifies that tablet replicas hold ERMs while processing requests,
    and that the tablet's global barrier waits for all replicas to acknowledge it.
    To do this, the test starts a read request and makes it hang on the
    `replica_query_wait` injection, then initiates tablet migration. Finally, it
    checks that the tablet's global barrier waits for the replica handling that
    request to complete.
    """

    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'storage_service=debug',
        '--logger-log-level', 'raft_topology=debug',
        '--range-request-timeout-in-ms', '1000',  # shorten the time until the coordinator abandons the request, releasing the ERM
        '--read-request-timeout-in-ms', '1000',
        '--abort-on-internal-error', 'true',
    ]
    servers = [await manager.server_add(cmdline=cmdline)]

    await manager.disable_tablet_balancing()

    cql = manager.get_cql()
    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")

        servers.append(await manager.server_add(cmdline=cmdline))

        hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        key = 7  # Whatever
        tablet_token = 0  # Doesn't matter since there is one tablet
        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, 0)")
        rows = await cql.run_async(f"SELECT pk from {ks}.test")
        assert len(list(rows)) == 1

        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)

        s1_host_id = await manager.get_host_id(servers[1].server_id)
        dst_shard = 0

        s0_log = await manager.server_open_log(servers[0].server_id)
        s0_mark = await s0_log.mark()

        await manager.api.enable_injection(servers[0].ip_addr, "replica_query_wait", one_shot=False, parameters={"table": "test"})

        replica_query = cql.run_async(f"SELECT * from {ks}.test where pk={key} BYPASS CACHE", host=hosts[1])
        await s0_log.wait_for('replica_query_wait: waiting', from_mark=s0_mark)

        version_before_move = await get_topology_version(cql, hosts[0])

        s0_mark = await s0_log.mark()
        migration_task = asyncio.create_task(
            manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token))

        # migration should proceed once the replica query times out on the coordinator, causing it to be abandoned
        new_version = version_before_move + 1
        await s0_log.wait_for(re.escape(
            f"Got raft_topology_cmd::barrier_and_drain, version {new_version}, "
            f"current version {new_version}, "
            f"stale versions (version: use_count): {{{version_before_move}: 1}}"),
            from_mark=s0_mark)

        await manager.api.message_injection(servers[0].ip_addr, "replica_query_wait")
        await manager.api.disable_injection(servers[0].ip_addr, "replica_query_wait")

        # swallow the exception of the timed-out read.
        try:
            await replica_query
        except Exception:
            pass

        logger.info("Waiting for migration to finish")
        await migration_task
        logger.info("Migration done")

        rows = await cql.run_async(f"SELECT pk from {ks}.test")
        assert len(list(rows)) == 1

# This is a test and reproducer for https://github.com/scylladb/scylladb/issues/26041
@pytest.mark.asyncio
@pytest.mark.parametrize("repair_before_split", [False, True])
@pytest.mark.skip_mode('release', 'error injections are not supported in release mode')
async def test_split_and_incremental_repair_synchronization(manager: ManagerClient, repair_before_split: bool):
    logger.info('Bootstrapping cluster')
    cfg = { 'enable_tablets': True,
            'tablet_load_stats_refresh_interval_in_seconds': 1
          }
    cmdline = [
        '--logger-log-level', 'load_balancer=debug',
        '--logger-log-level', 'debug_error_injection=debug',
        '--logger-log-level', 'compaction=debug',
    ]
    servers = await manager.servers_add(2, cmdline=cmdline, config=cfg, auto_rack_dc="dc1")

    cql = manager.get_cql()

    await manager.disable_tablet_balancing()

    initial_tablets = 2

    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 2}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")

        # insert data
        pks = range(256)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in pks])

        # flush the table
        for server in servers:
            await manager.api.flush_keyspace(server.ip_addr, ks)

        s0_log = await manager.server_open_log(servers[0].server_id)
        s0_mark = await s0_log.mark()
        s1_log = await manager.server_open_log(servers[1].server_id)
        s1_mark = await s1_log.mark()
        expected_tablet_count = 4  # expected tablet count post split

        async def run_split_prepare():
            await manager.api.enable_injection(servers[0].ip_addr, 'tablet_resize_finalization_postpone', one_shot=False)

            # force split on the test table
            await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {expected_tablet_count}}}")

            await s0_log.wait_for('Finalizing resize decision for table', from_mark=s0_mark)

        async def generate_repair_work():
            insert_stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, c) VALUES (?, ?)")
            insert_stmt.consistency_level = ConsistencyLevel.ONE

            # Make writes fail on one replica so the repair has differences to reconcile.
            await manager.api.enable_injection(servers[0].ip_addr, "database_apply", one_shot=False, parameters={"ks_name": ks, "cf_name": "test", "what": "throw"})
            pks = range(256, 512)
            await asyncio.gather(*[cql.run_async(insert_stmt, (k, k)) for k in pks])
            await manager.api.disable_injection(servers[0].ip_addr, "database_apply")

        token = 'all'

        await manager.enable_tablet_balancing()

        if repair_before_split:
            await generate_repair_work()
            for server in servers:
                await manager.api.enable_injection(server.ip_addr, "incremental_repair_prepare_wait", one_shot=True)
            repair_task = asyncio.create_task(manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental'))
            await s0_log.wait_for('incremental_repair_prepare_wait: waiting', from_mark=s0_mark)
            await s1_log.wait_for('incremental_repair_prepare_wait: waiting', from_mark=s1_mark)

            await run_split_prepare()

            for server in servers:
                await manager.api.message_injection(server.ip_addr, "incremental_repair_prepare_wait")
            await repair_task
        else:
            await run_split_prepare()
            await generate_repair_work()
            await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')

        await manager.api.disable_injection(servers[0].ip_addr, "tablet_resize_finalization_postpone")

        async def finished_splitting():
            tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
            return tablet_count >= expected_tablet_count or None

        # Give enough time for split to happen in debug mode
        await wait_for(finished_splitting, time.time() + 120)

        await manager.server_stop(servers[0].server_id)
        await manager.server_start(servers[0].server_id)
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
        await manager.servers_see_each_other(servers)

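# Verifies that tablet split and intra-node migration stay in sync when a
# replica is moved back to a shard that has already acknowledged the split.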
@pytest.mark.asyncio
@pytest.mark.skip_mode('release', 'error injections are not supported in release mode')
async def test_split_and_intranode_synchronization(manager: ManagerClient):
    logger.info('Bootstrapping cluster')
    cfg = { 'enable_tablets': True,
            'tablet_load_stats_refresh_interval_in_seconds': 1
          }
    cmdline = [
        '--logger-log-level', 'load_balancer=debug',
        '--logger-log-level', 'debug_error_injection=debug',
        '--logger-log-level', 'compaction=debug',
        '--smp', '2',
    ]
    servers = await manager.servers_add(1, cmdline=cmdline, config=cfg)
    server = servers[0]

    cql = manager.get_cql()

    await manager.disable_tablet_balancing()

    initial_tablets = 1

    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")

        # insert data
        pks = range(256)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in pks])

        # flush the table
        await manager.api.flush_keyspace(server.ip_addr, ks)

        log = await manager.server_open_log(server.server_id)
        mark = await log.mark()

        tablet_token = 0  # Doesn't matter since there is one tablet
        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)

        host_id = await manager.get_host_id(server.server_id)
        src_shard = replica[1]

        # if tablet replica is at shard 0, move it to shard 1.
        if src_shard == 0:
            dst_shard = 1
            await manager.api.move_tablet(server.ip_addr, ks, "test", replica[0], src_shard, replica[0], dst_shard, tablet_token)

        await manager.enable_tablet_balancing()

        await manager.api.enable_injection(server.ip_addr, 'tablet_resize_finalization_postpone', one_shot=False)
        await manager.api.enable_injection(server.ip_addr, "split_sstable_force_stop_exception", one_shot=False)

        # force split on the test table
        expected_tablet_count = initial_tablets * 2
        await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {expected_tablet_count}}}")

        # Check that shard 0 ACKed split.
        mark, _ = await log.wait_for('Setting split ready sequence number to', from_mark=mark)

        # Move tablet replica back to shard 0, where split was already ACKed.
        src_shard = 1
        dst_shard = 0
        migration_task = asyncio.create_task(manager.api.move_tablet(server.ip_addr, ks, "test", replica[0], src_shard, replica[0], dst_shard, tablet_token))

        mark, _ = await log.wait_for("Finished intra-node streaming of tablet", from_mark=mark)

        await manager.api.stop_compaction(server.ip_addr, "SPLIT")

        await migration_task

        await manager.api.disable_injection(server.ip_addr, "tablet_resize_finalization_postpone")

        async def finished_splitting():
            tablet_count = await get_tablet_count(manager, server, ks, 'test')
            return tablet_count >= expected_tablet_count or None

        # Give enough time for split to happen in debug mode
        await wait_for(finished_splitting, time.time() + 120)

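# Verifies that an in-flight tablet split is stopped cleanly on shutdown and
# completes after the node restarts.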
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_split_stopped_on_shutdown(manager: ManagerClient):
    logger.info('Bootstrapping cluster')
    cfg = { 'enable_tablets': True,
            'tablet_load_stats_refresh_interval_in_seconds': 1
          }
    cmdline = [
        '--logger-log-level', 'debug_error_injection=debug',
        '--smp', '1',
    ]
    server = await manager.server_add(cmdline=cmdline, config=cfg)

    logger.info(f'server_id = {server.server_id}')

    cql = manager.get_cql()

    await manager.disable_tablet_balancing()

    initial_tablets = 2

    async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")

        await manager.api.disable_autocompaction(server.ip_addr, ks, 'test')

        # insert data
        pks = range(256)
        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in pks])

        # flush the table
        await manager.api.flush_keyspace(server.ip_addr, ks)

        # force split on the test table
        expected_tablet_count = 4
        await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {expected_tablet_count}}}")

        log = await manager.server_open_log(server.server_id)
        log_mark = await log.mark()

        await manager.api.enable_injection(server.ip_addr, "splitting_mutation_writer_switch_wait", one_shot=True)
        await manager.api.enable_injection(server.ip_addr, "storage_service_drain_wait", one_shot=True)
        await manager.enable_tablet_balancing()

        await log.wait_for('Emitting resize decision of type split', from_mark=log_mark)
        await log.wait_for('splitting_mutation_writer_switch_wait: waiting', from_mark=log_mark)

        log_mark = await log.mark()

        shutdown_task = asyncio.create_task(manager.server_stop_gracefully(server.server_id))

        await log.wait_for('Stopping.*ongoing compactions')
        await manager.api.message_injection(server.ip_addr, "splitting_mutation_writer_switch_wait")

        await log.wait_for('storage_service_drain_wait: waiting', from_mark=log_mark)
        await log.wait_for('Failed to complete splitting of table', from_mark=log_mark)

        await manager.api.message_injection(server.ip_addr, "storage_service_drain_wait")

        await shutdown_task

        # Shutdown must stop the split without logging any errors.
        errors = await log.grep_for_errors(from_mark=log_mark)
        assert errors == []

        await manager.server_start(server.server_id)
        await wait_for_cql_and_get_hosts(cql, [server], time.time() + 60)

        # The split resumes after restart and eventually completes.
        await log.wait_for('Detected tablet split for table', from_mark=log_mark)
        tablet_count = await get_tablet_count(manager, server, ks, 'test')
        assert tablet_count >= expected_tablet_count