diff --git a/db/virtual_table.cc b/db/virtual_table.cc index 782ca7e416..f4f9e90dda 100644 --- a/db/virtual_table.cc +++ b/db/virtual_table.cc @@ -39,6 +39,7 @@ */ #include "db/virtual_table.hh" +#include "db/chained_delegating_reader.hh" namespace db { @@ -61,4 +62,48 @@ bool virtual_table::contains_key(const dht::partition_range& pr, const dht::deco return pr.contains(dk, dht::ring_position_comparator(*_s)); } +mutation_source memtable_filling_virtual_table::as_mutation_source() { + return mutation_source([this] (schema_ptr s, + reader_permit permit, + const dht::partition_range& range, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr) { + + struct my_units { + reader_permit::resource_units units; + uint64_t memory_used; + + my_units(reader_permit::resource_units&& units) : units(std::move(units)), memory_used(0) {} + }; + + auto units = make_lw_shared(permit.consume_memory(0)); + + auto populate = [this, mt = make_lw_shared(schema()), s, units, range, slice, pc, trace_state, fwd, fwd_mr] (db::timeout_clock::time_point timeout) mutable { + auto mutation_sink = [units, mt] (mutation m) mutable { + mt->apply(m); + units->units.add(units->units.permit().consume_memory(mt->occupancy().used_space() - units->memory_used)); + units->memory_used = mt->occupancy().used_space(); + }; + + return execute(mutation_sink, timeout).then([this, mt, s, units, &range, &slice, &pc, &trace_state, &fwd, &fwd_mr] () { + auto rd = make_restricted_flat_reader(mt->as_data_source(), s, units->units.permit(), range, slice, pc, trace_state, fwd, fwd_mr); + + if (!_shard_aware) { + rd = make_filtering_reader(std::move(rd), [this] (const dht::decorated_key& dk) -> bool { + return this_shard_owns(dk); + }); + } + + return std::move(rd); + }); + }; + + // populate keeps the memtable alive. + return make_flat_mutation_reader(s, std::move(populate), units->units.permit()); + }); +} + } \ No newline at end of file diff --git a/db/virtual_table.hh b/db/virtual_table.hh index 93de444660..1f685c372c 100644 --- a/db/virtual_table.hh +++ b/db/virtual_table.hh @@ -22,6 +22,7 @@ #pragma once #include "mutation_reader.hh" +#include "memtable.hh" #include "schema.hh" #include "database_fwd.hh" @@ -55,4 +56,16 @@ public: void set_database(database& db) { _db = &db; } }; +// Produces results by filling a memtable on each read. +// Use when the amount of data is not significant relative to shard's memory size. +class memtable_filling_virtual_table : public virtual_table { +public: + using virtual_table::virtual_table; + + virtual future<> execute(std::function mutation_sink, db::timeout_clock::time_point timeout) { return make_ready_future<>(); } + + mutation_source as_mutation_source() override; +}; + + }