diff --git a/streaming/stream_result_future.cc b/streaming/stream_result_future.cc index 8eaf1702c0..c8276760b4 100644 --- a/streaming/stream_result_future.cc +++ b/streaming/stream_result_future.cc @@ -44,28 +44,30 @@ namespace streaming { extern logging::logger sslog; -future stream_result_future::init_sending_side(UUID plan_id_, sstring description_, std::vector listeners_, shared_ptr coordinator_) { - auto future = create_and_register(plan_id_, description_, coordinator_); +future stream_result_future::init_sending_side(UUID plan_id_, sstring description_, + std::vector listeners_, shared_ptr coordinator_) { + auto sr = make_shared(plan_id_, description_, coordinator_); + get_local_stream_manager().register_sending(sr); + for (auto& listener : listeners_) { - future->add_event_listener(listener); + sr->add_event_listener(listener); } sslog.info("[Stream #{}] Executing streaming plan for {}", plan_id_, description_); // Initialize and start all sessions for (auto& session : coordinator_->get_all_stream_sessions()) { - session->init(future); + session->init(sr); } coordinator_->connect_all_stream_sessions(); - return future->_done.get_future(); + return sr->_done.get_future(); } -void stream_result_future::init_receiving_side(UUID plan_id, - sstring description, inet_address from) { +void stream_result_future::init_receiving_side(UUID plan_id, sstring description, inet_address from) { auto& sm = get_local_stream_manager(); - auto f = sm.get_receiving_stream(plan_id); - if (f == nullptr) { + auto sr = sm.get_receiving_stream(plan_id); + if (sr == nullptr) { sslog.info("[Stream #{}] Creating new streaming plan for {}, with {}", plan_id, description, from); // The main reason we create a StreamResultFuture on the receiving side is for JMX exposure. // TODO: stream_result_future needs a ref to stream_coordinator. @@ -134,11 +136,4 @@ void stream_result_future::handle_progress(progress_info progress) { fire_stream_event(progress_event(plan_id, std::move(progress))); } -shared_ptr stream_result_future::create_and_register(UUID plan_id_, sstring description_, shared_ptr coordinator_) { - auto future = make_shared(plan_id_, description_, coordinator_); - auto& sm = get_local_stream_manager(); - sm.register_sending(future); - return future; -} - } // namespace streaming diff --git a/streaming/stream_result_future.hh b/streaming/stream_result_future.hh index 8cb0a4bd53..f776072d67 100644 --- a/streaming/stream_result_future.hh +++ b/streaming/stream_result_future.hh @@ -104,9 +104,6 @@ public: static future init_sending_side(UUID plan_id_, sstring description_, std::vector listeners_, shared_ptr coordinator_); static void init_receiving_side(UUID plan_id, sstring description, inet_address from); -private: - static shared_ptr create_and_register(UUID plan_id_, sstring description_, shared_ptr coordinator_); - public: void add_event_listener(stream_event_handler* listener) { // FIXME: Futures.addCallback(this, listener);