sstables: add non-contiguous parsing of byte strings to the primitive_consumer
Currently, the primitive_consumer parses every value into a single contiguous buffer. A byte string may be very long, so parsing it into one buffer can require a large contiguous allocation. This patch adds the ability to parse byte strings into a fragmented_temporary_buffer instead of a temporary_buffer; the existing contiguous parsing methods are kept under a `_contiguous` suffix (e.g. read_bytes_contiguous).

Signed-off-by: Wojciech Mitros <wojciech.mitros@scylladb.com>
This commit is contained in:
@@ -31,6 +31,8 @@
|
||||
#include <seastar/net/byteorder.hh>
|
||||
#include "bytes.hh"
|
||||
#include "reader_permit.hh"
|
||||
#include "utils/fragmented_temporary_buffer.hh"
|
||||
#include "utils/overloaded_functor.hh"
|
||||
|
||||
#include <variant>
|
||||
|
||||
@@ -84,11 +86,14 @@ private:
|
||||
READING_U16,
|
||||
READING_U32,
|
||||
READING_U64,
|
||||
READING_BYTES_CONTIGUOUS,
|
||||
READING_BYTES,
|
||||
READING_U16_BYTES,
|
||||
READING_UNSIGNED_VINT,
|
||||
READING_UNSIGNED_VINT_LENGTH_BYTES_CONTIGUOUS,
|
||||
READING_UNSIGNED_VINT_LENGTH_BYTES,
|
||||
READING_UNSIGNED_VINT_WITH_LEN,
|
||||
READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN_CONTIGUOUS,
|
||||
READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN,
|
||||
READING_SIGNED_VINT,
|
||||
READING_SIGNED_VINT_WITH_LEN,
|
||||
@@ -114,9 +119,10 @@ private:
|
||||
} _read_int;
|
||||
|
||||
// state for READING_BYTES prestate
|
||||
temporary_buffer<char> _read_bytes;
|
||||
temporary_buffer<char>* _read_bytes_where; // which temporary_buffer to set, _key or _val?
|
||||
|
||||
size_t _read_bytes_len = 0;
|
||||
std::vector<temporary_buffer<char>> _read_bytes;
|
||||
temporary_buffer<char>* _read_bytes_where_contiguous; // which buffer to set, _key, _val, _cell_path or _pk?
|
||||
fragmented_temporary_buffer* _read_bytes_where;
|
||||
inline read_status read_partial_int(temporary_buffer<char>& data, prestate next_state) {
|
||||
std::copy(data.begin(), data.end(), _read_int.bytes);
|
||||
_pos = data.size();
|
||||
@@ -138,8 +144,10 @@ private:
|
||||
data.trim_front(len);
|
||||
return read_status::ready;
|
||||
} else {
|
||||
_read_bytes = make_tracked_temporary_buffer(temporary_buffer<char>(len), _permit);
|
||||
std::copy(data.begin(), data.end(), _read_bytes.get_write());
|
||||
_read_bytes = std::vector<temporary_buffer<char>>();
|
||||
_read_bytes.push_back(make_tracked_temporary_buffer(temporary_buffer<char>(len), _permit));
|
||||
std::copy(data.begin(), data.end(), _read_bytes.front().get_write());
|
||||
_read_bytes_len = len;
|
||||
_pos = data.size();
|
||||
data.trim(0);
|
||||
_prestate = ReadingVintWithLen;
|
||||
@@ -150,13 +158,13 @@ private:
|
||||
template <typename VintType, typename T>
|
||||
inline read_status read_vint_with_len(temporary_buffer<char>& data, T& dest) {
|
||||
static_assert(std::is_same_v<T, typename VintType::value_type>, "Destination type mismatch");
|
||||
const auto n = std::min(_read_bytes.size() - _pos, data.size());
|
||||
std::copy_n(data.begin(), n, _read_bytes.get_write() + _pos);
|
||||
const auto n = std::min(_read_bytes_len - _pos, data.size());
|
||||
std::copy_n(data.begin(), n, _read_bytes.front().get_write() + _pos);
|
||||
data.trim_front(n);
|
||||
_pos += n;
|
||||
if (_pos == _read_bytes.size()) {
|
||||
if (_pos == _read_bytes_len) {
|
||||
dest = VintType::deserialize(
|
||||
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes.get_write()), _read_bytes.size()));
|
||||
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes.front().get_write()), _read_bytes_len));
|
||||
_prestate = prestate::NONE;
|
||||
return read_status::ready;
|
||||
}
|
||||
@@ -203,15 +211,35 @@ public:
|
||||
return read_partial_int(data, prestate::READING_U64);
|
||||
}
|
||||
}
|
||||
inline read_status read_bytes(temporary_buffer<char>& data, uint32_t len, temporary_buffer<char>& where) {
|
||||
if (data.size() >= len) {
|
||||
inline read_status read_bytes_contiguous(temporary_buffer<char>& data, uint32_t len, temporary_buffer<char>& where) {
|
||||
if (data.size() >= len) {
|
||||
where = data.share(0, len);
|
||||
data.trim_front(len);
|
||||
return read_status::ready;
|
||||
} else {
|
||||
// copy what we have so far, read the rest later
|
||||
_read_bytes = make_tracked_temporary_buffer(temporary_buffer<char>(len), _permit);
|
||||
std::copy(data.begin(), data.end(),_read_bytes.get_write());
|
||||
_read_bytes = std::vector<temporary_buffer<char>>();
|
||||
_read_bytes.push_back(make_tracked_temporary_buffer(temporary_buffer<char>(len), _permit));
|
||||
std::copy(data.begin(), data.end(),_read_bytes.front().get_write());
|
||||
_read_bytes_len = len;
|
||||
_read_bytes_where_contiguous = &where;
|
||||
_pos = data.size();
|
||||
data.trim(0);
|
||||
_prestate = prestate::READING_BYTES_CONTIGUOUS;
|
||||
return read_status::waiting;
|
||||
}
|
||||
}
|
||||
inline read_status read_bytes(temporary_buffer<char>& data, uint32_t len, fragmented_temporary_buffer& where) {
|
||||
_read_bytes = std::vector<temporary_buffer<char>>();
|
||||
if (data.size() >= len) {
|
||||
_read_bytes.push_back(data.share(0, len));
|
||||
data.trim_front(len);
|
||||
where = fragmented_temporary_buffer(std::move(_read_bytes), len);
|
||||
return read_status::ready;
|
||||
} else {
|
||||
// copy what we have so far, read the rest later
|
||||
_read_bytes.push_back(make_tracked_temporary_buffer(data.share(), _permit));
|
||||
_read_bytes_len = len;
|
||||
_read_bytes_where = &where;
|
||||
_pos = data.size();
|
||||
data.trim(0);
|
||||
@@ -223,10 +251,10 @@ public:
|
||||
if (data.size() >= sizeof(uint16_t)) {
|
||||
_u16 = consume_be<uint16_t>(data);
|
||||
} else {
|
||||
_read_bytes_where = &where;
|
||||
_read_bytes_where_contiguous = &where;
|
||||
return read_partial_int(data, prestate::READING_U16_BYTES);
|
||||
}
|
||||
return read_bytes(data, uint32_t{_u16}, where);
|
||||
return read_bytes_contiguous(data, uint32_t{_u16}, where);
|
||||
}
|
||||
inline read_status read_unsigned_vint(temporary_buffer<char>& data) {
|
||||
return read_vint<
|
||||
@@ -240,7 +268,32 @@ public:
|
||||
prestate::READING_SIGNED_VINT,
|
||||
prestate::READING_SIGNED_VINT_WITH_LEN>(data, _i64);
|
||||
}
|
||||
inline read_status read_unsigned_vint_length_bytes(temporary_buffer<char>& data, temporary_buffer<char>& where) {
|
||||
inline read_status read_unsigned_vint_length_bytes_contiguous(temporary_buffer<char>& data, temporary_buffer<char>& where) {
|
||||
if (data.empty()) {
|
||||
_prestate = prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_CONTIGUOUS;
|
||||
_read_bytes_where_contiguous = &where;
|
||||
return read_status::waiting;
|
||||
} else {
|
||||
const vint_size_type len = unsigned_vint::serialized_size_from_first_byte(*data.begin());
|
||||
if (data.size() >= len) {
|
||||
_u64 = unsigned_vint::deserialize(
|
||||
bytes_view(reinterpret_cast<bytes::value_type*>(data.get_write()), data.size()));
|
||||
data.trim_front(len);
|
||||
return read_bytes_contiguous(data, static_cast<uint32_t>(_u64), where);
|
||||
} else {
|
||||
_read_bytes = std::vector<temporary_buffer<char>>();
|
||||
_read_bytes.push_back(make_tracked_temporary_buffer(temporary_buffer<char>(len), _permit));
|
||||
std::copy(data.begin(), data.end(),_read_bytes.front().get_write());
|
||||
_read_bytes_len = len;
|
||||
_pos = data.size();
|
||||
data.trim(0);
|
||||
_read_bytes_where_contiguous = &where;
|
||||
_prestate = prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN_CONTIGUOUS;
|
||||
return read_status::waiting;
|
||||
}
|
||||
}
|
||||
}
|
||||
inline read_status read_unsigned_vint_length_bytes(temporary_buffer<char>& data, fragmented_temporary_buffer& where) {
|
||||
if (data.empty()) {
|
||||
_prestate = prestate::READING_UNSIGNED_VINT_LENGTH_BYTES;
|
||||
_read_bytes_where = &where;
|
||||
@@ -253,8 +306,10 @@ public:
|
||||
data.trim_front(len);
|
||||
return read_bytes(data, static_cast<uint32_t>(_u64), where);
|
||||
} else {
|
||||
_read_bytes = make_tracked_temporary_buffer(temporary_buffer<char>(len), _permit);
|
||||
std::copy(data.begin(), data.end(), _read_bytes.get_write());
|
||||
_read_bytes = std::vector<temporary_buffer<char>>();
|
||||
_read_bytes.push_back(make_tracked_temporary_buffer(temporary_buffer<char>(len), _permit));
|
||||
std::copy(data.begin(), data.end(),_read_bytes.front().get_write());
|
||||
_read_bytes_len = len;
|
||||
_pos = data.size();
|
||||
data.trim(0);
|
||||
_read_bytes_where = &where;
|
||||
@@ -301,6 +356,12 @@ public:
|
||||
return read_status::ready;
|
||||
}
|
||||
break;
|
||||
case prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_CONTIGUOUS:
|
||||
if (read_unsigned_vint_length_bytes_contiguous(data, *_read_bytes_where_contiguous) == read_status::ready) {
|
||||
_prestate = prestate::NONE;
|
||||
return read_status::ready;
|
||||
}
|
||||
break;
|
||||
case prestate::READING_UNSIGNED_VINT_LENGTH_BYTES:
|
||||
if (read_unsigned_vint_length_bytes(data, *_read_bytes_where) == read_status::ready) {
|
||||
_prestate = prestate::NONE;
|
||||
@@ -311,14 +372,29 @@ public:
|
||||
return read_vint_with_len<unsigned_vint>(data, _u64);
|
||||
case prestate::READING_SIGNED_VINT_WITH_LEN:
|
||||
return read_vint_with_len<signed_vint>(data, _i64);
|
||||
case prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN: {
|
||||
const auto n = std::min(_read_bytes.size() - _pos, data.size());
|
||||
std::copy_n(data.begin(), n, _read_bytes.get_write() + _pos);
|
||||
case prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN_CONTIGUOUS: {
|
||||
const auto n = std::min(_read_bytes_len - _pos, data.size());
|
||||
std::copy_n(data.begin(), n, _read_bytes.front().get_write() + _pos);
|
||||
data.trim_front(n);
|
||||
_pos += n;
|
||||
if (_pos == _read_bytes.size()) {
|
||||
if (_pos == _read_bytes_len) {
|
||||
_u64 = unsigned_vint::deserialize(
|
||||
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes.get_write()), _read_bytes.size()));
|
||||
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes.front().get_write()), _read_bytes_len));
|
||||
if (read_bytes_contiguous(data, _u64, *_read_bytes_where_contiguous) == read_status::ready) {
|
||||
_prestate = prestate::NONE;
|
||||
return read_status::ready;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case prestate::READING_UNSIGNED_VINT_LENGTH_BYTES_WITH_LEN: {
|
||||
const auto n = std::min(_read_bytes_len - _pos, data.size());
|
||||
std::copy_n(data.begin(), n, _read_bytes.front().get_write() + _pos);
|
||||
data.trim_front(n);
|
||||
_pos += n;
|
||||
if (_pos == _read_bytes_len) {
|
||||
_u64 = unsigned_vint::deserialize(
|
||||
bytes_view(reinterpret_cast<bytes::value_type*>(_read_bytes.front().get_write()), _read_bytes_len));
|
||||
if (read_bytes(data, _u64, *_read_bytes_where) == read_status::ready) {
|
||||
_prestate = prestate::NONE;
|
||||
return read_status::ready;
|
||||
@@ -326,14 +402,25 @@ public:
|
||||
}
|
||||
break;
|
||||
}
|
||||
case prestate::READING_BYTES: {
|
||||
auto n = std::min(_read_bytes.size() - _pos, data.size());
|
||||
std::copy(data.begin(), data.begin() + n,
|
||||
_read_bytes.get_write() + _pos);
|
||||
case prestate::READING_BYTES_CONTIGUOUS: {
|
||||
auto n = std::min(_read_bytes_len - _pos, data.size());
|
||||
std::copy(data.begin(), data.begin() + n, _read_bytes.front().get_write() + _pos);
|
||||
data.trim_front(n);
|
||||
_pos += n;
|
||||
if (_pos == _read_bytes.size()) {
|
||||
*_read_bytes_where = std::move(_read_bytes);
|
||||
if (_pos == _read_bytes_len) {
|
||||
*_read_bytes_where_contiguous = std::move(_read_bytes.front());
|
||||
_prestate = prestate::NONE;
|
||||
return read_status::ready;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case prestate::READING_BYTES: {
|
||||
auto n = std::min(_read_bytes_len - _pos, data.size());
|
||||
_read_bytes.push_back(make_tracked_temporary_buffer(data.share(0, n), _permit));
|
||||
data.trim_front(n);
|
||||
_pos += n;
|
||||
if (_pos == _read_bytes_len) {
|
||||
*_read_bytes_where = fragmented_temporary_buffer(std::move(_read_bytes), _read_bytes_len);
|
||||
_prestate = prestate::NONE;
|
||||
return read_status::ready;
|
||||
}
|
||||
@@ -357,7 +444,7 @@ public:
|
||||
if (process_int(data, sizeof(uint16_t))) {
|
||||
_u16 = net::ntoh(_read_int.uint16);
|
||||
_prestate = prestate::NONE;
|
||||
return read_bytes(data, _u16, *_read_bytes_where);
|
||||
return read_bytes_contiguous(data, _u16, *_read_bytes_where_contiguous);
|
||||
}
|
||||
break;
|
||||
case prestate::READING_U32:
|
||||
|
||||
@@ -160,7 +160,7 @@ public:
|
||||
}
|
||||
case state::KEY_BYTES:
|
||||
sstlog.trace("{}: pos {} state {}", fmt::ptr(this), current_pos(), state::KEY_BYTES);
|
||||
if (this->read_bytes(data, this->_u16, _key) != continuous_data_consumer::read_status::ready) {
|
||||
if (this->read_bytes_contiguous(data, this->_u16, _key) != continuous_data_consumer::read_status::ready) {
|
||||
_state = state::POSITION;
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -334,7 +334,7 @@ private:
|
||||
break;
|
||||
}
|
||||
case state::CELL_VALUE_BYTES:
|
||||
if (read_bytes(data, _u32, _val) != read_status::ready) {
|
||||
if (read_bytes_contiguous(data, _u32, _val) != read_status::ready) {
|
||||
_state = state::CELL_VALUE_BYTES_2;
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -181,9 +181,9 @@ public:
|
||||
}
|
||||
read_status status = read_status::waiting;
|
||||
if (auto len = get_ck_block_value_length()) {
|
||||
status = _primitive.read_bytes(data, *len, column_value);
|
||||
status = _primitive.read_bytes_contiguous(data, *len, column_value);
|
||||
} else {
|
||||
status = _primitive.read_unsigned_vint_length_bytes(data, column_value);
|
||||
status = _primitive.read_unsigned_vint_length_bytes_contiguous(data, column_value);
|
||||
}
|
||||
if (status != read_status::ready) {
|
||||
_state = state::CK_BLOCK_END;
|
||||
|
||||
@@ -465,9 +465,9 @@ private:
|
||||
}
|
||||
read_status status = read_status::waiting;
|
||||
if (auto len = get_ck_block_value_length()) {
|
||||
status = read_bytes(data, *len, _column_value);
|
||||
status = read_bytes_contiguous(data, *len, _column_value);
|
||||
} else {
|
||||
status = read_unsigned_vint_length_bytes(data, _column_value);
|
||||
status = read_unsigned_vint_length_bytes_contiguous(data, _column_value);
|
||||
}
|
||||
if (status != read_status::ready) {
|
||||
_state = state::CK_BLOCK_END;
|
||||
@@ -671,7 +671,7 @@ private:
|
||||
case state::COLUMN_CELL_PATH:
|
||||
column_cell_path_label:
|
||||
if (!is_column_simple()) {
|
||||
if (read_unsigned_vint_length_bytes(data, _cell_path) != read_status::ready) {
|
||||
if (read_unsigned_vint_length_bytes_contiguous(data, _cell_path) != read_status::ready) {
|
||||
_state = state::COLUMN_VALUE;
|
||||
break;
|
||||
}
|
||||
@@ -687,9 +687,9 @@ private:
|
||||
}
|
||||
read_status status = read_status::waiting;
|
||||
if (auto len = get_column_value_length()) {
|
||||
status = read_bytes(data, *len, _column_value);
|
||||
status = read_bytes_contiguous(data, *len, _column_value);
|
||||
} else {
|
||||
status = read_unsigned_vint_length_bytes(data, _column_value);
|
||||
status = read_unsigned_vint_length_bytes_contiguous(data, _column_value);
|
||||
}
|
||||
if (status != read_status::ready) {
|
||||
_state = state::COLUMN_END;
|
||||
|
||||
@@ -104,7 +104,7 @@ private:
|
||||
return;
|
||||
}
|
||||
case state_k_l::START_NAME_BYTES:
|
||||
if (this->read_bytes(data, this->_u16, ctx.start) != continuous_data_consumer::read_status::ready) {
|
||||
if (this->read_bytes_contiguous(data, this->_u16, ctx.start) != continuous_data_consumer::read_status::ready) {
|
||||
ctx.state = state_k_l::END_NAME_LENGTH;
|
||||
return;
|
||||
}
|
||||
@@ -114,7 +114,7 @@ private:
|
||||
return;
|
||||
}
|
||||
case state_k_l::END_NAME_BYTES:
|
||||
if (this->read_bytes(data, this->_u16, ctx.end) != continuous_data_consumer::read_status::ready) {
|
||||
if (this->read_bytes_contiguous(data, this->_u16, ctx.end) != continuous_data_consumer::read_status::ready) {
|
||||
ctx.state = state_k_l::OFFSET;
|
||||
return;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user