sstables: add non-contiguous parsing of byte strings to the primitive_consumer

Currently, the primitive_consumer parses all values in contiguous buffers.
A string of bytes may be very long, so parsing it in a single buffer
can cause a big allocation. This patch allows parsing into
fragmented_temporary_buffers instead of temporary_buffers.

Signed-off-by: Wojciech Mitros <wojciech.mitros@scylladb.com>
This commit is contained in:
Wojciech Mitros
2021-02-28 19:53:41 +01:00
parent 3f529b2860
commit b1b5bda848
6 changed files with 128 additions and 41 deletions

View File

@@ -31,6 +31,8 @@
#include <seastar/net/byteorder.hh>
#include "bytes.hh"
#include "reader_permit.hh"
#include "utils/fragmented_temporary_buffer.hh"
#include "utils/overloaded_functor.hh"
#include <variant>
@@ -84,11 +86,14 @@ private:
READING_U16,
READING_U32,
READING_U64,
READING_BYTES_CONTIGUOUS,
READING_BYTES,
READING_U16_BYTES,
READING_UNSIGNED_VINT,
READING_UNSIGNED_VINT_LENGTH_BYTES_CONTIGUOUS,
READING_UNSIGNED_VINT_LENGTH_BYTES,
READING_UNSIGNED_VINT_WITH_LEN,
READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN_CONTIGUOUS,
READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN,
READING_SIGNED_VINT,
READING_SIGNED_VINT_WITH_LEN,
@@ -114,9 +119,10 @@ private:
} _read_int;
// state for READING_BYTES prestate
temporary_buffer<char> _read_bytes;
temporary_buffer<char>* _read_bytes_where; // which temporary_buffer to set, _key or _val?
size_t _read_bytes_len = 0;
std::vector<temporary_buffer<char>> _read_bytes;
temporary_buffer<char>* _read_bytes_where_contiguous; // which buffer to set, _key, _val, _cell_path or _pk?
fragmented_temporary_buffer* _read_bytes_where;
inline read_status read_partial_int(temporary_buffer<char>& data, prestate next_state) {
std::copy(data.begin(), data.end(), _read_int.bytes);
_pos = data.size();
@@ -138,8 +144,10 @@ private:
data.trim_front(len);
return read_status::ready;
} else {
_read_bytes = make_tracked_temporary_buffer(temporary_buffer<char>(len), _permit);
std::copy(data.begin(), data.end(), _read_bytes.get_write());
_read_bytes = std::vector<temporary_buffer<char>>();
_read_bytes.push_back(make_tracked_temporary_buffer(temporary_buffer<char>(len), _permit));
std::copy(data.begin(), data.end(), _read_bytes.front().get_write());
_read_bytes_len = len;
_pos = data.size();
data.trim(0);
_prestate = ReadingVintWithLen;
@@ -150,13 +158,13 @@ private:
template <typename VintType, typename T>
inline read_status read_vint_with_len(temporary_buffer<char>& data, T& dest) {
static_assert(std::is_same_v<T, typename VintType::value_type>, "Destination type mismatch");
const auto n = std::min(_read_bytes.size() - _pos, data.size());
std::copy_n(data.begin(), n, _read_bytes.get_write() + _pos);
const auto n = std::min(_read_bytes_len - _pos, data.size());
std::copy_n(data.begin(), n, _read_bytes.front().get_write() + _pos);
data.trim_front(n);
_pos += n;
if (_pos == _read_bytes.size()) {
if (_pos == _read_bytes_len) {
dest = VintType::deserialize(
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes.get_write()), _read_bytes.size()));
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes.front().get_write()), _read_bytes_len));
_prestate = prestate::NONE;
return read_status::ready;
}
@@ -203,15 +211,35 @@ public:
return read_partial_int(data, prestate::READING_U64);
}
}
inline read_status read_bytes(temporary_buffer<char>& data, uint32_t len, temporary_buffer<char>& where) {
if (data.size() >= len) {
inline read_status read_bytes_contiguous(temporary_buffer<char>& data, uint32_t len, temporary_buffer<char>& where) {
if (data.size() >= len) {
where = data.share(0, len);
data.trim_front(len);
return read_status::ready;
} else {
// copy what we have so far, read the rest later
_read_bytes = make_tracked_temporary_buffer(temporary_buffer<char>(len), _permit);
std::copy(data.begin(), data.end(),_read_bytes.get_write());
_read_bytes = std::vector<temporary_buffer<char>>();
_read_bytes.push_back(make_tracked_temporary_buffer(temporary_buffer<char>(len), _permit));
std::copy(data.begin(), data.end(),_read_bytes.front().get_write());
_read_bytes_len = len;
_read_bytes_where_contiguous = &where;
_pos = data.size();
data.trim(0);
_prestate = prestate::READING_BYTES_CONTIGUOUS;
return read_status::waiting;
}
}
inline read_status read_bytes(temporary_buffer<char>& data, uint32_t len, fragmented_temporary_buffer& where) {
_read_bytes = std::vector<temporary_buffer<char>>();
if (data.size() >= len) {
_read_bytes.push_back(data.share(0, len));
data.trim_front(len);
where = fragmented_temporary_buffer(std::move(_read_bytes), len);
return read_status::ready;
} else {
// copy what we have so far, read the rest later
_read_bytes.push_back(make_tracked_temporary_buffer(data.share(), _permit));
_read_bytes_len = len;
_read_bytes_where = &where;
_pos = data.size();
data.trim(0);
@@ -223,10 +251,10 @@ public:
if (data.size() >= sizeof(uint16_t)) {
_u16 = consume_be<uint16_t>(data);
} else {
_read_bytes_where = &where;
_read_bytes_where_contiguous = &where;
return read_partial_int(data, prestate::READING_U16_BYTES);
}
return read_bytes(data, uint32_t{_u16}, where);
return read_bytes_contiguous(data, uint32_t{_u16}, where);
}
inline read_status read_unsigned_vint(temporary_buffer<char>& data) {
return read_vint<
@@ -240,7 +268,32 @@ public:
prestate::READING_SIGNED_VINT,
prestate::READING_SIGNED_VINT_WITH_LEN>(data, _i64);
}
inline read_status read_unsigned_vint_length_bytes(temporary_buffer<char>& data, temporary_buffer<char>& where) {
inline read_status read_unsigned_vint_length_bytes_contiguous(temporary_buffer<char>& data, temporary_buffer<char>& where) {
if (data.empty()) {
_prestate = prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_CONTIGUOUS;
_read_bytes_where_contiguous = &where;
return read_status::waiting;
} else {
const vint_size_type len = unsigned_vint::serialized_size_from_first_byte(*data.begin());
if (data.size() >= len) {
_u64 = unsigned_vint::deserialize(
bytes_view(reinterpret_cast<bytes::value_type*>(data.get_write()), data.size()));
data.trim_front(len);
return read_bytes_contiguous(data, static_cast<uint32_t>(_u64), where);
} else {
_read_bytes = std::vector<temporary_buffer<char>>();
_read_bytes.push_back(make_tracked_temporary_buffer(temporary_buffer<char>(len), _permit));
std::copy(data.begin(), data.end(),_read_bytes.front().get_write());
_read_bytes_len = len;
_pos = data.size();
data.trim(0);
_read_bytes_where_contiguous = &where;
_prestate = prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN_CONTIGUOUS;
return read_status::waiting;
}
}
}
inline read_status read_unsigned_vint_length_bytes(temporary_buffer<char>& data, fragmented_temporary_buffer& where) {
if (data.empty()) {
_prestate = prestate::READING_UNSIGNED_VINT_LENGTH_BYTES;
_read_bytes_where = &where;
@@ -253,8 +306,10 @@ public:
data.trim_front(len);
return read_bytes(data, static_cast<uint32_t>(_u64), where);
} else {
_read_bytes = make_tracked_temporary_buffer(temporary_buffer<char>(len), _permit);
std::copy(data.begin(), data.end(), _read_bytes.get_write());
_read_bytes = std::vector<temporary_buffer<char>>();
_read_bytes.push_back(make_tracked_temporary_buffer(temporary_buffer<char>(len), _permit));
std::copy(data.begin(), data.end(),_read_bytes.front().get_write());
_read_bytes_len = len;
_pos = data.size();
data.trim(0);
_read_bytes_where = &where;
@@ -301,6 +356,12 @@ public:
return read_status::ready;
}
break;
case prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_CONTIGUOUS:
if (read_unsigned_vint_length_bytes_contiguous(data, *_read_bytes_where_contiguous) == read_status::ready) {
_prestate = prestate::NONE;
return read_status::ready;
}
break;
case prestate::READING_UNSIGNED_VINT_LENGTH_BYTES:
if (read_unsigned_vint_length_bytes(data, *_read_bytes_where) == read_status::ready) {
_prestate = prestate::NONE;
@@ -311,14 +372,29 @@ public:
return read_vint_with_len<unsigned_vint>(data, _u64);
case prestate::READING_SIGNED_VINT_WITH_LEN:
return read_vint_with_len<signed_vint>(data, _i64);
case prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN: {
const auto n = std::min(_read_bytes.size() - _pos, data.size());
std::copy_n(data.begin(), n, _read_bytes.get_write() + _pos);
case prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN_CONTIGUOUS: {
const auto n = std::min(_read_bytes_len - _pos, data.size());
std::copy_n(data.begin(), n, _read_bytes.front().get_write() + _pos);
data.trim_front(n);
_pos += n;
if (_pos == _read_bytes.size()) {
if (_pos == _read_bytes_len) {
_u64 = unsigned_vint::deserialize(
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes.get_write()), _read_bytes.size()));
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes.front().get_write()), _read_bytes_len));
if (read_bytes_contiguous(data, _u64, *_read_bytes_where_contiguous) == read_status::ready) {
_prestate = prestate::NONE;
return read_status::ready;
}
}
break;
}
case prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN: {
const auto n = std::min(_read_bytes_len - _pos, data.size());
std::copy_n(data.begin(), n, _read_bytes.front().get_write() + _pos);
data.trim_front(n);
_pos += n;
if (_pos == _read_bytes_len) {
_u64 = unsigned_vint::deserialize(
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes.front().get_write()), _read_bytes_len));
if (read_bytes(data, _u64, *_read_bytes_where) == read_status::ready) {
_prestate = prestate::NONE;
return read_status::ready;
@@ -326,14 +402,25 @@ public:
}
break;
}
case prestate::READING_BYTES: {
auto n = std::min(_read_bytes.size() - _pos, data.size());
std::copy(data.begin(), data.begin() + n,
_read_bytes.get_write() + _pos);
case prestate::READING_BYTES_CONTIGUOUS: {
auto n = std::min(_read_bytes_len - _pos, data.size());
std::copy(data.begin(), data.begin() + n, _read_bytes.front().get_write() + _pos);
data.trim_front(n);
_pos += n;
if (_pos == _read_bytes.size()) {
*_read_bytes_where = std::move(_read_bytes);
if (_pos == _read_bytes_len) {
*_read_bytes_where_contiguous = std::move(_read_bytes.front());
_prestate = prestate::NONE;
return read_status::ready;
}
break;
}
case prestate::READING_BYTES: {
auto n = std::min(_read_bytes_len - _pos, data.size());
_read_bytes.push_back(make_tracked_temporary_buffer(data.share(0, n), _permit));
data.trim_front(n);
_pos += n;
if (_pos == _read_bytes_len) {
*_read_bytes_where = fragmented_temporary_buffer(std::move(_read_bytes), _read_bytes_len);
_prestate = prestate::NONE;
return read_status::ready;
}
@@ -357,7 +444,7 @@ public:
if (process_int(data, sizeof(uint16_t))) {
_u16 = net::ntoh(_read_int.uint16);
_prestate = prestate::NONE;
return read_bytes(data, _u16, *_read_bytes_where);
return read_bytes_contiguous(data, _u16, *_read_bytes_where_contiguous);
}
break;
case prestate::READING_U32:

View File

@@ -160,7 +160,7 @@ public:
}
case state::KEY_BYTES:
sstlog.trace("{}: pos {} state {}", fmt::ptr(this), current_pos(), state::KEY_BYTES);
if (this->read_bytes(data, this->_u16, _key) != continuous_data_consumer::read_status::ready) {
if (this->read_bytes_contiguous(data, this->_u16, _key) != continuous_data_consumer::read_status::ready) {
_state = state::POSITION;
break;
}

View File

@@ -334,7 +334,7 @@ private:
break;
}
case state::CELL_VALUE_BYTES:
if (read_bytes(data, _u32, _val) != read_status::ready) {
if (read_bytes_contiguous(data, _u32, _val) != read_status::ready) {
_state = state::CELL_VALUE_BYTES_2;
break;
}

View File

@@ -181,9 +181,9 @@ public:
}
read_status status = read_status::waiting;
if (auto len = get_ck_block_value_length()) {
status = _primitive.read_bytes(data, *len, column_value);
status = _primitive.read_bytes_contiguous(data, *len, column_value);
} else {
status = _primitive.read_unsigned_vint_length_bytes(data, column_value);
status = _primitive.read_unsigned_vint_length_bytes_contiguous(data, column_value);
}
if (status != read_status::ready) {
_state = state::CK_BLOCK_END;

View File

@@ -465,9 +465,9 @@ private:
}
read_status status = read_status::waiting;
if (auto len = get_ck_block_value_length()) {
status = read_bytes(data, *len, _column_value);
status = read_bytes_contiguous(data, *len, _column_value);
} else {
status = read_unsigned_vint_length_bytes(data, _column_value);
status = read_unsigned_vint_length_bytes_contiguous(data, _column_value);
}
if (status != read_status::ready) {
_state = state::CK_BLOCK_END;
@@ -671,7 +671,7 @@ private:
case state::COLUMN_CELL_PATH:
column_cell_path_label:
if (!is_column_simple()) {
if (read_unsigned_vint_length_bytes(data, _cell_path) != read_status::ready) {
if (read_unsigned_vint_length_bytes_contiguous(data, _cell_path) != read_status::ready) {
_state = state::COLUMN_VALUE;
break;
}
@@ -687,9 +687,9 @@ private:
}
read_status status = read_status::waiting;
if (auto len = get_column_value_length()) {
status = read_bytes(data, *len, _column_value);
status = read_bytes_contiguous(data, *len, _column_value);
} else {
status = read_unsigned_vint_length_bytes(data, _column_value);
status = read_unsigned_vint_length_bytes_contiguous(data, _column_value);
}
if (status != read_status::ready) {
_state = state::COLUMN_END;

View File

@@ -104,7 +104,7 @@ private:
return;
}
case state_k_l::START_NAME_BYTES:
if (this->read_bytes(data, this->_u16, ctx.start) != continuous_data_consumer::read_status::ready) {
if (this->read_bytes_contiguous(data, this->_u16, ctx.start) != continuous_data_consumer::read_status::ready) {
ctx.state = state_k_l::END_NAME_LENGTH;
return;
}
@@ -114,7 +114,7 @@ private:
return;
}
case state_k_l::END_NAME_BYTES:
if (this->read_bytes(data, this->_u16, ctx.end) != continuous_data_consumer::read_status::ready) {
if (this->read_bytes_contiguous(data, this->_u16, ctx.end) != continuous_data_consumer::read_status::ready) {
ctx.state = state_k_l::OFFSET;
return;
}