streaming: use host_id in file streaming

Use host ids instead of ips in file-streaming.

Fixes: #22421.

Closes scylladb/scylladb#24055

(cherry picked from commit 2dcea5a27d)

Closes scylladb/scylladb#24119
This commit is contained in:
Aleksandra Martyniuk
2025-05-07 16:09:18 +02:00
committed by Tomasz Grabiec
parent 26bd28dac9
commit fcde30d2b0
6 changed files with 15 additions and 36 deletions

View File

@@ -6001,14 +6001,6 @@ future<> storage_service::update_fence_version(token_metadata::version_t new_ver
});
}
inet_address storage_service::host2ip(locator::host_id host) const {
auto ip = _address_map.find(host);
if (!ip) {
throw std::runtime_error(::format("Cannot map host {} to ip", host));
}
return *ip;
}
// Performs a replica-side operation for a given tablet.
// What operation is performed is determined by "op" based on the
// current state of tablet metadata. The coordinator is supposed to prepare tablet
@@ -7272,11 +7264,7 @@ void storage_service::init_messaging_service() {
[this] (const rpc::client_info& cinfo, streaming::stream_files_request req) -> future<streaming::stream_files_response> {
streaming::stream_files_response resp;
resp.stream_bytes = co_await container().map_reduce0([req] (storage_service& ss) -> future<size_t> {
auto res = co_await streaming::tablet_stream_files_handler(ss._db.local(), ss._messaging.local(), req, [&ss] (locator::host_id host) -> future<gms::inet_address> {
return ss.container().invoke_on(0, [host] (storage_service& ss) {
return ss.host2ip(host);
});
});
auto res = co_await streaming::tablet_stream_files_handler(ss._db.local(), ss._messaging.local(), req);
co_return res.stream_bytes;
},
size_t(0),

View File

@@ -210,7 +210,6 @@ private:
// when both of which sit on the same node. So all the movement is local.
future<> clone_locally_tablet_storage(locator::global_tablet_id, locator::tablet_replica leaving, locator::tablet_replica pending);
future<> cleanup_tablet(locator::global_tablet_id);
inet_address host2ip(locator::host_id) const;
// Handler for table load stats RPC.
future<locator::load_stats> load_stats_for_tablet_based_tables();
future<> process_tablet_split_candidate(table_id) noexcept;

View File

@@ -9,7 +9,6 @@
#include "message/messaging_service.hh"
#include "streaming/stream_blob.hh"
#include "streaming/stream_plan.hh"
#include "gms/inet_address.hh"
#include "utils/pretty_printers.hh"
#include "utils/error_injection.hh"
#include "locator/host_id.hh"
@@ -120,7 +119,7 @@ static void may_inject_error(const streaming::stream_blob_meta& meta, bool may_i
future<> stream_blob_handler(replica::database& db,
netw::messaging_service& ms,
gms::inet_address from,
locator::host_id from,
streaming::stream_blob_meta meta,
rpc::sink<streaming::stream_blob_cmd_data> sink,
rpc::source<streaming::stream_blob_cmd_data> source,
@@ -310,7 +309,7 @@ future<> stream_blob_handler(replica::database& db,
future<> stream_blob_handler(replica::database& db, netw::messaging_service& ms,
gms::inet_address from,
locator::host_id from,
streaming::stream_blob_meta meta,
rpc::sink<streaming::stream_blob_cmd_data> sink,
rpc::source<streaming::stream_blob_cmd_data> source) {
@@ -374,7 +373,7 @@ namespace streaming {
// Send files in the files list to the nodes in targets list over network
// Returns number of bytes sent over network
future<size_t>
tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sources, std::vector<node_and_shard> targets, table_id table, file_stream_id ops_id, host2ip_t host2ip, service::frozen_topology_guard topo_guard, bool inject_errors) {
tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sources, std::vector<node_and_shard> targets, table_id table, file_stream_id ops_id, service::frozen_topology_guard topo_guard, bool inject_errors) {
size_t ops_total_size = 0;
if (targets.empty()) {
co_return ops_total_size;
@@ -387,7 +386,7 @@ tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sou
ops_id, sources.size(), sources, targets);
struct sink_and_source {
gms::inet_address node;
locator::host_id node;
rpc::sink<streaming::stream_blob_cmd_data> sink;
rpc::source<streaming::stream_blob_cmd_data> source;
bool sink_closed = false;
@@ -428,10 +427,9 @@ tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sou
for (auto& x : targets) {
const auto& node = x.node;
meta.dst_shard_id = x.shard;
auto ip = co_await host2ip(node);
blogger.debug("fstream[{}] Master creating sink and source for node={}/{}, file={}, targets={}", ops_id, node, ip, filename, targets);
blogger.debug("fstream[{}] Master creating sink and source for node={}/{}, file={}, targets={}", ops_id, node, node, filename, targets);
auto [sink, source] = co_await ms.make_sink_and_source_for_stream_blob(meta, node);
ss.push_back(sink_and_source{ip, std::move(sink), std::move(source)});
ss.push_back(sink_and_source{node, std::move(sink), std::move(source)});
}
// This fiber sends data to peer node
@@ -600,7 +598,7 @@ tablet_stream_files(netw::messaging_service& ms, std::list<stream_blob_info> sou
}
future<stream_files_response> tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req, host2ip_t host2ip) {
future<stream_files_response> tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req) {
stream_files_response resp;
auto& table = db.find_column_family(req.table);
auto sstables = co_await table.take_storage_snapshot(req.range);
@@ -653,7 +651,7 @@ future<stream_files_response> tablet_stream_files_handler(replica::database& db,
blogger.debug("stream_sstables[{}] Started sending sstable_nr={} files_nr={} files={} range={}",
req.ops_id, sstables.size(), files.size(), files, req.range);
auto ops_start_time = std::chrono::steady_clock::now();
size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), req.targets, req.table, req.ops_id, std::move(host2ip), req.topo_guard);
size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), req.targets, req.table, req.ops_id, req.topo_guard);
resp.stream_bytes = stream_bytes;
auto duration = std::chrono::steady_clock::now() - ops_start_time;
blogger.info("stream_sstables[{}] Finished sending sstable_nr={} files_nr={} files={} range={} stream_bytes={} stream_time={} stream_bw={}",

View File

@@ -116,13 +116,13 @@ struct stream_blob_info {
};
// The handler for the STREAM_BLOB verb.
seastar::future<> stream_blob_handler(replica::database& db, netw::messaging_service& ms, gms::inet_address from, streaming::stream_blob_meta meta, rpc::sink<streaming::stream_blob_cmd_data> sink, rpc::source<streaming::stream_blob_cmd_data> source);
seastar::future<> stream_blob_handler(replica::database& db, netw::messaging_service& ms, locator::host_id from, streaming::stream_blob_meta meta, rpc::sink<streaming::stream_blob_cmd_data> sink, rpc::source<streaming::stream_blob_cmd_data> source);
// Exposed mainly for testing
future<> stream_blob_handler(replica::database& db,
netw::messaging_service& ms,
gms::inet_address from,
locator::host_id from,
streaming::stream_blob_meta meta,
rpc::sink<streaming::stream_blob_cmd_data> sink,
rpc::source<streaming::stream_blob_cmd_data> source,
@@ -163,11 +163,9 @@ public:
size_t stream_bytes = 0;
};
using host2ip_t = std::function<future<gms::inet_address> (locator::host_id)>;
// The handler for the TABLET_STREAM_FILES verb. The receiver of this verb will
// stream sstables files specified by the stream_files_request req.
future<stream_files_response> tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req, host2ip_t host2ip);
future<stream_files_response> tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req);
// Ask the src node to stream sstables to dst node for table in the given token range using TABLET_STREAM_FILES verb.
future<stream_files_response> tablet_stream_files(const file_stream_id& ops_id, replica::table& table, const dht::token_range& range, const locator::host_id& src, const locator::host_id& dst, seastar::shard_id dst_shard_id, netw::messaging_service& ms, abort_source& as, service::frozen_topology_guard topo_guard);
@@ -178,7 +176,6 @@ future<size_t> tablet_stream_files(netw::messaging_service& ms,
std::vector<node_and_shard> targets,
table_id table,
file_stream_id ops_id,
host2ip_t host2ip,
service::frozen_topology_guard topo_guard,
bool may_inject_errors = false
);

View File

@@ -294,7 +294,7 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
}
});
ms.register_stream_blob([this] (const rpc::client_info& cinfo, streaming::stream_blob_meta meta, rpc::source<streaming::stream_blob_cmd_data> source) {
auto from = netw::messaging_service::get_source(cinfo).addr;
const auto& from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
auto sink = _ms.local().make_sink_for_stream_blob(source);
(void)stream_blob_handler(_db.local(), _ms.local(), from, meta, sink, source).handle_exception([ms = _ms.local().shared_from_this()] (std::exception_ptr eptr) {
sslog.warn("Failed to run stream blob handler: {}", eptr);

View File

@@ -76,7 +76,7 @@ do_test_file_stream(replica::database& db, netw::messaging_service& ms, std::vec
if (!verb_register) {
co_await smp::invoke_on_all([&] {
return global_ms.local().register_stream_blob([&](const rpc::client_info& cinfo, streaming::stream_blob_meta meta, rpc::source<streaming::stream_blob_cmd_data> source) {
auto from = netw::messaging_service::get_source(cinfo).addr;
const auto& from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
auto sink = global_ms.local().make_sink_for_stream_blob(source);
(void)stream_blob_handler(global_db.local(), global_ms.local(), from, meta, sink, source, [&suffix](auto&, const streaming::stream_blob_meta& meta) -> future<output_result> {
auto path = meta.filename + suffix;
@@ -115,10 +115,7 @@ do_test_file_stream(replica::database& db, netw::messaging_service& ms, std::vec
co_return make_file_input_stream(std::move(file), foptions);
};
}
auto host2ip = [&global_db] (locator::host_id id) -> future<gms::inet_address> {
co_return global_db.local().get_token_metadata().get_topology().my_address();
};
size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), targets, table, ops_id, host2ip, service::null_topology_guard, inject_error);
size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), targets, table, ops_id, service::null_topology_guard, inject_error);
co_await mark_tablet_stream_done(ops_id);
testlog.info("do_test_file_stream[{}] status=ok files={} stream_bytes={}", ops_id, filelist.size(), stream_bytes);
ret = true;