Files
scylladb/test/cluster/test_refresh.py
Pavel Emelyanov 61af7c8300 test: Re-sort comments around do_test_streaming_scopes()
The test description of the refresh test is very elaborate and it's
worth having it as the description of the streaming scopes test itself.
Callers of the helper can go with smaller descriptions.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
2026-03-10 10:00:09 +03:00

163 lines
6.6 KiB
Python

#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
#!/usr/bin/env python3
import os
import logging
import asyncio
import pytest
import time
import random
import shutil
import uuid
from collections import defaultdict
from test.pylib.minio_server import MinioServer
from test.pylib.manager_client import ManagerClient
from test.cluster.object_store.conftest import format_tuples
from test.cluster.object_store.test_backup import topo, create_cluster, take_snapshot, create_dataset, check_mutation_replicas, do_test_streaming_scopes
from test.cluster.util import wait_for_cql_and_get_hosts
from test.pylib.rest_client import read_barrier
from test.pylib.util import unique_name
logger = logging.getLogger(__name__)
class SSTablesOnLocalStorage:
    """Backup/restore helper that stages snapshot sstables on the local
    filesystem (in a directory next to each server's workdir) instead of
    in object storage.  Provides the save/restore interface expected by
    do_test_streaming_scopes().
    """

    def __init__(self):
        # Per-instance staging directory name; created lazily as a sibling
        # of each server's workdir on the first save_one() call.
        self.tmpdir = f'tmpbackup-{str(uuid.uuid4())}'
        # Kept for interface compatibility with object-store-backed helpers;
        # local staging needs no object store handle.
        self.object_storage = None

    async def save_one(self, manager, s, ks, cf):
        """Copy the sstable components of the (single) snapshot of the only
        table under keyspace `ks` on server `s` into the staging directory,
        skipping the snapshot's manifest and schema files."""
        workdir = await manager.server_get_workdir(s.server_id)
        table_dir = os.listdir(f'{workdir}/data/{ks}')[0]
        staging = os.path.join(workdir, f'../{self.tmpdir}')
        os.makedirs(staging, exist_ok=True)
        snap_root = os.path.join(f'{workdir}/data/{ks}', table_dir, 'snapshots')
        # Descend into the one snapshot that take_snapshot() created.
        snap_root = os.path.join(snap_root, os.listdir(snap_root)[0])
        skip = ['manifest.json', 'schema.cql']
        for name in os.listdir(snap_root):
            if name in skip:
                continue
            shutil.copy2(os.path.join(snap_root, name), os.path.join(staging, name))

    async def refresh_one(self, manager, s, ks, cf, toc_names, scope, primary_replica_only):
        """Stage the sstables named by `toc_names` into the table's upload
        directory on server `s`, then trigger a load-and-stream refresh with
        the given streaming `scope`."""
        workdir = await manager.server_get_workdir(s.server_id)
        table_dir = os.listdir(f'{workdir}/data/{ks}')[0]
        upload_dir = os.path.join(f'{workdir}/data/{ks}', table_dir, 'upload')
        os.makedirs(upload_dir, exist_ok=True)
        staging = os.path.join(workdir, f'../{self.tmpdir}')
        for toc in toc_names:
            # All components of one sstable share the TOC's basename prefix.
            stem = toc.removesuffix('-TOC.txt')
            for name in os.listdir(staging):
                if name.startswith(stem):
                    shutil.copy2(os.path.join(staging, name), os.path.join(upload_dir, name))
        logger.info(f'Refresh {s.ip_addr} with {toc_names}, scope={scope}')
        await manager.api.load_new_sstables(s.ip_addr, ks, cf, scope=scope, primary_replica=primary_replica_only, load_and_stream=True)

    async def save(self, manager, servers, snap_name, prefix, ks, cf, logger):
        """Back up every server's snapshot into staging, one at a time.
        (snap_name/prefix/logger are part of the common helper interface
        and unused for local staging.)"""
        for srv in servers:
            await self.save_one(manager, srv, ks, cf)

    async def restore(self, manager, sstables_per_server, prefix, ks, cf, scope, primary_replica_only, logger):
        """Refresh all servers concurrently, each with its assigned sstables."""
        tasks = [self.refresh_one(manager, srv, ks, cf, tocs, scope, primary_replica_only)
                 for srv, tocs in sstables_per_server.items()]
        await asyncio.gather(*tasks)
@pytest.mark.asyncio
@pytest.mark.parametrize("topology_rf_validity", [
    (topo(rf = 1, nodes = 3, racks = 1, dcs = 1), True),
    (topo(rf = 3, nodes = 5, racks = 1, dcs = 1), False),
    (topo(rf = 1, nodes = 4, racks = 2, dcs = 1), True),
    (topo(rf = 3, nodes = 6, racks = 2, dcs = 1), False),
    (topo(rf = 2, nodes = 8, racks = 4, dcs = 2), True)
])
async def test_refresh_with_streaming_scopes(build_mode: str, manager: ManagerClient, topology_rf_validity):
    '''Check that refreshing of a cluster with stream scopes works'''
    # Drive the shared streaming-scopes scenario with locally-staged sstables.
    storage = SSTablesOnLocalStorage()
    await do_test_streaming_scopes(build_mode, manager, topology_rf_validity, storage)
@pytest.mark.asyncio
async def test_refresh_deletes_uploaded_sstables(manager: ManagerClient):
    '''
    Check that refreshing a cluster deletes the sstable files from the upload directory after loading
    '''
    # 2-node, RF=1 cluster: every key lives on exactly one replica, so a
    # load-and-stream refresh must move data and then clean the upload dir.
    topology = topo(rf = 1, nodes = 2, racks = 1, dcs = 1)
    servers, host_ids = await create_cluster(topology, True, manager, logger)
    cql = manager.get_cql()
    await manager.disable_tablet_balancing()
    ks = 'ks'
    cf = 'cf'
    _, keys, _ = await create_dataset(manager, ks, cf, topology, logger)
    _, sstables = await take_snapshot(ks, servers, manager, logger)
    # Per-server directory map: workdir, the table's data dir and the
    # temporary backup (staging) dir.
    dirs = defaultdict(dict)
    logger.info(f'Move sstables to tmp dir')
    tmpdir = f'tmpbackup-{str(uuid.uuid4())}'
    for s in servers:
        workdir = await manager.server_get_workdir(s.server_id)
        cf_dir = os.listdir(f'{workdir}/data/{ks}')[0]
        cf_dir = os.path.join(f'{workdir}/data/{ks}', cf_dir)
        tmpbackup = os.path.join(workdir, f'../{tmpdir}')
        dirs[s.server_id]["workdir"] = workdir
        dirs[s.server_id]["cf_dir"] = cf_dir
        dirs[s.server_id]["tmpbackup"] = tmpbackup
        os.makedirs(tmpbackup, exist_ok=True)
        snapshots_dir = os.path.join(cf_dir, 'snapshots')
        # Descend into the one snapshot created by take_snapshot().
        snapshots_dir = os.path.join(snapshots_dir, os.listdir(snapshots_dir)[0])
        # Only sstable components are staged; manifest/schema are not loadable.
        exclude_list = ['manifest.json', 'schema.cql']
        for item in os.listdir(snapshots_dir):
            src_path = os.path.join(snapshots_dir, item)
            dst_path = os.path.join(tmpbackup, item)
            if item not in exclude_list:
                shutil.copy2(src_path, dst_path)
    logger.info(f'Clear data by truncating')
    cql.execute(f'TRUNCATE TABLE {ks}.{cf};')
    # Each server receives some (possibly another) server's backup, so that
    # load_and_stream has to re-route the data to its proper replica.
    logger.info(f'Copy sstables to upload dir (with shuffling)')
    shuffled = list(range(len(servers)))
    random.shuffle(shuffled)
    for i, s in enumerate(servers):
        other = servers[shuffled[i]]
        cf_dir = dirs[other.server_id]["cf_dir"]
        tmpbackup = dirs[s.server_id]["tmpbackup"]
        shutil.copytree(tmpbackup, os.path.join(cf_dir, 'upload'), dirs_exist_ok=True)
    logger.info(f'Refresh')
    async def do_refresh(s, toc_names, scope):
        logger.info(f'Refresh {s.ip_addr} with {toc_names}, scope={scope}')
        await manager.api.load_new_sstables(s.ip_addr, ks, cf, scope=scope, load_and_stream=True)
    scope = 'rack'
    r_servers = servers
    await asyncio.gather(*(do_refresh(s, sstables, scope) for s in r_servers))
    # All data must be back on its proper replicas ...
    await check_mutation_replicas(cql, manager, servers, keys, topology, logger, ks, cf)
    # ... and the upload dirs must have been emptied by the refresh.
    for s in r_servers:
        cf_dir = dirs[s.server_id]["cf_dir"]
        files = os.listdir(os.path.join(cf_dir, 'upload'))
        assert files == [], f'Upload dir not empty on server {s.server_id}: {files}'
    # Remove the staging dir of every server.  The original code removed only
    # the directory left in the loop variable (the last server's), leaking
    # the rest.
    for s in servers:
        shutil.rmtree(dirs[s.server_id]["tmpbackup"])