storage_service: Remove sys_dist_ks from storage_service dependencies
The service in question is only needed at join_cluster() time, so there is no need to keep it in the dependencies list. This also resolves an initialization-order problem: the distributed keyspace is sharded::start()-ed only after it is passed to storage_service initialization.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
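For context, here is a minimal sketch (hypothetical names, not Scylla code) of the pattern this commit moves to: instead of storing a reference to a seastar::sharded<> service that is only start()-ed later, the dependency is taken as an argument by the one entry point that actually uses it, so the start-before-use ordering is explicit at the call site:

    #include <seastar/core/coroutine.hh>
    #include <seastar/core/future.hh>
    #include <seastar/core/sharded.hh>

    // Hypothetical stand-in for db::system_distributed_keyspace.
    struct dist_keyspace {
        seastar::future<> start() { return seastar::make_ready_future<>(); } // per-shard init
        seastar::future<> stop() { return seastar::make_ready_future<>(); }
    };

    struct node_service {
        // The sharded keyspace is passed only where it is needed, rather than being a
        // constructor dependency that would have to be usable before this object exists.
        seastar::future<> join(seastar::sharded<dist_keyspace>& ks) {
            // Initialize the keyspace on all shards right before its first use.
            co_await ks.invoke_on_all(&dist_keyspace::start);
        }
    };

In main(), the sharded<dist_keyspace> instance would still be sharded::start()-ed (and eventually stop()-ed) before join() runs; the point is only that the service object no longer needs the reference at construction time.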
main.cc
@@ -911,7 +911,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
     sscfg.available_memory = memory::stats().total_memory();
     debug::the_storage_service = &ss;
     ss.start(std::ref(stop_signal.as_sharded_abort_source()),
-        std::ref(db), std::ref(gossiper), std::ref(sys_dist_ks), std::ref(sys_ks),
+        std::ref(db), std::ref(gossiper), std::ref(sys_ks),
         std::ref(feature_service), sscfg, std::ref(mm), std::ref(token_metadata), std::ref(erm_factory),
         std::ref(messaging), std::ref(repair),
         std::ref(stream_manager), std::ref(raft_gr), std::ref(lifecycle_notifier), std::ref(bm)).get();
@@ -1273,7 +1273,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
     }).get();

     with_scheduling_group(maintenance_scheduling_group, [&] {
-        return ss.local().join_cluster(qp.local(), group0_client, cdc_generation_service.local());
+        return ss.local().join_cluster(qp.local(), group0_client, cdc_generation_service.local(), sys_dist_ks);
     }).get();

     sl_controller.invoke_on_all([&lifecycle_notifier] (qos::service_level_controller& controller) {
@@ -80,7 +80,6 @@ static logging::logger slogger("storage_service");

 storage_service::storage_service(abort_source& abort_source,
     distributed<replica::database>& db, gms::gossiper& gossiper,
-    sharded<db::system_distributed_keyspace>& sys_dist_ks,
     sharded<db::system_keyspace>& sys_ks,
     gms::feature_service& feature_service,
     storage_service_config config,
@@ -107,7 +106,6 @@ storage_service::storage_service(abort_source& abort_source,
     , _erm_factory(erm_factory)
     , _lifecycle_notifier(elc_notif)
     , _batchlog_manager(bm)
-    , _sys_dist_ks(sys_dist_ks)
     , _sys_ks(sys_ks)
     , _snitch_reconfigure([this] { return snitch_reconfigured(); })
 {
@@ -220,11 +218,6 @@ future<> storage_service::snitch_reconfigured() {
     return update_topology(utils::fb_utilities::get_broadcast_address());
 }

-future<> storage_service::start_sys_dist_ks() {
-    supervisor::notify("starting system distributed keyspace");
-    co_await _sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::start);
-}
-
 /* Broadcasts the chosen tokens through gossip,
  * together with a CDC generation timestamp and STATUS=NORMAL.
  *
@@ -283,6 +276,7 @@ future<> storage_service::wait_for_ring_to_settle(std::chrono::milliseconds dela
 }

 future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_service,
+        sharded<db::system_distributed_keyspace>& sys_dist_ks,
         std::unordered_set<gms::inet_address> initial_contact_nodes,
         std::unordered_set<gms::inet_address> loaded_endpoints,
         std::unordered_map<gms::inet_address, sstring> loaded_peer_features,
@@ -519,12 +513,13 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
             slogger.info("Replacing a node with token(s): {}", bootstrap_tokens);
             // bootstrap_tokens was previously set using tokens gossiped by the replaced node
         }
-        co_await start_sys_dist_ks();
-        co_await mark_existing_views_as_built();
+        co_await sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::start);
+        co_await mark_existing_views_as_built(sys_dist_ks);
         co_await _sys_ks.local().update_tokens(bootstrap_tokens);
         co_await bootstrap(cdc_gen_service, bootstrap_tokens, cdc_gen_id);
     } else {
-        co_await start_sys_dist_ks();
+        supervisor::notify("starting system distributed keyspace");
+        co_await sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::start);
         bootstrap_tokens = co_await db::system_keyspace::get_saved_tokens();
         if (bootstrap_tokens.empty()) {
             bootstrap_tokens = boot_strapper::get_bootstrap_tokens(get_token_metadata_ptr(), _db.local().get_config(), dht::check_token_endpoint::no);
@@ -609,12 +604,12 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi
         co_await cdc_gen_service.after_join(std::move(cdc_gen_id));
     }

-future<> storage_service::mark_existing_views_as_built() {
-    return _db.invoke_on(0, [this] (replica::database& db) {
-        return do_with(db.get_views(), [this] (std::vector<view_ptr>& views) {
-            return parallel_for_each(views, [this] (view_ptr& view) {
-                return db::system_keyspace::mark_view_as_built(view->ks_name(), view->cf_name()).then([this, view] {
-                    return _sys_dist_ks.local().finish_view_build(view->ks_name(), view->cf_name());
+future<> storage_service::mark_existing_views_as_built(sharded<db::system_distributed_keyspace>& sys_dist_ks) {
+    return _db.invoke_on(0, [this, &sys_dist_ks] (replica::database& db) {
+        return do_with(db.get_views(), [this, &sys_dist_ks] (std::vector<view_ptr>& views) {
+            return parallel_for_each(views, [this, &sys_dist_ks] (view_ptr& view) {
+                return db::system_keyspace::mark_view_as_built(view->ks_name(), view->cf_name()).then([this, view, &sys_dist_ks] {
+                    return sys_dist_ks.local().finish_view_build(view->ks_name(), view->cf_name());
                 });
             });
         });
@@ -1336,10 +1331,10 @@ future<> storage_service::uninit_messaging_service_part() {
     return container().invoke_on_all(&service::storage_service::uninit_messaging_service);
 }

-future<> storage_service::join_cluster(cql3::query_processor& qp, raft_group0_client& client, cdc::generation_service& cdc_gen_service) {
+future<> storage_service::join_cluster(cql3::query_processor& qp, raft_group0_client& client, cdc::generation_service& cdc_gen_service, sharded<db::system_distributed_keyspace>& sys_dist_ks) {
     assert(this_shard_id() == 0);

-    return seastar::async([this, &qp, &client, &cdc_gen_service] {
+    return seastar::async([this, &qp, &client, &cdc_gen_service, &sys_dist_ks] {
         set_mode(mode::STARTING);

         _group0 = std::make_unique<raft_group0>(_abort_source, _raft_gr, _messaging.local(),
@@ -1393,7 +1388,7 @@ future<> storage_service::join_cluster(cql3::query_processor& qp, raft_group0_cl
         for (auto& x : loaded_peer_features) {
             slogger.info("peer={}, supported_features={}", x.first, x.second);
         }
-        join_token_ring(cdc_gen_service, std::move(initial_contact_nodes), std::move(loaded_endpoints), std::move(loaded_peer_features), get_ring_delay()).get();
+        join_token_ring(cdc_gen_service, sys_dist_ks, std::move(initial_contact_nodes), std::move(loaded_endpoints), std::move(loaded_peer_features), get_ring_delay()).get();
     });
 }

@@ -172,7 +172,6 @@ private:
 public:
     storage_service(abort_source& as, distributed<replica::database>& db,
         gms::gossiper& gossiper,
-        sharded<db::system_distributed_keyspace>&,
         sharded<db::system_keyspace>&,
         gms::feature_service& feature_service,
         storage_service_config config,
@@ -346,7 +345,8 @@ public:
      *
      * \see init_messaging_service_part
      */
-    future<> join_cluster(cql3::query_processor& qp, raft_group0_client& client, cdc::generation_service& cdc_gen_service);
+    future<> join_cluster(cql3::query_processor& qp, raft_group0_client& client, cdc::generation_service& cdc_gen_service,
+            sharded<db::system_distributed_keyspace>& sys_dist_ks);

     future<> drain_on_shutdown();

@@ -358,6 +358,7 @@ private:
     bool is_replacing();
     bool is_first_node();
     future<> join_token_ring(cdc::generation_service& cdc_gen_service,
+            sharded<db::system_distributed_keyspace>& sys_dist_ks,
             std::unordered_set<gms::inet_address> initial_contact_nodes,
             std::unordered_set<gms::inet_address> loaded_endpoints,
             std::unordered_map<gms::inet_address, sstring> loaded_peer_features,
@@ -369,7 +370,7 @@ public:

 private:
     void set_mode(mode m);
-    future<> mark_existing_views_as_built();
+    future<> mark_existing_views_as_built(sharded<db::system_distributed_keyspace>&);

     // Stream data for which we become a new replica.
     // Before that, if we're not replacing another node, inform other nodes about our chosen tokens
@@ -500,7 +501,6 @@ private:
 private:
     // Should be serialized under token_metadata_lock.
     future<> replicate_to_all_cores(mutable_token_metadata_ptr tmptr) noexcept;
-    sharded<db::system_distributed_keyspace>& _sys_dist_ks;
     sharded<db::system_keyspace>& _sys_ks;
     locator::snitch_signal_slot_t _snitch_reconfigure;
     std::unordered_set<gms::inet_address> _replacing_nodes_pending_ranges_updater;
@@ -707,7 +707,7 @@ public:
         sscfg.available_memory = memory::stats().total_memory();
         ss.start(std::ref(abort_sources), std::ref(db),
             std::ref(gossiper),
-            std::ref(sys_dist_ks), std::ref(sys_ks),
+            std::ref(sys_ks),
             std::ref(feature_service), sscfg, std::ref(mm),
             std::ref(token_metadata), std::ref(erm_factory), std::ref(ms),
             std::ref(repair),
@@ -782,7 +782,7 @@ public:
         });

         try {
-            ss.local().join_cluster(qp.local(), group0_client, cdc_generation_service.local()).get();
+            ss.local().join_cluster(qp.local(), group0_client, cdc_generation_service.local(), sys_dist_ks).get();
         } catch (std::exception& e) {
             // if any of the defers crashes too, we'll never see
             // the error