storage_service: Remove cdc_gen_service from storage_service dependencies
This service is only needed at join time, so it's better to pass it as an argument to join_cluster(). This solves the current reversed-dependency issue -- the cdc_gen_svc is started only after it has been passed to storage service initialization. Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
4
main.cc
4
main.cc
@@ -913,7 +913,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
ss.start(std::ref(stop_signal.as_sharded_abort_source()),
|
||||
std::ref(db), std::ref(gossiper), std::ref(sys_dist_ks), std::ref(sys_ks),
|
||||
std::ref(feature_service), sscfg, std::ref(mm), std::ref(token_metadata), std::ref(erm_factory),
|
||||
std::ref(messaging), std::ref(cdc_generation_service), std::ref(repair),
|
||||
std::ref(messaging), std::ref(repair),
|
||||
std::ref(stream_manager), std::ref(raft_gr), std::ref(lifecycle_notifier), std::ref(bm)).get();
|
||||
|
||||
auto stop_storage_service = defer_verbose_shutdown("storage_service", [&] {
|
||||
@@ -1273,7 +1273,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
}).get();
|
||||
|
||||
with_scheduling_group(maintenance_scheduling_group, [&] {
|
||||
return ss.local().join_cluster(qp.local(), group0_client);
|
||||
return ss.local().join_cluster(qp.local(), group0_client, cdc_generation_service.local());
|
||||
}).get();
|
||||
|
||||
sl_controller.invoke_on_all([&lifecycle_notifier] (qos::service_level_controller& controller) {
|
||||
|
||||
@@ -88,7 +88,6 @@ storage_service::storage_service(abort_source& abort_source,
|
||||
locator::shared_token_metadata& stm,
|
||||
locator::effective_replication_map_factory& erm_factory,
|
||||
sharded<netw::messaging_service>& ms,
|
||||
sharded<cdc::generation_service>& cdc_gen_service,
|
||||
sharded<repair_service>& repair,
|
||||
sharded<streaming::stream_manager>& stream_manager,
|
||||
raft_group_registry& raft_gr,
|
||||
@@ -106,7 +105,6 @@ storage_service::storage_service(abort_source& abort_source,
|
||||
, _node_ops_abort_thread(node_ops_abort_thread())
|
||||
, _shared_token_metadata(stm)
|
||||
, _erm_factory(erm_factory)
|
||||
, _cdc_gen_service(cdc_gen_service)
|
||||
, _lifecycle_notifier(elc_notif)
|
||||
, _batchlog_manager(bm)
|
||||
, _sys_dist_ks(sys_dist_ks)
|
||||
@@ -284,7 +282,7 @@ future<> storage_service::wait_for_ring_to_settle(std::chrono::milliseconds dela
|
||||
slogger.info("Checking bootstrapping/leaving nodes: ok");
|
||||
}
|
||||
|
||||
future<> storage_service::join_token_ring(
|
||||
future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_service,
|
||||
std::unordered_set<gms::inet_address> initial_contact_nodes,
|
||||
std::unordered_set<gms::inet_address> loaded_endpoints,
|
||||
std::unordered_map<gms::inet_address, sstring> loaded_peer_features,
|
||||
@@ -524,7 +522,7 @@ future<> storage_service::join_token_ring(
|
||||
co_await start_sys_dist_ks();
|
||||
co_await mark_existing_views_as_built();
|
||||
co_await _sys_ks.local().update_tokens(bootstrap_tokens);
|
||||
co_await bootstrap(bootstrap_tokens, cdc_gen_id);
|
||||
co_await bootstrap(cdc_gen_service, bootstrap_tokens, cdc_gen_id);
|
||||
} else {
|
||||
co_await start_sys_dist_ks();
|
||||
bootstrap_tokens = co_await db::system_keyspace::get_saved_tokens();
|
||||
@@ -576,7 +574,7 @@ future<> storage_service::join_token_ring(
|
||||
&& (!_sys_ks.local().bootstrap_complete()
|
||||
|| cdc::should_propose_first_generation(get_broadcast_address(), _gossiper))) {
|
||||
try {
|
||||
cdc_gen_id = co_await _cdc_gen_service.local().make_new_generation(bootstrap_tokens, !is_first_node());
|
||||
cdc_gen_id = co_await cdc_gen_service.make_new_generation(bootstrap_tokens, !is_first_node());
|
||||
} catch (...) {
|
||||
cdc_log.warn(
|
||||
"Could not create a new CDC generation: {}. This may make it impossible to use CDC or cause performance problems."
|
||||
@@ -608,7 +606,7 @@ future<> storage_service::join_token_ring(
|
||||
throw std::runtime_error(err);
|
||||
}
|
||||
|
||||
co_await _cdc_gen_service.local().after_join(std::move(cdc_gen_id));
|
||||
co_await cdc_gen_service.after_join(std::move(cdc_gen_id));
|
||||
}
|
||||
|
||||
future<> storage_service::mark_existing_views_as_built() {
|
||||
@@ -644,8 +642,8 @@ std::list<gms::inet_address> storage_service::get_ignore_dead_nodes_for_replace(
|
||||
}
|
||||
|
||||
// Runs inside seastar::async context
|
||||
future<> storage_service::bootstrap(std::unordered_set<token>& bootstrap_tokens, std::optional<cdc::generation_id>& cdc_gen_id) {
|
||||
return seastar::async([this, &bootstrap_tokens, &cdc_gen_id] {
|
||||
future<> storage_service::bootstrap(cdc::generation_service& cdc_gen_service, std::unordered_set<token>& bootstrap_tokens, std::optional<cdc::generation_id>& cdc_gen_id) {
|
||||
return seastar::async([this, &bootstrap_tokens, &cdc_gen_id, &cdc_gen_service] {
|
||||
auto bootstrap_rbno = is_repair_based_node_ops_enabled(streaming::stream_reason::bootstrap);
|
||||
|
||||
set_mode(mode::BOOTSTRAP);
|
||||
@@ -700,7 +698,7 @@ future<> storage_service::bootstrap(std::unordered_set<token>& bootstrap_tokens,
|
||||
// We don't do any other generation switches (unless we crash before complecting bootstrap).
|
||||
assert(!cdc_gen_id);
|
||||
|
||||
cdc_gen_id = _cdc_gen_service.local().make_new_generation(bootstrap_tokens, !is_first_node()).get0();
|
||||
cdc_gen_id = cdc_gen_service.make_new_generation(bootstrap_tokens, !is_first_node()).get0();
|
||||
|
||||
if (!bootstrap_rbno) {
|
||||
// When is_repair_based_node_ops_enabled is true, the bootstrap node
|
||||
@@ -1338,10 +1336,10 @@ future<> storage_service::uninit_messaging_service_part() {
|
||||
return container().invoke_on_all(&service::storage_service::uninit_messaging_service);
|
||||
}
|
||||
|
||||
future<> storage_service::join_cluster(cql3::query_processor& qp, raft_group0_client& client) {
|
||||
future<> storage_service::join_cluster(cql3::query_processor& qp, raft_group0_client& client, cdc::generation_service& cdc_gen_service) {
|
||||
assert(this_shard_id() == 0);
|
||||
|
||||
return seastar::async([this, &qp, &client] {
|
||||
return seastar::async([this, &qp, &client, &cdc_gen_service] {
|
||||
set_mode(mode::STARTING);
|
||||
|
||||
_group0 = std::make_unique<raft_group0>(_abort_source, _raft_gr, _messaging.local(),
|
||||
@@ -1395,7 +1393,7 @@ future<> storage_service::join_cluster(cql3::query_processor& qp, raft_group0_cl
|
||||
for (auto& x : loaded_peer_features) {
|
||||
slogger.info("peer={}, supported_features={}", x.first, x.second);
|
||||
}
|
||||
join_token_ring(std::move(initial_contact_nodes), std::move(loaded_endpoints), std::move(loaded_peer_features), get_ring_delay()).get();
|
||||
join_token_ring(cdc_gen_service, std::move(initial_contact_nodes), std::move(loaded_endpoints), std::move(loaded_peer_features), get_ring_delay()).get();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -180,7 +180,6 @@ public:
|
||||
locator::shared_token_metadata& stm,
|
||||
locator::effective_replication_map_factory& erm_factory,
|
||||
sharded<netw::messaging_service>& ms,
|
||||
sharded<cdc::generation_service>&,
|
||||
sharded<repair_service>& repair,
|
||||
sharded<streaming::stream_manager>& stream_manager,
|
||||
raft_group_registry& raft_gr,
|
||||
@@ -263,11 +262,6 @@ private:
|
||||
shared_token_metadata& _shared_token_metadata;
|
||||
locator::effective_replication_map_factory& _erm_factory;
|
||||
|
||||
/* CDC generation management service.
|
||||
* It is sharded<>& and not simply a reference because the service will not yet be started
|
||||
* when storage_service is constructed (but it will be when init_server is called)
|
||||
*/
|
||||
sharded<cdc::generation_service>& _cdc_gen_service;
|
||||
public:
|
||||
std::chrono::milliseconds get_ring_delay();
|
||||
private:
|
||||
@@ -352,7 +346,7 @@ public:
|
||||
*
|
||||
* \see init_messaging_service_part
|
||||
*/
|
||||
future<> join_cluster(cql3::query_processor& qp, raft_group0_client& client);
|
||||
future<> join_cluster(cql3::query_processor& qp, raft_group0_client& client, cdc::generation_service& cdc_gen_service);
|
||||
|
||||
future<> drain_on_shutdown();
|
||||
|
||||
@@ -363,7 +357,7 @@ private:
|
||||
std::optional<gms::inet_address> get_replace_address();
|
||||
bool is_replacing();
|
||||
bool is_first_node();
|
||||
future<> join_token_ring(
|
||||
future<> join_token_ring(cdc::generation_service& cdc_gen_service,
|
||||
std::unordered_set<gms::inet_address> initial_contact_nodes,
|
||||
std::unordered_set<gms::inet_address> loaded_endpoints,
|
||||
std::unordered_map<gms::inet_address, sstring> loaded_peer_features,
|
||||
@@ -380,7 +374,7 @@ private:
|
||||
// Stream data for which we become a new replica.
|
||||
// Before that, if we're not replacing another node, inform other nodes about our chosen tokens
|
||||
// and wait for RING_DELAY ms so that we receive new writes from coordinators during streaming.
|
||||
future<> bootstrap(std::unordered_set<token>& bootstrap_tokens, std::optional<cdc::generation_id>& cdc_gen_id);
|
||||
future<> bootstrap(cdc::generation_service& cdc_gen_service, std::unordered_set<token>& bootstrap_tokens, std::optional<cdc::generation_id>& cdc_gen_id);
|
||||
|
||||
public:
|
||||
/**
|
||||
|
||||
@@ -710,7 +710,6 @@ public:
|
||||
std::ref(sys_dist_ks), std::ref(sys_ks),
|
||||
std::ref(feature_service), sscfg, std::ref(mm),
|
||||
std::ref(token_metadata), std::ref(erm_factory), std::ref(ms),
|
||||
std::ref(cdc_generation_service),
|
||||
std::ref(repair),
|
||||
std::ref(stream_manager),
|
||||
std::ref(raft_gr), std::ref(elc_notif),
|
||||
@@ -783,7 +782,7 @@ public:
|
||||
});
|
||||
|
||||
try {
|
||||
ss.local().join_cluster(qp.local(), group0_client).get();
|
||||
ss.local().join_cluster(qp.local(), group0_client, cdc_generation_service.local()).get();
|
||||
} catch (std::exception& e) {
|
||||
// if any of the defers crashes too, we'll never see
|
||||
// the error
|
||||
|
||||
Reference in New Issue
Block a user