Boost is external to the project so treat its headers as "system" headers and include them with angle brackets. Closes scylladb/scylladb#25619
401 lines
11 KiB
C++
401 lines
11 KiB
C++
/*
|
|
* Copyright 2016-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
#pragma once
|
|
|
|
#include <seastar/core/sstring.hh>
|
|
#include <optional>
|
|
#include "utils/assert.hh"
|
|
#include "utils/managed_bytes.hh"
|
|
#include "bytes_ostream.hh"
|
|
#include <seastar/core/simple-stream.hh>
|
|
#include <boost/variant/variant.hpp>
|
|
#include "bytes_ostream.hh"
|
|
#include "utils/fragment_range.hh"
|
|
#include <variant>
|
|
|
|
#include <type_traits>
|
|
|
|
namespace ser {
|
|
|
|
/// A fragmented view of an opaque buffer in a stream of serialised data
|
|
///
|
|
/// This class allows reading large, fragmented blobs serialised by the IDL
|
|
/// infrastructure without linearising or copying them. The view remains valid
|
|
/// as long as the underlying IDL-serialised buffer is alive.
|
|
///
|
|
/// Satisfies FragmentRange concept.
|
|
template<typename FragmentIterator>
|
|
class buffer_view {
|
|
bytes_view _first;
|
|
size_t _total_size;
|
|
FragmentIterator _next;
|
|
public:
|
|
using fragment_type = bytes_view;
|
|
|
|
struct implementation {
|
|
bytes_view current;
|
|
FragmentIterator next;
|
|
size_t size;
|
|
};
|
|
|
|
class iterator {
|
|
bytes_view _current;
|
|
size_t _left = 0;
|
|
FragmentIterator _next;
|
|
public:
|
|
using iterator_category = std::input_iterator_tag;
|
|
using value_type = bytes_view;
|
|
using pointer = const bytes_view*;
|
|
using reference = const bytes_view&;
|
|
using difference_type = std::ptrdiff_t;
|
|
|
|
iterator() = default;
|
|
iterator(bytes_view current, size_t left, FragmentIterator next)
|
|
: _current(current), _left(left), _next(next) { }
|
|
|
|
bytes_view operator*() const {
|
|
return _current;
|
|
}
|
|
const bytes_view* operator->() const {
|
|
return &_current;
|
|
}
|
|
|
|
iterator& operator++() {
|
|
_left -= _current.size();
|
|
if (_left) {
|
|
auto next_view = bytes_view(reinterpret_cast<const bytes::value_type*>((*_next).begin()),
|
|
(*_next).size());
|
|
auto next_size = std::min(_left, next_view.size());
|
|
_current = bytes_view(next_view.data(), next_size);
|
|
++_next;
|
|
}
|
|
return *this;
|
|
}
|
|
iterator operator++(int) {
|
|
iterator it(*this);
|
|
operator++();
|
|
return it;
|
|
}
|
|
|
|
bool operator==(const iterator& other) const {
|
|
return _left == other._left;
|
|
}
|
|
};
|
|
using const_iterator = iterator;
|
|
|
|
explicit buffer_view(bytes_view current)
|
|
: _first(current), _total_size(current.size()) { }
|
|
|
|
buffer_view(bytes_view current, size_t size, FragmentIterator it)
|
|
: _first(current), _total_size(size), _next(it)
|
|
{
|
|
if (_first.size() > _total_size) {
|
|
_first.remove_suffix(_first.size() - _total_size);
|
|
}
|
|
}
|
|
|
|
explicit buffer_view(typename seastar::memory_input_stream<FragmentIterator>::simple stream)
|
|
: buffer_view(bytes_view(reinterpret_cast<const int8_t*>(stream.begin()), stream.size()))
|
|
{ }
|
|
|
|
explicit buffer_view(typename seastar::memory_input_stream<FragmentIterator>::fragmented stream)
|
|
: buffer_view(bytes_view(reinterpret_cast<const int8_t*>(stream.first_fragment_data()), stream.first_fragment_size()),
|
|
stream.size(), stream.fragment_iterator())
|
|
{ }
|
|
|
|
iterator begin() const {
|
|
return iterator(_first, _total_size, _next);
|
|
}
|
|
iterator end() const {
|
|
return iterator();
|
|
}
|
|
|
|
size_t size_bytes() const {
|
|
return _total_size;
|
|
}
|
|
bool empty() const {
|
|
return !_total_size;
|
|
}
|
|
|
|
// FragmentedView implementation
|
|
void remove_prefix(size_t n) {
|
|
while (n >= _first.size() && n > 0) {
|
|
n -= _first.size();
|
|
remove_current();
|
|
}
|
|
_total_size -= n;
|
|
_first.remove_prefix(n);
|
|
}
|
|
void remove_current() {
|
|
_total_size -= _first.size();
|
|
if (_total_size) {
|
|
auto next_data = reinterpret_cast<const bytes::value_type*>((*_next).begin());
|
|
size_t next_size = std::min(_total_size, (*_next).size());
|
|
_first = bytes_view(next_data, next_size);
|
|
++_next;
|
|
} else {
|
|
_first = bytes_view();
|
|
}
|
|
}
|
|
buffer_view prefix(size_t n) const {
|
|
auto tmp = *this;
|
|
tmp._total_size = std::min(tmp._total_size, n);
|
|
tmp._first = tmp._first.substr(0, n);
|
|
return tmp;
|
|
}
|
|
bytes_view current_fragment() {
|
|
return _first;
|
|
}
|
|
|
|
bytes linearize() const {
|
|
bytes b(bytes::initialized_later(), size_bytes());
|
|
auto dst = b.begin();
|
|
for (bytes_view fragment : *this) {
|
|
dst = std::copy(fragment.begin(), fragment.end(), dst);
|
|
}
|
|
return b;
|
|
}
|
|
|
|
template<typename Function>
|
|
decltype(auto) with_linearized(Function&& fn) const
|
|
{
|
|
bytes b;
|
|
bytes_view bv;
|
|
if (_first.size() != _total_size) {
|
|
b = linearize();
|
|
bv = b;
|
|
} else {
|
|
bv = _first;
|
|
}
|
|
return fn(bv);
|
|
}
|
|
|
|
implementation extract_implementation() const {
|
|
return implementation {
|
|
.current = _first,
|
|
.next = _next,
|
|
.size = _total_size,
|
|
};
|
|
}
|
|
};
|
|
static_assert(FragmentedView<buffer_view<bytes_ostream::fragment_iterator>>);
|
|
|
|
using size_type = uint32_t;
|
|
|
|
template<typename T, typename Input>
|
|
requires std::is_integral_v<T>
|
|
inline T deserialize_integral(Input& input) {
|
|
T data;
|
|
input.read(reinterpret_cast<char*>(&data), sizeof(T));
|
|
return le_to_cpu(data);
|
|
}
|
|
|
|
template<typename T, typename Output>
|
|
requires std::is_integral_v<T>
|
|
inline void serialize_integral(Output& output, T data) {
|
|
data = cpu_to_le(data);
|
|
output.write(reinterpret_cast<const char*>(&data), sizeof(T));
|
|
}
|
|
|
|
template<typename T>
|
|
struct serializer;
|
|
|
|
template<typename T>
|
|
struct integral_serializer {
|
|
template<typename Input>
|
|
static T read(Input& v) {
|
|
return deserialize_integral<T>(v);
|
|
}
|
|
template<typename Output>
|
|
static void write(Output& out, T v) {
|
|
serialize_integral(out, v);
|
|
}
|
|
template<typename Input>
|
|
static void skip(Input& v) {
|
|
read(v);
|
|
}
|
|
};
|
|
|
|
template<> struct serializer<bool> {
|
|
template <typename Input>
|
|
static bool read(Input& i) {
|
|
return deserialize_integral<uint8_t>(i);
|
|
}
|
|
template< typename Output>
|
|
static void write(Output& out, bool v) {
|
|
serialize_integral(out, uint8_t(v));
|
|
}
|
|
template <typename Input>
|
|
static void skip(Input& i) {
|
|
read(i);
|
|
}
|
|
|
|
};
|
|
template<> struct serializer<int8_t> : public integral_serializer<int8_t> {};
|
|
template<> struct serializer<uint8_t> : public integral_serializer<uint8_t> {};
|
|
template<> struct serializer<int16_t> : public integral_serializer<int16_t> {};
|
|
template<> struct serializer<uint16_t> : public integral_serializer<uint16_t> {};
|
|
template<> struct serializer<int32_t> : public integral_serializer<int32_t> {};
|
|
template<> struct serializer<uint32_t> : public integral_serializer<uint32_t> {};
|
|
template<> struct serializer<int64_t> : public integral_serializer<int64_t> {};
|
|
template<> struct serializer<uint64_t> : public integral_serializer<uint64_t> {};
|
|
|
|
template<typename Output>
|
|
void safe_serialize_as_uint32(Output& output, uint64_t data);
|
|
|
|
template<typename T, typename Output>
|
|
inline void serialize(Output& out, const T& v) {
|
|
serializer<T>::write(out, v);
|
|
};
|
|
|
|
template<typename T, typename Output>
|
|
inline void serialize(Output& out, const std::reference_wrapper<T> v) {
|
|
serializer<T>::write(out, v.get());
|
|
}
|
|
|
|
template<typename T, typename Input>
|
|
inline auto deserialize(Input& in, std::type_identity<T> t) {
|
|
return serializer<T>::read(in);
|
|
}
|
|
|
|
template<typename T, typename Input>
|
|
inline void skip(Input& v, std::type_identity<T>) {
|
|
return serializer<T>::skip(v);
|
|
}
|
|
|
|
template<typename T>
|
|
size_type get_sizeof(const T& obj);
|
|
|
|
template<typename T>
|
|
void set_size(seastar::measuring_output_stream& os, const T& obj);
|
|
|
|
template<typename Stream, typename T>
|
|
void set_size(Stream& os, const T& obj);
|
|
|
|
template<typename Buffer, typename T>
|
|
Buffer serialize_to_buffer(const T& v, size_t head_space = 0);
|
|
|
|
template<typename T, typename Buffer>
|
|
T deserialize_from_buffer(const Buffer&, std::type_identity<T>, size_t head_space = 0);
|
|
|
|
template<typename Output, typename ...T>
|
|
void serialize(Output& out, const boost::variant<T...>& v);
|
|
|
|
template<typename Input, typename ...T>
|
|
boost::variant<T...> deserialize(Input& in, std::type_identity<boost::variant<T...>>);
|
|
|
|
template<typename Output, typename ...T>
|
|
void serialize(Output& out, const std::variant<T...>& v);
|
|
|
|
template<typename Input, typename ...T>
|
|
std::variant<T...> deserialize(Input& in, std::type_identity<std::variant<T...>>);
|
|
|
|
struct unknown_variant_type {
|
|
size_type index;
|
|
sstring data;
|
|
};
|
|
|
|
template<typename Output>
|
|
void serialize(Output& out, const unknown_variant_type& v);
|
|
|
|
template<typename Input>
|
|
unknown_variant_type deserialize(Input& in, std::type_identity<unknown_variant_type>);
|
|
|
|
template <typename T>
|
|
struct normalize {
|
|
using type = T;
|
|
};
|
|
|
|
template <>
|
|
struct normalize<bytes_view> {
|
|
using type = bytes;
|
|
};
|
|
|
|
template <>
|
|
struct normalize<managed_bytes> {
|
|
using type = bytes;
|
|
};
|
|
|
|
template <>
|
|
struct normalize<bytes_ostream> {
|
|
using type = bytes;
|
|
};
|
|
|
|
template <typename T, typename U>
|
|
struct is_equivalent : std::is_same<typename normalize<std::remove_const_t<std::remove_reference_t<T>>>::type, typename normalize<std::remove_const_t <std::remove_reference_t<U>>>::type> {
|
|
};
|
|
|
|
template <typename T, typename U>
|
|
struct is_equivalent<std::reference_wrapper<T>, U> : is_equivalent<T, U> {
|
|
};
|
|
|
|
template <typename T, typename U>
|
|
struct is_equivalent<T, std::reference_wrapper<U>> : is_equivalent<T, U> {
|
|
};
|
|
|
|
template <typename T, typename U>
|
|
struct is_equivalent<std::optional<T>, std::optional<U>> : is_equivalent<T, U> {
|
|
};
|
|
|
|
template <typename T, typename U, bool>
|
|
struct is_equivalent_arity;
|
|
|
|
template <typename ...T, typename ...U>
|
|
struct is_equivalent_arity<std::tuple<T...>, std::tuple<U...>, false> : std::false_type {
|
|
};
|
|
|
|
template <typename ...T, typename ...U>
|
|
struct is_equivalent_arity<std::tuple<T...>, std::tuple<U...>, true> {
|
|
static constexpr bool value = (is_equivalent<T, U>::value && ...);
|
|
};
|
|
|
|
template <typename ...T, typename ...U>
|
|
struct is_equivalent<std::tuple<T...>, std::tuple<U...>> : is_equivalent_arity<std::tuple<T...>, std::tuple<U...>, sizeof...(T) == sizeof...(U)> {
|
|
};
|
|
|
|
template <typename ...T, typename ...U>
|
|
struct is_equivalent<std::variant<T...>, std::variant<U...>> : is_equivalent<std::tuple<T...>, std::tuple<U...>> {
|
|
};
|
|
|
|
// gc_clock duration values were serialized as 32-bit prior to 3.1, and
|
|
// are serialized as 64-bit in 3.1.0.
|
|
//
|
|
// TTL values are capped to 20 years, which fits into 32 bits, so
|
|
// truncation is not a concern.
|
|
|
|
inline bool gc_clock_using_3_1_0_serialization = false;
|
|
|
|
template <typename Output>
|
|
void
|
|
serialize_gc_clock_duration_value(Output& out, int64_t v) {
|
|
if (!gc_clock_using_3_1_0_serialization) {
|
|
// This should have been caught by the CQL layer, so this is just
|
|
// for extra safety.
|
|
SCYLLA_ASSERT(int32_t(v) == v);
|
|
serializer<int32_t>::write(out, v);
|
|
} else {
|
|
serializer<int64_t>::write(out, v);
|
|
}
|
|
}
|
|
|
|
template <typename Input>
|
|
int64_t
|
|
deserialize_gc_clock_duration_value(Input& in) {
|
|
if (!gc_clock_using_3_1_0_serialization) {
|
|
return serializer<int32_t>::read(in);
|
|
} else {
|
|
return serializer<int64_t>::read(in);
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
/*
|
|
* Import the auto generated forward declaration code
|
|
*/
|