Files
scylladb/db/virtual_table.cc
Mikołaj Sielużycki 1d84a254c0 flat_mutation_reader: Split readers by file and remove unnecessary includes.
The flat_mutation_reader files were conflated and contained multiple
readers, which was not strictly necessary. Splitting them improves
iterative compilation times, as touching rarely used readers no longer
recompiles large chunks of the codebase. Total compilation times are also
improved, as the sizes of flat_mutation_reader.hh and
flat_mutation_reader_v2.hh have been reduced and those files are
included by many files in the codebase.

With changes

real	29m14.051s
user	168m39.071s
sys	5m13.443s

Without changes

real	30m36.203s
user	175m43.354s
sys	5m26.376s

Closes #10194
2022-03-14 13:20:25 +02:00

188 lines
7.0 KiB
C++

/*
*/
/*
* Modified by ScyllaDB
* Copyright (C) 2021-present ScyllaDB
*/
/*
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
*/
#include "db/virtual_table.hh"
#include "db/chained_delegating_reader.hh"
#include "readers/reversing.hh"
#include "readers/forwardable.hh"
namespace db {
// Stores `value` in row `cr` under the column named `column_name`, as a live
// atomic cell carrying a freshly generated timestamp.
//
// Throws std::runtime_error (with a backtrace) when the table schema has no
// column with that name. A null `value` leaves the row untouched.
void virtual_table::set_cell(row& cr, const bytes& column_name, data_value value) {
    // The timestamp is obtained before the column lookup.
    auto write_time = api::new_timestamp();
    auto column = schema()->get_column_definition(column_name);
    if (!column) {
        throw_with_backtrace<std::runtime_error>(format("column not found: {}", column_name));
    }
    if (value.is_null()) {
        // Nothing to write for a null value.
        return;
    }
    auto cell = atomic_cell::make_live(*column->type, write_time, value.serialize_nonnull());
    cr.apply(*column, std::move(cell));
}
// Returns true when the shard computed by dht::shard_of() for the token of
// `dk` (under this table's schema) equals the shard we are currently running on.
bool virtual_table::this_shard_owns(const dht::decorated_key& dk) const {
    const auto owning_shard = dht::shard_of(*_s, dk.token());
    return owning_shard == this_shard_id();
}
// Returns true when partition range `pr` contains the key `dk`, comparing ring
// positions with a comparator built from this table's schema.
bool virtual_table::contains_key(const dht::partition_range& pr, const dht::decorated_key& dk) const {
    dht::ring_position_comparator position_cmp(*_s);
    return pr.contains(dk, position_cmp);
}
// Builds a mutation_source for a virtual table whose contents are first
// collected into a memtable and then read back from it.
//
// Every reader requested from the returned source gets its own fresh memtable.
// The memtable is filled by execute(), which is handed a sink that applies
// each produced mutation to it; once execute() resolves, a regular memtable
// reader is created over the collected data. Memory used by the memtable is
// charged to the reader's permit as the memtable grows.
mutation_source memtable_filling_virtual_table::as_mutation_source() {
return mutation_source([this] (schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) {
// Pairs the resource units taken from the permit with the amount of
// memtable memory that has already been accounted for in those units.
struct my_units {
reader_permit::resource_units units;
uint64_t memory_used;
my_units(reader_permit::resource_units&& units) : units(std::move(units)), memory_used(0) {}
};
// Start with an empty (zero-byte) reservation; it is grown by mutation_sink below.
auto units = make_lw_shared<my_units>(permit.consume_memory(0));
// populate owns by-value copies of range, slice, pc, trace_state, fwd and
// fwd_mr, plus the memtable itself (created here, by the init-capture).
// The continuation attached to execute() refers to those copies by
// reference, so they are only valid while this closure object is alive.
// NOTE(review): presumably chained_delegating_reader stores populate for
// its own lifetime, which is what makes those references safe -- confirm.
auto populate = [this, mt = make_lw_shared<replica::memtable>(schema()), s, units, range, slice, pc, trace_state, fwd, fwd_mr] () mutable {
// Applies one mutation to the memtable and charges the permit for the
// growth in the memtable's used space since the previous call.
// NOTE(review): the difference is computed by plain subtraction; this
// assumes used_space() never drops below the last recorded value -- confirm.
auto mutation_sink = [units, mt] (mutation m) mutable {
mt->apply(m);
units->units.add(units->units.permit().consume_memory(mt->occupancy().used_space() - units->memory_used));
units->memory_used = mt->occupancy().used_space();
};
return execute(mutation_sink).then([this, mt, s, units, &range, &slice, &pc, &trace_state, &fwd, &fwd_mr] () {
// The memtable is fully populated at this point; read from it with the
// parameters of the original request.
auto rd = mt->as_data_source().make_reader_v2(s, units->units.permit(), range, slice, pc, trace_state, fwd, fwd_mr);
// When the table is not shard-aware, wrap the reader in a filter driven by
// this_shard_owns() (presumably keeping only partitions owned by this shard).
if (!_shard_aware) {
rd = make_filtering_reader(std::move(rd), [this] (const dht::decorated_key& dk) -> bool {
return this_shard_owns(dk);
});
}
return rd;
});
};
// populate keeps the memtable alive.
return make_flat_mutation_reader_v2<chained_delegating_reader>(s, std::move(populate), units->units.permit());
});
}
// Builds a mutation_source for a virtual table whose contents are streamed:
// fragments emitted by execute() are pushed through a queue reader rather
// than being collected up front.
//
// For every reader requested from the returned source:
//  - execute() is started in the background with a collector that pushes each
//    emitted fragment into the queue and exposes the requested partition range;
//  - the queue's reader side is wrapped in a slicing/filtering reader, and
//    then, depending on the request, in a shard-ownership filter
//    (when !_shard_aware), a reversing reader (for reversed slices) and a
//    forwardable wrapper (when fwd == streamed_mutation::forwarding::yes).
//
// The pc, trace_state and fwd_mr arguments are part of the reader-factory
// signature but are not used by this implementation.
mutation_source streaming_virtual_table::as_mutation_source() {
    return mutation_source([this] (schema_ptr s,
            reader_permit permit,
            const dht::partition_range& pr,
            const query::partition_slice& query_slice,
            const io_priority_class& pc,
            tracing::trace_state_ptr trace_state,
            streamed_mutation::forwarding fwd,
            mutation_reader::forwarding fwd_mr) {
        // For a reversed query, switch to the reversed form of the incoming
        // schema and to a half-reversed copy of the slice; the reversing reader
        // added at the end takes ownership of that copy.
        std::unique_ptr<query::partition_slice> unreversed_slice;
        bool reversed = query_slice.is_reversed();
        if (reversed) {
            s = s->make_reversed();
            unreversed_slice = std::make_unique<query::partition_slice>(query::half_reverse_slice(*s, query_slice));
        }
        const auto& slice = reversed ? *unreversed_slice : query_slice;

        // We cannot pass the partition_range directly to execute()
        // because it is not guaranteed to be alive until execute() resolves.
        // It is only guaranteed to be alive as long as the returned reader is alive.
        // We achieve safety by mediating access through query_restrictions: once
        // the reader dies the handle reports is_terminated(), so partition_range()
        // throws instead of dereferencing pr and execute() gets an exception.
        struct my_result_collector : public result_collector, public query_restrictions {
            queue_reader_handle handle;
            // Valid until handle.is_terminated(), which is set to true when the
            // queue_reader dies.
            const dht::partition_range* pr;

            my_result_collector(schema_ptr s, reader_permit p, const dht::partition_range* pr, queue_reader_handle&& handle)
                : result_collector(s, p)
                , handle(std::move(handle))
                , pr(pr)
            { }

            // result_collector
            future<> take(mutation_fragment fragment) override {
                return handle.push(std::move(fragment));
            }

            // query_restrictions
            const dht::partition_range& partition_range() const override {
                if (handle.is_terminated()) {
                    throw std::runtime_error("read abandoned");
                }
                return *pr;
            }
        };

        auto reader_and_handle = make_queue_reader(s, permit);
        auto consumer = std::make_unique<my_result_collector>(s, permit, &pr, std::move(reader_and_handle.second));
        auto f = execute(permit, *consumer, *consumer);

        // It is safe to discard this future because:
        // - after calling `handle.push_end_of_stream()` the reader can be discarded;
        // - if the reader dies first, `execute()` will get an exception on attempt to push fragments.
        // The continuation owns the consumer, keeping it alive until execute() resolves.
        (void)f.then_wrapped([c = std::move(consumer)] (auto&& result) {
            if (result.failed()) {
                c->handle.abort(result.get_exception());
            } else if (!c->handle.is_terminated()) {
                c->handle.push_end_of_stream();
            }
        });

        auto rd = make_slicing_filtering_reader(std::move(reader_and_handle.first), pr, slice);

        // When the table is not shard-aware, filter through this_shard_owns().
        // The filtering reader works on the v2 reader type, hence the
        // upgrade/downgrade round trip.
        if (!_shard_aware) {
            rd = downgrade_to_v1(make_filtering_reader(upgrade_to_v2(std::move(rd)), [this] (const dht::decorated_key& dk) -> bool {
                return this_shard_owns(dk);
            }));
        }

        if (reversed) {
            rd = make_reversing_reader(std::move(rd), permit.max_result_size(), std::move(unreversed_slice));
        }

        if (fwd == streamed_mutation::forwarding::yes) {
            rd = make_forwardable(std::move(rd));
        }

        return rd;
    });
}
// Wraps a partition_start for key `dk` (with a default-constructed second
// argument) in a mutation_fragment and hands it to take().
future<> result_collector::emit_partition_start(dht::decorated_key dk) {
    auto start_fragment = mutation_fragment(*_schema, _permit, partition_start(std::move(dk), {}));
    return take(std::move(start_fragment));
}
// Wraps a partition_end marker in a mutation_fragment and hands it to take().
future<> result_collector::emit_partition_end() {
    auto end_fragment = mutation_fragment(*_schema, _permit, partition_end());
    return take(std::move(end_fragment));
}
// Wraps the clustering row `cr` (moved from) in a mutation_fragment and hands
// it to take().
future<> result_collector::emit_row(clustering_row&& cr) {
    auto row_fragment = mutation_fragment(*_schema, _permit, std::move(cr));
    return take(std::move(row_fragment));
}
// Default write path: rejects the mutation by returning a future that has
// already failed with virtual_table_update_exception.
future<> virtual_table::apply(const frozen_mutation&) {
    virtual_table_update_exception rejection("this virtual table doesn't allow updates");
    return make_exception_future<>(std::move(rejection));
}
}