From a2e255833a6aefa86fa8329fd19303bf914c12ac Mon Sep 17 00:00:00 2001 From: Gleb Natapov' via ScyllaDB development Date: Thu, 12 Jan 2023 12:35:42 +0200 Subject: [PATCH] lwt: upgrade stored mutations to the latest schema during prepare Currently they are upgraded during learn on a replica. The are two problems with this. First the column mapping may not exist on a replica if it missed this particular schema (because it was down for instance) and the mapping history is not part of the schema. In this case "Failed to look up column mapping for schema version" will be thrown. Second lwt request coordinator may not have the schema for the mutation as well (because it was freed from the registry already) and when a replica tries to retrieve the schema from the coordinator the retrieval will fail causing the whole request to fail with "Schema version XXXX not found" Both of those problems can be fixed by upgrading stored mutations during prepare on a node it is stored at. To upgrade the mutation its column mapping is needed and it is guarantied that it will be present at the node the mutation is stored at since it is pre-request to store it that the corresponded schema is available. After that the mutation is processed using latest schema that will be available on all nodes. Fixes #10770 Message-Id: (cherry picked from commit 15ebd5907177ba8709bc40ad2fa9ae773a9cf674) --- service/paxos/paxos_state.cc | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc index 54e234f446..ae1cd8d665 100644 --- a/service/paxos/paxos_state.cc +++ b/service/paxos/paxos_state.cc @@ -78,7 +78,7 @@ future paxos_state::prepare(storage_proxy& sp, tracing::trace_ prv, tr_state, timeout); }); }); - return when_all(std::move(f1), std::move(f2)).then([state = std::move(state), only_digest] (auto t) { + return when_all(std::move(f1), std::move(f2)).then([state = std::move(state), only_digest, schema] (auto t) mutable { if (utils::get_local_injector().enter("paxos_error_after_save_promise")) { return make_exception_future(utils::injected_error("injected_error_after_save_promise")); } @@ -103,8 +103,25 @@ future paxos_state::prepare(storage_proxy& sp, tracing::trace_ auto ex = f2.get_exception(); logger.debug("Failed to get data or digest: {}. Ignored.", std::move(ex)); } - return make_ready_future(prepare_response(promise(std::move(state._accepted_proposal), - std::move(state._most_recent_commit), std::move(data_or_digest)))); + auto upgrade_if_needed = [schema = std::move(schema)] (std::optional p) mutable { + if (!p || p->update.schema_version() == schema->version()) { + return make_ready_future>(std::move(p)); + } + // In case current schema is not the same as the schema in the proposal + // try to look it up first in the local schema_registry cache and upgrade + // the mutation using schema from the cache. + // + // If there's no schema in the cache, then retrieve persisted column mapping + // for that version and upgrade the mutation with it. + logger.debug("Stored mutation references outdated schema version. " + "Trying to upgrade the accepted proposal mutation to the most recent schema version."); + return service::get_column_mapping(p->update.column_family_id(), p->update.schema_version()).then([schema = std::move(schema), p = std::move(p)] (const column_mapping& cm) { + return make_ready_future>(proposal(p->ballot, freeze(p->update.unfreeze_upgrading(schema, cm)))); + }); + }; + return when_all_succeed(upgrade_if_needed(std::move(state._accepted_proposal)), upgrade_if_needed(std::move(state._most_recent_commit))).then([data_or_digest = std::move(data_or_digest)] (auto&& u) mutable { + return prepare_response(promise(std::move(std::get<0>(u)), std::move(std::get<1>(u)), std::move(data_or_digest))); + }); }); } else { logger.debug("Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot); @@ -200,15 +217,9 @@ future<> paxos_state::learn(storage_proxy& sp, schema_ptr schema, proposal decis // If there's no schema in the cache, then retrieve persisted column mapping // for that version and upgrade the mutation with it. if (decision.update.schema_version() != schema->version()) { - logger.debug("Stored mutation references outdated schema version. " - "Trying to upgrade the accepted proposal mutation to the most recent schema version."); - return service::get_column_mapping(decision.update.column_family_id(), decision.update.schema_version()) - .then([&sp, schema, tr_state, timeout, &decision] (const column_mapping& cm) { - return do_with(decision.update.unfreeze_upgrading(schema, cm), [&sp, tr_state, timeout] (const mutation& upgraded) { - return sp.mutate_locally(upgraded, tr_state, db::commitlog::force_sync::yes, timeout); - }); - }); + on_internal_error(logger, format("schema version in learn does not match current schema")); } + return sp.mutate_locally(schema, decision.update, tr_state, db::commitlog::force_sync::yes, timeout); }); } else {