Files
scylladb/replica/memtable.cc
Botond Dénes 756837c5b4 partition_snapshot_reader: remove "flat" from name
The "flat" migration is long done, this distinction is no longer
meaningful.
2026-01-26 16:52:46 +02:00

917 lines
35 KiB
C++

/*
* Copyright (C) 2014-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "utils/assert.hh"
#include "memtable.hh"
#include "replica/database.hh"
#include "mutation/frozen_mutation.hh"
#include "replica/partition_snapshot_reader.hh"
#include "partition_builder.hh"
#include "mutation/mutation_partition_view.hh"
#include "readers/empty.hh"
#include "readers/forwardable.hh"
#include "sstables/types.hh"
namespace replica {
// Forward declaration. The definition appears later in this file and chooses
// between a forward and a reversed partition snapshot reader based on
// `is_reversed`. It is declared up front because scanning_reader, which is
// defined earlier than the definition, calls it.
static mutation_reader make_partition_snapshot_reader_from_snp_schema(
bool is_reversed,
reader_permit permit,
dht::decorated_key dk,
query::clustering_key_filter_ranges crr,
partition_snapshot_ptr snp,
bool digest_requested,
logalloc::region& region,
logalloc::allocating_section& read_section,
std::any pointer_to_container,
streamed_mutation::forwarding fwd, memtable& memtable);
// The timestamp range starts at (0, 0). Both "min live" trackers start at
// api::max_timestamp, presumably so the first live value observed replaces them.
memtable::memtable_encoding_stats_collector::memtable_encoding_stats_collector() noexcept
    : min_max_timestamp(0, 0), min_live_timestamp(api::max_timestamp), min_live_row_marker_timestamp(api::max_timestamp) {
}
// Folds one atomic cell into the statistics.
// - The timestamp is always recorded, tagged with the cell's liveness.
// - A live cell with a TTL also contributes its TTL and its expiry time.
// - A dead cell contributes its deletion time.
void memtable::memtable_encoding_stats_collector::update(atomic_cell_view cell) noexcept {
    const is_live liveness = ::is_live(cell.is_live());
    update_timestamp(cell.timestamp(), liveness);
    if (cell.is_live_and_has_ttl()) {
        update_ttl(cell.ttl());
        update_local_deletion_time(cell.expiry());
        return;
    }
    if (!liveness) {
        update_local_deletion_time(cell.deletion_time());
    }
}
// A tombstone that tests true contributes its timestamp (as non-live) and its
// deletion time. A tombstone that tests false contributes nothing.
void memtable::memtable_encoding_stats_collector::update(tombstone tomb) noexcept {
    if (!tomb) {
        return;
    }
    update_timestamp(tomb.timestamp, is_live::no);
    update_local_deletion_time(tomb.deletion_time);
}
// Feeds every cell of `r` into the collector. `kind` selects which column set
// of `s` the row's column ids are resolved against.
// - Atomic columns are handled directly.
// - Collection columns are deserialized; their collection tombstone and each
//   element cell are then recorded.
void memtable::memtable_encoding_stats_collector::update(const ::schema& s, const row& r, column_kind kind) {
    r.for_each_cell([this, &s, kind] (column_id cid, const atomic_cell_or_collection& value) {
        const auto& def = s.column_at(kind, cid);
        if (def.is_atomic()) {
            update(value.as_atomic_cell(def));
            return;
        }
        value.as_collection_mutation().with_deserialized(*def.type, [&] (collection_mutation_view_description desc) {
            // Note: when some of the collection cells are dead and some are live
            // we need to encode a "live" deletion_time for the living ones.
            // It is not strictly required to update encoding_stats for the latter case
            // since { <int64_t>.min(), <int32_t>.max() } will not affect the encoding_stats
            // minimum values. (See #4035)
            update(desc.tomb);
            for (auto& element : desc.cells) {
                update(element.second);
            }
        });
    });
}
void memtable::memtable_encoding_stats_collector::update(const range_tombstone& rt) noexcept {
update(rt.tomb);
}
void memtable::memtable_encoding_stats_collector::update(const row_marker& marker) noexcept {
if (marker.is_missing()) {
return;
}
auto timestamp = marker.timestamp();
if (!marker.is_live()) {
update_timestamp(timestamp, is_live::no);
update_ttl(gc_clock::duration(sstables::expired_liveness_ttl));
update_local_deletion_time(marker.deletion_time());
} else {
update_timestamp(timestamp, is_live::yes);
update_live_row_marker_timestamp(timestamp);
if (marker.is_expiring()) {
update_ttl(marker.ttl());
update_local_deletion_time(marker.expiry());
}
}
}
void memtable::memtable_encoding_stats_collector::update(const ::schema& s, const deletable_row& dr) {
update(dr.marker());
row_tombstone row_tomb = dr.deleted_at();
update(row_tomb.regular());
update(row_tomb.tomb());
update(s, dr.cells(), column_kind::regular_column);
}
// Collects statistics for a whole partition:
// - the partition tombstone;
// - the static row;
// - every clustering row;
// - every range tombstone.
void memtable::memtable_encoding_stats_collector::update(const ::schema& s, const mutation_partition& mp) {
    update(mp.partition_tombstone());
    update(s, mp.static_row().get(), column_kind::static_column);
    for (const auto& entry : mp.clustered_rows()) {
        update(s, entry.row());
    }
    for (const auto& rt_entry : mp.row_tombstones()) {
        update(rt_entry.tombstone());
    }
}
// Builds an empty memtable.
// - Takes a tombstone-GC snapshot from `shared_gc_state` only when one is
//   supplied (non-null).
// - Registers itself via logalloc::region::listen(this), so the region's usage
//   events reach the memtable's increase_usage/decrease_usage/add/del/moved
//   hooks defined in this file.
// `memtable_list` may be null. Defaults for the trailing parameters, if any,
// live in the header (not visible here).
memtable::memtable(schema_ptr schema, dirty_memory_manager& dmm,
memtable_table_shared_data& table_shared_data,
replica::table_stats& table_stats,
memtable_list* memtable_list,
seastar::scheduling_group compaction_scheduling_group,
shared_tombstone_gc_state* shared_gc_state)
: dirty_memory_manager_logalloc::size_tracked_region()
, _dirty_mgr(dmm)
// No cache tracker: memtable versions are cleaned with no_cache_tracker.
, _cleaner(*this, no_cache_tracker, table_stats.memtable_app_stats, compaction_scheduling_group)
, _memtable_list(memtable_list)
, _schema(std::move(schema))
, _table_shared_data(table_shared_data)
, partitions(dht::raw_token_less_comparator{})
, _table_stats(table_stats) {
if (shared_gc_state) {
_tombstone_gc_snapshot.emplace(shared_gc_state->snapshot());
}
logalloc::region::listen(this);
}
// Thread-local stand-ins for a dirty memory manager, table stats and shared
// memtable data. Judging by their names they are intended for tests. They are
// used by the schema-only constructor below.
static thread_local dirty_memory_manager mgr_for_tests;
static thread_local replica::table_stats stats_for_tests;
static thread_local memtable_table_shared_data memtable_shared_data_for_tests;
// Convenience constructor that delegates to the full constructor with the
// stand-ins above. The remaining parameters presumably take defaults declared
// in memtable.hh (not visible here).
memtable::memtable(schema_ptr schema)
: memtable(std::move(schema), mgr_for_tests,
memtable_shared_data_for_tests, stats_for_tests)
{ }
// Teardown steps, in order:
// 1. Drop any flushed-memory accounting reported to the dirty memory manager.
// 2. Evict all partitions.
// 3. Stop listening to the region's events.
memtable::~memtable() {
revert_flushed_memory();
clear();
logalloc::region::unlisten();
}
// Evicts one entry's partition data through `cleaner` and removes it from the
// partition count. The entry itself is not unlinked here.
void memtable::evict_entry(memtable_entry& e, mutation_cleaner& cleaner) noexcept {
    partition_entry& pe = e.partition();
    pe.evict(cleaner);
    --nr_partitions;
}
// Synchronously removes every partition. Each entry is disposed through
// evict_entry() while the memtable's own allocator is active.
void memtable::clear() noexcept {
    auto dispose = [this] (memtable_entry* entry) noexcept {
        evict_entry(*entry, _cleaner);
    };
    with_allocator(allocator(), [this, &dispose] {
        partitions.clear_and_dispose(dispose);
    });
}
// Preemptible variant of clear().
// - The partitions are moved into a local container and nr_partitions is reset.
// - Inside a seastar::thread, entries are cleared and erased one at a time with
//   the memtable allocator active, yielding whenever an entry is not fully
//   cleared yet or preemption is requested.
// - If anything fails, falls back to the synchronous clear().
future<> memtable::clear_gently() noexcept {
return futurize_invoke([this] {
auto t = std::make_unique<seastar::thread>([this] {
auto& alloc = allocator();
auto p = std::move(partitions);
nr_partitions = 0;
while (!p.empty()) {
with_allocator(alloc, [&] () noexcept {
while (!p.empty()) {
// The entry has more to clear: yield before continuing with it.
if (p.begin()->clear_gently() == stop_iteration::no) {
break;
}
p.begin().erase(dht::raw_token_less_comparator{});
if (need_preempt()) {
break;
}
}
});
seastar::thread::yield();
}
/*
* The collection is not guaranteed to free everything
* with the last erase. If anything gets freed in destructor,
* it will be unaccounted from wrong allocator, so handle it
*/
with_allocator(alloc, [&p] { p.clear(); });
});
auto f = t->join();
// Keep the thread object alive until it has finished.
return f.then([t = std::move(t)] {});
}).handle_exception([this] (auto e) {
this->clear();
});
}
// Lookup/insert by an undecorated partition key.
// - The key is decorated under the standard allocator.
// - The actual lookup/insertion is done by find_or_create_partition() under
//   the allocator that was current on entry.
// Requires reclaiming to be disabled.
partition_entry&
memtable::find_or_create_partition_slow(partition_key_view key) {
SCYLLA_ASSERT(!reclaiming_enabled());
// FIXME: Perform lookup using std::pair<token, partition_key_view>
// to avoid unconditional copy of the partition key.
// We can't do it right now because std::map<> which holds
// partitions doesn't support heterogeneous lookup.
// We could switch to boost::intrusive_map<> similar to what we have for row keys.
auto& outer = current_allocator();
return with_allocator(standard_allocator(), [&, this] () -> partition_entry& {
auto dk = dht::decorate_key(*_schema, key);
return with_allocator(outer, [&dk, this] () -> partition_entry& {
return find_or_create_partition(dk);
});
});
}
// Returns the partition entry for `key`, creating an empty one if absent.
// Requires reclaiming to be disabled.
// - Hits upgrade the existing entry to the current schema.
// - Misses insert a new entry, bump the partition and insertion counters, and
//   invalidate allocator references if the insertion may have moved iterators.
partition_entry&
memtable::find_or_create_partition(const dht::decorated_key& key) {
    SCYLLA_ASSERT(!reclaiming_enabled());
    // lower_bound also fills in a hint we can reuse for the insertion.
    partitions_type::bound_hint hint;
    auto pos = partitions.lower_bound(key, dht::ring_position_comparator(*_schema), hint);
    const bool found = pos != partitions.end() && hint.match;
    if (found) {
        ++_table_stats.memtable_partition_hits;
        upgrade_entry(*pos);
        return pos->partition();
    }
    partitions_type::iterator inserted = partitions.emplace_before(pos,
            key.token().raw(), hint,
            _schema, dht::decorated_key(key), mutation_partition(*_schema));
    ++nr_partitions;
    ++_table_stats.memtable_partition_insertions;
    if (!hint.emplace_keeps_iterators()) {
        current_allocator().invalidate_references();
    }
    return inserted->partition();
}
// True when a partition with exactly this decorated key is stored.
bool
memtable::contains_partition(const dht::decorated_key& key) const {
    auto it = partitions.find(key, dht::ring_position_comparator(*_schema));
    return it != partitions.end();
}
// Returns the [first, last) run of partitions covered by `range`.
// - A single-partition range uses one exact find().
// - Otherwise each bound is resolved with lower_bound/upper_bound according to
//   its inclusiveness, and a missing bound maps to the container's begin/end.
std::ranges::subrange<memtable::partitions_type::const_iterator>
memtable::slice(const dht::partition_range& range) const {
    if (query::is_single_partition(range)) {
        const query::ring_position& pos = range.start()->value();
        auto hit = partitions.find(pos, dht::ring_position_comparator(*_schema));
        if (hit == partitions.end()) {
            return {hit, hit};
        }
        return {hit, std::next(hit)};
    }
    auto cmp = dht::ring_position_comparator(*_schema);
    partitions_type::const_iterator first = partitions.cbegin();
    if (range.start()) {
        const auto& lo = *range.start();
        first = lo.is_inclusive()
                ? partitions.lower_bound(lo.value(), cmp)
                : partitions.upper_bound(lo.value(), cmp);
    }
    partitions_type::const_iterator last = partitions.cend();
    if (range.end()) {
        const auto& hi = *range.end();
        last = hi.is_inclusive()
                ? partitions.upper_bound(hi.value(), cmp)
                : partitions.lower_bound(hi.value(), cmp);
    }
    return {first, last};
}
// Shared machinery for readers that walk the memtable's partitions in ring
// order over a partition range.
//
// Iterators into `partitions` are cached between calls. They are looked up
// again (continuing after the last key handed out) whenever the region's
// reclaim counter or the memtable's partition count changed since the
// previous lookup, since either may have invalidated them.
class iterator_reader {
    // DO NOT RELEASE the memtable! Keep a reference to it, so it stays in
    // memtable_list::_flushed_memtables_with_active_reads and so that it keeps
    // blocking tombstone GC of tombstone in the cache, which cover data that
    // used to be in this memtable, and which will possibly be produced by this
    // reader later on.
    lw_shared_ptr<memtable> _memtable;
    schema_ptr _schema;
    // Not owned: points at the range passed to the constructor or to the most
    // recent fast_forward_to(), which must stay valid while it is in use.
    const dht::partition_range* _range;
    // Key of the last partition handed out; disengaged before the first one
    // and after (re)positioning.
    std::optional<dht::decorated_key> _last;
    memtable::partitions_type::iterator _i;
    memtable::partitions_type::iterator _end;
    // Default-initialized so the member never holds an indeterminate value.
    // It is only compared while _last is engaged, i.e. after
    // update_iterators() stored a real value.
    uint64_t _last_reclaim_counter = 0;
    size_t _last_partition_count = 0;

    // End iterator for the current range: upper/lower bound of its end bound
    // depending on inclusiveness, or the container's end when unbounded.
    memtable::partitions_type::iterator lookup_end() {
        auto cmp = dht::ring_position_comparator(*_memtable->_schema);
        return _range->end()
            ? (_range->end()->is_inclusive()
                ? _memtable->partitions.upper_bound(_range->end()->value(), cmp)
                : _memtable->partitions.lower_bound(_range->end()->value(), cmp))
            : _memtable->partitions.end();
    }

    void update_iterators() {
        // We must be prepared that iterators may get invalidated during compaction.
        auto current_reclaim_counter = _memtable->reclaim_counter();
        auto cmp = dht::ring_position_comparator(*_memtable->_schema);
        if (_last) {
            if (current_reclaim_counter != _last_reclaim_counter ||
                    _last_partition_count != _memtable->partition_count()) {
                _i = _memtable->partitions.upper_bound(*_last, cmp);
                _end = lookup_end();
                _last_partition_count = _memtable->partition_count();
            }
        } else {
            // Initial lookup
            _i = _range->start()
                ? (_range->start()->is_inclusive()
                    ? _memtable->partitions.lower_bound(_range->start()->value(), cmp)
                    : _memtable->partitions.upper_bound(_range->start()->value(), cmp))
                : _memtable->partitions.begin();
            _end = lookup_end();
            _last_partition_count = _memtable->partition_count();
        }
        _last_reclaim_counter = current_reclaim_counter;
    }
protected:
    iterator_reader(schema_ptr s,
                    lw_shared_ptr<memtable> m,
                    const dht::partition_range& range)
        : _memtable(std::move(m))
        , _schema(std::move(s))
        , _range(&range)
    { }

    // Returns the entry at the current position (upgraded to the memtable's
    // schema), or nullptr when the range is exhausted. Does not advance.
    memtable_entry* fetch_entry() {
        update_iterators();
        if (_i == _end) {
            return nullptr;
        } else {
            memtable_entry& e = *_i;
            _memtable->upgrade_entry(e);
            return &e;
        }
    }

    void advance_iterator() {
        ++_i;
    }

    // Remembers the last key handed out, so lookups after invalidation resume
    // strictly after it.
    void update_last(dht::decorated_key last) {
        _last = std::move(last);
    }

    logalloc::allocating_section& read_section() {
        return _memtable->_table_shared_data.read_section;
    }

    lw_shared_ptr<memtable> mtbl() {
        return _memtable;
    }

    schema_ptr schema() {
        return _schema;
    }

    logalloc::region& region() {
        return *_memtable;
    }

    // Once the memtable is flushed, returns the part of the range not read yet
    // (all of it if nothing was read). Returns nullopt while not flushed.
    std::optional<dht::partition_range> get_delegate_range() {
        // We cannot run concurrently with row_cache::update().
        if (_memtable->is_flushed()) {
            return _last ? _range->split_after(*_last, dht::ring_position_comparator(*_memtable->_schema)) : *_range;
        }
        return {};
    }

    // Opens a reader over `delegate` from the mutation source the memtable was
    // flushed to, and forgets the last key.
    mutation_reader delegate_reader(reader_permit permit,
                                    const dht::partition_range& delegate,
                                    const query::partition_slice& slice,
                                    streamed_mutation::forwarding fwd,
                                    mutation_reader::forwarding fwd_mr) {
        auto ret = _memtable->_underlying->make_mutation_reader(_schema, std::move(permit), delegate, slice, nullptr, fwd, fwd_mr);
        _last = {};
        return ret;
    }

    // Switches to a new range; the next fetch_entry() starts from its beginning.
    future<> fast_forward_to(const dht::partition_range& pr) {
        _range = &pr;
        _last = { };
        return make_ready_future<>();
    }
};
// Read-path accounter for partition snapshot readers. It bumps the memtable's
// table_stats counters for:
// - clustering rows that carry a row tombstone;
// - every range tombstone change read.
// All other fragment kinds are ignored.
class partition_snapshot_read_accounter {
    memtable& _mt;
public:
    explicit partition_snapshot_read_accounter(memtable& mt) : _mt(mt) {}

    void operator()(const partition_start&) {}
    void operator()(const static_row&) {}
    void operator()(const partition_end&) {}

    void operator()(const clustering_row& cr) {
        if (!cr.tomb()) {
            return;
        }
        ++_mt._table_stats.memtable_row_tombstone_reads;
    }

    void operator()(const range_tombstone_change&) {
        ++_mt._table_stats.memtable_range_tombstone_reads;
    }
};
// Reader over a partition range of a memtable. It is used by
// make_mutation_reader_opt() for multi-partition/forwarding reads, and by
// make_flush_reader() once the memtable is merged into cache.
//
// Until the memtable reports is_flushed(), each partition is read through a
// partition snapshot reader over its memtable entry. After that, the unread
// remainder of the range is read from the mutation source the memtable was
// flushed to (see iterator_reader::delegate_reader()).
class scanning_reader final : public mutation_reader::impl, private iterator_reader {
    // Engaged once reading switched to the flushed-to mutation source; holds
    // the remainder range that reader was opened with.
    std::optional<dht::partition_range> _delegate_range;
    // Current source of fragments: a per-partition snapshot reader, or the
    // reader over the flushed-to source when _delegate_range is engaged.
    mutation_reader_opt _delegate;
    const query::partition_slice& _slice;
    mutation_reader::forwarding _fwd_mr;

    // Moves fragments from the delegate into our buffer, pausing once it is full.
    struct consumer {
        scanning_reader* _reader;
        explicit consumer(scanning_reader* r) : _reader(r) {}
        stop_iteration operator()(mutation_fragment_v2 mf) {
            _reader->push_mutation_fragment(std::move(mf));
            return stop_iteration(_reader->is_buffer_full());
        }
    };

    // Drains the delegate into the buffer. Once the delegate is exhausted:
    // - a flushed-source delegate ends our stream;
    // - a per-partition delegate is closed, so fill_buffer() continues with the
    //   next partition.
    future<> fill_buffer_from_delegate() {
        return _delegate->consume_pausable(consumer(this)).then([this] {
            if (_delegate->is_end_of_stream() && _delegate->is_buffer_empty()) {
                if (_delegate_range) {
                    _end_of_stream = true;
                } else {
                    return close_delegate();
                }
            }
            return make_ready_future<>();
        });
    }

    // NOTE(review): the optional is not reset here, yet fill_buffer() tests
    // `!_delegate` afterwards. This presumably relies on a closed
    // mutation_reader testing as empty inside mutation_reader_opt — confirm.
    future<> close_delegate() noexcept {
        return _delegate ? _delegate->close() : make_ready_future<>();
    }
public:
    scanning_reader(schema_ptr s,
                    lw_shared_ptr<memtable> m,
                    reader_permit permit,
                    const dht::partition_range& range,
                    const query::partition_slice& slice,
                    mutation_reader::forwarding fwd_mr)
        : impl(s, std::move(permit))
        , iterator_reader(s, std::move(m), range)
        , _slice(slice)
        , _fwd_mr(fwd_mr)
    { }

    // Fills the buffer until it is full or the stream ends. When no delegate is
    // open, the next one is chosen as follows:
    // - if the memtable was flushed, a reader over the unread remainder of the
    //   range from the flushed-to source;
    // - otherwise, a partition snapshot reader over the next memtable partition,
    //   upgraded to this reader's schema;
    // - if neither applies, the stream ends.
    virtual future<> fill_buffer() override {
        return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {
            if (!_delegate) {
                _delegate_range = get_delegate_range();
                if (_delegate_range) {
                    _delegate = delegate_reader(_permit, *_delegate_range, _slice, streamed_mutation::forwarding::no, _fwd_mr);
                } else {
                    auto key_and_snp = read_section()(region(), [&] () -> std::optional<std::pair<dht::decorated_key, partition_snapshot_ptr>> {
                        memtable_entry *e = fetch_entry();
                        if (!e) {
                            return { };
                        } else {
                            // FIXME: Introduce a memtable specific reader that will be returned from
                            // memtable_entry::read and will allow filling the buffer without the overhead of
                            // virtual calls, intermediate buffers and futures.
                            auto key = e->key();
                            auto snp = e->snapshot(*mtbl());
                            advance_iterator();
                            return std::pair(std::move(key), std::move(snp));
                        }
                    });
                    if (key_and_snp) {
                        update_last(key_and_snp->first);
                        auto cr = query::clustering_key_filter_ranges::get_ranges(*schema(), _slice, key_and_snp->first.key());
                        bool digest_requested = _slice.options.contains<query::partition_slice::option::with_digest>();
                        bool is_reversed = _slice.is_reversed();
                        _delegate = make_partition_snapshot_reader_from_snp_schema(is_reversed, _permit, std::move(key_and_snp->first), std::move(cr), std::move(key_and_snp->second), digest_requested, region(), read_section(), mtbl(), streamed_mutation::forwarding::no, *mtbl());
                        _delegate->upgrade_schema(schema());
                    } else {
                        _end_of_stream = true;
                    }
                }
            }
            return is_end_of_stream() ? make_ready_future<>() : fill_buffer_from_delegate();
        });
    }

    virtual future<> next_partition() override {
        clear_buffer_to_next_partition();
        if (is_buffer_empty()) {
            if (!_delegate_range) {
                return close_delegate();
            } else {
                return _delegate->next_partition();
            }
        }
        return make_ready_future<>();
    }

    virtual future<> fast_forward_to(const dht::partition_range& pr) override {
        _end_of_stream = false;
        clear_buffer();
        if (_delegate_range) {
            return _delegate->fast_forward_to(pr);
        } else {
            return close_delegate().then([this, &pr] {
                return iterator_reader::fast_forward_to(pr);
            });
        }
    }

    // Fast-forwarding within a partition is not supported. The error is
    // reported through the returned future, as flush_reader does, instead of
    // being thrown synchronously from a future-returning function.
    virtual future<> fast_forward_to(position_range cr) override {
        return make_exception_future<>(std::runtime_error("This reader can't be fast forwarded to another partition."));
    }

    virtual future<> close() noexcept override {
        return close_delegate();
    }
};
// Adds `delta` bytes to the running flushed-memory total. The dirty memory
// manager is informed only while that total is positive, and never of more
// than the total itself.
void memtable::add_flushed_memory(uint64_t delta) {
    _flushed_memory += delta;
    if (_flushed_memory <= 0) {
        return;
    }
    _dirty_mgr.account_potentially_cleaned_up_memory(this, std::min<int64_t>(delta, _flushed_memory));
}
// Inverse of add_flushed_memory(). First reverts the (capped) accounting with
// the dirty memory manager while the total is positive, then subtracts `delta`.
void memtable::remove_flushed_memory(uint64_t delta) {
    const bool has_flushed = _flushed_memory > 0;
    if (has_flushed) {
        _dirty_mgr.revert_potentially_cleaned_up_memory(this, std::min<int64_t>(delta, _flushed_memory));
    }
    _flushed_memory -= delta;
}
// Called on detach from the region group. It marks the memtable as merged into
// cache (make_flush_reader() checks this flag) and drops flushed-memory
// accounting.
void memtable::on_detach_from_region_group() noexcept {
_merged_into_cache = true;
revert_flushed_memory();
}
// Reverts any positive flushed-memory total previously reported to the dirty
// memory manager, resets the total to zero, and restarts the flush
// low-watermark from the current total memory.
void memtable::revert_flushed_memory() noexcept {
if (_flushed_memory > 0) {
_dirty_mgr.revert_potentially_cleaned_up_memory(this, _flushed_memory);
}
_flushed_memory = 0;
_total_memory_low_watermark_during_flush = _total_memory;
}
// Accounts memory consumed by a flush reader by forwarding byte counts to
// memtable::add_flushed_memory().
class flush_memory_accounter {
    memtable& _mt;
public:
    explicit flush_memory_accounter(memtable& mt)
        : _mt(mt)
    {}

    // Sanity check: the flushed total must never exceed the region's total space.
    ~flush_memory_accounter() {
        SCYLLA_ASSERT(_mt._flushed_memory <= static_cast<int64_t>(_mt.occupancy().total_space()));
    }

    // Size, in the memtable allocator, of `e` (excluding its rows) plus the
    // size of the snapshot's current version object.
    uint64_t compute_size(memtable_entry& e, partition_snapshot& snp) {
        const auto entry_bytes = e.size_in_allocator_without_rows(_mt.allocator());
        const auto version_bytes = _mt.allocator().object_memory_size_in_allocator(&*snp.version());
        return entry_bytes + version_bytes;
    }

    void update_bytes_read(uint64_t delta) {
        _mt.add_flushed_memory(delta);
    }
};
// Flush-path accounter for partition snapshot readers. It turns each fragment
// read into an estimated byte count and reports it to a flush_memory_accounter.
//
// Fragments arrive allocated with the standard allocator, so their size in the
// memtable can't be computed precisely. Precise accounting is hard anyway: we
// may hold several snapshots of a partition, and the partition_snapshot_reader
// may compose them, moving memory to standard allocation. As long as the size
// reported here does not exceed the size in the memtables we are safe; in the
// worst case we admit slightly fewer requests.
class partition_snapshot_flush_accounter {
    const schema& _schema;
    flush_memory_accounter& _accounter;
public:
    partition_snapshot_flush_accounter(const schema& s, flush_memory_accounter& acct)
        : _schema(s), _accounter(acct) {}

    // Partition boundaries are not accounted.
    void operator()(const partition_start&) {}
    void operator()(const partition_end&) {}

    void operator()(const static_row& sr) {
        _accounter.update_bytes_read(sr.external_memory_usage(_schema));
    }

    void operator()(const clustering_row& cr) {
        // Every clustering row lives in a rows_entry, which carries significant
        // overhead, so its struct size is added. This underestimates: with
        // several snapshots we can't tell which rows_entry object(s) produced
        // this fragment or their size in the allocator.
        const auto payload = cr.minimal_external_memory_usage(_schema);
        _accounter.update_bytes_read(sizeof(rows_entry) + payload);
    }

    void operator()(const range_tombstone_change& rtc) {
        _accounter.update_bytes_read(rtc.minimal_memory_usage(_schema));
    }
};
// Builds a partition snapshot reader with read-path accounting, using the
// snapshot's own schema. For reversed reads, the reversed form of that schema
// is used instead.
static mutation_reader make_partition_snapshot_reader_from_snp_schema(
        bool is_reversed,
        reader_permit permit,
        dht::decorated_key dk,
        query::clustering_key_filter_ranges crr,
        partition_snapshot_ptr snp,
        bool digest_requested,
        logalloc::region& region,
        logalloc::allocating_section& read_section,
        std::any pointer_to_container,
        streamed_mutation::forwarding fwd, memtable& memtable) {
    if (!is_reversed) {
        // Take the schema before `snp` is moved into the call.
        schema_ptr forward_schema = snp->schema();
        return make_partition_snapshot_reader<false, partition_snapshot_read_accounter>(std::move(forward_schema), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
    }
    schema_ptr reversed_schema = snp->schema()->make_reversed();
    return make_partition_snapshot_reader<true, partition_snapshot_read_accounter>(std::move(reversed_schema), std::move(permit), std::move(dk), std::move(crr), std::move(snp), digest_requested, region, read_section, pointer_to_container, fwd, memtable);
}
// Reader used by make_flush_reader() while the memtable is not merged into
// cache.
// - Walks all partitions (query::full_partition_range), opening one forward
//   partition snapshot reader per partition.
// - Reports consumed memory through a flush_memory_accounter, which forwards
//   it to memtable::add_flushed_memory().
// - Neither copyable nor movable.
class flush_reader final : public mutation_reader::impl, private iterator_reader {
// FIXME: Similarly to scanning_reader we have an underlying
// mutation_reader for each partition. This is suboptimal.
// Partition snapshot reader should be devirtualised and called directly
// without using any intermediate buffers.
mutation_reader_opt _partition_reader;
flush_memory_accounter _flushed_memory;
public:
flush_reader(schema_ptr s, reader_permit permit, lw_shared_ptr<memtable> m)
: impl(s, std::move(permit))
, iterator_reader(std::move(s), m, query::full_partition_range)
, _flushed_memory(*m)
{}
flush_reader(const flush_reader&) = delete;
flush_reader(flush_reader&&) = delete;
flush_reader& operator=(flush_reader&&) = delete;
flush_reader& operator=(const flush_reader&) = delete;
private:
// Under the read section, takes a snapshot of the next memtable entry and
// measures its entry/version overhead.
// - If an entry exists: accounts that overhead, records the key as last
//   read, and opens a partition snapshot reader (with flush accounting) for
//   it, upgraded to this reader's schema.
// - If no entry remains: _partition_reader is left untouched.
void get_next_partition() {
uint64_t component_size = 0;
auto key_and_snp = read_section()(region(), [&] () -> std::optional<std::pair<dht::decorated_key, partition_snapshot_ptr>> {
memtable_entry* e = fetch_entry();
if (e) {
auto dk = e->key();
auto snp = e->snapshot(*mtbl());
component_size = _flushed_memory.compute_size(*e, *snp);
advance_iterator();
return std::pair(std::move(dk), std::move(snp));
}
return { };
});
if (key_and_snp) {
_flushed_memory.update_bytes_read(component_size);
update_last(key_and_snp->first);
auto cr = query::clustering_key_filter_ranges::get_ranges(*schema(), schema()->full_slice(), key_and_snp->first.key());
auto snp_schema = key_and_snp->second->schema();
_partition_reader = make_partition_snapshot_reader<false, partition_snapshot_flush_accounter>(snp_schema, _permit, std::move(key_and_snp->first), std::move(cr),
std::move(key_and_snp->second), false, region(), read_section(), mtbl(), streamed_mutation::forwarding::no, *snp_schema, _flushed_memory);
_partition_reader->upgrade_schema(schema());
}
}
future<> close_partition_reader() noexcept {
return _partition_reader ? _partition_reader->close() : make_ready_future<>();
}
public:
// Fills the buffer partition by partition until it is full or no partitions
// remain.
// NOTE(review): after _partition_reader->close() the optional is not reset
// explicitly, yet the next iteration tests `!_partition_reader`. This
// presumably relies on a closed mutation_reader testing as empty — confirm.
virtual future<> fill_buffer() override {
return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {
if (!_partition_reader) {
get_next_partition();
if (!_partition_reader) {
_end_of_stream = true;
return make_ready_future<>();
}
}
return _partition_reader->consume_pausable([this] (mutation_fragment_v2 mf) {
push_mutation_fragment(std::move(mf));
return stop_iteration(is_buffer_full());
}).then([this] {
if (_partition_reader->is_end_of_stream() && _partition_reader->is_buffer_empty()) {
return _partition_reader->close();
}
return make_ready_future<>();
});
});
}
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
return close_partition_reader();
}
return make_ready_future<>();
}
// Fast-forwarding is not supported: both overloads return a future failed
// with std::bad_function_call.
virtual future<> fast_forward_to(const dht::partition_range&) override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
}
virtual future<> fast_forward_to(position_range) override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
}
virtual future<> close() noexcept override {
return close_partition_reader();
}
};
// Takes a read snapshot of this entry's partition in `mtbl`'s region, using
// `mtbl`'s cleaner and no cache tracker.
partition_snapshot_ptr memtable_entry::snapshot(memtable& mtbl) {
return _pe.read(mtbl.region(), mtbl.cleaner(), no_cache_tracker);
}
// Creates a reader over `range`.
// - Multi-partition ranges, or any read that may be forwarded across
//   partitions (fwd_mr), use a scanning_reader. It is wrapped with
//   make_forwardable() when intra-partition forwarding is requested.
// - A single-partition read snapshots that partition directly, and returns a
//   disengaged optional when the partition is absent.
// `trace_state_ptr` is not used here.
mutation_reader_opt
memtable::make_mutation_reader_opt(schema_ptr query_schema,
        reader_permit permit,
        const dht::partition_range& range,
        const query::partition_slice& slice,
        tracing::trace_state_ptr trace_state_ptr,
        streamed_mutation::forwarding fwd,
        mutation_reader::forwarding fwd_mr) {
    if (!query::is_single_partition(range) || fwd_mr) {
        auto scanner = make_mutation_reader<scanning_reader>(std::move(query_schema), shared_from_this(), std::move(permit), range, slice, fwd_mr);
        if (fwd != streamed_mutation::forwarding::yes) {
            return scanner;
        }
        return make_forwardable(std::move(scanner));
    }

    const query::ring_position& pos = range.start()->value();
    auto snp = _table_shared_data.read_section(*this, [&] () -> partition_snapshot_ptr {
        auto it = partitions.find(pos, dht::ring_position_comparator(*_schema));
        if (it == partitions.end()) {
            return { };
        }
        upgrade_entry(*it);
        return it->snapshot(*this);
    });
    if (!snp) {
        return {};
    }
    auto dk = pos.as_decorated_key();
    auto cr = query::clustering_key_filter_ranges::get_ranges(*query_schema, slice, dk.key());
    const bool digest_requested = slice.options.contains<query::partition_slice::option::with_digest>();
    auto rd = make_partition_snapshot_reader_from_snp_schema(slice.is_reversed(), std::move(permit), std::move(dk), std::move(cr), std::move(snp), digest_requested, *this, _table_shared_data.read_section, shared_from_this(), fwd, *this);
    rd.upgrade_schema(query_schema);
    return rd;
}
// Returns a reader for flushing this memtable.
// - Once merged into cache, this is a plain full-range scanning_reader.
// - Otherwise, flushed-memory accounting is reset and a flush_reader is used.
mutation_reader
memtable::make_flush_reader(schema_ptr s, reader_permit permit) {
    if (_merged_into_cache) {
        auto& full_slice = s->full_slice();
        return make_mutation_reader<scanning_reader>(std::move(s), shared_from_this(), std::move(permit),
                query::full_partition_range, full_slice, mutation_reader::forwarding::no);
    }
    revert_flushed_memory();
    return make_mutation_reader<flush_reader>(std::move(s), std::move(permit), shared_from_this());
}
// Raises the memtable's replay position to the handle's if it is greater,
// then hands the handle over to the replay-position set.
void
memtable::update(db::rp_handle&& h) {
    const db::replay_position incoming = h;
    if (_replay_position < incoming) {
        _replay_position = incoming;
    }
    _rp_set.put(std::move(h));
}
// Applies every partition of `mt` (read with this memtable's schema) into this
// memtable. Completes immediately when `mt` yields no reader.
future<>
memtable::apply(memtable& mt, reader_permit permit) {
    auto reader_opt = mt.make_mutation_reader_opt(_schema, std::move(permit), query::full_partition_range, _schema->full_slice());
    if (!reader_opt) [[unlikely]] {
        return make_ready_future<>();
    }
    return with_closeable(std::move(*reader_opt), [this] (auto&& rd) mutable {
        // Hold a reference to this memtable for as long as the consumer lives.
        return consume_partitions(rd, [keep_alive = this->shared_from_this()] (mutation&& m) {
            keep_alive->apply(m);
            return stop_iteration::no;
        });
    });
}
// Merges a mutation into its partition, creating the partition if necessary.
// Encoding stats are updated first, all inside the memtable allocator and its
// allocating section. The replay-position handle is recorded afterwards.
void
memtable::apply(const mutation& m, db::rp_handle&& h) {
    with_allocator(allocator(), [this, &m] {
        _table_shared_data.allocating_section(*this, [this, &m] {
            auto& entry = find_or_create_partition(m.decorated_key());
            _stats_collector.update(*m.schema(), m.partition());
            entry.apply(region(), cleaner(), *_schema, m.partition(), *m.schema(), _table_stats.memtable_app_stats);
        });
    });
    update(std::move(h));
}
void
memtable::apply(const frozen_mutation& m, const schema_ptr& m_schema, db::rp_handle&& h) {
with_allocator(allocator(), [this, &m, &m_schema] {
_table_shared_data.allocating_section(*this, [&, this] {
auto& p = find_or_create_partition_slow(m.key());
mutation_partition mp(*m_schema);
partition_builder pb(*m_schema, mp);
m.partition().accept(*m_schema, pb);
_stats_collector.update(*m_schema, mp);
p.apply(region(), cleaner(), *_schema, std::move(mp), *m_schema, _table_stats.memtable_app_stats);
});
});
update(std::move(h));
}
// The memtable's occupancy is that of its underlying logalloc region.
logalloc::occupancy_stats memtable::occupancy() const noexcept {
    return logalloc::region::occupancy();
}
// Exposes this memtable as a mutation_source. The source holds a shared
// reference to the memtable and forwards to make_mutation_reader().
mutation_source memtable::as_data_source() {
    auto self = shared_from_this();
    return mutation_source([mt = std::move(self)] (schema_ptr s,
            reader_permit permit,
            const dht::partition_range& range,
            const query::partition_slice& slice,
            tracing::trace_state_ptr trace_state,
            streamed_mutation::forwarding fwd,
            mutation_reader::forwarding fwd_mr) {
        return mt->make_mutation_reader(std::move(s), std::move(permit), range, slice, std::move(trace_state), fwd, fwd_mr);
    });
}
// Move constructor: takes over the key and partition entry; flags are copied.
memtable_entry::memtable_entry(memtable_entry&& o) noexcept
    : _key(std::move(o._key)), _pe(std::move(o._pe)), _flags(o._flags) {
}
// One incremental clearing step on the partition entry (no cache tracker).
stop_iteration memtable_entry::clear_gently() noexcept {
    auto& entry = _pe;
    return entry.clear_gently(no_cache_tracker);
}
// Records the mutation source this memtable was flushed to. From then on
// is_flushed() reports true.
void memtable::mark_flushed(mutation_source underlying) noexcept {
    _underlying = std::move(underlying);
}
bool memtable::is_merging_to_cache() const noexcept {
    return _merging_into_cache;
}
// True once mark_flushed() stored an underlying mutation source.
bool memtable::is_flushed() const noexcept {
    return static_cast<bool>(_underlying);
}
// Upgrades the entry's partition to `s` unless it already uses that schema.
void memtable_entry::upgrade_schema(logalloc::region& r, const schema_ptr& s, mutation_cleaner& cleaner) {
    if (schema() == s) {
        return;
    }
    partition().upgrade(r, s, cleaner, no_cache_tracker);
}
// Brings an entry up to the memtable's current schema. Requires reclaiming to
// be disabled when an upgrade is actually needed.
void memtable::upgrade_entry(memtable_entry& e) {
    if (e.schema() == _schema) {
        return;
    }
    SCYLLA_ASSERT(!reclaiming_enabled());
    e.upgrade_schema(region(), _schema, cleaner());
}
// Replaces the current schema. Existing entries are upgraded lazily through
// upgrade_entry().
void memtable::set_schema(schema_ptr new_schema) noexcept {
    _schema = std::move(new_schema);
}
// Estimated size of this entry's node in the partitions container, as seen by
// `allocator`.
size_t memtable_entry::object_memory_size(allocation_strategy& allocator) {
    return memtable::partitions_type::estimated_object_memory_size_in_allocator(allocator, this);
}
}
// Formats an entry as "{<key>: <partition>}".
auto fmt::formatter<replica::memtable_entry>::format(const replica::memtable_entry& mt,
fmt::format_context& ctx) const -> decltype(ctx.out()) {
return fmt::format_to(ctx.out(), "{{{}: {}}}", mt.key(), partition_entry::printer(mt.partition()));
}
// Formats the whole memtable as "{memtable: [<entries>]}", with entries
// separated by ",\n". A reclaim lock is held while iterating so the region is
// not reclaimed during formatting.
auto fmt::formatter<replica::memtable>::format(replica::memtable& mt,
fmt::format_context& ctx) const -> decltype(ctx.out()) {
logalloc::reclaim_lock rl(mt);
return fmt::format_to(ctx.out(), "{{memtable: [{}]}}", fmt::join(mt.partitions, ",\n"));
}
// Region-listener hooks (this memtable registers itself via
// logalloc::region::listen in its constructor).
//
// Growth: `delta` must be non-negative. It is forwarded to the dirty memory
// manager's region group and added to _total_memory.
void replica::memtable::increase_usage(logalloc::region* r, ssize_t delta) {
SCYLLA_ASSERT(delta >= 0);
_dirty_mgr.region_group().increase_usage(r);
_dirty_mgr.region_group().update_unspooled(delta);
_total_memory += delta;
}
void replica::memtable::decrease_evictable_usage(logalloc::region* r) {
_dirty_mgr.region_group().decrease_usage(r);
}
// Shrinkage: `delta` must be non-positive. It is forwarded to the region group
// and applied to _total_memory. If the total drops below the flush
// low-watermark, the difference is removed from the flushed-memory accounting
// and the watermark follows the new total.
void replica::memtable::decrease_usage(logalloc::region* r, ssize_t delta) {
SCYLLA_ASSERT(delta <= 0);
_dirty_mgr.region_group().decrease_usage(r);
_dirty_mgr.region_group().update_unspooled(delta);
_total_memory += delta;
if (_total_memory < _total_memory_low_watermark_during_flush) {
remove_flushed_memory(_total_memory_low_watermark_during_flush - _total_memory);
_total_memory_low_watermark_during_flush = _total_memory;
}
}
// Region-listener hooks that forward region registration, unregistration and
// relocation to the dirty memory manager's region group.
void replica::memtable::add(logalloc::region* r) {
_dirty_mgr.region_group().add(r);
}
void replica::memtable::del(logalloc::region* r) {
_dirty_mgr.region_group().del(r);
}
void replica::memtable::moved(logalloc::region* old_address, logalloc::region* new_address) {
_dirty_mgr.region_group().moved(old_address, new_address);
}