From 208b7fa7ba81fb1c101dfe913d7003534422ae48 Mon Sep 17 00:00:00 2001 From: Asias He Date: Mon, 21 Mar 2016 14:09:24 +0800 Subject: [PATCH] streaming: Simplify session completion logic Both the initiator and follower of a stream session knows how many transfer task and receive task the stream session contains in the preparation phase. They use the _transfers and _receivers map to track the tasks, like below: std::map _transfers; std::map _receivers; A stream_transfer_task will send STREAM_MUTATION verb to transfer data with frozen_mutation, when all the STREAM_MUTATIONs are sent, it will send STREAM_MUTATION_DONE to tell the peer the stream_transfer_task is completed and remove the stream_transfer_task from _transfers map. The peer will remove the corresponding stream_receive_task in _receivers. We do not really need the COMPLETE_MESSAGE verb to notify the peer we have completed sending. It makes the session completion logic much simpler and cleaner if we do not depend on COMPLETE_MESSAGE verb. However, to be compatible with older version, we always send a COMPLETE_MESSAGE message and do nothing in the COMPLETE_MESSAGE handler and replies a ready future even if the stream_session is closed already. This way, node with older version will get a COMPLETE_MESSAGE message and manage to send a COMPLETE_MESSAGE message to new node as before. Message-Id: <1458540564-34277-2-git-send-email-asias@scylladb.com> --- streaming/stream_session.cc | 31 ++++++------------------------- 1 file changed, 6 insertions(+), 25 deletions(-) diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 3a46388c32..ee1f831290 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -174,10 +174,9 @@ void stream_session::init_messaging_service_handler() { }); ms().register_complete_message([] (const rpc::client_info& cinfo, UUID plan_id, unsigned dst_cpu_id) { const auto& from = cinfo.retrieve_auxiliary("baddr"); - return smp::submit_to(dst_cpu_id, [plan_id, from, dst_cpu_id] () mutable { - auto session = get_session(plan_id, from, "COMPLETE_MESSAGE"); - session->complete(); - }); + // Be compatible with old version. Do nothing but return a ready future. + sslog.debug("[Stream #{}] COMPLETE_MESSAGE from {} dst_cpu_id={}", plan_id, from, dst_cpu_id); + return make_ready_future<>(); }); } @@ -304,17 +303,6 @@ void stream_session::follower_start_sent() { this->start_streaming_files(); } -void stream_session::complete() { - if (_state == stream_session_state::WAIT_COMPLETE) { - send_complete_message(); - sslog.debug("[Stream #{}] complete: WAIT_COMPLETE -> COMPLETE: session={}", plan_id(), this); - close_session(stream_session_state::COMPLETE); - } else { - sslog.debug("[Stream #{}] complete: {} -> WAIT_COMPLETE: session={}", plan_id(), _state, this); - set_state(stream_session_state::WAIT_COMPLETE); - } -} - void stream_session::session_failed() { close_session(stream_session_state::FAILED); } @@ -366,16 +354,9 @@ void stream_session::send_complete_message() { bool stream_session::maybe_completed() { bool completed = _receivers.empty() && _transfers.empty(); if (completed) { - if (_state == stream_session_state::WAIT_COMPLETE) { - send_complete_message(); - sslog.debug("[Stream #{}] maybe_completed: WAIT_COMPLETE -> COMPLETE: session={}, peer={}", plan_id(), this, peer); - close_session(stream_session_state::COMPLETE); - } else { - // notify peer that this session is completed - sslog.debug("[Stream #{}] maybe_completed: {} -> WAIT_COMPLETE: session={}, peer={}", plan_id(), _state, this, peer); - set_state(stream_session_state::WAIT_COMPLETE); - send_complete_message(); - } + send_complete_message(); + sslog.debug("[Stream #{}] maybe_completed: {} -> COMPLETE: session={}, peer={}", plan_id(), _state, this, peer); + close_session(stream_session_state::COMPLETE); } return completed; }