db: snapshot: backup_task: refactor uploads_worker out of do_backup

Let do_backup deal only with the high level coordination.
A future patch will follow this structure to run
uploads_worker on each shard.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2025-03-18 19:13:01 +02:00
parent ff25b4c97f
commit e3ba425c2b
2 changed files with 36 additions and 22 deletions

View File

@@ -109,29 +109,8 @@ future<> backup_task_impl::do_backup() {
co_await process_snapshot_dir();
gate uploads;
co_await uploads_worker();
for (auto it = _files.begin(); it != _files.end() && !_ex; ++it) {
auto gh = uploads.hold();
// Pre-upload break point. For testing abort in actual s3 client usage.
co_await utils::get_local_injector().inject("backup_task_pre_upload", utils::wait_for_message(std::chrono::minutes(2)));
std::ignore = upload_component(*it).handle_exception([this] (std::exception_ptr e) {
// keep the first exception
if (!_ex) {
_ex = std::move(e);
}
}).finally([gh = std::move(gh)] {});
co_await coroutine::maybe_yield();
co_await utils::get_local_injector().inject("backup_task_pause", utils::wait_for_message(std::chrono::minutes(2)));
if (impl::_as.abort_requested()) {
_ex = impl::_as.abort_requested_exception_ptr();
break;
}
}
co_await uploads.close();
if (_ex) {
co_await coroutine::return_exception_ptr(std::move(_ex));
}
@@ -160,6 +139,40 @@ future<> backup_task_impl::process_snapshot_dir() {
}
}
future<> backup_task_impl::uploads_worker() {
gate uploads;
try {
for (auto it = _files.begin(); it != _files.end() && !_ex; ++it) {
auto gh = uploads.hold();
// Pre-upload break point. For testing abort in actual s3 client usage.
co_await utils::get_local_injector().inject("backup_task_pre_upload", utils::wait_for_message(std::chrono::minutes(2)));
std::ignore = upload_component(*it).handle_exception([this] (std::exception_ptr e) {
// keep the first exception
if (!_ex) {
_ex = std::move(e);
}
}).finally([gh = std::move(gh)] {});
co_await coroutine::maybe_yield();
co_await utils::get_local_injector().inject("backup_task_pause", utils::wait_for_message(std::chrono::minutes(2)));
if (impl::_as.abort_requested()) {
_ex = impl::_as.abort_requested_exception_ptr();
break;
}
}
} catch (...) {
_ex = std::current_exception();
}
co_await uploads.close();
if (_ex) {
co_await coroutine::return_exception_ptr(_ex);
}
}
future<> backup_task_impl::run() {
// do_backup() removes a file once it is fully uploaded, so we are actually
// mutating snapshots.

View File

@@ -37,6 +37,7 @@ class backup_task_impl : public tasks::task_manager::task::impl {
future<> do_backup();
future<> upload_component(sstring name);
future<> process_snapshot_dir();
future<> uploads_worker();
protected:
virtual future<> run() override;