/* * Copyright (C) 2021-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #pragma once #include #include #include "dht/i_partitioner_fwd.hh" #include "dht/token.hh" #include "schema/schema_fwd.hh" #include "sstables/shared_sstable.hh" #include "tasks/task_manager.hh" using namespace seastar; namespace replica { class database; } namespace sstables { class storage_manager; } namespace netw { class messaging_service; } namespace db { namespace view { class view_builder; class view_building_worker; } } namespace service { class storage_service; } namespace locator { class effective_replication_map; } struct stream_progress { float total = 0.; float completed = 0.; virtual ~stream_progress() = default; stream_progress& operator+=(const stream_progress& p) { total += p.total; completed += p.completed; return *this; } void start(float amount) { assert(amount >= 0); total = amount; } virtual void advance(float amount) { // we should not move backward assert(amount >= 0); completed += amount; assert(completed <= total); } }; // The handler of the 'storage_service/load_new_ss_tables' endpoint which, in // turn, is the target of the 'nodetool refresh' command. // Gets sstables from the upload directory and makes them available in the // system. Built on top of the distributed_loader functionality. class sstables_loader : public seastar::peering_sharded_service { public: enum class stream_scope { all, dc, rack, node }; class task_manager_module : public tasks::task_manager::module { public: task_manager_module(tasks::task_manager& tm) noexcept : tasks::task_manager::module(tm, "sstables_loader") {} }; private: sharded& _db; sharded& _ss; netw::messaging_service& _messaging; sharded& _view_builder; sharded& _view_building_worker; shared_ptr _task_manager_module; sstables::storage_manager& _storage_manager; seastar::scheduling_group _sched_group; // Note that this is obviously only valid for the current shard. Users of // this facility should elect a shard to be the coordinator based on any // given objective criteria // // It shouldn't be impossible to actively serialize two callers if the need // ever arise. bool _loading_new_sstables = false; future<> load_and_stream(sstring ks_name, sstring cf_name, table_id, std::vector sstables, bool_class primary_replica_only, bool unlink_sstables, stream_scope scope, shared_ptr progress); future> await_topology_quiesced_and_get_erm(table_id table_id); public: sstables_loader(sharded& db, sharded& ss, netw::messaging_service& messaging, sharded& vb, sharded& vbw, tasks::task_manager& tm, sstables::storage_manager& sstm, seastar::scheduling_group sg); future<> stop(); /** * Load new SSTables not currently tracked by the system * * This can be called, for instance, after copying a batch of SSTables to a CF directory. * * This should not be called in parallel for the same keyspace / column family, and doing * so will throw an std::runtime_exception. * * @param ks_name the keyspace in which to search for new SSTables. * @param cf_name the column family in which to search for new SSTables. * @param load_and_stream load SSTables that do not belong to this node and stream them to the appropriate nodes. * @param primary_replica_only whether to stream only to the primary replica that owns the data. * @param skip_cleanup whether to skip the cleanup step when loading SSTables. * @param skip_reshape whether to skip the reshape step when loading SSTables. * @return a future<> when the operation finishes. */ future<> load_new_sstables(sstring ks_name, sstring cf_name, bool load_and_stream, bool primary_replica_only, bool skip_cleanup, bool skip_reshape, stream_scope scope); /** * Download new SSTables not currently tracked by the system from object store */ future download_new_sstables(sstring ks_name, sstring cf_name, sstring prefix, std::vector sstables, sstring endpoint, sstring bucket, stream_scope scope, bool primary_replica); class download_task_impl; }; template <> struct fmt::formatter : fmt::formatter { template auto format(const sstables_loader::stream_scope a, FormatContext& ctx) const { using enum sstables_loader::stream_scope; switch (a) { case all: return formatter::format("all", ctx); case dc: return formatter::format("dc", ctx); case rack: return formatter::format("rack", ctx); case node: return formatter::format("node", ctx); } } }; struct tablet_sstable_collection { dht::token_range tablet_range; std::vector sstables_fully_contained; std::vector sstables_partially_contained; }; // This function is intended for test purposes only. // It assigns the given sstables to the given tablet ranges based on token containment. // It returns a vector of tablet_sstable_collection, each containing the tablet range // and the sstables that are fully or partially contained within that range. // The prerequisite is the tablet ranges are sorted by the range in ascending order and non-overlapping. // Another prerequisite is that the sstables' token ranges are sorted by its `start` in descending order. future> get_sstables_for_tablets_for_tests(const std::vector& sstables, std::vector&& tablets_ranges);