diff --git a/sstables/row.hh b/sstables/row.hh index fb440c4d26..4afdca3dac 100644 --- a/sstables/row.hh +++ b/sstables/row.hh @@ -241,6 +241,7 @@ private: } _state = state::ROW_START; row_consumer& _consumer; + shared_sstable _sst; temporary_buffer _key; temporary_buffer _val; @@ -270,6 +271,14 @@ public: // leave only the unprocessed part. The caller must handle calling // process() again, and/or refilling the buffer, as needed. data_consumer::processing_result process_state(temporary_buffer& data) { + try { + return do_process_state(data); + } catch (malformed_sstable_exception& exp) { + throw malformed_sstable_exception(exp.what(), _sst->get_filename()); + } + } +private: + data_consumer::processing_result do_process_state(temporary_buffer& data) { #if 0 // Testing hack: call process() for tiny chunks separately, to verify // that primitive types crossing input buffer are handled correctly. @@ -508,13 +517,15 @@ public: return row_consumer::proceed::yes; } +public: data_consume_rows_context(const schema&, - const shared_sstable&, + const shared_sstable& sst, row_consumer& consumer, input_stream&& input, uint64_t start, uint64_t maxlen) : continuous_data_consumer(std::move(input), start, maxlen) - , _consumer(consumer) { + , _consumer(consumer) + , _sst(sst) { } void verify_end_state() { @@ -610,6 +621,7 @@ private: } _state = state::PARTITION_START; consumer_m& _consumer; + shared_sstable _sst; const serialization_header& _header; column_translation _column_translation; @@ -753,6 +765,14 @@ public: } data_consumer::processing_result process_state(temporary_buffer& data) { + try { + return do_process_state(data); + } catch (malformed_sstable_exception& exp) { + throw malformed_sstable_exception(exp.what(), _sst->get_filename()); + } + } +private: + data_consumer::processing_result do_process_state(temporary_buffer& data) { switch (_state) { case state::PARTITION_START: partition_start_label: @@ -1277,6 +1297,7 @@ public: return row_consumer::proceed::yes; } +public: data_consume_rows_context_m(const schema& s, const shared_sstable& sst, @@ -1286,6 +1307,7 @@ public: uint64_t maxlen) : continuous_data_consumer(std::move(input), start, maxlen) , _consumer(consumer) + , _sst(sst) , _header(sst->get_serialization_header()) , _column_translation(sst->get_column_translation(s, _header)) , _liveness(_header)