cql_test_env: main: move stream_manager initialization

Currently, stream_manager is initialized after storage_service and
so it is stopped before the storage_service is. In its stop method
storage_service accesses stream_manager which is uninitialized
at a time.

Move stream_manager initialization over the storage_service initialization.

Fixes: #23207.

Closes scylladb/scylladb#24008
This commit is contained in:
Aleksandra Martyniuk
2025-05-05 14:23:57 +02:00
committed by Botond Dénes
parent 4f87362abb
commit 9c03255fd2
2 changed files with 21 additions and 21 deletions

36
main.cc
View File

@@ -1757,6 +1757,24 @@ sharded<locator::shared_token_metadata> token_metadata;
utils::get_local_injector().inject("stop_after_starting_repair", [] { std::raise(SIGSTOP); });
debug::the_stream_manager = &stream_manager;
checkpoint(stop_signal, "starting streaming service");
stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(view_builder), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get();
auto stop_stream_manager = defer_verbose_shutdown("stream manager", [&stream_manager] {
// FIXME -- keep the instances alive, just call .stop on them
stream_manager.invoke_on_all(&streaming::stream_manager::stop).get();
});
checkpoint(stop_signal, "starting streaming manager");
stream_manager.invoke_on_all([&stop_signal] (streaming::stream_manager& sm) {
return sm.start(stop_signal.as_local_abort_source());
}).get();
api::set_server_stream_manager(ctx, stream_manager).get();
auto stop_stream_manager_api = defer_verbose_shutdown("stream manager api", [&ctx] {
api::unset_server_stream_manager(ctx).get();
});
checkpoint(stop_signal, "initializing storage service");
debug::the_storage_service = &ss;
ss.start(std::ref(stop_signal.as_sharded_abort_source()),
@@ -1923,24 +1941,6 @@ sharded<locator::shared_token_metadata> token_metadata;
proxy.invoke_on_all(&service::storage_proxy::stop_remote).get();
});
debug::the_stream_manager = &stream_manager;
checkpoint(stop_signal, "starting streaming service");
stream_manager.start(std::ref(*cfg), std::ref(db), std::ref(view_builder), std::ref(messaging), std::ref(mm), std::ref(gossiper), maintenance_scheduling_group).get();
auto stop_stream_manager = defer_verbose_shutdown("stream manager", [&stream_manager] {
// FIXME -- keep the instances alive, just call .stop on them
stream_manager.invoke_on_all(&streaming::stream_manager::stop).get();
});
checkpoint(stop_signal, "starting streaming manager");
stream_manager.invoke_on_all([&stop_signal] (streaming::stream_manager& sm) {
return sm.start(stop_signal.as_local_abort_source());
}).get();
api::set_server_stream_manager(ctx, stream_manager).get();
auto stop_stream_manager_api = defer_verbose_shutdown("stream manager api", [&ctx] {
api::unset_server_stream_manager(ctx).get();
});
checkpoint(stop_signal, "starting hinted handoff manager");
if (!hinted_handoff_enabled.is_disabled_for_all()) {
hints_dir_initializer.ensure_rebalanced().get();

View File

@@ -900,6 +900,9 @@ private:
_view_builder.stop().get();
});
_stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_view_builder), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get();
auto stop_streaming = defer_verbose_shutdown("stream manager", [this] { _stream_manager.stop().get(); });
_ss.start(std::ref(abort_sources), std::ref(_db),
std::ref(_gossiper),
std::ref(_sys_ks),
@@ -999,9 +1002,6 @@ private:
}
});
_stream_manager.start(std::ref(*cfg), std::ref(_db), std::ref(_view_builder), std::ref(_ms), std::ref(_mm), std::ref(_gossiper), scheduling_groups.streaming_scheduling_group).get();
auto stop_streaming = defer_verbose_shutdown("stream manager", [this] { _stream_manager.stop().get(); });
_sl_controller.invoke_on_all([this, &group0_client] (qos::service_level_controller& service) {
qos::service_level_controller::service_level_distributed_data_accessor_ptr service_level_data_accessor =
::static_pointer_cast<qos::service_level_controller::service_level_distributed_data_accessor>(