From a3b4d4d3cfccb7c842d4b2ccc7e45ecc770c8b1e Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Mon, 15 Nov 2021 18:10:00 +0300 Subject: [PATCH] stream_session: Use manager reference from result-future When the stream_session initializes it's being equipped with the shared-pointer on the stream_result_future very early. In all the places where stream_session needs the manager this pointer is alive and session get get manager from it. Signed-off-by: Pavel Emelyanov --- streaming/stream_session.cc | 14 +++++++------- streaming/stream_session.hh | 3 +++ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 3ebf84a0e6..200a2da471 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -238,7 +238,7 @@ future<> stream_session::on_initialization_complete() { } auto id = msg_addr{this->peer, 0}; sslog.debug("[Stream #{}] SEND PREPARE_MESSAGE to {}", plan_id(), id); - return get_local_stream_manager().ms().send_prepare_message(id, std::move(prepare), plan_id(), description(), get_reason()).then_wrapped([this, id] (auto&& f) { + return manager().ms().send_prepare_message(id, std::move(prepare), plan_id(), description(), get_reason()).then_wrapped([this, id] (auto&& f) { try { auto msg = f.get0(); sslog.debug("[Stream #{}] GOT PREPARE_MESSAGE Reply from {}", this->plan_id(), this->peer); @@ -257,7 +257,7 @@ future<> stream_session::on_initialization_complete() { }).then([this, id] { auto plan_id = this->plan_id(); sslog.debug("[Stream #{}] SEND PREPARE_DONE_MESSAGE to {}", plan_id, id); - return get_local_stream_manager().ms().send_prepare_done_message(id, plan_id, this->dst_cpu_id).then([this] { + return manager().ms().send_prepare_done_message(id, plan_id, this->dst_cpu_id).then([this] { sslog.debug("[Stream #{}] GOT PREPARE_DONE_MESSAGE Reply from {}", this->plan_id(), this->peer); }).handle_exception([id, plan_id] (auto ep) { sslog.warn("[Stream #{}] Fail to send PREPARE_DONE_MESSAGE to {}, {}", plan_id, id, ep); @@ -295,7 +295,7 @@ future stream_session::prepare(std::vector requ sslog.debug("[Stream #{}] prepare requests nr={}, summaries nr={}", plan_id, requests.size(), summaries.size()); // prepare tasks set_state(stream_session_state::PREPARING); - auto& db = get_local_stream_manager().db(); + auto& db = manager().db(); for (auto& request : requests) { // always flush on stream request sslog.debug("[Stream #{}] prepare stream_request={}", plan_id, request); @@ -398,7 +398,7 @@ void stream_session::send_failed_complete_message() { auto session = shared_from_this(); bool failed = true; //FIXME: discarded future. - (void)get_local_stream_manager().ms().send_complete_message(id, plan_id, this->dst_cpu_id, failed).then([session, id, plan_id] { + (void)manager().ms().send_complete_message(id, plan_id, this->dst_cpu_id, failed).then([session, id, plan_id] { sslog.debug("[Stream #{}] GOT COMPLETE_MESSAGE Reply from {}", plan_id, id.addr); }).handle_exception([session, id, plan_id] (auto ep) { sslog.debug("[Stream #{}] COMPLETE_MESSAGE for {} has failed: {}", plan_id, id.addr, ep); @@ -441,7 +441,7 @@ void stream_session::start_streaming_files() { std::vector stream_session::get_column_family_stores(const sstring& keyspace, const std::vector& column_families) { // if columnfamilies are not specified, we add all cf under the keyspace std::vector stores; - auto& db = get_local_stream_manager().db(); + auto& db = manager().db(); if (column_families.empty()) { for (auto& x : db.get_column_families()) { column_family& cf = *(x.second); @@ -526,7 +526,7 @@ void stream_session::start() { close_session(stream_session_state::COMPLETE); return; } - auto connecting = get_local_stream_manager().ms().get_preferred_ip(peer); + auto connecting = manager().ms().get_preferred_ip(peer); if (peer == connecting) { sslog.debug("[Stream #{}] Starting streaming to {}", plan_id(), peer); } else { @@ -555,7 +555,7 @@ sstring stream_session::description() const { } future<> stream_session::update_progress() { - return get_local_stream_manager().get_progress_on_all_shards(plan_id(), peer).then([this] (auto sbytes) { + return manager().get_progress_on_all_shards(plan_id(), peer).then([this] (auto sbytes) { auto bytes_sent = sbytes.bytes_sent; if (bytes_sent > 0) { auto tx = progress_info(this->peer, "txnofile", progress_info::direction::OUT, bytes_sent, bytes_sent); diff --git a/streaming/stream_session.hh b/streaming/stream_session.hh index 340dc155c8..6cc1431236 100644 --- a/streaming/stream_session.hh +++ b/streaming/stream_session.hh @@ -335,6 +335,9 @@ public: return _session_info; } + stream_manager& manager() noexcept { return _mgr; } + const stream_manager& manager() const noexcept { return _mgr; } + future<> update_progress(); void receive_task_completed(UUID cf_id);