Merge "Improve read index performance further" from Glauber

"This patch improves the read_indexes performance by an extra 16 %.
The total gain so far is now 98 %, and although there are still things
I believe we can do to improve it further, I consider a 2-fold increase
sufficient to declare Issue #94 fixed.

So:

Fixes #94

The speed up is achieved by converting the reader to the NSM. To do that, I had
to commonize most parts of the NSM. I had attempted this before, and for this
new cycle, I had a new tool to aid me in this task: the sstable performance
microbenchmark.

Every change to the NSM was individually tested to make sure the performance
of the read path was not regressing. When it did regress, I took alternate
approaches and tried my best to discuss the whys in the changelogs, with
the appropriate result.

So I can be quite confident in affirming that we are not taking any drop
here, while read_index performance is increased significantly"
This commit is contained in:
Avi Kivity
2015-08-29 11:28:03 +03:00
7 changed files with 583 additions and 449 deletions

251
sstables/consumer.hh Normal file
View File

@@ -0,0 +1,251 @@
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*
*/
#pragma once
#include <net/byteorder.hh>
#include "core/future.hh"
#include "core/iostream.hh"
#include "sstables/exceptions.hh"
template<typename T>
static inline T consume_be(temporary_buffer<char>& p) {
T i = net::ntoh(*unaligned_cast<const T*>(p.get()));
p.trim_front(sizeof(T));
return i;
}
namespace data_consumer {
enum class proceed { yes, no };
template <typename StateProcessor>
class continuous_data_consumer {
using proceed = data_consumer::proceed;
StateProcessor& _state_processor;
protected:
input_stream<char> _input;
// remaining length of input to read (if <0, continue until end of file).
int64_t _remain;
// state machine progress:
enum class prestate {
NONE,
READING_U16,
READING_U32,
READING_U64,
READING_BYTES,
} _prestate = prestate::NONE;
// state for non-NONE prestates
uint32_t _pos;
// state for READING_U16, READING_U32, READING_U64 prestate
uint16_t _u16;
uint32_t _u32;
uint64_t _u64;
union {
char bytes[sizeof(uint64_t)];
uint64_t uint64;
uint32_t uint32;
uint16_t uint16;
} _read_int;
// state for READING_BYTES prestate
temporary_buffer<char> _read_bytes;
temporary_buffer<char>* _read_bytes_where; // which temporary_buffer to set, _key or _val?
enum class read_status { ready, waiting };
// Read a 16-bit integer into _u16. If the whole thing is in the buffer
// (this is the common case), do this immediately. Otherwise, remember
// what we have in the buffer, and remember to continue later by using
// a "prestate":
inline read_status read_16(temporary_buffer<char>& data) {
if (data.size() >= sizeof(uint16_t)) {
_u16 = consume_be<uint16_t>(data);
return read_status::ready;
} else {
std::copy(data.begin(), data.end(), _read_int.bytes);
_pos = data.size();
data.trim(0);
_prestate = prestate::READING_U16;
return read_status::waiting;
}
}
inline read_status read_32(temporary_buffer<char>& data) {
if (data.size() >= sizeof(uint32_t)) {
_u32 = consume_be<uint32_t>(data);
return read_status::ready;
} else {
std::copy(data.begin(), data.end(), _read_int.bytes);
_pos = data.size();
data.trim(0);
_prestate = prestate::READING_U32;
return read_status::waiting;
}
}
inline read_status read_64(temporary_buffer<char>& data) {
if (data.size() >= sizeof(uint64_t)) {
_u64 = consume_be<uint64_t>(data);
return read_status::ready;
} else {
std::copy(data.begin(), data.end(), _read_int.bytes);
_pos = data.size();
data.trim(0);
_prestate = prestate::READING_U64;
return read_status::waiting;
}
}
inline read_status read_bytes(temporary_buffer<char>& data, uint32_t len, temporary_buffer<char>& where) {
if (data.size() >= len) {
where = data.share(0, len);
data.trim_front(len);
return read_status::ready;
} else {
// copy what we have so far, read the rest later
_read_bytes = temporary_buffer<char>(len);
std::copy(data.begin(), data.end(),_read_bytes.get_write());
_read_bytes_where = &where;
_pos = data.size();
data.trim(0);
_prestate = prestate::READING_BYTES;
return read_status::waiting;
}
}
inline void process_buffer(temporary_buffer<char>& data) {
while (__builtin_expect((_prestate != prestate::NONE), 0)) {
do_process_buffer(data);
}
}
private:
// This is separated so that the compiler can inline "process_buffer". Because this chunk is too big,
// it usually won't if this is part of the main function
void do_process_buffer(temporary_buffer<char>& data) {
// We're in the middle of reading a basic type, which crossed
// an input buffer. Resume that read before continuing to
// handle the current state:
if (_prestate == prestate::READING_BYTES) {
auto n = std::min(_read_bytes.size() - _pos, data.size());
std::copy(data.begin(), data.begin() + n,
_read_bytes.get_write() + _pos);
data.trim_front(n);
_pos += n;
if (_pos == _read_bytes.size()) {
*_read_bytes_where = std::move(_read_bytes);
_prestate = prestate::NONE;
}
} else {
// in the middle of reading an integer
unsigned len;
switch (_prestate) {
case prestate::READING_U16:
len = sizeof(uint16_t);
break;
case prestate::READING_U32:
len = sizeof(uint32_t);
break;
case prestate::READING_U64:
len = sizeof(uint64_t);
break;
default:
throw sstables::malformed_sstable_exception("unknown prestate");
}
assert(_pos < len);
auto n = std::min((size_t)(len - _pos), data.size());
std::copy(data.begin(), data.begin() + n, _read_int.bytes + _pos);
data.trim_front(n);
_pos += n;
if (_pos == len) {
// done reading the integer, store it in _u16, _u32 or _u64:
switch (_prestate) {
case prestate::READING_U16:
_u16 = net::ntoh(_read_int.uint16);
break;
case prestate::READING_U32:
_u32 = net::ntoh(_read_int.uint32);
break;
case prestate::READING_U64:
_u64 = net::ntoh(_read_int.uint64);
break;
default:
throw sstables::malformed_sstable_exception(
"unknown prestate");
}
_prestate = prestate::NONE;
}
}
}
void verify_end_state() {
_state_processor.verify_end_state();
}
public:
continuous_data_consumer(StateProcessor& state_processor, input_stream<char>&& input, uint64_t maxlen)
: _state_processor(state_processor), _input(std::move(input)), _remain(maxlen) {}
continuous_data_consumer(continuous_data_consumer&&) = delete;
continuous_data_consumer(const continuous_data_consumer&) = default;
template<typename Consumer>
future<> consume_input(Consumer& c) {
return _input.consume(c);
}
// some states do not consume input (its only exists to perform some
// action when finishing to read a primitive type via a prestate, in
// the rare case that a primitive type crossed a buffer). Such
// non-consuming states need to run even if the data buffer is empty.
bool non_consuming() {
return _state_processor.non_consuming();
}
inline proceed process(temporary_buffer<char>& data) {
while (data || non_consuming()) {
process_buffer(data);
auto ret = _state_processor.process_state(data);
if (__builtin_expect(ret == proceed::no, 0)) {
return ret;
}
}
return proceed::yes;
}
using unconsumed_remainder = input_stream<char>::unconsumed_remainder;
// called by input_stream::consume():
future<unconsumed_remainder>
operator()(temporary_buffer<char> data) {
if (_remain >= 0 && data.size() >= (uint64_t)_remain) {
// We received more data than we actually care about, so process
// the beginning of the buffer, and return the rest to the stream
auto segment = data.share(0, _remain);
process(segment);
data.trim_front(_remain - segment.size());
_remain -= (_remain - segment.size());
if (_remain == 0) {
verify_end_state();
}
return make_ready_future<unconsumed_remainder>(std::move(data));
} else if (data.empty()) {
// End of file
verify_end_state();
return make_ready_future<unconsumed_remainder>(std::move(data));
} else {
// We can process the entire buffer (if the consumer wants to).
auto orig_data_size = data.size();
if (process(data) == proceed::yes) {
assert(data.size() == 0);
if (_remain >= 0) {
_remain -= orig_data_size;
}
return make_ready_future<unconsumed_remainder>();
} else {
if (_remain >= 0) {
_remain -= orig_data_size - data.size();
}
return make_ready_future<unconsumed_remainder>(std::move(data));
}
}
}
};
}

22
sstables/exceptions.hh Normal file
View File

@@ -0,0 +1,22 @@
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*
*/
#pragma once
namespace sstables {
class malformed_sstable_exception : public std::exception {
sstring _msg;
public:
malformed_sstable_exception(sstring s) : _msg(s) {}
const char *what() const noexcept {
return _msg.c_str();
}
};
struct bufsize_mismatch_exception : malformed_sstable_exception {
bufsize_mismatch_exception(size_t size, size_t expected) :
malformed_sstable_exception(sprint("Buffer improperly sized to hold requested data. Got: %ld. Expected: %ld", size, expected))
{}
};
}

118
sstables/index_reader.hh Normal file
View File

@@ -0,0 +1,118 @@
/*
* Copyright 2015 Cloudius Systems
*/
#pragma once
#include "sstables.hh"
#include "consumer.hh"
namespace sstables {
class index_consumer {
uint64_t max_quantity;
public:
index_list indexes;
index_consumer(uint64_t q) : max_quantity(q) {
indexes.reserve(q);
}
bool should_continue() {
return indexes.size() < max_quantity;
}
void consume_entry(index_entry&& ie) {
indexes.push_back(std::move(ie));
}
};
class index_consume_entry_context: public data_consumer::continuous_data_consumer<index_consume_entry_context> {
using proceed = data_consumer::proceed;
private:
index_consumer& _consumer;
enum class state {
START,
KEY_SIZE,
KEY_BYTES,
POSITION,
PROMOTED_SIZE,
PROMOTED_BYTES,
CONSUME_ENTRY,
} _state = state::START;
temporary_buffer<char> _key;
temporary_buffer<char> _promoted;
static inline bytes to_bytes(temporary_buffer<char>& b) {
using byte = bytes_view::value_type;
auto s = bytes(reinterpret_cast<const byte*>(b.get()), b.size());
b.release();
return s;
}
public:
void verify_end_state() {
}
bool non_consuming() const {
return ((_state == state::CONSUME_ENTRY) || (_state == state::START) ||
((_state == state::PROMOTED_BYTES) && (_prestate == prestate::NONE)));
}
proceed process_state(temporary_buffer<char>& data) {
switch (_state) {
// START comes first, to make the handling of the 0-quantity case simpler
case state::START:
if (!_consumer.should_continue()) {
return proceed::no;
}
_state = state::KEY_SIZE;
break;
case state::KEY_SIZE:
if (read_16(data) != read_status::ready) {
_state = state::KEY_BYTES;
break;
}
case state::KEY_BYTES:
if (read_bytes(data, _u16, _key) != read_status::ready) {
_state = state::POSITION;
break;
}
case state::POSITION:
if (read_64(data) != read_status::ready) {
_state = state::PROMOTED_SIZE;
break;
}
case state::PROMOTED_SIZE:
if (read_32(data) != read_status::ready) {
_state = state::PROMOTED_BYTES;
break;
}
case state::PROMOTED_BYTES:
if (read_bytes(data, _u32, _promoted) != read_status::ready) {
_state = state::CONSUME_ENTRY;
break;
}
case state::CONSUME_ENTRY: {
index_entry ie;
ie.key.value = to_bytes(_key);
ie.position = _u64;
ie.promoted_index.value = to_bytes(_promoted);
_consumer.consume_entry(std::move(ie));
_state = state::START;
break;
}
default:
throw malformed_sstable_exception("unknown state");
}
return proceed::yes;
}
index_consume_entry_context(index_consumer& consumer,
input_stream<char>&& input, uint64_t maxlen)
: continuous_data_consumer(*this, std::move(input), maxlen)
, _consumer(consumer)
{}
};
}

View File

@@ -3,33 +3,14 @@
*/
#include "sstables.hh"
template<typename T>
static inline T consume_be(temporary_buffer<char>& p) {
T i = net::ntoh(*unaligned_cast<const T*>(p.get()));
p.trim_front(sizeof(T));
return i;
}
#include "consumer.hh"
namespace sstables {
// data_consume_rows_context remembers the context that an ongoing
// data_consume_rows() future is in.
class data_consume_rows_context {
class data_consume_rows_context : public data_consumer::continuous_data_consumer<data_consume_rows_context> {
private:
row_consumer& _consumer;
input_stream<char> _input;
// remaining length of input to read (if <0, continue until end of file).
int64_t _remain;
// state machine progress:
enum class prestate {
NONE,
READING_U16,
READING_U32,
READING_U64,
READING_BYTES,
} _prestate = prestate::NONE;
enum class state {
ROW_START,
ROW_KEY_BYTES,
@@ -53,31 +34,8 @@ private:
RANGE_TOMBSTONE_4,
RANGE_TOMBSTONE_5,
} _state = state::ROW_START;
// some states do not consume input (its only exists to perform some
// action when finishing to read a primitive type via a prestate, in
// the rare case that a primitive type crossed a buffer). Such
// non-consuming states need to run even if the data buffer is empty.
static bool non_consuming(state s, prestate ps) {
return (((s == state::DELETION_TIME_3)
|| (s == state::CELL_VALUE_BYTES_2)
|| (s == state::ATOM_START_2)
|| (s == state::EXPIRING_CELL_3)) && (ps == prestate::NONE));
}
// state for non-NONE prestates
uint32_t _pos;
// state for READING_U16, READING_U32, READING_U64 prestate
uint16_t _u16;
uint32_t _u32;
uint64_t _u64;
union {
char bytes[sizeof(uint64_t)];
uint64_t uint64;
uint32_t uint32;
uint16_t uint16;
} _read_int;
// state for READING_BYTES prestate
temporary_buffer<char> _read_bytes;
temporary_buffer<char>* _read_bytes_where; // which temporary_buffer to set, _key or _val?
row_consumer& _consumer;
temporary_buffer<char> _key;
temporary_buffer<char> _val;
@@ -94,119 +52,19 @@ private:
}
public:
data_consume_rows_context(row_consumer& consumer,
input_stream<char> && input, uint64_t maxlen) :
_consumer(consumer), _input(std::move(input)), _remain(maxlen) {
}
template<typename Consumer>
future<> consume_input(Consumer& c) {
return _input.consume(c);
bool non_consuming() const {
return (((_state == state::DELETION_TIME_3)
|| (_state == state::CELL_VALUE_BYTES_2)
|| (_state == state::ATOM_START_2)
|| (_state == state::EXPIRING_CELL_3)) && (_prestate == prestate::NONE));
}
void verify_end_state() {
if (_state != state::ROW_START || _prestate != prestate::NONE) {
throw malformed_sstable_exception("end of input, but not end of row");
}
}
using unconsumed_remainder = input_stream<char>::unconsumed_remainder;
// called by input_stream::consume():
future<unconsumed_remainder>
operator()(temporary_buffer<char> data) {
if (_remain >= 0 && data.size() >= (uint64_t)_remain) {
// We received more data than we actually care about, so process
// the beginning of the buffer, and return the rest to the stream
auto segment = data.share(0, _remain);
process(segment);
data.trim_front(_remain - segment.size());
_remain -= (_remain - segment.size());
if (_remain == 0) {
verify_end_state();
}
return make_ready_future<unconsumed_remainder>(std::move(data));
} else if (data.empty()) {
// End of file
verify_end_state();
return make_ready_future<unconsumed_remainder>(std::move(data));
} else {
// We can process the entire buffer (if the consumer wants to).
auto orig_data_size = data.size();
if (process(data) == row_consumer::proceed::yes) {
assert(data.size() == 0);
if (_remain >= 0) {
_remain -= orig_data_size;
}
return make_ready_future<unconsumed_remainder>();
} else {
if (_remain >= 0) {
_remain -= orig_data_size - data.size();
}
return make_ready_future<unconsumed_remainder>(std::move(data));
}
}
}
private:
// Read a 16-bit integer into _u16. If the whole thing is in the buffer
// (this is the common case), do this immediately. Otherwise, remember
// what we have in the buffer, and remember to continue later by using
// a "prestate":
inline void read_16(temporary_buffer<char>& data, state next_state) {
if (data.size() >= sizeof(uint16_t)) {
_u16 = consume_be<uint16_t>(data);
} else {
std::copy(data.begin(), data.end(), _read_int.bytes);
_pos = data.size();
data.trim(0);
_prestate = prestate::READING_U16;
}
_state = next_state;
}
inline void read_32(temporary_buffer<char>& data, state next_state) {
if (data.size() >= sizeof(uint32_t)) {
_u32 = consume_be<uint32_t>(data);
} else {
std::copy(data.begin(), data.end(), _read_int.bytes);
_pos = data.size();
data.trim(0);
_prestate = prestate::READING_U32;
}
_state = next_state;
}
inline void read_64(temporary_buffer<char>& data, state next_state) {
if (data.size() >= sizeof(uint64_t)) {
_u64 = consume_be<uint64_t>(data);
} else {
std::copy(data.begin(), data.end(), _read_int.bytes);
_pos = data.size();
data.trim(0);
_prestate = prestate::READING_U64;
}
_state = next_state;
}
inline void read_bytes(temporary_buffer<char>& data, uint32_t len, temporary_buffer<char>& where, state next_state) {
if (data.size() >= len) {
where = data.share(0, len);
data.trim_front(len);
} else {
// copy what we have so far, read the rest later
_read_bytes = temporary_buffer<char>(_u16);
std::copy(data.begin(), data.end(),_read_bytes.get_write());
_read_bytes_where = &where;
_pos = data.size();
data.trim(0);
_prestate = prestate::READING_BYTES;
}
_state = next_state;
}
public:
// process() feeds the given data into the state machine.
// The consumer may request at any point (e.g., after reading a whole
// row) to stop the processing, in which case we trim the buffer to
// leave only the unprocessed part. The caller must handle calling
// process() again, and/or refilling the buffer, as needed.
row_consumer::proceed process(temporary_buffer<char>& data) {
row_consumer::proceed process_state(temporary_buffer<char>& data) {
#if 0
// Testing hack: call process() for tiny chunks separately, to verify
// that primitive types crossing input buffer are handled correctly.
@@ -224,126 +82,43 @@ public:
return row_consumer::proceed::yes;
}
#endif
while (data || non_consuming(_state, _prestate)) {
if (_prestate != prestate::NONE) {
// We're in the middle of reading a basic type, which crossed
// an input buffer. Resume that read before continuing to
// handle the current state:
if (_prestate == prestate::READING_BYTES) {
auto n = std::min(_read_bytes.size() - _pos, data.size());
std::copy(data.begin(), data.begin() + n,
_read_bytes.get_write() + _pos);
data.trim_front(n);
_pos += n;
if (_pos == _read_bytes.size()) {
*_read_bytes_where = std::move(_read_bytes);
_prestate = prestate::NONE;
}
} else {
// in the middle of reading an integer
unsigned len;
switch (_prestate) {
case prestate::READING_U16:
len = sizeof(uint16_t);
break;
case prestate::READING_U32:
len = sizeof(uint32_t);
break;
case prestate::READING_U64:
len = sizeof(uint64_t);
break;
default:
throw malformed_sstable_exception("unknown prestate");
}
assert(_pos < len);
auto n = std::min((size_t)(len - _pos), data.size());
std::copy(data.begin(), data.begin() + n, _read_int.bytes + _pos);
data.trim_front(n);
_pos += n;
if (_pos == len) {
// done reading the integer, store it in _u16, _u32 or _u64:
switch (_prestate) {
case prestate::READING_U16:
_u16 = net::ntoh(_read_int.uint16);
break;
case prestate::READING_U32:
_u32 = net::ntoh(_read_int.uint32);
break;
case prestate::READING_U64:
_u64 = net::ntoh(_read_int.uint64);
break;
default:
throw malformed_sstable_exception(
"unknown prestate");
}
_prestate = prestate::NONE;
}
}
continue;
}
switch (_state) {
case state::ROW_START:
// read 2-byte key length into _u16
read_16(data, state::ROW_KEY_BYTES);
break;
case state::ROW_KEY_BYTES:
// After previously reading 16-bit length, read key's bytes.
read_bytes(data, _u16, _key, state::DELETION_TIME);
break;
case state::DELETION_TIME:
if (data.size() >= sizeof(uint32_t) + sizeof(uint64_t)) {
// If we can read the entire deletion time at once, we can
// skip the DELETION_TIME_2 and DELETION_TIME_3 states.
deletion_time del;
del.local_deletion_time = consume_be<uint32_t>(data);
del.marked_for_delete_at = consume_be<uint64_t>(data);
_consumer.consume_row_start(to_bytes_view(_key), del);
// after calling the consume function, we can release the
// buffers we held for it.
_key.release();
_state = state::ATOM_START;
} else {
read_32(data, state::DELETION_TIME_2);
}
break;
case state::DELETION_TIME_2:
read_64(data, state::DELETION_TIME_3);
break;
case state::DELETION_TIME_3: {
deletion_time del;
del.local_deletion_time = _u32;
del.marked_for_delete_at = _u64;
_consumer.consume_row_start(to_bytes_view(_key), del);
// after calling the consume function, we can release the
// buffers we held for it.
_key.release();
_state = state::ATOM_START;
switch (_state) {
case state::ROW_START:
// read 2-byte key length into _u16
if (read_16(data) != read_status::ready) {
_state = state::ROW_KEY_BYTES;
break;
}
case state::ATOM_START:
// TODO: use read_16() here too. have read_16 return true if read now.
if (data.size() >= sizeof(uint16_t)) {
_u16 = consume_be<uint16_t>(data);
if (_u16 == 0) {
// end of row marker
_state = state::ROW_START;
if (_consumer.consume_row_end() ==
row_consumer::proceed::no) {
return row_consumer::proceed::no;
}
} else {
_state = state::ATOM_NAME_BYTES;
}
} else {
std::copy(data.begin(), data.end(), _read_int.bytes);
_pos = data.size();
data.trim(0);
_prestate = prestate::READING_U16;
_state = state::ATOM_START_2;
}
case state::ROW_KEY_BYTES:
// After previously reading 16-bit length, read key's bytes.
if (read_bytes(data, _u16, _key) != read_status::ready) {
_state = state::DELETION_TIME;
break;
case state::ATOM_START_2:
}
case state::DELETION_TIME:
if (read_32(data) != read_status::ready) {
_state = state::DELETION_TIME_2;
break;
}
// fallthrough
case state::DELETION_TIME_2:
if (read_64(data) != read_status::ready) {
_state = state::DELETION_TIME_3;
break;
}
// fallthrough
case state::DELETION_TIME_3: {
deletion_time del;
del.local_deletion_time = _u32;
del.marked_for_delete_at = _u64;
_consumer.consume_row_start(to_bytes_view(_key), del);
// after calling the consume function, we can release the
// buffers we held for it.
_key.release();
_state = state::ATOM_START;
}
case state::ATOM_START:
if (read_16(data) == read_status::ready) {
if (_u16 == 0) {
// end of row marker
_state = state::ROW_START;
@@ -354,105 +129,88 @@ public:
} else {
_state = state::ATOM_NAME_BYTES;
}
break;
case state::ATOM_NAME_BYTES:
read_bytes(data, _u16, _key, state::ATOM_MASK);
break;
case state::ATOM_MASK: {
auto mask = consume_be<uint8_t>(data);
enum mask_type {
DELETION_MASK = 0x01,
EXPIRATION_MASK = 0x02,
COUNTER_MASK = 0x04,
COUNTER_UPDATE_MASK = 0x08,
RANGE_TOMBSTONE_MASK = 0x10,
};
if (mask & RANGE_TOMBSTONE_MASK) {
_state = state::RANGE_TOMBSTONE;
} else if (mask & COUNTER_MASK) {
// FIXME: see ColumnSerializer.java:deserializeColumnBody
throw malformed_sstable_exception("FIXME COUNTER_MASK");
} else if (mask & EXPIRATION_MASK) {
_deleted = false;
_state = state::EXPIRING_CELL;
} else {
// FIXME: see ColumnSerializer.java:deserializeColumnBody
if (mask & COUNTER_UPDATE_MASK) {
throw malformed_sstable_exception("FIXME COUNTER_UPDATE_MASK");
}
_ttl = _expiration = 0;
_deleted = mask & DELETION_MASK;
_state = state::CELL;
} else {
_state = state::ATOM_START_2;
}
break;
case state::ATOM_START_2:
if (_u16 == 0) {
// end of row marker
_state = state::ROW_START;
if (_consumer.consume_row_end() ==
row_consumer::proceed::no) {
return row_consumer::proceed::no;
}
} else {
_state = state::ATOM_NAME_BYTES;
}
break;
case state::ATOM_NAME_BYTES:
if (read_bytes(data, _u16, _key) != read_status::ready) {
_state = state::ATOM_MASK;
break;
}
case state::EXPIRING_CELL:
if (data.size() >= sizeof(uint32_t) + sizeof(uint32_t)) {
_ttl = consume_be<uint32_t>(data);
_expiration = consume_be<uint32_t>(data);
_state = state::CELL;
} else {
read_32(data, state::EXPIRING_CELL_2);
case state::ATOM_MASK: {
auto mask = consume_be<uint8_t>(data);
enum mask_type {
DELETION_MASK = 0x01,
EXPIRATION_MASK = 0x02,
COUNTER_MASK = 0x04,
COUNTER_UPDATE_MASK = 0x08,
RANGE_TOMBSTONE_MASK = 0x10,
};
if (mask & RANGE_TOMBSTONE_MASK) {
_state = state::RANGE_TOMBSTONE;
} else if (mask & COUNTER_MASK) {
// FIXME: see ColumnSerializer.java:deserializeColumnBody
throw malformed_sstable_exception("FIXME COUNTER_MASK");
} else if (mask & EXPIRATION_MASK) {
_deleted = false;
_state = state::EXPIRING_CELL;
} else {
// FIXME: see ColumnSerializer.java:deserializeColumnBody
if (mask & COUNTER_UPDATE_MASK) {
throw malformed_sstable_exception("FIXME COUNTER_UPDATE_MASK");
}
break;
case state::EXPIRING_CELL_2:
_ttl = _u32;
read_32(data, state::EXPIRING_CELL_3);
break;
case state::EXPIRING_CELL_3:
_expiration = _u32;
_ttl = _expiration = 0;
_deleted = mask & DELETION_MASK;
_state = state::CELL;
}
break;
}
case state::EXPIRING_CELL:
if (read_32(data) != read_status::ready) {
_state = state::EXPIRING_CELL_2;
break;
case state::CELL:
if (data.size() >= sizeof(uint64_t) + sizeof(uint32_t)) {
_u64 = consume_be<uint64_t>(data);
_u32 = consume_be<uint32_t>(data);
_state = state::CELL_VALUE_BYTES;
} else {
read_64(data, state::CELL_2);
}
}
// fallthrough
case state::EXPIRING_CELL_2:
_ttl = _u32;
if (read_32(data) != read_status::ready) {
_state = state::EXPIRING_CELL_3;
break;
case state::CELL_2:
read_32(data, state::CELL_VALUE_BYTES);
}
// fallthrough
case state::EXPIRING_CELL_3:
_expiration = _u32;
_state = state::CELL;
case state::CELL: {
if (read_64(data) != read_status::ready) {
_state = state::CELL_2;
break;
case state::CELL_VALUE_BYTES:
// TODO: use read_bytes(data, _u32, _key, state::ATOM_START), but need to know if it was successful to decide on next state and on running consumer
if (data.size() >= _u32) {
// If the whole string is in our buffer, great, we don't
// need to copy, and can skip the CELL_VALUE_BYTES_2 state
_val = data.share(0, _u32);
data.trim_front(_u32);
// finally pass it to the consumer:
if (_deleted) {
if (_val.size() != 4) {
throw malformed_sstable_exception("deleted cell expects local_deletion_time value");
}
deletion_time del;
del.local_deletion_time = consume_be<uint32_t>(_val);
del.marked_for_delete_at = _u64;
_consumer.consume_deleted_cell(to_bytes_view(_key), del);
} else {
_consumer.consume_cell(to_bytes_view(_key),
to_bytes_view(_val), _u64, _ttl, _expiration);
}
// after calling the consume function, we can release the
// buffers we held for it.
_key.release();
_val.release();
_state = state::ATOM_START;
} else {
// copy what we have so far, read the rest later
_read_bytes = temporary_buffer<char>(_u32);
std::copy(data.begin(), data.end(),
_read_bytes.get_write());
_read_bytes_where = &_val;
_pos = data.size();
data.trim(0);
_prestate = prestate::READING_BYTES;
_state = state::CELL_VALUE_BYTES_2;
}
}
}
case state::CELL_2:
if (read_32(data) != read_status::ready) {
_state = state::CELL_VALUE_BYTES;
break;
case state::CELL_VALUE_BYTES_2:
}
case state::CELL_VALUE_BYTES:
if (read_bytes(data, _u32, _val) == read_status::ready) {
// If the whole string is in our buffer, great, we don't
// need to copy, and can skip the CELL_VALUE_BYTES_2 state.
//
// finally pass it to the consumer:
if (_deleted) {
if (_val.size() != 4) {
throw malformed_sstable_exception("deleted cell expects local_deletion_time value");
@@ -470,39 +228,80 @@ public:
_key.release();
_val.release();
_state = state::ATOM_START;
break;
case state::RANGE_TOMBSTONE:
read_16(data, state::RANGE_TOMBSTONE_2);
break;
case state::RANGE_TOMBSTONE_2:
// read the end column into _val.
read_bytes(data, _u16, _val, state::RANGE_TOMBSTONE_3);
break;
case state::RANGE_TOMBSTONE_3:
read_32(data, state::RANGE_TOMBSTONE_4);
break;
case state::RANGE_TOMBSTONE_4:
read_64(data, state::RANGE_TOMBSTONE_5);
break;
case state::RANGE_TOMBSTONE_5:
{
} else {
_state = state::CELL_VALUE_BYTES_2;
}
break;
case state::CELL_VALUE_BYTES_2:
if (_deleted) {
if (_val.size() != 4) {
throw malformed_sstable_exception("deleted cell expects local_deletion_time value");
}
deletion_time del;
del.local_deletion_time = _u32;
del.local_deletion_time = consume_be<uint32_t>(_val);
del.marked_for_delete_at = _u64;
_consumer.consume_range_tombstone(to_bytes_view(_key),
to_bytes_view(_val), del);
_key.release();
_val.release();
_state = state::ATOM_START;
_consumer.consume_deleted_cell(to_bytes_view(_key), del);
} else {
_consumer.consume_cell(to_bytes_view(_key),
to_bytes_view(_val), _u64, _ttl, _expiration);
}
// after calling the consume function, we can release the
// buffers we held for it.
_key.release();
_val.release();
_state = state::ATOM_START;
break;
case state::RANGE_TOMBSTONE:
if (read_16(data) != read_status::ready) {
_state = state::RANGE_TOMBSTONE_2;
break;
}
default:
throw malformed_sstable_exception("unknown state");
case state::RANGE_TOMBSTONE_2:
// read the end column into _val.
if (read_bytes(data, _u16, _val) != read_status::ready) {
_state = state::RANGE_TOMBSTONE_3;
break;
}
case state::RANGE_TOMBSTONE_3:
if (read_32(data) != read_status::ready) {
_state = state::RANGE_TOMBSTONE_4;
break;
}
case state::RANGE_TOMBSTONE_4:
if (read_64(data) != read_status::ready) {
_state = state::RANGE_TOMBSTONE_5;
break;
}
case state::RANGE_TOMBSTONE_5:
{
deletion_time del;
del.local_deletion_time = _u32;
del.marked_for_delete_at = _u64;
_consumer.consume_range_tombstone(to_bytes_view(_key),
to_bytes_view(_val), del);
_key.release();
_val.release();
_state = state::ATOM_START;
break;
}
default:
throw malformed_sstable_exception("unknown state");
}
return row_consumer::proceed::yes;
}
data_consume_rows_context(row_consumer& consumer,
input_stream<char> && input, uint64_t maxlen) :
continuous_data_consumer(*this, std::move(input), maxlen)
, _consumer(consumer) {
}
void verify_end_state() {
if (_state != state::ROW_START || _prestate != prestate::NONE) {
throw malformed_sstable_exception("end of input, but not end of row");
}
}
};
// data_consume_rows() and data_consume_rows_at_once() both can read just a

View File

@@ -8,6 +8,7 @@
#include "bytes.hh"
#include "key.hh"
#include "core/temporary_buffer.hh"
#include "consumer.hh"
// sstables::data_consume_row feeds the contents of a single row into a
// row_consumer object:
@@ -28,7 +29,7 @@
// is called.]
class row_consumer {
public:
enum class proceed { yes, no };
using proceed = data_consumer::proceed;
// Consume the row's key and deletion_time. The latter determines if the
// row is a tombstone, and if so, when it has been deleted.

View File

@@ -19,6 +19,7 @@
#include "sstables.hh"
#include "compress.hh"
#include "unimplemented.hh"
#include "index_reader.hh"
#include <boost/algorithm/string.hpp>
#include <regex>
#include <core/align.hh>
@@ -110,12 +111,6 @@ static typename Map::key_type reverse_map(const typename Map::mapped_type& value
throw std::out_of_range("unable to reverse map");
}
struct bufsize_mismatch_exception : malformed_sstable_exception {
bufsize_mismatch_exception(size_t size, size_t expected) :
malformed_sstable_exception(sprint("Buffer improperly sized to hold requested data. Got: %ld. Expected: %ld", size, expected))
{}
};
// This should be used every time we use read_exactly directly.
//
// read_exactly is a lot more convenient of an interface to use, because we'll
@@ -762,56 +757,12 @@ future<index_list> sstable::read_indexes(uint64_t summary_idx) {
estimated_size = std::min(uint64_t(sstable_buffer_size), align_up(estimated_size, uint64_t(8 << 10)));
struct reader {
uint64_t count = 0;
std::vector<index_entry> indexes;
shared_file_random_access_reader stream;
reader(file f, uint64_t quantity, uint64_t estimated_size) : stream(f, estimated_size) { indexes.reserve(quantity); }
};
auto r = make_lw_shared<reader>(_index_file, quantity, estimated_size);
r->stream.seek(position);
auto end = [r, quantity] { return r->count >= quantity; };
return do_until(end, [this, r] {
r->indexes.emplace_back();
auto fut = parse(r->stream, r->indexes.back());
return std::move(fut).then_wrapped([this, r] (future<> f) mutable {
try {
f.get();
r->count++;
} catch (bufsize_mismatch_exception &e) {
// We have optimistically emplaced back one element of the
// vector. If we have failed to parse, we should remove it
// so size() gives us the right picture.
r->indexes.pop_back();
// FIXME: If the file ends at an index boundary, there is
// no problem. Essentially, we can't know how many indexes
// are in a sampling group, so there isn't really any way
// to know, other than reading.
//
// If, however, we end in the middle of an index, this is a
// corrupted file. This code is not perfect because we only
// know that an exception happened, and it happened due to
// eof. We don't really know if eof happened at the index
// boundary. To know that, we would have to keep track of
// the real position of the stream (including what's
// already in the buffer) before we start to read the
// index, and after. We won't go through such complexity at
// the moment.
if (r->stream.eof()) {
r->count = std::numeric_limits<std::remove_reference<decltype(r->count)>::type>::max();
} else {
throw e;
}
}
return make_ready_future<>();
return do_with(index_consumer(quantity), [this, position, estimated_size] (index_consumer& ic) {
auto stream = make_file_input_stream(this->_index_file, position, estimated_size);
auto ctx = make_lw_shared<index_consume_entry_context>(ic, std::move(stream), this->index_size() - position);
return ctx->consume_input(*ctx).then([ctx, &ic] {
return make_ready_future<index_list>(std::move(ic.indexes));
});
}).then([r] {
return make_ready_future<index_list>(std::move(r->indexes));
});
}

View File

@@ -27,6 +27,7 @@
#include "writer.hh"
#include "metadata_collector.hh"
#include "filter.hh"
#include "exceptions.hh"
namespace sstables {
@@ -89,15 +90,6 @@ public:
class key;
class malformed_sstable_exception : public std::exception {
sstring _msg;
public:
malformed_sstable_exception(sstring s) : _msg(s) {}
const char *what() const noexcept {
return _msg.c_str();
}
};
using index_list = std::vector<index_entry>;
class sstable {