diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 1ae136a4c7..bf870f387a 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -100,7 +100,7 @@ future<> stream_session::init_streaming_service(distributed& db) { }); } -void stream_session::on_initialization_complete() { +future<> stream_session::on_initialization_complete() { // send prepare message set_state(stream_session_state::PREPARING); auto prepare = messages::prepare_message(); @@ -391,15 +391,17 @@ void stream_session::start() { return; } - try { - // logger.info("[Stream #{}] Starting streaming to {}{}", plan_id(), - // peer, peer == connecting ? "" : " through " + connecting); - conn_handler.initiate().then([this] { - on_initialization_complete(); - }); - } catch (...) { - on_error(); - } + // logger.info("[Stream #{}] Starting streaming to {}{}", plan_id(), + // peer, peer == connecting ? "" : " through " + connecting); + conn_handler.initiate().then([this] { + return on_initialization_complete(); + }).then_wrapped([this] (auto&& f) { + try { + f.get(); + } catch (...) { + this->on_error(); + } + }); } void stream_session::init(shared_ptr stream_result_) { diff --git a/streaming/stream_session.hh b/streaming/stream_session.hh index f5c980c83c..1524bfb177 100644 --- a/streaming/stream_session.hh +++ b/streaming/stream_session.hh @@ -308,7 +308,7 @@ public: /** * Call back when connection initialization is complete to start the prepare phase. */ - void on_initialization_complete(); + future<> on_initialization_complete(); /**l * Call back for handling exception during streaming.