From 41181a5c2f15ee394eee813658942bd004b34ee1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Fri, 30 Apr 2021 12:20:14 +0300 Subject: [PATCH 1/9] tests: boost/sstable_datafile_test: use the same permit for all fragments in scrub tests No point in creating a permit for every mutation fragment. --- test/boost/sstable_datafile_test.cc | 40 +++++++++++++++-------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/test/boost/sstable_datafile_test.cc b/test/boost/sstable_datafile_test.cc index 24ba6adbed..afb736a64a 100644 --- a/test/boost/sstable_datafile_test.cc +++ b/test/boost/sstable_datafile_test.cc @@ -5321,6 +5321,7 @@ SEASTAR_TEST_CASE(sstable_scrub_test) { .with_column("ck", int32_type, column_kind::clustering_key) .with_column("s", int32_type, column_kind::static_column) .with_column("v", int32_type).build(); + auto permit = tests::make_permit(); auto tmp = tmpdir(); auto sst_gen = [&env, schema, &tmp, gen = make_lw_shared(1)] () mutable { @@ -5365,9 +5366,9 @@ SEASTAR_TEST_CASE(sstable_scrub_test) { testlog.trace("Writing partition {}", pkey.with_schema(*schema)); if (write_to_scrubbed) { - scrubbed_fragments.emplace_back(*schema, tests::make_permit(), partition_start(dkey, {})); + scrubbed_fragments.emplace_back(*schema, permit, partition_start(dkey, {})); } - corrupt_fragments.emplace_back(*schema, tests::make_permit(), partition_start(dkey, {})); + corrupt_fragments.emplace_back(*schema, permit, partition_start(dkey, {})); writer.consume_new_partition(dkey); { @@ -5376,9 +5377,9 @@ SEASTAR_TEST_CASE(sstable_scrub_test) { testlog.trace("Writing row {}", sr.position()); if (write_to_scrubbed) { - scrubbed_fragments.emplace_back(*schema, tests::make_permit(), static_row(*schema, sr)); + scrubbed_fragments.emplace_back(*schema, permit, static_row(*schema, sr)); } - corrupt_fragments.emplace_back(*schema, tests::make_permit(), static_row(*schema, sr)); + corrupt_fragments.emplace_back(*schema, permit, static_row(*schema, 
sr)); writer.consume(std::move(sr)); } @@ -5389,16 +5390,16 @@ SEASTAR_TEST_CASE(sstable_scrub_test) { testlog.trace("Writing row {}", cr.position()); if (write_to_scrubbed) { - scrubbed_fragments.emplace_back(*schema, tests::make_permit(), clustering_row(*schema, cr)); + scrubbed_fragments.emplace_back(*schema, permit, clustering_row(*schema, cr)); } - corrupt_fragments.emplace_back(*schema, tests::make_permit(), clustering_row(*schema, cr)); + corrupt_fragments.emplace_back(*schema, permit, clustering_row(*schema, cr)); writer.consume(clustering_row(*schema, cr)); // write row twice if (i == (rows_count / 2)) { auto bad_cr = make_clustering_row(i - 2); testlog.trace("Writing out-of-order row {}", bad_cr.position()); - corrupt_fragments.emplace_back(*schema, tests::make_permit(), clustering_row(*schema, bad_cr)); + corrupt_fragments.emplace_back(*schema, permit, clustering_row(*schema, bad_cr)); writer.consume(std::move(bad_cr)); } } @@ -5406,9 +5407,9 @@ SEASTAR_TEST_CASE(sstable_scrub_test) { testlog.trace("Writing partition_end"); if (write_to_scrubbed) { - scrubbed_fragments.emplace_back(*schema, tests::make_permit(), partition_end{}); + scrubbed_fragments.emplace_back(*schema, permit, partition_end{}); } - corrupt_fragments.emplace_back(*schema, tests::make_permit(), partition_end{}); + corrupt_fragments.emplace_back(*schema, permit, partition_end{}); writer.consume_end_of_partition(); }; @@ -5439,7 +5440,7 @@ SEASTAR_TEST_CASE(sstable_scrub_test) { BOOST_REQUIRE(table->in_strategy_sstables().front() == sst); auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector& mfs) { - auto r = assert_that(sst->as_mutation_source().make_reader(schema, tests::make_permit())); + auto r = assert_that(sst->as_mutation_source().make_reader(schema, permit)); for (const auto& mf : mfs) { testlog.trace("Expecting {}", mutation_fragment::printer(*schema, mf)); r.produces(*schema, mf); @@ -5478,6 +5479,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) { 
.with_column("ck", int32_type, column_kind::clustering_key) .with_column("s", int32_type, column_kind::static_column) .with_column("v", int32_type).build(); + auto permit = tests::make_permit(); std::deque corrupt_fragments; std::deque scrubbed_fragments; @@ -5488,7 +5490,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) { auto make_partition_start = [&, schema] (unsigned pk) { auto pkey = partition_key::from_deeply_exploded(*schema, { local_keys.at(pk) }); auto dkey = dht::decorate_key(*schema, pkey); - return mutation_fragment(*schema, tests::make_permit(), partition_start(std::move(dkey), {})); + return mutation_fragment(*schema, permit, partition_start(std::move(dkey), {})); }; auto make_static_row = [&, schema, ts] { @@ -5496,7 +5498,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) { auto cdef = schema->static_column_at(0); auto ac = atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1))); r.apply(cdef, atomic_cell_or_collection{std::move(ac)}); - return mutation_fragment(*schema, tests::make_permit(), static_row(*schema, std::move(r))); + return mutation_fragment(*schema, permit, static_row(*schema, std::move(r))); }; auto make_clustering_row = [&, schema, ts] (unsigned i) { @@ -5504,12 +5506,12 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) { auto cdef = schema->regular_column_at(0); auto ac = atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1))); r.apply(cdef, atomic_cell_or_collection{std::move(ac)}); - return mutation_fragment(*schema, tests::make_permit(), + return mutation_fragment(*schema, permit, clustering_row(clustering_key::from_single_value(*schema, int32_type->decompose(data_value(int(i)))), {}, {}, std::move(r))); }; auto add_fragment = [&, schema] (mutation_fragment mf, bool add_to_scrubbed = true) { - corrupt_fragments.emplace_back(mutation_fragment(*schema, tests::make_permit(), mf)); + corrupt_fragments.emplace_back(mutation_fragment(*schema, permit, mf)); if (add_to_scrubbed) 
{ scrubbed_fragments.emplace_back(std::move(mf)); } @@ -5521,7 +5523,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) { add_fragment(make_clustering_row(0)); add_fragment(make_clustering_row(2)); add_fragment(make_clustering_row(1), false); // out-of-order clustering key - scrubbed_fragments.emplace_back(*schema, tests::make_permit(), partition_end{}); // missing partition-end + scrubbed_fragments.emplace_back(*schema, permit, partition_end{}); // missing partition-end // Partition 2 add_fragment(make_partition_start(2)); @@ -5529,7 +5531,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) { add_fragment(make_clustering_row(0)); add_fragment(make_clustering_row(1)); add_fragment(make_static_row(), false); // out-of-order static row - add_fragment(mutation_fragment(*schema, tests::make_permit(), partition_end{})); + add_fragment(mutation_fragment(*schema, permit, partition_end{})); // Partition 1 - out-of-order add_fragment(make_partition_start(1), false); @@ -5538,7 +5540,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) { add_fragment(make_clustering_row(1), false); add_fragment(make_clustering_row(2), false); add_fragment(make_clustering_row(3), false); - add_fragment(mutation_fragment(*schema, tests::make_permit(), partition_end{}), false); + add_fragment(mutation_fragment(*schema, permit, partition_end{}), false); // Partition 3 add_fragment(make_partition_start(3)); @@ -5547,9 +5549,9 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) { add_fragment(make_clustering_row(1)); add_fragment(make_clustering_row(2)); add_fragment(make_clustering_row(3)); - scrubbed_fragments.emplace_back(*schema, tests::make_permit(), partition_end{}); // missing partition-end - at EOS + scrubbed_fragments.emplace_back(*schema, permit, partition_end{}); // missing partition-end - at EOS - auto r = assert_that(make_scrubbing_reader(make_flat_mutation_reader_from_fragments(schema, tests::make_permit(), std::move(corrupt_fragments)), true)); + auto r = 
assert_that(make_scrubbing_reader(make_flat_mutation_reader_from_fragments(schema, permit, std::move(corrupt_fragments)), true)); for (const auto& mf : scrubbed_fragments) { testlog.info("Expecting {}", mutation_fragment::printer(*schema, mf)); r.produces(*schema, mf); From ba75115e2072cc2f3031a8cb6dece7ccdae95a8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 4 May 2021 14:36:36 +0300 Subject: [PATCH 2/9] sstables: compaction/scrub: prevent infinite loop when last partition end is missing Scrub compaction will add the missing last partition-end in a stream when allowed to modify the stream. This however can cause an infinite loop: 1) user calls fill_buffer() 2) process fragments until underlying is at EOS 3) add missing partition end 4) set EOS 5) user sees that last buffer wasn't empty 6) calls fill_buffer() again 7) goto (3) To prevent this cycle, break out of `fill_buffer()` early when both the scrub reader and the underlying is at EOS. --- sstables/compaction.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sstables/compaction.cc b/sstables/compaction.cc index b200bb968f..096791c490 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -1277,6 +1277,9 @@ class scrub_compaction final : public regular_compaction { , _validator(*_schema) { } virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override { + if (_end_of_stream) { + return make_ready_future<>(); + } return repeat([this, timeout] { return _reader.fill_buffer(timeout).then([this] { fill_buffer_from_underlying(); From 03728f5c26a6568a38d52c9e34311939cf059bb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 29 Apr 2021 17:55:00 +0300 Subject: [PATCH 3/9] sstables: compaction/scrub: replace skip_corrupted with mode enum We want to add more modes than the current two, so replace the current boolean mode selector with an enum which allows for easy extensions. 
--- api/storage_service.cc | 9 +++++-- sstables/compaction.cc | 39 +++++++++++++++++++++-------- sstables/compaction.hh | 2 +- sstables/compaction_descriptor.hh | 13 +++++++--- sstables/compaction_manager.cc | 4 +-- sstables/compaction_manager.hh | 2 +- test/boost/sstable_datafile_test.cc | 15 +++++------ 7 files changed, 57 insertions(+), 27 deletions(-) diff --git a/api/storage_service.cc b/api/storage_service.cc index 38efac84fd..2ce7d08a71 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -1242,7 +1242,12 @@ void set_snapshot(http_context& ctx, routes& r, sharded& snap_ }); ss::scrub.set(r, wrap_ks_cf(ctx, [&snap_ctl] (http_context& ctx, std::unique_ptr req, sstring keyspace, std::vector column_families) { + auto scrub_mode = sstables::compaction_options::scrub::mode::abort; + const auto skip_corrupted = req_param(*req, "skip_corrupted", false); + if (skip_corrupted) { + scrub_mode = sstables::compaction_options::scrub::mode::skip; + } auto f = make_ready_future<>(); if (!req_param(*req, "disable_snapshot", false)) { @@ -1252,12 +1257,12 @@ void set_snapshot(http_context& ctx, routes& r, sharded& snap_ }); } - return f.then([&ctx, keyspace, column_families, skip_corrupted] { + return f.then([&ctx, keyspace, column_families, scrub_mode] { return ctx.db.invoke_on_all([=] (database& db) { return do_for_each(column_families, [=, &db](sstring cfname) { auto& cm = db.get_compaction_manager(); auto& cf = db.find_column_family(keyspace, cfname); - return cm.perform_sstable_scrub(&cf, skip_corrupted); + return cm.perform_sstable_scrub(&cf, scrub_mode); }); }); }).then([]{ diff --git a/sstables/compaction.cc b/sstables/compaction.cc index 096791c490..76db1eb4f0 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -122,6 +122,19 @@ std::ostream& operator<<(std::ostream& os, compaction_type type) { return os; } +std::string_view to_string(compaction_options::scrub::mode scrub_mode) { + switch (scrub_mode) { + case 
compaction_options::scrub::mode::abort: + return "abort"; + case compaction_options::scrub::mode::skip: + return "skip"; + } +} + +std::ostream& operator<<(std::ostream& os, compaction_options::scrub::mode scrub_mode) { + return os << to_string(scrub_mode); +} + std::ostream& operator<<(std::ostream& os, pretty_printed_data_size data) { static constexpr const char* suffixes[] = { " bytes", "kB", "MB", "GB", "TB", "PB" }; @@ -1147,14 +1160,14 @@ public: class scrub_compaction final : public regular_compaction { class reader : public flat_mutation_reader::impl { - bool _skip_corrupted; + compaction_options::scrub::mode _scrub_mode; flat_mutation_reader _reader; mutation_fragment_stream_validator _validator; bool _skip_to_next_partition = false; private: void maybe_abort_scrub() { - if (!_skip_corrupted) { + if (_scrub_mode == compaction_options::scrub::mode::abort) { throw compaction_stop_exception(_schema->ks_name(), _schema->cf_name(), "scrub compaction found invalid data", false); } } @@ -1270,9 +1283,9 @@ class scrub_compaction final : public regular_compaction { } public: - reader(flat_mutation_reader underlying, bool skip_corrupted) + reader(flat_mutation_reader underlying, compaction_options::scrub::mode scrub_mode) : impl(underlying.schema(), underlying.permit()) - , _skip_corrupted(skip_corrupted) + , _scrub_mode(scrub_mode) , _reader(std::move(underlying)) , _validator(*_schema) { } @@ -1320,30 +1333,34 @@ class scrub_compaction final : public regular_compaction { private: compaction_options::scrub _options; + std::string _scrub_start_description; + std::string _scrub_finish_description; public: scrub_compaction(column_family& cf, compaction_descriptor descriptor, compaction_options::scrub options) : regular_compaction(cf, std::move(descriptor)) - , _options(options) { + , _options(options) + , _scrub_start_description(fmt::format("Scrubbing in {} mode", _options.operation_mode)) + , _scrub_finish_description(fmt::format("Finished scrubbing in {} mode", 
_options.operation_mode)) { } std::string_view report_start_desc() const override { - return "Scrubbing"; + return _scrub_start_description; } std::string_view report_finish_desc() const override { - return "Finished scrubbing"; + return _scrub_finish_description; } flat_mutation_reader make_sstable_reader() const override { - return make_flat_mutation_reader(regular_compaction::make_sstable_reader(), _options.skip_corrupted); + return make_flat_mutation_reader(regular_compaction::make_sstable_reader(), _options.operation_mode); } - friend flat_mutation_reader make_scrubbing_reader(flat_mutation_reader rd, bool skip_corrupted); + friend flat_mutation_reader make_scrubbing_reader(flat_mutation_reader rd, compaction_options::scrub::mode scrub_mode); }; -flat_mutation_reader make_scrubbing_reader(flat_mutation_reader rd, bool skip_corrupted) { - return make_flat_mutation_reader(std::move(rd), skip_corrupted); +flat_mutation_reader make_scrubbing_reader(flat_mutation_reader rd, compaction_options::scrub::mode scrub_mode) { + return make_flat_mutation_reader(std::move(rd), scrub_mode); } class resharding_compaction final : public compaction { diff --git a/sstables/compaction.hh b/sstables/compaction.hh index 4f28985872..84ac15d9aa 100644 --- a/sstables/compaction.hh +++ b/sstables/compaction.hh @@ -111,5 +111,5 @@ namespace sstables { get_fully_expired_sstables(column_family& cf, const std::vector& compacting, gc_clock::time_point gc_before); // For tests, can drop after we virtualize sstables. 
- flat_mutation_reader make_scrubbing_reader(flat_mutation_reader rd, bool skip_corrupted); + flat_mutation_reader make_scrubbing_reader(flat_mutation_reader rd, compaction_options::scrub::mode scrub_mode); } diff --git a/sstables/compaction_descriptor.hh b/sstables/compaction_descriptor.hh index c7b7379d79..6ec153525f 100644 --- a/sstables/compaction_descriptor.hh +++ b/sstables/compaction_descriptor.hh @@ -73,7 +73,11 @@ public: std::reference_wrapper db; }; struct scrub { - bool skip_corrupted; + enum class mode { + abort, // abort scrub on the first sign of corruption + skip, // skip corrupt data, including range of rows and/or partitions that are out-of-order + }; + mode operation_mode = mode::abort; }; struct reshard { }; @@ -110,8 +114,8 @@ public: return compaction_options(upgrade{db}); } - static compaction_options make_scrub(bool skip_corrupted) { - return compaction_options(scrub{skip_corrupted}); + static compaction_options make_scrub(scrub::mode mode) { + return compaction_options(scrub{mode}); } template @@ -122,6 +126,9 @@ public: compaction_type type() const; }; +std::string_view to_string(compaction_options::scrub::mode); +std::ostream& operator<<(std::ostream& os, compaction_options::scrub::mode scrub_mode); + struct compaction_descriptor { // List of sstables to be compacted. std::vector sstables; diff --git a/sstables/compaction_manager.cc b/sstables/compaction_manager.cc index bf8a07357f..eab3d0b75f 100644 --- a/sstables/compaction_manager.cc +++ b/sstables/compaction_manager.cc @@ -829,8 +829,8 @@ future<> compaction_manager::perform_sstable_upgrade(database& db, column_family } // Submit a column family to be scrubbed and wait for its termination. 
-future<> compaction_manager::perform_sstable_scrub(column_family* cf, bool skip_corrupted) { - return rewrite_sstables(cf, sstables::compaction_options::make_scrub(skip_corrupted), [this] (const table& cf) { +future<> compaction_manager::perform_sstable_scrub(column_family* cf, sstables::compaction_options::scrub::mode scrub_mode) { + return rewrite_sstables(cf, sstables::compaction_options::make_scrub(scrub_mode), [this] (const table& cf) { return get_candidates(cf); }); } diff --git a/sstables/compaction_manager.hh b/sstables/compaction_manager.hh index 5117448f66..c65eda8eea 100644 --- a/sstables/compaction_manager.hh +++ b/sstables/compaction_manager.hh @@ -210,7 +210,7 @@ public: future<> perform_sstable_upgrade(database& db, column_family* cf, bool exclude_current_version); // Submit a column family to be scrubbed and wait for its termination. - future<> perform_sstable_scrub(column_family* cf, bool skip_corrupted); + future<> perform_sstable_scrub(column_family* cf, sstables::compaction_options::scrub::mode scrub_mode); // Submit a column family for major compaction. future<> submit_major_compaction(column_family* cf); diff --git a/test/boost/sstable_datafile_test.cc b/test/boost/sstable_datafile_test.cc index afb736a64a..d9c926b816 100644 --- a/test/boost/sstable_datafile_test.cc +++ b/test/boost/sstable_datafile_test.cc @@ -5453,18 +5453,18 @@ SEASTAR_TEST_CASE(sstable_scrub_test) { // Make sure we wrote what we though we wrote. verify_fragments(sst, corrupt_fragments); - testlog.info("Scrub with --skip-corrupted=false"); + testlog.info("Scrub in abort mode"); - // We expect the scrub with skip_corrupted=false to stop on the first invalid fragment. - compaction_manager.perform_sstable_scrub(table.get(), false).get(); + // We expect the scrub with mode=scrub::mode::abort to stop on the first invalid fragment. 
+ compaction_manager.perform_sstable_scrub(table.get(), sstables::compaction_options::scrub::mode::abort).get(); BOOST_REQUIRE(table->in_strategy_sstables().size() == 1); verify_fragments(sst, corrupt_fragments); - testlog.info("Scrub with --skip-corrupted=true"); + testlog.info("Scrub in skip mode"); - // We expect the scrub with skip_corrupted=true to get rid of all invalid data. - compaction_manager.perform_sstable_scrub(table.get(), true).get(); + // We expect the scrub with mode=scrub::mode::skip to get rid of all invalid data. + compaction_manager.perform_sstable_scrub(table.get(), sstables::compaction_options::scrub::mode::skip).get(); BOOST_REQUIRE(table->in_strategy_sstables().size() == 1); BOOST_REQUIRE(table->in_strategy_sstables().front() != sst); @@ -5551,7 +5551,8 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) { add_fragment(make_clustering_row(3)); scrubbed_fragments.emplace_back(*schema, permit, partition_end{}); // missing partition-end - at EOS - auto r = assert_that(make_scrubbing_reader(make_flat_mutation_reader_from_fragments(schema, permit, std::move(corrupt_fragments)), true)); + auto r = assert_that(make_scrubbing_reader(make_flat_mutation_reader_from_fragments(schema, permit, std::move(corrupt_fragments)), + compaction_options::scrub::mode::skip)); for (const auto& mf : scrubbed_fragments) { testlog.info("Expecting {}", mutation_fragment::printer(*schema, mf)); r.produces(*schema, mf); From 34643ac997a7243a0f40f940eb45d9ad0b7f9bbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 29 Apr 2021 17:55:13 +0300 Subject: [PATCH 4/9] api: /storage_service/keyspace_scrub: add scrub mode param Add direct support to the newly added scrub mode enum. Instead of the legacy `skip_corrupted` flag, one can now select the desired mode from the mode enum. `skip_corrupted` is still supported for backwards compatibility but it is ignored when the mode enum is set. 
--- api/api-doc/storage_service.json | 14 +++++++++++++- api/storage_service.cc | 18 +++++++++++++++--- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/api/api-doc/storage_service.json b/api/api-doc/storage_service.json index 35a45c9b2e..bfd4b06a12 100644 --- a/api/api-doc/storage_service.json +++ b/api/api-doc/storage_service.json @@ -762,7 +762,7 @@ "operations":[ { "method":"GET", - "summary":"Scrub (deserialize + reserialize at the latest version, skipping bad rows if any) the given keyspace. If columnFamilies array is empty, all CFs are scrubbed. Scrubbed CFs will be snapshotted first, if disableSnapshot is false", + "summary":"Scrub (deserialize + reserialize at the latest version, resolving corruptions if any) the given keyspace. If columnFamilies array is empty, all CFs are scrubbed. Scrubbed CFs will be snapshotted first, if disableSnapshot is false. Scrub has the following modes: Abort (default) - abort scrub if corruption is detected; Skip (same as `skip_corrupted=true`) skip over corrupt data, omitting them from the output.", "type": "long", "nickname":"scrub", "produces":[ @@ -785,6 +785,18 @@ "type":"boolean", "paramType":"query" }, + { + "name":"scrub_mode", + "description":"How to handle corrupt data (overrides 'skip_corrupted'); ", + "required":false, + "allowMultiple":false, + "type":"string", + "enum":[ + "ABORT", + "SKIP" + ], + "paramType":"query" + }, { "name":"keyspace", "description":"The keyspace to query about", diff --git a/api/storage_service.cc b/api/storage_service.cc index 2ce7d08a71..a762a6c2f7 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -1244,9 +1244,21 @@ void set_snapshot(http_context& ctx, routes& r, sharded& snap_ ss::scrub.set(r, wrap_ks_cf(ctx, [&snap_ctl] (http_context& ctx, std::unique_ptr req, sstring keyspace, std::vector column_families) { auto scrub_mode = sstables::compaction_options::scrub::mode::abort; - const auto skip_corrupted = req_param(*req, "skip_corrupted", false); - if 
(skip_corrupted) { - scrub_mode = sstables::compaction_options::scrub::mode::skip; + const sstring scrub_mode_str = req_param(*req, "scrub_mode", ""); + if (scrub_mode_str == "") { + const auto skip_corrupted = req_param(*req, "skip_corrupted", false); + + if (skip_corrupted) { + scrub_mode = sstables::compaction_options::scrub::mode::skip; + } + } else { + if (scrub_mode_str == "ABORT") { + scrub_mode = sstables::compaction_options::scrub::mode::abort; + } else if (scrub_mode_str == "SKIP") { + scrub_mode = sstables::compaction_options::scrub::mode::skip; + } else { + throw std::invalid_argument(fmt::format("Unknown argument for 'scrub_mode' parameter: {}", scrub_mode_str)); + } } auto f = make_ready_future<>(); From a53e6bc6e84be5567053831862ece7c696887a3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 29 Apr 2021 14:40:17 +0300 Subject: [PATCH 5/9] mutation_writer: add segregate_by_partition Add a new segregator which segregates a stream, potentially containing duplicate or even out-of-order partitions, into multiple output streams, such that each output stream has strictly monotonic partitions. This segregator will be used by a new scrub compaction mode which is meant to fix sstables containing duplicate or out-of-order data. 
--- configure.py | 1 + .../partition_based_splitting_writer.cc | 115 ++++++++++++++++++ .../partition_based_splitting_writer.hh | 39 ++++++ test/boost/mutation_writer_test.cc | 25 ++++ 4 files changed, 180 insertions(+) create mode 100644 mutation_writer/partition_based_splitting_writer.cc create mode 100644 mutation_writer/partition_based_splitting_writer.hh diff --git a/configure.py b/configure.py index e1cfb4cd3f..72e0295fb0 100755 --- a/configure.py +++ b/configure.py @@ -950,6 +950,7 @@ scylla_core = (['database.cc', 'utils/error_injection.cc', 'mutation_writer/timestamp_based_splitting_writer.cc', 'mutation_writer/shard_based_splitting_writer.cc', + 'mutation_writer/partition_based_splitting_writer.cc', 'mutation_writer/feed_writers.cc', 'lua.cc', 'service/raft/schema_raft_state_machine.cc', diff --git a/mutation_writer/partition_based_splitting_writer.cc b/mutation_writer/partition_based_splitting_writer.cc new file mode 100644 index 0000000000..c5c7b56edd --- /dev/null +++ b/mutation_writer/partition_based_splitting_writer.cc @@ -0,0 +1,115 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . 
+ */ + +#include "mutation_writer/partition_based_splitting_writer.hh" + +namespace mutation_writer { + +class partition_based_splitting_mutation_writer { + struct bucket { + bucket_writer writer; + dht::decorated_key last_key; + }; + +private: + schema_ptr _schema; + reader_permit _permit; + reader_consumer _consumer; + std::vector _buckets; + bucket* _current_bucket = nullptr; + + future<> write_to_bucket(mutation_fragment&& mf) { + return _current_bucket->writer.consume(std::move(mf)); + } + + bucket* create_bucket_for(const dht::decorated_key& key) { + return &_buckets.emplace_back(bucket{bucket_writer(_schema, _permit, _consumer), key}); + } +public: + partition_based_splitting_mutation_writer(schema_ptr schema, reader_permit permit, reader_consumer consumer) + : _schema(std::move(schema)) + , _permit(std::move(permit)) + , _consumer(std::move(consumer)) + {} + + future<> consume(partition_start&& ps) { + if (_buckets.empty()) { + _current_bucket = create_bucket_for(ps.key()); + } else if (dht::ring_position_tri_compare(*_schema, _current_bucket->last_key, ps.key()) < 0) { + // No need to change bucket, just update the last key. + _current_bucket->last_key = ps.key(); + } else { + // Find the first bucket where this partition doesn't cause + // monotonicity violations. Prefer the buckets towards the head of the list. 
+ auto it = std::find_if(_buckets.begin(), _buckets.end(), [this, &ps] (const bucket& b) { + return dht::ring_position_tri_compare(*_schema, b.last_key, ps.key()) < 0; + }); + if (it == _buckets.end()) { + _current_bucket = create_bucket_for(ps.key()); + } else { + _current_bucket = &*it; + _current_bucket->last_key = ps.key(); + } + } + return write_to_bucket(mutation_fragment(*_schema, _permit, std::move(ps))); + } + + future<> consume(static_row&& sr) { + return write_to_bucket(mutation_fragment(*_schema, _permit, std::move(sr))); + } + + future<> consume(clustering_row&& cr) { + return write_to_bucket(mutation_fragment(*_schema, _permit, std::move(cr))); + } + + future<> consume(range_tombstone&& rt) { + return write_to_bucket(mutation_fragment(*_schema, _permit, std::move(rt))); + } + + future<> consume(partition_end&& pe) { + return write_to_bucket(mutation_fragment(*_schema, _permit, std::move(pe))); + } + + void consume_end_of_stream() { + for (auto& bucket : _buckets) { + bucket.writer.consume_end_of_stream(); + } + } + void abort(std::exception_ptr ep) { + for (auto& bucket : _buckets) { + bucket.writer.abort(ep); + } + } + future<> close() noexcept { + return parallel_for_each(_buckets, [] (bucket& bucket) { + return bucket.writer.close(); + }); + } +}; + +future<> segregate_by_partition(flat_mutation_reader producer, reader_consumer consumer) { + auto schema = producer.schema(); + auto permit = producer.permit(); + return feed_writer(std::move(producer), + partition_based_splitting_mutation_writer(std::move(schema), std::move(permit), std::move(consumer))); +} + +} // namespace mutation_writer diff --git a/mutation_writer/partition_based_splitting_writer.hh b/mutation_writer/partition_based_splitting_writer.hh new file mode 100644 index 0000000000..8f0cf8882a --- /dev/null +++ b/mutation_writer/partition_based_splitting_writer.hh @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. 
+ * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include + +#include "feed_writers.hh" + +namespace mutation_writer { + +// Given a producer that may contain partitions in the wrong order, or even +// contain partitions multiple times, separate them such that each output +// stream keeps the partition ordering guarantee. In other words, repair +// a stream that violates the ordering requirements by splitting it into output +// streams that honor it. +// This is useful for scrub compaction to split sstables containing out-of-order +// and/or duplicate partitions into sstables that honor the partition ordering. 
+future<> segregate_by_partition(flat_mutation_reader producer, reader_consumer consumer); + +} // namespace mutation_writer diff --git a/test/boost/mutation_writer_test.cc b/test/boost/mutation_writer_test.cc index 07999a5fb2..2191c24469 100644 --- a/test/boost/mutation_writer_test.cc +++ b/test/boost/mutation_writer_test.cc @@ -31,7 +31,9 @@ #include "flat_mutation_reader.hh" #include "mutation_writer/multishard_writer.hh" #include "mutation_writer/timestamp_based_splitting_writer.hh" +#include "mutation_writer/partition_based_splitting_writer.hh" #include "test/lib/cql_test_env.hh" +#include "test/lib/flat_mutation_reader_assertions.hh" #include "test/lib/mutation_assertions.hh" #include "test/lib/random_utils.hh" #include "test/lib/random_schema.hh" @@ -427,3 +429,26 @@ SEASTAR_THREAD_TEST_CASE(test_timestamp_based_splitting_mutation_writer_abort) { BOOST_TEST_PASSPOINT(); } } + +// Check that the partition_based_splitting_mutation_writer can fix reordered partitions +SEASTAR_THREAD_TEST_CASE(test_partition_based_splitting_mutation_writer) { + auto random_spec = tests::make_random_schema_specification( + get_name(), + std::uniform_int_distribution(1, 2), + std::uniform_int_distribution(0, 2), + std::uniform_int_distribution(1, 2), + std::uniform_int_distribution(0, 1)); + + auto random_schema = tests::random_schema{tests::random::get_int(), *random_spec}; + + auto input_mutations = tests::generate_random_mutations(random_schema).get(); + input_mutations.emplace_back(*input_mutations.begin()); // Have a duplicate partition as well. 
+ std::shuffle(input_mutations.begin(), input_mutations.end(), tests::random::gen()); + + mutation_writer::segregate_by_partition(flat_mutation_reader_from_mutations(tests::make_permit(), std::move(input_mutations)), [] (flat_mutation_reader rd) { + testlog.info("Checking segregated output stream"); + return async([rd = std::move(rd)] () mutable { + assert_that(std::move(rd)).has_monotonic_positions(); + }); + }).get(); +} From 104a47699c87fa3ed1b864a306095595b402a837 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 4 May 2021 14:34:24 +0300 Subject: [PATCH 6/9] mutation_fragment_stream_validator: add reset methods Allow resetting the validator to a given partition or mutation fragment. This allows a user which is able to fix corrupt streams to reset the validator to a partition or row which the validator normally wouldn't accept and hence it wouldn't advance its internal state to it. --- flat_mutation_reader.cc | 11 +++++++++++ mutation_fragment_stream_validator.hh | 18 ++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/flat_mutation_reader.cc b/flat_mutation_reader.cc index 90a9d51ec5..58e2296d12 100644 --- a/flat_mutation_reader.cc +++ b/flat_mutation_reader.cc @@ -1086,6 +1086,17 @@ bool mutation_fragment_stream_validator::on_end_of_stream() { return _prev_kind == mutation_fragment::kind::partition_end; } +void mutation_fragment_stream_validator::reset(dht::decorated_key dk) { + _prev_partition_key = dk; + _prev_pos = position_in_partition::for_partition_start(); + _prev_kind = mutation_fragment::kind::partition_start; +} + +void mutation_fragment_stream_validator::reset(const mutation_fragment& mf) { + _prev_pos = mf.position(); + _prev_kind = mf.mutation_fragment_kind(); +} + namespace { [[noreturn]] void on_validation_error(seastar::logger& l, const seastar::sstring& reason) { diff --git a/mutation_fragment_stream_validator.hh b/mutation_fragment_stream_validator.hh index b6f877fb2b..964035fddf 100644 --- 
a/mutation_fragment_stream_validator.hh +++ b/mutation_fragment_stream_validator.hh @@ -95,6 +95,24 @@ public: /// \returns true if the partition key is valid. bool operator()(const dht::decorated_key& dk); + /// Reset the state of the validator to the given partition + /// + /// Reset the state of the validator as if it has just validated a valid + /// partition start with the provided key. This can be used to force a reset + /// to a given partition that is normally invalid and hence wouldn't advance + /// the internal state. This can be used by users that can correct such + /// invalid streams and wish to continue validating it. + void reset(dht::decorated_key dk); + + /// Reset the state of the validator to the given fragment + /// + /// Reset the state of the validator as if it has just validated a valid + /// fragment. This can be used to force a reset to a given fragment that is + /// normally invalid and hence wouldn't advance the internal state. This + /// can be used by users that can correct such invalid streams and wish to + /// continue validating it. + void reset(const mutation_fragment&); + /// Validate that the stream was properly closed. /// /// \returns false if the last partition wasn't closed, i.e. the last From 674a77ead0435cef95714ca22e2323b61c37b813 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 29 Apr 2021 17:51:33 +0300 Subject: [PATCH 7/9] sstables: compaction/scrub: add segregate mode In segregate mode scrub will segregate the content of input sstables into potentially multiple output sstables such that they respect partition level and fragment level monotonicity requirements. This can be used to fix data where partitions or even fragments are out-of-order or duplicated. In this case no data is lost and after the scrub each sstable contains valid data. Out-of-order partitions are fixed by simply being written into a separate output, compared to the last one compaction was writing into.
Out-of-order fragments are fixed by injecting a partition-end/partition-start pair right before them, effectively moving them into a separate (duplicate) partition which is then treated in the above mentioned way. This mode can fix corruptions where partitions are out-of-order or duplicated. This mode cannot fix corruptions where partitions were merged, although data will be made valid from the database level, it won't be on the business-logic level. --- sstables/compaction.cc | 65 ++++++++++++++++++++++++++++--- sstables/compaction_descriptor.hh | 1 + 2 files changed, 60 insertions(+), 6 deletions(-) diff --git a/sstables/compaction.cc b/sstables/compaction.cc index 76db1eb4f0..417ebb380d 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -68,6 +68,7 @@ #include "leveled_manifest.hh" #include "dht/token.hh" #include "mutation_writer/shard_based_splitting_writer.hh" +#include "mutation_writer/partition_based_splitting_writer.hh" #include "mutation_source_metadata.hh" #include "mutation_fragment_stream_validator.hh" @@ -128,6 +129,8 @@ std::string_view to_string(compaction_options::scrub::mode scrub_mode) { return "abort"; case compaction_options::scrub::mode::skip: return "skip"; + case compaction_options::scrub::mode::segregate: + return "segregate"; } } @@ -1160,6 +1163,8 @@ public: class scrub_compaction final : public regular_compaction { class reader : public flat_mutation_reader::impl { + using skip = bool_class; + private: compaction_options::scrub::mode _scrub_mode; flat_mutation_reader _reader; mutation_fragment_stream_validator _validator; @@ -1204,9 +1209,21 @@ class scrub_compaction final : public regular_compaction { } } - void on_invalid_partition(const dht::decorated_key& new_key) { + skip on_invalid_partition(const dht::decorated_key& new_key) { maybe_abort_scrub(); const auto& current_key = _validator.previous_partition_key(); + if (_scrub_mode == compaction_options::scrub::mode::segregate) { + clogger.error("[scrub compaction 
{}.{}] Detected out-of-order partition {} ({}) (previous being {} ({}))", + _schema->ks_name(), + _schema->cf_name(), + new_key.key().with_schema(*_schema), + new_key, + current_key.key().with_schema(*_schema), + current_key); + _validator.reset(new_key); + // Let the segregating interposer consumer handle this. + return skip::no; + } clogger.error("[scrub compaction {}.{}] Skipping invalid partition {} ({}):" " partition has non-monotonic key compared to current one {} ({})", _schema->ks_name(), @@ -1216,11 +1233,38 @@ current_key.key().with_schema(*_schema), current_key); _skip_to_next_partition = true; + return skip::yes; } - void on_invalid_mutation_fragment(const mutation_fragment& mf) { + skip on_invalid_mutation_fragment(const mutation_fragment& mf) { maybe_abort_scrub(); + const auto& key = _validator.previous_partition_key(); + + // If the unexpected fragment is a partition end, we just drop it. + // The only case a partition end is invalid is when it comes after + // another partition end, and we can just drop it in that case. + if (!mf.is_end_of_partition() && _scrub_mode == compaction_options::scrub::mode::segregate) { + clogger.error("[scrub compaction {}.{}] Injecting partition start/end to segregate out-of-order fragment {} (previous position being {}) in partition {} ({}):", + _schema->ks_name(), + _schema->cf_name(), + mf.position(), + _validator.previous_position(), + key.key().with_schema(*_schema), + key); + + push_mutation_fragment(*_schema, _permit, partition_end{}); + + // We lose the partition tombstone if any, but it will be + // picked up when compaction merges these partitions back. + push_mutation_fragment(mutation_fragment(*_schema, _permit, partition_start(key, {}))); + + _validator.reset(mf); + + // Let the segregating interposer consumer handle this. 
+ return skip::no; + } + clogger.error("[scrub compaction {}.{}] Skipping invalid {} fragment {}in partition {} ({}):" " fragment has non-monotonic position {} compared to previous position {}.", _schema->ks_name(), @@ -1231,6 +1275,7 @@ class scrub_compaction final : public regular_compaction { key, mf.position(), _validator.previous_position()); + return skip::yes; } void on_invalid_end_of_stream() { @@ -1258,15 +1303,13 @@ class scrub_compaction final : public regular_compaction { _skip_to_next_partition = false; // Then check that the partition monotonicity stands. const auto& dk = mf.as_partition_start().key(); - if (!_validator(dk)) { - on_invalid_partition(dk); + if (!_validator(dk) && on_invalid_partition(dk) == skip::yes) { continue; } } else if (_skip_to_next_partition) { continue; } else { - if (!_validator(mf)) { - on_invalid_mutation_fragment(mf); + if (!_validator(mf) && on_invalid_mutation_fragment(mf) == skip::yes) { continue; } } @@ -1356,6 +1399,16 @@ public: return make_flat_mutation_reader(regular_compaction::make_sstable_reader(), _options.operation_mode); } + reader_consumer make_interposer_consumer(reader_consumer end_consumer) override { + return [this, end_consumer = std::move(end_consumer)] (flat_mutation_reader reader) mutable -> future<> { + return mutation_writer::segregate_by_partition(std::move(reader), std::move(end_consumer)); + }; + } + + bool use_interposer_consumer() const override { + return _options.operation_mode == compaction_options::scrub::mode::segregate; + } + friend flat_mutation_reader make_scrubbing_reader(flat_mutation_reader rd, compaction_options::scrub::mode scrub_mode); }; diff --git a/sstables/compaction_descriptor.hh b/sstables/compaction_descriptor.hh index 6ec153525f..927991a782 100644 --- a/sstables/compaction_descriptor.hh +++ b/sstables/compaction_descriptor.hh @@ -76,6 +76,7 @@ public: enum class mode { abort, // abort scrub on the first sign of corruption skip, // skip corrupt data, including range of 
rows and/or partitions that are out-of-order + segregate, // segregate out-of-order data into streams that all contain data with correct order }; mode operation_mode = mode::abort; }; From 550a1cd0362302305294bc7d4ab860d7e2300d9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 29 Apr 2021 17:52:20 +0300 Subject: [PATCH 8/9] api: storage_service/keyspace_scrub: expose new segregate mode Allow invoking scrub with the newly added segregate mode as well. --- api/api-doc/storage_service.json | 5 +++-- api/storage_service.cc | 2 ++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/api/api-doc/storage_service.json b/api/api-doc/storage_service.json index bfd4b06a12..f5b1f27c70 100644 --- a/api/api-doc/storage_service.json +++ b/api/api-doc/storage_service.json @@ -762,7 +762,7 @@ "operations":[ { "method":"GET", - "summary":"Scrub (deserialize + reserialize at the latest version, resolving corruptions if any) the given keyspace. If columnFamilies array is empty, all CFs are scrubbed. Scrubbed CFs will be snapshotted first, if disableSnapshot is false. Scrub has the following modes: Abort (default) - abort scrub if corruption is detected; Skip (same as `skip_corrupted=true`) skip over corrupt data, omitting them from the output.", + "summary":"Scrub (deserialize + reserialize at the latest version, resolving corruptions if any) the given keyspace. If columnFamilies array is empty, all CFs are scrubbed. Scrubbed CFs will be snapshotted first, if disableSnapshot is false. 
Scrub has the following modes: Abort (default) - abort scrub if corruption is detected; Skip (same as `skip_corrupted=true`) skip over corrupt data, omitting them from the output; Segregate - segregate data into multiple sstables if needed, such that each sstable contains data with valid order.", "type": "long", "nickname":"scrub", "produces":[ @@ -793,7 +793,8 @@ "type":"string", "enum":[ "ABORT", - "SKIP" + "SKIP", + "SEGREGATE" ], "paramType":"query" }, diff --git a/api/storage_service.cc b/api/storage_service.cc index a762a6c2f7..a8818e6390 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -1256,6 +1256,8 @@ void set_snapshot(http_context& ctx, routes& r, sharded& snap_ scrub_mode = sstables::compaction_options::scrub::mode::abort; } else if (scrub_mode_str == "SKIP") { scrub_mode = sstables::compaction_options::scrub::mode::skip; + } else if (scrub_mode_str == "SEGREGATE") { + scrub_mode = sstables::compaction_options::scrub::mode::segregate; } else { throw std::invalid_argument(fmt::format("Unknown argument for 'scrub_mode' parameter: {}", scrub_mode_str)); } From 9a32889ac06717b4b159285c7fff875c3af8975f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 4 May 2021 14:47:00 +0300 Subject: [PATCH 9/9] test: boost/sstable_datafile_test: add tests for segregate mode scrub Add two new unit tests dedicated to the new segregate scrub mode. 
--- test/boost/sstable_datafile_test.cc | 325 +++++++++++++++++++++++++++- 1 file changed, 324 insertions(+), 1 deletion(-) diff --git a/test/boost/sstable_datafile_test.cc b/test/boost/sstable_datafile_test.cc index d9c926b816..7ac7ad1e1c 100644 --- a/test/boost/sstable_datafile_test.cc +++ b/test/boost/sstable_datafile_test.cc @@ -62,6 +62,7 @@ #include "mutation_compactor.hh" #include "service/priority_manager.hh" #include "db/config.hh" +#include "mutation_writer/partition_based_splitting_writer.hh" #include #include @@ -5300,7 +5301,7 @@ SEASTAR_TEST_CASE(sstable_cleanup_correctness_test) { }); } -SEASTAR_TEST_CASE(sstable_scrub_test) { +SEASTAR_TEST_CASE(sstable_scrub_skip_mode_test) { cql_test_config test_cfg; auto& db_cfg = *test_cfg.db_config; @@ -5473,6 +5474,328 @@ SEASTAR_TEST_CASE(sstable_scrub_test) { }, test_cfg); } +SEASTAR_TEST_CASE(sstable_scrub_segregate_mode_test) { + cql_test_config test_cfg; + + auto& db_cfg = *test_cfg.db_config; + + // Disable cache to filter out its possible "corrections" to the corrupt sstable. 
+ db_cfg.enable_cache(false); + db_cfg.enable_commitlog(false); + + return do_with_cql_env([this] (cql_test_env& cql_env) -> future<> { + return test_env::do_with_async([this, &cql_env] (test_env& env) { + cell_locker_stats cl_stats; + + auto& db = cql_env.local_db(); + auto& compaction_manager = db.get_compaction_manager(); + + auto schema = schema_builder("ks", get_name()) + .with_column("pk", utf8_type, column_kind::partition_key) + .with_column("ck", int32_type, column_kind::clustering_key) + .with_column("s", int32_type, column_kind::static_column) + .with_column("v", int32_type).build(); + auto permit = tests::make_permit(); + + auto tmp = tmpdir(); + auto sst_gen = [&env, schema, &tmp, gen = make_lw_shared(1)] () mutable { + return env.make_sstable(schema, tmp.path().string(), (*gen)++); + }; + + std::vector corrupt_fragments; + auto scrubbed_mt = make_lw_shared(schema); + auto sst = sst_gen(); + + testlog.info("Writing sstable {}", sst->get_filename()); + + { + const auto ts = api::timestamp_type{1}; + + auto local_keys = make_local_keys(3, schema); + + auto config = env.manager().configure_writer(); + config.validation_level = mutation_fragment_stream_validation_level::partition_region; // this test violates key order on purpose + auto writer = sst->get_writer(*schema, local_keys.size(), config, encoding_stats{}); + + auto make_static_row = [&, schema, ts] (mutation& mut) { + auto r = row{}; + auto cdef = schema->static_column_at(0); + r.apply(cdef, atomic_cell_or_collection{atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1)))}); + mut.set_static_cell(cdef, atomic_cell_or_collection{atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1)))}); + return static_row(*schema, std::move(r)); + }; + + auto make_clustering_row = [&, schema, ts] (unsigned i, mutation* mut) { + auto r = row{}; + auto cdef = schema->regular_column_at(0); + auto ckey = clustering_key::from_single_value(*schema, 
int32_type->decompose(data_value(int(i)))); + r.apply(cdef, atomic_cell_or_collection{atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1)))}); + if (mut) { + mut->set_clustered_cell(ckey, cdef, atomic_cell_or_collection{atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1)))}); + } + return clustering_row(std::move(ckey), {}, {}, std::move(r)); + }; + + auto write_partition = [&, schema, ts] (int pk) { + auto pkey = partition_key::from_deeply_exploded(*schema, { local_keys.at(pk) }); + auto dkey = dht::decorate_key(*schema, pkey); + + testlog.trace("Writing partition {}", pkey); + + auto mut = mutation(schema, dkey); + corrupt_fragments.emplace_back(*schema, permit, partition_start(dkey, {})); + writer.consume_new_partition(dkey); + + { + auto sr = make_static_row(mut); + + testlog.trace("Writing row {}", sr.position()); + + corrupt_fragments.emplace_back(*schema, permit, static_row(*schema, sr)); + writer.consume(std::move(sr)); + } + + const unsigned rows_count = 10; + for (unsigned i = 0; i < rows_count; ++i) { + auto cr = make_clustering_row(i, &mut); + + testlog.trace("Writing row {}", cr.position()); + + corrupt_fragments.emplace_back(*schema, permit, clustering_row(*schema, cr)); + writer.consume(clustering_row(*schema, cr)); + + // write row twice + if (i == (rows_count / 2)) { + auto bad_cr = make_clustering_row(i - 2, nullptr); + testlog.trace("Writing out-of-order row {}", bad_cr.position()); + corrupt_fragments.emplace_back(*schema, permit, clustering_row(*schema, bad_cr)); + writer.consume(std::move(bad_cr)); + } + } + + testlog.trace("Writing partition_end"); + + corrupt_fragments.emplace_back(*schema, permit, partition_end{}); + writer.consume_end_of_partition(); + scrubbed_mt->apply(mut); + }; + + write_partition(1); + write_partition(0); + write_partition(2); + + testlog.info("Writing done"); + writer.consume_end_of_stream(); + } + sst->load().get(); + + testlog.info("Loaded sstable {}", 
sst->get_filename()); + + auto cfg = column_family_test_config(env.manager()); + cfg.datadir = tmp.path().string(); + auto table = make_lw_shared(schema, cfg, column_family::no_commitlog(), + db.get_compaction_manager(), cl_stats, db.row_cache_tracker()); + auto stop_table = defer([table] { + table->stop().get(); + }); + table->mark_ready_for_writes(); + table->start(); + + table->add_sstable_and_update_cache(sst).get(); + + BOOST_REQUIRE(table->in_strategy_sstables().size() == 1); + BOOST_REQUIRE(table->in_strategy_sstables().front() == sst); + + auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector& mfs) { + auto r = assert_that(sst->as_mutation_source().make_reader(schema, tests::make_permit())); + for (const auto& mf : mfs) { + testlog.trace("Expecting {}", mutation_fragment::printer(*schema, mf)); + r.produces(*schema, mf); + } + r.produces_end_of_stream(); + }; + + testlog.info("Verifying written data..."); + + // Make sure we wrote what we thought we wrote. + verify_fragments(sst, corrupt_fragments); + + testlog.info("Scrub in abort mode"); + + // We expect the scrub with mode=scrub::mode::abort to stop on the first invalid fragment. + compaction_manager.perform_sstable_scrub(table.get(), sstables::compaction_options::scrub::mode::abort).get(); + + BOOST_REQUIRE(table->in_strategy_sstables().size() == 1); + verify_fragments(sst, corrupt_fragments); + + testlog.info("Scrub in segregate mode"); + + // We expect the scrub with mode=scrub::mode::segregate to fix all out-of-order data. 
+ compaction_manager.perform_sstable_scrub(table.get(), sstables::compaction_options::scrub::mode::segregate).get(); + + testlog.info("Scrub resulted in {} sstables", table->in_strategy_sstables().size()); + BOOST_REQUIRE(table->in_strategy_sstables().size() > 1); + { + auto sst_reader = assert_that(table->as_mutation_source().make_reader(schema, tests::make_permit())); + auto mt_reader = scrubbed_mt->as_data_source().make_reader(schema, tests::make_permit()); + auto mt_reader_close = deferred_close(mt_reader); + while (auto mf_opt = mt_reader(db::no_timeout).get()) { + testlog.trace("Expecting {}", mutation_fragment::printer(*schema, *mf_opt)); + sst_reader.produces(*schema, *mf_opt); + } + sst_reader.produces_end_of_stream(); + } + }); + }, test_cfg); +} + +// Test the scrub_reader in segregate mode and segregate_by_partition together, +// as they are used in scrub compaction in segregate mode. +SEASTAR_THREAD_TEST_CASE(test_scrub_segregate_stack) { + simple_schema ss; + auto schema = ss.schema(); + auto permit = tests::make_permit(); + + struct expected_rows_type { + using expected_clustering_rows_type = std::set; + + bool has_static_row = false; + expected_clustering_rows_type clustering_rows; + + explicit expected_rows_type(const ::schema& s) : clustering_rows(s) { } + }; + using expected_partitions_type = std::map; + expected_partitions_type expected_partitions{dht::decorated_key::less_comparator(schema)}; + + std::deque all_fragments; + size_t double_partition_end = 0; + size_t missing_partition_end = 0; + + for (uint32_t p = 0; p < 10; ++p) { + auto dk = ss.make_pkey(tests::random::get_int(0, 8)); + auto it = expected_partitions.find(dk); + + testlog.trace("Generating data for {} partition {}", it == expected_partitions.end() ? 
"new" : "existing", dk); + + if (it == expected_partitions.end()) { + auto [inserted_it, _] = expected_partitions.emplace(dk, expected_rows_type(*schema)); + it = inserted_it; + } + + all_fragments.emplace_back(*schema, permit, partition_start(dk, {})); + + auto& expected_rows = it->second; + + for (uint32_t r = 0; r < 10; ++r) { + const auto is_clustering_row = tests::random::get_int(0, 8); + if (is_clustering_row) { + auto ck = ss.make_ckey(tests::random::get_int(0, 8)); + testlog.trace("Generating clustering row {}", ck); + + all_fragments.emplace_back(*schema, permit, ss.make_row(ck, "cv")); + expected_rows.clustering_rows.insert(ck); + } else { + testlog.trace("Generating static row"); + + all_fragments.emplace_back(*schema, permit, ss.make_static_row("sv")); + expected_rows.has_static_row = true; + } + } + + const auto partition_end_roll = tests::random::get_int(0, 100); + if (partition_end_roll < 80) { + testlog.trace("Generating partition end"); + all_fragments.emplace_back(*schema, permit, partition_end()); + } else if (partition_end_roll < 90) { + testlog.trace("Generating double partition end"); + ++double_partition_end; + all_fragments.emplace_back(*schema, permit, partition_end()); + all_fragments.emplace_back(*schema, permit, partition_end()); + } else { + testlog.trace("Not generating partition end"); + ++missing_partition_end; + } + } + + { + size_t rows = 0; + for (const auto& part : expected_partitions) { + rows += part.second.clustering_rows.size(); + } + testlog.info("Generated {} partitions (with {} double and {} missing partition ends), {} rows and {} fragments total", expected_partitions.size(), double_partition_end, missing_partition_end, rows, all_fragments.size()); + } + + auto copy_fragments = [&schema] (const std::deque& frags) { + auto permit = tests::make_permit(); + std::deque copied_fragments; + for (const auto& frag : frags) { + copied_fragments.emplace_back(*schema, permit, frag); + } + return copied_fragments; + }; + + std::list> 
segregated_fragment_streams; + + mutation_writer::segregate_by_partition(make_scrubbing_reader(make_flat_mutation_reader_from_fragments(schema, permit, std::move(all_fragments)), + sstables::compaction_options::scrub::mode::segregate), [&schema, &segregated_fragment_streams] (flat_mutation_reader rd) { + return async([&schema, &segregated_fragment_streams, rd = std::move(rd)] () mutable { + auto close = deferred_close(rd); + auto& fragments = segregated_fragment_streams.emplace_back(); + while (auto mf_opt = rd(db::no_timeout).get()) { + fragments.emplace_back(*schema, rd.permit(), *mf_opt); + } + }); + }).get(); + + testlog.info("Segregation resulted in {} fragment streams", segregated_fragment_streams.size()); + + testlog.info("Checking position monotonicity of segregated streams"); + { + size_t i = 0; + for (const auto& segregated_fragment_stream : segregated_fragment_streams) { + testlog.debug("Checking position monotonicity of segregated stream #{}", i++); + assert_that(make_flat_mutation_reader_from_fragments(schema, permit, std::move(copy_fragments(segregated_fragment_stream)))) + .has_monotonic_positions(); + } + } + + testlog.info("Checking position monotonicity of re-combined stream"); + { + std::vector readers; + readers.reserve(segregated_fragment_streams.size()); + + for (const auto& segregated_fragment_stream : segregated_fragment_streams) { + readers.emplace_back(make_flat_mutation_reader_from_fragments(schema, permit, std::move(copy_fragments(segregated_fragment_stream)))); + } + + assert_that(make_combined_reader(schema, permit, std::move(readers))).has_monotonic_positions(); + } + + testlog.info("Checking content of re-combined stream"); + { + std::vector readers; + readers.reserve(segregated_fragment_streams.size()); + + for (const auto& segregated_fragment_stream : segregated_fragment_streams) { + readers.emplace_back(make_flat_mutation_reader_from_fragments(schema, permit, std::move(copy_fragments(segregated_fragment_stream)))); + } + + auto rd 
= assert_that(make_combined_reader(schema, permit, std::move(readers))); + for (const auto& [pkey, content] : expected_partitions) { + testlog.debug("Checking content of partition {}", pkey); + rd.produces_partition_start(pkey); + if (content.has_static_row) { + rd.produces_static_row(); + } + for (const auto& ckey : content.clustering_rows) { + rd.produces_row_with_key(ckey); + } + rd.produces_partition_end(); + } + rd.produces_end_of_stream(); + } +} + SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) { auto schema = schema_builder("ks", get_name()) .with_column("pk", utf8_type, column_kind::partition_key)