mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-24 00:32:15 +00:00
streaming: Remove completed stream in stream_manager
This commit is contained in:
@@ -22,9 +22,12 @@
|
||||
#include "core/distributed.hh"
|
||||
#include "streaming/stream_manager.hh"
|
||||
#include "streaming/stream_result_future.hh"
|
||||
#include "log.hh"
|
||||
|
||||
namespace streaming {
|
||||
|
||||
extern logging::logger sslog;
|
||||
|
||||
distributed<stream_manager> _the_stream_manager;
|
||||
|
||||
void stream_manager::register_sending(shared_ptr<stream_result_future> result) {
|
||||
@@ -73,4 +76,19 @@ shared_ptr<stream_result_future> stream_manager::get_receiving_stream(UUID plan_
|
||||
return {};
|
||||
}
|
||||
|
||||
void stream_manager::remove_stream(UUID plan_id) {
|
||||
sslog.debug("stream_manager: removing plan_id={}", plan_id);
|
||||
_initiated_streams.erase(plan_id);
|
||||
_receiving_streams.erase(plan_id);
|
||||
}
|
||||
|
||||
void stream_manager::show_streams() {
|
||||
for (auto& x : _initiated_streams) {
|
||||
sslog.debug("stream_manager:initiated_stream: plan_id={}", x.first);
|
||||
}
|
||||
for (auto& x : _receiving_streams) {
|
||||
sslog.debug("stream_manager:receiving_stream: plan_id={}", x.first);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace streaming
|
||||
|
||||
@@ -121,6 +121,10 @@ public:
|
||||
shared_ptr<stream_result_future> get_sending_stream(UUID plan_id);
|
||||
|
||||
shared_ptr<stream_result_future> get_receiving_stream(UUID plan_id);
|
||||
|
||||
void remove_stream(UUID plan_id);
|
||||
|
||||
void show_streams();
|
||||
#if 0
|
||||
|
||||
public void addNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback)
|
||||
|
||||
@@ -91,6 +91,11 @@ void stream_result_future::fire_stream_event(Event event) {
|
||||
|
||||
void stream_result_future::maybe_complete() {
|
||||
if (!_coordinator->has_active_sessions()) {
|
||||
auto& sm = get_local_stream_manager();
|
||||
if (sslog.is_enabled(logging::log_level::debug)) {
|
||||
sm.show_streams();
|
||||
}
|
||||
sm.remove_stream(plan_id);
|
||||
auto final_state = get_current_state();
|
||||
if (final_state.has_failed_session()) {
|
||||
sslog.warn("[Stream #{}] Stream failed", plan_id);
|
||||
|
||||
Reference in New Issue
Block a user