/* * Copyright 2016-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #pragma once #include "utils/assert.hh" #include #include "commitlog_types.hh" #include "mutation/frozen_mutation.hh" #include "schema/schema_fwd.hh" #include "replay_position.hh" namespace detail { using buffer_type = fragmented_temporary_buffer; using base_iterator = typename std::vector>::const_iterator; static constexpr auto sector_overhead_size = sizeof(uint32_t) + sizeof(db::segment_id_type); // iterator adaptor to enable splitting normal // frag-buffer temporary buffer objects into // sub-disk-page sized chunks. class sector_split_iterator { base_iterator _iter, _end; char* _ptr; size_t _size; size_t _sector_size; public: sector_split_iterator(const sector_split_iterator&) noexcept; sector_split_iterator(base_iterator i, base_iterator e, size_t sector_size); sector_split_iterator(base_iterator i, base_iterator e, size_t sector_size, size_t overhead); sector_split_iterator(); char* get_write() const { return _ptr; } size_t size() const { return _size; } char* begin() { return _ptr; } char* end() { return _ptr + _size; } const char* begin() const { return _ptr; } const char* end() const { return _ptr + _size; } bool operator==(const sector_split_iterator& rhs) const { return _iter == rhs._iter && _ptr == rhs._ptr; } auto& operator*() const { return *this; } auto* operator->() const { return this; } sector_split_iterator& operator++(); sector_split_iterator operator++(int); }; } class commitlog_entry { std::optional _mapping; frozen_mutation _mutation; public: commitlog_entry(std::optional mapping, frozen_mutation&& mutation) : _mapping(std::move(mapping)), _mutation(std::move(mutation)) { } const std::optional& mapping() const { return _mapping; } const frozen_mutation& mutation() const & { return _mutation; } frozen_mutation&& mutation() && { return std::move(_mutation); } }; class commitlog_entry_writer { public: using force_sync = db::commitlog_force_sync; private: schema_ptr _schema; const frozen_mutation& _mutation; bool _with_schema = true; size_t _size = std::numeric_limits::max(); force_sync _sync; private: template void serialize(Output&) const; void compute_size(); public: commitlog_entry_writer(schema_ptr s, const frozen_mutation& fm, force_sync sync) : _schema(std::move(s)), _mutation(fm), _sync(sync) {} void set_with_schema(bool value) { if (std::exchange(_with_schema, value) != value || _size == std::numeric_limits::max()) { compute_size(); } } bool with_schema() const { return _with_schema; } schema_ptr schema() const { return _schema; } size_t size() const { SCYLLA_ASSERT(_size != std::numeric_limits::max()); return _size; } size_t mutation_size() const { return _mutation.representation().size(); } force_sync sync() const { return _sync; } using ostream = typename seastar::memory_output_stream; void write(ostream& out) const; }; class commitlog_entry_reader { commitlog_entry _ce; public: commitlog_entry_reader(const fragmented_temporary_buffer& buffer); const std::optional& get_column_mapping() const { return _ce.mapping(); } const frozen_mutation& mutation() const & { return _ce.mutation(); } frozen_mutation&& mutation() && { return std::move(_ce).mutation(); } };