diff --git a/sstables/consumer.hh b/sstables/consumer.hh index 070c4417b5..0115e28094 100644 --- a/sstables/consumer.hh +++ b/sstables/consumer.hh @@ -17,9 +17,9 @@ #include #include "bytes.hh" #include "reader_permit.hh" -#include "utils/assert.hh" #include "utils/fragmented_temporary_buffer.hh" #include "utils/small_vector.hh" +#include "exceptions.hh" #include @@ -324,7 +324,7 @@ private: // Reads bytes belonging to an integer of size len. Returns true // if a full integer is now available. bool process_int(Buffer& data, unsigned len) { - SCYLLA_ASSERT(_pos < len); + sstables::parse_assert(_pos < len); auto n = std::min((size_t)(len - _pos), data.size()); std::copy(data.begin(), data.begin() + n, _read_int.bytes + _pos); data.trim_front(n); @@ -557,7 +557,7 @@ public: while (data || (!primitive_consumer::active() && non_consuming())) { // The primitive_consumer must finish before the enclosing state machine can continue. if (__builtin_expect(primitive_consumer::consume(data) == read_status::waiting, false)) { - SCYLLA_ASSERT(data.size() == 0); + sstables::parse_assert(data.size() == 0); return proceed::yes; } auto ret = state_processor().process_state(data); @@ -607,7 +607,7 @@ public: }, [this, &data, orig_data_size](skip_bytes skip) { // we only expect skip_bytes to be used if reader needs to skip beyond the provided buffer // otherwise it should just trim_front and proceed as usual - SCYLLA_ASSERT(data.size() == 0); + sstables::parse_assert(data.size() == 0); _remain -= orig_data_size; if (skip.get_value() >= _remain) { skip_bytes skip_remaining(_remain); @@ -625,11 +625,11 @@ public: } future<> fast_forward_to(size_t begin, size_t end) { - SCYLLA_ASSERT(begin >= _stream_position.position); + sstables::parse_assert(begin >= _stream_position.position); auto n = begin - _stream_position.position; _stream_position.position = begin; - SCYLLA_ASSERT(end >= _stream_position.position); + sstables::parse_assert(end >= _stream_position.position); _remain = end - _stream_position.position; primitive_consumer::reset(); diff --git a/sstables/exceptions.hh b/sstables/exceptions.hh index 2a0e0447eb..a78d001432 100644 --- a/sstables/exceptions.hh +++ b/sstables/exceptions.hh @@ -28,6 +28,19 @@ public: } }; +[[noreturn]] void on_parse_error(sstring message, std::optional filename); + +// Use this instead of SCYLLA_ASSERT() or assert() in code that is used while parsing SSTables. +// SSTables can be corrupted either by ScyllaDB itself or by a freak accident like cosmic background +// radiation hitting the disk the wrong way. Either way a corrupt SSTable should not bring down the +// whole server. This method will call on_internal_error() if the condition is false. +// The exception will include a complete backtrace, so no need to add call-site identifiers to the message. +inline void parse_assert(bool condition, std::optional filename = {}, const char* message = nullptr) { + if (!condition) [[unlikely]] { + on_parse_error(message, filename); + } +} + struct bufsize_mismatch_exception : malformed_sstable_exception { bufsize_mismatch_exception(size_t size, size_t expected) : malformed_sstable_exception(format("Buffer improperly sized to hold requested data. Got: {:d}. Expected: {:d}", size, expected)) diff --git a/sstables/index_reader.hh b/sstables/index_reader.hh index 0693f75f87..608878abb3 100644 --- a/sstables/index_reader.hh +++ b/sstables/index_reader.hh @@ -7,7 +7,6 @@ */ #pragma once -#include "utils/assert.hh" #include "sstables.hh" #include "consumer.hh" #include "downsampling.hh" @@ -507,7 +506,7 @@ private: // Must be called for non-decreasing summary_idx. future<> advance_to_page(index_bound& bound, uint64_t summary_idx) { sstlog.trace("index {}: advance_to_page({}), bound {}", fmt::ptr(this), summary_idx, fmt::ptr(&bound)); - SCYLLA_ASSERT(!bound.current_list || bound.current_summary_idx <= summary_idx); + parse_assert(!bound.current_list || bound.current_summary_idx <= summary_idx, _sstable->index_filename()); if (bound.current_list && bound.current_summary_idx == summary_idx) { sstlog.trace("index {}: same page", fmt::ptr(this)); return make_ready_future<>(); @@ -607,7 +606,7 @@ private: // Valid if partition_data_ready(bound) index_entry& current_partition_entry(index_bound& bound) { - SCYLLA_ASSERT(bound.current_list); + parse_assert(bool(bound.current_list), _sstable->index_filename()); return *bound.current_list->_entries[bound.current_index_idx]; } @@ -672,7 +671,7 @@ private: // is no G in that bucket so we read the following one to get the // position (see the advance_to_page() call below). After we've got it, it's time to // get J] position. Again, summary points us to the first bucket and we - // hit an SCYLLA_ASSERT since the reader is already at the second bucket and we + // hit an parse_assert since the reader is already at the second bucket and we // cannot go backward. // The solution is this condition above. If our lookup requires reading // the previous bucket we assume that the entry doesn't exist and return @@ -726,7 +725,7 @@ public: // So need to make sure first that it is read if (!partition_data_ready(_lower_bound)) { return read_partition_data().then([this, pos] { - SCYLLA_ASSERT(partition_data_ready()); + parse_assert(partition_data_ready(), _sstable->index_filename()); return advance_upper_past(pos); }); } @@ -804,12 +803,12 @@ public: // Ensures that partition_data_ready() returns true. // Can be called only when !eof() future<> read_partition_data() { - SCYLLA_ASSERT(!eof()); + parse_assert(!eof(), _sstable->index_filename()); if (partition_data_ready(_lower_bound)) { return make_ready_future<>(); } // The only case when _current_list may be missing is when the cursor is at the beginning - SCYLLA_ASSERT(_lower_bound.current_summary_idx == 0); + parse_assert(_lower_bound.current_summary_idx == 0, _sstable->index_filename()); return advance_to_page(_lower_bound, 0); } @@ -912,7 +911,7 @@ public: if (!partition_data_ready()) { return read_partition_data().then([this, pos] { sstlog.trace("index {}: page done", fmt::ptr(this)); - SCYLLA_ASSERT(partition_data_ready(_lower_bound)); + parse_assert(partition_data_ready(_lower_bound), _sstable->index_filename()); return advance_to(pos); }); } @@ -993,7 +992,7 @@ public: // so need to make sure first that the lower bound partition data is in memory. if (!partition_data_ready(_lower_bound)) { return read_partition_data().then([this, pos] { - SCYLLA_ASSERT(partition_data_ready()); + parse_assert(partition_data_ready(), _sstable->index_filename()); return advance_reverse(pos); }); } @@ -1038,7 +1037,7 @@ public: // // Preconditions: sstable version >= mc, partition_data_ready(). future> last_block_offset() { - SCYLLA_ASSERT(partition_data_ready()); + parse_assert(partition_data_ready(), _sstable->index_filename()); auto cur = current_clustered_cursor(); if (!cur) { diff --git a/sstables/kl/reader.cc b/sstables/kl/reader.cc index 81a91a6e38..5b59a42502 100644 --- a/sstables/kl/reader.cc +++ b/sstables/kl/reader.cc @@ -15,7 +15,6 @@ #include "clustering_key_filter.hh" #include "clustering_ranges_walker.hh" #include "concrete_types.hh" -#include "utils/assert.hh" #include "utils/to_string.hh" #include "utils/value_or_reference.hh" @@ -268,7 +267,7 @@ private: std::optional _pending_collection = {}; collection_mutation& pending_collection(const column_definition *cdef) { - SCYLLA_ASSERT(cdef->is_multi_cell() && "frozen set should behave like a cell\n"); + parse_assert(cdef->is_multi_cell(), _sst->get_filename(), "frozen set should behave like a cell"); if (!_pending_collection || _pending_collection->is_new_collection(cdef)) { flush_pending_collection(*_schema); _pending_collection = collection_mutation(cdef); @@ -436,7 +435,7 @@ public: flush_pending_collection(*_schema); // If _ready is already set we have a bug: get_mutation_fragment() // was not called, and below we will lose one clustering row! - SCYLLA_ASSERT(!_ready); + parse_assert(!_ready, _sst->get_filename()); if (!_skip_in_progress) { _ready = std::exchange(_in_progress, { }); return push_ready_fragments_with_ready_set(); @@ -1124,7 +1123,7 @@ public: _state = state::ATOM_START; break; default: - SCYLLA_ASSERT(0); + on_parse_error(format("Invalid indexable element {}", static_cast::type>(el)), _sst->get_filename()); } _consumer.reset(el); _gen = do_process_state(); @@ -1216,7 +1215,7 @@ private: _read_enabled = false; return make_ready_future<>(); } - SCYLLA_ASSERT(_index_reader->element_kind() == indexable_element::partition); + parse_assert(_index_reader->element_kind() == indexable_element::partition, _sst->get_filename()); return skip_to(_index_reader->element_kind(), start).then([this] { _sst->get_stats().on_partition_seek(); }); @@ -1296,7 +1295,7 @@ private: if (!pos || pos->is_before_all_fragments(*_schema)) { return make_ready_future<>(); } - SCYLLA_ASSERT (_current_partition_key); + parse_assert(bool(_current_partition_key), _sst->get_filename()); return [this] { if (!_index_in_current_partition) { _index_in_current_partition = true; @@ -1340,7 +1339,7 @@ private: } auto [begin, end] = _index_reader->data_file_positions(); - SCYLLA_ASSERT(end); + parse_assert(bool(end), _sst->get_filename()); if (_single_partition_read) { _read_enabled = (begin != *end); @@ -1389,11 +1388,11 @@ public: _partition_finished = true; _before_partition = true; _end_of_stream = false; - SCYLLA_ASSERT(_index_reader); + parse_assert(bool(_index_reader), _sst->get_filename()); auto f1 = _index_reader->advance_to(pr); return f1.then([this] { auto [start, end] = _index_reader->data_file_positions(); - SCYLLA_ASSERT(end); + parse_assert(bool(end), _sst->get_filename()); if (start != *end) { _read_enabled = true; _index_in_current_partition = true; diff --git a/sstables/mutation_fragment_filter.hh b/sstables/mutation_fragment_filter.hh index f967895e19..6bce96bd88 100644 --- a/sstables/mutation_fragment_filter.hh +++ b/sstables/mutation_fragment_filter.hh @@ -8,7 +8,7 @@ #pragma once -#include "utils/assert.hh" +#include "exceptions.hh" #include "mutation/mutation_fragment.hh" #include "clustering_ranges_walker.hh" #include "clustering_key_filter.hh" @@ -130,7 +130,7 @@ public: * query ranges tracked by _walker. */ std::optional fast_forward_to(position_range r) { - SCYLLA_ASSERT(_fwd); + parse_assert(bool(_fwd)); _fwd_end = std::move(r).end(); _out_of_range = !_walker.advance_to(r.start(), _fwd_end); diff --git a/sstables/mx/bsearch_clustered_cursor.hh b/sstables/mx/bsearch_clustered_cursor.hh index d40a5e48ed..f7183cc851 100644 --- a/sstables/mx/bsearch_clustered_cursor.hh +++ b/sstables/mx/bsearch_clustered_cursor.hh @@ -12,7 +12,6 @@ #include "sstables/column_translation.hh" #include "parsers.hh" #include "schema/schema.hh" -#include "utils/assert.hh" #include "utils/cached_file.hh" #include "utils/to_string.hh" @@ -107,13 +106,13 @@ public: const schema& _s; bool operator()(const promoted_index_block& lhs, position_in_partition_view rhs) const { - SCYLLA_ASSERT(lhs.start); + parse_assert(bool(lhs.start)); position_in_partition::less_compare less(_s); return less(*lhs.start, rhs); } bool operator()(position_in_partition_view lhs, const promoted_index_block& rhs) const { - SCYLLA_ASSERT(rhs.start); + parse_assert(bool(rhs.start)); position_in_partition::less_compare less(_s); return less(lhs, *rhs.start); } diff --git a/sstables/mx/parsers.hh b/sstables/mx/parsers.hh index 9eb85767a2..793fd23557 100644 --- a/sstables/mx/parsers.hh +++ b/sstables/mx/parsers.hh @@ -8,7 +8,6 @@ #pragma once -#include "utils/assert.hh" #include "sstables/consumer.hh" #include "sstables/types.hh" #include "sstables/column_translation.hh" @@ -290,7 +289,7 @@ public: } [[fallthrough]]; case state::END_OPEN_MARKER_FLAG: - SCYLLA_ASSERT(_primitive._i64 + width_base > 0); + parse_assert(_primitive._i64 + width_base > 0); _width = (_primitive._i64 + width_base); if (_primitive.read_8(data) != read_status::ready) { _state = state::END_OPEN_MARKER_LOCAL_DELETION_TIME; diff --git a/sstables/mx/reader.cc b/sstables/mx/reader.cc index e8636bf6a3..4a867104e5 100644 --- a/sstables/mx/reader.cc +++ b/sstables/mx/reader.cc @@ -14,7 +14,6 @@ #include "sstables/m_format_read_helpers.hh" #include "sstables/sstable_mutation_reader.hh" #include "sstables/processing_result_generator.hh" -#include "utils/assert.hh" #include "utils/to_string.hh" #include "utils/value_or_reference.hh" @@ -502,7 +501,7 @@ public: return consume_range_tombstone_boundary(std::move(pos), end_tombstone, start_tombstone); } default: - SCYLLA_ASSERT(false && "Invalid boundary type"); + on_parse_error(format("Invalid boundary type", static_cast::type>(kind)), _sst->get_filename()); } } @@ -1306,7 +1305,7 @@ public: " partition range: {}", pr)); } // FIXME: if only the defaults were better... - //SCYLLA_ASSERT(fwd_mr == mutation_reader::forwarding::no); + //parse_assert(fwd_mr == mutation_reader::forwarding::no); } } @@ -1351,7 +1350,7 @@ private: _read_enabled = false; return make_ready_future<>(); } - SCYLLA_ASSERT(_index_reader->element_kind() == indexable_element::partition); + parse_assert(_index_reader->element_kind() == indexable_element::partition, _sst->get_filename()); return skip_to(_index_reader->element_kind(), start).then([this] { _sst->get_stats().on_partition_seek(); }); @@ -1431,7 +1430,7 @@ private: if (!pos || pos->is_before_all_fragments(*_schema)) { return make_ready_future<>(); } - SCYLLA_ASSERT (_current_partition_key); + parse_assert(bool(_current_partition_key), _sst->get_filename()); return [this] { if (!_index_in_current_partition) { _index_in_current_partition = true; @@ -1449,7 +1448,7 @@ private: // The reversing data source will notice the skip and update the data ranges // from which it prepares the data given to us. - SCYLLA_ASSERT(_reversed_read_sstable_position); + parse_assert(_reversed_read_sstable_position, _sst->get_filename()); auto ip = _index_reader->data_file_positions(); if (ip.end >= *_reversed_read_sstable_position) { // The reversing data source was already ahead (in reverse - its position was smaller) @@ -1545,7 +1544,7 @@ private: } auto [begin, end] = _index_reader->data_file_positions(); - SCYLLA_ASSERT(end); + parse_assert(bool(end), _sst->get_filename()); sstlog.trace("sstable_reader: {}: data file range [{}, {})", fmt::ptr(this), begin, *end); @@ -1617,11 +1616,11 @@ public: _partition_finished = true; _before_partition = true; _end_of_stream = false; - SCYLLA_ASSERT(_index_reader); + parse_assert(bool(_index_reader), _sst->get_filename()); auto f1 = _index_reader->advance_to(pr); return f1.then([this] { auto [start, end] = _index_reader->data_file_positions(); - SCYLLA_ASSERT(end); + parse_assert(bool(end), _sst->get_filename()); if (start != *end) { _read_enabled = true; _index_in_current_partition = true; @@ -2070,7 +2069,7 @@ public: case bound_kind_m::excl_end_incl_start: return consume_range_tombstone(ecp, bound_kind::incl_start, start_tombstone); default: - SCYLLA_ASSERT(false && "Invalid boundary type"); + on_parse_error(format("Invalid boundary type", static_cast>(kind)), {}); } } diff --git a/sstables/mx/types.hh b/sstables/mx/types.hh index 610d81035f..0b90db8e99 100644 --- a/sstables/mx/types.hh +++ b/sstables/mx/types.hh @@ -8,7 +8,7 @@ #pragma once -#include "utils/assert.hh" +#include "sstables/exceptions.hh" #include "clustering_bounds_comparator.hh" #include @@ -63,7 +63,7 @@ inline bool is_start(bound_kind_m kind) { } inline bound_kind to_bound_kind(bound_kind_m kind) { - SCYLLA_ASSERT(is_bound_kind(kind)); + parse_assert(is_bound_kind(kind)); using underlying_type = std::underlying_type_t; return bound_kind{static_cast(kind)}; } @@ -74,12 +74,12 @@ inline bound_kind_m to_bound_kind_m(bound_kind kind) { } inline bound_kind boundary_to_start_bound(bound_kind_m kind) { - SCYLLA_ASSERT(is_boundary_between_adjacent_intervals(kind)); + parse_assert(is_boundary_between_adjacent_intervals(kind)); return (kind == bound_kind_m::incl_end_excl_start) ? bound_kind::excl_start : bound_kind::incl_start; } inline bound_kind boundary_to_end_bound(bound_kind_m kind) { - SCYLLA_ASSERT(is_boundary_between_adjacent_intervals(kind)); + parse_assert(is_boundary_between_adjacent_intervals(kind)); return (kind == bound_kind_m::incl_end_excl_start) ? bound_kind::incl_end : bound_kind::excl_end; } diff --git a/sstables/partition_index_cache.hh b/sstables/partition_index_cache.hh index 85b2a043be..330a416615 100644 --- a/sstables/partition_index_cache.hh +++ b/sstables/partition_index_cache.hh @@ -208,7 +208,7 @@ public: return with_allocator(_region.allocator(), [&] { auto it_and_flag = _cache.emplace(key, this, key); entry &cp = *it_and_flag.first; - SCYLLA_ASSERT(it_and_flag.second); + parse_assert(it_and_flag.second); try { return share(cp); } catch (...) { diff --git a/sstables/promoted_index_blocks_reader.hh b/sstables/promoted_index_blocks_reader.hh index 87487d2e60..026b1c6188 100644 --- a/sstables/promoted_index_blocks_reader.hh +++ b/sstables/promoted_index_blocks_reader.hh @@ -8,7 +8,6 @@ #pragma once -#include "utils/assert.hh" #include "consumer.hh" #include "column_translation.hh" #include "sstables/mx/parsers.hh" @@ -173,7 +172,7 @@ public: std::visit([this, &data] (auto& ctx) mutable { return process_state(data, ctx); }, _ctx); if (_mode == consuming_mode::consume_until) { - SCYLLA_ASSERT(_pos); + parse_assert(bool(_pos)); auto cmp_with_start = [this, pos_cmp = promoted_index_block_compare(_s)] (position_in_partition_view pos, const promoted_index_block& block) -> bool { return pos_cmp(pos, block.start(_s)); diff --git a/sstables/random_access_reader.hh b/sstables/random_access_reader.hh index 467e1f7de1..12c71c6ee9 100644 --- a/sstables/random_access_reader.hh +++ b/sstables/random_access_reader.hh @@ -8,7 +8,7 @@ #pragma once -#include "utils/assert.hh" +#include "exceptions.hh" #include #include #include @@ -26,7 +26,7 @@ protected: virtual input_stream open_at(uint64_t pos) = 0; void set(input_stream in) { - SCYLLA_ASSERT(!_in); + parse_assert(!_in); _in = std::make_unique>(std::move(in)); } diff --git a/sstables/scanning_clustered_index_cursor.hh b/sstables/scanning_clustered_index_cursor.hh index 287d585dc6..6242e995a1 100644 --- a/sstables/scanning_clustered_index_cursor.hh +++ b/sstables/scanning_clustered_index_cursor.hh @@ -8,7 +8,6 @@ #pragma once -#include "utils/assert.hh" #include "sstables/index_entry.hh" #include "sstables/promoted_index_blocks_reader.hh" #include "schema/schema.hh" @@ -97,7 +96,7 @@ private: // End open marker can be only engaged in SSTables 3.x ('mc' format) and never in ka/la auto end_pos = prev->end(_s); position_in_partition_view* open_rt_pos = std::get_if(&end_pos); - SCYLLA_ASSERT(open_rt_pos); + parse_assert(bool(open_rt_pos)); return skip_info{offset, tombstone(*prev->end_open_marker()), position_in_partition{*open_rt_pos}}; } } diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 0b48b957f9..bafd4d81d3 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -34,7 +34,6 @@ #include #include -#include "utils/assert.hh" #include "utils/error_injection.hh" #include "utils/to_string.hh" #include "data_dictionary/storage_options.hh" @@ -106,6 +105,20 @@ thread_local utils::updateable_value global_cache_index_pages(true); logging::logger sstlog("sstable"); +[[noreturn]] void on_parse_error(sstring message, std::optional filename) { + auto make_exception = [&] { + if (message.empty()) { + message = "parse_assert() failed"; + } + if (filename) { + return malformed_sstable_exception(message, *filename); + } + return malformed_sstable_exception(message); + }; + auto ex = std::make_exception_ptr(make_exception()); + on_internal_error(sstlog, std::move(ex)); +} + template const char* nullsafe_typename(T* x) noexcept { try { @@ -897,7 +910,7 @@ void file_writer::close() { // to work, because file stream would step on unaligned IO and S3 upload // stream would send completion message to the server and would lose any // subsequent write. - SCYLLA_ASSERT(!_closed && "file_writer already closed"); + parse_assert(!_closed, _component, "file_writer already closed"); std::exception_ptr ex; try { _out.flush().get(); @@ -1329,7 +1342,7 @@ future<> sstable::open_or_create_data(open_flags oflags, file_open_options optio future<> sstable::open_data(sstable_open_config cfg) noexcept { co_await open_or_create_data(open_flags::ro); co_await update_info_for_opened_data(cfg); - SCYLLA_ASSERT(!_shards.empty()); + parse_assert(!_shards.empty(), get_filename()); auto* sm = _components->scylla_metadata->data.get(); if (sm) { // Sharding information uses a lot of memory and once we're doing with this computation we will no longer use it. @@ -1366,7 +1379,7 @@ future<> sstable::update_info_for_opened_data(sstable_open_config cfg) { auto size = co_await _index_file.size(); _index_file_size = size; - SCYLLA_ASSERT(!_cached_index_file); + parse_assert(!_cached_index_file, get_filename()); _cached_index_file = seastar::make_shared(_index_file, _manager.get_cache_tracker().get_index_cached_file_stats(), _manager.get_cache_tracker().get_lru(), @@ -1643,7 +1656,7 @@ sstable::load_owner_shards(const dht::sharder& sharder) { } void prepare_summary(summary& s, uint64_t expected_partition_count, uint32_t min_index_interval) { - SCYLLA_ASSERT(expected_partition_count >= 1); + parse_assert(expected_partition_count >= 1); s.header.min_index_interval = min_index_interval; s.header.sampling_level = downsampling::BASE_SAMPLING_LEVEL; @@ -1665,7 +1678,7 @@ future<> seal_summary(summary& s, s.header.size = s.entries.size(); s.header.size_at_full_sampling = sstable::get_size_at_full_sampling(state.partition_count, s.header.min_index_interval); - SCYLLA_ASSERT(first_key); // assume non-empty sstable + parse_assert(bool(first_key), {}, "attempted to seal summary of empty sstable"); s.first_key.value = first_key->get_bytes(); if (last_key) { @@ -2238,7 +2251,8 @@ sstring sstable::component_basename(const sstring& ks, const sstring& cf, versio case sstable::version_types::me: return v + "-" + g + "-" + f + "-" + component; } - SCYLLA_ASSERT(0 && "invalid version"); + on_internal_error(sstlog, seastar::format("invalid version {} for sstable: table={}.{}, generation={}, format={}, component={}", + static_cast::type>(version), ks, cf, g, f, component)); } sstring sstable::component_basename(const sstring& ks, const sstring& cf, version_types version, generation_type generation,