storage_service: Remove cdc_gen_service from storage_service dependencies

This service is only needed at join time, so it's better to pass it as an
argument to join_cluster(). This solves the current reversed dependency
issue -- the cdc_gen_svc is now started after it's passed to storage
service initialization.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2022-05-20 15:57:11 +03:00
parent 0c40b69411
commit 5a97ba7121
4 changed files with 16 additions and 25 deletions

View File

@@ -913,7 +913,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
ss.start(std::ref(stop_signal.as_sharded_abort_source()),
std::ref(db), std::ref(gossiper), std::ref(sys_dist_ks), std::ref(sys_ks),
std::ref(feature_service), sscfg, std::ref(mm), std::ref(token_metadata), std::ref(erm_factory),
std::ref(messaging), std::ref(cdc_generation_service), std::ref(repair),
std::ref(messaging), std::ref(repair),
std::ref(stream_manager), std::ref(raft_gr), std::ref(lifecycle_notifier), std::ref(bm)).get();
auto stop_storage_service = defer_verbose_shutdown("storage_service", [&] {
@@ -1273,7 +1273,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
}).get();
with_scheduling_group(maintenance_scheduling_group, [&] {
return ss.local().join_cluster(qp.local(), group0_client);
return ss.local().join_cluster(qp.local(), group0_client, cdc_generation_service.local());
}).get();
sl_controller.invoke_on_all([&lifecycle_notifier] (qos::service_level_controller& controller) {

View File

@@ -88,7 +88,6 @@ storage_service::storage_service(abort_source& abort_source,
locator::shared_token_metadata& stm,
locator::effective_replication_map_factory& erm_factory,
sharded<netw::messaging_service>& ms,
sharded<cdc::generation_service>& cdc_gen_service,
sharded<repair_service>& repair,
sharded<streaming::stream_manager>& stream_manager,
raft_group_registry& raft_gr,
@@ -106,7 +105,6 @@ storage_service::storage_service(abort_source& abort_source,
, _node_ops_abort_thread(node_ops_abort_thread())
, _shared_token_metadata(stm)
, _erm_factory(erm_factory)
, _cdc_gen_service(cdc_gen_service)
, _lifecycle_notifier(elc_notif)
, _batchlog_manager(bm)
, _sys_dist_ks(sys_dist_ks)
@@ -284,7 +282,7 @@ future<> storage_service::wait_for_ring_to_settle(std::chrono::milliseconds dela
slogger.info("Checking bootstrapping/leaving nodes: ok");
}
future<> storage_service::join_token_ring(
future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_service,
std::unordered_set<gms::inet_address> initial_contact_nodes,
std::unordered_set<gms::inet_address> loaded_endpoints,
std::unordered_map<gms::inet_address, sstring> loaded_peer_features,
@@ -524,7 +522,7 @@ future<> storage_service::join_token_ring(
co_await start_sys_dist_ks();
co_await mark_existing_views_as_built();
co_await _sys_ks.local().update_tokens(bootstrap_tokens);
co_await bootstrap(bootstrap_tokens, cdc_gen_id);
co_await bootstrap(cdc_gen_service, bootstrap_tokens, cdc_gen_id);
} else {
co_await start_sys_dist_ks();
bootstrap_tokens = co_await db::system_keyspace::get_saved_tokens();
@@ -576,7 +574,7 @@ future<> storage_service::join_token_ring(
&& (!_sys_ks.local().bootstrap_complete()
|| cdc::should_propose_first_generation(get_broadcast_address(), _gossiper))) {
try {
cdc_gen_id = co_await _cdc_gen_service.local().make_new_generation(bootstrap_tokens, !is_first_node());
cdc_gen_id = co_await cdc_gen_service.make_new_generation(bootstrap_tokens, !is_first_node());
} catch (...) {
cdc_log.warn(
"Could not create a new CDC generation: {}. This may make it impossible to use CDC or cause performance problems."
@@ -608,7 +606,7 @@ future<> storage_service::join_token_ring(
throw std::runtime_error(err);
}
co_await _cdc_gen_service.local().after_join(std::move(cdc_gen_id));
co_await cdc_gen_service.after_join(std::move(cdc_gen_id));
}
future<> storage_service::mark_existing_views_as_built() {
@@ -644,8 +642,8 @@ std::list<gms::inet_address> storage_service::get_ignore_dead_nodes_for_replace(
}
// Runs inside seastar::async context
future<> storage_service::bootstrap(std::unordered_set<token>& bootstrap_tokens, std::optional<cdc::generation_id>& cdc_gen_id) {
return seastar::async([this, &bootstrap_tokens, &cdc_gen_id] {
future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, std::unordered_set<token>& bootstrap_tokens, std::optional<cdc::generation_id>& cdc_gen_id) {
return seastar::async([this, &bootstrap_tokens, &cdc_gen_id, &cdc_gen_service] {
auto bootstrap_rbno = is_repair_based_node_ops_enabled(streaming::stream_reason::bootstrap);
set_mode(mode::BOOTSTRAP);
@@ -700,7 +698,7 @@ future<> storage_service::bootstrap(std::unordered_set<token>& bootstrap_tokens,
// We don't do any other generation switches (unless we crash before complecting bootstrap).
assert(!cdc_gen_id);
cdc_gen_id = _cdc_gen_service.local().make_new_generation(bootstrap_tokens, !is_first_node()).get0();
cdc_gen_id = cdc_gen_service.make_new_generation(bootstrap_tokens, !is_first_node()).get0();
if (!bootstrap_rbno) {
// When is_repair_based_node_ops_enabled is true, the bootstrap node
@@ -1338,10 +1336,10 @@ future<> storage_service::uninit_messaging_service_part() {
return container().invoke_on_all(&service::storage_service::uninit_messaging_service);
}
future<> storage_service::join_cluster(cql3::query_processor& qp, raft_group0_client& client) {
future<> storage_service::join_cluster(cql3::query_processor& qp, raft_group0_client& client, cdc::generation_service& cdc_gen_service) {
assert(this_shard_id() == 0);
return seastar::async([this, &qp, &client] {
return seastar::async([this, &qp, &client, &cdc_gen_service] {
set_mode(mode::STARTING);
_group0 = std::make_unique<raft_group0>(_abort_source, _raft_gr, _messaging.local(),
@@ -1395,7 +1393,7 @@ future<> storage_service::join_cluster(cql3::query_processor& qp, raft_group0_cl
for (auto& x : loaded_peer_features) {
slogger.info("peer={}, supported_features={}", x.first, x.second);
}
join_token_ring(std::move(initial_contact_nodes), std::move(loaded_endpoints), std::move(loaded_peer_features), get_ring_delay()).get();
join_token_ring(cdc_gen_service, std::move(initial_contact_nodes), std::move(loaded_endpoints), std::move(loaded_peer_features), get_ring_delay()).get();
});
}

View File

@@ -180,7 +180,6 @@ public:
locator::shared_token_metadata& stm,
locator::effective_replication_map_factory& erm_factory,
sharded<netw::messaging_service>& ms,
sharded<cdc::generation_service>&,
sharded<repair_service>& repair,
sharded<streaming::stream_manager>& stream_manager,
raft_group_registry& raft_gr,
@@ -263,11 +262,6 @@ private:
shared_token_metadata& _shared_token_metadata;
locator::effective_replication_map_factory& _erm_factory;
/* CDC generation management service.
* It is sharded<>& and not simply a reference because the service will not yet be started
* when storage_service is constructed (but it will be when init_server is called)
*/
sharded<cdc::generation_service>& _cdc_gen_service;
public:
std::chrono::milliseconds get_ring_delay();
private:
@@ -352,7 +346,7 @@ public:
*
* \see init_messaging_service_part
*/
future<> join_cluster(cql3::query_processor& qp, raft_group0_client& client);
future<> join_cluster(cql3::query_processor& qp, raft_group0_client& client, cdc::generation_service& cdc_gen_service);
future<> drain_on_shutdown();
@@ -363,7 +357,7 @@ private:
std::optional<gms::inet_address> get_replace_address();
bool is_replacing();
bool is_first_node();
future<> join_token_ring(
future<> join_token_ring(cdc::generation_service& cdc_gen_service,
std::unordered_set<gms::inet_address> initial_contact_nodes,
std::unordered_set<gms::inet_address> loaded_endpoints,
std::unordered_map<gms::inet_address, sstring> loaded_peer_features,
@@ -380,7 +374,7 @@ private:
// Stream data for which we become a new replica.
// Before that, if we're not replacing another node, inform other nodes about our chosen tokens
// and wait for RING_DELAY ms so that we receive new writes from coordinators during streaming.
future<> bootstrap(std::unordered_set<token>& bootstrap_tokens, std::optional<cdc::generation_id>& cdc_gen_id);
future<> bootstrap(cdc::generation_service& cdc_gen_service, std::unordered_set<token>& bootstrap_tokens, std::optional<cdc::generation_id>& cdc_gen_id);
public:
/**

View File

@@ -710,7 +710,6 @@ public:
std::ref(sys_dist_ks), std::ref(sys_ks),
std::ref(feature_service), sscfg, std::ref(mm),
std::ref(token_metadata), std::ref(erm_factory), std::ref(ms),
std::ref(cdc_generation_service),
std::ref(repair),
std::ref(stream_manager),
std::ref(raft_gr), std::ref(elc_notif),
@@ -783,7 +782,7 @@ public:
});
try {
ss.local().join_cluster(qp.local(), group0_client).get();
ss.local().join_cluster(qp.local(), group0_client, cdc_generation_service.local()).get();
} catch (std::exception& e) {
// if any of the defers crashes too, we'll never see
// the error