backup_task: Report uploading progress

Do it by passing reference to s3::upload_progress_monitor object that
sits on task impl itself. Different files' uploads would then update the
monitor with their sizes and uploaded counters. The structure is
reported by get_progress() method. Unit size is set to be bytes. Test is
updated.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2024-10-16 13:20:23 +03:00
parent 2efcfc13e8
commit 25ae3d0aed
3 changed files with 13 additions and 1 deletions

View File

@@ -34,6 +34,7 @@ backup_task_impl::backup_task_impl(tasks::task_manager::module_ptr module,
, _bucket(std::move(bucket))
, _prefix(std::move(prefix))
, _snapshot_dir(std::move(snapshot_dir)) {
_status.progress_units = "bytes ('total' may grow along the way)";
}
std::string backup_task_impl::type() const {
@@ -48,6 +49,13 @@ tasks::is_abortable backup_task_impl::is_abortable() const noexcept {
return tasks::is_abortable::yes;
}
future<tasks::task_manager::task::progress> backup_task_impl::get_progress() const {
co_return tasks::task_manager::task::progress {
.completed = _progress.uploaded,
.total = _progress.total,
};
}
future<> backup_task_impl::do_backup() {
if (!co_await file_exists(_snapshot_dir.native())) {
throw std::invalid_argument(fmt::format("snapshot does not exist at {}", _snapshot_dir.native()));
@@ -81,7 +89,7 @@ future<> backup_task_impl::do_backup() {
// - http::client::max_connections limitation
// FIXME -- s3::client is not abortable yet, but when it will be, need to
// propagate impl::_as abort requests into upload_file's fibers
std::ignore = _client->upload_file(component_name, destination).handle_exception([comp = component_name, &ex] (std::exception_ptr e) {
std::ignore = _client->upload_file(component_name, destination, _progress).handle_exception([comp = component_name, &ex] (std::exception_ptr e) {
snap_log.error("Error uploading {}: {}", comp.native(), e);
// keep the first exception
if (!ex) {

View File

@@ -24,6 +24,8 @@ class backup_task_impl : public tasks::task_manager::task::impl {
sstring _bucket;
sstring _prefix;
std::filesystem::path _snapshot_dir;
s3::upload_progress _progress = {};
future<> do_backup();
protected:
@@ -41,6 +43,7 @@ public:
virtual std::string type() const override;
virtual tasks::is_internal is_internal() const noexcept override;
virtual tasks::is_abortable is_abortable() const noexcept override;
virtual future<tasks::task_manager::task::progress> get_progress() const override;
};
} // snapshot namespace

View File

@@ -69,6 +69,7 @@ async def test_simple_backup(manager: ManagerClient, s3_server):
print(f'Status: {status}, waiting to finish')
status = await manager.api.wait_task(server.ip_addr, tid)
assert (status is not None) and (status['state'] == 'done')
assert (status['progress_total'] > 0) and (status['progress_completed'] == status['progress_total'])
objects = set([ o.key for o in get_s3_resource(s3_server).Bucket(s3_server.bucket_name).objects.all() ])
for f in files: