Merge "Introduce segregate scrub mode" from Botond

"
The current scrub compaction has a serious drawback: while it is
very effective at removing any corruption it recognizes, it is very
heavy-handed in its way of repairing such corruption: it simply drops
all data that is suspected to be corrupt. While this *is* the safest way
to cleanse data, it might not be the best way from the point of view of
a user who doesn't want to lose data, even at the risk of retaining
some business-logic-level corruption. Mind you, no database-level scrub
can ever fully repair data from the business-logic point of view; it
can only do so on the database level. So in certain cases it might be
desirable to have a less heavy-handed approach to cleansing the data,
one that tries as hard as it can not to lose any data.

This series introduces a new scrub mode with the goal of addressing
this use case: when the user doesn't want to lose any data. The new
mode is called "segregate" and it works by segregating its input into
multiple outputs such that each output contains a valid stream. This
approach can fix any out-of-order data, be that on the partition or
fragment level. Out-of-order partitions are simply written into a
separate output. Out-of-order fragments are handled by injecting a
partition-end/partition-start pair right before them, so that they end
up in a separate (duplicate) partition, which is then written into a
separate output, just like a regular out-of-order partition.

The reason this series is posted as an RFC is that although I consider
the code stable and tested, there are some questions related to the UX.
* First and foremost, every scrub that does more than just discard data
  that is suspected to be corrupt (though even these to a certain
  degree) has to consider the possibility that it is rehabilitating
  corruption, leaving it in the system without a warning, in the sense
  that the user won't see any more problems due to low-level corruption
  and hence might think everything is alright, while the data is still
  corrupt from the business-logic point of view. It is very hard to draw
  a line between what scrub should and shouldn't do, yet there is demand
  from users for a scrub that can restore data without losing any of it.
  Note that anybody executing such a scrub is already in bad shape: even
  if they can read their data (they often can't), it is already corrupt;
  scrub is not making anything worse here.
* This series converts the previous `skip_corrupted` boolean into an
  enum, which now selects the scrub mode. This means that
  `skip_corrupted` cannot be combined with segregate to throw out what
  segregate can't fix. This was chosen for simplicity: a bunch of
  flags, all interacting with each other, is very hard to see through
  in my opinion; a linear mode selector is much easier to reason about.
* The new segregate mode goes all-in, trying to fix even
  fragment-level disorder. Maybe it should only do this on the
  partition level, or maybe this should be made configurable, allowing
  the user to select what should happen to data that cannot be fixed.

Tests: unit(dev), unit(sstable_datafile_test:debug)
"

* 'sstable-scrub-segregate-by-partition/v1' of https://github.com/denesb/scylla:
  test: boost/sstable_datafile_test: add tests for segregate mode scrub
  api: storage_service/keyspace_scrub: expose new segregate mode
  sstables: compaction/scrub: add segregate mode
  mutation_fragment_stream_validator: add reset methods
  mutation_writer: add segregate_by_partition
  api: /storage_service/keyspace_scrub: add scrub mode param
  sstables: compaction/scrub: replace skip_corrupted with mode enum
  sstables: compaction/scrub: prevent infinite loop when last partition end is missing
  tests: boost/sstable_datafile_test: use the same permit for all fragments in scrub tests
This commit is contained in:
Avi Kivity
2021-05-18 13:43:01 +03:00
14 changed files with 704 additions and 56 deletions


@@ -31,7 +31,9 @@
#include "flat_mutation_reader.hh"
#include "mutation_writer/multishard_writer.hh"
#include "mutation_writer/timestamp_based_splitting_writer.hh"
#include "mutation_writer/partition_based_splitting_writer.hh"
#include "test/lib/cql_test_env.hh"
#include "test/lib/flat_mutation_reader_assertions.hh"
#include "test/lib/mutation_assertions.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/random_schema.hh"
@@ -427,3 +429,26 @@ SEASTAR_THREAD_TEST_CASE(test_timestamp_based_splitting_mutation_writer_abort) {
BOOST_TEST_PASSPOINT();
}
}
// Check that the partition_based_splitting_mutation_writer can fix reordered partitions
SEASTAR_THREAD_TEST_CASE(test_partition_based_splitting_mutation_writer) {
auto random_spec = tests::make_random_schema_specification(
get_name(),
std::uniform_int_distribution<size_t>(1, 2),
std::uniform_int_distribution<size_t>(0, 2),
std::uniform_int_distribution<size_t>(1, 2),
std::uniform_int_distribution<size_t>(0, 1));
auto random_schema = tests::random_schema{tests::random::get_int<uint32_t>(), *random_spec};
auto input_mutations = tests::generate_random_mutations(random_schema).get();
input_mutations.emplace_back(*input_mutations.begin()); // Have a duplicate partition as well.
std::shuffle(input_mutations.begin(), input_mutations.end(), tests::random::gen());
mutation_writer::segregate_by_partition(flat_mutation_reader_from_mutations(tests::make_permit(), std::move(input_mutations)), [] (flat_mutation_reader rd) {
testlog.info("Checking segregated output stream");
return async([rd = std::move(rd)] () mutable {
assert_that(std::move(rd)).has_monotonic_positions();
});
}).get();
}


@@ -62,6 +62,7 @@
#include "mutation_compactor.hh"
#include "service/priority_manager.hh"
#include "db/config.hh"
#include "mutation_writer/partition_based_splitting_writer.hh"
#include <stdio.h>
#include <ftw.h>
@@ -5300,7 +5301,7 @@ SEASTAR_TEST_CASE(sstable_cleanup_correctness_test) {
});
}
SEASTAR_TEST_CASE(sstable_scrub_test) {
SEASTAR_TEST_CASE(sstable_scrub_skip_mode_test) {
cql_test_config test_cfg;
auto& db_cfg = *test_cfg.db_config;
@@ -5321,6 +5322,7 @@ SEASTAR_TEST_CASE(sstable_scrub_test) {
.with_column("ck", int32_type, column_kind::clustering_key)
.with_column("s", int32_type, column_kind::static_column)
.with_column("v", int32_type).build();
auto permit = tests::make_permit();
auto tmp = tmpdir();
auto sst_gen = [&env, schema, &tmp, gen = make_lw_shared<unsigned>(1)] () mutable {
@@ -5365,9 +5367,9 @@ SEASTAR_TEST_CASE(sstable_scrub_test) {
testlog.trace("Writing partition {}", pkey.with_schema(*schema));
if (write_to_scrubbed) {
scrubbed_fragments.emplace_back(*schema, tests::make_permit(), partition_start(dkey, {}));
scrubbed_fragments.emplace_back(*schema, permit, partition_start(dkey, {}));
}
corrupt_fragments.emplace_back(*schema, tests::make_permit(), partition_start(dkey, {}));
corrupt_fragments.emplace_back(*schema, permit, partition_start(dkey, {}));
writer.consume_new_partition(dkey);
{
@@ -5376,9 +5378,9 @@ SEASTAR_TEST_CASE(sstable_scrub_test) {
testlog.trace("Writing row {}", sr.position());
if (write_to_scrubbed) {
scrubbed_fragments.emplace_back(*schema, tests::make_permit(), static_row(*schema, sr));
scrubbed_fragments.emplace_back(*schema, permit, static_row(*schema, sr));
}
corrupt_fragments.emplace_back(*schema, tests::make_permit(), static_row(*schema, sr));
corrupt_fragments.emplace_back(*schema, permit, static_row(*schema, sr));
writer.consume(std::move(sr));
}
@@ -5389,16 +5391,16 @@ SEASTAR_TEST_CASE(sstable_scrub_test) {
testlog.trace("Writing row {}", cr.position());
if (write_to_scrubbed) {
scrubbed_fragments.emplace_back(*schema, tests::make_permit(), clustering_row(*schema, cr));
scrubbed_fragments.emplace_back(*schema, permit, clustering_row(*schema, cr));
}
corrupt_fragments.emplace_back(*schema, tests::make_permit(), clustering_row(*schema, cr));
corrupt_fragments.emplace_back(*schema, permit, clustering_row(*schema, cr));
writer.consume(clustering_row(*schema, cr));
// write row twice
if (i == (rows_count / 2)) {
auto bad_cr = make_clustering_row(i - 2);
testlog.trace("Writing out-of-order row {}", bad_cr.position());
corrupt_fragments.emplace_back(*schema, tests::make_permit(), clustering_row(*schema, bad_cr));
corrupt_fragments.emplace_back(*schema, permit, clustering_row(*schema, bad_cr));
writer.consume(std::move(bad_cr));
}
}
@@ -5406,9 +5408,9 @@ SEASTAR_TEST_CASE(sstable_scrub_test) {
testlog.trace("Writing partition_end");
if (write_to_scrubbed) {
scrubbed_fragments.emplace_back(*schema, tests::make_permit(), partition_end{});
scrubbed_fragments.emplace_back(*schema, permit, partition_end{});
}
corrupt_fragments.emplace_back(*schema, tests::make_permit(), partition_end{});
corrupt_fragments.emplace_back(*schema, permit, partition_end{});
writer.consume_end_of_partition();
};
@@ -5438,6 +5440,172 @@ SEASTAR_TEST_CASE(sstable_scrub_test) {
BOOST_REQUIRE(table->in_strategy_sstables().size() == 1);
BOOST_REQUIRE(table->in_strategy_sstables().front() == sst);
auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector<mutation_fragment>& mfs) {
auto r = assert_that(sst->as_mutation_source().make_reader(schema, permit));
for (const auto& mf : mfs) {
testlog.trace("Expecting {}", mutation_fragment::printer(*schema, mf));
r.produces(*schema, mf);
}
r.produces_end_of_stream();
};
testlog.info("Verifying written data...");
// Make sure we wrote what we thought we wrote.
verify_fragments(sst, corrupt_fragments);
testlog.info("Scrub in abort mode");
// We expect the scrub with mode=scrub::mode::abort to stop on the first invalid fragment.
compaction_manager.perform_sstable_scrub(table.get(), sstables::compaction_options::scrub::mode::abort).get();
BOOST_REQUIRE(table->in_strategy_sstables().size() == 1);
verify_fragments(sst, corrupt_fragments);
testlog.info("Scrub in skip mode");
// We expect the scrub with mode=scrub::mode::skip to get rid of all invalid data.
compaction_manager.perform_sstable_scrub(table.get(), sstables::compaction_options::scrub::mode::skip).get();
BOOST_REQUIRE(table->in_strategy_sstables().size() == 1);
BOOST_REQUIRE(table->in_strategy_sstables().front() != sst);
verify_fragments(table->in_strategy_sstables().front(), scrubbed_fragments);
});
}, test_cfg);
}
SEASTAR_TEST_CASE(sstable_scrub_segregate_mode_test) {
cql_test_config test_cfg;
auto& db_cfg = *test_cfg.db_config;
// Disable cache to filter out its possible "corrections" to the corrupt sstable.
db_cfg.enable_cache(false);
db_cfg.enable_commitlog(false);
return do_with_cql_env([this] (cql_test_env& cql_env) -> future<> {
return test_env::do_with_async([this, &cql_env] (test_env& env) {
cell_locker_stats cl_stats;
auto& db = cql_env.local_db();
auto& compaction_manager = db.get_compaction_manager();
auto schema = schema_builder("ks", get_name())
.with_column("pk", utf8_type, column_kind::partition_key)
.with_column("ck", int32_type, column_kind::clustering_key)
.with_column("s", int32_type, column_kind::static_column)
.with_column("v", int32_type).build();
auto permit = tests::make_permit();
auto tmp = tmpdir();
auto sst_gen = [&env, schema, &tmp, gen = make_lw_shared<unsigned>(1)] () mutable {
return env.make_sstable(schema, tmp.path().string(), (*gen)++);
};
std::vector<mutation_fragment> corrupt_fragments;
auto scrubbed_mt = make_lw_shared<memtable>(schema);
auto sst = sst_gen();
testlog.info("Writing sstable {}", sst->get_filename());
{
const auto ts = api::timestamp_type{1};
auto local_keys = make_local_keys(3, schema);
auto config = env.manager().configure_writer();
config.validation_level = mutation_fragment_stream_validation_level::partition_region; // this test violates key order on purpose
auto writer = sst->get_writer(*schema, local_keys.size(), config, encoding_stats{});
auto make_static_row = [&, schema, ts] (mutation& mut) {
auto r = row{};
auto cdef = schema->static_column_at(0);
r.apply(cdef, atomic_cell_or_collection{atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1)))});
mut.set_static_cell(cdef, atomic_cell_or_collection{atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1)))});
return static_row(*schema, std::move(r));
};
auto make_clustering_row = [&, schema, ts] (unsigned i, mutation* mut) {
auto r = row{};
auto cdef = schema->regular_column_at(0);
auto ckey = clustering_key::from_single_value(*schema, int32_type->decompose(data_value(int(i))));
r.apply(cdef, atomic_cell_or_collection{atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1)))});
if (mut) {
mut->set_clustered_cell(ckey, cdef, atomic_cell_or_collection{atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1)))});
}
return clustering_row(std::move(ckey), {}, {}, std::move(r));
};
auto write_partition = [&, schema, ts] (int pk) {
auto pkey = partition_key::from_deeply_exploded(*schema, { local_keys.at(pk) });
auto dkey = dht::decorate_key(*schema, pkey);
testlog.trace("Writing partition {}", pkey);
auto mut = mutation(schema, dkey);
corrupt_fragments.emplace_back(*schema, permit, partition_start(dkey, {}));
writer.consume_new_partition(dkey);
{
auto sr = make_static_row(mut);
testlog.trace("Writing row {}", sr.position());
corrupt_fragments.emplace_back(*schema, permit, static_row(*schema, sr));
writer.consume(std::move(sr));
}
const unsigned rows_count = 10;
for (unsigned i = 0; i < rows_count; ++i) {
auto cr = make_clustering_row(i, &mut);
testlog.trace("Writing row {}", cr.position());
corrupt_fragments.emplace_back(*schema, permit, clustering_row(*schema, cr));
writer.consume(clustering_row(*schema, cr));
// write row twice
if (i == (rows_count / 2)) {
auto bad_cr = make_clustering_row(i - 2, nullptr);
testlog.trace("Writing out-of-order row {}", bad_cr.position());
corrupt_fragments.emplace_back(*schema, permit, clustering_row(*schema, bad_cr));
writer.consume(std::move(bad_cr));
}
}
testlog.trace("Writing partition_end");
corrupt_fragments.emplace_back(*schema, permit, partition_end{});
writer.consume_end_of_partition();
scrubbed_mt->apply(mut);
};
write_partition(1);
write_partition(0);
write_partition(2);
testlog.info("Writing done");
writer.consume_end_of_stream();
}
sst->load().get();
testlog.info("Loaded sstable {}", sst->get_filename());
auto cfg = column_family_test_config(env.manager());
cfg.datadir = tmp.path().string();
auto table = make_lw_shared<column_family>(schema, cfg, column_family::no_commitlog(),
db.get_compaction_manager(), cl_stats, db.row_cache_tracker());
auto stop_table = defer([table] {
table->stop().get();
});
table->mark_ready_for_writes();
table->start();
table->add_sstable_and_update_cache(sst).get();
BOOST_REQUIRE(table->in_strategy_sstables().size() == 1);
BOOST_REQUIRE(table->in_strategy_sstables().front() == sst);
auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector<mutation_fragment>& mfs) {
auto r = assert_that(sst->as_mutation_source().make_reader(schema, tests::make_permit()));
for (const auto& mf : mfs) {
@@ -5452,32 +5620,189 @@ SEASTAR_TEST_CASE(sstable_scrub_test) {
// Make sure we wrote what we thought we wrote.
verify_fragments(sst, corrupt_fragments);
testlog.info("Scrub with --skip-corrupted=false");
testlog.info("Scrub in abort mode");
// We expect the scrub with skip_corrupted=false to stop on the first invalid fragment.
compaction_manager.perform_sstable_scrub(table.get(), false).get();
// We expect the scrub with mode=scrub::mode::abort to stop on the first invalid fragment.
compaction_manager.perform_sstable_scrub(table.get(), sstables::compaction_options::scrub::mode::abort).get();
BOOST_REQUIRE(table->in_strategy_sstables().size() == 1);
verify_fragments(sst, corrupt_fragments);
testlog.info("Scrub with --skip-corrupted=true");
testlog.info("Scrub in segregate mode");
// We expect the scrub with skip_corrupted=true to get rid of all invalid data.
compaction_manager.perform_sstable_scrub(table.get(), true).get();
// We expect the scrub with mode=scrub::mode::segregate to fix all out-of-order data.
compaction_manager.perform_sstable_scrub(table.get(), sstables::compaction_options::scrub::mode::segregate).get();
BOOST_REQUIRE(table->in_strategy_sstables().size() == 1);
BOOST_REQUIRE(table->in_strategy_sstables().front() != sst);
verify_fragments(table->in_strategy_sstables().front(), scrubbed_fragments);
testlog.info("Scrub resulted in {} sstables", table->in_strategy_sstables().size());
BOOST_REQUIRE(table->in_strategy_sstables().size() > 1);
{
auto sst_reader = assert_that(table->as_mutation_source().make_reader(schema, tests::make_permit()));
auto mt_reader = scrubbed_mt->as_data_source().make_reader(schema, tests::make_permit());
auto mt_reader_close = deferred_close(mt_reader);
while (auto mf_opt = mt_reader(db::no_timeout).get()) {
testlog.trace("Expecting {}", mutation_fragment::printer(*schema, *mf_opt));
sst_reader.produces(*schema, *mf_opt);
}
sst_reader.produces_end_of_stream();
}
});
}, test_cfg);
}
// Test the scrub_reader in segregate mode and segregate_by_partition together,
// as they are used in scrub compaction in segregate mode.
SEASTAR_THREAD_TEST_CASE(test_scrub_segregate_stack) {
simple_schema ss;
auto schema = ss.schema();
auto permit = tests::make_permit();
struct expected_rows_type {
using expected_clustering_rows_type = std::set<clustering_key, clustering_key::less_compare>;
bool has_static_row = false;
expected_clustering_rows_type clustering_rows;
explicit expected_rows_type(const ::schema& s) : clustering_rows(s) { }
};
using expected_partitions_type = std::map<dht::decorated_key, expected_rows_type, dht::decorated_key::less_comparator>;
expected_partitions_type expected_partitions{dht::decorated_key::less_comparator(schema)};
std::deque<mutation_fragment> all_fragments;
size_t double_partition_end = 0;
size_t missing_partition_end = 0;
for (uint32_t p = 0; p < 10; ++p) {
auto dk = ss.make_pkey(tests::random::get_int<uint32_t>(0, 8));
auto it = expected_partitions.find(dk);
testlog.trace("Generating data for {} partition {}", it == expected_partitions.end() ? "new" : "existing", dk);
if (it == expected_partitions.end()) {
auto [inserted_it, _] = expected_partitions.emplace(dk, expected_rows_type(*schema));
it = inserted_it;
}
all_fragments.emplace_back(*schema, permit, partition_start(dk, {}));
auto& expected_rows = it->second;
for (uint32_t r = 0; r < 10; ++r) {
const auto is_clustering_row = tests::random::get_int<unsigned>(0, 8);
if (is_clustering_row) {
auto ck = ss.make_ckey(tests::random::get_int<uint32_t>(0, 8));
testlog.trace("Generating clustering row {}", ck);
all_fragments.emplace_back(*schema, permit, ss.make_row(ck, "cv"));
expected_rows.clustering_rows.insert(ck);
} else {
testlog.trace("Generating static row");
all_fragments.emplace_back(*schema, permit, ss.make_static_row("sv"));
expected_rows.has_static_row = true;
}
}
const auto partition_end_roll = tests::random::get_int(0, 100);
if (partition_end_roll < 80) {
testlog.trace("Generating partition end");
all_fragments.emplace_back(*schema, permit, partition_end());
} else if (partition_end_roll < 90) {
testlog.trace("Generating double partition end");
++double_partition_end;
all_fragments.emplace_back(*schema, permit, partition_end());
all_fragments.emplace_back(*schema, permit, partition_end());
} else {
testlog.trace("Not generating partition end");
++missing_partition_end;
}
}
{
size_t rows = 0;
for (const auto& part : expected_partitions) {
rows += part.second.clustering_rows.size();
}
testlog.info("Generated {} partitions (with {} double and {} missing partition ends), {} rows and {} fragments total", expected_partitions.size(), double_partition_end, missing_partition_end, rows, all_fragments.size());
}
auto copy_fragments = [&schema] (const std::deque<mutation_fragment>& frags) {
auto permit = tests::make_permit();
std::deque<mutation_fragment> copied_fragments;
for (const auto& frag : frags) {
copied_fragments.emplace_back(*schema, permit, frag);
}
return copied_fragments;
};
std::list<std::deque<mutation_fragment>> segregated_fragment_streams;
mutation_writer::segregate_by_partition(make_scrubbing_reader(make_flat_mutation_reader_from_fragments(schema, permit, std::move(all_fragments)),
sstables::compaction_options::scrub::mode::segregate), [&schema, &segregated_fragment_streams] (flat_mutation_reader rd) {
return async([&schema, &segregated_fragment_streams, rd = std::move(rd)] () mutable {
auto close = deferred_close(rd);
auto& fragments = segregated_fragment_streams.emplace_back();
while (auto mf_opt = rd(db::no_timeout).get()) {
fragments.emplace_back(*schema, rd.permit(), *mf_opt);
}
});
}).get();
testlog.info("Segregation resulted in {} fragment streams", segregated_fragment_streams.size());
testlog.info("Checking position monotonicity of segregated streams");
{
size_t i = 0;
for (const auto& segregated_fragment_stream : segregated_fragment_streams) {
testlog.debug("Checking position monotonicity of segregated stream #{}", i++);
assert_that(make_flat_mutation_reader_from_fragments(schema, permit, std::move(copy_fragments(segregated_fragment_stream))))
.has_monotonic_positions();
}
}
testlog.info("Checking position monotonicity of re-combined stream");
{
std::vector<flat_mutation_reader> readers;
readers.reserve(segregated_fragment_streams.size());
for (const auto& segregated_fragment_stream : segregated_fragment_streams) {
readers.emplace_back(make_flat_mutation_reader_from_fragments(schema, permit, std::move(copy_fragments(segregated_fragment_stream))));
}
assert_that(make_combined_reader(schema, permit, std::move(readers))).has_monotonic_positions();
}
testlog.info("Checking content of re-combined stream");
{
std::vector<flat_mutation_reader> readers;
readers.reserve(segregated_fragment_streams.size());
for (const auto& segregated_fragment_stream : segregated_fragment_streams) {
readers.emplace_back(make_flat_mutation_reader_from_fragments(schema, permit, std::move(copy_fragments(segregated_fragment_stream))));
}
auto rd = assert_that(make_combined_reader(schema, permit, std::move(readers)));
for (const auto& [pkey, content] : expected_partitions) {
testlog.debug("Checking content of partition {}", pkey);
rd.produces_partition_start(pkey);
if (content.has_static_row) {
rd.produces_static_row();
}
for (const auto& ckey : content.clustering_rows) {
rd.produces_row_with_key(ckey);
}
rd.produces_partition_end();
}
rd.produces_end_of_stream();
}
}
SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) {
auto schema = schema_builder("ks", get_name())
.with_column("pk", utf8_type, column_kind::partition_key)
.with_column("ck", int32_type, column_kind::clustering_key)
.with_column("s", int32_type, column_kind::static_column)
.with_column("v", int32_type).build();
auto permit = tests::make_permit();
std::deque<mutation_fragment> corrupt_fragments;
std::deque<mutation_fragment> scrubbed_fragments;
@@ -5488,7 +5813,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) {
auto make_partition_start = [&, schema] (unsigned pk) {
auto pkey = partition_key::from_deeply_exploded(*schema, { local_keys.at(pk) });
auto dkey = dht::decorate_key(*schema, pkey);
return mutation_fragment(*schema, tests::make_permit(), partition_start(std::move(dkey), {}));
return mutation_fragment(*schema, permit, partition_start(std::move(dkey), {}));
};
auto make_static_row = [&, schema, ts] {
@@ -5496,7 +5821,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) {
auto cdef = schema->static_column_at(0);
auto ac = atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1)));
r.apply(cdef, atomic_cell_or_collection{std::move(ac)});
return mutation_fragment(*schema, tests::make_permit(), static_row(*schema, std::move(r)));
return mutation_fragment(*schema, permit, static_row(*schema, std::move(r)));
};
auto make_clustering_row = [&, schema, ts] (unsigned i) {
@@ -5504,12 +5829,12 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) {
auto cdef = schema->regular_column_at(0);
auto ac = atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1)));
r.apply(cdef, atomic_cell_or_collection{std::move(ac)});
return mutation_fragment(*schema, tests::make_permit(),
return mutation_fragment(*schema, permit,
clustering_row(clustering_key::from_single_value(*schema, int32_type->decompose(data_value(int(i)))), {}, {}, std::move(r)));
};
auto add_fragment = [&, schema] (mutation_fragment mf, bool add_to_scrubbed = true) {
corrupt_fragments.emplace_back(mutation_fragment(*schema, tests::make_permit(), mf));
corrupt_fragments.emplace_back(mutation_fragment(*schema, permit, mf));
if (add_to_scrubbed) {
scrubbed_fragments.emplace_back(std::move(mf));
}
@@ -5521,7 +5846,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) {
add_fragment(make_clustering_row(0));
add_fragment(make_clustering_row(2));
add_fragment(make_clustering_row(1), false); // out-of-order clustering key
scrubbed_fragments.emplace_back(*schema, tests::make_permit(), partition_end{}); // missing partition-end
scrubbed_fragments.emplace_back(*schema, permit, partition_end{}); // missing partition-end
// Partition 2
add_fragment(make_partition_start(2));
@@ -5529,7 +5854,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) {
add_fragment(make_clustering_row(0));
add_fragment(make_clustering_row(1));
add_fragment(make_static_row(), false); // out-of-order static row
add_fragment(mutation_fragment(*schema, tests::make_permit(), partition_end{}));
add_fragment(mutation_fragment(*schema, permit, partition_end{}));
// Partition 1 - out-of-order
add_fragment(make_partition_start(1), false);
@@ -5538,7 +5863,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) {
add_fragment(make_clustering_row(1), false);
add_fragment(make_clustering_row(2), false);
add_fragment(make_clustering_row(3), false);
add_fragment(mutation_fragment(*schema, tests::make_permit(), partition_end{}), false);
add_fragment(mutation_fragment(*schema, permit, partition_end{}), false);
// Partition 3
add_fragment(make_partition_start(3));
@@ -5547,9 +5872,10 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) {
add_fragment(make_clustering_row(1));
add_fragment(make_clustering_row(2));
add_fragment(make_clustering_row(3));
scrubbed_fragments.emplace_back(*schema, tests::make_permit(), partition_end{}); // missing partition-end - at EOS
scrubbed_fragments.emplace_back(*schema, permit, partition_end{}); // missing partition-end - at EOS
auto r = assert_that(make_scrubbing_reader(make_flat_mutation_reader_from_fragments(schema, tests::make_permit(), std::move(corrupt_fragments)), true));
auto r = assert_that(make_scrubbing_reader(make_flat_mutation_reader_from_fragments(schema, permit, std::move(corrupt_fragments)),
compaction_options::scrub::mode::skip));
for (const auto& mf : scrubbed_fragments) {
testlog.info("Expecting {}", mutation_fragment::printer(*schema, mf));
r.produces(*schema, mf);