diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc index 54e234f446..ae1cd8d665 100644 --- a/service/paxos/paxos_state.cc +++ b/service/paxos/paxos_state.cc @@ -78,7 +78,7 @@ future paxos_state::prepare(storage_proxy& sp, tracing::trace_ prv, tr_state, timeout); }); }); - return when_all(std::move(f1), std::move(f2)).then([state = std::move(state), only_digest] (auto t) { + return when_all(std::move(f1), std::move(f2)).then([state = std::move(state), only_digest, schema] (auto t) mutable { if (utils::get_local_injector().enter("paxos_error_after_save_promise")) { return make_exception_future(utils::injected_error("injected_error_after_save_promise")); } @@ -103,8 +103,25 @@ future paxos_state::prepare(storage_proxy& sp, tracing::trace_ auto ex = f2.get_exception(); logger.debug("Failed to get data or digest: {}. Ignored.", std::move(ex)); } - return make_ready_future(prepare_response(promise(std::move(state._accepted_proposal), - std::move(state._most_recent_commit), std::move(data_or_digest)))); + auto upgrade_if_needed = [schema = std::move(schema)] (std::optional p) mutable { + if (!p || p->update.schema_version() == schema->version()) { + return make_ready_future>(std::move(p)); + } + // In case current schema is not the same as the schema in the proposal + // try to look it up first in the local schema_registry cache and upgrade + // the mutation using schema from the cache. + // + // If there's no schema in the cache, then retrieve persisted column mapping + // for that version and upgrade the mutation with it. + logger.debug("Stored mutation references outdated schema version. " + "Trying to upgrade the accepted proposal mutation to the most recent schema version."); + return service::get_column_mapping(p->update.column_family_id(), p->update.schema_version()).then([schema = std::move(schema), p = std::move(p)] (const column_mapping& cm) { + return make_ready_future>(proposal(p->ballot, freeze(p->update.unfreeze_upgrading(schema, cm)))); + }); + }; + return when_all_succeed(upgrade_if_needed(std::move(state._accepted_proposal)), upgrade_if_needed(std::move(state._most_recent_commit))).then([data_or_digest = std::move(data_or_digest)] (auto&& u) mutable { + return prepare_response(promise(std::move(std::get<0>(u)), std::move(std::get<1>(u)), std::move(data_or_digest))); + }); }); } else { logger.debug("Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot); @@ -200,15 +217,9 @@ future<> paxos_state::learn(storage_proxy& sp, schema_ptr schema, proposal decis // If there's no schema in the cache, then retrieve persisted column mapping // for that version and upgrade the mutation with it. if (decision.update.schema_version() != schema->version()) { - logger.debug("Stored mutation references outdated schema version. " - "Trying to upgrade the accepted proposal mutation to the most recent schema version."); - return service::get_column_mapping(decision.update.column_family_id(), decision.update.schema_version()) - .then([&sp, schema, tr_state, timeout, &decision] (const column_mapping& cm) { - return do_with(decision.update.unfreeze_upgrading(schema, cm), [&sp, tr_state, timeout] (const mutation& upgraded) { - return sp.mutate_locally(upgraded, tr_state, db::commitlog::force_sync::yes, timeout); - }); - }); + on_internal_error(logger, format("schema version in learn does not match current schema")); } + return sp.mutate_locally(schema, decision.update, tr_state, db::commitlog::force_sync::yes, timeout); }); } else {