diff --git a/docs/dev/logstor.md b/docs/dev/logstor.md index a26811c32a..c543d0b46d 100644 --- a/docs/dev/logstor.md +++ b/docs/dev/logstor.md @@ -205,7 +205,7 @@ zero_padding -- to align to record_alignment (8 bytes) The `header_size` bytes immediately following the record header are the IDL-serialized form of `log_record_header`, which contains: - `key`: the partition key (`primary_index_key`), including a `decorated_key` with a token and partition key bytes. -- `generation`: a 16-bit write generation number, used during recovery to resolve conflicts when the same key appears in multiple segments. +- `timestamp`: the timestamp of the record, used to resolve conflicts by keeping the record with the latest timestamp. - `table`: UUID of the table this record belongs to. **Mutation Data**: diff --git a/idl/logstor.idl.hh b/idl/logstor.idl.hh index ac8642f3b0..48d45ffc68 100644 --- a/idl/logstor.idl.hh +++ b/idl/logstor.idl.hh @@ -8,6 +8,7 @@ #include "idl/frozen_schema.idl.hh" #include "idl/token.idl.hh" +#include "mutation/timestamp.hh" namespace replica { namespace logstor { @@ -18,7 +19,7 @@ struct primary_index_key { class log_record_header { replica::logstor::primary_index_key key; - replica::logstor::record_generation generation; + api::timestamp_type timestamp; table_id table; }; diff --git a/replica/logstor/index.hh b/replica/logstor/index.hh index 7c3d4e69a7..d24989db57 100644 --- a/replica/logstor/index.hh +++ b/replica/logstor/index.hh @@ -14,6 +14,7 @@ #include "utils/bptree.hh" #include "utils/double-decker.hh" #include "utils/phased_barrier.hh" +#include namespace replica::logstor { @@ -100,20 +101,6 @@ public: } } - std::optional exchange(const primary_index_key& key, index_entry new_entry) { - partitions_type::bound_hint hint; - auto i = _partitions.lower_bound(key.dk, dht::ring_position_comparator(*_schema), hint); - if (hint.match) { - auto old_entry = i->_e; - i->_e = std::move(new_entry); - return old_entry; - } else { - _partitions.emplace_before(i, key.dk.token().raw(), hint, key.dk, std::move(new_entry)); - ++_key_count; - return std::nullopt; - } - } - bool update_record_location(const primary_index_key& key, log_location old_location, log_location new_location) { auto it = _partitions.find(key.dk, dht::ring_position_comparator(*_schema)); if (it != _partitions.end()) { @@ -125,11 +112,17 @@ public: return false; } - std::pair> insert_if_newer(const primary_index_key& key, index_entry new_entry, bool prefer_on_tie) { + using entry_cmp_fn = std::function; + + static std::strong_ordering default_entry_cmp(const index_entry& a, const index_entry& b) noexcept { + return a.timestamp <=> b.timestamp; + } + + std::pair> insert(const primary_index_key& key, index_entry new_entry, entry_cmp_fn cmp = default_entry_cmp) { partitions_type::bound_hint hint; auto i = _partitions.lower_bound(key.dk, dht::ring_position_comparator(*_schema), hint); if (hint.match) { - if (i->_e.generation < new_entry.generation || (i->_e.generation == new_entry.generation && prefer_on_tie)) { + if (cmp(i->_e, new_entry) <= 0) { auto old_entry = i->_e; i->_e = std::move(new_entry); return {true, std::make_optional(old_entry)}; diff --git a/replica/logstor/logstor.cc b/replica/logstor/logstor.cc index 180c61434c..51164bacc9 100644 --- a/replica/logstor/logstor.cc +++ b/replica/logstor/logstor.cc @@ -21,6 +21,25 @@ namespace replica::logstor { seastar::logger logstor_logger("logstor"); +static api::timestamp_type extract_logstor_record_timestamp(const mutation& m) { + const auto& partition = m.partition(); + + for (const auto& row_entry : partition.clustered_rows()) { + if (row_entry.dummy()) { + continue; + } + if (!row_entry.row().marker().is_missing()) { + return row_entry.row().marker().timestamp(); + } + } + + if (const auto partition_tombstone = partition.partition_tombstone(); partition_tombstone) { + return partition_tombstone.timestamp; + } + + throw std::runtime_error("logstor mutation has no row marker or partition tombstone timestamp"); +} + logstor::logstor(logstor_config config) : _segment_manager(config.segment_manager_cfg) , _write_buffer(_segment_manager, config.flush_sg) { @@ -73,33 +92,32 @@ future<> logstor::write(const mutation& m, compaction_group& cg, seastar::gate:: table_id table = m.schema()->id(); auto& index = cg.get_logstor_index(); - // TODO ? - record_generation gen = index.get(key) - .transform([](const index_entry& entry) { - return entry.generation + 1; - }).value_or(record_generation(1)); + const auto ts = extract_logstor_record_timestamp(m); log_record record { .header = { .key = key, - .generation = gen, + .timestamp = ts, .table = table, }, .mut = canonical_mutation(m) }; - return _write_buffer.write(std::move(record), &cg, std::move(cg_holder)).then_unpack([this, &index, gen, key = std::move(key)] + return _write_buffer.write(std::move(record), &cg, std::move(cg_holder)).then_unpack([this, &index, ts, key = std::move(key)] (log_location location, seastar::gate::holder op) { index_entry new_entry { .location = location, - .generation = gen, + .timestamp = ts, }; - auto old_entry = index.exchange(key, std::move(new_entry)); + auto [inserted, prev_entry] = index.insert(key, std::move(new_entry)); - // If overwriting, free old record - if (old_entry) { - _segment_manager.free_record(old_entry->location); + if (!inserted) { + // A newer entry already exists; free the record we just wrote. + _segment_manager.free_record(location); + } else if (prev_entry) { + // Overwrote an older entry; free it. + _segment_manager.free_record(prev_entry->location); } }).handle_exception([] (std::exception_ptr ep) { logstor_logger.error("Error writing mutation: {}", ep); diff --git a/replica/logstor/segment_manager.cc b/replica/logstor/segment_manager.cc index 4b2995710a..e161d12615 100644 --- a/replica/logstor/segment_manager.cc +++ b/replica/logstor/segment_manager.cc @@ -1956,22 +1956,17 @@ future<> segment_manager_impl::recover_segment(replica::database& db, log_segmen auto& desc = get_segment_descriptor(segment_id); desc.reset(_cfg.segment_size); - auto seg_hdr = co_await read_segment_header(segment_id); - if (!seg_hdr) { - logstor_logger.trace("Segment {} has invalid header, skipping", segment_id); - co_return; - } - desc.seg_gen = seg_hdr->seg_gen; - bool is_full_segment = seg_hdr->kind == segment_kind::full; - logstor_logger.trace("Recovering segment {} with generation {}", segment_id, desc.seg_gen); - - co_await for_each_record(segment_id, - [this, &desc, &db, is_full_segment] (log_location loc, const log_record_header& header) -> want_data { - logstor_logger.trace("Recovery: read record at {} gen {}", loc, header.generation); + co_await scan_segment(segment_id, + [segment_id] (const segment_header& seg_hdr) { + logstor_logger.trace("Recovering segment {} with generation {}", segment_id, seg_hdr.seg_gen); + return make_ready_future<>(); + }, + [this, &desc, &db] (log_location loc, const log_record_header& header) -> want_data { + logstor_logger.trace("Recovery: read record at {} key {} ts {}", loc, header.key, header.timestamp); index_entry new_entry { .location = loc, - .generation = header.generation + .timestamp = header.timestamp }; try { @@ -1979,7 +1974,7 @@ future<> segment_manager_impl::recover_segment(replica::database& db, log_segmen if (!t.uses_logstor()) { return want_data::no; } - auto [inserted, prev_entry] = t.logstor_index().insert_if_newer(header.key, new_entry, is_full_segment); + auto [inserted, prev_entry] = t.logstor_index().insert(header.key, new_entry); if (inserted) { desc.on_write(loc); if (prev_entry) { diff --git a/replica/logstor/types.hh b/replica/logstor/types.hh index 7f7b3a2c65..ffe202c4ae 100644 --- a/replica/logstor/types.hh +++ b/replica/logstor/types.hh @@ -9,9 +9,10 @@ #include #include -#include "mutation/canonical_mutation.hh" #include "replica/logstor/utils.hh" #include "dht/decorated_key.hh" +#include "mutation/canonical_mutation.hh" +#include "mutation/timestamp.hh" namespace replica::logstor { @@ -34,19 +35,18 @@ struct primary_index_key { dht::decorated_key dk; }; -using record_generation = generation_base; using segment_generation = generation_base; struct index_entry { log_location location; - record_generation generation; + api::timestamp_type timestamp; bool operator==(const index_entry& other) const noexcept = default; }; struct log_record_header { primary_index_key key; - record_generation generation; + api::timestamp_type timestamp; table_id table; }; diff --git a/test/cluster/test_logstor.py b/test/cluster/test_logstor.py index 4f3d48a748..1dba454999 100644 --- a/test/cluster/test_logstor.py +++ b/test/cluster/test_logstor.py @@ -86,6 +86,13 @@ async def test_basic_write_and_read(manager: ManagerClient): assert rows[0].pk == 2 assert rows[0].v == 150 + # test conflict resolution by timestamp + await cql.run_async(f"INSERT INTO {ks}.test_int (pk, v) VALUES (3, 300) USING TIMESTAMP 1000") + await cql.run_async(f"INSERT INTO {ks}.test_int (pk, v) VALUES (3, 200) USING TIMESTAMP 900") + rows = await cql.run_async(f"SELECT pk, v FROM {ks}.test_int WHERE pk = 3") + assert rows[0].pk == 3 + assert rows[0].v == 300 + # test frozen map value await cql.run_async(f"CREATE TABLE {ks}.test_map (pk int PRIMARY KEY, v frozen>) WITH storage_engine = 'logstor'")