Files
scylladb/mutation/mutation_partition_v2.hh
Avi Kivity f3eade2f62 treewide: relicense to ScyllaDB-Source-Available-1.0
Drop the AGPL license in favor of a source-available license.
See the blog post [1] for details.

[1] https://www.scylladb.com/2024/12/18/why-were-moving-to-a-source-available-license/
2024-12-18 17:45:13 +02:00

288 lines
14 KiB
C++

/*
* Copyright (C) 2014-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <boost/intrusive/parent_from_member.hpp>
#include "mutation_partition.hh"
#include <ranges>
#ifdef SEASTAR_DEBUG
#include "utils/assert.hh"
#endif
// is_evictable::yes means that the object is part of an evictable snapshot in MVCC,
// and is_evictable::no means it is part of a non-evictable one.
// See docs/dev/mvcc.md for more details.
using is_evictable = bool_class<class evictable_tag>;
// Represents a set of writes made to a single partition.
//
// Like mutation_partition, but intended to be used in cache/memtable
// so the tradeoffs are different. This representation must be memory-efficient
// and must support incremental eviction of its contents. It is used in MVCC so
// algorithms for merging must respect MVCC invariants. See docs/dev/mvcc.md.
//
// The object is schema-dependent. Each instance is governed by some
// specific schema version. Accessors require a reference to the schema object
// of that version.
//
// There is an operation of addition defined on mutation_partition objects
// (also called "apply"), which gives as a result an object representing the
// sum of writes contained in the addends. For instances governed by the same
// schema, addition is commutative and associative.
//
// In addition to representing writes, the object supports specifying a set of
// partition elements called "continuity". This set can be used to represent
// lack of information about certain parts of the partition. It can be
// specified which ranges of clustering keys belong to that set. We say that a
// key range is continuous if all keys in that range belong to the continuity
// set, and discontinuous otherwise. By default everything is continuous.
// The static row may be also continuous or not.
// Partition tombstone is always continuous.
//
// Continuity is ignored by instance equality. It's also transient, not
// preserved by serialization.
//
// Continuity is represented internally using flags on row entries. The key
// range between two consecutive entries (both ends exclusive) is continuous
// if and only if rows_entry::continuous() is true for the later entry. The
// range starting after the last entry is assumed to be continuous. The range
// corresponding to the key of the entry is continuous if and only if
// rows_entry::dummy() is false.
//
// Adding two fully-continuous instances gives a fully-continuous instance.
// Continuity doesn't affect how the write part is added.
//
// Addition of continuity is not commutative in general, but is associative.
// The default continuity merging rules are those required by MVCC to
// preserve its invariants. For details, refer to "Continuity merging rules" section
// in the doc in partition_version.hh.
class mutation_partition_v2 final {
public:
    // Intrusive container of rows_entry objects, ordered by clustering position.
    using rows_type = rows_entry::container_type;
    friend class size_calculator;
private:
    tombstone _tombstone;          // Partition tombstone; always continuous (see class comment).
    lazy_row _static_row;
    bool _static_row_continuous = true;
    rows_type _rows;
#ifdef SEASTAR_DEBUG
    // Version of the schema governing this instance; checked by check_schema().
    table_schema_version _schema_version;
#endif
    friend class converting_mutation_partition_applier;
public:
    // Tag type selecting the constructor which copies comparators only, not content.
    struct copy_comparators_only {};
    // Tag type selecting the fully-discontinuous constructor below.
    struct incomplete_tag {};
    // Constructs an empty instance which is fully discontinuous except for the partition tombstone.
    mutation_partition_v2(incomplete_tag, const schema& s, tombstone);
    // Convenience factory for the incomplete_tag constructor above.
    static mutation_partition_v2 make_incomplete(const schema& s, tombstone t = {}) {
        return mutation_partition_v2(incomplete_tag(), s, t);
    }
    // Constructs an empty, fully continuous instance governed by schema s.
    mutation_partition_v2(const schema& s)
        : _rows()
#ifdef SEASTAR_DEBUG
        , _schema_version(s.version())
#endif
    { }
    // Constructs an empty instance governed by the same schema version as `other`.
    mutation_partition_v2(mutation_partition_v2& other, copy_comparators_only)
        : _rows()
#ifdef SEASTAR_DEBUG
        , _schema_version(other._schema_version)
#endif
    { }
    mutation_partition_v2(mutation_partition_v2&&) = default;
    // Assumes that p is fully continuous.
    mutation_partition_v2(const schema& s, mutation_partition&& p);
    mutation_partition_v2(const schema& s, const mutation_partition_v2&);
    // Assumes that p is fully continuous.
    mutation_partition_v2(const schema& s, const mutation_partition& p);
    ~mutation_partition_v2();
    // Given a reference to the _rows member, returns the enclosing mutation_partition_v2.
    static mutation_partition_v2& container_of(rows_type&);
    mutation_partition_v2& operator=(mutation_partition_v2&& x) noexcept;
    bool equal(const schema&, const mutation_partition_v2&) const;
    bool equal(const schema& this_schema, const mutation_partition_v2& p, const schema& p_schema) const;
    // Compares only the continuity of the two instances (equal() ignores continuity).
    bool equal_continuity(const schema&, const mutation_partition_v2&) const;
    // Consistent with equal()
    template<typename Hasher>
    void feed_hash(Hasher& h, const schema& s) const {
        hashing_partition_visitor<Hasher> v(h, s);
        accept(s, v);
    }
    // Non-copyable, non-movable helper binding a partition to its schema for formatting
    // via fmt::formatter<printer>.
    class printer {
        const schema& _schema;
        const mutation_partition_v2& _mutation_partition;
    public:
        printer(const schema& s, const mutation_partition_v2& mp) : _schema(s), _mutation_partition(mp) { }
        printer(const printer&) = delete;
        printer(printer&&) = delete;
        friend fmt::formatter<printer>;
    };
    friend fmt::formatter<printer>;
public:
    // Makes sure there is a dummy entry after all clustered rows. Doesn't affect continuity.
    // Doesn't invalidate iterators.
    void ensure_last_dummy(const schema&);
    bool static_row_continuous() const { return _static_row_continuous; }
    void set_static_row_continuous(bool value) { _static_row_continuous = value; }
    bool is_fully_continuous() const;
    void make_fully_continuous();
    // Sets or clears continuity of clustering ranges between existing rows.
    void set_continuity(const schema&, const position_range& pr, is_continuous);
    // Returns clustering row ranges which have continuity matching the is_continuous argument.
    clustering_interval_set get_continuity(const schema&, is_continuous = is_continuous::yes) const;
    // Returns true iff all keys from given range are marked as continuous, or range is empty.
    bool fully_continuous(const schema&, const position_range&);
    // Returns true iff all keys from given range are marked as not continuous and range is not empty.
    bool fully_discontinuous(const schema&, const position_range&);
    // Returns true iff all keys from given range have continuity membership as specified by is_continuous.
    bool check_continuity(const schema&, const position_range&, is_continuous) const;
    // Frees elements of the partition in batches.
    // Returns stop_iteration::yes iff there are no more elements to free.
    // Continuity is unspecified after this.
    stop_iteration clear_gently(cache_tracker*) noexcept;
    // Applies mutation_fragment.
    // The fragment must be governed by the same schema as this object.
    void apply(tombstone t) { _tombstone.apply(t); }
    void apply_delete(const schema& schema, const clustering_key_prefix& prefix, tombstone t);
    void apply_delete(const schema& schema, range_tombstone rt);
    void apply_delete(const schema& schema, clustering_key_prefix&& prefix, tombstone t);
    void apply_delete(const schema& schema, clustering_key_prefix_view prefix, tombstone t);
    // Equivalent to applying a mutation with an empty row, created with given timestamp
    void apply_insert(const schema& s, clustering_key_view, api::timestamp_type created_at);
    void apply_insert(const schema& s, clustering_key_view, api::timestamp_type created_at,
    gc_clock::duration ttl, gc_clock::time_point expiry);
    // prefix must not be full
    void apply_row_tombstone(const schema& schema, clustering_key_prefix prefix, tombstone t);
    void apply_row_tombstone(const schema& schema, range_tombstone rt);
    // Applies p to current object.
    // Weak exception guarantees.
    // Assumes this and p are not owned by a cache_tracker and non-evictable.
    void apply(const schema& s, mutation_partition&&);
    void apply(const schema& s, mutation_partition_v2&& p, cache_tracker* = nullptr, is_evictable evictable = is_evictable::no);
    // Applies p to this instance.
    //
    // Monotonic exception guarantees. In case of exception the sum of p and this remains the same as before the exception.
    // This instance and p are governed by the same schema.
    //
    // Must be provided with a pointer to the cache_tracker, which owns both this and p.
    //
    // Returns stop_iteration::no if the operation was preempted before finished, and stop_iteration::yes otherwise.
    // On preemption the sum of this and p stays the same (represents the same set of writes), and the state of this
    // object contains at least all the writes it contained before the call (monotonicity). It may contain partial writes.
    // Also, some progress is always guaranteed (liveness).
    //
    // If returns stop_iteration::yes, then the sum of this and p is NO LONGER the same as before the call,
    // the state of p is undefined and should not be used for reading.
    //
    // The operation can be driven to completion like this:
    //
    //   apply_resume res;
    //   while (apply_monotonically(..., is_preemptible::yes, &res) == stop_iteration::no) { }
    //
    // If is_preemptible::no is passed as argument then stop_iteration::no is never returned.
    //
    // If is_preemptible::yes is passed, apply_resume must also be passed,
    // same instance each time until stop_iteration::yes is returned.
    stop_iteration apply_monotonically(const schema& this_schema, const schema& p_schema, mutation_partition_v2&& p,
    cache_tracker*, mutation_application_stats& app_stats, preemption_check, apply_resume&, is_evictable);
    // Converts partition to the new schema. When succeeds the partition should only be accessed
    // using the new schema.
    //
    // Strong exception guarantees.
    void upgrade(const schema& old_schema, const schema& new_schema);
    // Transforms this instance into a minimal one which still represents the same set of writes.
    // Does not garbage collect expired data, so the result is clock-independent and
    // should produce the same result on all replicas.
    // has_redundant_dummies(*this) is guaranteed to be false after this.
    void compact(const schema&, cache_tracker*);
    mutation_partition_v2 as_mutation_partition(const schema&) const;
private:
    // Erases the entry if it's safe to do so without changing the logical state of the partition.
    // (It's allowed to evict empty row entries, though).
    rows_type::iterator maybe_drop(const schema&, cache_tracker*, rows_type::iterator, mutation_application_stats&);
    void insert_row(const schema& s, const clustering_key& key, deletable_row&& row);
    void insert_row(const schema& s, const clustering_key& key, const deletable_row& row);
public:
    // Returns true if the mutation_partition_v2 represents no writes.
    bool empty() const;
public:
    deletable_row& clustered_row(const schema& s, const clustering_key& key);
    deletable_row& clustered_row(const schema& s, clustering_key&& key);
    deletable_row& clustered_row(const schema& s, clustering_key_view key);
    deletable_row& clustered_row(const schema& s, position_in_partition_view pos, is_dummy, is_continuous);
    rows_entry& clustered_rows_entry(const schema& s, position_in_partition_view pos, is_dummy, is_continuous);
    rows_entry& clustered_row(const schema& s, position_in_partition_view pos, is_dummy);
    // Throws if the row already exists or if the row was not inserted to the
    // last position (one or more greater row already exists).
    // Weak exception guarantees.
    deletable_row& append_clustered_row(const schema& s, position_in_partition_view pos, is_dummy, is_continuous);
public:
    tombstone partition_tombstone() const { return _tombstone; }
    lazy_row& static_row() { return _static_row; }
    const lazy_row& static_row() const { return _static_row; }
    // return a set of rows_entry where each entry represents a CQL row sharing the same clustering key.
    const rows_type& clustered_rows() const noexcept { return _rows; }
    utils::immutable_collection<rows_type> clustered_rows() noexcept { return _rows; }
    rows_type& mutable_clustered_rows() noexcept { return _rows; }
    const row* find_row(const schema& s, const clustering_key& key) const;
    std::ranges::subrange<rows_type::const_iterator> range(const schema& schema, const query::clustering_range& r) const;
    rows_type::const_iterator lower_bound(const schema& schema, const query::clustering_range& r) const;
    rows_type::const_iterator upper_bound(const schema& schema, const query::clustering_range& r) const;
    rows_type::iterator lower_bound(const schema& schema, const query::clustering_range& r);
    rows_type::iterator upper_bound(const schema& schema, const query::clustering_range& r);
    std::ranges::subrange<rows_type::iterator> range(const schema& schema, const query::clustering_range& r);
    // Returns an iterator range of rows_entry, with only non-dummy entries.
    auto non_dummy_rows() const {
        return std::ranges::subrange(_rows.begin(), _rows.end())
        | std::views::filter([] (const rows_entry& e) { return bool(!e.dummy()); });
    }
    void accept(const schema&, mutation_partition_visitor&) const;
    bool is_static_row_live(const schema&,
    gc_clock::time_point query_time = gc_clock::time_point::min()) const;
    uint64_t row_count() const;
    size_t external_memory_usage(const schema&) const;
private:
    template<typename Func>
    void for_each_row(const schema& schema, const query::clustering_range& row_range, bool reversed, Func&& func) const;
    friend class counter_write_query_result_builder;
    // Debug-build sanity check: the caller-supplied schema must match the version
    // this instance was created with. Compiled out unless SEASTAR_DEBUG is defined.
    void check_schema(const schema& s) const {
#ifdef SEASTAR_DEBUG
        SCYLLA_ASSERT(s.version() == _schema_version);
#endif
    }
};
inline
mutation_partition_v2& mutation_partition_v2::container_of(rows_type& rows) {
    // Recover the owning partition from a reference to its _rows member.
    auto* owner = boost::intrusive::get_parent_from_member(&rows, &mutation_partition_v2::_rows);
    return *owner;
}
// Returns true iff the mutation contains dummy rows which are redundant,
// meaning that they can be removed without affecting the set of writes represented by the mutation.
// mutation_partition_v2::compact() removes such dummies and guarantees this returns false afterwards.
bool has_redundant_dummies(const mutation_partition_v2&);
// fmt formatter for mutation_partition_v2::printer, which pairs a partition with
// the schema needed to interpret it. Definition lives in the corresponding .cc file.
template <> struct fmt::formatter<mutation_partition_v2::printer> : fmt::formatter<string_view> {
    auto format(const mutation_partition_v2::printer&, fmt::format_context& ctx) const -> decltype(ctx.out());
};