Files
scylladb/mutation/mutation_fragment.hh
Radosław Cybulski 90ebea5ebb Move mutation_fragment::kind into data object
Move `mutation_fragment::kind` enum into data object, reducing size
of the object from 16 to 8 bytes on current machines.
2025-05-26 11:06:54 +02:00

604 lines
23 KiB
C++

/*
* Copyright (C) 2016-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "mutation_partition.hh"
#include "position_in_partition.hh"
#include <optional>
#include <seastar/util/optimized_optional.hh>
#include "reader_permit.hh"
#include "mutation_fragment_fwd.hh"
#include "mutation/mutation_partition.hh"
// mutation_fragment objects are the elements that a streamed_mutation
// streams. They can represent:
// - a static row
// - a clustering row
// - a range tombstone
// - a partition start
// - a partition end
//
// There exists an ordering (implemented in the position_in_partition class)
// between mutation_fragment objects. It reflects the order in which a
// partition's content appears in sstables.
class clustering_row {
clustering_key_prefix _ck;
deletable_row _row;
public:
explicit clustering_row(clustering_key_prefix ck) : _ck(std::move(ck)) { }
clustering_row(clustering_key_prefix ck, row_tombstone t, row_marker marker, row cells)
: _ck(std::move(ck)), _row(std::move(t), std::move(marker), std::move(cells)) {
_row.maybe_shadow();
}
clustering_row(clustering_key_prefix ck, deletable_row&& row)
: _ck(std::move(ck)), _row(std::move(row)) { }
clustering_row(const schema& s, const clustering_row& other)
: _ck(other._ck), _row(s, other._row) { }
clustering_row(const schema& s, const rows_entry& re)
: _ck(re.key()), _row(s, re.row()) { }
clustering_row(rows_entry&& re)
: _ck(std::move(re.key())), _row(std::move(re.row())) {}
clustering_key_prefix& key() { return _ck; }
const clustering_key_prefix& key() const { return _ck; }
void remove_tombstone() { _row.remove_tombstone(); }
row_tombstone tomb() const { return _row.deleted_at(); }
const row_marker& marker() const { return _row.marker(); }
row_marker& marker() { return _row.marker(); }
const row& cells() const { return _row.cells(); }
row& cells() { return _row.cells(); }
bool empty() const { return _row.empty(); }
bool is_live(const schema& s, tombstone base_tombstone = tombstone(), gc_clock::time_point now = gc_clock::time_point::min()) const {
return _row.is_live(s, column_kind::regular_column, std::move(base_tombstone), std::move(now));
}
void apply(const schema& s, clustering_row&& cr) {
_row.apply(s, std::move(cr._row));
}
void apply(const schema& s, const clustering_row& cr) {
_row.apply(s, deletable_row(s, cr._row));
}
void set_cell(const column_definition& def, atomic_cell_or_collection&& value) {
_row.cells().apply(def, std::move(value));
}
void apply(row_marker rm) {
_row.apply(std::move(rm));
}
void apply(tombstone t) {
_row.apply(std::move(t));
}
void apply(shadowable_tombstone t) {
_row.apply(std::move(t));
}
void apply(const schema& s, const rows_entry& r) {
_row.apply(s, deletable_row(s, r.row()));
}
void apply(const schema& s, const deletable_row& r) {
_row.apply(s, r);
}
void apply(const schema& our_schema, const schema& their_schema, const deletable_row& r) {
_row.apply(our_schema, their_schema, r);
}
position_in_partition_view position() const;
size_t external_memory_usage(const schema& s) const {
return _ck.external_memory_usage() + _row.cells().external_memory_usage(s, column_kind::regular_column);
}
size_t minimal_external_memory_usage(const schema& s) const {
return _ck.minimal_external_memory_usage() + _row.cells().external_memory_usage(s, column_kind::regular_column);
}
size_t memory_usage(const schema& s) const {
return sizeof(clustering_row) + external_memory_usage(s);
}
bool equal(const schema& s, const clustering_row& other) const {
return _ck.equal(s, other._ck)
&& _row.equal(column_kind::regular_column, s, other._row, s);
}
class printer {
const schema& _schema;
const clustering_row& _clustering_row;
public:
printer(const schema& s, const clustering_row& r) : _schema(s), _clustering_row(r) { }
printer(const printer&) = delete;
printer(printer&&) = delete;
friend fmt::formatter<printer>;
};
friend fmt::formatter<printer>;
deletable_row as_deletable_row() && { return std::move(_row); }
const deletable_row& as_deletable_row() const & { return _row; }
};
class static_row {
row _cells;
public:
static_row() = default;
static_row(const schema& s, const static_row& other) : static_row(s, other._cells) { }
explicit static_row(const schema& s, const row& r) : _cells(s, column_kind::static_column, r) { }
explicit static_row(row&& r) : _cells(std::move(r)) { }
row& cells() { return _cells; }
const row& cells() const { return _cells; }
bool empty() const {
return _cells.empty();
}
bool is_live(const schema& s, gc_clock::time_point now = gc_clock::time_point::min()) const {
return _cells.is_live(s, column_kind::static_column, tombstone(), now);
}
void apply(const schema& s, const row& r) {
_cells.apply(s, column_kind::static_column, r);
}
void apply(const schema& s, static_row&& sr) {
_cells.apply(s, column_kind::static_column, std::move(sr._cells));
}
void set_cell(const column_definition& def, atomic_cell_or_collection&& value) {
_cells.apply(def, std::move(value));
}
position_in_partition_view position() const;
size_t external_memory_usage(const schema& s) const {
return _cells.external_memory_usage(s, column_kind::static_column);
}
size_t memory_usage(const schema& s) const {
return sizeof(static_row) + external_memory_usage(s);
}
bool equal(const schema& s, const static_row& other) const {
return _cells.equal(column_kind::static_column, s, other._cells, s);
}
class printer {
const schema& _schema;
const static_row& _static_row;
public:
printer(const schema& s, const static_row& r) : _schema(s), _static_row(r) { }
printer(const printer&) = delete;
printer(printer&&) = delete;
friend fmt::formatter<printer>;
};
friend fmt::formatter<printer>;
};
// Fragment that opens a partition: the partition's decorated key and its
// partition-level tombstone.
class partition_start final {
    dht::decorated_key _key;
    tombstone _partition_tombstone;
public:
    partition_start(dht::decorated_key pk, tombstone pt)
        : _key(std::move(pk)), _partition_tombstone(std::move(pt))
    { }

    dht::decorated_key& key() {
        return _key;
    }
    const dht::decorated_key& key() const {
        return _key;
    }
    tombstone& partition_tombstone() {
        return _partition_tombstone;
    }
    const tombstone& partition_tombstone() const {
        return _partition_tombstone;
    }

    position_in_partition_view position() const;

    // Only the key owns memory outside the object.
    size_t external_memory_usage(const schema&) const {
        return _key.external_memory_usage();
    }
    size_t memory_usage(const schema& s) const {
        return external_memory_usage(s) + sizeof(partition_start);
    }
    // Equal when both the key and the partition tombstone match.
    bool equal(const schema& s, const partition_start& other) const {
        if (!_key.equal(s, other._key)) {
            return false;
        }
        return _partition_tombstone == other._partition_tombstone;
    }

    friend fmt::formatter<partition_start>;
};
// Fragment that closes a partition. It carries no state of its own.
class partition_end final {
public:
    position_in_partition_view position() const;
    // No payload, hence no memory outside the object.
    size_t external_memory_usage(const schema&) const {
        return 0;
    }
    size_t memory_usage(const schema& s) const {
        return sizeof(partition_end) + external_memory_usage(s);
    }
    // Stateless, so any two partition_end fragments are equal. The parameters
    // are left unnamed (as in external_memory_usage()) because they are unused,
    // which avoids -Wunused-parameter warnings.
    bool equal(const schema&, const partition_end&) const {
        return true;
    }
};
// Satisfied when T::consume() accepts every fragment payload type by rvalue
// and returns exactly ReturnType for each of them.
template<typename T, typename ReturnType>
concept MutationFragmentConsumer =
    requires(T& consumer,
             static_row srow,
             clustering_row crow,
             range_tombstone rtomb,
             partition_start pstart,
             partition_end pend) {
        { consumer.consume(std::move(srow)) } -> std::same_as<ReturnType>;
        { consumer.consume(std::move(crow)) } -> std::same_as<ReturnType>;
        { consumer.consume(std::move(rtomb)) } -> std::same_as<ReturnType>;
        { consumer.consume(std::move(pstart)) } -> std::same_as<ReturnType>;
        { consumer.consume(std::move(pend)) } -> std::same_as<ReturnType>;
    };
// Satisfied when T is callable with a const reference to every fragment
// payload type and returns exactly ReturnType for each of them.
template<typename T, typename ReturnType>
concept MutationFragmentVisitor =
    requires(T visitor,
             const static_row& srow,
             const clustering_row& crow,
             const range_tombstone& rtomb,
             const partition_start& pstart,
             const partition_end& pend) {
        { visitor(srow) } -> std::same_as<ReturnType>;
        { visitor(crow) } -> std::same_as<ReturnType>;
        { visitor(rtomb) } -> std::same_as<ReturnType>;
        { visitor(pstart) } -> std::same_as<ReturnType>;
        { visitor(pend) } -> std::same_as<ReturnType>;
    };
// Owning, type-erased container for one fragment of a mutation stream. The
// payload is one of static_row, clustering_row, range_tombstone,
// partition_start or partition_end. It lives in a heap-allocated `data`
// object together with its kind and the memory units tracked against a
// reader_permit. A moved-from mutation_fragment has a null `_data`. Apart
// from destruction and move assignment, the members below dereference
// `_data` without checking, so they must not be used on such an object.
class mutation_fragment {
public:
// Identifies which member of data's anonymous union is alive.
enum class kind : std::uint8_t {
static_row,
clustering_row,
range_tombstone,
partition_start,
partition_end,
};
private:
// Heap-allocated payload. Keeping `_kind` here (rather than as a member of
// mutation_fragment) leaves the unique_ptr below as mutation_fragment's only
// data member.
struct data {
// The parameter `_kind` intentionally shares the member's name, so
// `_kind(_kind)` initializes the member from the parameter. The memory
// units start as whatever permit.consume_memory() returns when called with
// no argument. mutation_fragment then adjusts them through reset_memory().
data(reader_permit permit, kind _kind) : _memory(permit.consume_memory()), _kind(_kind) { }
// Intentionally empty: this destructor cannot know which union member is
// alive, so it destroys none of them. ~mutation_fragment() calls
// destroy_data() (defined out of line) for that.
~data() { }
reader_permit::resource_units _memory;
kind _kind;
// One member at a time is constructed, with placement new; `_kind` says which.
union {
static_row _static_row;
clustering_row _clustering_row;
range_tombstone _range_tombstone;
partition_start _partition_start;
partition_end _partition_end;
};
};
private:
std::unique_ptr<data> _data;
// Empty state (null `_data`). Private, so only members and friends (e.g.
// optimized_optional<mutation_fragment>) can create it. operator bool
// reports whether a payload is present.
mutation_fragment() = default;
explicit operator bool() const noexcept { return bool(_data); }
// Defined out of line. It is only called while `_data` is non-null and
// presumably destroys the union member selected by `_data->_kind`.
// TODO confirm.
void destroy_data() noexcept;
// Defined out of line. Updates the memory accounted in `_data->_memory`,
// presumably from `res` when given and from calculate_memory_usage(s)
// otherwise (TODO confirm). The constructors and mutate_as_*() below call it
// after building or modifying the payload.
void reset_memory(const schema& s, std::optional<reader_resources> res = {});
friend class optimized_optional<mutation_fragment>;
friend class position_in_partition;
public:
// Tag type selecting the in-place clustering_row constructor below.
struct clustering_row_tag_t { };
// Constructs the clustering_row payload directly inside the heap object from
// `args` (no intermediate clustering_row object), then accounts its memory.
template<typename... Args>
mutation_fragment(clustering_row_tag_t, const schema& s, reader_permit permit, Args&&... args)
: _data(std::make_unique<data>(std::move(permit), kind::clustering_row))
{
new (&_data->_clustering_row) clustering_row(std::forward<Args>(args)...);
reset_memory(s);
}
// Take ownership of an existing payload of each kind; defined out of line.
mutation_fragment(const schema& s, reader_permit permit, static_row&& r);
mutation_fragment(const schema& s, reader_permit permit, clustering_row&& r);
mutation_fragment(const schema& s, reader_permit permit, range_tombstone&& r);
mutation_fragment(const schema& s, reader_permit permit, partition_start&& r);
mutation_fragment(const schema& s, reader_permit permit, partition_end&& r);
// Copies `o`'s payload into a new fragment owned by `permit`. Row payloads
// use their schema-aware copy constructors and the others their plain copy
// constructors. The new fragment is accounted with the resources `o`
// currently holds, not with a freshly computed amount.
mutation_fragment(const schema& s, reader_permit permit, const mutation_fragment& o)
: _data(std::make_unique<data>(std::move(permit), o._data->_kind)) {
switch (_data->_kind) {
case kind::static_row:
new (&_data->_static_row) static_row(s, o._data->_static_row);
break;
case kind::clustering_row:
new (&_data->_clustering_row) clustering_row(s, o._data->_clustering_row);
break;
case kind::range_tombstone:
new (&_data->_range_tombstone) range_tombstone(o._data->_range_tombstone);
break;
case kind::partition_start:
new (&_data->_partition_start) partition_start(o._data->_partition_start);
break;
case kind::partition_end:
new (&_data->_partition_end) partition_end(o._data->_partition_end);
break;
}
reset_memory(s, o._data->_memory.resources());
}
// Steals `other`'s heap object; `other` is left empty (null `_data`).
mutation_fragment(mutation_fragment&& other) = default;
// Move assignment by destroying *this and move-constructing it again in
// place. It can be noexcept because the defaulted move constructor only
// moves a unique_ptr.
mutation_fragment& operator=(mutation_fragment&& other) noexcept {
if (this != &other) {
this->~mutation_fragment();
new (this) mutation_fragment(std::move(other));
}
return *this;
}
// Empty (moved-from or default-constructed) fragments have no payload to destroy.
[[gnu::always_inline]]
~mutation_fragment() {
if (_data) {
destroy_data();
}
}
position_in_partition_view position() const;
// Returns the range of positions for which this fragment holds relevant information.
position_range range(const schema& s) const;
// Checks if this fragment may be relevant for any range starting at given position.
bool relevant_for_range(const schema& s, position_in_partition_view pos) const;
// Like relevant_for_range() but makes use of assumption that pos is greater
// than the starting position of this fragment.
bool relevant_for_range_assuming_after(const schema& s, position_in_partition_view pos) const;
bool has_key() const { return is_clustering_row() || is_range_tombstone(); }
// Requirements: has_key() == true
const clustering_key_prefix& key() const;
// Kind queries. The payload accessors below do not check the kind themselves.
kind mutation_fragment_kind() const { return _data->_kind; }
bool is_static_row() const { return _data->_kind == kind::static_row; }
bool is_clustering_row() const { return _data->_kind == kind::clustering_row; }
bool is_range_tombstone() const { return _data->_kind == kind::range_tombstone; }
bool is_partition_start() const { return _data->_kind == kind::partition_start; }
bool is_end_of_partition() const { return _data->_kind == kind::partition_end; }
// In-place mutation of the payload. `fn` receives the payload by reference.
// Memory is re-accounted afterwards because `fn` may change the payload's size.
// Precondition (unchecked): the fragment holds the named kind.
void mutate_as_static_row(const schema& s, std::invocable<static_row&> auto&& fn) {
fn(_data->_static_row);
reset_memory(s);
}
void mutate_as_clustering_row(const schema& s, std::invocable<clustering_row&> auto&& fn) {
fn(_data->_clustering_row);
reset_memory(s);
}
void mutate_as_range_tombstone(const schema& s, std::invocable<range_tombstone&> auto&& fn) {
fn(_data->_range_tombstone);
reset_memory(s);
}
void mutate_as_partition_start(const schema& s, std::invocable<partition_start&> auto&& fn) {
fn(_data->_partition_start);
reset_memory(s);
}
// Unchecked payload accessors. Precondition: the fragment holds the named
// kind; otherwise an inactive union member would be accessed. The rvalue
// overloads return an rvalue reference to the payload, but the payload
// object itself stays inside this fragment.
static_row&& as_static_row() && { return std::move(_data->_static_row); }
clustering_row&& as_clustering_row() && { return std::move(_data->_clustering_row); }
range_tombstone&& as_range_tombstone() && { return std::move(_data->_range_tombstone); }
partition_start&& as_partition_start() && { return std::move(_data->_partition_start); }
partition_end&& as_end_of_partition() && { return std::move(_data->_partition_end); }
const static_row& as_static_row() const & { return _data->_static_row; }
const clustering_row& as_clustering_row() const & { return _data->_clustering_row; }
const range_tombstone& as_range_tombstone() const & { return _data->_range_tombstone; }
const partition_start& as_partition_start() const & { return _data->_partition_start; }
const partition_end& as_end_of_partition() const & { return _data->_partition_end; }
// Requirements: mergeable_with(mf)
void apply(const schema& s, mutation_fragment&& mf);
// Moves the payload into the matching consumer.consume() overload and returns
// its result. The accounted memory is zeroed first because the payload is
// being moved out of this fragment. Aborts on an unrecognized kind.
template<typename Consumer>
requires MutationFragmentConsumer<Consumer, decltype(std::declval<Consumer>().consume(std::declval<range_tombstone>()))>
decltype(auto) consume(Consumer& consumer) && {
_data->_memory.reset_to_zero();
switch (_data->_kind) {
case kind::static_row:
return consumer.consume(std::move(_data->_static_row));
case kind::clustering_row:
return consumer.consume(std::move(_data->_clustering_row));
case kind::range_tombstone:
return consumer.consume(std::move(_data->_range_tombstone));
case kind::partition_start:
return consumer.consume(std::move(_data->_partition_start));
case kind::partition_end:
return consumer.consume(std::move(_data->_partition_end));
}
abort();
}
// Calls `visitor` with a const reference to the payload and returns its
// result. The result type is taken from the static_row call, and the concept
// requires every overload to return that same type. Aborts on an
// unrecognized kind.
template<typename Visitor>
requires MutationFragmentVisitor<Visitor, decltype(std::declval<Visitor>()(std::declval<static_row&>()))>
decltype(auto) visit(Visitor&& visitor) const {
switch (_data->_kind) {
case kind::static_row:
return visitor(as_static_row());
case kind::clustering_row:
return visitor(as_clustering_row());
case kind::range_tombstone:
return visitor(as_range_tombstone());
case kind::partition_start:
return visitor(as_partition_start());
case kind::partition_end:
return visitor(as_end_of_partition());
}
abort();
}
// Memory currently accounted for this fragment. It is read from `_memory`,
// not recomputed.
size_t memory_usage() const {
return _data->_memory.resources().memory;
}
// The permit this fragment's memory is accounted against.
reader_permit permit() const {
return _data->_memory.permit();
}
// Schema-aware equality. Fragments of different kinds are never equal.
bool equal(const schema& s, const mutation_fragment& other) const {
if (other._data->_kind != _data->_kind) {
return false;
}
switch (_data->_kind) {
case kind::static_row:
return as_static_row().equal(s, other.as_static_row());
case kind::clustering_row:
return as_clustering_row().equal(s, other.as_clustering_row());
case kind::range_tombstone:
return as_range_tombstone().equal(s, other.as_range_tombstone());
case kind::partition_start:
return as_partition_start().equal(s, other.as_partition_start());
case kind::partition_end:
return as_end_of_partition().equal(s, other.as_end_of_partition());
}
abort();
}
// Fragments which have the same position() and are mergeable can be
// merged into one fragment with apply() which represents the sum of
// writes represented by each of the fragments.
// Fragments which have the same position() but are not mergeable
// can be emitted one after the other in the stream.
bool mergeable_with(const mutation_fragment& mf) const {
return _data->_kind == mf._data->_kind && _data->_kind != kind::range_tombstone;
}
// Pairs a fragment with its schema so that fmt can format it.
class printer {
const schema& _schema;
const mutation_fragment& _mutation_fragment;
public:
printer(const schema& s, const mutation_fragment& mf) : _schema(s), _mutation_fragment(mf) { }
printer(const printer&) = delete;
printer(printer&&) = delete;
friend fmt::formatter<printer>;
};
friend fmt::formatter<printer>;
private:
// Size of the heap `data` object plus the payload's external memory.
size_t calculate_memory_usage(const schema& s) const {
return sizeof(data) + visit([&s] (auto& mf) -> size_t { return mf.external_memory_usage(s); });
}
};
inline position_in_partition_view static_row::position() const
{
return position_in_partition_view(position_in_partition_view::static_row_tag_t());
}
inline position_in_partition_view clustering_row::position() const
{
return position_in_partition_view(position_in_partition_view::clustering_row_tag_t(), _ck);
}
inline position_in_partition_view partition_start::position() const
{
return position_in_partition_view::for_partition_start();
}
inline position_in_partition_view partition_end::position() const
{
return position_in_partition_view::for_partition_end();
}
// range_tombstone_stream is a helper object that simplifies producing a stream
// of range tombstones and merging it with a stream of clustering rows.
// Tombstones are added using apply() and retrieved using get_next().
//
// get_next(const rows_entry&) and get_next(const mutation_fragment&) allow
// merging the stream of tombstones with a stream of clustering rows. If these
// overloads return a disengaged optional, there is no tombstone in the stream
// that should be emitted before the object given as an argument. (Conversely,
// if the optional is engaged, that tombstone should be emitted first.) After
// one of these overloads has been called with a mutation_fragment at some
// position P in the partition, no range tombstone whose start bound is before
// that position may be added to the stream.
//
// The get_next() overload that takes no arguments returns the remaining
// tombstones. After it has been called, no new tombstones may be added to the
// stream.
class range_tombstone_stream {
    const schema& _schema;
    reader_permit _permit;
    position_in_partition::less_compare _cmp;
    range_tombstone_list _list;
private:
    // Defined out of line.
    mutation_fragment_opt do_get_next();
public:
    range_tombstone_stream(const schema& s, reader_permit permit)
        : _schema(s)
        , _permit(std::move(permit))
        , _cmp(s)
        , _list(s)
    { }

    // Adds a tombstone to the stream.
    void apply(range_tombstone&& rt) {
        _list.apply(_schema, std::move(rt));
    }

    // Next tombstone that must be emitted before the given row/fragment, if any.
    mutation_fragment_opt get_next(const rows_entry&);
    mutation_fragment_opt get_next(const mutation_fragment&);
    // Returns next fragment with position before upper_bound or disengaged optional if no such fragments are left.
    mutation_fragment_opt get_next(position_in_partition_view upper_bound);
    // Drains the remaining tombstones; no tombstones may be added afterwards.
    mutation_fragment_opt get_next();

    // Precondition: !empty()
    const range_tombstone& peek_next() const;
    // Forgets all tombstones which are not relevant for any range starting at given position.
    void forward_to(position_in_partition_view);
    void reset();
    bool empty() const;

    friend fmt::formatter<range_tombstone_stream>;
};
// Element-wise transformer for a mutation stream. F maps each
// mutation_fragment (taken by rvalue) to the fragment that replaces it in the
// transformed stream. It also maps a schema_ptr to a schema_ptr (presumably
// the schema of the transformed stream). The misspelled concept name is kept
// because callers refer to it.
template<typename F>
concept StreamedMutationTranformer =
    requires(F transform, mutation_fragment fragment, schema_ptr sch) {
        { transform(std::move(fragment)) } -> std::same_as<mutation_fragment>;
        { transform(sch) } -> std::same_as<schema_ptr>;
    };
class xx_hasher;

// Feeds a mutation_fragment into a hasher `h` (presumably a hasher type such
// as xx_hasher). The member template is defined out of line.
template<>
struct appending_hash<mutation_fragment> {
    template<typename Hasher>
    void operator()(Hasher& h,
                    const mutation_fragment& mf,
                    const schema& s) const;
};
// Formats as "{clustering_row: ck <key> dr <deletable_row>}".
template <>
struct fmt::formatter<clustering_row::printer> : fmt::formatter<string_view> {
    auto format(const clustering_row::printer& p, fmt::format_context& ctx) const {
        const auto& cr = p._clustering_row;
        return fmt::format_to(ctx.out(), "{{clustering_row: ck {} dr {}}}",
                              cr._ck, deletable_row::printer(p._schema, cr._row));
    }
};
// Formats as "{static_row: <cells>}".
template <>
struct fmt::formatter<static_row::printer> : fmt::formatter<string_view> {
    auto format(const static_row::printer& p, fmt::format_context& ctx) const {
        const auto& sr = p._static_row;
        return fmt::format_to(ctx.out(), "{{static_row: {}}}",
                              row::printer(p._schema, column_kind::static_column, sr._cells));
    }
};
// Formats as "{partition_start: pk <key> partition_tombstone <tombstone>}".
template <>
struct fmt::formatter<partition_start> : fmt::formatter<string_view> {
    auto format(const partition_start& ps, fmt::format_context& ctx) const {
        auto out = ctx.out();
        return fmt::format_to(out, "{{partition_start: pk {} partition_tombstone {}}}",
                              ps._key, ps._partition_tombstone);
    }
};
// partition_end is stateless, so it always formats as "{partition_end}".
template <>
struct fmt::formatter<partition_end> : fmt::formatter<string_view> {
    auto format(const partition_end&, fmt::format_context& ctx) const {
        auto out = ctx.out();
        return fmt::format_to(out, "{{partition_end}}");
    }
};
// Formatter for a schema-bound mutation_fragment; format() is defined out of line.
template <>
struct fmt::formatter<mutation_fragment::printer> : fmt::formatter<string_view> {
    auto format(const mutation_fragment::printer& p, fmt::format_context& ctx) const
        -> decltype(ctx.out());
};
// Formatter for the fragment kind enum; format() is defined out of line.
template <>
struct fmt::formatter<mutation_fragment::kind> : fmt::formatter<string_view> {
    auto format(mutation_fragment::kind k, fmt::format_context& ctx) const
        -> decltype(ctx.out());
};
// A range_tombstone_stream is formatted as its underlying range_tombstone_list.
template <>
struct fmt::formatter<range_tombstone_stream> : fmt::formatter<string_view> {
    auto format(const range_tombstone_stream& stream, fmt::format_context& ctx) const {
        const auto& tombstones = stream._list;
        return fmt::format_to(ctx.out(), "{}", tombstones);
    }
};