streaming: Implement prepare_message handler

This is a bit different from Origin. We always send back a
prepare_message even if the initializer requested no data from the
follower, to unify the handling.
This commit is contained in:
Asias He
2015-07-07 17:22:57 +08:00
parent abfcf7d825
commit 853175fc61
2 changed files with 9 additions and 5 deletions

View File

@@ -47,8 +47,9 @@ void stream_session::init_messaging_service_handler() {
ms().register_handler(messaging_verb::PREPARE_MESSAGE, [] (messages::prepare_message msg) {
auto cpu_id = 0;
return smp::submit_to(cpu_id, [msg = std::move(msg)] () mutable {
// TODO
messages::prepare_message msg_ret;
// TODO: find session
stream_session s;
auto msg_ret = s.prepare(std::move(msg.requests), std::move(msg.summaries));
return make_ready_future<messages::prepare_message>(std::move(msg_ret));
});
});
@@ -133,7 +134,7 @@ void stream_session::on_error() {
close_session(stream_session_state::FAILED);
}
void stream_session::prepare(std::vector<stream_request> requests, std::vector<stream_summary> summaries) {
messages::prepare_message stream_session::prepare(std::vector<stream_request> requests, std::vector<stream_summary> summaries) {
// prepare tasks
set_state(stream_session_state::PREPARING);
for (auto& request : requests) {
@@ -145,8 +146,8 @@ void stream_session::prepare(std::vector<stream_request> requests, std::vector<s
}
// send back prepare message if prepare message contains stream request
messages::prepare_message prepare;
if (!requests.empty()) {
messages::prepare_message prepare;
for (auto& x: _transfers) {
auto& task = x.second;
prepare.summaries.emplace_back(task.get_summary());
@@ -158,6 +159,8 @@ void stream_session::prepare(std::vector<stream_request> requests, std::vector<s
if (!maybe_completed()) {
start_streaming_files();
}
return prepare;
}
void stream_session::file_sent(const messages::file_message_header& header) {

View File

@@ -31,6 +31,7 @@
#include "streaming/stream_receive_task.hh"
#include "streaming/stream_request.hh"
#include "streaming/messages/incoming_file_message.hh"
#include "streaming/messages/prepare_message.hh"
#include "streaming/stream_detail.hh"
#include "sstables/sstables.hh"
#include "query-request.hh"
@@ -317,7 +318,7 @@ public:
/**
* Prepare this session for sending/receiving files.
*/
void prepare(std::vector<stream_request> requests, std::vector<stream_summary> summaries);
messages::prepare_message prepare(std::vector<stream_request> requests, std::vector<stream_summary> summaries);
/**
* Call back after sending FileMessageHeader.