This method does two things: it checks that the data is indeed back, and it validates the streaming directions. The latter is not really about "data is back", so it is better to have it as an explicit, dedicated method.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
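A minimal sketch of the resulting call pattern in the test body, assuming the dedicated helpers check_mutation_replicas and check_streaming_directions imported from test.cluster.object_store.test_backup (as the file below does):

    # data correctness and streaming-direction validation are now two separate, explicit calls
    await check_mutation_replicas(cql, manager, servers, keys, topology, logger, ks, cf, scope, primary_replica_only=pro)
    await check_streaming_directions(logger, servers, topology, host_ids, scope, pro, log_marks)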
#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#

#!/usr/bin/env python3

import os
import logging
import asyncio
import pytest
import time
import random
import shutil
import uuid
from collections import defaultdict

from test.pylib.minio_server import MinioServer
from test.pylib.manager_client import ManagerClient
from test.cluster.object_store.conftest import format_tuples
from test.cluster.object_store.test_backup import topo, create_cluster, take_snapshot, create_dataset, do_load_sstables, mark_all_logs, check_mutation_replicas, check_streaming_directions
from test.cluster.util import wait_for_cql_and_get_hosts
from test.pylib.rest_client import read_barrier
from test.pylib.util import unique_name

logger = logging.getLogger(__name__)


@pytest.mark.asyncio
@pytest.mark.parametrize("topology_rf_validity", [
    (topo(rf = 1, nodes = 3, racks = 1, dcs = 1), True),
    (topo(rf = 3, nodes = 5, racks = 1, dcs = 1), False),
    (topo(rf = 1, nodes = 4, racks = 2, dcs = 1), True),
    (topo(rf = 3, nodes = 6, racks = 2, dcs = 1), False),
    (topo(rf = 2, nodes = 8, racks = 4, dcs = 2), True)
])
async def test_refresh_with_streaming_scopes(build_mode: str, manager: ManagerClient, topology_rf_validity):
    '''
    Check that refreshing a cluster with stream scopes works

    This test creates a cluster specified by the topology parameter above, with a
    configurable number of nodes, racks, datacenters, and replication factor.

    It creates a dataset, takes a snapshot and copies the sstables of all nodes to a temporary
    location. It then truncates the table so all sstables are gone, copies all the sstables into
    each node's upload directory, and refreshes the nodes given the scope passed as the test parameter.

    The test then performs two types of checks:
    1) Check that the data is back in the table by getting all mutations from the nodes and checking
       that a random sample of them contains the expected key and that they are replicated according to the RF * DCs factor.
    2) Check that the streaming communication between nodes is as expected according to the scope parameter of the test.
       This stage parses the logs and checks that the data was streamed to nodes within the configured scope.
    '''

    topology, rf_rack_valid_keyspaces = topology_rf_validity

    servers, host_ids = await create_cluster(topology, rf_rack_valid_keyspaces, manager, logger)

    cql = manager.get_cql()

    await manager.disable_tablet_balancing()

    ks = 'ks'
    cf = 'cf'
    _, keys, _ = await create_dataset(manager, ks, cf, topology, logger, num_keys=10, min_tablet_count=5)

    # validate that the replica assertions hold on the fresh dataset
    await check_mutation_replicas(cql, manager, servers, keys, topology, logger, ks, cf, scope=None, primary_replica_only=False, expected_replicas = None)

    _, sstables = await take_snapshot(ks, servers, manager, logger)

    logger.info(f'Move sstables to tmp dir')
    tmpdir = f'tmpbackup-{str(uuid.uuid4())}'
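    # Copy each node's snapshot sstables (except manifest.json and schema.cql)
    # into a per-node temporary backup directory next to the node's workdir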
    for s in servers:
        workdir = await manager.server_get_workdir(s.server_id)
        cf_dir = os.listdir(f'{workdir}/data/{ks}')[0]
        tmpbackup = os.path.join(workdir, f'../{tmpdir}')
        os.makedirs(tmpbackup, exist_ok=True)

        snapshots_dir = os.path.join(f'{workdir}/data/{ks}', cf_dir, 'snapshots')
        snapshots_dir = os.path.join(snapshots_dir, os.listdir(snapshots_dir)[0])
        exclude_list = ['manifest.json', 'schema.cql']

        for item in os.listdir(snapshots_dir):
            src_path = os.path.join(snapshots_dir, item)
            dst_path = os.path.join(tmpbackup, item)
            if item not in exclude_list:
                shutil.copy2(src_path, dst_path)

    logger.info(f'Refresh')
    async def do_refresh(manager, logger, ks, cf, s, toc_names, scope, primary_replica_only, _prefix=None, _object_storage=None):
        # Get the list of toc_names that this node needs to load, find all sstables
        # that correspond to these toc_names, copy them to the upload directory, and
        # then call refresh
        workdir = await manager.server_get_workdir(s.server_id)
        cf_dir = os.listdir(f'{workdir}/data/{ks}')[0]
        upload_dir = os.path.join(f'{workdir}/data/{ks}', cf_dir, 'upload')
        os.makedirs(upload_dir, exist_ok=True)
        tmpbackup = os.path.join(workdir, f'../{tmpdir}')
        for toc in toc_names:
            basename = toc.removesuffix('-TOC.txt')
            for item in os.listdir(tmpbackup):
                if item.startswith(basename):
                    src_path = os.path.join(tmpbackup, item)
                    dst_path = os.path.join(upload_dir, item)
                    shutil.copy2(src_path, dst_path)

        logger.info(f'Refresh {s.ip_addr} with {toc_names}, scope={scope}')
        await manager.api.load_new_sstables(s.ip_addr, ks, cf, scope=scope, primary_replica=primary_replica_only, load_and_stream=True)

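    # Exercise the supported streaming scopes (debug builds use a reduced set);
    # the scope controls which nodes the loaded data may be streamed to,
    # which is what check_streaming_directions validates below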
    scopes = ['rack', 'dc'] if build_mode == 'debug' else ['all', 'dc', 'rack', 'node']
    for scope in scopes:
        # We can support rack-aware restore with rack lists, if we restore the rack list per DC as it was at backup time.
        # Otherwise, with a numeric replication_factor we'd pick an arbitrary subset of the racks when the keyspace
        # is initially created and an arbitrary subset of the racks at restore time.
        if scope == 'rack' and topology.rf != topology.racks:
            logger.info(f'Skipping scope={scope} test since rf={topology.rf} != racks={topology.racks} and it cannot be supported with numeric replication_factor')
            continue
        pros = [False] if scope == 'node' else [False, True]
        for pro in pros:
            logger.info(f'Clear data by truncating, make sure the tablets map stays intact')
            cql.execute(f'TRUNCATE TABLE {ks}.{cf};')

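            # remember the current log positions so that check_streaming_directions
            # only inspects messages produced by this particular refresh round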
            log_marks = await mark_all_logs(manager, servers)

            await do_load_sstables(ks, cf, servers, topology, sstables, scope, manager, logger, primary_replica_only=pro, load_fn=do_refresh)

            await check_mutation_replicas(cql, manager, servers, keys, topology, logger, ks, cf, scope, primary_replica_only=pro)
            await check_streaming_directions(logger, servers, topology, host_ids, scope, pro, log_marks)

    shutil.rmtree(tmpbackup)

@pytest.mark.asyncio
async def test_refresh_deletes_uploaded_sstables(manager: ManagerClient):
    '''
    Check that refreshing a cluster deletes the sstable files from the upload directory after loading
    '''

    topology = topo(rf = 1, nodes = 2, racks = 1, dcs = 1)

    servers, host_ids = await create_cluster(topology, True, manager, logger)

    cql = manager.get_cql()

    await manager.disable_tablet_balancing()

    ks = 'ks'
    cf = 'cf'
    _, keys, _ = await create_dataset(manager, ks, cf, topology, logger)

    _, sstables = await take_snapshot(ks, servers, manager, logger)

    dirs = defaultdict(dict)

    logger.info(f'Move sstables to tmp dir')
    tmpdir = f'tmpbackup-{str(uuid.uuid4())}'
    for s in servers:
        workdir = await manager.server_get_workdir(s.server_id)
        cf_dir = os.listdir(f'{workdir}/data/{ks}')[0]
        cf_dir = os.path.join(f'{workdir}/data/{ks}', cf_dir)
        tmpbackup = os.path.join(workdir, f'../{tmpdir}')
        dirs[s.server_id]["workdir"] = workdir
        dirs[s.server_id]["cf_dir"] = cf_dir
        dirs[s.server_id]["tmpbackup"] = tmpbackup
        os.makedirs(tmpbackup, exist_ok=True)

        snapshots_dir = os.path.join(cf_dir, 'snapshots')
        snapshots_dir = os.path.join(snapshots_dir, os.listdir(snapshots_dir)[0])
        exclude_list = ['manifest.json', 'schema.cql']

        for item in os.listdir(snapshots_dir):
            src_path = os.path.join(snapshots_dir, item)
            dst_path = os.path.join(tmpbackup, item)
            if item not in exclude_list:
                shutil.copy2(src_path, dst_path)

    logger.info(f'Clear data by truncating')
    cql.execute(f'TRUNCATE TABLE {ks}.{cf};')

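    # Each node may receive the backup taken on a different node; load_and_stream
    # is expected to deliver the loaded data to the owning replicas anyway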
    logger.info(f'Copy sstables to upload dir (with shuffling)')
    shuffled = list(range(len(servers)))
    random.shuffle(shuffled)
    for i, s in enumerate(servers):
        other = servers[shuffled[i]]
        cf_dir = dirs[other.server_id]["cf_dir"]
        tmpbackup = dirs[s.server_id]["tmpbackup"]
        shutil.copytree(tmpbackup, os.path.join(cf_dir, 'upload'), dirs_exist_ok=True)

    logger.info(f'Refresh')
    async def do_refresh(s, toc_names, scope):
        logger.info(f'Refresh {s.ip_addr} with {toc_names}, scope={scope}')
        await manager.api.load_new_sstables(s.ip_addr, ks, cf, scope=scope, load_and_stream=True)

    scope = 'rack'
    r_servers = servers

    await asyncio.gather(*(do_refresh(s, sstables, scope) for s in r_servers))

    await check_mutation_replicas(cql, manager, servers, keys, topology, logger, ks, cf)

    for s in r_servers:
        cf_dir = dirs[s.server_id]["cf_dir"]
        files = os.listdir(os.path.join(cf_dir, 'upload'))
        assert files == [], f'Upload dir not empty on server {s.server_id}: {files}'

    shutil.rmtree(tmpbackup)