diff --git a/configure.py b/configure.py index 8eba6a728f..ece836c33c 100755 --- a/configure.py +++ b/configure.py @@ -797,6 +797,7 @@ idls = ['idl/gossip_digest.idl.hh', 'idl/consistency_level.idl.hh', 'idl/cache_temperature.idl.hh', 'idl/view.idl.hh', + 'idl/messaging_service.idl.hh', ] headers = find_headers('.', excluded_dirs=['idl', 'build', 'seastar', '.git']) diff --git a/db/schema_tables.cc b/db/schema_tables.cc index 8dc3da15ed..3cb3a25555 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -642,19 +642,19 @@ future calculate_schema_digest(distributed& }); } -future> convert_schema_to_mutations(distributed& proxy, schema_features features) +future> convert_schema_to_mutations(distributed& proxy, schema_features features) { auto map = [&proxy] (sstring table) { return db::system_keyspace::query_mutations(proxy, NAME, table).then([&proxy, table] (auto rs) { auto s = proxy.local().get_db().local().find_schema(NAME, table); - std::vector results; + std::vector results; for (auto&& p : rs->partitions()) { auto mut = p.mut().unfreeze(s); auto partition_key = value_cast(utf8_type->deserialize(mut.key().get_component(*s, 0))); if (is_system_keyspace(partition_key)) { continue; } - results.emplace_back(std::move(p.mut())); + results.emplace_back(mut); } return results; }); @@ -663,7 +663,7 @@ future> convert_schema_to_mutations(distributed{}, reduce); + return map_reduce(all_table_names(features), map, std::vector{}, reduce); } future diff --git a/db/schema_tables.hh b/db/schema_tables.hh index 640ad8f5aa..7d6ea79d19 100644 --- a/db/schema_tables.hh +++ b/db/schema_tables.hh @@ -152,7 +152,7 @@ future<> save_system_keyspace_schema(); future calculate_schema_digest(distributed& proxy, schema_features); -future> convert_schema_to_mutations(distributed& proxy, schema_features); +future> convert_schema_to_mutations(distributed& proxy, schema_features); future read_schema_partition_for_keyspace(distributed& proxy, const sstring& schema_table_name, const sstring& keyspace_name); diff --git a/idl/messaging_service.idl.hh b/idl/messaging_service.idl.hh new file mode 100644 index 0000000000..fcd8d82094 --- /dev/null +++ b/idl/messaging_service.idl.hh @@ -0,0 +1,28 @@ +/* + * Copyright 2019 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +namespace netw { + +struct schema_pull_options { + bool remote_supports_canonical_mutation_retval; +}; + +} // namespace netw diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 7421e1ea00..1c3b02e737 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -60,6 +60,7 @@ #include "idl/cache_temperature.dist.hh" #include "idl/view.dist.hh" #include "idl/mutation.dist.hh" +#include "idl/messaging_service.dist.hh" #include "serializer_impl.hh" #include "serialization_visitors.hh" #include "idl/consistency_level.dist.impl.hh" @@ -80,6 +81,7 @@ #include "idl/query.dist.impl.hh" #include "idl/cache_temperature.dist.impl.hh" #include "idl/mutation.dist.impl.hh" +#include "idl/messaging_service.dist.impl.hh" #include #include #include @@ -948,24 +950,28 @@ future<> messaging_service::send_gossip_digest_ack2(msg_addr id, gossip_digest_a return send_message_oneway(this, messaging_verb::GOSSIP_DIGEST_ACK2, std::move(id), std::move(msg)); } -void messaging_service::register_definitions_update(std::function fm)>&& func) { +void messaging_service::register_definitions_update(std::function fm, + rpc::optional> cm)>&& func) { register_handler(this, netw::messaging_verb::DEFINITIONS_UPDATE, std::move(func)); } void messaging_service::unregister_definitions_update() { _rpc->unregister_handler(netw::messaging_verb::DEFINITIONS_UPDATE); } -future<> messaging_service::send_definitions_update(msg_addr id, std::vector fm) { - return send_message_oneway(this, messaging_verb::DEFINITIONS_UPDATE, std::move(id), std::move(fm)); +future<> messaging_service::send_definitions_update(msg_addr id, std::vector fm, std::vector cm) { + return send_message_oneway(this, messaging_verb::DEFINITIONS_UPDATE, std::move(id), std::move(fm), std::move(cm)); } -void messaging_service::register_migration_request(std::function> (const rpc::client_info&)>&& func) { +void messaging_service::register_migration_request(std::function, std::vector> + (const rpc::client_info&, rpc::optional)>&& func) { register_handler(this, netw::messaging_verb::MIGRATION_REQUEST, std::move(func)); } void messaging_service::unregister_migration_request() { _rpc->unregister_handler(netw::messaging_verb::MIGRATION_REQUEST); } -future> messaging_service::send_migration_request(msg_addr id) { - return send_message>(this, messaging_verb::MIGRATION_REQUEST, std::move(id)); +future, rpc::optional>> messaging_service::send_migration_request(msg_addr id, + schema_pull_options options) { + return send_message, rpc::optional>>>(this, messaging_verb::MIGRATION_REQUEST, + std::move(id), options); } void messaging_service::register_mutation(std::function (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector forward, diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 46b657f22e..74f6a45ecc 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -153,6 +153,10 @@ namespace netw { struct serializer {}; +struct schema_pull_options { + bool remote_supports_canonical_mutation_retval = true; +}; + class messaging_service : public seastar::async_sharded_service { public: struct rpc_protocol_wrapper; @@ -380,14 +384,17 @@ public: future<> send_gossip_digest_ack2(msg_addr id, gms::gossip_digest_ack2 msg); // Wrapper for DEFINITIONS_UPDATE - void register_definitions_update(std::function fm)>&& func); + void register_definitions_update(std::function fm, + rpc::optional> cm)>&& func); void unregister_definitions_update(); - future<> send_definitions_update(msg_addr id, std::vector fm); + future<> send_definitions_update(msg_addr id, std::vector fm, std::vector cm); // Wrapper for MIGRATION_REQUEST - void register_migration_request(std::function> (const rpc::client_info&)>&& func); + void register_migration_request(std::function, std::vector> ( + const rpc::client_info&, rpc::optional)>&& func); void unregister_migration_request(); - future> send_migration_request(msg_addr id); + future, rpc::optional>> send_migration_request(msg_addr id, + schema_pull_options options); // FIXME: response_id_type is an alias in service::storage_proxy::response_id_type using response_id_type = uint64_t; diff --git a/service/migration_manager.cc b/service/migration_manager.cc index 8ed30eb531..1ad3991312 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -95,12 +95,20 @@ void migration_manager::init_messaging_service() _feature_listeners.push_back(ss.cluster_supports_digest_insensitive_to_expiry().when_enabled(update_schema)); auto& ms = netw::get_local_messaging_service(); - ms.register_definitions_update([this] (const rpc::client_info& cinfo, std::vector m) { + ms.register_definitions_update([this] (const rpc::client_info& cinfo, std::vector fm, rpc::optional> cm) { auto src = netw::messaging_service::get_source(cinfo); + auto f = make_ready_future<>(); + if (cm) { + f = do_with(std::move(*cm), get_local_shared_storage_proxy(), [src] (const std::vector& mutations, shared_ptr& p) { + return service::get_local_migration_manager().merge_schema_from(src, mutations); + }); + } else { + f = do_with(std::move(fm), get_local_shared_storage_proxy(), [src] (const std::vector& mutations, shared_ptr& p) { + return service::get_local_migration_manager().merge_schema_from(src, mutations); + }); + } // Start a new fiber. - (void)do_with(std::move(m), get_local_shared_storage_proxy(), [src] (const std::vector& mutations, shared_ptr& p) { - return service::get_local_migration_manager().merge_schema_from(src, mutations); - }).then_wrapped([src] (auto&& f) { + (void)f.then_wrapped([src] (auto&& f) { if (f.failed()) { mlogger.error("Failed to update definitions from {}: {}", src, f.get_exception()); } else { @@ -109,14 +117,28 @@ void migration_manager::init_messaging_service() }); return netw::messaging_service::no_wait(); }); - ms.register_migration_request([this] (const rpc::client_info& cinfo) { + ms.register_migration_request([this] (const rpc::client_info& cinfo, rpc::optional options) { + using frozen_mutations = std::vector; + using canonical_mutations = std::vector; + const auto cm_retval_supported = options && options->remote_supports_canonical_mutation_retval; + auto src = netw::messaging_service::get_source(cinfo); if (!has_compatible_schema_tables_version(src.addr)) { mlogger.debug("Ignoring schema request from incompatible node: {}", src); - return make_ready_future>(std::vector()); + return make_ready_future(frozen_mutations{}, canonical_mutations{}); } auto features = get_local_storage_service().cluster_schema_features(); - return db::schema_tables::convert_schema_to_mutations(get_storage_proxy(), features).finally([p = get_local_shared_storage_proxy()] { + auto& proxy = get_storage_proxy(); + return db::schema_tables::convert_schema_to_mutations(proxy, features).then([&proxy, cm_retval_supported] (std::vector&& cm) { + const auto& db = proxy.local().get_db().local(); + if (cm_retval_supported) { + return make_ready_future(frozen_mutations{}, std::move(cm)); + } + auto fm = boost::copy_range>(cm | boost::adaptors::transformed([&db] (const canonical_mutation& cm) { + return cm.to_mutation(db.find_column_family(cm.column_family_id()).schema()); + })); + return make_ready_future(std::move(fm), std::move(cm)); + }).finally([p = get_local_shared_storage_proxy()] { // keep local proxy alive }); }); @@ -248,7 +270,13 @@ future<> migration_manager::do_merge_schema_from(netw::messaging_service::msg_ad { auto& ms = netw::get_local_messaging_service(); mlogger.info("Pulling schema from {}", id); - return ms.send_migration_request(std::move(id)).then([this, id] (std::vector mutations) { + return ms.send_migration_request(std::move(id), netw::schema_pull_options{}).then([this, id] (std::vector mutations, + rpc::optional> canonical_mutations) { + if (canonical_mutations) { + return do_with(std::move(*canonical_mutations), [this, id] (std::vector& mutations) { + return this->merge_schema_from(id, mutations); + }); + } return do_with(std::move(mutations), [this, id] (auto&& mutations) { return this->merge_schema_from(id, mutations); }); @@ -272,6 +300,26 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr return i->second.trigger(); } +future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr src, const std::vector& canonical_mutations) { + mlogger.debug("Applying schema mutations from {}", src); + auto& proxy = service::get_storage_proxy(); + const auto& db = proxy.local().get_db().local(); + + std::vector mutations; + mutations.reserve(canonical_mutations.size()); + try { + for (const auto& cm : canonical_mutations) { + auto& tbl = db.find_column_family(cm.column_family_id()); + mutations.emplace_back(cm.to_mutation(tbl.schema())); + } + } catch (no_such_column_family& e) { + mlogger.error("Error while applying schema mutations from {}: {}", src, e); + return make_exception_future<>(std::make_exception_ptr( + std::runtime_error(fmt::format("Error while applying schema mutations: {}", e)))); + } + return db::schema_tables::merge_schema(service::get_local_storage_service(), proxy, std::move(mutations)); +} + future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr src, const std::vector& mutations) { mlogger.debug("Applying schema mutations from {}", src); @@ -854,7 +902,8 @@ future<> migration_manager::push_schema_mutation(const gms::inet_address& endpoi { netw::messaging_service::msg_addr id{endpoint, 0}; auto fm = std::vector(schema.begin(), schema.end()); - return netw::get_local_messaging_service().send_definitions_update(id, std::move(fm)); + auto cm = std::vector(schema.begin(), schema.end()); + return netw::get_local_messaging_service().send_definitions_update(id, std::move(fm), std::move(cm)); } // Returns a future on the local application of the schema diff --git a/service/migration_manager.hh b/service/migration_manager.hh index 484def3a8f..87cab54f3d 100644 --- a/service/migration_manager.hh +++ b/service/migration_manager.hh @@ -86,6 +86,8 @@ public: // Merge mutations received from src. // Keep mutations alive around whole async operation. + future<> merge_schema_from(netw::msg_addr src, const std::vector& mutations); + // Deprecated. The canonical mutation should be used instead. future<> merge_schema_from(netw::msg_addr src, const std::vector& mutations); future<> notify_create_keyspace(const lw_shared_ptr& ksm);