From 0db4b9fd94674fb87bcedefc470e2ca75528137a Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Mon, 4 Nov 2024 15:26:34 +0000 Subject: [PATCH] backup_task: Use task abort source in s3 client call Fixes #20716 Propagates abort source in task object to actual network call, thus making the upload workload more quickly abortable. v2: Fix test to handle two versions after each other --- db/snapshot/backup_task.cc | 8 ++++--- test/object_store/test_backup.py | 40 +++++++++++++++++++++++--------- 2 files changed, 34 insertions(+), 14 deletions(-) diff --git a/db/snapshot/backup_task.cc b/db/snapshot/backup_task.cc index c7819061a4..33c135d849 100644 --- a/db/snapshot/backup_task.cc +++ b/db/snapshot/backup_task.cc @@ -82,14 +82,16 @@ future<> backup_task_impl::do_backup() { auto component_name = _snapshot_dir / component_ent->name; auto destination = fmt::format("/{}/{}/{}", _bucket, _prefix, component_ent->name); snap_log.trace("Upload {} to {}", component_name.native(), destination); + + // Pre-upload break point. For testing abort in actual s3 client usage. + co_await utils::get_local_injector().inject("backup_task_pre_upload", utils::wait_for_message(std::chrono::minutes(2))); + // Start uploading in the background. The caller waits for these fibers // with the uploads gate. 
// Parallelism is implicitly controlled in two ways: // - s3::client::claim_memory semaphore // - http::client::max_connections limitation - // FIXME -- s3::client is not abortable yet, but when it will be, need to - // propagate impl::_as abort requests into upload_file's fibers - std::ignore = _client->upload_file(component_name, destination, _progress).handle_exception([comp = component_name, &ex] (std::exception_ptr e) { + std::ignore = _client->upload_file(component_name, destination, _progress, &_as).handle_exception([comp = component_name, &ex] (std::exception_ptr e) { snap_log.error("Error uploading {}: {}", comp.native(), e); // keep the first exception if (!ex) { diff --git a/test/object_store/test_backup.py b/test/object_store/test_backup.py index c37f5d3ba3..ebabce4fa9 100644 --- a/test/object_store/test_backup.py +++ b/test/object_store/test_backup.py @@ -107,10 +107,9 @@ async def test_backup_to_non_existent_bucket(manager: ManagerClient, s3_server): assert status['state'] == 'failed' -@pytest.mark.asyncio -@skip_mode('release', 'error injections are not supported in release mode') -async def test_backup_is_abortable(manager: ManagerClient, s3_server): - '''check that backing up a snapshot for a keyspace works''' +async def do_test_backup_abort(manager: ManagerClient, s3_server, + breakpoint_name, min_files, max_files = None): + '''helper for backup abort testing''' cfg = {'enable_user_defined_functions': False, 'object_storage_config_file': str(s3_server.config_file), @@ -126,18 +125,21 @@ async def test_backup_is_abortable(manager: ManagerClient, s3_server): files = set(os.listdir(f'{workdir}/data/{ks}/{cf_dir}/snapshots/backup')) assert len(files) > 1 - await manager.api.enable_injection(server.ip_addr, "backup_task_pause", one_shot=True) + await manager.api.enable_injection(server.ip_addr, breakpoint_name, one_shot=True) log = await manager.server_open_log(server.server_id) mark = await log.mark() print('Backup snapshot') - prefix = f'{cf}/backup' 
+ # use a unique(ish) path, because we're running more than one test using the same minio and ks/cf name. + # If we just use {cf}/backup, files like "schema.cql" and "manifest.json" will remain after the previous test + # case, and we will count these erroneously. + prefix = f'{cf_dir}/backup' tid = await manager.api.backup(server.ip_addr, ks, cf, 'backup', s3_server.address, s3_server.bucket_name, prefix) print(f'Started task {tid}, aborting it early') - await log.wait_for('backup_task_pause: waiting', from_mark=mark) + await log.wait_for(breakpoint_name + ': waiting', from_mark=mark) await manager.api.abort_task(server.ip_addr, tid) - await manager.api.message_injection(server.ip_addr, "backup_task_pause") + await manager.api.message_injection(server.ip_addr, breakpoint_name) status = await manager.api.wait_task(server.ip_addr, tid) print(f'Status: {status}') assert (status is not None) and (status['state'] == 'failed') @@ -145,11 +147,27 @@ async def test_backup_is_abortable(manager: ManagerClient, s3_server): objects = set([ o.key for o in get_s3_resource(s3_server).Bucket(s3_server.bucket_name).objects.all() ]) uploaded_count = 0 for f in files: - print(f'Check {f} is in backup') - if f'{prefix}/{f}' in objects: + in_backup = f'{prefix}/{f}' in objects + print(f'Check {f} is in backup: {in_backup}') + if in_backup: uploaded_count += 1 - assert uploaded_count > 0 and uploaded_count < len(files) + # Note: since the s3 client is abortable and runs async, we might fail even the first file + # regardless of whether we set the abort status before or after the upload is initiated. + # Parallelism is a pain. 
+ assert uploaded_count >= min_files and uploaded_count < len(files) + assert max_files is None or uploaded_count < max_files +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_backup_is_abortable(manager: ManagerClient, s3_server): + '''check that a running backup task can be aborted''' + await do_test_backup_abort(manager, s3_server, breakpoint_name = "backup_task_pause", min_files = 0) + +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_backup_is_abortable_in_s3_client(manager: ManagerClient, s3_server): + '''check that aborting a backup task also aborts the upload in the s3 client''' + await do_test_backup_abort(manager, s3_server, breakpoint_name = "backup_task_pre_upload", min_files = 0, max_files = 1) @pytest.mark.asyncio async def test_simple_backup_and_restore(manager: ManagerClient, s3_server):