mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-09 16:33:35 +00:00
When compacting a mutation fragment stream (e.g. for sstable compaction, data query, repair), the compactor needs to accumulate range tombstones which are relevant for the yet-to-be-processed range. See range_tombstone_accumulator. One problem is that it has an unbounded memory footprint because the accumulator needs to keep track of all the tombstoned ranges which are still active. Another, although more benign, problem is the computational complexity needed to maintain that data structure. The fix is to get rid of the overlap of range tombstones in the mutation fragment stream. In v2 of the stream, there is no longer a range_tombstone fragment. Deletions of ranges of rows within a given partition are represented with range_tombstone_change fragments. At any point in the stream there is a single active clustered tombstone. It is initially equal to the neutral tombstone when the stream of each partition starts. The range_tombstone_change fragment type signifies changes of the active clustered tombstone. All fragments emitted while a given clustered tombstone is active are affected by that tombstone. Like with the old range_tombstone fragments, the clustered tombstone is independent from the partition tombstone carried in partition_start. The v2 stream is strict about range tombstone trimming. It emits range tombstone changes which reflect range tombstones trimmed to query restrictions and fast-forwarding ranges. This makes the stream more canonical, meaning that for a given set of writes, querying the database should produce the same stream of fragments for given restrictions. There is less ambiguity in how the writes are represented in the fragment stream. This wasn't the case with v1. For example, a given set of deletions could be produced either as one range_tombstone, or as many, split and/or de-overlapped with other fragments. Making the stream canonical makes diff calculation easier. 
The classes related to mutation fragment streams were cloned: flat_mutation_reader_v2, mutation_fragment_v2, and related concepts. Refs #8625.
123 lines
4.7 KiB
C++
123 lines
4.7 KiB
C++
/*
|
|
* Copyright (C) 2017-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* This file is part of Scylla.
|
|
*
|
|
* Scylla is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU Affero General Public License as published by
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* Scylla is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License
|
|
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include "mutation_fragment.hh"
|
|
#include "converting_mutation_partition_applier.hh"
|
|
|
|
// A StreamedMutationTransformer which transforms the stream to a different schema
|
|
class schema_upgrader {
|
|
schema_ptr _prev;
|
|
schema_ptr _new;
|
|
std::optional<reader_permit> _permit;
|
|
private:
|
|
row transform(row&& r, column_kind kind) {
|
|
row new_row;
|
|
r.for_each_cell([&] (column_id id, atomic_cell_or_collection& cell) {
|
|
const column_definition& col = _prev->column_at(kind, id);
|
|
const column_definition* new_col = _new->get_column_definition(col.name());
|
|
if (new_col) {
|
|
converting_mutation_partition_applier::append_cell(new_row, kind, *new_col, col, std::move(cell));
|
|
}
|
|
});
|
|
return new_row;
|
|
}
|
|
public:
|
|
schema_upgrader(schema_ptr s)
|
|
: _new(std::move(s))
|
|
{ }
|
|
schema_ptr operator()(schema_ptr old) {
|
|
_prev = std::move(old);
|
|
return _new;
|
|
}
|
|
mutation_fragment consume(static_row&& row) {
|
|
return mutation_fragment(*_new, std::move(*_permit), static_row(transform(std::move(row.cells()), column_kind::static_column)));
|
|
}
|
|
mutation_fragment consume(clustering_row&& row) {
|
|
return mutation_fragment(*_new, std::move(*_permit), clustering_row(row.key(), row.tomb(), row.marker(),
|
|
transform(std::move(row.cells()), column_kind::regular_column)));
|
|
}
|
|
mutation_fragment consume(range_tombstone&& rt) {
|
|
return mutation_fragment(*_new, std::move(*_permit), std::move(rt));
|
|
}
|
|
mutation_fragment consume(partition_start&& ph) {
|
|
return mutation_fragment(*_new, std::move(*_permit), std::move(ph));
|
|
}
|
|
mutation_fragment consume(partition_end&& eop) {
|
|
return mutation_fragment(*_new, std::move(*_permit), std::move(eop));
|
|
}
|
|
mutation_fragment operator()(mutation_fragment&& mf) {
|
|
_permit = mf.permit();
|
|
return std::move(mf).consume(*this);
|
|
}
|
|
};
|
|
|
|
// A StreamedMutationTransformer which transforms the stream to a different schema
|
|
class schema_upgrader_v2 {
|
|
schema_ptr _prev;
|
|
schema_ptr _new;
|
|
std::optional<reader_permit> _permit;
|
|
private:
|
|
row transform(row&& r, column_kind kind) {
|
|
row new_row;
|
|
r.for_each_cell([&] (column_id id, atomic_cell_or_collection& cell) {
|
|
const column_definition& col = _prev->column_at(kind, id);
|
|
const column_definition* new_col = _new->get_column_definition(col.name());
|
|
if (new_col) {
|
|
converting_mutation_partition_applier::append_cell(new_row, kind, *new_col, col, std::move(cell));
|
|
}
|
|
});
|
|
return new_row;
|
|
}
|
|
public:
|
|
schema_upgrader_v2(schema_ptr s)
|
|
: _new(std::move(s))
|
|
{ }
|
|
schema_ptr operator()(schema_ptr old) {
|
|
_prev = std::move(old);
|
|
return _new;
|
|
}
|
|
mutation_fragment_v2 consume(static_row&& row) {
|
|
return mutation_fragment_v2(*_new, std::move(*_permit), static_row(transform(std::move(row.cells()), column_kind::static_column)));
|
|
}
|
|
mutation_fragment_v2 consume(clustering_row&& row) {
|
|
return mutation_fragment_v2(*_new, std::move(*_permit), clustering_row(row.key(), row.tomb(), row.marker(),
|
|
transform(std::move(row.cells()), column_kind::regular_column)));
|
|
}
|
|
mutation_fragment_v2 consume(range_tombstone_change&& rt) {
|
|
return mutation_fragment_v2(*_new, std::move(*_permit), std::move(rt));
|
|
}
|
|
mutation_fragment_v2 consume(partition_start&& ph) {
|
|
return mutation_fragment_v2(*_new, std::move(*_permit), std::move(ph));
|
|
}
|
|
mutation_fragment_v2 consume(partition_end&& eop) {
|
|
return mutation_fragment_v2(*_new, std::move(*_permit), std::move(eop));
|
|
}
|
|
mutation_fragment_v2 operator()(mutation_fragment_v2&& mf) {
|
|
_permit = mf.permit();
|
|
return std::move(mf).consume(*this);
|
|
}
|
|
};
|
|
|
|
// Compile-time check that schema_upgrader satisfies the StreamedMutationTranformer concept.
static_assert(StreamedMutationTranformer<schema_upgrader>);
|
|
// Compile-time check that schema_upgrader_v2 satisfies the StreamedMutationTranformerV2 concept.
static_assert(StreamedMutationTranformerV2<schema_upgrader_v2>);
|