Merge 'sstables/mx/writer: handle non-full prefix row keys' from Botond Dénes

Although valid for compact tables, non-full (or empty) clustering key prefixes are not handled for row keys when writing sstables. Only the present components are written, consequently if the key is empty, it is omitted entirely.
When parsing sstables, the parsing code unconditionally parses a full prefix.
This mis-match results in parsing failures, as the parser parses part of the row content as a key resulting in a garbage key and subsequent mis-parsing of the row content and maybe even subsequent partitions.

Introduce a new system table: `system.corrupt_data` and infrastructure similar to `large_data_handler`: `corrupt_data_handler` which abstracts how corrupt data is handled. The sstable writer now passes rows such corrupt keys to the corrupt data handler. This way, we avoid corrupting the sstables beyond parsing and the rows are also kept around in system.corrupt_data for later inspection and possible recovery.

Add a full-stack test which checks that rows with bad keys are correctly handled.

Fixes: https://github.com/scylladb/scylladb/issues/24489

The bug is present in all versions, has to be backported to all supported versions.

Closes scylladb/scylladb#24492

* github.com:scylladb/scylladb:
  test/boost/sstable_datafile_test: add test for corrupt data
  sstables/mx/writer: handler rows with empty keys
  test/lib/cql_assertions: introduce columns_assertions
  sstables: add corrupt_data_handler to sstables::sstables
  tools/scylla-sstable: make large_data_handler a local
  db: introduce corrupt_data_handler
  mutation: introduce frozen_mutation_fragment_v2
  mutation/mutation_partition_view: read_{clustering,static}_row(): return row type
  mutation/mutation_partition_view: extract de-ser of {clustering,static} row
  idl-compiler.py: generate skip() definition for enums serializers
  idl: extract full_position.idl from position_in_partition.idl
  db/system_keyspace: add apply_mutation()
  db/system_keyspace: introduce the corrupt_data table
This commit is contained in:
Avi Kivity
2025-06-29 18:18:36 +03:00
31 changed files with 804 additions and 65 deletions

View File

@@ -981,6 +981,7 @@ scylla_core = (['message/messaging_service.cc',
'cql3/result_set.cc',
'cql3/prepare_context.cc',
'db/batchlog_manager.cc',
'db/corrupt_data_handler.cc',
'db/commitlog/commitlog.cc',
'db/commitlog/commitlog_entry.cc',
'db/commitlog/commitlog_replayer.cc',
@@ -1341,6 +1342,7 @@ idls = ['idl/gossip_digest.idl.hh',
'idl/replica_exception.idl.hh',
'idl/per_partition_rate_limit_info.idl.hh',
'idl/position_in_partition.idl.hh',
'idl/full_position.idl.hh',
'idl/experimental/broadcast_tables_lang.idl.hh',
'idl/storage_service.idl.hh',
'idl/join_node.idl.hh',

View File

@@ -27,6 +27,7 @@ target_sources(db
extensions.cc
heat_load_balance.cc
large_data_handler.cc
corrupt_data_handler.cc
marshal/type_parser.cc
batchlog_manager.cc
tags/utils.cc

134
db/corrupt_data_handler.cc Normal file
View File

@@ -0,0 +1,134 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "db/corrupt_data_handler.hh"
#include "reader_concurrency_semaphore.hh"
#include "replica/database.hh"
#include "utils/UUID_gen.hh"
static logging::logger corrupt_data_logger("corrupt_data");
namespace sm = seastar::metrics;
namespace db {
corrupt_data_handler::corrupt_data_handler(register_metrics rm) {
if (rm) {
_metrics.add_group("corrupt_data", {
sm::make_counter("entries_reported", _stats.corrupt_data_reported,
sm::description("Counts the number of corrupt data instances reported to the corrupt data handler. "
"A non-zero value indicates that the database suffered data corruption."))
});
}
}
future<corrupt_data_handler::entry_id> corrupt_data_handler::record_corrupt_clustering_row(const schema& s, const partition_key& pk,
clustering_row cr, sstring origin, std::optional<sstring> sstable_name) {
++_stats.corrupt_data_reported;
++_stats.corrupt_clustering_rows_reported;
return do_record_corrupt_clustering_row(s, pk, std::move(cr), std::move(origin), std::move(sstable_name)).then([this] (entry_id id) {
if (id) {
++_stats.corrupt_data_recorded;
++_stats.corrupt_clustering_rows_recorded;
}
return id;
});
}
system_table_corrupt_data_handler::system_table_corrupt_data_handler(config cfg, register_metrics rm)
: corrupt_data_handler(rm)
, _entry_ttl(cfg.entry_ttl)
, _sys_ks("system_table_corrupt_data_handler::system_keyspace")
{
}
system_table_corrupt_data_handler::~system_table_corrupt_data_handler() {
}
reader_permit system_table_corrupt_data_handler::make_fragment_permit(const schema& s) {
return _fragment_semaphore->make_tracking_only_permit(s.shared_from_this(), "system_table_corrupt_data_handler::make_fragment_permit", db::no_timeout, {});
}
future<corrupt_data_handler::entry_id> system_table_corrupt_data_handler::do_record_corrupt_mutation_fragment(
pluggable_system_keyspace::permit sys_ks,
const schema& user_table_schema,
const partition_key& pk,
const clustering_key& ck,
mutation_fragment_v2::kind kind,
frozen_mutation_fragment_v2 fmf,
sstring origin,
std::optional<sstring> sstable_name) {
const corrupt_data_handler::entry_id id{utils::UUID_gen::get_time_UUID()};
const auto corrupt_data_schema = sys_ks->local_db().find_column_family(system_keyspace::NAME, system_keyspace::CORRUPT_DATA).schema();
// Using the lower-level mutation API to avoid large allocation warnings when linearizing the frozen mutation fragment.
mutation entry_mutation(corrupt_data_schema, partition_key::from_exploded(*corrupt_data_schema, {serialized(user_table_schema.ks_name()), serialized(user_table_schema.cf_name())}));
auto& entry_row = entry_mutation.partition().clustered_row(*corrupt_data_schema, clustering_key::from_single_value(*corrupt_data_schema, serialized(timeuuid_native_type{id.uuid()})));
const auto timestamp = api::new_timestamp();
auto set_cell_raw = [this, &entry_row, &corrupt_data_schema, timestamp] (const char* cell_name, managed_bytes cell_value) {
auto cdef = corrupt_data_schema->get_column_definition(cell_name);
SCYLLA_ASSERT(cdef);
entry_row.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, timestamp, cell_value, _entry_ttl));
};
auto set_cell = [this, &entry_row, &corrupt_data_schema, timestamp] (const char* cell_name, data_value cell_value) {
auto cdef = corrupt_data_schema->get_column_definition(cell_name);
SCYLLA_ASSERT(cdef);
entry_row.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, timestamp, cell_value.serialize_nonnull(), _entry_ttl));
};
entry_row.apply(row_marker(timestamp, _entry_ttl, gc_clock::now() + _entry_ttl));
set_cell("partition_key", data_value(to_bytes(pk.representation())));
set_cell("clustering_key", data_value(to_bytes(ck.representation())));
set_cell("mutation_fragment_kind", fmt::to_string(kind));
// FIXME: Exposing knowledge here that bytes are serialized by just storing the raw value.
// Need to replace with a fragmented-buffer serialize API call, which we don't have yet.
set_cell_raw("frozen_mutation_fragment", std::move(fmf).representation().to_managed_bytes());
set_cell("origin", origin);
set_cell("sstable_name", sstable_name);
return sys_ks->apply_mutation(std::move(entry_mutation)).then([id] {
return id;
});
}
future<corrupt_data_handler::entry_id> system_table_corrupt_data_handler::do_record_corrupt_clustering_row(const schema& s, const partition_key& pk,
clustering_row cr, sstring origin, std::optional<sstring> sstable_name) {
auto sys_ks = _sys_ks.get_permit();
if (!sys_ks) {
co_return corrupt_data_handler::entry_id::create_null_id();
}
const auto ck = cr.key();
auto fmf = freeze(s, mutation_fragment_v2(s, make_fragment_permit(s), std::move(cr)));
co_return co_await do_record_corrupt_mutation_fragment(std::move(sys_ks), s, pk, ck, mutation_fragment_v2::kind::clustering_row, std::move(fmf),
std::move(origin), std::move(sstable_name));
}
void system_table_corrupt_data_handler::plug_system_keyspace(db::system_keyspace& sys_ks) noexcept {
_sys_ks.plug(sys_ks.shared_from_this());
_fragment_semaphore = std::make_unique<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits{}, "system_table_corrupt_data_handler", reader_concurrency_semaphore::register_metrics::no);
}
future<> system_table_corrupt_data_handler::unplug_system_keyspace() noexcept {
co_await _sys_ks.unplug();
co_await _fragment_semaphore->stop();
}
future<corrupt_data_handler::entry_id> nop_corrupt_data_handler::do_record_corrupt_clustering_row(const schema& s, const partition_key& pk,
clustering_row cr, sstring origin, std::optional<sstring> sstable_name) {
return make_ready_future<entry_id>(entry_id::create_null_id());
}
} // namespace db

110
db/corrupt_data_handler.hh Normal file
View File

@@ -0,0 +1,110 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "db/system_keyspace.hh"
#include "utils/UUID.hh"
#include "utils/pluggable.hh"
class reader_concurrency_semaphore;
class reader_permit;
namespace db {
class corrupt_data_handler {
public:
// An ID identifying the corrupt data entry.
// To be interpreted in the context of the storage where it is recorded, see storage_name().
using entry_id = utils::tagged_uuid<struct corrupt_data_entry_tag>;
struct stats {
// Counters for the number of corrupt data entries reported.
uint64_t corrupt_data_reported = 0;
// Counters for the number of corrupt data entries recorded.
// Can be less than reported depending on the configuration or if entries failed to be recorded.
uint64_t corrupt_data_recorded = 0;
uint64_t corrupt_clustering_rows_reported = 0;
uint64_t corrupt_clustering_rows_recorded = 0;
};
private:
stats _stats;
seastar::metrics::metric_groups _metrics;
protected:
virtual future<entry_id> do_record_corrupt_clustering_row(const schema& s, const partition_key& pk, clustering_row cr, sstring origin, std::optional<sstring> sstable_name) = 0;
public:
using register_metrics = bool_class<struct corrupt_data_handler_register_metrics_tag>;
explicit corrupt_data_handler(register_metrics);
virtual ~corrupt_data_handler() = default;
const stats& get_stats() const noexcept {
return _stats;
}
// The name of the storage where corrupt data is recorded.
// The storage-name and the entry-id together should allow the user to unambiguously locate the entry.
virtual sstring storage_name() const noexcept = 0;
// Record a corrupt clustering row.
// If the returned id is null, the row was not recorded.
future<entry_id> record_corrupt_clustering_row(const schema& s, const partition_key& pk, clustering_row cr, sstring origin, std::optional<sstring> sstable_name);
};
// Stores corrupt data entries in the system.corrupt_data table.
class system_table_corrupt_data_handler final : public corrupt_data_handler {
public:
using pluggable_system_keyspace = utils::pluggable<db::system_keyspace>;
struct config {
gc_clock::duration entry_ttl;
};
private:
gc_clock::duration _entry_ttl;
pluggable_system_keyspace _sys_ks;
std::unique_ptr<reader_concurrency_semaphore> _fragment_semaphore;
private:
reader_permit make_fragment_permit(const schema& s);
future<entry_id> do_record_corrupt_mutation_fragment(pluggable_system_keyspace::permit sys_ks, const schema& user_table_schema, const partition_key& pk, const clustering_key& ck,
mutation_fragment_v2::kind kind, frozen_mutation_fragment_v2 mf, sstring origin, std::optional<sstring> sstable_name);
virtual future<entry_id> do_record_corrupt_clustering_row(const schema& s, const partition_key& pk, clustering_row cr, sstring origin, std::optional<sstring> sstable_name) override;
public:
explicit system_table_corrupt_data_handler(config, register_metrics);
~system_table_corrupt_data_handler();
virtual sstring storage_name() const noexcept override {
return format("{}.{}", db::system_keyspace::NAME, db::system_keyspace::CORRUPT_DATA);
}
void plug_system_keyspace(db::system_keyspace& sys_ks) noexcept;
future<> unplug_system_keyspace() noexcept;
};
// A no-op corrupt data handler that does not record any data.
class nop_corrupt_data_handler final : public corrupt_data_handler {
virtual future<entry_id> do_record_corrupt_clustering_row(const schema& s, const partition_key& pk, clustering_row cr, sstring origin, std::optional<sstring> sstable_name) override;
public:
explicit nop_corrupt_data_handler(register_metrics rm)
: corrupt_data_handler(rm) {}
virtual sstring storage_name() const noexcept override {
return "/dev/null";
}
};
} // namespace db

View File

@@ -36,6 +36,7 @@
#include "db/schema_tables.hh"
#include "gms/generation-number.hh"
#include "service/storage_service.hh"
#include "service/storage_proxy.hh"
#include "service/paxos/paxos_state.hh"
#include "query-result-set.hh"
#include "idl/frozen_mutation.dist.hh"
@@ -778,6 +779,35 @@ schema_ptr system_keyspace::large_cells() {
return large_cells;
}
schema_ptr system_keyspace::corrupt_data() {
static thread_local auto corrupt_data = [] {
auto id = generate_legacy_id(NAME, CORRUPT_DATA);
return schema_builder(NAME, CORRUPT_DATA, id)
// partition key
.with_column("keyspace_name", utf8_type, column_kind::partition_key)
.with_column("table_name", utf8_type, column_kind::partition_key)
// clustering key
.with_column("id", timeuuid_type, column_kind::clustering_key)
// regular rows
// Storing keys as bytes: having a corrupt key might be the reason
// to record the row as corrupt, so we just dump what we have and
// leave interpreting to the lucky person investigating the disaster.
.with_column("partition_key", bytes_type)
.with_column("clustering_key", bytes_type)
// Note: mutation-fragment v2
.with_column("mutation_fragment_kind", utf8_type)
.with_column("frozen_mutation_fragment", bytes_type)
.with_column("origin", utf8_type)
.with_column("sstable_name", utf8_type)
// options
.set_comment("mutation-fragments found to be corrupted")
.set_gc_grace_seconds(0)
.with_hash_version()
.build();
}();
return corrupt_data;
}
static constexpr auto schema_gc_grace = std::chrono::duration_cast<std::chrono::seconds>(days(7)).count();
/*static*/ schema_ptr system_keyspace::scylla_local() {
@@ -2327,6 +2357,7 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
peers(), peer_events(), range_xfers(),
compactions_in_progress(), compaction_history(),
sstable_activity(), size_estimates(), large_partitions(), large_rows(), large_cells(),
corrupt_data(),
scylla_local(), db::schema_tables::scylla_table_schema_history(),
repair_history(),
v3::views_builds_in_progress(), v3::built_views(),
@@ -3662,4 +3693,12 @@ future<::shared_ptr<cql3::untyped_result_set>> system_keyspace::execute_cql(cons
return _qp.execute_internal(query_string, values, cql3::query_processor::cache_internal::yes);
}
future<> system_keyspace::apply_mutation(mutation m) {
if (m.schema()->ks_name() != NAME) {
on_internal_error(slogger, fmt::format("system_keyspace::apply_mutation(): attempted to apply mutation belonging to table {}.{}", m.schema()->cf_name(), m.schema()->ks_name()));
}
return _qp.proxy().mutate_locally(m, {}, db::commitlog::force_sync(m.schema()->static_props().wait_for_sync_to_commitlog), db::no_timeout);
}
} // namespace db

View File

@@ -131,6 +131,7 @@ class system_keyspace : public seastar::peering_sharded_service<system_keyspace>
static schema_ptr large_partitions();
static schema_ptr large_rows();
static schema_ptr large_cells();
static schema_ptr corrupt_data();
static schema_ptr scylla_local();
future<> force_blocking_flush(sstring cfname);
// This function is called when the system.peers table is read,
@@ -163,6 +164,7 @@ public:
static constexpr auto LARGE_PARTITIONS = "large_partitions";
static constexpr auto LARGE_ROWS = "large_rows";
static constexpr auto LARGE_CELLS = "large_cells";
static constexpr auto CORRUPT_DATA = "corrupt_data";
static constexpr auto SCYLLA_LOCAL = "scylla_local";
static constexpr auto RAFT = "raft";
static constexpr auto RAFT_SNAPSHOTS = "raft_snapshots";
@@ -684,6 +686,10 @@ public:
return execute_cql(req, { data_value(std::forward<Args>(args))... });
}
// Apply write as mutation to the system keyspace.
// Mutation has to belong to a table int he system keyspace.
future<> apply_mutation(mutation m);
friend future<column_mapping> db::schema_tables::get_column_mapping(db::system_keyspace& sys_ks, ::table_id table_id, table_schema_version version);
friend future<bool> db::schema_tables::column_mapping_exists(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);
friend future<> db::schema_tables::drop_column_mapping(db::system_keyspace& sys_ks, table_id table_id, table_schema_version version);

View File

@@ -121,6 +121,29 @@ SELECT * FROM system.large_cells;
SELECT * FROM system.large_cells WHERE keyspace_name = 'ks1' and table_name = 'standard1';
~~~
## system.corrupt\_data
Stores data found to be corrupt during internal operations. This data cannot be written to sstables because then it will be spread around by repair and compaction. It will also possibly cause failures in sstable parsing.
At the same time, the data should be kept around so that it can be inspected and possibly restored by the database operator.
This table is used to store such data. Data is saved at the mutation-fragment level.
Schema:
```cql
CREATE TABLE system.corrupt_data (
keyspace_name text, # keyspace name of source table
table_name text, # table name of source table
id timeuuid, # id of the corrupt mutation fragment, assigned by the database when the corrupt data entry is created
partition_key blob, # partition key of partition in the source table, can be incomplete or null due to corruption
clustering_key text, # clustering key of mutation-fragment in the source table, can be null for some mutation-fragment kinds, can be incomplete or null due to corruption
mutation_fragment_kind text, # kind of the mutation fragment, one of 'partition start', 'partition end', 'static row', 'clustering row', 'range tombstone change'; only the latter two can have clustering_key set
frozen_mutation_fragment blob, # the serialized mutation fragment itself
origin text, # the name of the process that found the corruption, e.g. 'sstable-writer'
sstable_name text, # the name of the sstable that contains the corrupt data, if known; sstable is not kept around, it could be compacted or deleted
PRIMARY KEY ((keyspace_name, table_name), id)
) WITH CLUSTERING ORDER BY (id ASC)
AND gc_grace_seconds = 0;
```
## system.raft
Holds information about Raft

View File

@@ -196,6 +196,16 @@ template<typename Input>
return static_cast<{name}>(deserialize(buf, std::type_identity<{self.underlying_type}>()));
}}""")
def serializer_skip_impl(self, cout):
name = self.ns_qualified_name()
fprintln(cout, f"""
{self.template_declaration}
template<typename Input>
void serializer<{name}>::skip(Input& buf) {{
buf.skip(sizeof({self.underlying_type}));
}}""")
class Attributes(ASTBase):
''' AST node for representing class and field attributes.
@@ -839,6 +849,7 @@ def handle_enum(enum, hout, cout):
enum.serializer_write_impl(cout)
enum.serializer_read_impl(cout)
enum.serializer_skip_impl(cout)
def join_template(template_params):

View File

@@ -59,6 +59,7 @@ set(idl_headers
replica_exception.idl.hh
per_partition_rate_limit_info.idl.hh
position_in_partition.idl.hh
full_position.idl.hh
experimental/broadcast_tables_lang.idl.hh
join_node.idl.hh
utils.idl.hh

14
idl/full_position.idl.hh Normal file
View File

@@ -0,0 +1,14 @@
/*
* Copyright 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "idl/position_in_partition.idl.hh"
struct full_position {
partition_key partition;
position_in_partition position;
};

View File

@@ -11,6 +11,7 @@
#include "idl/uuid.idl.hh"
#include "idl/keys.idl.hh"
#include "idl/position_in_partition.idl.hh"
class counter_id final {
utils::UUID uuid();
@@ -114,6 +115,12 @@ class range_tombstone [[writable]] {
bound_kind end_kind [[version 1.3]] = bound_kind::incl_end;
};
class range_tombstone_change stub [[writable]] {
clustering_key_prefix key;
bound_weight weight; // we are trying to move away from bound_kind
tombstone tomb;
};
class mutation_partition stub [[writable]] {
tombstone tomb;
row static_row;
@@ -168,3 +175,7 @@ class mutation_fragment stub [[writable]] {
partition_start, partition_end> fragment;
};
class mutation_fragment_v2 stub [[writable]] {
std::variant<clustering_row, static_row, range_tombstone_change,
partition_start, partition_end> fragment;
};

View File

@@ -26,8 +26,3 @@ class position_in_partition {
bound_weight get_bound_weight();
std::optional<clustering_key_prefix> get_clustering_key_prefix();
};
struct full_position {
partition_key partition;
position_in_partition position;
};

View File

@@ -22,6 +22,7 @@
#include "idl/keys.idl.hh"
#include "idl/uuid.idl.hh"
#include "idl/storage_service.idl.hh"
#include "idl/full_position.idl.hh"
verb [[with_client_info, with_timeout, one_way]] mutation (frozen_mutation fm [[ref]], inet_address_vector_replica_set forward [[ref]], gms::inet_address reply_to, unsigned shard, uint64_t response_id, std::optional<tracing::trace_info> trace_info [[ref]] [[version 1.3.0]], db::per_partition_rate_limit::info rate_limit_info [[version 5.1.0]], service::fencing_token fence [[version 5.4.0]], host_id_vector_replica_set forward_id [[ref, version 6.3.0]], locator::host_id reply_to_id [[version 6.3.0]]);
verb [[with_client_info, one_way]] mutation_done (unsigned shard, uint64_t response_id, db::view::update_backlog backlog [[version 3.1.0]]);

View File

@@ -281,6 +281,22 @@ public:
frozen_mutation_fragment freeze(const schema& s, const mutation_fragment& mf);
class frozen_mutation_fragment_v2 {
bytes_ostream _bytes;
public:
explicit frozen_mutation_fragment_v2(bytes_ostream bytes) : _bytes(std::move(bytes)) { }
const bytes_ostream& representation() const { return _bytes; }
bytes_ostream&& representation() && { return std::move(_bytes); }
mutation_fragment_v2 unfreeze(const schema& s, reader_permit permit);
future<> clear_gently() noexcept {
return _bytes.clear_gently();
}
};
frozen_mutation_fragment_v2 freeze(const schema& s, const mutation_fragment_v2& mf);
template<FlattenedConsumerV2 Consumer>
auto frozen_mutation::consume(schema_ptr s, frozen_mutation_consumer_adaptor<Consumer>& adaptor) const -> frozen_mutation_consume_result<decltype(adaptor.consumer().consume_end_of_stream())> {
check_schema_version(schema_version(), *s);

View File

@@ -280,3 +280,37 @@ frozen_mutation_fragment freeze(const schema& s, const mutation_fragment& mf)
)).end_mutation_fragment();
return frozen_mutation_fragment(std::move(out));
}
frozen_mutation_fragment_v2 freeze(const schema& s, const mutation_fragment_v2& mf) {
bytes_ostream out;
ser::writer_of_mutation_fragment_v2<bytes_ostream> writer(out);
mf.visit(seastar::make_visitor(
[&] (const clustering_row& cr) {
return write_row(std::move(writer).start_fragment_clustering_row().start_row(), s, cr.key(), cr.cells(), cr.marker(), cr.tomb())
.end_row()
.end_clustering_row();
},
[&] (const static_row& sr) {
return write_row_cells(std::move(writer).start_fragment_static_row().start_cells(), sr.cells(), s, column_kind::static_column)
.end_cells()
.end_static_row();
},
[&] (const range_tombstone_change& rtc) {
return std::move(writer).start_fragment_range_tombstone_change()
.write_key(rtc.position().key())
.write_weight(rtc.position().get_bound_weight())
.write_tomb(rtc.tombstone())
.end_range_tombstone_change();
},
[&] (const partition_start& ps) {
return std::move(writer).start_fragment_partition_start()
.write_key(ps.key().key())
.write_partition_tombstone(ps.partition_tombstone())
.end_partition_start();
},
[&] (const partition_end& pe) {
return std::move(writer).write_fragment_partition_end(pe);
}
)).end_mutation_fragment_v2();
return frozen_mutation_fragment_v2(std::move(out));
}

View File

@@ -448,59 +448,60 @@ mutation_partition_view mutation_partition_view::from_view(ser::mutation_partiti
return { v.v };
}
clustering_row read_clustered_row(const schema& s, ser::clustering_row_view crv) {
class clustering_row_builder {
const schema& _s;
clustering_row _row;
public:
clustering_row_builder(const schema& s, clustering_key key, row_tombstone t, row_marker m)
: _s(s), _row(std::move(key), std::move(t), std::move(m), row()) { }
void accept_atomic_cell(column_id id, atomic_cell ac) {
_row.cells().append_cell(id, std::move(ac));
}
void accept_collection(column_id id, const collection_mutation& cm) {
_row.cells().append_cell(id, collection_mutation(*_s.regular_column_at(id).type, cm));
}
clustering_row get() && { return std::move(_row); }
};
auto cr = crv.row();
auto t = row_tombstone(cr.deleted_at(), shadowable_tombstone(cr.shadowable_deleted_at()));
clustering_row_builder builder(s, cr.key(), std::move(t), read_row_marker(cr.marker()));
read_and_visit_row(cr.cells(), s.get_column_mapping(), column_kind::regular_column, builder);
return std::move(builder).get();
}
static_row read_static_row(const schema& s, ser::static_row_view sr) {
class static_row_builder {
const schema& _s;
static_row _row;
public:
explicit static_row_builder(const schema& s)
: _s(s) { }
void accept_atomic_cell(column_id id, atomic_cell ac) {
_row.cells().append_cell(id, std::move(ac));
}
void accept_collection(column_id id, const collection_mutation& cm) {
_row.cells().append_cell(id, collection_mutation(*_s.static_column_at(id).type, cm));
}
static_row get() && { return std::move(_row); }
};
static_row_builder builder(s);
read_and_visit_row(sr.cells(), s.get_column_mapping(), column_kind::static_column, builder);
return std::move(builder).get();
}
mutation_fragment frozen_mutation_fragment::unfreeze(const schema& s, reader_permit permit)
{
auto in = ser::as_input_stream(_bytes);
auto view = ser::deserialize(in, std::type_identity<ser::mutation_fragment_view>());
return seastar::visit(view.fragment(),
[&] (ser::clustering_row_view crv) {
class clustering_row_builder {
const schema& _s;
mutation_fragment _mf;
public:
clustering_row_builder(const schema& s, reader_permit permit, clustering_key key, row_tombstone t, row_marker m)
: _s(s), _mf(mutation_fragment::clustering_row_tag_t(), s, std::move(permit), std::move(key), std::move(t), std::move(m), row()) { }
void accept_atomic_cell(column_id id, atomic_cell ac) {
_mf.mutate_as_clustering_row(_s, [&] (clustering_row& cr) mutable {
cr.cells().append_cell(id, std::move(ac));
});
}
void accept_collection(column_id id, const collection_mutation& cm) {
_mf.mutate_as_clustering_row(_s, [&] (clustering_row& cr) mutable {
cr.cells().append_cell(id, collection_mutation(*_s.regular_column_at(id).type, cm));
});
}
mutation_fragment get_mutation_fragment() && { return std::move(_mf); }
};
auto cr = crv.row();
auto t = row_tombstone(cr.deleted_at(), shadowable_tombstone(cr.shadowable_deleted_at()));
clustering_row_builder builder(s, permit, cr.key(), std::move(t), read_row_marker(cr.marker()));
read_and_visit_row(cr.cells(), s.get_column_mapping(), column_kind::regular_column, builder);
return std::move(builder).get_mutation_fragment();
return mutation_fragment(s, permit, read_clustered_row(s, crv));
},
[&] (ser::static_row_view sr) {
class static_row_builder {
const schema& _s;
mutation_fragment _mf;
public:
explicit static_row_builder(const schema& s, reader_permit permit) : _s(s), _mf(_s, std::move(permit), static_row()) { }
void accept_atomic_cell(column_id id, atomic_cell ac) {
_mf.mutate_as_static_row(_s, [&] (static_row& sr) mutable {
sr.cells().append_cell(id, std::move(ac));
});
}
void accept_collection(column_id id, const collection_mutation& cm) {
_mf.mutate_as_static_row(_s, [&] (static_row& sr) mutable {
sr.cells().append_cell(id, collection_mutation(*_s.static_column_at(id).type, cm));
});
}
mutation_fragment get_mutation_fragment() && { return std::move(_mf); }
};
static_row_builder builder(s, permit);
read_and_visit_row(sr.cells(), s.get_column_mapping(), column_kind::static_column, builder);
return std::move(builder).get_mutation_fragment();
return mutation_fragment(s, permit, read_static_row(s, sr));
},
[&] (ser::range_tombstone_view rt) {
return mutation_fragment(s, permit, range_tombstone(rt));
@@ -517,3 +518,31 @@ mutation_fragment frozen_mutation_fragment::unfreeze(const schema& s, reader_per
}
);
}
mutation_fragment_v2 frozen_mutation_fragment_v2::unfreeze(const schema& s, reader_permit permit)
{
auto in = ser::as_input_stream(_bytes);
auto view = ser::deserialize(in, std::type_identity<ser::mutation_fragment_v2_view>());
return seastar::visit(view.fragment(),
[&] (ser::clustering_row_view crv) {
return mutation_fragment_v2(s, permit, read_clustered_row(s, crv));
},
[&] (ser::static_row_view sr) {
return mutation_fragment_v2(s, permit, read_static_row(s, sr));
},
[&] (ser::range_tombstone_change_view rtc) {
auto pos = position_in_partition(partition_region::clustered, rtc.weight(), rtc.key());
return mutation_fragment_v2(s, permit, range_tombstone_change(std::move(pos), rtc.tomb()));
},
[&] (ser::partition_start_view ps) {
auto dkey = dht::decorate_key(s, ps.key());
return mutation_fragment_v2(s, permit, partition_start(std::move(dkey), ps.partition_tombstone()));
},
[&] (partition_end) {
return mutation_fragment_v2(s, permit, partition_end());
},
[] (ser::unknown_variant_type) -> mutation_fragment_v2 {
throw std::runtime_error("Trying to deserialize unknown mutation fragment type");
}
);
}

View File

@@ -57,6 +57,7 @@
#include "db/timeout_clock.hh"
#include "db/large_data_handler.hh"
#include "db/corrupt_data_handler.hh"
#include "db/data_listeners.hh"
#include "data_dictionary/user_types_metadata.hh"
@@ -417,8 +418,10 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
_cfg.compaction_rows_count_warning_threshold,
_cfg.compaction_collection_elements_count_warning_threshold))
, _nop_large_data_handler(std::make_unique<db::nop_large_data_handler>())
, _user_sstables_manager(std::make_unique<sstables::sstables_manager>("user", *_large_data_handler, _cfg, feat, _row_cache_tracker, dbcfg.available_memory, sst_dir_sem, [&stm]{ return stm.get()->get_my_id(); }, scf, abort, dbcfg.streaming_scheduling_group, &sstm))
, _system_sstables_manager(std::make_unique<sstables::sstables_manager>("system", *_nop_large_data_handler, _cfg, feat, _row_cache_tracker, dbcfg.available_memory, sst_dir_sem, [&stm]{ return stm.get()->get_my_id(); }, scf, abort, dbcfg.streaming_scheduling_group))
, _corrupt_data_handler(std::make_unique<db::system_table_corrupt_data_handler>(db::system_table_corrupt_data_handler::config{.entry_ttl = std::chrono::days(10)}, db::corrupt_data_handler::register_metrics::yes))
, _nop_corrupt_data_handler(std::make_unique<db::nop_corrupt_data_handler>(db::corrupt_data_handler::register_metrics::no))
, _user_sstables_manager(std::make_unique<sstables::sstables_manager>("user", *_large_data_handler, *_corrupt_data_handler, _cfg, feat, _row_cache_tracker, dbcfg.available_memory, sst_dir_sem, [&stm]{ return stm.get()->get_my_id(); }, scf, abort, dbcfg.streaming_scheduling_group, &sstm))
, _system_sstables_manager(std::make_unique<sstables::sstables_manager>("system", *_nop_large_data_handler, *_nop_corrupt_data_handler, _cfg, feat, _row_cache_tracker, dbcfg.available_memory, sst_dir_sem, [&stm]{ return stm.get()->get_my_id(); }, scf, abort, dbcfg.streaming_scheduling_group))
, _result_memory_limiter(dbcfg.available_memory / 10)
, _data_listeners(std::make_unique<db::data_listeners>())
, _mnotifier(mn)
@@ -3050,6 +3053,7 @@ database::as_data_dictionary() const {
void database::plug_system_keyspace(db::system_keyspace& sys_ks) noexcept {
_compaction_manager.plug_system_keyspace(sys_ks);
_large_data_handler->plug_system_keyspace(sys_ks);
_corrupt_data_handler->plug_system_keyspace(sys_ks);
_user_sstables_manager->plug_sstables_registry(std::make_unique<db::system_keyspace_sstables_registry>(sys_ks));
}
@@ -3057,6 +3061,7 @@ future<> database::unplug_system_keyspace() noexcept {
_user_sstables_manager->unplug_sstables_registry();
co_await _compaction_manager.unplug_system_keyspace();
co_await _large_data_handler->unplug_system_keyspace();
co_await _corrupt_data_handler->unplug_system_keyspace();
}
void database::plug_view_update_generator(db::view::view_update_generator& generator) noexcept {

View File

@@ -137,6 +137,8 @@ class extensions;
class rp_handle;
class data_listeners;
class large_data_handler;
class system_table_corrupt_data_handler;
class nop_corrupt_data_handler;
class system_keyspace;
namespace view {
@@ -1583,6 +1585,9 @@ private:
std::unique_ptr<db::large_data_handler> _large_data_handler;
std::unique_ptr<db::large_data_handler> _nop_large_data_handler;
std::unique_ptr<db::system_table_corrupt_data_handler> _corrupt_data_handler;
std::unique_ptr<db::nop_corrupt_data_handler> _nop_corrupt_data_handler;
std::unique_ptr<sstables::sstables_manager> _user_sstables_manager;
std::unique_ptr<sstables::sstables_manager> _system_sstables_manager;

View File

@@ -18,6 +18,7 @@
#include "utils/assert.hh"
#include "utils/exceptions.hh"
#include "db/large_data_handler.hh"
#include "db/corrupt_data_handler.hh"
#include <functional>
#include <boost/iterator/iterator_facade.hpp>
@@ -693,6 +694,8 @@ private:
void maybe_record_large_cells(const sstables::sstable& sst, const sstables::key& partition_key,
const clustering_key_prefix* clustering_key, const column_definition& cdef, uint64_t cell_size, uint64_t collection_elements);
void record_corrupt_row(clustering_row&& clustered_row);
// Writes single atomic cell
void write_cell(bytes_ostream& writer, const clustering_key_prefix* clustering_key, atomic_cell_view cell, const column_definition& cdef,
const row_time_properties& properties, std::optional<bytes_view> cell_path = {});
@@ -1305,12 +1308,41 @@ void writer::write_clustered(const clustering_row& clustered_row, uint64_t prev_
collect_row_stats(_data_writer->offset() - current_pos, &clustered_row.key(), is_dead);
}
void writer::record_corrupt_row(clustering_row&& clustered_row) {
auto& handler = _sst.get_corrupt_data_handler();
const auto pk = _partition_key->to_partition_key(_schema);
const auto ck = clustered_row.key();
db::corrupt_data_handler::entry_id corrupt_row_id;
sstring result;
try {
corrupt_row_id = handler.record_corrupt_clustering_row(_schema, pk, std::move(clustered_row), "sstable-write", fmt::to_string(_sst.get_filename())).get();
result = format("written corrupt row to {} with id {}", handler.storage_name(), corrupt_row_id);
} catch (...) {
result = format("failed to write corrupt row to {}: {}", handler.storage_name(), std::current_exception());
}
slogger.error("found non-full clustering key {} in partition {} while writing sstable {} for non-compact table {}.{}; {}",
ck,
pk,
_sst.get_filename(),
_schema.ks_name(),
_schema.cf_name(),
result);
}
stop_iteration writer::consume(clustering_row&& cr) {
if (_write_regular_as_static) {
ensure_tombstone_is_written();
write_static_row(cr.cells(), column_kind::regular_column);
return stop_iteration::no;
}
if (!_schema.is_compact_table() && !cr.key().is_full(_schema)) {
record_corrupt_row(std::move(cr));
return stop_iteration::no;
}
ensure_tombstone_is_written();
ensure_static_row_is_written_if_needed();
write_clustered(cr);

View File

@@ -3220,6 +3220,7 @@ sstable::sstable(schema_ptr schema,
version_types v,
format_types f,
db::large_data_handler& large_data_handler,
db::corrupt_data_handler& corrupt_data_handler,
sstables_manager& manager,
db_clock::time_point now,
io_error_handler_gen error_handler_gen,
@@ -3237,6 +3238,7 @@ sstable::sstable(schema_ptr schema,
, _read_error_handler(error_handler_gen(sstable_read_error))
, _write_error_handler(error_handler_gen(sstable_write_error))
, _large_data_handler(large_data_handler)
, _corrupt_data_handler(corrupt_data_handler)
, _manager(manager)
{
manager.add(this);

View File

@@ -58,6 +58,7 @@ class in_memory_config_type;
namespace db {
class large_data_handler;
class corrupt_data_handler;
}
namespace sstables {
@@ -199,6 +200,7 @@ public:
version_types v,
format_types f,
db::large_data_handler& large_data_handler,
db::corrupt_data_handler& corrupt_data_handler,
sstables_manager& manager,
db_clock::time_point now,
io_error_handler_gen error_handler_gen,
@@ -443,6 +445,10 @@ public:
return _large_data_handler;
}
db::corrupt_data_handler& get_corrupt_data_handler() {
return _corrupt_data_handler;
}
void assert_large_data_handler_is_running();
/**
@@ -583,6 +589,7 @@ private:
io_error_handler _write_error_handler;
db::large_data_handler& _large_data_handler;
db::corrupt_data_handler& _corrupt_data_handler;
sstables_manager& _manager;
sstables_stats _stats;

View File

@@ -26,11 +26,13 @@ namespace sstables {
logging::logger smlogger("sstables_manager");
sstables_manager::sstables_manager(
sstring name, db::large_data_handler& large_data_handler, const db::config& dbcfg, gms::feature_service& feat, cache_tracker& ct, size_t available_memory, directory_semaphore& dir_sem,
sstring name, db::large_data_handler& large_data_handler, db::corrupt_data_handler& corrupt_data_handler, const db::config& dbcfg, gms::feature_service& feat, cache_tracker& ct, size_t available_memory, directory_semaphore& dir_sem,
noncopyable_function<locator::host_id()>&& resolve_host_id, sstable_compressor_factory& compressor_factory, const abort_source& abort, scheduling_group maintenance_sg, storage_manager* shared)
: _storage(shared)
, _available_memory(available_memory)
, _large_data_handler(large_data_handler), _db_config(dbcfg), _features(feat), _cache_tracker(ct)
, _large_data_handler(large_data_handler)
, _corrupt_data_handler(corrupt_data_handler)
, _db_config(dbcfg), _features(feat), _cache_tracker(ct)
, _sstable_metadata_concurrency_sem(
max_count_sstable_metadata_concurrent_reads,
max_memory_sstable_metadata_concurrent_reads(available_memory),
@@ -161,7 +163,7 @@ shared_sstable sstables_manager::make_sstable(schema_ptr schema,
db_clock::time_point now,
io_error_handler_gen error_handler_gen,
size_t buffer_size) {
return make_lw_shared<sstable>(std::move(schema), storage, generation, state, v, f, get_large_data_handler(), *this, now, std::move(error_handler_gen), buffer_size);
return make_lw_shared<sstable>(std::move(schema), storage, generation, state, v, f, get_large_data_handler(), get_corrupt_data_handler(), *this, now, std::move(error_handler_gen), buffer_size);
}
sstable_writer_config sstables_manager::configure_writer(sstring origin) const {

View File

@@ -31,6 +31,7 @@
namespace db {
class large_data_handler;
class corrupt_data_handler;
class config;
class object_storage_endpoint_param;
@@ -97,6 +98,7 @@ private:
storage_manager* _storage;
size_t _available_memory;
db::large_data_handler& _large_data_handler;
db::corrupt_data_handler& _corrupt_data_handler;
const db::config& _db_config;
gms::feature_service& _features;
// _sstables_format is the format used for writing new sstables.
@@ -145,8 +147,20 @@ private:
signal_type _signal_source;
public:
explicit sstables_manager(sstring name, db::large_data_handler& large_data_handler, const db::config& dbcfg, gms::feature_service& feat, cache_tracker&, size_t available_memory, directory_semaphore& dir_sem,
noncopyable_function<locator::host_id()>&& resolve_host_id, sstable_compressor_factory&, const abort_source& abort, scheduling_group maintenance_sg = current_scheduling_group(), storage_manager* shared = nullptr);
explicit sstables_manager(
sstring name,
db::large_data_handler& large_data_handler,
db::corrupt_data_handler& corrupt_data_handler,
const db::config& dbcfg,
gms::feature_service& feat,
cache_tracker&,
size_t available_memory,
directory_semaphore& dir_sem,
noncopyable_function<locator::host_id()>&& resolve_host_id,
sstable_compressor_factory&,
const abort_source& abort,
scheduling_group maintenance_sg = current_scheduling_group(),
storage_manager* shared = nullptr);
virtual ~sstables_manager();
shared_sstable make_sstable(schema_ptr schema,
@@ -251,6 +265,9 @@ private:
db::large_data_handler& get_large_data_handler() const {
return _large_data_handler;
}
db::corrupt_data_handler& get_corrupt_data_handler() const {
return _corrupt_data_handler;
}
friend class sstable;
// Allow testing private methods/variables via test_env_sstables_manager

View File

@@ -48,10 +48,13 @@
#include "test/lib/sstable_utils.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/test_utils.hh"
#include "test/lib/cql_test_env.hh"
#include "readers/from_mutations.hh"
#include "readers/from_fragments.hh"
#include "readers/combined.hh"
#include "test/lib/random_schema.hh"
#include "test/lib/exception_utils.hh"
#include "test/lib/cql_assertions.hh"
namespace fs = std::filesystem;
@@ -3282,3 +3285,130 @@ SEASTAR_TEST_CASE(sstable_identifier_correctness) {
BOOST_REQUIRE_EQUAL(sst->sstable_identifier()->uuid(), sst->generation().as_uuid());
});
}
SEASTAR_TEST_CASE(test_non_full_and_empty_row_keys) {
return do_with_cql_env_thread([] (cql_test_env& env) {
const auto seed = tests::random::get_int<uint32_t>();
auto random_spec = tests::make_random_schema_specification(
"ks",
std::uniform_int_distribution<size_t>(1, 4),
std::uniform_int_distribution<size_t>(1, 4),
std::uniform_int_distribution<size_t>(2, 8),
std::uniform_int_distribution<size_t>(2, 8));
auto random_schema = tests::random_schema(seed, *random_spec);
testlog.info("Random schema:\n{}", random_schema.cql());
random_schema.create_with_cql(env).get();
auto schema = random_schema.schema();
auto& db = env.local_db();
auto& table = db.find_column_family(schema);
auto& manager = db.get_user_sstables_manager();
const auto generated_mutations = tests::generate_random_mutations(random_schema).get();
auto mutation_description = random_schema.new_mutation(0);
auto engine = std::mt19937(seed);
random_schema.add_row(engine, mutation_description, 0, [] (std::mt19937& engine, tests::timestamp_destination destination, api::timestamp_type min_timestamp) {
switch (destination) {
case tests::timestamp_destination::partition_tombstone:
case tests::timestamp_destination::row_tombstone:
case tests::timestamp_destination::collection_tombstone:
case tests::timestamp_destination::range_tombstone:
return api::missing_timestamp;
default:
return api::timestamp_type(100);
}
});
const auto row_mutation = mutation_description.build(schema);
auto check = [&] (const clustering_key& ck) {
testlog.info("check({})", ck);
auto permit = db.obtain_reader_permit(schema, "test_non_full_and_empty_row_keys::write", db::no_timeout, {}).get();
const auto dk = generated_mutations.front().decorated_key();
const auto row_mutation_fragment = mutation_fragment_v2(*schema, permit,
clustering_row(ck, deletable_row(*schema, row_mutation.partition().clustered_rows().begin()->row())));
std::deque<mutation_fragment_v2> fragments;
fragments.emplace_back(*schema, permit, partition_start(dk, {}));
fragments.emplace_back(*schema, permit, row_mutation_fragment);
fragments.emplace_back(*schema, permit, partition_end());
const auto original_mutation_fragment = mutation_fragment_v2(*schema, permit, fragments[1]);
auto reader = make_combined_reader(schema, permit,
make_mutation_reader_from_mutations(schema, permit, generated_mutations, query::full_partition_range),
make_mutation_reader_from_fragments(schema, permit, std::move(fragments)));
auto sst = table.make_sstable();
auto& corrupt_data_handler = sst->get_corrupt_data_handler();
const auto stats_before = corrupt_data_handler.get_stats();
sst->write_components(std::move(reader), generated_mutations.size(), schema, manager.configure_writer("test"), encoding_stats{}).get();
sst->load(schema->get_sharder(), {}).get();
testlog.info("mutations written to : {}", sst->get_filename());
// The sstable should not contain the row with the bad key -- that should be passed to the corrupt_data_handler.
assert_that(sst->make_reader(schema, permit, query::full_partition_range, schema->full_slice()))
.produces(generated_mutations);
const auto stats_after = corrupt_data_handler.get_stats();
for (const uint64_t db::corrupt_data_handler::stats::* member : {
&db::corrupt_data_handler::stats::corrupt_data_reported,
&db::corrupt_data_handler::stats::corrupt_data_recorded,
&db::corrupt_data_handler::stats::corrupt_clustering_rows_reported,
&db::corrupt_data_handler::stats::corrupt_clustering_rows_recorded}) {
BOOST_REQUIRE_EQUAL(stats_after.*member, stats_before.*member + 1);
}
auto res = env.execute_cql(format("SELECT * FROM {}.{} WHERE keyspace_name = '{}' AND table_name = '{}'",
db::system_keyspace::NAME, db::system_keyspace::CORRUPT_DATA, schema->ks_name(), schema->cf_name())).get();
assert_that(res)
.is_rows()
.with_size(1)
.with_columns_of_row(0)
.with_typed_column<timeuuid_native_type>("id", [] (const timeuuid_native_type& v) { return !v.uuid.is_null(); })
.with_typed_column<bytes>("partition_key", [&] (const bytes& v) {
return partition_key::from_bytes(v).equal(*schema, dk.key());
})
.with_typed_column<bytes>("clustering_key", [&] (const bytes& v) {
return clustering_key::from_bytes(v).equal(*schema, ck);
})
.with_typed_column<sstring>("mutation_fragment_kind", "clustering row")
.with_typed_column<bytes>("frozen_mutation_fragment", [&] (const bytes& v) {
bytes_ostream fmf_bytes;
fmf_bytes.write(v);
frozen_mutation_fragment_v2 fmf(std::move(fmf_bytes));
const auto unfreezed_mutation_fragment = fmf.unfreeze(*schema, permit);
return unfreezed_mutation_fragment.equal(*schema, original_mutation_fragment);
})
.with_typed_column<sstring>("origin", "sstable-write")
.with_typed_column<sstring>("sstable_name", fmt::to_string(sst->get_filename()))
;
// Clear the corrupt data table so that it doesn't affect other checks.
env.execute_cql(format("DELETE FROM {}.{} WHERE keyspace_name = '{}' AND table_name = '{}'",
db::system_keyspace::NAME, db::system_keyspace::CORRUPT_DATA, schema->ks_name(), schema->cf_name())).get();
};
check(clustering_key::make_empty());
if (schema->clustering_key_size() > 1) {
auto full_ckey = random_schema.make_ckey(0);
full_ckey.erase(full_ckey.end() - 1);
check(clustering_key::from_exploded(*schema, full_ckey));
}
});
}

View File

@@ -21,6 +21,47 @@ static inline void fail(sstring msg) {
throw std::runtime_error(msg);
}
void columns_assertions::fail(const sstring& msg) {
::fail(msg);
}
columns_assertions& columns_assertions::do_with_raw_column(const char* name, std::function<void(data_type, managed_bytes_view)> func) {
const auto& names = _metadata.get_names();
auto it = std::ranges::find_if(names, [name] (const auto& col) {
return col->name->text() == name;
});
if (it == names.end()) {
::fail(seastar::format("Column {} not found in metadata", name));
}
const size_t index = std::distance(names.begin(), it);
const auto& value = _columns.at(index);
if (!value) {
::fail(seastar::format("Column {} is null", name));
}
func((*it)->type, *value);
return *this;
}
columns_assertions& columns_assertions::with_raw_column(const char* name, std::function<bool(managed_bytes_view)> predicate) {
return do_with_raw_column(name, [name, &predicate] (data_type, managed_bytes_view value) {
if (!predicate(value)) {
::fail(seastar::format("Column {} failed predicate check: value = {}", name, value));
}
});
}
columns_assertions& columns_assertions::with_raw_column(const char* name, managed_bytes_view value) {
return do_with_raw_column(name, [name, &value] (data_type, managed_bytes_view cell_value) {
if (cell_value != value) {
::fail(seastar::format("Expected column {} to have value {}, but got {}", name, value, cell_value));
}
});
}
rows_assertions::rows_assertions(shared_ptr<cql_transport::messages::result_message::rows> rows)
: _rows(rows)
{ }
@@ -165,6 +206,11 @@ rows_assertions::with_rows_ignore_order(std::vector<std::vector<bytes_opt>> rows
return {*this};
}
columns_assertions rows_assertions::with_columns_of_row(size_t row_index) {
const auto& rs = _rows->rs().result_set();
return columns_assertions(rs.get_metadata(), rs.rows().at(row_index));
}
result_msg_assertions::result_msg_assertions(shared_ptr<cql_transport::messages::result_message> msg)
: _msg(msg)
{ }

View File

@@ -10,6 +10,7 @@
#pragma once
#include "utils/assert.hh"
#include "utils/managed_bytes.hh"
#include "test/lib/cql_test_env.hh"
#include "transport/messages/result_message_base.hh"
#include "bytes.hh"
@@ -17,6 +18,45 @@
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/future.hh>
class columns_assertions {
const cql3::metadata& _metadata;
const std::vector<managed_bytes_opt>& _columns;
columns_assertions& do_with_raw_column(const char* name, std::function<void(data_type, managed_bytes_view)> func);
void fail(const sstring& msg);
public:
columns_assertions(const cql3::metadata& metadata, const std::vector<managed_bytes_opt>& columns)
: _metadata(metadata), _columns(columns)
{ }
columns_assertions& with_raw_column(const char* name, std::function<bool(managed_bytes_view)> predicate);
columns_assertions& with_raw_column(const char* name, managed_bytes_view value);
template <typename T>
columns_assertions& with_typed_column(const char* name, std::function<bool(const T& value)> predicate) {
return do_with_raw_column(name, [this, name, predicate] (data_type type, managed_bytes_view value) {
if (type != data_type_for<T>()) {
fail(seastar::format("Column {} is not of type {}, but of type {}", name, data_type_for<T>()->name(), type->name()));
}
if (!predicate(value_cast<T>(type->deserialize(value)))) {
fail(seastar::format("Column {} failed predicate check: value = {}", name, value));
}
});
}
template <typename T>
columns_assertions& with_typed_column(const char* name, const T& value) {
return with_typed_column<T>(name, [this, name, &value] (const T& cell_value) {
if (cell_value != value) {
fail(seastar::format("Expected column {} to have value {}, but got {}", name, value, cell_value));
}
return true;
});
}
};
class rows_assertions {
shared_ptr<cql_transport::messages::result_message::rows> _rows;
public:
@@ -33,6 +73,8 @@ public:
rows_assertions with_rows_ignore_order(std::vector<std::vector<bytes_opt>> rows);
rows_assertions with_serialized_columns_count(size_t columns_count);
columns_assertions with_columns_of_row(size_t row_index);
rows_assertions is_null();
rows_assertions is_not_null();
};

View File

@@ -15,6 +15,7 @@
#include "data_dictionary/storage_options.hh"
#include "db/large_data_handler.hh"
#include "db/corrupt_data_handler.hh"
#include "sstables/version.hh"
#include "sstables/sstable_directory.hh"
#include "compaction/compaction_manager.hh"
@@ -95,6 +96,7 @@ public:
struct test_env_config {
db::large_data_handler* large_data_handler = nullptr;
db::corrupt_data_handler* corrupt_data_handler = nullptr;
data_dictionary::storage_options storage; // will be local by default
size_t available_memory = memory::stats().total_memory();
};

View File

@@ -13,6 +13,7 @@
#include "test/lib/test_utils.hh"
#include "db/config.hh"
#include "db/large_data_handler.hh"
#include "db/corrupt_data_handler.hh"
#include "dht/i_partitioner.hh"
#include "gms/feature_service.hh"
#include "repair/row_level.hh"
@@ -202,6 +203,7 @@ struct test_env::impl {
::cache_tracker cache_tracker;
gms::feature_service feature_service;
db::nop_large_data_handler nop_ld_handler;
db::nop_corrupt_data_handler nop_cd_handler;
sstable_compressor_factory& scf;
test_env_sstables_manager mgr;
std::unique_ptr<test_env_compaction_manager> cmgr;
@@ -225,10 +227,22 @@ test_env::impl::impl(test_env_config cfg, sstable_compressor_factory& scfarg, ss
, db_config(make_db_config(dir.path().native(), cfg.storage))
, dir_sem(1)
, feature_service(gms::feature_config_from_db_config(*db_config))
, nop_cd_handler(db::corrupt_data_handler::register_metrics::no)
, scf(scfarg)
, mgr("test_env", cfg.large_data_handler == nullptr ? nop_ld_handler : *cfg.large_data_handler, *db_config,
feature_service, cache_tracker, cfg.available_memory, dir_sem,
[host_id = locator::host_id::create_random_id()]{ return host_id; }, scf, abort, current_scheduling_group(), sstm)
, mgr(
"test_env",
cfg.large_data_handler == nullptr ? nop_ld_handler : *cfg.large_data_handler,
cfg.corrupt_data_handler == nullptr ? nop_cd_handler : *cfg.corrupt_data_handler,
*db_config,
feature_service,
cache_tracker,
cfg.available_memory,
dir_sem,
[host_id = locator::host_id::create_random_id()]{ return host_id; },
scf,
abort,
current_scheduling_group(),
sstm)
, semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::test_env", reader_concurrency_semaphore::register_metrics::no)
, storage(std::move(cfg.storage))
{

View File

@@ -16,6 +16,7 @@
#include "db/cache_tracker.hh"
#include "db/config.hh"
#include "db/large_data_handler.hh"
#include "db/corrupt_data_handler.hh"
#include "gms/feature_service.hh"
#include "schema/schema_fwd.hh"
#include "sstables/sstable_directory.hh"
@@ -29,6 +30,7 @@ future<std::filesystem::path> get_table_directory(std::filesystem::path scylla_d
struct sstable_manager_service {
db::nop_large_data_handler large_data_handler;
db::nop_corrupt_data_handler corrupt_data_handler;
gms::feature_service feature_service;
cache_tracker tracker;
sstables::directory_semaphore dir_sem;
@@ -36,9 +38,10 @@ struct sstable_manager_service {
abort_source abort;
explicit sstable_manager_service(const db::config& dbcfg, sstable_compressor_factory& scf)
: feature_service(gms::feature_config_from_db_config(dbcfg))
: corrupt_data_handler(db::corrupt_data_handler::register_metrics::no)
, feature_service(gms::feature_config_from_db_config(dbcfg))
, dir_sem(1)
, sst_man("schema_loader", large_data_handler, dbcfg, feature_service, tracker, memory::stats().total_memory(), dir_sem, []{ return locator::host_id{}; }, scf, abort) {
, sst_man("schema_loader", large_data_handler, corrupt_data_handler, dbcfg, feature_service, tracker, memory::stats().total_memory(), dir_sem, []{ return locator::host_id{}; }, scf, abort) {
}
future<> stop() {

View File

@@ -27,6 +27,7 @@
#include "db/config.hh"
#include "db/extensions.hh"
#include "db/large_data_handler.hh"
#include "db/corrupt_data_handler.hh"
#include "db/system_distributed_keyspace.hh"
#include "db/schema_tables.hh"
#include "db/system_keyspace.hh"
@@ -496,12 +497,13 @@ schema_ptr do_load_schema_from_sstable(const db::config& dbcfg, std::filesystem:
}
db::nop_large_data_handler large_data_handler;
db::nop_corrupt_data_handler corrupt_data_handler(db::corrupt_data_handler::register_metrics::no);
gms::feature_service feature_service(gms::feature_config_from_db_config(dbcfg));
cache_tracker tracker;
sstables::directory_semaphore dir_sem(1);
abort_source abort;
auto scf = make_sstable_compressor_factory_for_tests_in_thread();
sstables::sstables_manager sst_man("tools::load_schema_from_sstable", large_data_handler, dbcfg, feature_service, tracker,
sstables::sstables_manager sst_man("tools::load_schema_from_sstable", large_data_handler, corrupt_data_handler, dbcfg, feature_service, tracker,
memory::stats().total_memory(), dir_sem,
[host_id = locator::host_id::create_random_id()] { return host_id; }, *scf, abort);
auto close_sst_man = deferred_close(sst_man);

View File

@@ -28,6 +28,7 @@
#include "cql3/statements/select_statement.hh"
#include "db/config.hh"
#include "db/large_data_handler.hh"
#include "db/corrupt_data_handler.hh"
#include "gms/feature_service.hh"
#include "reader_concurrency_semaphore.hh"
#include "readers/combined.hh"
@@ -72,8 +73,6 @@ const auto app_name = "sstable";
logging::logger sst_log(format("scylla-{}", app_name));
db::nop_large_data_handler large_data_handler;
struct decorated_key_hash {
std::size_t operator()(const dht::decorated_key& dk) const {
return dht::token::to_int64(dk.token());
@@ -3570,9 +3569,13 @@ $ scylla sstable validate /path/to/md-123456-big-Data.db /path/to/md-123457-big-
sstm.start(std::ref(dbcfg), stm_cfg).get();
auto stop_sstm = defer([&sstm] { sstm.stop().get(); });
db::nop_large_data_handler large_data_handler;
db::nop_corrupt_data_handler corrupt_data_handler(db::corrupt_data_handler::register_metrics::no);
sstables::sstables_manager sst_man(
"scylla_sstable",
large_data_handler,
corrupt_data_handler,
dbcfg,
feature_service,
tracker,