diff --git a/replica/database.cc b/replica/database.cc index df607b8ad2..c5d2ae4c57 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -1771,6 +1771,38 @@ future database::do_apply_counter_update(column_family& cf, const froz co_return m; } +api::timestamp_type memtable_list::min_live_timestamp(const dht::decorated_key& dk, is_shadowable is, api::timestamp_type max_seen_timestamp) const noexcept { + const auto get_min_ts = [is] (const memtable& mt) { + // see get_max_purgeable_timestamp() in compaction.cc for comments on choosing min timestamp + return is ? mt.get_min_live_row_marker_timestamp() : mt.get_min_live_timestamp(); + }; + + auto min_live_ts = api::max_timestamp; + + for (const auto& mt : _memtables) { + const auto mt_min_live_ts = get_min_ts(*mt); + if (mt_min_live_ts > max_seen_timestamp) { + continue; + } + // We cannot do lookups on flushing memtables, they might be in the + // process of merging into cache. Keys already merged will not be seen + // by the lookup. + if (!mt->is_merging_to_cache() && !mt->contains_partition(dk)) { + continue; + } + min_live_ts = std::min(min_live_ts, mt_min_live_ts); + } + + for (const auto& mt : _flushed_memtables_with_active_reads) { + // We cannot check if the flushed memtable contains the key as it + // becomes empty after the merge to cache completes, so we only use the + // min ts metadata. 
+ min_live_ts = std::min(min_live_ts, get_min_ts(mt)); + } + + return min_live_ts; +} + future<> memtable_list::flush() { if (!may_flush()) { return make_ready_future<>(); diff --git a/replica/database.hh b/replica/database.hh index a2a4d6a38e..a1ba7f6cc5 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -78,6 +78,8 @@ class compaction_manager; class frozen_mutation; class reconcilable_result; +namespace bi = boost::intrusive; + namespace tracing { class trace_state_ptr; } namespace s3 { struct endpoint_config; } @@ -181,8 +183,13 @@ class global_table_ptr; class memtable_list { public: using seal_immediate_fn_type = std::function (flush_permit&&)>; + using intrusive_memtable_list = bi::list< + memtable, + bi::base_hook>>, + bi::constant_time_size>; private: std::vector _memtables; + intrusive_memtable_list _flushed_memtables_with_active_reads; seal_immediate_fn_type _seal_immediate_fn; std::function _current_schema; replica::dirty_memory_manager* _dirty_memory_manager; @@ -238,6 +245,15 @@ public: return _memtables.back(); } + // Returns the minimum live timestamp. Considers all memtables, even + // those that were flushed and removed with erase() but are still + // being used by an in-progress read. + // Memtables whose min live timestamp > max_seen_timestamp are ignored as we + // consider that their content is more recent than any potential tombstone in + // other mutation sources. + // Returns api::max_timestamp if the key is not in any of the memtables. + api::timestamp_type min_live_timestamp(const dht::decorated_key& dk, is_shadowable is, api::timestamp_type max_seen_timestamp) const noexcept; + // # 8904 - this method is akin to std::set::erase(key_type), not // erase(iterator). Should be tolerant against non-existing. 
void erase(const shared_memtable& element) noexcept { @@ -245,6 +261,7 @@ public: if (i != _memtables.end()) { _memtables.erase(i); } + _flushed_memtables_with_active_reads.push_back(*element); } // Synchronously swaps the active memtable with a new, empty one, diff --git a/replica/memtable.cc b/replica/memtable.cc index 46e75d5380..07a64cdd42 100644 --- a/replica/memtable.cc +++ b/replica/memtable.cc @@ -283,6 +283,11 @@ memtable::slice(const dht::partition_range& range) const { } class iterator_reader { + // DO NOT RELEASE the memtable! Keep a reference to it, so it stays in + // memtable_list::_flushed_memtables_with_active_reads and so that it keeps + // blocking GC of tombstones in the cache which cover data that + // used to be in this memtable, and which will possibly be produced by this + // reader later on. lw_shared_ptr _memtable; schema_ptr _schema; const dht::partition_range* _range; @@ -381,7 +386,6 @@ protected: streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { auto ret = _memtable->_underlying->make_reader_v2(_schema, std::move(permit), delegate, slice, nullptr, fwd, fwd_mr); - _memtable = {}; _last = {}; return ret; } diff --git a/replica/memtable.hh b/replica/memtable.hh index 526385f54c..894ede61ab 100644 --- a/replica/memtable.hh +++ b/replica/memtable.hh @@ -104,7 +104,10 @@ class dirty_memory_manager; struct table_stats; // Managed by lw_shared_ptr<>. -class memtable final : public enable_lw_shared_from_this, private dirty_memory_manager_logalloc::size_tracked_region { +class memtable final + : public enable_lw_shared_from_this + , public boost::intrusive::list_base_hook> + , private dirty_memory_manager_logalloc::size_tracked_region { public: using partitions_type = double_deckermemtables()->active_memtable(); - // see get_max_purgeable_timestamp() in compaction.cc for comments on choosing min timestamp - api::timestamp_type memtable_min_timestamp = is_shadowable ? 
mt.get_min_live_row_marker_timestamp() : mt.get_min_live_timestamp(); - if (memtable_min_timestamp > cg->max_seen_timestamp()) { - // All the entries in the memtable are newer than the entries in the - // SSTable within this compaction group. So, no need to check further. - return; - } - - // If a memtable with a minimum timestamp lower than the current maximum - // purgeable timestamp has the given key, the tombstone should not be purged. - if (memtable_min_timestamp < max_purgeable_timestamp && mt.contains_partition(dk)) { - max_purgeable_timestamp = memtable_min_timestamp; - } + max_purgeable_timestamp = std::min(cg->memtables()->min_live_timestamp(dk, is_shadowable, cg->max_seen_timestamp()), max_purgeable_timestamp); }); return max_purgeable_timestamp;