mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
main: allow setting the global batchlog_manager
As a prerequisite to globalizing the batchlog_manager, allow setting a global pointer to it and instantiate the sharded<db::batchlog_manager> on the main/cql_test_env stack. Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -434,5 +434,5 @@ inet_address_vector_replica_set db::batchlog_manager::endpoint_filter(const sstr
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
distributed<db::batchlog_manager> db::_the_batchlog_manager;
|
||||
static distributed<db::batchlog_manager> _the_global_batchlog_manager;
|
||||
distributed<db::batchlog_manager>* db::_the_batchlog_manager = &_the_global_batchlog_manager;
|
||||
|
||||
@@ -126,14 +126,24 @@ private:
|
||||
future<> batchlog_replay_loop();
|
||||
};
|
||||
|
||||
extern distributed<batchlog_manager> _the_batchlog_manager;
|
||||
extern distributed<batchlog_manager>* _the_batchlog_manager;
|
||||
|
||||
// DEPRECATED, DON'T USE!
|
||||
// Pass references to services through constructor/function parameters. Don't use globals.
|
||||
inline void set_the_batchlog_manager(distributed<batchlog_manager>* bm) {
|
||||
_the_batchlog_manager = bm;
|
||||
}
|
||||
|
||||
// DEPRECATED, DON'T USE!
|
||||
// Pass references to services through constructor/function parameters. Don't use globals.
|
||||
inline distributed<batchlog_manager>& get_batchlog_manager() {
|
||||
return _the_batchlog_manager;
|
||||
return *_the_batchlog_manager;
|
||||
}
|
||||
|
||||
// DEPRECATED, DON'T USE!
|
||||
// Pass references to services through constructor/function parameters. Don't use globals.
|
||||
inline batchlog_manager& get_local_batchlog_manager() {
|
||||
return _the_batchlog_manager.local();
|
||||
return _the_batchlog_manager->local();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
17
main.cc
17
main.cc
@@ -482,6 +482,7 @@ int main(int ac, char** av) {
|
||||
sharded<db::snapshot_ctl> snapshot_ctl;
|
||||
sharded<netw::messaging_service> messaging;
|
||||
sharded<cql3::query_processor> qp;
|
||||
sharded<db::batchlog_manager> bm;
|
||||
sharded<semaphore> sst_dir_semaphore;
|
||||
sharded<service::raft_group_registry> raft_gr;
|
||||
sharded<service::memory_limiter> service_memory_limiter;
|
||||
@@ -513,7 +514,7 @@ int main(int ac, char** av) {
|
||||
|
||||
tcp_syncookies_sanity();
|
||||
|
||||
return seastar::async([cfg, ext, &db, &qp, &proxy, &mm, &mm_notifier, &ctx, &opts, &dirs,
|
||||
return seastar::async([cfg, ext, &db, &qp, &bm, &proxy, &mm, &mm_notifier, &ctx, &opts, &dirs,
|
||||
&prometheus_server, &cf_cache_hitrate_calculator, &load_meter, &feature_service,
|
||||
&token_metadata, &erm_factory, &snapshot_ctl, &messaging, &sst_dir_semaphore, &raft_gr, &service_memory_limiter,
|
||||
&repair, &sst_loader, &ss, &lifecycle_notifier] {
|
||||
@@ -933,8 +934,10 @@ int main(int ac, char** av) {
|
||||
bm_cfg.replay_rate = cfg->batchlog_replay_throttle_in_kb() * 1000;
|
||||
bm_cfg.delay = std::chrono::milliseconds(cfg->ring_delay_ms());
|
||||
|
||||
db::get_batchlog_manager().start(std::ref(qp), bm_cfg).get();
|
||||
// #293 - do not stop anything
|
||||
bm.start(std::ref(qp), bm_cfg).get();
|
||||
// FIXME: until we deglobalize the storage_proxy
|
||||
db::set_the_batchlog_manager(&bm);
|
||||
|
||||
sstables::init_metrics().get();
|
||||
|
||||
db::system_keyspace::minimal_setup(qp);
|
||||
@@ -1225,11 +1228,13 @@ int main(int ac, char** av) {
|
||||
});
|
||||
|
||||
supervisor::notify("starting batchlog manager");
|
||||
db::get_batchlog_manager().invoke_on_all([] (db::batchlog_manager& b) {
|
||||
bm.invoke_on_all([] (db::batchlog_manager& b) {
|
||||
return b.start();
|
||||
}).get();
|
||||
auto stop_batchlog_manager = defer_verbose_shutdown("batchlog manager", [] {
|
||||
db::get_batchlog_manager().invoke_on_all(&db::batchlog_manager::stop).get();
|
||||
auto stop_batchlog_manager = defer_verbose_shutdown("batchlog manager", [&bm] {
|
||||
bm.stop().get();
|
||||
// FIXME: until we deglobalize the storage_proxy
|
||||
db::set_the_batchlog_manager(nullptr);
|
||||
});
|
||||
|
||||
supervisor::notify("starting load meter");
|
||||
|
||||
@@ -554,7 +554,6 @@ public:
|
||||
|
||||
distributed<service::storage_proxy>& proxy = service::get_storage_proxy();
|
||||
distributed<service::migration_manager> mm;
|
||||
distributed<db::batchlog_manager>& bm = db::get_batchlog_manager();
|
||||
sharded<cql3::cql_config> cql_config;
|
||||
cql_config.start(cql3::cql_config::default_tag{}).get();
|
||||
auto stop_cql_config = defer([&] { cql_config.stop().get(); });
|
||||
@@ -642,8 +641,14 @@ public:
|
||||
db::batchlog_manager_config bmcfg;
|
||||
bmcfg.replay_rate = 100000000;
|
||||
bmcfg.write_request_timeout = 2s;
|
||||
distributed<db::batchlog_manager> bm;
|
||||
bm.start(std::ref(qp), bmcfg).get();
|
||||
auto stop_bm = defer([&bm] { bm.stop().get(); });
|
||||
// FIXME: until we deglobalize the storage_proxy
|
||||
db::set_the_batchlog_manager(&bm);
|
||||
auto stop_bm = defer([&bm] {
|
||||
bm.stop().get();
|
||||
db::set_the_batchlog_manager(nullptr);
|
||||
});
|
||||
|
||||
distributed_loader::init_system_keyspace(db, ss, *cfg).get();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user