From bc2e83bc1fbcd4ad940915815015a5ba920d080a Mon Sep 17 00:00:00 2001 From: Taras Veretilnyk Date: Wed, 26 Nov 2025 23:19:27 +0100 Subject: [PATCH] sstables: store digest of all sstable components in scylla metadata This change replaces plain file_writer with crc32_digest_file_writer for all SSTable components that should be checksummed. The resulting component digests are stored in the sstable structure and later persisted to disk as part of the Scylla metadata component during writer::consume_end_of_stream. --- sstables/mx/writer.cc | 19 +++++---- sstables/sstables.cc | 93 ++++++++++++++++++++++++++++++++++++++----- sstables/sstables.hh | 25 ++++++++++-- sstables/storage.cc | 17 +++----- 4 files changed, 123 insertions(+), 31 deletions(-) diff --git a/sstables/mx/writer.cc b/sstables/mx/writer.cc index 80e8675b08..45759bd6d1 100644 --- a/sstables/mx/writer.cc +++ b/sstables/mx/writer.cc @@ -948,17 +948,16 @@ void writer::init_file_writers() { _sst._schema->get_compressor_params(), std::move(compressor)), _sst.get_filename()); } - if (_sst.has_component(component_type::Index)) { out = _sst._storage->make_data_or_index_sink(_sst, component_type::Index).get(); - _index_writer = std::make_unique(output_stream(std::move(out)), _sst.index_filename()); + _index_writer = std::make_unique(std::move(out), _sst.sstable_buffer_size, _sst.index_filename()); } if (_sst.has_component(component_type::Partitions) && _sst.has_component(component_type::Rows)) { out = _sst._storage->make_data_or_index_sink(_sst, component_type::Rows).get(); - _rows_writer = std::make_unique(output_stream(std::move(out)), component_name(_sst, component_type::Rows)); + _rows_writer = std::make_unique(std::move(out), _sst.sstable_buffer_size, component_name(_sst, component_type::Rows)); _bti_row_index_writer = trie::bti_row_index_writer(*_rows_writer); out = _sst._storage->make_data_or_index_sink(_sst, component_type::Partitions).get(); - _partitions_writer = std::make_unique(output_stream(std::move(out)), component_name(_sst, component_type::Partitions)); + _partitions_writer = std::make_unique(std::move(out), _sst.sstable_buffer_size, component_name(_sst, component_type::Partitions)); _bti_partition_index_writer = trie::bti_partition_index_writer(*_partitions_writer); } if (_delayed_filter) { @@ -988,7 +987,9 @@ void writer::close_data_writer() { void writer::close_index_writer() { if (_index_writer) { - close_writer(_index_writer); + auto writer = close_writer(_index_writer); + auto chksum_wr = static_cast(writer.get()); + _sst.get_components_digests().index_digest = chksum_wr->full_checksum(); } } @@ -998,7 +999,9 @@ void writer::close_partitions_writer() { _sst.get_version(), _first_key.value(), _last_key.value()); - close_writer(_partitions_writer); + auto writer = close_writer(_partitions_writer); + auto chksum_wr = static_cast(writer.get()); + _sst.get_components_digests().partitions_digest = chksum_wr->full_checksum(); } } @@ -1011,7 +1014,9 @@ void writer::close_rows_writer() { // upload to be a no-op, which breaks some assumptions). uint32_t garbage = seastar::cpu_to_be(0x13371337); _rows_writer->write(reinterpret_cast(&garbage), sizeof(garbage)); - close_writer(_rows_writer); + auto writer = close_writer(_rows_writer); + auto chksum_wr = static_cast(writer.get()); + _sst.get_components_digests().rows_digest = chksum_wr->full_checksum(); } } diff --git a/sstables/sstables.cc b/sstables/sstables.cc index d1f61501ee..a6222a3e69 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -956,16 +956,22 @@ future sstable::make_component_file_writer(component_type c, file_o }); } +future> sstable::make_digests_component_file_writer(component_type c, file_output_stream_options options, open_flags oflags) noexcept { + return _storage->make_component_sink(*this, c, oflags, std::move(options)).then([this, comp = component_name(*this, c)] (data_sink sink) mutable { + return std::make_unique(std::move(sink), sstable_buffer_size, comp); + }); +} + void sstable::open_sstable(const sstring& origin) { _origin = origin; generate_toc(); _storage->open(*this); } -void sstable::write_toc(file_writer w) { +void sstable::write_toc(std::unique_ptr w) { sstlog.debug("Writing TOC file {} ", toc_filename()); - do_write_simple(std::move(w), [&] (version_types v, file_writer& w) { + do_write_simple(*w, [&] (version_types v, file_writer& w) { for (auto&& key : _recognized_components) { // new line character is appended to the end of each component name. auto value = sstable_version_constants::get_component_map(v).at(key) + "\n"; @@ -973,6 +979,8 @@ void sstable::write_toc(file_writer w) { write(v, w, b); } }); + + _components_digests.toc_digest = w->full_checksum(); } void sstable::write_crc(const checksum& c) { @@ -989,6 +997,7 @@ void sstable::write_digest(uint32_t full_checksum) { auto digest = to_sstring(full_checksum); write(v, w, digest); }, buffer_size); + _components_digests.data_digest = full_checksum; } thread_local std::array, downsampling::BASE_SAMPLING_LEVEL> downsampling::_sample_pattern_cache; @@ -1045,7 +1054,7 @@ future<> sstable::read_simple(T& component) { }); } -void sstable::do_write_simple(file_writer&& writer, +void sstable::do_write_simple(file_writer& writer, noncopyable_function write_component) { write_component(_version, writer); _metadata_size_on_disk += writer.offset(); @@ -1060,7 +1069,7 @@ void sstable::do_write_simple(component_type type, file_output_stream_options options; options.buffer_size = buffer_size; auto w = make_component_file_writer(type, std::move(options)).get(); - do_write_simple(std::move(w), std::move(write_component)); + do_write_simple(w, std::move(write_component)); } template @@ -1070,10 +1079,30 @@ void sstable::write_simple(const T& component) { }, sstable_buffer_size); } +uint32_t sstable::do_write_simple_with_digest(component_type type, + noncopyable_function write_component, unsigned buffer_size) { + auto file_path = filename(type); + sstlog.debug("Writing {} file {}", sstable_version_constants::get_component_map(_version).at(type), file_path); + + file_output_stream_options options; + options.buffer_size = buffer_size; + auto w = make_digests_component_file_writer(type, std::move(options)).get(); + do_write_simple(*w, std::move(write_component)); + return w->full_checksum(); +} + +template +uint32_t sstable::write_simple_with_digest(const T& component) { + return do_write_simple_with_digest(Type, [&component] (version_types v, file_writer& w) { + write(v, w, component); + }, sstable_buffer_size); +} + template future<> sstable::read_simple(sstables::filter& f); template void sstable::write_simple(const sstables::filter& f); template void sstable::write_simple(const sstables::summary_ka&); +template uint32_t sstable::write_simple_with_digest(const sstables::summary_ka&); future<> sstable::read_compression() { // FIXME: If there is no compression, we should expect a CRC file to be present. @@ -1092,7 +1121,8 @@ void sstable::write_compression() { return; } - write_simple(_components->compression); + uint32_t digest = write_simple_with_digest(_components->compression); + _components_digests.compression_digest = digest; } void sstable::validate_partitioner() { @@ -1317,7 +1347,8 @@ future<> sstable::read_partitions_db_footer() { } void sstable::write_statistics() { - write_simple(_components->statistics); + auto digest = write_simple_with_digest(_components->statistics); + _components_digests.statistics_digest = digest; } void sstable::mark_as_being_repaired(const service::session_id& id) { @@ -1342,10 +1373,23 @@ void sstable::rewrite_statistics() { file_output_stream_options options; options.buffer_size = sstable_buffer_size; - auto w = make_component_file_writer(component_type::TemporaryStatistics, std::move(options), + auto w = make_digests_component_file_writer(component_type::TemporaryStatistics, std::move(options), open_flags::wo | open_flags::create | open_flags::truncate).get(); - write(_version, w, _components->statistics); - w.close(); + write(_version, *w, _components->statistics); + w->close(); + + // When rewriting statistics, we also need to update the scylla component + // because it contains the digest of the statistics component. + if (has_scylla_component()) { + _components_digests.statistics_digest = w->full_checksum(); + _components->scylla_metadata->data.set(components_digests{_components_digests}); + sstlog.debug("Rewriting scylla component of sstable {}", get_filename()); + write_simple(*_components->scylla_metadata); + + // rename() guarantees atomicity when renaming a file into place. + sstable_write_io_check(rename_file, fmt::to_string(filename(component_type::TemporaryScylla)), fmt::to_string(filename(component_type::Scylla))).get(); + } + // rename() guarantees atomicity when renaming a file into place. sstable_write_io_check(rename_file, fmt::to_string(filename(component_type::TemporaryStatistics)), fmt::to_string(filename(component_type::Statistics))).get(); } @@ -1539,7 +1583,8 @@ void sstable::write_filter() { auto&& bs = f->bits(); auto filter_ref = sstables::filter_ref(f->num_hashes(), bs.get_storage()); - write_simple(filter_ref); + uint32_t digest = write_simple_with_digest(filter_ref); + _components_digests.filter_digest = digest; } void sstable::maybe_rebuild_filter_from_index(uint64_t num_partitions) { @@ -1998,6 +2043,8 @@ sstable::read_scylla_metadata() noexcept { } return read_simple(*_components->scylla_metadata).then([this] { _features = _components->scylla_metadata->get_features(); + _components_digests = _components->scylla_metadata->get_components_digests(); + _components->digest = _components_digests.data_digest; }); }); } @@ -2087,6 +2134,7 @@ sstable::write_scylla_metadata(shard_id shard, struct run_identifier identifier, sstable_schema.columns.elements.push_back(sstable_column_description{to_sstable_column_kind(col.kind), {col.name()}, {to_bytes(col.type->name())}}); } _components->scylla_metadata->data.set(std::move(sstable_schema)); + _components->scylla_metadata->data.set(components_digests(_components_digests)); write_simple(*_components->scylla_metadata); } @@ -3075,6 +3123,31 @@ void sstable::set_sstable_level(uint32_t new_level) { s.sstable_level = new_level; } +std::optional sstable::get_component_digest(component_type c) const { + switch (c) { + case component_type::Index: + return _components_digests.index_digest; + case component_type::Summary: + return _components_digests.summary_digest; + case component_type::TOC: + return _components_digests.toc_digest; + case component_type::CompressionInfo: + return _components_digests.compression_digest; + case component_type::Filter: + return _components_digests.filter_digest; + case component_type::Partitions: + return _components_digests.partitions_digest; + case component_type::Rows: + return _components_digests.rows_digest; + case component_type::Data: + return _components_digests.data_digest; + case component_type::Statistics: + return _components_digests.statistics_digest; + default: + return std::nullopt; + } +} + future<> sstable::mutate_sstable_level(uint32_t new_level) { if (!has_component(component_type::Statistics)) { return make_ready_future<>(); diff --git a/sstables/sstables.hh b/sstables/sstables.hh index e884a64ba6..2660648b90 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -9,6 +9,7 @@ #pragma once +#include "sstables/writer.hh" #include "version.hh" #include "shared_sstable.hh" #include "open_info.hh" @@ -627,6 +628,8 @@ private: // Total memory reclaimed so far from this sstable size_t _total_memory_reclaimed{0}; bool _unlinked{false}; + + components_digests _components_digests; public: bool has_component(component_type f) const; sstables_manager& manager() { return _manager; } @@ -647,12 +650,18 @@ private: template void write_simple(const T& comp); - void do_write_simple(file_writer&& writer, + void do_write_simple(file_writer& writer, noncopyable_function write_component); void do_write_simple(component_type type, noncopyable_function write_component, unsigned buffer_size); + template + uint32_t write_simple_with_digest(const T& comp); + uint32_t do_write_simple_with_digest(component_type type, + noncopyable_function write_component, + unsigned buffer_size); + void write_crc(const checksum& c); void write_digest(uint32_t full_checksum); @@ -663,6 +672,9 @@ private: future make_component_file_writer(component_type c, file_output_stream_options options, open_flags oflags = open_flags::wo | open_flags::create | open_flags::exclusive) noexcept; + future> make_digests_component_file_writer(component_type c, file_output_stream_options options, + open_flags oflags = open_flags::wo | open_flags::create | open_flags::exclusive) noexcept; + void generate_toc(); void open_sstable(const sstring& origin); @@ -693,7 +705,8 @@ private: future<> read_summary() noexcept; void write_summary() { - write_simple(_components->summary); + uint32_t digest = write_simple_with_digest(_components->summary); + _components_digests.summary_digest = digest; } // To be called when we try to load an SSTable that lacks a Summary. Could @@ -823,7 +836,7 @@ private: future<> open_or_create_data(open_flags oflags, file_open_options options = {}) noexcept; // runs in async context (called from storage::open) - void write_toc(file_writer w); + void write_toc(std::unique_ptr w); static future read_digest_from_file(file f); static future> read_checksum_from_file(file f); public: @@ -1013,6 +1026,12 @@ public: return _components->digest; } + components_digests& get_components_digests() { + return _components_digests; + } + + std::optional get_component_digest(component_type c) const; + // Gets ratio of droppable tombstone. A tombstone is considered droppable here // for cells and tombstones expired before the time point "GC before", which // is the point before which expiring data can be purged. diff --git a/sstables/storage.cc b/sstables/storage.cc index c55ae8f948..fb25418133 100644 --- a/sstables/storage.cc +++ b/sstables/storage.cc @@ -204,13 +204,13 @@ void filesystem_storage::open(sstable& sst) { open_flags::create | open_flags::exclusive, options).get(); - auto w = file_writer(output_stream(std::move(sink)), component_name(sst, component_type::TemporaryTOC)); + auto w = std::make_unique(std::move(sink), sst.sstable_buffer_size, component_name(sst, component_type::TemporaryTOC)); bool toc_exists = file_exists(fmt::to_string(sst.filename(component_type::TOC))).get(); if (toc_exists) { // TOC will exist at this point if write_components() was called with // the generation of a sstable that exists. - w.close(); + w->close(); remove_file(fmt::to_string(sst.filename(component_type::TemporaryTOC))).get(); throw std::runtime_error(format("SSTable write failed due to existence of TOC file for generation {} of {}.{}", sst._generation, sst._schema->ks_name(), sst._schema->cf_name())); } @@ -670,15 +670,10 @@ void object_storage_base::open(sstable& sst) { sst.manager().sstables_registry().create_entry(owner(), status_creating, sst._state, std::move(desc)).get(); memory_data_sink_buffers bufs; - sst.write_toc( - file_writer( - output_stream( - data_sink( - std::make_unique(bufs) - ) - ) - ) - ); + auto out = data_sink(std::make_unique(bufs)); + auto w = std::make_unique(std::move(out), sst.sstable_buffer_size, component_name(sst, component_type::TOC)); + + sst.write_toc(std::move(w)); put_object(make_object_name(sst, component_type::TOC), std::move(bufs)).get(); }