diff --git a/api/api-doc/storage_service.json b/api/api-doc/storage_service.json index 35a45c9b2e..f5b1f27c70 100644 --- a/api/api-doc/storage_service.json +++ b/api/api-doc/storage_service.json @@ -762,7 +762,7 @@ "operations":[ { "method":"GET", - "summary":"Scrub (deserialize + reserialize at the latest version, skipping bad rows if any) the given keyspace. If columnFamilies array is empty, all CFs are scrubbed. Scrubbed CFs will be snapshotted first, if disableSnapshot is false", + "summary":"Scrub (deserialize + reserialize at the latest version, resolving corruptions if any) the given keyspace. If columnFamilies array is empty, all CFs are scrubbed. Scrubbed CFs will be snapshotted first, if disableSnapshot is false. Scrub has the following modes: Abort (default) - abort scrub if corruption is detected; Skip (same as `skip_corrupted=true`) skip over corrupt data, omitting them from the output; Segregate - segregate data into multiple sstables if needed, such that each sstable contains data with valid order.", "type": "long", "nickname":"scrub", "produces":[ @@ -785,6 +785,19 @@ "type":"boolean", "paramType":"query" }, + { + "name":"scrub_mode", + "description":"How to handle corrupt data (overrides 'skip_corrupted'); ", + "required":false, + "allowMultiple":false, + "type":"string", + "enum":[ + "ABORT", + "SKIP", + "SEGREGATE" + ], + "paramType":"query" + }, { "name":"keyspace", "description":"The keyspace to query about", diff --git a/api/storage_service.cc b/api/storage_service.cc index 953d5dc6d8..3b038069c3 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -1242,7 +1242,26 @@ void set_snapshot(http_context& ctx, routes& r, sharded& snap_ }); ss::scrub.set(r, wrap_ks_cf(ctx, [&snap_ctl] (http_context& ctx, std::unique_ptr req, sstring keyspace, std::vector column_families) { - const auto skip_corrupted = req_param(*req, "skip_corrupted", false); + auto scrub_mode = sstables::compaction_options::scrub::mode::abort; + + const sstring 
scrub_mode_str = req_param(*req, "scrub_mode", ""); + if (scrub_mode_str == "") { + const auto skip_corrupted = req_param(*req, "skip_corrupted", false); + + if (skip_corrupted) { + scrub_mode = sstables::compaction_options::scrub::mode::skip; + } + } else { + if (scrub_mode_str == "ABORT") { + scrub_mode = sstables::compaction_options::scrub::mode::abort; + } else if (scrub_mode_str == "SKIP") { + scrub_mode = sstables::compaction_options::scrub::mode::skip; + } else if (scrub_mode_str == "SEGREGATE") { + scrub_mode = sstables::compaction_options::scrub::mode::segregate; + } else { + throw std::invalid_argument(fmt::format("Unknown argument for 'scrub_mode' parameter: {}", scrub_mode_str)); + } + } auto f = make_ready_future<>(); if (!req_param(*req, "disable_snapshot", false)) { @@ -1252,12 +1271,12 @@ void set_snapshot(http_context& ctx, routes& r, sharded& snap_ }); } - return f.then([&ctx, keyspace, column_families, skip_corrupted] { + return f.then([&ctx, keyspace, column_families, scrub_mode] { return ctx.db.invoke_on_all([=] (database& db) { return do_for_each(column_families, [=, &db](sstring cfname) { auto& cm = db.get_compaction_manager(); auto& cf = db.find_column_family(keyspace, cfname); - return cm.perform_sstable_scrub(&cf, skip_corrupted); + return cm.perform_sstable_scrub(&cf, scrub_mode); }); }); }).then([]{ diff --git a/configure.py b/configure.py index 5d9a9355e4..0e3538ad13 100755 --- a/configure.py +++ b/configure.py @@ -986,6 +986,7 @@ scylla_core = (['database.cc', 'utils/error_injection.cc', 'mutation_writer/timestamp_based_splitting_writer.cc', 'mutation_writer/shard_based_splitting_writer.cc', + 'mutation_writer/partition_based_splitting_writer.cc', 'mutation_writer/feed_writers.cc', 'lua.cc', 'service/raft/schema_raft_state_machine.cc', diff --git a/flat_mutation_reader.cc b/flat_mutation_reader.cc index 90a9d51ec5..58e2296d12 100644 --- a/flat_mutation_reader.cc +++ b/flat_mutation_reader.cc @@ -1086,6 +1086,17 @@ bool 
mutation_fragment_stream_validator::on_end_of_stream() { return _prev_kind == mutation_fragment::kind::partition_end; } +void mutation_fragment_stream_validator::reset(dht::decorated_key dk) { + _prev_partition_key = dk; + _prev_pos = position_in_partition::for_partition_start(); + _prev_kind = mutation_fragment::kind::partition_start; +} + +void mutation_fragment_stream_validator::reset(const mutation_fragment& mf) { + _prev_pos = mf.position(); + _prev_kind = mf.mutation_fragment_kind(); +} + namespace { [[noreturn]] void on_validation_error(seastar::logger& l, const seastar::sstring& reason) { diff --git a/mutation_fragment_stream_validator.hh b/mutation_fragment_stream_validator.hh index b6f877fb2b..964035fddf 100644 --- a/mutation_fragment_stream_validator.hh +++ b/mutation_fragment_stream_validator.hh @@ -95,6 +95,24 @@ public: /// \returns true if the partition key is valid. bool operator()(const dht::decorated_key& dk); + /// Reset the state of the validator to the given partition + /// + /// Reset the state of the validator as if it has just validated a valid + /// partition start with the provided key. This can be used to force a reset + /// to a given partition that is normally invalid and hence wouldn't advance + /// the internal state. This can be used by users that can correct such + /// invalid streams and wish to continue validating it. + void reset(dht::decorated_key dk); + + /// Reset the state of the validator to the given fragment + /// + /// Reset the state of the validator as if it has just validated a valid + /// fragment. This can be used to force a reset to a given fragment that is + /// normally invalid and hence wouldn't advance the internal state. This + /// can be used by users that can correct such invalid streams and wish to + /// continue validating it. + void reset(const mutation_fragment&); + /// Validate that the stream was properly closed. /// /// \returns false if the last partition wasn't closed, i.e.
the last diff --git a/mutation_writer/partition_based_splitting_writer.cc b/mutation_writer/partition_based_splitting_writer.cc new file mode 100644 index 0000000000..c5c7b56edd --- /dev/null +++ b/mutation_writer/partition_based_splitting_writer.cc @@ -0,0 +1,115 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "mutation_writer/partition_based_splitting_writer.hh" + +namespace mutation_writer { + +class partition_based_splitting_mutation_writer { + struct bucket { + bucket_writer writer; + dht::decorated_key last_key; + }; + +private: + schema_ptr _schema; + reader_permit _permit; + reader_consumer _consumer; + std::vector _buckets; + bucket* _current_bucket = nullptr; + + future<> write_to_bucket(mutation_fragment&& mf) { + return _current_bucket->writer.consume(std::move(mf)); + } + + bucket* create_bucket_for(const dht::decorated_key& key) { + return &_buckets.emplace_back(bucket{bucket_writer(_schema, _permit, _consumer), key}); + } +public: + partition_based_splitting_mutation_writer(schema_ptr schema, reader_permit permit, reader_consumer consumer) + : _schema(std::move(schema)) + , _permit(std::move(permit)) + , _consumer(std::move(consumer)) + {} + + future<> consume(partition_start&& ps) { + if (_buckets.empty()) { + _current_bucket = create_bucket_for(ps.key()); + } else if 
(dht::ring_position_tri_compare(*_schema, _current_bucket->last_key, ps.key()) < 0) { + // No need to change bucket, just update the last key. + _current_bucket->last_key = ps.key(); + } else { + // Find the first bucket where this partition doesn't cause + // monotonicity violations. Prefer the buckets towards the head of the list. + auto it = std::find_if(_buckets.begin(), _buckets.end(), [this, &ps] (const bucket& b) { + return dht::ring_position_tri_compare(*_schema, b.last_key, ps.key()) < 0; + }); + if (it == _buckets.end()) { + _current_bucket = create_bucket_for(ps.key()); + } else { + _current_bucket = &*it; + _current_bucket->last_key = ps.key(); + } + } + return write_to_bucket(mutation_fragment(*_schema, _permit, std::move(ps))); + } + + future<> consume(static_row&& sr) { + return write_to_bucket(mutation_fragment(*_schema, _permit, std::move(sr))); + } + + future<> consume(clustering_row&& cr) { + return write_to_bucket(mutation_fragment(*_schema, _permit, std::move(cr))); + } + + future<> consume(range_tombstone&& rt) { + return write_to_bucket(mutation_fragment(*_schema, _permit, std::move(rt))); + } + + future<> consume(partition_end&& pe) { + return write_to_bucket(mutation_fragment(*_schema, _permit, std::move(pe))); + } + + void consume_end_of_stream() { + for (auto& bucket : _buckets) { + bucket.writer.consume_end_of_stream(); + } + } + void abort(std::exception_ptr ep) { + for (auto& bucket : _buckets) { + bucket.writer.abort(ep); + } + } + future<> close() noexcept { + return parallel_for_each(_buckets, [] (bucket& bucket) { + return bucket.writer.close(); + }); + } +}; + +future<> segregate_by_partition(flat_mutation_reader producer, reader_consumer consumer) { + auto schema = producer.schema(); + auto permit = producer.permit(); + return feed_writer(std::move(producer), + partition_based_splitting_mutation_writer(std::move(schema), std::move(permit), std::move(consumer))); +} + +} // namespace mutation_writer diff --git 
a/mutation_writer/partition_based_splitting_writer.hh b/mutation_writer/partition_based_splitting_writer.hh new file mode 100644 index 0000000000..8f0cf8882a --- /dev/null +++ b/mutation_writer/partition_based_splitting_writer.hh @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include + +#include "feed_writers.hh" + +namespace mutation_writer { + +// Given a producer that may contain partitions in the wrong order, or even +// contain partitions multiple times, separate them such that each output +// stream keeps the partition ordering guarantee. In other words, repair +// a stream that violates the ordering requirements by splitting it into output +// streams that honor it. +// This is useful for scrub compaction to split sstables containing out-of-order +// and/or duplicate partitions into sstables that honor the partition ordering. 
+future<> segregate_by_partition(flat_mutation_reader producer, reader_consumer consumer); + +} // namespace mutation_writer diff --git a/sstables/compaction.cc b/sstables/compaction.cc index b200bb968f..417ebb380d 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -68,6 +68,7 @@ #include "leveled_manifest.hh" #include "dht/token.hh" #include "mutation_writer/shard_based_splitting_writer.hh" +#include "mutation_writer/partition_based_splitting_writer.hh" #include "mutation_source_metadata.hh" #include "mutation_fragment_stream_validator.hh" @@ -122,6 +123,21 @@ std::ostream& operator<<(std::ostream& os, compaction_type type) { return os; } +std::string_view to_string(compaction_options::scrub::mode scrub_mode) { + switch (scrub_mode) { + case compaction_options::scrub::mode::abort: + return "abort"; + case compaction_options::scrub::mode::skip: + return "skip"; + case compaction_options::scrub::mode::segregate: + return "segregate"; + } +} + +std::ostream& operator<<(std::ostream& os, compaction_options::scrub::mode scrub_mode) { + return os << to_string(scrub_mode); +} + std::ostream& operator<<(std::ostream& os, pretty_printed_data_size data) { static constexpr const char* suffixes[] = { " bytes", "kB", "MB", "GB", "TB", "PB" }; @@ -1147,14 +1163,16 @@ public: class scrub_compaction final : public regular_compaction { class reader : public flat_mutation_reader::impl { - bool _skip_corrupted; + using skip = bool_class; + private: + compaction_options::scrub::mode _scrub_mode; flat_mutation_reader _reader; mutation_fragment_stream_validator _validator; bool _skip_to_next_partition = false; private: void maybe_abort_scrub() { - if (!_skip_corrupted) { + if (_scrub_mode == compaction_options::scrub::mode::abort) { throw compaction_stop_exception(_schema->ks_name(), _schema->cf_name(), "scrub compaction found invalid data", false); } } @@ -1191,9 +1209,21 @@ class scrub_compaction final : public regular_compaction { } } - void 
on_invalid_partition(const dht::decorated_key& new_key) { + skip on_invalid_partition(const dht::decorated_key& new_key) { maybe_abort_scrub(); const auto& current_key = _validator.previous_partition_key(); + if (_scrub_mode == compaction_options::scrub::mode::segregate) { + clogger.error("[scrub compaction {}.{}] Detected out-of-order partition {} ({}) (previous being {} ({}))", + _schema->ks_name(), + _schema->cf_name(), + new_key.key().with_schema(*_schema), + new_key, + current_key.key().with_schema(*_schema), + current_key); + _validator.reset(new_key); + // Let the segregating interposer consumer handle this. + return skip::no; + } clogger.error("[scrub compaction {}.{}] Skipping invalid partition {} ({}):" " partition has non-monotonic key compared to current one {} ({})", _schema->ks_name(), @@ -1203,11 +1233,38 @@ class scrub_compaction final : public regular_compaction { current_key.key().with_schema(*_schema), current_key); _skip_to_next_partition = true; + return skip::yes; } - void on_invalid_mutation_fragment(const mutation_fragment& mf) { + skip on_invalid_mutation_fragment(const mutation_fragment& mf) { maybe_abort_scrub(); + const auto& key = _validator.previous_partition_key(); + + // If the unexpected fragment is a partition end, we just drop it. + // The only case a partition end is invalid is when it comes after + // another partition end, and we can just drop it in that case. 
+ if (!mf.is_end_of_partition() && _scrub_mode == compaction_options::scrub::mode::segregate) { + clogger.error("[scrub compaction {}.{}] Injecting partition start/end to segregate out-of-order fragment {} (previous position being {}) in partition {} ({}):", + _schema->ks_name(), + _schema->cf_name(), + mf.position(), + _validator.previous_position(), + key.key().with_schema(*_schema), + key); + + push_mutation_fragment(*_schema, _permit, partition_end{}); + + // We lose the partition tombstone if any, but it will be + // picked up when compaction merges these partitions back. + push_mutation_fragment(mutation_fragment(*_schema, _permit, partition_start(key, {}))); + + _validator.reset(mf); + + // Let the segregating interposer consumer handle this. + return skip::no; + } + clogger.error("[scrub compaction {}.{}] Skipping invalid {} fragment {}in partition {} ({}):" " fragment has non-monotonic position {} compared to previous position {}.", _schema->ks_name(), @@ -1218,6 +1275,7 @@ class scrub_compaction final : public regular_compaction { key, mf.position(), _validator.previous_position()); + return skip::yes; } void on_invalid_end_of_stream() { @@ -1245,15 +1303,13 @@ class scrub_compaction final : public regular_compaction { _skip_to_next_partition = false; // Then check that the partition monotonicity stands.
const auto& dk = mf.as_partition_start().key(); - if (!_validator(dk)) { - on_invalid_partition(dk); + if (!_validator(dk) && on_invalid_partition(dk) == skip::yes) { continue; } } else if (_skip_to_next_partition) { continue; } else { - if (!_validator(mf)) { - on_invalid_mutation_fragment(mf); + if (!_validator(mf) && on_invalid_mutation_fragment(mf) == skip::yes) { continue; } } @@ -1270,13 +1326,16 @@ class scrub_compaction final : public regular_compaction { } public: - reader(flat_mutation_reader underlying, bool skip_corrupted) + reader(flat_mutation_reader underlying, compaction_options::scrub::mode scrub_mode) : impl(underlying.schema(), underlying.permit()) - , _skip_corrupted(skip_corrupted) + , _scrub_mode(scrub_mode) , _reader(std::move(underlying)) , _validator(*_schema) { } virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override { + if (_end_of_stream) { + return make_ready_future<>(); + } return repeat([this, timeout] { return _reader.fill_buffer(timeout).then([this] { fill_buffer_from_underlying(); @@ -1317,30 +1376,44 @@ class scrub_compaction final : public regular_compaction { private: compaction_options::scrub _options; + std::string _scrub_start_description; + std::string _scrub_finish_description; public: scrub_compaction(column_family& cf, compaction_descriptor descriptor, compaction_options::scrub options) : regular_compaction(cf, std::move(descriptor)) - , _options(options) { + , _options(options) + , _scrub_start_description(fmt::format("Scrubbing in {} mode", _options.operation_mode)) + , _scrub_finish_description(fmt::format("Finished scrubbing in {} mode", _options.operation_mode)) { } std::string_view report_start_desc() const override { - return "Scrubbing"; + return _scrub_start_description; } std::string_view report_finish_desc() const override { - return "Finished scrubbing"; + return _scrub_finish_description; } flat_mutation_reader make_sstable_reader() const override { - return 
make_flat_mutation_reader(regular_compaction::make_sstable_reader(), _options.skip_corrupted); + return make_flat_mutation_reader(regular_compaction::make_sstable_reader(), _options.operation_mode); } - friend flat_mutation_reader make_scrubbing_reader(flat_mutation_reader rd, bool skip_corrupted); + reader_consumer make_interposer_consumer(reader_consumer end_consumer) override { + return [this, end_consumer = std::move(end_consumer)] (flat_mutation_reader reader) mutable -> future<> { + return mutation_writer::segregate_by_partition(std::move(reader), std::move(end_consumer)); + }; + } + + bool use_interposer_consumer() const override { + return _options.operation_mode == compaction_options::scrub::mode::segregate; + } + + friend flat_mutation_reader make_scrubbing_reader(flat_mutation_reader rd, compaction_options::scrub::mode scrub_mode); }; -flat_mutation_reader make_scrubbing_reader(flat_mutation_reader rd, bool skip_corrupted) { - return make_flat_mutation_reader(std::move(rd), skip_corrupted); +flat_mutation_reader make_scrubbing_reader(flat_mutation_reader rd, compaction_options::scrub::mode scrub_mode) { + return make_flat_mutation_reader(std::move(rd), scrub_mode); } class resharding_compaction final : public compaction { diff --git a/sstables/compaction.hh b/sstables/compaction.hh index 4f28985872..84ac15d9aa 100644 --- a/sstables/compaction.hh +++ b/sstables/compaction.hh @@ -111,5 +111,5 @@ namespace sstables { get_fully_expired_sstables(column_family& cf, const std::vector& compacting, gc_clock::time_point gc_before); // For tests, can drop after we virtualize sstables. 
- flat_mutation_reader make_scrubbing_reader(flat_mutation_reader rd, bool skip_corrupted); + flat_mutation_reader make_scrubbing_reader(flat_mutation_reader rd, compaction_options::scrub::mode scrub_mode); } diff --git a/sstables/compaction_descriptor.hh b/sstables/compaction_descriptor.hh index c7b7379d79..927991a782 100644 --- a/sstables/compaction_descriptor.hh +++ b/sstables/compaction_descriptor.hh @@ -73,7 +73,12 @@ public: std::reference_wrapper db; }; struct scrub { - bool skip_corrupted; + enum class mode { + abort, // abort scrub on the first sign of corruption + skip, // skip corrupt data, including range of rows and/or partitions that are out-of-order + segregate, // segregate out-of-order data into streams that all contain data with correct order + }; + mode operation_mode = mode::abort; }; struct reshard { }; @@ -110,8 +115,8 @@ public: return compaction_options(upgrade{db}); } - static compaction_options make_scrub(bool skip_corrupted) { - return compaction_options(scrub{skip_corrupted}); + static compaction_options make_scrub(scrub::mode mode) { + return compaction_options(scrub{mode}); } template @@ -122,6 +127,9 @@ public: compaction_type type() const; }; +std::string_view to_string(compaction_options::scrub::mode); +std::ostream& operator<<(std::ostream& os, compaction_options::scrub::mode scrub_mode); + struct compaction_descriptor { // List of sstables to be compacted. std::vector sstables; diff --git a/sstables/compaction_manager.cc b/sstables/compaction_manager.cc index 816b83bb8b..26a702b8b8 100644 --- a/sstables/compaction_manager.cc +++ b/sstables/compaction_manager.cc @@ -830,8 +830,8 @@ future<> compaction_manager::perform_sstable_upgrade(database& db, column_family } // Submit a column family to be scrubbed and wait for its termination. 
-future<> compaction_manager::perform_sstable_scrub(column_family* cf, bool skip_corrupted) { - return rewrite_sstables(cf, sstables::compaction_options::make_scrub(skip_corrupted), [this] (const table& cf) { +future<> compaction_manager::perform_sstable_scrub(column_family* cf, sstables::compaction_options::scrub::mode scrub_mode) { + return rewrite_sstables(cf, sstables::compaction_options::make_scrub(scrub_mode), [this] (const table& cf) { return get_candidates(cf); }); } diff --git a/sstables/compaction_manager.hh b/sstables/compaction_manager.hh index 5117448f66..c65eda8eea 100644 --- a/sstables/compaction_manager.hh +++ b/sstables/compaction_manager.hh @@ -210,7 +210,7 @@ public: future<> perform_sstable_upgrade(database& db, column_family* cf, bool exclude_current_version); // Submit a column family to be scrubbed and wait for its termination. - future<> perform_sstable_scrub(column_family* cf, bool skip_corrupted); + future<> perform_sstable_scrub(column_family* cf, sstables::compaction_options::scrub::mode scrub_mode); // Submit a column family for major compaction. 
future<> submit_major_compaction(column_family* cf); diff --git a/test/boost/mutation_writer_test.cc b/test/boost/mutation_writer_test.cc index 07999a5fb2..2191c24469 100644 --- a/test/boost/mutation_writer_test.cc +++ b/test/boost/mutation_writer_test.cc @@ -31,7 +31,9 @@ #include "flat_mutation_reader.hh" #include "mutation_writer/multishard_writer.hh" #include "mutation_writer/timestamp_based_splitting_writer.hh" +#include "mutation_writer/partition_based_splitting_writer.hh" #include "test/lib/cql_test_env.hh" +#include "test/lib/flat_mutation_reader_assertions.hh" #include "test/lib/mutation_assertions.hh" #include "test/lib/random_utils.hh" #include "test/lib/random_schema.hh" @@ -427,3 +429,26 @@ SEASTAR_THREAD_TEST_CASE(test_timestamp_based_splitting_mutation_writer_abort) { BOOST_TEST_PASSPOINT(); } } + +// Check that the partition_based_splitting_mutation_writer can fix reordered partitions +SEASTAR_THREAD_TEST_CASE(test_partition_based_splitting_mutation_writer) { + auto random_spec = tests::make_random_schema_specification( + get_name(), + std::uniform_int_distribution(1, 2), + std::uniform_int_distribution(0, 2), + std::uniform_int_distribution(1, 2), + std::uniform_int_distribution(0, 1)); + + auto random_schema = tests::random_schema{tests::random::get_int(), *random_spec}; + + auto input_mutations = tests::generate_random_mutations(random_schema).get(); + input_mutations.emplace_back(*input_mutations.begin()); // Have a duplicate partition as well. 
+ std::shuffle(input_mutations.begin(), input_mutations.end(), tests::random::gen()); + + mutation_writer::segregate_by_partition(flat_mutation_reader_from_mutations(tests::make_permit(), std::move(input_mutations)), [] (flat_mutation_reader rd) { + testlog.info("Checking segregated output stream"); + return async([rd = std::move(rd)] () mutable { + assert_that(std::move(rd)).has_monotonic_positions(); + }); + }).get(); +} diff --git a/test/boost/sstable_datafile_test.cc b/test/boost/sstable_datafile_test.cc index 90e7e3d4cb..919422e547 100644 --- a/test/boost/sstable_datafile_test.cc +++ b/test/boost/sstable_datafile_test.cc @@ -62,6 +62,7 @@ #include "mutation_compactor.hh" #include "service/priority_manager.hh" #include "db/config.hh" +#include "mutation_writer/partition_based_splitting_writer.hh" #include #include @@ -5300,7 +5301,7 @@ SEASTAR_TEST_CASE(sstable_cleanup_correctness_test) { }); } -SEASTAR_TEST_CASE(sstable_scrub_test) { +SEASTAR_TEST_CASE(sstable_scrub_skip_mode_test) { cql_test_config test_cfg; auto& db_cfg = *test_cfg.db_config; @@ -5321,6 +5322,7 @@ SEASTAR_TEST_CASE(sstable_scrub_test) { .with_column("ck", int32_type, column_kind::clustering_key) .with_column("s", int32_type, column_kind::static_column) .with_column("v", int32_type).build(); + auto permit = tests::make_permit(); auto tmp = tmpdir(); auto sst_gen = [&env, schema, &tmp, gen = make_lw_shared(1)] () mutable { @@ -5365,9 +5367,9 @@ SEASTAR_TEST_CASE(sstable_scrub_test) { testlog.trace("Writing partition {}", pkey.with_schema(*schema)); if (write_to_scrubbed) { - scrubbed_fragments.emplace_back(*schema, tests::make_permit(), partition_start(dkey, {})); + scrubbed_fragments.emplace_back(*schema, permit, partition_start(dkey, {})); } - corrupt_fragments.emplace_back(*schema, tests::make_permit(), partition_start(dkey, {})); + corrupt_fragments.emplace_back(*schema, permit, partition_start(dkey, {})); writer.consume_new_partition(dkey); { @@ -5376,9 +5378,9 @@ 
SEASTAR_TEST_CASE(sstable_scrub_test) { testlog.trace("Writing row {}", sr.position()); if (write_to_scrubbed) { - scrubbed_fragments.emplace_back(*schema, tests::make_permit(), static_row(*schema, sr)); + scrubbed_fragments.emplace_back(*schema, permit, static_row(*schema, sr)); } - corrupt_fragments.emplace_back(*schema, tests::make_permit(), static_row(*schema, sr)); + corrupt_fragments.emplace_back(*schema, permit, static_row(*schema, sr)); writer.consume(std::move(sr)); } @@ -5389,16 +5391,16 @@ SEASTAR_TEST_CASE(sstable_scrub_test) { testlog.trace("Writing row {}", cr.position()); if (write_to_scrubbed) { - scrubbed_fragments.emplace_back(*schema, tests::make_permit(), clustering_row(*schema, cr)); + scrubbed_fragments.emplace_back(*schema, permit, clustering_row(*schema, cr)); } - corrupt_fragments.emplace_back(*schema, tests::make_permit(), clustering_row(*schema, cr)); + corrupt_fragments.emplace_back(*schema, permit, clustering_row(*schema, cr)); writer.consume(clustering_row(*schema, cr)); // write row twice if (i == (rows_count / 2)) { auto bad_cr = make_clustering_row(i - 2); testlog.trace("Writing out-of-order row {}", bad_cr.position()); - corrupt_fragments.emplace_back(*schema, tests::make_permit(), clustering_row(*schema, bad_cr)); + corrupt_fragments.emplace_back(*schema, permit, clustering_row(*schema, bad_cr)); writer.consume(std::move(bad_cr)); } } @@ -5406,9 +5408,9 @@ SEASTAR_TEST_CASE(sstable_scrub_test) { testlog.trace("Writing partition_end"); if (write_to_scrubbed) { - scrubbed_fragments.emplace_back(*schema, tests::make_permit(), partition_end{}); + scrubbed_fragments.emplace_back(*schema, permit, partition_end{}); } - corrupt_fragments.emplace_back(*schema, tests::make_permit(), partition_end{}); + corrupt_fragments.emplace_back(*schema, permit, partition_end{}); writer.consume_end_of_partition(); }; @@ -5438,6 +5440,172 @@ SEASTAR_TEST_CASE(sstable_scrub_test) { BOOST_REQUIRE(table->in_strategy_sstables().size() == 1); 
BOOST_REQUIRE(table->in_strategy_sstables().front() == sst); + auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector& mfs) { + auto r = assert_that(sst->as_mutation_source().make_reader(schema, permit)); + for (const auto& mf : mfs) { + testlog.trace("Expecting {}", mutation_fragment::printer(*schema, mf)); + r.produces(*schema, mf); + } + r.produces_end_of_stream(); + }; + + testlog.info("Verifying written data..."); + + // Make sure we wrote what we thought we wrote. + verify_fragments(sst, corrupt_fragments); + + testlog.info("Scrub in abort mode"); + + // We expect the scrub with mode=scrub::mode::abort to stop on the first invalid fragment. + compaction_manager.perform_sstable_scrub(table.get(), sstables::compaction_options::scrub::mode::abort).get(); + + BOOST_REQUIRE(table->in_strategy_sstables().size() == 1); + verify_fragments(sst, corrupt_fragments); + + testlog.info("Scrub in skip mode"); + + // We expect the scrub with mode=scrub::mode::skip to get rid of all invalid data. + compaction_manager.perform_sstable_scrub(table.get(), sstables::compaction_options::scrub::mode::skip).get(); + + BOOST_REQUIRE(table->in_strategy_sstables().size() == 1); + BOOST_REQUIRE(table->in_strategy_sstables().front() != sst); + verify_fragments(table->in_strategy_sstables().front(), scrubbed_fragments); + }); + }, test_cfg); +} + +SEASTAR_TEST_CASE(sstable_scrub_segregate_mode_test) { + cql_test_config test_cfg; + + auto& db_cfg = *test_cfg.db_config; + + // Disable cache to filter out its possible "corrections" to the corrupt sstable.
+ db_cfg.enable_cache(false); + db_cfg.enable_commitlog(false); + + return do_with_cql_env([this] (cql_test_env& cql_env) -> future<> { + return test_env::do_with_async([this, &cql_env] (test_env& env) { + cell_locker_stats cl_stats; + + auto& db = cql_env.local_db(); + auto& compaction_manager = db.get_compaction_manager(); + + auto schema = schema_builder("ks", get_name()) + .with_column("pk", utf8_type, column_kind::partition_key) + .with_column("ck", int32_type, column_kind::clustering_key) + .with_column("s", int32_type, column_kind::static_column) + .with_column("v", int32_type).build(); + auto permit = tests::make_permit(); + + auto tmp = tmpdir(); + auto sst_gen = [&env, schema, &tmp, gen = make_lw_shared(1)] () mutable { + return env.make_sstable(schema, tmp.path().string(), (*gen)++); + }; + + std::vector corrupt_fragments; + auto scrubbed_mt = make_lw_shared(schema); + auto sst = sst_gen(); + + testlog.info("Writing sstable {}", sst->get_filename()); + + { + const auto ts = api::timestamp_type{1}; + + auto local_keys = make_local_keys(3, schema); + + auto config = env.manager().configure_writer(); + config.validation_level = mutation_fragment_stream_validation_level::partition_region; // this test violates key order on purpose + auto writer = sst->get_writer(*schema, local_keys.size(), config, encoding_stats{}); + + auto make_static_row = [&, schema, ts] (mutation& mut) { + auto r = row{}; + auto cdef = schema->static_column_at(0); + r.apply(cdef, atomic_cell_or_collection{atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1)))}); + mut.set_static_cell(cdef, atomic_cell_or_collection{atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1)))}); + return static_row(*schema, std::move(r)); + }; + + auto make_clustering_row = [&, schema, ts] (unsigned i, mutation* mut) { + auto r = row{}; + auto cdef = schema->regular_column_at(0); + auto ckey = clustering_key::from_single_value(*schema, 
int32_type->decompose(data_value(int(i)))); + r.apply(cdef, atomic_cell_or_collection{atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1)))}); + if (mut) { + mut->set_clustered_cell(ckey, cdef, atomic_cell_or_collection{atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1)))}); + } + return clustering_row(std::move(ckey), {}, {}, std::move(r)); + }; + + auto write_partition = [&, schema, ts] (int pk) { + auto pkey = partition_key::from_deeply_exploded(*schema, { local_keys.at(pk) }); + auto dkey = dht::decorate_key(*schema, pkey); + + testlog.trace("Writing partition {}", pkey); + + auto mut = mutation(schema, dkey); + corrupt_fragments.emplace_back(*schema, permit, partition_start(dkey, {})); + writer.consume_new_partition(dkey); + + { + auto sr = make_static_row(mut); + + testlog.trace("Writing row {}", sr.position()); + + corrupt_fragments.emplace_back(*schema, permit, static_row(*schema, sr)); + writer.consume(std::move(sr)); + } + + const unsigned rows_count = 10; + for (unsigned i = 0; i < rows_count; ++i) { + auto cr = make_clustering_row(i, &mut); + + testlog.trace("Writing row {}", cr.position()); + + corrupt_fragments.emplace_back(*schema, permit, clustering_row(*schema, cr)); + writer.consume(clustering_row(*schema, cr)); + + // write row twice + if (i == (rows_count / 2)) { + auto bad_cr = make_clustering_row(i - 2, nullptr); + testlog.trace("Writing out-of-order row {}", bad_cr.position()); + corrupt_fragments.emplace_back(*schema, permit, clustering_row(*schema, bad_cr)); + writer.consume(std::move(bad_cr)); + } + } + + testlog.trace("Writing partition_end"); + + corrupt_fragments.emplace_back(*schema, permit, partition_end{}); + writer.consume_end_of_partition(); + scrubbed_mt->apply(mut); + }; + + write_partition(1); + write_partition(0); + write_partition(2); + + testlog.info("Writing done"); + writer.consume_end_of_stream(); + } + sst->load().get(); + + testlog.info("Loaded sstable {}", 
sst->get_filename()); + + auto cfg = column_family_test_config(env.manager()); + cfg.datadir = tmp.path().string(); + auto table = make_lw_shared(schema, cfg, column_family::no_commitlog(), + db.get_compaction_manager(), cl_stats, db.row_cache_tracker()); + auto stop_table = defer([table] { + table->stop().get(); + }); + table->mark_ready_for_writes(); + table->start(); + + table->add_sstable_and_update_cache(sst).get(); + + BOOST_REQUIRE(table->in_strategy_sstables().size() == 1); + BOOST_REQUIRE(table->in_strategy_sstables().front() == sst); + auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector& mfs) { auto r = assert_that(sst->as_mutation_source().make_reader(schema, tests::make_permit())); for (const auto& mf : mfs) { @@ -5452,32 +5620,189 @@ SEASTAR_TEST_CASE(sstable_scrub_test) { // Make sure we wrote what we though we wrote. verify_fragments(sst, corrupt_fragments); - testlog.info("Scrub with --skip-corrupted=false"); + testlog.info("Scrub in abort mode"); - // We expect the scrub with skip_corrupted=false to stop on the first invalid fragment. - compaction_manager.perform_sstable_scrub(table.get(), false).get(); + // We expect the scrub with mode=scrub::mode::abort to stop on the first invalid fragment. + compaction_manager.perform_sstable_scrub(table.get(), sstables::compaction_options::scrub::mode::abort).get(); BOOST_REQUIRE(table->in_strategy_sstables().size() == 1); verify_fragments(sst, corrupt_fragments); - testlog.info("Scrub with --skip-corrupted=true"); + testlog.info("Scrub in segregate mode"); - // We expect the scrub with skip_corrupted=true to get rid of all invalid data. - compaction_manager.perform_sstable_scrub(table.get(), true).get(); + // We expect the scrub with mode=scrub::mode::segregate to fix all out-of-order data. 
+ compaction_manager.perform_sstable_scrub(table.get(), sstables::compaction_options::scrub::mode::segregate).get(); - BOOST_REQUIRE(table->in_strategy_sstables().size() == 1); - BOOST_REQUIRE(table->in_strategy_sstables().front() != sst); - verify_fragments(table->in_strategy_sstables().front(), scrubbed_fragments); + testlog.info("Scrub resulted in {} sstables", table->in_strategy_sstables().size()); + BOOST_REQUIRE(table->in_strategy_sstables().size() > 1); + { + auto sst_reader = assert_that(table->as_mutation_source().make_reader(schema, tests::make_permit())); + auto mt_reader = scrubbed_mt->as_data_source().make_reader(schema, tests::make_permit()); + auto mt_reader_close = deferred_close(mt_reader); + while (auto mf_opt = mt_reader(db::no_timeout).get()) { + testlog.trace("Expecting {}", mutation_fragment::printer(*schema, *mf_opt)); + sst_reader.produces(*schema, *mf_opt); + } + sst_reader.produces_end_of_stream(); + } }); }, test_cfg); } +// Test the scrub_reader in segregate mode and segregate_by_partition together, +// as they are used in scrub compaction in segregate mode. +SEASTAR_THREAD_TEST_CASE(test_scrub_segregate_stack) { + simple_schema ss; + auto schema = ss.schema(); + auto permit = tests::make_permit(); + + struct expected_rows_type { + using expected_clustering_rows_type = std::set; + + bool has_static_row = false; + expected_clustering_rows_type clustering_rows; + + explicit expected_rows_type(const ::schema& s) : clustering_rows(s) { } + }; + using expected_partitions_type = std::map; + expected_partitions_type expected_partitions{dht::decorated_key::less_comparator(schema)}; + + std::deque all_fragments; + size_t double_partition_end = 0; + size_t missing_partition_end = 0; + + for (uint32_t p = 0; p < 10; ++p) { + auto dk = ss.make_pkey(tests::random::get_int(0, 8)); + auto it = expected_partitions.find(dk); + + testlog.trace("Generating data for {} partition {}", it == expected_partitions.end() ? 
"new" : "existing", dk); + + if (it == expected_partitions.end()) { + auto [inserted_it, _] = expected_partitions.emplace(dk, expected_rows_type(*schema)); + it = inserted_it; + } + + all_fragments.emplace_back(*schema, permit, partition_start(dk, {})); + + auto& expected_rows = it->second; + + for (uint32_t r = 0; r < 10; ++r) { + const auto is_clustering_row = tests::random::get_int(0, 8); + if (is_clustering_row) { + auto ck = ss.make_ckey(tests::random::get_int(0, 8)); + testlog.trace("Generating clustering row {}", ck); + + all_fragments.emplace_back(*schema, permit, ss.make_row(ck, "cv")); + expected_rows.clustering_rows.insert(ck); + } else { + testlog.trace("Generating static row"); + + all_fragments.emplace_back(*schema, permit, ss.make_static_row("sv")); + expected_rows.has_static_row = true; + } + } + + const auto partition_end_roll = tests::random::get_int(0, 100); + if (partition_end_roll < 80) { + testlog.trace("Generating partition end"); + all_fragments.emplace_back(*schema, permit, partition_end()); + } else if (partition_end_roll < 90) { + testlog.trace("Generating double partition end"); + ++double_partition_end; + all_fragments.emplace_back(*schema, permit, partition_end()); + all_fragments.emplace_back(*schema, permit, partition_end()); + } else { + testlog.trace("Not generating partition end"); + ++missing_partition_end; + } + } + + { + size_t rows = 0; + for (const auto& part : expected_partitions) { + rows += part.second.clustering_rows.size(); + } + testlog.info("Generated {} partitions (with {} double and {} missing partition ends), {} rows and {} fragments total", expected_partitions.size(), double_partition_end, missing_partition_end, rows, all_fragments.size()); + } + + auto copy_fragments = [&schema] (const std::deque& frags) { + auto permit = tests::make_permit(); + std::deque copied_fragments; + for (const auto& frag : frags) { + copied_fragments.emplace_back(*schema, permit, frag); + } + return copied_fragments; + }; + + std::list> 
segregated_fragment_streams; + + mutation_writer::segregate_by_partition(make_scrubbing_reader(make_flat_mutation_reader_from_fragments(schema, permit, std::move(all_fragments)), + sstables::compaction_options::scrub::mode::segregate), [&schema, &segregated_fragment_streams] (flat_mutation_reader rd) { + return async([&schema, &segregated_fragment_streams, rd = std::move(rd)] () mutable { + auto close = deferred_close(rd); + auto& fragments = segregated_fragment_streams.emplace_back(); + while (auto mf_opt = rd(db::no_timeout).get()) { + fragments.emplace_back(*schema, rd.permit(), *mf_opt); + } + }); + }).get(); + + testlog.info("Segregation resulted in {} fragment streams", segregated_fragment_streams.size()); + + testlog.info("Checking position monotonicity of segregated streams"); + { + size_t i = 0; + for (const auto& segregated_fragment_stream : segregated_fragment_streams) { + testlog.debug("Checking position monotonicity of segregated stream #{}", i++); + assert_that(make_flat_mutation_reader_from_fragments(schema, permit, std::move(copy_fragments(segregated_fragment_stream)))) + .has_monotonic_positions(); + } + } + + testlog.info("Checking position monotonicity of re-combined stream"); + { + std::vector readers; + readers.reserve(segregated_fragment_streams.size()); + + for (const auto& segregated_fragment_stream : segregated_fragment_streams) { + readers.emplace_back(make_flat_mutation_reader_from_fragments(schema, permit, std::move(copy_fragments(segregated_fragment_stream)))); + } + + assert_that(make_combined_reader(schema, permit, std::move(readers))).has_monotonic_positions(); + } + + testlog.info("Checking content of re-combined stream"); + { + std::vector readers; + readers.reserve(segregated_fragment_streams.size()); + + for (const auto& segregated_fragment_stream : segregated_fragment_streams) { + readers.emplace_back(make_flat_mutation_reader_from_fragments(schema, permit, std::move(copy_fragments(segregated_fragment_stream)))); + } + + auto rd 
= assert_that(make_combined_reader(schema, permit, std::move(readers))); + for (const auto& [pkey, content] : expected_partitions) { + testlog.debug("Checking content of partition {}", pkey); + rd.produces_partition_start(pkey); + if (content.has_static_row) { + rd.produces_static_row(); + } + for (const auto& ckey : content.clustering_rows) { + rd.produces_row_with_key(ckey); + } + rd.produces_partition_end(); + } + rd.produces_end_of_stream(); + } +} + SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) { auto schema = schema_builder("ks", get_name()) .with_column("pk", utf8_type, column_kind::partition_key) .with_column("ck", int32_type, column_kind::clustering_key) .with_column("s", int32_type, column_kind::static_column) .with_column("v", int32_type).build(); + auto permit = tests::make_permit(); std::deque corrupt_fragments; std::deque scrubbed_fragments; @@ -5488,7 +5813,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) { auto make_partition_start = [&, schema] (unsigned pk) { auto pkey = partition_key::from_deeply_exploded(*schema, { local_keys.at(pk) }); auto dkey = dht::decorate_key(*schema, pkey); - return mutation_fragment(*schema, tests::make_permit(), partition_start(std::move(dkey), {})); + return mutation_fragment(*schema, permit, partition_start(std::move(dkey), {})); }; auto make_static_row = [&, schema, ts] { @@ -5496,7 +5821,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) { auto cdef = schema->static_column_at(0); auto ac = atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1))); r.apply(cdef, atomic_cell_or_collection{std::move(ac)}); - return mutation_fragment(*schema, tests::make_permit(), static_row(*schema, std::move(r))); + return mutation_fragment(*schema, permit, static_row(*schema, std::move(r))); }; auto make_clustering_row = [&, schema, ts] (unsigned i) { @@ -5504,12 +5829,12 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) { auto cdef = schema->regular_column_at(0); auto ac = 
atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1))); r.apply(cdef, atomic_cell_or_collection{std::move(ac)}); - return mutation_fragment(*schema, tests::make_permit(), + return mutation_fragment(*schema, permit, clustering_row(clustering_key::from_single_value(*schema, int32_type->decompose(data_value(int(i)))), {}, {}, std::move(r))); }; auto add_fragment = [&, schema] (mutation_fragment mf, bool add_to_scrubbed = true) { - corrupt_fragments.emplace_back(mutation_fragment(*schema, tests::make_permit(), mf)); + corrupt_fragments.emplace_back(mutation_fragment(*schema, permit, mf)); if (add_to_scrubbed) { scrubbed_fragments.emplace_back(std::move(mf)); } @@ -5521,7 +5846,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) { add_fragment(make_clustering_row(0)); add_fragment(make_clustering_row(2)); add_fragment(make_clustering_row(1), false); // out-of-order clustering key - scrubbed_fragments.emplace_back(*schema, tests::make_permit(), partition_end{}); // missing partition-end + scrubbed_fragments.emplace_back(*schema, permit, partition_end{}); // missing partition-end // Partition 2 add_fragment(make_partition_start(2)); @@ -5529,7 +5854,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) { add_fragment(make_clustering_row(0)); add_fragment(make_clustering_row(1)); add_fragment(make_static_row(), false); // out-of-order static row - add_fragment(mutation_fragment(*schema, tests::make_permit(), partition_end{})); + add_fragment(mutation_fragment(*schema, permit, partition_end{})); // Partition 1 - out-of-order add_fragment(make_partition_start(1), false); @@ -5538,7 +5863,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) { add_fragment(make_clustering_row(1), false); add_fragment(make_clustering_row(2), false); add_fragment(make_clustering_row(3), false); - add_fragment(mutation_fragment(*schema, tests::make_permit(), partition_end{}), false); + add_fragment(mutation_fragment(*schema, permit, partition_end{}), false); // 
Partition 3 add_fragment(make_partition_start(3)); @@ -5547,9 +5872,10 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) { add_fragment(make_clustering_row(1)); add_fragment(make_clustering_row(2)); add_fragment(make_clustering_row(3)); - scrubbed_fragments.emplace_back(*schema, tests::make_permit(), partition_end{}); // missing partition-end - at EOS + scrubbed_fragments.emplace_back(*schema, permit, partition_end{}); // missing partition-end - at EOS - auto r = assert_that(make_scrubbing_reader(make_flat_mutation_reader_from_fragments(schema, tests::make_permit(), std::move(corrupt_fragments)), true)); + auto r = assert_that(make_scrubbing_reader(make_flat_mutation_reader_from_fragments(schema, permit, std::move(corrupt_fragments)), + compaction_options::scrub::mode::skip)); for (const auto& mf : scrubbed_fragments) { testlog.info("Expecting {}", mutation_fragment::printer(*schema, mf)); r.produces(*schema, mf);