From c40043b1429cc04a92bcdd295ef7d2d71b79c04a Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 15 Dec 2021 18:56:18 +0200 Subject: [PATCH 1/4] mm: remove stats on schema version get --- service/migration_manager.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/service/migration_manager.cc b/service/migration_manager.cc index bf57865331..a3ac71961b 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -174,7 +174,6 @@ void migration_manager::init_messaging_service() return make_ready_future(service::get_local_storage_proxy().get_db().local().get_version()); }); _messaging.register_get_schema_version([this] (unsigned shard, table_schema_version v) { - get_local_storage_proxy().get_stats().replica_cross_shard_ops += shard != this_shard_id(); // FIXME: should this get an smp_service_group? Probably one separate from reads and writes. return container().invoke_on(shard, [v] (auto&& sp) { mlogger.debug("Schema version request for {}", v); From 26c656f6ed55117b08cd877aa20304c1eb1059e6 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 15 Dec 2021 19:05:27 +0200 Subject: [PATCH 2/4] mm: don't capture storage proxy shared_ptr during background schema merge The definitions_update() verb captures a shared_ptr to storage_proxy to keep it alive while the background task executes. This was introduced in (2016!): commit 1429213b4c896d5b20a5bfa5582feb4525682c7f Author: Pekka Enberg Date: Mon Mar 14 17:57:08 2016 +0200 main: Defer migration manager RPC verb registration after commitlog replay Defer registering migration manager RPC verbs after commitlog has has been replayed so that our own schema is fully loaded before other other nodes start querying it or sending schema updates. Message-Id: <1457971028-7325-1-git-send-email-penberg@scylladb.com> when moving this code from storage_proxy.cc. 
Later, better protection with a gate was added: commit 14de126ff83d8322090e4190273aecf06ce64b31 Author: Pavel Emelyanov Date: Mon Mar 16 18:03:48 2020 +0300 migration_manager: Run background schema merge in gate The call for merge_schema_from in some cases is run in the background and thus is not aborted/waited on shutdown. This may result in use-after-free one of which is merge_schema_from -> read_schema_for_keyspace -> db::system_keyspace::query -> storage_proxy::query -> query_partition_key_range_concurrent in the latter function the proxy._token_metadata is accessed, while the respective object can be already free (unlike the storage_proxy itself that's still leaked on shutdown). Related bug: #5903, #5999 (cannot reproduce though) Tests: unit(dev), manual start-stop dtest(consistency.TestConsistency, dev) dtest(schema_management, dev) Signed-off-by: Pavel Emelyanov Reviewed-by: Pekka Enberg Message-Id: <20200316150348.31118-1-xemul@scylladb.com> Since the task execution is now protected by the gate, and therefore by the migration_manager lifetime (which is contained within that of storage_proxy, as migration_manager is constructed after it), capturing the shared_ptr is no longer needed. We therefore remove the capture, which also eliminates a use of the deprecated global storage_proxy accessors.
--- service/migration_manager.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/service/migration_manager.cc b/service/migration_manager.cc index a3ac71961b..2121f433fe 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -132,11 +132,11 @@ void migration_manager::init_messaging_service() auto src = netw::messaging_service::get_source(cinfo); auto f = make_ready_future<>(); if (cm) { - f = do_with(std::move(*cm), get_local_shared_storage_proxy(), [this, src] (const std::vector& mutations, shared_ptr& p) { + f = do_with(std::move(*cm), [this, src] (const std::vector& mutations) { return merge_schema_in_background(src, mutations); }); } else { - f = do_with(std::move(fm), get_local_shared_storage_proxy(), [this, src] (const std::vector& mutations, shared_ptr& p) { + f = do_with(std::move(fm), [this, src] (const std::vector& mutations) { return merge_schema_in_background(src, mutations); }); } From aca9029c24b1549eedb884f7ff85f6560a1bf2b7 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 16 Dec 2021 19:27:17 +0200 Subject: [PATCH 3/4] migration_manager: don't keep storage_proxy alive during schema_check verb The schema_check verb doesn't leak tasks, so when the verb is unregistered it will be drained. So protection for storage_proxy lifetime can be removed. 
--- service/migration_manager.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/service/migration_manager.cc b/service/migration_manager.cc index 2121f433fe..0437f9f0a0 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -166,8 +166,6 @@ void migration_manager::init_messaging_service() return cm.to_mutation(db.find_column_family(cm.column_family_id()).schema()); })); return make_ready_future>(rpc::tuple(std::move(fm), std::move(cm))); - }).finally([p = get_local_shared_storage_proxy()] { - // keep local proxy alive }); }); _messaging.register_schema_check([] { From a97731a7e52aa64839a127901989391e823172c4 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 16 Dec 2021 20:59:29 +0200 Subject: [PATCH 4/4] migration_manager: replace uses of get_storage_proxy and get_local_storage_proxy with constructor-provided reference A static helper also gained a storage_proxy parameter. --- main.cc | 2 +- service/migration_manager.cc | 76 ++++++++++++++++++------------------ service/migration_manager.hh | 5 ++- test/lib/cql_test_env.cc | 2 +- 4 files changed, 44 insertions(+), 41 deletions(-) diff --git a/main.cc b/main.cc index 632db5586d..93ad0719b5 100644 --- a/main.cc +++ b/main.cc @@ -930,7 +930,7 @@ int main(int ac, char** av) { // engine().at_exit([&proxy] { return proxy.stop(); }); supervisor::notify("starting migration manager"); debug::the_migration_manager = &mm; - mm.start(std::ref(mm_notifier), std::ref(feature_service), std::ref(messaging), std::ref(gossiper), std::ref(raft_gr)).get(); + mm.start(std::ref(mm_notifier), std::ref(feature_service), std::ref(messaging), std::ref(proxy), std::ref(gossiper), std::ref(raft_gr)).get(); auto stop_migration_manager = defer_verbose_shutdown("migration manager", [&mm] { mm.stop().get(); }); diff --git a/service/migration_manager.cc b/service/migration_manager.cc index 0437f9f0a0..8dc62452af 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -75,10 +75,10 @@ static 
logging::logger mlogger("migration_manager"); using namespace std::chrono_literals; const std::chrono::milliseconds migration_manager::migration_delay = 60000ms; -static future get_schema_definition(table_schema_version v, netw::messaging_service::msg_addr dst, netw::messaging_service& ms); +static future get_schema_definition(table_schema_version v, netw::messaging_service::msg_addr dst, netw::messaging_service& ms, service::storage_proxy& sp); -migration_manager::migration_manager(migration_notifier& notifier, gms::feature_service& feat, netw::messaging_service& ms, gms::gossiper& gossiper, service::raft_group_registry& raft_gr) : - _notifier(notifier), _feat(feat), _messaging(ms), _gossiper(gossiper), _raft_gr(raft_gr) +migration_manager::migration_manager(migration_notifier& notifier, gms::feature_service& feat, netw::messaging_service& ms, service::storage_proxy& storage_proxy, gms::gossiper& gossiper, service::raft_group_registry& raft_gr) : + _notifier(notifier), _feat(feat), _messaging(ms), _storage_proxy(storage_proxy), _gossiper(gossiper), _raft_gr(raft_gr) , _schema_push([this] { return passive_announce(); }) { } @@ -116,7 +116,7 @@ void migration_manager::init_messaging_service() //FIXME: future discarded. 
(void)with_gate(_background_tasks, [this] { mlogger.debug("features changed, recalculating schema version"); - return db::schema_tables::recalculate_schema_version(get_storage_proxy(), _feat); + return db::schema_tables::recalculate_schema_version(_storage_proxy.container(), _feat); }); }; @@ -156,7 +156,7 @@ void migration_manager::init_messaging_service() const auto cm_retval_supported = options && options->remote_supports_canonical_mutation_retval; auto features = _feat.cluster_schema_features(); - auto& proxy = get_storage_proxy(); + auto& proxy = _storage_proxy.container(); return db::schema_tables::convert_schema_to_mutations(proxy, features).then([&proxy, cm_retval_supported] (std::vector&& cm) { const auto& db = proxy.local().get_db().local(); if (cm_retval_supported) { @@ -168,8 +168,8 @@ void migration_manager::init_messaging_service() return make_ready_future>(rpc::tuple(std::move(fm), std::move(cm))); }); }); - _messaging.register_schema_check([] { - return make_ready_future(service::get_local_storage_proxy().get_db().local().get_version()); + _messaging.register_schema_check([this] { + return make_ready_future(_storage_proxy.get_db().local().get_version()); }); _messaging.register_get_schema_version([this] (unsigned shard, table_schema_version v) { // FIXME: should this get an smp_service_group? Probably one separate from reads and writes. @@ -218,7 +218,7 @@ bool migration_manager::have_schema_agreement() { // Us. 
return true; } - auto our_version = get_local_storage_proxy().get_db().local().get_version(); + auto our_version = _storage_proxy.get_db().local().get_version(); bool match = false; for (auto& x : known_endpoints) { auto& endpoint = x.first; @@ -249,7 +249,7 @@ bool migration_manager::have_schema_agreement() { */ future<> migration_manager::maybe_schedule_schema_pull(const utils::UUID& their_version, const gms::inet_address& endpoint) { - auto& proxy = get_local_storage_proxy(); + auto& proxy = _storage_proxy; auto& db = proxy.get_db().local(); if (db.get_version() == their_version || !should_pull_schema_from(endpoint)) { @@ -343,8 +343,8 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr src, const std::vector& canonical_mutations) { mlogger.debug("Applying schema mutations from {}", src); - auto& proxy = service::get_storage_proxy(); - const auto& db = proxy.local().get_db().local(); + auto& proxy = _storage_proxy; + const auto& db = proxy.get_db().local(); std::vector mutations; mutations.reserve(canonical_mutations.size()); @@ -359,7 +359,7 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr return make_exception_future<>(std::make_exception_ptr( std::runtime_error(fmt::format("Error while applying schema mutations: {}", e)))); } - return db::schema_tables::merge_schema(proxy, _feat, std::move(mutations)); + return db::schema_tables::merge_schema(proxy.container(), _feat, std::move(mutations)); } future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr src, const std::vector& mutations) @@ -367,7 +367,7 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr mlogger.debug("Applying schema mutations from {}", src); return map_reduce(mutations, [this, src](const frozen_mutation& fm) { // schema table's schema is not syncable so just use get_schema_definition() - return 
get_schema_definition(fm.schema_version(), src, _messaging).then([&fm](schema_ptr s) { + return get_schema_definition(fm.schema_version(), src, _messaging, _storage_proxy).then([&fm](schema_ptr s) { s->registry_entry()->mark_synced(); return fm.unfreeze(std::move(s)); }); @@ -375,7 +375,7 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr all.emplace_back(std::move(m)); return std::move(all); }).then([this](std::vector schema) { - return db::schema_tables::merge_schema(get_storage_proxy(), _feat, std::move(schema)); + return db::schema_tables::merge_schema(_storage_proxy.container(), _feat, std::move(schema)); }); } @@ -621,7 +621,7 @@ public void notifyDropAggregate(UDAggregate udf) #endif std::vector migration_manager::prepare_keyspace_update_announcement(lw_shared_ptr ksm) { - auto& proxy = get_local_storage_proxy(); + auto& proxy = _storage_proxy; auto& db = proxy.get_db().local(); db.validate_keyspace_update(*ksm); @@ -639,7 +639,7 @@ future<>migration_manager::announce_new_keyspace(lw_shared_ptr migration_manager::prepare_new_keyspace_announcement(lw_shared_ptr ksm, api::timestamp_type timestamp) { - auto& proxy = get_local_storage_proxy(); + auto& proxy = _storage_proxy; auto& db = proxy.get_db().local(); db.validate_new_keyspace(*ksm); @@ -669,7 +669,7 @@ future<> migration_manager::include_keyspace_and_announce( future> migration_manager::include_keyspace( const keyspace_metadata& keyspace, std::vector mutations) { // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). 
- mutation m = co_await db::schema_tables::read_keyspace_mutation(service::get_storage_proxy(), keyspace.name()); + mutation m = co_await db::schema_tables::read_keyspace_mutation(_storage_proxy.container(), keyspace.name()); mutations.push_back(std::move(m)); co_return std::move(mutations); } @@ -683,7 +683,7 @@ future> migration_manager::prepare_new_column_family_annou cfm.validate(); #endif try { - auto& db = get_local_storage_proxy().get_db().local(); + auto& db = _storage_proxy.get_db().local(); auto&& keyspace = db.find_keyspace(cfm->ks_name()); if (db.has_schema(cfm->ks_name(), cfm->cf_name())) { throw exceptions::already_exists_exception(cfm->ks_name(), cfm->cf_name()); @@ -714,7 +714,7 @@ future> migration_manager::prepare_column_family_update_an #endif try { auto ts = ts_opt.value_or(api::new_timestamp()); - auto& db = get_local_storage_proxy().get_db().local(); + auto& db = _storage_proxy.get_db().local(); auto&& old_schema = db.find_column_family(cfm->ks_name(), cfm->cf_name()).schema(); // FIXME: Should we lookup by id? 
#if 0 oldCfm.validateCompatility(cfm); @@ -743,7 +743,7 @@ future<> migration_manager::announce_column_family_update(schema_ptr cfm, bool f } future> migration_manager::do_prepare_new_type_announcement(user_type new_type) { - auto& db = get_local_storage_proxy().get_db().local(); + auto& db = _storage_proxy.get_db().local(); auto&& keyspace = db.find_keyspace(new_type->_keyspace); auto mutations = db::schema_tables::make_create_type_mutations(keyspace.metadata(), new_type, api::new_timestamp()); return include_keyspace(*keyspace.metadata(), std::move(mutations)); @@ -760,28 +760,28 @@ future> migration_manager::prepare_update_type_announcemen } future> migration_manager::prepare_new_function_announcement(shared_ptr func) { - auto& db = get_local_storage_proxy().get_db().local(); + auto& db = _storage_proxy.get_db().local(); auto&& keyspace = db.find_keyspace(func->name().keyspace); auto mutations = db::schema_tables::make_create_function_mutations(func, api::new_timestamp()); return include_keyspace(*keyspace.metadata(), std::move(mutations)); } future> migration_manager::prepare_function_drop_announcement(shared_ptr func) { - auto& db = get_local_storage_proxy().get_db().local(); + auto& db = _storage_proxy.get_db().local(); auto&& keyspace = db.find_keyspace(func->name().keyspace); auto mutations = db::schema_tables::make_drop_function_mutations(func, api::new_timestamp()); return include_keyspace(*keyspace.metadata(), std::move(mutations)); } future> migration_manager::prepare_new_aggregate_announcement(shared_ptr aggregate) { - auto& db = get_local_storage_proxy().get_db().local(); + auto& db = _storage_proxy.get_db().local(); auto&& keyspace = db.find_keyspace(aggregate->name().keyspace); auto mutations = db::schema_tables::make_create_aggregate_mutations(aggregate, api::new_timestamp()); return include_keyspace(*keyspace.metadata(), std::move(mutations)); } future> migration_manager::prepare_aggregate_drop_announcement(shared_ptr aggregate) { - auto& db = 
get_local_storage_proxy().get_db().local(); + auto& db = _storage_proxy.get_db().local(); auto&& keyspace = db.find_keyspace(aggregate->name().keyspace); auto mutations = db::schema_tables::make_drop_aggregate_mutations(aggregate, api::new_timestamp()); return include_keyspace(*keyspace.metadata(), std::move(mutations)); @@ -827,7 +827,7 @@ public static void announceColumnFamilyUpdate(CFMetaData cfm, boolean fromThrift #endif std::vector migration_manager::prepare_keyspace_drop_announcement(const sstring& ks_name) { - auto& db = get_local_storage_proxy().get_db().local(); + auto& db = _storage_proxy.get_db().local(); if (!db.has_keyspace(ks_name)) { throw exceptions::configuration_exception(format("Cannot drop non existing keyspace '{}'.", ks_name)); } @@ -897,7 +897,7 @@ future<> migration_manager::announce_column_family_drop(const sstring& ks_name, } future> migration_manager::prepare_type_drop_announcement(user_type dropped_type) { - auto& db = get_local_storage_proxy().get_db().local(); + auto& db = _storage_proxy.get_db().local(); auto&& keyspace = db.find_keyspace(dropped_type->_keyspace); mlogger.info("Drop User Type: {}", dropped_type->get_name_as_string()); auto mutations = @@ -909,7 +909,7 @@ future> migration_manager::prepare_new_view_announcement(v #if 0 view.metadata.validate(); #endif - auto& db = get_local_storage_proxy().get_db().local(); + auto& db = _storage_proxy.get_db().local(); try { auto&& keyspace = db.find_keyspace(view->ks_name()).metadata(); if (keyspace->cf_meta_data().contains(view->cf_name())) { @@ -931,7 +931,7 @@ future> migration_manager::prepare_view_update_announcemen #if 0 view.metadata.validate(); #endif - auto db = get_local_storage_proxy().data_dictionary(); + auto db = _storage_proxy.data_dictionary(); try { auto&& keyspace = db.find_keyspace(view->ks_name()).metadata(); auto& old_view = keyspace->cf_meta_data().at(view->cf_name()); @@ -951,7 +951,7 @@ future> migration_manager::prepare_view_update_announcemen } future> 
migration_manager::prepare_view_drop_announcement(const sstring& ks_name, const sstring& cf_name) { - auto& db = get_local_storage_proxy().get_db().local(); + auto& db = _storage_proxy.get_db().local(); try { auto& view = db.find_column_family(ks_name, cf_name).schema(); if (!view->is_view()) { @@ -1008,7 +1008,7 @@ future<> migration_manager::announce(std::vector schema) { }; co_return co_await container().invoke_on(0, func); } else { - auto f = db::schema_tables::merge_schema(get_storage_proxy(), _feat, schema); + auto f = db::schema_tables::merge_schema(_storage_proxy.container(), _feat, schema); try { using namespace std::placeholders; @@ -1151,11 +1151,11 @@ future<> migration_manager::maybe_sync(const schema_ptr& s, netw::messaging_serv // Returns schema of given version, either from cache or from remote node identified by 'from'. // Doesn't affect current node's schema in any way. -static future get_schema_definition(table_schema_version v, netw::messaging_service::msg_addr dst, netw::messaging_service& ms) { - return local_schema_registry().get_or_load(v, [&ms, dst] (table_schema_version v) { +static future get_schema_definition(table_schema_version v, netw::messaging_service::msg_addr dst, netw::messaging_service& ms, service::storage_proxy& storage_proxy) { + return local_schema_registry().get_or_load(v, [&ms, &storage_proxy, dst] (table_schema_version v) { mlogger.debug("Requesting schema {} from {}", v, dst); - return ms.send_get_schema_version(dst, v).then([] (frozen_schema s) { - auto& proxy = get_storage_proxy(); + return ms.send_get_schema_version(dst, v).then([&storage_proxy] (frozen_schema s) { + auto& proxy = storage_proxy.container(); // Since the latest schema version is always present in the schema registry // we only happen to query already outdated schema version, which is // referenced by the incoming request. 
@@ -1176,12 +1176,12 @@ static future get_schema_definition(table_schema_version v, netw::me return frozen_schema{us}; }); }); - }).then([] (schema_ptr s) { + }).then([&storage_proxy] (schema_ptr s) { // If this is a view so this schema also needs a reference to the base // table. if (s->is_view()) { if (!s->view_info()->base_info()) { - auto& db = service::get_local_storage_proxy().local_db(); + auto& db = storage_proxy.local_db(); // This line might throw a no_such_column_family // It should be fine since if we tried to register a view for which // we don't know the base table, our registry is broken. @@ -1198,7 +1198,7 @@ future migration_manager::get_schema_for_read(table_schema_version v } future migration_manager::get_schema_for_write(table_schema_version v, netw::messaging_service::msg_addr dst, netw::messaging_service& ms) { - return get_schema_definition(v, dst, ms).then([this, dst] (schema_ptr s) { + return get_schema_definition(v, dst, ms, _storage_proxy).then([this, dst] (schema_ptr s) { return maybe_sync(s, dst).then([s] { return s; }); @@ -1243,7 +1243,7 @@ void migration_manager::on_change(gms::inet_address endpoint, gms::application_s mlogger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint); return; } - if (get_local_storage_proxy().get_token_metadata_ptr()->is_member(endpoint)) { + if (_storage_proxy.get_token_metadata_ptr()->is_member(endpoint)) { schedule_schema_pull(endpoint, *ep_state); } } diff --git a/service/migration_manager.hh b/service/migration_manager.hh index 5d046c5ed8..747d785a9e 100644 --- a/service/migration_manager.hh +++ b/service/migration_manager.hh @@ -72,6 +72,8 @@ class versioned_value; namespace service { +class storage_proxy; + template concept MergeableMutation = std::is_same::value || std::is_same::value; @@ -87,13 +89,14 @@ private: static const std::chrono::milliseconds migration_delay; gms::feature_service& _feat; netw::messaging_service& _messaging; + service::storage_proxy& _storage_proxy; 
gms::gossiper& _gossiper; seastar::abort_source _as; service::raft_group_registry& _raft_gr; serialized_action _schema_push; utils::UUID _schema_version_to_publish; public: - migration_manager(migration_notifier&, gms::feature_service&, netw::messaging_service& ms, gms::gossiper& gossiper, service::raft_group_registry& raft_gr); + migration_manager(migration_notifier&, gms::feature_service&, netw::messaging_service& ms, service::storage_proxy&, gms::gossiper& gossiper, service::raft_group_registry& raft_gr); migration_notifier& get_notifier() { return _notifier; } const migration_notifier& get_notifier() const { return _notifier; } diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index fb1a10504d..2e269446cd 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -634,7 +634,7 @@ public: proxy.start(std::ref(db), std::ref(gossiper), spcfg, std::ref(b), scheduling_group_key_create(sg_conf).get0(), std::ref(feature_service), std::ref(token_metadata), std::ref(erm_factory), std::ref(ms)).get(); auto stop_proxy = defer([&proxy] { proxy.stop().get(); }); - mm.start(std::ref(mm_notif), std::ref(feature_service), std::ref(ms), std::ref(gossiper), std::ref(raft_gr)).get(); + mm.start(std::ref(mm_notif), std::ref(feature_service), std::ref(ms), std::ref(proxy), std::ref(gossiper), std::ref(raft_gr)).get(); auto stop_mm = defer([&mm] { mm.stop().get(); }); cql3::query_processor::memory_config qp_mcfg = {memory::stats().total_memory() / 256, memory::stats().total_memory() / 2560};