storage_service: Remove sys_dist_ks from storage_service dependencies

The service in question is only needed join_cluster-time, no need to
keep it in the dependencies list. This also solves the dependency
trouble -- the distributed keyspace is sharded::start-ed after it's
passed to storage_service initialization.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2022-05-20 16:03:42 +03:00
parent 5a97ba7121
commit bc051387c5
4 changed files with 22 additions and 27 deletions

View File

@@ -911,7 +911,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
sscfg.available_memory = memory::stats().total_memory();
debug::the_storage_service = &ss;
ss.start(std::ref(stop_signal.as_sharded_abort_source()),
std::ref(db), std::ref(gossiper), std::ref(sys_dist_ks), std::ref(sys_ks),
std::ref(db), std::ref(gossiper), std::ref(sys_ks),
std::ref(feature_service), sscfg, std::ref(mm), std::ref(token_metadata), std::ref(erm_factory),
std::ref(messaging), std::ref(repair),
std::ref(stream_manager), std::ref(raft_gr), std::ref(lifecycle_notifier), std::ref(bm)).get();
@@ -1273,7 +1273,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
}).get();
with_scheduling_group(maintenance_scheduling_group, [&] {
return ss.local().join_cluster(qp.local(), group0_client, cdc_generation_service.local());
return ss.local().join_cluster(qp.local(), group0_client, cdc_generation_service.local(), sys_dist_ks);
}).get();
sl_controller.invoke_on_all([&lifecycle_notifier] (qos::service_level_controller& controller) {

View File

@@ -80,7 +80,6 @@ static logging::logger slogger("storage_service");
storage_service::storage_service(abort_source& abort_source,
distributed<replica::database>& db, gms::gossiper& gossiper,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::system_keyspace>& sys_ks,
gms::feature_service& feature_service,
storage_service_config config,
@@ -107,7 +106,6 @@ storage_service::storage_service(abort_source& abort_source,
, _erm_factory(erm_factory)
, _lifecycle_notifier(elc_notif)
, _batchlog_manager(bm)
, _sys_dist_ks(sys_dist_ks)
, _sys_ks(sys_ks)
, _snitch_reconfigure([this] { return snitch_reconfigured(); })
{
@@ -220,11 +218,6 @@ future<> storage_service::snitch_reconfigured() {
return update_topology(utils::fb_utilities::get_broadcast_address());
}
future<> storage_service::start_sys_dist_ks() {
supervisor::notify("starting system distributed keyspace");
co_await _sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::start);
}
/* Broadcasts the chosen tokens through gossip,
* together with a CDC generation timestamp and STATUS=NORMAL.
*
@@ -283,6 +276,7 @@ future<> storage_service::wait_for_ring_to_settle(std::chrono::milliseconds dela
}
future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_service,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
std::unordered_set<gms::inet_address> initial_contact_nodes,
std::unordered_set<gms::inet_address> loaded_endpoints,
std::unordered_map<gms::inet_address, sstring> loaded_peer_features,
@@ -519,12 +513,13 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
slogger.info("Replacing a node with token(s): {}", bootstrap_tokens);
// bootstrap_tokens was previously set using tokens gossiped by the replaced node
}
co_await start_sys_dist_ks();
co_await mark_existing_views_as_built();
co_await sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::start);
co_await mark_existing_views_as_built(sys_dist_ks);
co_await _sys_ks.local().update_tokens(bootstrap_tokens);
co_await bootstrap(cdc_gen_service, bootstrap_tokens, cdc_gen_id);
} else {
co_await start_sys_dist_ks();
supervisor::notify("starting system distributed keyspace");
co_await sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::start);
bootstrap_tokens = co_await db::system_keyspace::get_saved_tokens();
if (bootstrap_tokens.empty()) {
bootstrap_tokens = boot_strapper::get_bootstrap_tokens(get_token_metadata_ptr(), _db.local().get_config(), dht::check_token_endpoint::no);
@@ -609,12 +604,12 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
co_await cdc_gen_service.after_join(std::move(cdc_gen_id));
}
future<> storage_service::mark_existing_views_as_built() {
return _db.invoke_on(0, [this] (replica::database& db) {
return do_with(db.get_views(), [this] (std::vector<view_ptr>& views) {
return parallel_for_each(views, [this] (view_ptr& view) {
return db::system_keyspace::mark_view_as_built(view->ks_name(), view->cf_name()).then([this, view] {
return _sys_dist_ks.local().finish_view_build(view->ks_name(), view->cf_name());
future<> storage_service::mark_existing_views_as_built(sharded<db::system_distributed_keyspace>& sys_dist_ks) {
return _db.invoke_on(0, [this, &sys_dist_ks] (replica::database& db) {
return do_with(db.get_views(), [this, &sys_dist_ks] (std::vector<view_ptr>& views) {
return parallel_for_each(views, [this, &sys_dist_ks] (view_ptr& view) {
return db::system_keyspace::mark_view_as_built(view->ks_name(), view->cf_name()).then([this, view, &sys_dist_ks] {
return sys_dist_ks.local().finish_view_build(view->ks_name(), view->cf_name());
});
});
});
@@ -1336,10 +1331,10 @@ future<> storage_service::uninit_messaging_service_part() {
return container().invoke_on_all(&service::storage_service::uninit_messaging_service);
}
future<> storage_service::join_cluster(cql3::query_processor& qp, raft_group0_client& client, cdc::generation_service& cdc_gen_service) {
future<> storage_service::join_cluster(cql3::query_processor& qp, raft_group0_client& client, cdc::generation_service& cdc_gen_service, sharded<db::system_distributed_keyspace>& sys_dist_ks) {
assert(this_shard_id() == 0);
return seastar::async([this, &qp, &client, &cdc_gen_service] {
return seastar::async([this, &qp, &client, &cdc_gen_service, &sys_dist_ks] {
set_mode(mode::STARTING);
_group0 = std::make_unique<raft_group0>(_abort_source, _raft_gr, _messaging.local(),
@@ -1393,7 +1388,7 @@ future<> storage_service::join_cluster(cql3::query_processor& qp, raft_group0_cl
for (auto& x : loaded_peer_features) {
slogger.info("peer={}, supported_features={}", x.first, x.second);
}
join_token_ring(cdc_gen_service, std::move(initial_contact_nodes), std::move(loaded_endpoints), std::move(loaded_peer_features), get_ring_delay()).get();
join_token_ring(cdc_gen_service, sys_dist_ks, std::move(initial_contact_nodes), std::move(loaded_endpoints), std::move(loaded_peer_features), get_ring_delay()).get();
});
}

View File

@@ -172,7 +172,6 @@ private:
public:
storage_service(abort_source& as, distributed<replica::database>& db,
gms::gossiper& gossiper,
sharded<db::system_distributed_keyspace>&,
sharded<db::system_keyspace>&,
gms::feature_service& feature_service,
storage_service_config config,
@@ -346,7 +345,8 @@ public:
*
* \see init_messaging_service_part
*/
future<> join_cluster(cql3::query_processor& qp, raft_group0_client& client, cdc::generation_service& cdc_gen_service);
future<> join_cluster(cql3::query_processor& qp, raft_group0_client& client, cdc::generation_service& cdc_gen_service,
sharded<db::system_distributed_keyspace>& sys_dist_ks);
future<> drain_on_shutdown();
@@ -358,6 +358,7 @@ private:
bool is_replacing();
bool is_first_node();
future<> join_token_ring(cdc::generation_service& cdc_gen_service,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
std::unordered_set<gms::inet_address> initial_contact_nodes,
std::unordered_set<gms::inet_address> loaded_endpoints,
std::unordered_map<gms::inet_address, sstring> loaded_peer_features,
@@ -369,7 +370,7 @@ public:
private:
void set_mode(mode m);
future<> mark_existing_views_as_built();
future<> mark_existing_views_as_built(sharded<db::system_distributed_keyspace>&);
// Stream data for which we become a new replica.
// Before that, if we're not replacing another node, inform other nodes about our chosen tokens
@@ -500,7 +501,6 @@ private:
private:
// Should be serialized under token_metadata_lock.
future<> replicate_to_all_cores(mutable_token_metadata_ptr tmptr) noexcept;
sharded<db::system_distributed_keyspace>& _sys_dist_ks;
sharded<db::system_keyspace>& _sys_ks;
locator::snitch_signal_slot_t _snitch_reconfigure;
std::unordered_set<gms::inet_address> _replacing_nodes_pending_ranges_updater;

View File

@@ -707,7 +707,7 @@ public:
sscfg.available_memory = memory::stats().total_memory();
ss.start(std::ref(abort_sources), std::ref(db),
std::ref(gossiper),
std::ref(sys_dist_ks), std::ref(sys_ks),
std::ref(sys_ks),
std::ref(feature_service), sscfg, std::ref(mm),
std::ref(token_metadata), std::ref(erm_factory), std::ref(ms),
std::ref(repair),
@@ -782,7 +782,7 @@ public:
});
try {
ss.local().join_cluster(qp.local(), group0_client, cdc_generation_service.local()).get();
ss.local().join_cluster(qp.local(), group0_client, cdc_generation_service.local(), sys_dist_ks).get();
} catch (std::exception& e) {
// if any of the defers crashes too, we'll never see
// the error