From 853175fc61c19fb0d410e0427b4ddaafabab774d Mon Sep 17 00:00:00 2001 From: Asias He Date: Tue, 7 Jul 2015 17:22:57 +0800 Subject: [PATCH] streaming: Implement prepare_message handler This is a bit different from Origin. We always send back a prepare_message even if the initializer requested no data from the follower, to unify the handling. --- streaming/stream_session.cc | 11 +++++++---- streaming/stream_session.hh | 3 ++- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 6d8c720c32..7fe78e113f 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -47,8 +47,9 @@ void stream_session::init_messaging_service_handler() { ms().register_handler(messaging_verb::PREPARE_MESSAGE, [] (messages::prepare_message msg) { auto cpu_id = 0; return smp::submit_to(cpu_id, [msg = std::move(msg)] () mutable { - // TODO - messages::prepare_message msg_ret; + // TODO: find session + stream_session s; + auto msg_ret = s.prepare(std::move(msg.requests), std::move(msg.summaries)); return make_ready_future(std::move(msg_ret)); }); }); @@ -133,7 +134,7 @@ void stream_session::on_error() { close_session(stream_session_state::FAILED); } -void stream_session::prepare(std::vector requests, std::vector summaries) { +messages::prepare_message stream_session::prepare(std::vector requests, std::vector summaries) { // prepare tasks set_state(stream_session_state::PREPARING); for (auto& request : requests) { @@ -145,8 +146,8 @@ void stream_session::prepare(std::vector requests, std::vector requests, std::vector requests, std::vector summaries); + messages::prepare_message prepare(std::vector requests, std::vector summaries); /** * Call back after sending FileMessageHeader.