/*
 * Copyright (C) 2014-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

// NOTE(review): this file appears to have been damaged in extraction — it was
// collapsed onto a few physical lines and every template-argument list
// ("<...>") was stripped (e.g. `lw_shared_ptr` below is presumably
// `lw_shared_ptr<memtable>`, `std::optional` presumably
// `std::optional<dht::decorated_key>`). The damaged tokens are kept verbatim
// and flagged with NOTE(review) markers; restore them from the upstream
// source before compiling.

#include "utils/assert.hh"
#include "memtable.hh"
#include "replica/database.hh"
#include "mutation/frozen_mutation.hh"
#include "partition_snapshot_reader.hh"
#include "partition_builder.hh"
#include "mutation/mutation_partition_view.hh"
#include "readers/empty.hh"
#include "readers/forwardable.hh"
#include "sstables/types.hh"

namespace replica {

// Forward declaration. Builds a single-partition reader over a partition
// snapshot, selecting the (possibly reversed) snapshot schema depending on
// `is_reversed`. Defined further down in this file.
static mutation_reader make_partition_snapshot_flat_reader_from_snp_schema(
        bool is_reversed,
        reader_permit permit,
        dht::decorated_key dk,
        query::clustering_key_filter_ranges crr,
        partition_snapshot_ptr snp,
        bool digest_requested,
        logalloc::region& region,
        logalloc::allocating_section& read_section,
        std::any pointer_to_container,
        streamed_mutation::forwarding fwd,
        memtable& memtable);

// Seeds the min/max trackers so the first real update always wins:
// timestamps start at (0, 0) and the "live" minima start at max_timestamp.
memtable::memtable_encoding_stats_collector::memtable_encoding_stats_collector() noexcept
    : min_max_timestamp(0, 0)
    , min_live_timestamp(api::max_timestamp)
    , min_live_row_marker_timestamp(api::max_timestamp)
{}

// Folds one atomic cell into the encoding stats: timestamp always; TTL and
// expiry for live TTL'd cells; deletion time for dead cells.
void memtable::memtable_encoding_stats_collector::update(atomic_cell_view cell) noexcept {
    is_live is_live = ::is_live(cell.is_live());
    update_timestamp(cell.timestamp(), is_live);
    if (cell.is_live_and_has_ttl()) {
        update_ttl(cell.ttl());
        update_local_deletion_time(cell.expiry());
    } else if (!is_live) {
        update_local_deletion_time(cell.deletion_time());
    }
}

// Folds a (possibly empty) tombstone into the stats; empty tombstones are
// skipped so they don't drag the minima down.
void memtable::memtable_encoding_stats_collector::update(tombstone tomb) noexcept {
    if (tomb) {
        update_timestamp(tomb.timestamp, is_live::no);
        update_local_deletion_time(tomb.deletion_time);
    }
}

// Folds every cell of a row (static or regular, per `kind`) into the stats.
// Collection cells are deserialized to visit their tombstone and each
// contained atomic cell.
void memtable::memtable_encoding_stats_collector::update(const ::schema& s, const row& r, column_kind kind) {
    r.for_each_cell([this, &s, kind](column_id id, const atomic_cell_or_collection& item) {
        auto& col = s.column_at(kind, id);
        if (col.is_atomic()) {
            update(item.as_atomic_cell(col));
        } else {
            item.as_collection_mutation().with_deserialized(*col.type, [&] (collection_mutation_view_description mview) {
                // Note: when some of the collection cells are dead and some are live
                // we need to encode a "live" deletion_time for the living ones.
                // It is not strictly required to update encoding_stats for the latter case
                // since { .min(), .max() } will not affect the encoding_stats
                // minimum values. (See #4035)
                update(mview.tomb);
                for (auto& entry : mview.cells) {
                    update(entry.second);
                }
            });
        }
    });
}

// Range tombstones contribute only their tombstone component.
void memtable::memtable_encoding_stats_collector::update(const range_tombstone& rt) noexcept {
    update(rt.tomb);
}

// Folds a row marker into the stats. Dead markers are recorded with the
// sstable expired-liveness TTL sentinel; live markers additionally feed the
// live-row-marker timestamp minimum and, if expiring, TTL/expiry.
void memtable::memtable_encoding_stats_collector::update(const row_marker& marker) noexcept {
    if (marker.is_missing()) {
        return;
    }
    auto timestamp = marker.timestamp();
    if (!marker.is_live()) {
        update_timestamp(timestamp, is_live::no);
        update_ttl(gc_clock::duration(sstables::expired_liveness_ttl));
        update_local_deletion_time(marker.deletion_time());
    } else {
        update_timestamp(timestamp, is_live::yes);
        update_live_row_marker_timestamp(timestamp);
        if (marker.is_expiring()) {
            update_ttl(marker.ttl());
            update_local_deletion_time(marker.expiry());
        }
    }
}

// A clustered row contributes its marker, both row-tombstone components
// (regular and shadowable-collapsed), and its regular cells.
void memtable::memtable_encoding_stats_collector::update(const ::schema& s, const deletable_row& dr) {
    update(dr.marker());
    row_tombstone row_tomb = dr.deleted_at();
    update(row_tomb.regular());
    update(row_tomb.tomb());
    update(s, dr.cells(), column_kind::regular_column);
}

// Whole-partition walk: partition tombstone, static row, every clustered
// row, and every range tombstone.
void memtable::memtable_encoding_stats_collector::update(const ::schema& s, const mutation_partition& mp) {
    update(mp.partition_tombstone());
    update(s, mp.static_row().get(), column_kind::static_column);
    for (auto&& row_entry : mp.clustered_rows()) {
        update(s, row_entry.row());
    }
    for (auto&& rt : mp.row_tombstones()) {
        update(rt.tombstone());
    }
}

// Primary constructor. Registers this region with the dirty-memory manager
// via logalloc::region::listen() at the end, once all members are built.
// Optionally captures a tombstone-GC snapshot from the shared GC state.
memtable::memtable(schema_ptr schema, dirty_memory_manager& dmm, memtable_table_shared_data& table_shared_data,
        replica::table_stats& table_stats, memtable_list* memtable_list,
        seastar::scheduling_group compaction_scheduling_group, shared_tombstone_gc_state* shared_gc_state)
    : dirty_memory_manager_logalloc::size_tracked_region()
    , _dirty_mgr(dmm)
    , _cleaner(*this, no_cache_tracker, table_stats.memtable_app_stats, compaction_scheduling_group)
    , _memtable_list(memtable_list)
    , _schema(std::move(schema))
    , _table_shared_data(table_shared_data)
    , partitions(dht::raw_token_less_comparator{})
    , _table_stats(table_stats) {
    if (shared_gc_state) {
        _tombstone_gc_snapshot.emplace(shared_gc_state->snapshot());
    }
    logalloc::region::listen(this);
}

// Shared backing objects for the test-only constructor below.
static thread_local dirty_memory_manager mgr_for_tests;
static thread_local replica::table_stats stats_for_tests;
static thread_local memtable_table_shared_data memtable_shared_data_for_tests;

// Test-only convenience constructor; delegates to the primary constructor
// with thread-local stand-ins. (Presumably the remaining parameters of the
// primary constructor are defaulted in the header — confirm there.)
memtable::memtable(schema_ptr schema)
    : memtable(std::move(schema), mgr_for_tests, memtable_shared_data_for_tests, stats_for_tests) {
}

// Synchronously drops all content and detaches from the region listener.
memtable::~memtable() {
    revert_flushed_memory();
    clear();
    logalloc::region::unlisten();
}

// Evicts one entry's partition data into `cleaner` and decrements the
// partition count. noexcept: used from clear()'s disposal path.
void memtable::evict_entry(memtable_entry& e, mutation_cleaner& cleaner) noexcept {
    e.partition().evict(cleaner);
    nr_partitions--;
}

// Synchronous clear: disposes every entry under this region's allocator so
// the freed memory is accounted against the right LSA region.
void memtable::clear() noexcept {
    with_allocator(allocator(), [this] {
        partitions.clear_and_dispose([this] (memtable_entry* e) noexcept {
            evict_entry(*e, _cleaner);
        });
    });
}

// Preemptible clear running in a seastar::thread: erases entries in bounded
// batches, yielding between batches. Falls back to synchronous clear() on
// any exception.
future<> memtable::clear_gently() noexcept {
    return futurize_invoke([this] {
        // NOTE(review): template argument lost in extraction — presumably
        // std::make_unique<seastar::thread>.
        auto t = std::make_unique([this] {
            auto& alloc = allocator();
            auto p = std::move(partitions);
            nr_partitions = 0;
            while (!p.empty()) {
                with_allocator(alloc, [&] () noexcept {
                    while (!p.empty()) {
                        // clear_gently() may need multiple passes over one
                        // entry; only erase it once it reports completion.
                        if (p.begin()->clear_gently() == stop_iteration::no) {
                            break;
                        }
                        p.begin().erase(dht::raw_token_less_comparator{});
                        if (need_preempt()) {
                            break;
                        }
                    }
                });
                seastar::thread::yield();
            }
            /*
             * The collection is not guaranteed to free everything
             * with the last erase. If anything gets freed in destructor,
             * it will be unaccounted from wrong allocator, so handle it
             */
            with_allocator(alloc, [&p] {
                p.clear();
            });
        });
        auto f = t->join();
        // Keep the thread object alive until it has joined.
        return f.then([t = std::move(t)] {});
    }).handle_exception([this] (auto e) {
        this->clear();
    });
}

// Lookup/insert from a partition_key_view: decorates the key under the
// standard allocator (to avoid LSA-allocating a throwaway key), then
// delegates to find_or_create_partition() under the original allocator.
partition_entry& memtable::find_or_create_partition_slow(partition_key_view key) {
    SCYLLA_ASSERT(!reclaiming_enabled());

    // FIXME: Perform lookup using std::pair
    // to avoid unconditional copy of the partition key.
    // We can't do it right now because std::map<> which holds
    // partitions doesn't support heterogeneous lookup.
    // We could switch to boost::intrusive_map<> similar to what we have for row keys.
    // NOTE(review): the std::pair template arguments above were presumably
    // lost in extraction.
    auto& outer = current_allocator();
    return with_allocator(standard_allocator(), [&, this] () -> partition_entry& {
        auto dk = dht::decorate_key(*_schema, key);
        return with_allocator(outer, [&dk, this] () -> partition_entry& {
            return find_or_create_partition(dk);
        });
    });
}

// Finds the partition for `key`, creating it if absent. Uses a bound hint so
// the emplace can reuse the lookup, and invalidates outstanding references
// when the insertion moved nodes.
partition_entry& memtable::find_or_create_partition(const dht::decorated_key& key) {
    SCYLLA_ASSERT(!reclaiming_enabled());
    // call lower_bound so we have a hint for the insert, just in case.
    partitions_type::bound_hint hint;
    auto i = partitions.lower_bound(key, dht::ring_position_comparator(*_schema), hint);
    if (i == partitions.end() || !hint.match) {
        partitions_type::iterator entry = partitions.emplace_before(i,
            key.token().raw(), hint, _schema, dht::decorated_key(key), mutation_partition(*_schema));
        ++nr_partitions;
        ++_table_stats.memtable_partition_insertions;
        if (!hint.emplace_keeps_iterators()) {
            current_allocator().invalidate_references();
        }
        return entry->partition();
    } else {
        ++_table_stats.memtable_partition_hits;
        upgrade_entry(*i);
    }
    return i->partition();
}

bool memtable::contains_partition(const dht::decorated_key& key) const {
    return partitions.find(key, dht::ring_position_comparator(*_schema)) != partitions.end();
}

// Returns the [begin, end) iterator pair of partitions covered by `range`,
// with a fast path for single-partition ranges.
// NOTE(review): subrange's template arguments (presumably the partitions
// const_iterator type) were lost in extraction.
std::ranges::subrange memtable::slice(const dht::partition_range& range) const {
    if (query::is_single_partition(range)) {
        const query::ring_position& pos = range.start()->value();
        auto i = partitions.find(pos, dht::ring_position_comparator(*_schema));
        if (i != partitions.end()) {
            return {i, std::next(i)};
        } else {
            return {i, i};
        }
    } else {
        auto cmp = dht::ring_position_comparator(*_schema);

        auto i1 = range.start()
            ? (range.start()->is_inclusive()
                ? partitions.lower_bound(range.start()->value(), cmp)
                : partitions.upper_bound(range.start()->value(), cmp))
            : partitions.cbegin();

        auto i2 = range.end()
            ? (range.end()->is_inclusive()
                ? partitions.upper_bound(range.end()->value(), cmp)
                : partitions.lower_bound(range.end()->value(), cmp))
            : partitions.cend();

        return {i1, i2};
    }
}

// Base class for memtable readers: walks `partitions` while tolerating
// iterator invalidation caused by LSA compaction (detected via the region's
// reclaim counter) and by concurrent inserts (detected via partition count).
class iterator_reader {
    // DO NOT RELEASE the memtable! Keep a reference to it, so it stays in
    // memtable_list::_flushed_memtables_with_active_reads and so that it keeps
    // blocking tombstone GC of tombstone in the cache, which cover data that
    // used to be in this memtable, and which will possibly be produced by this
    // reader later on.
    // NOTE(review): member template arguments lost in extraction — presumably
    // lw_shared_ptr<memtable> and std::optional<dht::decorated_key>.
    lw_shared_ptr _memtable;
    schema_ptr _schema;
    const dht::partition_range* _range;
    std::optional _last;       // last key emitted; resume point after invalidation
    memtable::partitions_type::iterator _i;
    memtable::partitions_type::iterator _end;
    uint64_t _last_reclaim_counter;
    size_t _last_partition_count = 0;

    // Recomputes the end iterator for the current range.
    memtable::partitions_type::iterator lookup_end() {
        auto cmp = dht::ring_position_comparator(*_memtable->_schema);
        return _range->end()
            ? (_range->end()->is_inclusive()
                ? _memtable->partitions.upper_bound(_range->end()->value(), cmp)
                : _memtable->partitions.lower_bound(_range->end()->value(), cmp))
            : _memtable->partitions.end();
    }

    void update_iterators() {
        // We must be prepared that iterators may get invalidated during compaction.
        auto current_reclaim_counter = _memtable->reclaim_counter();
        auto cmp = dht::ring_position_comparator(*_memtable->_schema);
        if (_last) {
            if (current_reclaim_counter != _last_reclaim_counter ||
                  _last_partition_count != _memtable->partition_count()) {
                _i = _memtable->partitions.upper_bound(*_last, cmp);
                _end = lookup_end();
                _last_partition_count = _memtable->partition_count();
            }
        } else {
            // Initial lookup
            _i = _range->start()
                 ? (_range->start()->is_inclusive()
                    ? _memtable->partitions.lower_bound(_range->start()->value(), cmp)
                    : _memtable->partitions.upper_bound(_range->start()->value(), cmp))
                 : _memtable->partitions.begin();
            _end = lookup_end();
            _last_partition_count = _memtable->partition_count();
        }
        _last_reclaim_counter = current_reclaim_counter;
    }

protected:
    // NOTE(review): lw_shared_ptr's template argument lost in extraction.
    iterator_reader(schema_ptr s,
                    lw_shared_ptr m,
                    const dht::partition_range& range)
        : _memtable(std::move(m))
        , _schema(std::move(s))
        , _range(&range)
    { }

    // Returns the next entry in range (after revalidating iterators), or
    // nullptr when exhausted. The entry is schema-upgraded before return.
    memtable_entry* fetch_entry() {
        update_iterators();
        if (_i == _end) {
            return nullptr;
        } else {
            memtable_entry& e = *_i;
            _memtable->upgrade_entry(e);
            return &e;
        }
    }

    void advance_iterator() {
        ++_i;
    }

    void update_last(dht::decorated_key last) {
        _last = std::move(last);
    }

    logalloc::allocating_section& read_section() {
        return _memtable->_table_shared_data.read_section;
    }

    lw_shared_ptr mtbl() {
        return _memtable;
    }

    schema_ptr schema() {
        return _schema;
    }

    logalloc::region& region() {
        return *_memtable;
    };

    // If this memtable has already been flushed, the remaining portion of the
    // range can be served from the underlying (on-disk) mutation source.
    // NOTE(review): std::optional's template argument (presumably
    // dht::partition_range) lost in extraction.
    std::optional get_delegate_range() {
        // We cannot run concurrently with row_cache::update().
        if (_memtable->is_flushed()) {
            return _last
                ? _range->split_after(*_last, dht::ring_position_comparator(*_memtable->_schema))
                : *_range;
        }
        return {};
    }

    mutation_reader delegate_reader(reader_permit permit,
                                    const dht::partition_range& delegate,
                                    const query::partition_slice& slice,
                                    streamed_mutation::forwarding fwd,
                                    mutation_reader::forwarding fwd_mr) {
        auto ret = _memtable->_underlying->make_mutation_reader(_schema, std::move(permit), delegate, slice, nullptr, fwd, fwd_mr);
        _last = {};
        return ret;
    }

    future<> fast_forward_to(const dht::partition_range& pr) {
        // NOTE(review): "≺" below is mojibake for "&pr;" — the statement is
        // presumably `_range = &pr;`. Restore from upstream.
        _range = ≺
        _last = { };
        return make_ready_future<>();
    }
};

// Counts tombstone fragments observed while reading, feeding the per-table
// memtable read statistics.
class partition_snapshot_read_accounter {
    memtable& _mt;
public:
    explicit partition_snapshot_read_accounter(memtable& mt): _mt(mt) {}

    void operator()(const clustering_row& cr) {
        if (cr.tomb()) {
            ++_mt._table_stats.memtable_row_tombstone_reads;
        }
    }
    void operator()(const static_row& sr) {}
    void operator()(const range_tombstone_change& rt) {
        ++_mt._table_stats.memtable_range_tombstone_reads;
    }
    void operator()(const partition_start& ph) {}
    void operator()(const partition_end& eop) {}
};

// Range-scan reader over the memtable. Reads one partition snapshot at a
// time via a delegate reader; once the memtable is flushed, switches to a
// delegate reader over the underlying mutation source for the rest of the
// range.
class scanning_reader final : public mutation_reader::impl, private iterator_reader {
    // NOTE(review): _delegate_range's template argument (presumably
    // dht::partition_range) lost in extraction.
    std::optional _delegate_range;
    mutation_reader_opt _delegate;
    const query::partition_slice& _slice;
    mutation_reader::forwarding _fwd_mr;

    // Pushes fragments from the delegate into our buffer until full.
    struct consumer {
        scanning_reader* _reader;
        explicit consumer(scanning_reader* r) : _reader(r) {}

        stop_iteration operator()(mutation_fragment_v2 mf) {
            _reader->push_mutation_fragment(std::move(mf));
            return stop_iteration(_reader->is_buffer_full());
        }
    };

    future<> fill_buffer_from_delegate() {
        return _delegate->consume_pausable(consumer(this)).then([this] {
            if (_delegate->is_end_of_stream() && _delegate->is_buffer_empty()) {
                if (_delegate_range) {
                    // Underlying-source delegate covers the rest of the
                    // range; its end is our end.
                    _end_of_stream = true;
                } else {
                    // Per-partition delegate exhausted; close it so the next
                    // fill picks the next partition.
                    return close_delegate();
                }
            }
            return make_ready_future<>();
        });
    }

    future<> close_delegate() noexcept {
        return _delegate ? _delegate->close() : make_ready_future<>();
    };

public:
    // NOTE(review): lw_shared_ptr's template argument lost in extraction.
    scanning_reader(schema_ptr s,
                    lw_shared_ptr m,
                    reader_permit permit,
                    const dht::partition_range& range,
                    const query::partition_slice& slice,
                    mutation_reader::forwarding fwd_mr)
        : impl(s, std::move(permit))
        , iterator_reader(s, std::move(m), range)
        , _slice(slice)
        , _fwd_mr(fwd_mr)
    { }

    virtual future<> fill_buffer() override {
        return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {
            if (!_delegate) {
                _delegate_range = get_delegate_range();
                if (_delegate_range) {
                    _delegate = delegate_reader(_permit, *_delegate_range, _slice, streamed_mutation::forwarding::no, _fwd_mr);
                } else {
                    // NOTE(review): the lambda's return type lost its inner
                    // template arguments — presumably
                    // std::optional<std::pair<dht::decorated_key, partition_snapshot_ptr>>.
                    auto key_and_snp = read_section()(region(), [&] () -> std::optional> {
                        memtable_entry *e = fetch_entry();
                        if (!e) {
                            return { };
                        } else {
                            // FIXME: Introduce a memtable specific reader that will be returned from
                            // memtable_entry::read and will allow filling the buffer without the overhead of
                            // virtual calls, intermediate buffers and futures.
                            auto key = e->key();
                            auto snp = e->snapshot(*mtbl());
                            advance_iterator();
                            return std::pair(std::move(key), std::move(snp));
                        }
                    });
                    if (key_and_snp) {
                        update_last(key_and_snp->first);
                        auto cr = query::clustering_key_filter_ranges::get_ranges(*schema(), _slice, key_and_snp->first.key());
                        // NOTE(review): contains() lost its argument/template
                        // parameter (presumably the with_digest slice option).
                        bool digest_requested = _slice.options.contains();
                        bool is_reversed = _slice.is_reversed();
                        _delegate = make_partition_snapshot_flat_reader_from_snp_schema(is_reversed, _permit,
                                std::move(key_and_snp->first), std::move(cr), std::move(key_and_snp->second),
                                digest_requested, region(), read_section(), mtbl(),
                                streamed_mutation::forwarding::no, *mtbl());
                        _delegate->upgrade_schema(schema());
                    } else {
                        _end_of_stream = true;
                    }
                }
            }
            return is_end_of_stream() ? make_ready_future<>() : fill_buffer_from_delegate();
        });
    }

    virtual future<> next_partition() override {
        clear_buffer_to_next_partition();
        if (is_buffer_empty()) {
            if (!_delegate_range) {
                return close_delegate();
            } else {
                return _delegate->next_partition();
            }
        }
        return make_ready_future<>();
    }

    virtual future<> fast_forward_to(const dht::partition_range& pr) override {
        _end_of_stream = false;
        clear_buffer();
        if (_delegate_range) {
            return _delegate->fast_forward_to(pr);
        } else {
            return close_delegate().then([this, &pr] {
                return iterator_reader::fast_forward_to(pr);
            });
        }
    }

    virtual future<> fast_forward_to(position_range cr) override {
        throw std::runtime_error("This reader can't be fast forwarded to another partition.");
    };

    virtual future<> close() noexcept override {
        return close_delegate();
    };
};

// Flush-memory accounting: as data is read out for flushing, the bytes are
// reported to the dirty-memory manager as "potentially cleaned up" so the
// flush frees up dirty-memory quota progressively.
void memtable::add_flushed_memory(uint64_t delta) {
    _flushed_memory += delta;
    if (_flushed_memory > 0) {
        _dirty_mgr.account_potentially_cleaned_up_memory(this, std::min(delta, _flushed_memory));
    }
}

void memtable::remove_flushed_memory(uint64_t delta) {
    if (_flushed_memory > 0) {
        _dirty_mgr.revert_potentially_cleaned_up_memory(this, std::min(delta, _flushed_memory));
    }
    _flushed_memory -= delta;
}

// Called when this region leaves the dirty-memory region group, i.e. the
// memtable's content has been merged into the cache.
void memtable::on_detach_from_region_group() noexcept {
    _merged_into_cache = true;
    revert_flushed_memory();
}

// Undoes all "potentially cleaned up" accounting (e.g. when a flush is
// aborted and will be retried) and resets the flush low-watermark.
void memtable::revert_flushed_memory() noexcept {
    if (_flushed_memory > 0) {
        _dirty_mgr.revert_potentially_cleaned_up_memory(this, _flushed_memory);
    }
    _flushed_memory = 0;
    _total_memory_low_watermark_during_flush = _total_memory;
}

// RAII helper tracking how much memtable memory a flush has read so far.
class flush_memory_accounter {
    memtable& _mt;
public:
    void update_bytes_read(uint64_t delta) {
        _mt.add_flushed_memory(delta);
    }
    explicit flush_memory_accounter(memtable& mt)
        : _mt(mt)
	{}

    ~flush_memory_accounter() {
        // We cannot have flushed more memory than the memtable holds.
        // NOTE(review): static_cast's target type (presumably int64_t or
        // similar) lost in extraction.
        SCYLLA_ASSERT(_mt._flushed_memory <= static_cast(_mt.occupancy().total_space()));
    }
    // Fixed per-partition overhead: the entry itself (sans rows) plus the
    // snapshot's current version object.
    uint64_t compute_size(memtable_entry& e, partition_snapshot& snp) {
        return e.size_in_allocator_without_rows(_mt.allocator())
            + _mt.allocator().object_memory_size_in_allocator(&*snp.version());
    }
};

// Per-fragment accounter used during flush reads.
class partition_snapshot_flush_accounter {
    const schema& _schema;
    flush_memory_accounter& _accounter;
public:
    partition_snapshot_flush_accounter(const schema& s, flush_memory_accounter& acct)
        : _schema(s), _accounter(acct) {}

    // We will be passed mutation fragments here, and they are allocated using the standard
    // allocator. So we can't compute the size in memtable precisely. However, precise accounting is
    // hard anyway, since we may be holding multiple snapshots of the partitions, and the
    // partition_snapshot_reader may compose them. In doing so, we move memory to the standard
    // allocation. As long as our size read here is lesser or equal to the size in the memtables, we
    // are safe, and worst case we will allow a bit fewer requests in.
    void operator()(const range_tombstone_change& rtc) {
        _accounter.update_bytes_read(rtc.minimal_memory_usage(_schema));
    }

    void operator()(const static_row& sr) {
        _accounter.update_bytes_read(sr.external_memory_usage(_schema));
    }

    void operator()(const partition_start& ph) {}

    void operator()(const partition_end& eop) {}

    void operator()(const clustering_row& cr) {
        // Every clustering row is stored in a rows_entry object, and that has some significant
        // overhead - so add it here. We will be a bit short on our estimate because we can't know
        // what is the size in the allocator for this rows_entry object: we may have many snapshots,
        // and we don't know which one(s) contributed to the generation of this mutation fragment.
        //
        // We will add the size of the struct here, and that should be good enough.
        _accounter.update_bytes_read(sizeof(rows_entry) + cr.minimal_external_memory_usage(_schema));
    }
};

// Definition of the forward-declared helper: chooses between the snapshot's
// schema and its reversed variant, then builds the snapshot reader.
// NOTE(review): both make_partition_snapshot_flat_reader calls lost their
// template argument lists (presumably the reversed flag / accounter types).
static mutation_reader make_partition_snapshot_flat_reader_from_snp_schema(
        bool is_reversed,
        reader_permit permit,
        dht::decorated_key dk,
        query::clustering_key_filter_ranges crr,
        partition_snapshot_ptr snp,
        bool digest_requested,
        logalloc::region& region,
        logalloc::allocating_section& read_section,
        std::any pointer_to_container,
        streamed_mutation::forwarding fwd,
        memtable& memtable) {
    if (is_reversed) {
        schema_ptr rev_snp_schema = snp->schema()->make_reversed();
        return make_partition_snapshot_flat_reader(std::move(rev_snp_schema), std::move(permit), std::move(dk),
                std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
    } else {
        schema_ptr snp_schema = snp->schema();
        return make_partition_snapshot_flat_reader(std::move(snp_schema), std::move(permit), std::move(dk),
                std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
    }
}

// Reader used to flush the whole memtable to an sstable: full range, full
// slice, with flush-memory accounting per partition and per fragment.
class flush_reader final : public mutation_reader::impl, private iterator_reader {
    // FIXME: Similarly to scanning_reader we have an underlying
    // mutation_reader for each partition. This is suboptimal.
    // Partition snapshot reader should be devirtualised and called directly
    // without using any intermediate buffers.
    mutation_reader_opt _partition_reader;
    flush_memory_accounter _flushed_memory;
public:
    // NOTE(review): lw_shared_ptr's template argument lost in extraction.
    flush_reader(schema_ptr s, reader_permit permit, lw_shared_ptr m)
        : impl(s, std::move(permit))
        , iterator_reader(std::move(s), m, query::full_partition_range)
        , _flushed_memory(*m)
    {}
    flush_reader(const flush_reader&) = delete;
    flush_reader(flush_reader&&) = delete;
    flush_reader& operator=(flush_reader&&) = delete;
    flush_reader& operator=(const flush_reader&) = delete;
private:
    // Sets up _partition_reader for the next partition, accounting the
    // per-partition overhead; leaves it unset when exhausted.
    void get_next_partition() {
        uint64_t component_size = 0;
        // NOTE(review): the lambda's return type lost its inner template
        // arguments — presumably
        // std::optional<std::pair<dht::decorated_key, partition_snapshot_ptr>>.
        auto key_and_snp = read_section()(region(), [&] () -> std::optional> {
            memtable_entry* e = fetch_entry();
            if (e) {
                auto dk = e->key();
                auto snp = e->snapshot(*mtbl());
                component_size = _flushed_memory.compute_size(*e, *snp);
                advance_iterator();
                return std::pair(std::move(dk), std::move(snp));
            }
            return { };
        });
        if (key_and_snp) {
            _flushed_memory.update_bytes_read(component_size);
            update_last(key_and_snp->first);
            auto cr = query::clustering_key_filter_ranges::get_ranges(*schema(), schema()->full_slice(), key_and_snp->first.key());
            auto snp_schema = key_and_snp->second->schema();
            // NOTE(review): template argument list (presumably the
            // partition_snapshot_flush_accounter accounter type) lost in
            // extraction.
            _partition_reader = make_partition_snapshot_flat_reader(snp_schema, _permit,
                    std::move(key_and_snp->first), std::move(cr), std::move(key_and_snp->second),
                    false, region(), read_section(), mtbl(),
                    streamed_mutation::forwarding::no, *snp_schema, _flushed_memory);
            _partition_reader->upgrade_schema(schema());
        }
    }

    future<> close_partition_reader() noexcept {
        return _partition_reader ? _partition_reader->close() : make_ready_future<>();
    }

public:
    virtual future<> fill_buffer() override {
        return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {
            if (!_partition_reader) {
                get_next_partition();
                if (!_partition_reader) {
                    _end_of_stream = true;
                    return make_ready_future<>();
                }
            }
            return _partition_reader->consume_pausable([this] (mutation_fragment_v2 mf) {
                push_mutation_fragment(std::move(mf));
                return stop_iteration(is_buffer_full());
            }).then([this] {
                if (_partition_reader->is_end_of_stream() && _partition_reader->is_buffer_empty()) {
                    return _partition_reader->close();
                }
                return make_ready_future<>();
            });
        });
    }

    virtual future<> next_partition() override {
        clear_buffer_to_next_partition();
        if (is_buffer_empty()) {
            return close_partition_reader();
        }
        return make_ready_future<>();
    }

    // Flush readers are never fast-forwarded.
    // NOTE(review): make_backtraced_exception_ptr lost its template argument
    // (the exception type) in extraction.
    virtual future<> fast_forward_to(const dht::partition_range&) override {
        return make_exception_future<>(make_backtraced_exception_ptr());
    }
    virtual future<> fast_forward_to(position_range) override {
        return make_exception_future<>(make_backtraced_exception_ptr());
    }
    virtual future<> close() noexcept override {
        return close_partition_reader();
    }
};

// Takes a read snapshot of this entry's partition, registered with the
// memtable's region/cleaner (no cache tracker for memtables).
partition_snapshot_ptr memtable_entry::snapshot(memtable& mtbl) {
    return _pe.read(mtbl.region(), mtbl.cleaner(), no_cache_tracker);
}

// Builds a reader over this memtable. Fast path: single-partition,
// non-forwarding-across-partitions queries read one snapshot directly and
// may return an empty optional when the partition is absent. Otherwise a
// scanning_reader is built (wrapped for forwarding if requested).
mutation_reader_opt memtable::make_mutation_reader_opt(schema_ptr query_schema,
                                      reader_permit permit,
                                      const dht::partition_range& range,
                                      const query::partition_slice& slice,
                                      tracing::trace_state_ptr trace_state_ptr,
                                      streamed_mutation::forwarding fwd,
                                      mutation_reader::forwarding fwd_mr) {
    bool is_reversed = slice.is_reversed();
    if (query::is_single_partition(range) && !fwd_mr) {
        const query::ring_position& pos = range.start()->value();
        auto snp = _table_shared_data.read_section(*this, [&] () -> partition_snapshot_ptr {
            auto i = partitions.find(pos, dht::ring_position_comparator(*_schema));
            if (i != partitions.end()) {
                upgrade_entry(*i);
                return i->snapshot(*this);
            } else {
                return { };
            }
        });
        if (!snp) {
            return {};
        }
        auto dk = pos.as_decorated_key();
        auto cr = query::clustering_key_filter_ranges::get_ranges(*query_schema, slice, dk.key());
        // NOTE(review): contains() lost its argument/template parameter
        // (presumably the with_digest slice option) in extraction.
        bool digest_requested = slice.options.contains();
        auto rd = make_partition_snapshot_flat_reader_from_snp_schema(is_reversed, std::move(permit), std::move(dk),
                std::move(cr), std::move(snp), digest_requested, *this, _table_shared_data.read_section,
                shared_from_this(), fwd, *this);
        rd.upgrade_schema(query_schema);
        return rd;
    } else {
        // NOTE(review): make_mutation_reader lost its template argument
        // (presumably scanning_reader) in extraction.
        auto res = make_mutation_reader(std::move(query_schema), shared_from_this(), std::move(permit), range, slice, fwd_mr);
        if (fwd == streamed_mutation::forwarding::yes) {
            return make_forwardable(std::move(res));
        } else {
            return res;
        }
    }
}

// Reader used by the flush path. If the memtable was already merged into the
// cache, fall back to a scanning reader (which can delegate to the
// underlying source); otherwise use the accounting flush_reader.
// NOTE(review): both make_mutation_reader calls lost their template
// arguments (presumably flush_reader / scanning_reader) in extraction.
mutation_reader memtable::make_flush_reader(schema_ptr s, reader_permit permit) {
    if (!_merged_into_cache) {
        revert_flushed_memory();
        return make_mutation_reader(std::move(s), std::move(permit), shared_from_this());
    } else {
        auto& full_slice = s->full_slice();
        return make_mutation_reader(std::move(s), shared_from_this(), std::move(permit),
            query::full_partition_range, full_slice, mutation_reader::forwarding::no);
    }
}

// Records a commitlog replay position; keeps the maximum seen and retains
// the handle so the commitlog segment is not recycled prematurely.
void memtable::update(db::rp_handle&& h) {
    db::replay_position rp = h;
    if (_replay_position < rp) {
        _replay_position = rp;
    }
    _rp_set.put(std::move(h));
}

// Merges another memtable into this one by reading it partition-by-partition
// and applying each mutation.
future<> memtable::apply(memtable& mt, reader_permit permit) {
    if (auto reader_opt = mt.make_mutation_reader_opt(_schema, std::move(permit), query::full_partition_range, _schema->full_slice())) {
        return with_closeable(std::move(*reader_opt), [this] (auto&& rd) mutable {
            return consume_partitions(rd, [self = this->shared_from_this()] (mutation&& m) {
                self->apply(m);
                return stop_iteration::no;
            });
        });
    }
    [[unlikely]] return make_ready_future<>();
}

// Applies an unfrozen mutation under this region's allocator, updating the
// encoding-stats collector, then records the replay position.
void memtable::apply(const mutation& m, db::rp_handle&& h) {
    with_allocator(allocator(), [this, &m] {
        _table_shared_data.allocating_section(*this, [&, this] {
            auto& p = find_or_create_partition(m.decorated_key());
            _stats_collector.update(*m.schema(), m.partition());
            p.apply(region(), cleaner(), *_schema, m.partition(), *m.schema(), _table_stats.memtable_app_stats);
        });
    });
    update(std::move(h));
}

// Applies a frozen mutation: rebuilds a mutation_partition via
// partition_builder (under the mutation's own schema), then applies it.
void memtable::apply(const frozen_mutation& m, const schema_ptr& m_schema, db::rp_handle&& h) {
    with_allocator(allocator(), [this, &m, &m_schema] {
        _table_shared_data.allocating_section(*this, [&, this] {
            auto& p = find_or_create_partition_slow(m.key());
            mutation_partition mp(*m_schema);
            partition_builder pb(*m_schema, mp);
            m.partition().accept(*m_schema, pb);
            _stats_collector.update(*m_schema, mp);
            p.apply(region(), cleaner(), *_schema, std::move(mp), *m_schema, _table_stats.memtable_app_stats);
        });
    });
    update(std::move(h));
}

logalloc::occupancy_stats memtable::occupancy() const noexcept {
    return logalloc::region::occupancy();
}

// Exposes this memtable as a generic mutation_source; the captured
// shared_ptr keeps the memtable alive for as long as the source is.
mutation_source memtable::as_data_source() {
    return mutation_source([mt = shared_from_this()] (schema_ptr s,
            reader_permit permit,
            const dht::partition_range& range,
            const query::partition_slice& slice,
            tracing::trace_state_ptr trace_state,
            streamed_mutation::forwarding fwd,
            mutation_reader::forwarding fwd_mr) {
        return mt->make_mutation_reader(std::move(s), std::move(permit), range, slice, std::move(trace_state), fwd, fwd_mr);
    });
}

memtable_entry::memtable_entry(memtable_entry&& o) noexcept
    : _key(std::move(o._key))
    , _pe(std::move(o._pe))
    , _flags(o._flags)
{ }

stop_iteration memtable_entry::clear_gently() noexcept {
    return _pe.clear_gently(no_cache_tracker);
}

// Marks this memtable as flushed by recording the mutation source that now
// holds its data (readers can then delegate to it; see is_flushed()).
void memtable::mark_flushed(mutation_source underlying) noexcept {
    _underlying = std::move(underlying);
}

bool memtable::is_merging_to_cache() const noexcept {
    return _merging_into_cache;
}

bool memtable::is_flushed() const noexcept {
    return bool(_underlying);
}

void memtable_entry::upgrade_schema(logalloc::region& r, const schema_ptr& s, mutation_cleaner& cleaner) {
    if (schema() != s) {
        partition().upgrade(r, s, cleaner, no_cache_tracker);
    }
}

// Upgrades an entry to the memtable's current schema in place; requires
// reclaim to be disabled since it mutates LSA-managed structures.
void memtable::upgrade_entry(memtable_entry& e) {
    if (e.schema() != _schema) {
        SCYLLA_ASSERT(!reclaiming_enabled());
        e.upgrade_schema(region(), _schema, cleaner());
    }
}

void memtable::set_schema(schema_ptr new_schema) noexcept {
    _schema = std::move(new_schema);
}

size_t memtable_entry::object_memory_size(allocation_strategy& allocator) {
    return memtable::partitions_type::estimated_object_memory_size_in_allocator(allocator, this);
}

}

// NOTE(review): both fmt::formatter specializations below lost their
// template argument lists (presumably <replica::memtable_entry> and
// <replica::memtable>) in extraction.
auto fmt::formatter::format(const replica::memtable_entry& mt,
                            fmt::format_context& ctx) const -> decltype(ctx.out()) {
    return fmt::format_to(ctx.out(), "{{{}: {}}}", mt.key(), partition_entry::printer(mt.partition()));
}

auto fmt::formatter::format(replica::memtable& mt,
                            fmt::format_context& ctx) const -> decltype(ctx.out()) {
    // Hold off LSA reclaim while we walk and print the partitions.
    logalloc::reclaim_lock rl(mt);
    return fmt::format_to(ctx.out(), "{{memtable: [{}]}}", fmt::join(mt.partitions, ",\n"));
}

// logalloc::region listener hooks: forward region-group bookkeeping to the
// dirty-memory manager and maintain _total_memory plus the flush
// low-watermark (which drives remove_flushed_memory()).
void replica::memtable::increase_usage(logalloc::region* r, ssize_t delta) {
    SCYLLA_ASSERT(delta >= 0);
    _dirty_mgr.region_group().increase_usage(r);
    _dirty_mgr.region_group().update_unspooled(delta);
    _total_memory += delta;
}

void replica::memtable::decrease_evictable_usage(logalloc::region* r) {
    _dirty_mgr.region_group().decrease_usage(r);
}

void replica::memtable::decrease_usage(logalloc::region* r, ssize_t delta) {
    SCYLLA_ASSERT(delta <= 0);
    _dirty_mgr.region_group().decrease_usage(r);
    _dirty_mgr.region_group().update_unspooled(delta);
    _total_memory += delta;
    if (_total_memory < _total_memory_low_watermark_during_flush) {
        remove_flushed_memory(_total_memory_low_watermark_during_flush - _total_memory);
        _total_memory_low_watermark_during_flush = _total_memory;
    }
}

void replica::memtable::add(logalloc::region* r) {
    _dirty_mgr.region_group().add(r);
}

void replica::memtable::del(logalloc::region* r) {
    _dirty_mgr.region_group().del(r);
}

void replica::memtable::moved(logalloc::region* old_address, logalloc::region* new_address) {
    _dirty_mgr.region_group().moved(old_address, new_address);
}