diff --git a/streaming/stream_manager.cc b/streaming/stream_manager.cc index 4667e2de03..3622365cd7 100644 --- a/streaming/stream_manager.cc +++ b/streaming/stream_manager.cc @@ -22,9 +22,12 @@ #include "core/distributed.hh" #include "streaming/stream_manager.hh" #include "streaming/stream_result_future.hh" +#include "log.hh" namespace streaming { +extern logging::logger sslog; + distributed _the_stream_manager; void stream_manager::register_sending(shared_ptr result) { @@ -73,4 +76,19 @@ shared_ptr stream_manager::get_receiving_stream(UUID plan_ return {}; } +void stream_manager::remove_stream(UUID plan_id) { + sslog.debug("stream_manager: removing plan_id={}", plan_id); + _initiated_streams.erase(plan_id); + _receiving_streams.erase(plan_id); +} + +void stream_manager::show_streams() { + for (auto& x : _initiated_streams) { + sslog.debug("stream_manager:initiated_stream: plan_id={}", x.first); + } + for (auto& x : _receiving_streams) { + sslog.debug("stream_manager:receiving_stream: plan_id={}", x.first); + } +} + } // namespace streaming diff --git a/streaming/stream_manager.hh b/streaming/stream_manager.hh index a8192a5cf5..6274286835 100644 --- a/streaming/stream_manager.hh +++ b/streaming/stream_manager.hh @@ -121,6 +121,10 @@ public: shared_ptr get_sending_stream(UUID plan_id); shared_ptr get_receiving_stream(UUID plan_id); + + void remove_stream(UUID plan_id); + + void show_streams(); #if 0 public void addNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback) diff --git a/streaming/stream_result_future.cc b/streaming/stream_result_future.cc index 49453d3cc0..370e8d65e3 100644 --- a/streaming/stream_result_future.cc +++ b/streaming/stream_result_future.cc @@ -91,6 +91,11 @@ void stream_result_future::fire_stream_event(Event event) { void stream_result_future::maybe_complete() { if (!_coordinator->has_active_sessions()) { + auto& sm = get_local_stream_manager(); + if (sslog.is_enabled(logging::log_level::debug)) { + sm.show_streams(); + } + sm.remove_stream(plan_id); auto final_state = get_current_state(); if (final_state.has_failed_session()) { sslog.warn("[Stream #{}] Stream failed", plan_id);