diff --git a/configure.py b/configure.py index fdf5c46b27..f8e6deaf46 100755 --- a/configure.py +++ b/configure.py @@ -981,6 +981,7 @@ scylla_core = (['message/messaging_service.cc', 'cql3/result_set.cc', 'cql3/prepare_context.cc', 'db/batchlog_manager.cc', + 'db/corrupt_data_handler.cc', 'db/commitlog/commitlog.cc', 'db/commitlog/commitlog_entry.cc', 'db/commitlog/commitlog_replayer.cc', @@ -1341,6 +1342,7 @@ idls = ['idl/gossip_digest.idl.hh', 'idl/replica_exception.idl.hh', 'idl/per_partition_rate_limit_info.idl.hh', 'idl/position_in_partition.idl.hh', + 'idl/full_position.idl.hh', 'idl/experimental/broadcast_tables_lang.idl.hh', 'idl/storage_service.idl.hh', 'idl/join_node.idl.hh', diff --git a/db/CMakeLists.txt b/db/CMakeLists.txt index 0ad9ffdcc7..3440b8544e 100644 --- a/db/CMakeLists.txt +++ b/db/CMakeLists.txt @@ -27,6 +27,7 @@ target_sources(db extensions.cc heat_load_balance.cc large_data_handler.cc + corrupt_data_handler.cc marshal/type_parser.cc batchlog_manager.cc tags/utils.cc diff --git a/db/corrupt_data_handler.cc b/db/corrupt_data_handler.cc new file mode 100644 index 0000000000..441a596867 --- /dev/null +++ b/db/corrupt_data_handler.cc @@ -0,0 +1,134 @@ +/* + * Copyright (C) 2025-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#include "db/corrupt_data_handler.hh" +#include "reader_concurrency_semaphore.hh" +#include "replica/database.hh" +#include "utils/UUID_gen.hh" + +static logging::logger corrupt_data_logger("corrupt_data"); + +namespace sm = seastar::metrics; + +namespace db { + +corrupt_data_handler::corrupt_data_handler(register_metrics rm) { + if (rm) { + _metrics.add_group("corrupt_data", { + sm::make_counter("entries_reported", _stats.corrupt_data_reported, + sm::description("Counts the number of corrupt data instances reported to the corrupt data handler. " + "A non-zero value indicates that the database suffered data corruption.")) + }); + } +} + +future corrupt_data_handler::record_corrupt_clustering_row(const schema& s, const partition_key& pk, + clustering_row cr, sstring origin, std::optional sstable_name) { + ++_stats.corrupt_data_reported; + ++_stats.corrupt_clustering_rows_reported; + return do_record_corrupt_clustering_row(s, pk, std::move(cr), std::move(origin), std::move(sstable_name)).then([this] (entry_id id) { + if (id) { + ++_stats.corrupt_data_recorded; + ++_stats.corrupt_clustering_rows_recorded; + } + return id; + }); +} + +system_table_corrupt_data_handler::system_table_corrupt_data_handler(config cfg, register_metrics rm) + : corrupt_data_handler(rm) + , _entry_ttl(cfg.entry_ttl) + , _sys_ks("system_table_corrupt_data_handler::system_keyspace") +{ +} + +system_table_corrupt_data_handler::~system_table_corrupt_data_handler() { +} + +reader_permit system_table_corrupt_data_handler::make_fragment_permit(const schema& s) { + return _fragment_semaphore->make_tracking_only_permit(s.shared_from_this(), "system_table_corrupt_data_handler::make_fragment_permit", db::no_timeout, {}); +} + +future system_table_corrupt_data_handler::do_record_corrupt_mutation_fragment( + pluggable_system_keyspace::permit sys_ks, + const schema& user_table_schema, + const partition_key& pk, + const clustering_key& ck, + mutation_fragment_v2::kind kind, + frozen_mutation_fragment_v2 fmf, + sstring origin, + std::optional sstable_name) { + const corrupt_data_handler::entry_id id{utils::UUID_gen::get_time_UUID()}; + + const auto corrupt_data_schema = sys_ks->local_db().find_column_family(system_keyspace::NAME, system_keyspace::CORRUPT_DATA).schema(); + + // Using the lower-level mutation API to avoid large allocation warnings when linearizing the frozen mutation fragment. + mutation entry_mutation(corrupt_data_schema, partition_key::from_exploded(*corrupt_data_schema, {serialized(user_table_schema.ks_name()), serialized(user_table_schema.cf_name())})); + auto& entry_row = entry_mutation.partition().clustered_row(*corrupt_data_schema, clustering_key::from_single_value(*corrupt_data_schema, serialized(timeuuid_native_type{id.uuid()}))); + + const auto timestamp = api::new_timestamp(); + + auto set_cell_raw = [this, &entry_row, &corrupt_data_schema, timestamp] (const char* cell_name, managed_bytes cell_value) { + auto cdef = corrupt_data_schema->get_column_definition(cell_name); + SCYLLA_ASSERT(cdef); + + entry_row.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, timestamp, cell_value, _entry_ttl)); + }; + + auto set_cell = [this, &entry_row, &corrupt_data_schema, timestamp] (const char* cell_name, data_value cell_value) { + auto cdef = corrupt_data_schema->get_column_definition(cell_name); + SCYLLA_ASSERT(cdef); + + entry_row.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, timestamp, cell_value.serialize_nonnull(), _entry_ttl)); + }; + + entry_row.apply(row_marker(timestamp, _entry_ttl, gc_clock::now() + _entry_ttl)); + set_cell("partition_key", data_value(to_bytes(pk.representation()))); + set_cell("clustering_key", data_value(to_bytes(ck.representation()))); + set_cell("mutation_fragment_kind", fmt::to_string(kind)); + // FIXME: Exposing knowledge here that bytes are serialized by just storing the raw value. + // Need to replace with a fragmented-buffer serialize API call, which we don't have yet. + set_cell_raw("frozen_mutation_fragment", std::move(fmf).representation().to_managed_bytes()); + set_cell("origin", origin); + set_cell("sstable_name", sstable_name); + + return sys_ks->apply_mutation(std::move(entry_mutation)).then([id] { + return id; + }); +} + +future system_table_corrupt_data_handler::do_record_corrupt_clustering_row(const schema& s, const partition_key& pk, + clustering_row cr, sstring origin, std::optional sstable_name) { + auto sys_ks = _sys_ks.get_permit(); + if (!sys_ks) { + co_return corrupt_data_handler::entry_id::create_null_id(); + } + + const auto ck = cr.key(); + auto fmf = freeze(s, mutation_fragment_v2(s, make_fragment_permit(s), std::move(cr))); + + co_return co_await do_record_corrupt_mutation_fragment(std::move(sys_ks), s, pk, ck, mutation_fragment_v2::kind::clustering_row, std::move(fmf), + std::move(origin), std::move(sstable_name)); +} + +void system_table_corrupt_data_handler::plug_system_keyspace(db::system_keyspace& sys_ks) noexcept { + _sys_ks.plug(sys_ks.shared_from_this()); + _fragment_semaphore = std::make_unique(reader_concurrency_semaphore::no_limits{}, "system_table_corrupt_data_handler", reader_concurrency_semaphore::register_metrics::no); +} + +future<> system_table_corrupt_data_handler::unplug_system_keyspace() noexcept { + co_await _sys_ks.unplug(); + co_await _fragment_semaphore->stop(); +} + +future nop_corrupt_data_handler::do_record_corrupt_clustering_row(const schema& s, const partition_key& pk, + clustering_row cr, sstring origin, std::optional sstable_name) { + return make_ready_future(entry_id::create_null_id()); +} + +} // namespace db diff --git a/db/corrupt_data_handler.hh b/db/corrupt_data_handler.hh new file mode 100644 index 0000000000..00750deb6d --- /dev/null +++ b/db/corrupt_data_handler.hh @@ -0,0 +1,110 @@ +/* + * Copyright (C) 2025-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#pragma once + +#include "db/system_keyspace.hh" +#include "utils/UUID.hh" +#include "utils/pluggable.hh" + +class reader_concurrency_semaphore; +class reader_permit; + +namespace db { + +class corrupt_data_handler { +public: + // An ID identifying the corrupt data entry. + // To be interpreted in the context of the storage where it is recorded, see storage_name(). + using entry_id = utils::tagged_uuid; + + struct stats { + // Counters for the number of corrupt data entries reported. + uint64_t corrupt_data_reported = 0; + // Counters for the number of corrupt data entries recorded. + // Can be less than reported depending on the configuration or if entries failed to be recorded. + uint64_t corrupt_data_recorded = 0; + + uint64_t corrupt_clustering_rows_reported = 0; + uint64_t corrupt_clustering_rows_recorded = 0; + }; + +private: + stats _stats; + + seastar::metrics::metric_groups _metrics; + +protected: + virtual future do_record_corrupt_clustering_row(const schema& s, const partition_key& pk, clustering_row cr, sstring origin, std::optional sstable_name) = 0; + +public: + using register_metrics = bool_class; + explicit corrupt_data_handler(register_metrics); + virtual ~corrupt_data_handler() = default; + + const stats& get_stats() const noexcept { + return _stats; + } + + // The name of the storage where corrupt data is recorded. + // The storage-name and the entry-id together should allow the user to unambiguously locate the entry. + virtual sstring storage_name() const noexcept = 0; + + // Record a corrupt clustering row. + // If the returned id is null, the row was not recorded. + future record_corrupt_clustering_row(const schema& s, const partition_key& pk, clustering_row cr, sstring origin, std::optional sstable_name); +}; + +// Stores corrupt data entries in the system.corrupt_data table. +class system_table_corrupt_data_handler final : public corrupt_data_handler { +public: + using pluggable_system_keyspace = utils::pluggable; + + struct config { + gc_clock::duration entry_ttl; + }; + +private: + gc_clock::duration _entry_ttl; + + pluggable_system_keyspace _sys_ks; + std::unique_ptr _fragment_semaphore; + +private: + reader_permit make_fragment_permit(const schema& s); + + future do_record_corrupt_mutation_fragment(pluggable_system_keyspace::permit sys_ks, const schema& user_table_schema, const partition_key& pk, const clustering_key& ck, + mutation_fragment_v2::kind kind, frozen_mutation_fragment_v2 mf, sstring origin, std::optional sstable_name); + + virtual future do_record_corrupt_clustering_row(const schema& s, const partition_key& pk, clustering_row cr, sstring origin, std::optional sstable_name) override; + +public: + explicit system_table_corrupt_data_handler(config, register_metrics); + ~system_table_corrupt_data_handler(); + + virtual sstring storage_name() const noexcept override { + return format("{}.{}", db::system_keyspace::NAME, db::system_keyspace::CORRUPT_DATA); + } + + void plug_system_keyspace(db::system_keyspace& sys_ks) noexcept; + future<> unplug_system_keyspace() noexcept; +}; + +// A no-op corrupt data handler that does not record any data. +class nop_corrupt_data_handler final : public corrupt_data_handler { + virtual future do_record_corrupt_clustering_row(const schema& s, const partition_key& pk, clustering_row cr, sstring origin, std::optional sstable_name) override; + +public: + explicit nop_corrupt_data_handler(register_metrics rm) + : corrupt_data_handler(rm) {} + virtual sstring storage_name() const noexcept override { + return "/dev/null"; + } +}; + +} // namespace db diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 94c1994309..f26a6363ac 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -36,6 +36,7 @@ #include "db/schema_tables.hh" #include "gms/generation-number.hh" #include "service/storage_service.hh" +#include "service/storage_proxy.hh" #include "service/paxos/paxos_state.hh" #include "query-result-set.hh" #include "idl/frozen_mutation.dist.hh" @@ -778,6 +779,35 @@ schema_ptr system_keyspace::large_cells() { return large_cells; } +schema_ptr system_keyspace::corrupt_data() { + static thread_local auto corrupt_data = [] { + auto id = generate_legacy_id(NAME, CORRUPT_DATA); + return schema_builder(NAME, CORRUPT_DATA, id) + // partition key + .with_column("keyspace_name", utf8_type, column_kind::partition_key) + .with_column("table_name", utf8_type, column_kind::partition_key) + // clustering key + .with_column("id", timeuuid_type, column_kind::clustering_key) + // regular rows + // Storing keys as bytes: having a corrupt key might be the reason + // to record the row as corrupt, so we just dump what we have and + // leave interpreting to the lucky person investigating the disaster. + .with_column("partition_key", bytes_type) + .with_column("clustering_key", bytes_type) + // Note: mutation-fragment v2 + .with_column("mutation_fragment_kind", utf8_type) + .with_column("frozen_mutation_fragment", bytes_type) + .with_column("origin", utf8_type) + .with_column("sstable_name", utf8_type) + // options + .set_comment("mutation-fragments found to be corrupted") + .set_gc_grace_seconds(0) + .with_hash_version() + .build(); + }(); + return corrupt_data; +} + static constexpr auto schema_gc_grace = std::chrono::duration_cast(days(7)).count(); /*static*/ schema_ptr system_keyspace::scylla_local() { @@ -2327,6 +2357,7 @@ std::vector system_keyspace::all_tables(const db::config& cfg) { peers(), peer_events(), range_xfers(), compactions_in_progress(), compaction_history(), sstable_activity(), size_estimates(), large_partitions(), large_rows(), large_cells(), + corrupt_data(), scylla_local(), db::schema_tables::scylla_table_schema_history(), repair_history(), v3::views_builds_in_progress(), v3::built_views(), @@ -3662,4 +3693,12 @@ future<::shared_ptr> system_keyspace::execute_cql(cons return _qp.execute_internal(query_string, values, cql3::query_processor::cache_internal::yes); } +future<> system_keyspace::apply_mutation(mutation m) { + if (m.schema()->ks_name() != NAME) { + on_internal_error(slogger, fmt::format("system_keyspace::apply_mutation(): attempted to apply mutation belonging to table {}.{}", m.schema()->cf_name(), m.schema()->ks_name())); + } + + return _qp.proxy().mutate_locally(m, {}, db::commitlog::force_sync(m.schema()->static_props().wait_for_sync_to_commitlog), db::no_timeout); +} + } // namespace db diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index 16c0164736..eff14dfcac 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -131,6 +131,7 @@ class system_keyspace : public seastar::peering_sharded_service static schema_ptr large_partitions(); static schema_ptr large_rows(); static schema_ptr large_cells(); + static schema_ptr corrupt_data(); static schema_ptr scylla_local(); future<> force_blocking_flush(sstring cfname); // This function is called when the system.peers table is read, @@ -163,6 +164,7 @@ public: static constexpr auto LARGE_PARTITIONS = "large_partitions"; static constexpr auto LARGE_ROWS = "large_rows"; static constexpr auto LARGE_CELLS = "large_cells"; + static constexpr auto CORRUPT_DATA = "corrupt_data"; static constexpr auto SCYLLA_LOCAL = "scylla_local"; static constexpr auto RAFT = "raft"; static constexpr auto RAFT_SNAPSHOTS = "raft_snapshots"; @@ -684,6 +686,10 @@ public: return execute_cql(req, { data_value(std::forward(args))... }); } + // Apply write as mutation to the system keyspace. + // Mutation has to belong to a table int he system keyspace. + future<> apply_mutation(mutation m); + friend future db::schema_tables::get_column_mapping(db::system_keyspace& sys_ks, ::table_id table_id, table_schema_version version); friend future db::schema_tables::column_mapping_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version); friend future<> db::schema_tables::drop_column_mapping(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version); diff --git a/docs/dev/system_keyspace.md b/docs/dev/system_keyspace.md index 43a039bdb0..2d9cfe3ecd 100644 --- a/docs/dev/system_keyspace.md +++ b/docs/dev/system_keyspace.md @@ -121,6 +121,29 @@ SELECT * FROM system.large_cells; SELECT * FROM system.large_cells WHERE keyspace_name = 'ks1' and table_name = 'standard1'; ~~~ +## system.corrupt\_data + +Stores data found to be corrupt during internal operations. This data cannot be written to sstables because then it will be spread around by repair and compaction. It will also possibly cause failures in sstable parsing. +At the same time, the data should be kept around so that it can be inspected and possibly restored by the database operator. +This table is used to store such data. Data is saved at the mutation-fragment level. + +Schema: +```cql +CREATE TABLE system.corrupt_data ( + keyspace_name text, # keyspace name of source table + table_name text, # table name of source table + id timeuuid, # id of the corrupt mutation fragment, assigned by the database when the corrupt data entry is created + partition_key blob, # partition key of partition in the source table, can be incomplete or null due to corruption + clustering_key text, # clustering key of mutation-fragment in the source table, can be null for some mutation-fragment kinds, can be incomplete or null due to corruption + mutation_fragment_kind text, # kind of the mutation fragment, one of 'partition start', 'partition end', 'static row', 'clustering row', 'range tombstone change'; only the latter two can have clustering_key set + frozen_mutation_fragment blob, # the serialized mutation fragment itself + origin text, # the name of the process that found the corruption, e.g. 'sstable-writer' + sstable_name text, # the name of the sstable that contains the corrupt data, if known; sstable is not kept around, it could be compacted or deleted + PRIMARY KEY ((keyspace_name, table_name), id) +) WITH CLUSTERING ORDER BY (id ASC) + AND gc_grace_seconds = 0; +``` + ## system.raft Holds information about Raft diff --git a/idl-compiler.py b/idl-compiler.py index 412e89efc4..ef5cee790d 100755 --- a/idl-compiler.py +++ b/idl-compiler.py @@ -196,6 +196,16 @@ template return static_cast<{name}>(deserialize(buf, std::type_identity<{self.underlying_type}>())); }}""") + def serializer_skip_impl(self, cout): + name = self.ns_qualified_name() + + fprintln(cout, f""" +{self.template_declaration} +template +void serializer<{name}>::skip(Input& buf) {{ + buf.skip(sizeof({self.underlying_type})); +}}""") + class Attributes(ASTBase): ''' AST node for representing class and field attributes. @@ -839,6 +849,7 @@ def handle_enum(enum, hout, cout): enum.serializer_write_impl(cout) enum.serializer_read_impl(cout) + enum.serializer_skip_impl(cout) def join_template(template_params): diff --git a/idl/CMakeLists.txt b/idl/CMakeLists.txt index c0d6d0987f..b963a74a10 100644 --- a/idl/CMakeLists.txt +++ b/idl/CMakeLists.txt @@ -59,6 +59,7 @@ set(idl_headers replica_exception.idl.hh per_partition_rate_limit_info.idl.hh position_in_partition.idl.hh + full_position.idl.hh experimental/broadcast_tables_lang.idl.hh join_node.idl.hh utils.idl.hh diff --git a/idl/full_position.idl.hh b/idl/full_position.idl.hh new file mode 100644 index 0000000000..c7eb2fcb8f --- /dev/null +++ b/idl/full_position.idl.hh @@ -0,0 +1,14 @@ +/* + * Copyright 2025-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#include "idl/position_in_partition.idl.hh" + +struct full_position { + partition_key partition; + position_in_partition position; +}; diff --git a/idl/mutation.idl.hh b/idl/mutation.idl.hh index 1d8adb7d67..fc955a621e 100644 --- a/idl/mutation.idl.hh +++ b/idl/mutation.idl.hh @@ -11,6 +11,7 @@ #include "idl/uuid.idl.hh" #include "idl/keys.idl.hh" +#include "idl/position_in_partition.idl.hh" class counter_id final { utils::UUID uuid(); @@ -114,6 +115,12 @@ class range_tombstone [[writable]] { bound_kind end_kind [[version 1.3]] = bound_kind::incl_end; }; +class range_tombstone_change stub [[writable]] { + clustering_key_prefix key; + bound_weight weight; // we are trying to move away from bound_kind + tombstone tomb; +}; + class mutation_partition stub [[writable]] { tombstone tomb; row static_row; @@ -168,3 +175,7 @@ class mutation_fragment stub [[writable]] { partition_start, partition_end> fragment; }; +class mutation_fragment_v2 stub [[writable]] { + std::variant fragment; +}; diff --git a/idl/position_in_partition.idl.hh b/idl/position_in_partition.idl.hh index 62e1618a95..17e1d67c2a 100644 --- a/idl/position_in_partition.idl.hh +++ b/idl/position_in_partition.idl.hh @@ -26,8 +26,3 @@ class position_in_partition { bound_weight get_bound_weight(); std::optional get_clustering_key_prefix(); }; - -struct full_position { - partition_key partition; - position_in_partition position; -}; diff --git a/idl/storage_proxy.idl.hh b/idl/storage_proxy.idl.hh index c2e6230255..caad282b4b 100644 --- a/idl/storage_proxy.idl.hh +++ b/idl/storage_proxy.idl.hh @@ -22,6 +22,7 @@ #include "idl/keys.idl.hh" #include "idl/uuid.idl.hh" #include "idl/storage_service.idl.hh" +#include "idl/full_position.idl.hh" verb [[with_client_info, with_timeout, one_way]] mutation (frozen_mutation fm [[ref]], inet_address_vector_replica_set forward [[ref]], gms::inet_address reply_to, unsigned shard, uint64_t response_id, std::optional trace_info [[ref]] [[version 1.3.0]], db::per_partition_rate_limit::info rate_limit_info [[version 5.1.0]], service::fencing_token fence [[version 5.4.0]], host_id_vector_replica_set forward_id [[ref, version 6.3.0]], locator::host_id reply_to_id [[version 6.3.0]]); verb [[with_client_info, one_way]] mutation_done (unsigned shard, uint64_t response_id, db::view::update_backlog backlog [[version 3.1.0]]); diff --git a/mutation/frozen_mutation.hh b/mutation/frozen_mutation.hh index ada85fae90..bc8f0c70a2 100644 --- a/mutation/frozen_mutation.hh +++ b/mutation/frozen_mutation.hh @@ -281,6 +281,22 @@ public: frozen_mutation_fragment freeze(const schema& s, const mutation_fragment& mf); +class frozen_mutation_fragment_v2 { + bytes_ostream _bytes; +public: + explicit frozen_mutation_fragment_v2(bytes_ostream bytes) : _bytes(std::move(bytes)) { } + const bytes_ostream& representation() const { return _bytes; } + bytes_ostream&& representation() && { return std::move(_bytes); } + + mutation_fragment_v2 unfreeze(const schema& s, reader_permit permit); + + future<> clear_gently() noexcept { + return _bytes.clear_gently(); + } +}; + +frozen_mutation_fragment_v2 freeze(const schema& s, const mutation_fragment_v2& mf); + template auto frozen_mutation::consume(schema_ptr s, frozen_mutation_consumer_adaptor& adaptor) const -> frozen_mutation_consume_result { check_schema_version(schema_version(), *s); diff --git a/mutation/mutation_partition_serializer.cc b/mutation/mutation_partition_serializer.cc index 99052bdbb5..1f60a9556a 100644 --- a/mutation/mutation_partition_serializer.cc +++ b/mutation/mutation_partition_serializer.cc @@ -280,3 +280,37 @@ frozen_mutation_fragment freeze(const schema& s, const mutation_fragment& mf) )).end_mutation_fragment(); return frozen_mutation_fragment(std::move(out)); } + +frozen_mutation_fragment_v2 freeze(const schema& s, const mutation_fragment_v2& mf) { + bytes_ostream out; + ser::writer_of_mutation_fragment_v2 writer(out); + mf.visit(seastar::make_visitor( + [&] (const clustering_row& cr) { + return write_row(std::move(writer).start_fragment_clustering_row().start_row(), s, cr.key(), cr.cells(), cr.marker(), cr.tomb()) + .end_row() + .end_clustering_row(); + }, + [&] (const static_row& sr) { + return write_row_cells(std::move(writer).start_fragment_static_row().start_cells(), sr.cells(), s, column_kind::static_column) + .end_cells() + .end_static_row(); + }, + [&] (const range_tombstone_change& rtc) { + return std::move(writer).start_fragment_range_tombstone_change() + .write_key(rtc.position().key()) + .write_weight(rtc.position().get_bound_weight()) + .write_tomb(rtc.tombstone()) + .end_range_tombstone_change(); + }, + [&] (const partition_start& ps) { + return std::move(writer).start_fragment_partition_start() + .write_key(ps.key().key()) + .write_partition_tombstone(ps.partition_tombstone()) + .end_partition_start(); + }, + [&] (const partition_end& pe) { + return std::move(writer).write_fragment_partition_end(pe); + } + )).end_mutation_fragment_v2(); + return frozen_mutation_fragment_v2(std::move(out)); +} diff --git a/mutation/mutation_partition_view.cc b/mutation/mutation_partition_view.cc index cc5f53d4d4..f2c8850b68 100644 --- a/mutation/mutation_partition_view.cc +++ b/mutation/mutation_partition_view.cc @@ -448,59 +448,60 @@ mutation_partition_view mutation_partition_view::from_view(ser::mutation_partiti return { v.v }; } +clustering_row read_clustered_row(const schema& s, ser::clustering_row_view crv) { + class clustering_row_builder { + const schema& _s; + clustering_row _row; + public: + clustering_row_builder(const schema& s, clustering_key key, row_tombstone t, row_marker m) + : _s(s), _row(std::move(key), std::move(t), std::move(m), row()) { } + void accept_atomic_cell(column_id id, atomic_cell ac) { + _row.cells().append_cell(id, std::move(ac)); + } + void accept_collection(column_id id, const collection_mutation& cm) { + _row.cells().append_cell(id, collection_mutation(*_s.regular_column_at(id).type, cm)); + } + clustering_row get() && { return std::move(_row); } + }; + + auto cr = crv.row(); + auto t = row_tombstone(cr.deleted_at(), shadowable_tombstone(cr.shadowable_deleted_at())); + clustering_row_builder builder(s, cr.key(), std::move(t), read_row_marker(cr.marker())); + read_and_visit_row(cr.cells(), s.get_column_mapping(), column_kind::regular_column, builder); + return std::move(builder).get(); +} + +static_row read_static_row(const schema& s, ser::static_row_view sr) { + class static_row_builder { + const schema& _s; + static_row _row; + public: + explicit static_row_builder(const schema& s) + : _s(s) { } + void accept_atomic_cell(column_id id, atomic_cell ac) { + _row.cells().append_cell(id, std::move(ac)); + } + void accept_collection(column_id id, const collection_mutation& cm) { + _row.cells().append_cell(id, collection_mutation(*_s.static_column_at(id).type, cm)); + } + static_row get() && { return std::move(_row); } + }; + + static_row_builder builder(s); + read_and_visit_row(sr.cells(), s.get_column_mapping(), column_kind::static_column, builder); + return std::move(builder).get(); +} + mutation_fragment frozen_mutation_fragment::unfreeze(const schema& s, reader_permit permit) { auto in = ser::as_input_stream(_bytes); auto view = ser::deserialize(in, std::type_identity()); return seastar::visit(view.fragment(), [&] (ser::clustering_row_view crv) { - class clustering_row_builder { - const schema& _s; - mutation_fragment _mf; - public: - clustering_row_builder(const schema& s, reader_permit permit, clustering_key key, row_tombstone t, row_marker m) - : _s(s), _mf(mutation_fragment::clustering_row_tag_t(), s, std::move(permit), std::move(key), std::move(t), std::move(m), row()) { } - void accept_atomic_cell(column_id id, atomic_cell ac) { - _mf.mutate_as_clustering_row(_s, [&] (clustering_row& cr) mutable { - cr.cells().append_cell(id, std::move(ac)); - }); - } - void accept_collection(column_id id, const collection_mutation& cm) { - _mf.mutate_as_clustering_row(_s, [&] (clustering_row& cr) mutable { - cr.cells().append_cell(id, collection_mutation(*_s.regular_column_at(id).type, cm)); - }); - } - mutation_fragment get_mutation_fragment() && { return std::move(_mf); } - }; - - auto cr = crv.row(); - auto t = row_tombstone(cr.deleted_at(), shadowable_tombstone(cr.shadowable_deleted_at())); - clustering_row_builder builder(s, permit, cr.key(), std::move(t), read_row_marker(cr.marker())); - read_and_visit_row(cr.cells(), s.get_column_mapping(), column_kind::regular_column, builder); - return std::move(builder).get_mutation_fragment(); + return mutation_fragment(s, permit, read_clustered_row(s, crv)); }, [&] (ser::static_row_view sr) { - class static_row_builder { - const schema& _s; - mutation_fragment _mf; - public: - explicit static_row_builder(const schema& s, reader_permit permit) : _s(s), _mf(_s, std::move(permit), static_row()) { } - void accept_atomic_cell(column_id id, atomic_cell ac) { - _mf.mutate_as_static_row(_s, [&] (static_row& sr) mutable { - sr.cells().append_cell(id, std::move(ac)); - }); - } - void accept_collection(column_id id, const collection_mutation& cm) { - _mf.mutate_as_static_row(_s, [&] (static_row& sr) mutable { - sr.cells().append_cell(id, collection_mutation(*_s.static_column_at(id).type, cm)); - }); - } - mutation_fragment get_mutation_fragment() && { return std::move(_mf); } - }; - - static_row_builder builder(s, permit); - read_and_visit_row(sr.cells(), s.get_column_mapping(), column_kind::static_column, builder); - return std::move(builder).get_mutation_fragment(); + return mutation_fragment(s, permit, read_static_row(s, sr)); }, [&] (ser::range_tombstone_view rt) { return mutation_fragment(s, permit, range_tombstone(rt)); @@ -517,3 +518,31 @@ mutation_fragment frozen_mutation_fragment::unfreeze(const schema& s, reader_per } ); } + +mutation_fragment_v2 frozen_mutation_fragment_v2::unfreeze(const schema& s, reader_permit permit) +{ + auto in = ser::as_input_stream(_bytes); + auto view = ser::deserialize(in, std::type_identity()); + return seastar::visit(view.fragment(), + [&] (ser::clustering_row_view crv) { + return mutation_fragment_v2(s, permit, read_clustered_row(s, crv)); + }, + [&] (ser::static_row_view sr) { + return mutation_fragment_v2(s, permit, read_static_row(s, sr)); + }, + [&] (ser::range_tombstone_change_view rtc) { + auto pos = position_in_partition(partition_region::clustered, rtc.weight(), rtc.key()); + return mutation_fragment_v2(s, permit, range_tombstone_change(std::move(pos), rtc.tomb())); + }, + [&] (ser::partition_start_view ps) { + auto dkey = dht::decorate_key(s, ps.key()); + return mutation_fragment_v2(s, permit, partition_start(std::move(dkey), ps.partition_tombstone())); + }, + [&] (partition_end) { + return mutation_fragment_v2(s, permit, partition_end()); + }, + [] (ser::unknown_variant_type) -> mutation_fragment_v2 { + throw std::runtime_error("Trying to deserialize unknown mutation fragment type"); + } + ); +} diff --git a/replica/database.cc b/replica/database.cc index 697f81dda8..094447e419 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -57,6 +57,7 @@ #include "db/timeout_clock.hh" #include "db/large_data_handler.hh" +#include "db/corrupt_data_handler.hh" #include "db/data_listeners.hh" #include "data_dictionary/user_types_metadata.hh" @@ -417,8 +418,10 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat _cfg.compaction_rows_count_warning_threshold, _cfg.compaction_collection_elements_count_warning_threshold)) , _nop_large_data_handler(std::make_unique()) - , _user_sstables_manager(std::make_unique("user", *_large_data_handler, _cfg, feat, _row_cache_tracker, dbcfg.available_memory, sst_dir_sem, [&stm]{ return stm.get()->get_my_id(); }, scf, abort, dbcfg.streaming_scheduling_group, &sstm)) - , _system_sstables_manager(std::make_unique("system", *_nop_large_data_handler, _cfg, feat, _row_cache_tracker, dbcfg.available_memory, sst_dir_sem, [&stm]{ return stm.get()->get_my_id(); }, scf, abort, dbcfg.streaming_scheduling_group)) + , _corrupt_data_handler(std::make_unique(db::system_table_corrupt_data_handler::config{.entry_ttl = std::chrono::days(10)}, db::corrupt_data_handler::register_metrics::yes)) + , _nop_corrupt_data_handler(std::make_unique(db::corrupt_data_handler::register_metrics::no)) + , _user_sstables_manager(std::make_unique("user", *_large_data_handler, *_corrupt_data_handler, _cfg, feat, _row_cache_tracker, dbcfg.available_memory, sst_dir_sem, [&stm]{ return stm.get()->get_my_id(); }, scf, abort, dbcfg.streaming_scheduling_group, &sstm)) + , _system_sstables_manager(std::make_unique("system", *_nop_large_data_handler, *_nop_corrupt_data_handler, _cfg, feat, _row_cache_tracker, dbcfg.available_memory, sst_dir_sem, [&stm]{ return stm.get()->get_my_id(); }, scf, abort, dbcfg.streaming_scheduling_group)) , _result_memory_limiter(dbcfg.available_memory / 10) , _data_listeners(std::make_unique()) , _mnotifier(mn) @@ -3050,6 +3053,7 @@ database::as_data_dictionary() const { void database::plug_system_keyspace(db::system_keyspace& sys_ks) noexcept { _compaction_manager.plug_system_keyspace(sys_ks); _large_data_handler->plug_system_keyspace(sys_ks); + _corrupt_data_handler->plug_system_keyspace(sys_ks); _user_sstables_manager->plug_sstables_registry(std::make_unique(sys_ks)); } @@ -3057,6 +3061,7 @@ future<> database::unplug_system_keyspace() noexcept { _user_sstables_manager->unplug_sstables_registry(); co_await _compaction_manager.unplug_system_keyspace(); co_await _large_data_handler->unplug_system_keyspace(); + co_await _corrupt_data_handler->unplug_system_keyspace(); } void database::plug_view_update_generator(db::view::view_update_generator& generator) noexcept { diff --git a/replica/database.hh b/replica/database.hh index 891c811ac6..a8ec86d405 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -137,6 +137,8 @@ class extensions; class rp_handle; class data_listeners; class large_data_handler; +class system_table_corrupt_data_handler; +class nop_corrupt_data_handler; class system_keyspace; namespace view { @@ -1583,6 +1585,9 @@ private: std::unique_ptr _large_data_handler; std::unique_ptr _nop_large_data_handler; + std::unique_ptr _corrupt_data_handler; + std::unique_ptr _nop_corrupt_data_handler; + std::unique_ptr _user_sstables_manager; std::unique_ptr _system_sstables_manager; diff --git a/sstables/mx/writer.cc b/sstables/mx/writer.cc index 743e12bd01..6dc05c9d94 100644 --- a/sstables/mx/writer.cc +++ b/sstables/mx/writer.cc @@ -18,6 +18,7 @@ #include "utils/assert.hh" #include "utils/exceptions.hh" #include "db/large_data_handler.hh" +#include "db/corrupt_data_handler.hh" #include #include @@ -693,6 +694,8 @@ private: void maybe_record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key, const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements); + void record_corrupt_row(clustering_row&& clustered_row); + // Writes single atomic cell void write_cell(bytes_ostream& writer, const clustering_key_prefix* clustering_key, atomic_cell_view cell, const column_definition& cdef, const row_time_properties& properties, std::optional cell_path = {}); @@ -1305,12 +1308,41 @@ void writer::write_clustered(const clustering_row& clustered_row, uint64_t prev_ collect_row_stats(_data_writer->offset() - current_pos, &clustered_row.key(), is_dead); } +void writer::record_corrupt_row(clustering_row&& clustered_row) { + auto& handler = _sst.get_corrupt_data_handler(); + + const auto pk = _partition_key->to_partition_key(_schema); + const auto ck = clustered_row.key(); + + db::corrupt_data_handler::entry_id corrupt_row_id; + sstring result; + try { + corrupt_row_id = handler.record_corrupt_clustering_row(_schema, pk, std::move(clustered_row), "sstable-write", fmt::to_string(_sst.get_filename())).get(); + result = format("written corrupt row to {} with id {}", handler.storage_name(), corrupt_row_id); + } catch (...) { + result = format("failed to write corrupt row to {}: {}", handler.storage_name(), std::current_exception()); + } + + slogger.error("found non-full clustering key {} in partition {} while writing sstable {} for non-compact table {}.{}; {}", + ck, + pk, + _sst.get_filename(), + _schema.ks_name(), + _schema.cf_name(), + result); +} + stop_iteration writer::consume(clustering_row&& cr) { if (_write_regular_as_static) { ensure_tombstone_is_written(); write_static_row(cr.cells(), column_kind::regular_column); return stop_iteration::no; } + if (!_schema.is_compact_table() && !cr.key().is_full(_schema)) { + record_corrupt_row(std::move(cr)); + return stop_iteration::no; + } + ensure_tombstone_is_written(); ensure_static_row_is_written_if_needed(); write_clustered(cr); diff --git a/sstables/sstables.cc b/sstables/sstables.cc index bafd4d81d3..a0005cdb22 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -3220,6 +3220,7 @@ sstable::sstable(schema_ptr schema, version_types v, format_types f, db::large_data_handler& large_data_handler, + db::corrupt_data_handler& corrupt_data_handler, sstables_manager& manager, db_clock::time_point now, io_error_handler_gen error_handler_gen, @@ -3237,6 +3238,7 @@ sstable::sstable(schema_ptr schema, , _read_error_handler(error_handler_gen(sstable_read_error)) , _write_error_handler(error_handler_gen(sstable_write_error)) , _large_data_handler(large_data_handler) + , _corrupt_data_handler(corrupt_data_handler) , _manager(manager) { manager.add(this); diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 2919450e14..b885bbd478 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -58,6 +58,7 @@ class in_memory_config_type; namespace db { class large_data_handler; +class corrupt_data_handler; } namespace sstables { @@ -199,6 +200,7 @@ public: version_types v, format_types f, db::large_data_handler& large_data_handler, + db::corrupt_data_handler& corrupt_data_handler, sstables_manager& manager, db_clock::time_point now, io_error_handler_gen error_handler_gen, @@ -443,6 +445,10 @@ public: return _large_data_handler; } + db::corrupt_data_handler& get_corrupt_data_handler() { + return _corrupt_data_handler; + } + void assert_large_data_handler_is_running(); /** @@ -583,6 +589,7 @@ private: io_error_handler _write_error_handler; db::large_data_handler& _large_data_handler; + db::corrupt_data_handler& _corrupt_data_handler; sstables_manager& _manager; sstables_stats _stats; diff --git a/sstables/sstables_manager.cc b/sstables/sstables_manager.cc index bc64b1dc30..af630ca71e 100644 --- a/sstables/sstables_manager.cc +++ b/sstables/sstables_manager.cc @@ -26,11 +26,13 @@ namespace sstables { logging::logger smlogger("sstables_manager"); sstables_manager::sstables_manager( - sstring name, db::large_data_handler& large_data_handler, const db::config& dbcfg, gms::feature_service& feat, cache_tracker& ct, size_t available_memory, directory_semaphore& dir_sem, + sstring name, db::large_data_handler& large_data_handler, db::corrupt_data_handler& corrupt_data_handler, const db::config& dbcfg, gms::feature_service& feat, cache_tracker& ct, size_t available_memory, directory_semaphore& dir_sem, noncopyable_function&& resolve_host_id, sstable_compressor_factory& compressor_factory, const abort_source& abort, scheduling_group maintenance_sg, storage_manager* shared) : _storage(shared) , _available_memory(available_memory) - , _large_data_handler(large_data_handler), _db_config(dbcfg), _features(feat), _cache_tracker(ct) + , _large_data_handler(large_data_handler) + , _corrupt_data_handler(corrupt_data_handler) + , _db_config(dbcfg), _features(feat), _cache_tracker(ct) , _sstable_metadata_concurrency_sem( max_count_sstable_metadata_concurrent_reads, max_memory_sstable_metadata_concurrent_reads(available_memory), @@ -161,7 +163,7 @@ shared_sstable sstables_manager::make_sstable(schema_ptr schema, db_clock::time_point now, io_error_handler_gen error_handler_gen, size_t buffer_size) { - return make_lw_shared(std::move(schema), storage, generation, state, v, f, get_large_data_handler(), *this, now, std::move(error_handler_gen), buffer_size); + return make_lw_shared(std::move(schema), storage, generation, state, v, f, get_large_data_handler(), get_corrupt_data_handler(), *this, now, std::move(error_handler_gen), buffer_size); } sstable_writer_config sstables_manager::configure_writer(sstring origin) const { diff --git a/sstables/sstables_manager.hh b/sstables/sstables_manager.hh index b66046f088..df55dbcd5d 100644 --- a/sstables/sstables_manager.hh +++ b/sstables/sstables_manager.hh @@ -31,6 +31,7 @@ namespace db { class large_data_handler; +class corrupt_data_handler; class config; class object_storage_endpoint_param; @@ -97,6 +98,7 @@ private: storage_manager* _storage; size_t _available_memory; db::large_data_handler& _large_data_handler; + db::corrupt_data_handler& _corrupt_data_handler; const db::config& _db_config; gms::feature_service& _features; // _sstables_format is the format used for writing new sstables. @@ -145,8 +147,20 @@ private: signal_type _signal_source; public: - explicit sstables_manager(sstring name, db::large_data_handler& large_data_handler, const db::config& dbcfg, gms::feature_service& feat, cache_tracker&, size_t available_memory, directory_semaphore& dir_sem, - noncopyable_function&& resolve_host_id, sstable_compressor_factory&, const abort_source& abort, scheduling_group maintenance_sg = current_scheduling_group(), storage_manager* shared = nullptr); + explicit sstables_manager( + sstring name, + db::large_data_handler& large_data_handler, + db::corrupt_data_handler& corrupt_data_handler, + const db::config& dbcfg, + gms::feature_service& feat, + cache_tracker&, + size_t available_memory, + directory_semaphore& dir_sem, + noncopyable_function&& resolve_host_id, + sstable_compressor_factory&, + const abort_source& abort, + scheduling_group maintenance_sg = current_scheduling_group(), + storage_manager* shared = nullptr); virtual ~sstables_manager(); shared_sstable make_sstable(schema_ptr schema, @@ -251,6 +265,9 @@ private: db::large_data_handler& get_large_data_handler() const { return _large_data_handler; } + db::corrupt_data_handler& get_corrupt_data_handler() const { + return _corrupt_data_handler; + } friend class sstable; // Allow testing private methods/variables via test_env_sstables_manager diff --git a/test/boost/sstable_datafile_test.cc b/test/boost/sstable_datafile_test.cc index 48c95b90e2..4761f2c3ad 100644 --- a/test/boost/sstable_datafile_test.cc +++ b/test/boost/sstable_datafile_test.cc @@ -48,10 +48,13 @@ #include "test/lib/sstable_utils.hh" #include "test/lib/random_utils.hh" #include "test/lib/test_utils.hh" +#include "test/lib/cql_test_env.hh" #include "readers/from_mutations.hh" #include "readers/from_fragments.hh" +#include "readers/combined.hh" #include "test/lib/random_schema.hh" #include "test/lib/exception_utils.hh" +#include "test/lib/cql_assertions.hh" namespace fs = std::filesystem; @@ -3282,3 +3285,130 @@ SEASTAR_TEST_CASE(sstable_identifier_correctness) { BOOST_REQUIRE_EQUAL(sst->sstable_identifier()->uuid(), sst->generation().as_uuid()); }); } + +SEASTAR_TEST_CASE(test_non_full_and_empty_row_keys) { + return do_with_cql_env_thread([] (cql_test_env& env) { + const auto seed = tests::random::get_int(); + auto random_spec = tests::make_random_schema_specification( + "ks", + std::uniform_int_distribution(1, 4), + std::uniform_int_distribution(1, 4), + std::uniform_int_distribution(2, 8), + std::uniform_int_distribution(2, 8)); + auto random_schema = tests::random_schema(seed, *random_spec); + + testlog.info("Random schema:\n{}", random_schema.cql()); + + random_schema.create_with_cql(env).get(); + + auto schema = random_schema.schema(); + + auto& db = env.local_db(); + auto& table = db.find_column_family(schema); + auto& manager = db.get_user_sstables_manager(); + + const auto generated_mutations = tests::generate_random_mutations(random_schema).get(); + + auto mutation_description = random_schema.new_mutation(0); + auto engine = std::mt19937(seed); + random_schema.add_row(engine, mutation_description, 0, [] (std::mt19937& engine, tests::timestamp_destination destination, api::timestamp_type min_timestamp) { + switch (destination) { + case tests::timestamp_destination::partition_tombstone: + case tests::timestamp_destination::row_tombstone: + case tests::timestamp_destination::collection_tombstone: + case tests::timestamp_destination::range_tombstone: + return api::missing_timestamp; + default: + return api::timestamp_type(100); + } + }); + + const auto row_mutation = mutation_description.build(schema); + + auto check = [&] (const clustering_key& ck) { + testlog.info("check({})", ck); + + auto permit = db.obtain_reader_permit(schema, "test_non_full_and_empty_row_keys::write", db::no_timeout, {}).get(); + + const auto dk = generated_mutations.front().decorated_key(); + + const auto row_mutation_fragment = mutation_fragment_v2(*schema, permit, + clustering_row(ck, deletable_row(*schema, row_mutation.partition().clustered_rows().begin()->row()))); + + std::deque fragments; + fragments.emplace_back(*schema, permit, partition_start(dk, {})); + fragments.emplace_back(*schema, permit, row_mutation_fragment); + fragments.emplace_back(*schema, permit, partition_end()); + + const auto original_mutation_fragment = mutation_fragment_v2(*schema, permit, fragments[1]); + + auto reader = make_combined_reader(schema, permit, + make_mutation_reader_from_mutations(schema, permit, generated_mutations, query::full_partition_range), + make_mutation_reader_from_fragments(schema, permit, std::move(fragments))); + + auto sst = table.make_sstable(); + auto& corrupt_data_handler = sst->get_corrupt_data_handler(); + const auto stats_before = corrupt_data_handler.get_stats(); + + sst->write_components(std::move(reader), generated_mutations.size(), schema, manager.configure_writer("test"), encoding_stats{}).get(); + sst->load(schema->get_sharder(), {}).get(); + + testlog.info("mutations written to : {}", sst->get_filename()); + + // The sstable should not contain the row with the bad key -- that should be passed to the corrupt_data_handler. + assert_that(sst->make_reader(schema, permit, query::full_partition_range, schema->full_slice())) + .produces(generated_mutations); + + const auto stats_after = corrupt_data_handler.get_stats(); + + for (const uint64_t db::corrupt_data_handler::stats::* member : { + &db::corrupt_data_handler::stats::corrupt_data_reported, + &db::corrupt_data_handler::stats::corrupt_data_recorded, + &db::corrupt_data_handler::stats::corrupt_clustering_rows_reported, + &db::corrupt_data_handler::stats::corrupt_clustering_rows_recorded}) { + BOOST_REQUIRE_EQUAL(stats_after.*member, stats_before.*member + 1); + } + + auto res = env.execute_cql(format("SELECT * FROM {}.{} WHERE keyspace_name = '{}' AND table_name = '{}'", + db::system_keyspace::NAME, db::system_keyspace::CORRUPT_DATA, schema->ks_name(), schema->cf_name())).get(); + + assert_that(res) + .is_rows() + .with_size(1) + .with_columns_of_row(0) + .with_typed_column("id", [] (const timeuuid_native_type& v) { return !v.uuid.is_null(); }) + .with_typed_column("partition_key", [&] (const bytes& v) { + return partition_key::from_bytes(v).equal(*schema, dk.key()); + }) + .with_typed_column("clustering_key", [&] (const bytes& v) { + return clustering_key::from_bytes(v).equal(*schema, ck); + }) + .with_typed_column("mutation_fragment_kind", "clustering row") + .with_typed_column("frozen_mutation_fragment", [&] (const bytes& v) { + bytes_ostream fmf_bytes; + fmf_bytes.write(v); + + frozen_mutation_fragment_v2 fmf(std::move(fmf_bytes)); + + const auto unfreezed_mutation_fragment = fmf.unfreeze(*schema, permit); + + return unfreezed_mutation_fragment.equal(*schema, original_mutation_fragment); + }) + .with_typed_column("origin", "sstable-write") + .with_typed_column("sstable_name", fmt::to_string(sst->get_filename())) + ; + + // Clear the corrupt data table so that it doesn't affect other checks. + env.execute_cql(format("DELETE FROM {}.{} WHERE keyspace_name = '{}' AND table_name = '{}'", + db::system_keyspace::NAME, db::system_keyspace::CORRUPT_DATA, schema->ks_name(), schema->cf_name())).get(); + }; + + check(clustering_key::make_empty()); + + if (schema->clustering_key_size() > 1) { + auto full_ckey = random_schema.make_ckey(0); + full_ckey.erase(full_ckey.end() - 1); + check(clustering_key::from_exploded(*schema, full_ckey)); + } + }); +} diff --git a/test/lib/cql_assertions.cc b/test/lib/cql_assertions.cc index c035a55a14..072c065be7 100644 --- a/test/lib/cql_assertions.cc +++ b/test/lib/cql_assertions.cc @@ -21,6 +21,47 @@ static inline void fail(sstring msg) { throw std::runtime_error(msg); } +void columns_assertions::fail(const sstring& msg) { + ::fail(msg); +} + +columns_assertions& columns_assertions::do_with_raw_column(const char* name, std::function func) { + const auto& names = _metadata.get_names(); + + auto it = std::ranges::find_if(names, [name] (const auto& col) { + return col->name->text() == name; + }); + if (it == names.end()) { + ::fail(seastar::format("Column {} not found in metadata", name)); + } + + const size_t index = std::distance(names.begin(), it); + const auto& value = _columns.at(index); + if (!value) { + ::fail(seastar::format("Column {} is null", name)); + } + + func((*it)->type, *value); + + return *this; +} + +columns_assertions& columns_assertions::with_raw_column(const char* name, std::function predicate) { + return do_with_raw_column(name, [name, &predicate] (data_type, managed_bytes_view value) { + if (!predicate(value)) { + ::fail(seastar::format("Column {} failed predicate check: value = {}", name, value)); + } + }); +} + +columns_assertions& columns_assertions::with_raw_column(const char* name, managed_bytes_view value) { + return do_with_raw_column(name, [name, &value] (data_type, managed_bytes_view cell_value) { + if (cell_value != value) { + ::fail(seastar::format("Expected column {} to have value {}, but got {}", name, value, cell_value)); + } + }); +} + rows_assertions::rows_assertions(shared_ptr rows) : _rows(rows) { } @@ -165,6 +206,11 @@ rows_assertions::with_rows_ignore_order(std::vector> rows return {*this}; } +columns_assertions rows_assertions::with_columns_of_row(size_t row_index) { + const auto& rs = _rows->rs().result_set(); + return columns_assertions(rs.get_metadata(), rs.rows().at(row_index)); +} + result_msg_assertions::result_msg_assertions(shared_ptr msg) : _msg(msg) { } diff --git a/test/lib/cql_assertions.hh b/test/lib/cql_assertions.hh index 8392c24708..89ce1dec5b 100644 --- a/test/lib/cql_assertions.hh +++ b/test/lib/cql_assertions.hh @@ -10,6 +10,7 @@ #pragma once #include "utils/assert.hh" +#include "utils/managed_bytes.hh" #include "test/lib/cql_test_env.hh" #include "transport/messages/result_message_base.hh" #include "bytes.hh" @@ -17,6 +18,45 @@ #include #include +class columns_assertions { + const cql3::metadata& _metadata; + const std::vector& _columns; + + columns_assertions& do_with_raw_column(const char* name, std::function func); + + void fail(const sstring& msg); + +public: + columns_assertions(const cql3::metadata& metadata, const std::vector& columns) + : _metadata(metadata), _columns(columns) + { } + + columns_assertions& with_raw_column(const char* name, std::function predicate); + columns_assertions& with_raw_column(const char* name, managed_bytes_view value); + + template + columns_assertions& with_typed_column(const char* name, std::function predicate) { + return do_with_raw_column(name, [this, name, predicate] (data_type type, managed_bytes_view value) { + if (type != data_type_for()) { + fail(seastar::format("Column {} is not of type {}, but of type {}", name, data_type_for()->name(), type->name())); + } + if (!predicate(value_cast(type->deserialize(value)))) { + fail(seastar::format("Column {} failed predicate check: value = {}", name, value)); + } + }); + } + + template + columns_assertions& with_typed_column(const char* name, const T& value) { + return with_typed_column(name, [this, name, &value] (const T& cell_value) { + if (cell_value != value) { + fail(seastar::format("Expected column {} to have value {}, but got {}", name, value, cell_value)); + } + return true; + }); + } +}; + class rows_assertions { shared_ptr _rows; public: @@ -33,6 +73,8 @@ public: rows_assertions with_rows_ignore_order(std::vector> rows); rows_assertions with_serialized_columns_count(size_t columns_count); + columns_assertions with_columns_of_row(size_t row_index); + rows_assertions is_null(); rows_assertions is_not_null(); }; diff --git a/test/lib/sstable_test_env.hh b/test/lib/sstable_test_env.hh index 2682cd4c3a..0e5640f7a2 100644 --- a/test/lib/sstable_test_env.hh +++ b/test/lib/sstable_test_env.hh @@ -15,6 +15,7 @@ #include "data_dictionary/storage_options.hh" #include "db/large_data_handler.hh" +#include "db/corrupt_data_handler.hh" #include "sstables/version.hh" #include "sstables/sstable_directory.hh" #include "compaction/compaction_manager.hh" @@ -95,6 +96,7 @@ public: struct test_env_config { db::large_data_handler* large_data_handler = nullptr; + db::corrupt_data_handler* corrupt_data_handler = nullptr; data_dictionary::storage_options storage; // will be local by default size_t available_memory = memory::stats().total_memory(); }; diff --git a/test/lib/test_services.cc b/test/lib/test_services.cc index 387a397819..afa0a30adf 100644 --- a/test/lib/test_services.cc +++ b/test/lib/test_services.cc @@ -13,6 +13,7 @@ #include "test/lib/test_utils.hh" #include "db/config.hh" #include "db/large_data_handler.hh" +#include "db/corrupt_data_handler.hh" #include "dht/i_partitioner.hh" #include "gms/feature_service.hh" #include "repair/row_level.hh" @@ -202,6 +203,7 @@ struct test_env::impl { ::cache_tracker cache_tracker; gms::feature_service feature_service; db::nop_large_data_handler nop_ld_handler; + db::nop_corrupt_data_handler nop_cd_handler; sstable_compressor_factory& scf; test_env_sstables_manager mgr; std::unique_ptr cmgr; @@ -225,10 +227,22 @@ test_env::impl::impl(test_env_config cfg, sstable_compressor_factory& scfarg, ss , db_config(make_db_config(dir.path().native(), cfg.storage)) , dir_sem(1) , feature_service(gms::feature_config_from_db_config(*db_config)) + , nop_cd_handler(db::corrupt_data_handler::register_metrics::no) , scf(scfarg) - , mgr("test_env", cfg.large_data_handler == nullptr ? nop_ld_handler : *cfg.large_data_handler, *db_config, - feature_service, cache_tracker, cfg.available_memory, dir_sem, - [host_id = locator::host_id::create_random_id()]{ return host_id; }, scf, abort, current_scheduling_group(), sstm) + , mgr( + "test_env", + cfg.large_data_handler == nullptr ? nop_ld_handler : *cfg.large_data_handler, + cfg.corrupt_data_handler == nullptr ? nop_cd_handler : *cfg.corrupt_data_handler, + *db_config, + feature_service, + cache_tracker, + cfg.available_memory, + dir_sem, + [host_id = locator::host_id::create_random_id()]{ return host_id; }, + scf, + abort, + current_scheduling_group(), + sstm) , semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::test_env", reader_concurrency_semaphore::register_metrics::no) , storage(std::move(cfg.storage)) { diff --git a/tools/read_mutation.hh b/tools/read_mutation.hh index 08ee73027e..48701b7163 100644 --- a/tools/read_mutation.hh +++ b/tools/read_mutation.hh @@ -16,6 +16,7 @@ #include "db/cache_tracker.hh" #include "db/config.hh" #include "db/large_data_handler.hh" +#include "db/corrupt_data_handler.hh" #include "gms/feature_service.hh" #include "schema/schema_fwd.hh" #include "sstables/sstable_directory.hh" @@ -29,6 +30,7 @@ future get_table_directory(std::filesystem::path scylla_d struct sstable_manager_service { db::nop_large_data_handler large_data_handler; + db::nop_corrupt_data_handler corrupt_data_handler; gms::feature_service feature_service; cache_tracker tracker; sstables::directory_semaphore dir_sem; @@ -36,9 +38,10 @@ struct sstable_manager_service { abort_source abort; explicit sstable_manager_service(const db::config& dbcfg, sstable_compressor_factory& scf) - : feature_service(gms::feature_config_from_db_config(dbcfg)) + : corrupt_data_handler(db::corrupt_data_handler::register_metrics::no) + , feature_service(gms::feature_config_from_db_config(dbcfg)) , dir_sem(1) - , sst_man("schema_loader", large_data_handler, dbcfg, feature_service, tracker, memory::stats().total_memory(), dir_sem, []{ return locator::host_id{}; }, scf, abort) { + , sst_man("schema_loader", large_data_handler, corrupt_data_handler, dbcfg, feature_service, tracker, memory::stats().total_memory(), dir_sem, []{ return locator::host_id{}; }, scf, abort) { } future<> stop() { diff --git a/tools/schema_loader.cc b/tools/schema_loader.cc index de4fe88bc6..cbb4979d8f 100644 --- a/tools/schema_loader.cc +++ b/tools/schema_loader.cc @@ -27,6 +27,7 @@ #include "db/config.hh" #include "db/extensions.hh" #include "db/large_data_handler.hh" +#include "db/corrupt_data_handler.hh" #include "db/system_distributed_keyspace.hh" #include "db/schema_tables.hh" #include "db/system_keyspace.hh" @@ -496,12 +497,13 @@ schema_ptr do_load_schema_from_sstable(const db::config& dbcfg, std::filesystem: } db::nop_large_data_handler large_data_handler; + db::nop_corrupt_data_handler corrupt_data_handler(db::corrupt_data_handler::register_metrics::no); gms::feature_service feature_service(gms::feature_config_from_db_config(dbcfg)); cache_tracker tracker; sstables::directory_semaphore dir_sem(1); abort_source abort; auto scf = make_sstable_compressor_factory_for_tests_in_thread(); - sstables::sstables_manager sst_man("tools::load_schema_from_sstable", large_data_handler, dbcfg, feature_service, tracker, + sstables::sstables_manager sst_man("tools::load_schema_from_sstable", large_data_handler, corrupt_data_handler, dbcfg, feature_service, tracker, memory::stats().total_memory(), dir_sem, [host_id = locator::host_id::create_random_id()] { return host_id; }, *scf, abort); auto close_sst_man = deferred_close(sst_man); diff --git a/tools/scylla-sstable.cc b/tools/scylla-sstable.cc index 8e0d9e8439..a17543186b 100644 --- a/tools/scylla-sstable.cc +++ b/tools/scylla-sstable.cc @@ -28,6 +28,7 @@ #include "cql3/statements/select_statement.hh" #include "db/config.hh" #include "db/large_data_handler.hh" +#include "db/corrupt_data_handler.hh" #include "gms/feature_service.hh" #include "reader_concurrency_semaphore.hh" #include "readers/combined.hh" @@ -72,8 +73,6 @@ const auto app_name = "sstable"; logging::logger sst_log(format("scylla-{}", app_name)); -db::nop_large_data_handler large_data_handler; - struct decorated_key_hash { std::size_t operator()(const dht::decorated_key& dk) const { return dht::token::to_int64(dk.token()); @@ -3570,9 +3569,13 @@ $ scylla sstable validate /path/to/md-123456-big-Data.db /path/to/md-123457-big- sstm.start(std::ref(dbcfg), stm_cfg).get(); auto stop_sstm = defer([&sstm] { sstm.stop().get(); }); + db::nop_large_data_handler large_data_handler; + db::nop_corrupt_data_handler corrupt_data_handler(db::corrupt_data_handler::register_metrics::no); + sstables::sstables_manager sst_man( "scylla_sstable", large_data_handler, + corrupt_data_handler, dbcfg, feature_service, tracker,