/* * Copyright (C) 2015-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #pragma once #include #include "replica/database_fwd.hh" #include "dht/decorated_key.hh" #include "dht/ring_position.hh" #include "schema/schema_fwd.hh" #include "encoding_stats.hh" #include "dirty_memory_manager.hh" #include "db/commitlog/replay_position.hh" #include "db/commitlog/rp_set.hh" #include "utils/extremum_tracking.hh" #include "mutation/mutation_cleaner.hh" #include "utils/double-decker.hh" #include "readers/empty.hh" #include "readers/mutation_source.hh" class frozen_mutation; class row_cache; namespace bi = boost::intrusive; namespace replica { class memtable_entry { dht::decorated_key _key; partition_entry _pe; struct { bool _head : 1; bool _tail : 1; bool _train : 1; } _flags{}; public: bool is_head() const noexcept { return _flags._head; } void set_head(bool v) noexcept { _flags._head = v; } bool is_tail() const noexcept { return _flags._tail; } void set_tail(bool v) noexcept { _flags._tail = v; } bool with_train() const noexcept { return _flags._train; } void set_train(bool v) noexcept { _flags._train = v; } friend class memtable; memtable_entry(schema_ptr s, dht::decorated_key key, mutation_partition p) : _key(std::move(key)) , _pe(*s, std::move(p)) { } memtable_entry(memtable_entry&& o) noexcept; // Frees elements of the entry in batches. // Returns stop_iteration::yes iff there are no more elements to free. stop_iteration clear_gently() noexcept; const dht::decorated_key& key() const { return _key; } dht::decorated_key& key() { return _key; } const partition_entry& partition() const { return _pe; } partition_entry& partition() { return _pe; } const schema_ptr& schema() const { return _pe.get_schema(); } partition_snapshot_ptr snapshot(memtable& mtbl); // Makes the entry conform to given schema. // Must be called under allocating section of the region which owns the entry. void upgrade_schema(logalloc::region&, const schema_ptr&, mutation_cleaner&); size_t external_memory_usage_without_rows() const { return _key.key().external_memory_usage(); } size_t object_memory_size(allocation_strategy& allocator); size_t size_in_allocator_without_rows(allocation_strategy& allocator) { return object_memory_size(allocator) + external_memory_usage_without_rows(); } size_t size_in_allocator(allocation_strategy& allocator) { auto size = size_in_allocator_without_rows(allocator); for (auto&& v : _pe.versions()) { size += v.size_in_allocator(allocator); } return size; } friend dht::ring_position_view ring_position_view_to_compare(const memtable_entry& mt) { return mt._key; } }; } namespace replica { // Statistics that need to be shared across all memtables for a single table struct memtable_table_shared_data { logalloc::allocating_section read_section; logalloc::allocating_section allocating_section; }; class dirty_memory_manager; struct table_stats; // Managed by lw_shared_ptr<>. class memtable final : public enable_lw_shared_from_this , public boost::intrusive::list_base_hook> , private dirty_memory_manager_logalloc::size_tracked_region , public logalloc::region_listener { public: using partitions_type = double_decker; private: dirty_memory_manager& _dirty_mgr; mutation_cleaner _cleaner; memtable_list *_memtable_list; schema_ptr _schema; memtable_table_shared_data& _table_shared_data; partitions_type partitions; size_t nr_partitions = 0; db::replay_position _replay_position; db::rp_set _rp_set; // mutation source to which reads fall-back after mark_flushed() // so that memtable contents can be moved away while there are // still active readers. This is needed for this mutation_source // to be monotonic (not loose writes). Monotonicity of each // mutation_source is necessary for the combined mutation source to be // monotonic. That combined source in this case is cache + memtable. mutation_source_opt _underlying; bool _merging_into_cache = false; // Tracks the difference between the amount of memory "spooled" during the flush // and the memory freed during the flush. // // If positive, this is equal to the difference between the amount of "spooled" memory // registered in dirty_memory_manager with account_potentially_cleaned_up_memory // and unregistered with revert_potentially_cleaned_up_memory. // If negative, the above difference is 0. int64_t _flushed_memory = 0; // For most of the time, this is equal to occupancy().total_memory. // But we want to know the current memory usage in our logalloc::region_listener // handlers, and at that point in time occupancy() is undefined. (LSA can choose to // update it before or after the handler). So we track it ourselves, based on the deltas // passed to the handlers. uint64_t _total_memory = 0; // During LSA compaction, _total_memory can fluctuate up and down. // But we are only interested in the maximal total decrease since the beginning of flush. // This tracks the lowest value of _total_memory seen during the flush. uint64_t _total_memory_low_watermark_during_flush = 0; bool _merged_into_cache = false; replica::table_stats& _table_stats; class memtable_encoding_stats_collector : public encoding_stats_collector { private: min_max_tracker min_max_timestamp; min_tracker min_live_timestamp; min_tracker min_live_row_marker_timestamp; void update_timestamp(api::timestamp_type ts, is_live is_live) noexcept { if (ts == api::missing_timestamp) { return; } encoding_stats_collector::update_timestamp(ts); min_max_timestamp.update(ts); if (is_live) { min_live_timestamp.update(ts); } } void update_live_row_marker_timestamp(api::timestamp_type ts) noexcept { min_live_row_marker_timestamp.update(ts); } public: memtable_encoding_stats_collector() noexcept; void update(atomic_cell_view cell) noexcept; void update(tombstone tomb) noexcept; void update(const ::schema& s, const row& r, column_kind kind); void update(const range_tombstone& rt) noexcept; void update(const row_marker& marker) noexcept; void update(const ::schema& s, const deletable_row& dr); void update(const ::schema& s, const mutation_partition& mp); api::timestamp_type get_min_timestamp() const noexcept { return min_max_timestamp.min(); } api::timestamp_type get_max_timestamp() const noexcept { return min_max_timestamp.max(); } api::timestamp_type get_min_live_timestamp() const noexcept { return min_live_timestamp.get(); } api::timestamp_type get_min_live_row_marker_timestamp() const noexcept { return min_live_row_marker_timestamp.get(); } } _stats_collector; std::optional _tombstone_gc_snapshot; void update(db::rp_handle&&); friend class ::row_cache; friend class memtable_entry; friend class flush_reader; friend class flush_memory_accounter; friend class partition_snapshot_read_accounter; private: std::ranges::subrange slice(const dht::partition_range& r) const; partition_entry& find_or_create_partition(const dht::decorated_key& key); partition_entry& find_or_create_partition_slow(partition_key_view key); void upgrade_entry(memtable_entry&); void add_flushed_memory(uint64_t); void remove_flushed_memory(uint64_t); void clear() noexcept; public: explicit memtable(schema_ptr schema, dirty_memory_manager&, memtable_table_shared_data& shared_data, replica::table_stats& table_stats, memtable_list *memtable_list = nullptr, seastar::scheduling_group compaction_scheduling_group = seastar::current_scheduling_group(), shared_tombstone_gc_state* shared_gc_state = nullptr); // Used for testing that want to control the flush process. explicit memtable(schema_ptr schema); ~memtable(); // Clears this memtable gradually without consuming the whole CPU. // Never resolves with a failed future. future<> clear_gently() noexcept; schema_ptr schema() const noexcept { return _schema; } void set_schema(schema_ptr) noexcept; future<> apply(memtable&, reader_permit); // Applies mutation to this memtable. // The mutation is upgraded to current schema. void apply(const mutation& m, db::rp_handle&& = {}); // The mutation is upgraded to current schema. void apply(const frozen_mutation& m, const schema_ptr& m_schema, db::rp_handle&& = {}); void evict_entry(memtable_entry& e, mutation_cleaner& cleaner) noexcept; static memtable& from_region(logalloc::region& r) noexcept { return static_cast(r); } const logalloc::region& region() const noexcept { return *this; } logalloc::region& region() noexcept { return *this; } encoding_stats get_encoding_stats() const noexcept { return _stats_collector.get(); } api::timestamp_type get_min_timestamp() const noexcept { return _stats_collector.get_min_timestamp(); } api::timestamp_type get_max_timestamp() const noexcept { return _stats_collector.get_max_timestamp(); } api::timestamp_type get_min_live_timestamp() const noexcept { return _stats_collector.get_min_live_timestamp(); } api::timestamp_type get_min_live_row_marker_timestamp() const noexcept { return _stats_collector.get_min_live_row_marker_timestamp(); } mutation_cleaner& cleaner() noexcept { return _cleaner; } bool contains_partition(const dht::decorated_key& key) const; public: memtable_list* get_memtable_list() noexcept { return _memtable_list; } size_t partition_count() const noexcept { return nr_partitions; } logalloc::occupancy_stats occupancy() const noexcept; // Creates a reader of data in this memtable for given partition range. // // Live readers share ownership of the memtable instance, so caller // doesn't need to ensure that memtable remains live. // // The 'range' parameter must be live as long as the reader is being used // // Mutations returned by the reader will all have given schema. mutation_reader make_mutation_reader(schema_ptr s, reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, tracing::trace_state_ptr trace_state_ptr = nullptr, streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no, mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes) { if (auto reader_opt = make_mutation_reader_opt(s, permit, range, slice, std::move(trace_state_ptr), fwd, fwd_mr)) { return std::move(*reader_opt); } [[unlikely]] return make_empty_mutation_reader(std::move(s), std::move(permit)); } // Same as make_mutation_reader, but returns an empty optional instead of a no-op reader when there is nothing to // read. This is an optimization. mutation_reader_opt make_mutation_reader_opt(schema_ptr query_schema, reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, tracing::trace_state_ptr trace_state_ptr = nullptr, streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no, mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes); mutation_reader make_mutation_reader(schema_ptr s, reader_permit permit, const dht::partition_range& range = query::full_partition_range) { auto& full_slice = s->full_slice(); return make_mutation_reader(s, std::move(permit), range, full_slice); } mutation_reader make_flush_reader(schema_ptr, reader_permit permit); mutation_source as_data_source(); bool empty() const noexcept { return partitions.empty(); } void mark_flushed(mutation_source) noexcept; bool is_merging_to_cache() const noexcept; bool is_flushed() const noexcept; void on_detach_from_region_group() noexcept; void revert_flushed_memory() noexcept; const db::replay_position& replay_position() const noexcept { return _replay_position; } /** * Returns the current rp_set, and resets the * stored one to empty. Only used for flushing * purposes, to one-shot report discarded rp:s * to commitlog */ db::rp_set get_and_discard_rp_set() noexcept { return std::exchange(_rp_set, {}); } friend class iterator_reader; dirty_memory_manager& get_dirty_memory_manager() noexcept { return _dirty_mgr; } const tombstone_gc_state_snapshot* get_tombstone_gc_state_snapshot() const noexcept { return _tombstone_gc_snapshot ? &_tombstone_gc_snapshot.value() : nullptr; } // Implementation of region_listener. virtual void increase_usage(logalloc::region* r, ssize_t delta) override; virtual void decrease_evictable_usage(logalloc::region* r) override; virtual void decrease_usage(logalloc::region* r, ssize_t delta) override; virtual void add(logalloc::region* r) override; virtual void del(logalloc::region* r) override; virtual void moved(logalloc::region* old_address, logalloc::region* new_address) override; friend fmt::formatter; }; } template <> struct fmt::formatter { constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); } auto format(const replica::memtable_entry&, fmt::format_context& ctx) const -> decltype(ctx.out()); }; template <> struct fmt::formatter { constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); } auto format(replica::memtable&, fmt::format_context& ctx) const -> decltype(ctx.out()); };