storage_service: Load tablet metadata on boot and from group0 changes

This commit is contained in:
Tomasz Grabiec
2023-03-25 20:13:09 +01:00
parent 41e69836fd
commit d42685d0cb
4 changed files with 38 additions and 0 deletions

View File

@@ -1149,6 +1149,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
auth_prep_cache_config.refresh = std::chrono::milliseconds(cfg->permissions_update_interval_in_ms());
qp.start(std::ref(proxy), std::ref(forward_service), std::move(local_data_dict), std::ref(mm_notifier), std::ref(mm), qp_mcfg, std::ref(cql_config), std::move(auth_prep_cache_config), std::ref(group0_client), std::move(wasm_ctx)).get();
ss.invoke_on_all([&] (service::storage_service& ss) {
ss.set_query_processor(qp.local());
}).get();
// #293 - do not stop anything
// engine().at_exit([&qp] { return qp.stop(); });
sstables::init_metrics().get();
@@ -1227,6 +1232,9 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
// 3. need to check if it depends on any of the above steps
sys_ks.local().setup(snitch, messaging).get();
supervisor::notify("loading tablet metadata");
ss.local().load_tablet_metadata().get();
supervisor::notify("starting schema commit log");
// Check there is no truncation record for schema tables.

View File

@@ -53,6 +53,7 @@
#include "db/config.hh"
#include "db/schema_tables.hh"
#include "replica/database.hh"
#include "replica/tablets.hh"
#include <seastar/core/metrics.hh>
#include "cdc/generation.hh"
#include "cdc/generation_service.hh"
@@ -4636,6 +4637,24 @@ future<> storage_service::keyspace_changed(const sstring& ks_name) {
});
}
void storage_service::on_update_tablet_metadata() {
if (this_shard_id() != 0) {
// replicate_to_all_cores() takes care of other shards.
return;
}
// FIXME: Avoid reading whole tablet metadata on partial changes.
load_tablet_metadata().get();
}
future<> storage_service::load_tablet_metadata() {
if (!_db.local().get_config().check_experimental(db::experimental_features_t::feature::TABLETS)) {
return make_ready_future<>();
}
return mutate_token_metadata([this] (mutable_token_metadata_ptr tmptr) -> future<> {
tmptr->set_tablets(co_await replica::read_tablet_metadata(*_qp));
}, acquire_merge_lock::no);
}
future<> storage_service::snitch_reconfigured() {
assert(this_shard_id() == 0);
auto& snitch = _snitch.local();

View File

@@ -118,6 +118,7 @@ private:
gms::gossiper& _gossiper;
sharded<netw::messaging_service>& _messaging;
sharded<service::migration_manager>& _migration_manager;
cql3::query_processor* _qp = nullptr;
sharded<repair_service>& _repair;
sharded<streaming::stream_manager>& _stream_manager;
sharded<locator::snitch_ptr>& _snitch;
@@ -164,6 +165,7 @@ public:
void init_messaging_service(sharded<service::storage_proxy>& proxy, sharded<db::system_distributed_keyspace>& sys_dist_ks);
future<> uninit_messaging_service();
future<> load_tablet_metadata();
private:
using acquire_merge_lock = bool_class<class acquire_merge_lock_tag>;
@@ -268,6 +270,10 @@ public:
_protocol_servers.push_back(&server);
}
void set_query_processor(cql3::query_processor& qp) {
_qp = &qp;
}
// All pointers are valid.
const std::vector<protocol_server*>& protocol_servers() const {
return _protocol_servers;
@@ -463,6 +469,7 @@ public:
virtual void on_update_function(const sstring& ks_name, const sstring& function_name) override {}
virtual void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
virtual void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override {}
virtual void on_update_tablet_metadata() override;
virtual void on_drop_keyspace(const sstring& ks_name) override { keyspace_changed(ks_name).get(); }
virtual void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override {}

View File

@@ -804,6 +804,10 @@ public:
std::ref(snitch)).get();
auto stop_storage_service = defer([&ss] { ss.stop().get(); });
ss.invoke_on_all([&] (service::storage_service& ss) {
ss.set_query_processor(qp.local());
}).get();
for (const auto p: all_system_table_load_phases) {
replica::distributed_loader::init_system_keyspace(sys_ks, db, ss, gossiper, raft_gr, *cfg, p).get();
}