streaming: Get rid of the _connecting_ parameter

messaging_service will use private ip address automatically to connect a
peer node if possible. There is no need for the upper level like
streaming to worry about it. Drop it simplifies things a bit.
This commit is contained in:
Asias He
2015-12-25 13:25:08 +08:00
committed by Tomasz Grabiec
parent 8d31c27f7b
commit 89b79d44de
14 changed files with 81 additions and 103 deletions

View File

@@ -72,7 +72,7 @@ static hs::stream_state get_state(
si.peer = boost::lexical_cast<std::string>(info.peer);
si.session_index = info.session_index;
si.state = info.state;
si.connecting = boost::lexical_cast<std::string>(info.connecting);
si.connecting = si.peer;
set_summaries(info.receiving_summaries, si.receiving_summaries);
set_summaries(info.sending_summaries, si.sending_summaries);
set_files(info.receiving_files, si.receiving_files);

View File

@@ -241,12 +241,11 @@ future<streaming::stream_state> range_streamer::fetch_async() {
for (auto& x : fetch.second) {
auto& source = x.first;
auto& ranges = x.second;
auto preferred = net::get_local_messaging_service().get_preferred_ip(source);
/* Send messages to respective folks to stream data over to me */
if (logger.is_enabled(logging::log_level::debug)) {
logger.debug("{}ing from {} ranges {}", _description, source, ranges);
}
_stream_plan.request_ranges(source, preferred, keyspace, ranges);
_stream_plan.request_ranges(source, keyspace, ranges);
}
}

View File

@@ -470,24 +470,24 @@ future<unsigned> messaging_service::send_stream_init_message(shard_id id, stream
// PREPARE_MESSAGE
void messaging_service::register_prepare_message(std::function<future<streaming::messages::prepare_message> (streaming::messages::prepare_message msg, UUID plan_id,
inet_address from, inet_address connecting, unsigned src_cpu_id, unsigned dst_cpu_id)>&& func) {
inet_address from, unsigned src_cpu_id, unsigned dst_cpu_id)>&& func) {
register_handler(this, messaging_verb::PREPARE_MESSAGE, std::move(func));
}
future<streaming::messages::prepare_message> messaging_service::send_prepare_message(shard_id id, streaming::messages::prepare_message msg, UUID plan_id,
inet_address from, inet_address connecting, unsigned src_cpu_id, unsigned dst_cpu_id) {
inet_address from, unsigned src_cpu_id, unsigned dst_cpu_id) {
return send_message_timeout_and_retry<streaming::messages::prepare_message>(this, messaging_verb::PREPARE_MESSAGE, id,
streaming_timeout, streaming_nr_retry, streaming_wait_before_retry,
std::move(msg), plan_id, from, connecting, src_cpu_id, dst_cpu_id);
std::move(msg), plan_id, from, src_cpu_id, dst_cpu_id);
}
// PREPARE_DONE_MESSAGE
void messaging_service::register_prepare_done_message(std::function<future<> (UUID plan_id, inet_address from, inet_address connecting, unsigned dst_cpu_id)>&& func) {
void messaging_service::register_prepare_done_message(std::function<future<> (UUID plan_id, inet_address from, unsigned dst_cpu_id)>&& func) {
register_handler(this, messaging_verb::PREPARE_DONE_MESSAGE, std::move(func));
}
future<> messaging_service::send_prepare_done_message(shard_id id, UUID plan_id, inet_address from, inet_address connecting, unsigned dst_cpu_id) {
future<> messaging_service::send_prepare_done_message(shard_id id, UUID plan_id, inet_address from, unsigned dst_cpu_id) {
return send_message_timeout_and_retry<void>(this, messaging_verb::PREPARE_DONE_MESSAGE, id,
streaming_timeout, streaming_nr_retry, streaming_wait_before_retry,
plan_id, from, connecting, dst_cpu_id);
plan_id, from, dst_cpu_id);
}
// STREAM_MUTATION
@@ -501,23 +501,23 @@ future<> messaging_service::send_stream_mutation(shard_id id, UUID plan_id, froz
}
// STREAM_MUTATION_DONE
void messaging_service::register_stream_mutation_done(std::function<future<> (UUID plan_id, std::vector<range<dht::token>> ranges, UUID cf_id, inet_address from, inet_address connecting, unsigned dst_cpu_id)>&& func) {
void messaging_service::register_stream_mutation_done(std::function<future<> (UUID plan_id, std::vector<range<dht::token>> ranges, UUID cf_id, inet_address from, unsigned dst_cpu_id)>&& func) {
register_handler(this, messaging_verb::STREAM_MUTATION_DONE, std::move(func));
}
future<> messaging_service::send_stream_mutation_done(shard_id id, UUID plan_id, std::vector<range<dht::token>> ranges, UUID cf_id, inet_address from, inet_address connecting, unsigned dst_cpu_id) {
future<> messaging_service::send_stream_mutation_done(shard_id id, UUID plan_id, std::vector<range<dht::token>> ranges, UUID cf_id, inet_address from, unsigned dst_cpu_id) {
return send_message_timeout_and_retry<void>(this, messaging_verb::STREAM_MUTATION_DONE, id,
streaming_timeout, streaming_nr_retry, streaming_wait_before_retry,
plan_id, std::move(ranges), cf_id, from, connecting, dst_cpu_id);
plan_id, std::move(ranges), cf_id, from, dst_cpu_id);
}
// COMPLETE_MESSAGE
void messaging_service::register_complete_message(std::function<future<> (UUID plan_id, inet_address from, inet_address connecting, unsigned dst_cpu_id)>&& func) {
void messaging_service::register_complete_message(std::function<future<> (UUID plan_id, inet_address from, unsigned dst_cpu_id)>&& func) {
register_handler(this, messaging_verb::COMPLETE_MESSAGE, std::move(func));
}
future<> messaging_service::send_complete_message(shard_id id, UUID plan_id, inet_address from, inet_address connecting, unsigned dst_cpu_id) {
future<> messaging_service::send_complete_message(shard_id id, UUID plan_id, inet_address from, unsigned dst_cpu_id) {
return send_message_timeout_and_retry<void>(this, messaging_verb::COMPLETE_MESSAGE, id,
streaming_timeout, streaming_nr_retry, streaming_wait_before_retry,
plan_id, from, connecting, dst_cpu_id);
plan_id, from, dst_cpu_id);
}
void messaging_service::register_echo(std::function<future<> ()>&& func) {

View File

@@ -407,23 +407,23 @@ public:
// Wrapper for PREPARE_MESSAGE verb
void register_prepare_message(std::function<future<streaming::messages::prepare_message> (streaming::messages::prepare_message msg, UUID plan_id,
inet_address from, inet_address connecting, unsigned src_cpu_id, unsigned dst_cpu_id)>&& func);
inet_address from, unsigned src_cpu_id, unsigned dst_cpu_id)>&& func);
future<streaming::messages::prepare_message> send_prepare_message(shard_id id, streaming::messages::prepare_message msg, UUID plan_id,
inet_address from, inet_address connecting, unsigned src_cpu_id, unsigned dst_cpu_id);
inet_address from, unsigned src_cpu_id, unsigned dst_cpu_id);
// Wrapper for PREPARE_DONE_MESSAGE verb
void register_prepare_done_message(std::function<future<> (UUID plan_id, inet_address from, inet_address connecting, unsigned dst_cpu_id)>&& func);
future<> send_prepare_done_message(shard_id id, UUID plan_id, inet_address from, inet_address connecting, unsigned dst_cpu_id);
void register_prepare_done_message(std::function<future<> (UUID plan_id, inet_address from, unsigned dst_cpu_id)>&& func);
future<> send_prepare_done_message(shard_id id, UUID plan_id, inet_address from, unsigned dst_cpu_id);
// Wrapper for STREAM_MUTATION verb
void register_stream_mutation(std::function<future<> (const rpc::client_info& cinfo, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id)>&& func);
future<> send_stream_mutation(shard_id id, UUID plan_id, frozen_mutation fm, unsigned dst_cpu_id);
void register_stream_mutation_done(std::function<future<> (UUID plan_id, std::vector<range<dht::token>> ranges, UUID cf_id, inet_address from, inet_address connecting, unsigned dst_cpu_id)>&& func);
future<> send_stream_mutation_done(shard_id id, UUID plan_id, std::vector<range<dht::token>> ranges, UUID cf_id, inet_address from, inet_address connecting, unsigned dst_cpu_id);
void register_stream_mutation_done(std::function<future<> (UUID plan_id, std::vector<range<dht::token>> ranges, UUID cf_id, inet_address from, unsigned dst_cpu_id)>&& func);
future<> send_stream_mutation_done(shard_id id, UUID plan_id, std::vector<range<dht::token>> ranges, UUID cf_id, inet_address from, unsigned dst_cpu_id);
void register_complete_message(std::function<future<> (UUID plan_id, inet_address from, inet_address connecting, unsigned dst_cpu_id)>&& func);
future<> send_complete_message(shard_id id, UUID plan_id, inet_address from, inet_address connecting, unsigned dst_cpu_id);
void register_complete_message(std::function<future<> (UUID plan_id, inet_address from, unsigned dst_cpu_id)>&& func);
future<> send_complete_message(shard_id id, UUID plan_id, inet_address from, unsigned dst_cpu_id);
// Wrapper for ECHO verb
void register_echo(std::function<future<> ()>&& func);

View File

@@ -229,8 +229,8 @@ static future<> repair_range(seastar::sharded<database>& db, sstring keyspace,
// request ranges from all of them and only later transfer ranges to
// all of them? Otherwise, we won't necessarily fully repair the
// other ndoes, just this one? What does Cassandra do here?
sp->transfer_ranges(peer, peer, keyspace, {range}, cfs);
sp->request_ranges(peer, peer, keyspace, {range}, cfs);
sp->transfer_ranges(peer, keyspace, {range}, cfs);
sp->request_ranges(peer, keyspace, {range}, cfs);
}
return sp->execute().discard_result().then([sp, id] {
logger.info("repair session #{} successful", id);

View File

@@ -2065,9 +2065,8 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr
for (auto& m : maps) {
auto source = m.first;
auto ranges = m.second;
auto preferred = net::get_local_messaging_service().get_preferred_ip(source);
logger.debug("Requesting from {} ranges {}", source, ranges);
sp->request_ranges(source, preferred, keyspace_name, ranges);
sp->request_ranges(source, keyspace_name, ranges);
}
}
return sp->execute().then_wrapped([this, sp, notify_endpoint] (auto&& f) {
@@ -2185,10 +2184,8 @@ storage_service::stream_ranges(std::unordered_map<sstring, std::unordered_multim
for (auto& ranges_entry : ranges_per_endpoint) {
auto& ranges = ranges_entry.second;
auto new_endpoint = ranges_entry.first;
auto preferred = net::get_local_messaging_service().get_preferred_ip(new_endpoint);
// TODO each call to transferRanges re-flushes, this is potentially a lot of waste
sp->transfer_ranges(new_endpoint, preferred, keyspace_name, ranges);
sp->transfer_ranges(new_endpoint, keyspace_name, ranges);
}
}
return sp->execute().discard_result().then([sp] {
@@ -2225,7 +2222,6 @@ future<> storage_service::stream_hints() {
snitch->sort_by_proximity(get_broadcast_address(), candidates);
auto hints_destination_host = candidates.front();
auto preferred = net::get_local_messaging_service().get_preferred_ip(hints_destination_host);
// stream all hints -- range list will be a singleton of "the entire ring"
std::vector<range<token>> ranges = {range<token>::make_open_ended_both_sides()};
@@ -2234,7 +2230,7 @@ future<> storage_service::stream_hints() {
auto sp = make_lw_shared<streaming::stream_plan>("Hints");
std::vector<sstring> column_families = { db::system_keyspace::HINTS };
auto keyspace = db::system_keyspace::NAME;
sp->transfer_ranges(hints_destination_host, preferred, keyspace, ranges, column_families);
sp->transfer_ranges(hints_destination_host, keyspace, ranges, column_families);
return sp->execute().discard_result().then([sp] {
logger.info("stream_hints successful");
}).handle_exception([] (auto ep) {
@@ -2553,8 +2549,7 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_
auto& address = x.first;
auto& ranges = x.second;
logger.debug("Will stream range {} of keyspace {} to endpoint {}", ranges , keyspace, address);
auto preferred = net::get_local_messaging_service().get_preferred_ip(address);
_stream_plan.transfer_ranges(address, preferred, keyspace, ranges);
_stream_plan.transfer_ranges(address, keyspace, ranges);
}
// stream requests
@@ -2569,8 +2564,7 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_
auto& address = x.first;
auto& ranges = x.second;
logger.debug("Will request range {} of keyspace {} from endpoint {}", ranges, keyspace, address);
auto preferred = net::get_local_messaging_service().get_preferred_ip(address);
_stream_plan.request_ranges(address, preferred, keyspace, ranges);
_stream_plan.request_ranges(address, keyspace, ranges);
}
if (logger.is_enabled(logging::log_level::debug)) {
for (auto& x : work) {

View File

@@ -55,7 +55,6 @@ public:
using inet_address = gms::inet_address;
inet_address peer;
int session_index;
inet_address connecting;
/** Immutable collection of receiving summaries */
std::vector<stream_summary> receiving_summaries;
/** Immutable collection of sending summaries*/
@@ -67,12 +66,11 @@ public:
std::map<sstring, progress_info> sending_files;
session_info() = default;
session_info(inet_address peer_, int session_index_, inet_address connecting_,
session_info(inet_address peer_, int session_index_,
std::vector<stream_summary> receiving_summaries_,
std::vector<stream_summary> sending_summaries_,
stream_session_state state_)
: peer(peer_)
, connecting(connecting_)
, receiving_summaries(std::move(receiving_summaries_))
, sending_summaries(std::move(sending_summaries_))
, state(state_) {

View File

@@ -117,11 +117,11 @@ bool stream_coordinator::host_streaming_data::has_active_sessions() {
return false;
}
shared_ptr<stream_session> stream_coordinator::host_streaming_data::get_or_create_next_session(inet_address peer, inet_address connecting) {
shared_ptr<stream_session> stream_coordinator::host_streaming_data::get_or_create_next_session(inet_address peer) {
// create
int size = _stream_sessions.size();
if (size < _connections_per_host) {
auto session = make_shared<stream_session>(peer, connecting, size, _keep_ss_table_level);
auto session = make_shared<stream_session>(peer, size, _keep_ss_table_level);
_stream_sessions.emplace(++_last_returned, session);
return _stream_sessions[_last_returned];
// get
@@ -142,10 +142,10 @@ std::vector<shared_ptr<stream_session>> stream_coordinator::host_streaming_data:
}
shared_ptr<stream_session> stream_coordinator::host_streaming_data::get_or_create_session_by_id(inet_address peer,
int id, inet_address connecting) {
int id) {
auto it = _stream_sessions.find(id);
if (it == _stream_sessions.end()) {
it = _stream_sessions.emplace(id, make_shared<stream_session>(peer, connecting, id, _keep_ss_table_level)).first;
it = _stream_sessions.emplace(id, make_shared<stream_session>(peer, id, _keep_ss_table_level)).first;
}
return it->second;
}

View File

@@ -90,12 +90,12 @@ public:
std::set<inet_address> get_peers();
public:
shared_ptr<stream_session> get_or_create_next_session(inet_address peer, inet_address connecting) {
return get_or_create_host_data(peer).get_or_create_next_session(peer, connecting);
shared_ptr<stream_session> get_or_create_next_session(inet_address peer) {
return get_or_create_host_data(peer).get_or_create_next_session(peer);
}
shared_ptr<stream_session> get_or_create_session_by_id(inet_address peer, int id, inet_address connecting) {
return get_or_create_host_data(peer).get_or_create_session_by_id(peer, id, connecting);
shared_ptr<stream_session> get_or_create_session_by_id(inet_address peer, int id) {
return get_or_create_host_data(peer).get_or_create_session_by_id(peer, id);
}
void update_progress(progress_info info) {
@@ -159,13 +159,13 @@ private:
bool has_active_sessions();
shared_ptr<stream_session> get_or_create_next_session(inet_address peer, inet_address connecting);
shared_ptr<stream_session> get_or_create_next_session(inet_address peer);
void connect_all_stream_sessions();
std::vector<shared_ptr<stream_session>> get_all_stream_sessions();
shared_ptr<stream_session> get_or_create_session_by_id(inet_address peer, int id, inet_address connecting);
shared_ptr<stream_session> get_or_create_session_by_id(inet_address peer, int id);
void update_progress(progress_info info);

View File

@@ -44,28 +44,24 @@ namespace streaming {
extern logging::logger sslog;
stream_plan& stream_plan::request_ranges(inet_address from, inet_address connecting, sstring keyspace, std::vector<query::range<token>> ranges) {
return request_ranges(from, connecting, keyspace, ranges, {});
stream_plan& stream_plan::request_ranges(inet_address from, sstring keyspace, std::vector<query::range<token>> ranges) {
return request_ranges(from, keyspace, ranges, {});
}
stream_plan& stream_plan::request_ranges(inet_address from, inet_address connecting, sstring keyspace, std::vector<query::range<token>> ranges, std::vector<sstring> column_families) {
stream_plan& stream_plan::request_ranges(inet_address from, sstring keyspace, std::vector<query::range<token>> ranges, std::vector<sstring> column_families) {
_range_added = true;
auto session = _coordinator->get_or_create_next_session(from, connecting);
auto session = _coordinator->get_or_create_next_session(from);
session->add_stream_request(keyspace, ranges, std::move(column_families), _repaired_at);
return *this;
}
stream_plan& stream_plan::transfer_ranges(inet_address to, sstring keyspace, std::vector<query::range<token>> ranges) {
return transfer_ranges(to, keyspace, ranges, {});
}
stream_plan& stream_plan::transfer_ranges(inet_address to, sstring keyspace, std::vector<query::range<token>> ranges, std::vector<sstring> column_families) {
return transfer_ranges(to, to, keyspace, ranges, column_families);
}
stream_plan& stream_plan::transfer_ranges(inet_address to, inet_address connecting, sstring keyspace, std::vector<query::range<token>> ranges) {
return transfer_ranges(to, connecting, keyspace, ranges, {});
}
stream_plan& stream_plan::transfer_ranges(inet_address to, inet_address connecting, sstring keyspace, std::vector<query::range<token>> ranges, std::vector<sstring> column_families) {
_range_added = true;
auto session = _coordinator->get_or_create_next_session(to, connecting);
auto session = _coordinator->get_or_create_next_session(to);
session->add_transfer_ranges(keyspace, std::move(ranges), std::move(column_families), _flush_before_transfer, _repaired_at);
return *this;
}

View File

@@ -104,7 +104,7 @@ public:
* @param ranges ranges to fetch
* @return this object for chaining
*/
stream_plan& request_ranges(inet_address from, inet_address connecting, sstring keyspace, std::vector<query::range<token>> ranges);
stream_plan& request_ranges(inet_address from, sstring keyspace, std::vector<query::range<token>> ranges);
/**
* Request data in {@code columnFamilies} under {@code keyspace} and {@code ranges} from specific node.
@@ -116,14 +116,7 @@ public:
* @param columnFamilies specific column families
* @return this object for chaining
*/
stream_plan& request_ranges(inet_address from, inet_address connecting, sstring keyspace, std::vector<query::range<token>> ranges, std::vector<sstring> column_families);
/**
* Add transfer task to send data of specific {@code columnFamilies} under {@code keyspace} and {@code ranges}.
*
* @see #transferRanges(java.net.InetAddress, java.net.InetAddress, String, java.util.Collection, String...)
*/
stream_plan& transfer_ranges(inet_address to, sstring keyspace, std::vector<query::range<token>> ranges, std::vector<sstring> column_families);
stream_plan& request_ranges(inet_address from, sstring keyspace, std::vector<query::range<token>> ranges, std::vector<sstring> column_families);
/**
* Add transfer task to send data of specific keyspace and ranges.
@@ -134,7 +127,7 @@ public:
* @param ranges ranges to send
* @return this object for chaining
*/
stream_plan& transfer_ranges(inet_address to, inet_address connecting, sstring keyspace, std::vector<query::range<token>> ranges);
stream_plan& transfer_ranges(inet_address to, sstring keyspace, std::vector<query::range<token>> ranges);
/**
* Add transfer task to send data of specific {@code columnFamilies} under {@code keyspace} and {@code ranges}.
@@ -146,7 +139,7 @@ public:
* @param columnFamilies specific column families
* @return this object for chaining
*/
stream_plan& transfer_ranges(inet_address to, inet_address connecting, sstring keyspace, std::vector<query::range<token>> ranges, std::vector<sstring> column_families);
stream_plan& transfer_ranges(inet_address to, sstring keyspace, std::vector<query::range<token>> ranges, std::vector<sstring> column_families);
stream_plan& listeners(std::vector<stream_event_handler*> handlers);
#if 0

View File

@@ -86,20 +86,20 @@ void stream_session::init_messaging_service_handler() {
return make_ready_future<unsigned>(dst_cpu_id);
});
});
ms().register_prepare_message([] (messages::prepare_message msg, UUID plan_id, inet_address from, inet_address connecting, unsigned src_cpu_id, unsigned dst_cpu_id) {
return smp::submit_to(dst_cpu_id, [msg = std::move(msg), plan_id, from, connecting, src_cpu_id] () mutable {
ms().register_prepare_message([] (messages::prepare_message msg, UUID plan_id, inet_address from, unsigned src_cpu_id, unsigned dst_cpu_id) {
return smp::submit_to(dst_cpu_id, [msg = std::move(msg), plan_id, from, src_cpu_id] () mutable {
auto f = get_stream_result_future(plan_id);
sslog.debug("[Stream #{}] GOT PREPARE_MESSAGE: from={}, connecting={}", plan_id, from, connecting);
sslog.debug("[Stream #{}] GOT PREPARE_MESSAGE: from={}", plan_id, from);
if (f) {
auto coordinator = f->get_coordinator();
assert(coordinator);
auto session = coordinator->get_or_create_next_session(from, from);
auto session = coordinator->get_or_create_next_session(from);
assert(session);
session->init(f);
session->dst_cpu_id = src_cpu_id;
session->start_keep_alive_timer();
sslog.debug("[Stream #{}] GOT PREPARE_MESSAGE: get session peer={} connecting={} src_cpu_id={}, dst_cpu_id={}",
session->plan_id(), session->peer, session->connecting, session->src_cpu_id, session->dst_cpu_id);
sslog.debug("[Stream #{}] GOT PREPARE_MESSAGE: get session peer={} src_cpu_id={}, dst_cpu_id={}",
session->plan_id(), session->peer, session->src_cpu_id, session->dst_cpu_id);
return session->prepare(std::move(msg.requests), std::move(msg.summaries));
} else {
auto err = sprint("[Stream #%s] GOT PREPARE_MESSAGE: Can not find stream_manager", plan_id);
@@ -108,14 +108,14 @@ void stream_session::init_messaging_service_handler() {
}
});
});
ms().register_prepare_done_message([] (UUID plan_id, inet_address from, inet_address connecting, unsigned dst_cpu_id) {
return smp::submit_to(dst_cpu_id, [plan_id, from, connecting] () mutable {
sslog.debug("[Stream #{}] GOT PREPARE_DONE_MESSAGE: from={}, connecting={}", plan_id, from, connecting);
ms().register_prepare_done_message([] (UUID plan_id, inet_address from, unsigned dst_cpu_id) {
return smp::submit_to(dst_cpu_id, [plan_id, from] () mutable {
sslog.debug("[Stream #{}] GOT PREPARE_DONE_MESSAGE: from={}", plan_id, from);
auto f = get_stream_result_future(plan_id);
if (f) {
auto coordinator = f->get_coordinator();
assert(coordinator);
auto session = coordinator->get_or_create_next_session(from, from);
auto session = coordinator->get_or_create_next_session(from);
assert(session);
session->start_keep_alive_timer();
session->follower_start_sent();
@@ -138,7 +138,7 @@ void stream_session::init_messaging_service_handler() {
if (f) {
auto coordinator = f->get_coordinator();
assert(coordinator);
auto session = coordinator->get_or_create_next_session(from, from);
auto session = coordinator->get_or_create_next_session(from);
assert(session);
session->start_keep_alive_timer();
return service::get_storage_proxy().local().mutate_locally(fm);
@@ -149,14 +149,14 @@ void stream_session::init_messaging_service_handler() {
}
});
});
ms().register_stream_mutation_done([] (UUID plan_id, std::vector<range<dht::token>> ranges, UUID cf_id, inet_address from, inet_address connecting, unsigned dst_cpu_id) {
return smp::submit_to(dst_cpu_id, [ranges = std::move(ranges), plan_id, cf_id, from, connecting] () mutable {
sslog.debug("[Stream #{}] GOT STREAM_MUTATION_DONE: cf_id={}, from={}, connecting={}", plan_id, cf_id, from, connecting);
ms().register_stream_mutation_done([] (UUID plan_id, std::vector<range<dht::token>> ranges, UUID cf_id, inet_address from, unsigned dst_cpu_id) {
return smp::submit_to(dst_cpu_id, [ranges = std::move(ranges), plan_id, cf_id, from] () mutable {
sslog.debug("[Stream #{}] GOT STREAM_MUTATION_DONE: cf_id={}, from={}", plan_id, cf_id, from);
auto f = get_stream_result_future(plan_id);
if (f) {
auto coordinator = f->get_coordinator();
assert(coordinator);
auto session = coordinator->get_or_create_next_session(from, from);
auto session = coordinator->get_or_create_next_session(from);
assert(session);
session->start_keep_alive_timer();
session->receive_task_completed(cf_id);
@@ -181,14 +181,14 @@ void stream_session::init_messaging_service_handler() {
});
});
#endif
ms().register_complete_message([] (UUID plan_id, inet_address from, inet_address connecting, unsigned dst_cpu_id) {
return smp::submit_to(dst_cpu_id, [plan_id, from, connecting, dst_cpu_id] () mutable {
sslog.debug("[Stream #{}] GOT COMPLETE_MESSAGE, from={}, connecting={}, dst_cpu_id={}", plan_id, from, connecting, dst_cpu_id);
ms().register_complete_message([] (UUID plan_id, inet_address from, unsigned dst_cpu_id) {
return smp::submit_to(dst_cpu_id, [plan_id, from, dst_cpu_id] () mutable {
sslog.debug("[Stream #{}] GOT COMPLETE_MESSAGE, from={}, dst_cpu_id={}", plan_id, from, dst_cpu_id);
auto f = get_stream_result_future(plan_id);
if (f) {
auto coordinator = f->get_coordinator();
assert(coordinator);
auto session = coordinator->get_or_create_next_session(from, from);
auto session = coordinator->get_or_create_next_session(from);
assert(session);
session->start_keep_alive_timer();
session->complete();
@@ -220,9 +220,8 @@ distributed<database>* stream_session::_db;
stream_session::stream_session() = default;
stream_session::stream_session(inet_address peer_, inet_address connecting_, int index_, bool keep_ss_table_level_)
stream_session::stream_session(inet_address peer_, int index_, bool keep_ss_table_level_)
: peer(peer_)
, connecting(connecting_)
, _index(index_)
, _keep_ss_table_level(keep_ss_table_level_) {
//this.metrics = StreamingMetrics.get(connecting);
@@ -282,7 +281,7 @@ future<> stream_session::test(distributed<cql3::query_processor>& qp) {
auto ks = sstring("ks");
std::vector<query::range<token>> ranges = {query::range<token>::make_open_ended_both_sides()};
std::vector<sstring> cfs{tb};
sp.transfer_ranges(to, to, ks, ranges, cfs).request_ranges(to, to, ks, ranges, cfs).execute().then_wrapped([] (auto&& f) {
sp.transfer_ranges(to, ks, ranges, cfs).request_ranges(to, ks, ranges, cfs).execute().then_wrapped([] (auto&& f) {
try {
auto state = f.get0();
sslog.debug("plan_id={} description={} DONE", state.plan_id, state.description);
@@ -337,7 +336,7 @@ future<> stream_session::on_initialization_complete() {
auto from = utils::fb_utilities::get_broadcast_address();
sslog.debug("[Stream #{}] SEND PREPARE_MESSAGE to {}", plan_id(), id);
return ms().send_prepare_message(id, std::move(prepare), plan_id(), from,
this->connecting, this->src_cpu_id, this->dst_cpu_id).then_wrapped([this, id] (auto&& f) {
this->src_cpu_id, this->dst_cpu_id).then_wrapped([this, id] (auto&& f) {
try {
auto msg = f.get0();
this->start_keep_alive_timer();
@@ -354,7 +353,7 @@ future<> stream_session::on_initialization_complete() {
}).then([this, id, from] {
auto plan_id = this->plan_id();
sslog.debug("[Stream #{}] SEND PREPARE_DONE_MESSAGE to {}", plan_id, id);
return ms().send_prepare_done_message(id, plan_id, from, this->connecting, this->dst_cpu_id).then([this] {
return ms().send_prepare_done_message(id, plan_id, from, this->dst_cpu_id).then([this] {
this->start_keep_alive_timer();
}).handle_exception([id, plan_id] (auto ep) {
sslog.error("[Stream #{}] Fail to send PREPARE_DONE_MESSAGE to {}, {}", plan_id, id, ep);
@@ -503,7 +502,7 @@ session_info stream_session::get_session_info() {
for (auto& transfer : _transfers) {
transfer_summaries.emplace_back(transfer.second.get_summary());
}
return session_info(peer, _index, connecting, std::move(receiving_summaries), std::move(transfer_summaries), _state);
return session_info(peer, _index, std::move(receiving_summaries), std::move(transfer_summaries), _state);
}
void stream_session::receive_task_completed(UUID cf_id) {
@@ -525,7 +524,7 @@ void stream_session::send_complete_message() {
auto id = shard_id{this->peer, this->dst_cpu_id};
auto plan_id = this->plan_id();
sslog.debug("[Stream #{}] SEND COMPLETE_MESSAGE to {}", plan_id, id);
this->ms().send_complete_message(id, plan_id, from, this->connecting, this->dst_cpu_id).then([session = shared_from_this(), plan_id] {
this->ms().send_complete_message(id, plan_id, from, this->dst_cpu_id).then([session = shared_from_this(), plan_id] {
sslog.debug("[Stream #{}] GOT COMPLETE_MESSAGE Reply", plan_id);
}).handle_exception([plan_id] (auto ep) {
sslog.warn("[Stream #{}] ERROR COMPLETE_MESSAGE Reply: {}", plan_id, ep);
@@ -686,6 +685,7 @@ void stream_session::start() {
close_session(stream_session_state::COMPLETE);
return;
}
auto connecting = net::get_local_messaging_service().get_preferred_ip(peer);
if (peer == connecting) {
sslog.info("[Stream #{}] Starting streaming to {}", plan_id(), peer);
} else {

View File

@@ -162,8 +162,6 @@ public:
* Each {@code StreamSession} is identified by this InetAddress which is broadcast address of the node streaming.
*/
inet_address peer;
/** Actual connecting address. Can be the same as {@linkplain #peer}. */
inet_address connecting;
unsigned src_cpu_id;
unsigned dst_cpu_id;
private:
@@ -204,7 +202,7 @@ public:
* @param connecting Actual connecting address
* @param factory is used for establishing connection
*/
stream_session(inet_address peer_, inet_address connecting_, int index_, bool keep_ss_table_level_);
stream_session(inet_address peer_, int index_, bool keep_ss_table_level_);
~stream_session();
UUID plan_id();

View File

@@ -152,7 +152,7 @@ void stream_transfer_task::complete(int sequence_number) {
auto from = utils::fb_utilities::get_broadcast_address();
auto id = shard_id{session->peer, session->dst_cpu_id};
sslog.debug("[Stream #{}] SEND STREAM_MUTATION_DONE to {}, seq={}", plan_id, id, sequence_number);
session->ms().send_stream_mutation_done(id, plan_id, std::move(_ranges), this->cf_id, from, session->connecting, session->dst_cpu_id).then_wrapped([this, id, plan_id] (auto&& f) {
session->ms().send_stream_mutation_done(id, plan_id, std::move(_ranges), this->cf_id, from, session->dst_cpu_id).then_wrapped([this, id, plan_id] (auto&& f) {
try {
f.get();
session->start_keep_alive_timer();