From 6e9d6f02dfdffed410ff493b63f81a248df6901c Mon Sep 17 00:00:00 2001 From: Michael Litvak Date: Tue, 28 Apr 2026 15:30:17 +0200 Subject: [PATCH] logstor: rewrite segment seq num from streaming When receiving a segment from streaming, allocate a new sequence number for the segment and rewrite the buffer header with that sequence number (recomputing its CRC), so that the segment follows the ordering of the receiving shard. --- replica/logstor/segment_manager.cc | 65 +++++++++++++++++++++++++++++- 1 file changed, 64 insertions(+), 1 deletion(-) diff --git a/replica/logstor/segment_manager.cc b/replica/logstor/segment_manager.cc index 05cbaebbae..14fa533f7f 100644 --- a/replica/logstor/segment_manager.cc +++ b/replica/logstor/segment_manager.cc @@ -2196,6 +2196,51 @@ future<> segment_manager::await_pending_writes() { class segment_data_sink_impl : public data_sink_impl { seg_ptr _segment; + std::vector _pending_data; + std::optional _initial_header_size; + bool _header_rewritten = false; + + write_buffer::buffer_header read_buffer_header() const { + simple_memory_input_stream bh_stream(_pending_data.data(), write_buffer::buffer_header_size); + return ser::deserialize(bh_stream, std::type_identity{}); + } + + void maybe_parse_initial_header() { + if (_initial_header_size || _pending_data.size() < write_buffer::buffer_header_size) { + return; + } + + auto bh = read_buffer_header(); + if (!write_buffer::validate_header(bh)) { + throw std::runtime_error("Invalid streamed logstor buffer header"); + } + + size_t header_size = write_buffer::buffer_header_size; + if (bh.kind == segment_kind::full) { + header_size += write_buffer::segment_header_size; + } + _initial_header_size = header_size; + } + + void rewrite_buffer_header() { + auto bh = read_buffer_header(); + + logstor_logger.trace("Rewriting buffer header for segment {} seq {} with seq {}", _segment->id(), bh.segment_seq, _segment->seq_num()); + + bh.segment_seq = _segment->seq_num(); + bh.crc = bh.calculate_crc(); + + simple_memory_output_stream 
bh_stream(_pending_data.data(), write_buffer::buffer_header_size); + ser::serialize(bh_stream, bh); + } + + future<> flush_pending_data() { + if (_pending_data.empty()) { + co_return; + } + co_await _segment->append(bytes_view(reinterpret_cast(_pending_data.data()), _pending_data.size())); + _pending_data.clear(); + } public: segment_data_sink_impl(seg_ptr segment) : _segment(std::move(segment)) @@ -2203,11 +2248,29 @@ public: virtual future<> put(std::span> data) override { for (auto& buf : data) { - co_await _segment->append(bytes_view((const int8_t*)buf.get(), buf.size())); + if (buf.empty()) { + continue; + } + + if (_header_rewritten) { + co_await _segment->append(bytes_view(reinterpret_cast(buf.get()), buf.size())); + continue; + } + + _pending_data.insert(_pending_data.end(), buf.get(), buf.get() + buf.size()); + maybe_parse_initial_header(); + if (_initial_header_size && _pending_data.size() >= *_initial_header_size) { + rewrite_buffer_header(); + _header_rewritten = true; + co_await flush_pending_data(); + } } } virtual future<> close() override { + if (!_header_rewritten && !_pending_data.empty()) { + throw std::runtime_error("Truncated streamed logstor segment header"); + } co_return; }