frozen_mutation: introduce consume method
Allowing to consume the frozen_mutation directly to a stream rather than unfreezing it first and then consuming the unfrozen mutation. Streaming directly from the frozen_mutation saves both cpu and memory, and will make it easier to be made async as a follow, to allow yielding, e.g. between rows. This is used today only in to_data_query_result which is invoked on the read-repair path. Refs #10038 Fixes #10021 Test: unit(release) Signed-off-by: Benny Halevy <bhalevy@scylladb.com> Message-Id: <20220405055807.1834494-1-bhalevy@scylladb.com>
This commit is contained in:
committed by
Botond Dénes
parent
67e0590bbc
commit
abbf5de68c
@@ -11,15 +11,134 @@
|
||||
#include "dht/i_partitioner.hh"
|
||||
#include "replica/database_fwd.hh"
|
||||
#include "mutation_fragment.hh"
|
||||
#include "mutation_fragment_v2.hh"
|
||||
#include "mutation_partition_view.hh"
|
||||
#include "mutation_consumer_concepts.hh"
|
||||
#include "range_tombstone_change_generator.hh"
|
||||
|
||||
class mutation;
|
||||
class mutation_partition_view;
|
||||
class flat_mutation_reader_v2;
|
||||
|
||||
namespace ser {
|
||||
class mutation_view;
|
||||
}
|
||||
|
||||
template<typename Result>
|
||||
struct frozen_mutation_consume_result {
|
||||
stop_iteration stop;
|
||||
Result result;
|
||||
};
|
||||
|
||||
template<>
|
||||
struct frozen_mutation_consume_result<void> {
|
||||
stop_iteration stop;
|
||||
};
|
||||
|
||||
// mutation_partition_view visitor which consumes a frozen_mutation.
|
||||
template<FlattenedConsumerV2 Consumer>
|
||||
class frozen_mutation_consumer_adaptor final : public mutation_partition_view_virtual_visitor {
|
||||
private:
|
||||
const schema& _schema;
|
||||
std::optional<dht::decorated_key> _dk;
|
||||
lazy_row _static_row;
|
||||
range_tombstone_change_generator _rt_gen;
|
||||
alloc_strategy_unique_ptr<rows_entry> _current_row_entry;
|
||||
deletable_row* _current_row = nullptr;
|
||||
Consumer& _consumer;
|
||||
stop_iteration _stop_consuming = stop_iteration::no;
|
||||
|
||||
void flush_rows_and_tombstones(position_in_partition_view pos) {
|
||||
if (!_static_row.empty()) {
|
||||
auto row = std::move(_static_row.get_existing());
|
||||
if (!_stop_consuming) {
|
||||
_stop_consuming = _consumer.consume(static_row(std::move(row)));
|
||||
}
|
||||
}
|
||||
_rt_gen.flush(pos, [this] (range_tombstone_change rt) {
|
||||
if (!_stop_consuming) {
|
||||
_stop_consuming = _consumer.consume(std::move(rt));
|
||||
}
|
||||
});
|
||||
if (_current_row) {
|
||||
auto row_entry = std::move(_current_row_entry);
|
||||
_current_row = nullptr;
|
||||
if (!_stop_consuming) {
|
||||
_stop_consuming = _consumer.consume(clustering_row(std::move(*row_entry)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
frozen_mutation_consumer_adaptor(schema_ptr s, Consumer& consumer)
|
||||
: _schema(*s)
|
||||
, _rt_gen(_schema)
|
||||
, _consumer(consumer)
|
||||
{
|
||||
}
|
||||
|
||||
Consumer& consumer() {
|
||||
return _consumer;
|
||||
}
|
||||
|
||||
void on_new_partition(const partition_key& key) {
|
||||
_rt_gen.reset();
|
||||
_dk = dht::decorate_key(_schema, key);
|
||||
_consumer.consume_new_partition(*_dk);
|
||||
}
|
||||
|
||||
virtual void accept_partition_tombstone(tombstone t) override {
|
||||
_consumer.consume(t);
|
||||
}
|
||||
|
||||
virtual void accept_static_cell(column_id id, atomic_cell cell) override {
|
||||
row& r = _static_row.maybe_create();
|
||||
r.append_cell(id, atomic_cell_or_collection(std::move(cell)));
|
||||
}
|
||||
|
||||
virtual void accept_static_cell(column_id id, collection_mutation_view collection) override {
|
||||
row& r = _static_row.maybe_create();
|
||||
r.append_cell(id, collection_mutation(*_schema.static_column_at(id).type, std::move(collection)));
|
||||
}
|
||||
|
||||
virtual void accept_row_tombstone(range_tombstone rt) override {
|
||||
flush_rows_and_tombstones(rt.position());
|
||||
_rt_gen.consume(std::move(rt));
|
||||
}
|
||||
|
||||
virtual void accept_row(position_in_partition_view key, row_tombstone deleted_at, row_marker rm, is_dummy dummy, is_continuous continuous) override {
|
||||
flush_rows_and_tombstones(key);
|
||||
_current_row_entry = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(_schema, key, dummy, continuous));
|
||||
deletable_row& r = _current_row_entry->row();
|
||||
r.apply(rm);
|
||||
r.apply(deleted_at);
|
||||
_current_row = &r;
|
||||
}
|
||||
|
||||
void accept_row_cell(column_id id, atomic_cell cell) override {
|
||||
row& r = _current_row->cells();
|
||||
r.append_cell(id, std::move(cell));
|
||||
}
|
||||
|
||||
virtual void accept_row_cell(column_id id, collection_mutation_view collection) override {
|
||||
row& r = _current_row->cells();
|
||||
r.append_cell(id, collection_mutation(*_schema.regular_column_at(id).type, std::move(collection)));
|
||||
}
|
||||
|
||||
auto on_end_of_partition() {
|
||||
flush_rows_and_tombstones(position_in_partition::after_all_clustered_rows());
|
||||
if (_consumer.consume_end_of_partition()) {
|
||||
_stop_consuming = stop_iteration::yes;
|
||||
}
|
||||
using consume_res_type = decltype(_consumer.consume_end_of_stream());
|
||||
if constexpr (std::is_same_v<consume_res_type, void>) {
|
||||
_consumer.consume_end_of_stream();
|
||||
return frozen_mutation_consume_result<void>{_stop_consuming};
|
||||
} else {
|
||||
return frozen_mutation_consume_result<consume_res_type>{_stop_consuming, _consumer.consume_end_of_stream()};
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Immutable, compact form of mutation.
|
||||
//
|
||||
// This form is primarily destined to be sent over the network channel.
|
||||
@@ -59,6 +178,21 @@ public:
|
||||
// Automatically upgrades the stored mutation to the supplied schema with custom column mapping.
|
||||
mutation unfreeze_upgrading(schema_ptr schema, const column_mapping& cm) const;
|
||||
|
||||
// Consumes the frozen mutation's content.
|
||||
//
|
||||
// The consume operation is stoppable:
|
||||
// * To stop, return stop_iteration::yes from one of the consume() methods;
|
||||
// * The consume will now stop and return;
|
||||
//
|
||||
// Note that `consume_end_of_partition()` and `consume_end_of_stream()`
|
||||
// will be called each time the consume is stopping, regardless of whether
|
||||
// you are pausing or the consumption is ending for good.
|
||||
template<FlattenedConsumerV2 Consumer>
|
||||
auto consume(schema_ptr s, Consumer& consumer) const -> frozen_mutation_consume_result<decltype(consumer.consume_end_of_stream())>;
|
||||
|
||||
template<FlattenedConsumerV2 Consumer>
|
||||
auto consume(schema_ptr s, frozen_mutation_consumer_adaptor<Consumer>& adaptor) const -> frozen_mutation_consume_result<decltype(adaptor.consumer().consume_end_of_stream())>;
|
||||
|
||||
struct printer {
|
||||
const frozen_mutation& self;
|
||||
schema_ptr schema;
|
||||
@@ -123,3 +257,21 @@ public:
|
||||
|
||||
frozen_mutation_fragment freeze(const schema& s, const mutation_fragment& mf);
|
||||
|
||||
template<FlattenedConsumerV2 Consumer>
|
||||
auto frozen_mutation::consume(schema_ptr s, frozen_mutation_consumer_adaptor<Consumer>& adaptor) const -> frozen_mutation_consume_result<decltype(adaptor.consumer().consume_end_of_stream())> {
|
||||
check_schema_version(schema_version(), *s);
|
||||
try {
|
||||
adaptor.on_new_partition(_pk);
|
||||
partition().accept(s->get_column_mapping(), adaptor);
|
||||
return adaptor.on_end_of_partition();
|
||||
} catch (...) {
|
||||
std::throw_with_nested(std::runtime_error(format(
|
||||
"frozen_mutation::consume(): failed consuming mutation {} of {}.{}", key(), s->ks_name(), s->cf_name())));
|
||||
}
|
||||
}
|
||||
|
||||
template<FlattenedConsumerV2 Consumer>
|
||||
auto frozen_mutation::consume(schema_ptr s, Consumer& consumer) const -> frozen_mutation_consume_result<decltype(consumer.consume_end_of_stream())> {
|
||||
frozen_mutation_consumer_adaptor adaptor(s, consumer);
|
||||
return consume(s, adaptor);
|
||||
}
|
||||
|
||||
@@ -2073,10 +2073,21 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa
|
||||
max_partitions, query_result_builder(*s, builder));
|
||||
const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? consume_in_reverse::legacy_half_reverse : consume_in_reverse::no;
|
||||
|
||||
for (const partition& p : r.partitions()) {
|
||||
const auto res = p.mut().unfreeze(s).consume(consumer, reverse);
|
||||
if (res.stop == stop_iteration::yes) {
|
||||
break;
|
||||
// FIXME: frozen_mutation::consume supports only forward consumers
|
||||
if (reverse == consume_in_reverse::no) {
|
||||
frozen_mutation_consumer_adaptor adaptor(s, consumer);
|
||||
for (const partition& p : r.partitions()) {
|
||||
const auto res = p.mut().consume(s, adaptor);
|
||||
if (res.stop == stop_iteration::yes) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (const partition& p : r.partitions()) {
|
||||
const auto res = p.mut().unfreeze(s).consume(consumer, reverse);
|
||||
if (res.stop == stop_iteration::yes) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (r.is_short_read()) {
|
||||
|
||||
@@ -28,6 +28,7 @@
|
||||
#include "schema_builder.hh"
|
||||
#include "partition_slice_builder.hh"
|
||||
#include "readers/from_mutations_v2.hh"
|
||||
#include "mutation_rebuilder.hh"
|
||||
|
||||
using namespace std::literals::chrono_literals;
|
||||
|
||||
@@ -558,3 +559,21 @@ SEASTAR_THREAD_TEST_CASE(test_result_size_calculation) {
|
||||
BOOST_REQUIRE_EQUAL(digest_only_builder.memory_accounter().used_memory(), result_and_digest_builder.memory_accounter().used_memory());
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_frozen_mutation_consumer) {
|
||||
random_mutation_generator gen(random_mutation_generator::generate_counters::no);
|
||||
schema_ptr s = gen.schema();
|
||||
std::vector<mutation> mutations = gen(1);
|
||||
const mutation& m = mutations[0];
|
||||
frozen_mutation fm = freeze(m);
|
||||
|
||||
// sanity check unfreeze first
|
||||
mutation um = fm.unfreeze(s);
|
||||
BOOST_REQUIRE_EQUAL(um, m);
|
||||
|
||||
// Rebuild mutation by consuming from the frozen_mutation
|
||||
mutation_rebuilder_v2 rebuilder(s);
|
||||
auto res = fm.consume(s, rebuilder);
|
||||
BOOST_REQUIRE(res.result);
|
||||
const auto& rebuilt = *res.result;
|
||||
BOOST_REQUIRE_EQUAL(rebuilt, m);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user