diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 3a46388c32..ee1f831290 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -174,10 +174,9 @@ void stream_session::init_messaging_service_handler() { }); ms().register_complete_message([] (const rpc::client_info& cinfo, UUID plan_id, unsigned dst_cpu_id) { const auto& from = cinfo.retrieve_auxiliary("baddr"); - return smp::submit_to(dst_cpu_id, [plan_id, from, dst_cpu_id] () mutable { - auto session = get_session(plan_id, from, "COMPLETE_MESSAGE"); - session->complete(); - }); + // Be compatible with old version. Do nothing but return a ready future. + sslog.debug("[Stream #{}] COMPLETE_MESSAGE from {} dst_cpu_id={}", plan_id, from, dst_cpu_id); + return make_ready_future<>(); }); } @@ -304,17 +303,6 @@ void stream_session::follower_start_sent() { this->start_streaming_files(); } -void stream_session::complete() { - if (_state == stream_session_state::WAIT_COMPLETE) { - send_complete_message(); - sslog.debug("[Stream #{}] complete: WAIT_COMPLETE -> COMPLETE: session={}", plan_id(), this); - close_session(stream_session_state::COMPLETE); - } else { - sslog.debug("[Stream #{}] complete: {} -> WAIT_COMPLETE: session={}", plan_id(), _state, this); - set_state(stream_session_state::WAIT_COMPLETE); - } -} - void stream_session::session_failed() { close_session(stream_session_state::FAILED); } @@ -366,16 +354,9 @@ void stream_session::send_complete_message() { bool stream_session::maybe_completed() { bool completed = _receivers.empty() && _transfers.empty(); if (completed) { - if (_state == stream_session_state::WAIT_COMPLETE) { - send_complete_message(); - sslog.debug("[Stream #{}] maybe_completed: WAIT_COMPLETE -> COMPLETE: session={}, peer={}", plan_id(), this, peer); - close_session(stream_session_state::COMPLETE); - } else { - // notify peer that this session is completed - sslog.debug("[Stream #{}] maybe_completed: {} -> WAIT_COMPLETE: session={}, peer={}", plan_id(), _state, this, peer); - set_state(stream_session_state::WAIT_COMPLETE); - send_complete_message(); - } + send_complete_message(); + sslog.debug("[Stream #{}] maybe_completed: {} -> COMPLETE: session={}, peer={}", plan_id(), _state, this, peer); + close_session(stream_session_state::COMPLETE); } return completed; }