/* * * Modified by ScyllaDB * Copyright (C) 2015-present ScyllaDB */ /* * SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0) */ #pragma once #include "streaming/stream_fwd.hh" #include "streaming/progress_info.hh" #include "streaming/stream_reason.hh" #include #include #include "utils/updateable_value.hh" #include "utils/serialized_action.hh" #include "gms/i_endpoint_state_change_subscriber.hh" #include "gms/inet_address.hh" #include "gms/endpoint_state.hh" #include "gms/application_state.hh" #include "service/topology_guard.hh" #include "readers/mutation_reader.hh" #include #include namespace db { class config; namespace view { class view_builder; class view_building_worker; } } namespace service { class migration_manager; }; namespace netw { class messaging_service; }; namespace gms { class gossiper; } namespace replica { class database; } namespace streaming { struct stream_bytes { int64_t bytes_sent = 0; int64_t bytes_received = 0; friend stream_bytes operator+(const stream_bytes& x, const stream_bytes& y) { stream_bytes ret(x); ret += y; return ret; } stream_bytes& operator+=(const stream_bytes& x) { bytes_sent += x.bytes_sent; bytes_received += x.bytes_received; return *this; } }; /** * StreamManager manages currently running {@link StreamResultFuture}s and provides status of all operation invoked. * * All stream operation should be created through this class to track streaming status and progress. */ class stream_manager : public gms::i_endpoint_state_change_subscriber, public enable_shared_from_this, public peering_sharded_service { using inet_address = gms::inet_address; using endpoint_state = gms::endpoint_state; using endpoint_state_ptr = gms::endpoint_state_ptr; using application_state = gms::application_state; using versioned_value = gms::versioned_value; /* * Currently running streams. Removed after completion/failure. * We manage them in two different maps to distinguish plan from initiated ones to * receiving ones within the same JVM. */ private: sharded& _db; db::view::view_builder& _view_builder; sharded& _view_building_worker; sharded& _ms; sharded& _mm; gms::gossiper& _gossiper; std::unordered_map> _initiated_streams; std::unordered_map> _receiving_streams; std::unordered_map> _stream_bytes; uint64_t _total_incoming_bytes{0}; uint64_t _total_outgoing_bytes{0}; semaphore _mutation_send_limiter{256}; seastar::metrics::metric_groups _metrics; std::unordered_map _finished_percentage; scheduling_group _streaming_group; utils::updateable_value _io_throughput_mbs; serialized_action _io_throughput_updater = serialized_action([this] { return update_io_throughput(_io_throughput_mbs()); }); std::optional> _io_throughput_option_observer; public: stream_manager(db::config& cfg, sharded& db, db::view::view_builder& view_builder, sharded& view_building_worker, sharded& ms, sharded& mm, gms::gossiper& gossiper, scheduling_group sg); future<> start(abort_source& as); future<> stop(); semaphore& mutation_send_limiter() { return _mutation_send_limiter; } void register_sending(shared_ptr result); void register_receiving(shared_ptr result); shared_ptr get_sending_stream(streaming::plan_id plan_id) const; shared_ptr get_receiving_stream(streaming::plan_id plan_id) const; std::vector> get_all_streams() const; replica::database& db() noexcept { return _db.local(); } netw::messaging_service& ms() noexcept { return _ms.local(); } service::migration_manager& mm() noexcept { return _mm.local(); } const std::unordered_map>& get_initiated_streams() const { return _initiated_streams; } const std::unordered_map>& get_receiving_streams() const { return _receiving_streams; } void remove_stream(streaming::plan_id plan_id); void show_streams() const; future<> shutdown() { fail_all_sessions(); return make_ready_future<>(); } void update_progress(streaming::plan_id plan_id, locator::host_id peer, progress_info::direction dir, size_t fm_size); future<> update_all_progress_info(); void remove_progress(streaming::plan_id plan_id); stream_bytes get_progress(streaming::plan_id plan_id, locator::host_id peer) const; stream_bytes get_progress(streaming::plan_id plan_id) const; future<> remove_progress_on_all_shards(streaming::plan_id plan_id); future get_progress_on_all_shards(streaming::plan_id plan_id, locator::host_id peer) const; future get_progress_on_all_shards(streaming::plan_id plan_id) const; future get_progress_on_all_shards(locator::host_id peer) const; future get_progress_on_all_shards(gms::inet_address peer) const; future get_progress_on_all_shards() const; stream_bytes get_progress_on_local_shard() const; shared_ptr get_session(streaming::plan_id plan_id, locator::host_id from, const char* verb, std::optional cf_id = {}); mutation_reader_consumer make_streaming_consumer( uint64_t estimated_partitions, stream_reason, service::frozen_topology_guard); public: virtual future<> on_dead(inet_address endpoint, locator::host_id id, endpoint_state_ptr state, gms::permit_id) override; virtual future<> on_remove(inet_address endpoint, locator::host_id id, gms::permit_id) override; virtual future<> on_restart(inet_address endpoint, locator::host_id id, endpoint_state_ptr ep_state, gms::permit_id) override; private: void fail_all_sessions(); void fail_sessions(locator::host_id id); bool has_peer(locator::host_id id) const; void init_messaging_service_handler(abort_source& as); future<> uninit_messaging_service_handler(); future<> update_io_throughput(uint32_t value_mbs); public: void update_finished_percentage(streaming::stream_reason reason, float percentage); uint32_t throughput_mbs() const noexcept { return _io_throughput_mbs.get(); } future<> fail_stream_plan(streaming::plan_id plan_id); scheduling_group get_scheduling_group() const noexcept { return _streaming_group; } }; } // namespace streaming