storage_service: Load tablet metadata on boot and from group0 changes
This commit is contained in:
8
main.cc
8
main.cc
@@ -1149,6 +1149,11 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
auth_prep_cache_config.refresh = std::chrono::milliseconds(cfg->permissions_update_interval_in_ms());
|
||||
|
||||
qp.start(std::ref(proxy), std::ref(forward_service), std::move(local_data_dict), std::ref(mm_notifier), std::ref(mm), qp_mcfg, std::ref(cql_config), std::move(auth_prep_cache_config), std::ref(group0_client), std::move(wasm_ctx)).get();
|
||||
|
||||
ss.invoke_on_all([&] (service::storage_service& ss) {
|
||||
ss.set_query_processor(qp.local());
|
||||
}).get();
|
||||
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([&qp] { return qp.stop(); });
|
||||
sstables::init_metrics().get();
|
||||
@@ -1227,6 +1232,9 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
// 3. need to check if it depends on any of the above steps
|
||||
sys_ks.local().setup(snitch, messaging).get();
|
||||
|
||||
supervisor::notify("loading tablet metadata");
|
||||
ss.local().load_tablet_metadata().get();
|
||||
|
||||
supervisor::notify("starting schema commit log");
|
||||
|
||||
// Check there is no truncation record for schema tables.
|
||||
|
||||
@@ -53,6 +53,7 @@
|
||||
#include "db/config.hh"
|
||||
#include "db/schema_tables.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "replica/tablets.hh"
|
||||
#include <seastar/core/metrics.hh>
|
||||
#include "cdc/generation.hh"
|
||||
#include "cdc/generation_service.hh"
|
||||
@@ -4636,6 +4637,24 @@ future<> storage_service::keyspace_changed(const sstring& ks_name) {
|
||||
});
|
||||
}
|
||||
|
||||
void storage_service::on_update_tablet_metadata() {
|
||||
if (this_shard_id() != 0) {
|
||||
// replicate_to_all_cores() takes care of other shards.
|
||||
return;
|
||||
}
|
||||
// FIXME: Avoid reading whole tablet metadata on partial changes.
|
||||
load_tablet_metadata().get();
|
||||
}
|
||||
|
||||
future<> storage_service::load_tablet_metadata() {
|
||||
if (!_db.local().get_config().check_experimental(db::experimental_features_t::feature::TABLETS)) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return mutate_token_metadata([this] (mutable_token_metadata_ptr tmptr) -> future<> {
|
||||
tmptr->set_tablets(co_await replica::read_tablet_metadata(*_qp));
|
||||
}, acquire_merge_lock::no);
|
||||
}
|
||||
|
||||
future<> storage_service::snitch_reconfigured() {
|
||||
assert(this_shard_id() == 0);
|
||||
auto& snitch = _snitch.local();
|
||||
|
||||
@@ -118,6 +118,7 @@ private:
|
||||
gms::gossiper& _gossiper;
|
||||
sharded<netw::messaging_service>& _messaging;
|
||||
sharded<service::migration_manager>& _migration_manager;
|
||||
cql3::query_processor* _qp = nullptr;
|
||||
sharded<repair_service>& _repair;
|
||||
sharded<streaming::stream_manager>& _stream_manager;
|
||||
sharded<locator::snitch_ptr>& _snitch;
|
||||
@@ -164,6 +165,7 @@ public:
|
||||
void init_messaging_service(sharded<service::storage_proxy>& proxy, sharded<db::system_distributed_keyspace>& sys_dist_ks);
|
||||
future<> uninit_messaging_service();
|
||||
|
||||
future<> load_tablet_metadata();
|
||||
private:
|
||||
using acquire_merge_lock = bool_class<class acquire_merge_lock_tag>;
|
||||
|
||||
@@ -268,6 +270,10 @@ public:
|
||||
_protocol_servers.push_back(&server);
|
||||
}
|
||||
|
||||
void set_query_processor(cql3::query_processor& qp) {
|
||||
_qp = &qp;
|
||||
}
|
||||
|
||||
// All pointers are valid.
|
||||
const std::vector<protocol_server*>& protocol_servers() const {
|
||||
return _protocol_servers;
|
||||
@@ -463,6 +469,7 @@ public:
|
||||
virtual void on_update_function(const sstring& ks_name, const sstring& function_name) override {}
|
||||
virtual void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
|
||||
virtual void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override {}
|
||||
virtual void on_update_tablet_metadata() override;
|
||||
|
||||
virtual void on_drop_keyspace(const sstring& ks_name) override { keyspace_changed(ks_name).get(); }
|
||||
virtual void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override {}
|
||||
|
||||
@@ -804,6 +804,10 @@ public:
|
||||
std::ref(snitch)).get();
|
||||
auto stop_storage_service = defer([&ss] { ss.stop().get(); });
|
||||
|
||||
ss.invoke_on_all([&] (service::storage_service& ss) {
|
||||
ss.set_query_processor(qp.local());
|
||||
}).get();
|
||||
|
||||
for (const auto p: all_system_table_load_phases) {
|
||||
replica::distributed_loader::init_system_keyspace(sys_ks, db, ss, gossiper, raft_gr, *cfg, p).get();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user