mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-29 11:10:40 +00:00
sstables: store digest of all sstable components in scylla metadata
This change replaces plain file_writer with crc32_digest_file_writer for all SSTable components that should be checksummed. The resulting component digests are stored in the sstable structure and later persisted to disk as part of the Scylla metadata component during writer::consume_end_of_stream.
This commit is contained in:
@@ -948,17 +948,16 @@ void writer::init_file_writers() {
|
||||
_sst._schema->get_compressor_params(),
|
||||
std::move(compressor)), _sst.get_filename());
|
||||
}
|
||||
|
||||
if (_sst.has_component(component_type::Index)) {
|
||||
out = _sst._storage->make_data_or_index_sink(_sst, component_type::Index).get();
|
||||
_index_writer = std::make_unique<file_writer>(output_stream<char>(std::move(out)), _sst.index_filename());
|
||||
_index_writer = std::make_unique<crc32_digest_file_writer>(std::move(out), _sst.sstable_buffer_size, _sst.index_filename());
|
||||
}
|
||||
if (_sst.has_component(component_type::Partitions) && _sst.has_component(component_type::Rows)) {
|
||||
out = _sst._storage->make_data_or_index_sink(_sst, component_type::Rows).get();
|
||||
_rows_writer = std::make_unique<file_writer>(output_stream<char>(std::move(out)), component_name(_sst, component_type::Rows));
|
||||
_rows_writer = std::make_unique<crc32_digest_file_writer>(std::move(out), _sst.sstable_buffer_size, component_name(_sst, component_type::Rows));
|
||||
_bti_row_index_writer = trie::bti_row_index_writer(*_rows_writer);
|
||||
out = _sst._storage->make_data_or_index_sink(_sst, component_type::Partitions).get();
|
||||
_partitions_writer = std::make_unique<file_writer>(output_stream<char>(std::move(out)), component_name(_sst, component_type::Partitions));
|
||||
_partitions_writer = std::make_unique<crc32_digest_file_writer>(std::move(out), _sst.sstable_buffer_size, component_name(_sst, component_type::Partitions));
|
||||
_bti_partition_index_writer = trie::bti_partition_index_writer(*_partitions_writer);
|
||||
}
|
||||
if (_delayed_filter) {
|
||||
@@ -988,7 +987,9 @@ void writer::close_data_writer() {
|
||||
|
||||
void writer::close_index_writer() {
|
||||
if (_index_writer) {
|
||||
close_writer(_index_writer);
|
||||
auto writer = close_writer(_index_writer);
|
||||
auto chksum_wr = static_cast<crc32_digest_file_writer*>(writer.get());
|
||||
_sst.get_components_digests().index_digest = chksum_wr->full_checksum();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -998,7 +999,9 @@ void writer::close_partitions_writer() {
|
||||
_sst.get_version(),
|
||||
_first_key.value(),
|
||||
_last_key.value());
|
||||
close_writer(_partitions_writer);
|
||||
auto writer = close_writer(_partitions_writer);
|
||||
auto chksum_wr = static_cast<crc32_digest_file_writer*>(writer.get());
|
||||
_sst.get_components_digests().partitions_digest = chksum_wr->full_checksum();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1011,7 +1014,9 @@ void writer::close_rows_writer() {
|
||||
// upload to be a no-op, which breaks some assumptions).
|
||||
uint32_t garbage = seastar::cpu_to_be(0x13371337);
|
||||
_rows_writer->write(reinterpret_cast<const char*>(&garbage), sizeof(garbage));
|
||||
close_writer(_rows_writer);
|
||||
auto writer = close_writer(_rows_writer);
|
||||
auto chksum_wr = static_cast<crc32_digest_file_writer*>(writer.get());
|
||||
_sst.get_components_digests().rows_digest = chksum_wr->full_checksum();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -956,16 +956,22 @@ future<file_writer> sstable::make_component_file_writer(component_type c, file_o
|
||||
});
|
||||
}
|
||||
|
||||
future<std::unique_ptr<crc32_digest_file_writer>> sstable::make_digests_component_file_writer(component_type c, file_output_stream_options options, open_flags oflags) noexcept {
|
||||
return _storage->make_component_sink(*this, c, oflags, std::move(options)).then([this, comp = component_name(*this, c)] (data_sink sink) mutable {
|
||||
return std::make_unique<crc32_digest_file_writer>(std::move(sink), sstable_buffer_size, comp);
|
||||
});
|
||||
}
|
||||
|
||||
void sstable::open_sstable(const sstring& origin) {
|
||||
_origin = origin;
|
||||
generate_toc();
|
||||
_storage->open(*this);
|
||||
}
|
||||
|
||||
void sstable::write_toc(file_writer w) {
|
||||
void sstable::write_toc(std::unique_ptr<crc32_digest_file_writer> w) {
|
||||
sstlog.debug("Writing TOC file {} ", toc_filename());
|
||||
|
||||
do_write_simple(std::move(w), [&] (version_types v, file_writer& w) {
|
||||
do_write_simple(*w, [&] (version_types v, file_writer& w) {
|
||||
for (auto&& key : _recognized_components) {
|
||||
// new line character is appended to the end of each component name.
|
||||
auto value = sstable_version_constants::get_component_map(v).at(key) + "\n";
|
||||
@@ -973,6 +979,8 @@ void sstable::write_toc(file_writer w) {
|
||||
write(v, w, b);
|
||||
}
|
||||
});
|
||||
|
||||
_components_digests.toc_digest = w->full_checksum();
|
||||
}
|
||||
|
||||
void sstable::write_crc(const checksum& c) {
|
||||
@@ -989,6 +997,7 @@ void sstable::write_digest(uint32_t full_checksum) {
|
||||
auto digest = to_sstring<bytes>(full_checksum);
|
||||
write(v, w, digest);
|
||||
}, buffer_size);
|
||||
_components_digests.data_digest = full_checksum;
|
||||
}
|
||||
|
||||
thread_local std::array<std::vector<int>, downsampling::BASE_SAMPLING_LEVEL> downsampling::_sample_pattern_cache;
|
||||
@@ -1045,7 +1054,7 @@ future<> sstable::read_simple(T& component) {
|
||||
});
|
||||
}
|
||||
|
||||
void sstable::do_write_simple(file_writer&& writer,
|
||||
void sstable::do_write_simple(file_writer& writer,
|
||||
noncopyable_function<void (version_types, file_writer&)> write_component) {
|
||||
write_component(_version, writer);
|
||||
_metadata_size_on_disk += writer.offset();
|
||||
@@ -1060,7 +1069,7 @@ void sstable::do_write_simple(component_type type,
|
||||
file_output_stream_options options;
|
||||
options.buffer_size = buffer_size;
|
||||
auto w = make_component_file_writer(type, std::move(options)).get();
|
||||
do_write_simple(std::move(w), std::move(write_component));
|
||||
do_write_simple(w, std::move(write_component));
|
||||
}
|
||||
|
||||
template <component_type Type, typename T>
|
||||
@@ -1070,10 +1079,30 @@ void sstable::write_simple(const T& component) {
|
||||
}, sstable_buffer_size);
|
||||
}
|
||||
|
||||
uint32_t sstable::do_write_simple_with_digest(component_type type,
|
||||
noncopyable_function<void (version_types version, file_writer& writer)> write_component, unsigned buffer_size) {
|
||||
auto file_path = filename(type);
|
||||
sstlog.debug("Writing {} file {}", sstable_version_constants::get_component_map(_version).at(type), file_path);
|
||||
|
||||
file_output_stream_options options;
|
||||
options.buffer_size = buffer_size;
|
||||
auto w = make_digests_component_file_writer(type, std::move(options)).get();
|
||||
do_write_simple(*w, std::move(write_component));
|
||||
return w->full_checksum();
|
||||
}
|
||||
|
||||
template <component_type Type, typename T>
|
||||
uint32_t sstable::write_simple_with_digest(const T& component) {
|
||||
return do_write_simple_with_digest(Type, [&component] (version_types v, file_writer& w) {
|
||||
write(v, w, component);
|
||||
}, sstable_buffer_size);
|
||||
}
|
||||
|
||||
template future<> sstable::read_simple<component_type::Filter>(sstables::filter& f);
|
||||
template void sstable::write_simple<component_type::Filter>(const sstables::filter& f);
|
||||
|
||||
template void sstable::write_simple<component_type::Summary>(const sstables::summary_ka&);
|
||||
template uint32_t sstable::write_simple_with_digest<component_type::Summary>(const sstables::summary_ka&);
|
||||
|
||||
future<> sstable::read_compression() {
|
||||
// FIXME: If there is no compression, we should expect a CRC file to be present.
|
||||
@@ -1092,7 +1121,8 @@ void sstable::write_compression() {
|
||||
return;
|
||||
}
|
||||
|
||||
write_simple<component_type::CompressionInfo>(_components->compression);
|
||||
uint32_t digest = write_simple_with_digest<component_type::CompressionInfo>(_components->compression);
|
||||
_components_digests.compression_digest = digest;
|
||||
}
|
||||
|
||||
void sstable::validate_partitioner() {
|
||||
@@ -1317,7 +1347,8 @@ future<> sstable::read_partitions_db_footer() {
|
||||
}
|
||||
|
||||
void sstable::write_statistics() {
|
||||
write_simple<component_type::Statistics>(_components->statistics);
|
||||
auto digest = write_simple_with_digest<component_type::Statistics>(_components->statistics);
|
||||
_components_digests.statistics_digest = digest;
|
||||
}
|
||||
|
||||
void sstable::mark_as_being_repaired(const service::session_id& id) {
|
||||
@@ -1342,10 +1373,23 @@ void sstable::rewrite_statistics() {
|
||||
|
||||
file_output_stream_options options;
|
||||
options.buffer_size = sstable_buffer_size;
|
||||
auto w = make_component_file_writer(component_type::TemporaryStatistics, std::move(options),
|
||||
auto w = make_digests_component_file_writer(component_type::TemporaryStatistics, std::move(options),
|
||||
open_flags::wo | open_flags::create | open_flags::truncate).get();
|
||||
write(_version, w, _components->statistics);
|
||||
w.close();
|
||||
write(_version, *w, _components->statistics);
|
||||
w->close();
|
||||
|
||||
// When rewriting statistics, we also need to update the scylla component
|
||||
// because it contains the digest of the statistics component.
|
||||
if (has_scylla_component()) {
|
||||
_components_digests.statistics_digest = w->full_checksum();
|
||||
_components->scylla_metadata->data.set<scylla_metadata_type::ComponentsDigests>(components_digests{_components_digests});
|
||||
sstlog.debug("Rewriting scylla component of sstable {}", get_filename());
|
||||
write_simple<component_type::TemporaryScylla>(*_components->scylla_metadata);
|
||||
|
||||
// rename() guarantees atomicity when renaming a file into place.
|
||||
sstable_write_io_check(rename_file, fmt::to_string(filename(component_type::TemporaryScylla)), fmt::to_string(filename(component_type::Scylla))).get();
|
||||
}
|
||||
|
||||
// rename() guarantees atomicity when renaming a file into place.
|
||||
sstable_write_io_check(rename_file, fmt::to_string(filename(component_type::TemporaryStatistics)), fmt::to_string(filename(component_type::Statistics))).get();
|
||||
}
|
||||
@@ -1539,7 +1583,8 @@ void sstable::write_filter() {
|
||||
|
||||
auto&& bs = f->bits();
|
||||
auto filter_ref = sstables::filter_ref(f->num_hashes(), bs.get_storage());
|
||||
write_simple<component_type::Filter>(filter_ref);
|
||||
uint32_t digest = write_simple_with_digest<component_type::Filter>(filter_ref);
|
||||
_components_digests.filter_digest = digest;
|
||||
}
|
||||
|
||||
void sstable::maybe_rebuild_filter_from_index(uint64_t num_partitions) {
|
||||
@@ -1998,6 +2043,8 @@ sstable::read_scylla_metadata() noexcept {
|
||||
}
|
||||
return read_simple<component_type::Scylla>(*_components->scylla_metadata).then([this] {
|
||||
_features = _components->scylla_metadata->get_features();
|
||||
_components_digests = _components->scylla_metadata->get_components_digests();
|
||||
_components->digest = _components_digests.data_digest;
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -2087,6 +2134,7 @@ sstable::write_scylla_metadata(shard_id shard, struct run_identifier identifier,
|
||||
sstable_schema.columns.elements.push_back(sstable_column_description{to_sstable_column_kind(col.kind), {col.name()}, {to_bytes(col.type->name())}});
|
||||
}
|
||||
_components->scylla_metadata->data.set<scylla_metadata_type::Schema>(std::move(sstable_schema));
|
||||
_components->scylla_metadata->data.set<scylla_metadata_type::ComponentsDigests>(components_digests(_components_digests));
|
||||
|
||||
write_simple<component_type::Scylla>(*_components->scylla_metadata);
|
||||
}
|
||||
@@ -3075,6 +3123,31 @@ void sstable::set_sstable_level(uint32_t new_level) {
|
||||
s.sstable_level = new_level;
|
||||
}
|
||||
|
||||
std::optional<uint32_t> sstable::get_component_digest(component_type c) const {
|
||||
switch (c) {
|
||||
case component_type::Index:
|
||||
return _components_digests.index_digest;
|
||||
case component_type::Summary:
|
||||
return _components_digests.summary_digest;
|
||||
case component_type::TOC:
|
||||
return _components_digests.toc_digest;
|
||||
case component_type::CompressionInfo:
|
||||
return _components_digests.compression_digest;
|
||||
case component_type::Filter:
|
||||
return _components_digests.filter_digest;
|
||||
case component_type::Partitions:
|
||||
return _components_digests.partitions_digest;
|
||||
case component_type::Rows:
|
||||
return _components_digests.rows_digest;
|
||||
case component_type::Data:
|
||||
return _components_digests.data_digest;
|
||||
case component_type::Statistics:
|
||||
return _components_digests.statistics_digest;
|
||||
default:
|
||||
return std::nullopt;
|
||||
}
|
||||
}
|
||||
|
||||
future<> sstable::mutate_sstable_level(uint32_t new_level) {
|
||||
if (!has_component(component_type::Statistics)) {
|
||||
return make_ready_future<>();
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "sstables/writer.hh"
|
||||
#include "version.hh"
|
||||
#include "shared_sstable.hh"
|
||||
#include "open_info.hh"
|
||||
@@ -627,6 +628,8 @@ private:
|
||||
// Total memory reclaimed so far from this sstable
|
||||
size_t _total_memory_reclaimed{0};
|
||||
bool _unlinked{false};
|
||||
|
||||
components_digests _components_digests;
|
||||
public:
|
||||
bool has_component(component_type f) const;
|
||||
sstables_manager& manager() { return _manager; }
|
||||
@@ -647,12 +650,18 @@ private:
|
||||
|
||||
template <component_type Type, typename T>
|
||||
void write_simple(const T& comp);
|
||||
void do_write_simple(file_writer&& writer,
|
||||
void do_write_simple(file_writer& writer,
|
||||
noncopyable_function<void (version_types, file_writer&)> write_component);
|
||||
void do_write_simple(component_type type,
|
||||
noncopyable_function<void (version_types version, file_writer& writer)> write_component,
|
||||
unsigned buffer_size);
|
||||
|
||||
template <component_type Type, typename T>
|
||||
uint32_t write_simple_with_digest(const T& comp);
|
||||
uint32_t do_write_simple_with_digest(component_type type,
|
||||
noncopyable_function<void (version_types version, file_writer& writer)> write_component,
|
||||
unsigned buffer_size);
|
||||
|
||||
void write_crc(const checksum& c);
|
||||
void write_digest(uint32_t full_checksum);
|
||||
|
||||
@@ -663,6 +672,9 @@ private:
|
||||
future<file_writer> make_component_file_writer(component_type c, file_output_stream_options options,
|
||||
open_flags oflags = open_flags::wo | open_flags::create | open_flags::exclusive) noexcept;
|
||||
|
||||
future<std::unique_ptr<crc32_digest_file_writer>> make_digests_component_file_writer(component_type c, file_output_stream_options options,
|
||||
open_flags oflags = open_flags::wo | open_flags::create | open_flags::exclusive) noexcept;
|
||||
|
||||
void generate_toc();
|
||||
void open_sstable(const sstring& origin);
|
||||
|
||||
@@ -693,7 +705,8 @@ private:
|
||||
future<> read_summary() noexcept;
|
||||
|
||||
void write_summary() {
|
||||
write_simple<component_type::Summary>(_components->summary);
|
||||
uint32_t digest = write_simple_with_digest<component_type::Summary>(_components->summary);
|
||||
_components_digests.summary_digest = digest;
|
||||
}
|
||||
|
||||
// To be called when we try to load an SSTable that lacks a Summary. Could
|
||||
@@ -823,7 +836,7 @@ private:
|
||||
|
||||
future<> open_or_create_data(open_flags oflags, file_open_options options = {}) noexcept;
|
||||
// runs in async context (called from storage::open)
|
||||
void write_toc(file_writer w);
|
||||
void write_toc(std::unique_ptr<crc32_digest_file_writer> w);
|
||||
static future<uint32_t> read_digest_from_file(file f);
|
||||
static future<lw_shared_ptr<checksum>> read_checksum_from_file(file f);
|
||||
public:
|
||||
@@ -1013,6 +1026,12 @@ public:
|
||||
return _components->digest;
|
||||
}
|
||||
|
||||
components_digests& get_components_digests() {
|
||||
return _components_digests;
|
||||
}
|
||||
|
||||
std::optional<uint32_t> get_component_digest(component_type c) const;
|
||||
|
||||
// Gets ratio of droppable tombstone. A tombstone is considered droppable here
|
||||
// for cells and tombstones expired before the time point "GC before", which
|
||||
// is the point before which expiring data can be purged.
|
||||
|
||||
@@ -204,13 +204,13 @@ void filesystem_storage::open(sstable& sst) {
|
||||
open_flags::create |
|
||||
open_flags::exclusive,
|
||||
options).get();
|
||||
auto w = file_writer(output_stream<char>(std::move(sink)), component_name(sst, component_type::TemporaryTOC));
|
||||
auto w = std::make_unique<crc32_digest_file_writer>(std::move(sink), sst.sstable_buffer_size, component_name(sst, component_type::TemporaryTOC));
|
||||
|
||||
bool toc_exists = file_exists(fmt::to_string(sst.filename(component_type::TOC))).get();
|
||||
if (toc_exists) {
|
||||
// TOC will exist at this point if write_components() was called with
|
||||
// the generation of a sstable that exists.
|
||||
w.close();
|
||||
w->close();
|
||||
remove_file(fmt::to_string(sst.filename(component_type::TemporaryTOC))).get();
|
||||
throw std::runtime_error(format("SSTable write failed due to existence of TOC file for generation {} of {}.{}", sst._generation, sst._schema->ks_name(), sst._schema->cf_name()));
|
||||
}
|
||||
@@ -670,15 +670,10 @@ void object_storage_base::open(sstable& sst) {
|
||||
sst.manager().sstables_registry().create_entry(owner(), status_creating, sst._state, std::move(desc)).get();
|
||||
|
||||
memory_data_sink_buffers bufs;
|
||||
sst.write_toc(
|
||||
file_writer(
|
||||
output_stream<char>(
|
||||
data_sink(
|
||||
std::make_unique<memory_data_sink>(bufs)
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
auto out = data_sink(std::make_unique<memory_data_sink>(bufs));
|
||||
auto w = std::make_unique<crc32_digest_file_writer>(std::move(out), sst.sstable_buffer_size, component_name(sst, component_type::TOC));
|
||||
|
||||
sst.write_toc(std::move(w));
|
||||
put_object(make_object_name(sst, component_type::TOC), std::move(bufs)).get();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user