stream_session: Use manager reference from result-future

When the stream_session initializes it's being equipped with
the shared-pointer on the stream_result_future very early. In
all the places where stream_session needs the manager this
pointer is alive and session get get manager from it.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2021-11-15 18:10:00 +03:00
parent 56f5327450
commit a3b4d4d3cf
2 changed files with 10 additions and 7 deletions

View File

@@ -238,7 +238,7 @@ future<> stream_session::on_initialization_complete() {
}
auto id = msg_addr{this->peer, 0};
sslog.debug("[Stream #{}] SEND PREPARE_MESSAGE to {}", plan_id(), id);
return get_local_stream_manager().ms().send_prepare_message(id, std::move(prepare), plan_id(), description(), get_reason()).then_wrapped([this, id] (auto&& f) {
return manager().ms().send_prepare_message(id, std::move(prepare), plan_id(), description(), get_reason()).then_wrapped([this, id] (auto&& f) {
try {
auto msg = f.get0();
sslog.debug("[Stream #{}] GOT PREPARE_MESSAGE Reply from {}", this->plan_id(), this->peer);
@@ -257,7 +257,7 @@ future<> stream_session::on_initialization_complete() {
}).then([this, id] {
auto plan_id = this->plan_id();
sslog.debug("[Stream #{}] SEND PREPARE_DONE_MESSAGE to {}", plan_id, id);
return get_local_stream_manager().ms().send_prepare_done_message(id, plan_id, this->dst_cpu_id).then([this] {
return manager().ms().send_prepare_done_message(id, plan_id, this->dst_cpu_id).then([this] {
sslog.debug("[Stream #{}] GOT PREPARE_DONE_MESSAGE Reply from {}", this->plan_id(), this->peer);
}).handle_exception([id, plan_id] (auto ep) {
sslog.warn("[Stream #{}] Fail to send PREPARE_DONE_MESSAGE to {}, {}", plan_id, id, ep);
@@ -295,7 +295,7 @@ future<prepare_message> stream_session::prepare(std::vector<stream_request> requ
sslog.debug("[Stream #{}] prepare requests nr={}, summaries nr={}", plan_id, requests.size(), summaries.size());
// prepare tasks
set_state(stream_session_state::PREPARING);
auto& db = get_local_stream_manager().db();
auto& db = manager().db();
for (auto& request : requests) {
// always flush on stream request
sslog.debug("[Stream #{}] prepare stream_request={}", plan_id, request);
@@ -398,7 +398,7 @@ void stream_session::send_failed_complete_message() {
auto session = shared_from_this();
bool failed = true;
//FIXME: discarded future.
(void)get_local_stream_manager().ms().send_complete_message(id, plan_id, this->dst_cpu_id, failed).then([session, id, plan_id] {
(void)manager().ms().send_complete_message(id, plan_id, this->dst_cpu_id, failed).then([session, id, plan_id] {
sslog.debug("[Stream #{}] GOT COMPLETE_MESSAGE Reply from {}", plan_id, id.addr);
}).handle_exception([session, id, plan_id] (auto ep) {
sslog.debug("[Stream #{}] COMPLETE_MESSAGE for {} has failed: {}", plan_id, id.addr, ep);
@@ -441,7 +441,7 @@ void stream_session::start_streaming_files() {
std::vector<column_family*> stream_session::get_column_family_stores(const sstring& keyspace, const std::vector<sstring>& column_families) {
// if columnfamilies are not specified, we add all cf under the keyspace
std::vector<column_family*> stores;
auto& db = get_local_stream_manager().db();
auto& db = manager().db();
if (column_families.empty()) {
for (auto& x : db.get_column_families()) {
column_family& cf = *(x.second);
@@ -526,7 +526,7 @@ void stream_session::start() {
close_session(stream_session_state::COMPLETE);
return;
}
auto connecting = get_local_stream_manager().ms().get_preferred_ip(peer);
auto connecting = manager().ms().get_preferred_ip(peer);
if (peer == connecting) {
sslog.debug("[Stream #{}] Starting streaming to {}", plan_id(), peer);
} else {
@@ -555,7 +555,7 @@ sstring stream_session::description() const {
}
future<> stream_session::update_progress() {
return get_local_stream_manager().get_progress_on_all_shards(plan_id(), peer).then([this] (auto sbytes) {
return manager().get_progress_on_all_shards(plan_id(), peer).then([this] (auto sbytes) {
auto bytes_sent = sbytes.bytes_sent;
if (bytes_sent > 0) {
auto tx = progress_info(this->peer, "txnofile", progress_info::direction::OUT, bytes_sent, bytes_sent);

View File

@@ -335,6 +335,9 @@ public:
return _session_info;
}
stream_manager& manager() noexcept { return _mgr; }
const stream_manager& manager() const noexcept { return _mgr; }
future<> update_progress();
void receive_task_completed(UUID cf_id);