mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-25 19:10:42 +00:00
When a backup upload is aborted due to instance shutdown, change the log level from ERROR to INFO since this is expected behavior.

Previously, `abort_requested_exception` during upload would trigger an ERROR log, causing test failures since error logs indicate unexpected issues.

This change:
- Catches `abort_requested_exception` specifically during file uploads
- Logs these shutdown-triggered aborts at INFO level instead of ERROR
- Aligns with how `abort_requested_exception` is handled elsewhere in the service

This prevents false test failures while still informing administrators about aborted uploads during shutdown.

Fixes scylladb/scylladb#22391

Signed-off-by: Kefu Chai <kefu.chai@scylladb.com>

Closes scylladb/scylladb#22995
164 lines
5.5 KiB
C++
164 lines
5.5 KiB
C++
/*
|
|
* Copyright (C) 2024-present ScyllaDB
|
|
*
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include <seastar/core/abort_source.hh>
|
|
#include <seastar/core/seastar.hh>
|
|
#include <seastar/coroutine/maybe_yield.hh>
|
|
|
|
#include "utils/lister.hh"
|
|
#include "utils/s3/client.hh"
|
|
#include "replica/database.hh"
|
|
#include "db/config.hh"
|
|
#include "db/snapshot-ctl.hh"
|
|
#include "db/snapshot/backup_task.hh"
|
|
#include "schema/schema_fwd.hh"
|
|
#include "sstables/sstables.hh"
|
|
#include "utils/error_injection.hh"
|
|
|
|
extern logging::logger snap_log;
|
|
|
|
namespace db::snapshot {
|
|
|
|
// Constructs a backup task.
//
// The task-manager base is initialised with a freshly generated random task
// id, the "node" scope string, `ks` as the keyspace and a null parent id.
// The remaining arguments are stored for later use by the upload code:
//   ctl          - snapshot controller used to serialise snapshot modifications
//   client       - S3 client performing the uploads
//   bucket       - first component of the upload destination
//   prefix       - second component of the upload destination
//   snapshot_dir - local directory whose regular files are uploaded
//   move_files   - when true, each file is removed locally after its upload
backup_task_impl::backup_task_impl(
        tasks::task_manager::module_ptr module,
        snapshot_ctl& ctl,
        shared_ptr<s3::client> client,
        sstring bucket,
        sstring prefix,
        sstring ks,
        std::filesystem::path snapshot_dir,
        bool move_files) noexcept
    : tasks::task_manager::task::impl(
            module,
            tasks::task_id::create_random_id(),
            0,
            "node",
            ks,
            "",
            "",
            tasks::task_id::create_null_id())
    , _snap_ctl(ctl)
    , _client(std::move(client))
    , _bucket(std::move(bucket))
    , _prefix(std::move(prefix))
    , _snapshot_dir(std::move(snapshot_dir))
    , _remove_on_uploaded(move_files)
{
    // Label reported next to the completed/total values of this task.
    _status.progress_units = "bytes ('total' may grow along the way)";
}
|
|
|
|
// Returns the task type string for this task: "backup".
std::string backup_task_impl::type() const {
    return std::string("backup");
}
|
|
|
|
// Backup tasks are not internal tasks.
tasks::is_internal backup_task_impl::is_internal() const noexcept {
    return tasks::is_internal::no;
}
|
|
|
|
// Backup tasks can be aborted; do_backup() polls the abort source between
// files and the same abort source is handed to every upload.
tasks::is_abortable backup_task_impl::is_abortable() const noexcept {
    return tasks::is_abortable::yes;
}
|
|
|
|
// Reports the current upload progress, taken from the _progress counters
// that are passed to the S3 client on every upload_file() call.
future<tasks::task_manager::task::progress> backup_task_impl::get_progress() const {
    tasks::task_manager::task::progress current {
        .completed = _progress.uploaded,
        .total = _progress.total,
    };
    co_return current;
}
|
|
|
|
// Backup tasks are user tasks.
tasks::is_user_task backup_task_impl::is_user_task() const noexcept {
    return tasks::is_user_task::yes;
}
|
|
|
|
// Uploads one snapshot file and optionally removes the local copy.
//
// `name` is the file name relative to the snapshot directory. The file is
// uploaded to "/<bucket>/<prefix>/<name>".
//
// Failure handling:
//  - an abort_requested_exception from the upload is logged at INFO level
//    and rethrown;
//  - any other upload exception is logged at ERROR level and rethrown;
//  - a failure to remove the local file after a successful upload is only
//    logged as a warning and does not fail the returned future.
future<> backup_task_impl::upload_component(sstring name) {
    auto component_name = _snapshot_dir / name;
    auto destination = fmt::format("/{}/{}/{}", _bucket, _prefix, name);
    snap_log.trace("Upload {} to {}", component_name.native(), destination);

    // This coroutine waits for the upload itself. do_backup() launches it
    // without awaiting it, as a background fiber tracked by its uploads gate.
    // Parallelism is implicitly controlled in two ways:
    //  - s3::client::claim_memory semaphore
    //  - http::client::max_connections limitation
    try {
        co_await _client->upload_file(component_name, destination, _progress, &_as);
    } catch (const abort_requested_exception&) {
        snap_log.info("Upload aborted per requested: {}", component_name.native());
        throw;
    } catch (...) {
        snap_log.error("Error uploading {}: {}", component_name.native(), std::current_exception());
        throw;
    }

    if (!_remove_on_uploaded) {
        co_return;
    }

    // Delete the uploaded component to:
    // 1. Free up disk space immediately
    // 2. Avoid costly S3 existence checks on future backup attempts
    try {
        co_await remove_file(component_name.native());
    } catch (...) {
        // If deletion of an uploaded file fails, the backup process will continue.
        // While this doesn't halt the backup, it may indicate filesystem permissions
        // issues or system constraints that should be investigated.
        //
        // Log the native string, like the other messages in this function, so
        // the path is rendered the same way in every log line.
        snap_log.warn("Failed to remove {}: {}", component_name.native(), std::current_exception());
    }
}
|
|
|
|
// Uploads every regular file found in the snapshot directory.
//
// Throws std::invalid_argument if the snapshot directory does not exist.
// Each file is handed to upload_component(), which is started without being
// awaited; a gate tracks the uploads in flight. The loop stops when the
// directory listing is exhausted or fails, or when an abort is requested.
// Before returning, the lister is closed and all in-flight uploads are
// waited for; a recorded exception, if any, is then propagated.
future<> backup_task_impl::do_backup() {
    const bool snapshot_present = co_await file_exists(_snapshot_dir.native());
    if (!snapshot_present) {
        throw std::invalid_argument(fmt::format("snapshot does not exist at {}", _snapshot_dir.native()));
    }

    // First failure seen, either from the lister or from an upload fiber.
    // An abort request replaces whatever was stored here (see below).
    std::exception_ptr failure;
    gate in_flight;
    auto entries = directory_lister(_snapshot_dir, lister::dir_entry_types::of<directory_entry_type::regular>());

    while (true) {
        std::optional<directory_entry> entry;
        try {
            entry = co_await entries.get();
        } catch (...) {
            // `entry` is still empty here, so the loop ends regardless of
            // whether this exception or an earlier one is the one kept.
            if (!failure) {
                failure = std::current_exception();
            }
            break;
        }
        if (!entry) {
            // No more files to upload.
            break;
        }

        // Taken before the upload starts and released when its fiber ends.
        auto holder = in_flight.hold();

        // Pre-upload break point. For testing abort in actual s3 client usage.
        co_await utils::get_local_injector().inject("backup_task_pre_upload", utils::wait_for_message(std::chrono::minutes(2)));

        // Deliberately not awaited: the upload continues while the next
        // directory entry is fetched.
        std::ignore = upload_component(entry->name).handle_exception([&failure] (std::exception_ptr e) {
            // keep the first exception
            if (!failure) {
                failure = std::move(e);
            }
        }).finally([holder = std::move(holder)] {});

        co_await coroutine::maybe_yield();
        co_await utils::get_local_injector().inject("backup_task_pause", utils::wait_for_message(std::chrono::minutes(2)));

        if (impl::_as.abort_requested()) {
            // Stop launching uploads; the abort exception is what gets reported.
            failure = impl::_as.abort_requested_exception_ptr();
            break;
        }
    }

    co_await entries.close();
    // Wait for every upload fiber still holding the gate.
    co_await in_flight.close();

    if (failure) {
        co_await coroutine::return_exception_ptr(std::move(failure));
    }
}
|
|
|
|
// Task entry point: runs do_backup() through the snapshot controller's
// run_snapshot_modify_operation() and logs completion once it succeeds.
future<> backup_task_impl::run() {
    // do_backup() removes a file once it is fully uploaded, so we are actually
    // mutating snapshots.
    auto backup_op = [this] {
        return do_backup();
    };
    co_await _snap_ctl.run_snapshot_modify_operation(std::move(backup_op));

    snap_log.info("Finished backup");
}
|
|
|
|
} // db::snapshot namespace
|