Files
scylladb/db/snapshot/backup_task.hh
Calle Wilund 5d4558df3b sstables: Use object_storage_client for remote storage
Replaces direct s3 interfaces with the abstraction layer, and open
for having multiple implentations/backends
2025-10-13 08:53:25 +00:00

127 lines
3.8 KiB
C++

/*
* Copyright (C) 2024-present ScyllaDB
*
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <filesystem>
#include <exception>
#include <vector>
#include <unordered_map>
#include <unordered_set>
#include <seastar/core/gate.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/shard_id.hh>
#include <seastar/core/sharded.hh>
#include "utils/upload_progress.hh"
#include "utils/small_vector.hh"
#include "tasks/task_manager.hh"
#include "sstables/component_type.hh"
#include "sstables/generation_type.hh"
#include "sstables/sstables_manager_subscription.hh"
namespace replica {
class database;
}
namespace sstables {
class sstables_manager;
class storage_manager;
class object_storage_client;
}
namespace db {
class snapshot_ctl;
namespace snapshot {
class backup_task_impl : public tasks::task_manager::task::impl {
snapshot_ctl& _snap_ctl;
sharded<sstables::storage_manager>& _sstm;
sstring _endpoint;
sstring _bucket;
sstring _prefix;
std::filesystem::path _snapshot_dir;
table_id _table_id;
bool _remove_on_uploaded;
tasks::task_manager::task::progress _total_progress;
std::exception_ptr _ex;
std::vector<sstring> _files;
using comps_vector = utils::small_vector<std::string, sstables::num_component_types>;
using comps_map = std::unordered_map<sstables::generation_type, comps_vector>;
comps_map _sstable_comps; // Keeps all sstable components to back up, extract entries once queued for upload
std::unordered_set<sstables::generation_type> _sstables_in_snapshot; // Keeps all sstable generations in snapshot
std::vector<sstables::generation_type> _deleted_sstables;
shard_id _backup_shard;
class worker : sstables::sstables_manager_event_handler {
sstables::sstables_manager& _manager;
backup_task_impl& _task;
shared_ptr<sstables::object_storage_client> _client;
abort_source _as;
std::exception_ptr _ex;
public:
worker(const replica::database& db, table_id t, backup_task_impl& task);
~worker();
future<> start_uploading();
void abort();
sstables::sstables_manager& manager() const noexcept {
return _manager;
}
private:
virtual future<> deleted_sstable(sstables::generation_type gen) const override;
struct upload_permit {
gate::holder gh;
semaphore_units<> units;
};
future<> backup_file(sstring name, upload_permit permit);
future<> upload_component(sstring name);
};
sharded<worker> _sharded_worker;
std::vector<utils::upload_progress> _progress_per_shard{smp::count};
future<> do_backup();
future<> process_snapshot_dir();
// Returns a disengaged optional when done
std::optional<std::string> dequeue();
void dequeue_sstable();
void on_sstable_deletion(sstables::generation_type gen);
protected:
virtual future<> run() override;
public:
backup_task_impl(tasks::task_manager::module_ptr module,
snapshot_ctl& ctl,
sharded<sstables::storage_manager>& sstm,
sstring endpoint,
sstring bucket,
sstring prefix,
sstring ks,
std::filesystem::path snapshot_dir,
table_id tid,
bool move_files) noexcept;
virtual std::string type() const override;
virtual tasks::is_internal is_internal() const noexcept override;
virtual tasks::is_abortable is_abortable() const noexcept override;
virtual future<tasks::task_manager::task::progress> get_progress() const override;
virtual tasks::is_user_task is_user_task() const noexcept override;
};
} // snapshot namespace
} // db namespace