diff --git a/sstables/mx/writer.cc b/sstables/mx/writer.cc index 80e8675b08..45759bd6d1 100644 --- a/sstables/mx/writer.cc +++ b/sstables/mx/writer.cc @@ -948,17 +948,16 @@ void writer::init_file_writers() { _sst._schema->get_compressor_params(), std::move(compressor)), _sst.get_filename()); } - if (_sst.has_component(component_type::Index)) { out = _sst._storage->make_data_or_index_sink(_sst, component_type::Index).get(); - _index_writer = std::make_unique(output_stream(std::move(out)), _sst.index_filename()); + _index_writer = std::make_unique(std::move(out), _sst.sstable_buffer_size, _sst.index_filename()); } if (_sst.has_component(component_type::Partitions) && _sst.has_component(component_type::Rows)) { out = _sst._storage->make_data_or_index_sink(_sst, component_type::Rows).get(); - _rows_writer = std::make_unique(output_stream(std::move(out)), component_name(_sst, component_type::Rows)); + _rows_writer = std::make_unique(std::move(out), _sst.sstable_buffer_size, component_name(_sst, component_type::Rows)); _bti_row_index_writer = trie::bti_row_index_writer(*_rows_writer); out = _sst._storage->make_data_or_index_sink(_sst, component_type::Partitions).get(); - _partitions_writer = std::make_unique(output_stream(std::move(out)), component_name(_sst, component_type::Partitions)); + _partitions_writer = std::make_unique(std::move(out), _sst.sstable_buffer_size, component_name(_sst, component_type::Partitions)); _bti_partition_index_writer = trie::bti_partition_index_writer(*_partitions_writer); } if (_delayed_filter) { @@ -988,7 +987,9 @@ void writer::close_data_writer() { void writer::close_index_writer() { if (_index_writer) { - close_writer(_index_writer); + auto writer = close_writer(_index_writer); + auto chksum_wr = static_cast(writer.get()); + _sst.get_components_digests().index_digest = chksum_wr->full_checksum(); } } @@ -998,7 +999,9 @@ void writer::close_partitions_writer() { _sst.get_version(), _first_key.value(), _last_key.value()); - close_writer(_partitions_writer); + auto writer = close_writer(_partitions_writer); + auto chksum_wr = static_cast(writer.get()); + _sst.get_components_digests().partitions_digest = chksum_wr->full_checksum(); } } @@ -1011,7 +1014,9 @@ void writer::close_rows_writer() { // upload to be a no-op, which breaks some assumptions). uint32_t garbage = seastar::cpu_to_be(0x13371337); _rows_writer->write(reinterpret_cast(&garbage), sizeof(garbage)); - close_writer(_rows_writer); + auto writer = close_writer(_rows_writer); + auto chksum_wr = static_cast(writer.get()); + _sst.get_components_digests().rows_digest = chksum_wr->full_checksum(); } } diff --git a/sstables/sstables.cc b/sstables/sstables.cc index d1f61501ee..a6222a3e69 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -956,16 +956,22 @@ future sstable::make_component_file_writer(component_type c, file_o }); } +future> sstable::make_digests_component_file_writer(component_type c, file_output_stream_options options, open_flags oflags) noexcept { + return _storage->make_component_sink(*this, c, oflags, std::move(options)).then([this, comp = component_name(*this, c)] (data_sink sink) mutable { + return std::make_unique(std::move(sink), sstable_buffer_size, comp); + }); +} + void sstable::open_sstable(const sstring& origin) { _origin = origin; generate_toc(); _storage->open(*this); } -void sstable::write_toc(file_writer w) { +void sstable::write_toc(std::unique_ptr w) { sstlog.debug("Writing TOC file {} ", toc_filename()); - do_write_simple(std::move(w), [&] (version_types v, file_writer& w) { + do_write_simple(*w, [&] (version_types v, file_writer& w) { for (auto&& key : _recognized_components) { // new line character is appended to the end of each component name. auto value = sstable_version_constants::get_component_map(v).at(key) + "\n"; @@ -973,6 +979,8 @@ void sstable::write_toc(file_writer w) { write(v, w, b); } }); + + _components_digests.toc_digest = w->full_checksum(); } void sstable::write_crc(const checksum& c) { @@ -989,6 +997,7 @@ void sstable::write_digest(uint32_t full_checksum) { auto digest = to_sstring(full_checksum); write(v, w, digest); }, buffer_size); + _components_digests.data_digest = full_checksum; } thread_local std::array, downsampling::BASE_SAMPLING_LEVEL> downsampling::_sample_pattern_cache; @@ -1045,7 +1054,7 @@ future<> sstable::read_simple(T& component) { }); } -void sstable::do_write_simple(file_writer&& writer, +void sstable::do_write_simple(file_writer& writer, noncopyable_function write_component) { write_component(_version, writer); _metadata_size_on_disk += writer.offset(); @@ -1060,7 +1069,7 @@ void sstable::do_write_simple(component_type type, file_output_stream_options options; options.buffer_size = buffer_size; auto w = make_component_file_writer(type, std::move(options)).get(); - do_write_simple(std::move(w), std::move(write_component)); + do_write_simple(w, std::move(write_component)); } template @@ -1070,10 +1079,30 @@ void sstable::write_simple(const T& component) { }, sstable_buffer_size); } +uint32_t sstable::do_write_simple_with_digest(component_type type, + noncopyable_function write_component, unsigned buffer_size) { + auto file_path = filename(type); + sstlog.debug("Writing {} file {}", sstable_version_constants::get_component_map(_version).at(type), file_path); + + file_output_stream_options options; + options.buffer_size = buffer_size; + auto w = make_digests_component_file_writer(type, std::move(options)).get(); + do_write_simple(*w, std::move(write_component)); + return w->full_checksum(); +} + +template +uint32_t sstable::write_simple_with_digest(const T& component) { + return do_write_simple_with_digest(Type, [&component] (version_types v, file_writer& w) { + write(v, w, component); + }, sstable_buffer_size); +} + template future<> sstable::read_simple(sstables::filter& f); template void sstable::write_simple(const sstables::filter& f); template void sstable::write_simple(const sstables::summary_ka&); +template uint32_t sstable::write_simple_with_digest(const sstables::summary_ka&); future<> sstable::read_compression() { // FIXME: If there is no compression, we should expect a CRC file to be present. @@ -1092,7 +1121,8 @@ void sstable::write_compression() { return; } - write_simple(_components->compression); + uint32_t digest = write_simple_with_digest(_components->compression); + _components_digests.compression_digest = digest; } void sstable::validate_partitioner() { @@ -1317,7 +1347,8 @@ future<> sstable::read_partitions_db_footer() { } void sstable::write_statistics() { - write_simple(_components->statistics); + auto digest = write_simple_with_digest(_components->statistics); + _components_digests.statistics_digest = digest; } void sstable::mark_as_being_repaired(const service::session_id& id) { @@ -1342,10 +1373,23 @@ void sstable::rewrite_statistics() { file_output_stream_options options; options.buffer_size = sstable_buffer_size; - auto w = make_component_file_writer(component_type::TemporaryStatistics, std::move(options), + auto w = make_digests_component_file_writer(component_type::TemporaryStatistics, std::move(options), open_flags::wo | open_flags::create | open_flags::truncate).get(); - write(_version, w, _components->statistics); - w.close(); + write(_version, *w, _components->statistics); + w->close(); + + // When rewriting statistics, we also need to update the scylla component + // because it contains the digest of the statistics component. + if (has_scylla_component()) { + _components_digests.statistics_digest = w->full_checksum(); + _components->scylla_metadata->data.set(components_digests{_components_digests}); + sstlog.debug("Rewriting scylla component of sstable {}", get_filename()); + write_simple(*_components->scylla_metadata); + + // rename() guarantees atomicity when renaming a file into place. + sstable_write_io_check(rename_file, fmt::to_string(filename(component_type::TemporaryScylla)), fmt::to_string(filename(component_type::Scylla))).get(); + } + // rename() guarantees atomicity when renaming a file into place. sstable_write_io_check(rename_file, fmt::to_string(filename(component_type::TemporaryStatistics)), fmt::to_string(filename(component_type::Statistics))).get(); } @@ -1539,7 +1583,8 @@ void sstable::write_filter() { auto&& bs = f->bits(); auto filter_ref = sstables::filter_ref(f->num_hashes(), bs.get_storage()); - write_simple(filter_ref); + uint32_t digest = write_simple_with_digest(filter_ref); + _components_digests.filter_digest = digest; } void sstable::maybe_rebuild_filter_from_index(uint64_t num_partitions) { @@ -1998,6 +2043,8 @@ sstable::read_scylla_metadata() noexcept { } return read_simple(*_components->scylla_metadata).then([this] { _features = _components->scylla_metadata->get_features(); + _components_digests = _components->scylla_metadata->get_components_digests(); + _components->digest = _components_digests.data_digest; }); }); } @@ -2087,6 +2134,7 @@ sstable::write_scylla_metadata(shard_id shard, struct run_identifier identifier, sstable_schema.columns.elements.push_back(sstable_column_description{to_sstable_column_kind(col.kind), {col.name()}, {to_bytes(col.type->name())}}); } _components->scylla_metadata->data.set(std::move(sstable_schema)); + _components->scylla_metadata->data.set(components_digests(_components_digests)); write_simple(*_components->scylla_metadata); } @@ -3075,6 +3123,31 @@ void sstable::set_sstable_level(uint32_t new_level) { s.sstable_level = new_level; } +std::optional sstable::get_component_digest(component_type c) const { + switch (c) { + case component_type::Index: + return _components_digests.index_digest; + case component_type::Summary: + return _components_digests.summary_digest; + case component_type::TOC: + return _components_digests.toc_digest; + case component_type::CompressionInfo: + return _components_digests.compression_digest; + case component_type::Filter: + return _components_digests.filter_digest; + case component_type::Partitions: + return _components_digests.partitions_digest; + case component_type::Rows: + return _components_digests.rows_digest; + case component_type::Data: + return _components_digests.data_digest; + case component_type::Statistics: + return _components_digests.statistics_digest; + default: + return std::nullopt; + } +} + future<> sstable::mutate_sstable_level(uint32_t new_level) { if (!has_component(component_type::Statistics)) { return make_ready_future<>(); diff --git a/sstables/sstables.hh b/sstables/sstables.hh index e884a64ba6..2660648b90 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -9,6 +9,7 @@ #pragma once +#include "sstables/writer.hh" #include "version.hh" #include "shared_sstable.hh" #include "open_info.hh" @@ -627,6 +628,8 @@ private: // Total memory reclaimed so far from this sstable size_t _total_memory_reclaimed{0}; bool _unlinked{false}; + + components_digests _components_digests; public: bool has_component(component_type f) const; sstables_manager& manager() { return _manager; } @@ -647,12 +650,18 @@ private: template void write_simple(const T& comp); - void do_write_simple(file_writer&& writer, + void do_write_simple(file_writer& writer, noncopyable_function write_component); void do_write_simple(component_type type, noncopyable_function write_component, unsigned buffer_size); + template + uint32_t write_simple_with_digest(const T& comp); + uint32_t do_write_simple_with_digest(component_type type, + noncopyable_function write_component, + unsigned buffer_size); + void write_crc(const checksum& c); void write_digest(uint32_t full_checksum); @@ -663,6 +672,9 @@ private: future make_component_file_writer(component_type c, file_output_stream_options options, open_flags oflags = open_flags::wo | open_flags::create | open_flags::exclusive) noexcept; + future> make_digests_component_file_writer(component_type c, file_output_stream_options options, + open_flags oflags = open_flags::wo | open_flags::create | open_flags::exclusive) noexcept; + void generate_toc(); void open_sstable(const sstring& origin); @@ -693,7 +705,8 @@ private: future<> read_summary() noexcept; void write_summary() { - write_simple(_components->summary); + uint32_t digest = write_simple_with_digest(_components->summary); + _components_digests.summary_digest = digest; } // To be called when we try to load an SSTable that lacks a Summary. Could @@ -823,7 +836,7 @@ private: future<> open_or_create_data(open_flags oflags, file_open_options options = {}) noexcept; // runs in async context (called from storage::open) - void write_toc(file_writer w); + void write_toc(std::unique_ptr w); static future read_digest_from_file(file f); static future> read_checksum_from_file(file f); public: @@ -1013,6 +1026,12 @@ public: return _components->digest; } + components_digests& get_components_digests() { + return _components_digests; + } + + std::optional get_component_digest(component_type c) const; + // Gets ratio of droppable tombstone. A tombstone is considered droppable here // for cells and tombstones expired before the time point "GC before", which // is the point before which expiring data can be purged. diff --git a/sstables/storage.cc b/sstables/storage.cc index c55ae8f948..fb25418133 100644 --- a/sstables/storage.cc +++ b/sstables/storage.cc @@ -204,13 +204,13 @@ void filesystem_storage::open(sstable& sst) { open_flags::create | open_flags::exclusive, options).get(); - auto w = file_writer(output_stream(std::move(sink)), component_name(sst, component_type::TemporaryTOC)); + auto w = std::make_unique(std::move(sink), sst.sstable_buffer_size, component_name(sst, component_type::TemporaryTOC)); bool toc_exists = file_exists(fmt::to_string(sst.filename(component_type::TOC))).get(); if (toc_exists) { // TOC will exist at this point if write_components() was called with // the generation of a sstable that exists. - w.close(); + w->close(); remove_file(fmt::to_string(sst.filename(component_type::TemporaryTOC))).get(); throw std::runtime_error(format("SSTable write failed due to existence of TOC file for generation {} of {}.{}", sst._generation, sst._schema->ks_name(), sst._schema->cf_name())); } @@ -670,15 +670,10 @@ void object_storage_base::open(sstable& sst) { sst.manager().sstables_registry().create_entry(owner(), status_creating, sst._state, std::move(desc)).get(); memory_data_sink_buffers bufs; - sst.write_toc( - file_writer( - output_stream( - data_sink( - std::make_unique(bufs) - ) - ) - ) - ); + auto out = data_sink(std::make_unique(bufs)); + auto w = std::make_unique(std::move(out), sst.sstable_buffer_size, component_name(sst, component_type::TOC)); + + sst.write_toc(std::move(w)); put_object(make_object_name(sst, component_type::TOC), std::move(bufs)).get(); }