diff --git a/main.cc b/main.cc index 632db5586d..93ad0719b5 100644 --- a/main.cc +++ b/main.cc @@ -930,7 +930,7 @@ int main(int ac, char** av) { // engine().at_exit([&proxy] { return proxy.stop(); }); supervisor::notify("starting migration manager"); debug::the_migration_manager = &mm; - mm.start(std::ref(mm_notifier), std::ref(feature_service), std::ref(messaging), std::ref(gossiper), std::ref(raft_gr)).get(); + mm.start(std::ref(mm_notifier), std::ref(feature_service), std::ref(messaging), std::ref(proxy), std::ref(gossiper), std::ref(raft_gr)).get(); auto stop_migration_manager = defer_verbose_shutdown("migration manager", [&mm] { mm.stop().get(); }); diff --git a/service/migration_manager.cc b/service/migration_manager.cc index bf57865331..8dc62452af 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -75,10 +75,10 @@ static logging::logger mlogger("migration_manager"); using namespace std::chrono_literals; const std::chrono::milliseconds migration_manager::migration_delay = 60000ms; -static future get_schema_definition(table_schema_version v, netw::messaging_service::msg_addr dst, netw::messaging_service& ms); +static future get_schema_definition(table_schema_version v, netw::messaging_service::msg_addr dst, netw::messaging_service& ms, service::storage_proxy& sp); -migration_manager::migration_manager(migration_notifier& notifier, gms::feature_service& feat, netw::messaging_service& ms, gms::gossiper& gossiper, service::raft_group_registry& raft_gr) : - _notifier(notifier), _feat(feat), _messaging(ms), _gossiper(gossiper), _raft_gr(raft_gr) +migration_manager::migration_manager(migration_notifier& notifier, gms::feature_service& feat, netw::messaging_service& ms, service::storage_proxy& storage_proxy, gms::gossiper& gossiper, service::raft_group_registry& raft_gr) : + _notifier(notifier), _feat(feat), _messaging(ms), _storage_proxy(storage_proxy), _gossiper(gossiper), _raft_gr(raft_gr) , _schema_push([this] { return passive_announce(); }) { } @@ -116,7 +116,7 @@ void migration_manager::init_messaging_service() //FIXME: future discarded. (void)with_gate(_background_tasks, [this] { mlogger.debug("features changed, recalculating schema version"); - return db::schema_tables::recalculate_schema_version(get_storage_proxy(), _feat); + return db::schema_tables::recalculate_schema_version(_storage_proxy.container(), _feat); }); }; @@ -132,11 +132,11 @@ void migration_manager::init_messaging_service() auto src = netw::messaging_service::get_source(cinfo); auto f = make_ready_future<>(); if (cm) { - f = do_with(std::move(*cm), get_local_shared_storage_proxy(), [this, src] (const std::vector& mutations, shared_ptr& p) { + f = do_with(std::move(*cm), [this, src] (const std::vector& mutations) { return merge_schema_in_background(src, mutations); }); } else { - f = do_with(std::move(fm), get_local_shared_storage_proxy(), [this, src] (const std::vector& mutations, shared_ptr& p) { + f = do_with(std::move(fm), [this, src] (const std::vector& mutations) { return merge_schema_in_background(src, mutations); }); } @@ -156,7 +156,7 @@ void migration_manager::init_messaging_service() const auto cm_retval_supported = options && options->remote_supports_canonical_mutation_retval; auto features = _feat.cluster_schema_features(); - auto& proxy = get_storage_proxy(); + auto& proxy = _storage_proxy.container(); return db::schema_tables::convert_schema_to_mutations(proxy, features).then([&proxy, cm_retval_supported] (std::vector&& cm) { const auto& db = proxy.local().get_db().local(); if (cm_retval_supported) { @@ -166,15 +166,12 @@ void migration_manager::init_messaging_service() return cm.to_mutation(db.find_column_family(cm.column_family_id()).schema()); })); return make_ready_future>(rpc::tuple(std::move(fm), std::move(cm))); - }).finally([p = get_local_shared_storage_proxy()] { - // keep local proxy alive }); }); - _messaging.register_schema_check([] { - return make_ready_future(service::get_local_storage_proxy().get_db().local().get_version()); + _messaging.register_schema_check([this] { + return make_ready_future(_storage_proxy.get_db().local().get_version()); }); _messaging.register_get_schema_version([this] (unsigned shard, table_schema_version v) { - get_local_storage_proxy().get_stats().replica_cross_shard_ops += shard != this_shard_id(); // FIXME: should this get an smp_service_group? Probably one separate from reads and writes. return container().invoke_on(shard, [v] (auto&& sp) { mlogger.debug("Schema version request for {}", v); @@ -221,7 +218,7 @@ bool migration_manager::have_schema_agreement() { // Us. return true; } - auto our_version = get_local_storage_proxy().get_db().local().get_version(); + auto our_version = _storage_proxy.get_db().local().get_version(); bool match = false; for (auto& x : known_endpoints) { auto& endpoint = x.first; @@ -252,7 +249,7 @@ bool migration_manager::have_schema_agreement() { */ future<> migration_manager::maybe_schedule_schema_pull(const utils::UUID& their_version, const gms::inet_address& endpoint) { - auto& proxy = get_local_storage_proxy(); + auto& proxy = _storage_proxy; auto& db = proxy.get_db().local(); if (db.get_version() == their_version || !should_pull_schema_from(endpoint)) { @@ -346,8 +343,8 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr src, const std::vector& canonical_mutations) { mlogger.debug("Applying schema mutations from {}", src); - auto& proxy = service::get_storage_proxy(); - const auto& db = proxy.local().get_db().local(); + auto& proxy = _storage_proxy; + const auto& db = proxy.get_db().local(); std::vector mutations; mutations.reserve(canonical_mutations.size()); @@ -362,7 +359,7 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr return make_exception_future<>(std::make_exception_ptr( std::runtime_error(fmt::format("Error while applying schema mutations: {}", e)))); } - return db::schema_tables::merge_schema(proxy, _feat, std::move(mutations)); + return db::schema_tables::merge_schema(proxy.container(), _feat, std::move(mutations)); } future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr src, const std::vector& mutations) @@ -370,7 +367,7 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr mlogger.debug("Applying schema mutations from {}", src); return map_reduce(mutations, [this, src](const frozen_mutation& fm) { // schema table's schema is not syncable so just use get_schema_definition() - return get_schema_definition(fm.schema_version(), src, _messaging).then([&fm](schema_ptr s) { + return get_schema_definition(fm.schema_version(), src, _messaging, _storage_proxy).then([&fm](schema_ptr s) { s->registry_entry()->mark_synced(); return fm.unfreeze(std::move(s)); }); @@ -378,7 +375,7 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr all.emplace_back(std::move(m)); return std::move(all); }).then([this](std::vector schema) { - return db::schema_tables::merge_schema(get_storage_proxy(), _feat, std::move(schema)); + return db::schema_tables::merge_schema(_storage_proxy.container(), _feat, std::move(schema)); }); } @@ -624,7 +621,7 @@ public void notifyDropAggregate(UDAggregate udf) #endif std::vector migration_manager::prepare_keyspace_update_announcement(lw_shared_ptr ksm) { - auto& proxy = get_local_storage_proxy(); + auto& proxy = _storage_proxy; auto& db = proxy.get_db().local(); db.validate_keyspace_update(*ksm); @@ -642,7 +639,7 @@ future<>migration_manager::announce_new_keyspace(lw_shared_ptr migration_manager::prepare_new_keyspace_announcement(lw_shared_ptr ksm, api::timestamp_type timestamp) { - auto& proxy = get_local_storage_proxy(); + auto& proxy = _storage_proxy; auto& db = proxy.get_db().local(); db.validate_new_keyspace(*ksm); @@ -672,7 +669,7 @@ future<> migration_manager::include_keyspace_and_announce( future> migration_manager::include_keyspace( const keyspace_metadata& keyspace, std::vector mutations) { // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). - mutation m = co_await db::schema_tables::read_keyspace_mutation(service::get_storage_proxy(), keyspace.name()); + mutation m = co_await db::schema_tables::read_keyspace_mutation(_storage_proxy.container(), keyspace.name()); mutations.push_back(std::move(m)); co_return std::move(mutations); } @@ -686,7 +683,7 @@ future> migration_manager::prepare_new_column_family_annou cfm.validate(); #endif try { - auto& db = get_local_storage_proxy().get_db().local(); + auto& db = _storage_proxy.get_db().local(); auto&& keyspace = db.find_keyspace(cfm->ks_name()); if (db.has_schema(cfm->ks_name(), cfm->cf_name())) { throw exceptions::already_exists_exception(cfm->ks_name(), cfm->cf_name()); @@ -717,7 +714,7 @@ future> migration_manager::prepare_column_family_update_an #endif try { auto ts = ts_opt.value_or(api::new_timestamp()); - auto& db = get_local_storage_proxy().get_db().local(); + auto& db = _storage_proxy.get_db().local(); auto&& old_schema = db.find_column_family(cfm->ks_name(), cfm->cf_name()).schema(); // FIXME: Should we lookup by id? #if 0 oldCfm.validateCompatility(cfm); @@ -746,7 +743,7 @@ future<> migration_manager::announce_column_family_update(schema_ptr cfm, bool f } future> migration_manager::do_prepare_new_type_announcement(user_type new_type) { - auto& db = get_local_storage_proxy().get_db().local(); + auto& db = _storage_proxy.get_db().local(); auto&& keyspace = db.find_keyspace(new_type->_keyspace); auto mutations = db::schema_tables::make_create_type_mutations(keyspace.metadata(), new_type, api::new_timestamp()); return include_keyspace(*keyspace.metadata(), std::move(mutations)); @@ -763,28 +760,28 @@ future> migration_manager::prepare_update_type_announcemen } future> migration_manager::prepare_new_function_announcement(shared_ptr func) { - auto& db = get_local_storage_proxy().get_db().local(); + auto& db = _storage_proxy.get_db().local(); auto&& keyspace = db.find_keyspace(func->name().keyspace); auto mutations = db::schema_tables::make_create_function_mutations(func, api::new_timestamp()); return include_keyspace(*keyspace.metadata(), std::move(mutations)); } future> migration_manager::prepare_function_drop_announcement(shared_ptr func) { - auto& db = get_local_storage_proxy().get_db().local(); + auto& db = _storage_proxy.get_db().local(); auto&& keyspace = db.find_keyspace(func->name().keyspace); auto mutations = db::schema_tables::make_drop_function_mutations(func, api::new_timestamp()); return include_keyspace(*keyspace.metadata(), std::move(mutations)); } future> migration_manager::prepare_new_aggregate_announcement(shared_ptr aggregate) { - auto& db = get_local_storage_proxy().get_db().local(); + auto& db = _storage_proxy.get_db().local(); auto&& keyspace = db.find_keyspace(aggregate->name().keyspace); auto mutations = db::schema_tables::make_create_aggregate_mutations(aggregate, api::new_timestamp()); return include_keyspace(*keyspace.metadata(), std::move(mutations)); } future> migration_manager::prepare_aggregate_drop_announcement(shared_ptr aggregate) { - auto& db = get_local_storage_proxy().get_db().local(); + auto& db = _storage_proxy.get_db().local(); auto&& keyspace = db.find_keyspace(aggregate->name().keyspace); auto mutations = db::schema_tables::make_drop_aggregate_mutations(aggregate, api::new_timestamp()); return include_keyspace(*keyspace.metadata(), std::move(mutations)); @@ -830,7 +827,7 @@ public static void announceColumnFamilyUpdate(CFMetaData cfm, boolean fromThrift #endif std::vector migration_manager::prepare_keyspace_drop_announcement(const sstring& ks_name) { - auto& db = get_local_storage_proxy().get_db().local(); + auto& db = _storage_proxy.get_db().local(); if (!db.has_keyspace(ks_name)) { throw exceptions::configuration_exception(format("Cannot drop non existing keyspace '{}'.", ks_name)); } @@ -900,7 +897,7 @@ future<> migration_manager::announce_column_family_drop(const sstring& ks_name, } future> migration_manager::prepare_type_drop_announcement(user_type dropped_type) { - auto& db = get_local_storage_proxy().get_db().local(); + auto& db = _storage_proxy.get_db().local(); auto&& keyspace = db.find_keyspace(dropped_type->_keyspace); mlogger.info("Drop User Type: {}", dropped_type->get_name_as_string()); auto mutations = @@ -912,7 +909,7 @@ future> migration_manager::prepare_new_view_announcement(v #if 0 view.metadata.validate(); #endif - auto& db = get_local_storage_proxy().get_db().local(); + auto& db = _storage_proxy.get_db().local(); try { auto&& keyspace = db.find_keyspace(view->ks_name()).metadata(); if (keyspace->cf_meta_data().contains(view->cf_name())) { @@ -934,7 +931,7 @@ future> migration_manager::prepare_view_update_announcemen #if 0 view.metadata.validate(); #endif - auto db = get_local_storage_proxy().data_dictionary(); + auto db = _storage_proxy.data_dictionary(); try { auto&& keyspace = db.find_keyspace(view->ks_name()).metadata(); auto& old_view = keyspace->cf_meta_data().at(view->cf_name()); @@ -954,7 +951,7 @@ future> migration_manager::prepare_view_update_announcemen } future> migration_manager::prepare_view_drop_announcement(const sstring& ks_name, const sstring& cf_name) { - auto& db = get_local_storage_proxy().get_db().local(); + auto& db = _storage_proxy.get_db().local(); try { auto& view = db.find_column_family(ks_name, cf_name).schema(); if (!view->is_view()) { @@ -1011,7 +1008,7 @@ future<> migration_manager::announce(std::vector schema) { }; co_return co_await container().invoke_on(0, func); } else { - auto f = db::schema_tables::merge_schema(get_storage_proxy(), _feat, schema); + auto f = db::schema_tables::merge_schema(_storage_proxy.container(), _feat, schema); try { using namespace std::placeholders; @@ -1154,11 +1151,11 @@ future<> migration_manager::maybe_sync(const schema_ptr& s, netw::messaging_serv // Returns schema of given version, either from cache or from remote node identified by 'from'. // Doesn't affect current node's schema in any way. -static future get_schema_definition(table_schema_version v, netw::messaging_service::msg_addr dst, netw::messaging_service& ms) { - return local_schema_registry().get_or_load(v, [&ms, dst] (table_schema_version v) { +static future get_schema_definition(table_schema_version v, netw::messaging_service::msg_addr dst, netw::messaging_service& ms, service::storage_proxy& storage_proxy) { + return local_schema_registry().get_or_load(v, [&ms, &storage_proxy, dst] (table_schema_version v) { mlogger.debug("Requesting schema {} from {}", v, dst); - return ms.send_get_schema_version(dst, v).then([] (frozen_schema s) { - auto& proxy = get_storage_proxy(); + return ms.send_get_schema_version(dst, v).then([&storage_proxy] (frozen_schema s) { + auto& proxy = storage_proxy.container(); // Since the latest schema version is always present in the schema registry // we only happen to query already outdated schema version, which is // referenced by the incoming request. @@ -1179,12 +1176,12 @@ static future get_schema_definition(table_schema_version v, netw::me return frozen_schema{us}; }); }); - }).then([] (schema_ptr s) { + }).then([&storage_proxy] (schema_ptr s) { // If this is a view so this schema also needs a reference to the base // table. if (s->is_view()) { if (!s->view_info()->base_info()) { - auto& db = service::get_local_storage_proxy().local_db(); + auto& db = storage_proxy.local_db(); // This line might throw a no_such_column_family // It should be fine since if we tried to register a view for which // we don't know the base table, our registry is broken. @@ -1201,7 +1198,7 @@ future migration_manager::get_schema_for_read(table_schema_version v } future migration_manager::get_schema_for_write(table_schema_version v, netw::messaging_service::msg_addr dst, netw::messaging_service& ms) { - return get_schema_definition(v, dst, ms).then([this, dst] (schema_ptr s) { + return get_schema_definition(v, dst, ms, _storage_proxy).then([this, dst] (schema_ptr s) { return maybe_sync(s, dst).then([s] { return s; }); @@ -1246,7 +1243,7 @@ void migration_manager::on_change(gms::inet_address endpoint, gms::application_s mlogger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint); return; } - if (get_local_storage_proxy().get_token_metadata_ptr()->is_member(endpoint)) { + if (_storage_proxy.get_token_metadata_ptr()->is_member(endpoint)) { schedule_schema_pull(endpoint, *ep_state); } } diff --git a/service/migration_manager.hh b/service/migration_manager.hh index 5d046c5ed8..747d785a9e 100644 --- a/service/migration_manager.hh +++ b/service/migration_manager.hh @@ -72,6 +72,8 @@ class versioned_value; namespace service { +class storage_proxy; + template concept MergeableMutation = std::is_same::value || std::is_same::value; @@ -87,13 +89,14 @@ private: static const std::chrono::milliseconds migration_delay; gms::feature_service& _feat; netw::messaging_service& _messaging; + service::storage_proxy& _storage_proxy; gms::gossiper& _gossiper; seastar::abort_source _as; service::raft_group_registry& _raft_gr; serialized_action _schema_push; utils::UUID _schema_version_to_publish; public: - migration_manager(migration_notifier&, gms::feature_service&, netw::messaging_service& ms, gms::gossiper& gossiper, service::raft_group_registry& raft_gr); + migration_manager(migration_notifier&, gms::feature_service&, netw::messaging_service& ms, service::storage_proxy&, gms::gossiper& gossiper, service::raft_group_registry& raft_gr); migration_notifier& get_notifier() { return _notifier; } const migration_notifier& get_notifier() const { return _notifier; } diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index fb1a10504d..2e269446cd 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -634,7 +634,7 @@ public: proxy.start(std::ref(db), std::ref(gossiper), spcfg, std::ref(b), scheduling_group_key_create(sg_conf).get0(), std::ref(feature_service), std::ref(token_metadata), std::ref(erm_factory), std::ref(ms)).get(); auto stop_proxy = defer([&proxy] { proxy.stop().get(); }); - mm.start(std::ref(mm_notif), std::ref(feature_service), std::ref(ms), std::ref(gossiper), std::ref(raft_gr)).get(); + mm.start(std::ref(mm_notif), std::ref(feature_service), std::ref(ms), std::ref(proxy), std::ref(gossiper), std::ref(raft_gr)).get(); auto stop_mm = defer([&mm] { mm.stop().get(); }); cql3::query_processor::memory_config qp_mcfg = {memory::stats().total_memory() / 256, memory::stats().total_memory() / 2560};