From e02b93cae18570713a50cdc38501dcb1c5c885fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Wed, 21 Aug 2019 16:15:07 +0300 Subject: [PATCH 1/3] schema_tables: convert_schema_to_mutations: return canonical_mutations In preparation to the schema push/pull migrating to use canonical mutations, convert the method producing the schema mutations to return a vector of canonical mutations. The only user, MIGRATION_REQUEST verb, converts the canonical mutations back to frozen mutations. This is very inefficient, but this path will only be used in mixed clusters. After all nodes are upgraded the verb will be sending the canonical mutations directly instead. --- db/schema_tables.cc | 8 ++++---- db/schema_tables.hh | 2 +- service/migration_manager.cc | 8 +++++++- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/db/schema_tables.cc b/db/schema_tables.cc index 8dc3da15ed..3cb3a25555 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -642,19 +642,19 @@ future calculate_schema_digest(distributed& }); } -future> convert_schema_to_mutations(distributed& proxy, schema_features features) +future> convert_schema_to_mutations(distributed& proxy, schema_features features) { auto map = [&proxy] (sstring table) { return db::system_keyspace::query_mutations(proxy, NAME, table).then([&proxy, table] (auto rs) { auto s = proxy.local().get_db().local().find_schema(NAME, table); - std::vector results; + std::vector results; for (auto&& p : rs->partitions()) { auto mut = p.mut().unfreeze(s); auto partition_key = value_cast(utf8_type->deserialize(mut.key().get_component(*s, 0))); if (is_system_keyspace(partition_key)) { continue; } - results.emplace_back(std::move(p.mut())); + results.emplace_back(mut); } return results; }); @@ -663,7 +663,7 @@ future> convert_schema_to_mutations(distributed{}, reduce); + return map_reduce(all_table_names(features), map, std::vector{}, reduce); } future diff --git a/db/schema_tables.hh b/db/schema_tables.hh index 640ad8f5aa..7d6ea79d19 100644 --- a/db/schema_tables.hh +++ b/db/schema_tables.hh @@ -152,7 +152,7 @@ future<> save_system_keyspace_schema(); future calculate_schema_digest(distributed& proxy, schema_features); -future> convert_schema_to_mutations(distributed& proxy, schema_features); +future> convert_schema_to_mutations(distributed& proxy, schema_features); future read_schema_partition_for_keyspace(distributed& proxy, const sstring& schema_table_name, const sstring& keyspace_name); diff --git a/service/migration_manager.cc b/service/migration_manager.cc index 8ed30eb531..70d1758cf1 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -116,7 +116,13 @@ void migration_manager::init_messaging_service() return make_ready_future>(std::vector()); } auto features = get_local_storage_service().cluster_schema_features(); - return db::schema_tables::convert_schema_to_mutations(get_storage_proxy(), features).finally([p = get_local_shared_storage_proxy()] { + auto& proxy = get_storage_proxy(); + return db::schema_tables::convert_schema_to_mutations(proxy, features).then([&proxy] (std::vector&& schema_mutations) { + const auto& db = proxy.local().get_db().local(); + return boost::copy_range>(schema_mutations | boost::adaptors::transformed([&db] (const canonical_mutation& cm) { + return cm.to_mutation(db.find_column_family(cm.column_family_id()).schema()); + })); + }).finally([p = get_local_shared_storage_proxy()] { // keep local proxy alive }); }); From d9a8ff15d832052df37431cd69f938473fd4cfcd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Wed, 21 Aug 2019 16:17:15 +0300 Subject: [PATCH 2/3] service::migration_manager: add canonical_mutation merge_schema_from() overload Add an overload which takes a vector of canonical mutations. Going forward, this is the overload to use. --- service/migration_manager.cc | 20 ++++++++++++++++++++ service/migration_manager.hh | 2 ++ 2 files changed, 22 insertions(+) diff --git a/service/migration_manager.cc b/service/migration_manager.cc index 70d1758cf1..60499ac455 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -278,6 +278,26 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr return i->second.trigger(); } +future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr src, const std::vector& canonical_mutations) { + mlogger.debug("Applying schema mutations from {}", src); + auto& proxy = service::get_storage_proxy(); + const auto& db = proxy.local().get_db().local(); + + std::vector mutations; + mutations.reserve(canonical_mutations.size()); + try { + for (const auto& cm : canonical_mutations) { + auto& tbl = db.find_column_family(cm.column_family_id()); + mutations.emplace_back(cm.to_mutation(tbl.schema())); + } + } catch (no_such_column_family& e) { + mlogger.error("Error while applying schema mutations from {}: {}", src, e); + return make_exception_future<>(std::make_exception_ptr( + std::runtime_error(fmt::format("Error while applying schema mutations: {}", e)))); + } + return db::schema_tables::merge_schema(service::get_local_storage_service(), proxy, std::move(mutations)); +} + future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr src, const std::vector& mutations) { mlogger.debug("Applying schema mutations from {}", src); diff --git a/service/migration_manager.hh b/service/migration_manager.hh index 484def3a8f..87cab54f3d 100644 --- a/service/migration_manager.hh +++ b/service/migration_manager.hh @@ -86,6 +86,8 @@ public: // Merge mutations received from src. // Keep mutations alive around whole async operation. + future<> merge_schema_from(netw::msg_addr src, const std::vector& mutations); + // Deprecated. The canonical mutation should be used instead. future<> merge_schema_from(netw::msg_addr src, const std::vector& mutations); future<> notify_create_keyspace(const lw_shared_ptr& ksm); From 7adc764b6eff4e5a839ad387836d31d6ee92b40c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 26 Aug 2019 17:41:04 +0300 Subject: [PATCH 3/3] messaging_service: add canonical_support to schema pull and push verbs The verbs are: * DEFINITIONS_UPDATE (push) * MIGRATION_REQUEST (pull) Support was added in a backward-compatible way. The push verb, sends both the old frozen mutation parameter, and the new optional canonical mutation parameter. It is expected that new nodes will use the latter, while old nodes will fall-back to the former. The pull verb has a new optional `options` parameter, which for now contains a single flag: `remote_supports_canonical_mutation_retval`. This flag, if set, means that the remote node supports the new canonical mutation return value, thus the old frozen mutations return value can be left empty. --- configure.py | 1 + idl/messaging_service.idl.hh | 28 +++++++++++++++++++++++ message/messaging_service.cc | 18 ++++++++++----- message/messaging_service.hh | 15 +++++++++---- service/migration_manager.cc | 43 +++++++++++++++++++++++++++--------- 5 files changed, 85 insertions(+), 20 deletions(-) create mode 100644 idl/messaging_service.idl.hh diff --git a/configure.py b/configure.py index 8eba6a728f..ece836c33c 100755 --- a/configure.py +++ b/configure.py @@ -797,6 +797,7 @@ idls = ['idl/gossip_digest.idl.hh', 'idl/consistency_level.idl.hh', 'idl/cache_temperature.idl.hh', 'idl/view.idl.hh', + 'idl/messaging_service.idl.hh', ] headers = find_headers('.', excluded_dirs=['idl', 'build', 'seastar', '.git']) diff --git a/idl/messaging_service.idl.hh b/idl/messaging_service.idl.hh new file mode 100644 index 0000000000..fcd8d82094 --- /dev/null +++ b/idl/messaging_service.idl.hh @@ -0,0 +1,28 @@ +/* + * Copyright 2019 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +namespace netw { + +struct schema_pull_options { + bool remote_supports_canonical_mutation_retval; +}; + +} // namespace netw diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 7421e1ea00..1c3b02e737 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -60,6 +60,7 @@ #include "idl/cache_temperature.dist.hh" #include "idl/view.dist.hh" #include "idl/mutation.dist.hh" +#include "idl/messaging_service.dist.hh" #include "serializer_impl.hh" #include "serialization_visitors.hh" #include "idl/consistency_level.dist.impl.hh" @@ -80,6 +81,7 @@ #include "idl/query.dist.impl.hh" #include "idl/cache_temperature.dist.impl.hh" #include "idl/mutation.dist.impl.hh" +#include "idl/messaging_service.dist.impl.hh" #include #include #include @@ -948,24 +950,28 @@ future<> messaging_service::send_gossip_digest_ack2(msg_addr id, gossip_digest_a return send_message_oneway(this, messaging_verb::GOSSIP_DIGEST_ACK2, std::move(id), std::move(msg)); } -void messaging_service::register_definitions_update(std::function fm)>&& func) { +void messaging_service::register_definitions_update(std::function fm, + rpc::optional> cm)>&& func) { register_handler(this, netw::messaging_verb::DEFINITIONS_UPDATE, std::move(func)); } void messaging_service::unregister_definitions_update() { _rpc->unregister_handler(netw::messaging_verb::DEFINITIONS_UPDATE); } -future<> messaging_service::send_definitions_update(msg_addr id, std::vector fm) { - return send_message_oneway(this, messaging_verb::DEFINITIONS_UPDATE, std::move(id), std::move(fm)); +future<> messaging_service::send_definitions_update(msg_addr id, std::vector fm, std::vector cm) { + return send_message_oneway(this, messaging_verb::DEFINITIONS_UPDATE, std::move(id), std::move(fm), std::move(cm)); } -void messaging_service::register_migration_request(std::function> (const rpc::client_info&)>&& func) { +void messaging_service::register_migration_request(std::function, std::vector> + (const rpc::client_info&, rpc::optional)>&& func) { register_handler(this, netw::messaging_verb::MIGRATION_REQUEST, std::move(func)); } void messaging_service::unregister_migration_request() { _rpc->unregister_handler(netw::messaging_verb::MIGRATION_REQUEST); } -future> messaging_service::send_migration_request(msg_addr id) { - return send_message>(this, messaging_verb::MIGRATION_REQUEST, std::move(id)); +future, rpc::optional>> messaging_service::send_migration_request(msg_addr id, + schema_pull_options options) { + return send_message, rpc::optional>>>(this, messaging_verb::MIGRATION_REQUEST, + std::move(id), options); } void messaging_service::register_mutation(std::function (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector forward, diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 46b657f22e..74f6a45ecc 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -153,6 +153,10 @@ namespace netw { struct serializer {}; +struct schema_pull_options { + bool remote_supports_canonical_mutation_retval = true; +}; + class messaging_service : public seastar::async_sharded_service { public: struct rpc_protocol_wrapper; @@ -380,14 +384,17 @@ public: future<> send_gossip_digest_ack2(msg_addr id, gms::gossip_digest_ack2 msg); // Wrapper for DEFINITIONS_UPDATE - void register_definitions_update(std::function fm)>&& func); + void register_definitions_update(std::function fm, + rpc::optional> cm)>&& func); void unregister_definitions_update(); - future<> send_definitions_update(msg_addr id, std::vector fm); + future<> send_definitions_update(msg_addr id, std::vector fm, std::vector cm); // Wrapper for MIGRATION_REQUEST - void register_migration_request(std::function> (const rpc::client_info&)>&& func); + void register_migration_request(std::function, std::vector> ( + const rpc::client_info&, rpc::optional)>&& func); void unregister_migration_request(); - future> send_migration_request(msg_addr id); + future, rpc::optional>> send_migration_request(msg_addr id, + schema_pull_options options); // FIXME: response_id_type is an alias in service::storage_proxy::response_id_type using response_id_type = uint64_t; diff --git a/service/migration_manager.cc b/service/migration_manager.cc index 60499ac455..1ad3991312 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -95,12 +95,20 @@ void migration_manager::init_messaging_service() _feature_listeners.push_back(ss.cluster_supports_digest_insensitive_to_expiry().when_enabled(update_schema)); auto& ms = netw::get_local_messaging_service(); - ms.register_definitions_update([this] (const rpc::client_info& cinfo, std::vector m) { + ms.register_definitions_update([this] (const rpc::client_info& cinfo, std::vector fm, rpc::optional> cm) { auto src = netw::messaging_service::get_source(cinfo); + auto f = make_ready_future<>(); + if (cm) { + f = do_with(std::move(*cm), get_local_shared_storage_proxy(), [src] (const std::vector& mutations, shared_ptr& p) { + return service::get_local_migration_manager().merge_schema_from(src, mutations); + }); + } else { + f = do_with(std::move(fm), get_local_shared_storage_proxy(), [src] (const std::vector& mutations, shared_ptr& p) { + return service::get_local_migration_manager().merge_schema_from(src, mutations); + }); + } // Start a new fiber. - (void)do_with(std::move(m), get_local_shared_storage_proxy(), [src] (const std::vector& mutations, shared_ptr& p) { - return service::get_local_migration_manager().merge_schema_from(src, mutations); - }).then_wrapped([src] (auto&& f) { + (void)f.then_wrapped([src] (auto&& f) { if (f.failed()) { mlogger.error("Failed to update definitions from {}: {}", src, f.get_exception()); } else { @@ -109,19 +117,27 @@ void migration_manager::init_messaging_service() }); return netw::messaging_service::no_wait(); }); - ms.register_migration_request([this] (const rpc::client_info& cinfo) { + ms.register_migration_request([this] (const rpc::client_info& cinfo, rpc::optional options) { + using frozen_mutations = std::vector; + using canonical_mutations = std::vector; + const auto cm_retval_supported = options && options->remote_supports_canonical_mutation_retval; + auto src = netw::messaging_service::get_source(cinfo); if (!has_compatible_schema_tables_version(src.addr)) { mlogger.debug("Ignoring schema request from incompatible node: {}", src); - return make_ready_future>(std::vector()); + return make_ready_future(frozen_mutations{}, canonical_mutations{}); } auto features = get_local_storage_service().cluster_schema_features(); auto& proxy = get_storage_proxy(); - return db::schema_tables::convert_schema_to_mutations(proxy, features).then([&proxy] (std::vector&& schema_mutations) { + return db::schema_tables::convert_schema_to_mutations(proxy, features).then([&proxy, cm_retval_supported] (std::vector&& cm) { const auto& db = proxy.local().get_db().local(); - return boost::copy_range>(schema_mutations | boost::adaptors::transformed([&db] (const canonical_mutation& cm) { + if (cm_retval_supported) { + return make_ready_future(frozen_mutations{}, std::move(cm)); + } + auto fm = boost::copy_range>(cm | boost::adaptors::transformed([&db] (const canonical_mutation& cm) { return cm.to_mutation(db.find_column_family(cm.column_family_id()).schema()); })); + return make_ready_future(std::move(fm), std::move(cm)); }).finally([p = get_local_shared_storage_proxy()] { // keep local proxy alive }); @@ -254,7 +270,13 @@ future<> migration_manager::do_merge_schema_from(netw::messaging_service::msg_ad { auto& ms = netw::get_local_messaging_service(); mlogger.info("Pulling schema from {}", id); - return ms.send_migration_request(std::move(id)).then([this, id] (std::vector mutations) { + return ms.send_migration_request(std::move(id), netw::schema_pull_options{}).then([this, id] (std::vector mutations, + rpc::optional> canonical_mutations) { + if (canonical_mutations) { + return do_with(std::move(*canonical_mutations), [this, id] (std::vector& mutations) { + return this->merge_schema_from(id, mutations); + }); + } return do_with(std::move(mutations), [this, id] (auto&& mutations) { return this->merge_schema_from(id, mutations); }); @@ -880,7 +902,8 @@ future<> migration_manager::push_schema_mutation(const gms::inet_address& endpoi { netw::messaging_service::msg_addr id{endpoint, 0}; auto fm = std::vector(schema.begin(), schema.end()); - return netw::get_local_messaging_service().send_definitions_update(id, std::move(fm)); + auto cm = std::vector(schema.begin(), schema.end()); + return netw::get_local_messaging_service().send_definitions_update(id, std::move(fm), std::move(cm)); } // Returns a future on the local application of the schema