mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
backup_task: Use task abort source in s3 client call
Fixes #20716 Propagates abort source in task object to actual network call, thus making the upload workload more quickly abortable. v2: Fix test to handle two versions after each other
This commit is contained in:
@@ -82,14 +82,16 @@ future<> backup_task_impl::do_backup() {
|
||||
auto component_name = _snapshot_dir / component_ent->name;
|
||||
auto destination = fmt::format("/{}/{}/{}", _bucket, _prefix, component_ent->name);
|
||||
snap_log.trace("Upload {} to {}", component_name.native(), destination);
|
||||
|
||||
// Pre-upload break point. For testing abort in actual s3 client usage.
|
||||
co_await utils::get_local_injector().inject("backup_task_pre_upload", utils::wait_for_message(std::chrono::minutes(2)));
|
||||
|
||||
// Start uploading in the background. The caller waits for these fibers
|
||||
// with the uploads gate.
|
||||
// Parallelism is implicitly controlled in two ways:
|
||||
// - s3::client::claim_memory semaphore
|
||||
// - http::client::max_connections limitation
|
||||
// FIXME -- s3::client is not abortable yet, but when it will be, need to
|
||||
// propagate impl::_as abort requests into upload_file's fibers
|
||||
std::ignore = _client->upload_file(component_name, destination, _progress).handle_exception([comp = component_name, &ex] (std::exception_ptr e) {
|
||||
std::ignore = _client->upload_file(component_name, destination, _progress, &_as).handle_exception([comp = component_name, &ex] (std::exception_ptr e) {
|
||||
snap_log.error("Error uploading {}: {}", comp.native(), e);
|
||||
// keep the first exception
|
||||
if (!ex) {
|
||||
|
||||
@@ -107,10 +107,9 @@ async def test_backup_to_non_existent_bucket(manager: ManagerClient, s3_server):
|
||||
assert status['state'] == 'failed'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_backup_is_abortable(manager: ManagerClient, s3_server):
|
||||
'''check that backing up a snapshot for a keyspace works'''
|
||||
async def do_test_backup_abort(manager: ManagerClient, s3_server,
|
||||
breakpoint_name, min_files, max_files = None):
|
||||
'''helper for backup abort testing'''
|
||||
|
||||
cfg = {'enable_user_defined_functions': False,
|
||||
'object_storage_config_file': str(s3_server.config_file),
|
||||
@@ -126,18 +125,21 @@ async def test_backup_is_abortable(manager: ManagerClient, s3_server):
|
||||
files = set(os.listdir(f'{workdir}/data/{ks}/{cf_dir}/snapshots/backup'))
|
||||
assert len(files) > 1
|
||||
|
||||
await manager.api.enable_injection(server.ip_addr, "backup_task_pause", one_shot=True)
|
||||
await manager.api.enable_injection(server.ip_addr, breakpoint_name, one_shot=True)
|
||||
log = await manager.server_open_log(server.server_id)
|
||||
mark = await log.mark()
|
||||
|
||||
print('Backup snapshot')
|
||||
prefix = f'{cf}/backup'
|
||||
# use a unique(ish) path, because we're running more than one test using the same minio and ks/cf name.
|
||||
# If we just use {cf}/backup, files like "schema.cql" and "manifest.json" will remain after previous test
|
||||
# case, and we will count these erroneously.
|
||||
prefix = f'{cf_dir}/backup'
|
||||
tid = await manager.api.backup(server.ip_addr, ks, cf, 'backup', s3_server.address, s3_server.bucket_name, prefix)
|
||||
|
||||
print(f'Started task {tid}, aborting it early')
|
||||
await log.wait_for('backup_task_pause: waiting', from_mark=mark)
|
||||
await log.wait_for(breakpoint_name + ': waiting', from_mark=mark)
|
||||
await manager.api.abort_task(server.ip_addr, tid)
|
||||
await manager.api.message_injection(server.ip_addr, "backup_task_pause")
|
||||
await manager.api.message_injection(server.ip_addr, breakpoint_name)
|
||||
status = await manager.api.wait_task(server.ip_addr, tid)
|
||||
print(f'Status: {status}')
|
||||
assert (status is not None) and (status['state'] == 'failed')
|
||||
@@ -145,11 +147,27 @@ async def test_backup_is_abortable(manager: ManagerClient, s3_server):
|
||||
objects = set([ o.key for o in get_s3_resource(s3_server).Bucket(s3_server.bucket_name).objects.all() ])
|
||||
uploaded_count = 0
|
||||
for f in files:
|
||||
print(f'Check {f} is in backup')
|
||||
if f'{prefix}/{f}' in objects:
|
||||
in_backup = f'{prefix}/{f}' in objects
|
||||
print(f'Check {f} is in backup: {in_backup}')
|
||||
if in_backup:
|
||||
uploaded_count += 1
|
||||
assert uploaded_count > 0 and uploaded_count < len(files)
|
||||
# Note: since s3 client is abortable and run async, we might fail even the first file
|
||||
# regardless of if we set the abort status before or after the upload is initiated.
|
||||
# Parallelism is a pain.
|
||||
assert uploaded_count >= min_files and uploaded_count < len(files)
|
||||
assert max_files is None or uploaded_count < max_files
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_backup_is_abortable(manager: ManagerClient, s3_server):
|
||||
'''check that backing up a snapshot for a keyspace works'''
|
||||
await do_test_backup_abort(manager, s3_server, breakpoint_name = "backup_task_pause", min_files = 0)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_backup_is_abortable_in_s3_client(manager: ManagerClient, s3_server):
|
||||
'''check that backing up a snapshot for a keyspace works'''
|
||||
await do_test_backup_abort(manager, s3_server, breakpoint_name = "backup_task_pre_upload", min_files = 0, max_files = 1)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_simple_backup_and_restore(manager: ManagerClient, s3_server):
|
||||
|
||||
Reference in New Issue
Block a user