mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
test/object_store: various cleanups
just for better readability: * chain comparison statement when appropriate * do not use f-string when there are no place holders * use list comprehension when initializing a set * remove unused import statement * move import statement of the standard library before those which import the 3rd-party modules * put two empty lines in-between top-level functions. this is recommended by PEP8. * remove the extraneous spaces around `=` in parameter list. * remove the extraneous spaces in a list like `[ 1, 2, 3 ]` so it looks like `[1, 2, 3]`. Signed-off-by: Kefu Chai <kefu.chai@scylladb.com> Closes scylladb/scylladb#21561
This commit is contained in:
committed by
Pavel Emelyanov
parent
99d420daa5
commit
5b8c2ad600
@@ -1,10 +1,8 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import requests
|
||||
import pytest
|
||||
import logging
|
||||
import pytest
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.object_store.conftest import format_tuples
|
||||
@@ -14,6 +12,7 @@ from test.pylib.util import unique_name
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_ks_and_cf(cql):
|
||||
ks = 'test_ks'
|
||||
cf = 'test_cf'
|
||||
@@ -31,10 +30,10 @@ def create_ks_and_cf(cql):
|
||||
|
||||
return ks, cf
|
||||
|
||||
async def prepare_snapshot_for_backup(manager: ManagerClient, server, snap_name = 'backup'):
|
||||
|
||||
async def prepare_snapshot_for_backup(manager: ManagerClient, server, snap_name='backup'):
|
||||
cql = manager.get_cql()
|
||||
workdir = await manager.server_get_workdir(server.server_id)
|
||||
print(f'Create keyspace')
|
||||
print('Create keyspace')
|
||||
ks, cf = create_ks_and_cf(cql)
|
||||
print('Flush keyspace')
|
||||
await manager.api.flush_keyspace(server.ip_addr, ks)
|
||||
@@ -43,6 +42,7 @@ async def prepare_snapshot_for_backup(manager: ManagerClient, server, snap_name
|
||||
|
||||
return ks, cf
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_simple_backup(manager: ManagerClient, s3_server):
|
||||
'''check that backing up a snapshot for a keyspace works'''
|
||||
@@ -52,7 +52,7 @@ async def test_simple_backup(manager: ManagerClient, s3_server):
|
||||
'experimental_features': ['keyspace-storage-options'],
|
||||
'task_ttl_in_seconds': 300
|
||||
}
|
||||
cmd = [ '--logger-log-level', 'snapshots=trace:task_manager=trace' ]
|
||||
cmd = ['--logger-log-level', 'snapshots=trace:task_manager=trace']
|
||||
server = await manager.server_add(config=cfg, cmdline=cmd)
|
||||
ks, cf = await prepare_snapshot_for_backup(manager, server)
|
||||
|
||||
@@ -71,7 +71,7 @@ async def test_simple_backup(manager: ManagerClient, s3_server):
|
||||
assert (status is not None) and (status['state'] == 'done')
|
||||
assert (status['progress_total'] > 0) and (status['progress_completed'] == status['progress_total'])
|
||||
|
||||
objects = set([ o.key for o in get_s3_resource(s3_server).Bucket(s3_server.bucket_name).objects.all() ])
|
||||
objects = set(o.key for o in get_s3_resource(s3_server).Bucket(s3_server.bucket_name).objects.all())
|
||||
for f in files:
|
||||
print(f'Check {f} is in backup')
|
||||
assert f'{prefix}/{f}' in objects
|
||||
@@ -116,7 +116,7 @@ async def do_test_backup_abort(manager: ManagerClient, s3_server,
|
||||
'experimental_features': ['keyspace-storage-options'],
|
||||
'task_ttl_in_seconds': 300
|
||||
}
|
||||
cmd = [ '--logger-log-level', 'snapshots=trace:task_manager=trace' ]
|
||||
cmd = ['--logger-log-level', 'snapshots=trace:task_manager=trace']
|
||||
server = await manager.server_add(config=cfg, cmdline=cmd)
|
||||
ks, cf = await prepare_snapshot_for_backup(manager, server)
|
||||
|
||||
@@ -132,7 +132,7 @@ async def do_test_backup_abort(manager: ManagerClient, s3_server,
|
||||
print('Backup snapshot')
|
||||
# use a unique(ish) path, because we're running more than one test using the same minio and ks/cf name.
|
||||
# If we just use {cf}/backup, files like "schema.cql" and "manifest.json" will remain after previous test
|
||||
# case, and we will count these erroneously.
|
||||
# case, and we will count these erroneously.
|
||||
prefix = f'{cf_dir}/backup'
|
||||
tid = await manager.api.backup(server.ip_addr, ks, cf, 'backup', s3_server.address, s3_server.bucket_name, prefix)
|
||||
|
||||
@@ -144,7 +144,7 @@ async def do_test_backup_abort(manager: ManagerClient, s3_server,
|
||||
print(f'Status: {status}')
|
||||
assert (status is not None) and (status['state'] == 'failed')
|
||||
|
||||
objects = set([ o.key for o in get_s3_resource(s3_server).Bucket(s3_server.bucket_name).objects.all() ])
|
||||
objects = set(o.key for o in get_s3_resource(s3_server).Bucket(s3_server.bucket_name).objects.all())
|
||||
uploaded_count = 0
|
||||
for f in files:
|
||||
in_backup = f'{prefix}/{f}' in objects
|
||||
@@ -154,9 +154,10 @@ async def do_test_backup_abort(manager: ManagerClient, s3_server,
|
||||
# Note: since s3 client is abortable and run async, we might fail even the first file
|
||||
# regardless of if we set the abort status before or after the upload is initiated.
|
||||
# Parallelism is a pain.
|
||||
assert uploaded_count >= min_files and uploaded_count < len(files)
|
||||
assert min_files <= uploaded_count < len(files)
|
||||
assert max_files is None or uploaded_count < max_files
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_backup_to_non_existent_snapshot(manager: ManagerClient, s3_server):
|
||||
'''backup should fail if the snapshot does not exist'''
|
||||
@@ -192,13 +193,15 @@ async def test_backup_to_non_existent_snapshot(manager: ManagerClient, s3_server
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_backup_is_abortable(manager: ManagerClient, s3_server):
|
||||
'''check that backing up a snapshot for a keyspace works'''
|
||||
await do_test_backup_abort(manager, s3_server, breakpoint_name = "backup_task_pause", min_files = 0)
|
||||
await do_test_backup_abort(manager, s3_server, breakpoint_name="backup_task_pause", min_files=0)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_backup_is_abortable_in_s3_client(manager: ManagerClient, s3_server):
|
||||
'''check that backing up a snapshot for a keyspace works'''
|
||||
await do_test_backup_abort(manager, s3_server, breakpoint_name = "backup_task_pre_upload", min_files = 0, max_files = 1)
|
||||
await do_test_backup_abort(manager, s3_server, breakpoint_name="backup_task_pre_upload", min_files=0, max_files=1)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_simple_backup_and_restore(manager: ManagerClient, s3_server):
|
||||
@@ -209,7 +212,7 @@ async def test_simple_backup_and_restore(manager: ManagerClient, s3_server):
|
||||
'experimental_features': ['keyspace-storage-options'],
|
||||
'task_ttl_in_seconds': 300
|
||||
}
|
||||
cmd = [ '--logger-log-level', 'sstables_loader=debug:sstable_directory=trace:snapshots=trace:s3=trace:sstable=debug:http=debug' ]
|
||||
cmd = ['--logger-log-level', 'sstables_loader=debug:sstable_directory=trace:snapshots=trace:s3=trace:sstable=debug:http=debug']
|
||||
server = await manager.server_add(config=cfg, cmdline=cmd)
|
||||
|
||||
cql = manager.get_cql()
|
||||
@@ -222,11 +225,12 @@ async def test_simple_backup_and_restore(manager: ManagerClient, s3_server):
|
||||
ks, cf = await prepare_snapshot_for_backup(manager, server, snap_name)
|
||||
|
||||
cf_dir = os.listdir(f'{workdir}/data/{ks}')[0]
|
||||
|
||||
def list_sstables():
|
||||
return [ f for f in os.scandir(f'{workdir}/data/{ks}/{cf_dir}') if f.is_file() ]
|
||||
return [f for f in os.scandir(f'{workdir}/data/{ks}/{cf_dir}') if f.is_file()]
|
||||
|
||||
orig_res = cql.execute(f"SELECT * FROM {ks}.{cf}")
|
||||
orig_rows = { x.name: x.value for x in orig_res }
|
||||
orig_rows = {x.name: x.value for x in orig_res}
|
||||
|
||||
# include a "suffix" in the key to mimic the use case where scylla-manager
|
||||
# 1. backups sstables of multiple snapshots, and deduplicate the backup'ed
|
||||
@@ -255,27 +259,27 @@ async def test_simple_backup_and_restore(manager: ManagerClient, s3_server):
|
||||
status = await manager.api.wait_task(server.ip_addr, tid)
|
||||
assert (status is not None) and (status['state'] == 'done')
|
||||
|
||||
print(f'Drop the table data and validate it\'s gone')
|
||||
print('Drop the table data and validate it\'s gone')
|
||||
cql.execute(f"TRUNCATE TABLE {ks}.{cf};")
|
||||
files = list_sstables()
|
||||
assert len(files) == 0
|
||||
res = cql.execute(f"SELECT * FROM {ks}.{cf};")
|
||||
assert not res
|
||||
objects = set([ o.key for o in get_s3_resource(s3_server).Bucket(s3_server.bucket_name).objects.filter(Prefix=prefix) ])
|
||||
objects = set(o.key for o in get_s3_resource(s3_server).Bucket(s3_server.bucket_name).objects.filter(Prefix=prefix))
|
||||
assert len(objects) > 0
|
||||
|
||||
print(f'Try to restore')
|
||||
print('Try to restore')
|
||||
tid = await manager.api.restore(server.ip_addr, ks, cf, s3_server.address, s3_server.bucket_name, prefix, toc_names)
|
||||
status = await manager.api.wait_task(server.ip_addr, tid)
|
||||
assert (status is not None) and (status['state'] == 'done')
|
||||
print(f'Check that sstables came back')
|
||||
print('Check that sstables came back')
|
||||
files = list_sstables()
|
||||
assert len(files) > 0
|
||||
print(f'Check that data came back too')
|
||||
print('Check that data came back too')
|
||||
res = cql.execute(f"SELECT * FROM {ks}.{cf};")
|
||||
rows = { x.name: x.value for x in res }
|
||||
rows = {x.name: x.value for x in res}
|
||||
assert rows == orig_rows, "Unexpected table contents after restore"
|
||||
|
||||
print(f'Check that backup files are still there') # regression test for #20938
|
||||
post_objects = set([ o.key for o in get_s3_resource(s3_server).Bucket(s3_server.bucket_name).objects.filter(Prefix=prefix) ])
|
||||
print('Check that backup files are still there') # regression test for #20938
|
||||
post_objects = set(o.key for o in get_s3_resource(s3_server).Bucket(s3_server.bucket_name).objects.filter(Prefix=prefix))
|
||||
assert objects == post_objects
|
||||
|
||||
Reference in New Issue
Block a user