Merge 'sstables: purge SCYLLA_ASSERT from the sstable read/parse paths' from Botond Dénes

Introduce `sstables::parse_assert()`, to replace `SCYLLA_ASSERT()` on the read/parse path. SSTables can get corrupt for various reasons, some outside of the database's control. A bad SSTable should not bring down the database, the parsing should simply be aborted, with as much information printed as possible for the investigation of the nature of the corruption. The newly introduced `parse_assert()` uses `on_internal_error()` under the hood, which prints a backtrace and optionally allows for aborting when on the error, to generate a coredump.

Fixes https://github.com/scylladb/scylladb/issues/20845

We just hit another case of `SCYLLA_ASSERT()` triggering due to corrupt sstables bringing down nodes in the field, should be backported to all releases, so we don't hit this in the future

Closes scylladb/scylladb#24534

* github.com:scylladb/scylladb:
  sstables: replace SCYLLA_ASSERT() with parse_assert() on the read path
  sstables/exceptions: introduce parse_assert()
This commit is contained in:
Avi Kivity
2025-06-25 14:55:37 +03:00
committed by Piotr Dulikowski
14 changed files with 80 additions and 60 deletions

View File

@@ -17,9 +17,9 @@
#include <seastar/net/byteorder.hh>
#include "bytes.hh"
#include "reader_permit.hh"
#include "utils/assert.hh"
#include "utils/fragmented_temporary_buffer.hh"
#include "utils/small_vector.hh"
#include "exceptions.hh"
#include <variant>
@@ -324,7 +324,7 @@ private:
// Reads bytes belonging to an integer of size len. Returns true
// if a full integer is now available.
bool process_int(Buffer& data, unsigned len) {
SCYLLA_ASSERT(_pos < len);
sstables::parse_assert(_pos < len);
auto n = std::min((size_t)(len - _pos), data.size());
std::copy(data.begin(), data.begin() + n, _read_int.bytes + _pos);
data.trim_front(n);
@@ -557,7 +557,7 @@ public:
while (data || (!primitive_consumer::active() && non_consuming())) {
// The primitive_consumer must finish before the enclosing state machine can continue.
if (__builtin_expect(primitive_consumer::consume(data) == read_status::waiting, false)) {
SCYLLA_ASSERT(data.size() == 0);
sstables::parse_assert(data.size() == 0);
return proceed::yes;
}
auto ret = state_processor().process_state(data);
@@ -607,7 +607,7 @@ public:
}, [this, &data, orig_data_size](skip_bytes skip) {
// we only expect skip_bytes to be used if reader needs to skip beyond the provided buffer
// otherwise it should just trim_front and proceed as usual
SCYLLA_ASSERT(data.size() == 0);
sstables::parse_assert(data.size() == 0);
_remain -= orig_data_size;
if (skip.get_value() >= _remain) {
skip_bytes skip_remaining(_remain);
@@ -625,11 +625,11 @@ public:
}
future<> fast_forward_to(size_t begin, size_t end) {
SCYLLA_ASSERT(begin >= _stream_position.position);
sstables::parse_assert(begin >= _stream_position.position);
auto n = begin - _stream_position.position;
_stream_position.position = begin;
SCYLLA_ASSERT(end >= _stream_position.position);
sstables::parse_assert(end >= _stream_position.position);
_remain = end - _stream_position.position;
primitive_consumer::reset();

View File

@@ -28,6 +28,19 @@ public:
}
};
[[noreturn]] void on_parse_error(sstring message, std::optional<component_name> filename);
// Use this instead of SCYLLA_ASSERT() or assert() in code that is used while parsing SSTables.
// SSTables can be corrupted either by ScyllaDB itself or by a freak accident like cosmic background
// radiation hitting the disk the wrong way. Either way a corrupt SSTable should not bring down the
// whole server. This method will call on_internal_error() if the condition is false.
// The exception will include a complete backtrace, so no need to add call-site identifiers to the message.
inline void parse_assert(bool condition, std::optional<component_name> filename = {}, const char* message = nullptr) {
if (!condition) [[unlikely]] {
on_parse_error(message, filename);
}
}
struct bufsize_mismatch_exception : malformed_sstable_exception {
bufsize_mismatch_exception(size_t size, size_t expected) :
malformed_sstable_exception(format("Buffer improperly sized to hold requested data. Got: {:d}. Expected: {:d}", size, expected))

View File

@@ -7,7 +7,6 @@
*/
#pragma once
#include "utils/assert.hh"
#include "sstables.hh"
#include "consumer.hh"
#include "downsampling.hh"
@@ -507,7 +506,7 @@ private:
// Must be called for non-decreasing summary_idx.
future<> advance_to_page(index_bound& bound, uint64_t summary_idx) {
sstlog.trace("index {}: advance_to_page({}), bound {}", fmt::ptr(this), summary_idx, fmt::ptr(&bound));
SCYLLA_ASSERT(!bound.current_list || bound.current_summary_idx <= summary_idx);
parse_assert(!bound.current_list || bound.current_summary_idx <= summary_idx, _sstable->index_filename());
if (bound.current_list && bound.current_summary_idx == summary_idx) {
sstlog.trace("index {}: same page", fmt::ptr(this));
return make_ready_future<>();
@@ -607,7 +606,7 @@ private:
// Valid if partition_data_ready(bound)
index_entry& current_partition_entry(index_bound& bound) {
SCYLLA_ASSERT(bound.current_list);
parse_assert(bool(bound.current_list), _sstable->index_filename());
return *bound.current_list->_entries[bound.current_index_idx];
}
@@ -672,7 +671,7 @@ private:
// is no G in that bucket so we read the following one to get the
// position (see the advance_to_page() call below). After we've got it, it's time to
// get J] position. Again, summary points us to the first bucket and we
// hit an SCYLLA_ASSERT since the reader is already at the second bucket and we
// hit an parse_assert since the reader is already at the second bucket and we
// cannot go backward.
// The solution is this condition above. If our lookup requires reading
// the previous bucket we assume that the entry doesn't exist and return
@@ -726,7 +725,7 @@ public:
// So need to make sure first that it is read
if (!partition_data_ready(_lower_bound)) {
return read_partition_data().then([this, pos] {
SCYLLA_ASSERT(partition_data_ready());
parse_assert(partition_data_ready(), _sstable->index_filename());
return advance_upper_past(pos);
});
}
@@ -804,12 +803,12 @@ public:
// Ensures that partition_data_ready() returns true.
// Can be called only when !eof()
future<> read_partition_data() {
SCYLLA_ASSERT(!eof());
parse_assert(!eof(), _sstable->index_filename());
if (partition_data_ready(_lower_bound)) {
return make_ready_future<>();
}
// The only case when _current_list may be missing is when the cursor is at the beginning
SCYLLA_ASSERT(_lower_bound.current_summary_idx == 0);
parse_assert(_lower_bound.current_summary_idx == 0, _sstable->index_filename());
return advance_to_page(_lower_bound, 0);
}
@@ -912,7 +911,7 @@ public:
if (!partition_data_ready()) {
return read_partition_data().then([this, pos] {
sstlog.trace("index {}: page done", fmt::ptr(this));
SCYLLA_ASSERT(partition_data_ready(_lower_bound));
parse_assert(partition_data_ready(_lower_bound), _sstable->index_filename());
return advance_to(pos);
});
}
@@ -993,7 +992,7 @@ public:
// so need to make sure first that the lower bound partition data is in memory.
if (!partition_data_ready(_lower_bound)) {
return read_partition_data().then([this, pos] {
SCYLLA_ASSERT(partition_data_ready());
parse_assert(partition_data_ready(), _sstable->index_filename());
return advance_reverse(pos);
});
}
@@ -1038,7 +1037,7 @@ public:
//
// Preconditions: sstable version >= mc, partition_data_ready().
future<std::optional<uint64_t>> last_block_offset() {
SCYLLA_ASSERT(partition_data_ready());
parse_assert(partition_data_ready(), _sstable->index_filename());
auto cur = current_clustered_cursor();
if (!cur) {

View File

@@ -15,7 +15,6 @@
#include "clustering_key_filter.hh"
#include "clustering_ranges_walker.hh"
#include "concrete_types.hh"
#include "utils/assert.hh"
#include "utils/to_string.hh"
#include "utils/value_or_reference.hh"
@@ -268,7 +267,7 @@ private:
std::optional<collection_mutation> _pending_collection = {};
collection_mutation& pending_collection(const column_definition *cdef) {
SCYLLA_ASSERT(cdef->is_multi_cell() && "frozen set should behave like a cell\n");
parse_assert(cdef->is_multi_cell(), _sst->get_filename(), "frozen set should behave like a cell");
if (!_pending_collection || _pending_collection->is_new_collection(cdef)) {
flush_pending_collection(*_schema);
_pending_collection = collection_mutation(cdef);
@@ -436,7 +435,7 @@ public:
flush_pending_collection(*_schema);
// If _ready is already set we have a bug: get_mutation_fragment()
// was not called, and below we will lose one clustering row!
SCYLLA_ASSERT(!_ready);
parse_assert(!_ready, _sst->get_filename());
if (!_skip_in_progress) {
_ready = std::exchange(_in_progress, { });
return push_ready_fragments_with_ready_set();
@@ -1124,7 +1123,7 @@ public:
_state = state::ATOM_START;
break;
default:
SCYLLA_ASSERT(0);
on_parse_error(format("Invalid indexable element {}", static_cast<std::underlying_type<indexable_element>::type>(el)), _sst->get_filename());
}
_consumer.reset(el);
_gen = do_process_state();
@@ -1216,7 +1215,7 @@ private:
_read_enabled = false;
return make_ready_future<>();
}
SCYLLA_ASSERT(_index_reader->element_kind() == indexable_element::partition);
parse_assert(_index_reader->element_kind() == indexable_element::partition, _sst->get_filename());
return skip_to(_index_reader->element_kind(), start).then([this] {
_sst->get_stats().on_partition_seek();
});
@@ -1296,7 +1295,7 @@ private:
if (!pos || pos->is_before_all_fragments(*_schema)) {
return make_ready_future<>();
}
SCYLLA_ASSERT (_current_partition_key);
parse_assert(bool(_current_partition_key), _sst->get_filename());
return [this] {
if (!_index_in_current_partition) {
_index_in_current_partition = true;
@@ -1340,7 +1339,7 @@ private:
}
auto [begin, end] = _index_reader->data_file_positions();
SCYLLA_ASSERT(end);
parse_assert(bool(end), _sst->get_filename());
if (_single_partition_read) {
_read_enabled = (begin != *end);
@@ -1389,11 +1388,11 @@ public:
_partition_finished = true;
_before_partition = true;
_end_of_stream = false;
SCYLLA_ASSERT(_index_reader);
parse_assert(bool(_index_reader), _sst->get_filename());
auto f1 = _index_reader->advance_to(pr);
return f1.then([this] {
auto [start, end] = _index_reader->data_file_positions();
SCYLLA_ASSERT(end);
parse_assert(bool(end), _sst->get_filename());
if (start != *end) {
_read_enabled = true;
_index_in_current_partition = true;

View File

@@ -8,7 +8,7 @@
#pragma once
#include "utils/assert.hh"
#include "exceptions.hh"
#include "mutation/mutation_fragment.hh"
#include "clustering_ranges_walker.hh"
#include "clustering_key_filter.hh"
@@ -130,7 +130,7 @@ public:
* query ranges tracked by _walker.
*/
std::optional<position_in_partition_view> fast_forward_to(position_range r) {
SCYLLA_ASSERT(_fwd);
parse_assert(bool(_fwd));
_fwd_end = std::move(r).end();
_out_of_range = !_walker.advance_to(r.start(), _fwd_end);

View File

@@ -12,7 +12,6 @@
#include "sstables/column_translation.hh"
#include "parsers.hh"
#include "schema/schema.hh"
#include "utils/assert.hh"
#include "utils/cached_file.hh"
#include "utils/to_string.hh"
@@ -107,13 +106,13 @@ public:
const schema& _s;
bool operator()(const promoted_index_block& lhs, position_in_partition_view rhs) const {
SCYLLA_ASSERT(lhs.start);
parse_assert(bool(lhs.start));
position_in_partition::less_compare less(_s);
return less(*lhs.start, rhs);
}
bool operator()(position_in_partition_view lhs, const promoted_index_block& rhs) const {
SCYLLA_ASSERT(rhs.start);
parse_assert(bool(rhs.start));
position_in_partition::less_compare less(_s);
return less(lhs, *rhs.start);
}

View File

@@ -8,7 +8,6 @@
#pragma once
#include "utils/assert.hh"
#include "sstables/consumer.hh"
#include "sstables/types.hh"
#include "sstables/column_translation.hh"
@@ -290,7 +289,7 @@ public:
}
[[fallthrough]];
case state::END_OPEN_MARKER_FLAG:
SCYLLA_ASSERT(_primitive._i64 + width_base > 0);
parse_assert(_primitive._i64 + width_base > 0);
_width = (_primitive._i64 + width_base);
if (_primitive.read_8(data) != read_status::ready) {
_state = state::END_OPEN_MARKER_LOCAL_DELETION_TIME;

View File

@@ -14,7 +14,6 @@
#include "sstables/m_format_read_helpers.hh"
#include "sstables/sstable_mutation_reader.hh"
#include "sstables/processing_result_generator.hh"
#include "utils/assert.hh"
#include "utils/to_string.hh"
#include "utils/value_or_reference.hh"
@@ -502,7 +501,7 @@ public:
return consume_range_tombstone_boundary(std::move(pos), end_tombstone, start_tombstone);
}
default:
SCYLLA_ASSERT(false && "Invalid boundary type");
on_parse_error(format("Invalid boundary type", static_cast<std::underlying_type<sstables::bound_kind_m>::type>(kind)), _sst->get_filename());
}
}
@@ -1306,7 +1305,7 @@ public:
" partition range: {}", pr));
}
// FIXME: if only the defaults were better...
//SCYLLA_ASSERT(fwd_mr == mutation_reader::forwarding::no);
//parse_assert(fwd_mr == mutation_reader::forwarding::no);
}
}
@@ -1351,7 +1350,7 @@ private:
_read_enabled = false;
return make_ready_future<>();
}
SCYLLA_ASSERT(_index_reader->element_kind() == indexable_element::partition);
parse_assert(_index_reader->element_kind() == indexable_element::partition, _sst->get_filename());
return skip_to(_index_reader->element_kind(), start).then([this] {
_sst->get_stats().on_partition_seek();
});
@@ -1431,7 +1430,7 @@ private:
if (!pos || pos->is_before_all_fragments(*_schema)) {
return make_ready_future<>();
}
SCYLLA_ASSERT (_current_partition_key);
parse_assert(bool(_current_partition_key), _sst->get_filename());
return [this] {
if (!_index_in_current_partition) {
_index_in_current_partition = true;
@@ -1449,7 +1448,7 @@ private:
// The reversing data source will notice the skip and update the data ranges
// from which it prepares the data given to us.
SCYLLA_ASSERT(_reversed_read_sstable_position);
parse_assert(_reversed_read_sstable_position, _sst->get_filename());
auto ip = _index_reader->data_file_positions();
if (ip.end >= *_reversed_read_sstable_position) {
// The reversing data source was already ahead (in reverse - its position was smaller)
@@ -1545,7 +1544,7 @@ private:
}
auto [begin, end] = _index_reader->data_file_positions();
SCYLLA_ASSERT(end);
parse_assert(bool(end), _sst->get_filename());
sstlog.trace("sstable_reader: {}: data file range [{}, {})", fmt::ptr(this), begin, *end);
@@ -1617,11 +1616,11 @@ public:
_partition_finished = true;
_before_partition = true;
_end_of_stream = false;
SCYLLA_ASSERT(_index_reader);
parse_assert(bool(_index_reader), _sst->get_filename());
auto f1 = _index_reader->advance_to(pr);
return f1.then([this] {
auto [start, end] = _index_reader->data_file_positions();
SCYLLA_ASSERT(end);
parse_assert(bool(end), _sst->get_filename());
if (start != *end) {
_read_enabled = true;
_index_in_current_partition = true;
@@ -2070,7 +2069,7 @@ public:
case bound_kind_m::excl_end_incl_start:
return consume_range_tombstone(ecp, bound_kind::incl_start, start_tombstone);
default:
SCYLLA_ASSERT(false && "Invalid boundary type");
on_parse_error(format("Invalid boundary type", static_cast<std::underlying_type_t<bound_kind_m>>(kind)), {});
}
}

View File

@@ -8,7 +8,7 @@
#pragma once
#include "utils/assert.hh"
#include "sstables/exceptions.hh"
#include "clustering_bounds_comparator.hh"
#include <iosfwd>
@@ -63,7 +63,7 @@ inline bool is_start(bound_kind_m kind) {
}
inline bound_kind to_bound_kind(bound_kind_m kind) {
SCYLLA_ASSERT(is_bound_kind(kind));
parse_assert(is_bound_kind(kind));
using underlying_type = std::underlying_type_t<bound_kind_m>;
return bound_kind{static_cast<underlying_type>(kind)};
}
@@ -74,12 +74,12 @@ inline bound_kind_m to_bound_kind_m(bound_kind kind) {
}
inline bound_kind boundary_to_start_bound(bound_kind_m kind) {
SCYLLA_ASSERT(is_boundary_between_adjacent_intervals(kind));
parse_assert(is_boundary_between_adjacent_intervals(kind));
return (kind == bound_kind_m::incl_end_excl_start) ? bound_kind::excl_start : bound_kind::incl_start;
}
inline bound_kind boundary_to_end_bound(bound_kind_m kind) {
SCYLLA_ASSERT(is_boundary_between_adjacent_intervals(kind));
parse_assert(is_boundary_between_adjacent_intervals(kind));
return (kind == bound_kind_m::incl_end_excl_start) ? bound_kind::incl_end : bound_kind::excl_end;
}

View File

@@ -208,7 +208,7 @@ public:
return with_allocator(_region.allocator(), [&] {
auto it_and_flag = _cache.emplace(key, this, key);
entry &cp = *it_and_flag.first;
SCYLLA_ASSERT(it_and_flag.second);
parse_assert(it_and_flag.second);
try {
return share(cp);
} catch (...) {

View File

@@ -8,7 +8,6 @@
#pragma once
#include "utils/assert.hh"
#include "consumer.hh"
#include "column_translation.hh"
#include "sstables/mx/parsers.hh"
@@ -173,7 +172,7 @@ public:
std::visit([this, &data] (auto& ctx) mutable { return process_state(data, ctx); }, _ctx);
if (_mode == consuming_mode::consume_until) {
SCYLLA_ASSERT(_pos);
parse_assert(bool(_pos));
auto cmp_with_start = [this, pos_cmp = promoted_index_block_compare(_s)]
(position_in_partition_view pos, const promoted_index_block& block) -> bool {
return pos_cmp(pos, block.start(_s));

View File

@@ -8,7 +8,7 @@
#pragma once
#include "utils/assert.hh"
#include "exceptions.hh"
#include <memory>
#include <seastar/core/file.hh>
#include <seastar/core/fstream.hh>
@@ -26,7 +26,7 @@ protected:
virtual input_stream<char> open_at(uint64_t pos) = 0;
void set(input_stream<char> in) {
SCYLLA_ASSERT(!_in);
parse_assert(!_in);
_in = std::make_unique<input_stream<char>>(std::move(in));
}

View File

@@ -8,7 +8,6 @@
#pragma once
#include "utils/assert.hh"
#include "sstables/index_entry.hh"
#include "sstables/promoted_index_blocks_reader.hh"
#include "schema/schema.hh"
@@ -97,7 +96,7 @@ private:
// End open marker can be only engaged in SSTables 3.x ('mc' format) and never in ka/la
auto end_pos = prev->end(_s);
position_in_partition_view* open_rt_pos = std::get_if<position_in_partition_view>(&end_pos);
SCYLLA_ASSERT(open_rt_pos);
parse_assert(bool(open_rt_pos));
return skip_info{offset, tombstone(*prev->end_open_marker()), position_in_partition{*open_rt_pos}};
}
}

View File

@@ -34,7 +34,6 @@
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/coroutine/as_future.hh>
#include "utils/assert.hh"
#include "utils/error_injection.hh"
#include "utils/to_string.hh"
#include "data_dictionary/storage_options.hh"
@@ -106,6 +105,20 @@ thread_local utils::updateable_value<bool> global_cache_index_pages(true);
logging::logger sstlog("sstable");
[[noreturn]] void on_parse_error(sstring message, std::optional<component_name> filename) {
auto make_exception = [&] {
if (message.empty()) {
message = "parse_assert() failed";
}
if (filename) {
return malformed_sstable_exception(message, *filename);
}
return malformed_sstable_exception(message);
};
auto ex = std::make_exception_ptr(make_exception());
on_internal_error(sstlog, std::move(ex));
}
template <typename T>
const char* nullsafe_typename(T* x) noexcept {
try {
@@ -897,7 +910,7 @@ void file_writer::close() {
// to work, because file stream would step on unaligned IO and S3 upload
// stream would send completion message to the server and would lose any
// subsequent write.
SCYLLA_ASSERT(!_closed && "file_writer already closed");
parse_assert(!_closed, _component, "file_writer already closed");
std::exception_ptr ex;
try {
_out.flush().get();
@@ -1329,7 +1342,7 @@ future<> sstable::open_or_create_data(open_flags oflags, file_open_options optio
future<> sstable::open_data(sstable_open_config cfg) noexcept {
co_await open_or_create_data(open_flags::ro);
co_await update_info_for_opened_data(cfg);
SCYLLA_ASSERT(!_shards.empty());
parse_assert(!_shards.empty(), get_filename());
auto* sm = _components->scylla_metadata->data.get<scylla_metadata_type::Sharding, sharding_metadata>();
if (sm) {
// Sharding information uses a lot of memory and once we're doing with this computation we will no longer use it.
@@ -1366,7 +1379,7 @@ future<> sstable::update_info_for_opened_data(sstable_open_config cfg) {
auto size = co_await _index_file.size();
_index_file_size = size;
SCYLLA_ASSERT(!_cached_index_file);
parse_assert(!_cached_index_file, get_filename());
_cached_index_file = seastar::make_shared<cached_file>(_index_file,
_manager.get_cache_tracker().get_index_cached_file_stats(),
_manager.get_cache_tracker().get_lru(),
@@ -1643,7 +1656,7 @@ sstable::load_owner_shards(const dht::sharder& sharder) {
}
void prepare_summary(summary& s, uint64_t expected_partition_count, uint32_t min_index_interval) {
SCYLLA_ASSERT(expected_partition_count >= 1);
parse_assert(expected_partition_count >= 1);
s.header.min_index_interval = min_index_interval;
s.header.sampling_level = downsampling::BASE_SAMPLING_LEVEL;
@@ -1665,7 +1678,7 @@ future<> seal_summary(summary& s,
s.header.size = s.entries.size();
s.header.size_at_full_sampling = sstable::get_size_at_full_sampling(state.partition_count, s.header.min_index_interval);
SCYLLA_ASSERT(first_key); // assume non-empty sstable
parse_assert(bool(first_key), {}, "attempted to seal summary of empty sstable");
s.first_key.value = first_key->get_bytes();
if (last_key) {
@@ -2238,7 +2251,8 @@ sstring sstable::component_basename(const sstring& ks, const sstring& cf, versio
case sstable::version_types::me:
return v + "-" + g + "-" + f + "-" + component;
}
SCYLLA_ASSERT(0 && "invalid version");
on_internal_error(sstlog, seastar::format("invalid version {} for sstable: table={}.{}, generation={}, format={}, component={}",
static_cast<std::underlying_type<version_types>::type>(version), ks, cf, g, f, component));
}
sstring sstable::component_basename(const sstring& ks, const sstring& cf, version_types version, generation_type generation,