streaming: Simplify session completion logic
Both the initiator and follower of a stream session knows how many
transfer task and receive task the stream session contains in the
preparation phase. They use the _transfers and _receivers map to track
the tasks, like below:
std::map<UUID, stream_transfer_task> _transfers;
std::map<UUID, stream_receive_task> _receivers;
A stream_transfer_task will send STREAM_MUTATION verb to transfer data
with frozen_mutation, when all the STREAM_MUTATIONs are sent, it will
send STREAM_MUTATION_DONE to tell the peer the stream_transfer_task is
completed and remove the stream_transfer_task from _transfers map. The
peer will remove the corresponding stream_receive_task in _receivers.
We do not really need the COMPLETE_MESSAGE verb to notify the peer we
have completed sending. It makes the session completion logic much
simpler and cleaner if we do not depend on COMPLETE_MESSAGE verb.
However, to be compatible with older version, we always send a
COMPLETE_MESSAGE message and do nothing in the COMPLETE_MESSAGE handler
and replies a ready future even if the stream_session is closed already.
This way, node with older version will get a COMPLETE_MESSAGE message
and manage to send a COMPLETE_MESSAGE message to new node as before.
Message-Id: <1458540564-34277-2-git-send-email-asias@scylladb.com>
This commit is contained in:
Notes:
Avi Kivity
2016-03-26 22:06:40 +03:00
backport: no
@@ -174,10 +174,9 @@ void stream_session::init_messaging_service_handler() {
|
||||
});
|
||||
ms().register_complete_message([] (const rpc::client_info& cinfo, UUID plan_id, unsigned dst_cpu_id) {
|
||||
const auto& from = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
return smp::submit_to(dst_cpu_id, [plan_id, from, dst_cpu_id] () mutable {
|
||||
auto session = get_session(plan_id, from, "COMPLETE_MESSAGE");
|
||||
session->complete();
|
||||
});
|
||||
// Be compatible with old version. Do nothing but return a ready future.
|
||||
sslog.debug("[Stream #{}] COMPLETE_MESSAGE from {} dst_cpu_id={}", plan_id, from, dst_cpu_id);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -304,17 +303,6 @@ void stream_session::follower_start_sent() {
|
||||
this->start_streaming_files();
|
||||
}
|
||||
|
||||
void stream_session::complete() {
|
||||
if (_state == stream_session_state::WAIT_COMPLETE) {
|
||||
send_complete_message();
|
||||
sslog.debug("[Stream #{}] complete: WAIT_COMPLETE -> COMPLETE: session={}", plan_id(), this);
|
||||
close_session(stream_session_state::COMPLETE);
|
||||
} else {
|
||||
sslog.debug("[Stream #{}] complete: {} -> WAIT_COMPLETE: session={}", plan_id(), _state, this);
|
||||
set_state(stream_session_state::WAIT_COMPLETE);
|
||||
}
|
||||
}
|
||||
|
||||
void stream_session::session_failed() {
|
||||
close_session(stream_session_state::FAILED);
|
||||
}
|
||||
@@ -366,16 +354,9 @@ void stream_session::send_complete_message() {
|
||||
bool stream_session::maybe_completed() {
|
||||
bool completed = _receivers.empty() && _transfers.empty();
|
||||
if (completed) {
|
||||
if (_state == stream_session_state::WAIT_COMPLETE) {
|
||||
send_complete_message();
|
||||
sslog.debug("[Stream #{}] maybe_completed: WAIT_COMPLETE -> COMPLETE: session={}, peer={}", plan_id(), this, peer);
|
||||
close_session(stream_session_state::COMPLETE);
|
||||
} else {
|
||||
// notify peer that this session is completed
|
||||
sslog.debug("[Stream #{}] maybe_completed: {} -> WAIT_COMPLETE: session={}, peer={}", plan_id(), _state, this, peer);
|
||||
set_state(stream_session_state::WAIT_COMPLETE);
|
||||
send_complete_message();
|
||||
}
|
||||
send_complete_message();
|
||||
sslog.debug("[Stream #{}] maybe_completed: {} -> COMPLETE: session={}, peer={}", plan_id(), _state, this, peer);
|
||||
close_session(stream_session_state::COMPLETE);
|
||||
}
|
||||
return completed;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user