From 89b79d44dec91053af26cc6b2a140487b89e6ea2 Mon Sep 17 00:00:00 2001 From: Asias He Date: Fri, 25 Dec 2015 13:25:08 +0800 Subject: [PATCH] streaming: Get rid of the _connecting_ parameter messaging_service will use private ip address automatically to connect a peer node if possible. There is no need for the upper level like streaming to worry about it. Drop it simplifies things a bit. --- api/stream_manager.cc | 2 +- dht/range_streamer.cc | 3 +- message/messaging_service.cc | 24 +++++++------- message/messaging_service.hh | 16 +++++----- repair/repair.cc | 4 +-- service/storage_service.cc | 16 +++------- streaming/session_info.hh | 4 +-- streaming/stream_coordinator.cc | 8 ++--- streaming/stream_coordinator.hh | 12 +++---- streaming/stream_plan.cc | 22 ++++++------- streaming/stream_plan.hh | 15 +++------ streaming/stream_session.cc | 52 +++++++++++++++---------------- streaming/stream_session.hh | 4 +-- streaming/stream_transfer_task.cc | 2 +- 14 files changed, 81 insertions(+), 103 deletions(-) diff --git a/api/stream_manager.cc b/api/stream_manager.cc index 7d67400436..96cc82f28c 100644 --- a/api/stream_manager.cc +++ b/api/stream_manager.cc @@ -72,7 +72,7 @@ static hs::stream_state get_state( si.peer = boost::lexical_cast(info.peer); si.session_index = info.session_index; si.state = info.state; - si.connecting = boost::lexical_cast(info.connecting); + si.connecting = si.peer; set_summaries(info.receiving_summaries, si.receiving_summaries); set_summaries(info.sending_summaries, si.sending_summaries); set_files(info.receiving_files, si.receiving_files); diff --git a/dht/range_streamer.cc b/dht/range_streamer.cc index 2d3377f195..baa7b13772 100644 --- a/dht/range_streamer.cc +++ b/dht/range_streamer.cc @@ -241,12 +241,11 @@ future range_streamer::fetch_async() { for (auto& x : fetch.second) { auto& source = x.first; auto& ranges = x.second; - auto preferred = net::get_local_messaging_service().get_preferred_ip(source); /* Send messages to respective folks to stream data over to me */ if (logger.is_enabled(logging::log_level::debug)) { logger.debug("{}ing from {} ranges {}", _description, source, ranges); } - _stream_plan.request_ranges(source, preferred, keyspace, ranges); + _stream_plan.request_ranges(source, keyspace, ranges); } } diff --git a/message/messaging_service.cc b/message/messaging_service.cc index f5b5c31c3f..6eed1d0a31 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -470,24 +470,24 @@ future messaging_service::send_stream_init_message(shard_id id, stream // PREPARE_MESSAGE void messaging_service::register_prepare_message(std::function (streaming::messages::prepare_message msg, UUID plan_id, - inet_address from, inet_address connecting, unsigned src_cpu_id, unsigned dst_cpu_id)>&& func) { + inet_address from, unsigned src_cpu_id, unsigned dst_cpu_id)>&& func) { register_handler(this, messaging_verb::PREPARE_MESSAGE, std::move(func)); } future messaging_service::send_prepare_message(shard_id id, streaming::messages::prepare_message msg, UUID plan_id, - inet_address from, inet_address connecting, unsigned src_cpu_id, unsigned dst_cpu_id) { + inet_address from, unsigned src_cpu_id, unsigned dst_cpu_id) { return send_message_timeout_and_retry(this, messaging_verb::PREPARE_MESSAGE, id, streaming_timeout, streaming_nr_retry, streaming_wait_before_retry, - std::move(msg), plan_id, from, connecting, src_cpu_id, dst_cpu_id); + std::move(msg), plan_id, from, src_cpu_id, dst_cpu_id); } // PREPARE_DONE_MESSAGE -void messaging_service::register_prepare_done_message(std::function (UUID plan_id, inet_address from, inet_address connecting, unsigned dst_cpu_id)>&& func) { +void messaging_service::register_prepare_done_message(std::function (UUID plan_id, inet_address from, unsigned dst_cpu_id)>&& func) { register_handler(this, messaging_verb::PREPARE_DONE_MESSAGE, std::move(func)); } -future<> messaging_service::send_prepare_done_message(shard_id id, UUID plan_id, inet_address from, inet_address connecting, unsigned dst_cpu_id) { +future<> messaging_service::send_prepare_done_message(shard_id id, UUID plan_id, inet_address from, unsigned dst_cpu_id) { return send_message_timeout_and_retry(this, messaging_verb::PREPARE_DONE_MESSAGE, id, streaming_timeout, streaming_nr_retry, streaming_wait_before_retry, - plan_id, from, connecting, dst_cpu_id); + plan_id, from, dst_cpu_id); } // STREAM_MUTATION @@ -501,23 +501,23 @@ future<> messaging_service::send_stream_mutation(shard_id id, UUID plan_id, froz } // STREAM_MUTATION_DONE -void messaging_service::register_stream_mutation_done(std::function (UUID plan_id, std::vector> ranges, UUID cf_id, inet_address from, inet_address connecting, unsigned dst_cpu_id)>&& func) { +void messaging_service::register_stream_mutation_done(std::function (UUID plan_id, std::vector> ranges, UUID cf_id, inet_address from, unsigned dst_cpu_id)>&& func) { register_handler(this, messaging_verb::STREAM_MUTATION_DONE, std::move(func)); } -future<> messaging_service::send_stream_mutation_done(shard_id id, UUID plan_id, std::vector> ranges, UUID cf_id, inet_address from, inet_address connecting, unsigned dst_cpu_id) { +future<> messaging_service::send_stream_mutation_done(shard_id id, UUID plan_id, std::vector> ranges, UUID cf_id, inet_address from, unsigned dst_cpu_id) { return send_message_timeout_and_retry(this, messaging_verb::STREAM_MUTATION_DONE, id, streaming_timeout, streaming_nr_retry, streaming_wait_before_retry, - plan_id, std::move(ranges), cf_id, from, connecting, dst_cpu_id); + plan_id, std::move(ranges), cf_id, from, dst_cpu_id); } // COMPLETE_MESSAGE -void messaging_service::register_complete_message(std::function (UUID plan_id, inet_address from, inet_address connecting, unsigned dst_cpu_id)>&& func) { +void messaging_service::register_complete_message(std::function (UUID plan_id, inet_address from, unsigned dst_cpu_id)>&& func) { register_handler(this, messaging_verb::COMPLETE_MESSAGE, std::move(func)); } -future<> messaging_service::send_complete_message(shard_id id, UUID plan_id, inet_address from, inet_address connecting, unsigned dst_cpu_id) { +future<> messaging_service::send_complete_message(shard_id id, UUID plan_id, inet_address from, unsigned dst_cpu_id) { return send_message_timeout_and_retry(this, messaging_verb::COMPLETE_MESSAGE, id, streaming_timeout, streaming_nr_retry, streaming_wait_before_retry, - plan_id, from, connecting, dst_cpu_id); + plan_id, from, dst_cpu_id); } void messaging_service::register_echo(std::function ()>&& func) { diff --git a/message/messaging_service.hh b/message/messaging_service.hh index d5b5c3577c..5852c48dcc 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -407,23 +407,23 @@ public: // Wrapper for PREPARE_MESSAGE verb void register_prepare_message(std::function (streaming::messages::prepare_message msg, UUID plan_id, - inet_address from, inet_address connecting, unsigned src_cpu_id, unsigned dst_cpu_id)>&& func); + inet_address from, unsigned src_cpu_id, unsigned dst_cpu_id)>&& func); future send_prepare_message(shard_id id, streaming::messages::prepare_message msg, UUID plan_id, - inet_address from, inet_address connecting, unsigned src_cpu_id, unsigned dst_cpu_id); + inet_address from, unsigned src_cpu_id, unsigned dst_cpu_id); // Wrapper for PREPARE_DONE_MESSAGE verb - void register_prepare_done_message(std::function (UUID plan_id, inet_address from, inet_address connecting, unsigned dst_cpu_id)>&& func); - future<> send_prepare_done_message(shard_id id, UUID plan_id, inet_address from, inet_address connecting, unsigned dst_cpu_id); + void register_prepare_done_message(std::function (UUID plan_id, inet_address from, unsigned dst_cpu_id)>&& func); + future<> send_prepare_done_message(shard_id id, UUID plan_id, inet_address from, unsigned dst_cpu_id); // Wrapper for STREAM_MUTATION verb void register_stream_mutation(std::function (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id)>&& func); future<> send_stream_mutation(shard_id id, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id); - void register_stream_mutation_done(std::function (UUID plan_id, std::vector> ranges, UUID cf_id, inet_address from, inet_address connecting, unsigned dst_cpu_id)>&& func); - future<> send_stream_mutation_done(shard_id id, UUID plan_id, std::vector> ranges, UUID cf_id, inet_address from, inet_address connecting, unsigned dst_cpu_id); + void register_stream_mutation_done(std::function (UUID plan_id, std::vector> ranges, UUID cf_id, inet_address from, unsigned dst_cpu_id)>&& func); + future<> send_stream_mutation_done(shard_id id, UUID plan_id, std::vector> ranges, UUID cf_id, inet_address from, unsigned dst_cpu_id); - void register_complete_message(std::function (UUID plan_id, inet_address from, inet_address connecting, unsigned dst_cpu_id)>&& func); - future<> send_complete_message(shard_id id, UUID plan_id, inet_address from, inet_address connecting, unsigned dst_cpu_id); + void register_complete_message(std::function (UUID plan_id, inet_address from, unsigned dst_cpu_id)>&& func); + future<> send_complete_message(shard_id id, UUID plan_id, inet_address from, unsigned dst_cpu_id); // Wrapper for ECHO verb void register_echo(std::function ()>&& func); diff --git a/repair/repair.cc b/repair/repair.cc index cf001d557b..646152e822 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -229,8 +229,8 @@ static future<> repair_range(seastar::sharded& db, sstring keyspace, // request ranges from all of them and only later transfer ranges to // all of them? Otherwise, we won't necessarily fully repair the // other ndoes, just this one? What does Cassandra do here? - sp->transfer_ranges(peer, peer, keyspace, {range}, cfs); - sp->request_ranges(peer, peer, keyspace, {range}, cfs); + sp->transfer_ranges(peer, keyspace, {range}, cfs); + sp->request_ranges(peer, keyspace, {range}, cfs); } return sp->execute().discard_result().then([sp, id] { logger.info("repair session #{} successful", id); diff --git a/service/storage_service.cc b/service/storage_service.cc index fba56f318f..ad8f04a068 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2065,9 +2065,8 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr for (auto& m : maps) { auto source = m.first; auto ranges = m.second; - auto preferred = net::get_local_messaging_service().get_preferred_ip(source); logger.debug("Requesting from {} ranges {}", source, ranges); - sp->request_ranges(source, preferred, keyspace_name, ranges); + sp->request_ranges(source, keyspace_name, ranges); } } return sp->execute().then_wrapped([this, sp, notify_endpoint] (auto&& f) { @@ -2185,10 +2184,8 @@ storage_service::stream_ranges(std::unordered_maptransfer_ranges(new_endpoint, preferred, keyspace_name, ranges); + sp->transfer_ranges(new_endpoint, keyspace_name, ranges); } } return sp->execute().discard_result().then([sp] { @@ -2225,7 +2222,6 @@ future<> storage_service::stream_hints() { snitch->sort_by_proximity(get_broadcast_address(), candidates); auto hints_destination_host = candidates.front(); - auto preferred = net::get_local_messaging_service().get_preferred_ip(hints_destination_host); // stream all hints -- range list will be a singleton of "the entire ring" std::vector> ranges = {range::make_open_ended_both_sides()}; @@ -2234,7 +2230,7 @@ future<> storage_service::stream_hints() { auto sp = make_lw_shared("Hints"); std::vector column_families = { db::system_keyspace::HINTS }; auto keyspace = db::system_keyspace::NAME; - sp->transfer_ranges(hints_destination_host, preferred, keyspace, ranges, column_families); + sp->transfer_ranges(hints_destination_host, keyspace, ranges, column_families); return sp->execute().discard_result().then([sp] { logger.info("stream_hints successful"); }).handle_exception([] (auto ep) { @@ -2553,8 +2549,7 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_ auto& address = x.first; auto& ranges = x.second; logger.debug("Will stream range {} of keyspace {} to endpoint {}", ranges , keyspace, address); - auto preferred = net::get_local_messaging_service().get_preferred_ip(address); - _stream_plan.transfer_ranges(address, preferred, keyspace, ranges); + _stream_plan.transfer_ranges(address, keyspace, ranges); } // stream requests @@ -2569,8 +2564,7 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_ auto& address = x.first; auto& ranges = x.second; logger.debug("Will request range {} of keyspace {} from endpoint {}", ranges, keyspace, address); - auto preferred = net::get_local_messaging_service().get_preferred_ip(address); - _stream_plan.request_ranges(address, preferred, keyspace, ranges); + _stream_plan.request_ranges(address, keyspace, ranges); } if (logger.is_enabled(logging::log_level::debug)) { for (auto& x : work) { diff --git a/streaming/session_info.hh b/streaming/session_info.hh index 01063b6a99..20cd5eaa06 100644 --- a/streaming/session_info.hh +++ b/streaming/session_info.hh @@ -55,7 +55,6 @@ public: using inet_address = gms::inet_address; inet_address peer; int session_index; - inet_address connecting; /** Immutable collection of receiving summaries */ std::vector receiving_summaries; /** Immutable collection of sending summaries*/ @@ -67,12 +66,11 @@ public: std::map sending_files; session_info() = default; - session_info(inet_address peer_, int session_index_, inet_address connecting_, + session_info(inet_address peer_, int session_index_, std::vector receiving_summaries_, std::vector sending_summaries_, stream_session_state state_) : peer(peer_) - , connecting(connecting_) , receiving_summaries(std::move(receiving_summaries_)) , sending_summaries(std::move(sending_summaries_)) , state(state_) { diff --git a/streaming/stream_coordinator.cc b/streaming/stream_coordinator.cc index 09ffc7a00b..58315064d1 100644 --- a/streaming/stream_coordinator.cc +++ b/streaming/stream_coordinator.cc @@ -117,11 +117,11 @@ bool stream_coordinator::host_streaming_data::has_active_sessions() { return false; } -shared_ptr stream_coordinator::host_streaming_data::get_or_create_next_session(inet_address peer, inet_address connecting) { +shared_ptr stream_coordinator::host_streaming_data::get_or_create_next_session(inet_address peer) { // create int size = _stream_sessions.size(); if (size < _connections_per_host) { - auto session = make_shared(peer, connecting, size, _keep_ss_table_level); + auto session = make_shared(peer, size, _keep_ss_table_level); _stream_sessions.emplace(++_last_returned, session); return _stream_sessions[_last_returned]; // get @@ -142,10 +142,10 @@ std::vector> stream_coordinator::host_streaming_data: } shared_ptr stream_coordinator::host_streaming_data::get_or_create_session_by_id(inet_address peer, - int id, inet_address connecting) { + int id) { auto it = _stream_sessions.find(id); if (it == _stream_sessions.end()) { - it = _stream_sessions.emplace(id, make_shared(peer, connecting, id, _keep_ss_table_level)).first; + it = _stream_sessions.emplace(id, make_shared(peer, id, _keep_ss_table_level)).first; } return it->second; } diff --git a/streaming/stream_coordinator.hh b/streaming/stream_coordinator.hh index 6988faa397..1bb91c752e 100644 --- a/streaming/stream_coordinator.hh +++ b/streaming/stream_coordinator.hh @@ -90,12 +90,12 @@ public: std::set get_peers(); public: - shared_ptr get_or_create_next_session(inet_address peer, inet_address connecting) { - return get_or_create_host_data(peer).get_or_create_next_session(peer, connecting); + shared_ptr get_or_create_next_session(inet_address peer) { + return get_or_create_host_data(peer).get_or_create_next_session(peer); } - shared_ptr get_or_create_session_by_id(inet_address peer, int id, inet_address connecting) { - return get_or_create_host_data(peer).get_or_create_session_by_id(peer, id, connecting); + shared_ptr get_or_create_session_by_id(inet_address peer, int id) { + return get_or_create_host_data(peer).get_or_create_session_by_id(peer, id); } void update_progress(progress_info info) { @@ -159,13 +159,13 @@ private: bool has_active_sessions(); - shared_ptr get_or_create_next_session(inet_address peer, inet_address connecting); + shared_ptr get_or_create_next_session(inet_address peer); void connect_all_stream_sessions(); std::vector> get_all_stream_sessions(); - shared_ptr get_or_create_session_by_id(inet_address peer, int id, inet_address connecting); + shared_ptr get_or_create_session_by_id(inet_address peer, int id); void update_progress(progress_info info); diff --git a/streaming/stream_plan.cc b/streaming/stream_plan.cc index 38c46cec02..327b871145 100644 --- a/streaming/stream_plan.cc +++ b/streaming/stream_plan.cc @@ -44,28 +44,24 @@ namespace streaming { extern logging::logger sslog; -stream_plan& stream_plan::request_ranges(inet_address from, inet_address connecting, sstring keyspace, std::vector> ranges) { - return request_ranges(from, connecting, keyspace, ranges, {}); +stream_plan& stream_plan::request_ranges(inet_address from, sstring keyspace, std::vector> ranges) { + return request_ranges(from, keyspace, ranges, {}); } -stream_plan& stream_plan::request_ranges(inet_address from, inet_address connecting, sstring keyspace, std::vector> ranges, std::vector column_families) { +stream_plan& stream_plan::request_ranges(inet_address from, sstring keyspace, std::vector> ranges, std::vector column_families) { _range_added = true; - auto session = _coordinator->get_or_create_next_session(from, connecting); + auto session = _coordinator->get_or_create_next_session(from); session->add_stream_request(keyspace, ranges, std::move(column_families), _repaired_at); return *this; } +stream_plan& stream_plan::transfer_ranges(inet_address to, sstring keyspace, std::vector> ranges) { + return transfer_ranges(to, keyspace, ranges, {}); +} + stream_plan& stream_plan::transfer_ranges(inet_address to, sstring keyspace, std::vector> ranges, std::vector column_families) { - return transfer_ranges(to, to, keyspace, ranges, column_families); -} - -stream_plan& stream_plan::transfer_ranges(inet_address to, inet_address connecting, sstring keyspace, std::vector> ranges) { - return transfer_ranges(to, connecting, keyspace, ranges, {}); -} - -stream_plan& stream_plan::transfer_ranges(inet_address to, inet_address connecting, sstring keyspace, std::vector> ranges, std::vector column_families) { _range_added = true; - auto session = _coordinator->get_or_create_next_session(to, connecting); + auto session = _coordinator->get_or_create_next_session(to); session->add_transfer_ranges(keyspace, std::move(ranges), std::move(column_families), _flush_before_transfer, _repaired_at); return *this; } diff --git a/streaming/stream_plan.hh b/streaming/stream_plan.hh index 1a36e44ea2..ceee701d2e 100644 --- a/streaming/stream_plan.hh +++ b/streaming/stream_plan.hh @@ -104,7 +104,7 @@ public: * @param ranges ranges to fetch * @return this object for chaining */ - stream_plan& request_ranges(inet_address from, inet_address connecting, sstring keyspace, std::vector> ranges); + stream_plan& request_ranges(inet_address from, sstring keyspace, std::vector> ranges); /** * Request data in {@code columnFamilies} under {@code keyspace} and {@code ranges} from specific node. @@ -116,14 +116,7 @@ public: * @param columnFamilies specific column families * @return this object for chaining */ - stream_plan& request_ranges(inet_address from, inet_address connecting, sstring keyspace, std::vector> ranges, std::vector column_families); - - /** - * Add transfer task to send data of specific {@code columnFamilies} under {@code keyspace} and {@code ranges}. - * - * @see #transferRanges(java.net.InetAddress, java.net.InetAddress, String, java.util.Collection, String...) - */ - stream_plan& transfer_ranges(inet_address to, sstring keyspace, std::vector> ranges, std::vector column_families); + stream_plan& request_ranges(inet_address from, sstring keyspace, std::vector> ranges, std::vector column_families); /** * Add transfer task to send data of specific keyspace and ranges. @@ -134,7 +127,7 @@ public: * @param ranges ranges to send * @return this object for chaining */ - stream_plan& transfer_ranges(inet_address to, inet_address connecting, sstring keyspace, std::vector> ranges); + stream_plan& transfer_ranges(inet_address to, sstring keyspace, std::vector> ranges); /** * Add transfer task to send data of specific {@code columnFamilies} under {@code keyspace} and {@code ranges}. @@ -146,7 +139,7 @@ public: * @param columnFamilies specific column families * @return this object for chaining */ - stream_plan& transfer_ranges(inet_address to, inet_address connecting, sstring keyspace, std::vector> ranges, std::vector column_families); + stream_plan& transfer_ranges(inet_address to, sstring keyspace, std::vector> ranges, std::vector column_families); stream_plan& listeners(std::vector handlers); #if 0 diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 4bc7c12475..2181e18e92 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -86,20 +86,20 @@ void stream_session::init_messaging_service_handler() { return make_ready_future(dst_cpu_id); }); }); - ms().register_prepare_message([] (messages::prepare_message msg, UUID plan_id, inet_address from, inet_address connecting, unsigned src_cpu_id, unsigned dst_cpu_id) { - return smp::submit_to(dst_cpu_id, [msg = std::move(msg), plan_id, from, connecting, src_cpu_id] () mutable { + ms().register_prepare_message([] (messages::prepare_message msg, UUID plan_id, inet_address from, unsigned src_cpu_id, unsigned dst_cpu_id) { + return smp::submit_to(dst_cpu_id, [msg = std::move(msg), plan_id, from, src_cpu_id] () mutable { auto f = get_stream_result_future(plan_id); - sslog.debug("[Stream #{}] GOT PREPARE_MESSAGE: from={}, connecting={}", plan_id, from, connecting); + sslog.debug("[Stream #{}] GOT PREPARE_MESSAGE: from={}", plan_id, from); if (f) { auto coordinator = f->get_coordinator(); assert(coordinator); - auto session = coordinator->get_or_create_next_session(from, from); + auto session = coordinator->get_or_create_next_session(from); assert(session); session->init(f); session->dst_cpu_id = src_cpu_id; session->start_keep_alive_timer(); - sslog.debug("[Stream #{}] GOT PREPARE_MESSAGE: get session peer={} connecting={} src_cpu_id={}, dst_cpu_id={}", - session->plan_id(), session->peer, session->connecting, session->src_cpu_id, session->dst_cpu_id); + sslog.debug("[Stream #{}] GOT PREPARE_MESSAGE: get session peer={} src_cpu_id={}, dst_cpu_id={}", + session->plan_id(), session->peer, session->src_cpu_id, session->dst_cpu_id); return session->prepare(std::move(msg.requests), std::move(msg.summaries)); } else { auto err = sprint("[Stream #%s] GOT PREPARE_MESSAGE: Can not find stream_manager", plan_id); @@ -108,14 +108,14 @@ void stream_session::init_messaging_service_handler() { } }); }); - ms().register_prepare_done_message([] (UUID plan_id, inet_address from, inet_address connecting, unsigned dst_cpu_id) { - return smp::submit_to(dst_cpu_id, [plan_id, from, connecting] () mutable { - sslog.debug("[Stream #{}] GOT PREPARE_DONE_MESSAGE: from={}, connecting={}", plan_id, from, connecting); + ms().register_prepare_done_message([] (UUID plan_id, inet_address from, unsigned dst_cpu_id) { + return smp::submit_to(dst_cpu_id, [plan_id, from] () mutable { + sslog.debug("[Stream #{}] GOT PREPARE_DONE_MESSAGE: from={}", plan_id, from); auto f = get_stream_result_future(plan_id); if (f) { auto coordinator = f->get_coordinator(); assert(coordinator); - auto session = coordinator->get_or_create_next_session(from, from); + auto session = coordinator->get_or_create_next_session(from); assert(session); session->start_keep_alive_timer(); session->follower_start_sent(); @@ -138,7 +138,7 @@ void stream_session::init_messaging_service_handler() { if (f) { auto coordinator = f->get_coordinator(); assert(coordinator); - auto session = coordinator->get_or_create_next_session(from, from); + auto session = coordinator->get_or_create_next_session(from); assert(session); session->start_keep_alive_timer(); return service::get_storage_proxy().local().mutate_locally(fm); @@ -149,14 +149,14 @@ void stream_session::init_messaging_service_handler() { } }); }); - ms().register_stream_mutation_done([] (UUID plan_id, std::vector> ranges, UUID cf_id, inet_address from, inet_address connecting, unsigned dst_cpu_id) { - return smp::submit_to(dst_cpu_id, [ranges = std::move(ranges), plan_id, cf_id, from, connecting] () mutable { - sslog.debug("[Stream #{}] GOT STREAM_MUTATION_DONE: cf_id={}, from={}, connecting={}", plan_id, cf_id, from, connecting); + ms().register_stream_mutation_done([] (UUID plan_id, std::vector> ranges, UUID cf_id, inet_address from, unsigned dst_cpu_id) { + return smp::submit_to(dst_cpu_id, [ranges = std::move(ranges), plan_id, cf_id, from] () mutable { + sslog.debug("[Stream #{}] GOT STREAM_MUTATION_DONE: cf_id={}, from={}", plan_id, cf_id, from); auto f = get_stream_result_future(plan_id); if (f) { auto coordinator = f->get_coordinator(); assert(coordinator); - auto session = coordinator->get_or_create_next_session(from, from); + auto session = coordinator->get_or_create_next_session(from); assert(session); session->start_keep_alive_timer(); session->receive_task_completed(cf_id); @@ -181,14 +181,14 @@ void stream_session::init_messaging_service_handler() { }); }); #endif - ms().register_complete_message([] (UUID plan_id, inet_address from, inet_address connecting, unsigned dst_cpu_id) { - return smp::submit_to(dst_cpu_id, [plan_id, from, connecting, dst_cpu_id] () mutable { - sslog.debug("[Stream #{}] GOT COMPLETE_MESSAGE, from={}, connecting={}, dst_cpu_id={}", plan_id, from, connecting, dst_cpu_id); + ms().register_complete_message([] (UUID plan_id, inet_address from, unsigned dst_cpu_id) { + return smp::submit_to(dst_cpu_id, [plan_id, from, dst_cpu_id] () mutable { + sslog.debug("[Stream #{}] GOT COMPLETE_MESSAGE, from={}, dst_cpu_id={}", plan_id, from, dst_cpu_id); auto f = get_stream_result_future(plan_id); if (f) { auto coordinator = f->get_coordinator(); assert(coordinator); - auto session = coordinator->get_or_create_next_session(from, from); + auto session = coordinator->get_or_create_next_session(from); assert(session); session->start_keep_alive_timer(); session->complete(); @@ -220,9 +220,8 @@ distributed* stream_session::_db; stream_session::stream_session() = default; -stream_session::stream_session(inet_address peer_, inet_address connecting_, int index_, bool keep_ss_table_level_) +stream_session::stream_session(inet_address peer_, int index_, bool keep_ss_table_level_) : peer(peer_) - , connecting(connecting_) , _index(index_) , _keep_ss_table_level(keep_ss_table_level_) { //this.metrics = StreamingMetrics.get(connecting); @@ -282,7 +281,7 @@ future<> stream_session::test(distributed& qp) { auto ks = sstring("ks"); std::vector> ranges = {query::range::make_open_ended_both_sides()}; std::vector cfs{tb}; - sp.transfer_ranges(to, to, ks, ranges, cfs).request_ranges(to, to, ks, ranges, cfs).execute().then_wrapped([] (auto&& f) { + sp.transfer_ranges(to, ks, ranges, cfs).request_ranges(to, ks, ranges, cfs).execute().then_wrapped([] (auto&& f) { try { auto state = f.get0(); sslog.debug("plan_id={} description={} DONE", state.plan_id, state.description); @@ -337,7 +336,7 @@ future<> stream_session::on_initialization_complete() { auto from = utils::fb_utilities::get_broadcast_address(); sslog.debug("[Stream #{}] SEND PREPARE_MESSAGE to {}", plan_id(), id); return ms().send_prepare_message(id, std::move(prepare), plan_id(), from, - this->connecting, this->src_cpu_id, this->dst_cpu_id).then_wrapped([this, id] (auto&& f) { + this->src_cpu_id, this->dst_cpu_id).then_wrapped([this, id] (auto&& f) { try { auto msg = f.get0(); this->start_keep_alive_timer(); @@ -354,7 +353,7 @@ future<> stream_session::on_initialization_complete() { }).then([this, id, from] { auto plan_id = this->plan_id(); sslog.debug("[Stream #{}] SEND PREPARE_DONE_MESSAGE to {}", plan_id, id); - return ms().send_prepare_done_message(id, plan_id, from, this->connecting, this->dst_cpu_id).then([this] { + return ms().send_prepare_done_message(id, plan_id, from, this->dst_cpu_id).then([this] { this->start_keep_alive_timer(); }).handle_exception([id, plan_id] (auto ep) { sslog.error("[Stream #{}] Fail to send PREPARE_DONE_MESSAGE to {}, {}", plan_id, id, ep); @@ -503,7 +502,7 @@ session_info stream_session::get_session_info() { for (auto& transfer : _transfers) { transfer_summaries.emplace_back(transfer.second.get_summary()); } - return session_info(peer, _index, connecting, std::move(receiving_summaries), std::move(transfer_summaries), _state); + return session_info(peer, _index, std::move(receiving_summaries), std::move(transfer_summaries), _state); } void stream_session::receive_task_completed(UUID cf_id) { @@ -525,7 +524,7 @@ void stream_session::send_complete_message() { auto id = shard_id{this->peer, this->dst_cpu_id}; auto plan_id = this->plan_id(); sslog.debug("[Stream #{}] SEND COMPLETE_MESSAGE to {}", plan_id, id); - this->ms().send_complete_message(id, plan_id, from, this->connecting, this->dst_cpu_id).then([session = shared_from_this(), plan_id] { + this->ms().send_complete_message(id, plan_id, from, this->dst_cpu_id).then([session = shared_from_this(), plan_id] { sslog.debug("[Stream #{}] GOT COMPLETE_MESSAGE Reply", plan_id); }).handle_exception([plan_id] (auto ep) { sslog.warn("[Stream #{}] ERROR COMPLETE_MESSAGE Reply: {}", plan_id, ep); @@ -686,6 +685,7 @@ void stream_session::start() { close_session(stream_session_state::COMPLETE); return; } + auto connecting = net::get_local_messaging_service().get_preferred_ip(peer); if (peer == connecting) { sslog.info("[Stream #{}] Starting streaming to {}", plan_id(), peer); } else { diff --git a/streaming/stream_session.hh b/streaming/stream_session.hh index 2cdb651b8f..bc405e161e 100644 --- a/streaming/stream_session.hh +++ b/streaming/stream_session.hh @@ -162,8 +162,6 @@ public: * Each {@code StreamSession} is identified by this InetAddress which is broadcast address of the node streaming. */ inet_address peer; - /** Actual connecting address. Can be the same as {@linkplain #peer}. */ - inet_address connecting; unsigned src_cpu_id; unsigned dst_cpu_id; private: @@ -204,7 +202,7 @@ public: * @param connecting Actual connecting address * @param factory is used for establishing connection */ - stream_session(inet_address peer_, inet_address connecting_, int index_, bool keep_ss_table_level_); + stream_session(inet_address peer_, int index_, bool keep_ss_table_level_); ~stream_session(); UUID plan_id(); diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index 7b92314456..f16c728f29 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -152,7 +152,7 @@ void stream_transfer_task::complete(int sequence_number) { auto from = utils::fb_utilities::get_broadcast_address(); auto id = shard_id{session->peer, session->dst_cpu_id}; sslog.debug("[Stream #{}] SEND STREAM_MUTATION_DONE to {}, seq={}", plan_id, id, sequence_number); - session->ms().send_stream_mutation_done(id, plan_id, std::move(_ranges), this->cf_id, from, session->connecting, session->dst_cpu_id).then_wrapped([this, id, plan_id] (auto&& f) { + session->ms().send_stream_mutation_done(id, plan_id, std::move(_ranges), this->cf_id, from, session->dst_cpu_id).then_wrapped([this, id, plan_id] (auto&& f) { try { f.get(); session->start_keep_alive_timer();