mutation_partition: add apply_gently
To be used for freezing mutations or making canonical mutations gently. Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -826,6 +826,7 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'mutation/partition_version.cc',
|
||||
'mutation/range_tombstone.cc',
|
||||
'mutation/range_tombstone_list.cc',
|
||||
'mutation/async_utils.cc',
|
||||
'absl-flat_hash_map.cc',
|
||||
'collection_mutation.cc',
|
||||
'client_data.cc',
|
||||
|
||||
62
mutation/async_utils.cc
Normal file
62
mutation/async_utils.cc
Normal file
@@ -0,0 +1,62 @@
|
||||
/*
|
||||
* Copyright (C) 2024-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#include <seastar/core/coroutine.hh>
|
||||
|
||||
#include "mutation/async_utils.hh"
|
||||
#include "mutation/mutation_partition_view.hh"
|
||||
#include "mutation/partition_version.hh"
|
||||
#include "partition_builder.hh"
|
||||
#include "mutation/canonical_mutation.hh"
|
||||
#include "converting_mutation_partition_applier.hh"
|
||||
#include "mutation/mutation_partition_serializer.hh"
|
||||
#include "idl/mutation.dist.impl.hh"
|
||||
|
||||
future<> apply_gently(mutation_partition& target, const schema& s, mutation_partition_view p,
|
||||
const schema& p_schema, mutation_application_stats& app_stats) {
|
||||
mutation_partition p2(target, mutation_partition::copy_comparators_only{});
|
||||
partition_builder b(p_schema, p2);
|
||||
co_await p.accept_gently(p_schema, b);
|
||||
if (s.version() != p_schema.version()) {
|
||||
p2.upgrade(p_schema, s);
|
||||
}
|
||||
apply_resume res;
|
||||
while (target.apply_monotonically(s, std::move(p2), no_cache_tracker, app_stats, is_preemptible::yes, res) == stop_iteration::no) {
|
||||
co_await yield();
|
||||
}
|
||||
}
|
||||
|
||||
future<> apply_gently(mutation_partition& target, const schema& s, const mutation_partition& p,
|
||||
const schema& p_schema, mutation_application_stats& app_stats) {
|
||||
mutation_partition p2(p_schema, p);
|
||||
if (s.version() != p_schema.version()) {
|
||||
p2.upgrade(p_schema, s);
|
||||
}
|
||||
apply_resume res;
|
||||
while (target.apply_monotonically(s, std::move(p2), no_cache_tracker, app_stats, is_preemptible::yes, res) == stop_iteration::no) {
|
||||
co_await yield();
|
||||
}
|
||||
}
|
||||
|
||||
future<> apply_gently(mutation_partition& target, const schema& s, mutation_partition&& p, mutation_application_stats& app_stats) {
|
||||
apply_resume res;
|
||||
while (target.apply_monotonically(s, std::move(p), no_cache_tracker, app_stats, is_preemptible::yes, res) == stop_iteration::no) {
|
||||
co_await yield();
|
||||
}
|
||||
}
|
||||
|
||||
future<> apply_gently(mutation& target, mutation&& m) {
|
||||
mutation_application_stats app_stats;
|
||||
co_await apply_gently(target.partition(), *target.schema(), std::move(m.partition()), *m.schema(), app_stats);
|
||||
}
|
||||
|
||||
future<> apply_gently(mutation& target, const mutation& m) {
|
||||
auto m2 = m;
|
||||
mutation_application_stats app_stats;
|
||||
co_await apply_gently(target.partition(), *target.schema(), std::move(m2.partition()), *m.schema(), app_stats);
|
||||
}
|
||||
35
mutation/async_utils.hh
Normal file
35
mutation/async_utils.hh
Normal file
@@ -0,0 +1,35 @@
|
||||
/*
|
||||
* Copyright (C) 2024-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "mutation_partition.hh"
|
||||
#include "mutation.hh"
|
||||
|
||||
//
|
||||
// Applies p to the `target` mutation_partition.
|
||||
//
|
||||
// Commutative when target_schema == p_schema. If schemas differ, data in p which
|
||||
// is not representable in target_schema is dropped, thus apply_gently() loses commutativity.
|
||||
//
|
||||
// Weak exception guarantees.
|
||||
// Assumes target and p are not owned by a cache_tracker.
|
||||
future<> apply_gently(mutation_partition& target, const schema& target_schema, const mutation_partition& p, const schema& p_schema,
|
||||
mutation_application_stats& app_stats);
|
||||
future<> apply_gently(mutation_partition& target, const schema& target_schema, mutation_partition_view p, const schema& p_schema,
|
||||
mutation_application_stats& app_stats);
|
||||
// Use in case the `target` instance and p share the same schema.
|
||||
// Same guarantees and constraints as for other variants of apply_gently().
|
||||
future<> apply_gently(mutation_partition& target, const schema& target_schema, mutation_partition&& p, mutation_application_stats& app_stats);
|
||||
|
||||
// The apply_gently entry points may yield while applying
|
||||
// changes to the `target` mutation partition, therefore they should not
|
||||
// be used when atomic application is required, such as when
|
||||
// applying changes to memtable, which is done synchronously.
|
||||
future<> apply_gently(mutation& target, mutation&& m);
|
||||
future<> apply_gently(mutation& target, const mutation& m);
|
||||
@@ -53,6 +53,7 @@
|
||||
#include "types/user.hh"
|
||||
#include "mutation/mutation_rebuilder.hh"
|
||||
#include "mutation/mutation_partition.hh"
|
||||
#include "mutation/async_utils.hh"
|
||||
#include "clustering_key_filter.hh"
|
||||
#include "readers/from_mutations_v2.hh"
|
||||
#include "readers/from_fragments_v2.hh"
|
||||
@@ -1065,6 +1066,32 @@ SEASTAR_TEST_CASE(test_apply_monotonically_is_monotonic) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_apply_monotonically_with_preemption) {
|
||||
auto do_test = [](auto&& gen) {
|
||||
auto&& alloc = standard_allocator();
|
||||
with_allocator(alloc, [&] {
|
||||
mutation_application_stats app_stats;
|
||||
mutation target = gen();
|
||||
mutation second = gen();
|
||||
|
||||
auto expected = target + second;
|
||||
|
||||
mutation m = target;
|
||||
auto m2 = mutation_partition(*m.schema(), second.partition());
|
||||
apply_resume res;
|
||||
memory::with_allocation_failures([&] {
|
||||
while (m.partition().apply_monotonically(*m.schema(), std::move(m2), no_cache_tracker, app_stats, is_preemptible::yes, res) == stop_iteration::no) {
|
||||
yield().get();
|
||||
}
|
||||
});
|
||||
assert_that(m).is_equal_to_compacted(expected);
|
||||
});
|
||||
};
|
||||
|
||||
do_test(random_mutation_generator(random_mutation_generator::generate_counters::no));
|
||||
do_test(random_mutation_generator(random_mutation_generator::generate_counters::yes));
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_v2_apply_monotonically_is_monotonic_on_alloc_failures) {
|
||||
auto do_test = [](auto&& gen) {
|
||||
auto&& alloc = standard_allocator();
|
||||
@@ -1973,6 +2000,16 @@ SEASTAR_TEST_CASE(test_mutation_diff_with_random_generator) {
|
||||
check_partitions_match(mutation_partition{*s}, m1.partition().difference(*s, m1.partition()), *s);
|
||||
check_partitions_match(m1.partition(), m1.partition().difference(*s, mutation_partition{*s}), *s);
|
||||
check_partitions_match(mutation_partition{*s}, mutation_partition{*s}.difference(*s, m1.partition()), *s);
|
||||
|
||||
// same as above, just using apply_gently
|
||||
m12 = m1;
|
||||
apply_gently(m12, m2).get();
|
||||
m12_with_diff = m1;
|
||||
apply_gently(m12_with_diff.partition(), *s, m2.partition().difference(*s, m1.partition()), app_stats).get();
|
||||
check_partitions_match(m12.partition(), m12_with_diff.partition(), *s);
|
||||
check_partitions_match(mutation_partition{*s}, m1.partition().difference(*s, m1.partition()), *s);
|
||||
check_partitions_match(m1.partition(), m1.partition().difference(*s, mutation_partition{*s}), *s);
|
||||
check_partitions_match(mutation_partition{*s}, mutation_partition{*s}.difference(*s, m1.partition()), *s);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user