diff --git a/main.cc b/main.cc index 7a81399a40..023a5ba43d 100644 --- a/main.cc +++ b/main.cc @@ -913,7 +913,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl ss.start(std::ref(stop_signal.as_sharded_abort_source()), std::ref(db), std::ref(gossiper), std::ref(sys_dist_ks), std::ref(sys_ks), std::ref(feature_service), sscfg, std::ref(mm), std::ref(token_metadata), std::ref(erm_factory), - std::ref(messaging), std::ref(cdc_generation_service), std::ref(repair), + std::ref(messaging), std::ref(repair), std::ref(stream_manager), std::ref(raft_gr), std::ref(lifecycle_notifier), std::ref(bm)).get(); auto stop_storage_service = defer_verbose_shutdown("storage_service", [&] { @@ -1273,7 +1273,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl }).get(); with_scheduling_group(maintenance_scheduling_group, [&] { - return ss.local().join_cluster(qp.local(), group0_client); + return ss.local().join_cluster(qp.local(), group0_client, cdc_generation_service.local()); }).get(); sl_controller.invoke_on_all([&lifecycle_notifier] (qos::service_level_controller& controller) { diff --git a/service/storage_service.cc b/service/storage_service.cc index 6b879cfa99..3c0defb59d 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -88,7 +88,6 @@ storage_service::storage_service(abort_source& abort_source, locator::shared_token_metadata& stm, locator::effective_replication_map_factory& erm_factory, sharded& ms, - sharded& cdc_gen_service, sharded& repair, sharded& stream_manager, raft_group_registry& raft_gr, @@ -106,7 +105,6 @@ storage_service::storage_service(abort_source& abort_source, , _node_ops_abort_thread(node_ops_abort_thread()) , _shared_token_metadata(stm) , _erm_factory(erm_factory) - , _cdc_gen_service(cdc_gen_service) , _lifecycle_notifier(elc_notif) , _batchlog_manager(bm) , _sys_dist_ks(sys_dist_ks) @@ -284,7 +282,7 @@ future<> storage_service::wait_for_ring_to_settle(std::chrono::milliseconds dela slogger.info("Checking bootstrapping/leaving nodes: ok"); } -future<> storage_service::join_token_ring( +future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_service, std::unordered_set initial_contact_nodes, std::unordered_set loaded_endpoints, std::unordered_map loaded_peer_features, @@ -524,7 +522,7 @@ future<> storage_service::join_token_ring( co_await start_sys_dist_ks(); co_await mark_existing_views_as_built(); co_await _sys_ks.local().update_tokens(bootstrap_tokens); - co_await bootstrap(bootstrap_tokens, cdc_gen_id); + co_await bootstrap(cdc_gen_service, bootstrap_tokens, cdc_gen_id); } else { co_await start_sys_dist_ks(); bootstrap_tokens = co_await db::system_keyspace::get_saved_tokens(); @@ -576,7 +574,7 @@ future<> storage_service::join_token_ring( && (!_sys_ks.local().bootstrap_complete() || cdc::should_propose_first_generation(get_broadcast_address(), _gossiper))) { try { - cdc_gen_id = co_await _cdc_gen_service.local().make_new_generation(bootstrap_tokens, !is_first_node()); + cdc_gen_id = co_await cdc_gen_service.make_new_generation(bootstrap_tokens, !is_first_node()); } catch (...) { cdc_log.warn( "Could not create a new CDC generation: {}. This may make it impossible to use CDC or cause performance problems." @@ -608,7 +606,7 @@ future<> storage_service::join_token_ring( throw std::runtime_error(err); } - co_await _cdc_gen_service.local().after_join(std::move(cdc_gen_id)); + co_await cdc_gen_service.after_join(std::move(cdc_gen_id)); } future<> storage_service::mark_existing_views_as_built() { @@ -644,8 +642,8 @@ std::list storage_service::get_ignore_dead_nodes_for_replace( } // Runs inside seastar::async context -future<> storage_service::bootstrap(std::unordered_set& bootstrap_tokens, std::optional& cdc_gen_id) { - return seastar::async([this, &bootstrap_tokens, &cdc_gen_id] { +future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, std::unordered_set& bootstrap_tokens, std::optional& cdc_gen_id) { + return seastar::async([this, &bootstrap_tokens, &cdc_gen_id, &cdc_gen_service] { auto bootstrap_rbno = is_repair_based_node_ops_enabled(streaming::stream_reason::bootstrap); set_mode(mode::BOOTSTRAP); @@ -700,7 +698,7 @@ future<> storage_service::bootstrap(std::unordered_set& bootstrap_tokens, // We don't do any other generation switches (unless we crash before complecting bootstrap). assert(!cdc_gen_id); - cdc_gen_id = _cdc_gen_service.local().make_new_generation(bootstrap_tokens, !is_first_node()).get0(); + cdc_gen_id = cdc_gen_service.make_new_generation(bootstrap_tokens, !is_first_node()).get0(); if (!bootstrap_rbno) { // When is_repair_based_node_ops_enabled is true, the bootstrap node @@ -1338,10 +1336,10 @@ future<> storage_service::uninit_messaging_service_part() { return container().invoke_on_all(&service::storage_service::uninit_messaging_service); } -future<> storage_service::join_cluster(cql3::query_processor& qp, raft_group0_client& client) { +future<> storage_service::join_cluster(cql3::query_processor& qp, raft_group0_client& client, cdc::generation_service& cdc_gen_service) { assert(this_shard_id() == 0); - return seastar::async([this, &qp, &client] { + return seastar::async([this, &qp, &client, &cdc_gen_service] { set_mode(mode::STARTING); _group0 = std::make_unique(_abort_source, _raft_gr, _messaging.local(), @@ -1395,7 +1393,7 @@ future<> storage_service::join_cluster(cql3::query_processor& qp, raft_group0_cl for (auto& x : loaded_peer_features) { slogger.info("peer={}, supported_features={}", x.first, x.second); } - join_token_ring(std::move(initial_contact_nodes), std::move(loaded_endpoints), std::move(loaded_peer_features), get_ring_delay()).get(); + join_token_ring(cdc_gen_service, std::move(initial_contact_nodes), std::move(loaded_endpoints), std::move(loaded_peer_features), get_ring_delay()).get(); }); } diff --git a/service/storage_service.hh b/service/storage_service.hh index 64306c96d8..dc09a5179e 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -180,7 +180,6 @@ public: locator::shared_token_metadata& stm, locator::effective_replication_map_factory& erm_factory, sharded& ms, - sharded&, sharded& repair, sharded& stream_manager, raft_group_registry& raft_gr, @@ -263,11 +262,6 @@ private: shared_token_metadata& _shared_token_metadata; locator::effective_replication_map_factory& _erm_factory; - /* CDC generation management service. - * It is sharded<>& and not simply a reference because the service will not yet be started - * when storage_service is constructed (but it will be when init_server is called) - */ - sharded& _cdc_gen_service; public: std::chrono::milliseconds get_ring_delay(); private: @@ -352,7 +346,7 @@ public: * * \see init_messaging_service_part */ - future<> join_cluster(cql3::query_processor& qp, raft_group0_client& client); + future<> join_cluster(cql3::query_processor& qp, raft_group0_client& client, cdc::generation_service& cdc_gen_service); future<> drain_on_shutdown(); @@ -363,7 +357,7 @@ private: std::optional get_replace_address(); bool is_replacing(); bool is_first_node(); - future<> join_token_ring( + future<> join_token_ring(cdc::generation_service& cdc_gen_service, std::unordered_set initial_contact_nodes, std::unordered_set loaded_endpoints, std::unordered_map loaded_peer_features, @@ -380,7 +374,7 @@ private: // Stream data for which we become a new replica. // Before that, if we're not replacing another node, inform other nodes about our chosen tokens // and wait for RING_DELAY ms so that we receive new writes from coordinators during streaming. - future<> bootstrap(std::unordered_set& bootstrap_tokens, std::optional& cdc_gen_id); + future<> bootstrap(cdc::generation_service& cdc_gen_service, std::unordered_set& bootstrap_tokens, std::optional& cdc_gen_id); public: /** diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 6617cd0d03..a5e66c1e90 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -710,7 +710,6 @@ public: std::ref(sys_dist_ks), std::ref(sys_ks), std::ref(feature_service), sscfg, std::ref(mm), std::ref(token_metadata), std::ref(erm_factory), std::ref(ms), - std::ref(cdc_generation_service), std::ref(repair), std::ref(stream_manager), std::ref(raft_gr), std::ref(elc_notif), @@ -783,7 +782,7 @@ public: }); try { - ss.local().join_cluster(qp.local(), group0_client).get(); + ss.local().join_cluster(qp.local(), group0_client, cdc_generation_service.local()).get(); } catch (std::exception& e) { // if any of the defers crashes too, we'll never see // the error