diff --git a/db/snapshot/backup_task.cc b/db/snapshot/backup_task.cc index c9b635270d..eee4027832 100644 --- a/db/snapshot/backup_task.cc +++ b/db/snapshot/backup_task.cc @@ -109,29 +109,8 @@ future<> backup_task_impl::do_backup() { co_await process_snapshot_dir(); - gate uploads; + co_await uploads_worker(); - for (auto it = _files.begin(); it != _files.end() && !_ex; ++it) { - auto gh = uploads.hold(); - - // Pre-upload break point. For testing abort in actual s3 client usage. - co_await utils::get_local_injector().inject("backup_task_pre_upload", utils::wait_for_message(std::chrono::minutes(2))); - - std::ignore = upload_component(*it).handle_exception([this] (std::exception_ptr e) { - // keep the first exception - if (!_ex) { - _ex = std::move(e); - } - }).finally([gh = std::move(gh)] {}); - co_await coroutine::maybe_yield(); - co_await utils::get_local_injector().inject("backup_task_pause", utils::wait_for_message(std::chrono::minutes(2))); - if (impl::_as.abort_requested()) { - _ex = impl::_as.abort_requested_exception_ptr(); - break; - } - } - - co_await uploads.close(); if (_ex) { co_await coroutine::return_exception_ptr(std::move(_ex)); } @@ -160,6 +139,40 @@ future<> backup_task_impl::process_snapshot_dir() { } } +future<> backup_task_impl::uploads_worker() { + gate uploads; + + try { + for (auto it = _files.begin(); it != _files.end() && !_ex; ++it) { + auto gh = uploads.hold(); + + // Pre-upload break point. For testing abort in actual s3 client usage. + co_await utils::get_local_injector().inject("backup_task_pre_upload", utils::wait_for_message(std::chrono::minutes(2))); + + std::ignore = upload_component(*it).handle_exception([this] (std::exception_ptr e) { + // keep the first exception + if (!_ex) { + _ex = std::move(e); + } + }).finally([gh = std::move(gh)] {}); + co_await coroutine::maybe_yield(); + co_await utils::get_local_injector().inject("backup_task_pause", utils::wait_for_message(std::chrono::minutes(2))); + if (impl::_as.abort_requested()) { + _ex = impl::_as.abort_requested_exception_ptr(); + break; + } + } + } catch (...) { + _ex = std::current_exception(); + } + + co_await uploads.close(); + + if (_ex) { + co_await coroutine::return_exception_ptr(_ex); + } +} + future<> backup_task_impl::run() { // do_backup() removes a file once it is fully uploaded, so we are actually // mutating snapshots. diff --git a/db/snapshot/backup_task.hh b/db/snapshot/backup_task.hh index 824243422c..25725170ae 100644 --- a/db/snapshot/backup_task.hh +++ b/db/snapshot/backup_task.hh @@ -37,6 +37,7 @@ class backup_task_impl : public tasks::task_manager::task::impl { future<> do_backup(); future<> upload_component(sstring name); future<> process_snapshot_dir(); + future<> uploads_worker(); protected: virtual future<> run() override;