logstor: rewrite segment seq num from streaming

when receiving a segment from streaming, allocate a new sequence number
for the segment and rewrite the header with the sequence number, to put
it in the same ordering of the receiving shard.
This commit is contained in:
Michael Litvak
2026-04-28 15:30:17 +02:00
parent 9e0741e5b7
commit 6e9d6f02df

View File

@@ -2196,6 +2196,51 @@ future<> segment_manager::await_pending_writes() {
class segment_data_sink_impl : public data_sink_impl {
seg_ptr _segment;
std::vector<char> _pending_data;
std::optional<size_t> _initial_header_size;
bool _header_rewritten = false;
write_buffer::buffer_header read_buffer_header() const {
simple_memory_input_stream bh_stream(_pending_data.data(), write_buffer::buffer_header_size);
return ser::deserialize(bh_stream, std::type_identity<write_buffer::buffer_header>{});
}
void maybe_parse_initial_header() {
if (_initial_header_size || _pending_data.size() < write_buffer::buffer_header_size) {
return;
}
auto bh = read_buffer_header();
if (!write_buffer::validate_header(bh)) {
throw std::runtime_error("Invalid streamed logstor buffer header");
}
size_t header_size = write_buffer::buffer_header_size;
if (bh.kind == segment_kind::full) {
header_size += write_buffer::segment_header_size;
}
_initial_header_size = header_size;
}
void rewrite_buffer_header() {
auto bh = read_buffer_header();
logstor_logger.trace("Rewriting buffer header for segment {} seq {} with seq {}", _segment->id(), bh.segment_seq, _segment->seq_num());
bh.segment_seq = _segment->seq_num();
bh.crc = bh.calculate_crc();
simple_memory_output_stream bh_stream(_pending_data.data(), write_buffer::buffer_header_size);
ser::serialize<write_buffer::buffer_header>(bh_stream, bh);
}
future<> flush_pending_data() {
if (_pending_data.empty()) {
co_return;
}
co_await _segment->append(bytes_view(reinterpret_cast<const int8_t*>(_pending_data.data()), _pending_data.size()));
_pending_data.clear();
}
public:
segment_data_sink_impl(seg_ptr segment)
: _segment(std::move(segment))
@@ -2203,11 +2248,29 @@ public:
virtual future<> put(std::span<temporary_buffer<char>> data) override {
for (auto& buf : data) {
co_await _segment->append(bytes_view((const int8_t*)buf.get(), buf.size()));
if (buf.empty()) {
continue;
}
if (_header_rewritten) {
co_await _segment->append(bytes_view(reinterpret_cast<const int8_t*>(buf.get()), buf.size()));
continue;
}
_pending_data.insert(_pending_data.end(), buf.get(), buf.get() + buf.size());
maybe_parse_initial_header();
if (_initial_header_size && _pending_data.size() >= *_initial_header_size) {
rewrite_buffer_header();
_header_rewritten = true;
co_await flush_pending_data();
}
}
}
virtual future<> close() override {
if (!_header_rewritten && !_pending_data.empty()) {
throw std::runtime_error("Truncated streamed logstor segment header");
}
co_return;
}