diff --git a/frozen_mutation.hh b/frozen_mutation.hh index 0c085ef413..b409191767 100644 --- a/frozen_mutation.hh +++ b/frozen_mutation.hh @@ -11,15 +11,134 @@ #include "dht/i_partitioner.hh" #include "replica/database_fwd.hh" #include "mutation_fragment.hh" +#include "mutation_fragment_v2.hh" +#include "mutation_partition_view.hh" +#include "mutation_consumer_concepts.hh" +#include "range_tombstone_change_generator.hh" class mutation; -class mutation_partition_view; class flat_mutation_reader_v2; namespace ser { class mutation_view; } +template +struct frozen_mutation_consume_result { + stop_iteration stop; + Result result; +}; + +template<> +struct frozen_mutation_consume_result { + stop_iteration stop; +}; + +// mutation_partition_view visitor which consumes a frozen_mutation. +template +class frozen_mutation_consumer_adaptor final : public mutation_partition_view_virtual_visitor { +private: + const schema& _schema; + std::optional _dk; + lazy_row _static_row; + range_tombstone_change_generator _rt_gen; + alloc_strategy_unique_ptr _current_row_entry; + deletable_row* _current_row = nullptr; + Consumer& _consumer; + stop_iteration _stop_consuming = stop_iteration::no; + + void flush_rows_and_tombstones(position_in_partition_view pos) { + if (!_static_row.empty()) { + auto row = std::move(_static_row.get_existing()); + if (!_stop_consuming) { + _stop_consuming = _consumer.consume(static_row(std::move(row))); + } + } + _rt_gen.flush(pos, [this] (range_tombstone_change rt) { + if (!_stop_consuming) { + _stop_consuming = _consumer.consume(std::move(rt)); + } + }); + if (_current_row) { + auto row_entry = std::move(_current_row_entry); + _current_row = nullptr; + if (!_stop_consuming) { + _stop_consuming = _consumer.consume(clustering_row(std::move(*row_entry))); + } + } + } + +public: + frozen_mutation_consumer_adaptor(schema_ptr s, Consumer& consumer) + : _schema(*s) + , _rt_gen(_schema) + , _consumer(consumer) + { + } + + Consumer& consumer() { + return _consumer; + } + + void on_new_partition(const partition_key& key) { + _rt_gen.reset(); + _dk = dht::decorate_key(_schema, key); + _consumer.consume_new_partition(*_dk); + } + + virtual void accept_partition_tombstone(tombstone t) override { + _consumer.consume(t); + } + + virtual void accept_static_cell(column_id id, atomic_cell cell) override { + row& r = _static_row.maybe_create(); + r.append_cell(id, atomic_cell_or_collection(std::move(cell))); + } + + virtual void accept_static_cell(column_id id, collection_mutation_view collection) override { + row& r = _static_row.maybe_create(); + r.append_cell(id, collection_mutation(*_schema.static_column_at(id).type, std::move(collection))); + } + + virtual void accept_row_tombstone(range_tombstone rt) override { + flush_rows_and_tombstones(rt.position()); + _rt_gen.consume(std::move(rt)); + } + + virtual void accept_row(position_in_partition_view key, row_tombstone deleted_at, row_marker rm, is_dummy dummy, is_continuous continuous) override { + flush_rows_and_tombstones(key); + _current_row_entry = alloc_strategy_unique_ptr(current_allocator().construct(_schema, key, dummy, continuous)); + deletable_row& r = _current_row_entry->row(); + r.apply(rm); + r.apply(deleted_at); + _current_row = &r; + } + + void accept_row_cell(column_id id, atomic_cell cell) override { + row& r = _current_row->cells(); + r.append_cell(id, std::move(cell)); + } + + virtual void accept_row_cell(column_id id, collection_mutation_view collection) override { + row& r = _current_row->cells(); + r.append_cell(id, collection_mutation(*_schema.regular_column_at(id).type, std::move(collection))); + } + + auto on_end_of_partition() { + flush_rows_and_tombstones(position_in_partition::after_all_clustered_rows()); + if (_consumer.consume_end_of_partition()) { + _stop_consuming = stop_iteration::yes; + } + using consume_res_type = decltype(_consumer.consume_end_of_stream()); + if constexpr (std::is_same_v) { + _consumer.consume_end_of_stream(); + return frozen_mutation_consume_result{_stop_consuming}; + } else { + return frozen_mutation_consume_result{_stop_consuming, _consumer.consume_end_of_stream()}; + } + } +}; + // Immutable, compact form of mutation. // // This form is primarily destined to be sent over the network channel. @@ -59,6 +178,21 @@ public: // Automatically upgrades the stored mutation to the supplied schema with custom column mapping. mutation unfreeze_upgrading(schema_ptr schema, const column_mapping& cm) const; + // Consumes the frozen mutation's content. + // + // The consume operation is stoppable: + // * To stop, return stop_iteration::yes from one of the consume() methods; + // * The consume will now stop and return; + // + // Note that `consume_end_of_partition()` and `consume_end_of_stream()` + // will be called each time the consume is stopping, regardless of whether + // you are pausing or the consumption is ending for good. + template + auto consume(schema_ptr s, Consumer& consumer) const -> frozen_mutation_consume_result; + + template + auto consume(schema_ptr s, frozen_mutation_consumer_adaptor& adaptor) const -> frozen_mutation_consume_result; + struct printer { const frozen_mutation& self; schema_ptr schema; @@ -123,3 +257,21 @@ public: frozen_mutation_fragment freeze(const schema& s, const mutation_fragment& mf); +template +auto frozen_mutation::consume(schema_ptr s, frozen_mutation_consumer_adaptor& adaptor) const -> frozen_mutation_consume_result { + check_schema_version(schema_version(), *s); + try { + adaptor.on_new_partition(_pk); + partition().accept(s->get_column_mapping(), adaptor); + return adaptor.on_end_of_partition(); + } catch (...) { + std::throw_with_nested(std::runtime_error(format( + "frozen_mutation::consume(): failed consuming mutation {} of {}.{}", key(), s->ks_name(), s->cf_name()))); + } +} + +template +auto frozen_mutation::consume(schema_ptr s, Consumer& consumer) const -> frozen_mutation_consume_result { + frozen_mutation_consumer_adaptor adaptor(s, consumer); + return consume(s, adaptor); +} diff --git a/mutation_partition.cc b/mutation_partition.cc index d821ec54f2..d8d22574fb 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -2073,10 +2073,21 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa max_partitions, query_result_builder(*s, builder)); const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? consume_in_reverse::legacy_half_reverse : consume_in_reverse::no; - for (const partition& p : r.partitions()) { - const auto res = p.mut().unfreeze(s).consume(consumer, reverse); - if (res.stop == stop_iteration::yes) { - break; + // FIXME: frozen_mutation::consume supports only forward consumers + if (reverse == consume_in_reverse::no) { + frozen_mutation_consumer_adaptor adaptor(s, consumer); + for (const partition& p : r.partitions()) { + const auto res = p.mut().consume(s, adaptor); + if (res.stop == stop_iteration::yes) { + break; + } + } + } else { + for (const partition& p : r.partitions()) { + const auto res = p.mut().unfreeze(s).consume(consumer, reverse); + if (res.stop == stop_iteration::yes) { + break; + } } } if (r.is_short_read()) { diff --git a/test/boost/mutation_query_test.cc b/test/boost/mutation_query_test.cc index 9141640049..6c56858e2a 100644 --- a/test/boost/mutation_query_test.cc +++ b/test/boost/mutation_query_test.cc @@ -28,6 +28,7 @@ #include "schema_builder.hh" #include "partition_slice_builder.hh" #include "readers/from_mutations_v2.hh" +#include "mutation_rebuilder.hh" using namespace std::literals::chrono_literals; @@ -558,3 +559,21 @@ SEASTAR_THREAD_TEST_CASE(test_result_size_calculation) { BOOST_REQUIRE_EQUAL(digest_only_builder.memory_accounter().used_memory(), result_and_digest_builder.memory_accounter().used_memory()); } +SEASTAR_THREAD_TEST_CASE(test_frozen_mutation_consumer) { + random_mutation_generator gen(random_mutation_generator::generate_counters::no); + schema_ptr s = gen.schema(); + std::vector mutations = gen(1); + const mutation& m = mutations[0]; + frozen_mutation fm = freeze(m); + + // sanity check unfreeze first + mutation um = fm.unfreeze(s); + BOOST_REQUIRE_EQUAL(um, m); + + // Rebuild mutation by consuming from the frozen_mutation + mutation_rebuilder_v2 rebuilder(s); + auto res = fm.consume(s, rebuilder); + BOOST_REQUIRE(res.result); + const auto& rebuilt = *res.result; + BOOST_REQUIRE_EQUAL(rebuilt, m); +}