lwt: upgrade stored mutations to the latest schema during prepare

Currently they are upgraded during learn on a replica. The are two
problems with this.  First the column mapping may not exist on a replica
if it missed this particular schema (because it was down for instance)
and the mapping history is not part of the schema. In this case "Failed
to look up column mapping for schema version" will be thrown. Second lwt
request coordinator may not have the schema for the mutation as well
(because it was freed from the registry already) and when a replica
tries to retrieve the schema from the coordinator the retrieval will fail
causing the whole request to fail with "Schema version XXXX not found"

Both of those problems can be fixed by upgrading stored mutations
during prepare on a node it is stored at. To upgrade the mutation its
column mapping is needed and it is guarantied that it will be present
at the node the mutation is stored at since it is pre-request to store
it that the corresponded schema is available. After that the mutation
is processed using latest schema that will be available on all nodes.

Fixes #10770

Message-Id: <Y7/ifraPJghCWTsq@scylladb.com>
(cherry picked from commit 15ebd59071)
This commit is contained in:
Gleb Natapov' via ScyllaDB development
2023-01-12 12:35:42 +02:00
committed by Avi Kivity
parent f4aa5cacb1
commit a2e255833a

View File

@@ -78,7 +78,7 @@ future<prepare_response> paxos_state::prepare(storage_proxy& sp, tracing::trace_
prv, tr_state, timeout);
});
});
return when_all(std::move(f1), std::move(f2)).then([state = std::move(state), only_digest] (auto t) {
return when_all(std::move(f1), std::move(f2)).then([state = std::move(state), only_digest, schema] (auto t) mutable {
if (utils::get_local_injector().enter("paxos_error_after_save_promise")) {
return make_exception_future<prepare_response>(utils::injected_error("injected_error_after_save_promise"));
}
@@ -103,8 +103,25 @@ future<prepare_response> paxos_state::prepare(storage_proxy& sp, tracing::trace_
auto ex = f2.get_exception();
logger.debug("Failed to get data or digest: {}. Ignored.", std::move(ex));
}
return make_ready_future<prepare_response>(prepare_response(promise(std::move(state._accepted_proposal),
std::move(state._most_recent_commit), std::move(data_or_digest))));
auto upgrade_if_needed = [schema = std::move(schema)] (std::optional<proposal> p) mutable {
if (!p || p->update.schema_version() == schema->version()) {
return make_ready_future<std::optional<proposal>>(std::move(p));
}
// In case current schema is not the same as the schema in the proposal
// try to look it up first in the local schema_registry cache and upgrade
// the mutation using schema from the cache.
//
// If there's no schema in the cache, then retrieve persisted column mapping
// for that version and upgrade the mutation with it.
logger.debug("Stored mutation references outdated schema version. "
"Trying to upgrade the accepted proposal mutation to the most recent schema version.");
return service::get_column_mapping(p->update.column_family_id(), p->update.schema_version()).then([schema = std::move(schema), p = std::move(p)] (const column_mapping& cm) {
return make_ready_future<std::optional<proposal>>(proposal(p->ballot, freeze(p->update.unfreeze_upgrading(schema, cm))));
});
};
return when_all_succeed(upgrade_if_needed(std::move(state._accepted_proposal)), upgrade_if_needed(std::move(state._most_recent_commit))).then([data_or_digest = std::move(data_or_digest)] (auto&& u) mutable {
return prepare_response(promise(std::move(std::get<0>(u)), std::move(std::get<1>(u)), std::move(data_or_digest)));
});
});
} else {
logger.debug("Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot);
@@ -200,15 +217,9 @@ future<> paxos_state::learn(storage_proxy& sp, schema_ptr schema, proposal decis
// If there's no schema in the cache, then retrieve persisted column mapping
// for that version and upgrade the mutation with it.
if (decision.update.schema_version() != schema->version()) {
logger.debug("Stored mutation references outdated schema version. "
"Trying to upgrade the accepted proposal mutation to the most recent schema version.");
return service::get_column_mapping(decision.update.column_family_id(), decision.update.schema_version())
.then([&sp, schema, tr_state, timeout, &decision] (const column_mapping& cm) {
return do_with(decision.update.unfreeze_upgrading(schema, cm), [&sp, tr_state, timeout] (const mutation& upgraded) {
return sp.mutate_locally(upgraded, tr_state, db::commitlog::force_sync::yes, timeout);
});
});
on_internal_error(logger, format("schema version in learn does not match current schema"));
}
return sp.mutate_locally(schema, decision.update, tr_state, db::commitlog::force_sync::yes, timeout);
});
} else {