Files
scylladb/streaming/stream_manager.hh
Pavel Emelyanov 6f3f30ee07 storage_service: Use stream_manager group for streaming
The handler of raft_topology_cmd::command::stream_ranges switches to
the streaming scheduling group to perform data streaming in it. It grabs
the group from the database db_config, which is not great. There is a
stream manager at hand in the storage service handlers; since they already
use its functionality, they should use _its_ scheduling group.

This will help splitting the streaming scheduling group into more
elaborated groups under the maintenance supergroup: SCYLLADB-351

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Closes scylladb/scylladb#28363
2026-02-01 20:42:37 +02:00

204 lines
7.1 KiB
C++

/*
*
* Modified by ScyllaDB
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#pragma once
#include "streaming/stream_fwd.hh"
#include "streaming/progress_info.hh"
#include "streaming/stream_reason.hh"
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/sharded.hh>
#include "utils/updateable_value.hh"
#include "utils/serialized_action.hh"
#include "gms/i_endpoint_state_change_subscriber.hh"
#include "gms/inet_address.hh"
#include "gms/endpoint_state.hh"
#include "gms/application_state.hh"
#include "service/topology_guard.hh"
#include "readers/mutation_reader.hh"
#include <seastar/core/semaphore.hh>
#include <seastar/core/metrics_registration.hh>
// Forward declarations: these types are only used by reference or pointer
// in this header, so the full definitions are not needed here. This keeps
// the header light and avoids pulling in heavy includes.
namespace db {
class config;
namespace view {
class view_builder;
class view_building_worker;
}
}
namespace service {
class migration_manager;
};
namespace netw {
class messaging_service;
};
namespace gms {
class gossiper;
}
namespace replica {
class database;
}
namespace streaming {
/**
 * Counters for the number of bytes transferred by a stream, in both
 * directions. Used as the value type of the per-plan/per-peer progress
 * maps in stream_manager.
 *
 * Both operators are pure integer arithmetic, so they are marked
 * noexcept; operator+ is [[nodiscard]] since discarding the sum of two
 * counters is always a mistake.
 */
struct stream_bytes {
    int64_t bytes_sent = 0;
    int64_t bytes_received = 0;

    // Component-wise sum; neither operand is modified.
    [[nodiscard]] friend stream_bytes operator+(const stream_bytes& x, const stream_bytes& y) noexcept {
        stream_bytes ret(x);
        ret += y;
        return ret;
    }

    // Accumulate another counter pair into this one.
    stream_bytes& operator+=(const stream_bytes& x) noexcept {
        bytes_sent += x.bytes_sent;
        bytes_received += x.bytes_received;
        return *this;
    }
};
/**
 * stream_manager tracks the currently running stream_result_futures and
 * provides the status of all streaming operations on this node.
 *
 * All stream operations should be created through this class so that their
 * status and progress can be tracked centrally.
 */
class stream_manager : public gms::i_endpoint_state_change_subscriber, public enable_shared_from_this<stream_manager>, public peering_sharded_service<stream_manager> {
// Shorthand aliases for gossip-related types used throughout the class.
using inet_address = gms::inet_address;
using endpoint_state = gms::endpoint_state;
using endpoint_state_ptr = gms::endpoint_state_ptr;
using application_state = gms::application_state;
using versioned_value = gms::versioned_value;
/*
 * Currently running streams. Removed after completion/failure.
 * We manage them in two different maps to distinguish plans initiated
 * by this node from ones it is receiving, within the same process.
 */
private:
sharded<replica::database>& _db;
db::view::view_builder& _view_builder;
sharded<db::view::view_building_worker>& _view_building_worker;
sharded<netw::messaging_service>& _ms;
sharded<service::migration_manager>& _mm;
gms::gossiper& _gossiper;
// Streams this node initiated, keyed by plan id.
std::unordered_map<plan_id, shared_ptr<stream_result_future>> _initiated_streams;
// Streams this node is on the receiving side of, keyed by plan id.
std::unordered_map<plan_id, shared_ptr<stream_result_future>> _receiving_streams;
// Per-plan, per-peer sent/received byte counters for progress reporting.
std::unordered_map<plan_id, std::unordered_map<locator::host_id, stream_bytes>> _stream_bytes;
uint64_t _total_incoming_bytes{0};
uint64_t _total_outgoing_bytes{0};
// Caps the number of mutation sends in flight at once (256 units).
semaphore _mutation_send_limiter{256};
seastar::metrics::metric_groups _metrics;
// Completion percentage per stream reason, updated via update_finished_percentage().
std::unordered_map<streaming::stream_reason, float> _finished_percentage;
// Scheduling group for streaming work; supplied at construction and
// exposed via get_scheduling_group().
scheduling_group _streaming_group;
// Live view of the configured streaming I/O throughput limit (MB/s).
utils::updateable_value<uint32_t> _io_throughput_mbs;
// Serializes throughput updates: applies the current _io_throughput_mbs
// value one change at a time.
serialized_action _io_throughput_updater = serialized_action([this] { return update_io_throughput(_io_throughput_mbs()); });
std::optional<utils::observer<uint32_t>> _io_throughput_option_observer;
public:
stream_manager(db::config& cfg, sharded<replica::database>& db,
db::view::view_builder& view_builder,
sharded<db::view::view_building_worker>& view_building_worker,
sharded<netw::messaging_service>& ms,
sharded<service::migration_manager>& mm,
gms::gossiper& gossiper, scheduling_group sg);
future<> start(abort_source& as);
future<> stop();
// Semaphore limiting concurrent mutation sends.
semaphore& mutation_send_limiter() { return _mutation_send_limiter; }
// Register a stream this node is sending or receiving, respectively.
void register_sending(shared_ptr<stream_result_future> result);
void register_receiving(shared_ptr<stream_result_future> result);
shared_ptr<stream_result_future> get_sending_stream(streaming::plan_id plan_id) const;
shared_ptr<stream_result_future> get_receiving_stream(streaming::plan_id plan_id) const;
std::vector<shared_ptr<stream_result_future>> get_all_streams() const;
// Accessors for the local (this-shard) instances of the sharded services.
replica::database& db() noexcept { return _db.local(); }
netw::messaging_service& ms() noexcept { return _ms.local(); }
service::migration_manager& mm() noexcept { return _mm.local(); }
const std::unordered_map<plan_id, shared_ptr<stream_result_future>>& get_initiated_streams() const {
return _initiated_streams;
}
const std::unordered_map<plan_id, shared_ptr<stream_result_future>>& get_receiving_streams() const {
return _receiving_streams;
}
void remove_stream(streaming::plan_id plan_id);
void show_streams() const;
// Fails all in-flight sessions; resolves immediately afterwards.
future<> shutdown() {
fail_all_sessions();
return make_ready_future<>();
}
// Progress bookkeeping: update/query the per-plan, per-peer byte counters.
void update_progress(streaming::plan_id plan_id, locator::host_id peer, progress_info::direction dir, size_t fm_size);
future<> update_all_progress_info();
void remove_progress(streaming::plan_id plan_id);
stream_bytes get_progress(streaming::plan_id plan_id, locator::host_id peer) const;
stream_bytes get_progress(streaming::plan_id plan_id) const;
// Cross-shard variants that aggregate/apply over all shards.
future<> remove_progress_on_all_shards(streaming::plan_id plan_id);
future<stream_bytes> get_progress_on_all_shards(streaming::plan_id plan_id, locator::host_id peer) const;
future<stream_bytes> get_progress_on_all_shards(streaming::plan_id plan_id) const;
future<stream_bytes> get_progress_on_all_shards(locator::host_id peer) const;
future<stream_bytes> get_progress_on_all_shards(gms::inet_address peer) const;
future<stream_bytes> get_progress_on_all_shards() const;
stream_bytes get_progress_on_local_shard() const;
shared_ptr<stream_session> get_session(streaming::plan_id plan_id, locator::host_id from, const char* verb, std::optional<table_id> cf_id = {});
mutation_reader_consumer make_streaming_consumer(
uint64_t estimated_partitions, stream_reason, service::frozen_topology_guard);
public:
// gms::i_endpoint_state_change_subscriber overrides: notifications about
// peer liveness changes (death, removal, restart).
virtual future<> on_dead(inet_address endpoint, locator::host_id id, endpoint_state_ptr state, gms::permit_id) override;
virtual future<> on_remove(inet_address endpoint, locator::host_id id, gms::permit_id) override;
virtual future<> on_restart(inet_address endpoint, locator::host_id id, endpoint_state_ptr ep_state, gms::permit_id) override;
private:
void fail_all_sessions();
void fail_sessions(locator::host_id id);
bool has_peer(locator::host_id id) const;
void init_messaging_service_handler(abort_source& as);
future<> uninit_messaging_service_handler();
// Applies a new I/O throughput limit (MB/s); invoked via _io_throughput_updater.
future<> update_io_throughput(uint32_t value_mbs);
public:
void update_finished_percentage(streaming::stream_reason reason, float percentage);
// Currently configured streaming I/O throughput limit, in MB/s.
uint32_t throughput_mbs() const noexcept {
return _io_throughput_mbs.get();
}
future<> fail_stream_plan(streaming::plan_id plan_id);
// Scheduling group under which streaming work should run.
scheduling_group get_scheduling_group() const noexcept { return _streaming_group; }
};
} // namespace streaming