From 25ae3d0aede9fabc1938e0b6b0852958bb7a233b Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Wed, 16 Oct 2024 13:20:23 +0300 Subject: [PATCH] backup_task: Report uploading progress Do it by passing reference to s3::upload_progress_monitor object that sits on task impl itself. Different files' uploads would then update the monitor with their sizes and uploaded counters. The structure is reported by get_progress() method. Unit size is set to be bytes. Test is updated. Signed-off-by: Pavel Emelyanov --- db/snapshot/backup_task.cc | 10 +++++++++- db/snapshot/backup_task.hh | 3 +++ test/object_store/test_backup.py | 1 + 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/db/snapshot/backup_task.cc b/db/snapshot/backup_task.cc index dbc895d3d0..8644a29ebb 100644 --- a/db/snapshot/backup_task.cc +++ b/db/snapshot/backup_task.cc @@ -34,6 +34,7 @@ backup_task_impl::backup_task_impl(tasks::task_manager::module_ptr module, , _bucket(std::move(bucket)) , _prefix(std::move(prefix)) , _snapshot_dir(std::move(snapshot_dir)) { + _status.progress_units = "bytes ('total' may grow along the way)"; } std::string backup_task_impl::type() const { @@ -48,6 +49,13 @@ tasks::is_abortable backup_task_impl::is_abortable() const noexcept { return tasks::is_abortable::yes; } +future backup_task_impl::get_progress() const { + co_return tasks::task_manager::task::progress { + .completed = _progress.uploaded, + .total = _progress.total, + }; +} + future<> backup_task_impl::do_backup() { if (!co_await file_exists(_snapshot_dir.native())) { throw std::invalid_argument(fmt::format("snapshot does not exist at {}", _snapshot_dir.native())); @@ -81,7 +89,7 @@ future<> backup_task_impl::do_backup() { // - http::client::max_connections limitation // FIXME -- s3::client is not abortable yet, but when it will be, need to // propagate impl::_as abort requests into upload_file's fibers - std::ignore = _client->upload_file(component_name, destination).handle_exception([comp = component_name, &ex] (std::exception_ptr e) { + std::ignore = _client->upload_file(component_name, destination, _progress).handle_exception([comp = component_name, &ex] (std::exception_ptr e) { snap_log.error("Error uploading {}: {}", comp.native(), e); // keep the first exception if (!ex) { diff --git a/db/snapshot/backup_task.hh b/db/snapshot/backup_task.hh index 602af02dac..c533a7bc7f 100644 --- a/db/snapshot/backup_task.hh +++ b/db/snapshot/backup_task.hh @@ -24,6 +24,8 @@ class backup_task_impl : public tasks::task_manager::task::impl { sstring _bucket; sstring _prefix; std::filesystem::path _snapshot_dir; + s3::upload_progress _progress = {}; + future<> do_backup(); protected: @@ -41,6 +43,7 @@ public: virtual std::string type() const override; virtual tasks::is_internal is_internal() const noexcept override; virtual tasks::is_abortable is_abortable() const noexcept override; + virtual future get_progress() const override; }; } // snapshot namespace diff --git a/test/object_store/test_backup.py b/test/object_store/test_backup.py index 3ad84760bd..85c8470a6f 100644 --- a/test/object_store/test_backup.py +++ b/test/object_store/test_backup.py @@ -69,6 +69,7 @@ async def test_simple_backup(manager: ManagerClient, s3_server): print(f'Status: {status}, waiting to finish') status = await manager.api.wait_task(server.ip_addr, tid) assert (status is not None) and (status['state'] == 'done') + assert (status['progress_total'] > 0) and (status['progress_completed'] == status['progress_total']) objects = set([ o.key for o in get_s3_resource(s3_server).Bucket(s3_server.bucket_name).objects.all() ]) for f in files: