/*
 * Copyright 2016-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#pragma once

// NOTE(review): several #include directives below carry no header name in
// this copy of the file; the names appear to have been lost and must be
// restored from version control before this header can be built.
#include
#include
#include "utils/assert.hh"
#include "utils/managed_bytes.hh"
#include "bytes_ostream.hh"
#include
#include
#include "bytes_ostream.hh"
#include "utils/fragment_range.hh"
#include
#include

namespace ser {

/// A fragmented view of an opaque buffer in a stream of serialised data
///
/// This class allows reading large, fragmented blobs serialised by the IDL
/// infrastructure without linearising or copying them. The view remains valid
/// as long as the underlying IDL-serialised buffer is alive.
///
/// Satisfies FragmentRange concept.
///
/// State: `_first` is the fragment currently at the front of the view,
/// `_total_size` is the number of bytes the view still covers, and `_next`
/// is the fragment iterator from which the following fragments are loaded.
// NOTE(review): the template parameter list (presumably declaring
// FragmentIterator) is missing after `template` in this copy -- confirm.
template class buffer_view {
    bytes_view _first;
    size_t _total_size;
    FragmentIterator _next;
public:
    using fragment_type = bytes_view;

    /// Plain aggregate exposing the three pieces of state of a buffer_view;
    /// produced by extract_implementation().
    struct implementation {
        bytes_view current;
        FragmentIterator next;
        size_t size;
    };

    /// Input iterator yielding the fragments of the view as bytes_view.
    ///
    /// `_left` counts the bytes not yet consumed, including those of
    /// `_current`. A default-constructed iterator has `_left == 0`.
    class iterator {
        bytes_view _current;
        size_t _left = 0;
        FragmentIterator _next;
    public:
        using iterator_category = std::input_iterator_tag;
        using value_type = bytes_view;
        using pointer = const bytes_view*;
        using reference = const bytes_view&;
        using difference_type = std::ptrdiff_t;

        iterator() = default;
        iterator(bytes_view current, size_t left, FragmentIterator next)
            : _current(current), _left(left), _next(next) { }

        bytes_view operator*() const {
            return _current;
        }
        const bytes_view* operator->() const {
            return &_current;
        }

        /// Consumes the current fragment. If bytes remain, loads the fragment
        /// `_next` refers to, clamps it to the remaining byte count and
        /// advances `_next`. When nothing remains, `_next` is not dereferenced.
        iterator& operator++() {
            _left -= _current.size();
            if (_left) {
                // NOTE(review): the target type of reinterpret_cast is not
                // visible in this copy -- confirm against the original.
                auto next_view = bytes_view(reinterpret_cast((*_next).begin()),
                                            (*_next).size());
                // The underlying fragment may extend past the end of the
                // view, so never expose more than `_left` bytes.
                auto next_size = std::min(_left, next_view.size());
                _current = bytes_view(next_view.data(), next_size);
                ++_next;
            }
            return *this;
        }
        iterator operator++(int) {
            iterator it(*this);
            operator++();
            return it;
        }

        /// Iterators compare equal when the same number of bytes is left;
        /// the fragment position itself is not compared. An exhausted
        /// iterator therefore equals the default-constructed one from end().
        bool operator==(const iterator& other) const {
            return _left == other._left;
        }
    };
    using const_iterator = iterator;

    /// Builds a single-fragment view over `current`. `_next` is not
    /// explicitly initialised by this constructor.
    explicit buffer_view(bytes_view current)
        : _first(current), _total_size(current.size()) { }

    /// Builds a view of `size` bytes whose first fragment is `current` and
    /// whose subsequent fragments come from `it`. If `current` is longer than
    /// `size`, it is trimmed so the first fragment never exceeds the view.
    buffer_view(bytes_view current, size_t size, FragmentIterator it)
        : _first(current), _total_size(size), _next(it)
    {
        if (_first.size() > _total_size) {
            _first.remove_suffix(_first.size() - _total_size);
        }
    }

    /// Builds a view over the contents of a `simple` memory input stream,
    /// using stream.begin() and stream.size().
    // NOTE(review): the template arguments of seastar::memory_input_stream and
    // the reinterpret_cast target type are missing in this copy -- confirm.
    explicit buffer_view(typename seastar::memory_input_stream::simple stream)
        : buffer_view(bytes_view(reinterpret_cast(stream.begin()), stream.size()))
    { }

    /// Builds a view over a `fragmented` memory input stream from its first
    /// fragment, its total size and its fragment iterator.
    explicit buffer_view(typename seastar::memory_input_stream::fragmented stream)
        : buffer_view(bytes_view(reinterpret_cast(stream.first_fragment_data()), stream.first_fragment_size()),
                      stream.size(), stream.fragment_iterator())
    { }

    iterator begin() const {
        return iterator(_first, _total_size, _next);
    }
    /// The end iterator is a default-constructed iterator (zero bytes left).
    iterator end() const {
        return iterator();
    }

    /// Total number of bytes covered by the view, across all fragments.
    size_t size_bytes() const {
        return _total_size;
    }
    bool empty() const {
        return !_total_size;
    }

    // FragmentedView implementation

    /// Drops the first `n` bytes of the view: whole fragments are discarded
    /// while `n` covers them, then the front fragment is trimmed by the rest.
    // NOTE(review): no check that n <= size_bytes(); presumably a caller
    // precondition -- confirm.
    void remove_prefix(size_t n) {
        // `n > 0` stops the loop when the front fragment is empty and n == 0.
        while (n >= _first.size() && n > 0) {
            n -= _first.size();
            remove_current();
        }
        _total_size -= n;
        _first.remove_prefix(n);
    }

    /// Discards the front fragment. If bytes remain, the fragment `_next`
    /// refers to becomes the front (clamped to the remaining size) and
    /// `_next` is advanced; otherwise the front becomes an empty view.
    void remove_current() {
        _total_size -= _first.size();
        if (_total_size) {
            // NOTE(review): reinterpret_cast target type not visible here.
            auto next_data = reinterpret_cast((*_next).begin());
            size_t next_size = std::min(_total_size, (*_next).size());
            _first = bytes_view(next_data, next_size);
            ++_next;
        } else {
            _first = bytes_view();
        }
    }

    /// Returns a copy of this view limited to at most `n` bytes: the total
    /// size is capped at `n` and the front fragment is truncated to `n`.
    buffer_view prefix(size_t n) const {
        auto tmp = *this;
        tmp._total_size = std::min(tmp._total_size, n);
        tmp._first = tmp._first.substr(0, n);
        return tmp;
    }

    /// Returns the fragment currently at the front of the view.
    bytes_view current_fragment() {
        return _first;
    }

    /// Copies every fragment, in order, into one newly allocated `bytes`
    /// object of size_bytes() bytes and returns it.
    bytes linearize() const {
        bytes b(bytes::initialized_later(), size_bytes());
        auto dst = b.begin();
        for (bytes_view fragment : *this) {
            dst = std::copy(fragment.begin(), fragment.end(), dst);
        }
        return b;
    }

    /// Invokes `fn` with a contiguous bytes_view of the whole buffer and
    /// returns its result. When the front fragment already spans the whole
    /// view it is passed directly; otherwise the data is first copied with
    /// linearize(). In the copying case the view handed to `fn` refers to a
    /// local buffer and is only valid for the duration of the call.
    // NOTE(review): the template parameter list (presumably declaring
    // Function) is missing after `template` in this copy -- confirm.
    template
    decltype(auto) with_linearized(Function&& fn) const {
        bytes b;
        bytes_view bv;
        if (_first.size() != _total_size) {
            b = linearize();
            bv = b;
        } else {
            bv = _first;
        }
        return fn(bv);
    }

    /// Returns the view's internal state (front fragment, next-fragment
    /// iterator, total size) as an `implementation` aggregate.
    implementation extract_implementation() const {
        return implementation {
            .current = _first,
            .next = _next,
            .size = _total_size,
        };
    }
};
// Compile-time check against the FragmentedView concept.
// NOTE(review): the template arguments of this assertion are incomplete in
// this copy of the file (text between '<' and '>' appears to have been
// lost) -- restore from version control.
static_assert(FragmentedView>);

// Integer type used for sizes/lengths by the declarations in this header.
using size_type = uint32_t;

/// Reads sizeof(T) raw bytes from `input` into a T and returns the result of
/// le_to_cpu() on it (presumably a little-endian to host byte-order
/// conversion -- the helper is defined elsewhere).
// NOTE(review): here and on most `template` / `requires` / cast expressions
// below, the angle-bracketed parameter or argument lists are missing in this
// copy -- confirm each against the original source.
template
requires std::is_integral_v
inline T deserialize_integral(Input& input) {
    T data;
    input.read(reinterpret_cast(&data), sizeof(T));
    return le_to_cpu(data);
}

/// Converts `data` with cpu_to_le() and writes its sizeof(T) raw bytes to
/// `output`. Mirror image of deserialize_integral().
template
requires std::is_integral_v
inline void serialize_integral(Output& output, T data) {
    data = cpu_to_le(data);
    output.write(reinterpret_cast(&data), sizeof(T));
}

/// Primary serializer template; only declared here. Each serializable type
/// provides a specialization with static read(), write() and skip().
template
struct serializer;

/// Shared implementation of read/write/skip for integral types, built on
/// deserialize_integral() / serialize_integral().
template
struct integral_serializer {
    template
    static T read(Input& v) {
        return deserialize_integral(v);
    }
    template
    static void write(Output& out, T v) {
        serialize_integral(out, v);
    }
    /// Skipping is implemented by reading the value and discarding it.
    template
    static void skip(Input& v) {
        read(v);
    }
};

/// Serializer specialization whose read() returns bool and whose write()
/// takes a bool; the value is written as a uint8_t.
// NOTE(review): the specialized type is not visible in this copy (presumably
// bool) -- confirm.
template<> struct serializer {
    template
    static bool read(Input& i) {
        return deserialize_integral(i);
    }
    template< typename Output>
    static void write(Output& out, bool v) {
        serialize_integral(out, uint8_t(v));
    }
    /// Skipping is implemented by reading the value and discarding it.
    template
    static void skip(Input& i) {
        read(i);
    }
};

// Eight serializer specializations that inherit from integral_serializer.
// NOTE(review): the concrete types are not visible in this copy; as written
// the eight lines are textually identical -- restore the type arguments.
template<> struct serializer : public integral_serializer {};
template<> struct serializer : public integral_serializer {};
template<> struct serializer : public integral_serializer {};
template<> struct serializer : public integral_serializer {};
template<> struct serializer : public integral_serializer {};
template<> struct serializer : public integral_serializer {};
template<> struct serializer : public integral_serializer {};
template<> struct serializer : public integral_serializer {};

// Declared here, defined elsewhere.
template
void safe_serialize_as_uint32(Output& output, uint64_t data);

/// Writes `v` to `out` by forwarding to serializer::write().
template
inline void serialize(Output& out, const T& v) {
    serializer::write(out, v);
};

/// Overload for std::reference_wrapper: unwraps with get() and forwards the
/// referenced object to serializer::write().
template
inline void serialize(Output& out, const std::reference_wrapper v) {
    serializer::write(out, v.get());
}

/// Reads a value from `in` via serializer::read(). The std::type_identity
/// parameter `t` is not used in the body; it only selects the overload.
template
inline auto deserialize(Input& in, std::type_identity t) {
    return serializer::read(in);
}

/// Skips over one serialized value in `v` via serializer::skip(). The
/// unnamed std::type_identity parameter only selects the overload.
template
inline void skip(Input& v, std::type_identity) {
    return serializer::skip(v);
}

// The following are declarations only; their definitions are not in this
// part of the file.
template
size_type get_sizeof(const T& obj);

template
void set_size(seastar::measuring_output_stream& os, const T& obj);
// Forward declarations; the definitions are not in this part of the file.
// NOTE(review): throughout this section the angle-bracketed template
// parameter and argument lists are missing in this copy of the file --
// confirm each declaration against the original source.
template
void set_size(Stream& os, const T& obj);

template
Buffer serialize_to_buffer(const T& v, size_t head_space = 0);

template
T deserialize_from_buffer(const Buffer&, std::type_identity, size_t head_space = 0);

// serialize()/deserialize() overloads for boost::variant.
template
void serialize(Output& out, const boost::variant& v);

template
boost::variant deserialize(Input& in, std::type_identity>);

// serialize()/deserialize() overloads for std::variant.
template
void serialize(Output& out, const std::variant& v);

template
std::variant deserialize(Input& in, std::type_identity>);

/// Pairs a numeric `index` with a `data` string.
// NOTE(review): presumably represents a variant alternative this build does
// not recognise, kept as its index plus raw serialized bytes -- confirm
// against the definitions of the overloads declared below.
struct unknown_variant_type {
    size_type index;
    sstring data;
};

template
void serialize(Output& out, const unknown_variant_type& v);

template
unknown_variant_type deserialize(Input& in, std::type_identity);

/// Maps a type to the type used when comparing for equivalence. The primary
/// template maps T to itself.
template
struct normalize {
    using type = T;
};

// Three specializations that map their (not visible) argument type to `bytes`.
// NOTE(review): the specialized types are missing in this copy -- confirm.
template <>
struct normalize {
    using type = bytes;
};

template <>
struct normalize {
    using type = bytes;
};

template <>
struct normalize {
    using type = bytes;
};

/// Type trait that is true when two types are considered equivalent. The
/// primary template compares the `normalize`d forms of both types with
/// std::is_same.
template
struct is_equivalent : std::is_same>>::type, typename normalize>>::type> {
};

// Partial specializations that delegate to is_equivalent on other types.
// NOTE(review): the wrapper types matched by the first two are not visible
// in this copy; the third matches a pair of std::optional -- confirm.
template
struct is_equivalent, U> : is_equivalent {
};

template
struct is_equivalent> : is_equivalent {
};

template
struct is_equivalent, std::optional> : is_equivalent {
};

/// Helper for comparing two std::tuple types element-wise; the trailing bool
/// argument selects the specialization.
template
struct is_equivalent_arity;

/// Selected when the bool argument is false: never equivalent.
template
struct is_equivalent_arity, std::tuple, false> : std::false_type {
};

/// Selected when the bool argument is true: equivalent when every element
/// pair is equivalent (fold over &&).
template
struct is_equivalent_arity, std::tuple, true> {
    static constexpr bool value = (is_equivalent::value && ...);
};

/// Tuples: dispatches to is_equivalent_arity, passing whether both packs have
/// the same length as the bool argument.
template
struct is_equivalent, std::tuple> : is_equivalent_arity, std::tuple, sizeof...(T) == sizeof...(U)> {
};

/// std::variant: delegates to the comparison of std::tuple types.
template
struct is_equivalent, std::variant> : is_equivalent, std::tuple> {
};

// gc_clock duration values were serialized as 32-bit prior to 3.1, and
// are serialized as 64-bit in 3.1.0.
//
// TTL values are capped to 20 years, which fits into 32 bits, so
// truncation is not a concern.
// Flag selecting which of the two branches the gc_clock duration
// (de)serialization functions below take. It defaults to false and is a
// mutable inline variable; nothing in this part of the file sets it.
inline bool gc_clock_using_3_1_0_serialization = false;

/// Writes a gc_clock duration value `v` to `out`.
///
/// When gc_clock_using_3_1_0_serialization is false, first asserts that `v`
/// survives a round trip through int32_t, then writes it; otherwise writes
/// it without that check.
// NOTE(review): the template parameter list and the serializer's type
// argument are missing in this copy, so both branches read identically as
// written. Presumably they differ in the element type passed to serializer
// -- confirm against the original source.
template
void serialize_gc_clock_duration_value(Output& out, int64_t v) {
    if (!gc_clock_using_3_1_0_serialization) {
        // This should have been caught by the CQL layer, so this is just
        // for extra safety.
        SCYLLA_ASSERT(int32_t(v) == v);
        serializer::write(out, v);
    } else {
        serializer::write(out, v);
    }
}

/// Reads a gc_clock duration value from `in` and returns it as int64_t,
/// taking the branch selected by gc_clock_using_3_1_0_serialization.
// NOTE(review): as above, the serializer type arguments that presumably
// distinguish the two branches are not visible in this copy -- confirm.
template
int64_t deserialize_gc_clock_duration_value(Input& in) {
    if (!gc_clock_using_3_1_0_serialization) {
        return serializer::read(in);
    } else {
        return serializer::read(in);
    }
}

} // closes namespace ser

/*
 * Import the auto generated forward declaration code
 */