streaming: Check if _stream_result is valid

If on_error() was called before init() was executed, the
_stream_result can be invalid.
This commit is contained in:
Asias He
2017-07-25 15:30:09 +08:00
parent 8a3f6acdd2
commit be573bcafb

View File

@@ -236,7 +236,9 @@ future<> stream_session::on_initialization_complete() {
for (auto& summary : msg.summaries) {
this->prepare_receiving(summary);
}
_stream_result->handle_session_prepared(this->shared_from_this());
if (_stream_result) {
_stream_result->handle_session_prepared(this->shared_from_this());
}
} catch (...) {
sslog.warn("[Stream #{}] Fail to send PREPARE_MESSAGE to {}, {}", this->plan_id(), id, std::current_exception());
throw;
@@ -314,7 +316,9 @@ future<prepare_message> stream_session::prepare(std::vector<stream_request> requ
}
}
prepare.dst_cpu_id = engine().cpu_id();;
_stream_result->handle_session_prepared(shared_from_this());
if (_stream_result) {
_stream_result->handle_session_prepared(shared_from_this());
}
return make_ready_future<prepare_message>(std::move(prepare));
}
@@ -479,7 +483,9 @@ void stream_session::close_session(stream_session_state final_state) {
// Note that we shouldn't block on this close because this method is called on the handler
// incoming thread (so we would deadlock).
//handler.close();
_stream_result->handle_session_complete(shared_from_this());
if (_stream_result) {
_stream_result->handle_session_complete(shared_from_this());
}
sslog.debug("[Stream #{}] close_session session={}, state={}, cancel keep_alive timer", plan_id(), this, final_state);
_keep_alive.cancel();