mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-06 23:13:15 +00:00
"This simplifies implementation of mutation_partition merging by relaxing
exception guarantees it needs to provide. This allows reverters to be dropped.

Direct motivation for this is to make it easier to implement new semantics
for merging of clustering range continuity.

Implementation details:

We only need strong exception guarantees when applying to the memtable,
which is using MVCC. Instead of calling apply() with strong exception
guarantees on the latest version, we will move the incoming mutation to a
new partition_version and then use monotonic apply() to merge them. If that
merging fails, we attach the version with the remainder, which cannot fail.
This way apply() always succeeds if the allocation of partition_version
object succeeds.

Results of `perf_simple_query_g -c1 -m1G --write` (high overwrite rate):

Before:
  101011.13 tps
  102498.07 tps
  103174.68 tps
  102879.55 tps
  103524.48 tps
  102794.56 tps
  103565.11 tps
  103018.51 tps
  103494.37 tps
  102375.81 tps
  103361.65 tps

After:
  101785.37 tps
  101366.19 tps
  103532.26 tps
  100834.83 tps
  100552.11 tps
  100891.31 tps
  101752.06 tps
  101532.00 tps
  100612.06 tps
  102750.62 tps
  100889.16 tps

Fixes #2012."

* tag 'tgrabiec/drop-reversible-apply-v1' of github.com:scylladb/seastar-dev:
  mutation_partition: Drop apply_reversibly()
  mutation_partition: Relax exception guarantees of apply()
  mutation_partition: Introduce apply_weak()
  tests: mvcc: Add test for atomicity of partition_entry::apply()
  tests: Move failure_injecting_allocation_strategy to a header
  tests: mutation_partition: Test exception guarantees of apply_monotonically()
  mvcc: Use apply_monotonically() where sufficient
  mvcc: partition_version: Use apply_monotonically() to provide atomicity
  mvcc: Extract partition_entry::add_version()
  mutation_partition: Introduce apply_monotonically()
  mutation_partition: Introduce row::consume_with()
465 lines
16 KiB
C++
465 lines
16 KiB
C++
/*
|
|
* Copyright (C) 2017 ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* This file is part of Scylla.
|
|
*
|
|
* Scylla is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU Affero General Public License as published by
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* Scylla is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License
|
|
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
|
|
#include <boost/range/adaptor/transformed.hpp>
|
|
#include <boost/range/algorithm/copy.hpp>
|
|
#include <boost/range/algorithm_ext/push_back.hpp>
|
|
#include <seastar/core/thread.hh>
|
|
|
|
#include "partition_version.hh"
|
|
#include "partition_snapshot_row_cursor.hh"
|
|
|
|
#include "tests/test-utils.hh"
|
|
#include "tests/mutation_assertions.hh"
|
|
#include "tests/mutation_reader_assertions.hh"
|
|
#include "tests/simple_schema.hh"
|
|
#include "tests/mutation_source_test.hh"
|
|
#include "tests/failure_injecting_allocation_strategy.hh"
|
|
|
|
// Brings the standard duration literal suffixes (e.g. `1s`, `10ms`) into scope.
// NOTE(review): none of the tests visible in this part of the file use them —
// confirm against the rest of the file before considering removal.
using namespace std::chrono_literals;
|
|
|
|
// Exercises partition_entry::apply_to_incomplete() in two scenarios:
//  1. a row applied to an entry built from make_incomplete() must leave the
//     squashed entry equal to make_incomplete();
//  2. after the rows of the older version are flagged non-continuous, a later
//     apply must still be visible in the squashed result, and moving the entry
//     into another entry must not alter what an existing snapshot sees.
SEASTAR_TEST_CASE(test_apply_to_incomplete) {
    return seastar::async([] {
        logalloc::region reg;
        simple_schema table;
        auto&& s = *table.schema();

        // Empty mutation for partition key 0.
        auto make_empty_mutation = [&] {
            return mutation(table.make_pkey(0), table.schema());
        };

        // Mutation for partition key 0 holding a single row at `ck` with value "v".
        auto make_single_row_mutation = [&] (clustering_key ck) {
            auto result = make_empty_mutation();
            table.add_row(result, ck, "v");
            return result;
        };

        // FIXME: There is no assert_that() for mutation_partition
        // Wraps both partitions in mutations under the same key so that the
        // mutation assertions can compare them.
        auto require_same_partition = [&] (mutation_partition mp1, mutation_partition mp2) {
            auto pkey = table.make_pkey(0);
            assert_that(mutation(table.schema(), pkey, std::move(mp1)))
                .is_equal_to(mutation(table.schema(), pkey, std::move(mp2)));
        };

        // Applies a copy of m's partition to `entry`; both sides use schema `s`.
        auto apply_mutation = [&] (partition_entry& entry, const mutation& m) {
            entry.apply_to_incomplete(s, partition_entry(m.partition()), s);
        };

        auto ck1 = table.make_ckey(1);
        auto ck2 = table.make_ckey(2);

        BOOST_TEST_MESSAGE("Check that insert falling into discontinuous range is dropped");
        with_allocator(reg.allocator(), [&] {
            logalloc::reclaim_lock lock(reg);
            auto entry = partition_entry(mutation_partition::make_incomplete(s));
            auto incoming = make_empty_mutation();
            table.add_row(incoming, ck1, "v");
            apply_mutation(entry, incoming);
            // The applied row must not show up in the squashed entry.
            require_same_partition(entry.squashed(s), mutation_partition::make_incomplete(s));
        });

        BOOST_TEST_MESSAGE("Check that continuity from latest version wins");
        with_allocator(reg.allocator(), [&] {
            logalloc::reclaim_lock lock(reg);
            auto m1 = make_single_row_mutation(ck2);
            auto entry = partition_entry(m1.partition());

            // Snapshot taken before any further apply; checked at the end.
            auto snap1 = entry.read(reg, table.schema());

            auto m2 = make_single_row_mutation(ck2);
            apply_mutation(entry, m2);

            partition_version* const newest = &*entry.version();
            partition_version* const older = newest->next();

            // Flag every row of the older version as non-continuous.
            for (rows_entry& older_row : older->partition().clustered_rows()) {
                older_row.set_continuous(is_continuous::no);
            }

            auto m3 = make_single_row_mutation(ck1);
            apply_mutation(entry, m3);
            require_same_partition(entry.squashed(s), (m2 + m3).partition());

            // Check that snapshot data is not stolen when its entry is applied
            auto receiver = partition_entry(mutation_partition(table.schema()));
            receiver.apply_to_incomplete(s, std::move(entry), s);
            require_same_partition(snap1->squashed(), m1.partition());
            require_same_partition(receiver.squashed(s), (m2 + m3).partition());
        });
    });
}
|
|
|
|
// Builds a partition_entry with explicit continuity flags, upgrades it to a
// schema with one extra column while a snapshot is held, and verifies that the
// squashed entry matches the expected mutation in both content and continuity,
// before and after further applies under the new schema.
SEASTAR_TEST_CASE(test_schema_upgrade_preserves_continuity) {
    return seastar::async([] {
        logalloc::region reg;
        simple_schema table;

        // Empty mutation for partition key 0.
        auto make_empty_mutation = [&] {
            return mutation(table.make_pkey(0), table.schema());
        };

        // Mutation for partition key 0 holding a single row at `ck` with value "v".
        auto make_single_row_mutation = [&] (clustering_key ck) {
            auto result = make_empty_mutation();
            table.add_row(result, ck, "v");
            return result;
        };

        // FIXME: There is no assert_that() for mutation_partition
        // Squashes `entry` under `entry_schema` and compares content and
        // continuity against `wanted`.
        auto require_entry_matches = [&] (schema_ptr entry_schema, partition_entry& entry, mutation wanted) {
            auto pkey = table.make_pkey(0);
            assert_that(mutation(entry_schema, pkey, entry.squashed(*entry_schema)))
                .is_equal_to(wanted)
                .has_same_continuity(wanted);
        };

        // Applies a copy of m's partition to `entry`; the entry is interpreted
        // with `entry_schema`, the incoming data with m's own schema.
        auto apply_mutation = [&] (schema_ptr entry_schema, partition_entry& entry, const mutation& m) {
            entry.apply_to_incomplete(*entry_schema, partition_entry(m.partition()), *m.schema());
        };

        with_allocator(reg.allocator(), [&] {
            logalloc::reclaim_lock lock(reg);

            // m1: one row at key 1, flagged non-continuous, with a non-continuous
            // static row and a trailing dummy entry.
            auto m1 = make_single_row_mutation(table.make_ckey(1));
            auto&& m1_part = m1.partition();
            m1_part.clustered_rows().begin()->set_continuous(is_continuous::no);
            m1_part.set_static_row_continuous(false);
            m1_part.ensure_last_dummy(*m1.schema());

            auto entry = partition_entry(m1.partition());
            // Held across the upgrade below and released right after it.
            auto reader_snapshot = entry.read(reg, table.schema());

            auto m2 = make_single_row_mutation(table.make_ckey(3));
            m2.partition().ensure_last_dummy(*m2.schema());
            apply_mutation(table.schema(), entry, m2);

            auto upgraded_schema = schema_builder(table.schema())
                .with_column("__new_column", utf8_type)
                .build();

            entry.upgrade(table.schema(), upgraded_schema);
            reader_snapshot = {};

            require_entry_matches(upgraded_schema, entry, m1 + m2);

            auto m3 = make_single_row_mutation(table.make_ckey(2));
            apply_mutation(upgraded_schema, entry, m3);

            auto m4 = make_single_row_mutation(table.make_ckey(0));
            table.add_static_row(m4, "s_val");
            apply_mutation(upgraded_schema, entry, m4);

            // m4 is applied but deliberately absent from the expectation.
            // NOTE(review): presumably because its row and static row fall into
            // the parts m1 marked as non-continuous — confirm.
            require_entry_matches(upgraded_schema, entry, m1 + m2 + m3);
        });
    });
}
|
|
|
|
// Creates two versions of a partition_entry (each pinned by its own snapshot),
// evicts the entry, and checks through both snapshots that:
//  - the range from before all clustered rows up to just after ck2 is fully
//    discontinuous,
//  - the static row is not continuous,
//  - the partition tombstone applied before eviction is still reported.
SEASTAR_TEST_CASE(test_full_eviction_marks_affected_range_as_discontinuous) {
    return seastar::async([] {
        logalloc::region reg;
        with_allocator(reg.allocator(), [&] {
            logalloc::reclaim_lock lock(reg);

            simple_schema table;
            auto&& s = *table.schema();
            auto ck1 = table.make_ckey(1);
            auto ck2 = table.make_ckey(2);

            auto entry = partition_entry(mutation_partition(table.schema()));

            // First version: a row at ck2 plus a partition tombstone.
            auto tomb = table.new_tombstone();
            auto&& first_version = entry.open_version(s).partition();
            first_version.clustered_row(s, ck2);
            first_version.apply(tomb);

            auto snap1 = entry.read(reg, table.schema());

            // Second version: a row at ck1.
            auto&& second_version = entry.open_version(s).partition();
            second_version.clustered_row(s, ck1);

            auto snap2 = entry.read(reg, table.schema());

            entry.evict();

            // Range checked for discontinuity; rebuilt for every use.
            auto affected_range = [&] {
                return position_range(
                    position_in_partition::before_all_clustered_rows(),
                    position_in_partition::after_key(ck2));
            };

            BOOST_REQUIRE(snap1->squashed().fully_discontinuous(s, affected_range()));
            BOOST_REQUIRE(snap2->squashed().fully_discontinuous(s, affected_range()));

            BOOST_REQUIRE(!snap1->squashed().static_row_continuous());
            BOOST_REQUIRE(!snap2->squashed().static_row_continuous());

            BOOST_REQUIRE_EQUAL(snap1->squashed().partition_tombstone(), tomb);
            BOOST_REQUIRE_EQUAL(snap2->squashed().partition_tombstone(), tomb);
        });
    });
}
|
|
|
|
// Positions a partition_snapshot_row_cursor on a two-version entry, evicts the
// entry while the cursor is alive, and checks that after refreshing, every
// position the cursor visits is a non-continuous dummy.
SEASTAR_TEST_CASE(test_eviction_with_active_reader) {
    return seastar::async([] {
        logalloc::region reg;
        with_allocator(reg.allocator(), [&] {
            simple_schema table;
            auto&& s = *table.schema();
            auto ck1 = table.make_ckey(1);
            auto ck2 = table.make_ckey(2);

            auto entry = partition_entry(mutation_partition(table.schema()));

            // First version: a row at ck2 followed by a trailing dummy.
            auto&& first_version = entry.open_version(s).partition();
            first_version.clustered_row(s, ck2);
            first_version.ensure_last_dummy(s); // needed by partition_snapshot_row_cursor

            auto snap1 = entry.read(reg, table.schema());

            // Second version: a row at ck1.
            auto&& second_version = entry.open_version(s).partition();
            second_version.clustered_row(s, ck1);

            auto snap2 = entry.read(reg, table.schema());

            // Before eviction the cursor lands on ck1 and reports continuity.
            partition_snapshot_row_cursor reader(s, *snap2);
            reader.advance_to(position_in_partition_view::before_all_clustered_rows());
            BOOST_REQUIRE(reader.continuous());
            BOOST_REQUIRE(reader.key().equal(s, ck1));

            entry.evict();

            // After eviction: check the current position first, then every
            // following one until the cursor is exhausted.
            reader.maybe_refresh();
            while (true) {
                BOOST_REQUIRE(!reader.continuous());
                BOOST_REQUIRE(reader.dummy());
                if (!reader.next()) {
                    break;
                }
            }
        });
    });
}
|
|
|
|
// Step-by-step scenario for partition_snapshot_row_cursor: the cursor is
// advanced, refreshed and iterated while rows are inserted into the snapshot's
// version, the region is compacted, and finally the entry is evicted. After
// each change the test checks the cursor's position and continuity flag.
// The order of statements is significant.
SEASTAR_TEST_CASE(test_partition_snapshot_row_cursor) {
    return seastar::async([] {
        logalloc::region reg;
        with_allocator(reg.allocator(), [&] {
            simple_schema table;
            auto&& s = *table.schema();

            auto entry = partition_entry(mutation_partition(table.schema()));
            auto snap1 = entry.read(reg, table.schema());

            {
                // Rows 0, 1, 2, 3 and 6, all non-dummy and non-continuous,
                // followed by a trailing dummy.
                auto&& base_rows = snap1->version()->partition();
                for (uint32_t k : {0u, 1u, 2u, 3u, 6u}) {
                    base_rows.clustered_row(s, table.make_ckey(k), is_dummy::no, is_continuous::no);
                }
                base_rows.ensure_last_dummy(s);
            }

            // NOTE(review): the third argument presumably selects a different
            // phase than snap1's so that snap2 gets its own version — confirm.
            auto snap2 = entry.read(reg, table.schema(), 1);

            partition_snapshot_row_cursor cursor(s, *snap2);
            position_in_partition::equal_compare same_pos(s);

            // Initial positioning on row 0.
            {
                logalloc::reclaim_lock lock(reg);
                BOOST_REQUIRE(cursor.advance_to(table.make_ckey(0)));
                BOOST_REQUIRE(same_pos(cursor.position(), table.make_ckey(0)));
                BOOST_REQUIRE(!cursor.continuous());
            }

            reg.full_compaction();

            // After compaction a refresh keeps the position; walk to row 2.
            {
                logalloc::reclaim_lock lock(reg);
                BOOST_REQUIRE(cursor.maybe_refresh());
                BOOST_REQUIRE(same_pos(cursor.position(), table.make_ckey(0)));
                BOOST_REQUIRE(!cursor.continuous());

                BOOST_REQUIRE(cursor.next());
                BOOST_REQUIRE(same_pos(cursor.position(), table.make_ckey(1)));
                BOOST_REQUIRE(!cursor.continuous());

                BOOST_REQUIRE(cursor.next());
                BOOST_REQUIRE(same_pos(cursor.position(), table.make_ckey(2)));
                BOOST_REQUIRE(!cursor.continuous());
            }

            // A refresh with nothing changed keeps the cursor on row 2.
            {
                logalloc::reclaim_lock lock(reg);
                BOOST_REQUIRE(cursor.maybe_refresh());
                BOOST_REQUIRE(same_pos(cursor.position(), table.make_ckey(2)));
                BOOST_REQUIRE(!cursor.continuous());
            }

            // Row 2 is added to snap2's version, flagged continuous.
            snap2->version()->partition()
                .clustered_row(s, table.make_ckey(2), is_dummy::no, is_continuous::yes);

            // The cursor now reports row 2 as continuous; row 3 is unchanged.
            {
                logalloc::reclaim_lock lock(reg);
                BOOST_REQUIRE(cursor.maybe_refresh());
                BOOST_REQUIRE(same_pos(cursor.position(), table.make_ckey(2)));
                BOOST_REQUIRE(cursor.continuous());

                BOOST_REQUIRE(cursor.next());
                BOOST_REQUIRE(same_pos(cursor.position(), table.make_ckey(3)));
                BOOST_REQUIRE(!cursor.continuous());
            }

            // Row 4 is added to snap2's version, flagged continuous.
            snap2->version()->partition()
                .clustered_row(s, table.make_ckey(4), is_dummy::no, is_continuous::yes);

            // From row 3 the cursor visits 4, 6 and the end position, then stops.
            {
                logalloc::reclaim_lock lock(reg);

                BOOST_REQUIRE(cursor.maybe_refresh());
                BOOST_REQUIRE(same_pos(cursor.position(), table.make_ckey(3)));

                BOOST_REQUIRE(cursor.next());
                BOOST_REQUIRE(same_pos(cursor.position(), table.make_ckey(4)));
                BOOST_REQUIRE(cursor.continuous());

                BOOST_REQUIRE(cursor.next());
                BOOST_REQUIRE(same_pos(cursor.position(), table.make_ckey(6)));
                BOOST_REQUIRE(!cursor.continuous());

                BOOST_REQUIRE(cursor.next());
                BOOST_REQUIRE(same_pos(cursor.position(), position_in_partition::after_all_clustered_rows()));
                BOOST_REQUIRE(cursor.continuous());

                BOOST_REQUIRE(!cursor.next());
            }

            // Re-position on row 4.
            {
                logalloc::reclaim_lock lock(reg);
                BOOST_REQUIRE(cursor.advance_to(table.make_ckey(4)));
                BOOST_REQUIRE(cursor.continuous());
            }

            {
                logalloc::reclaim_lock lock(reg);
                BOOST_REQUIRE(cursor.maybe_refresh());
                BOOST_REQUIRE(same_pos(cursor.position(), table.make_ckey(4)));
                BOOST_REQUIRE(cursor.continuous());
            }

            // Row 5 is added to snap2's version, flagged continuous.
            snap2->version()->partition()
                .clustered_row(s, table.make_ckey(5), is_dummy::no, is_continuous::yes);

            // The new row 5 is visited between rows 4 and 6.
            {
                logalloc::reclaim_lock lock(reg);

                BOOST_REQUIRE(cursor.maybe_refresh());
                BOOST_REQUIRE(same_pos(cursor.position(), table.make_ckey(4)));
                BOOST_REQUIRE(cursor.continuous());

                BOOST_REQUIRE(cursor.next());
                BOOST_REQUIRE(same_pos(cursor.position(), table.make_ckey(5)));
                BOOST_REQUIRE(cursor.continuous());

                BOOST_REQUIRE(cursor.next());
                BOOST_REQUIRE(same_pos(cursor.position(), table.make_ckey(6)));
                BOOST_REQUIRE(!cursor.continuous());
            }

            {
                logalloc::reclaim_lock lock(reg);
                BOOST_REQUIRE(cursor.advance_to(table.make_ckey(4)));
                BOOST_REQUIRE(cursor.continuous());
            }

            entry.evict();

            // Row 5 is inserted again after the eviction.
            snap2->version()->partition()
                .clustered_row(s, table.make_ckey(5), is_dummy::no, is_continuous::yes);

            // Row 4 is no longer found: the refresh reports false and the
            // cursor ends up on row 5.
            {
                logalloc::reclaim_lock lock(reg);
                BOOST_REQUIRE(!cursor.maybe_refresh());
                BOOST_REQUIRE(same_pos(cursor.position(), table.make_ckey(5)));
                BOOST_REQUIRE(cursor.continuous());
            }

            // Advancing to row 4 also reports false and lands on row 5.
            {
                logalloc::reclaim_lock lock(reg);
                BOOST_REQUIRE(!cursor.advance_to(table.make_ckey(4)));
                BOOST_REQUIRE(same_pos(cursor.position(), table.make_ckey(5)));
                BOOST_REQUIRE(cursor.continuous());
            }

            // With the cursor on an existing row, the refresh succeeds again.
            {
                logalloc::reclaim_lock lock(reg);
                BOOST_REQUIRE(cursor.maybe_refresh());
                BOOST_REQUIRE(same_pos(cursor.position(), table.make_ckey(5)));
                BOOST_REQUIRE(cursor.continuous());
            }
        });
    });
}
|
|
|
|
// Verifies that partition_entry::apply() is atomic with respect to allocation
// failures.
//
// For each generator, two mutations are produced (`target`, `second`). The
// test then repeatedly builds a fresh entry from `target` and applies `second`
// while the allocator is told to fail after 0, 1, 2, ... allocations:
//  - if apply() throws std::bad_alloc, the entry must still equal `target`
//    (content and continuity), and re-applying the same partition must then
//    produce `target + second`;
//  - if apply() returns normally, the entry must equal `target + second`.
// The loop ends after the first iteration in which apply() does not throw.
SEASTAR_TEST_CASE(test_apply_is_atomic) {
    auto do_test = [](auto&& gen) {
        failure_injecting_allocation_strategy alloc(standard_allocator());
        with_allocator(alloc, [&] {
            auto target = gen();
            auto second = gen();

            auto expected = target + second;

            // Squashes `e` under target's schema and compares content and
            // continuity against `wanted`.
            auto assert_entry_equal = [&] (partition_entry& e, const mutation& wanted) {
                assert_that(mutation(target.schema(), target.decorated_key(), e.squashed(*target.schema())))
                    .is_equal_to(wanted)
                    .has_same_continuity(wanted);
            };

            size_t fail_offset = 0;
            bool applied_without_failure = false;
            while (!applied_without_failure) {
                mutation_partition m2 = second.partition();
                auto e = partition_entry(target.partition());

                alloc.fail_after(fail_offset++);
                try {
                    e.apply(*target.schema(), std::move(m2), *second.schema());
                    alloc.stop_failing();
                    applied_without_failure = true;
                } catch (const std::bad_alloc&) {
                    // A failed apply() must leave the entry untouched.
                    assert_entry_equal(e, target);
                    // m2 is deliberately reused after std::move(): a failed
                    // apply() is expected to have left its input intact, which
                    // the check after this block confirms.
                    // NOTE(review): the retry relies on the allocator no longer
                    // failing once the injected failure has fired — confirm
                    // against failure_injecting_allocation_strategy.
                    e.apply(*target.schema(), std::move(m2), *second.schema());
                }
                // Reached on both paths, so a successful apply() is verified
                // too (previously the success path left the loop unchecked).
                assert_entry_equal(e, expected);
            }
        });
    };

    do_test(random_mutation_generator(random_mutation_generator::generate_counters::no));
    do_test(random_mutation_generator(random_mutation_generator::generate_counters::yes));
    return make_ready_future<>();
}
|