Move `mutation_fragment_v2::kind` enum into data object, reducing size of the object from 16 to 8 bytes on current machines.
388 lines
15 KiB
C++
388 lines
15 KiB
C++
/*
|
|
* Copyright (C) 2021-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include "mutation_fragment.hh"
|
|
#include "position_in_partition.hh"
|
|
|
|
#include <fmt/core.h>
|
|
#include <optional>
|
|
#include <seastar/util/optimized_optional.hh>
|
|
|
|
#include "reader_permit.hh"
|
|
|
|
// Mutation fragment which represents a range tombstone boundary.
|
|
//
|
|
// The range_tombstone_change::tombstone() method returns the tombstone which takes effect
|
|
// for positions >= range_tombstone_change::position() in the stream, until the next
|
|
// range_tombstone_change is encountered.
|
|
//
|
|
// Note, a range_tombstone_change with an empty tombstone() ends the range tombstone.
|
|
// An empty tombstone naturally does not cover any timestamp.
|
|
class range_tombstone_change {
|
|
position_in_partition _pos;
|
|
::tombstone _tomb;
|
|
public:
|
|
range_tombstone_change(position_in_partition pos, tombstone tomb)
|
|
: _pos(std::move(pos))
|
|
, _tomb(tomb)
|
|
{ }
|
|
range_tombstone_change(position_in_partition_view pos, tombstone tomb)
|
|
: _pos(pos)
|
|
, _tomb(tomb)
|
|
{ }
|
|
const position_in_partition& position() const & {
|
|
return _pos;
|
|
}
|
|
position_in_partition position() && {
|
|
return std::move(_pos);
|
|
}
|
|
void set_position(position_in_partition pos) {
|
|
_pos = std::move(pos);
|
|
}
|
|
::tombstone tombstone() const {
|
|
return _tomb;
|
|
}
|
|
void set_tombstone(::tombstone tomb) {
|
|
_tomb = tomb;
|
|
}
|
|
size_t external_memory_usage(const schema& s) const {
|
|
return _pos.external_memory_usage();
|
|
}
|
|
size_t minimal_external_memory_usage(const schema&) const noexcept {
|
|
if (_pos.has_key()) {
|
|
return _pos.key().minimal_external_memory_usage();
|
|
}
|
|
return 0;
|
|
}
|
|
size_t memory_usage(const schema& s) const noexcept {
|
|
return sizeof(range_tombstone_change) + external_memory_usage(s);
|
|
}
|
|
size_t minimal_memory_usage(const schema& s) const noexcept {
|
|
return sizeof(range_tombstone_change) + minimal_external_memory_usage(s);
|
|
}
|
|
bool equal(const schema& s, const range_tombstone_change& other) const {
|
|
position_in_partition::equal_compare eq(s);
|
|
return _tomb == other._tomb && eq(_pos, other._pos);
|
|
}
|
|
};
|
|
|
|
template<>
|
|
struct fmt::formatter<range_tombstone_change> : fmt::formatter<string_view> {
|
|
template <typename FormatContext>
|
|
auto format(const range_tombstone_change& rt, FormatContext& ctx) const {
|
|
return fmt::format_to(ctx.out(), "{{range_tombstone_change: pos={}, {}}}", rt.position(), rt.tombstone());
|
|
}
|
|
};
|
|
|
|
template<typename T, typename ReturnType>
|
|
concept MutationFragmentConsumerV2 =
|
|
requires(T& t,
|
|
static_row sr,
|
|
clustering_row cr,
|
|
range_tombstone_change rt_chg,
|
|
partition_start ph,
|
|
partition_end pe) {
|
|
{ t.consume(std::move(sr)) } -> std::same_as<ReturnType>;
|
|
{ t.consume(std::move(cr)) } -> std::same_as<ReturnType>;
|
|
{ t.consume(std::move(rt_chg)) } -> std::same_as<ReturnType>;
|
|
{ t.consume(std::move(ph)) } -> std::same_as<ReturnType>;
|
|
{ t.consume(std::move(pe)) } -> std::same_as<ReturnType>;
|
|
};
|
|
|
|
template<typename T, typename ReturnType>
|
|
concept MutationFragmentVisitorV2 =
|
|
requires(T t,
|
|
const static_row& sr,
|
|
const clustering_row& cr,
|
|
const range_tombstone_change& rt,
|
|
const partition_start& ph,
|
|
const partition_end& eop) {
|
|
{ t(sr) } -> std::same_as<ReturnType>;
|
|
{ t(cr) } -> std::same_as<ReturnType>;
|
|
{ t(rt) } -> std::same_as<ReturnType>;
|
|
{ t(ph) } -> std::same_as<ReturnType>;
|
|
{ t(eop) } -> std::same_as<ReturnType>;
|
|
};
|
|
|
|
class mutation_fragment_v2 {
|
|
public:
|
|
enum class kind : std::uint8_t {
|
|
static_row,
|
|
clustering_row,
|
|
range_tombstone_change,
|
|
partition_start,
|
|
partition_end,
|
|
};
|
|
private:
|
|
struct data {
|
|
data(reader_permit permit, kind _kind) : _memory(permit.consume_memory()), _kind(_kind) { }
|
|
~data() { }
|
|
|
|
reader_permit::resource_units _memory;
|
|
kind _kind;
|
|
union {
|
|
static_row _static_row;
|
|
clustering_row _clustering_row;
|
|
range_tombstone_change _range_tombstone_chg;
|
|
partition_start _partition_start;
|
|
partition_end _partition_end;
|
|
};
|
|
};
|
|
private:
|
|
std::unique_ptr<data> _data;
|
|
|
|
mutation_fragment_v2() = default;
|
|
explicit operator bool() const noexcept { return bool(_data); }
|
|
void destroy_data() noexcept;
|
|
void reset_memory(const schema& s, std::optional<reader_resources> res = {});
|
|
friend class optimized_optional<mutation_fragment_v2>;
|
|
|
|
friend class position_in_partition;
|
|
public:
|
|
struct clustering_row_tag_t { };
|
|
|
|
template<typename... Args>
|
|
mutation_fragment_v2(clustering_row_tag_t, const schema& s, reader_permit permit, Args&&... args)
|
|
: _data(std::make_unique<data>(std::move(permit), kind::clustering_row))
|
|
{
|
|
new (&_data->_clustering_row) clustering_row(std::forward<Args>(args)...);
|
|
_data->_memory.reset_to(reader_resources::with_memory(calculate_memory_usage(s)));
|
|
}
|
|
|
|
mutation_fragment_v2(const schema& s, reader_permit permit, static_row&& r);
|
|
mutation_fragment_v2(const schema& s, reader_permit permit, clustering_row&& r);
|
|
mutation_fragment_v2(const schema& s, reader_permit permit, range_tombstone_change&& r);
|
|
mutation_fragment_v2(const schema& s, reader_permit permit, partition_start&& r);
|
|
mutation_fragment_v2(const schema& s, reader_permit permit, partition_end&& r);
|
|
|
|
mutation_fragment_v2(const schema& s, reader_permit permit, const mutation_fragment_v2& o)
|
|
: _data(std::make_unique<data>(std::move(permit), o._data->_kind)) {
|
|
switch (o._data->_kind) {
|
|
case kind::static_row:
|
|
new (&_data->_static_row) static_row(s, o._data->_static_row);
|
|
break;
|
|
case kind::clustering_row:
|
|
new (&_data->_clustering_row) clustering_row(s, o._data->_clustering_row);
|
|
break;
|
|
case kind::range_tombstone_change:
|
|
new (&_data->_range_tombstone_chg) range_tombstone_change(o._data->_range_tombstone_chg);
|
|
break;
|
|
case kind::partition_start:
|
|
new (&_data->_partition_start) partition_start(o._data->_partition_start);
|
|
break;
|
|
case kind::partition_end:
|
|
new (&_data->_partition_end) partition_end(o._data->_partition_end);
|
|
break;
|
|
}
|
|
_data->_memory.reset_to(o._data->_memory.resources());
|
|
}
|
|
mutation_fragment_v2(mutation_fragment_v2&& other) = default;
|
|
mutation_fragment_v2& operator=(mutation_fragment_v2&& other) noexcept {
|
|
if (this != &other) {
|
|
this->~mutation_fragment_v2();
|
|
new (this) mutation_fragment_v2(std::move(other));
|
|
}
|
|
return *this;
|
|
}
|
|
[[gnu::always_inline]]
|
|
~mutation_fragment_v2() {
|
|
if (_data) {
|
|
destroy_data();
|
|
}
|
|
}
|
|
|
|
position_in_partition_view position() const;
|
|
|
|
// Checks if this fragment may be relevant for any range starting at given position.
|
|
bool relevant_for_range(const schema& s, position_in_partition_view pos) const;
|
|
|
|
bool has_key() const { return is_clustering_row() || is_range_tombstone_change(); }
|
|
|
|
// Requirements: has_key() == true
|
|
const clustering_key_prefix& key() const;
|
|
|
|
kind mutation_fragment_kind() const { return _data->_kind; }
|
|
|
|
bool is_static_row() const { return _data->_kind == kind::static_row; }
|
|
bool is_clustering_row() const { return _data->_kind == kind::clustering_row; }
|
|
bool is_range_tombstone_change() const { return _data->_kind == kind::range_tombstone_change; }
|
|
bool is_partition_start() const { return _data->_kind == kind::partition_start; }
|
|
bool is_end_of_partition() const { return _data->_kind == kind::partition_end; }
|
|
|
|
void mutate_as_static_row(const schema& s, std::invocable<static_row&> auto&& fn) {
|
|
fn(_data->_static_row);
|
|
_data->_memory.reset_to(reader_resources::with_memory(calculate_memory_usage(s)));
|
|
}
|
|
void mutate_as_clustering_row(const schema& s, std::invocable<clustering_row&> auto&& fn) {
|
|
fn(_data->_clustering_row);
|
|
_data->_memory.reset_to(reader_resources::with_memory(calculate_memory_usage(s)));
|
|
}
|
|
void mutate_as_range_tombstone_change(const schema& s, std::invocable<range_tombstone_change&> auto&& fn) {
|
|
fn(_data->_range_tombstone_chg);
|
|
_data->_memory.reset_to(reader_resources::with_memory(calculate_memory_usage(s)));
|
|
}
|
|
void mutate_as_partition_start(const schema& s, std::invocable<partition_start&> auto&& fn) {
|
|
fn(_data->_partition_start);
|
|
_data->_memory.reset_to(reader_resources::with_memory(calculate_memory_usage(s)));
|
|
}
|
|
|
|
static_row&& as_static_row() && { return std::move(_data->_static_row); }
|
|
clustering_row&& as_clustering_row() && { return std::move(_data->_clustering_row); }
|
|
range_tombstone_change&& as_range_tombstone_change() && { return std::move(_data->_range_tombstone_chg); }
|
|
partition_start&& as_partition_start() && { return std::move(_data->_partition_start); }
|
|
partition_end&& as_end_of_partition() && { return std::move(_data->_partition_end); }
|
|
|
|
const static_row& as_static_row() const & { return _data->_static_row; }
|
|
const clustering_row& as_clustering_row() const & { return _data->_clustering_row; }
|
|
const range_tombstone_change& as_range_tombstone_change() const & { return _data->_range_tombstone_chg; }
|
|
const partition_start& as_partition_start() const & { return _data->_partition_start; }
|
|
const partition_end& as_end_of_partition() const & { return _data->_partition_end; }
|
|
|
|
// Requirements: mergeable_with(mf)
|
|
void apply(const schema& s, mutation_fragment_v2&& mf);
|
|
|
|
template<typename Consumer>
|
|
requires MutationFragmentConsumerV2<Consumer, decltype(std::declval<Consumer>().consume(std::declval<range_tombstone_change>()))>
|
|
decltype(auto) consume(Consumer& consumer) && {
|
|
_data->_memory.reset_to_zero();
|
|
switch (_data->_kind) {
|
|
case kind::static_row:
|
|
return consumer.consume(std::move(_data->_static_row));
|
|
case kind::clustering_row:
|
|
return consumer.consume(std::move(_data->_clustering_row));
|
|
case kind::range_tombstone_change:
|
|
return consumer.consume(std::move(_data->_range_tombstone_chg));
|
|
case kind::partition_start:
|
|
return consumer.consume(std::move(_data->_partition_start));
|
|
case kind::partition_end:
|
|
return consumer.consume(std::move(_data->_partition_end));
|
|
}
|
|
abort();
|
|
}
|
|
|
|
template<typename Visitor>
|
|
requires MutationFragmentVisitorV2<Visitor, decltype(std::declval<Visitor>()(std::declval<static_row&>()))>
|
|
decltype(auto) visit(Visitor&& visitor) const {
|
|
switch (_data->_kind) {
|
|
case kind::static_row:
|
|
return visitor(as_static_row());
|
|
case kind::clustering_row:
|
|
return visitor(as_clustering_row());
|
|
case kind::range_tombstone_change:
|
|
return visitor(as_range_tombstone_change());
|
|
case kind::partition_start:
|
|
return visitor(as_partition_start());
|
|
case kind::partition_end:
|
|
return visitor(as_end_of_partition());
|
|
}
|
|
abort();
|
|
}
|
|
|
|
size_t memory_usage() const {
|
|
return _data->_memory.resources().memory;
|
|
}
|
|
|
|
reader_permit permit() const {
|
|
return _data->_memory.permit();
|
|
}
|
|
|
|
bool equal(const schema& s, const mutation_fragment_v2& other) const {
|
|
if (other._data->_kind != _data->_kind) {
|
|
return false;
|
|
}
|
|
switch (_data->_kind) {
|
|
case kind::static_row:
|
|
return as_static_row().equal(s, other.as_static_row());
|
|
case kind::clustering_row:
|
|
return as_clustering_row().equal(s, other.as_clustering_row());
|
|
case kind::range_tombstone_change:
|
|
return as_range_tombstone_change().equal(s, other.as_range_tombstone_change());
|
|
case kind::partition_start:
|
|
return as_partition_start().equal(s, other.as_partition_start());
|
|
case kind::partition_end:
|
|
return as_end_of_partition().equal(s, other.as_end_of_partition());
|
|
}
|
|
abort();
|
|
}
|
|
|
|
// Fragments which have the same position() and are mergeable can be
|
|
// merged into one fragment with apply() which represents the sum of
|
|
// writes represented by each of the fragments.
|
|
// Fragments which have the same position() but are not mergeable
|
|
// and at least one of them is not a range_tombstone_change can be emitted one after the other in the stream.
|
|
//
|
|
// Undefined for range_tombstone_change.
|
|
// Merging range tombstones requires a more complicated handling
|
|
// because range_tombstone_change doesn't represent a write on its own, only
|
|
// with a matching change for the end bound. It's not enough to chose one fragment over another,
|
|
// the upper bound of the winning tombstone needs to be taken into account when merging
|
|
// later range_tombstone_change fragments in the stream.
|
|
bool mergeable_with(const mutation_fragment_v2& mf) const {
|
|
return _data->_kind == mf._data->_kind && _data->_kind != kind::range_tombstone_change;
|
|
}
|
|
|
|
class printer {
|
|
const schema& _schema;
|
|
const mutation_fragment_v2& _mutation_fragment;
|
|
public:
|
|
printer(const schema& s, const mutation_fragment_v2& mf) : _schema(s), _mutation_fragment(mf) { }
|
|
printer(const printer&) = delete;
|
|
printer(printer&&) = delete;
|
|
|
|
friend fmt::formatter<printer>;
|
|
};
|
|
friend fmt::formatter<printer>;
|
|
|
|
private:
|
|
size_t calculate_memory_usage(const schema& s) const {
|
|
return sizeof(data) + visit([&s] (auto& mf) -> size_t { return mf.external_memory_usage(s); });
|
|
}
|
|
};
|
|
|
|
// {fmt} support for mutation_fragment_v2::printer. format() is only declared
// here; its definition is not part of this header.
template <>
struct fmt::formatter<mutation_fragment_v2::printer> : fmt::formatter<string_view> {
    auto format(const mutation_fragment_v2::printer& p, fmt::format_context& ctx) const
            -> decltype(ctx.out());
};
|
|
|
|
// Stream insertion for mutation_fragment_v2::kind. Only declared here; the
// definition is not part of this header.
std::ostream& operator<<(std::ostream&, mutation_fragment_v2::kind);
|
|
|
|
// {fmt} support for mutation_fragment_v2::kind: maps each enumerator to a
// human-readable name and formats it as a string_view. A value matching no
// enumerator is printed as "UNEXPECTED".
template <>
struct fmt::formatter<mutation_fragment_v2::kind> : fmt::formatter<string_view> {
    template <typename FormatContext>
    auto format(mutation_fragment_v2::kind k, FormatContext& ctx) const {
        const string_view name = [k] () -> string_view {
            using kind = mutation_fragment_v2::kind;
            switch (k) {
            case kind::static_row:
                return "static row";
            case kind::clustering_row:
                return "clustering row";
            case kind::range_tombstone_change:
                return "range tombstone change";
            case kind::partition_start:
                return "partition start";
            case kind::partition_end:
                return "partition end";
            }
            return "UNEXPECTED";
        }();
        return formatter<string_view>::format(name, ctx);
    }
};
|
|
|
|
// F gets a stream element as an argument and returns the new value which replaces that element
|
|
// in the transformed stream.
|
|
template<typename F>
|
|
concept StreamedMutationTranformerV2 =
|
|
requires(F f, mutation_fragment_v2 mf, schema_ptr s) {
|
|
{ f(std::move(mf)) } -> std::same_as<mutation_fragment_v2>;
|
|
{ f(s) } -> std::same_as<schema_ptr>;
|
|
};
|