diff --git a/sstables/consumer.hh b/sstables/consumer.hh new file mode 100644 index 0000000000..89bc3c7c9e --- /dev/null +++ b/sstables/consumer.hh @@ -0,0 +1,251 @@ +/* +* Copyright (C) 2015 Cloudius Systems, Ltd. +* +*/ + +#pragma once + +#include +#include "core/future.hh" +#include "core/iostream.hh" +#include "sstables/exceptions.hh" + +template +static inline T consume_be(temporary_buffer& p) { + T i = net::ntoh(*unaligned_cast(p.get())); + p.trim_front(sizeof(T)); + return i; +} + +namespace data_consumer { +enum class proceed { yes, no }; + +template +class continuous_data_consumer { + using proceed = data_consumer::proceed; + StateProcessor& _state_processor; +protected: + input_stream _input; + // remaining length of input to read (if <0, continue until end of file). + int64_t _remain; + + // state machine progress: + enum class prestate { + NONE, + READING_U16, + READING_U32, + READING_U64, + READING_BYTES, + } _prestate = prestate::NONE; + + // state for non-NONE prestates + uint32_t _pos; + // state for READING_U16, READING_U32, READING_U64 prestate + uint16_t _u16; + uint32_t _u32; + uint64_t _u64; + union { + char bytes[sizeof(uint64_t)]; + uint64_t uint64; + uint32_t uint32; + uint16_t uint16; + } _read_int; + // state for READING_BYTES prestate + temporary_buffer _read_bytes; + temporary_buffer* _read_bytes_where; // which temporary_buffer to set, _key or _val? + + enum class read_status { ready, waiting }; + // Read a 16-bit integer into _u16. If the whole thing is in the buffer + // (this is the common case), do this immediately. Otherwise, remember + // what we have in the buffer, and remember to continue later by using + // a "prestate": + inline read_status read_16(temporary_buffer& data) { + if (data.size() >= sizeof(uint16_t)) { + _u16 = consume_be(data); + return read_status::ready; + } else { + std::copy(data.begin(), data.end(), _read_int.bytes); + _pos = data.size(); + data.trim(0); + _prestate = prestate::READING_U16; + return read_status::waiting; + } + } + inline read_status read_32(temporary_buffer& data) { + if (data.size() >= sizeof(uint32_t)) { + _u32 = consume_be(data); + return read_status::ready; + } else { + std::copy(data.begin(), data.end(), _read_int.bytes); + _pos = data.size(); + data.trim(0); + _prestate = prestate::READING_U32; + return read_status::waiting; + } + } + inline read_status read_64(temporary_buffer& data) { + if (data.size() >= sizeof(uint64_t)) { + _u64 = consume_be(data); + return read_status::ready; + } else { + std::copy(data.begin(), data.end(), _read_int.bytes); + _pos = data.size(); + data.trim(0); + _prestate = prestate::READING_U64; + return read_status::waiting; + } + } + inline read_status read_bytes(temporary_buffer& data, uint32_t len, temporary_buffer& where) { + if (data.size() >= len) { + where = data.share(0, len); + data.trim_front(len); + return read_status::ready; + } else { + // copy what we have so far, read the rest later + _read_bytes = temporary_buffer(len); + std::copy(data.begin(), data.end(),_read_bytes.get_write()); + _read_bytes_where = &where; + _pos = data.size(); + data.trim(0); + _prestate = prestate::READING_BYTES; + return read_status::waiting; + } + } + + inline void process_buffer(temporary_buffer& data) { + while (__builtin_expect((_prestate != prestate::NONE), 0)) { + do_process_buffer(data); + } + } +private: + // This is separated so that the compiler can inline "process_buffer". Because this chunk is too big, + // it usually won't if this is part of the main function + void do_process_buffer(temporary_buffer& data) { + // We're in the middle of reading a basic type, which crossed + // an input buffer. Resume that read before continuing to + // handle the current state: + if (_prestate == prestate::READING_BYTES) { + auto n = std::min(_read_bytes.size() - _pos, data.size()); + std::copy(data.begin(), data.begin() + n, + _read_bytes.get_write() + _pos); + data.trim_front(n); + _pos += n; + if (_pos == _read_bytes.size()) { + *_read_bytes_where = std::move(_read_bytes); + _prestate = prestate::NONE; + } + } else { + // in the middle of reading an integer + unsigned len; + switch (_prestate) { + case prestate::READING_U16: + len = sizeof(uint16_t); + break; + case prestate::READING_U32: + len = sizeof(uint32_t); + break; + case prestate::READING_U64: + len = sizeof(uint64_t); + break; + default: + throw sstables::malformed_sstable_exception("unknown prestate"); + } + assert(_pos < len); + auto n = std::min((size_t)(len - _pos), data.size()); + std::copy(data.begin(), data.begin() + n, _read_int.bytes + _pos); + data.trim_front(n); + _pos += n; + if (_pos == len) { + // done reading the integer, store it in _u16, _u32 or _u64: + switch (_prestate) { + case prestate::READING_U16: + _u16 = net::ntoh(_read_int.uint16); + break; + case prestate::READING_U32: + _u32 = net::ntoh(_read_int.uint32); + break; + case prestate::READING_U64: + _u64 = net::ntoh(_read_int.uint64); + break; + default: + throw sstables::malformed_sstable_exception( + "unknown prestate"); + } + _prestate = prestate::NONE; + } + } + } + + void verify_end_state() { + _state_processor.verify_end_state(); + } +public: + continuous_data_consumer(StateProcessor& state_processor, input_stream&& input, uint64_t maxlen) + : _state_processor(state_processor), _input(std::move(input)), _remain(maxlen) {} + + + continuous_data_consumer(continuous_data_consumer&&) = delete; + continuous_data_consumer(const continuous_data_consumer&) = default; + + template + future<> consume_input(Consumer& c) { + return _input.consume(c); + } + + // some states do not consume input (its only exists to perform some + // action when finishing to read a primitive type via a prestate, in + // the rare case that a primitive type crossed a buffer). Such + // non-consuming states need to run even if the data buffer is empty. + bool non_consuming() { + return _state_processor.non_consuming(); + } + + inline proceed process(temporary_buffer& data) { + while (data || non_consuming()) { + process_buffer(data); + auto ret = _state_processor.process_state(data); + if (__builtin_expect(ret == proceed::no, 0)) { + return ret; + } + } + return proceed::yes; + } + + using unconsumed_remainder = input_stream::unconsumed_remainder; + // called by input_stream::consume(): + future + operator()(temporary_buffer data) { + if (_remain >= 0 && data.size() >= (uint64_t)_remain) { + // We received more data than we actually care about, so process + // the beginning of the buffer, and return the rest to the stream + auto segment = data.share(0, _remain); + process(segment); + data.trim_front(_remain - segment.size()); + _remain -= (_remain - segment.size()); + if (_remain == 0) { + verify_end_state(); + } + return make_ready_future(std::move(data)); + } else if (data.empty()) { + // End of file + verify_end_state(); + return make_ready_future(std::move(data)); + } else { + // We can process the entire buffer (if the consumer wants to). + auto orig_data_size = data.size(); + if (process(data) == proceed::yes) { + assert(data.size() == 0); + if (_remain >= 0) { + _remain -= orig_data_size; + } + return make_ready_future(); + } else { + if (_remain >= 0) { + _remain -= orig_data_size - data.size(); + } + return make_ready_future(std::move(data)); + } + } + } +}; +} diff --git a/sstables/exceptions.hh b/sstables/exceptions.hh new file mode 100644 index 0000000000..ea102a4897 --- /dev/null +++ b/sstables/exceptions.hh @@ -0,0 +1,22 @@ +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + * + */ + +#pragma once +namespace sstables { +class malformed_sstable_exception : public std::exception { + sstring _msg; +public: + malformed_sstable_exception(sstring s) : _msg(s) {} + const char *what() const noexcept { + return _msg.c_str(); + } +}; + +struct bufsize_mismatch_exception : malformed_sstable_exception { + bufsize_mismatch_exception(size_t size, size_t expected) : + malformed_sstable_exception(sprint("Buffer improperly sized to hold requested data. Got: %ld. Expected: %ld", size, expected)) + {} +}; +} diff --git a/sstables/index_reader.hh b/sstables/index_reader.hh new file mode 100644 index 0000000000..3964173bfb --- /dev/null +++ b/sstables/index_reader.hh @@ -0,0 +1,118 @@ +/* + * Copyright 2015 Cloudius Systems + */ + +#pragma once +#include "sstables.hh" +#include "consumer.hh" + +namespace sstables { + +class index_consumer { + uint64_t max_quantity; +public: + index_list indexes; + + index_consumer(uint64_t q) : max_quantity(q) { + indexes.reserve(q); + } + + bool should_continue() { + return indexes.size() < max_quantity; + } + void consume_entry(index_entry&& ie) { + indexes.push_back(std::move(ie)); + } +}; + +class index_consume_entry_context: public data_consumer::continuous_data_consumer { + using proceed = data_consumer::proceed; +private: + index_consumer& _consumer; + + enum class state { + START, + KEY_SIZE, + KEY_BYTES, + POSITION, + PROMOTED_SIZE, + PROMOTED_BYTES, + CONSUME_ENTRY, + } _state = state::START; + + temporary_buffer _key; + temporary_buffer _promoted; + + static inline bytes to_bytes(temporary_buffer& b) { + using byte = bytes_view::value_type; + auto s = bytes(reinterpret_cast(b.get()), b.size()); + b.release(); + return s; + } + +public: + void verify_end_state() { + } + + bool non_consuming() const { + return ((_state == state::CONSUME_ENTRY) || (_state == state::START) || + ((_state == state::PROMOTED_BYTES) && (_prestate == prestate::NONE))); + } + + proceed process_state(temporary_buffer& data) { + switch (_state) { + // START comes first, to make the handling of the 0-quantity case simpler + case state::START: + if (!_consumer.should_continue()) { + return proceed::no; + } + _state = state::KEY_SIZE; + break; + case state::KEY_SIZE: + if (read_16(data) != read_status::ready) { + _state = state::KEY_BYTES; + break; + } + case state::KEY_BYTES: + if (read_bytes(data, _u16, _key) != read_status::ready) { + _state = state::POSITION; + break; + } + case state::POSITION: + if (read_64(data) != read_status::ready) { + _state = state::PROMOTED_SIZE; + break; + } + case state::PROMOTED_SIZE: + if (read_32(data) != read_status::ready) { + _state = state::PROMOTED_BYTES; + break; + } + case state::PROMOTED_BYTES: + if (read_bytes(data, _u32, _promoted) != read_status::ready) { + _state = state::CONSUME_ENTRY; + break; + } + case state::CONSUME_ENTRY: { + index_entry ie; + ie.key.value = to_bytes(_key); + ie.position = _u64; + ie.promoted_index.value = to_bytes(_promoted); + _consumer.consume_entry(std::move(ie)); + _state = state::START; + break; + } + default: + throw malformed_sstable_exception("unknown state"); + } + return proceed::yes; + } + + index_consume_entry_context(index_consumer& consumer, + input_stream&& input, uint64_t maxlen) + : continuous_data_consumer(*this, std::move(input), maxlen) + , _consumer(consumer) + {} + +}; +} diff --git a/sstables/row.cc b/sstables/row.cc index 8fcc378992..7424182d7c 100644 --- a/sstables/row.cc +++ b/sstables/row.cc @@ -3,33 +3,14 @@ */ #include "sstables.hh" - -template -static inline T consume_be(temporary_buffer& p) { - T i = net::ntoh(*unaligned_cast(p.get())); - p.trim_front(sizeof(T)); - return i; -} +#include "consumer.hh" namespace sstables { // data_consume_rows_context remembers the context that an ongoing // data_consume_rows() future is in. -class data_consume_rows_context { +class data_consume_rows_context : public data_consumer::continuous_data_consumer { private: - row_consumer& _consumer; - input_stream _input; - // remaining length of input to read (if <0, continue until end of file). - int64_t _remain; - - // state machine progress: - enum class prestate { - NONE, - READING_U16, - READING_U32, - READING_U64, - READING_BYTES, - } _prestate = prestate::NONE; enum class state { ROW_START, ROW_KEY_BYTES, @@ -53,31 +34,8 @@ private: RANGE_TOMBSTONE_4, RANGE_TOMBSTONE_5, } _state = state::ROW_START; - // some states do not consume input (its only exists to perform some - // action when finishing to read a primitive type via a prestate, in - // the rare case that a primitive type crossed a buffer). Such - // non-consuming states need to run even if the data buffer is empty. - static bool non_consuming(state s, prestate ps) { - return (((s == state::DELETION_TIME_3) - || (s == state::CELL_VALUE_BYTES_2) - || (s == state::ATOM_START_2) - || (s == state::EXPIRING_CELL_3)) && (ps == prestate::NONE)); - } - // state for non-NONE prestates - uint32_t _pos; - // state for READING_U16, READING_U32, READING_U64 prestate - uint16_t _u16; - uint32_t _u32; - uint64_t _u64; - union { - char bytes[sizeof(uint64_t)]; - uint64_t uint64; - uint32_t uint32; - uint16_t uint16; - } _read_int; - // state for READING_BYTES prestate - temporary_buffer _read_bytes; - temporary_buffer* _read_bytes_where; // which temporary_buffer to set, _key or _val? + + row_consumer& _consumer; temporary_buffer _key; temporary_buffer _val; @@ -94,119 +52,19 @@ private: } public: - data_consume_rows_context(row_consumer& consumer, - input_stream && input, uint64_t maxlen) : - _consumer(consumer), _input(std::move(input)), _remain(maxlen) { - } - template - future<> consume_input(Consumer& c) { - return _input.consume(c); + bool non_consuming() const { + return (((_state == state::DELETION_TIME_3) + || (_state == state::CELL_VALUE_BYTES_2) + || (_state == state::ATOM_START_2) + || (_state == state::EXPIRING_CELL_3)) && (_prestate == prestate::NONE)); } - void verify_end_state() { - if (_state != state::ROW_START || _prestate != prestate::NONE) { - throw malformed_sstable_exception("end of input, but not end of row"); - } - } - - using unconsumed_remainder = input_stream::unconsumed_remainder; - // called by input_stream::consume(): - future - operator()(temporary_buffer data) { - if (_remain >= 0 && data.size() >= (uint64_t)_remain) { - // We received more data than we actually care about, so process - // the beginning of the buffer, and return the rest to the stream - auto segment = data.share(0, _remain); - process(segment); - data.trim_front(_remain - segment.size()); - _remain -= (_remain - segment.size()); - if (_remain == 0) { - verify_end_state(); - } - return make_ready_future(std::move(data)); - } else if (data.empty()) { - // End of file - verify_end_state(); - return make_ready_future(std::move(data)); - } else { - // We can process the entire buffer (if the consumer wants to). - auto orig_data_size = data.size(); - if (process(data) == row_consumer::proceed::yes) { - assert(data.size() == 0); - if (_remain >= 0) { - _remain -= orig_data_size; - } - return make_ready_future(); - } else { - if (_remain >= 0) { - _remain -= orig_data_size - data.size(); - } - return make_ready_future(std::move(data)); - } - } - } - -private: - // Read a 16-bit integer into _u16. If the whole thing is in the buffer - // (this is the common case), do this immediately. Otherwise, remember - // what we have in the buffer, and remember to continue later by using - // a "prestate": - inline void read_16(temporary_buffer& data, state next_state) { - if (data.size() >= sizeof(uint16_t)) { - _u16 = consume_be(data); - } else { - std::copy(data.begin(), data.end(), _read_int.bytes); - _pos = data.size(); - data.trim(0); - _prestate = prestate::READING_U16; - } - _state = next_state; - } - inline void read_32(temporary_buffer& data, state next_state) { - if (data.size() >= sizeof(uint32_t)) { - _u32 = consume_be(data); - } else { - std::copy(data.begin(), data.end(), _read_int.bytes); - _pos = data.size(); - data.trim(0); - _prestate = prestate::READING_U32; - } - _state = next_state; - } - inline void read_64(temporary_buffer& data, state next_state) { - if (data.size() >= sizeof(uint64_t)) { - _u64 = consume_be(data); - } else { - std::copy(data.begin(), data.end(), _read_int.bytes); - _pos = data.size(); - data.trim(0); - _prestate = prestate::READING_U64; - } - _state = next_state; - } - inline void read_bytes(temporary_buffer& data, uint32_t len, temporary_buffer& where, state next_state) { - if (data.size() >= len) { - where = data.share(0, len); - data.trim_front(len); - } else { - // copy what we have so far, read the rest later - _read_bytes = temporary_buffer(_u16); - std::copy(data.begin(), data.end(),_read_bytes.get_write()); - _read_bytes_where = &where; - _pos = data.size(); - data.trim(0); - _prestate = prestate::READING_BYTES; - } - _state = next_state; - } - -public: // process() feeds the given data into the state machine. // The consumer may request at any point (e.g., after reading a whole // row) to stop the processing, in which case we trim the buffer to // leave only the unprocessed part. The caller must handle calling // process() again, and/or refilling the buffer, as needed. - row_consumer::proceed process(temporary_buffer& data) { + row_consumer::proceed process_state(temporary_buffer& data) { #if 0 // Testing hack: call process() for tiny chunks separately, to verify // that primitive types crossing input buffer are handled correctly. @@ -224,126 +82,43 @@ public: return row_consumer::proceed::yes; } #endif - while (data || non_consuming(_state, _prestate)) { - if (_prestate != prestate::NONE) { - // We're in the middle of reading a basic type, which crossed - // an input buffer. Resume that read before continuing to - // handle the current state: - if (_prestate == prestate::READING_BYTES) { - auto n = std::min(_read_bytes.size() - _pos, data.size()); - std::copy(data.begin(), data.begin() + n, - _read_bytes.get_write() + _pos); - data.trim_front(n); - _pos += n; - if (_pos == _read_bytes.size()) { - *_read_bytes_where = std::move(_read_bytes); - _prestate = prestate::NONE; - } - } else { - // in the middle of reading an integer - unsigned len; - switch (_prestate) { - case prestate::READING_U16: - len = sizeof(uint16_t); - break; - case prestate::READING_U32: - len = sizeof(uint32_t); - break; - case prestate::READING_U64: - len = sizeof(uint64_t); - break; - default: - throw malformed_sstable_exception("unknown prestate"); - } - assert(_pos < len); - auto n = std::min((size_t)(len - _pos), data.size()); - std::copy(data.begin(), data.begin() + n, _read_int.bytes + _pos); - data.trim_front(n); - _pos += n; - if (_pos == len) { - // done reading the integer, store it in _u16, _u32 or _u64: - switch (_prestate) { - case prestate::READING_U16: - _u16 = net::ntoh(_read_int.uint16); - break; - case prestate::READING_U32: - _u32 = net::ntoh(_read_int.uint32); - break; - case prestate::READING_U64: - _u64 = net::ntoh(_read_int.uint64); - break; - default: - throw malformed_sstable_exception( - "unknown prestate"); - } - _prestate = prestate::NONE; - } - } - continue; - } - - switch (_state) { - case state::ROW_START: - // read 2-byte key length into _u16 - read_16(data, state::ROW_KEY_BYTES); - break; - case state::ROW_KEY_BYTES: - // After previously reading 16-bit length, read key's bytes. - read_bytes(data, _u16, _key, state::DELETION_TIME); - break; - case state::DELETION_TIME: - if (data.size() >= sizeof(uint32_t) + sizeof(uint64_t)) { - // If we can read the entire deletion time at once, we can - // skip the DELETION_TIME_2 and DELETION_TIME_3 states. - deletion_time del; - del.local_deletion_time = consume_be(data); - del.marked_for_delete_at = consume_be(data); - _consumer.consume_row_start(to_bytes_view(_key), del); - // after calling the consume function, we can release the - // buffers we held for it. - _key.release(); - _state = state::ATOM_START; - } else { - read_32(data, state::DELETION_TIME_2); - } - break; - case state::DELETION_TIME_2: - read_64(data, state::DELETION_TIME_3); - break; - case state::DELETION_TIME_3: { - deletion_time del; - del.local_deletion_time = _u32; - del.marked_for_delete_at = _u64; - _consumer.consume_row_start(to_bytes_view(_key), del); - // after calling the consume function, we can release the - // buffers we held for it. - _key.release(); - _state = state::ATOM_START; + switch (_state) { + case state::ROW_START: + // read 2-byte key length into _u16 + if (read_16(data) != read_status::ready) { + _state = state::ROW_KEY_BYTES; break; } - case state::ATOM_START: - // TODO: use read_16() here too. have read_16 return true if read now. - if (data.size() >= sizeof(uint16_t)) { - _u16 = consume_be(data); - if (_u16 == 0) { - // end of row marker - _state = state::ROW_START; - if (_consumer.consume_row_end() == - row_consumer::proceed::no) { - return row_consumer::proceed::no; - } - } else { - _state = state::ATOM_NAME_BYTES; - } - } else { - std::copy(data.begin(), data.end(), _read_int.bytes); - _pos = data.size(); - data.trim(0); - _prestate = prestate::READING_U16; - _state = state::ATOM_START_2; - } + case state::ROW_KEY_BYTES: + // After previously reading 16-bit length, read key's bytes. + if (read_bytes(data, _u16, _key) != read_status::ready) { + _state = state::DELETION_TIME; break; - case state::ATOM_START_2: + } + case state::DELETION_TIME: + if (read_32(data) != read_status::ready) { + _state = state::DELETION_TIME_2; + break; + } + // fallthrough + case state::DELETION_TIME_2: + if (read_64(data) != read_status::ready) { + _state = state::DELETION_TIME_3; + break; + } + // fallthrough + case state::DELETION_TIME_3: { + deletion_time del; + del.local_deletion_time = _u32; + del.marked_for_delete_at = _u64; + _consumer.consume_row_start(to_bytes_view(_key), del); + // after calling the consume function, we can release the + // buffers we held for it. + _key.release(); + _state = state::ATOM_START; + } + case state::ATOM_START: + if (read_16(data) == read_status::ready) { if (_u16 == 0) { // end of row marker _state = state::ROW_START; @@ -354,105 +129,88 @@ public: } else { _state = state::ATOM_NAME_BYTES; } - break; - case state::ATOM_NAME_BYTES: - read_bytes(data, _u16, _key, state::ATOM_MASK); - break; - case state::ATOM_MASK: { - auto mask = consume_be(data); - enum mask_type { - DELETION_MASK = 0x01, - EXPIRATION_MASK = 0x02, - COUNTER_MASK = 0x04, - COUNTER_UPDATE_MASK = 0x08, - RANGE_TOMBSTONE_MASK = 0x10, - }; - if (mask & RANGE_TOMBSTONE_MASK) { - _state = state::RANGE_TOMBSTONE; - } else if (mask & COUNTER_MASK) { - // FIXME: see ColumnSerializer.java:deserializeColumnBody - throw malformed_sstable_exception("FIXME COUNTER_MASK"); - } else if (mask & EXPIRATION_MASK) { - _deleted = false; - _state = state::EXPIRING_CELL; - } else { - // FIXME: see ColumnSerializer.java:deserializeColumnBody - if (mask & COUNTER_UPDATE_MASK) { - throw malformed_sstable_exception("FIXME COUNTER_UPDATE_MASK"); - } - _ttl = _expiration = 0; - _deleted = mask & DELETION_MASK; - _state = state::CELL; + } else { + _state = state::ATOM_START_2; + } + break; + case state::ATOM_START_2: + if (_u16 == 0) { + // end of row marker + _state = state::ROW_START; + if (_consumer.consume_row_end() == + row_consumer::proceed::no) { + return row_consumer::proceed::no; } + } else { + _state = state::ATOM_NAME_BYTES; + } + break; + case state::ATOM_NAME_BYTES: + if (read_bytes(data, _u16, _key) != read_status::ready) { + _state = state::ATOM_MASK; break; } - case state::EXPIRING_CELL: - if (data.size() >= sizeof(uint32_t) + sizeof(uint32_t)) { - _ttl = consume_be(data); - _expiration = consume_be(data); - _state = state::CELL; - } else { - read_32(data, state::EXPIRING_CELL_2); + case state::ATOM_MASK: { + auto mask = consume_be(data); + enum mask_type { + DELETION_MASK = 0x01, + EXPIRATION_MASK = 0x02, + COUNTER_MASK = 0x04, + COUNTER_UPDATE_MASK = 0x08, + RANGE_TOMBSTONE_MASK = 0x10, + }; + if (mask & RANGE_TOMBSTONE_MASK) { + _state = state::RANGE_TOMBSTONE; + } else if (mask & COUNTER_MASK) { + // FIXME: see ColumnSerializer.java:deserializeColumnBody + throw malformed_sstable_exception("FIXME COUNTER_MASK"); + } else if (mask & EXPIRATION_MASK) { + _deleted = false; + _state = state::EXPIRING_CELL; + } else { + // FIXME: see ColumnSerializer.java:deserializeColumnBody + if (mask & COUNTER_UPDATE_MASK) { + throw malformed_sstable_exception("FIXME COUNTER_UPDATE_MASK"); } - break; - case state::EXPIRING_CELL_2: - _ttl = _u32; - read_32(data, state::EXPIRING_CELL_3); - break; - case state::EXPIRING_CELL_3: - _expiration = _u32; + _ttl = _expiration = 0; + _deleted = mask & DELETION_MASK; _state = state::CELL; + } + break; + } + case state::EXPIRING_CELL: + if (read_32(data) != read_status::ready) { + _state = state::EXPIRING_CELL_2; break; - case state::CELL: - if (data.size() >= sizeof(uint64_t) + sizeof(uint32_t)) { - _u64 = consume_be(data); - _u32 = consume_be(data); - _state = state::CELL_VALUE_BYTES; - } else { - read_64(data, state::CELL_2); - } + } + // fallthrough + case state::EXPIRING_CELL_2: + _ttl = _u32; + if (read_32(data) != read_status::ready) { + _state = state::EXPIRING_CELL_3; break; - case state::CELL_2: - read_32(data, state::CELL_VALUE_BYTES); + } + // fallthrough + case state::EXPIRING_CELL_3: + _expiration = _u32; + _state = state::CELL; + case state::CELL: { + if (read_64(data) != read_status::ready) { + _state = state::CELL_2; break; - case state::CELL_VALUE_BYTES: - // TODO: use read_bytes(data, _u32, _key, state::ATOM_START), but need to know if it was successful to decide on next state and on running consumer - if (data.size() >= _u32) { - // If the whole string is in our buffer, great, we don't - // need to copy, and can skip the CELL_VALUE_BYTES_2 state - _val = data.share(0, _u32); - data.trim_front(_u32); - // finally pass it to the consumer: - if (_deleted) { - if (_val.size() != 4) { - throw malformed_sstable_exception("deleted cell expects local_deletion_time value"); - } - deletion_time del; - del.local_deletion_time = consume_be(_val); - del.marked_for_delete_at = _u64; - _consumer.consume_deleted_cell(to_bytes_view(_key), del); - } else { - _consumer.consume_cell(to_bytes_view(_key), - to_bytes_view(_val), _u64, _ttl, _expiration); - } - // after calling the consume function, we can release the - // buffers we held for it. - _key.release(); - _val.release(); - _state = state::ATOM_START; - } else { - // copy what we have so far, read the rest later - _read_bytes = temporary_buffer(_u32); - std::copy(data.begin(), data.end(), - _read_bytes.get_write()); - _read_bytes_where = &_val; - _pos = data.size(); - data.trim(0); - _prestate = prestate::READING_BYTES; - _state = state::CELL_VALUE_BYTES_2; - } + } + } + case state::CELL_2: + if (read_32(data) != read_status::ready) { + _state = state::CELL_VALUE_BYTES; break; - case state::CELL_VALUE_BYTES_2: + } + case state::CELL_VALUE_BYTES: + if (read_bytes(data, _u32, _val) == read_status::ready) { + // If the whole string is in our buffer, great, we don't + // need to copy, and can skip the CELL_VALUE_BYTES_2 state. + // + // finally pass it to the consumer: if (_deleted) { if (_val.size() != 4) { throw malformed_sstable_exception("deleted cell expects local_deletion_time value"); @@ -470,39 +228,80 @@ public: _key.release(); _val.release(); _state = state::ATOM_START; - break; - case state::RANGE_TOMBSTONE: - read_16(data, state::RANGE_TOMBSTONE_2); - break; - case state::RANGE_TOMBSTONE_2: - // read the end column into _val. - read_bytes(data, _u16, _val, state::RANGE_TOMBSTONE_3); - break; - case state::RANGE_TOMBSTONE_3: - read_32(data, state::RANGE_TOMBSTONE_4); - break; - case state::RANGE_TOMBSTONE_4: - read_64(data, state::RANGE_TOMBSTONE_5); - break; - case state::RANGE_TOMBSTONE_5: - { + } else { + _state = state::CELL_VALUE_BYTES_2; + } + break; + case state::CELL_VALUE_BYTES_2: + if (_deleted) { + if (_val.size() != 4) { + throw malformed_sstable_exception("deleted cell expects local_deletion_time value"); + } deletion_time del; - del.local_deletion_time = _u32; + del.local_deletion_time = consume_be(_val); del.marked_for_delete_at = _u64; - _consumer.consume_range_tombstone(to_bytes_view(_key), - to_bytes_view(_val), del); - _key.release(); - _val.release(); - _state = state::ATOM_START; + _consumer.consume_deleted_cell(to_bytes_view(_key), del); + } else { + _consumer.consume_cell(to_bytes_view(_key), + to_bytes_view(_val), _u64, _ttl, _expiration); + } + // after calling the consume function, we can release the + // buffers we held for it. + _key.release(); + _val.release(); + _state = state::ATOM_START; + break; + case state::RANGE_TOMBSTONE: + if (read_16(data) != read_status::ready) { + _state = state::RANGE_TOMBSTONE_2; break; } - - default: - throw malformed_sstable_exception("unknown state"); + case state::RANGE_TOMBSTONE_2: + // read the end column into _val. + if (read_bytes(data, _u16, _val) != read_status::ready) { + _state = state::RANGE_TOMBSTONE_3; + break; } + case state::RANGE_TOMBSTONE_3: + if (read_32(data) != read_status::ready) { + _state = state::RANGE_TOMBSTONE_4; + break; + } + case state::RANGE_TOMBSTONE_4: + if (read_64(data) != read_status::ready) { + _state = state::RANGE_TOMBSTONE_5; + break; + } + case state::RANGE_TOMBSTONE_5: + { + deletion_time del; + del.local_deletion_time = _u32; + del.marked_for_delete_at = _u64; + _consumer.consume_range_tombstone(to_bytes_view(_key), + to_bytes_view(_val), del); + _key.release(); + _val.release(); + _state = state::ATOM_START; + break; } + default: + throw malformed_sstable_exception("unknown state"); + } + return row_consumer::proceed::yes; } + + data_consume_rows_context(row_consumer& consumer, + input_stream && input, uint64_t maxlen) : + continuous_data_consumer(*this, std::move(input), maxlen) + , _consumer(consumer) { + } + + void verify_end_state() { + if (_state != state::ROW_START || _prestate != prestate::NONE) { + throw malformed_sstable_exception("end of input, but not end of row"); + } + } }; // data_consume_rows() and data_consume_rows_at_once() both can read just a diff --git a/sstables/row.hh b/sstables/row.hh index 74865e1cf0..df45671948 100644 --- a/sstables/row.hh +++ b/sstables/row.hh @@ -8,6 +8,7 @@ #include "bytes.hh" #include "key.hh" #include "core/temporary_buffer.hh" +#include "consumer.hh" // sstables::data_consume_row feeds the contents of a single row into a // row_consumer object: @@ -28,7 +29,7 @@ // is called.] class row_consumer { public: - enum class proceed { yes, no }; + using proceed = data_consumer::proceed; // Consume the row's key and deletion_time. The latter determines if the // row is a tombstone, and if so, when it has been deleted. diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 59a33c0f21..142b7719a6 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -19,6 +19,7 @@ #include "sstables.hh" #include "compress.hh" #include "unimplemented.hh" +#include "index_reader.hh" #include #include #include @@ -110,12 +111,6 @@ static typename Map::key_type reverse_map(const typename Map::mapped_type& value throw std::out_of_range("unable to reverse map"); } -struct bufsize_mismatch_exception : malformed_sstable_exception { - bufsize_mismatch_exception(size_t size, size_t expected) : - malformed_sstable_exception(sprint("Buffer improperly sized to hold requested data. Got: %ld. Expected: %ld", size, expected)) - {} -}; - // This should be used every time we use read_exactly directly. // // read_exactly is a lot more convenient of an interface to use, because we'll @@ -762,56 +757,12 @@ future sstable::read_indexes(uint64_t summary_idx) { estimated_size = std::min(uint64_t(sstable_buffer_size), align_up(estimated_size, uint64_t(8 << 10))); - struct reader { - uint64_t count = 0; - std::vector indexes; - shared_file_random_access_reader stream; - reader(file f, uint64_t quantity, uint64_t estimated_size) : stream(f, estimated_size) { indexes.reserve(quantity); } - }; - - auto r = make_lw_shared(_index_file, quantity, estimated_size); - - r->stream.seek(position); - - auto end = [r, quantity] { return r->count >= quantity; }; - - return do_until(end, [this, r] { - r->indexes.emplace_back(); - auto fut = parse(r->stream, r->indexes.back()); - return std::move(fut).then_wrapped([this, r] (future<> f) mutable { - try { - f.get(); - r->count++; - } catch (bufsize_mismatch_exception &e) { - // We have optimistically emplaced back one element of the - // vector. If we have failed to parse, we should remove it - // so size() gives us the right picture. - r->indexes.pop_back(); - - // FIXME: If the file ends at an index boundary, there is - // no problem. Essentially, we can't know how many indexes - // are in a sampling group, so there isn't really any way - // to know, other than reading. - // - // If, however, we end in the middle of an index, this is a - // corrupted file. This code is not perfect because we only - // know that an exception happened, and it happened due to - // eof. We don't really know if eof happened at the index - // boundary. To know that, we would have to keep track of - // the real position of the stream (including what's - // already in the buffer) before we start to read the - // index, and after. We won't go through such complexity at - // the moment. - if (r->stream.eof()) { - r->count = std::numeric_limitscount)>::type>::max(); - } else { - throw e; - } - } - return make_ready_future<>(); + return do_with(index_consumer(quantity), [this, position, estimated_size] (index_consumer& ic) { + auto stream = make_file_input_stream(this->_index_file, position, estimated_size); + auto ctx = make_lw_shared(ic, std::move(stream), this->index_size() - position); + return ctx->consume_input(*ctx).then([ctx, &ic] { + return make_ready_future(std::move(ic.indexes)); }); - }).then([r] { - return make_ready_future(std::move(r->indexes)); }); } diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 69e53068cd..5a5fa35674 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -27,6 +27,7 @@ #include "writer.hh" #include "metadata_collector.hh" #include "filter.hh" +#include "exceptions.hh" namespace sstables { @@ -89,15 +90,6 @@ public: class key; -class malformed_sstable_exception : public std::exception { - sstring _msg; -public: - malformed_sstable_exception(sstring s) : _msg(s) {} - const char *what() const noexcept { - return _msg.c_str(); - } -}; - using index_list = std::vector; class sstable {