/* * Copyright 2016-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #pragma once #include #include #include "serializer.hh" #include "enum_set.hh" #include "utils/chunked_vector.hh" #include "utils/input_stream.hh" #include #include "utils/small_vector.hh" #include #include #include #include #include "utils/log.hh" namespace seastar { extern logger seastar_logger; } namespace ser { /* Overload for the measuring stream: writes a zero uint32_t as the size field instead of computing the real size. */ template void set_size(seastar::measuring_output_stream& os, const T& obj) { serialize(os, uint32_t(0)); } /* Overload for any other stream: writes get_sizeof(obj) as the size field. */ template void set_size(Stream& os, const T& obj) { serialize(os, get_sizeof(obj)); } /* Serializes data narrowed to uint32_t. Throws std::runtime_error when data is above the std::numeric_limits maximum it is compared with (presumably that of uint32_t; the template argument is not visible in this listing). */ template void safe_serialize_as_uint32(Output& out, uint64_t data) { if (data > std::numeric_limits::max()) { throw std::runtime_error("Size is too big for serialization"); } serialize(out, uint32_t(data)); } /* True only for an integral T that passes the std::is_same exclusion test (its arguments are not visible in this listing) and that is either one byte wide or compiled for a little-endian target. */ template constexpr bool can_serialize_fast() { return !std::is_same::value && std::is_integral::value && (sizeof(T) == 1 || __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__); } template struct serialize_array_helper; /* Specialization that emits the whole container with a single out.write call: v.size() * sizeof(T) bytes starting at v.data(). */ template struct serialize_array_helper { template static void doit(Output& out, const Container& v) { out.write(reinterpret_cast(v.data()), v.size() * sizeof(T)); } }; /* Specialization that serializes the elements one at a time. */ template struct serialize_array_helper { template static void doit(Output& out, const Container& v) { for (auto&& e : v) { serialize(out, e); } } }; /* Dispatches to one of the two serialize_array_helper specializations; the selecting template argument is not visible in this listing (presumably can_serialize_fast for the element type). */ template inline void serialize_array(Output& out, const Container& v) { serialize_array_helper(), T>::doit(out, v); } /* Per-container helpers used while deserializing: back_emplacer adds one element to the wrapped container, and resize (where a specialization defines it) sets the element count. The set specializations insert with emplace and define no resize. */ template struct container_traits; template struct container_traits> { struct back_emplacer { absl::btree_set& c; back_emplacer(absl::btree_set& c_) : c(c_) {} void operator()(T&& v) { c.emplace(std::move(v)); } }; }; template struct container_traits> { struct back_emplacer { std::unordered_set& c; back_emplacer(std::unordered_set& c_) : c(c_) {} void operator()(T&& v) { c.emplace(std::move(v)); } }; }; template struct container_traits> { struct back_emplacer { std::list& c; back_emplacer(std::list& c_) : c(c_) {} 
void operator()(T&& v) { c.emplace_back(std::move(v)); } }; void resize(std::list& c, size_t size) { c.resize(size); } }; template struct container_traits> { struct back_emplacer { std::vector& c; back_emplacer(std::vector& c_) : c(c_) {} void operator()(T&& v) { c.emplace_back(std::move(v)); } }; void resize(std::vector& c, size_t size) { c.resize(size); } }; template struct container_traits> { struct back_emplacer { utils::small_vector& c; back_emplacer(utils::small_vector& c_) : c(c_) {} void operator()(T&& v) { c.emplace_back(std::move(v)); } }; void resize(utils::small_vector& c, size_t size) { c.resize(size); } }; template struct container_traits> { struct back_emplacer { utils::chunked_vector& c; back_emplacer(utils::chunked_vector& c_) : c(c_) {} void operator()(T&& v) { c.emplace_back(std::move(v)); } }; void resize(utils::chunked_vector& c, size_t size) { c.resize(size); } }; /* std::array specialization: back_emplacer assigns into successive slots through idx (no bounds check is visible here) and resize does nothing. */ template struct container_traits> { struct back_emplacer { std::array& c; size_t idx = 0; back_emplacer(std::array& c_) : c(c_) {} void operator()(T&& v) { c[idx++] = std::move(v); } }; void resize(std::array& c, size_t size) {} }; template struct deserialize_array_helper; /* Raw-memory specialization: resizes v to sz elements, then reads v.size() * sizeof(T) bytes directly into v.data(); skip advances the input by sz * sizeof(T) bytes. */ template struct deserialize_array_helper { template static void doit(Input& in, Container& v, size_t sz) { container_traits t; t.resize(v, sz); in.read(reinterpret_cast(v.data()), v.size() * sizeof(T)); } template static void skip(Input& in, size_t sz) { in.skip(sz * sizeof(T)); } }; /* Element-wise specialization: deserializes sz elements and hands each one to the back_emplacer of the container; skip calls serializer::skip once per element. */ template struct deserialize_array_helper { template static void doit(Input& in, Container& v, size_t sz) { typename container_traits::back_emplacer be(v); while (sz--) { be(deserialize(in, std::type_identity())); } } template static void skip(Input& in, size_t sz) { while (sz--) { serializer::skip(in); } } }; /* Entry points that pick a deserialize_array_helper specialization for reading or skipping sz elements; the selecting template argument is not visible in this listing. */ template inline void deserialize_array(Input& in, Container& v, size_t sz) { deserialize_array_helper(), T>::doit(in, v, sz); } template inline void skip_array(Input& in, size_t sz) { deserialize_array_helper(), T>::skip(in, sz); 
} /* Shared read, write and skip implementation for the vector-like containers that inherit from it below. Format shown by write: an element count emitted through safe_serialize_as_uint32, followed by the elements. */ namespace idl::serializers::internal { template struct vector_serializer { using value_type = typename Vector::value_type; template static Vector read(Input& in) { auto sz = deserialize(in, std::type_identity()); Vector v; /* NOTE(review): sz is taken from the input and passed to reserve before any element has been read; confirm that callers only feed trusted or size-bounded input. */ v.reserve(sz); deserialize_array(in, v, sz); return v; } template static void write(Output& out, const Vector& v) { safe_serialize_as_uint32(out, v.size()); serialize_array(out, v); } template static void skip(Input& in) { auto sz = deserialize(in, std::type_identity()); skip_array(in, sz); } }; } /* std::list: element count followed by the elements, using the array helpers directly. */ template struct serializer> { template static std::list read(Input& in) { auto sz = deserialize(in, std::type_identity()); std::list v; deserialize_array_helper::doit(in, v, sz); return v; } template static void write(Output& out, const std::list& v) { safe_serialize_as_uint32(out, v.size()); serialize_array_helper::doit(out, v); } template static void skip(Input& in) { auto sz = deserialize(in, std::type_identity()); skip_array(in, sz); } }; /* absl::btree_set: element count followed by the elements in iteration order. */ template struct serializer> { template static absl::btree_set read(Input& in) { auto sz = deserialize(in, std::type_identity()); absl::btree_set v; deserialize_array_helper::doit(in, v, sz); return v; } template static void write(Output& out, const absl::btree_set& v) { safe_serialize_as_uint32(out, v.size()); serialize_array_helper::doit(out, v); } template static void skip(Input& in) { auto sz = deserialize(in, std::type_identity()); skip_array(in, sz); } }; /* std::unordered_set: same layout; read reserves sz buckets worth of space before inserting. */ template struct serializer> { template static std::unordered_set read(Input& in) { auto sz = deserialize(in, std::type_identity()); std::unordered_set v; v.reserve(sz); deserialize_array_helper::doit(in, v, sz); return v; } template static void write(Output& out, const std::unordered_set& v) { safe_serialize_as_uint32(out, v.size()); serialize_array_helper::doit(out, v); } template static void skip(Input& in) { auto sz = deserialize(in, std::type_identity()); skip_array(in, sz); } }; /* The next three serializers add nothing of their own and inherit everything from vector_serializer. */ template struct serializer> : 
idl::serializers::internal::vector_serializer> { }; template struct serializer> : idl::serializers::internal::vector_serializer> { }; template struct serializer> : idl::serializers::internal::vector_serializer> { }; /* std::chrono::duration is serialized as its tick count; skip reads the value and discards it. */ template struct serializer> { template static std::chrono::duration read(Input& in) { return std::chrono::duration(deserialize(in, std::type_identity())); } template static void write(Output& out, const std::chrono::duration& d) { serialize(out, d.count()); } template static void skip(Input& in) { read(in); } }; /* std::chrono::time_point is serialized as time_since_epoch().count() converted to uint64_t; read rebuilds the time point from a Duration constructed from the stored value. */ template struct serializer> { using value_type = std::chrono::time_point; template static value_type read(Input& in) { return typename Clock::time_point(Duration(deserialize(in, std::type_identity()))); } template static void write(Output& out, const value_type& v) { serialize(out, uint64_t(v.time_since_epoch().count())); } template static void skip(Input& in) { read(in); } }; /* std::array: always exactly N elements, with no length prefix. */ template struct serializer> { template static std::array read(Input& in) { std::array v; deserialize_array(in, v, N); return v; } template static void write(Output& out, const std::array& v) { serialize_array(out, v); } template static void skip(Input& in) { skip_array(in, N); } }; /* std::map: entry count, then the key and value of every entry in iteration order. */ template struct serializer> { template static std::map read(Input& in) { auto sz = deserialize(in, std::type_identity()); std::map m; while (sz--) { K k = deserialize(in, std::type_identity()); V v = deserialize(in, std::type_identity()); /* NOTE(review): k and v are lvalues here, so emplace copies them; the std::unordered_map reader below moves instead. */ m.emplace(k, v); } return m; } template static void write(Output& out, const std::map& v) { safe_serialize_as_uint32(out, v.size()); for (auto&& e : v) { serialize(out, e.first); serialize(out, e.second); } } template static void skip(Input& in) { auto sz = deserialize(in, std::type_identity()); while (sz--) { serializer::skip(in); serializer::skip(in); } } }; /* std::unordered_map: same layout as std::map; read reserves space for sz entries and moves each key and value into the map. */ template struct serializer> { template static std::unordered_map read(Input& in) { auto sz = deserialize(in, std::type_identity()); std::unordered_map m; m.reserve(sz); while (sz--) { auto k = 
deserialize(in, std::type_identity()); auto v = deserialize(in, std::type_identity()); m.emplace(std::move(k), std::move(v)); } return m; } template static void write(Output& out, const std::unordered_map& v) { safe_serialize_as_uint32(out, v.size()); for (auto&& e : v) { serialize(out, e.first); serialize(out, e.second); } } template static void skip(Input& in) { auto sz = deserialize(in, std::type_identity()); while (sz--) { serializer::skip(in); serializer::skip(in); } } }; /* bool_class is serialized as a plain bool. */ template struct serializer> { template static bool_class read(Input& in) { return bool_class(deserialize(in, std::type_identity())); } template static void write(Output& out, bool_class v) { serialize(out, bool(v)); } template static void skip(Input& in) { read(in); } }; /* Holds the input sub-stream that contains a serialized byte string. view() builds a bytes_view or buffer_view over the data of the stream, and the rvalue-qualified conversion operators read the data out into a bytes, managed_bytes or bytes_ostream object. */ template class deserialized_bytes_proxy { Stream _stream; template friend class deserialized_bytes_proxy; public: explicit deserialized_bytes_proxy(Stream stream) : _stream(std::move(stream)) { } /* Converting constructor from a proxy over another stream type, constrained by std::convertible_to. */ template requires std::convertible_to deserialized_bytes_proxy(deserialized_bytes_proxy proxy) : _stream(std::move(proxy._stream)) { } /* One branch returns a single contiguous bytes_view; the other visits the stream and returns a buffer_view for either its simple or its fragmented form. */ auto view() const { if constexpr (std::is_same_v) { return bytes_view(reinterpret_cast(_stream.begin()), _stream.size()); } else { using iterator_type = typename Stream::iterator_type ; static_assert(FragmentRange>); return seastar::with_serialized_stream(_stream, seastar::make_visitor( [&] (typename seastar::memory_input_stream::simple stream) { return buffer_view(bytes_view(reinterpret_cast(stream.begin()), stream.size())); }, [&] (typename seastar::memory_input_stream::fragmented stream) { return buffer_view(bytes_view(reinterpret_cast(stream.first_fragment_data()), stream.first_fragment_size()), stream.size(), stream.fragment_iterator()); } )); } } [[gnu::always_inline]] operator bytes() && { bytes v(bytes::initialized_later(), _stream.size()); _stream.read(reinterpret_cast(v.begin()), _stream.size()); return v; } /* Fills the managed_bytes fragment by fragment from the stream. */ [[gnu::always_inline]] operator managed_bytes() && { managed_bytes 
mb(managed_bytes::initialized_later(), _stream.size()); for (bytes_mutable_view frag : fragment_range(managed_bytes_mutable_view(mb))) { _stream.read(reinterpret_cast(frag.data()), frag.size()); } return mb; } [[gnu::always_inline]] operator bytes_ostream() && { bytes_ostream v; _stream.copy_to(v); return v; } }; /* temporary_buffer: length prefix followed by the raw bytes. NOTE(review): write narrows v.size() to uint32_t before calling safe_serialize_as_uint32, so the range check inside that helper only ever sees the already-narrowed value; the same pattern recurs in the byte-string and sstring writers below. Confirm whether sizes above the uint32_t range can reach these writers. */ template<> struct serializer> { template static temporary_buffer read(Input& in) { auto sz = deserialize(in, std::type_identity()); auto v = temporary_buffer(sz); in.read(reinterpret_cast(v.get_write()), sz); return v; } template static void write(Output& out, const temporary_buffer& v) { safe_serialize_as_uint32(out, uint32_t(v.size())); out.write(reinterpret_cast(v.get()), v.size()); } template static void skip(Input& in) { auto sz = deserialize(in, std::type_identity()); in.skip(sz); } }; /* Byte strings: read returns a deserialized_bytes_proxy over in.read_substream(sz) rather than a materialized value. Each write overload emits a length prefix and then the data, fragment by fragment for managed_bytes, bytes_ostream and generic fragment ranges. */ template<> struct serializer { template static deserialized_bytes_proxy read(Input& in) { auto sz = deserialize(in, std::type_identity()); return deserialized_bytes_proxy(in.read_substream(sz)); } template static void write(Output& out, bytes_view v) { safe_serialize_as_uint32(out, uint32_t(v.size())); out.write(reinterpret_cast(v.begin()), v.size()); } template static void write(Output& out, const bytes& v) { write(out, static_cast(v)); } template static void write(Output& out, const managed_bytes& mb) { safe_serialize_as_uint32(out, uint32_t(mb.size())); for (bytes_view frag : fragment_range(managed_bytes_view(mb))) { out.write(reinterpret_cast(frag.data()), frag.size()); } } template static void write(Output& out, const bytes_ostream& v) { safe_serialize_as_uint32(out, uint32_t(v.size())); for (bytes_view frag : v.fragments()) { out.write(reinterpret_cast(frag.begin()), frag.size()); } } template requires FragmentRange static void write_fragmented(Output& out, FragmentedBuffer&& fragments) { safe_serialize_as_uint32(out, uint32_t(fragments.size_bytes())); for (bytes_view frag : fragments) { out.write(reinterpret_cast(frag.begin()), frag.size()); } } template static 
void skip(Input& in) { auto sz = deserialize(in, std::type_identity()); in.skip(sz); } }; /* Free-function overloads that forward byte-string types to the serializer write, read and write_fragmented members. */ template void serialize(Output& out, const bytes_view& v) { serializer::write(out, v); } template void serialize(Output& out, const managed_bytes& v) { serializer::write(out, v); } template void serialize(Output& out, const bytes_ostream& v) { serializer::write(out, v); } template bytes_ostream deserialize(Input& in, std::type_identity) { return serializer::read(in); } template requires FragmentRange void serialize_fragmented(Output& out, FragmentedBuffer&& v) { serializer::write_fragmented(out, std::forward(v)); } /* std::optional: a bool presence flag, followed by the value only when the optional is engaged. */ template struct serializer> { template static std::optional read(Input& in) { std::optional v; auto b = deserialize(in, std::type_identity()); if (b) { v.emplace(deserialize(in, std::type_identity())); } return v; } template static void write(Output& out, const std::optional& v) { serialize(out, bool(v)); if (v) { serialize(out, v.value()); } } template static void skip(Input& in) { auto present = deserialize(in, std::type_identity()); if (present) { serializer::skip(in); } } }; extern logging::logger serlog; // Warning: assumes that pointer is never null /* lw_shared_ptr: the pointee is serialized directly with no presence flag; write calls on_internal_error for a null pointer and read always builds a new object with make_lw_shared. */ template struct serializer> { template static seastar::lw_shared_ptr read(Input& in) { return seastar::make_lw_shared(deserialize(in, std::type_identity())); } template static void write(Output& out, const seastar::lw_shared_ptr& v) { if (!v) { on_internal_error(serlog, "Unexpected nullptr while serializing a pointer"); } serialize(out, *v); } template static void skip(Input& in) { serializer::skip(in); } }; /* sstring: length prefix followed by the characters; read fills an uninitialized_string of sz characters. */ template<> struct serializer { template static sstring read(Input& in) { auto sz = deserialize(in, std::type_identity()); sstring v = uninitialized_string(sz); in.read(v.data(), sz); return v; } template static void write(Output& out, const sstring& v) { safe_serialize_as_uint32(out, uint32_t(v.size())); out.write(v.data(), v.size()); } template static void skip(Input& in) { in.skip(deserialize(in, 
std::type_identity())); } }; /* std::unique_ptr: nullable, unlike the lw_shared_ptr serializer; a bool presence flag followed by the pointee when the pointer is non-null. */ template struct serializer> { template static std::unique_ptr read(Input& in) { std::unique_ptr v; auto b = deserialize(in, std::type_identity()); if (b) { v = std::make_unique(deserialize(in, std::type_identity())); } return v; } template static void write(Output& out, const std::unique_ptr& v) { serialize(out, bool(v)); if (v) { serialize(out, *v); } } template static void skip(Input& in) { auto present = deserialize(in, std::type_identity()); if (present) { serializer::skip(in); } } }; /* enum_set is serialized as its mask converted to uint64_t and rebuilt with from_mask. */ template struct serializer> { template static enum_set read(Input& in) { return enum_set::from_mask(deserialize(in, std::type_identity())); } template static void write(Output& out, enum_set v) { serialize(out, uint64_t(v.mask())); } template static void skip(Input& in) { read(in); } }; /* std::monostate carries no data: nothing is written, read or skipped. */ template<> struct serializer { template static std::monostate read(Input& in) { return std::monostate{}; } template static void write(Output& out, std::monostate v) {} template static void skip(Input& in) { } }; /* Returns the serialized size of obj, obtained by serializing it into a measuring_output_stream. Throws std::runtime_error when the size is above the std::numeric_limits maximum it is compared with (the template argument is not visible in this listing). */ template size_type get_sizeof(const T& obj) { seastar::measuring_output_stream ms; serialize(ms, obj); auto size = ms.size(); if (size > std::numeric_limits::max()) { throw std::runtime_error("Object is too big for get_sizeof"); } return size; } /* Serializes v into a newly created Buffer of measured size plus head_space bytes; head_space is also passed as the third argument of simple_output_stream (presumably the offset at which writing starts, matching deserialize_from_buffer below). */ template Buffer serialize_to_buffer(const T& v, size_t head_space) { seastar::measuring_output_stream measure; ser::serialize(measure, v); Buffer ret(typename Buffer::initialized_later(), measure.size() + head_space); seastar::simple_output_stream out(reinterpret_cast(ret.begin()), ret.size(), head_space); ser::serialize(out, v); return ret; } /* Deserializes a T from buf starting head_space bytes in. NOTE(review): buf.size() - head_space is computed without checking that head_space does not exceed buf.size(). */ template T deserialize_from_buffer(const Buffer& buf, std::type_identity type, size_t head_space) { seastar::simple_input_stream in(reinterpret_cast(buf.begin() + head_space), buf.size() - head_space); return deserialize(in, std::move(type)); } /* Wraps a contiguous bytes_view in a simple input stream. */ inline utils::input_stream as_input_stream(bytes_view b) { return utils::input_stream::simple(reinterpret_cast(b.begin()), b.size()); } 
inline utils::input_stream as_input_stream(const bytes_ostream& b) { /* A linearized buffer is wrapped through the bytes_view overload; otherwise a fragmented stream is built over the fragments. */ if (b.is_linearized()) { return as_input_stream(b.view()); } return utils::input_stream::fragmented(b.fragments().begin(), b.size()); } /* Builds a fragmented_memory_input_stream from fragment_range(v).begin() and v.size_bytes(). */ template inline auto as_input_stream(View v) { return fragmented_memory_input_stream(fragment_range(v).begin(), v.size_bytes()); } /* The boost::variant overloads are stubs in this listing: serialize writes nothing and deserialize returns a default-constructed variant without reading. */ template void serialize(Output& out, const boost::variant& v) {} template boost::variant deserialize(Input& in, std::type_identity>) { return boost::variant(); } /* std::variant: the alternative index written as one uint8_t (the static_assert requires fewer than 256 alternatives), followed by the active alternative. */ template void serialize(Output& out, const std::variant& v) { static_assert(std::variant_size_v> < 256); size_t type_index = v.index(); serialize(out, uint8_t(type_index)); std::visit([&out] (const auto& member) { serialize(out, member); }, v); } /* Emplaces the alternative whose position equals idx by means of a short-circuiting fold over the index sequence. NOTE(review): when idx matches no position the fold does nothing and the default-constructed T is returned without any error being raised. */ template T deserialize_std_variant(Input& in, std::type_identity t, size_t idx, std::index_sequence) { T v; (void)((I == idx ? v.template emplace(deserialize(in, std::type_identity>())), true : false) || ...); return v; } /* Reads the alternative index and delegates to deserialize_std_variant. */ template std::variant deserialize(Input& in, std::type_identity> v) { size_t idx = deserialize(in, std::type_identity()); return deserialize_std_variant(in, v, idx, std::make_index_sequence()); } /* Writes the stored data bytes as they are; no size or index is written by this function. */ template void serialize(Output& out, const unknown_variant_type& v) { out.write(v.data.begin(), v.data.size()); } /* Reads a size and an index, then size minus 2 * sizeof(size_type) further bytes as the payload. NOTE(review): the subtraction is unchecked; a size smaller than 2 * sizeof(size_type) would presumably wrap around, assuming the operands are unsigned - confirm. */ template unknown_variant_type deserialize(Input& in, std::type_identity) { return seastar::with_serialized_stream(in, [] (auto& in) { auto size = deserialize(in, std::type_identity()); auto index = deserialize(in, std::type_identity()); auto sz = size - sizeof(size_type) * 2; sstring v = uninitialized_string(sz); in.read(v.data(), sz); return unknown_variant_type{ index, std::move(v) }; }); } // Class for iteratively deserializing a frozen vector // using a range. // Use begin() and end() to iterate through the frozen vector, // deserializing (or skipping) one element at a time. 
template class vector_deserializer { /* When IsForward is true, elements are decoded straight from _in in stream order. Otherwise the constructor calls fill_substreams to cut one sub-stream per element, and iteration walks those sub-streams through reverse iterators. */ public: using value_type = T; using input_stream = utils::input_stream; private: input_stream _in; size_t _size; utils::chunked_vector _substreams; /* Uses two copies of _in: one is advanced with serializer::skip to measure the encoded length of each element, the other cuts a sub-stream of exactly that length. */ void fill_substreams() requires (!IsForward) { input_stream in = _in; input_stream in2 = _in; for (size_t i = 0; i < size(); ++i) { size_t old_size = in.size(); serializer::skip(in); size_t new_size = in.size(); _substreams.push_back(in2.read_substream(old_size - new_size)); } } /* Cursor state for forward iteration: a private copy of the input stream. */ struct forward_iterator_data { input_stream _in = simple_input_stream(); void skip() { serializer::skip(_in); } value_type deserialize_next() { return deserialize(_in, std::type_identity()); } }; /* Cursor state for reverse iteration: a reverse iterator into _substreams; skipping only advances the iterator and decodes nothing. */ struct reverse_iterator_data { std::reverse_iterator::const_iterator> _substream_it; void skip() { ++_substream_it; } value_type deserialize_next() { input_stream is = *_substream_it; ++_substream_it; return deserialize(is, std::type_identity()); } }; public: /* Default construction yields an empty deserializer with _size set to 0. */ vector_deserializer() noexcept : _in(simple_input_stream()) , _size(0) { } /* Reads the element count from the front of the stream; _in is left positioned after the count. */ explicit vector_deserializer(input_stream in) : _in(std::move(in)) , _size(deserialize(_in, std::type_identity())) { if constexpr (!IsForward) { fill_substreams(); } } // Get the number of items in the vector size_t size() const noexcept { return _size; } bool empty() const noexcept { return _size == 0; } // Input iterator class iterator { // _idx is the distance from .begin(). It is used only for comparing iterators. 
/* _consumed records that operator* has already advanced the underlying data, so that the next operator++ does not skip a second time. */ size_t _idx = 0; bool _consumed = false; std::conditional_t _data; iterator(input_stream in, size_t idx) noexcept requires(IsForward) : _idx(idx) , _data{in} { } iterator(decltype(reverse_iterator_data::_substream_it) substreams, size_t idx) noexcept requires(!IsForward) : _idx(idx) , _data{substreams} { } friend class vector_deserializer; public: using iterator_category = std::input_iterator_tag; using value_type = T; using pointer = value_type*; using reference = value_type&; using difference_type = ssize_t; iterator() noexcept = default; /* Equality compares _idx only; the stream position held in _data takes no part in it. */ bool operator==(const iterator& it) const noexcept { return _idx == it._idx; } // Deserializes and returns the item, effectively incrementing the iterator. /* The operator is const but mutates the iterator through const_cast: every call increments _idx, sets _consumed and decodes the next element, so dereferencing twice without an increment in between yields two consecutive elements. */ value_type operator*() const { auto zis = const_cast(this); zis->_idx++; zis->_consumed = true; return zis->_data.deserialize_next(); } /* After a dereference this only clears _consumed; otherwise it skips one element and increments _idx. */ iterator& operator++() { if (!_consumed) { _data.skip(); ++_idx; } else { _consumed = false; } return *this; } iterator operator++(int) { auto pre = *this; ++*this; return pre; } ssize_t operator-(const iterator& it) const noexcept { return _idx - it._idx; } }; using const_iterator = iterator; static_assert(std::input_iterator); static_assert(std::sentinel_for); /* Forward variants: begin and end both carry a copy of _in and differ only in the index (0 versus _size). */ iterator begin() noexcept requires(IsForward) { return {_in, 0}; } const_iterator begin() const noexcept requires(IsForward) { return {_in, 0}; } const_iterator cbegin() const noexcept requires(IsForward) { return {_in, 0}; } iterator end() noexcept requires(IsForward) { return {_in, _size}; } const_iterator end() const noexcept requires(IsForward) { return {_in, _size}; } const_iterator cend() const noexcept requires(IsForward) { return {_in, _size}; } /* Reverse variants: begin starts at _substreams.crbegin() with index 0 and end is _substreams.crend() with index _size. */ iterator begin() noexcept requires(!IsForward) { return {_substreams.crbegin(), 0}; } const_iterator begin() const noexcept requires(!IsForward) { return {_substreams.crbegin(), 0}; } const_iterator cbegin() const noexcept requires(!IsForward) { return {_substreams.crbegin(), 0}; } iterator end() noexcept requires(!IsForward) { return 
{_substreams.crend(), _size}; } const_iterator end() const noexcept requires(!IsForward) { return {_substreams.crend(), _size}; } const_iterator cend() const noexcept requires(!IsForward) { return {_substreams.crend(), _size}; } }; static_assert(std::ranges::range>); }