From eace5fc6e8a0c81ee262fef00a5ce6b10ab2f391 Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 25 Jul 2017 15:20:07 +0800 Subject: [PATCH] streaming: Introduce received_failed_complete_message It is the handler for the failed complete message. Add a flag to remember if we received a such message from peer, if so, do not send back the failed complete message back to the peer when running close_session with failed status. --- streaming/stream_session.cc | 18 ++++++++++++++---- streaming/stream_session.hh | 3 +++ 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 4f23d7b570..d5f1196084 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -181,8 +181,8 @@ void stream_session::init_messaging_service_handler() { if (failed && *failed) { return smp::submit_to(dst_cpu_id, [plan_id, from, dst_cpu_id] () { auto session = get_session(plan_id, from, "COMPLETE_MESSAGE"); - sslog.warn("[Stream #{}] COMPLETE_MESSAGE with error flag from {} dst_cpu_id={}", plan_id, from, dst_cpu_id); - session->on_error(); + sslog.debug("[Stream #{}] COMPLETE_MESSAGE with error flag from {} dst_cpu_id={}", plan_id, from, dst_cpu_id); + session->received_failed_complete_message(); return make_ready_future<>(); }); } else { @@ -257,6 +257,12 @@ future<> stream_session::on_initialization_complete() { }); } +void stream_session::received_failed_complete_message() { + sslog.info("[Stream #{}] Received failed complete message, peer={}", plan_id(), peer); + _received_failed_complete_message = true; + close_session(stream_session_state::FAILED); +} + void stream_session::on_error() { sslog.warn("[Stream #{}] Streaming error occurred", plan_id()); // fail session @@ -345,20 +351,24 @@ void stream_session::transfer_task_completed(UUID cf_id) { } void stream_session::send_failed_complete_message() { + auto plan_id = this->plan_id(); + if (_received_failed_complete_message) { + sslog.debug("[Stream #{}] Skip sending failed message back to peer", plan_id); + return; + } if (!_complete_sent) { _complete_sent = true; } else { return; } auto id = msg_addr{this->peer, this->dst_cpu_id}; - auto plan_id = this->plan_id(); sslog.debug("[Stream #{}] SEND COMPLETE_MESSAGE to {}", plan_id, id); auto session = shared_from_this(); bool failed = true; this->ms().send_complete_message(id, plan_id, this->dst_cpu_id, failed).then([session, id, plan_id] { sslog.debug("[Stream #{}] GOT COMPLETE_MESSAGE Reply from {}", plan_id, id.addr); }).handle_exception([session, id, plan_id] (auto ep) { - sslog.warn("[Stream #{}] COMPLETE_MESSAGE for {} has failed: {}", plan_id, id.addr, ep); + sslog.debug("[Stream #{}] COMPLETE_MESSAGE for {} has failed: {}", plan_id, id.addr, ep); }); } diff --git a/streaming/stream_session.hh b/streaming/stream_session.hh index 869066190a..da5bdfafcd 100644 --- a/streaming/stream_session.hh +++ b/streaming/stream_session.hh @@ -174,6 +174,7 @@ private: stream_session_state _state = stream_session_state::INITIALIZED; bool _complete_sent = false; + bool _received_failed_complete_message = false; // If the session is idle for 300 minutes, close the session std::chrono::seconds _keep_alive_timeout{60 * 300}; @@ -299,6 +300,8 @@ public: */ void on_error(); + void received_failed_complete_message(); + /** * Prepare this session for sending/receiving files. */