logstor: compare records by timestamp

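Add a timestamp to each record. The timestamp is extracted from the
mutation when the record is written: it is taken from the row marker,
falling back to the partition tombstone if no row marker is present.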

When inserting a record into the index, we compare it with the existing
record for the same key and replace the existing record only if the new
one's timestamp is not older (on equal timestamps the new record wins).
This commit is contained in:
Michael Litvak
2026-05-14 20:39:40 +02:00
parent 61e5ec9888
commit c18d616f64
7 changed files with 62 additions and 48 deletions

View File

@@ -205,7 +205,7 @@ zero_padding -- to align to record_alignment (8 bytes)
The `header_size` bytes immediately following the record header are the IDL-serialized form of `log_record_header`, which contains:
- `key`: the partition key (`primary_index_key`), including a `decorated_key` with a token and partition key bytes.
- `generation`: a 16-bit write generation number, used during recovery to resolve conflicts when the same key appears in multiple segments.
- `timestamp`: the timestamp of the record, used to resolve conflicts by keeping the record with the latest timestamp.
- `table`: UUID of the table this record belongs to.
**Mutation Data**:

View File

@@ -8,6 +8,7 @@
#include "idl/frozen_schema.idl.hh"
#include "idl/token.idl.hh"
#include "mutation/timestamp.hh"
namespace replica {
namespace logstor {
@@ -18,7 +19,7 @@ struct primary_index_key {
class log_record_header {
replica::logstor::primary_index_key key;
replica::logstor::record_generation generation;
api::timestamp_type timestamp;
table_id table;
};

View File

@@ -14,6 +14,7 @@
#include "utils/bptree.hh"
#include "utils/double-decker.hh"
#include "utils/phased_barrier.hh"
#include <utility>
namespace replica::logstor {
@@ -100,20 +101,6 @@ public:
}
}
std::optional<index_entry> exchange(const primary_index_key& key, index_entry new_entry) {
partitions_type::bound_hint hint;
auto i = _partitions.lower_bound(key.dk, dht::ring_position_comparator(*_schema), hint);
if (hint.match) {
auto old_entry = i->_e;
i->_e = std::move(new_entry);
return old_entry;
} else {
_partitions.emplace_before(i, key.dk.token().raw(), hint, key.dk, std::move(new_entry));
++_key_count;
return std::nullopt;
}
}
bool update_record_location(const primary_index_key& key, log_location old_location, log_location new_location) {
auto it = _partitions.find(key.dk, dht::ring_position_comparator(*_schema));
if (it != _partitions.end()) {
@@ -125,11 +112,17 @@ public:
return false;
}
std::pair<bool, std::optional<index_entry>> insert_if_newer(const primary_index_key& key, index_entry new_entry, bool prefer_on_tie) {
using entry_cmp_fn = std::function<std::strong_ordering(const index_entry&, const index_entry&)>;
static std::strong_ordering default_entry_cmp(const index_entry& a, const index_entry& b) noexcept {
return a.timestamp <=> b.timestamp;
}
std::pair<bool, std::optional<index_entry>> insert(const primary_index_key& key, index_entry new_entry, entry_cmp_fn cmp = default_entry_cmp) {
partitions_type::bound_hint hint;
auto i = _partitions.lower_bound(key.dk, dht::ring_position_comparator(*_schema), hint);
if (hint.match) {
if (i->_e.generation < new_entry.generation || (i->_e.generation == new_entry.generation && prefer_on_tie)) {
if (cmp(i->_e, new_entry) <= 0) {
auto old_entry = i->_e;
i->_e = std::move(new_entry);
return {true, std::make_optional(old_entry)};

View File

@@ -21,6 +21,25 @@ namespace replica::logstor {
seastar::logger logstor_logger("logstor");
// Picks the timestamp that represents a logstor record for mutation `m`.
//
// Lookup order:
//   1. The first non-dummy clustered row entry whose row marker is present:
//      that marker's timestamp is returned.
//   2. Otherwise, if the partition tombstone is set, its timestamp.
//
// Throws std::runtime_error when neither source yields a timestamp.
//
// NOTE(review): a mutation that has neither a row marker nor a partition
// tombstone ends up in the throw below — confirm that callers never pass
// such mutations.
static api::timestamp_type extract_logstor_record_timestamp(const mutation& m) {
    const auto& mp = m.partition();

    for (const auto& entry : mp.clustered_rows()) {
        if (entry.dummy()) {
            // Dummy entries are skipped, exactly as before.
            continue;
        }
        const auto& r = entry.row();
        const auto& marker = r.marker();
        if (marker.is_missing()) {
            continue;
        }
        return marker.timestamp();
    }

    // No row marker found; fall back to the partition tombstone.
    const auto tomb = mp.partition_tombstone();
    if (!tomb) {
        throw std::runtime_error("logstor mutation has no row marker or partition tombstone timestamp");
    }
    return tomb.timestamp;
}
logstor::logstor(logstor_config config)
: _segment_manager(config.segment_manager_cfg)
, _write_buffer(_segment_manager, config.flush_sg) {
@@ -73,33 +92,32 @@ future<> logstor::write(const mutation& m, compaction_group& cg, seastar::gate::
table_id table = m.schema()->id();
auto& index = cg.get_logstor_index();
// TODO ?
record_generation gen = index.get(key)
.transform([](const index_entry& entry) {
return entry.generation + 1;
}).value_or(record_generation(1));
const auto ts = extract_logstor_record_timestamp(m);
log_record record {
.header = {
.key = key,
.generation = gen,
.timestamp = ts,
.table = table,
},
.mut = canonical_mutation(m)
};
return _write_buffer.write(std::move(record), &cg, std::move(cg_holder)).then_unpack([this, &index, gen, key = std::move(key)]
return _write_buffer.write(std::move(record), &cg, std::move(cg_holder)).then_unpack([this, &index, ts, key = std::move(key)]
(log_location location, seastar::gate::holder op) {
index_entry new_entry {
.location = location,
.generation = gen,
.timestamp = ts,
};
auto old_entry = index.exchange(key, std::move(new_entry));
auto [inserted, prev_entry] = index.insert(key, std::move(new_entry));
// If overwriting, free old record
if (old_entry) {
_segment_manager.free_record(old_entry->location);
if (!inserted) {
// A newer entry already exists; free the record we just wrote.
_segment_manager.free_record(location);
} else if (prev_entry) {
// Overwrote an older entry; free it.
_segment_manager.free_record(prev_entry->location);
}
}).handle_exception([] (std::exception_ptr ep) {
logstor_logger.error("Error writing mutation: {}", ep);

View File

@@ -1956,22 +1956,17 @@ future<> segment_manager_impl::recover_segment(replica::database& db, log_segmen
auto& desc = get_segment_descriptor(segment_id);
desc.reset(_cfg.segment_size);
auto seg_hdr = co_await read_segment_header(segment_id);
if (!seg_hdr) {
logstor_logger.trace("Segment {} has invalid header, skipping", segment_id);
co_return;
}
desc.seg_gen = seg_hdr->seg_gen;
bool is_full_segment = seg_hdr->kind == segment_kind::full;
logstor_logger.trace("Recovering segment {} with generation {}", segment_id, desc.seg_gen);
co_await for_each_record(segment_id,
[this, &desc, &db, is_full_segment] (log_location loc, const log_record_header& header) -> want_data {
logstor_logger.trace("Recovery: read record at {} gen {}", loc, header.generation);
co_await scan_segment(segment_id,
[segment_id] (const segment_header& seg_hdr) {
logstor_logger.trace("Recovering segment {} with generation {}", segment_id, seg_hdr.seg_gen);
return make_ready_future<>();
},
[this, &desc, &db] (log_location loc, const log_record_header& header) -> want_data {
logstor_logger.trace("Recovery: read record at {} key {} ts {}", loc, header.key, header.timestamp);
index_entry new_entry {
.location = loc,
.generation = header.generation
.timestamp = header.timestamp
};
try {
@@ -1979,7 +1974,7 @@ future<> segment_manager_impl::recover_segment(replica::database& db, log_segmen
if (!t.uses_logstor()) {
return want_data::no;
}
auto [inserted, prev_entry] = t.logstor_index().insert_if_newer(header.key, new_entry, is_full_segment);
auto [inserted, prev_entry] = t.logstor_index().insert(header.key, new_entry);
if (inserted) {
desc.on_write(loc);
if (prev_entry) {

View File

@@ -9,9 +9,10 @@
#include <cstdint>
#include <fmt/format.h>
#include "mutation/canonical_mutation.hh"
#include "replica/logstor/utils.hh"
#include "dht/decorated_key.hh"
#include "mutation/canonical_mutation.hh"
#include "mutation/timestamp.hh"
namespace replica::logstor {
@@ -34,19 +35,18 @@ struct primary_index_key {
dht::decorated_key dk;
};
using record_generation = generation_base<uint16_t>;
using segment_generation = generation_base<uint16_t>;
struct index_entry {
log_location location;
record_generation generation;
api::timestamp_type timestamp;
bool operator==(const index_entry& other) const noexcept = default;
};
struct log_record_header {
primary_index_key key;
record_generation generation;
api::timestamp_type timestamp;
table_id table;
};

View File

@@ -86,6 +86,13 @@ async def test_basic_write_and_read(manager: ManagerClient):
assert rows[0].pk == 2
assert rows[0].v == 150
# test conflict resolution by timestamp
await cql.run_async(f"INSERT INTO {ks}.test_int (pk, v) VALUES (3, 300) USING TIMESTAMP 1000")
await cql.run_async(f"INSERT INTO {ks}.test_int (pk, v) VALUES (3, 200) USING TIMESTAMP 900")
rows = await cql.run_async(f"SELECT pk, v FROM {ks}.test_int WHERE pk = 3")
assert rows[0].pk == 3
assert rows[0].v == 300
# test frozen map value
await cql.run_async(f"CREATE TABLE {ks}.test_map (pk int PRIMARY KEY, v frozen<map<text, text>>) WITH storage_engine = 'logstor'")