Merge 'logstor: compare records by timestamp and segment sequence number' from Michael Litvak

Add a timestamp to each record. The timestamp is extracted from the
mutation's row marker (or, if no row has a marker, from its partition
tombstone) when the record is written.
When inserting a record into the index, we compare it with the existing
record and insert it only if its timestamp is not older than the existing one.

Add a segment sequence number: a per-shard, monotonically increasing
number that is allocated whenever a new segment is obtained for writing
and is written into the buffer headers of that segment.
It is used to distinguish between buffers written to different generations
of a segment and, during recovery, to break ties by keeping the record
from the newest segment.

Refs https://scylladb.atlassian.net/browse/SCYLLADB-770

no backport - logstor is a new feature

Closes scylladb/scylladb#29933

* github.com:scylladb/scylladb:
  test: logstor: add basic delete test
  logstor: rewrite segment seq num from streaming
  logstor: add segment sequence number
  logstor: get_segment helper
  logstor: compare records by timestamp
This commit is contained in:
Botond Dénes
2026-05-21 08:44:18 +03:00
17 changed files with 311 additions and 277 deletions

View File

@@ -165,11 +165,14 @@ A serialized form of `write_buffer::buffer_header`.
| Offset | Size | Field | Description |
|--------|------|-------------|-------------|
| 0 | 4 | `magic` | `0x4C475342` ("LGSB"). Used to detect valid buffers during recovery. |
| 4 | 4 | `data_size` | Size in bytes of all record data following the header(s). |
| 8 | 2 | `seg_gen` | Segment generation number. Incremented each time the segment slot is reused. Used during recovery to discard stale data. |
| 10 | 1 | `kind` | Segment kind: `0` = mixed, `1` = full. |
| 11 | 1 | `version` | Version of the write buffer format. |
| 12 | 4 | `crc` | CRC32 of all other fields in the buffer header. Used for validating the header. |
| 4 | 1 | `kind` | Segment kind: `0` = mixed, `1` = full. |
| 5 | 1 | `version` | Version of the write buffer format. |
| 6 | 2 | `reserved` | Reserved for future use. Currently written as zero and included in the CRC. |
| 8 | 8 | `segment_seq` | Monotonic segment sequence number used during recovery and segment ordering checks. |
| 16 | 4 | `data_size` | Size in bytes of all record data following the header(s). |
| 20 | 4 | `crc` | CRC32 of all preceding buffer header fields. Used for validating the header. |
The buffer header is 24 bytes long, which keeps it aligned to `record_alignment` (8 bytes).
#### Segment Header (full segments only)
@@ -179,9 +182,9 @@ A serialized form of `write_buffer::segment_header`.
| Offset | Size | Field | Description |
|--------|------|---------------|-------------|
| 16 | 16 | `table` | UUID of the table this segment belongs to. |
| 32 | 8 | `first_token` | Minimum token of all records in the segment (raw token number). |
| 40 | 8 | `last_token` | Maximum token of all records in the segment (raw token number). |
| 24 | 16 | `table` | UUID of the table this segment belongs to. |
| 40 | 8 | `first_token` | Minimum token of all records in the segment (raw token number). |
| 48 | 8 | `last_token` | Maximum token of all records in the segment (raw token number). |
#### Records
@@ -205,7 +208,7 @@ zero_padding -- to align to record_alignment (8 bytes)
The `header_size` bytes immediately following the record header are the IDL-serialized form of `log_record_header`, which contains:
- `key`: the partition key (`primary_index_key`), including a `decorated_key` with a token and partition key bytes.
- `generation`: a 16-bit write generation number, used during recovery to resolve conflicts when the same key appears in multiple segments.
- `timestamp`: the timestamp of the record, used to resolve conflicts by keeping the record with the latest timestamp.
- `table`: UUID of the table this record belongs to.
**Mutation Data**:

View File

@@ -8,6 +8,7 @@
#include "idl/frozen_schema.idl.hh"
#include "idl/token.idl.hh"
#include "mutation/timestamp.hh"
namespace replica {
namespace logstor {
@@ -18,7 +19,7 @@ struct primary_index_key {
class log_record_header {
replica::logstor::primary_index_key key;
replica::logstor::record_generation generation;
api::timestamp_type timestamp;
table_id table;
};

View File

@@ -317,7 +317,7 @@ public:
future<> discard_logstor_segments();
future<> flush_separator(std::optional<size_t> seq_num = std::nullopt);
future<> flush_separator(std::optional<logstor::segment_sequence> seq_num = std::nullopt);
logstor::separator_buffer& get_separator_buffer(size_t write_size);
logstor::segment_set& logstor_segments() noexcept {

View File

@@ -16,6 +16,8 @@
#include "locator/network_topology_strategy.hh"
#include "locator/tablets.hh"
#include "locator/token_metadata_fwd.hh"
#include "replica/logstor/compaction.hh"
#include "replica/logstor/types.hh"
#include "utils/log.hh"
#include "replica/database_fwd.hh"
#include <seastar/core/shard_id.hh>
@@ -955,7 +957,7 @@ database::init_logstor() {
trigger_logstor_compaction(false);
});
_logstor->set_trigger_separator_flush_hook([this] (size_t seq_num) {
_logstor->set_trigger_separator_flush_hook([this] (logstor::segment_sequence seq_num) {
(void)flush_logstor_separator(seq_num);
});
@@ -2955,7 +2957,7 @@ future<> database::flush_logstor_separator_on_all_shards(sharded<database>& shar
});
}
future<> database::flush_logstor_separator(std::optional<size_t> seq_num) {
future<> database::flush_logstor_separator(std::optional<logstor::segment_sequence> seq_num) {
return _tables_metadata.parallel_for_each_table([seq_num] (table_id, lw_shared_ptr<table> table) {
return table->flush_separator(seq_num);
});

View File

@@ -1179,7 +1179,7 @@ public:
return _logstor->get_compaction_manager();
}
future<> flush_separator(std::optional<size_t> seq_num = std::nullopt);
future<> flush_separator(std::optional<logstor::segment_sequence> seq_num = std::nullopt);
future<logstor::table_segment_stats> get_logstor_segment_stats() const;
@@ -2069,7 +2069,7 @@ public:
static future<> trigger_logstor_compaction_on_all_shards(sharded<database>& sharded_db, bool major);
void trigger_logstor_compaction(bool major);
static future<> flush_logstor_separator_on_all_shards(sharded<database>& sharded_db);
future<> flush_logstor_separator(std::optional<size_t> seq_num = std::nullopt);
future<> flush_logstor_separator(std::optional<logstor::segment_sequence> seq_num = std::nullopt);
future<logstor::table_segment_stats> get_logstor_table_segment_stats(table_id table) const;
size_t get_logstor_memory_usage() const;

View File

@@ -33,7 +33,6 @@ struct segment_descriptor : public log_heap_hook<segment_descriptor_hist_options
// when freeing a record, increase by the record's net data size
size_t free_space{0};
size_t record_count{0};
segment_generation seg_gen{1};
segment_set* owner{nullptr}; // non-owning, set when added to a segment_set
int ref_count{0};
@@ -46,10 +45,6 @@ struct segment_descriptor : public log_heap_hook<segment_descriptor_hist_options
return segment_size - free_space;
}
void on_free_segment() noexcept {
++seg_gen;
}
void on_write(size_t net_data_size, size_t cnt = 1) noexcept {
free_space -= net_data_size;
record_count += cnt;
@@ -160,7 +155,7 @@ struct separator_buffer {
write_buffer* buf;
utils::chunked_vector<future<>> pending_updates;
utils::chunked_vector<segment_ref> held_segments;
std::optional<size_t> min_seq_num;
std::optional<segment_sequence> min_seq_num;
bool flushed{false};
separator_buffer(write_buffer* wb)

View File

@@ -14,6 +14,7 @@
#include "utils/bptree.hh"
#include "utils/double-decker.hh"
#include "utils/phased_barrier.hh"
#include <utility>
namespace replica::logstor {
@@ -100,20 +101,6 @@ public:
}
}
std::optional<index_entry> exchange(const primary_index_key& key, index_entry new_entry) {
partitions_type::bound_hint hint;
auto i = _partitions.lower_bound(key.dk, dht::ring_position_comparator(*_schema), hint);
if (hint.match) {
auto old_entry = i->_e;
i->_e = std::move(new_entry);
return old_entry;
} else {
_partitions.emplace_before(i, key.dk.token().raw(), hint, key.dk, std::move(new_entry));
++_key_count;
return std::nullopt;
}
}
bool update_record_location(const primary_index_key& key, log_location old_location, log_location new_location) {
auto it = _partitions.find(key.dk, dht::ring_position_comparator(*_schema));
if (it != _partitions.end()) {
@@ -125,11 +112,17 @@ public:
return false;
}
std::pair<bool, std::optional<index_entry>> insert_if_newer(const primary_index_key& key, index_entry new_entry, bool prefer_on_tie) {
using entry_cmp_fn = std::function<std::strong_ordering(const index_entry&, const index_entry&)>;
// Default ordering between two index entries: compares only their timestamps.
// The result is less/equal/greater as `a.timestamp` relates to `b.timestamp`.
static std::strong_ordering default_entry_cmp(const index_entry& a, const index_entry& b) noexcept {
    const auto by_timestamp = a.timestamp <=> b.timestamp;
    return by_timestamp;
}
std::pair<bool, std::optional<index_entry>> insert(const primary_index_key& key, index_entry new_entry, entry_cmp_fn cmp = default_entry_cmp) {
partitions_type::bound_hint hint;
auto i = _partitions.lower_bound(key.dk, dht::ring_position_comparator(*_schema), hint);
if (hint.match) {
if (i->_e.generation < new_entry.generation || (i->_e.generation == new_entry.generation && prefer_on_tie)) {
if (cmp(i->_e, new_entry) <= 0) {
auto old_entry = i->_e;
i->_e = std::move(new_entry);
return {true, std::make_optional(old_entry)};

View File

@@ -21,6 +21,25 @@ namespace replica::logstor {
seastar::logger logstor_logger("logstor");
// Picks the timestamp that represents a mutation as a logstor record.
//
// Clustered rows are visited in iteration order; the first non-dummy row whose
// row marker is present supplies the timestamp. If no such row exists, the
// partition tombstone's timestamp is used when the tombstone is set.
//
// Throws std::runtime_error when neither source yields a timestamp.
static api::timestamp_type extract_logstor_record_timestamp(const mutation& m) {
    const auto& mp = m.partition();

    for (const auto& entry : mp.clustered_rows()) {
        // Dummy entries are skipped without looking at their row contents.
        if (!entry.dummy() && !entry.row().marker().is_missing()) {
            return entry.row().marker().timestamp();
        }
    }

    const auto tomb = mp.partition_tombstone();
    if (tomb) {
        return tomb.timestamp;
    }

    throw std::runtime_error("logstor mutation has no row marker or partition tombstone timestamp");
}
logstor::logstor(logstor_config config)
: _segment_manager(config.segment_manager_cfg)
, _write_buffer(_segment_manager, config.flush_sg) {
@@ -73,33 +92,32 @@ future<> logstor::write(const mutation& m, compaction_group& cg, seastar::gate::
table_id table = m.schema()->id();
auto& index = cg.get_logstor_index();
// TODO ?
record_generation gen = index.get(key)
.transform([](const index_entry& entry) {
return entry.generation + 1;
}).value_or(record_generation(1));
const auto ts = extract_logstor_record_timestamp(m);
log_record record {
.header = {
.key = key,
.generation = gen,
.timestamp = ts,
.table = table,
},
.mut = canonical_mutation(m)
};
return _write_buffer.write(std::move(record), &cg, std::move(cg_holder)).then_unpack([this, &index, gen, key = std::move(key)]
return _write_buffer.write(std::move(record), &cg, std::move(cg_holder)).then_unpack([this, &index, ts, key = std::move(key)]
(log_location location, seastar::gate::holder op) {
index_entry new_entry {
.location = location,
.generation = gen,
.timestamp = ts,
};
auto old_entry = index.exchange(key, std::move(new_entry));
auto [inserted, prev_entry] = index.insert(key, std::move(new_entry));
// If overwriting, free old record
if (old_entry) {
_segment_manager.free_record(old_entry->location);
if (!inserted) {
// A newer entry already exists; free the record we just wrote.
_segment_manager.free_record(location);
} else if (prev_entry) {
// Overwrote an older entry; free it.
_segment_manager.free_record(prev_entry->location);
}
}).handle_exception([] (std::exception_ptr ep) {
logstor_logger.error("Error writing mutation: {}", ep);
@@ -292,7 +310,7 @@ void logstor::set_trigger_compaction_hook(std::function<void()> fn) {
_segment_manager.set_trigger_compaction_hook(std::move(fn));
}
void logstor::set_trigger_separator_flush_hook(std::function<void(size_t)> fn) {
void logstor::set_trigger_separator_flush_hook(std::function<void(segment_sequence)> fn) {
_segment_manager.set_trigger_separator_flush_hook(std::move(fn));
}

View File

@@ -74,7 +74,7 @@ public:
tracing::trace_state_ptr trace_state = nullptr);
void set_trigger_compaction_hook(std::function<void()> fn);
void set_trigger_separator_flush_hook(std::function<void(size_t)> fn);
void set_trigger_separator_flush_hook(std::function<void(segment_sequence)> fn);
};
} // namespace logstor

View File

@@ -69,6 +69,7 @@ protected:
class writeable_segment : public segment {
seastar::gate _write_gate;
segment_ref _seg_ref;
segment_sequence _seq_num;
uint32_t _current_offset = 0; // next offset for write
@@ -77,7 +78,7 @@ class writeable_segment : public segment {
public:
using segment::segment;
void start(segment_ref seg_ref);
void start(segment_ref, segment_sequence);
future<> stop();
@@ -100,6 +101,10 @@ public:
segment_ref ref() {
return _seg_ref;
}
// Returns the segment sequence number currently stored in _seq_num.
segment_sequence seq_num() const noexcept {
return _seq_num;
}
};
segment::segment(log_segment_id id, seastar::file file, uint64_t file_offset, size_t max_size)
@@ -132,8 +137,9 @@ future<log_record> segment::read(log_location loc) {
});
}
void writeable_segment::start(segment_ref seg_ref) {
void writeable_segment::start(segment_ref seg_ref, segment_sequence seq_num) {
_seg_ref = std::move(seg_ref);
_seq_num = seq_num;
}
future<> writeable_segment::stop() {
@@ -197,6 +203,9 @@ class file_manager {
std::vector<seastar::file> _open_read_files;
std::unique_ptr<char[], seastar::free_deleter> _zero_buf;
size_t _zero_buf_size{0};
public:
file_manager(segment_manager_config cfg)
: _segments_per_file(cfg.file_size / cfg.segment_size)
@@ -237,6 +246,9 @@ public:
future<> file_manager::start() {
co_await seastar::recursive_touch_directory(_base_dir.string());
_zero_buf_size = 128 * 1024;
_zero_buf = allocate_aligned_buffer<char>(_zero_buf_size, 4096);
std::memset(_zero_buf.get(), 0, _zero_buf_size);
}
future<> file_manager::stop() {
@@ -326,19 +338,13 @@ future<seastar::file> file_manager::get_file_for_read(size_t file_id) {
}
future<> file_manager::format_file_region(seastar::file file, uint64_t offset, size_t size) {
// Allocate aligned buffer for zeroing
const auto write_alignment = file.disk_write_dma_alignment();
size_t buf_size = align_up<size_t>(128 * 1024, size_t(write_alignment));
auto zero_buf = allocate_aligned_buffer<char>(buf_size, write_alignment);
std::memset(zero_buf.get(), 0, buf_size);
// Write zeros to entire region
// Write zeros to entire region using the pre-allocated zero buffer
size_t remaining = size;
uint64_t current_offset = offset;
while (remaining > 0) {
auto write_size = std::min(remaining, buf_size);
auto written = co_await file.dma_write(current_offset, zero_buf.get(), write_size);
auto write_size = std::min(remaining, _zero_buf_size);
auto written = co_await file.dma_write(current_offset, _zero_buf.get(), write_size);
current_offset += written;
remaining -= written;
@@ -554,7 +560,7 @@ public:
struct segment_header {
segment_kind kind;
segment_generation seg_gen;
segment_sequence segment_seq;
struct mixed {};
struct full {
@@ -569,7 +575,7 @@ struct segment_header {
static segment_header make_segment_header(const write_buffer::buffer_header& bh, std::optional<write_buffer::segment_header> sh) {
segment_header seg_hdr {
.kind = bh.kind,
.seg_gen = bh.seg_gen,
.segment_seq = bh.segment_seq,
};
switch (bh.kind) {
@@ -621,7 +627,7 @@ class segment_manager_impl {
seastar::semaphore _active_segment_write_sem{1};
segment_pool _segment_pool;
std::optional<shared_future<>> _switch_segment_fut;
size_t _segment_seq_num{0};
segment_sequence _next_segment_seq{1};
seastar::gate _async_gate;
future<> _reserve_replenisher{make_ready_future<>()};
@@ -639,7 +645,7 @@ class segment_manager_impl {
std::vector<write_buffer*> _available_separator_buffers;
std::function<void()> _trigger_compaction_fn;
std::function<void(size_t)> _trigger_separator_flush_fn;
std::function<void(segment_sequence)> _trigger_separator_flush_fn;
utils::phased_barrier _writes_phaser{"logstor_sm_writes"};
@@ -682,7 +688,7 @@ public:
}
future<> load_segment(replica::database&, log_segment_id);
future<> recover_segment(replica::database&, log_segment_id);
future<> recover_segment(replica::database&, log_segment_id, primary_index::entry_cmp_fn cmp, std::function<void(const segment_header&)> on_header);
future<std::optional<segment_header>> read_segment_header(log_segment_id);
future<> add_segment_to_compaction_group(replica::database&, segment_descriptor&);
@@ -692,7 +698,7 @@ public:
}
}
void trigger_separator_flush(size_t seq) {
void trigger_separator_flush(segment_sequence seq) {
if (_trigger_separator_flush_fn) {
_trigger_separator_flush_fn(seq);
}
@@ -710,7 +716,7 @@ public:
_trigger_compaction_fn = std::move(fn);
}
void set_trigger_separator_flush_hook(std::function<void(size_t)> fn) {
void set_trigger_separator_flush_hook(std::function<void(segment_sequence)> fn) {
_trigger_separator_flush_fn = std::move(fn);
}
@@ -740,9 +746,12 @@ private:
future<> request_segment_switch();
future<> switch_active_segment();
// Hands out the current value of _next_segment_seq and advances the counter,
// so consecutive calls return consecutive sequence numbers.
segment_sequence allocate_segment_seq() noexcept {
    const segment_sequence allocated = _next_segment_seq;
    ++_next_segment_seq;
    return allocated;
}
std::chrono::microseconds calculate_separator_delay() const;
future<> write_to_separator(write_buffer&, segment_ref, size_t segment_seq_num);
future<> write_to_separator(write_buffer&, segment_ref, segment_sequence);
void write_to_separator(table&, log_location prev_loc, log_record, segment_ref);
// Sequentially scans one segment and invokes callbacks for the decoded
@@ -775,6 +784,13 @@ private:
);
}
// Obtains a segment from the segment pool for the given write source, starts
// it with a new segment reference and a newly allocated sequence number, and
// bumps the in-use segment counter before returning it.
future<seg_ptr> get_segment(write_source src) {
    seg_ptr fresh = co_await _segment_pool.get_segment(src);
    const auto seq = allocate_segment_seq();
    fresh->start(make_segment_ref(fresh->id()), seq);
    ++_stats.segments_in_use;
    co_return fresh;
}
void free_segment(log_segment_id) noexcept;
segment_descriptor& get_segment_descriptor(log_segment_id segment_id) {
@@ -984,7 +1000,7 @@ future<> segment_manager_impl::write(write_buffer& wb) {
seg_ptr seg = _active_segment;
auto seg_holder = seg->hold();
auto seg_ref = seg->ref();
auto segment_seq_num = _segment_seq_num;
auto seq_num = seg->seq_num();
auto write_op = _writes_phaser.start();
auto& desc = get_segment_descriptor(seg->id());
@@ -993,7 +1009,9 @@ future<> segment_manager_impl::write(write_buffer& wb) {
seg_ref.set_flush_failure();
});
wb.write_header(desc.seg_gen, std::nullopt);
logstor_logger.trace("Write active segment {} seq {}", seg->id(), seq_num);
wb.write_header(seq_num, std::nullopt);
auto loc = co_await seg->append(data);
sem_units.return_all();
@@ -1008,7 +1026,7 @@ future<> segment_manager_impl::write(write_buffer& wb) {
co_await wb.complete_writes(loc);
co_await with_scheduling_group(_cfg.separator_sg, [&] {
return write_to_separator(wb, std::move(seg_ref), segment_seq_num);
return write_to_separator(wb, std::move(seg_ref), seq_num);
});
write_to_separator_failed.cancel();
}
@@ -1030,13 +1048,12 @@ future<> segment_manager_impl::write_full_segment(write_buffer& wb, compaction_g
throw std::runtime_error(fmt::format("Write size {} exceeds segment size {}", data.size(), _cfg.segment_size));
}
seg_ptr seg = co_await _segment_pool.get_segment(source);
auto seg = co_await get_segment(source);
auto& desc = get_segment_descriptor(seg->id());
_stats.segments_in_use++;
logstor_logger.trace("Write full segment {} from {}", seg->id(), write_source_to_string(source));
logstor_logger.trace("Write full segment {} seq {} from {}", seg->id(), seg->seq_num(), write_source_to_string(source));
wb.write_header(desc.seg_gen, cg.schema()->id());
wb.write_header(seg->seq_num(), cg.schema()->id());
auto loc = co_await seg->append(data);
@@ -1087,11 +1104,9 @@ future<> segment_manager_impl::request_segment_switch() {
future<> segment_manager_impl::switch_active_segment() {
auto holder = _async_gate.hold();
auto new_seg = co_await _segment_pool.get_segment(write_source::normal_write);
auto new_seg = co_await get_segment(write_source::normal_write);
auto old_seg = std::exchange(_active_segment, std::move(new_seg));
_stats.segments_in_use++;
_segment_seq_num++;
if (old_seg) {
// close old segment in background
@@ -1102,12 +1117,11 @@ future<> segment_manager_impl::switch_active_segment() {
// trigger separator flush for separator buffers that hold old segments
auto u = std::max<size_t>(1, _max_segments / 100);
if (_segment_seq_num % u == 0 && _segment_seq_num > 5*u) {
trigger_separator_flush(_segment_seq_num - 5*u);
if (_next_segment_seq.value % u == 0 && _next_segment_seq.value > 5*u) {
trigger_separator_flush(segment_sequence(_next_segment_seq.value - 5*u));
}
_active_segment->start(make_segment_ref(_active_segment->id()));
logstor_logger.trace("Switched active segment to {}", _active_segment->id());
logstor_logger.trace("Switched active segment to {} seq {}", _active_segment->id(), _active_segment->seq_num());
}
future<> segment_manager_impl::replenish_reserve() {
@@ -1189,10 +1203,6 @@ void segment_manager_impl::free_segment(log_segment_id segment_id) noexcept {
if (desc.ref_count != 0) {
on_internal_error(logstor_logger, format("Freeing segment {} with non-zero reference count", segment_id));
}
desc.on_free_segment();
// TODO write new generation?
_free_segments.push_back(segment_id);
_segment_freed_cv.signal();
@@ -1220,22 +1230,12 @@ future<> segment_manager_impl::discard_segments(segment_set& ss) {
segments.push_back(seg_id);
}
// Invalidate the first header block so recovery treats the slot as empty.
co_await max_concurrent_for_each(segments, 32, [this] (log_segment_id seg_id) -> future<> {
// Write a valid empty segment header with the next generation.
// This marks the segment as discarded while preserving the generation counter.
auto next_gen = get_segment_descriptor(seg_id).seg_gen;
++next_gen;
auto buf = allocate_aligned_buffer<char>(block_alignment, 4096);
std::memset(buf.get(), 0, block_alignment);
simple_memory_output_stream out(buf.get(), write_buffer::buffer_header_size);
write_buffer::write_empty_header(out, next_gen);
logstor_logger.trace("Discard segment {}", seg_id);
auto [file_id, file_offset] = segment_id_to_file_location(seg_id);
auto file = co_await _file_mgr.get_file_for_write(file_id);
co_await file.dma_write(file_offset, buf.get(), block_alignment);
logstor_logger.trace("Discard segment {} next gen {}", seg_id, next_gen);
co_await _file_mgr.format_file_region(file, file_offset, block_alignment);
free_segment(seg_id);
});
}
@@ -1254,7 +1254,7 @@ future<> segment_manager_impl::scan_segment(log_segment_id segment_id,
.read_ahead = 1,
});
size_t current_position = 0;
auto seg_gen = get_segment_descriptor(segment_id).seg_gen;
std::optional<segment_sequence> segment_seq;
logstor_logger.trace("Reading records from segment {} at file {} offset {}",
segment_id, file_id, file_offset);
@@ -1284,7 +1284,9 @@ future<> segment_manager_impl::scan_segment(log_segment_id segment_id,
break;
}
if (bh.seg_gen != seg_gen) {
if (!segment_seq) {
segment_seq = bh.segment_seq;
} else if (bh.segment_seq != *segment_seq) {
break;
}
@@ -1751,7 +1753,7 @@ separator_buffer compaction_manager_impl::allocate_separator_buffer() {
return separator_buffer(wb);
}
future<> segment_manager_impl::write_to_separator(write_buffer& wb, segment_ref seg_ref, size_t segment_seq_num) {
future<> segment_manager_impl::write_to_separator(write_buffer& wb, segment_ref seg_ref, segment_sequence segment_seq_num) {
for (auto&& w : wb.records()) {
co_await coroutine::maybe_yield();
@@ -1887,18 +1889,39 @@ future<> segment_manager_impl::do_recovery(replica::database& db) {
next_file_id++;
}
// populate index from all segments. keep the latest record for each key.
size_t allocated_segment_count = next_file_id * _segments_per_file;
std::vector<segment_sequence> segment_seqs(allocated_segment_count, segment_sequence(0));
segment_sequence max_segment_seq = segment_sequence(0);
// Populate the index from all segments. Keep the latest record for each key.
// For equal records, keep the one from the segment with the highest sequence number.
auto cmp_with_seq = [&segment_seqs] (const index_entry& old_entry, const index_entry& candidate) -> std::strong_ordering {
if (auto c = primary_index::default_entry_cmp(old_entry, candidate); c != 0) {
return c;
}
const auto old_seq = segment_seqs[old_entry.location.segment.value];
const auto new_seq = segment_seqs[candidate.location.segment.value];
if (auto c = old_seq <=> new_seq; c != 0) {
return c;
}
return old_entry.location.offset <=> candidate.location.offset;
};
for (auto file_id : found_file_ids) {
logstor_logger.info("Recovering segments from file {}: {}%", _file_mgr.get_file_path(file_id).string(), (file_id + 1) * 100 / found_file_ids.size());
co_await max_concurrent_for_each(segments_in_file(file_id), 32,
[this, &db] (log_segment_id seg_id) {
return recover_segment(db, seg_id);
[this, &db, &cmp_with_seq, &segment_seqs, &max_segment_seq] (log_segment_id seg_id) {
return recover_segment(db, seg_id, cmp_with_seq,
[seg_id, &segment_seqs, &max_segment_seq] (const segment_header& seg_hdr) {
segment_seqs[seg_id.value] = seg_hdr.segment_seq;
max_segment_seq = std::max(max_segment_seq, seg_hdr.segment_seq);
});
}
);
}
// go over the index and mark all segments that have live data as used.
size_t allocated_segment_count = next_file_id * _segments_per_file;
utils::dynamic_bitset used_segments(allocated_segment_count);
co_await db.get_tables_metadata().for_each_table_gently([&] (table_id tid, lw_shared_ptr<table> tp) -> future<> {
@@ -1918,9 +1941,7 @@ future<> segment_manager_impl::do_recovery(replica::database& db) {
for (size_t seg_idx = 0; seg_idx < allocated_segment_count; ++seg_idx) {
co_await coroutine::maybe_yield();
log_segment_id seg_id(seg_idx);
auto& desc = get_segment_descriptor(seg_id);
if (!used_segments.test(seg_idx)) {
desc.on_free_segment();
_free_segments.push_back(seg_id);
free_segment_count++;
} else {
@@ -1946,32 +1967,30 @@ future<> segment_manager_impl::do_recovery(replica::database& db) {
}
_next_new_segment_id = allocated_segment_count;
_next_segment_seq = max_segment_seq + 1;
_file_mgr.recover_next_file_id(next_file_id);
logstor_logger.info("Recovery complete");
}
future<> segment_manager_impl::recover_segment(replica::database& db, log_segment_id segment_id) {
future<> segment_manager_impl::recover_segment(replica::database& db, log_segment_id segment_id,
primary_index::entry_cmp_fn cmp, std::function<void(const segment_header&)> on_header) {
auto& desc = get_segment_descriptor(segment_id);
desc.reset(_cfg.segment_size);
auto seg_hdr = co_await read_segment_header(segment_id);
if (!seg_hdr) {
logstor_logger.trace("Segment {} has invalid header, skipping", segment_id);
co_return;
}
desc.seg_gen = seg_hdr->seg_gen;
bool is_full_segment = seg_hdr->kind == segment_kind::full;
logstor_logger.trace("Recovering segment {} with generation {}", segment_id, desc.seg_gen);
co_await for_each_record(segment_id,
[this, &desc, &db, is_full_segment] (log_location loc, const log_record_header& header) -> want_data {
logstor_logger.trace("Recovery: read record at {} gen {}", loc, header.generation);
co_await scan_segment(segment_id,
[segment_id, on_header = std::move(on_header)] (const segment_header& seg_hdr) mutable {
logstor_logger.trace("Recovering segment {} with sequence {}", segment_id, seg_hdr.segment_seq);
on_header(seg_hdr);
return make_ready_future<>();
},
[this, &desc, &db, &cmp] (log_location loc, const log_record_header& header) -> want_data {
logstor_logger.trace("Recovery: read record at {} key {} ts {}", loc, header.key, header.timestamp);
index_entry new_entry {
.location = loc,
.generation = header.generation
.timestamp = header.timestamp
};
try {
@@ -1979,7 +1998,7 @@ future<> segment_manager_impl::recover_segment(replica::database& db, log_segmen
if (!t.uses_logstor()) {
return want_data::no;
}
auto [inserted, prev_entry] = t.logstor_index().insert_if_newer(header.key, new_entry, is_full_segment);
auto [inserted, prev_entry] = t.logstor_index().insert(header.key, new_entry, cmp);
if (inserted) {
desc.on_write(loc);
if (prev_entry) {
@@ -2147,7 +2166,7 @@ void segment_manager::set_trigger_compaction_hook(std::function<void()> fn) {
_impl->set_trigger_compaction_hook(std::move(fn));
}
void segment_manager::set_trigger_separator_flush_hook(std::function<void(size_t)> fn) {
void segment_manager::set_trigger_separator_flush_hook(std::function<void(segment_sequence)> fn) {
_impl->set_trigger_separator_flush_hook(std::move(fn));
}
@@ -2177,6 +2196,51 @@ future<> segment_manager::await_pending_writes() {
class segment_data_sink_impl : public data_sink_impl {
seg_ptr _segment;
std::vector<char> _pending_data;
std::optional<size_t> _initial_header_size;
bool _header_rewritten = false;
// Deserializes a buffer header from the first write_buffer::buffer_header_size
// bytes of _pending_data. Callers must have buffered at least that many bytes.
write_buffer::buffer_header read_buffer_header() const {
    simple_memory_input_stream in(_pending_data.data(), write_buffer::buffer_header_size);
    using header_type = write_buffer::buffer_header;
    return ser::deserialize(in, std::type_identity<header_type>{});
}
void maybe_parse_initial_header() {
if (_initial_header_size || _pending_data.size() < write_buffer::buffer_header_size) {
return;
}
auto bh = read_buffer_header();
if (!write_buffer::validate_header(bh)) {
throw std::runtime_error("Invalid streamed logstor buffer header");
}
size_t header_size = write_buffer::buffer_header_size;
if (bh.kind == segment_kind::full) {
header_size += write_buffer::segment_header_size;
}
_initial_header_size = header_size;
}
void rewrite_buffer_header() {
auto bh = read_buffer_header();
logstor_logger.trace("Rewriting buffer header for segment {} seq {} with seq {}", _segment->id(), bh.segment_seq, _segment->seq_num());
bh.segment_seq = _segment->seq_num();
bh.crc = bh.calculate_crc();
simple_memory_output_stream bh_stream(_pending_data.data(), write_buffer::buffer_header_size);
ser::serialize<write_buffer::buffer_header>(bh_stream, bh);
}
// Appends everything accumulated in _pending_data to the segment and then
// empties the buffer. A no-op when nothing is buffered.
future<> flush_pending_data() {
    if (!_pending_data.empty()) {
        const auto* first = reinterpret_cast<const int8_t*>(_pending_data.data());
        co_await _segment->append(bytes_view(first, _pending_data.size()));
        _pending_data.clear();
    }
    co_return;
}
public:
segment_data_sink_impl(seg_ptr segment)
: _segment(std::move(segment))
@@ -2184,11 +2248,29 @@ public:
virtual future<> put(std::span<temporary_buffer<char>> data) override {
for (auto& buf : data) {
co_await _segment->append(bytes_view((const int8_t*)buf.get(), buf.size()));
if (buf.empty()) {
continue;
}
if (_header_rewritten) {
co_await _segment->append(bytes_view(reinterpret_cast<const int8_t*>(buf.get()), buf.size()));
continue;
}
_pending_data.insert(_pending_data.end(), buf.get(), buf.get() + buf.size());
maybe_parse_initial_header();
if (_initial_header_size && _pending_data.size() >= *_initial_header_size) {
rewrite_buffer_header();
_header_rewritten = true;
co_await flush_pending_data();
}
}
}
// Fails if the stream ended while bytes were still buffered waiting for the
// initial header to be rewritten; otherwise completes with nothing to do.
virtual future<> close() override {
    const bool stopped_inside_header = !_header_rewritten && !_pending_data.empty();
    if (stopped_inside_header) {
        throw std::runtime_error("Truncated streamed logstor segment header");
    }
    co_return;
}
@@ -2226,22 +2308,20 @@ public:
co_await _sm.load_segment(_db, _seg->id());
}
future<> abort() override {
_sm.free_segment(_seg->id());
co_return;
}
};
future<> segment_manager_impl::load_segment(replica::database& db, log_segment_id seg_id) {
// read the segment and populate the index
co_await recover_segment(db, seg_id);
co_await recover_segment(db, seg_id, primary_index::default_entry_cmp, [] (const segment_header&) {});
auto& desc = get_segment_descriptor(seg_id);
co_await add_segment_to_compaction_group(db, desc);
}
future<std::unique_ptr<segment_stream_sink>> segment_manager_impl::create_segment_output_stream(replica::database& db) {
auto seg = co_await _segment_pool.get_segment(write_source::streaming);
_stats.segments_in_use++;
auto seg = co_await get_segment(write_source::streaming);
co_return std::make_unique<segment_stream_sink_impl>(*this, db, std::move(seg));
}

View File

@@ -129,7 +129,7 @@ public:
const compaction_manager& get_compaction_manager() const noexcept;
void set_trigger_compaction_hook(std::function<void()> fn);
void set_trigger_separator_flush_hook(std::function<void(size_t)> fn);
void set_trigger_separator_flush_hook(std::function<void(segment_sequence)> fn);
size_t get_segment_size() const noexcept;

View File

@@ -9,9 +9,9 @@
#include <cstdint>
#include <fmt/format.h>
#include "mutation/canonical_mutation.hh"
#include "replica/logstor/utils.hh"
#include "dht/decorated_key.hh"
#include "mutation/canonical_mutation.hh"
#include "mutation/timestamp.hh"
namespace replica::logstor {
@@ -34,19 +34,16 @@ struct primary_index_key {
dht::decorated_key dk;
};
using record_generation = generation_base<uint16_t>;
using segment_generation = generation_base<uint16_t>;
// Value stored in the primary index for a key.
struct index_entry {
// Location of the record in the log.
log_location location;
// NOTE(review): generation counter of the record; confirm whether it is still used.
record_generation generation;
// Timestamp of the record. Per the change description it is taken from the
// mutation's row marker and a record replaces an existing index entry only
// if its timestamp is newer -- confirm against the index insert path.
api::timestamp_type timestamp;
// Member-wise equality.
bool operator==(const index_entry& other) const noexcept = default;
};
// Header stored with each log record.
struct log_record_header {
// Key of the record in the primary index.
primary_index_key key;
// NOTE(review): generation counter of the record; confirm whether it is still used.
record_generation generation;
// Timestamp of the record (per the change description, extracted from the
// mutation's row marker when the record is written).
api::timestamp_type timestamp;
// Table the record belongs to.
table_id table;
};
@@ -55,6 +52,28 @@ struct log_record {
canonical_mutation mut;
};
// Strongly-typed 64-bit sequence number for segments.
// Supports equality, ordering, increment and addition of an offset; all
// operations act directly on the wrapped value.
struct segment_sequence {
    uint64_t value;

    bool operator==(const segment_sequence& other) const noexcept = default;
    auto operator<=>(const segment_sequence& other) const noexcept = default;

    // Pre-increment: advances this sequence and returns it.
    segment_sequence& operator++() noexcept {
        value += 1;
        return *this;
    }

    // Post-increment: advances this sequence and returns the previous value.
    segment_sequence operator++(int) noexcept {
        return segment_sequence{value++};
    }

    // Returns a copy advanced by `increment`; this object is not modified.
    segment_sequence operator+(uint64_t increment) const noexcept {
        segment_sequence advanced = *this;
        advanced.value += increment;
        return advanced;
    }
};
}
// Format specialization declarations and implementations
@@ -82,3 +101,11 @@ struct fmt::formatter<replica::logstor::primary_index_key> : fmt::formatter<stri
return fmt::format_to(ctx.out(), "{}", key.dk);
}
};
// Formats a segment_sequence as "sseq(<value>)".
template <>
struct fmt::formatter<replica::logstor::segment_sequence> : fmt::formatter<string_view> {
    template <typename FormatContext>
    auto format(const replica::logstor::segment_sequence& seq, FormatContext& ctx) const {
        auto out = ctx.out();
        return fmt::format_to(out, "sseq({})", seq.value);
    }
};

View File

@@ -1,104 +0,0 @@
/*
* Copyright (C) 2026-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
*/
#pragma once
#include <concepts>
#include "serializer.hh"
namespace replica::logstor {
// An unsigned counter that can be incremented and compared with wraparound
// semantics: ordering is decided by the sign of the difference, so a value
// that has just wrapped past the maximum still compares as newer.
template <std::unsigned_integral T>
class generation_base {
    T _value;

public:
    using underlying = T;

    constexpr generation_base() noexcept : _value(0) {}
    constexpr explicit generation_base(T value) noexcept : _value(value) {}

    constexpr T value() const noexcept { return _value; }

    // Pre-increment.
    constexpr generation_base& operator++() noexcept {
        ++_value;
        return *this;
    }

    // Post-increment: returns the generation as it was before the increment.
    constexpr generation_base operator++(int) noexcept {
        const generation_base previous = *this;
        ++_value;
        return previous;
    }

    constexpr generation_base& operator+=(T delta) noexcept {
        _value += delta;
        return *this;
    }

    constexpr generation_base operator+(T delta) const noexcept {
        return generation_base(_value + delta);
    }

    constexpr bool operator==(const generation_base& other) const noexcept = default;

    /// Wraparound-aware "less than".
    /// The difference of the two values is reinterpreted as a signed number of
    /// the same width; a negative result means this generation is older.
    /// Assumes generations are within half the value space of each other.
    constexpr bool operator<(const generation_base& other) const noexcept {
        using signed_type = std::make_signed_t<T>;
        const auto distance = static_cast<signed_type>(_value - other._value);
        return distance < 0;
    }

    // Deliberately expressed as "equal or less" rather than "not greater":
    // the two forms disagree when the values are exactly half the range apart.
    constexpr bool operator<=(const generation_base& other) const noexcept {
        if (*this == other) {
            return true;
        }
        return *this < other;
    }

    constexpr bool operator>(const generation_base& other) const noexcept {
        return other < *this;
    }

    constexpr bool operator>=(const generation_base& other) const noexcept {
        return other <= *this;
    }
};
}
// Formats a generation exactly like its underlying integer value.
template <std::unsigned_integral T>
struct fmt::formatter<replica::logstor::generation_base<T>> : fmt::formatter<T> {
    template <typename FormatContext>
    auto format(const replica::logstor::generation_base<T>& gen, FormatContext& ctx) const {
        const T raw = gen.value();
        return fmt::formatter<T>::format(raw, ctx);
    }
};
namespace ser {
// Serializes a generation as its underlying integer, delegating every
// operation to the serializer of that integer type.
template <std::unsigned_integral T>
struct serializer<replica::logstor::generation_base<T>> {
    template <typename Output>
    static void write(Output& out, const replica::logstor::generation_base<T>& g) {
        using raw_type = typename replica::logstor::generation_base<T>::underlying;
        serializer<raw_type>::write(out, g.value());
    }

    template <typename Input>
    static replica::logstor::generation_base<T> read(Input& in) {
        using raw_type = typename replica::logstor::generation_base<T>::underlying;
        auto raw = serializer<raw_type>::read(in);
        return replica::logstor::generation_base<T>(raw);
    }

    template <typename Input>
    static void skip(Input& in) {
        using raw_type = typename replica::logstor::generation_base<T>::underlying;
        serializer<raw_type>::skip(in);
    }
};
}

View File

@@ -164,11 +164,12 @@ void write_buffer::finalize(size_t alignment) {
pad_to_alignment(alignment);
}
void write_buffer::write_header(segment_generation seg_gen, std::optional<table_id> table) {
void write_buffer::write_header(segment_sequence segment_seq, std::optional<table_id> table) {
_buffer_header.magic = buffer_header_magic;
_buffer_header.seg_gen = seg_gen;
_buffer_header.kind = _segment_kind;
_buffer_header.version = current_version;
_buffer_header.reserved = 0;
_buffer_header.segment_seq = segment_seq;
_buffer_header.crc = _buffer_header.calculate_crc();
@@ -185,25 +186,16 @@ void write_buffer::write_header(segment_generation seg_gen, std::optional<table_
}
}
// Serializes a buffer header that describes no record data: data_size is 0
// and the kind is `mixed`. The CRC is computed last, after all other fields
// have been assigned.
void write_buffer::write_empty_header(ostream& out, segment_generation seg_gen) {
    buffer_header empty_header;
    empty_header.magic = buffer_header_magic;
    empty_header.version = current_version;
    empty_header.kind = segment_kind::mixed;
    empty_header.seg_gen = seg_gen;
    empty_header.data_size = 0;
    empty_header.crc = empty_header.calculate_crc();
    ser::serialize<buffer_header>(out, empty_header);
}
bool write_buffer::validate_header(const write_buffer::buffer_header& bh) {
if (bh.magic != write_buffer::buffer_header_magic) {
return false;
}
if (bh.calculate_crc() != bh.crc) {
switch (bh.kind) {
case segment_kind::mixed:
case segment_kind::full:
break;
default:
return false;
}
@@ -211,6 +203,10 @@ bool write_buffer::validate_header(const write_buffer::buffer_header& bh) {
return false;
}
if (bh.calculate_crc() != bh.crc) {
return false;
}
return true;
}
@@ -248,10 +244,11 @@ size_t write_buffer::estimate_required_segments(size_t net_data_size, size_t rec
// Computes the CRC32 of every header field except `crc` itself.
// Each field is fed to the checksum exactly once, in the order
// magic, kind, version, reserved, segment_seq, data_size.
uint32_t write_buffer::buffer_header::calculate_crc() const {
    utils::crc32 c;
    c.process_le(magic);
    c.process_le(static_cast<uint8_t>(kind));
    c.process_le(version);
    c.process_le(reserved);
    c.process_le(segment_seq.value);
    c.process_le(data_size);
    return c.get();
}

View File

@@ -114,19 +114,22 @@ public:
// In-memory form of the header that precedes the record data of a write buffer.
struct buffer_header {
    uint32_t magic;
    segment_kind kind;
    uint8_t version;
    uint16_t reserved;
    segment_sequence segment_seq;
    uint32_t data_size; // size of all records data following the header(s)
    uint32_t crc;

    // Checksum over the header fields; compared against `crc` when validating.
    uint32_t calculate_crc() const;
};
static constexpr size_t buffer_header_size =
2 * sizeof(uint32_t)
+ sizeof(segment_generation::underlying)
sizeof(uint32_t)
+ sizeof(std::underlying_type_t<segment_kind>)
+ sizeof(uint8_t)
+ sizeof(uint16_t)
+ sizeof(segment_sequence)
+ sizeof(uint32_t)
+ sizeof(uint32_t);
static_assert(buffer_header_size % record_alignment == 0, "Buffer header size must be aligned by record_alignment");
@@ -234,8 +237,6 @@ public:
return s;
}
static void write_empty_header(ostream& out, segment_generation seg_gen);
static bool validate_header(const buffer_header& bh);
private:
@@ -243,7 +244,7 @@ private:
// Read-only pointer to the underlying buffer storage (_buffer.get()).
const char* data() const noexcept { return _buffer.get(); }
// table is set for segment_kind::full
void write_header(segment_generation seg_gen, std::optional<table_id> table);
void write_header(segment_sequence segment_seq, std::optional<table_id> table);
// get all write records in the buffer.
// with_record_copy must be set to true when creating the write_buffer.
@@ -329,30 +330,33 @@ struct serializer<replica::logstor::write_buffer::buffer_header> {
// Writes the header fields in on-disk order:
// magic, kind, version, reserved, segment_seq, data_size, crc.
// Each field is written exactly once.
template <typename Output>
static void write(Output& out, const replica::logstor::write_buffer::buffer_header& h) {
    serializer<uint32_t>::write(out, h.magic);
    serializer<uint8_t>::write(out, static_cast<uint8_t>(h.kind));
    serializer<uint8_t>::write(out, h.version);
    serializer<uint16_t>::write(out, h.reserved);
    serializer<uint64_t>::write(out, h.segment_seq.value);
    serializer<uint32_t>::write(out, h.data_size);
    serializer<uint32_t>::write(out, h.crc);
}
// Reads a header field by field, in the same order write() emits them:
// magic, kind, version, reserved, segment_seq, data_size, crc.
// No validation is done here; the fields are returned as read.
template <typename Input>
static replica::logstor::write_buffer::buffer_header read(Input& in) {
    replica::logstor::write_buffer::buffer_header h;
    h.magic = serializer<uint32_t>::read(in);
    h.kind = static_cast<replica::logstor::segment_kind>(serializer<uint8_t>::read(in));
    h.version = serializer<uint8_t>::read(in);
    h.reserved = serializer<uint16_t>::read(in);
    h.segment_seq = replica::logstor::segment_sequence{serializer<uint64_t>::read(in)};
    h.data_size = serializer<uint32_t>::read(in);
    h.crc = serializer<uint32_t>::read(in);
    return h;
}
// Skips over one serialized header. The sequence of skips must mirror
// write() field for field, otherwise the input is left misaligned.
template <typename Input>
static void skip(Input& in) {
    serializer<uint32_t>::skip(in); // magic
    serializer<uint8_t>::skip(in);  // kind
    serializer<uint8_t>::skip(in);  // version
    serializer<uint16_t>::skip(in); // reserved
    serializer<uint64_t>::skip(in); // segment_seq
    serializer<uint32_t>::skip(in); // data_size
    serializer<uint32_t>::skip(in); // crc
}
};

View File

@@ -1113,7 +1113,7 @@ future<> compaction_group::discard_logstor_segments() {
co_await sm.discard_segments(*_logstor_segments);
}
future<> compaction_group::flush_separator(std::optional<size_t> seq_num) {
future<> compaction_group::flush_separator(std::optional<logstor::segment_sequence> seq_num) {
auto units = co_await get_units(_separator_flush_sem, 1);
auto pending = std::exchange(_separator_flushes, {});
if (_logstor_separator && (!seq_num || _logstor_separator->min_seq_num < *seq_num)) {
@@ -2611,7 +2611,7 @@ void table::try_trigger_compaction(compaction_group& cg) noexcept {
}
}
future<> table::flush_separator(std::optional<size_t> seq_num) {
future<> table::flush_separator(std::optional<logstor::segment_sequence> seq_num) {
if (!uses_logstor()) {
co_return;
}

View File

@@ -86,6 +86,20 @@ async def test_basic_write_and_read(manager: ManagerClient):
assert rows[0].pk == 2
assert rows[0].v == 150
await cql.run_async(f"DELETE FROM {ks}.test_int WHERE pk = 1")
rows = await cql.run_async(f"SELECT pk, v FROM {ks}.test_int WHERE pk = 1")
assert len(rows) == 0
rows = await cql.run_async(f"SELECT pk, v FROM {ks}.test_int WHERE pk = 2")
assert rows[0].pk == 2
assert rows[0].v == 150
# test conflict resolution by timestamp
await cql.run_async(f"INSERT INTO {ks}.test_int (pk, v) VALUES (3, 300) USING TIMESTAMP 1000")
await cql.run_async(f"INSERT INTO {ks}.test_int (pk, v) VALUES (3, 200) USING TIMESTAMP 900")
rows = await cql.run_async(f"SELECT pk, v FROM {ks}.test_int WHERE pk = 3")
assert rows[0].pk == 3
assert rows[0].v == 300
# test frozen map value
await cql.run_async(f"CREATE TABLE {ks}.test_map (pk int PRIMARY KEY, v frozen<map<text, text>>) WITH storage_engine = 'logstor'")
@@ -100,6 +114,10 @@ async def test_basic_write_and_read(manager: ManagerClient):
assert rows[0].pk == 1
assert rows[0].v == {'a': 'apple', 'b': 'banana', 'c': 'cherry'}
await cql.run_async(f"DELETE FROM {ks}.test_map WHERE pk = 1")
rows = await cql.run_async(f"SELECT pk, v FROM {ks}.test_map WHERE pk = 1")
assert len(rows) == 0
@pytest.mark.asyncio
async def test_range_read(manager: ManagerClient):
cmdline = ['--logger-log-level', 'logstor=debug']