backup and restore tests. This made the testing times explode, with both cluster/object_store/test_backup.py and cluster/test_refresh.py taking more than an hour each to complete under test.py and around 14min under pytest directly. This was painful especially in CI because it runs tests under test.py, which suffers from the issue of not being able to run test cases from within the same file in parallel (a fix is attempted in 27618). This patch reduces the dataset of these tests to the minimum and gets rid of one of the tested topologies as it was redundant. The test times are reduced to 2min under pytest and 14min under test.py. Signed-off-by: Robert Bindar <robert.bindar@scylladb.com> Closes scylladb/scylladb#28280
206 lines
8.9 KiB
Python
206 lines
8.9 KiB
Python
#
|
|
# Copyright (C) 2025-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
|
|
#!/usr/bin/env python3
|
|
|
|
import os
|
|
import logging
|
|
import asyncio
|
|
import pytest
|
|
import time
|
|
import random
|
|
import shutil
|
|
import uuid
|
|
from collections import defaultdict
|
|
|
|
from test.pylib.minio_server import MinioServer
|
|
from test.pylib.manager_client import ManagerClient
|
|
from test.cluster.object_store.conftest import format_tuples
|
|
from test.cluster.object_store.test_backup import topo, create_cluster, take_snapshot, create_dataset, check_data_is_back, do_load_sstables, mark_all_logs, check_mutation_replicas
|
|
from test.cluster.util import wait_for_cql_and_get_hosts
|
|
from test.pylib.rest_client import read_barrier
|
|
from test.pylib.util import unique_name
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@pytest.mark.asyncio
@pytest.mark.parametrize("topology_rf_validity", [
    (topo(rf = 1, nodes = 3, racks = 1, dcs = 1), True),
    (topo(rf = 3, nodes = 5, racks = 1, dcs = 1), False),
    (topo(rf = 1, nodes = 4, racks = 2, dcs = 1), True),
    (topo(rf = 3, nodes = 6, racks = 2, dcs = 1), False),
    (topo(rf = 2, nodes = 8, racks = 4, dcs = 2), True)
])
async def test_refresh_with_streaming_scopes(build_mode: str, manager: ManagerClient, topology_rf_validity):
    '''
    Check that refreshing a cluster with stream scopes works

    This test creates a cluster specified by the topology parameter above,
    configurable number of nodes, racks, datacenters, and replication factor.

    It creates a dataset, takes a snapshot and copies the sstables of all nodes to a temporary
    location. It then truncates the table so all sstables are gone, copies all the sstables into
    each node's upload directory, and refreshes the nodes given the scope passed as the test parameter.

    The test then performs two types of checks:
    1) Check that the data is back in the table by getting all mutations from the nodes and checking
    that a random sample of them contains the expected key and that they are replicated according to RF * DCS factor.
    2) Check that the streaming communication between nodes is as expected according to the scope parameter of the test.
    This stage parses the logs and checks that the data was streamed to nodes within the configured scope.
    '''

    topology, rf_rack_valid_keyspaces = topology_rf_validity

    servers, host_ids = await create_cluster(topology, rf_rack_valid_keyspaces, manager, logger)

    cql = manager.get_cql()

    # Keep the tablets map stable for the whole test so the streaming-scope
    # assertions on the logs are deterministic.
    await manager.disable_tablet_balancing()

    ks = 'ks'
    cf = 'cf'
    _, keys, _ = await create_dataset(manager, ks, cf, topology, logger, num_keys=10, min_tablet_count=5)

    # validate replicas assertions hold on fresh dataset
    await check_mutation_replicas(cql, manager, servers, keys, topology, logger, ks, cf, scope=None, primary_replica_only=False, expected_replicas = None)

    _, sstables = await take_snapshot(ks, servers, manager, logger)

    logger.info('Move sstables to tmp dir')
    tmpdir = f'tmpbackup-{str(uuid.uuid4())}'
    # Remember every backup dir created below so that all of them can be
    # cleaned up at the end of the test.  The original code removed only the
    # directory of the last server iterated, leaking the others whenever the
    # servers' workdirs have distinct parent directories.
    tmpbackups = []
    for s in servers:
        workdir = await manager.server_get_workdir(s.server_id)
        cf_dir = os.listdir(f'{workdir}/data/{ks}')[0]
        tmpbackup = os.path.join(workdir, f'../{tmpdir}')
        os.makedirs(tmpbackup, exist_ok=True)
        tmpbackups.append(tmpbackup)

        # The snapshot lives under <cf_dir>/snapshots/<tag>; there is exactly
        # one tag since take_snapshot() was called once.
        snapshots_dir = os.path.join(f'{workdir}/data/{ks}', cf_dir, 'snapshots')
        snapshots_dir = os.path.join(snapshots_dir, os.listdir(snapshots_dir)[0])
        exclude_list = ['manifest.json', 'schema.cql']

        # Copy only sstable components; manifest/schema are snapshot metadata.
        for item in os.listdir(snapshots_dir):
            src_path = os.path.join(snapshots_dir, item)
            dst_path = os.path.join(tmpbackup, item)
            if item not in exclude_list:
                shutil.copy2(src_path, dst_path)

    logger.info('Refresh')
    async def do_refresh(manager, logger, ks, cf, s, toc_names, scope, primary_replica_only, _prefix=None, _object_storage=None):
        # Get the list of toc_names that this node needs to load and find all sstables
        # that correspond to these toc_names, copy them to the upload directory and then
        # call refresh
        workdir = await manager.server_get_workdir(s.server_id)
        cf_dir = os.listdir(f'{workdir}/data/{ks}')[0]
        upload_dir = os.path.join(f'{workdir}/data/{ks}', cf_dir, 'upload')
        os.makedirs(upload_dir, exist_ok=True)
        tmpbackup = os.path.join(workdir, f'../{tmpdir}')
        for toc in toc_names:
            # All components of one sstable share the TOC's basename prefix.
            basename = toc.removesuffix('-TOC.txt')
            for item in os.listdir(tmpbackup):
                if item.startswith(basename):
                    src_path = os.path.join(tmpbackup, item)
                    dst_path = os.path.join(upload_dir, item)
                    shutil.copy2(src_path, dst_path)

        logger.info(f'Refresh {s.ip_addr} with {toc_names}, scope={scope}')
        await manager.api.load_new_sstables(s.ip_addr, ks, cf, scope=scope, primary_replica=primary_replica_only, load_and_stream=True)

    # Debug builds are slow, so exercise only a reduced set of scopes there.
    scopes = ['rack', 'dc'] if build_mode == 'debug' else ['all', 'dc', 'rack', 'node']
    for scope in scopes:
        # We can support rack-aware restore with rack lists, if we restore the rack-list per dc as it was at backup time.
        # Otherwise, with numeric replication_factor we'd pick arbitrary subset of the racks when the keyspace
        # is initially created and an arbitrary subset or the rack at restore time.
        if scope == 'rack' and topology.rf != topology.racks:
            logger.info(f'Skipping scope={scope} test since rf={topology.rf} != racks={topology.racks} and it cannot be supported with numeric replication_factor')
            continue
        # primary_replica=True makes no sense for node scope.
        pros = [False] if scope == 'node' else [False, True]
        for pro in pros:
            logger.info('Clear data by truncating, make sure the tablets map stays intact')
            cql.execute(f'TRUNCATE TABLE {ks}.{cf};')

            # Mark the logs so the streaming checks only look at lines
            # produced by this scope/pro iteration.
            log_marks = await mark_all_logs(manager, servers)

            await do_load_sstables(ks, cf, servers, topology, sstables, scope, manager, logger, primary_replica_only=pro, load_fn=do_refresh)

            await check_data_is_back(manager, logger, cql, ks, cf, keys, servers, topology, host_ids, scope, primary_replica_only=pro, log_marks=log_marks)

    # Remove every backup dir exactly once.  Dedupe on the normalized path:
    # if all servers' workdirs share a parent, the per-server paths collapse
    # to a single directory and removing it twice would raise.
    for path in {os.path.realpath(p) for p in tmpbackups}:
        shutil.rmtree(path, ignore_errors=True)
|
|
|
|
# NOTE(review): the sibling test above carries @pytest.mark.asyncio; added here
# for consistency (harmless if the suite runs with asyncio_mode=auto).
@pytest.mark.asyncio
async def test_refresh_deletes_uploaded_sstables(manager: ManagerClient):
    '''
    Check that refreshing a cluster deletes the sstable files from the upload directory after loading
    '''

    topology = topo(rf = 1, nodes = 2, racks = 1, dcs = 1)

    servers, host_ids = await create_cluster(topology, True, manager, logger)

    cql = manager.get_cql()

    # Keep the tablets map stable so the refresh targets stay deterministic.
    await manager.disable_tablet_balancing()

    ks = 'ks'
    cf = 'cf'
    _, keys, _ = await create_dataset(manager, ks, cf, topology, logger)

    _, sstables = await take_snapshot(ks, servers, manager, logger)

    # Per-server paths: workdir, table dir and temporary backup dir.
    dirs = defaultdict(dict)

    logger.info('Move sstables to tmp dir')
    tmpdir = f'tmpbackup-{str(uuid.uuid4())}'
    for s in servers:
        workdir = await manager.server_get_workdir(s.server_id)
        cf_dir = os.listdir(f'{workdir}/data/{ks}')[0]
        cf_dir = os.path.join(f'{workdir}/data/{ks}', cf_dir)
        tmpbackup = os.path.join(workdir, f'../{tmpdir}')
        dirs[s.server_id]["workdir"] = workdir
        dirs[s.server_id]["cf_dir"] = cf_dir
        dirs[s.server_id]["tmpbackup"] = tmpbackup
        os.makedirs(tmpbackup, exist_ok=True)

        # The snapshot lives under <cf_dir>/snapshots/<tag>; exactly one tag
        # exists since take_snapshot() was called once.
        snapshots_dir = os.path.join(cf_dir, 'snapshots')
        snapshots_dir = os.path.join(snapshots_dir, os.listdir(snapshots_dir)[0])
        exclude_list = ['manifest.json', 'schema.cql']

        # Copy only sstable components; manifest/schema are snapshot metadata.
        for item in os.listdir(snapshots_dir):
            src_path = os.path.join(snapshots_dir, item)
            dst_path = os.path.join(tmpbackup, item)
            if item not in exclude_list:
                shutil.copy2(src_path, dst_path)

    logger.info('Clear data by truncating')
    cql.execute(f'TRUNCATE TABLE {ks}.{cf};')

    # Shuffle which server's backup lands in which server's upload dir so the
    # refresh has to stream rows back to their proper owners.
    logger.info('Copy sstables to upload dir (with shuffling)')
    shuffled = list(range(len(servers)))
    random.shuffle(shuffled)
    for i, s in enumerate(servers):
        other = servers[shuffled[i]]
        cf_dir = dirs[other.server_id]["cf_dir"]
        tmpbackup = dirs[s.server_id]["tmpbackup"]
        shutil.copytree(tmpbackup, os.path.join(cf_dir, 'upload'), dirs_exist_ok=True)

    logger.info('Refresh')
    async def do_refresh(s, toc_names, scope):
        logger.info(f'Refresh {s.ip_addr} with {toc_names}, scope={scope}')
        await manager.api.load_new_sstables(s.ip_addr, ks, cf, scope=scope, load_and_stream=True)

    scope = 'rack'
    r_servers = servers

    await asyncio.gather(*(do_refresh(s, sstables, scope) for s in r_servers))

    await check_mutation_replicas(cql, manager, servers, keys, topology, logger, ks, cf)

    # The whole point of the test: refresh must have consumed (deleted) the
    # uploaded sstables.
    for s in r_servers:
        cf_dir = dirs[s.server_id]["cf_dir"]
        files = os.listdir(os.path.join(cf_dir, 'upload'))
        assert files == [], f'Upload dir not empty on server {s.server_id}: {files}'

    # Remove every server's backup dir, not just the one left in the loop
    # variable (the original deleted at most one dir).  Dedupe on normalized
    # path in case all workdirs share a parent and the paths collapse to one.
    for path in {os.path.realpath(d["tmpbackup"]) for d in dirs.values()}:
        shutil.rmtree(path, ignore_errors=True)
|