streaming: futurize stream_session::on_initialization_complete
This commit is contained in:
@@ -100,7 +100,7 @@ future<> stream_session::init_streaming_service(distributed<database>& db) {
|
||||
});
|
||||
}
|
||||
|
||||
void stream_session::on_initialization_complete() {
|
||||
future<> stream_session::on_initialization_complete() {
|
||||
// send prepare message
|
||||
set_state(stream_session_state::PREPARING);
|
||||
auto prepare = messages::prepare_message();
|
||||
@@ -391,15 +391,17 @@ void stream_session::start() {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
// logger.info("[Stream #{}] Starting streaming to {}{}", plan_id(),
|
||||
// peer, peer == connecting ? "" : " through " + connecting);
|
||||
conn_handler.initiate().then([this] {
|
||||
on_initialization_complete();
|
||||
});
|
||||
} catch (...) {
|
||||
on_error();
|
||||
}
|
||||
// logger.info("[Stream #{}] Starting streaming to {}{}", plan_id(),
|
||||
// peer, peer == connecting ? "" : " through " + connecting);
|
||||
conn_handler.initiate().then([this] {
|
||||
return on_initialization_complete();
|
||||
}).then_wrapped([this] (auto&& f) {
|
||||
try {
|
||||
f.get();
|
||||
} catch (...) {
|
||||
this->on_error();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void stream_session::init(shared_ptr<stream_result_future> stream_result_) {
|
||||
|
||||
@@ -308,7 +308,7 @@ public:
|
||||
/**
|
||||
* Call back when connection initialization is complete to start the prepare phase.
|
||||
*/
|
||||
void on_initialization_complete();
|
||||
future<> on_initialization_complete();
|
||||
|
||||
/**l
|
||||
* Call back for handling exception during streaming.
|
||||
|
||||
Reference in New Issue
Block a user