diff --git a/docs/dev/logstor.md b/docs/dev/logstor.md index a26811c32a..e05c52e5d2 100644 --- a/docs/dev/logstor.md +++ b/docs/dev/logstor.md @@ -165,11 +165,14 @@ A serialized form of `write_buffer::buffer_header`. | Offset | Size | Field | Description | |--------|------|-------------|-------------| | 0 | 4 | `magic` | `0x4C475342` ("LGSB"). Used to detect valid buffers during recovery. | -| 4 | 4 | `data_size` | Size in bytes of all record data following the header(s). | -| 8 | 2 | `seg_gen` | Segment generation number. Incremented each time the segment slot is reused. Used during recovery to discard stale data. | -| 10 | 1 | `kind` | Segment kind: `0` = mixed, `1` = full. | -| 11 | 1 | `version` | Version of the write buffer format. | -| 12 | 4 | `crc` | CRC32 of all other fields in the buffer header. Used for validating the header. | +| 4 | 1 | `kind` | Segment kind: `0` = mixed, `1` = full. | +| 5 | 1 | `version` | Version of the write buffer format. | +| 6 | 2 | `reserved` | Reserved for future use. Currently written as zero and included in the CRC. | +| 8 | 8 | `segment_seq` | Monotonic segment sequence number used during recovery and segment ordering checks. | +| 16 | 4 | `data_size` | Size in bytes of all record data following the header(s). | +| 20 | 4 | `crc` | CRC32 of all preceding buffer header fields. Used for validating the header. | + +The buffer header is 24 bytes long, which keeps it aligned to `record_alignment` (8 bytes). #### Segment Header (full segments only) @@ -179,9 +182,9 @@ A serialized form of `write_buffer::segment_header`. | Offset | Size | Field | Description | |--------|------|---------------|-------------| -| 16 | 16 | `table` | UUID of the table this segment belongs to. | -| 32 | 8 | `first_token` | Minimum token of all records in the segment (raw token number). | -| 40 | 8 | `last_token` | Maximum token of all records in the segment (raw token number). | +| 24 | 16 | `table` | UUID of the table this segment belongs to. | +| 40 | 8 | `first_token` | Minimum token of all records in the segment (raw token number). | +| 48 | 8 | `last_token` | Maximum token of all records in the segment (raw token number). | #### Records @@ -205,7 +208,7 @@ zero_padding -- to align to record_alignment (8 bytes) The `header_size` bytes immediately following the record header are the IDL-serialized form of `log_record_header`, which contains: - `key`: the partition key (`primary_index_key`), including a `decorated_key` with a token and partition key bytes. -- `generation`: a 16-bit write generation number, used during recovery to resolve conflicts when the same key appears in multiple segments. +- `timestamp`: the timestamp of the record, used to resolve conflicts by keeping the record with the latest timestamp. - `table`: UUID of the table this record belongs to. **Mutation Data**: diff --git a/idl/logstor.idl.hh b/idl/logstor.idl.hh index ac8642f3b0..48d45ffc68 100644 --- a/idl/logstor.idl.hh +++ b/idl/logstor.idl.hh @@ -8,6 +8,7 @@ #include "idl/frozen_schema.idl.hh" #include "idl/token.idl.hh" +#include "mutation/timestamp.hh" namespace replica { namespace logstor { @@ -18,7 +19,7 @@ struct primary_index_key { class log_record_header { replica::logstor::primary_index_key key; - replica::logstor::record_generation generation; + api::timestamp_type timestamp; table_id table; }; diff --git a/replica/compaction_group.hh b/replica/compaction_group.hh index c1e11ee452..19a1423b60 100644 --- a/replica/compaction_group.hh +++ b/replica/compaction_group.hh @@ -317,7 +317,7 @@ public: future<> discard_logstor_segments(); - future<> flush_separator(std::optional seq_num = std::nullopt); + future<> flush_separator(std::optional seq_num = std::nullopt); logstor::separator_buffer& get_separator_buffer(size_t write_size); logstor::segment_set& logstor_segments() noexcept { diff --git a/replica/database.cc b/replica/database.cc index bd4e9d3dfe..57af0e8da3 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -16,6 +16,8 @@ #include "locator/network_topology_strategy.hh" #include "locator/tablets.hh" #include "locator/token_metadata_fwd.hh" +#include "replica/logstor/compaction.hh" +#include "replica/logstor/types.hh" #include "utils/log.hh" #include "replica/database_fwd.hh" #include @@ -955,7 +957,7 @@ database::init_logstor() { trigger_logstor_compaction(false); }); - _logstor->set_trigger_separator_flush_hook([this] (size_t seq_num) { + _logstor->set_trigger_separator_flush_hook([this] (logstor::segment_sequence seq_num) { (void)flush_logstor_separator(seq_num); }); @@ -2955,7 +2957,7 @@ future<> database::flush_logstor_separator_on_all_shards(sharded& shar }); } -future<> database::flush_logstor_separator(std::optional seq_num) { +future<> database::flush_logstor_separator(std::optional seq_num) { return _tables_metadata.parallel_for_each_table([seq_num] (table_id, lw_shared_ptr table) { return table->flush_separator(seq_num); }); diff --git a/replica/database.hh b/replica/database.hh index d1a0d7fb28..83c3ac195b 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1179,7 +1179,7 @@ public: return _logstor->get_compaction_manager(); } - future<> flush_separator(std::optional seq_num = std::nullopt); + future<> flush_separator(std::optional seq_num = std::nullopt); future get_logstor_segment_stats() const; @@ -2069,7 +2069,7 @@ public: static future<> trigger_logstor_compaction_on_all_shards(sharded& sharded_db, bool major); void trigger_logstor_compaction(bool major); static future<> flush_logstor_separator_on_all_shards(sharded& sharded_db); - future<> flush_logstor_separator(std::optional seq_num = std::nullopt); + future<> flush_logstor_separator(std::optional seq_num = std::nullopt); future get_logstor_table_segment_stats(table_id table) const; size_t get_logstor_memory_usage() const; diff --git a/replica/logstor/compaction.hh b/replica/logstor/compaction.hh index a3884c6e46..2b5d31c12b 100644 --- a/replica/logstor/compaction.hh +++ b/replica/logstor/compaction.hh @@ -33,7 +33,6 @@ struct segment_descriptor : public log_heap_hook> pending_updates; utils::chunked_vector held_segments; - std::optional min_seq_num; + std::optional min_seq_num; bool flushed{false}; separator_buffer(write_buffer* wb) diff --git a/replica/logstor/index.hh b/replica/logstor/index.hh index 7c3d4e69a7..d24989db57 100644 --- a/replica/logstor/index.hh +++ b/replica/logstor/index.hh @@ -14,6 +14,7 @@ #include "utils/bptree.hh" #include "utils/double-decker.hh" #include "utils/phased_barrier.hh" +#include namespace replica::logstor { @@ -100,20 +101,6 @@ public: } } - std::optional exchange(const primary_index_key& key, index_entry new_entry) { - partitions_type::bound_hint hint; - auto i = _partitions.lower_bound(key.dk, dht::ring_position_comparator(*_schema), hint); - if (hint.match) { - auto old_entry = i->_e; - i->_e = std::move(new_entry); - return old_entry; - } else { - _partitions.emplace_before(i, key.dk.token().raw(), hint, key.dk, std::move(new_entry)); - ++_key_count; - return std::nullopt; - } - } - bool update_record_location(const primary_index_key& key, log_location old_location, log_location new_location) { auto it = _partitions.find(key.dk, dht::ring_position_comparator(*_schema)); if (it != _partitions.end()) { @@ -125,11 +112,17 @@ public: return false; } - std::pair> insert_if_newer(const primary_index_key& key, index_entry new_entry, bool prefer_on_tie) { + using entry_cmp_fn = std::function; + + static std::strong_ordering default_entry_cmp(const index_entry& a, const index_entry& b) noexcept { + return a.timestamp <=> b.timestamp; + } + + std::pair> insert(const primary_index_key& key, index_entry new_entry, entry_cmp_fn cmp = default_entry_cmp) { partitions_type::bound_hint hint; auto i = _partitions.lower_bound(key.dk, dht::ring_position_comparator(*_schema), hint); if (hint.match) { - if (i->_e.generation < new_entry.generation || (i->_e.generation == new_entry.generation && prefer_on_tie)) { + if (cmp(i->_e, new_entry) <= 0) { auto old_entry = i->_e; i->_e = std::move(new_entry); return {true, std::make_optional(old_entry)}; diff --git a/replica/logstor/logstor.cc b/replica/logstor/logstor.cc index 180c61434c..e6eb1121a0 100644 --- a/replica/logstor/logstor.cc +++ b/replica/logstor/logstor.cc @@ -21,6 +21,25 @@ namespace replica::logstor { seastar::logger logstor_logger("logstor"); +static api::timestamp_type extract_logstor_record_timestamp(const mutation& m) { + const auto& partition = m.partition(); + + for (const auto& row_entry : partition.clustered_rows()) { + if (row_entry.dummy()) { + continue; + } + if (!row_entry.row().marker().is_missing()) { + return row_entry.row().marker().timestamp(); + } + } + + if (const auto partition_tombstone = partition.partition_tombstone(); partition_tombstone) { + return partition_tombstone.timestamp; + } + + throw std::runtime_error("logstor mutation has no row marker or partition tombstone timestamp"); +} + logstor::logstor(logstor_config config) : _segment_manager(config.segment_manager_cfg) , _write_buffer(_segment_manager, config.flush_sg) { @@ -73,33 +92,32 @@ future<> logstor::write(const mutation& m, compaction_group& cg, seastar::gate:: table_id table = m.schema()->id(); auto& index = cg.get_logstor_index(); - // TODO ? - record_generation gen = index.get(key) - .transform([](const index_entry& entry) { - return entry.generation + 1; - }).value_or(record_generation(1)); + const auto ts = extract_logstor_record_timestamp(m); log_record record { .header = { .key = key, - .generation = gen, + .timestamp = ts, .table = table, }, .mut = canonical_mutation(m) }; - return _write_buffer.write(std::move(record), &cg, std::move(cg_holder)).then_unpack([this, &index, gen, key = std::move(key)] + return _write_buffer.write(std::move(record), &cg, std::move(cg_holder)).then_unpack([this, &index, ts, key = std::move(key)] (log_location location, seastar::gate::holder op) { index_entry new_entry { .location = location, - .generation = gen, + .timestamp = ts, }; - auto old_entry = index.exchange(key, std::move(new_entry)); + auto [inserted, prev_entry] = index.insert(key, std::move(new_entry)); - // If overwriting, free old record - if (old_entry) { - _segment_manager.free_record(old_entry->location); + if (!inserted) { + // A newer entry already exists; free the record we just wrote. + _segment_manager.free_record(location); + } else if (prev_entry) { + // Overwrote an older entry; free it. + _segment_manager.free_record(prev_entry->location); } }).handle_exception([] (std::exception_ptr ep) { logstor_logger.error("Error writing mutation: {}", ep); @@ -292,7 +310,7 @@ void logstor::set_trigger_compaction_hook(std::function fn) { _segment_manager.set_trigger_compaction_hook(std::move(fn)); } -void logstor::set_trigger_separator_flush_hook(std::function fn) { +void logstor::set_trigger_separator_flush_hook(std::function fn) { _segment_manager.set_trigger_separator_flush_hook(std::move(fn)); } diff --git a/replica/logstor/logstor.hh b/replica/logstor/logstor.hh index f9f032911a..ba7e7ba300 100644 --- a/replica/logstor/logstor.hh +++ b/replica/logstor/logstor.hh @@ -74,7 +74,7 @@ public: tracing::trace_state_ptr trace_state = nullptr); void set_trigger_compaction_hook(std::function fn); - void set_trigger_separator_flush_hook(std::function fn); + void set_trigger_separator_flush_hook(std::function fn); }; } // namespace logstor diff --git a/replica/logstor/segment_manager.cc b/replica/logstor/segment_manager.cc index 4b2995710a..14fa533f7f 100644 --- a/replica/logstor/segment_manager.cc +++ b/replica/logstor/segment_manager.cc @@ -69,6 +69,7 @@ protected: class writeable_segment : public segment { seastar::gate _write_gate; segment_ref _seg_ref; + segment_sequence _seq_num; uint32_t _current_offset = 0; // next offset for write @@ -77,7 +78,7 @@ class writeable_segment : public segment { public: using segment::segment; - void start(segment_ref seg_ref); + void start(segment_ref, segment_sequence); future<> stop(); @@ -100,6 +101,10 @@ public: segment_ref ref() { return _seg_ref; } + + segment_sequence seq_num() const noexcept { + return _seq_num; + } }; segment::segment(log_segment_id id, seastar::file file, uint64_t file_offset, size_t max_size) @@ -132,8 +137,9 @@ future segment::read(log_location loc) { }); } -void writeable_segment::start(segment_ref seg_ref) { +void writeable_segment::start(segment_ref seg_ref, segment_sequence seq_num) { _seg_ref = std::move(seg_ref); + _seq_num = seq_num; } future<> writeable_segment::stop() { @@ -197,6 +203,9 @@ class file_manager { std::vector _open_read_files; + std::unique_ptr _zero_buf; + size_t _zero_buf_size{0}; + public: file_manager(segment_manager_config cfg) : _segments_per_file(cfg.file_size / cfg.segment_size) @@ -237,6 +246,9 @@ public: future<> file_manager::start() { co_await seastar::recursive_touch_directory(_base_dir.string()); + _zero_buf_size = 128 * 1024; + _zero_buf = allocate_aligned_buffer(_zero_buf_size, 4096); + std::memset(_zero_buf.get(), 0, _zero_buf_size); } future<> file_manager::stop() { @@ -326,19 +338,13 @@ future file_manager::get_file_for_read(size_t file_id) { } future<> file_manager::format_file_region(seastar::file file, uint64_t offset, size_t size) { - // Allocate aligned buffer for zeroing - const auto write_alignment = file.disk_write_dma_alignment(); - size_t buf_size = align_up(128 * 1024, size_t(write_alignment)); - auto zero_buf = allocate_aligned_buffer(buf_size, write_alignment); - std::memset(zero_buf.get(), 0, buf_size); - - // Write zeros to entire region + // Write zeros to entire region using the pre-allocated zero buffer size_t remaining = size; uint64_t current_offset = offset; while (remaining > 0) { - auto write_size = std::min(remaining, buf_size); - auto written = co_await file.dma_write(current_offset, zero_buf.get(), write_size); + auto write_size = std::min(remaining, _zero_buf_size); + auto written = co_await file.dma_write(current_offset, _zero_buf.get(), write_size); current_offset += written; remaining -= written; @@ -554,7 +560,7 @@ public: struct segment_header { segment_kind kind; - segment_generation seg_gen; + segment_sequence segment_seq; struct mixed {}; struct full { @@ -569,7 +575,7 @@ struct segment_header { static segment_header make_segment_header(const write_buffer::buffer_header& bh, std::optional sh) { segment_header seg_hdr { .kind = bh.kind, - .seg_gen = bh.seg_gen, + .segment_seq = bh.segment_seq, }; switch (bh.kind) { @@ -621,7 +627,7 @@ class segment_manager_impl { seastar::semaphore _active_segment_write_sem{1}; segment_pool _segment_pool; std::optional> _switch_segment_fut; - size_t _segment_seq_num{0}; + segment_sequence _next_segment_seq{1}; seastar::gate _async_gate; future<> _reserve_replenisher{make_ready_future<>()}; @@ -639,7 +645,7 @@ class segment_manager_impl { std::vector _available_separator_buffers; std::function _trigger_compaction_fn; - std::function _trigger_separator_flush_fn; + std::function _trigger_separator_flush_fn; utils::phased_barrier _writes_phaser{"logstor_sm_writes"}; @@ -682,7 +688,7 @@ public: } future<> load_segment(replica::database&, log_segment_id); - future<> recover_segment(replica::database&, log_segment_id); + future<> recover_segment(replica::database&, log_segment_id, primary_index::entry_cmp_fn cmp, std::function on_header); future> read_segment_header(log_segment_id); future<> add_segment_to_compaction_group(replica::database&, segment_descriptor&); @@ -692,7 +698,7 @@ public: } } - void trigger_separator_flush(size_t seq) { + void trigger_separator_flush(segment_sequence seq) { if (_trigger_separator_flush_fn) { _trigger_separator_flush_fn(seq); } @@ -710,7 +716,7 @@ public: _trigger_compaction_fn = std::move(fn); } - void set_trigger_separator_flush_hook(std::function fn) { + void set_trigger_separator_flush_hook(std::function fn) { _trigger_separator_flush_fn = std::move(fn); } @@ -740,9 +746,12 @@ private: future<> request_segment_switch(); future<> switch_active_segment(); + segment_sequence allocate_segment_seq() noexcept { + return _next_segment_seq++; + } std::chrono::microseconds calculate_separator_delay() const; - future<> write_to_separator(write_buffer&, segment_ref, size_t segment_seq_num); + future<> write_to_separator(write_buffer&, segment_ref, segment_sequence); void write_to_separator(table&, log_location prev_loc, log_record, segment_ref); // Sequentially scans one segment and invokes callbacks for the decoded @@ -775,6 +784,13 @@ private: ); } + future get_segment(write_source src) { + seg_ptr seg = co_await _segment_pool.get_segment(src); + seg->start(make_segment_ref(seg->id()), allocate_segment_seq()); + _stats.segments_in_use++; + co_return seg; + } + void free_segment(log_segment_id) noexcept; segment_descriptor& get_segment_descriptor(log_segment_id segment_id) { @@ -984,7 +1000,7 @@ future<> segment_manager_impl::write(write_buffer& wb) { seg_ptr seg = _active_segment; auto seg_holder = seg->hold(); auto seg_ref = seg->ref(); - auto segment_seq_num = _segment_seq_num; + auto seq_num = seg->seq_num(); auto write_op = _writes_phaser.start(); auto& desc = get_segment_descriptor(seg->id()); @@ -993,7 +1009,9 @@ future<> segment_manager_impl::write(write_buffer& wb) { seg_ref.set_flush_failure(); }); - wb.write_header(desc.seg_gen, std::nullopt); + logstor_logger.trace("Write active segment {} seq {}", seg->id(), seq_num); + + wb.write_header(seq_num, std::nullopt); auto loc = co_await seg->append(data); sem_units.return_all(); @@ -1008,7 +1026,7 @@ future<> segment_manager_impl::write(write_buffer& wb) { co_await wb.complete_writes(loc); co_await with_scheduling_group(_cfg.separator_sg, [&] { - return write_to_separator(wb, std::move(seg_ref), segment_seq_num); + return write_to_separator(wb, std::move(seg_ref), seq_num); }); write_to_separator_failed.cancel(); } @@ -1030,13 +1048,12 @@ future<> segment_manager_impl::write_full_segment(write_buffer& wb, compaction_g throw std::runtime_error(fmt::format("Write size {} exceeds segment size {}", data.size(), _cfg.segment_size)); } - seg_ptr seg = co_await _segment_pool.get_segment(source); + auto seg = co_await get_segment(source); auto& desc = get_segment_descriptor(seg->id()); - _stats.segments_in_use++; - logstor_logger.trace("Write full segment {} from {}", seg->id(), write_source_to_string(source)); + logstor_logger.trace("Write full segment {} seq {} from {}", seg->id(), seg->seq_num(), write_source_to_string(source)); - wb.write_header(desc.seg_gen, cg.schema()->id()); + wb.write_header(seg->seq_num(), cg.schema()->id()); auto loc = co_await seg->append(data); @@ -1087,11 +1104,9 @@ future<> segment_manager_impl::request_segment_switch() { future<> segment_manager_impl::switch_active_segment() { auto holder = _async_gate.hold(); - auto new_seg = co_await _segment_pool.get_segment(write_source::normal_write); + auto new_seg = co_await get_segment(write_source::normal_write); auto old_seg = std::exchange(_active_segment, std::move(new_seg)); - _stats.segments_in_use++; - _segment_seq_num++; if (old_seg) { // close old segment in background @@ -1102,12 +1117,11 @@ future<> segment_manager_impl::switch_active_segment() { // trigger separator flush for separator buffers that hold old segments auto u = std::max(1, _max_segments / 100); - if (_segment_seq_num % u == 0 && _segment_seq_num > 5*u) { - trigger_separator_flush(_segment_seq_num - 5*u); + if (_next_segment_seq.value % u == 0 && _next_segment_seq.value > 5*u) { + trigger_separator_flush(segment_sequence(_next_segment_seq.value - 5*u)); } - _active_segment->start(make_segment_ref(_active_segment->id())); - logstor_logger.trace("Switched active segment to {}", _active_segment->id()); + logstor_logger.trace("Switched active segment to {} seq {}", _active_segment->id(), _active_segment->seq_num()); } future<> segment_manager_impl::replenish_reserve() { @@ -1189,10 +1203,6 @@ void segment_manager_impl::free_segment(log_segment_id segment_id) noexcept { if (desc.ref_count != 0) { on_internal_error(logstor_logger, format("Freeing segment {} with non-zero reference count", segment_id)); } - desc.on_free_segment(); - - // TODO write new generation? - _free_segments.push_back(segment_id); _segment_freed_cv.signal(); @@ -1220,22 +1230,12 @@ future<> segment_manager_impl::discard_segments(segment_set& ss) { segments.push_back(seg_id); } + // Invalidate the first header block so recovery treats the slot as empty. co_await max_concurrent_for_each(segments, 32, [this] (log_segment_id seg_id) -> future<> { - // Write a valid empty segment header with the next generation. - // This marks the segment as discarded while preserving the generation counter. - auto next_gen = get_segment_descriptor(seg_id).seg_gen; - ++next_gen; - auto buf = allocate_aligned_buffer(block_alignment, 4096); - std::memset(buf.get(), 0, block_alignment); - simple_memory_output_stream out(buf.get(), write_buffer::buffer_header_size); - write_buffer::write_empty_header(out, next_gen); - + logstor_logger.trace("Discard segment {}", seg_id); auto [file_id, file_offset] = segment_id_to_file_location(seg_id); auto file = co_await _file_mgr.get_file_for_write(file_id); - co_await file.dma_write(file_offset, buf.get(), block_alignment); - - logstor_logger.trace("Discard segment {} next gen {}", seg_id, next_gen); - + co_await _file_mgr.format_file_region(file, file_offset, block_alignment); free_segment(seg_id); }); } @@ -1254,7 +1254,7 @@ future<> segment_manager_impl::scan_segment(log_segment_id segment_id, .read_ahead = 1, }); size_t current_position = 0; - auto seg_gen = get_segment_descriptor(segment_id).seg_gen; + std::optional segment_seq; logstor_logger.trace("Reading records from segment {} at file {} offset {}", segment_id, file_id, file_offset); @@ -1284,7 +1284,9 @@ future<> segment_manager_impl::scan_segment(log_segment_id segment_id, break; } - if (bh.seg_gen != seg_gen) { + if (!segment_seq) { + segment_seq = bh.segment_seq; + } else if (bh.segment_seq != *segment_seq) { break; } @@ -1751,7 +1753,7 @@ separator_buffer compaction_manager_impl::allocate_separator_buffer() { return separator_buffer(wb); } -future<> segment_manager_impl::write_to_separator(write_buffer& wb, segment_ref seg_ref, size_t segment_seq_num) { +future<> segment_manager_impl::write_to_separator(write_buffer& wb, segment_ref seg_ref, segment_sequence segment_seq_num) { for (auto&& w : wb.records()) { co_await coroutine::maybe_yield(); @@ -1887,18 +1889,39 @@ future<> segment_manager_impl::do_recovery(replica::database& db) { next_file_id++; } - // populate index from all segments. keep the latest record for each key. + size_t allocated_segment_count = next_file_id * _segments_per_file; + + std::vector segment_seqs(allocated_segment_count, segment_sequence(0)); + segment_sequence max_segment_seq = segment_sequence(0); + + // Populate the index from all segments. Keep the latest record for each key. + // For equal records, keep the one from the segment with the highest sequence number. + auto cmp_with_seq = [&segment_seqs] (const index_entry& old_entry, const index_entry& candidate) -> std::strong_ordering { + if (auto c = primary_index::default_entry_cmp(old_entry, candidate); c != 0) { + return c; + } + const auto old_seq = segment_seqs[old_entry.location.segment.value]; + const auto new_seq = segment_seqs[candidate.location.segment.value]; + if (auto c = old_seq <=> new_seq; c != 0) { + return c; + } + return old_entry.location.offset <=> candidate.location.offset; + }; + for (auto file_id : found_file_ids) { logstor_logger.info("Recovering segments from file {}: {}%", _file_mgr.get_file_path(file_id).string(), (file_id + 1) * 100 / found_file_ids.size()); co_await max_concurrent_for_each(segments_in_file(file_id), 32, - [this, &db] (log_segment_id seg_id) { - return recover_segment(db, seg_id); + [this, &db, &cmp_with_seq, &segment_seqs, &max_segment_seq] (log_segment_id seg_id) { + return recover_segment(db, seg_id, cmp_with_seq, + [seg_id, &segment_seqs, &max_segment_seq] (const segment_header& seg_hdr) { + segment_seqs[seg_id.value] = seg_hdr.segment_seq; + max_segment_seq = std::max(max_segment_seq, seg_hdr.segment_seq); + }); } ); } // go over the index and mark all segments that have live data as used. - size_t allocated_segment_count = next_file_id * _segments_per_file; utils::dynamic_bitset used_segments(allocated_segment_count); co_await db.get_tables_metadata().for_each_table_gently([&] (table_id tid, lw_shared_ptr
tp) -> future<> { @@ -1918,9 +1941,7 @@ future<> segment_manager_impl::do_recovery(replica::database& db) { for (size_t seg_idx = 0; seg_idx < allocated_segment_count; ++seg_idx) { co_await coroutine::maybe_yield(); log_segment_id seg_id(seg_idx); - auto& desc = get_segment_descriptor(seg_id); if (!used_segments.test(seg_idx)) { - desc.on_free_segment(); _free_segments.push_back(seg_id); free_segment_count++; } else { @@ -1946,32 +1967,30 @@ future<> segment_manager_impl::do_recovery(replica::database& db) { } _next_new_segment_id = allocated_segment_count; + _next_segment_seq = max_segment_seq + 1; _file_mgr.recover_next_file_id(next_file_id); logstor_logger.info("Recovery complete"); } -future<> segment_manager_impl::recover_segment(replica::database& db, log_segment_id segment_id) { +future<> segment_manager_impl::recover_segment(replica::database& db, log_segment_id segment_id, + primary_index::entry_cmp_fn cmp, std::function on_header) { auto& desc = get_segment_descriptor(segment_id); desc.reset(_cfg.segment_size); - auto seg_hdr = co_await read_segment_header(segment_id); - if (!seg_hdr) { - logstor_logger.trace("Segment {} has invalid header, skipping", segment_id); - co_return; - } - desc.seg_gen = seg_hdr->seg_gen; - bool is_full_segment = seg_hdr->kind == segment_kind::full; - logstor_logger.trace("Recovering segment {} with generation {}", segment_id, desc.seg_gen); - - co_await for_each_record(segment_id, - [this, &desc, &db, is_full_segment] (log_location loc, const log_record_header& header) -> want_data { - logstor_logger.trace("Recovery: read record at {} gen {}", loc, header.generation); + co_await scan_segment(segment_id, + [segment_id, on_header = std::move(on_header)] (const segment_header& seg_hdr) mutable { + logstor_logger.trace("Recovering segment {} with sequence {}", segment_id, seg_hdr.segment_seq); + on_header(seg_hdr); + return make_ready_future<>(); + }, + [this, &desc, &db, &cmp] (log_location loc, const log_record_header& header) -> want_data { + logstor_logger.trace("Recovery: read record at {} key {} ts {}", loc, header.key, header.timestamp); index_entry new_entry { .location = loc, - .generation = header.generation + .timestamp = header.timestamp }; try { @@ -1979,7 +1998,7 @@ future<> segment_manager_impl::recover_segment(replica::database& db, log_segmen if (!t.uses_logstor()) { return want_data::no; } - auto [inserted, prev_entry] = t.logstor_index().insert_if_newer(header.key, new_entry, is_full_segment); + auto [inserted, prev_entry] = t.logstor_index().insert(header.key, new_entry, cmp); if (inserted) { desc.on_write(loc); if (prev_entry) { @@ -2147,7 +2166,7 @@ void segment_manager::set_trigger_compaction_hook(std::function fn) { _impl->set_trigger_compaction_hook(std::move(fn)); } -void segment_manager::set_trigger_separator_flush_hook(std::function fn) { +void segment_manager::set_trigger_separator_flush_hook(std::function fn) { _impl->set_trigger_separator_flush_hook(std::move(fn)); } @@ -2177,6 +2196,51 @@ future<> segment_manager::await_pending_writes() { class segment_data_sink_impl : public data_sink_impl { seg_ptr _segment; + std::vector _pending_data; + std::optional _initial_header_size; + bool _header_rewritten = false; + + write_buffer::buffer_header read_buffer_header() const { + simple_memory_input_stream bh_stream(_pending_data.data(), write_buffer::buffer_header_size); + return ser::deserialize(bh_stream, std::type_identity{}); + } + + void maybe_parse_initial_header() { + if (_initial_header_size || _pending_data.size() < write_buffer::buffer_header_size) { + return; + } + + auto bh = read_buffer_header(); + if (!write_buffer::validate_header(bh)) { + throw std::runtime_error("Invalid streamed logstor buffer header"); + } + + size_t header_size = write_buffer::buffer_header_size; + if (bh.kind == segment_kind::full) { + header_size += write_buffer::segment_header_size; + } + _initial_header_size = header_size; + } + + void rewrite_buffer_header() { + auto bh = read_buffer_header(); + + logstor_logger.trace("Rewriting buffer header for segment {} seq {} with seq {}", _segment->id(), bh.segment_seq, _segment->seq_num()); + + bh.segment_seq = _segment->seq_num(); + bh.crc = bh.calculate_crc(); + + simple_memory_output_stream bh_stream(_pending_data.data(), write_buffer::buffer_header_size); + ser::serialize(bh_stream, bh); + } + + future<> flush_pending_data() { + if (_pending_data.empty()) { + co_return; + } + co_await _segment->append(bytes_view(reinterpret_cast(_pending_data.data()), _pending_data.size())); + _pending_data.clear(); + } public: segment_data_sink_impl(seg_ptr segment) : _segment(std::move(segment)) @@ -2184,11 +2248,29 @@ public: virtual future<> put(std::span> data) override { for (auto& buf : data) { - co_await _segment->append(bytes_view((const int8_t*)buf.get(), buf.size())); + if (buf.empty()) { + continue; + } + + if (_header_rewritten) { + co_await _segment->append(bytes_view(reinterpret_cast(buf.get()), buf.size())); + continue; + } + + _pending_data.insert(_pending_data.end(), buf.get(), buf.get() + buf.size()); + maybe_parse_initial_header(); + if (_initial_header_size && _pending_data.size() >= *_initial_header_size) { + rewrite_buffer_header(); + _header_rewritten = true; + co_await flush_pending_data(); + } } } virtual future<> close() override { + if (!_header_rewritten && !_pending_data.empty()) { + throw std::runtime_error("Truncated streamed logstor segment header"); + } co_return; } @@ -2226,22 +2308,20 @@ public: co_await _sm.load_segment(_db, _seg->id()); } future<> abort() override { - _sm.free_segment(_seg->id()); co_return; } }; future<> segment_manager_impl::load_segment(replica::database& db, log_segment_id seg_id) { // read the segment and populate the index - co_await recover_segment(db, seg_id); + co_await recover_segment(db, seg_id, primary_index::default_entry_cmp, [] (const segment_header&) {}); auto& desc = get_segment_descriptor(seg_id); co_await add_segment_to_compaction_group(db, desc); } future> segment_manager_impl::create_segment_output_stream(replica::database& db) { - auto seg = co_await _segment_pool.get_segment(write_source::streaming); - _stats.segments_in_use++; + auto seg = co_await get_segment(write_source::streaming); co_return std::make_unique(*this, db, std::move(seg)); } diff --git a/replica/logstor/segment_manager.hh b/replica/logstor/segment_manager.hh index 52adb12533..17a486ee62 100644 --- a/replica/logstor/segment_manager.hh +++ b/replica/logstor/segment_manager.hh @@ -129,7 +129,7 @@ public: const compaction_manager& get_compaction_manager() const noexcept; void set_trigger_compaction_hook(std::function fn); - void set_trigger_separator_flush_hook(std::function fn); + void set_trigger_separator_flush_hook(std::function fn); size_t get_segment_size() const noexcept; diff --git a/replica/logstor/types.hh b/replica/logstor/types.hh index 7f7b3a2c65..44b64cd96b 100644 --- a/replica/logstor/types.hh +++ b/replica/logstor/types.hh @@ -9,9 +9,9 @@ #include #include -#include "mutation/canonical_mutation.hh" -#include "replica/logstor/utils.hh" #include "dht/decorated_key.hh" +#include "mutation/canonical_mutation.hh" +#include "mutation/timestamp.hh" namespace replica::logstor { @@ -34,19 +34,16 @@ struct primary_index_key { dht::decorated_key dk; }; -using record_generation = generation_base; -using segment_generation = generation_base; - struct index_entry { log_location location; - record_generation generation; + api::timestamp_type timestamp; bool operator==(const index_entry& other) const noexcept = default; }; struct log_record_header { primary_index_key key; - record_generation generation; + api::timestamp_type timestamp; table_id table; }; @@ -55,6 +52,28 @@ struct log_record { canonical_mutation mut; }; +struct segment_sequence { + uint64_t value; + + bool operator==(const segment_sequence& other) const noexcept = default; + auto operator<=>(const segment_sequence& other) const noexcept = default; + + segment_sequence& operator++() noexcept { + ++value; + return *this; + } + + segment_sequence operator++(int) noexcept { + segment_sequence tmp = *this; + ++value; + return tmp; + } + + segment_sequence operator+(uint64_t increment) const noexcept { + return segment_sequence{value + increment}; + } +}; + } // Format specialization declarations and implementations @@ -82,3 +101,11 @@ struct fmt::formatter : fmt::formatter +struct fmt::formatter : fmt::formatter { + template + auto format(const replica::logstor::segment_sequence& seq, FormatContext& ctx) const { + return fmt::format_to(ctx.out(), "sseq({})", seq.value); + } +}; diff --git a/replica/logstor/utils.hh b/replica/logstor/utils.hh deleted file mode 100644 index 72b10bfa94..0000000000 --- a/replica/logstor/utils.hh +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright (C) 2026-present ScyllaDB - */ - -/* - * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 - */ -#pragma once - -#include -#include "serializer.hh" - -namespace replica::logstor { - -// an unsigned integer that can be incremented and compared with wraparound semantics -template -class generation_base { - T _value; - -public: - - using underlying = T; - - constexpr generation_base() noexcept : _value(0) {} - constexpr explicit generation_base(T value) noexcept : _value(value) {} - - constexpr T value() const noexcept { return _value; } - - constexpr generation_base& operator++() noexcept { - ++_value; - return *this; - } - - constexpr generation_base operator++(int) noexcept { - auto old = *this; - ++_value; - return old; - } - - constexpr generation_base& operator+=(T delta) noexcept { - _value += delta; - return *this; - } - - constexpr generation_base operator+(T delta) const noexcept { - return generation_base(_value + delta); - } - - constexpr bool operator==(const generation_base& other) const noexcept = default; - - /// Comparison using wraparound semantics. - /// Returns true if this generation is less than other, accounting for wraparound. - /// Assumes generations are within half the value space of each other. - constexpr bool operator<(const generation_base& other) const noexcept { - // Use signed comparison after converting difference to signed type - // This handles wraparound: if diff > max/2, it's treated as negative - using signed_type = std::make_signed_t; - auto diff = static_cast(_value - other._value); - return diff < 0; - } - - constexpr bool operator<=(const generation_base& other) const noexcept { - return *this == other || *this < other; - } - - constexpr bool operator>(const generation_base& other) const noexcept { - return other < *this; - } - - constexpr bool operator>=(const generation_base& other) const noexcept { - return other <= *this; - } -}; - -} - -template -struct fmt::formatter> : fmt::formatter { - template - auto format(const replica::logstor::generation_base& gen, FormatContext& ctx) const { - return fmt::formatter::format(gen.value(), ctx); - } -}; - -namespace ser { - -template -struct serializer> { - template - static void write(Output& out, const replica::logstor::generation_base& g) { - serializer::underlying>::write(out, g.value()); - } - template - static replica::logstor::generation_base read(Input& in) { - auto val = serializer::underlying>::read(in); - return replica::logstor::generation_base(val); - } - template - static void skip(Input& in) { - serializer::underlying>::skip(in); - } -}; - -} diff --git a/replica/logstor/write_buffer.cc b/replica/logstor/write_buffer.cc index b10844f1d7..987859642c 100644 --- a/replica/logstor/write_buffer.cc +++ b/replica/logstor/write_buffer.cc @@ -164,11 +164,12 @@ void write_buffer::finalize(size_t alignment) { pad_to_alignment(alignment); } -void write_buffer::write_header(segment_generation seg_gen, std::optional table) { +void write_buffer::write_header(segment_sequence segment_seq, std::optional table) { _buffer_header.magic = buffer_header_magic; - _buffer_header.seg_gen = seg_gen; _buffer_header.kind = _segment_kind; _buffer_header.version = current_version; + _buffer_header.reserved = 0; + _buffer_header.segment_seq = segment_seq; _buffer_header.crc = _buffer_header.calculate_crc(); @@ -185,25 +186,16 @@ void write_buffer::write_header(segment_generation seg_gen, std::optional(out, hdr); -} - bool write_buffer::validate_header(const write_buffer::buffer_header& bh) { if (bh.magic != write_buffer::buffer_header_magic) { return false; } - if (bh.calculate_crc() != bh.crc) { + switch (bh.kind) { + case segment_kind::mixed: + case segment_kind::full: + break; + default: return false; } @@ -211,6 +203,10 @@ bool write_buffer::validate_header(const write_buffer::buffer_header& bh) { return false; } + if (bh.calculate_crc() != bh.crc) { + return false; + } + return true; } @@ -248,10 +244,11 @@ size_t write_buffer::estimate_required_segments(size_t net_data_size, size_t rec uint32_t write_buffer::buffer_header::calculate_crc() const { utils::crc32 c; c.process_le(magic); - c.process_le(data_size); - c.process_le(seg_gen.value()); c.process_le(static_cast(kind)); c.process_le(version); + c.process_le(reserved); + c.process_le(segment_seq.value); + c.process_le(data_size); return c.get(); } diff --git a/replica/logstor/write_buffer.hh b/replica/logstor/write_buffer.hh index 6004988021..9840079560 100644 --- a/replica/logstor/write_buffer.hh +++ b/replica/logstor/write_buffer.hh @@ -114,19 +114,22 @@ public: struct buffer_header { uint32_t magic; - uint32_t data_size; // size of all records data following the buffer_header - segment_generation seg_gen; segment_kind kind; uint8_t version; + uint16_t reserved; + segment_sequence segment_seq; + uint32_t data_size; // size of all records data following the header(s) uint32_t crc; uint32_t calculate_crc() const; }; static constexpr size_t buffer_header_size = - 2 * sizeof(uint32_t) - + sizeof(segment_generation::underlying) + sizeof(uint32_t) + sizeof(std::underlying_type_t) + sizeof(uint8_t) + + sizeof(uint16_t) + + sizeof(segment_sequence) + + sizeof(uint32_t) + sizeof(uint32_t); static_assert(buffer_header_size % record_alignment == 0, "Buffer header size must be aligned by record_alignment"); @@ -234,8 +237,6 @@ public: return s; } - static void write_empty_header(ostream& out, segment_generation seg_gen); - static bool validate_header(const buffer_header& bh); private: @@ -243,7 +244,7 @@ private: const char* data() const noexcept { return _buffer.get(); } // table is set for segment_kind::full - void write_header(segment_generation seg_gen, std::optional table); + void write_header(segment_sequence segment_seq, std::optional table); // get all write records in the buffer. // with_record_copy must be to true when creating the write_buffer. @@ -329,30 +330,33 @@ struct serializer { template static void write(Output& out, const replica::logstor::write_buffer::buffer_header& h) { serializer::write(out, h.magic); - serializer::write(out, h.data_size); - serializer::write(out, h.seg_gen); serializer::write(out, static_cast(h.kind)); serializer::write(out, h.version); + serializer::write(out, h.reserved); + serializer::write(out, h.segment_seq.value); + serializer::write(out, h.data_size); serializer::write(out, h.crc); } template static replica::logstor::write_buffer::buffer_header read(Input& in) { replica::logstor::write_buffer::buffer_header h; h.magic = serializer::read(in); - h.data_size = serializer::read(in); - h.seg_gen = serializer::read(in); h.kind = static_cast(serializer::read(in)); h.version = serializer::read(in); + h.reserved = serializer::read(in); + h.segment_seq = replica::logstor::segment_sequence{serializer::read(in)}; + h.data_size = serializer::read(in); h.crc = serializer::read(in); return h; } template static void skip(Input& in) { serializer::skip(in); + serializer::skip(in); + serializer::skip(in); + serializer::skip(in); + serializer::skip(in); serializer::skip(in); - serializer::skip(in); - serializer::skip(in); - serializer::skip(in); serializer::skip(in); } }; diff --git a/replica/table.cc b/replica/table.cc index 4bf591fc92..e80d84e5a5 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1113,7 +1113,7 @@ future<> compaction_group::discard_logstor_segments() { co_await sm.discard_segments(*_logstor_segments); } -future<> compaction_group::flush_separator(std::optional seq_num) { +future<> compaction_group::flush_separator(std::optional seq_num) { auto units = co_await get_units(_separator_flush_sem, 1); auto pending = std::exchange(_separator_flushes, {}); if (_logstor_separator && (!seq_num || _logstor_separator->min_seq_num < *seq_num)) { @@ -2611,7 +2611,7 @@ void table::try_trigger_compaction(compaction_group& cg) noexcept { } } -future<> table::flush_separator(std::optional seq_num) { +future<> table::flush_separator(std::optional seq_num) { if (!uses_logstor()) { co_return; } diff --git a/test/cluster/test_logstor.py b/test/cluster/test_logstor.py index 4f3d48a748..8731313749 100644 --- a/test/cluster/test_logstor.py +++ b/test/cluster/test_logstor.py @@ -86,6 +86,20 @@ async def test_basic_write_and_read(manager: ManagerClient): assert rows[0].pk == 2 assert rows[0].v == 150 + await cql.run_async(f"DELETE FROM {ks}.test_int WHERE pk = 1") + rows = await cql.run_async(f"SELECT pk, v FROM {ks}.test_int WHERE pk = 1") + assert len(rows) == 0 + rows = await cql.run_async(f"SELECT pk, v FROM {ks}.test_int WHERE pk = 2") + assert rows[0].pk == 2 + assert rows[0].v == 150 + + # test conflict resolution by timestamp + await cql.run_async(f"INSERT INTO {ks}.test_int (pk, v) VALUES (3, 300) USING TIMESTAMP 1000") + await cql.run_async(f"INSERT INTO {ks}.test_int (pk, v) VALUES (3, 200) USING TIMESTAMP 900") + rows = await cql.run_async(f"SELECT pk, v FROM {ks}.test_int WHERE pk = 3") + assert rows[0].pk == 3 + assert rows[0].v == 300 + # test frozen map value await cql.run_async(f"CREATE TABLE {ks}.test_map (pk int PRIMARY KEY, v frozen>) WITH storage_engine = 'logstor'") @@ -100,6 +114,10 @@ async def test_basic_write_and_read(manager: ManagerClient): assert rows[0].pk == 1 assert rows[0].v == {'a': 'apple', 'b': 'banana', 'c': 'cherry'} + await cql.run_async(f"DELETE FROM {ks}.test_map WHERE pk = 1") + rows = await cql.run_async(f"SELECT pk, v FROM {ks}.test_map WHERE pk = 1") + assert len(rows) == 0 + @pytest.mark.asyncio async def test_range_read(manager: ManagerClient): cmdline = ['--logger-log-level', 'logstor=debug']