Unfreeze the frozen_mutation, possibly splitting it based on max_rows. The process_mutation function is called for each split mutation. Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
168 lines
5.8 KiB
C++
168 lines
5.8 KiB
C++
/*
|
|
* Copyright (C) 2015-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include "schema/schema.hh"
|
|
#include "mutation/mutation.hh"
|
|
#include "mutation/mutation_partition.hh"
|
|
#include "mutation/mutation_partition_visitor.hh"
|
|
#include "mutation/tombstone.hh"
|
|
#include "mutation/atomic_cell.hh"
|
|
#include "mutation/range_tombstone.hh"
|
|
#include "mutation/collection_mutation.hh"
|
|
|
|
// Partition visitor which builds mutation_partition corresponding to the data its fed with.
|
|
class partition_builder final : public mutation_partition_visitor {
|
|
private:
|
|
const schema& _schema;
|
|
mutation_partition& _partition;
|
|
deletable_row* _current_row;
|
|
public:
|
|
// @p will hold the result of building.
|
|
// @p must be empty.
|
|
partition_builder(const schema& s, mutation_partition& p)
|
|
: _schema(s)
|
|
, _partition(p)
|
|
{ }
|
|
|
|
virtual void accept_partition_tombstone(tombstone t) override {
|
|
_partition.apply(t);
|
|
}
|
|
|
|
virtual void accept_static_cell(column_id id, atomic_cell_view cell) override {
|
|
auto& cdef = _schema.static_column_at(id);
|
|
accept_static_cell(id, atomic_cell(*cdef.type, cell));
|
|
}
|
|
|
|
void accept_static_cell(column_id id, atomic_cell&& cell) {
|
|
row& r = _partition.static_row().maybe_create();
|
|
r.append_cell(id, atomic_cell_or_collection(std::move(cell)));
|
|
}
|
|
|
|
virtual void accept_static_cell(column_id id, collection_mutation_view collection) override {
|
|
row& r = _partition.static_row().maybe_create();
|
|
r.append_cell(id, collection_mutation(*_schema.static_column_at(id).type, std::move(collection)));
|
|
}
|
|
|
|
virtual void accept_row_tombstone(const range_tombstone& rt) override {
|
|
_partition.apply_row_tombstone(_schema, rt);
|
|
}
|
|
|
|
virtual void accept_row(position_in_partition_view key, const row_tombstone& deleted_at, const row_marker& rm, is_dummy dummy, is_continuous continuous) override {
|
|
deletable_row& r = _partition.append_clustered_row(_schema, key, dummy, continuous);
|
|
r.apply(rm);
|
|
r.apply(deleted_at);
|
|
_current_row = &r;
|
|
}
|
|
|
|
virtual void accept_row_cell(column_id id, atomic_cell_view cell) override {
|
|
auto& cdef = _schema.regular_column_at(id);
|
|
accept_row_cell(id, atomic_cell(*cdef.type, cell));
|
|
}
|
|
|
|
void accept_row_cell(column_id id, atomic_cell&& cell) {
|
|
row& r = _current_row->cells();
|
|
r.append_cell(id, std::move(cell));
|
|
}
|
|
|
|
virtual void accept_row_cell(column_id id, collection_mutation_view collection) override {
|
|
row& r = _current_row->cells();
|
|
r.append_cell(id, collection_mutation(*_schema.regular_column_at(id).type, std::move(collection)));
|
|
}
|
|
};
|
|
|
|
class partition_split_builder final : public async_mutation_partition_visitor {
|
|
schema_ptr _schema;
|
|
partition_key_view _key;
|
|
size_t _max_rows;
|
|
size_t _current_rows = 0;
|
|
std::function<future<>(mutation)> _process_mutation;
|
|
|
|
std::optional<mutation> _m;
|
|
mutation_partition* _partition;
|
|
deletable_row* _current_row;
|
|
|
|
void make_mutation() {
|
|
_m = mutation(_schema, _key);
|
|
_partition = &_m->partition();
|
|
_current_row = nullptr;
|
|
_current_rows = 0;
|
|
}
|
|
|
|
future<> maybe_process_mutation() {
|
|
if (_current_rows >= _max_rows) {
|
|
co_await _process_mutation(std::move(*_m));
|
|
make_mutation();
|
|
}
|
|
}
|
|
|
|
public:
|
|
partition_split_builder(schema_ptr s, partition_key_view key, size_t max_rows, std::function<future<>(mutation)> process_mutation)
|
|
: _schema(s)
|
|
, _key(key)
|
|
, _max_rows(max_rows)
|
|
, _process_mutation(process_mutation)
|
|
{
|
|
make_mutation();
|
|
}
|
|
|
|
virtual void accept_partition_tombstone(tombstone t) override {
|
|
_partition->apply(t);
|
|
}
|
|
|
|
virtual void accept_static_cell(column_id id, atomic_cell_view cell) override {
|
|
auto& cdef = _schema->static_column_at(id);
|
|
accept_static_cell(id, atomic_cell(*cdef.type, cell));
|
|
}
|
|
|
|
void accept_static_cell(column_id id, atomic_cell&& cell) {
|
|
row& r = _partition->static_row().maybe_create();
|
|
r.append_cell(id, atomic_cell_or_collection(std::move(cell)));
|
|
}
|
|
|
|
virtual void accept_static_cell(column_id id, collection_mutation_view collection) override {
|
|
row& r = _partition->static_row().maybe_create();
|
|
r.append_cell(id, collection_mutation(*_schema->static_column_at(id).type, std::move(collection)));
|
|
}
|
|
|
|
virtual future<> accept_row_tombstone(const range_tombstone& rt) override {
|
|
co_await maybe_process_mutation();
|
|
_partition->apply_row_tombstone(*_schema, rt);
|
|
++_current_rows;
|
|
}
|
|
|
|
virtual future<> accept_row(position_in_partition_view key, const row_tombstone& deleted_at, const row_marker& rm, is_dummy dummy, is_continuous continuous) override {
|
|
co_await maybe_process_mutation();
|
|
deletable_row& r = _partition->append_clustered_row(*_schema, key, dummy, continuous);
|
|
r.apply(rm);
|
|
r.apply(deleted_at);
|
|
_current_row = &r;
|
|
++_current_rows;
|
|
}
|
|
|
|
virtual void accept_row_cell(column_id id, atomic_cell_view cell) override {
|
|
auto& cdef = _schema->regular_column_at(id);
|
|
accept_row_cell(id, atomic_cell(*cdef.type, cell));
|
|
}
|
|
|
|
void accept_row_cell(column_id id, atomic_cell&& cell) {
|
|
row& r = _current_row->cells();
|
|
r.append_cell(id, std::move(cell));
|
|
}
|
|
|
|
virtual void accept_row_cell(column_id id, collection_mutation_view collection) override {
|
|
row& r = _current_row->cells();
|
|
r.append_cell(id, collection_mutation(*_schema->regular_column_at(id).type, std::move(collection)));
|
|
}
|
|
|
|
virtual future<> accept_end_of_partition() override {
|
|
return _process_mutation(std::move(*_m));
|
|
}
|
|
};
|