From c18d616f6465e13f361991c83cf5eba85d5abc2f Mon Sep 17 00:00:00 2001 From: Michael Litvak Date: Thu, 14 May 2026 20:39:40 +0200 Subject: [PATCH 1/5] logstor: compare records by timestamp Add the record timestamp. The timestamp is extracted from the row marker of the mutation when we write it. When inserting a record to index, we compare it with the existing record, and insert it only if it has newer timestamp. --- docs/dev/logstor.md | 2 +- idl/logstor.idl.hh | 3 ++- replica/logstor/index.hh | 25 +++++++----------- replica/logstor/logstor.cc | 42 +++++++++++++++++++++--------- replica/logstor/segment_manager.cc | 23 +++++++--------- replica/logstor/types.hh | 8 +++--- test/cluster/test_logstor.py | 7 +++++ 7 files changed, 62 insertions(+), 48 deletions(-) diff --git a/docs/dev/logstor.md b/docs/dev/logstor.md index a26811c32a..c543d0b46d 100644 --- a/docs/dev/logstor.md +++ b/docs/dev/logstor.md @@ -205,7 +205,7 @@ zero_padding -- to align to record_alignment (8 bytes) The `header_size` bytes immediately following the record header are the IDL-serialized form of `log_record_header`, which contains: - `key`: the partition key (`primary_index_key`), including a `decorated_key` with a token and partition key bytes. -- `generation`: a 16-bit write generation number, used during recovery to resolve conflicts when the same key appears in multiple segments. +- `timestamp`: the timestamp of the record, used to resolve conflicts by keeping the record with the latest timestamp. - `table`: UUID of the table this record belongs to. **Mutation Data**: diff --git a/idl/logstor.idl.hh b/idl/logstor.idl.hh index ac8642f3b0..48d45ffc68 100644 --- a/idl/logstor.idl.hh +++ b/idl/logstor.idl.hh @@ -8,6 +8,7 @@ #include "idl/frozen_schema.idl.hh" #include "idl/token.idl.hh" +#include "mutation/timestamp.hh" namespace replica { namespace logstor { @@ -18,7 +19,7 @@ struct primary_index_key { class log_record_header { replica::logstor::primary_index_key key; - replica::logstor::record_generation generation; + api::timestamp_type timestamp; table_id table; }; diff --git a/replica/logstor/index.hh b/replica/logstor/index.hh index 7c3d4e69a7..d24989db57 100644 --- a/replica/logstor/index.hh +++ b/replica/logstor/index.hh @@ -14,6 +14,7 @@ #include "utils/bptree.hh" #include "utils/double-decker.hh" #include "utils/phased_barrier.hh" +#include namespace replica::logstor { @@ -100,20 +101,6 @@ public: } } - std::optional exchange(const primary_index_key& key, index_entry new_entry) { - partitions_type::bound_hint hint; - auto i = _partitions.lower_bound(key.dk, dht::ring_position_comparator(*_schema), hint); - if (hint.match) { - auto old_entry = i->_e; - i->_e = std::move(new_entry); - return old_entry; - } else { - _partitions.emplace_before(i, key.dk.token().raw(), hint, key.dk, std::move(new_entry)); - ++_key_count; - return std::nullopt; - } - } - bool update_record_location(const primary_index_key& key, log_location old_location, log_location new_location) { auto it = _partitions.find(key.dk, dht::ring_position_comparator(*_schema)); if (it != _partitions.end()) { @@ -125,11 +112,17 @@ public: return false; } - std::pair> insert_if_newer(const primary_index_key& key, index_entry new_entry, bool prefer_on_tie) { + using entry_cmp_fn = std::function; + + static std::strong_ordering default_entry_cmp(const index_entry& a, const index_entry& b) noexcept { + return a.timestamp <=> b.timestamp; + } + + std::pair> insert(const primary_index_key& key, index_entry new_entry, entry_cmp_fn cmp = default_entry_cmp) { partitions_type::bound_hint hint; auto i = _partitions.lower_bound(key.dk, dht::ring_position_comparator(*_schema), hint); if (hint.match) { - if (i->_e.generation < new_entry.generation || (i->_e.generation == new_entry.generation && prefer_on_tie)) { + if (cmp(i->_e, new_entry) <= 0) { auto old_entry = i->_e; i->_e = std::move(new_entry); return {true, std::make_optional(old_entry)}; diff --git a/replica/logstor/logstor.cc b/replica/logstor/logstor.cc index 180c61434c..51164bacc9 100644 --- a/replica/logstor/logstor.cc +++ b/replica/logstor/logstor.cc @@ -21,6 +21,25 @@ namespace replica::logstor { seastar::logger logstor_logger("logstor"); +static api::timestamp_type extract_logstor_record_timestamp(const mutation& m) { + const auto& partition = m.partition(); + + for (const auto& row_entry : partition.clustered_rows()) { + if (row_entry.dummy()) { + continue; + } + if (!row_entry.row().marker().is_missing()) { + return row_entry.row().marker().timestamp(); + } + } + + if (const auto partition_tombstone = partition.partition_tombstone(); partition_tombstone) { + return partition_tombstone.timestamp; + } + + throw std::runtime_error("logstor mutation has no row marker or partition tombstone timestamp"); +} + logstor::logstor(logstor_config config) : _segment_manager(config.segment_manager_cfg) , _write_buffer(_segment_manager, config.flush_sg) { @@ -73,33 +92,32 @@ future<> logstor::write(const mutation& m, compaction_group& cg, seastar::gate:: table_id table = m.schema()->id(); auto& index = cg.get_logstor_index(); - // TODO ? - record_generation gen = index.get(key) - .transform([](const index_entry& entry) { - return entry.generation + 1; - }).value_or(record_generation(1)); + const auto ts = extract_logstor_record_timestamp(m); log_record record { .header = { .key = key, - .generation = gen, + .timestamp = ts, .table = table, }, .mut = canonical_mutation(m) }; - return _write_buffer.write(std::move(record), &cg, std::move(cg_holder)).then_unpack([this, &index, gen, key = std::move(key)] + return _write_buffer.write(std::move(record), &cg, std::move(cg_holder)).then_unpack([this, &index, ts, key = std::move(key)] (log_location location, seastar::gate::holder op) { index_entry new_entry { .location = location, - .generation = gen, + .timestamp = ts, }; - auto old_entry = index.exchange(key, std::move(new_entry)); + auto [inserted, prev_entry] = index.insert(key, std::move(new_entry)); - // If overwriting, free old record - if (old_entry) { - _segment_manager.free_record(old_entry->location); + if (!inserted) { + // A newer entry already exists; free the record we just wrote. + _segment_manager.free_record(location); + } else if (prev_entry) { + // Overwrote an older entry; free it. + _segment_manager.free_record(prev_entry->location); } }).handle_exception([] (std::exception_ptr ep) { logstor_logger.error("Error writing mutation: {}", ep); diff --git a/replica/logstor/segment_manager.cc b/replica/logstor/segment_manager.cc index 4b2995710a..e161d12615 100644 --- a/replica/logstor/segment_manager.cc +++ b/replica/logstor/segment_manager.cc @@ -1956,22 +1956,17 @@ future<> segment_manager_impl::recover_segment(replica::database& db, log_segmen auto& desc = get_segment_descriptor(segment_id); desc.reset(_cfg.segment_size); - auto seg_hdr = co_await read_segment_header(segment_id); - if (!seg_hdr) { - logstor_logger.trace("Segment {} has invalid header, skipping", segment_id); - co_return; - } - desc.seg_gen = seg_hdr->seg_gen; - bool is_full_segment = seg_hdr->kind == segment_kind::full; - logstor_logger.trace("Recovering segment {} with generation {}", segment_id, desc.seg_gen); - - co_await for_each_record(segment_id, - [this, &desc, &db, is_full_segment] (log_location loc, const log_record_header& header) -> want_data { - logstor_logger.trace("Recovery: read record at {} gen {}", loc, header.generation); + co_await scan_segment(segment_id, + [segment_id] (const segment_header& seg_hdr) { + logstor_logger.trace("Recovering segment {} with generation {}", segment_id, seg_hdr.seg_gen); + return make_ready_future<>(); + }, + [this, &desc, &db] (log_location loc, const log_record_header& header) -> want_data { + logstor_logger.trace("Recovery: read record at {} key {} ts {}", loc, header.key, header.timestamp); index_entry new_entry { .location = loc, - .generation = header.generation + .timestamp = header.timestamp }; try { @@ -1979,7 +1974,7 @@ future<> segment_manager_impl::recover_segment(replica::database& db, log_segmen if (!t.uses_logstor()) { return want_data::no; } - auto [inserted, prev_entry] = t.logstor_index().insert_if_newer(header.key, new_entry, is_full_segment); + auto [inserted, prev_entry] = t.logstor_index().insert(header.key, new_entry); if (inserted) { desc.on_write(loc); if (prev_entry) { diff --git a/replica/logstor/types.hh b/replica/logstor/types.hh index 7f7b3a2c65..ffe202c4ae 100644 --- a/replica/logstor/types.hh +++ b/replica/logstor/types.hh @@ -9,9 +9,10 @@ #include #include -#include "mutation/canonical_mutation.hh" #include "replica/logstor/utils.hh" #include "dht/decorated_key.hh" +#include "mutation/canonical_mutation.hh" +#include "mutation/timestamp.hh" namespace replica::logstor { @@ -34,19 +35,18 @@ struct primary_index_key { dht::decorated_key dk; }; -using record_generation = generation_base; using segment_generation = generation_base; struct index_entry { log_location location; - record_generation generation; + api::timestamp_type timestamp; bool operator==(const index_entry& other) const noexcept = default; }; struct log_record_header { primary_index_key key; - record_generation generation; + api::timestamp_type timestamp; table_id table; }; diff --git a/test/cluster/test_logstor.py b/test/cluster/test_logstor.py index 4f3d48a748..1dba454999 100644 --- a/test/cluster/test_logstor.py +++ b/test/cluster/test_logstor.py @@ -86,6 +86,13 @@ async def test_basic_write_and_read(manager: ManagerClient): assert rows[0].pk == 2 assert rows[0].v == 150 + # test conflict resolution by timestamp + await cql.run_async(f"INSERT INTO {ks}.test_int (pk, v) VALUES (3, 300) USING TIMESTAMP 1000") + await cql.run_async(f"INSERT INTO {ks}.test_int (pk, v) VALUES (3, 200) USING TIMESTAMP 900") + rows = await cql.run_async(f"SELECT pk, v FROM {ks}.test_int WHERE pk = 3") + assert rows[0].pk == 3 + assert rows[0].v == 300 + # test frozen map value await cql.run_async(f"CREATE TABLE {ks}.test_map (pk int PRIMARY KEY, v frozen>) WITH storage_engine = 'logstor'") From 652d1164ddcd200c34462f89cd3a1b1969ab547b Mon Sep 17 00:00:00 2001 From: Michael Litvak Date: Thu, 14 May 2026 19:21:43 +0200 Subject: [PATCH 2/5] logstor: get_segment helper add get_segment helper for use in all places where we get a new segment for write. it takes care of initializing the segment and updating stats in a consistent way. it always creates a segment_ref that is stored in the segment ptr and holds it alive while the ptr exists, and frees it automatically if the ptr is destroyed and there are no other refereces. in normal flows after we get the segment we add it to a compaction group that takes ownership of the segment and increases the ref count, so it won't be freed by the segment_ref. however this is useful if an error occurs before we pass the ownership, and will prevent the leak of the segment. --- replica/logstor/segment_manager.cc | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/replica/logstor/segment_manager.cc b/replica/logstor/segment_manager.cc index e161d12615..e003ac9405 100644 --- a/replica/logstor/segment_manager.cc +++ b/replica/logstor/segment_manager.cc @@ -775,6 +775,13 @@ private: ); } + future get_segment(write_source src) { + seg_ptr seg = co_await _segment_pool.get_segment(src); + seg->start(make_segment_ref(seg->id())); + _stats.segments_in_use++; + co_return seg; + } + void free_segment(log_segment_id) noexcept; segment_descriptor& get_segment_descriptor(log_segment_id segment_id) { @@ -1030,10 +1037,9 @@ future<> segment_manager_impl::write_full_segment(write_buffer& wb, compaction_g throw std::runtime_error(fmt::format("Write size {} exceeds segment size {}", data.size(), _cfg.segment_size)); } - seg_ptr seg = co_await _segment_pool.get_segment(source); + auto seg = co_await get_segment(source); auto& desc = get_segment_descriptor(seg->id()); - _stats.segments_in_use++; logstor_logger.trace("Write full segment {} from {}", seg->id(), write_source_to_string(source)); wb.write_header(desc.seg_gen, cg.schema()->id()); @@ -1087,10 +1093,9 @@ future<> segment_manager_impl::request_segment_switch() { future<> segment_manager_impl::switch_active_segment() { auto holder = _async_gate.hold(); - auto new_seg = co_await _segment_pool.get_segment(write_source::normal_write); + auto new_seg = co_await get_segment(write_source::normal_write); auto old_seg = std::exchange(_active_segment, std::move(new_seg)); - _stats.segments_in_use++; _segment_seq_num++; if (old_seg) { @@ -1106,7 +1111,6 @@ future<> segment_manager_impl::switch_active_segment() { trigger_separator_flush(_segment_seq_num - 5*u); } - _active_segment->start(make_segment_ref(_active_segment->id())); logstor_logger.trace("Switched active segment to {}", _active_segment->id()); } @@ -2221,7 +2225,6 @@ public: co_await _sm.load_segment(_db, _seg->id()); } future<> abort() override { - _sm.free_segment(_seg->id()); co_return; } }; @@ -2235,8 +2238,7 @@ future<> segment_manager_impl::load_segment(replica::database& db, log_segment_i } future> segment_manager_impl::create_segment_output_stream(replica::database& db) { - auto seg = co_await _segment_pool.get_segment(write_source::streaming); - _stats.segments_in_use++; + auto seg = co_await get_segment(write_source::streaming); co_return std::make_unique(*this, db, std::move(seg)); } From 9e0741e5b71cac7adbcec151fc26e1b427cb336d Mon Sep 17 00:00:00 2001 From: Michael Litvak Date: Thu, 14 May 2026 19:21:50 +0200 Subject: [PATCH 3/5] logstor: add segment sequence number add a segment sequence number that is a global (per-shard) increasing number that is allocated when getting a new segment for write, and is written in buffer headers in the segment. this is useful when scanning a segment for ignoring buffers that belong to an older generation of that segment. it's also useful by creating a single ordering of all segments that respects the compaction ordering. on recovery, we keep the records from the newest segments among equal records. so if a record is rewritten by compaction, the recovery will choose the records from the new compacted segments. discard segments by writing zeroes to the segment header. --- docs/dev/logstor.md | 19 ++-- replica/compaction_group.hh | 2 +- replica/database.cc | 6 +- replica/database.hh | 4 +- replica/logstor/compaction.hh | 7 +- replica/logstor/logstor.cc | 2 +- replica/logstor/logstor.hh | 2 +- replica/logstor/segment_manager.cc | 144 ++++++++++++++++------------- replica/logstor/segment_manager.hh | 2 +- replica/logstor/types.hh | 33 ++++++- replica/logstor/utils.hh | 104 --------------------- replica/logstor/write_buffer.cc | 33 +++---- replica/logstor/write_buffer.hh | 32 ++++--- replica/table.cc | 4 +- 14 files changed, 169 insertions(+), 225 deletions(-) delete mode 100644 replica/logstor/utils.hh diff --git a/docs/dev/logstor.md b/docs/dev/logstor.md index c543d0b46d..e05c52e5d2 100644 --- a/docs/dev/logstor.md +++ b/docs/dev/logstor.md @@ -165,11 +165,14 @@ A serialized form of `write_buffer::buffer_header`. | Offset | Size | Field | Description | |--------|------|-------------|-------------| | 0 | 4 | `magic` | `0x4C475342` ("LGSB"). Used to detect valid buffers during recovery. | -| 4 | 4 | `data_size` | Size in bytes of all record data following the header(s). | -| 8 | 2 | `seg_gen` | Segment generation number. Incremented each time the segment slot is reused. Used during recovery to discard stale data. | -| 10 | 1 | `kind` | Segment kind: `0` = mixed, `1` = full. | -| 11 | 1 | `version` | Version of the write buffer format. | -| 12 | 4 | `crc` | CRC32 of all other fields in the buffer header. Used for validating the header. | +| 4 | 1 | `kind` | Segment kind: `0` = mixed, `1` = full. | +| 5 | 1 | `version` | Version of the write buffer format. | +| 6 | 2 | `reserved` | Reserved for future use. Currently written as zero and included in the CRC. | +| 8 | 8 | `segment_seq` | Monotonic segment sequence number used during recovery and segment ordering checks. | +| 16 | 4 | `data_size` | Size in bytes of all record data following the header(s). | +| 20 | 4 | `crc` | CRC32 of all preceding buffer header fields. Used for validating the header. | + +The buffer header is 24 bytes long, which keeps it aligned to `record_alignment` (8 bytes). #### Segment Header (full segments only) @@ -179,9 +182,9 @@ A serialized form of `write_buffer::segment_header`. | Offset | Size | Field | Description | |--------|------|---------------|-------------| -| 16 | 16 | `table` | UUID of the table this segment belongs to. | -| 32 | 8 | `first_token` | Minimum token of all records in the segment (raw token number). | -| 40 | 8 | `last_token` | Maximum token of all records in the segment (raw token number). | +| 24 | 16 | `table` | UUID of the table this segment belongs to. | +| 40 | 8 | `first_token` | Minimum token of all records in the segment (raw token number). | +| 48 | 8 | `last_token` | Maximum token of all records in the segment (raw token number). | #### Records diff --git a/replica/compaction_group.hh b/replica/compaction_group.hh index c1e11ee452..19a1423b60 100644 --- a/replica/compaction_group.hh +++ b/replica/compaction_group.hh @@ -317,7 +317,7 @@ public: future<> discard_logstor_segments(); - future<> flush_separator(std::optional seq_num = std::nullopt); + future<> flush_separator(std::optional seq_num = std::nullopt); logstor::separator_buffer& get_separator_buffer(size_t write_size); logstor::segment_set& logstor_segments() noexcept { diff --git a/replica/database.cc b/replica/database.cc index bd4e9d3dfe..57af0e8da3 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -16,6 +16,8 @@ #include "locator/network_topology_strategy.hh" #include "locator/tablets.hh" #include "locator/token_metadata_fwd.hh" +#include "replica/logstor/compaction.hh" +#include "replica/logstor/types.hh" #include "utils/log.hh" #include "replica/database_fwd.hh" #include @@ -955,7 +957,7 @@ database::init_logstor() { trigger_logstor_compaction(false); }); - _logstor->set_trigger_separator_flush_hook([this] (size_t seq_num) { + _logstor->set_trigger_separator_flush_hook([this] (logstor::segment_sequence seq_num) { (void)flush_logstor_separator(seq_num); }); @@ -2955,7 +2957,7 @@ future<> database::flush_logstor_separator_on_all_shards(sharded& shar }); } -future<> database::flush_logstor_separator(std::optional seq_num) { +future<> database::flush_logstor_separator(std::optional seq_num) { return _tables_metadata.parallel_for_each_table([seq_num] (table_id, lw_shared_ptr table) { return table->flush_separator(seq_num); }); diff --git a/replica/database.hh b/replica/database.hh index d1a0d7fb28..83c3ac195b 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1179,7 +1179,7 @@ public: return _logstor->get_compaction_manager(); } - future<> flush_separator(std::optional seq_num = std::nullopt); + future<> flush_separator(std::optional seq_num = std::nullopt); future get_logstor_segment_stats() const; @@ -2069,7 +2069,7 @@ public: static future<> trigger_logstor_compaction_on_all_shards(sharded& sharded_db, bool major); void trigger_logstor_compaction(bool major); static future<> flush_logstor_separator_on_all_shards(sharded& sharded_db); - future<> flush_logstor_separator(std::optional seq_num = std::nullopt); + future<> flush_logstor_separator(std::optional seq_num = std::nullopt); future get_logstor_table_segment_stats(table_id table) const; size_t get_logstor_memory_usage() const; diff --git a/replica/logstor/compaction.hh b/replica/logstor/compaction.hh index a3884c6e46..2b5d31c12b 100644 --- a/replica/logstor/compaction.hh +++ b/replica/logstor/compaction.hh @@ -33,7 +33,6 @@ struct segment_descriptor : public log_heap_hook> pending_updates; utils::chunked_vector held_segments; - std::optional min_seq_num; + std::optional min_seq_num; bool flushed{false}; separator_buffer(write_buffer* wb) diff --git a/replica/logstor/logstor.cc b/replica/logstor/logstor.cc index 51164bacc9..e6eb1121a0 100644 --- a/replica/logstor/logstor.cc +++ b/replica/logstor/logstor.cc @@ -310,7 +310,7 @@ void logstor::set_trigger_compaction_hook(std::function fn) { _segment_manager.set_trigger_compaction_hook(std::move(fn)); } -void logstor::set_trigger_separator_flush_hook(std::function fn) { +void logstor::set_trigger_separator_flush_hook(std::function fn) { _segment_manager.set_trigger_separator_flush_hook(std::move(fn)); } diff --git a/replica/logstor/logstor.hh b/replica/logstor/logstor.hh index f9f032911a..ba7e7ba300 100644 --- a/replica/logstor/logstor.hh +++ b/replica/logstor/logstor.hh @@ -74,7 +74,7 @@ public: tracing::trace_state_ptr trace_state = nullptr); void set_trigger_compaction_hook(std::function fn); - void set_trigger_separator_flush_hook(std::function fn); + void set_trigger_separator_flush_hook(std::function fn); }; } // namespace logstor diff --git a/replica/logstor/segment_manager.cc b/replica/logstor/segment_manager.cc index e003ac9405..05cbaebbae 100644 --- a/replica/logstor/segment_manager.cc +++ b/replica/logstor/segment_manager.cc @@ -69,6 +69,7 @@ protected: class writeable_segment : public segment { seastar::gate _write_gate; segment_ref _seg_ref; + segment_sequence _seq_num; uint32_t _current_offset = 0; // next offset for write @@ -77,7 +78,7 @@ class writeable_segment : public segment { public: using segment::segment; - void start(segment_ref seg_ref); + void start(segment_ref, segment_sequence); future<> stop(); @@ -100,6 +101,10 @@ public: segment_ref ref() { return _seg_ref; } + + segment_sequence seq_num() const noexcept { + return _seq_num; + } }; segment::segment(log_segment_id id, seastar::file file, uint64_t file_offset, size_t max_size) @@ -132,8 +137,9 @@ future segment::read(log_location loc) { }); } -void writeable_segment::start(segment_ref seg_ref) { +void writeable_segment::start(segment_ref seg_ref, segment_sequence seq_num) { _seg_ref = std::move(seg_ref); + _seq_num = seq_num; } future<> writeable_segment::stop() { @@ -197,6 +203,9 @@ class file_manager { std::vector _open_read_files; + std::unique_ptr _zero_buf; + size_t _zero_buf_size{0}; + public: file_manager(segment_manager_config cfg) : _segments_per_file(cfg.file_size / cfg.segment_size) @@ -237,6 +246,9 @@ public: future<> file_manager::start() { co_await seastar::recursive_touch_directory(_base_dir.string()); + _zero_buf_size = 128 * 1024; + _zero_buf = allocate_aligned_buffer(_zero_buf_size, 4096); + std::memset(_zero_buf.get(), 0, _zero_buf_size); } future<> file_manager::stop() { @@ -326,19 +338,13 @@ future file_manager::get_file_for_read(size_t file_id) { } future<> file_manager::format_file_region(seastar::file file, uint64_t offset, size_t size) { - // Allocate aligned buffer for zeroing - const auto write_alignment = file.disk_write_dma_alignment(); - size_t buf_size = align_up(128 * 1024, size_t(write_alignment)); - auto zero_buf = allocate_aligned_buffer(buf_size, write_alignment); - std::memset(zero_buf.get(), 0, buf_size); - - // Write zeros to entire region + // Write zeros to entire region using the pre-allocated zero buffer size_t remaining = size; uint64_t current_offset = offset; while (remaining > 0) { - auto write_size = std::min(remaining, buf_size); - auto written = co_await file.dma_write(current_offset, zero_buf.get(), write_size); + auto write_size = std::min(remaining, _zero_buf_size); + auto written = co_await file.dma_write(current_offset, _zero_buf.get(), write_size); current_offset += written; remaining -= written; @@ -554,7 +560,7 @@ public: struct segment_header { segment_kind kind; - segment_generation seg_gen; + segment_sequence segment_seq; struct mixed {}; struct full { @@ -569,7 +575,7 @@ struct segment_header { static segment_header make_segment_header(const write_buffer::buffer_header& bh, std::optional sh) { segment_header seg_hdr { .kind = bh.kind, - .seg_gen = bh.seg_gen, + .segment_seq = bh.segment_seq, }; switch (bh.kind) { @@ -621,7 +627,7 @@ class segment_manager_impl { seastar::semaphore _active_segment_write_sem{1}; segment_pool _segment_pool; std::optional> _switch_segment_fut; - size_t _segment_seq_num{0}; + segment_sequence _next_segment_seq{1}; seastar::gate _async_gate; future<> _reserve_replenisher{make_ready_future<>()}; @@ -639,7 +645,7 @@ class segment_manager_impl { std::vector _available_separator_buffers; std::function _trigger_compaction_fn; - std::function _trigger_separator_flush_fn; + std::function _trigger_separator_flush_fn; utils::phased_barrier _writes_phaser{"logstor_sm_writes"}; @@ -682,7 +688,7 @@ public: } future<> load_segment(replica::database&, log_segment_id); - future<> recover_segment(replica::database&, log_segment_id); + future<> recover_segment(replica::database&, log_segment_id, primary_index::entry_cmp_fn cmp, std::function on_header); future> read_segment_header(log_segment_id); future<> add_segment_to_compaction_group(replica::database&, segment_descriptor&); @@ -692,7 +698,7 @@ public: } } - void trigger_separator_flush(size_t seq) { + void trigger_separator_flush(segment_sequence seq) { if (_trigger_separator_flush_fn) { _trigger_separator_flush_fn(seq); } @@ -710,7 +716,7 @@ public: _trigger_compaction_fn = std::move(fn); } - void set_trigger_separator_flush_hook(std::function fn) { + void set_trigger_separator_flush_hook(std::function fn) { _trigger_separator_flush_fn = std::move(fn); } @@ -740,9 +746,12 @@ private: future<> request_segment_switch(); future<> switch_active_segment(); + segment_sequence allocate_segment_seq() noexcept { + return _next_segment_seq++; + } std::chrono::microseconds calculate_separator_delay() const; - future<> write_to_separator(write_buffer&, segment_ref, size_t segment_seq_num); + future<> write_to_separator(write_buffer&, segment_ref, segment_sequence); void write_to_separator(table&, log_location prev_loc, log_record, segment_ref); // Sequentially scans one segment and invokes callbacks for the decoded @@ -777,7 +786,7 @@ private: future get_segment(write_source src) { seg_ptr seg = co_await _segment_pool.get_segment(src); - seg->start(make_segment_ref(seg->id())); + seg->start(make_segment_ref(seg->id()), allocate_segment_seq()); _stats.segments_in_use++; co_return seg; } @@ -991,7 +1000,7 @@ future<> segment_manager_impl::write(write_buffer& wb) { seg_ptr seg = _active_segment; auto seg_holder = seg->hold(); auto seg_ref = seg->ref(); - auto segment_seq_num = _segment_seq_num; + auto seq_num = seg->seq_num(); auto write_op = _writes_phaser.start(); auto& desc = get_segment_descriptor(seg->id()); @@ -1000,7 +1009,9 @@ future<> segment_manager_impl::write(write_buffer& wb) { seg_ref.set_flush_failure(); }); - wb.write_header(desc.seg_gen, std::nullopt); + logstor_logger.trace("Write active segment {} seq {}", seg->id(), seq_num); + + wb.write_header(seq_num, std::nullopt); auto loc = co_await seg->append(data); sem_units.return_all(); @@ -1015,7 +1026,7 @@ future<> segment_manager_impl::write(write_buffer& wb) { co_await wb.complete_writes(loc); co_await with_scheduling_group(_cfg.separator_sg, [&] { - return write_to_separator(wb, std::move(seg_ref), segment_seq_num); + return write_to_separator(wb, std::move(seg_ref), seq_num); }); write_to_separator_failed.cancel(); } @@ -1040,9 +1051,9 @@ future<> segment_manager_impl::write_full_segment(write_buffer& wb, compaction_g auto seg = co_await get_segment(source); auto& desc = get_segment_descriptor(seg->id()); - logstor_logger.trace("Write full segment {} from {}", seg->id(), write_source_to_string(source)); + logstor_logger.trace("Write full segment {} seq {} from {}", seg->id(), seg->seq_num(), write_source_to_string(source)); - wb.write_header(desc.seg_gen, cg.schema()->id()); + wb.write_header(seg->seq_num(), cg.schema()->id()); auto loc = co_await seg->append(data); @@ -1096,7 +1107,6 @@ future<> segment_manager_impl::switch_active_segment() { auto new_seg = co_await get_segment(write_source::normal_write); auto old_seg = std::exchange(_active_segment, std::move(new_seg)); - _segment_seq_num++; if (old_seg) { // close old segment in background @@ -1107,11 +1117,11 @@ future<> segment_manager_impl::switch_active_segment() { // trigger separator flush for separator buffers that hold old segments auto u = std::max(1, _max_segments / 100); - if (_segment_seq_num % u == 0 && _segment_seq_num > 5*u) { - trigger_separator_flush(_segment_seq_num - 5*u); + if (_next_segment_seq.value % u == 0 && _next_segment_seq.value > 5*u) { + trigger_separator_flush(segment_sequence(_next_segment_seq.value - 5*u)); } - logstor_logger.trace("Switched active segment to {}", _active_segment->id()); + logstor_logger.trace("Switched active segment to {} seq {}", _active_segment->id(), _active_segment->seq_num()); } future<> segment_manager_impl::replenish_reserve() { @@ -1193,10 +1203,6 @@ void segment_manager_impl::free_segment(log_segment_id segment_id) noexcept { if (desc.ref_count != 0) { on_internal_error(logstor_logger, format("Freeing segment {} with non-zero reference count", segment_id)); } - desc.on_free_segment(); - - // TODO write new generation? - _free_segments.push_back(segment_id); _segment_freed_cv.signal(); @@ -1224,22 +1230,12 @@ future<> segment_manager_impl::discard_segments(segment_set& ss) { segments.push_back(seg_id); } + // Invalidate the first header block so recovery treats the slot as empty. co_await max_concurrent_for_each(segments, 32, [this] (log_segment_id seg_id) -> future<> { - // Write a valid empty segment header with the next generation. - // This marks the segment as discarded while preserving the generation counter. - auto next_gen = get_segment_descriptor(seg_id).seg_gen; - ++next_gen; - auto buf = allocate_aligned_buffer(block_alignment, 4096); - std::memset(buf.get(), 0, block_alignment); - simple_memory_output_stream out(buf.get(), write_buffer::buffer_header_size); - write_buffer::write_empty_header(out, next_gen); - + logstor_logger.trace("Discard segment {}", seg_id); auto [file_id, file_offset] = segment_id_to_file_location(seg_id); auto file = co_await _file_mgr.get_file_for_write(file_id); - co_await file.dma_write(file_offset, buf.get(), block_alignment); - - logstor_logger.trace("Discard segment {} next gen {}", seg_id, next_gen); - + co_await _file_mgr.format_file_region(file, file_offset, block_alignment); free_segment(seg_id); }); } @@ -1258,7 +1254,7 @@ future<> segment_manager_impl::scan_segment(log_segment_id segment_id, .read_ahead = 1, }); size_t current_position = 0; - auto seg_gen = get_segment_descriptor(segment_id).seg_gen; + std::optional segment_seq; logstor_logger.trace("Reading records from segment {} at file {} offset {}", segment_id, file_id, file_offset); @@ -1288,7 +1284,9 @@ future<> segment_manager_impl::scan_segment(log_segment_id segment_id, break; } - if (bh.seg_gen != seg_gen) { + if (!segment_seq) { + segment_seq = bh.segment_seq; + } else if (bh.segment_seq != *segment_seq) { break; } @@ -1755,7 +1753,7 @@ separator_buffer compaction_manager_impl::allocate_separator_buffer() { return separator_buffer(wb); } -future<> segment_manager_impl::write_to_separator(write_buffer& wb, segment_ref seg_ref, size_t segment_seq_num) { +future<> segment_manager_impl::write_to_separator(write_buffer& wb, segment_ref seg_ref, segment_sequence segment_seq_num) { for (auto&& w : wb.records()) { co_await coroutine::maybe_yield(); @@ -1891,18 +1889,39 @@ future<> segment_manager_impl::do_recovery(replica::database& db) { next_file_id++; } - // populate index from all segments. keep the latest record for each key. + size_t allocated_segment_count = next_file_id * _segments_per_file; + + std::vector segment_seqs(allocated_segment_count, segment_sequence(0)); + segment_sequence max_segment_seq = segment_sequence(0); + + // Populate the index from all segments. Keep the latest record for each key. + // For equal records, keep the one from the segment with the highest sequence number. + auto cmp_with_seq = [&segment_seqs] (const index_entry& old_entry, const index_entry& candidate) -> std::strong_ordering { + if (auto c = primary_index::default_entry_cmp(old_entry, candidate); c != 0) { + return c; + } + const auto old_seq = segment_seqs[old_entry.location.segment.value]; + const auto new_seq = segment_seqs[candidate.location.segment.value]; + if (auto c = old_seq <=> new_seq; c != 0) { + return c; + } + return old_entry.location.offset <=> candidate.location.offset; + }; + for (auto file_id : found_file_ids) { logstor_logger.info("Recovering segments from file {}: {}%", _file_mgr.get_file_path(file_id).string(), (file_id + 1) * 100 / found_file_ids.size()); co_await max_concurrent_for_each(segments_in_file(file_id), 32, - [this, &db] (log_segment_id seg_id) { - return recover_segment(db, seg_id); + [this, &db, &cmp_with_seq, &segment_seqs, &max_segment_seq] (log_segment_id seg_id) { + return recover_segment(db, seg_id, cmp_with_seq, + [seg_id, &segment_seqs, &max_segment_seq] (const segment_header& seg_hdr) { + segment_seqs[seg_id.value] = seg_hdr.segment_seq; + max_segment_seq = std::max(max_segment_seq, seg_hdr.segment_seq); + }); } ); } // go over the index and mark all segments that have live data as used. - size_t allocated_segment_count = next_file_id * _segments_per_file; utils::dynamic_bitset used_segments(allocated_segment_count); co_await db.get_tables_metadata().for_each_table_gently([&] (table_id tid, lw_shared_ptr
tp) -> future<> { @@ -1922,9 +1941,7 @@ future<> segment_manager_impl::do_recovery(replica::database& db) { for (size_t seg_idx = 0; seg_idx < allocated_segment_count; ++seg_idx) { co_await coroutine::maybe_yield(); log_segment_id seg_id(seg_idx); - auto& desc = get_segment_descriptor(seg_id); if (!used_segments.test(seg_idx)) { - desc.on_free_segment(); _free_segments.push_back(seg_id); free_segment_count++; } else { @@ -1950,22 +1967,25 @@ future<> segment_manager_impl::do_recovery(replica::database& db) { } _next_new_segment_id = allocated_segment_count; + _next_segment_seq = max_segment_seq + 1; _file_mgr.recover_next_file_id(next_file_id); logstor_logger.info("Recovery complete"); } -future<> segment_manager_impl::recover_segment(replica::database& db, log_segment_id segment_id) { +future<> segment_manager_impl::recover_segment(replica::database& db, log_segment_id segment_id, + primary_index::entry_cmp_fn cmp, std::function on_header) { auto& desc = get_segment_descriptor(segment_id); desc.reset(_cfg.segment_size); co_await scan_segment(segment_id, - [segment_id] (const segment_header& seg_hdr) { - logstor_logger.trace("Recovering segment {} with generation {}", segment_id, seg_hdr.seg_gen); + [segment_id, on_header = std::move(on_header)] (const segment_header& seg_hdr) mutable { + logstor_logger.trace("Recovering segment {} with sequence {}", segment_id, seg_hdr.segment_seq); + on_header(seg_hdr); return make_ready_future<>(); }, - [this, &desc, &db] (log_location loc, const log_record_header& header) -> want_data { + [this, &desc, &db, &cmp] (log_location loc, const log_record_header& header) -> want_data { logstor_logger.trace("Recovery: read record at {} key {} ts {}", loc, header.key, header.timestamp); index_entry new_entry { @@ -1978,7 +1998,7 @@ future<> segment_manager_impl::recover_segment(replica::database& db, log_segmen if (!t.uses_logstor()) { return want_data::no; } - auto [inserted, prev_entry] = t.logstor_index().insert(header.key, new_entry); + auto [inserted, prev_entry] = t.logstor_index().insert(header.key, new_entry, cmp); if (inserted) { desc.on_write(loc); if (prev_entry) { @@ -2146,7 +2166,7 @@ void segment_manager::set_trigger_compaction_hook(std::function fn) { _impl->set_trigger_compaction_hook(std::move(fn)); } -void segment_manager::set_trigger_separator_flush_hook(std::function fn) { +void segment_manager::set_trigger_separator_flush_hook(std::function fn) { _impl->set_trigger_separator_flush_hook(std::move(fn)); } @@ -2231,7 +2251,7 @@ public: future<> segment_manager_impl::load_segment(replica::database& db, log_segment_id seg_id) { // read the segment and populate the index - co_await recover_segment(db, seg_id); + co_await recover_segment(db, seg_id, primary_index::default_entry_cmp, [] (const segment_header&) {}); auto& desc = get_segment_descriptor(seg_id); co_await add_segment_to_compaction_group(db, desc); diff --git a/replica/logstor/segment_manager.hh b/replica/logstor/segment_manager.hh index 52adb12533..17a486ee62 100644 --- a/replica/logstor/segment_manager.hh +++ b/replica/logstor/segment_manager.hh @@ -129,7 +129,7 @@ public: const compaction_manager& get_compaction_manager() const noexcept; void set_trigger_compaction_hook(std::function fn); - void set_trigger_separator_flush_hook(std::function fn); + void set_trigger_separator_flush_hook(std::function fn); size_t get_segment_size() const noexcept; diff --git a/replica/logstor/types.hh b/replica/logstor/types.hh index ffe202c4ae..44b64cd96b 100644 --- a/replica/logstor/types.hh +++ b/replica/logstor/types.hh @@ -9,7 +9,6 @@ #include #include -#include "replica/logstor/utils.hh" #include "dht/decorated_key.hh" #include "mutation/canonical_mutation.hh" #include "mutation/timestamp.hh" @@ -35,8 +34,6 @@ struct primary_index_key { dht::decorated_key dk; }; -using segment_generation = generation_base; - struct index_entry { log_location location; api::timestamp_type timestamp; @@ -55,6 +52,28 @@ struct log_record { canonical_mutation mut; }; +struct segment_sequence { + uint64_t value; + + bool operator==(const segment_sequence& other) const noexcept = default; + auto operator<=>(const segment_sequence& other) const noexcept = default; + + segment_sequence& operator++() noexcept { + ++value; + return *this; + } + + segment_sequence operator++(int) noexcept { + segment_sequence tmp = *this; + ++value; + return tmp; + } + + segment_sequence operator+(uint64_t increment) const noexcept { + return segment_sequence{value + increment}; + } +}; + } // Format specialization declarations and implementations @@ -82,3 +101,11 @@ struct fmt::formatter : fmt::formatter +struct fmt::formatter : fmt::formatter { + template + auto format(const replica::logstor::segment_sequence& seq, FormatContext& ctx) const { + return fmt::format_to(ctx.out(), "sseq({})", seq.value); + } +}; diff --git a/replica/logstor/utils.hh b/replica/logstor/utils.hh deleted file mode 100644 index 72b10bfa94..0000000000 --- a/replica/logstor/utils.hh +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright (C) 2026-present ScyllaDB - */ - -/* - * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 - */ -#pragma once - -#include -#include "serializer.hh" - -namespace replica::logstor { - -// an unsigned integer that can be incremented and compared with wraparound semantics -template -class generation_base { - T _value; - -public: - - using underlying = T; - - constexpr generation_base() noexcept : _value(0) {} - constexpr explicit generation_base(T value) noexcept : _value(value) {} - - constexpr T value() const noexcept { return _value; } - - constexpr generation_base& operator++() noexcept { - ++_value; - return *this; - } - - constexpr generation_base operator++(int) noexcept { - auto old = *this; - ++_value; - return old; - } - - constexpr generation_base& operator+=(T delta) noexcept { - _value += delta; - return *this; - } - - constexpr generation_base operator+(T delta) const noexcept { - return generation_base(_value + delta); - } - - constexpr bool operator==(const generation_base& other) const noexcept = default; - - /// Comparison using wraparound semantics. - /// Returns true if this generation is less than other, accounting for wraparound. - /// Assumes generations are within half the value space of each other. - constexpr bool operator<(const generation_base& other) const noexcept { - // Use signed comparison after converting difference to signed type - // This handles wraparound: if diff > max/2, it's treated as negative - using signed_type = std::make_signed_t; - auto diff = static_cast(_value - other._value); - return diff < 0; - } - - constexpr bool operator<=(const generation_base& other) const noexcept { - return *this == other || *this < other; - } - - constexpr bool operator>(const generation_base& other) const noexcept { - return other < *this; - } - - constexpr bool operator>=(const generation_base& other) const noexcept { - return other <= *this; - } -}; - -} - -template -struct fmt::formatter> : fmt::formatter { - template - auto format(const replica::logstor::generation_base& gen, FormatContext& ctx) const { - return fmt::formatter::format(gen.value(), ctx); - } -}; - -namespace ser { - -template -struct serializer> { - template - static void write(Output& out, const replica::logstor::generation_base& g) { - serializer::underlying>::write(out, g.value()); - } - template - static replica::logstor::generation_base read(Input& in) { - auto val = serializer::underlying>::read(in); - return replica::logstor::generation_base(val); - } - template - static void skip(Input& in) { - serializer::underlying>::skip(in); - } -}; - -} diff --git a/replica/logstor/write_buffer.cc b/replica/logstor/write_buffer.cc index b10844f1d7..987859642c 100644 --- a/replica/logstor/write_buffer.cc +++ b/replica/logstor/write_buffer.cc @@ -164,11 +164,12 @@ void write_buffer::finalize(size_t alignment) { pad_to_alignment(alignment); } -void write_buffer::write_header(segment_generation seg_gen, std::optional table) { +void write_buffer::write_header(segment_sequence segment_seq, std::optional table) { _buffer_header.magic = buffer_header_magic; - _buffer_header.seg_gen = seg_gen; _buffer_header.kind = _segment_kind; _buffer_header.version = current_version; + _buffer_header.reserved = 0; + _buffer_header.segment_seq = segment_seq; _buffer_header.crc = _buffer_header.calculate_crc(); @@ -185,25 +186,16 @@ void write_buffer::write_header(segment_generation seg_gen, std::optional(out, hdr); -} - bool write_buffer::validate_header(const write_buffer::buffer_header& bh) { if (bh.magic != write_buffer::buffer_header_magic) { return false; } - if (bh.calculate_crc() != bh.crc) { + switch (bh.kind) { + case segment_kind::mixed: + case segment_kind::full: + break; + default: return false; } @@ -211,6 +203,10 @@ bool write_buffer::validate_header(const write_buffer::buffer_header& bh) { return false; } + if (bh.calculate_crc() != bh.crc) { + return false; + } + return true; } @@ -248,10 +244,11 @@ size_t write_buffer::estimate_required_segments(size_t net_data_size, size_t rec uint32_t write_buffer::buffer_header::calculate_crc() const { utils::crc32 c; c.process_le(magic); - c.process_le(data_size); - c.process_le(seg_gen.value()); c.process_le(static_cast(kind)); c.process_le(version); + c.process_le(reserved); + c.process_le(segment_seq.value); + c.process_le(data_size); return c.get(); } diff --git a/replica/logstor/write_buffer.hh b/replica/logstor/write_buffer.hh index 6004988021..9840079560 100644 --- a/replica/logstor/write_buffer.hh +++ b/replica/logstor/write_buffer.hh @@ -114,19 +114,22 @@ public: struct buffer_header { uint32_t magic; - uint32_t data_size; // size of all records data following the buffer_header - segment_generation seg_gen; segment_kind kind; uint8_t version; + uint16_t reserved; + segment_sequence segment_seq; + uint32_t data_size; // size of all records data following the header(s) uint32_t crc; uint32_t calculate_crc() const; }; static constexpr size_t buffer_header_size = - 2 * sizeof(uint32_t) - + sizeof(segment_generation::underlying) + sizeof(uint32_t) + sizeof(std::underlying_type_t) + sizeof(uint8_t) + + sizeof(uint16_t) + + sizeof(segment_sequence) + + sizeof(uint32_t) + sizeof(uint32_t); static_assert(buffer_header_size % record_alignment == 0, "Buffer header size must be aligned by record_alignment"); @@ -234,8 +237,6 @@ public: return s; } - static void write_empty_header(ostream& out, segment_generation seg_gen); - static bool validate_header(const buffer_header& bh); private: @@ -243,7 +244,7 @@ private: const char* data() const noexcept { return _buffer.get(); } // table is set for segment_kind::full - void write_header(segment_generation seg_gen, std::optional table); + void write_header(segment_sequence segment_seq, std::optional table); // get all write records in the buffer. // with_record_copy must be to true when creating the write_buffer. @@ -329,30 +330,33 @@ struct serializer { template static void write(Output& out, const replica::logstor::write_buffer::buffer_header& h) { serializer::write(out, h.magic); - serializer::write(out, h.data_size); - serializer::write(out, h.seg_gen); serializer::write(out, static_cast(h.kind)); serializer::write(out, h.version); + serializer::write(out, h.reserved); + serializer::write(out, h.segment_seq.value); + serializer::write(out, h.data_size); serializer::write(out, h.crc); } template static replica::logstor::write_buffer::buffer_header read(Input& in) { replica::logstor::write_buffer::buffer_header h; h.magic = serializer::read(in); - h.data_size = serializer::read(in); - h.seg_gen = serializer::read(in); h.kind = static_cast(serializer::read(in)); h.version = serializer::read(in); + h.reserved = serializer::read(in); + h.segment_seq = replica::logstor::segment_sequence{serializer::read(in)}; + h.data_size = serializer::read(in); h.crc = serializer::read(in); return h; } template static void skip(Input& in) { serializer::skip(in); + serializer::skip(in); + serializer::skip(in); + serializer::skip(in); + serializer::skip(in); serializer::skip(in); - serializer::skip(in); - serializer::skip(in); - serializer::skip(in); serializer::skip(in); } }; diff --git a/replica/table.cc b/replica/table.cc index 4bf591fc92..e80d84e5a5 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1113,7 +1113,7 @@ future<> compaction_group::discard_logstor_segments() { co_await sm.discard_segments(*_logstor_segments); } -future<> compaction_group::flush_separator(std::optional seq_num) { +future<> compaction_group::flush_separator(std::optional seq_num) { auto units = co_await get_units(_separator_flush_sem, 1); auto pending = std::exchange(_separator_flushes, {}); if (_logstor_separator && (!seq_num || _logstor_separator->min_seq_num < *seq_num)) { @@ -2611,7 +2611,7 @@ void table::try_trigger_compaction(compaction_group& cg) noexcept { } } -future<> table::flush_separator(std::optional seq_num) { +future<> table::flush_separator(std::optional seq_num) { if (!uses_logstor()) { co_return; } From 6e9d6f02dfdffed410ff493b63f81a248df6901c Mon Sep 17 00:00:00 2001 From: Michael Litvak Date: Tue, 28 Apr 2026 15:30:17 +0200 Subject: [PATCH 4/5] logstor: rewrite segment seq num from streaming when receiving a segment from streaming, allocate a new sequence number for the segment and rewrite the header with the sequence number, to put it in the same ordering of the receiving shard. --- replica/logstor/segment_manager.cc | 65 +++++++++++++++++++++++++++++- 1 file changed, 64 insertions(+), 1 deletion(-) diff --git a/replica/logstor/segment_manager.cc b/replica/logstor/segment_manager.cc index 05cbaebbae..14fa533f7f 100644 --- a/replica/logstor/segment_manager.cc +++ b/replica/logstor/segment_manager.cc @@ -2196,6 +2196,51 @@ future<> segment_manager::await_pending_writes() { class segment_data_sink_impl : public data_sink_impl { seg_ptr _segment; + std::vector _pending_data; + std::optional _initial_header_size; + bool _header_rewritten = false; + + write_buffer::buffer_header read_buffer_header() const { + simple_memory_input_stream bh_stream(_pending_data.data(), write_buffer::buffer_header_size); + return ser::deserialize(bh_stream, std::type_identity{}); + } + + void maybe_parse_initial_header() { + if (_initial_header_size || _pending_data.size() < write_buffer::buffer_header_size) { + return; + } + + auto bh = read_buffer_header(); + if (!write_buffer::validate_header(bh)) { + throw std::runtime_error("Invalid streamed logstor buffer header"); + } + + size_t header_size = write_buffer::buffer_header_size; + if (bh.kind == segment_kind::full) { + header_size += write_buffer::segment_header_size; + } + _initial_header_size = header_size; + } + + void rewrite_buffer_header() { + auto bh = read_buffer_header(); + + logstor_logger.trace("Rewriting buffer header for segment {} seq {} with seq {}", _segment->id(), bh.segment_seq, _segment->seq_num()); + + bh.segment_seq = _segment->seq_num(); + bh.crc = bh.calculate_crc(); + + simple_memory_output_stream bh_stream(_pending_data.data(), write_buffer::buffer_header_size); + ser::serialize(bh_stream, bh); + } + + future<> flush_pending_data() { + if (_pending_data.empty()) { + co_return; + } + co_await _segment->append(bytes_view(reinterpret_cast(_pending_data.data()), _pending_data.size())); + _pending_data.clear(); + } public: segment_data_sink_impl(seg_ptr segment) : _segment(std::move(segment)) @@ -2203,11 +2248,29 @@ public: virtual future<> put(std::span> data) override { for (auto& buf : data) { - co_await _segment->append(bytes_view((const int8_t*)buf.get(), buf.size())); + if (buf.empty()) { + continue; + } + + if (_header_rewritten) { + co_await _segment->append(bytes_view(reinterpret_cast(buf.get()), buf.size())); + continue; + } + + _pending_data.insert(_pending_data.end(), buf.get(), buf.get() + buf.size()); + maybe_parse_initial_header(); + if (_initial_header_size && _pending_data.size() >= *_initial_header_size) { + rewrite_buffer_header(); + _header_rewritten = true; + co_await flush_pending_data(); + } } } virtual future<> close() override { + if (!_header_rewritten && !_pending_data.empty()) { + throw std::runtime_error("Truncated streamed logstor segment header"); + } co_return; } From c3ab3412347cfef963810ca282c81024987dd3d5 Mon Sep 17 00:00:00 2001 From: Michael Litvak Date: Wed, 13 May 2026 13:38:02 +0200 Subject: [PATCH 5/5] test: logstor: add basic delete test extend the test with basic DELETE queries and verify the results are as expected. --- test/cluster/test_logstor.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/test/cluster/test_logstor.py b/test/cluster/test_logstor.py index 1dba454999..8731313749 100644 --- a/test/cluster/test_logstor.py +++ b/test/cluster/test_logstor.py @@ -86,6 +86,13 @@ async def test_basic_write_and_read(manager: ManagerClient): assert rows[0].pk == 2 assert rows[0].v == 150 + await cql.run_async(f"DELETE FROM {ks}.test_int WHERE pk = 1") + rows = await cql.run_async(f"SELECT pk, v FROM {ks}.test_int WHERE pk = 1") + assert len(rows) == 0 + rows = await cql.run_async(f"SELECT pk, v FROM {ks}.test_int WHERE pk = 2") + assert rows[0].pk == 2 + assert rows[0].v == 150 + # test conflict resolution by timestamp await cql.run_async(f"INSERT INTO {ks}.test_int (pk, v) VALUES (3, 300) USING TIMESTAMP 1000") await cql.run_async(f"INSERT INTO {ks}.test_int (pk, v) VALUES (3, 200) USING TIMESTAMP 900") @@ -107,6 +114,10 @@ async def test_basic_write_and_read(manager: ManagerClient): assert rows[0].pk == 1 assert rows[0].v == {'a': 'apple', 'b': 'banana', 'c': 'cherry'} + await cql.run_async(f"DELETE FROM {ks}.test_map WHERE pk = 1") + rows = await cql.run_async(f"SELECT pk, v FROM {ks}.test_map WHERE pk = 1") + assert len(rows) == 0 + @pytest.mark.asyncio async def test_range_read(manager: ManagerClient): cmdline = ['--logger-log-level', 'logstor=debug']