streaming: Simplify session completion logic

Both the initiator and follower of a stream session knows how many
transfer task and receive task the stream session contains in the
preparation phase. They use the _transfers and _receivers map to track
the tasks, like below:

       std::map<UUID, stream_transfer_task> _transfers;
       std::map<UUID, stream_receive_task> _receivers;

A stream_transfer_task will send STREAM_MUTATION verb to transfer data
with frozen_mutation, when all the STREAM_MUTATIONs are sent, it will
send STREAM_MUTATION_DONE to tell the peer the stream_transfer_task is
completed and remove the stream_transfer_task from _transfers map.  The
peer will remove the corresponding stream_receive_task in _receivers.

We do not really need the COMPLETE_MESSAGE verb to notify the peer we
have completed sending. It makes the session completion logic much
simpler and cleaner if we do not depend on COMPLETE_MESSAGE verb.

However, to be compatible with older version, we always send a
COMPLETE_MESSAGE message and do nothing in the COMPLETE_MESSAGE handler
and replies a ready future even if the stream_session is closed already.
This way, node with older version will get a COMPLETE_MESSAGE message
and manage to send a COMPLETE_MESSAGE message to new node as before.

Message-Id: <1458540564-34277-2-git-send-email-asias@scylladb.com>
This commit is contained in:
Asias He
2016-03-21 14:09:24 +08:00
committed by Avi Kivity
parent 4892a6ded9
commit 208b7fa7ba
Notes: Avi Kivity 2016-03-26 22:06:40 +03:00
backport: no

View File

@@ -174,10 +174,9 @@ void stream_session::init_messaging_service_handler() {
});
ms().register_complete_message([] (const rpc::client_info& cinfo, UUID plan_id, unsigned dst_cpu_id) {
const auto& from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
return smp::submit_to(dst_cpu_id, [plan_id, from, dst_cpu_id] () mutable {
auto session = get_session(plan_id, from, "COMPLETE_MESSAGE");
session->complete();
});
// Be compatible with old version. Do nothing but return a ready future.
sslog.debug("[Stream #{}] COMPLETE_MESSAGE from {} dst_cpu_id={}", plan_id, from, dst_cpu_id);
return make_ready_future<>();
});
}
@@ -304,17 +303,6 @@ void stream_session::follower_start_sent() {
this->start_streaming_files();
}
void stream_session::complete() {
if (_state == stream_session_state::WAIT_COMPLETE) {
send_complete_message();
sslog.debug("[Stream #{}] complete: WAIT_COMPLETE -> COMPLETE: session={}", plan_id(), this);
close_session(stream_session_state::COMPLETE);
} else {
sslog.debug("[Stream #{}] complete: {} -> WAIT_COMPLETE: session={}", plan_id(), _state, this);
set_state(stream_session_state::WAIT_COMPLETE);
}
}
void stream_session::session_failed() {
close_session(stream_session_state::FAILED);
}
@@ -366,16 +354,9 @@ void stream_session::send_complete_message() {
bool stream_session::maybe_completed() {
bool completed = _receivers.empty() && _transfers.empty();
if (completed) {
if (_state == stream_session_state::WAIT_COMPLETE) {
send_complete_message();
sslog.debug("[Stream #{}] maybe_completed: WAIT_COMPLETE -> COMPLETE: session={}, peer={}", plan_id(), this, peer);
close_session(stream_session_state::COMPLETE);
} else {
// notify peer that this session is completed
sslog.debug("[Stream #{}] maybe_completed: {} -> WAIT_COMPLETE: session={}, peer={}", plan_id(), _state, this, peer);
set_state(stream_session_state::WAIT_COMPLETE);
send_complete_message();
}
send_complete_message();
sslog.debug("[Stream #{}] maybe_completed: {} -> COMPLETE: session={}, peer={}", plan_id(), _state, this, peer);
close_session(stream_session_state::COMPLETE);
}
return completed;
}