From 9c03255fd2bcd2e5565c0a7293dcbc7aaa8fed16 Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Mon, 5 May 2025 14:23:57 +0200 Subject: [PATCH] cql_test_env: main: move stream_manager initialization Currently, stream_manager is initialized after storage_service and so it is stopped before the storage_service is. In its stop method storage_service accesses stream_manager which is uninitialized at a time. Move stream_manager initialization over the storage_service initialization. Fixes: #23207. Closes scylladb/scylladb#24008 --- main.cc | 36 ++++++++++++++++++------------------ test/lib/cql_test_env.cc | 6 +++--- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/main.cc b/main.cc index 586a7e73f0..46e8665c84 100644 --- a/main.cc +++ b/main.cc @@ -1757,6 +1757,24 @@ sharded token_metadata; utils::get_local_injector().inject("stop_after_starting_repair", [] { std::raise(SIGSTOP); }); + debug::the_stream_manager = &stream_manager; + checkpoint(stop_signal, "starting streaming service"); + stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(view_builder), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get(); + auto stop_stream_manager = defer_verbose_shutdown("stream manager", [&stream_manager] { + // FIXME -- keep the instances alive, just call .stop on them + stream_manager.invoke_on_all(&streaming::stream_manager::stop).get(); + }); + + checkpoint(stop_signal, "starting streaming manager"); + stream_manager.invoke_on_all([&stop_signal] (streaming::stream_manager& sm) { + return sm.start(stop_signal.as_local_abort_source()); + }).get(); + + api::set_server_stream_manager(ctx, stream_manager).get(); + auto stop_stream_manager_api = defer_verbose_shutdown("stream manager api", [&ctx] { + api::unset_server_stream_manager(ctx).get(); + }); + checkpoint(stop_signal, "initializing storage service"); debug::the_storage_service = &ss; ss.start(std::ref(stop_signal.as_sharded_abort_source()), @@ -1923,24 +1941,6 @@ sharded token_metadata; proxy.invoke_on_all(&service::storage_proxy::stop_remote).get(); }); - debug::the_stream_manager = &stream_manager; - checkpoint(stop_signal, "starting streaming service"); - stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(view_builder), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get(); - auto stop_stream_manager = defer_verbose_shutdown("stream manager", [&stream_manager] { - // FIXME -- keep the instances alive, just call .stop on them - stream_manager.invoke_on_all(&streaming::stream_manager::stop).get(); - }); - - checkpoint(stop_signal, "starting streaming manager"); - stream_manager.invoke_on_all([&stop_signal] (streaming::stream_manager& sm) { - return sm.start(stop_signal.as_local_abort_source()); - }).get(); - - api::set_server_stream_manager(ctx, stream_manager).get(); - auto stop_stream_manager_api = defer_verbose_shutdown("stream manager api", [&ctx] { - api::unset_server_stream_manager(ctx).get(); - }); - checkpoint(stop_signal, "starting hinted handoff manager"); if (!hinted_handoff_enabled.is_disabled_for_all()) { hints_dir_initializer.ensure_rebalanced().get(); diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 0400809960..66ec1e8c9d 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -900,6 +900,9 @@ private: _view_builder.stop().get(); }); + _stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_view_builder), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get(); + auto stop_streaming = defer_verbose_shutdown("stream manager", [this] { _stream_manager.stop().get(); }); + _ss.start(std::ref(abort_sources), std::ref(_db), std::ref(_gossiper), std::ref(_sys_ks), @@ -999,9 +1002,6 @@ private: } }); - _stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_view_builder), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get(); - auto stop_streaming = defer_verbose_shutdown("stream manager", [this] { _stream_manager.stop().get(); }); - _sl_controller.invoke_on_all([this, &group0_client] (qos::service_level_controller& service) { qos::service_level_controller::service_level_distributed_data_accessor_ptr service_level_data_accessor = ::static_pointer_cast(