mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-05 06:23:03 +00:00
Merge "Use canonical mutations for background schema sync" from Botond
Currently the background schema sync (push/pull) uses frozen mutation to send the schema mutations over the wire to the remote node. For this to work correctly, both nodes have to have the exact same schema for the system schema tables, as attempting to unpack the frozen mutation with the wrong schema leads to undefined behaviour. To avoid this and to ensure syncing schema between nodes with different schema table schema versions is defined we migrate the background schema sync to use canonical mutations for the transfer of the schema mutations. Canonical mutations are immune to this problem, as they support deserializing with any version of the schema, older or newer one. The foreground schema sync mechanisms -- the on-demand schema pulls on reads and writes -- already use canonical mutations to transmit the schema mutations. It is important to note that due to this change, column-level incompatibilities between the schema mutations and the schema used to deserialize them will be hidden. This is undesired and should be fixed in a follow-up (#4956). Table level incompatibilities are detected and schema mutations containing such mutations will be rejected just like before. This patch adds canonical mutation support to the two background schema sync verbs: * `DEFINITIONS_UPDATE` (schema push) * `MIGRATION_REQUEST` (schema pull) Both verbs still support the old frozen mutation schema transfer, albeit that path is now much less efficient. After all nodes are upgraded, the pull verb can effectively avoid sending frozen mutations altogether, completely migrating to canonical mutations. Unfortunately this was not possible for the push verb, so that one now has an overhead as it needs to send both the frozen and canonical mutations. Fixes: #4273
This commit is contained in:
@@ -797,6 +797,7 @@ idls = ['idl/gossip_digest.idl.hh',
|
||||
'idl/consistency_level.idl.hh',
|
||||
'idl/cache_temperature.idl.hh',
|
||||
'idl/view.idl.hh',
|
||||
'idl/messaging_service.idl.hh',
|
||||
]
|
||||
|
||||
headers = find_headers('.', excluded_dirs=['idl', 'build', 'seastar', '.git'])
|
||||
|
||||
@@ -642,19 +642,19 @@ future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>&
|
||||
});
|
||||
}
|
||||
|
||||
future<std::vector<frozen_mutation>> convert_schema_to_mutations(distributed<service::storage_proxy>& proxy, schema_features features)
|
||||
future<std::vector<canonical_mutation>> convert_schema_to_mutations(distributed<service::storage_proxy>& proxy, schema_features features)
|
||||
{
|
||||
auto map = [&proxy] (sstring table) {
|
||||
return db::system_keyspace::query_mutations(proxy, NAME, table).then([&proxy, table] (auto rs) {
|
||||
auto s = proxy.local().get_db().local().find_schema(NAME, table);
|
||||
std::vector<frozen_mutation> results;
|
||||
std::vector<canonical_mutation> results;
|
||||
for (auto&& p : rs->partitions()) {
|
||||
auto mut = p.mut().unfreeze(s);
|
||||
auto partition_key = value_cast<sstring>(utf8_type->deserialize(mut.key().get_component(*s, 0)));
|
||||
if (is_system_keyspace(partition_key)) {
|
||||
continue;
|
||||
}
|
||||
results.emplace_back(std::move(p.mut()));
|
||||
results.emplace_back(mut);
|
||||
}
|
||||
return results;
|
||||
});
|
||||
@@ -663,7 +663,7 @@ future<std::vector<frozen_mutation>> convert_schema_to_mutations(distributed<ser
|
||||
std::move(mutations.begin(), mutations.end(), std::back_inserter(result));
|
||||
return std::move(result);
|
||||
};
|
||||
return map_reduce(all_table_names(features), map, std::vector<frozen_mutation>{}, reduce);
|
||||
return map_reduce(all_table_names(features), map, std::vector<canonical_mutation>{}, reduce);
|
||||
}
|
||||
|
||||
future<schema_result>
|
||||
|
||||
@@ -152,7 +152,7 @@ future<> save_system_keyspace_schema();
|
||||
|
||||
future<utils::UUID> calculate_schema_digest(distributed<service::storage_proxy>& proxy, schema_features);
|
||||
|
||||
future<std::vector<frozen_mutation>> convert_schema_to_mutations(distributed<service::storage_proxy>& proxy, schema_features);
|
||||
future<std::vector<canonical_mutation>> convert_schema_to_mutations(distributed<service::storage_proxy>& proxy, schema_features);
|
||||
|
||||
future<schema_result_value_type>
|
||||
read_schema_partition_for_keyspace(distributed<service::storage_proxy>& proxy, const sstring& schema_table_name, const sstring& keyspace_name);
|
||||
|
||||
28
idl/messaging_service.idl.hh
Normal file
28
idl/messaging_service.idl.hh
Normal file
@@ -0,0 +1,28 @@
|
||||
/*
|
||||
* Copyright 2019 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
namespace netw {
|
||||
|
||||
struct schema_pull_options {
|
||||
bool remote_supports_canonical_mutation_retval;
|
||||
};
|
||||
|
||||
} // namespace netw
|
||||
@@ -60,6 +60,7 @@
|
||||
#include "idl/cache_temperature.dist.hh"
|
||||
#include "idl/view.dist.hh"
|
||||
#include "idl/mutation.dist.hh"
|
||||
#include "idl/messaging_service.dist.hh"
|
||||
#include "serializer_impl.hh"
|
||||
#include "serialization_visitors.hh"
|
||||
#include "idl/consistency_level.dist.impl.hh"
|
||||
@@ -80,6 +81,7 @@
|
||||
#include "idl/query.dist.impl.hh"
|
||||
#include "idl/cache_temperature.dist.impl.hh"
|
||||
#include "idl/mutation.dist.impl.hh"
|
||||
#include "idl/messaging_service.dist.impl.hh"
|
||||
#include <seastar/rpc/lz4_compressor.hh>
|
||||
#include <seastar/rpc/lz4_fragmented_compressor.hh>
|
||||
#include <seastar/rpc/multi_algo_compressor_factory.hh>
|
||||
@@ -948,24 +950,28 @@ future<> messaging_service::send_gossip_digest_ack2(msg_addr id, gossip_digest_a
|
||||
return send_message_oneway(this, messaging_verb::GOSSIP_DIGEST_ACK2, std::move(id), std::move(msg));
|
||||
}
|
||||
|
||||
void messaging_service::register_definitions_update(std::function<rpc::no_wait_type (const rpc::client_info& cinfo, std::vector<frozen_mutation> fm)>&& func) {
|
||||
void messaging_service::register_definitions_update(std::function<rpc::no_wait_type (const rpc::client_info& cinfo, std::vector<frozen_mutation> fm,
|
||||
rpc::optional<std::vector<canonical_mutation>> cm)>&& func) {
|
||||
register_handler(this, netw::messaging_verb::DEFINITIONS_UPDATE, std::move(func));
|
||||
}
|
||||
void messaging_service::unregister_definitions_update() {
|
||||
_rpc->unregister_handler(netw::messaging_verb::DEFINITIONS_UPDATE);
|
||||
}
|
||||
future<> messaging_service::send_definitions_update(msg_addr id, std::vector<frozen_mutation> fm) {
|
||||
return send_message_oneway(this, messaging_verb::DEFINITIONS_UPDATE, std::move(id), std::move(fm));
|
||||
future<> messaging_service::send_definitions_update(msg_addr id, std::vector<frozen_mutation> fm, std::vector<canonical_mutation> cm) {
|
||||
return send_message_oneway(this, messaging_verb::DEFINITIONS_UPDATE, std::move(id), std::move(fm), std::move(cm));
|
||||
}
|
||||
|
||||
void messaging_service::register_migration_request(std::function<future<std::vector<frozen_mutation>> (const rpc::client_info&)>&& func) {
|
||||
void messaging_service::register_migration_request(std::function<future<std::vector<frozen_mutation>, std::vector<canonical_mutation>>
|
||||
(const rpc::client_info&, rpc::optional<schema_pull_options>)>&& func) {
|
||||
register_handler(this, netw::messaging_verb::MIGRATION_REQUEST, std::move(func));
|
||||
}
|
||||
void messaging_service::unregister_migration_request() {
|
||||
_rpc->unregister_handler(netw::messaging_verb::MIGRATION_REQUEST);
|
||||
}
|
||||
future<std::vector<frozen_mutation>> messaging_service::send_migration_request(msg_addr id) {
|
||||
return send_message<std::vector<frozen_mutation>>(this, messaging_verb::MIGRATION_REQUEST, std::move(id));
|
||||
future<std::vector<frozen_mutation>, rpc::optional<std::vector<canonical_mutation>>> messaging_service::send_migration_request(msg_addr id,
|
||||
schema_pull_options options) {
|
||||
return send_message<future<std::vector<frozen_mutation>, rpc::optional<std::vector<canonical_mutation>>>>(this, messaging_verb::MIGRATION_REQUEST,
|
||||
std::move(id), options);
|
||||
}
|
||||
|
||||
void messaging_service::register_mutation(std::function<future<rpc::no_wait_type> (const rpc::client_info&, rpc::opt_time_point, frozen_mutation fm, std::vector<inet_address> forward,
|
||||
|
||||
@@ -153,6 +153,10 @@ namespace netw {
|
||||
|
||||
struct serializer {};
|
||||
|
||||
struct schema_pull_options {
|
||||
bool remote_supports_canonical_mutation_retval = true;
|
||||
};
|
||||
|
||||
class messaging_service : public seastar::async_sharded_service<messaging_service> {
|
||||
public:
|
||||
struct rpc_protocol_wrapper;
|
||||
@@ -380,14 +384,17 @@ public:
|
||||
future<> send_gossip_digest_ack2(msg_addr id, gms::gossip_digest_ack2 msg);
|
||||
|
||||
// Wrapper for DEFINITIONS_UPDATE
|
||||
void register_definitions_update(std::function<rpc::no_wait_type (const rpc::client_info& cinfo, std::vector<frozen_mutation> fm)>&& func);
|
||||
void register_definitions_update(std::function<rpc::no_wait_type (const rpc::client_info& cinfo, std::vector<frozen_mutation> fm,
|
||||
rpc::optional<std::vector<canonical_mutation>> cm)>&& func);
|
||||
void unregister_definitions_update();
|
||||
future<> send_definitions_update(msg_addr id, std::vector<frozen_mutation> fm);
|
||||
future<> send_definitions_update(msg_addr id, std::vector<frozen_mutation> fm, std::vector<canonical_mutation> cm);
|
||||
|
||||
// Wrapper for MIGRATION_REQUEST
|
||||
void register_migration_request(std::function<future<std::vector<frozen_mutation>> (const rpc::client_info&)>&& func);
|
||||
void register_migration_request(std::function<future<std::vector<frozen_mutation>, std::vector<canonical_mutation>> (
|
||||
const rpc::client_info&, rpc::optional<schema_pull_options>)>&& func);
|
||||
void unregister_migration_request();
|
||||
future<std::vector<frozen_mutation>> send_migration_request(msg_addr id);
|
||||
future<std::vector<frozen_mutation>, rpc::optional<std::vector<canonical_mutation>>> send_migration_request(msg_addr id,
|
||||
schema_pull_options options);
|
||||
|
||||
// FIXME: response_id_type is an alias in service::storage_proxy::response_id_type
|
||||
using response_id_type = uint64_t;
|
||||
|
||||
@@ -95,12 +95,20 @@ void migration_manager::init_messaging_service()
|
||||
_feature_listeners.push_back(ss.cluster_supports_digest_insensitive_to_expiry().when_enabled(update_schema));
|
||||
|
||||
auto& ms = netw::get_local_messaging_service();
|
||||
ms.register_definitions_update([this] (const rpc::client_info& cinfo, std::vector<frozen_mutation> m) {
|
||||
ms.register_definitions_update([this] (const rpc::client_info& cinfo, std::vector<frozen_mutation> fm, rpc::optional<std::vector<canonical_mutation>> cm) {
|
||||
auto src = netw::messaging_service::get_source(cinfo);
|
||||
auto f = make_ready_future<>();
|
||||
if (cm) {
|
||||
f = do_with(std::move(*cm), get_local_shared_storage_proxy(), [src] (const std::vector<canonical_mutation>& mutations, shared_ptr<storage_proxy>& p) {
|
||||
return service::get_local_migration_manager().merge_schema_from(src, mutations);
|
||||
});
|
||||
} else {
|
||||
f = do_with(std::move(fm), get_local_shared_storage_proxy(), [src] (const std::vector<frozen_mutation>& mutations, shared_ptr<storage_proxy>& p) {
|
||||
return service::get_local_migration_manager().merge_schema_from(src, mutations);
|
||||
});
|
||||
}
|
||||
// Start a new fiber.
|
||||
(void)do_with(std::move(m), get_local_shared_storage_proxy(), [src] (const std::vector<frozen_mutation>& mutations, shared_ptr<storage_proxy>& p) {
|
||||
return service::get_local_migration_manager().merge_schema_from(src, mutations);
|
||||
}).then_wrapped([src] (auto&& f) {
|
||||
(void)f.then_wrapped([src] (auto&& f) {
|
||||
if (f.failed()) {
|
||||
mlogger.error("Failed to update definitions from {}: {}", src, f.get_exception());
|
||||
} else {
|
||||
@@ -109,14 +117,28 @@ void migration_manager::init_messaging_service()
|
||||
});
|
||||
return netw::messaging_service::no_wait();
|
||||
});
|
||||
ms.register_migration_request([this] (const rpc::client_info& cinfo) {
|
||||
ms.register_migration_request([this] (const rpc::client_info& cinfo, rpc::optional<netw::schema_pull_options> options) {
|
||||
using frozen_mutations = std::vector<frozen_mutation>;
|
||||
using canonical_mutations = std::vector<canonical_mutation>;
|
||||
const auto cm_retval_supported = options && options->remote_supports_canonical_mutation_retval;
|
||||
|
||||
auto src = netw::messaging_service::get_source(cinfo);
|
||||
if (!has_compatible_schema_tables_version(src.addr)) {
|
||||
mlogger.debug("Ignoring schema request from incompatible node: {}", src);
|
||||
return make_ready_future<std::vector<frozen_mutation>>(std::vector<frozen_mutation>());
|
||||
return make_ready_future<frozen_mutations, canonical_mutations>(frozen_mutations{}, canonical_mutations{});
|
||||
}
|
||||
auto features = get_local_storage_service().cluster_schema_features();
|
||||
return db::schema_tables::convert_schema_to_mutations(get_storage_proxy(), features).finally([p = get_local_shared_storage_proxy()] {
|
||||
auto& proxy = get_storage_proxy();
|
||||
return db::schema_tables::convert_schema_to_mutations(proxy, features).then([&proxy, cm_retval_supported] (std::vector<canonical_mutation>&& cm) {
|
||||
const auto& db = proxy.local().get_db().local();
|
||||
if (cm_retval_supported) {
|
||||
return make_ready_future<frozen_mutations, canonical_mutations>(frozen_mutations{}, std::move(cm));
|
||||
}
|
||||
auto fm = boost::copy_range<std::vector<frozen_mutation>>(cm | boost::adaptors::transformed([&db] (const canonical_mutation& cm) {
|
||||
return cm.to_mutation(db.find_column_family(cm.column_family_id()).schema());
|
||||
}));
|
||||
return make_ready_future<frozen_mutations, canonical_mutations>(std::move(fm), std::move(cm));
|
||||
}).finally([p = get_local_shared_storage_proxy()] {
|
||||
// keep local proxy alive
|
||||
});
|
||||
});
|
||||
@@ -248,7 +270,13 @@ future<> migration_manager::do_merge_schema_from(netw::messaging_service::msg_ad
|
||||
{
|
||||
auto& ms = netw::get_local_messaging_service();
|
||||
mlogger.info("Pulling schema from {}", id);
|
||||
return ms.send_migration_request(std::move(id)).then([this, id] (std::vector<frozen_mutation> mutations) {
|
||||
return ms.send_migration_request(std::move(id), netw::schema_pull_options{}).then([this, id] (std::vector<frozen_mutation> mutations,
|
||||
rpc::optional<std::vector<canonical_mutation>> canonical_mutations) {
|
||||
if (canonical_mutations) {
|
||||
return do_with(std::move(*canonical_mutations), [this, id] (std::vector<canonical_mutation>& mutations) {
|
||||
return this->merge_schema_from(id, mutations);
|
||||
});
|
||||
}
|
||||
return do_with(std::move(mutations), [this, id] (auto&& mutations) {
|
||||
return this->merge_schema_from(id, mutations);
|
||||
});
|
||||
@@ -272,6 +300,26 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr
|
||||
return i->second.trigger();
|
||||
}
|
||||
|
||||
future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr src, const std::vector<canonical_mutation>& canonical_mutations) {
|
||||
mlogger.debug("Applying schema mutations from {}", src);
|
||||
auto& proxy = service::get_storage_proxy();
|
||||
const auto& db = proxy.local().get_db().local();
|
||||
|
||||
std::vector<mutation> mutations;
|
||||
mutations.reserve(canonical_mutations.size());
|
||||
try {
|
||||
for (const auto& cm : canonical_mutations) {
|
||||
auto& tbl = db.find_column_family(cm.column_family_id());
|
||||
mutations.emplace_back(cm.to_mutation(tbl.schema()));
|
||||
}
|
||||
} catch (no_such_column_family& e) {
|
||||
mlogger.error("Error while applying schema mutations from {}: {}", src, e);
|
||||
return make_exception_future<>(std::make_exception_ptr<std::runtime_error>(
|
||||
std::runtime_error(fmt::format("Error while applying schema mutations: {}", e))));
|
||||
}
|
||||
return db::schema_tables::merge_schema(service::get_local_storage_service(), proxy, std::move(mutations));
|
||||
}
|
||||
|
||||
future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr src, const std::vector<frozen_mutation>& mutations)
|
||||
{
|
||||
mlogger.debug("Applying schema mutations from {}", src);
|
||||
@@ -854,7 +902,8 @@ future<> migration_manager::push_schema_mutation(const gms::inet_address& endpoi
|
||||
{
|
||||
netw::messaging_service::msg_addr id{endpoint, 0};
|
||||
auto fm = std::vector<frozen_mutation>(schema.begin(), schema.end());
|
||||
return netw::get_local_messaging_service().send_definitions_update(id, std::move(fm));
|
||||
auto cm = std::vector<canonical_mutation>(schema.begin(), schema.end());
|
||||
return netw::get_local_messaging_service().send_definitions_update(id, std::move(fm), std::move(cm));
|
||||
}
|
||||
|
||||
// Returns a future on the local application of the schema
|
||||
|
||||
@@ -86,6 +86,8 @@ public:
|
||||
|
||||
// Merge mutations received from src.
|
||||
// Keep mutations alive around whole async operation.
|
||||
future<> merge_schema_from(netw::msg_addr src, const std::vector<canonical_mutation>& mutations);
|
||||
// Deprecated. The canonical mutation should be used instead.
|
||||
future<> merge_schema_from(netw::msg_addr src, const std::vector<frozen_mutation>& mutations);
|
||||
|
||||
future<> notify_create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
|
||||
|
||||
Reference in New Issue
Block a user