Mirror of https://github.com/scylladb/scylladb.git, synced 2026-05-12 19:02:12 +00:00
Merge 'Re-use and improve the take_snapshot() helper in backup tests' from Pavel Emelyanov
The helper is very simple yet generic -- it takes a snapshot of a keyspace on all servers and collects the resulting sstables from their workdirs. Re-using it in all test cases saves some lines of code. Also, the method was "sequential"; making it "parallel" reduces the waiting time a bit.

This will help generalize the existing backup/restore tests to support the clustered snapshot/backup/restore API (see #28525) later.

Cleaning up tests, not backporting.

Closes scylladb/scylladb#28660

* github.com:scylladb/scylladb:
  test/backup: Run keyspace flush and snapshot taking API in parallel
  test/backup: Re-use take_snapshot() helper in do_abort_restore()
  test/backup: Move take_snapshot() helper up
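The speedup mentioned above comes from issuing the per-server REST calls concurrently with asyncio.gather() instead of awaiting them one server at a time. A minimal, self-contained sketch of the difference (simulated latencies stand in for the real manager.api.flush_keyspace()/take_snapshot() round-trips; the server names are placeholders):

import asyncio
import time

async def call_server(server, latency=0.5):
    # Stand-in for one manager.api round-trip to a server.
    await asyncio.sleep(latency)
    return server

async def main():
    servers = ['s1', 's2', 's3']

    t0 = time.monotonic()
    for s in servers:
        await call_server(s)  # sequential: total time is the sum of latencies
    print(f'sequential: {time.monotonic() - t0:.2f}s')

    t0 = time.monotonic()
    # parallel: total time is roughly one (the slowest) round-trip
    await asyncio.gather(*(call_server(s) for s in servers))
    print(f'parallel:   {time.monotonic() - t0:.2f}s')

asyncio.run(main())

With three servers this prints roughly 1.5s for the sequential loop and 0.5s for the gathered version, which is exactly the shape of the change in the diff below.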
@@ -53,6 +53,22 @@ async def prepare_snapshot_for_backup(manager: ManagerClient, server, snap_name=
     return ks, cf
 
 
+async def take_snapshot(ks, servers, manager, logger):
+    logger.info(f'Take snapshot and collect sstables lists')
+    snap_name = unique_name('backup_')
+    sstables = dict()
+    await asyncio.gather(*(manager.api.flush_keyspace(s.ip_addr, ks) for s in servers))
+    await asyncio.gather(*(manager.api.take_snapshot(s.ip_addr, ks, snap_name) for s in servers))
+    for s in servers:
+        workdir = await manager.server_get_workdir(s.server_id)
+        cf_dir = os.listdir(f'{workdir}/data/{ks}')[0]
+        tocs = [ f.name for f in os.scandir(f'{workdir}/data/{ks}/{cf_dir}/snapshots/{snap_name}') if f.is_file() and f.name.endswith('TOC.txt') ]
+        logger.info(f'Collected sstables from {s.ip_addr}:{cf_dir}/snapshots/{snap_name}: {tocs}')
+        sstables[s] = tocs
+
+    return snap_name,sstables
+
+
 @pytest.mark.asyncio
 @pytest.mark.parametrize("move_files", [False, True])
 async def test_simple_backup(manager: ManagerClient, object_storage, move_files):
@@ -382,36 +398,7 @@ async def do_abort_restore(manager: ManagerClient, object_storage):
     num_keys = 10000
     await asyncio.gather(*(cql.run_async(insert_stmt, (str(i), str(i))) for i in range(num_keys)))
 
-    # Flush keyspace on all servers
-    logger.info("Flushing keyspace on all servers...")
-    for server in servers:
-        await manager.api.flush_keyspace(server.ip_addr, keyspace)
-
-    # Take snapshot for keyspace
-    snapshot_name = unique_name('backup_')
-    logger.info(f"Taking snapshot '{snapshot_name}' for keyspace '{keyspace}'...")
-    for server in servers:
-        await manager.api.take_snapshot(server.ip_addr, keyspace, snapshot_name)
-
-    # Collect snapshot files from each server
-    async def get_snapshot_files(server, snapshot_name):
-        workdir = await manager.server_get_workdir(server.server_id)
-        data_path = os.path.join(workdir, 'data', keyspace)
-        cf_dirs = os.listdir(data_path)
-        if not cf_dirs:
-            raise RuntimeError(f"No column family directories found in {data_path}")
-        # Assumes that there is only one column family directory under the keyspace.
-        cf_dir = cf_dirs[0]
-        snapshot_path = os.path.join(data_path, cf_dir, 'snapshots', snapshot_name)
-        return [
-            f.name for f in os.scandir(snapshot_path)
-            if f.is_file() and f.name.endswith('TOC.txt')
-        ]
-
-    sstables = {}
-    for server in servers:
-        snapshot_files = await get_snapshot_files(server, snapshot_name)
-        sstables[server.server_id] = snapshot_files
+    snapshot_name, sstables = await take_snapshot(keyspace, servers, manager, logger)
 
     # Backup the keyspace on each server to S3
     prefix = f"{table}/{snapshot_name}"
@@ -449,7 +436,7 @@ async def do_abort_restore(manager: ManagerClient, object_storage):
             object_storage.address,
             object_storage.bucket_name,
             prefix,
-            sstables[server.server_id]
+            sstables[server]
         )
         restore_task_ids[server.server_id] = restore_tid
 
@@ -544,21 +531,6 @@ async def do_restore_server(manager, logger, ks, cf, s, toc_names, scope, primar
     status = await manager.api.wait_task(s.ip_addr, tid)
     assert (status is not None) and (status['state'] == 'done')
 
-async def take_snapshot(ks, servers, manager, logger):
-    logger.info(f'Take snapshot and collect sstables lists')
-    snap_name = unique_name('backup_')
-    sstables = dict()
-    for s in servers:
-        await manager.api.flush_keyspace(s.ip_addr, ks)
-        await manager.api.take_snapshot(s.ip_addr, ks, snap_name)
-        workdir = await manager.server_get_workdir(s.server_id)
-        cf_dir = os.listdir(f'{workdir}/data/{ks}')[0]
-        tocs = [ f.name for f in os.scandir(f'{workdir}/data/{ks}/{cf_dir}/snapshots/{snap_name}') if f.is_file() and f.name.endswith('TOC.txt') ]
-        logger.info(f'Collected sstables from {s.ip_addr}:{cf_dir}/snapshots/{snap_name}: {tocs}')
-        sstables[s] = tocs
-
-    return snap_name,sstables
-
 async def check_streaming_directions(logger, servers, topology, host_ids, scope, primary_replica_only, log_marks):
     host_ids_per_dc = defaultdict(list)
     host_ids_per_dc_rack = dict()
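For reference, the calling convention the shared helper establishes: a test gets back the generated snapshot name plus a per-server map of TOC file names, keyed by the server object itself (hence the sstables[server] change above, replacing sstables[server.server_id]). A condensed sketch of how do_abort_restore() now consumes it; the name of the restore call falls outside the shown hunks, so it is elided here:

snapshot_name, sstables = await take_snapshot(keyspace, servers, manager, logger)
prefix = f"{table}/{snapshot_name}"
for server in servers:
    toc_names = sstables[server]  # keyed by server object, not server.server_id
    # ... pass (object_storage.address, object_storage.bucket_name, prefix,
    # toc_names) to the restore API call, as in the hunk above ...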