mutation_partition: add apply_gently

To be used for freezing mutations or for building canonical
mutations gently, i.e. yielding periodically to avoid reactor stalls.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2024-04-16 20:23:14 +03:00
parent f625cd76a9
commit e1411f3911
4 changed files with 135 additions and 0 deletions

View File

@@ -826,6 +826,7 @@ scylla_core = (['message/messaging_service.cc',
'mutation/partition_version.cc',
'mutation/range_tombstone.cc',
'mutation/range_tombstone_list.cc',
'mutation/async_utils.cc',
'absl-flat_hash_map.cc',
'collection_mutation.cc',
'client_data.cc',

62
mutation/async_utils.cc Normal file
View File

@@ -0,0 +1,62 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <seastar/core/coroutine.hh>
#include "mutation/async_utils.hh"
#include "mutation/mutation_partition_view.hh"
#include "mutation/partition_version.hh"
#include "partition_builder.hh"
#include "mutation/canonical_mutation.hh"
#include "converting_mutation_partition_applier.hh"
#include "mutation/mutation_partition_serializer.hh"
#include "idl/mutation.dist.impl.hh"
// Rebuilds a mutation_partition from the serialized view `p` (yielding while
// consuming it), upgrades it to schema `s` when the versions differ, and then
// merges it into `target`, yielding between preempted application steps.
//
// Weak exception guarantees; `target` must not be owned by a cache_tracker.
future<> apply_gently(mutation_partition& target, const schema& s, mutation_partition_view p,
        const schema& p_schema, mutation_application_stats& app_stats) {
    mutation_partition staged(target, mutation_partition::copy_comparators_only{});
    partition_builder builder(p_schema, staged);
    co_await p.accept_gently(p_schema, builder);
    if (p_schema.version() != s.version()) {
        // NOTE(review): the upgrade itself runs synchronously — confirm that
        // is acceptable for very large partitions.
        staged.upgrade(p_schema, s);
    }
    apply_resume res;
    for (;;) {
        auto done = target.apply_monotonically(s, std::move(staged), no_cache_tracker, app_stats, is_preemptible::yes, res);
        if (done == stop_iteration::yes) {
            break;
        }
        co_await yield();
    }
}
// Merges `p` (described by `p_schema`) into `target` (described by `s`).
// Works on a private copy of `p`, upgrading the copy first when the schema
// versions differ, and yields between preempted application steps.
//
// Weak exception guarantees; `target` must not be owned by a cache_tracker.
future<> apply_gently(mutation_partition& target, const schema& s, const mutation_partition& p,
        const schema& p_schema, mutation_application_stats& app_stats) {
    mutation_partition staged(p_schema, p);
    if (p_schema.version() != s.version()) {
        staged.upgrade(p_schema, s);
    }
    apply_resume res;
    while (target.apply_monotonically(s, std::move(staged), no_cache_tracker, app_stats, is_preemptible::yes, res) != stop_iteration::yes) {
        co_await yield();
    }
}
// Moves `p` into `target`; both must use schema `s` (no upgrade is
// performed). Yields between preempted application steps; `res` lets
// apply_monotonically() resume where it left off after each preemption.
future<> apply_gently(mutation_partition& target, const schema& s, mutation_partition&& p, mutation_application_stats& app_stats) {
    apply_resume res;
    auto apply_step = [&] {
        return target.apply_monotonically(s, std::move(p), no_cache_tracker, app_stats, is_preemptible::yes, res);
    };
    while (apply_step() == stop_iteration::no) {
        co_await yield();
    }
}
// Gently merges the moved-in mutation `m` into `target`, yielding between
// preempted application steps.
//
// Note: the header declares no 5-argument rvalue overload, so passing
// `std::move(m.partition())` together with a p_schema binds the rvalue to the
// `const mutation_partition&` overload — silently copying the whole partition.
// When both mutations share the same schema instance we can use the 4-argument
// move-in overload instead and consume the partition in place, avoiding that
// copy (the upgrade step would be a no-op anyway since the versions match).
future<> apply_gently(mutation& target, mutation&& m) {
    mutation_application_stats app_stats;
    if (target.schema() == m.schema()) {
        co_await apply_gently(target.partition(), *target.schema(), std::move(m.partition()), app_stats);
    } else {
        // Differing schemas: fall back to the copying overload, which
        // upgrades its private copy before applying.
        co_await apply_gently(target.partition(), *target.schema(), m.partition(), *m.schema(), app_stats);
    }
}
// Gently merges `m` into `target` without consuming `m`, yielding between
// preempted application steps.
//
// The `const mutation_partition&` overload already works on its own private
// copy of `p`, so taking a caller-side copy of the whole mutation first
// (`auto m2 = m;`) copied the partition twice — the rvalue
// `std::move(m2.partition())` still bound to the const& overload, which
// copied again. Pass `m.partition()` directly and copy exactly once.
future<> apply_gently(mutation& target, const mutation& m) {
    mutation_application_stats app_stats;
    co_await apply_gently(target.partition(), *target.schema(), m.partition(), *m.schema(), app_stats);
}

35
mutation/async_utils.hh Normal file
View File

@@ -0,0 +1,35 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include "mutation_partition.hh"
#include "mutation.hh"
//
// Applies p to the `target` mutation_partition.
//
// Commutative when target_schema == p_schema. If schemas differ, data in p which
// is not representable in target_schema is dropped, thus apply_gently() loses commutativity.
//
// Weak exception guarantees.
// Assumes target and p are not owned by a cache_tracker.
//
// Unlike mutation_partition::apply(), these entry points may yield
// (co_await seastar::yield()) between preempted application steps,
// so they must not be used where application has to be atomic.
future<> apply_gently(mutation_partition& target, const schema& target_schema, const mutation_partition& p, const schema& p_schema,
mutation_application_stats& app_stats);
// Same as above, but p is a serialized mutation_partition_view; the view is
// consumed gently (yielding) before being applied.
future<> apply_gently(mutation_partition& target, const schema& target_schema, mutation_partition_view p, const schema& p_schema,
mutation_application_stats& app_stats);
// Use in case the `target` instance and p share the same schema.
// p is moved from (consumed); no schema upgrade is performed.
// Same guarantees and constraints as for other variants of apply_gently().
future<> apply_gently(mutation_partition& target, const schema& target_schema, mutation_partition&& p, mutation_application_stats& app_stats);
// Whole-mutation convenience wrappers.
// The apply_gently entry points may yield while applying
// changes to the `target` mutation partition, therefore they should not
// be used when atomic application is required, such as when
// applying changes to memtable, which is done synchronously.
future<> apply_gently(mutation& target, mutation&& m);
future<> apply_gently(mutation& target, const mutation& m);

View File

@@ -53,6 +53,7 @@
#include "types/user.hh"
#include "mutation/mutation_rebuilder.hh"
#include "mutation/mutation_partition.hh"
#include "mutation/async_utils.hh"
#include "clustering_key_filter.hh"
#include "readers/from_mutations_v2.hh"
#include "readers/from_fragments_v2.hh"
@@ -1065,6 +1066,32 @@ SEASTAR_TEST_CASE(test_apply_monotonically_is_monotonic) {
return make_ready_future<>();
}
// Verifies that preemptible apply_monotonically() still converges to the
// full union of two mutations when allocations fail part-way through
// (memory::with_allocation_failures re-runs the lambda after each injected
// failure — presumably relying on apply_monotonically()'s monotonicity so
// already-applied data survives the retries; see the sibling
// test_apply_monotonically_is_monotonic test).
SEASTAR_THREAD_TEST_CASE(test_apply_monotonically_with_preemption) {
auto do_test = [](auto&& gen) {
auto&& alloc = standard_allocator();
with_allocator(alloc, [&] {
mutation_application_stats app_stats;
// Two random mutations over the same schema, plus their expected union.
mutation target = gen();
mutation second = gen();
auto expected = target + second;
mutation m = target;
// Copy of `second`'s partition to be consumed by apply_monotonically().
auto m2 = mutation_partition(*m.schema(), second.partition());
apply_resume res;
memory::with_allocation_failures([&] {
// Preemptible apply loop: keep stepping (and yielding) until
// apply_monotonically() reports completion.
while (m.partition().apply_monotonically(*m.schema(), std::move(m2), no_cache_tracker, app_stats, is_preemptible::yes, res) == stop_iteration::no) {
yield().get();
}
});
assert_that(m).is_equal_to_compacted(expected);
});
};
// Exercise both plain and counter-bearing mutations.
do_test(random_mutation_generator(random_mutation_generator::generate_counters::no));
do_test(random_mutation_generator(random_mutation_generator::generate_counters::yes));
}
SEASTAR_TEST_CASE(test_v2_apply_monotonically_is_monotonic_on_alloc_failures) {
auto do_test = [](auto&& gen) {
auto&& alloc = standard_allocator();
@@ -1973,6 +2000,16 @@ SEASTAR_TEST_CASE(test_mutation_diff_with_random_generator) {
check_partitions_match(mutation_partition{*s}, m1.partition().difference(*s, m1.partition()), *s);
check_partitions_match(m1.partition(), m1.partition().difference(*s, mutation_partition{*s}), *s);
check_partitions_match(mutation_partition{*s}, mutation_partition{*s}.difference(*s, m1.partition()), *s);
// same as above, just using apply_gently
m12 = m1;
apply_gently(m12, m2).get();
m12_with_diff = m1;
apply_gently(m12_with_diff.partition(), *s, m2.partition().difference(*s, m1.partition()), app_stats).get();
check_partitions_match(m12.partition(), m12_with_diff.partition(), *s);
check_partitions_match(mutation_partition{*s}, m1.partition().difference(*s, m1.partition()), *s);
check_partitions_match(m1.partition(), m1.partition().difference(*s, mutation_partition{*s}), *s);
check_partitions_match(mutation_partition{*s}, mutation_partition{*s}.difference(*s, m1.partition()), *s);
});
});
}