From 9e0741e5b71cac7adbcec151fc26e1b427cb336d Mon Sep 17 00:00:00 2001 From: Michael Litvak Date: Thu, 14 May 2026 19:21:50 +0200 Subject: [PATCH] logstor: add segment sequence number add a segment sequence number that is a global (per-shard) increasing number that is allocated when getting a new segment for write, and is written in buffer headers in the segment. this is useful when scanning a segment for ignoring buffers that belong to an older generation of that segment. it's also useful by creating a single ordering of all segments that respects the compaction ordering. on recovery, we keep the records from the newest segments among equal records. so if a record is rewritten by compaction, the recovery will choose the records from the new compacted segments. discard segments by writing zeroes to the segment header. --- docs/dev/logstor.md | 19 ++-- replica/compaction_group.hh | 2 +- replica/database.cc | 6 +- replica/database.hh | 4 +- replica/logstor/compaction.hh | 7 +- replica/logstor/logstor.cc | 2 +- replica/logstor/logstor.hh | 2 +- replica/logstor/segment_manager.cc | 144 ++++++++++++++++------------- replica/logstor/segment_manager.hh | 2 +- replica/logstor/types.hh | 33 ++++++- replica/logstor/utils.hh | 104 --------------------- replica/logstor/write_buffer.cc | 33 +++---- replica/logstor/write_buffer.hh | 32 ++++--- replica/table.cc | 4 +- 14 files changed, 169 insertions(+), 225 deletions(-) delete mode 100644 replica/logstor/utils.hh diff --git a/docs/dev/logstor.md b/docs/dev/logstor.md index c543d0b46d..e05c52e5d2 100644 --- a/docs/dev/logstor.md +++ b/docs/dev/logstor.md @@ -165,11 +165,14 @@ A serialized form of `write_buffer::buffer_header`. | Offset | Size | Field | Description | |--------|------|-------------|-------------| | 0 | 4 | `magic` | `0x4C475342` ("LGSB"). Used to detect valid buffers during recovery. | -| 4 | 4 | `data_size` | Size in bytes of all record data following the header(s). | -| 8 | 2 | `seg_gen` | Segment generation number. Incremented each time the segment slot is reused. Used during recovery to discard stale data. | -| 10 | 1 | `kind` | Segment kind: `0` = mixed, `1` = full. | -| 11 | 1 | `version` | Version of the write buffer format. | -| 12 | 4 | `crc` | CRC32 of all other fields in the buffer header. Used for validating the header. | +| 4 | 1 | `kind` | Segment kind: `0` = mixed, `1` = full. | +| 5 | 1 | `version` | Version of the write buffer format. | +| 6 | 2 | `reserved` | Reserved for future use. Currently written as zero and included in the CRC. | +| 8 | 8 | `segment_seq` | Monotonic segment sequence number used during recovery and segment ordering checks. | +| 16 | 4 | `data_size` | Size in bytes of all record data following the header(s). | +| 20 | 4 | `crc` | CRC32 of all preceding buffer header fields. Used for validating the header. | + +The buffer header is 24 bytes long, which keeps it aligned to `record_alignment` (8 bytes). #### Segment Header (full segments only) @@ -179,9 +182,9 @@ A serialized form of `write_buffer::segment_header`. | Offset | Size | Field | Description | |--------|------|---------------|-------------| -| 16 | 16 | `table` | UUID of the table this segment belongs to. | -| 32 | 8 | `first_token` | Minimum token of all records in the segment (raw token number). | -| 40 | 8 | `last_token` | Maximum token of all records in the segment (raw token number). | +| 24 | 16 | `table` | UUID of the table this segment belongs to. | +| 40 | 8 | `first_token` | Minimum token of all records in the segment (raw token number). | +| 48 | 8 | `last_token` | Maximum token of all records in the segment (raw token number). | #### Records diff --git a/replica/compaction_group.hh b/replica/compaction_group.hh index c1e11ee452..19a1423b60 100644 --- a/replica/compaction_group.hh +++ b/replica/compaction_group.hh @@ -317,7 +317,7 @@ public: future<> discard_logstor_segments(); - future<> flush_separator(std::optional seq_num = std::nullopt); + future<> flush_separator(std::optional seq_num = std::nullopt); logstor::separator_buffer& get_separator_buffer(size_t write_size); logstor::segment_set& logstor_segments() noexcept { diff --git a/replica/database.cc b/replica/database.cc index bd4e9d3dfe..57af0e8da3 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -16,6 +16,8 @@ #include "locator/network_topology_strategy.hh" #include "locator/tablets.hh" #include "locator/token_metadata_fwd.hh" +#include "replica/logstor/compaction.hh" +#include "replica/logstor/types.hh" #include "utils/log.hh" #include "replica/database_fwd.hh" #include @@ -955,7 +957,7 @@ database::init_logstor() { trigger_logstor_compaction(false); }); - _logstor->set_trigger_separator_flush_hook([this] (size_t seq_num) { + _logstor->set_trigger_separator_flush_hook([this] (logstor::segment_sequence seq_num) { (void)flush_logstor_separator(seq_num); }); @@ -2955,7 +2957,7 @@ future<> database::flush_logstor_separator_on_all_shards(sharded& shar }); } -future<> database::flush_logstor_separator(std::optional seq_num) { +future<> database::flush_logstor_separator(std::optional seq_num) { return _tables_metadata.parallel_for_each_table([seq_num] (table_id, lw_shared_ptr table) { return table->flush_separator(seq_num); }); diff --git a/replica/database.hh b/replica/database.hh index d1a0d7fb28..83c3ac195b 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1179,7 +1179,7 @@ public: return _logstor->get_compaction_manager(); } - future<> flush_separator(std::optional seq_num = std::nullopt); + future<> flush_separator(std::optional seq_num = std::nullopt); future get_logstor_segment_stats() const; @@ -2069,7 +2069,7 @@ public: static future<> trigger_logstor_compaction_on_all_shards(sharded& sharded_db, bool major); void trigger_logstor_compaction(bool major); static future<> flush_logstor_separator_on_all_shards(sharded& sharded_db); - future<> flush_logstor_separator(std::optional seq_num = std::nullopt); + future<> flush_logstor_separator(std::optional seq_num = std::nullopt); future get_logstor_table_segment_stats(table_id table) const; size_t get_logstor_memory_usage() const; diff --git a/replica/logstor/compaction.hh b/replica/logstor/compaction.hh index a3884c6e46..2b5d31c12b 100644 --- a/replica/logstor/compaction.hh +++ b/replica/logstor/compaction.hh @@ -33,7 +33,6 @@ struct segment_descriptor : public log_heap_hook> pending_updates; utils::chunked_vector held_segments; - std::optional min_seq_num; + std::optional min_seq_num; bool flushed{false}; separator_buffer(write_buffer* wb) diff --git a/replica/logstor/logstor.cc b/replica/logstor/logstor.cc index 51164bacc9..e6eb1121a0 100644 --- a/replica/logstor/logstor.cc +++ b/replica/logstor/logstor.cc @@ -310,7 +310,7 @@ void logstor::set_trigger_compaction_hook(std::function fn) { _segment_manager.set_trigger_compaction_hook(std::move(fn)); } -void logstor::set_trigger_separator_flush_hook(std::function fn) { +void logstor::set_trigger_separator_flush_hook(std::function fn) { _segment_manager.set_trigger_separator_flush_hook(std::move(fn)); } diff --git a/replica/logstor/logstor.hh b/replica/logstor/logstor.hh index f9f032911a..ba7e7ba300 100644 --- a/replica/logstor/logstor.hh +++ b/replica/logstor/logstor.hh @@ -74,7 +74,7 @@ public: tracing::trace_state_ptr trace_state = nullptr); void set_trigger_compaction_hook(std::function fn); - void set_trigger_separator_flush_hook(std::function fn); + void set_trigger_separator_flush_hook(std::function fn); }; } // namespace logstor diff --git a/replica/logstor/segment_manager.cc b/replica/logstor/segment_manager.cc index e003ac9405..05cbaebbae 100644 --- a/replica/logstor/segment_manager.cc +++ b/replica/logstor/segment_manager.cc @@ -69,6 +69,7 @@ protected: class writeable_segment : public segment { seastar::gate _write_gate; segment_ref _seg_ref; + segment_sequence _seq_num; uint32_t _current_offset = 0; // next offset for write @@ -77,7 +78,7 @@ class writeable_segment : public segment { public: using segment::segment; - void start(segment_ref seg_ref); + void start(segment_ref, segment_sequence); future<> stop(); @@ -100,6 +101,10 @@ public: segment_ref ref() { return _seg_ref; } + + segment_sequence seq_num() const noexcept { + return _seq_num; + } }; segment::segment(log_segment_id id, seastar::file file, uint64_t file_offset, size_t max_size) @@ -132,8 +137,9 @@ future segment::read(log_location loc) { }); } -void writeable_segment::start(segment_ref seg_ref) { +void writeable_segment::start(segment_ref seg_ref, segment_sequence seq_num) { _seg_ref = std::move(seg_ref); + _seq_num = seq_num; } future<> writeable_segment::stop() { @@ -197,6 +203,9 @@ class file_manager { std::vector _open_read_files; + std::unique_ptr _zero_buf; + size_t _zero_buf_size{0}; + public: file_manager(segment_manager_config cfg) : _segments_per_file(cfg.file_size / cfg.segment_size) @@ -237,6 +246,9 @@ public: future<> file_manager::start() { co_await seastar::recursive_touch_directory(_base_dir.string()); + _zero_buf_size = 128 * 1024; + _zero_buf = allocate_aligned_buffer(_zero_buf_size, 4096); + std::memset(_zero_buf.get(), 0, _zero_buf_size); } future<> file_manager::stop() { @@ -326,19 +338,13 @@ future file_manager::get_file_for_read(size_t file_id) { } future<> file_manager::format_file_region(seastar::file file, uint64_t offset, size_t size) { - // Allocate aligned buffer for zeroing - const auto write_alignment = file.disk_write_dma_alignment(); - size_t buf_size = align_up(128 * 1024, size_t(write_alignment)); - auto zero_buf = allocate_aligned_buffer(buf_size, write_alignment); - std::memset(zero_buf.get(), 0, buf_size); - - // Write zeros to entire region + // Write zeros to entire region using the pre-allocated zero buffer size_t remaining = size; uint64_t current_offset = offset; while (remaining > 0) { - auto write_size = std::min(remaining, buf_size); - auto written = co_await file.dma_write(current_offset, zero_buf.get(), write_size); + auto write_size = std::min(remaining, _zero_buf_size); + auto written = co_await file.dma_write(current_offset, _zero_buf.get(), write_size); current_offset += written; remaining -= written; @@ -554,7 +560,7 @@ public: struct segment_header { segment_kind kind; - segment_generation seg_gen; + segment_sequence segment_seq; struct mixed {}; struct full { @@ -569,7 +575,7 @@ struct segment_header { static segment_header make_segment_header(const write_buffer::buffer_header& bh, std::optional sh) { segment_header seg_hdr { .kind = bh.kind, - .seg_gen = bh.seg_gen, + .segment_seq = bh.segment_seq, }; switch (bh.kind) { @@ -621,7 +627,7 @@ class segment_manager_impl { seastar::semaphore _active_segment_write_sem{1}; segment_pool _segment_pool; std::optional> _switch_segment_fut; - size_t _segment_seq_num{0}; + segment_sequence _next_segment_seq{1}; seastar::gate _async_gate; future<> _reserve_replenisher{make_ready_future<>()}; @@ -639,7 +645,7 @@ class segment_manager_impl { std::vector _available_separator_buffers; std::function _trigger_compaction_fn; - std::function _trigger_separator_flush_fn; + std::function _trigger_separator_flush_fn; utils::phased_barrier _writes_phaser{"logstor_sm_writes"}; @@ -682,7 +688,7 @@ public: } future<> load_segment(replica::database&, log_segment_id); - future<> recover_segment(replica::database&, log_segment_id); + future<> recover_segment(replica::database&, log_segment_id, primary_index::entry_cmp_fn cmp, std::function on_header); future> read_segment_header(log_segment_id); future<> add_segment_to_compaction_group(replica::database&, segment_descriptor&); @@ -692,7 +698,7 @@ public: } } - void trigger_separator_flush(size_t seq) { + void trigger_separator_flush(segment_sequence seq) { if (_trigger_separator_flush_fn) { _trigger_separator_flush_fn(seq); } @@ -710,7 +716,7 @@ public: _trigger_compaction_fn = std::move(fn); } - void set_trigger_separator_flush_hook(std::function fn) { + void set_trigger_separator_flush_hook(std::function fn) { _trigger_separator_flush_fn = std::move(fn); } @@ -740,9 +746,12 @@ private: future<> request_segment_switch(); future<> switch_active_segment(); + segment_sequence allocate_segment_seq() noexcept { + return _next_segment_seq++; + } std::chrono::microseconds calculate_separator_delay() const; - future<> write_to_separator(write_buffer&, segment_ref, size_t segment_seq_num); + future<> write_to_separator(write_buffer&, segment_ref, segment_sequence); void write_to_separator(table&, log_location prev_loc, log_record, segment_ref); // Sequentially scans one segment and invokes callbacks for the decoded @@ -777,7 +786,7 @@ private: future get_segment(write_source src) { seg_ptr seg = co_await _segment_pool.get_segment(src); - seg->start(make_segment_ref(seg->id())); + seg->start(make_segment_ref(seg->id()), allocate_segment_seq()); _stats.segments_in_use++; co_return seg; } @@ -991,7 +1000,7 @@ future<> segment_manager_impl::write(write_buffer& wb) { seg_ptr seg = _active_segment; auto seg_holder = seg->hold(); auto seg_ref = seg->ref(); - auto segment_seq_num = _segment_seq_num; + auto seq_num = seg->seq_num(); auto write_op = _writes_phaser.start(); auto& desc = get_segment_descriptor(seg->id()); @@ -1000,7 +1009,9 @@ future<> segment_manager_impl::write(write_buffer& wb) { seg_ref.set_flush_failure(); }); - wb.write_header(desc.seg_gen, std::nullopt); + logstor_logger.trace("Write active segment {} seq {}", seg->id(), seq_num); + + wb.write_header(seq_num, std::nullopt); auto loc = co_await seg->append(data); sem_units.return_all(); @@ -1015,7 +1026,7 @@ future<> segment_manager_impl::write(write_buffer& wb) { co_await wb.complete_writes(loc); co_await with_scheduling_group(_cfg.separator_sg, [&] { - return write_to_separator(wb, std::move(seg_ref), segment_seq_num); + return write_to_separator(wb, std::move(seg_ref), seq_num); }); write_to_separator_failed.cancel(); } @@ -1040,9 +1051,9 @@ future<> segment_manager_impl::write_full_segment(write_buffer& wb, compaction_g auto seg = co_await get_segment(source); auto& desc = get_segment_descriptor(seg->id()); - logstor_logger.trace("Write full segment {} from {}", seg->id(), write_source_to_string(source)); + logstor_logger.trace("Write full segment {} seq {} from {}", seg->id(), seg->seq_num(), write_source_to_string(source)); - wb.write_header(desc.seg_gen, cg.schema()->id()); + wb.write_header(seg->seq_num(), cg.schema()->id()); auto loc = co_await seg->append(data); @@ -1096,7 +1107,6 @@ future<> segment_manager_impl::switch_active_segment() { auto new_seg = co_await get_segment(write_source::normal_write); auto old_seg = std::exchange(_active_segment, std::move(new_seg)); - _segment_seq_num++; if (old_seg) { // close old segment in background @@ -1107,11 +1117,11 @@ future<> segment_manager_impl::switch_active_segment() { // trigger separator flush for separator buffers that hold old segments auto u = std::max(1, _max_segments / 100); - if (_segment_seq_num % u == 0 && _segment_seq_num > 5*u) { - trigger_separator_flush(_segment_seq_num - 5*u); + if (_next_segment_seq.value % u == 0 && _next_segment_seq.value > 5*u) { + trigger_separator_flush(segment_sequence(_next_segment_seq.value - 5*u)); } - logstor_logger.trace("Switched active segment to {}", _active_segment->id()); + logstor_logger.trace("Switched active segment to {} seq {}", _active_segment->id(), _active_segment->seq_num()); } future<> segment_manager_impl::replenish_reserve() { @@ -1193,10 +1203,6 @@ void segment_manager_impl::free_segment(log_segment_id segment_id) noexcept { if (desc.ref_count != 0) { on_internal_error(logstor_logger, format("Freeing segment {} with non-zero reference count", segment_id)); } - desc.on_free_segment(); - - // TODO write new generation? - _free_segments.push_back(segment_id); _segment_freed_cv.signal(); @@ -1224,22 +1230,12 @@ future<> segment_manager_impl::discard_segments(segment_set& ss) { segments.push_back(seg_id); } + // Invalidate the first header block so recovery treats the slot as empty. co_await max_concurrent_for_each(segments, 32, [this] (log_segment_id seg_id) -> future<> { - // Write a valid empty segment header with the next generation. - // This marks the segment as discarded while preserving the generation counter. - auto next_gen = get_segment_descriptor(seg_id).seg_gen; - ++next_gen; - auto buf = allocate_aligned_buffer(block_alignment, 4096); - std::memset(buf.get(), 0, block_alignment); - simple_memory_output_stream out(buf.get(), write_buffer::buffer_header_size); - write_buffer::write_empty_header(out, next_gen); - + logstor_logger.trace("Discard segment {}", seg_id); auto [file_id, file_offset] = segment_id_to_file_location(seg_id); auto file = co_await _file_mgr.get_file_for_write(file_id); - co_await file.dma_write(file_offset, buf.get(), block_alignment); - - logstor_logger.trace("Discard segment {} next gen {}", seg_id, next_gen); - + co_await _file_mgr.format_file_region(file, file_offset, block_alignment); free_segment(seg_id); }); } @@ -1258,7 +1254,7 @@ future<> segment_manager_impl::scan_segment(log_segment_id segment_id, .read_ahead = 1, }); size_t current_position = 0; - auto seg_gen = get_segment_descriptor(segment_id).seg_gen; + std::optional segment_seq; logstor_logger.trace("Reading records from segment {} at file {} offset {}", segment_id, file_id, file_offset); @@ -1288,7 +1284,9 @@ future<> segment_manager_impl::scan_segment(log_segment_id segment_id, break; } - if (bh.seg_gen != seg_gen) { + if (!segment_seq) { + segment_seq = bh.segment_seq; + } else if (bh.segment_seq != *segment_seq) { break; } @@ -1755,7 +1753,7 @@ separator_buffer compaction_manager_impl::allocate_separator_buffer() { return separator_buffer(wb); } -future<> segment_manager_impl::write_to_separator(write_buffer& wb, segment_ref seg_ref, size_t segment_seq_num) { +future<> segment_manager_impl::write_to_separator(write_buffer& wb, segment_ref seg_ref, segment_sequence segment_seq_num) { for (auto&& w : wb.records()) { co_await coroutine::maybe_yield(); @@ -1891,18 +1889,39 @@ future<> segment_manager_impl::do_recovery(replica::database& db) { next_file_id++; } - // populate index from all segments. keep the latest record for each key. + size_t allocated_segment_count = next_file_id * _segments_per_file; + + std::vector segment_seqs(allocated_segment_count, segment_sequence(0)); + segment_sequence max_segment_seq = segment_sequence(0); + + // Populate the index from all segments. Keep the latest record for each key. + // For equal records, keep the one from the segment with the highest sequence number. + auto cmp_with_seq = [&segment_seqs] (const index_entry& old_entry, const index_entry& candidate) -> std::strong_ordering { + if (auto c = primary_index::default_entry_cmp(old_entry, candidate); c != 0) { + return c; + } + const auto old_seq = segment_seqs[old_entry.location.segment.value]; + const auto new_seq = segment_seqs[candidate.location.segment.value]; + if (auto c = old_seq <=> new_seq; c != 0) { + return c; + } + return old_entry.location.offset <=> candidate.location.offset; + }; + for (auto file_id : found_file_ids) { logstor_logger.info("Recovering segments from file {}: {}%", _file_mgr.get_file_path(file_id).string(), (file_id + 1) * 100 / found_file_ids.size()); co_await max_concurrent_for_each(segments_in_file(file_id), 32, - [this, &db] (log_segment_id seg_id) { - return recover_segment(db, seg_id); + [this, &db, &cmp_with_seq, &segment_seqs, &max_segment_seq] (log_segment_id seg_id) { + return recover_segment(db, seg_id, cmp_with_seq, + [seg_id, &segment_seqs, &max_segment_seq] (const segment_header& seg_hdr) { + segment_seqs[seg_id.value] = seg_hdr.segment_seq; + max_segment_seq = std::max(max_segment_seq, seg_hdr.segment_seq); + }); } ); } // go over the index and mark all segments that have live data as used. - size_t allocated_segment_count = next_file_id * _segments_per_file; utils::dynamic_bitset used_segments(allocated_segment_count); co_await db.get_tables_metadata().for_each_table_gently([&] (table_id tid, lw_shared_ptr
tp) -> future<> { @@ -1922,9 +1941,7 @@ future<> segment_manager_impl::do_recovery(replica::database& db) { for (size_t seg_idx = 0; seg_idx < allocated_segment_count; ++seg_idx) { co_await coroutine::maybe_yield(); log_segment_id seg_id(seg_idx); - auto& desc = get_segment_descriptor(seg_id); if (!used_segments.test(seg_idx)) { - desc.on_free_segment(); _free_segments.push_back(seg_id); free_segment_count++; } else { @@ -1950,22 +1967,25 @@ future<> segment_manager_impl::do_recovery(replica::database& db) { } _next_new_segment_id = allocated_segment_count; + _next_segment_seq = max_segment_seq + 1; _file_mgr.recover_next_file_id(next_file_id); logstor_logger.info("Recovery complete"); } -future<> segment_manager_impl::recover_segment(replica::database& db, log_segment_id segment_id) { +future<> segment_manager_impl::recover_segment(replica::database& db, log_segment_id segment_id, + primary_index::entry_cmp_fn cmp, std::function on_header) { auto& desc = get_segment_descriptor(segment_id); desc.reset(_cfg.segment_size); co_await scan_segment(segment_id, - [segment_id] (const segment_header& seg_hdr) { - logstor_logger.trace("Recovering segment {} with generation {}", segment_id, seg_hdr.seg_gen); + [segment_id, on_header = std::move(on_header)] (const segment_header& seg_hdr) mutable { + logstor_logger.trace("Recovering segment {} with sequence {}", segment_id, seg_hdr.segment_seq); + on_header(seg_hdr); return make_ready_future<>(); }, - [this, &desc, &db] (log_location loc, const log_record_header& header) -> want_data { + [this, &desc, &db, &cmp] (log_location loc, const log_record_header& header) -> want_data { logstor_logger.trace("Recovery: read record at {} key {} ts {}", loc, header.key, header.timestamp); index_entry new_entry { @@ -1978,7 +1998,7 @@ future<> segment_manager_impl::recover_segment(replica::database& db, log_segmen if (!t.uses_logstor()) { return want_data::no; } - auto [inserted, prev_entry] = t.logstor_index().insert(header.key, new_entry); + auto [inserted, prev_entry] = t.logstor_index().insert(header.key, new_entry, cmp); if (inserted) { desc.on_write(loc); if (prev_entry) { @@ -2146,7 +2166,7 @@ void segment_manager::set_trigger_compaction_hook(std::function fn) { _impl->set_trigger_compaction_hook(std::move(fn)); } -void segment_manager::set_trigger_separator_flush_hook(std::function fn) { +void segment_manager::set_trigger_separator_flush_hook(std::function fn) { _impl->set_trigger_separator_flush_hook(std::move(fn)); } @@ -2231,7 +2251,7 @@ public: future<> segment_manager_impl::load_segment(replica::database& db, log_segment_id seg_id) { // read the segment and populate the index - co_await recover_segment(db, seg_id); + co_await recover_segment(db, seg_id, primary_index::default_entry_cmp, [] (const segment_header&) {}); auto& desc = get_segment_descriptor(seg_id); co_await add_segment_to_compaction_group(db, desc); diff --git a/replica/logstor/segment_manager.hh b/replica/logstor/segment_manager.hh index 52adb12533..17a486ee62 100644 --- a/replica/logstor/segment_manager.hh +++ b/replica/logstor/segment_manager.hh @@ -129,7 +129,7 @@ public: const compaction_manager& get_compaction_manager() const noexcept; void set_trigger_compaction_hook(std::function fn); - void set_trigger_separator_flush_hook(std::function fn); + void set_trigger_separator_flush_hook(std::function fn); size_t get_segment_size() const noexcept; diff --git a/replica/logstor/types.hh b/replica/logstor/types.hh index ffe202c4ae..44b64cd96b 100644 --- a/replica/logstor/types.hh +++ b/replica/logstor/types.hh @@ -9,7 +9,6 @@ #include #include -#include "replica/logstor/utils.hh" #include "dht/decorated_key.hh" #include "mutation/canonical_mutation.hh" #include "mutation/timestamp.hh" @@ -35,8 +34,6 @@ struct primary_index_key { dht::decorated_key dk; }; -using segment_generation = generation_base; - struct index_entry { log_location location; api::timestamp_type timestamp; @@ -55,6 +52,28 @@ struct log_record { canonical_mutation mut; }; +struct segment_sequence { + uint64_t value; + + bool operator==(const segment_sequence& other) const noexcept = default; + auto operator<=>(const segment_sequence& other) const noexcept = default; + + segment_sequence& operator++() noexcept { + ++value; + return *this; + } + + segment_sequence operator++(int) noexcept { + segment_sequence tmp = *this; + ++value; + return tmp; + } + + segment_sequence operator+(uint64_t increment) const noexcept { + return segment_sequence{value + increment}; + } +}; + } // Format specialization declarations and implementations @@ -82,3 +101,11 @@ struct fmt::formatter : fmt::formatter +struct fmt::formatter : fmt::formatter { + template + auto format(const replica::logstor::segment_sequence& seq, FormatContext& ctx) const { + return fmt::format_to(ctx.out(), "sseq({})", seq.value); + } +}; diff --git a/replica/logstor/utils.hh b/replica/logstor/utils.hh deleted file mode 100644 index 72b10bfa94..0000000000 --- a/replica/logstor/utils.hh +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright (C) 2026-present ScyllaDB - */ - -/* - * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 - */ -#pragma once - -#include -#include "serializer.hh" - -namespace replica::logstor { - -// an unsigned integer that can be incremented and compared with wraparound semantics -template -class generation_base { - T _value; - -public: - - using underlying = T; - - constexpr generation_base() noexcept : _value(0) {} - constexpr explicit generation_base(T value) noexcept : _value(value) {} - - constexpr T value() const noexcept { return _value; } - - constexpr generation_base& operator++() noexcept { - ++_value; - return *this; - } - - constexpr generation_base operator++(int) noexcept { - auto old = *this; - ++_value; - return old; - } - - constexpr generation_base& operator+=(T delta) noexcept { - _value += delta; - return *this; - } - - constexpr generation_base operator+(T delta) const noexcept { - return generation_base(_value + delta); - } - - constexpr bool operator==(const generation_base& other) const noexcept = default; - - /// Comparison using wraparound semantics. - /// Returns true if this generation is less than other, accounting for wraparound. - /// Assumes generations are within half the value space of each other. - constexpr bool operator<(const generation_base& other) const noexcept { - // Use signed comparison after converting difference to signed type - // This handles wraparound: if diff > max/2, it's treated as negative - using signed_type = std::make_signed_t; - auto diff = static_cast(_value - other._value); - return diff < 0; - } - - constexpr bool operator<=(const generation_base& other) const noexcept { - return *this == other || *this < other; - } - - constexpr bool operator>(const generation_base& other) const noexcept { - return other < *this; - } - - constexpr bool operator>=(const generation_base& other) const noexcept { - return other <= *this; - } -}; - -} - -template -struct fmt::formatter> : fmt::formatter { - template - auto format(const replica::logstor::generation_base& gen, FormatContext& ctx) const { - return fmt::formatter::format(gen.value(), ctx); - } -}; - -namespace ser { - -template -struct serializer> { - template - static void write(Output& out, const replica::logstor::generation_base& g) { - serializer::underlying>::write(out, g.value()); - } - template - static replica::logstor::generation_base read(Input& in) { - auto val = serializer::underlying>::read(in); - return replica::logstor::generation_base(val); - } - template - static void skip(Input& in) { - serializer::underlying>::skip(in); - } -}; - -} diff --git a/replica/logstor/write_buffer.cc b/replica/logstor/write_buffer.cc index b10844f1d7..987859642c 100644 --- a/replica/logstor/write_buffer.cc +++ b/replica/logstor/write_buffer.cc @@ -164,11 +164,12 @@ void write_buffer::finalize(size_t alignment) { pad_to_alignment(alignment); } -void write_buffer::write_header(segment_generation seg_gen, std::optional table) { +void write_buffer::write_header(segment_sequence segment_seq, std::optional table) { _buffer_header.magic = buffer_header_magic; - _buffer_header.seg_gen = seg_gen; _buffer_header.kind = _segment_kind; _buffer_header.version = current_version; + _buffer_header.reserved = 0; + _buffer_header.segment_seq = segment_seq; _buffer_header.crc = _buffer_header.calculate_crc(); @@ -185,25 +186,16 @@ void write_buffer::write_header(segment_generation seg_gen, std::optional(out, hdr); -} - bool write_buffer::validate_header(const write_buffer::buffer_header& bh) { if (bh.magic != write_buffer::buffer_header_magic) { return false; } - if (bh.calculate_crc() != bh.crc) { + switch (bh.kind) { + case segment_kind::mixed: + case segment_kind::full: + break; + default: return false; } @@ -211,6 +203,10 @@ bool write_buffer::validate_header(const write_buffer::buffer_header& bh) { return false; } + if (bh.calculate_crc() != bh.crc) { + return false; + } + return true; } @@ -248,10 +244,11 @@ size_t write_buffer::estimate_required_segments(size_t net_data_size, size_t rec uint32_t write_buffer::buffer_header::calculate_crc() const { utils::crc32 c; c.process_le(magic); - c.process_le(data_size); - c.process_le(seg_gen.value()); c.process_le(static_cast(kind)); c.process_le(version); + c.process_le(reserved); + c.process_le(segment_seq.value); + c.process_le(data_size); return c.get(); } diff --git a/replica/logstor/write_buffer.hh b/replica/logstor/write_buffer.hh index 6004988021..9840079560 100644 --- a/replica/logstor/write_buffer.hh +++ b/replica/logstor/write_buffer.hh @@ -114,19 +114,22 @@ public: struct buffer_header { uint32_t magic; - uint32_t data_size; // size of all records data following the buffer_header - segment_generation seg_gen; segment_kind kind; uint8_t version; + uint16_t reserved; + segment_sequence segment_seq; + uint32_t data_size; // size of all records data following the header(s) uint32_t crc; uint32_t calculate_crc() const; }; static constexpr size_t buffer_header_size = - 2 * sizeof(uint32_t) - + sizeof(segment_generation::underlying) + sizeof(uint32_t) + sizeof(std::underlying_type_t) + sizeof(uint8_t) + + sizeof(uint16_t) + + sizeof(segment_sequence) + + sizeof(uint32_t) + sizeof(uint32_t); static_assert(buffer_header_size % record_alignment == 0, "Buffer header size must be aligned by record_alignment"); @@ -234,8 +237,6 @@ public: return s; } - static void write_empty_header(ostream& out, segment_generation seg_gen); - static bool validate_header(const buffer_header& bh); private: @@ -243,7 +244,7 @@ private: const char* data() const noexcept { return _buffer.get(); } // table is set for segment_kind::full - void write_header(segment_generation seg_gen, std::optional table); + void write_header(segment_sequence segment_seq, std::optional table); // get all write records in the buffer. // with_record_copy must be to true when creating the write_buffer. @@ -329,30 +330,33 @@ struct serializer { template static void write(Output& out, const replica::logstor::write_buffer::buffer_header& h) { serializer::write(out, h.magic); - serializer::write(out, h.data_size); - serializer::write(out, h.seg_gen); serializer::write(out, static_cast(h.kind)); serializer::write(out, h.version); + serializer::write(out, h.reserved); + serializer::write(out, h.segment_seq.value); + serializer::write(out, h.data_size); serializer::write(out, h.crc); } template static replica::logstor::write_buffer::buffer_header read(Input& in) { replica::logstor::write_buffer::buffer_header h; h.magic = serializer::read(in); - h.data_size = serializer::read(in); - h.seg_gen = serializer::read(in); h.kind = static_cast(serializer::read(in)); h.version = serializer::read(in); + h.reserved = serializer::read(in); + h.segment_seq = replica::logstor::segment_sequence{serializer::read(in)}; + h.data_size = serializer::read(in); h.crc = serializer::read(in); return h; } template static void skip(Input& in) { serializer::skip(in); + serializer::skip(in); + serializer::skip(in); + serializer::skip(in); + serializer::skip(in); serializer::skip(in); - serializer::skip(in); - serializer::skip(in); - serializer::skip(in); serializer::skip(in); } }; diff --git a/replica/table.cc b/replica/table.cc index 4bf591fc92..e80d84e5a5 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1113,7 +1113,7 @@ future<> compaction_group::discard_logstor_segments() { co_await sm.discard_segments(*_logstor_segments); } -future<> compaction_group::flush_separator(std::optional seq_num) { +future<> compaction_group::flush_separator(std::optional seq_num) { auto units = co_await get_units(_separator_flush_sem, 1); auto pending = std::exchange(_separator_flushes, {}); if (_logstor_separator && (!seq_num || _logstor_separator->min_seq_num < *seq_num)) { @@ -2611,7 +2611,7 @@ void table::try_trigger_compaction(compaction_group& cg) noexcept { } } -future<> table::flush_separator(std::optional seq_num) { +future<> table::flush_separator(std::optional seq_num) { if (!uses_logstor()) { co_return; }