diff --git a/test/cluster/object_store/test_backup.py b/test/cluster/object_store/test_backup.py
index 2cca840c7b..fed8b51d24 100644
--- a/test/cluster/object_store/test_backup.py
+++ b/test/cluster/object_store/test_backup.py
@@ -53,6 +53,33 @@ async def prepare_snapshot_for_backup(manager: ManagerClient, server, snap_name=
     return ks, cf
 
 
+async def take_snapshot(ks, servers, manager, logger):
+    """Flush and snapshot keyspace `ks` on all `servers`, then list the snapshot's sstables.
+
+    Returns (snap_name, sstables), where sstables maps each server to the TOC.txt
+    file names in its snapshot directory. Raises RuntimeError if a server has no
+    table directory under the keyspace.
+    """
+    logger.info('Take snapshot and collect sstables lists')
+    snap_name = unique_name('backup_')
+    sstables = dict()
+    await asyncio.gather(*(manager.api.flush_keyspace(s.ip_addr, ks) for s in servers))
+    await asyncio.gather(*(manager.api.take_snapshot(s.ip_addr, ks, snap_name) for s in servers))
+    for s in servers:
+        workdir = await manager.server_get_workdir(s.server_id)
+        ks_dir = f'{workdir}/data/{ks}'
+        cf_dirs = os.listdir(ks_dir)
+        if not cf_dirs:
+            raise RuntimeError(f"No column family directories found in {ks_dir}")
+        # Assumes that there is only one column family directory under the keyspace.
+        cf_dir = cf_dirs[0]
+        tocs = [ f.name for f in os.scandir(f'{ks_dir}/{cf_dir}/snapshots/{snap_name}') if f.is_file() and f.name.endswith('TOC.txt') ]
+        logger.info(f'Collected sstables from {s.ip_addr}:{cf_dir}/snapshots/{snap_name}: {tocs}')
+        sstables[s] = tocs
+
+    return snap_name, sstables
+
+
 @pytest.mark.asyncio
 @pytest.mark.parametrize("move_files", [False, True])
 async def test_simple_backup(manager: ManagerClient, object_storage, move_files):
@@ -382,36 +409,7 @@ async def do_abort_restore(manager: ManagerClient, object_storage):
     num_keys = 10000
     await asyncio.gather(*(cql.run_async(insert_stmt, (str(i), str(i))) for i in range(num_keys)))
 
-    # Flush keyspace on all servers
-    logger.info("Flushing keyspace on all servers...")
-    for server in servers:
-        await manager.api.flush_keyspace(server.ip_addr, keyspace)
-
-    # Take snapshot for keyspace
-    snapshot_name = unique_name('backup_')
-    logger.info(f"Taking snapshot '{snapshot_name}' for keyspace '{keyspace}'...")
-    for server in servers:
-        await manager.api.take_snapshot(server.ip_addr, keyspace, snapshot_name)
-
-    # Collect snapshot files from each server
-    async def get_snapshot_files(server, snapshot_name):
-        workdir = await manager.server_get_workdir(server.server_id)
-        data_path = os.path.join(workdir, 'data', keyspace)
-        cf_dirs = os.listdir(data_path)
-        if not cf_dirs:
-            raise RuntimeError(f"No column family directories found in {data_path}")
-        # Assumes that there is only one column family directory under the keyspace.
-        cf_dir = cf_dirs[0]
-        snapshot_path = os.path.join(data_path, cf_dir, 'snapshots', snapshot_name)
-        return [
-            f.name for f in os.scandir(snapshot_path)
-            if f.is_file() and f.name.endswith('TOC.txt')
-        ]
-
-    sstables = {}
-    for server in servers:
-        snapshot_files = await get_snapshot_files(server, snapshot_name)
-        sstables[server.server_id] = snapshot_files
+    snapshot_name, sstables = await take_snapshot(keyspace, servers, manager, logger)
 
     # Backup the keyspace on each server to S3
     prefix = f"{table}/{snapshot_name}"
@@ -449,7 +447,7 @@ async def do_abort_restore(manager: ManagerClient, object_storage):
             object_storage.address,
             object_storage.bucket_name,
             prefix,
-            sstables[server.server_id]
+            sstables[server]
         )
         restore_task_ids[server.server_id] = restore_tid
 
@@ -544,21 +542,6 @@ async def do_restore_server(manager, logger, ks, cf, s, toc_names, scope, primar
     status = await manager.api.wait_task(s.ip_addr, tid)
     assert (status is not None) and (status['state'] == 'done')
 
-async def take_snapshot(ks, servers, manager, logger):
-    logger.info(f'Take snapshot and collect sstables lists')
-    snap_name = unique_name('backup_')
-    sstables = dict()
-    for s in servers:
-        await manager.api.flush_keyspace(s.ip_addr, ks)
-        await manager.api.take_snapshot(s.ip_addr, ks, snap_name)
-        workdir = await manager.server_get_workdir(s.server_id)
-        cf_dir = os.listdir(f'{workdir}/data/{ks}')[0]
-        tocs = [ f.name for f in os.scandir(f'{workdir}/data/{ks}/{cf_dir}/snapshots/{snap_name}') if f.is_file() and f.name.endswith('TOC.txt') ]
-        logger.info(f'Collected sstables from {s.ip_addr}:{cf_dir}/snapshots/{snap_name}: {tocs}')
-        sstables[s] = tocs
-
-    return snap_name,sstables
-
 async def check_streaming_directions(logger, servers, topology, host_ids, scope, primary_replica_only, log_marks):
     host_ids_per_dc = defaultdict(list)
     host_ids_per_dc_rack = dict()