/* * Copyright (C) 2024-present ScyllaDB * */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #pragma once #include #include #include #include #include #include #include #include #include #include "utils/upload_progress.hh" #include "utils/small_vector.hh" #include "tasks/task_manager.hh" #include "sstables/component_type.hh" #include "sstables/generation_type.hh" #include "sstables/sstables_manager_subscription.hh" namespace replica { class database; } namespace sstables { class sstables_manager; class storage_manager; class object_storage_client; } namespace db { class snapshot_ctl; namespace snapshot { class backup_task_impl : public tasks::task_manager::task::impl { snapshot_ctl& _snap_ctl; sharded& _sstm; sstring _endpoint; sstring _bucket; sstring _prefix; std::filesystem::path _snapshot_dir; table_id _table_id; bool _remove_on_uploaded; tasks::task_manager::task::progress _total_progress; std::exception_ptr _ex; std::vector _files; using comps_vector = utils::small_vector; using comps_map = std::unordered_map; comps_map _sstable_comps; // Keeps all sstable components to back up, extract entries once queued for upload std::unordered_set _sstables_in_snapshot; // Keeps all sstable generations in snapshot std::vector _deleted_sstables; shard_id _backup_shard; class worker : sstables::sstables_manager_event_handler { sstables::sstables_manager& _manager; backup_task_impl& _task; shared_ptr _client; abort_source _as; std::exception_ptr _ex; public: worker(const replica::database& db, table_id t, backup_task_impl& task); ~worker(); future<> start_uploading(); void abort(); sstables::sstables_manager& manager() const noexcept { return _manager; } private: virtual future<> deleted_sstable(sstables::generation_type gen) const override; struct upload_permit { gate::holder gh; semaphore_units<> units; }; future<> backup_file(sstring name, upload_permit permit); future<> upload_component(sstring name); }; sharded _sharded_worker; std::vector _progress_per_shard{smp::count}; future<> do_backup(); future<> process_snapshot_dir(); // Returns a disengaged optional when done std::optional dequeue(); void dequeue_sstable(); void on_sstable_deletion(sstables::generation_type gen); protected: virtual future<> run() override; public: backup_task_impl(tasks::task_manager::module_ptr module, snapshot_ctl& ctl, sharded& sstm, sstring endpoint, sstring bucket, sstring prefix, sstring ks, std::filesystem::path snapshot_dir, table_id tid, bool move_files) noexcept; virtual std::string type() const override; virtual tasks::is_internal is_internal() const noexcept override; virtual tasks::is_abortable is_abortable() const noexcept override; virtual future get_progress() const override; virtual tasks::is_user_task is_user_task() const noexcept override; }; } // snapshot namespace } // db namespace