logstor: add segment sequence number

add a segment sequence number that is a global (per-shard) increasing
number that is allocated when getting a new segment for write, and is
written in buffer headers in the segment.

this is useful when scanning a segment for ignoring buffers that belong
to an older generation of that segment.

it's also useful by creating a single ordering of all segments that
respects the compaction ordering.

on recovery, we keep the records from the newest segments among equal
records. so if a record is rewritten by compaction, the recovery will
choose the records from the new compacted segments.

discard segments by writing zeroes to the segment header.
This commit is contained in:
Michael Litvak
2026-05-14 19:21:50 +02:00
parent 652d1164dd
commit 9e0741e5b7
14 changed files with 169 additions and 225 deletions

View File

@@ -165,11 +165,14 @@ A serialized form of `write_buffer::buffer_header`.
| Offset | Size | Field | Description |
|--------|------|-------------|-------------|
| 0 | 4 | `magic` | `0x4C475342` ("LGSB"). Used to detect valid buffers during recovery. |
| 4 | 4 | `data_size` | Size in bytes of all record data following the header(s). |
| 8 | 2 | `seg_gen` | Segment generation number. Incremented each time the segment slot is reused. Used during recovery to discard stale data. |
| 10 | 1 | `kind` | Segment kind: `0` = mixed, `1` = full. |
| 11 | 1 | `version` | Version of the write buffer format. |
| 12 | 4 | `crc` | CRC32 of all other fields in the buffer header. Used for validating the header. |
| 4 | 1 | `kind` | Segment kind: `0` = mixed, `1` = full. |
| 5 | 1 | `version` | Version of the write buffer format. |
| 6 | 2 | `reserved` | Reserved for future use. Currently written as zero and included in the CRC. |
| 8 | 8 | `segment_seq` | Monotonic segment sequence number used during recovery and segment ordering checks. |
| 16 | 4 | `data_size` | Size in bytes of all record data following the header(s). |
| 20 | 4 | `crc` | CRC32 of all preceding buffer header fields. Used for validating the header. |
The buffer header is 24 bytes long, which keeps it aligned to `record_alignment` (8 bytes).
#### Segment Header (full segments only)
@@ -179,9 +182,9 @@ A serialized form of `write_buffer::segment_header`.
| Offset | Size | Field | Description |
|--------|------|---------------|-------------|
| 16 | 16 | `table` | UUID of the table this segment belongs to. |
| 32 | 8 | `first_token` | Minimum token of all records in the segment (raw token number). |
| 40 | 8 | `last_token` | Maximum token of all records in the segment (raw token number). |
| 24 | 16 | `table` | UUID of the table this segment belongs to. |
| 40 | 8 | `first_token` | Minimum token of all records in the segment (raw token number). |
| 48 | 8 | `last_token` | Maximum token of all records in the segment (raw token number). |
#### Records

View File

@@ -317,7 +317,7 @@ public:
future<> discard_logstor_segments();
future<> flush_separator(std::optional<size_t> seq_num = std::nullopt);
future<> flush_separator(std::optional<logstor::segment_sequence> seq_num = std::nullopt);
logstor::separator_buffer& get_separator_buffer(size_t write_size);
logstor::segment_set& logstor_segments() noexcept {

View File

@@ -16,6 +16,8 @@
#include "locator/network_topology_strategy.hh"
#include "locator/tablets.hh"
#include "locator/token_metadata_fwd.hh"
#include "replica/logstor/compaction.hh"
#include "replica/logstor/types.hh"
#include "utils/log.hh"
#include "replica/database_fwd.hh"
#include <seastar/core/shard_id.hh>
@@ -955,7 +957,7 @@ database::init_logstor() {
trigger_logstor_compaction(false);
});
_logstor->set_trigger_separator_flush_hook([this] (size_t seq_num) {
_logstor->set_trigger_separator_flush_hook([this] (logstor::segment_sequence seq_num) {
(void)flush_logstor_separator(seq_num);
});
@@ -2955,7 +2957,7 @@ future<> database::flush_logstor_separator_on_all_shards(sharded<database>& shar
});
}
future<> database::flush_logstor_separator(std::optional<size_t> seq_num) {
future<> database::flush_logstor_separator(std::optional<logstor::segment_sequence> seq_num) {
return _tables_metadata.parallel_for_each_table([seq_num] (table_id, lw_shared_ptr<table> table) {
return table->flush_separator(seq_num);
});

View File

@@ -1179,7 +1179,7 @@ public:
return _logstor->get_compaction_manager();
}
future<> flush_separator(std::optional<size_t> seq_num = std::nullopt);
future<> flush_separator(std::optional<logstor::segment_sequence> seq_num = std::nullopt);
future<logstor::table_segment_stats> get_logstor_segment_stats() const;
@@ -2069,7 +2069,7 @@ public:
static future<> trigger_logstor_compaction_on_all_shards(sharded<database>& sharded_db, bool major);
void trigger_logstor_compaction(bool major);
static future<> flush_logstor_separator_on_all_shards(sharded<database>& sharded_db);
future<> flush_logstor_separator(std::optional<size_t> seq_num = std::nullopt);
future<> flush_logstor_separator(std::optional<logstor::segment_sequence> seq_num = std::nullopt);
future<logstor::table_segment_stats> get_logstor_table_segment_stats(table_id table) const;
size_t get_logstor_memory_usage() const;

View File

@@ -33,7 +33,6 @@ struct segment_descriptor : public log_heap_hook<segment_descriptor_hist_options
// when freeing a record, increase by the record's net data size
size_t free_space{0};
size_t record_count{0};
segment_generation seg_gen{1};
segment_set* owner{nullptr}; // non-owning, set when added to a segment_set
int ref_count{0};
@@ -46,10 +45,6 @@ struct segment_descriptor : public log_heap_hook<segment_descriptor_hist_options
return segment_size - free_space;
}
void on_free_segment() noexcept {
++seg_gen;
}
void on_write(size_t net_data_size, size_t cnt = 1) noexcept {
free_space -= net_data_size;
record_count += cnt;
@@ -160,7 +155,7 @@ struct separator_buffer {
write_buffer* buf;
utils::chunked_vector<future<>> pending_updates;
utils::chunked_vector<segment_ref> held_segments;
std::optional<size_t> min_seq_num;
std::optional<segment_sequence> min_seq_num;
bool flushed{false};
separator_buffer(write_buffer* wb)

View File

@@ -310,7 +310,7 @@ void logstor::set_trigger_compaction_hook(std::function<void()> fn) {
_segment_manager.set_trigger_compaction_hook(std::move(fn));
}
void logstor::set_trigger_separator_flush_hook(std::function<void(size_t)> fn) {
void logstor::set_trigger_separator_flush_hook(std::function<void(segment_sequence)> fn) {
_segment_manager.set_trigger_separator_flush_hook(std::move(fn));
}

View File

@@ -74,7 +74,7 @@ public:
tracing::trace_state_ptr trace_state = nullptr);
void set_trigger_compaction_hook(std::function<void()> fn);
void set_trigger_separator_flush_hook(std::function<void(size_t)> fn);
void set_trigger_separator_flush_hook(std::function<void(segment_sequence)> fn);
};
} // namespace logstor

View File

@@ -69,6 +69,7 @@ protected:
class writeable_segment : public segment {
seastar::gate _write_gate;
segment_ref _seg_ref;
segment_sequence _seq_num;
uint32_t _current_offset = 0; // next offset for write
@@ -77,7 +78,7 @@ class writeable_segment : public segment {
public:
using segment::segment;
void start(segment_ref seg_ref);
void start(segment_ref, segment_sequence);
future<> stop();
@@ -100,6 +101,10 @@ public:
segment_ref ref() {
return _seg_ref;
}
segment_sequence seq_num() const noexcept {
return _seq_num;
}
};
segment::segment(log_segment_id id, seastar::file file, uint64_t file_offset, size_t max_size)
@@ -132,8 +137,9 @@ future<log_record> segment::read(log_location loc) {
});
}
void writeable_segment::start(segment_ref seg_ref) {
void writeable_segment::start(segment_ref seg_ref, segment_sequence seq_num) {
_seg_ref = std::move(seg_ref);
_seq_num = seq_num;
}
future<> writeable_segment::stop() {
@@ -197,6 +203,9 @@ class file_manager {
std::vector<seastar::file> _open_read_files;
std::unique_ptr<char[], seastar::free_deleter> _zero_buf;
size_t _zero_buf_size{0};
public:
file_manager(segment_manager_config cfg)
: _segments_per_file(cfg.file_size / cfg.segment_size)
@@ -237,6 +246,9 @@ public:
future<> file_manager::start() {
co_await seastar::recursive_touch_directory(_base_dir.string());
_zero_buf_size = 128 * 1024;
_zero_buf = allocate_aligned_buffer<char>(_zero_buf_size, 4096);
std::memset(_zero_buf.get(), 0, _zero_buf_size);
}
future<> file_manager::stop() {
@@ -326,19 +338,13 @@ future<seastar::file> file_manager::get_file_for_read(size_t file_id) {
}
future<> file_manager::format_file_region(seastar::file file, uint64_t offset, size_t size) {
// Allocate aligned buffer for zeroing
const auto write_alignment = file.disk_write_dma_alignment();
size_t buf_size = align_up<size_t>(128 * 1024, size_t(write_alignment));
auto zero_buf = allocate_aligned_buffer<char>(buf_size, write_alignment);
std::memset(zero_buf.get(), 0, buf_size);
// Write zeros to entire region
// Write zeros to entire region using the pre-allocated zero buffer
size_t remaining = size;
uint64_t current_offset = offset;
while (remaining > 0) {
auto write_size = std::min(remaining, buf_size);
auto written = co_await file.dma_write(current_offset, zero_buf.get(), write_size);
auto write_size = std::min(remaining, _zero_buf_size);
auto written = co_await file.dma_write(current_offset, _zero_buf.get(), write_size);
current_offset += written;
remaining -= written;
@@ -554,7 +560,7 @@ public:
struct segment_header {
segment_kind kind;
segment_generation seg_gen;
segment_sequence segment_seq;
struct mixed {};
struct full {
@@ -569,7 +575,7 @@ struct segment_header {
static segment_header make_segment_header(const write_buffer::buffer_header& bh, std::optional<write_buffer::segment_header> sh) {
segment_header seg_hdr {
.kind = bh.kind,
.seg_gen = bh.seg_gen,
.segment_seq = bh.segment_seq,
};
switch (bh.kind) {
@@ -621,7 +627,7 @@ class segment_manager_impl {
seastar::semaphore _active_segment_write_sem{1};
segment_pool _segment_pool;
std::optional<shared_future<>> _switch_segment_fut;
size_t _segment_seq_num{0};
segment_sequence _next_segment_seq{1};
seastar::gate _async_gate;
future<> _reserve_replenisher{make_ready_future<>()};
@@ -639,7 +645,7 @@ class segment_manager_impl {
std::vector<write_buffer*> _available_separator_buffers;
std::function<void()> _trigger_compaction_fn;
std::function<void(size_t)> _trigger_separator_flush_fn;
std::function<void(segment_sequence)> _trigger_separator_flush_fn;
utils::phased_barrier _writes_phaser{"logstor_sm_writes"};
@@ -682,7 +688,7 @@ public:
}
future<> load_segment(replica::database&, log_segment_id);
future<> recover_segment(replica::database&, log_segment_id);
future<> recover_segment(replica::database&, log_segment_id, primary_index::entry_cmp_fn cmp, std::function<void(const segment_header&)> on_header);
future<std::optional<segment_header>> read_segment_header(log_segment_id);
future<> add_segment_to_compaction_group(replica::database&, segment_descriptor&);
@@ -692,7 +698,7 @@ public:
}
}
void trigger_separator_flush(size_t seq) {
void trigger_separator_flush(segment_sequence seq) {
if (_trigger_separator_flush_fn) {
_trigger_separator_flush_fn(seq);
}
@@ -710,7 +716,7 @@ public:
_trigger_compaction_fn = std::move(fn);
}
void set_trigger_separator_flush_hook(std::function<void(size_t)> fn) {
void set_trigger_separator_flush_hook(std::function<void(segment_sequence)> fn) {
_trigger_separator_flush_fn = std::move(fn);
}
@@ -740,9 +746,12 @@ private:
future<> request_segment_switch();
future<> switch_active_segment();
segment_sequence allocate_segment_seq() noexcept {
return _next_segment_seq++;
}
std::chrono::microseconds calculate_separator_delay() const;
future<> write_to_separator(write_buffer&, segment_ref, size_t segment_seq_num);
future<> write_to_separator(write_buffer&, segment_ref, segment_sequence);
void write_to_separator(table&, log_location prev_loc, log_record, segment_ref);
// Sequentially scans one segment and invokes callbacks for the decoded
@@ -777,7 +786,7 @@ private:
future<seg_ptr> get_segment(write_source src) {
seg_ptr seg = co_await _segment_pool.get_segment(src);
seg->start(make_segment_ref(seg->id()));
seg->start(make_segment_ref(seg->id()), allocate_segment_seq());
_stats.segments_in_use++;
co_return seg;
}
@@ -991,7 +1000,7 @@ future<> segment_manager_impl::write(write_buffer& wb) {
seg_ptr seg = _active_segment;
auto seg_holder = seg->hold();
auto seg_ref = seg->ref();
auto segment_seq_num = _segment_seq_num;
auto seq_num = seg->seq_num();
auto write_op = _writes_phaser.start();
auto& desc = get_segment_descriptor(seg->id());
@@ -1000,7 +1009,9 @@ future<> segment_manager_impl::write(write_buffer& wb) {
seg_ref.set_flush_failure();
});
wb.write_header(desc.seg_gen, std::nullopt);
logstor_logger.trace("Write active segment {} seq {}", seg->id(), seq_num);
wb.write_header(seq_num, std::nullopt);
auto loc = co_await seg->append(data);
sem_units.return_all();
@@ -1015,7 +1026,7 @@ future<> segment_manager_impl::write(write_buffer& wb) {
co_await wb.complete_writes(loc);
co_await with_scheduling_group(_cfg.separator_sg, [&] {
return write_to_separator(wb, std::move(seg_ref), segment_seq_num);
return write_to_separator(wb, std::move(seg_ref), seq_num);
});
write_to_separator_failed.cancel();
}
@@ -1040,9 +1051,9 @@ future<> segment_manager_impl::write_full_segment(write_buffer& wb, compaction_g
auto seg = co_await get_segment(source);
auto& desc = get_segment_descriptor(seg->id());
logstor_logger.trace("Write full segment {} from {}", seg->id(), write_source_to_string(source));
logstor_logger.trace("Write full segment {} seq {} from {}", seg->id(), seg->seq_num(), write_source_to_string(source));
wb.write_header(desc.seg_gen, cg.schema()->id());
wb.write_header(seg->seq_num(), cg.schema()->id());
auto loc = co_await seg->append(data);
@@ -1096,7 +1107,6 @@ future<> segment_manager_impl::switch_active_segment() {
auto new_seg = co_await get_segment(write_source::normal_write);
auto old_seg = std::exchange(_active_segment, std::move(new_seg));
_segment_seq_num++;
if (old_seg) {
// close old segment in background
@@ -1107,11 +1117,11 @@ future<> segment_manager_impl::switch_active_segment() {
// trigger separator flush for separator buffers that hold old segments
auto u = std::max<size_t>(1, _max_segments / 100);
if (_segment_seq_num % u == 0 && _segment_seq_num > 5*u) {
trigger_separator_flush(_segment_seq_num - 5*u);
if (_next_segment_seq.value % u == 0 && _next_segment_seq.value > 5*u) {
trigger_separator_flush(segment_sequence(_next_segment_seq.value - 5*u));
}
logstor_logger.trace("Switched active segment to {}", _active_segment->id());
logstor_logger.trace("Switched active segment to {} seq {}", _active_segment->id(), _active_segment->seq_num());
}
future<> segment_manager_impl::replenish_reserve() {
@@ -1193,10 +1203,6 @@ void segment_manager_impl::free_segment(log_segment_id segment_id) noexcept {
if (desc.ref_count != 0) {
on_internal_error(logstor_logger, format("Freeing segment {} with non-zero reference count", segment_id));
}
desc.on_free_segment();
// TODO write new generation?
_free_segments.push_back(segment_id);
_segment_freed_cv.signal();
@@ -1224,22 +1230,12 @@ future<> segment_manager_impl::discard_segments(segment_set& ss) {
segments.push_back(seg_id);
}
// Invalidate the first header block so recovery treats the slot as empty.
co_await max_concurrent_for_each(segments, 32, [this] (log_segment_id seg_id) -> future<> {
// Write a valid empty segment header with the next generation.
// This marks the segment as discarded while preserving the generation counter.
auto next_gen = get_segment_descriptor(seg_id).seg_gen;
++next_gen;
auto buf = allocate_aligned_buffer<char>(block_alignment, 4096);
std::memset(buf.get(), 0, block_alignment);
simple_memory_output_stream out(buf.get(), write_buffer::buffer_header_size);
write_buffer::write_empty_header(out, next_gen);
logstor_logger.trace("Discard segment {}", seg_id);
auto [file_id, file_offset] = segment_id_to_file_location(seg_id);
auto file = co_await _file_mgr.get_file_for_write(file_id);
co_await file.dma_write(file_offset, buf.get(), block_alignment);
logstor_logger.trace("Discard segment {} next gen {}", seg_id, next_gen);
co_await _file_mgr.format_file_region(file, file_offset, block_alignment);
free_segment(seg_id);
});
}
@@ -1258,7 +1254,7 @@ future<> segment_manager_impl::scan_segment(log_segment_id segment_id,
.read_ahead = 1,
});
size_t current_position = 0;
auto seg_gen = get_segment_descriptor(segment_id).seg_gen;
std::optional<segment_sequence> segment_seq;
logstor_logger.trace("Reading records from segment {} at file {} offset {}",
segment_id, file_id, file_offset);
@@ -1288,7 +1284,9 @@ future<> segment_manager_impl::scan_segment(log_segment_id segment_id,
break;
}
if (bh.seg_gen != seg_gen) {
if (!segment_seq) {
segment_seq = bh.segment_seq;
} else if (bh.segment_seq != *segment_seq) {
break;
}
@@ -1755,7 +1753,7 @@ separator_buffer compaction_manager_impl::allocate_separator_buffer() {
return separator_buffer(wb);
}
future<> segment_manager_impl::write_to_separator(write_buffer& wb, segment_ref seg_ref, size_t segment_seq_num) {
future<> segment_manager_impl::write_to_separator(write_buffer& wb, segment_ref seg_ref, segment_sequence segment_seq_num) {
for (auto&& w : wb.records()) {
co_await coroutine::maybe_yield();
@@ -1891,18 +1889,39 @@ future<> segment_manager_impl::do_recovery(replica::database& db) {
next_file_id++;
}
// populate index from all segments. keep the latest record for each key.
size_t allocated_segment_count = next_file_id * _segments_per_file;
std::vector<segment_sequence> segment_seqs(allocated_segment_count, segment_sequence(0));
segment_sequence max_segment_seq = segment_sequence(0);
// Populate the index from all segments. Keep the latest record for each key.
// For equal records, keep the one from the segment with the highest sequence number.
auto cmp_with_seq = [&segment_seqs] (const index_entry& old_entry, const index_entry& candidate) -> std::strong_ordering {
if (auto c = primary_index::default_entry_cmp(old_entry, candidate); c != 0) {
return c;
}
const auto old_seq = segment_seqs[old_entry.location.segment.value];
const auto new_seq = segment_seqs[candidate.location.segment.value];
if (auto c = old_seq <=> new_seq; c != 0) {
return c;
}
return old_entry.location.offset <=> candidate.location.offset;
};
for (auto file_id : found_file_ids) {
logstor_logger.info("Recovering segments from file {}: {}%", _file_mgr.get_file_path(file_id).string(), (file_id + 1) * 100 / found_file_ids.size());
co_await max_concurrent_for_each(segments_in_file(file_id), 32,
[this, &db] (log_segment_id seg_id) {
return recover_segment(db, seg_id);
[this, &db, &cmp_with_seq, &segment_seqs, &max_segment_seq] (log_segment_id seg_id) {
return recover_segment(db, seg_id, cmp_with_seq,
[seg_id, &segment_seqs, &max_segment_seq] (const segment_header& seg_hdr) {
segment_seqs[seg_id.value] = seg_hdr.segment_seq;
max_segment_seq = std::max(max_segment_seq, seg_hdr.segment_seq);
});
}
);
}
// go over the index and mark all segments that have live data as used.
size_t allocated_segment_count = next_file_id * _segments_per_file;
utils::dynamic_bitset used_segments(allocated_segment_count);
co_await db.get_tables_metadata().for_each_table_gently([&] (table_id tid, lw_shared_ptr<table> tp) -> future<> {
@@ -1922,9 +1941,7 @@ future<> segment_manager_impl::do_recovery(replica::database& db) {
for (size_t seg_idx = 0; seg_idx < allocated_segment_count; ++seg_idx) {
co_await coroutine::maybe_yield();
log_segment_id seg_id(seg_idx);
auto& desc = get_segment_descriptor(seg_id);
if (!used_segments.test(seg_idx)) {
desc.on_free_segment();
_free_segments.push_back(seg_id);
free_segment_count++;
} else {
@@ -1950,22 +1967,25 @@ future<> segment_manager_impl::do_recovery(replica::database& db) {
}
_next_new_segment_id = allocated_segment_count;
_next_segment_seq = max_segment_seq + 1;
_file_mgr.recover_next_file_id(next_file_id);
logstor_logger.info("Recovery complete");
}
future<> segment_manager_impl::recover_segment(replica::database& db, log_segment_id segment_id) {
future<> segment_manager_impl::recover_segment(replica::database& db, log_segment_id segment_id,
primary_index::entry_cmp_fn cmp, std::function<void(const segment_header&)> on_header) {
auto& desc = get_segment_descriptor(segment_id);
desc.reset(_cfg.segment_size);
co_await scan_segment(segment_id,
[segment_id] (const segment_header& seg_hdr) {
logstor_logger.trace("Recovering segment {} with generation {}", segment_id, seg_hdr.seg_gen);
[segment_id, on_header = std::move(on_header)] (const segment_header& seg_hdr) mutable {
logstor_logger.trace("Recovering segment {} with sequence {}", segment_id, seg_hdr.segment_seq);
on_header(seg_hdr);
return make_ready_future<>();
},
[this, &desc, &db] (log_location loc, const log_record_header& header) -> want_data {
[this, &desc, &db, &cmp] (log_location loc, const log_record_header& header) -> want_data {
logstor_logger.trace("Recovery: read record at {} key {} ts {}", loc, header.key, header.timestamp);
index_entry new_entry {
@@ -1978,7 +1998,7 @@ future<> segment_manager_impl::recover_segment(replica::database& db, log_segmen
if (!t.uses_logstor()) {
return want_data::no;
}
auto [inserted, prev_entry] = t.logstor_index().insert(header.key, new_entry);
auto [inserted, prev_entry] = t.logstor_index().insert(header.key, new_entry, cmp);
if (inserted) {
desc.on_write(loc);
if (prev_entry) {
@@ -2146,7 +2166,7 @@ void segment_manager::set_trigger_compaction_hook(std::function<void()> fn) {
_impl->set_trigger_compaction_hook(std::move(fn));
}
void segment_manager::set_trigger_separator_flush_hook(std::function<void(size_t)> fn) {
void segment_manager::set_trigger_separator_flush_hook(std::function<void(segment_sequence)> fn) {
_impl->set_trigger_separator_flush_hook(std::move(fn));
}
@@ -2231,7 +2251,7 @@ public:
future<> segment_manager_impl::load_segment(replica::database& db, log_segment_id seg_id) {
// read the segment and populate the index
co_await recover_segment(db, seg_id);
co_await recover_segment(db, seg_id, primary_index::default_entry_cmp, [] (const segment_header&) {});
auto& desc = get_segment_descriptor(seg_id);
co_await add_segment_to_compaction_group(db, desc);

View File

@@ -129,7 +129,7 @@ public:
const compaction_manager& get_compaction_manager() const noexcept;
void set_trigger_compaction_hook(std::function<void()> fn);
void set_trigger_separator_flush_hook(std::function<void(size_t)> fn);
void set_trigger_separator_flush_hook(std::function<void(segment_sequence)> fn);
size_t get_segment_size() const noexcept;

View File

@@ -9,7 +9,6 @@
#include <cstdint>
#include <fmt/format.h>
#include "replica/logstor/utils.hh"
#include "dht/decorated_key.hh"
#include "mutation/canonical_mutation.hh"
#include "mutation/timestamp.hh"
@@ -35,8 +34,6 @@ struct primary_index_key {
dht::decorated_key dk;
};
using segment_generation = generation_base<uint16_t>;
struct index_entry {
log_location location;
api::timestamp_type timestamp;
@@ -55,6 +52,28 @@ struct log_record {
canonical_mutation mut;
};
struct segment_sequence {
uint64_t value;
bool operator==(const segment_sequence& other) const noexcept = default;
auto operator<=>(const segment_sequence& other) const noexcept = default;
segment_sequence& operator++() noexcept {
++value;
return *this;
}
segment_sequence operator++(int) noexcept {
segment_sequence tmp = *this;
++value;
return tmp;
}
segment_sequence operator+(uint64_t increment) const noexcept {
return segment_sequence{value + increment};
}
};
}
// Format specialization declarations and implementations
@@ -82,3 +101,11 @@ struct fmt::formatter<replica::logstor::primary_index_key> : fmt::formatter<stri
return fmt::format_to(ctx.out(), "{}", key.dk);
}
};
template <>
struct fmt::formatter<replica::logstor::segment_sequence> : fmt::formatter<string_view> {
template <typename FormatContext>
auto format(const replica::logstor::segment_sequence& seq, FormatContext& ctx) const {
return fmt::format_to(ctx.out(), "sseq({})", seq.value);
}
};

View File

@@ -1,104 +0,0 @@
/*
* Copyright (C) 2026-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
*/
#pragma once
#include <concepts>
#include "serializer.hh"
namespace replica::logstor {
// an unsigned integer that can be incremented and compared with wraparound semantics
template <std::unsigned_integral T>
class generation_base {
T _value;
public:
using underlying = T;
constexpr generation_base() noexcept : _value(0) {}
constexpr explicit generation_base(T value) noexcept : _value(value) {}
constexpr T value() const noexcept { return _value; }
constexpr generation_base& operator++() noexcept {
++_value;
return *this;
}
constexpr generation_base operator++(int) noexcept {
auto old = *this;
++_value;
return old;
}
constexpr generation_base& operator+=(T delta) noexcept {
_value += delta;
return *this;
}
constexpr generation_base operator+(T delta) const noexcept {
return generation_base(_value + delta);
}
constexpr bool operator==(const generation_base& other) const noexcept = default;
/// Comparison using wraparound semantics.
/// Returns true if this generation is less than other, accounting for wraparound.
/// Assumes generations are within half the value space of each other.
constexpr bool operator<(const generation_base& other) const noexcept {
// Use signed comparison after converting difference to signed type
// This handles wraparound: if diff > max/2, it's treated as negative
using signed_type = std::make_signed_t<T>;
auto diff = static_cast<signed_type>(_value - other._value);
return diff < 0;
}
constexpr bool operator<=(const generation_base& other) const noexcept {
return *this == other || *this < other;
}
constexpr bool operator>(const generation_base& other) const noexcept {
return other < *this;
}
constexpr bool operator>=(const generation_base& other) const noexcept {
return other <= *this;
}
};
}
template <std::unsigned_integral T>
struct fmt::formatter<replica::logstor::generation_base<T>> : fmt::formatter<T> {
template <typename FormatContext>
auto format(const replica::logstor::generation_base<T>& gen, FormatContext& ctx) const {
return fmt::formatter<T>::format(gen.value(), ctx);
}
};
namespace ser {
template <std::unsigned_integral T>
struct serializer<replica::logstor::generation_base<T>> {
template <typename Output>
static void write(Output& out, const replica::logstor::generation_base<T>& g) {
serializer<typename replica::logstor::generation_base<T>::underlying>::write(out, g.value());
}
template <typename Input>
static replica::logstor::generation_base<T> read(Input& in) {
auto val = serializer<typename replica::logstor::generation_base<T>::underlying>::read(in);
return replica::logstor::generation_base<T>(val);
}
template <typename Input>
static void skip(Input& in) {
serializer<typename replica::logstor::generation_base<T>::underlying>::skip(in);
}
};
}

View File

@@ -164,11 +164,12 @@ void write_buffer::finalize(size_t alignment) {
pad_to_alignment(alignment);
}
void write_buffer::write_header(segment_generation seg_gen, std::optional<table_id> table) {
void write_buffer::write_header(segment_sequence segment_seq, std::optional<table_id> table) {
_buffer_header.magic = buffer_header_magic;
_buffer_header.seg_gen = seg_gen;
_buffer_header.kind = _segment_kind;
_buffer_header.version = current_version;
_buffer_header.reserved = 0;
_buffer_header.segment_seq = segment_seq;
_buffer_header.crc = _buffer_header.calculate_crc();
@@ -185,25 +186,16 @@ void write_buffer::write_header(segment_generation seg_gen, std::optional<table_
}
}
void write_buffer::write_empty_header(ostream& out, segment_generation seg_gen) {
buffer_header hdr;
hdr.magic = buffer_header_magic;
hdr.data_size = 0;
hdr.seg_gen = seg_gen;
hdr.kind = segment_kind::mixed;
hdr.version = current_version;
hdr.crc = hdr.calculate_crc();
ser::serialize<buffer_header>(out, hdr);
}
bool write_buffer::validate_header(const write_buffer::buffer_header& bh) {
if (bh.magic != write_buffer::buffer_header_magic) {
return false;
}
if (bh.calculate_crc() != bh.crc) {
switch (bh.kind) {
case segment_kind::mixed:
case segment_kind::full:
break;
default:
return false;
}
@@ -211,6 +203,10 @@ bool write_buffer::validate_header(const write_buffer::buffer_header& bh) {
return false;
}
if (bh.calculate_crc() != bh.crc) {
return false;
}
return true;
}
@@ -248,10 +244,11 @@ size_t write_buffer::estimate_required_segments(size_t net_data_size, size_t rec
uint32_t write_buffer::buffer_header::calculate_crc() const {
utils::crc32 c;
c.process_le(magic);
c.process_le(data_size);
c.process_le(seg_gen.value());
c.process_le(static_cast<uint8_t>(kind));
c.process_le(version);
c.process_le(reserved);
c.process_le(segment_seq.value);
c.process_le(data_size);
return c.get();
}

View File

@@ -114,19 +114,22 @@ public:
struct buffer_header {
uint32_t magic;
uint32_t data_size; // size of all records data following the buffer_header
segment_generation seg_gen;
segment_kind kind;
uint8_t version;
uint16_t reserved;
segment_sequence segment_seq;
uint32_t data_size; // size of all records data following the header(s)
uint32_t crc;
uint32_t calculate_crc() const;
};
static constexpr size_t buffer_header_size =
2 * sizeof(uint32_t)
+ sizeof(segment_generation::underlying)
sizeof(uint32_t)
+ sizeof(std::underlying_type_t<segment_kind>)
+ sizeof(uint8_t)
+ sizeof(uint16_t)
+ sizeof(segment_sequence)
+ sizeof(uint32_t)
+ sizeof(uint32_t);
static_assert(buffer_header_size % record_alignment == 0, "Buffer header size must be aligned by record_alignment");
@@ -234,8 +237,6 @@ public:
return s;
}
static void write_empty_header(ostream& out, segment_generation seg_gen);
static bool validate_header(const buffer_header& bh);
private:
@@ -243,7 +244,7 @@ private:
const char* data() const noexcept { return _buffer.get(); }
// table is set for segment_kind::full
void write_header(segment_generation seg_gen, std::optional<table_id> table);
void write_header(segment_sequence segment_seq, std::optional<table_id> table);
// get all write records in the buffer.
// with_record_copy must be to true when creating the write_buffer.
@@ -329,30 +330,33 @@ struct serializer<replica::logstor::write_buffer::buffer_header> {
template <typename Output>
static void write(Output& out, const replica::logstor::write_buffer::buffer_header& h) {
serializer<uint32_t>::write(out, h.magic);
serializer<uint32_t>::write(out, h.data_size);
serializer<replica::logstor::segment_generation>::write(out, h.seg_gen);
serializer<uint8_t>::write(out, static_cast<uint8_t>(h.kind));
serializer<uint8_t>::write(out, h.version);
serializer<uint16_t>::write(out, h.reserved);
serializer<uint64_t>::write(out, h.segment_seq.value);
serializer<uint32_t>::write(out, h.data_size);
serializer<uint32_t>::write(out, h.crc);
}
template <typename Input>
static replica::logstor::write_buffer::buffer_header read(Input& in) {
replica::logstor::write_buffer::buffer_header h;
h.magic = serializer<uint32_t>::read(in);
h.data_size = serializer<uint32_t>::read(in);
h.seg_gen = serializer<replica::logstor::segment_generation>::read(in);
h.kind = static_cast<replica::logstor::segment_kind>(serializer<uint8_t>::read(in));
h.version = serializer<uint8_t>::read(in);
h.reserved = serializer<uint16_t>::read(in);
h.segment_seq = replica::logstor::segment_sequence{serializer<uint64_t>::read(in)};
h.data_size = serializer<uint32_t>::read(in);
h.crc = serializer<uint32_t>::read(in);
return h;
}
template <typename Input>
static void skip(Input& in) {
serializer<uint32_t>::skip(in);
serializer<uint8_t>::skip(in);
serializer<uint8_t>::skip(in);
serializer<uint16_t>::skip(in);
serializer<uint64_t>::skip(in);
serializer<uint32_t>::skip(in);
serializer<replica::logstor::segment_generation>::skip(in);
serializer<uint8_t>::skip(in);
serializer<uint8_t>::skip(in);
serializer<uint32_t>::skip(in);
}
};

View File

@@ -1113,7 +1113,7 @@ future<> compaction_group::discard_logstor_segments() {
co_await sm.discard_segments(*_logstor_segments);
}
future<> compaction_group::flush_separator(std::optional<size_t> seq_num) {
future<> compaction_group::flush_separator(std::optional<logstor::segment_sequence> seq_num) {
auto units = co_await get_units(_separator_flush_sem, 1);
auto pending = std::exchange(_separator_flushes, {});
if (_logstor_separator && (!seq_num || _logstor_separator->min_seq_num < *seq_num)) {
@@ -2611,7 +2611,7 @@ void table::try_trigger_compaction(compaction_group& cg) noexcept {
}
}
future<> table::flush_separator(std::optional<size_t> seq_num) {
future<> table::flush_separator(std::optional<logstor::segment_sequence> seq_num) {
if (!uses_logstor()) {
co_return;
}