diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 19506eeb12..78050066b0 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -236,7 +236,9 @@ future<> stream_session::on_initialization_complete() { for (auto& summary : msg.summaries) { this->prepare_receiving(summary); } - _stream_result->handle_session_prepared(this->shared_from_this()); + if (_stream_result) { + _stream_result->handle_session_prepared(this->shared_from_this()); + } } catch (...) { sslog.warn("[Stream #{}] Fail to send PREPARE_MESSAGE to {}, {}", this->plan_id(), id, std::current_exception()); throw; @@ -314,7 +316,9 @@ future stream_session::prepare(std::vector requ } } prepare.dst_cpu_id = engine().cpu_id();; - _stream_result->handle_session_prepared(shared_from_this()); + if (_stream_result) { + _stream_result->handle_session_prepared(shared_from_this()); + } return make_ready_future(std::move(prepare)); } @@ -479,7 +483,9 @@ void stream_session::close_session(stream_session_state final_state) { // Note that we shouldn't block on this close because this method is called on the handler // incoming thread (so we would deadlock). //handler.close(); - _stream_result->handle_session_complete(shared_from_this()); + if (_stream_result) { + _stream_result->handle_session_complete(shared_from_this()); + } sslog.debug("[Stream #{}] close_session session={}, state={}, cancel keep_alive timer", plan_id(), this, final_state); _keep_alive.cancel();