sstables: Check if digest component exists

Extend `read_digest()` to first check if the digest component exists
before attempting to load it from disk.

Make `validate_checksums()` throw an error if the component does not
exist to preserve its current behavior.

Signed-off-by: Nikos Dragazis <nikolaos.dragazis@scylladb.com>
This commit is contained in:
Nikos Dragazis
2024-09-14 17:08:50 +03:00
parent 7e738bcd2d
commit 347f5ee166
2 changed files with 13 additions and 7 deletions

View File

@@ -2601,10 +2601,13 @@ static future<bool> do_validate_uncompressed(input_stream<char>& stream, const c
co_return valid;
}
future<uint32_t> sstable::read_digest() {
future<std::optional<uint32_t>> sstable::read_digest() {
if (_components->digest) {
co_return *_components->digest;
}
if (!has_component(component_type::Digest)) {
co_return std::nullopt;
}
sstring digest_str;
co_await do_read_simple(component_type::Digest, [&] (version_types v, file digest_file) -> future<> {
@@ -2626,7 +2629,7 @@ future<uint32_t> sstable::read_digest() {
});
_components->digest = boost::lexical_cast<uint32_t>(digest_str);
co_return *_components->digest;
co_return _components->digest;
}
future<lw_shared_ptr<checksum>> sstable::read_checksum() {
@@ -2672,6 +2675,9 @@ future<lw_shared_ptr<checksum>> sstable::read_checksum() {
future<validate_checksums_result> validate_checksums(shared_sstable sst, reader_permit permit) {
const auto digest = co_await sst->read_digest();
if (!digest) {
throw std::runtime_error(seastar::format("No digest available for SSTable: {}", sst->get_filename()));
}
auto data_stream = sst->data_stream(0, sst->ondisk_data_size(), permit, nullptr, nullptr, sstable::raw_stream::yes);
@@ -2682,9 +2688,9 @@ future<validate_checksums_result> validate_checksums(shared_sstable sst, reader_
try {
if (sst->get_compression()) {
if (sst->get_version() >= sstable_version_types::mc) {
valid = co_await do_validate_compressed<crc32_utils>(data_stream, sst->get_compression(), true, digest);
valid = co_await do_validate_compressed<crc32_utils>(data_stream, sst->get_compression(), true, *digest);
} else {
valid = co_await do_validate_compressed<adler32_utils>(data_stream, sst->get_compression(), false, digest);
valid = co_await do_validate_compressed<adler32_utils>(data_stream, sst->get_compression(), false, *digest);
}
} else {
auto checksum = co_await sst->read_checksum();
@@ -2692,9 +2698,9 @@ future<validate_checksums_result> validate_checksums(shared_sstable sst, reader_
sstlog.warn("No checksums available for SSTable: {}", sst->get_filename());
ret = validate_checksums_result::no_checksum;
} else if (sst->get_version() >= sstable_version_types::mc) {
valid = co_await do_validate_uncompressed<crc32_utils>(data_stream, *checksum, digest);
valid = co_await do_validate_uncompressed<crc32_utils>(data_stream, *checksum, *digest);
} else {
valid = co_await do_validate_uncompressed<adler32_utils>(data_stream, *checksum, digest);
valid = co_await do_validate_uncompressed<adler32_utils>(data_stream, *checksum, *digest);
}
}
} catch (...) {

View File

@@ -1017,7 +1017,7 @@ public:
gc_clock::time_point get_gc_before_for_drop_estimation(const gc_clock::time_point& compaction_time, const tombstone_gc_state& gc_state, const schema_ptr& s) const;
gc_clock::time_point get_gc_before_for_fully_expire(const gc_clock::time_point& compaction_time, const tombstone_gc_state& gc_state, const schema_ptr& s) const;
future<uint32_t> read_digest();
future<std::optional<uint32_t>> read_digest();
future<lw_shared_ptr<checksum>> read_checksum();
};