Files
scylladb/utils/fragmented_temporary_buffer.hh
Dario Mirovic 51995af258 transport: replace throwing protocol_exception with returns
Replace throwing `protocol_exception` with returning it as a result
or an exceptional future in the transport server module. The goal is
to improve performance.

Most of the `protocol_exception` throws were made from
`fragmented_temporary_buffer` module, by passing `exception_thrower()`
to its `read*` methods. `fragmented_temporary_buffer` is changed so
that it now accepts an exception creator, not exception thrower.
`fragmented_temporary_buffer_concepts::ExceptionCreator` concept replaced
`fragmented_temporary_buffer_concepts::ExceptionThrower` and all
methods that have been throwing now return failed result of type
`utils::result_with_exception_ptr`. This change is then propagated to the callers.

The scope of this patch is `protocol_exception`, so commitlog just calls
`.value()` method on the result. If the result failed, that will throw the
exception from the result, as defined by `utils::result_with_exception_ptr_throw_policy`.
This means that the behavior of commitlog module stays the same.

transport server module handles results gracefully. All the caller functions
that return non-future value `T` now return `utils::result_with_exception_ptr<T>`.
When the caller is a function that returns a future, and it receives
failed result, `make_exception_future(std::move(failed_result).value())`
is returned. The rest of the callstack up to the transport server `handle_error`
function is already working without throwing, and that's how zero throws is
achieved.

Fixes: #24567
2025-08-28 23:31:36 +02:00

533 lines
18 KiB
C++

/*
* Copyright (C) 2018-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <vector>
#include <seastar/core/iostream.hh>
#include <seastar/core/format.hh>
#include <seastar/core/temporary_buffer.hh>
#include <seastar/core/simple-stream.hh>
#include "bytes.hh"
#include "bytes_ostream.hh"
#include "contiguous_shared_buffer.hh"
#include "fragment_range.hh"
#include "result.hh"
/// Fragmented buffer consisting of multiple Buffer objects.
template <ContiguousSharedBuffer Buffer>
class basic_fragmented_buffer {
using vector_type = std::vector<Buffer>;
vector_type _fragments;
size_t _size_bytes = 0;
public:
static constexpr size_t default_fragment_size = 128 * 1024;
class view;
class istream;
class reader;
using ostream = seastar::memory_output_stream<typename vector_type::iterator>;
basic_fragmented_buffer() = default;
basic_fragmented_buffer(std::vector<Buffer> fragments, size_t size_bytes) noexcept
: _fragments(std::move(fragments)), _size_bytes(size_bytes)
{ }
basic_fragmented_buffer(const char* str, size_t size)
{
*this = allocate_to_fit(size);
size_t pos = 0;
for (auto& frag : _fragments) {
std::memcpy(frag.get_write(), str + pos, frag.size());
pos += frag.size();
}
}
explicit operator view() const noexcept;
istream get_istream() const noexcept;
ostream get_ostream() noexcept {
if (_fragments.size() != 1) {
return typename ostream::fragmented(_fragments.begin(), _size_bytes);
}
auto& current = *_fragments.begin();
return typename ostream::simple(reinterpret_cast<char*>(current.get_write()), current.size());
}
using const_fragment_iterator = typename vector_type::const_iterator;
const_fragment_iterator begin() const {
return _fragments.begin();
}
const_fragment_iterator end() const {
return _fragments.end();
}
size_t size_bytes() const { return _size_bytes; }
bool empty() const { return !_size_bytes; }
// Linear complexity, invalidates views and istreams
void remove_prefix(size_t n) noexcept {
_size_bytes -= n;
auto it = _fragments.begin();
while (it->size() < n) {
n -= it->size();
++it;
}
if (n) {
it->trim_front(n);
}
_fragments.erase(_fragments.begin(), it);
}
// Linear complexity, invalidates views and istreams
void remove_suffix(size_t n) noexcept {
_size_bytes -= n;
auto it = _fragments.rbegin();
while (it->size() < n) {
n -= it->size();
++it;
}
if (n) {
it->trim(it->size() - n);
}
_fragments.erase(it.base(), _fragments.end());
}
// Creates a fragmented buffer of a specified size, supplied as a parameter.
// Max chunk size is limited to 128kb (the same limit as `bytes_stream` has).
static basic_fragmented_buffer allocate_to_fit(size_t data_size) {
constexpr size_t max_fragment_size = default_fragment_size; // 128KB
const size_t full_fragment_count = data_size / max_fragment_size; // number of max-sized fragments
const size_t last_fragment_size = data_size % max_fragment_size;
std::vector<Buffer> fragments;
fragments.reserve(full_fragment_count + !!last_fragment_size);
for (size_t i = 0; i < full_fragment_count; ++i) {
fragments.emplace_back(Buffer(max_fragment_size));
}
if (last_fragment_size) {
fragments.emplace_back(Buffer(last_fragment_size));
}
return basic_fragmented_buffer(std::move(fragments), data_size);
}
vector_type release() && noexcept {
return std::move(_fragments);
}
};
template <ContiguousSharedBuffer Buffer>
class basic_fragmented_buffer<Buffer>::view {
vector_type::const_iterator _current;
const char* _current_position = nullptr;
size_t _current_size = 0;
size_t _total_size = 0;
public:
view() = default;
view(vector_type::const_iterator it, size_t position, size_t total_size)
: _current(it)
, _current_position(it->get() + position)
, _current_size(std::min(it->size() - position, total_size))
, _total_size(total_size)
{ }
explicit view(bytes_view bv) noexcept
: _current_position(reinterpret_cast<const char*>(bv.data()))
, _current_size(bv.size())
, _total_size(bv.size())
{ }
using fragment_type = bytes_view;
class iterator {
vector_type::const_iterator _it;
size_t _left = 0;
bytes_view _current;
public:
using iterator_category = std::forward_iterator_tag;
using value_type = bytes_view;
using difference_type = ptrdiff_t;
using pointer = const bytes_view*;
using reference = const bytes_view&;
iterator() = default;
iterator(vector_type::const_iterator it, bytes_view current, size_t left) noexcept
: _it(it)
, _left(left)
, _current(current)
{ }
reference operator*() const noexcept { return _current; }
pointer operator->() const noexcept { return &_current; }
iterator& operator++() noexcept {
_left -= _current.size();
if (_left) {
++_it;
_current = bytes_view(reinterpret_cast<const bytes::value_type*>(_it->get()),
std::min(_left, _it->size()));
}
return *this;
}
iterator operator++(int) noexcept {
auto it = *this;
operator++();
return it;
}
bool operator==(const iterator& other) const noexcept {
return _left == other._left;
}
};
using const_iterator = iterator;
iterator begin() const noexcept {
return iterator(_current,
bytes_view(reinterpret_cast<const bytes::value_type*>(_current_position), _current_size),
_total_size);
}
iterator end() const noexcept {
return iterator();
}
bool empty() const noexcept { return !size_bytes(); }
size_t size_bytes() const noexcept { return _total_size; }
void remove_prefix(size_t n) noexcept {
if (!_total_size) {
return;
}
while (n > _current_size) {
_total_size -= _current_size;
n -= _current_size;
++_current;
_current_size = std::min(_current->size(), _total_size);
_current_position = _current->get();
}
_total_size -= n;
_current_size -= n;
_current_position += n;
if (!_current_size && _total_size) {
++_current;
_current_size = std::min(_current->size(), _total_size);
_current_position = _current->get();
}
}
void remove_current() noexcept {
_total_size -= _current_size;
if (_total_size) {
++_current;
_current_size = std::min(_current->size(), _total_size);
_current_position = _current->get();
} else {
_current_size = 0;
_current_position = nullptr;
}
}
view prefix(size_t n) const {
auto tmp = *this;
tmp._total_size = std::min(tmp._total_size, n);
tmp._current_size = std::min(tmp._current_size, n);
return tmp;
}
bytes_view current_fragment() const noexcept {
return bytes_view(reinterpret_cast<const bytes_view::value_type*>(_current_position), _current_size);
}
// Invalidates iterators
void remove_suffix(size_t n) noexcept {
_total_size -= n;
_current_size = std::min(_current_size, _total_size);
}
bool operator==(const basic_fragmented_buffer::view& other) const noexcept {
auto this_it = begin();
auto other_it = other.begin();
if (empty() || other.empty()) {
return empty() && other.empty();
}
auto this_fragment = *this_it;
auto other_fragment = *other_it;
while (this_it != end() && other_it != other.end()) {
if (this_fragment.empty()) {
++this_it;
if (this_it != end()) {
this_fragment = *this_it;
}
}
if (other_fragment.empty()) {
++other_it;
if (other_it != other.end()) {
other_fragment = *other_it;
}
}
auto length = std::min(this_fragment.size(), other_fragment.size());
if (!std::equal(this_fragment.data(), this_fragment.data() + length, other_fragment.data())) {
return false;
}
this_fragment.remove_prefix(length);
other_fragment.remove_prefix(length);
}
return this_it == end() && other_it == other.end();
}
};
using fragmented_temporary_buffer = basic_fragmented_buffer<temporary_buffer<char>>;
static_assert(FragmentRange<fragmented_temporary_buffer::view>);
static_assert(FragmentedView<fragmented_temporary_buffer::view>);
template <ContiguousSharedBuffer Buffer>
inline basic_fragmented_buffer<Buffer>::operator view() const noexcept
{
if (!_size_bytes) {
return view();
}
return view(_fragments.begin(), 0, _size_bytes);
}
namespace fragmented_temporary_buffer_concepts {
template<typename T>
concept ExceptionCreator = requires(T obj, size_t n) {
{ obj.out_of_range(n, n) } -> utils::ExceptionPtrResult;
};
}
template <ContiguousSharedBuffer Buffer>
class basic_fragmented_buffer<Buffer>::istream {
vector_type::const_iterator _current;
const char* _current_position;
const char* _current_end;
size_t _bytes_left = 0;
private:
size_t contig_remain() const {
return _current_end - _current_position;
}
void next_fragment() {
_bytes_left -= _current->size();
if (_bytes_left) {
_current++;
_current_position = _current->get();
_current_end = _current->get() + _current->size();
} else {
_current_position = nullptr;
_current_end = nullptr;
}
}
template<typename ExceptionCreator>
requires fragmented_temporary_buffer_concepts::ExceptionCreator<ExceptionCreator>
utils::result_with_exception_ptr<void>
check_out_of_range(ExceptionCreator& exceptions, size_t n) {
if (bytes_left() < n) [[unlikely]] {
return exceptions.out_of_range(n, bytes_left());
// Let's allow skipping this check if the user trusts its input
// data.
}
return bo::success();
}
template<typename T, typename ExceptionCreator>
[[gnu::noinline]] [[gnu::cold]]
utils::result_with_exception_ptr<T> read_slow(ExceptionCreator&& exceptions) {
auto check = check_out_of_range(exceptions, sizeof(T));
if (!check) [[unlikely]] {
return bo::failure(std::move(check).assume_error());
}
T obj;
size_t left = sizeof(T);
while (left) {
auto this_length = std::min<size_t>(left, _current_end - _current_position);
std::copy_n(_current_position, this_length, reinterpret_cast<char*>(&obj) + sizeof(T) - left);
left -= this_length;
if (left) {
next_fragment();
} else {
_current_position += this_length;
}
}
return obj;
}
[[gnu::noinline]] [[gnu::cold]]
void skip_slow(size_t n) noexcept {
auto left = std::min<size_t>(n, bytes_left());
while (left) {
auto this_length = std::min<size_t>(left, _current_end - _current_position);
left -= this_length;
if (left) {
next_fragment();
} else {
_current_position += this_length;
}
}
}
public:
struct default_exception_creator {
[[gnu::cold]]
static utils::result_with_exception_ptr<void> out_of_range(size_t attempted_read, size_t actual_left) {
return bo::failure(std::out_of_range(format("attempted to read {:d} bytes from a {:d} byte buffer", attempted_read, actual_left)));
}
};
istream(const vector_type& fragments, size_t total_size) noexcept
: _current(fragments.begin())
, _current_position(total_size ? _current->get() : nullptr)
, _current_end(total_size ? _current->get() + _current->size() : nullptr)
, _bytes_left(total_size)
{ }
size_t bytes_left() const noexcept {
return _bytes_left ? _bytes_left - (_current_position - _current->get()) : 0;
}
void skip(size_t n) noexcept {
if (contig_remain() < n) [[unlikely]] {
return skip_slow(n);
}
_current_position += n;
}
template<typename T, typename ExceptionCreator = default_exception_creator>
requires fragmented_temporary_buffer_concepts::ExceptionCreator<ExceptionCreator>
utils::result_with_exception_ptr<T> read(ExceptionCreator&& exceptions = default_exception_creator()) {
if (contig_remain() < sizeof(T)) [[unlikely]] {
return read_slow<T>(std::forward<ExceptionCreator>(exceptions));
}
T obj;
std::copy_n(_current_position, sizeof(T), reinterpret_cast<char*>(&obj));
_current_position += sizeof(T);
return obj;
}
template<typename Output, typename ExceptionCreator = default_exception_creator>
requires fragmented_temporary_buffer_concepts::ExceptionCreator<ExceptionCreator>
utils::result_with_exception_ptr<Output> read_to(size_t n, Output out, ExceptionCreator&& exceptions = default_exception_creator()) {
if (contig_remain() >= n) [[likely]] {
out = std::copy_n(_current_position, n, out);
_current_position += n;
return out;
}
auto range = check_out_of_range(exceptions, n);
if (!range) [[unlikely]] {
return bo::failure(std::move(range).assume_error());
}
out = std::copy(_current_position, _current_end, out);
n -= _current_end - _current_position;
next_fragment();
while (n > _current->size()) {
out = std::copy(_current_position, _current_end, out);
n -= _current->size();
next_fragment();
}
out = std::copy_n(_current_position, n, out);
_current_position += n;
return out;
}
template<typename ExceptionCreator = default_exception_creator>
requires fragmented_temporary_buffer_concepts::ExceptionCreator<ExceptionCreator>
utils::result_with_exception_ptr<view> read_view(size_t n, ExceptionCreator&& exceptions = default_exception_creator()) {
if (contig_remain() >= n) [[likely]] {
auto v = view(_current, _current_position - _current->get(), n);
_current_position += n;
return v;
}
auto range = check_out_of_range(exceptions, n);
if (!range) [[unlikely]] {
return bo::failure(std::move(range).assume_error());
}
auto v = view(_current, _current_position - _current->get(), n);
n -= _current_end - _current_position;
next_fragment();
while (n > _current->size()) {
n -= _current->size();
next_fragment();
}
_current_position += n;
return v;
}
template<typename ExceptionCreator = default_exception_creator>
requires fragmented_temporary_buffer_concepts::ExceptionCreator<ExceptionCreator>
utils::result_with_exception_ptr<bytes_view> read_bytes_view(size_t n, bytes_ostream& linearization_buffer, ExceptionCreator&& exceptions = default_exception_creator()) {
if (contig_remain() >= n) [[likely]] {
auto v = bytes_view(reinterpret_cast<const bytes::value_type*>(_current_position), n);
_current_position += n;
return v;
}
auto range = check_out_of_range(exceptions, n);
if (!range) [[unlikely]] {
return bo::failure(std::move(range).assume_error());
}
auto ptr = linearization_buffer.write_place_holder(n);
auto output = read_to(n, ptr, std::forward<ExceptionCreator>(exceptions));
if (!output) [[unlikely]] {
return bo::failure(std::move(output).assume_error());
}
return bytes_view(reinterpret_cast<const bytes::value_type*>(ptr), n);
}
};
template <ContiguousSharedBuffer Buffer>
inline basic_fragmented_buffer<Buffer>::istream basic_fragmented_buffer<Buffer>::get_istream() const noexcept // allow empty (ut for that)
{
return istream(_fragments, _size_bytes);
}
template <ContiguousSharedBuffer Buffer>
class basic_fragmented_buffer<Buffer>::reader {
using FragBuffer = basic_fragmented_buffer<Buffer>;
FragBuffer::vector_type _fragments;
size_t _left = 0;
public:
future<FragBuffer> read_exactly(input_stream<char>& in, size_t length) {
_fragments = FragBuffer::vector_type();
_left = length;
return repeat_until_value([this, length, &in] {
if (!_left) {
return make_ready_future<std::optional<FragBuffer>>(FragBuffer(std::move(_fragments), length));
}
return in.read_up_to(_left).then([this] (temporary_buffer<char> buf) {
if (buf.empty()) {
return std::make_optional(FragBuffer());
}
_left -= buf.size();
_fragments.emplace_back(Buffer(std::move(buf)));
return std::optional<FragBuffer>();
});
});
}
};
// The operator below is used only for logging
template <ContiguousSharedBuffer Buffer>
inline std::ostream& operator<<(std::ostream& out, const typename basic_fragmented_buffer<Buffer>::view& v) {
for (bytes_view frag : fragment_range(v)) {
out << to_hex(frag);
}
return out;
}