diff --git a/service/storage_service.cc b/service/storage_service.cc index 3cd662b733..690511bafc 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -6001,14 +6001,6 @@ future<> storage_service::update_fence_version(token_metadata::version_t new_ver }); } -inet_address storage_service::host2ip(locator::host_id host) const { - auto ip = _address_map.find(host); - if (!ip) { - throw std::runtime_error(::format("Cannot map host {} to ip", host)); - } - return *ip; -} - // Performs a replica-side operation for a given tablet. // What operation is performed is determined by "op" based on the // current state of tablet metadata. The coordinator is supposed to prepare tablet @@ -7272,11 +7264,7 @@ void storage_service::init_messaging_service() { [this] (const rpc::client_info& cinfo, streaming::stream_files_request req) -> future { streaming::stream_files_response resp; resp.stream_bytes = co_await container().map_reduce0([req] (storage_service& ss) -> future { - auto res = co_await streaming::tablet_stream_files_handler(ss._db.local(), ss._messaging.local(), req, [&ss] (locator::host_id host) -> future { - return ss.container().invoke_on(0, [host] (storage_service& ss) { - return ss.host2ip(host); - }); - }); + auto res = co_await streaming::tablet_stream_files_handler(ss._db.local(), ss._messaging.local(), req); co_return res.stream_bytes; }, size_t(0), diff --git a/service/storage_service.hh b/service/storage_service.hh index e3d0f09c73..112c401d70 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -210,7 +210,6 @@ private: // when both of which sit on the same node. So all the movement is local. future<> clone_locally_tablet_storage(locator::global_tablet_id, locator::tablet_replica leaving, locator::tablet_replica pending); future<> cleanup_tablet(locator::global_tablet_id); - inet_address host2ip(locator::host_id) const; // Handler for table load stats RPC. future load_stats_for_tablet_based_tables(); future<> process_tablet_split_candidate(table_id) noexcept; diff --git a/streaming/stream_blob.cc b/streaming/stream_blob.cc index e816759853..30f9b97b38 100644 --- a/streaming/stream_blob.cc +++ b/streaming/stream_blob.cc @@ -9,7 +9,6 @@ #include "message/messaging_service.hh" #include "streaming/stream_blob.hh" #include "streaming/stream_plan.hh" -#include "gms/inet_address.hh" #include "utils/pretty_printers.hh" #include "utils/error_injection.hh" #include "locator/host_id.hh" @@ -120,7 +119,7 @@ static void may_inject_error(const streaming::stream_blob_meta& meta, bool may_i future<> stream_blob_handler(replica::database& db, netw::messaging_service& ms, - gms::inet_address from, + locator::host_id from, streaming::stream_blob_meta meta, rpc::sink sink, rpc::source source, @@ -310,7 +309,7 @@ future<> stream_blob_handler(replica::database& db, future<> stream_blob_handler(replica::database& db, netw::messaging_service& ms, - gms::inet_address from, + locator::host_id from, streaming::stream_blob_meta meta, rpc::sink sink, rpc::source source) { @@ -374,7 +373,7 @@ namespace streaming { // Send files in the files list to the nodes in targets list over network // Returns number of bytes sent over network future -tablet_stream_files(netw::messaging_service& ms, std::list sources, std::vector targets, table_id table, file_stream_id ops_id, host2ip_t host2ip, service::frozen_topology_guard topo_guard, bool inject_errors) { +tablet_stream_files(netw::messaging_service& ms, std::list sources, std::vector targets, table_id table, file_stream_id ops_id, service::frozen_topology_guard topo_guard, bool inject_errors) { size_t ops_total_size = 0; if (targets.empty()) { co_return ops_total_size; @@ -387,7 +386,7 @@ tablet_stream_files(netw::messaging_service& ms, std::list sou ops_id, sources.size(), sources, targets); struct sink_and_source { - gms::inet_address node; + locator::host_id node; rpc::sink sink; rpc::source source; bool sink_closed = false; @@ -428,10 +427,9 @@ tablet_stream_files(netw::messaging_service& ms, std::list sou for (auto& x : targets) { const auto& node = x.node; meta.dst_shard_id = x.shard; - auto ip = co_await host2ip(node); - blogger.debug("fstream[{}] Master creating sink and source for node={}/{}, file={}, targets={}", ops_id, node, ip, filename, targets); + blogger.debug("fstream[{}] Master creating sink and source for node={}/{}, file={}, targets={}", ops_id, node, node, filename, targets); auto [sink, source] = co_await ms.make_sink_and_source_for_stream_blob(meta, node); - ss.push_back(sink_and_source{ip, std::move(sink), std::move(source)}); + ss.push_back(sink_and_source{node, std::move(sink), std::move(source)}); } // This fiber sends data to peer node @@ -600,7 +598,7 @@ tablet_stream_files(netw::messaging_service& ms, std::list sou } -future tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req, host2ip_t host2ip) { +future tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req) { stream_files_response resp; auto& table = db.find_column_family(req.table); auto sstables = co_await table.take_storage_snapshot(req.range); @@ -653,7 +651,7 @@ future tablet_stream_files_handler(replica::database& db, blogger.debug("stream_sstables[{}] Started sending sstable_nr={} files_nr={} files={} range={}", req.ops_id, sstables.size(), files.size(), files, req.range); auto ops_start_time = std::chrono::steady_clock::now(); - size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), req.targets, req.table, req.ops_id, std::move(host2ip), req.topo_guard); + size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), req.targets, req.table, req.ops_id, req.topo_guard); resp.stream_bytes = stream_bytes; auto duration = std::chrono::steady_clock::now() - ops_start_time; blogger.info("stream_sstables[{}] Finished sending sstable_nr={} files_nr={} files={} range={} stream_bytes={} stream_time={} stream_bw={}", diff --git a/streaming/stream_blob.hh b/streaming/stream_blob.hh index 4964d4dec4..d95814dc17 100644 --- a/streaming/stream_blob.hh +++ b/streaming/stream_blob.hh @@ -116,13 +116,13 @@ struct stream_blob_info { }; // The handler for the STREAM_BLOB verb. -seastar::future<> stream_blob_handler(replica::database& db, netw::messaging_service& ms, gms::inet_address from, streaming::stream_blob_meta meta, rpc::sink sink, rpc::source source); +seastar::future<> stream_blob_handler(replica::database& db, netw::messaging_service& ms, locator::host_id from, streaming::stream_blob_meta meta, rpc::sink sink, rpc::source source); // Exposed mainly for testing future<> stream_blob_handler(replica::database& db, netw::messaging_service& ms, - gms::inet_address from, + locator::host_id from, streaming::stream_blob_meta meta, rpc::sink sink, rpc::source source, @@ -163,11 +163,9 @@ public: size_t stream_bytes = 0; }; -using host2ip_t = std::function (locator::host_id)>; - // The handler for the TABLET_STREAM_FILES verb. The receiver of this verb will // stream sstables files specified by the stream_files_request req. -future tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req, host2ip_t host2ip); +future tablet_stream_files_handler(replica::database& db, netw::messaging_service& ms, streaming::stream_files_request req); // Ask the src node to stream sstables to dst node for table in the given token range using TABLET_STREAM_FILES verb. future tablet_stream_files(const file_stream_id& ops_id, replica::table& table, const dht::token_range& range, const locator::host_id& src, const locator::host_id& dst, seastar::shard_id dst_shard_id, netw::messaging_service& ms, abort_source& as, service::frozen_topology_guard topo_guard); @@ -178,7 +176,6 @@ future tablet_stream_files(netw::messaging_service& ms, std::vector targets, table_id table, file_stream_id ops_id, - host2ip_t host2ip, service::frozen_topology_guard topo_guard, bool may_inject_errors = false ); diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 26abd75e1a..f3ded0bf15 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -294,7 +294,7 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { } }); ms.register_stream_blob([this] (const rpc::client_info& cinfo, streaming::stream_blob_meta meta, rpc::source source) { - auto from = netw::messaging_service::get_source(cinfo).addr; + const auto& from = cinfo.retrieve_auxiliary("host_id"); auto sink = _ms.local().make_sink_for_stream_blob(source); (void)stream_blob_handler(_db.local(), _ms.local(), from, meta, sink, source).handle_exception([ms = _ms.local().shared_from_this()] (std::exception_ptr eptr) { sslog.warn("Failed to run stream blob handler: {}", eptr); diff --git a/test/boost/file_stream_test.cc b/test/boost/file_stream_test.cc index 3d5a27dcae..be165738f0 100644 --- a/test/boost/file_stream_test.cc +++ b/test/boost/file_stream_test.cc @@ -76,7 +76,7 @@ do_test_file_stream(replica::database& db, netw::messaging_service& ms, std::vec if (!verb_register) { co_await smp::invoke_on_all([&] { return global_ms.local().register_stream_blob([&](const rpc::client_info& cinfo, streaming::stream_blob_meta meta, rpc::source source) { - auto from = netw::messaging_service::get_source(cinfo).addr; + const auto& from = cinfo.retrieve_auxiliary("host_id"); auto sink = global_ms.local().make_sink_for_stream_blob(source); (void)stream_blob_handler(global_db.local(), global_ms.local(), from, meta, sink, source, [&suffix](auto&, const streaming::stream_blob_meta& meta) -> future { auto path = meta.filename + suffix; @@ -115,10 +115,7 @@ do_test_file_stream(replica::database& db, netw::messaging_service& ms, std::vec co_return make_file_input_stream(std::move(file), foptions); }; } - auto host2ip = [&global_db] (locator::host_id id) -> future { - co_return global_db.local().get_token_metadata().get_topology().my_address(); - }; - size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), targets, table, ops_id, host2ip, service::null_topology_guard, inject_error); + size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), targets, table, ops_id, service::null_topology_guard, inject_error); co_await mark_tablet_stream_done(ops_id); testlog.info("do_test_file_stream[{}] status=ok files={} stream_bytes={}", ops_id, filelist.size(), stream_bytes); ret = true;