Files
scylladb/mutation/async_utils.cc
Benny Halevy c485ed6287 canonical_mutation: add to_mutation_gently
to_mutation_gently generates mutation from canonical_mutation
asynchronously using the newly introduced mutation_partition
accept_gently method.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
2024-05-02 19:27:54 +03:00

91 lines
3.5 KiB
C++

/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <seastar/core/coroutine.hh>
#include "mutation/async_utils.hh"
#include "mutation/mutation_partition_view.hh"
#include "mutation/partition_version.hh"
#include "partition_builder.hh"
#include "mutation/canonical_mutation.hh"
#include "converting_mutation_partition_applier.hh"
#include "mutation/mutation_partition_serializer.hh"
#include "idl/mutation.dist.impl.hh"
future<> apply_gently(mutation_partition& target, const schema& s, mutation_partition_view p,
const schema& p_schema, mutation_application_stats& app_stats) {
mutation_partition p2(target, mutation_partition::copy_comparators_only{});
partition_builder b(p_schema, p2);
co_await p.accept_gently(p_schema, b);
if (s.version() != p_schema.version()) {
p2.upgrade(p_schema, s);
}
apply_resume res;
while (target.apply_monotonically(s, std::move(p2), no_cache_tracker, app_stats, is_preemptible::yes, res) == stop_iteration::no) {
co_await yield();
}
}
future<> apply_gently(mutation_partition& target, const schema& s, const mutation_partition& p,
const schema& p_schema, mutation_application_stats& app_stats) {
mutation_partition p2(p_schema, p);
if (s.version() != p_schema.version()) {
p2.upgrade(p_schema, s);
}
apply_resume res;
while (target.apply_monotonically(s, std::move(p2), no_cache_tracker, app_stats, is_preemptible::yes, res) == stop_iteration::no) {
co_await yield();
}
}
future<> apply_gently(mutation_partition& target, const schema& s, mutation_partition&& p, mutation_application_stats& app_stats) {
apply_resume res;
while (target.apply_monotonically(s, std::move(p), no_cache_tracker, app_stats, is_preemptible::yes, res) == stop_iteration::no) {
co_await yield();
}
}
future<> apply_gently(mutation& target, mutation&& m) {
mutation_application_stats app_stats;
co_await apply_gently(target.partition(), *target.schema(), std::move(m.partition()), *m.schema(), app_stats);
}
future<> apply_gently(mutation& target, const mutation& m) {
auto m2 = m;
mutation_application_stats app_stats;
co_await apply_gently(target.partition(), *target.schema(), std::move(m2.partition()), *m.schema(), app_stats);
}
future<mutation> to_mutation_gently(const canonical_mutation& cm, schema_ptr s) {
auto in = ser::as_input_stream(cm.representation());
auto mv = ser::deserialize(in, boost::type<ser::canonical_mutation_view>());
auto cf_id = mv.table_id();
if (s->id() != cf_id) {
throw std::runtime_error(format("Attempted to deserialize canonical_mutation of table {} with schema of table {} ({}.{})",
cf_id, s->id(), s->ks_name(), s->cf_name()));
}
auto version = mv.schema_version();
auto pk = mv.key();
mutation m(std::move(s), std::move(pk));
if (version == m.schema()->version()) {
auto partition_view = mutation_partition_view::from_view(mv.partition());
mutation_application_stats app_stats;
co_await apply_gently(m.partition(), *m.schema(), partition_view, *m.schema(), app_stats);
} else {
column_mapping cm = mv.mapping();
converting_mutation_partition_applier v(cm, *m.schema(), m.partition());
auto partition_view = mutation_partition_view::from_view(mv.partition());
co_await partition_view.accept_gently(cm, v);
}
co_return m;
}