In Scylla there are two options that control IO bandwidth limit -- the /storage_service/(compaction|stream)_throughput REST API endpoints. The endpoints are partially implemented and have no counterparts in the nodetool. This set implements the missing bits and adds tests for new functionality. Closes scylladb/scylladb#21877 * github.com:scylladb/scylladb: nodetool: Implement [gs]etstreamthroughput commands nodetool: Implement [gs]etcompationthroughput commands test: Add validation of how IO-updating endpoints work api: Implement /storage_service/(stream|compaction)_throughput endpoints api: Disqualify const config reference api: Implement /storage_service/stream_throughput endpoint api: Move stream throughput set/get endpoints from storage service block api: Move set_compaction_throughput_mb_per_sec to config block util: Include fmt/ranges.h in config_file.hh
171 lines
6.4 KiB
C++
171 lines
6.4 KiB
C++
/*
|
|
* Copyright (C) 2015-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include "stream_manager.hh"
|
|
#include "streaming/stream_manager.hh"
|
|
#include "streaming/stream_result_future.hh"
|
|
#include "api/api.hh"
|
|
#include "api/api-doc/stream_manager.json.hh"
|
|
#include "api/api-doc/storage_service.json.hh"
|
|
#include <vector>
|
|
#include <rapidjson/document.h>
|
|
#include "gms/gossiper.hh"
|
|
|
|
namespace api {
|
|
using namespace seastar::httpd;
|
|
|
|
namespace ss = httpd::storage_service_json;
|
|
namespace hs = httpd::stream_manager_json;
|
|
|
|
static void set_summaries(const std::vector<streaming::stream_summary>& from,
|
|
json::json_list<hs::stream_summary>& to) {
|
|
if (!from.empty()) {
|
|
hs::stream_summary res;
|
|
res.cf_id = fmt::to_string(from.front().cf_id);
|
|
// For each stream_session, we pretend we are sending/receiving one
|
|
// file, to make it compatible with nodetool.
|
|
res.files = 1;
|
|
// We can not estimate total number of bytes the stream_session will
|
|
// send or recvieve since we don't know the size of the frozen_mutation
|
|
// until we read it.
|
|
res.total_size = 0;
|
|
to.push(res);
|
|
}
|
|
}
|
|
|
|
static hs::progress_info get_progress_info(const streaming::progress_info& info) {
|
|
hs::progress_info res;
|
|
res.current_bytes = info.current_bytes;
|
|
res.direction = info.dir;
|
|
res.file_name = info.file_name;
|
|
res.peer = fmt::to_string(info.peer);
|
|
res.session_index = 0;
|
|
res.total_bytes = info.total_bytes;
|
|
return res;
|
|
}
|
|
|
|
static void set_files(const std::map<sstring, streaming::progress_info>& from,
|
|
json::json_list<hs::progress_info_mapper>& to) {
|
|
for (auto i : from) {
|
|
hs::progress_info_mapper m;
|
|
m.key = i.first;
|
|
m.value = get_progress_info(i.second);
|
|
to.push(m);
|
|
}
|
|
}
|
|
|
|
static hs::stream_state get_state(
|
|
streaming::stream_result_future& result_future) {
|
|
hs::stream_state state;
|
|
state.description = result_future.description;
|
|
state.plan_id = result_future.plan_id.to_sstring();
|
|
for (auto info : result_future.get_coordinator().get()->get_all_session_info()) {
|
|
hs::stream_info si;
|
|
si.peer = fmt::to_string(info.peer);
|
|
si.session_index = 0;
|
|
si.state = info.state;
|
|
si.connecting = si.peer;
|
|
set_summaries(info.receiving_summaries, si.receiving_summaries);
|
|
set_summaries(info.sending_summaries, si.sending_summaries);
|
|
set_files(info.receiving_files, si.receiving_files);
|
|
set_files(info.sending_files, si.sending_files);
|
|
state.sessions.push(si);
|
|
}
|
|
return state;
|
|
}
|
|
|
|
void set_stream_manager(http_context& ctx, routes& r, sharded<streaming::stream_manager>& sm) {
|
|
hs::get_current_streams.set(r,
|
|
[&sm] (std::unique_ptr<request> req) {
|
|
return sm.invoke_on_all([] (auto& sm) {
|
|
return sm.update_all_progress_info();
|
|
}).then([&sm] {
|
|
return sm.map_reduce0([](streaming::stream_manager& stream) {
|
|
std::vector<hs::stream_state> res;
|
|
for (auto i : stream.get_initiated_streams()) {
|
|
res.push_back(get_state(*i.second.get()));
|
|
}
|
|
for (auto i : stream.get_receiving_streams()) {
|
|
res.push_back(get_state(*i.second.get()));
|
|
}
|
|
return res;
|
|
}, std::vector<hs::stream_state>(),concat<hs::stream_state>).
|
|
then([](const std::vector<hs::stream_state>& res) {
|
|
return make_ready_future<json::json_return_type>(res);
|
|
});
|
|
});
|
|
});
|
|
|
|
hs::get_all_active_streams_outbound.set(r, [&sm](std::unique_ptr<request> req) {
|
|
return sm.map_reduce0([](streaming::stream_manager& stream) {
|
|
return stream.get_initiated_streams().size();
|
|
}, 0, std::plus<int64_t>()).then([](int64_t res) {
|
|
return make_ready_future<json::json_return_type>(res);
|
|
});
|
|
});
|
|
|
|
hs::get_total_incoming_bytes.set(r, [&sm](std::unique_ptr<request> req) {
|
|
gms::inet_address peer(req->get_path_param("peer"));
|
|
return sm.map_reduce0([peer](streaming::stream_manager& sm) {
|
|
return sm.get_progress_on_all_shards(peer).then([] (auto sbytes) {
|
|
return sbytes.bytes_received;
|
|
});
|
|
}, 0, std::plus<int64_t>()).then([](int64_t res) {
|
|
return make_ready_future<json::json_return_type>(res);
|
|
});
|
|
});
|
|
|
|
hs::get_all_total_incoming_bytes.set(r, [&sm](std::unique_ptr<request> req) {
|
|
return sm.map_reduce0([](streaming::stream_manager& sm) {
|
|
return sm.get_progress_on_all_shards().then([] (auto sbytes) {
|
|
return sbytes.bytes_received;
|
|
});
|
|
}, 0, std::plus<int64_t>()).then([](int64_t res) {
|
|
return make_ready_future<json::json_return_type>(res);
|
|
});
|
|
});
|
|
|
|
hs::get_total_outgoing_bytes.set(r, [&sm](std::unique_ptr<request> req) {
|
|
gms::inet_address peer(req->get_path_param("peer"));
|
|
return sm.map_reduce0([peer] (streaming::stream_manager& sm) {
|
|
return sm.get_progress_on_all_shards(peer).then([] (auto sbytes) {
|
|
return sbytes.bytes_sent;
|
|
});
|
|
}, 0, std::plus<int64_t>()).then([](int64_t res) {
|
|
return make_ready_future<json::json_return_type>(res);
|
|
});
|
|
});
|
|
|
|
hs::get_all_total_outgoing_bytes.set(r, [&sm](std::unique_ptr<request> req) {
|
|
return sm.map_reduce0([](streaming::stream_manager& sm) {
|
|
return sm.get_progress_on_all_shards().then([] (auto sbytes) {
|
|
return sbytes.bytes_sent;
|
|
});
|
|
}, 0, std::plus<int64_t>()).then([](int64_t res) {
|
|
return make_ready_future<json::json_return_type>(res);
|
|
});
|
|
});
|
|
|
|
ss::get_stream_throughput_mb_per_sec.set(r, [&sm](std::unique_ptr<http::request> req) {
|
|
auto value = sm.local().throughput_mbs();
|
|
return make_ready_future<json::json_return_type>(value);
|
|
});
|
|
}
|
|
|
|
void unset_stream_manager(http_context& ctx, routes& r) {
|
|
hs::get_current_streams.unset(r);
|
|
hs::get_all_active_streams_outbound.unset(r);
|
|
hs::get_total_incoming_bytes.unset(r);
|
|
hs::get_all_total_incoming_bytes.unset(r);
|
|
hs::get_total_outgoing_bytes.unset(r);
|
|
hs::get_all_total_outgoing_bytes.unset(r);
|
|
ss::get_stream_throughput_mb_per_sec.unset(r);
|
|
}
|
|
|
|
}
|