Files
scylladb/streaming/stream_manager.hh
Pavel Emelyanov 15c41bfb6c code: Live-update IO throughputs from main
Currently we have two live-updateable IO-throughput options -- one for
streaming and one for compaction. Both are observed and the changed
value is applied to the corresponding scheduling_group by the relevant
service -- respectively, stream_manager and compaction_manager.

Both observe/react/apply places use pretty heavy boilerplate code for
such simple task. Next patches will make things worse by adding two more
options to control IO throughput of some other groups.

That said, the proposal is to hold the updating code in main.cc with the
help of a wrapper class. In there all the needed bits are at hand, and
classes can get their IO updates applied easily.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
2026-03-23 16:00:02 +03:00

202 lines
6.9 KiB
C++

/*
*
* Modified by ScyllaDB
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
*/
#pragma once
#include "streaming/stream_fwd.hh"
#include "streaming/progress_info.hh"
#include "streaming/stream_reason.hh"
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/sharded.hh>
#include "utils/updateable_value.hh"
#include "utils/serialized_action.hh"
#include "gms/i_endpoint_state_change_subscriber.hh"
#include "gms/inet_address.hh"
#include "gms/endpoint_state.hh"
#include "gms/application_state.hh"
#include "service/topology_guard.hh"
#include "readers/mutation_reader.hh"
#include <seastar/core/semaphore.hh>
#include <seastar/core/metrics_registration.hh>
namespace db {
class config;
namespace view {
class view_builder;
class view_building_worker;
}
}
namespace service {
class migration_manager;
};
namespace netw {
class messaging_service;
};
namespace gms {
class gossiper;
}
namespace replica {
class database;
}
namespace streaming {
/// Pair of byte counters tracking traffic of one streaming transfer:
/// how much this node has sent to, and received from, a peer.
struct stream_bytes {
    int64_t bytes_sent = 0;      // total bytes sent to the peer
    int64_t bytes_received = 0;  // total bytes received from the peer

    // Component-wise accumulation of another counter pair into this one.
    stream_bytes& operator+=(const stream_bytes& other) {
        bytes_sent += other.bytes_sent;
        bytes_received += other.bytes_received;
        return *this;
    }

    // Component-wise sum, expressed via operator+= to keep the two in sync.
    friend stream_bytes operator+(const stream_bytes& lhs, const stream_bytes& rhs) {
        stream_bytes sum = lhs;
        sum += rhs;
        return sum;
    }
};
/**
 * stream_manager tracks all currently running stream_result_futures and
 * provides status of every streaming operation invoked on this node.
 *
 * All stream operations should be created through this class so that
 * streaming status and progress can be tracked centrally.
 */
class stream_manager : public gms::i_endpoint_state_change_subscriber, public enable_shared_from_this<stream_manager>, public peering_sharded_service<stream_manager> {
// Shorthand aliases for the gossip types used by the subscriber interface below.
using inet_address = gms::inet_address;
using endpoint_state = gms::endpoint_state;
using endpoint_state_ptr = gms::endpoint_state_ptr;
using application_state = gms::application_state;
using versioned_value = gms::versioned_value;
/*
 * Currently running streams, removed after completion/failure.
 * They are kept in two separate maps to distinguish plans initiated
 * by this node from plans this node is receiving.
 */
private:
sharded<replica::database>& _db;
db::view::view_builder& _view_builder;
sharded<db::view::view_building_worker>& _view_building_worker;
sharded<netw::messaging_service>& _ms;
sharded<service::migration_manager>& _mm;
gms::gossiper& _gossiper;
// Plans this node initiated (sending side).
std::unordered_map<plan_id, shared_ptr<stream_result_future>> _initiated_streams;
// Plans this node is on the receiving side of.
std::unordered_map<plan_id, shared_ptr<stream_result_future>> _receiving_streams;
// Per-plan, per-peer byte counters (see update_progress / get_progress).
std::unordered_map<plan_id, std::unordered_map<locator::host_id, stream_bytes>> _stream_bytes;
uint64_t _total_incoming_bytes{0};
uint64_t _total_outgoing_bytes{0};
// Bounds concurrent mutation sends; 256 units — presumably one per in-flight
// send (see usage in the .cc) — TODO confirm unit semantics.
semaphore _mutation_send_limiter{256};
seastar::metrics::metric_groups _metrics;
// Completion percentage per stream reason, exported for monitoring.
std::unordered_map<streaming::stream_reason, float> _finished_percentage;
// Scheduling group all streaming work runs in.
scheduling_group _streaming_group;
// Live-updateable IO throughput limit (MB/s) observed from db::config.
utils::updateable_value<uint32_t> _io_throughput_mbs;
public:
stream_manager(db::config& cfg, sharded<replica::database>& db,
db::view::view_builder& view_builder,
sharded<db::view::view_building_worker>& view_building_worker,
sharded<netw::messaging_service>& ms,
sharded<service::migration_manager>& mm,
gms::gossiper& gossiper, scheduling_group sg);
future<> start(abort_source& as);
future<> stop();
// Semaphore limiting concurrent mutation sends (see _mutation_send_limiter).
semaphore& mutation_send_limiter() { return _mutation_send_limiter; }
// Register a stream plan on the sending / receiving side respectively.
void register_sending(shared_ptr<stream_result_future> result);
void register_receiving(shared_ptr<stream_result_future> result);
// Look up a registered plan; returns nullptr-like shared_ptr when absent —
// TODO confirm against the .cc.
shared_ptr<stream_result_future> get_sending_stream(streaming::plan_id plan_id) const;
shared_ptr<stream_result_future> get_receiving_stream(streaming::plan_id plan_id) const;
std::vector<shared_ptr<stream_result_future>> get_all_streams() const;
// Accessors for the local shard's instance of each sharded service.
replica::database& db() noexcept { return _db.local(); }
netw::messaging_service& ms() noexcept { return _ms.local(); }
service::migration_manager& mm() noexcept { return _mm.local(); }
const std::unordered_map<plan_id, shared_ptr<stream_result_future>>& get_initiated_streams() const {
return _initiated_streams;
}
const std::unordered_map<plan_id, shared_ptr<stream_result_future>>& get_receiving_streams() const {
return _receiving_streams;
}
void remove_stream(streaming::plan_id plan_id);
void show_streams() const;
// Fails every active session; currently completes synchronously.
future<> shutdown() {
fail_all_sessions();
return make_ready_future<>();
}
// Progress accounting: per-plan/per-peer byte counters, with all-shard
// aggregation variants below.
void update_progress(streaming::plan_id plan_id, locator::host_id peer, progress_info::direction dir, size_t fm_size);
future<> update_all_progress_info();
void remove_progress(streaming::plan_id plan_id);
stream_bytes get_progress(streaming::plan_id plan_id, locator::host_id peer) const;
stream_bytes get_progress(streaming::plan_id plan_id) const;
future<> remove_progress_on_all_shards(streaming::plan_id plan_id);
future<stream_bytes> get_progress_on_all_shards(streaming::plan_id plan_id, locator::host_id peer) const;
future<stream_bytes> get_progress_on_all_shards(streaming::plan_id plan_id) const;
future<stream_bytes> get_progress_on_all_shards(locator::host_id peer) const;
future<stream_bytes> get_progress_on_all_shards(gms::inet_address peer) const;
future<stream_bytes> get_progress_on_all_shards() const;
stream_bytes get_progress_on_local_shard() const;
shared_ptr<stream_session> get_session(streaming::plan_id plan_id, locator::host_id from, const char* verb, std::optional<table_id> cf_id = {});
// Builds a consumer that writes streamed mutation fragments into the local
// database under the given topology guard.
mutation_reader_consumer make_streaming_consumer(
uint64_t estimated_partitions, stream_reason, service::frozen_topology_guard);
public:
// gms::i_endpoint_state_change_subscriber overrides: react to peer
// death/removal/restart (implementations fail sessions with that peer —
// see fail_sessions below; confirm in the .cc).
virtual future<> on_dead(inet_address endpoint, locator::host_id id, endpoint_state_ptr state, gms::permit_id) override;
virtual future<> on_remove(inet_address endpoint, locator::host_id id, gms::permit_id) override;
virtual future<> on_restart(inet_address endpoint, locator::host_id id, endpoint_state_ptr ep_state, gms::permit_id) override;
private:
void fail_all_sessions();
void fail_sessions(locator::host_id id);
bool has_peer(locator::host_id id) const;
void init_messaging_service_handler(abort_source& as);
future<> uninit_messaging_service_handler();
// Applies a new IO throughput limit (MB/s); invoked when _io_throughput_mbs
// changes. NOTE(review): the value is presumably applied to _streaming_group's
// IO — confirm in the .cc / main.cc wrapper.
future<> update_io_throughput(uint32_t value_mbs);
public:
void update_finished_percentage(streaming::stream_reason reason, float percentage);
// Current configured IO throughput limit in MB/s.
uint32_t throughput_mbs() const noexcept {
return _io_throughput_mbs.get();
}
future<> fail_stream_plan(streaming::plan_id plan_id);
scheduling_group get_scheduling_group() const noexcept { return _streaming_group; }
};
} // namespace streaming