Adapt flat_mutation_reader_v2 to the new version of the API

When compacting a mutation fragment stream (e.g. for sstable
compaction, data query, repair), the compactor needs to accumulate
range tombstones which are relevant for the yet-to-be-processed range.
See range_tombstone_accumulator. One problem is that it has an
unbounded memory footprint: the accumulator needs to keep track of all
the tombstoned ranges which are still active.

Another, more benign, problem is the computational complexity of
maintaining that data structure.

The fix is to get rid of the overlap of range tombstones in the
mutation fragment stream. In v2 of the stream, there is no longer a
range_tombstone fragment. Deletions of ranges of rows within a given
partition are represented with range_tombstone_change fragments. At
any point in the stream there is a single active clustered
tombstone. It is initially equal to the neutral tombstone when the
stream of each partition starts. The range_tombstone_change fragment
type signifies changes of the active clustered tombstone. All fragments
emitted while a given clustered tombstone is active are affected by
that tombstone. Like with the old range_tombstone fragments, the
clustered tombstone is independent from the partition tombstone
carried in partition_start.
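The semantics above can be illustrated with a minimal standalone sketch.
This is not ScyllaDB code: tombstone_t, range_tombstone_change_t and
active_tombstone_at are simplified stand-ins, with integer clustering
positions and a timestamp of 0 standing for the neutral tombstone:

```cpp
#include <cassert>
#include <vector>

struct tombstone_t { long timestamp = 0; };   // 0 == neutral tombstone

struct range_tombstone_change_t {
    int position;       // simplified clustering position
    tombstone_t tomb;   // tombstone taking effect for positions >= position
};

// Returns the tombstone in effect at `pos`, given the position-ordered
// stream of range_tombstone_change fragments of one partition. The active
// tombstone starts out neutral and is replaced wholesale by each change
// fragment -- at any point exactly one clustered tombstone is active.
tombstone_t active_tombstone_at(const std::vector<range_tombstone_change_t>& stream, int pos) {
    tombstone_t active{};   // neutral when the partition's stream starts
    for (const auto& chg : stream) {
        if (chg.position > pos) {
            break;
        }
        active = chg.tomb;  // changes never overlap; each one replaces the last
    }
    return active;
}
```

For a deletion of the range [10, 20) at timestamp 5, the stream carries
{10, ts=5} followed by {20, neutral}; positions before 10 and from 20
onward see the neutral tombstone. Because exactly one tombstone is active
at a time, a consumer no longer needs to accumulate overlapping ranges.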

The v2 stream is strict about range tombstone trimming. It emits range
tombstone changes which reflect range tombstones trimmed to the query
restrictions and to fast-forwarding ranges. This makes the stream more
canonical, meaning that for a given set of writes and given
restrictions, querying the database should produce the same stream of
fragments. There is less ambiguity in how the writes are represented in
the fragment stream. This was not the case with v1. For example, a
given set of deletions could be produced either as one range_tombstone,
or split and/or de-overlapped with other fragments. A canonical stream
makes diff calculation easier.

The classes related to mutation fragment streams were cloned:
flat_mutation_reader_v2, mutation_fragment_v2, and related concepts.

Refs #8625.
Tomasz Grabiec
2021-05-20 19:44:55 +02:00
parent e3309322c3
commit 08b5773c12
5 changed files with 108 additions and 65 deletions


@@ -26,7 +26,7 @@
#include "dht/i_partitioner.hh"
#include "position_in_partition.hh"
#include "flat_mutation_reader_v2.hh"
#include "flat_mutation_reader.hh"
#include "mutation_fragment_v2.hh"
#include "tracing/trace_state.hh"
#include "mutation.hh"
@@ -91,14 +91,23 @@ concept FlattenedConsumerFilterV2 =
///
/// stream ::= partition*
/// partition ::= partition_start static_row? clustered* partition_end
/// clustered ::= clustering_row | range_tombstone
/// clustered ::= clustering_row | range_tombstone_change
///
/// The range_tombstone fragments can have ranges which overlap with other
/// range_tombstone fragments.
/// Deletions of ranges of rows within a given partition are represented with range_tombstone_change fragments.
/// At any point in the stream there is a single active clustered tombstone.
/// It is initially equal to the neutral tombstone when the stream of each partition starts.
/// range_tombstone_change fragments signify changes of the active clustered tombstone.
/// All fragments emitted while a given clustered tombstone is active are affected by that tombstone.
/// The clustered tombstone is independent from the partition tombstone carried in partition_start.
/// The partition tombstone takes effect for all fragments within the partition.
///
/// Consecutive range_tombstone fragments can have the same position(), so they
/// are weakly ordered. This makes merging two streams easier, and is
/// relied upon by combined_mutation_reader.
/// The stream guarantees that each partition ends with a neutral active clustered tombstone
/// by closing active tombstones with a range_tombstone_change.
/// In fast-forwarding mode, each sub-stream ends with a neutral active clustered tombstone.
///
/// All fragments within a partition have weakly monotonically increasing position().
/// Consecutive range_tombstone_change fragments may share the position.
/// All clustering row fragments within a partition have strictly monotonically increasing position().
///
/// \section Clustering restrictions
///
@@ -113,23 +122,20 @@ concept FlattenedConsumerFilterV2 =
/// 0) The stream must contain fragments corresponding to all writes
/// which are relevant to the requested ranges.
///
/// 1) The stream _may_ contain fragments with information
/// about _some_ of the writes which are relevant to clustering ranges
/// outside of the requested ranges. For example, it may return
/// some of the range tombstones relevant to the clustering ranges
/// outside of the requested ranges, but this information may not
/// be complete (e.g. there could be a more recent deletion).
/// 1) The ranges of non-neutral clustered tombstones must be enclosed in requested
/// ranges. In other words, range tombstones don't extend beyond boundaries of requested ranges.
///
/// 2) The stream will not contain writes which are absent in the unrestricted stream,
/// 2) The stream will not return writes which are absent in the unrestricted stream,
/// both for the requested clustering ranges and not requested ranges.
/// This means that it's safe to populate cache with all the returned information.
/// Even though it may be incomplete for non-requested ranges, it won't contain
/// incorrect information.
///
/// 3) All clustering_row fragments have position() which is within the requested
/// ranges.
/// 3) All clustered fragments have position() which is within the requested
/// ranges or, in case of range_tombstone_change fragments, equal to the end bound.
///
/// 4) range_tombstone fragments can have position() outside of requested ranges.
/// 4) Streams may produce redundant range_tombstone_change fragments
/// which do not change the current clustered tombstone, or have the same position.
///
/// \section Intra-partition fast-forwarding mode
///
@@ -153,9 +159,7 @@ concept FlattenedConsumerFilterV2 =
/// 3) The position_range passed to fast_forward_to() is a clustering key restriction.
/// Same rules apply as with clustering restrictions described above.
///
/// 4) range_tombstones produced in earlier sub-stream which are also relevant
/// for next sub-streams do not have to be repeated. They _may_ be repeated
/// with a starting position trimmed.
/// 4) The sub-stream will not end with a non-neutral active clustered tombstone. All range tombstones are closed.
///
/// 5) partition_end is never emitted, the user needs to call next_partition()
/// to move to the next partition in the original stream, which will open
@@ -316,7 +320,7 @@ public:
future<stop_iteration> consume(clustering_row&& cr) {
return handle_result(_consumer.consume(std::move(cr)));
}
future<stop_iteration> consume(range_tombstone&& rt) {
future<stop_iteration> consume(range_tombstone_change&& rt) {
return handle_result(_consumer.consume(std::move(rt)));
}
future<stop_iteration> consume(partition_start&& ps) {


@@ -160,10 +160,10 @@ mutation_fragment_v2::mutation_fragment_v2(const schema& s, reader_permit permit
_data->_memory.reset(reader_resources::with_memory(calculate_memory_usage(s)));
}
mutation_fragment_v2::mutation_fragment_v2(const schema& s, reader_permit permit, range_tombstone&& r)
: _kind(kind::range_tombstone), _data(std::make_unique<data>(std::move(permit)))
mutation_fragment_v2::mutation_fragment_v2(const schema& s, reader_permit permit, range_tombstone_change&& r)
: _kind(kind::range_tombstone_change), _data(std::make_unique<data>(std::move(permit)))
{
new (&_data->_range_tombstone) range_tombstone(std::move(r));
new (&_data->_range_tombstone_chg) range_tombstone_change(std::move(r));
_data->_memory.reset(reader_resources::with_memory(calculate_memory_usage(s)));
}
@@ -190,8 +190,8 @@ void mutation_fragment_v2::destroy_data() noexcept
case kind::clustering_row:
_data->_clustering_row.~clustering_row();
break;
case kind::range_tombstone:
_data->_range_tombstone.~range_tombstone();
case kind::range_tombstone_change:
_data->_range_tombstone_chg.~range_tombstone_change();
break;
case kind::partition_start:
_data->_partition_start.~partition_start();
@@ -207,6 +207,7 @@ namespace {
struct get_key_visitor {
const clustering_key_prefix& operator()(const clustering_row& cr) { return cr.key(); }
const clustering_key_prefix& operator()(const range_tombstone& rt) { return rt.start; }
const clustering_key_prefix& operator()(const range_tombstone_change& rt) { return rt.position().key(); }
template <typename T>
const clustering_key_prefix& operator()(const T&) { abort(); }
};
@@ -335,7 +336,7 @@ std::ostream& operator<<(std::ostream& os, mutation_fragment_v2::kind k)
switch (k) {
case mutation_fragment_v2::kind::static_row: return os << "static row";
case mutation_fragment_v2::kind::clustering_row: return os << "clustering row";
case mutation_fragment_v2::kind::range_tombstone: return os << "range tombstone";
case mutation_fragment_v2::kind::range_tombstone_change: return os << "range tombstone change";
case mutation_fragment_v2::kind::partition_start: return os << "partition start";
case mutation_fragment_v2::kind::partition_end: return os << "partition end";
}
@@ -443,12 +444,6 @@ bool mutation_fragment_v2::relevant_for_range(const schema& s, position_in_parti
return false;
}
bool mutation_fragment_v2::relevant_for_range_assuming_after(const schema& s, position_in_partition_view pos) const {
position_in_partition::less_compare cmp(s);
// Range tombstones overlapping with the new range are let in
return is_range_tombstone() && cmp(pos, as_range_tombstone().end_position());
}
std::ostream& operator<<(std::ostream& out, const range_tombstone_stream& rtl) {
return out << rtl._list;
}


@@ -33,17 +33,56 @@
#include "db/timeout_clock.hh"
#include "reader_permit.hh"
// Mutation fragment which represents a range tombstone boundary.
//
// The range_tombstone_change::tombstone() method returns the tombstone which takes effect
// for positions >= range_tombstone_change::position() in the stream, until the next
// range_tombstone_change is encountered.
//
// Note, a range_tombstone_change with an empty tombstone() ends the range tombstone.
// An empty tombstone naturally does not cover any timestamp.
class range_tombstone_change {
position_in_partition _pos;
::tombstone _tomb;
public:
range_tombstone_change(position_in_partition pos, tombstone tomb)
: _pos(std::move(pos))
, _tomb(tomb)
{ }
range_tombstone_change(position_in_partition_view pos, tombstone tomb)
: _pos(pos)
, _tomb(tomb)
{ }
const position_in_partition& position() const {
return _pos;
}
void set_position(position_in_partition pos) {
_pos = std::move(pos);
}
::tombstone tombstone() const {
return _tomb;
}
size_t external_memory_usage(const schema& s) const {
return _pos.external_memory_usage();
}
bool equal(const schema& s, const range_tombstone_change& other) const {
position_in_partition::equal_compare eq(s);
return _tomb == other._tomb && eq(_pos, other._pos);
}
friend std::ostream& operator<<(std::ostream& out, const range_tombstone_change&);
};
template<typename T, typename ReturnType>
concept MutationFragmentConsumerV2 =
requires(T t,
static_row sr,
clustering_row cr,
range_tombstone rt,
range_tombstone_change rt_chg,
partition_start ph,
partition_end pe) {
{ t.consume(std::move(sr)) } -> std::same_as<ReturnType>;
{ t.consume(std::move(cr)) } -> std::same_as<ReturnType>;
{ t.consume(std::move(rt)) } -> std::same_as<ReturnType>;
{ t.consume(std::move(rt_chg)) } -> std::same_as<ReturnType>;
{ t.consume(std::move(ph)) } -> std::same_as<ReturnType>;
{ t.consume(std::move(pe)) } -> std::same_as<ReturnType>;
};
@@ -53,7 +92,7 @@ concept MutationFragmentVisitorV2 =
requires(T t,
const static_row& sr,
const clustering_row& cr,
const range_tombstone& rt,
const range_tombstone_change& rt,
const partition_start& ph,
const partition_end& eop) {
{ t(sr) } -> std::same_as<ReturnType>;
@@ -65,7 +104,7 @@ concept MutationFragmentVisitorV2 =
template<typename T, typename ReturnType>
concept FragmentConsumerReturningV2 =
requires(T t, static_row sr, clustering_row cr, range_tombstone rt, tombstone tomb) {
requires(T t, static_row sr, clustering_row cr, range_tombstone_change rt, tombstone tomb) {
{ t.consume(std::move(sr)) } -> std::same_as<ReturnType>;
{ t.consume(std::move(cr)) } -> std::same_as<ReturnType>;
{ t.consume(std::move(rt)) } -> std::same_as<ReturnType>;
@@ -80,7 +119,7 @@ public:
enum class kind {
static_row,
clustering_row,
range_tombstone,
range_tombstone_change,
partition_start,
partition_end,
};
@@ -93,7 +132,7 @@ private:
union {
static_row _static_row;
clustering_row _clustering_row;
range_tombstone _range_tombstone;
range_tombstone_change _range_tombstone_chg;
partition_start _partition_start;
partition_end _partition_end;
};
@@ -122,7 +161,7 @@ public:
mutation_fragment_v2(const schema& s, reader_permit permit, static_row&& r);
mutation_fragment_v2(const schema& s, reader_permit permit, clustering_row&& r);
mutation_fragment_v2(const schema& s, reader_permit permit, range_tombstone&& r);
mutation_fragment_v2(const schema& s, reader_permit permit, range_tombstone_change&& r);
mutation_fragment_v2(const schema& s, reader_permit permit, partition_start&& r);
mutation_fragment_v2(const schema& s, reader_permit permit, partition_end&& r);
@@ -135,8 +174,8 @@ public:
case kind::clustering_row:
new (&_data->_clustering_row) clustering_row(s, o._data->_clustering_row);
break;
case kind::range_tombstone:
new (&_data->_range_tombstone) range_tombstone(o._data->_range_tombstone);
case kind::range_tombstone_change:
new (&_data->_range_tombstone_chg) range_tombstone_change(o._data->_range_tombstone_chg);
break;
case kind::partition_start:
new (&_data->_partition_start) partition_start(o._data->_partition_start);
@@ -164,17 +203,10 @@ public:
position_in_partition_view position() const;
// Returns the range of positions for which this fragment holds relevant information.
position_range range() const;
// Checks if this fragment may be relevant for any range starting at given position.
bool relevant_for_range(const schema& s, position_in_partition_view pos) const;
// Like relevant_for_range() but makes use of assumption that pos is greater
// than the starting position of this fragment.
bool relevant_for_range_assuming_after(const schema& s, position_in_partition_view pos) const;
bool has_key() const { return is_clustering_row() || is_range_tombstone(); }
bool has_key() const { return is_clustering_row() || is_range_tombstone_change(); }
// Requirements: has_key() == true
const clustering_key_prefix& key() const;
@@ -183,7 +215,7 @@ public:
bool is_static_row() const { return _kind == kind::static_row; }
bool is_clustering_row() const { return _kind == kind::clustering_row; }
bool is_range_tombstone() const { return _kind == kind::range_tombstone; }
bool is_range_tombstone_change() const { return _kind == kind::range_tombstone_change; }
bool is_partition_start() const { return _kind == kind::partition_start; }
bool is_end_of_partition() const { return _kind == kind::partition_end; }
@@ -195,8 +227,8 @@ public:
fn(_data->_clustering_row);
_data->_memory.reset(reader_resources::with_memory(calculate_memory_usage(s)));
}
void mutate_as_range_tombstone(const schema& s, std::invocable<range_tombstone&> auto&& fn) {
fn(_data->_range_tombstone);
void mutate_as_range_tombstone_change(const schema& s, std::invocable<range_tombstone_change&> auto&& fn) {
fn(_data->_range_tombstone_chg);
_data->_memory.reset(reader_resources::with_memory(calculate_memory_usage(s)));
}
void mutate_as_partition_start(const schema& s, std::invocable<partition_start&> auto&& fn) {
@@ -206,13 +238,13 @@ public:
static_row&& as_static_row() && { return std::move(_data->_static_row); }
clustering_row&& as_clustering_row() && { return std::move(_data->_clustering_row); }
range_tombstone&& as_range_tombstone() && { return std::move(_data->_range_tombstone); }
range_tombstone_change&& as_range_tombstone_change() && { return std::move(_data->_range_tombstone_chg); }
partition_start&& as_partition_start() && { return std::move(_data->_partition_start); }
partition_end&& as_end_of_partition() && { return std::move(_data->_partition_end); }
const static_row& as_static_row() const & { return _data->_static_row; }
const clustering_row& as_clustering_row() const & { return _data->_clustering_row; }
const range_tombstone& as_range_tombstone() const & { return _data->_range_tombstone; }
const range_tombstone_change& as_range_tombstone_change() const & { return _data->_range_tombstone_chg; }
const partition_start& as_partition_start() const & { return _data->_partition_start; }
const partition_end& as_end_of_partition() const & { return _data->_partition_end; }
@@ -220,7 +252,7 @@ public:
void apply(const schema& s, mutation_fragment_v2&& mf);
template<typename Consumer>
requires MutationFragmentConsumerV2<Consumer, decltype(std::declval<Consumer>().consume(std::declval<range_tombstone>()))>
requires MutationFragmentConsumerV2<Consumer, decltype(std::declval<Consumer>().consume(std::declval<range_tombstone_change>()))>
decltype(auto) consume(Consumer& consumer) && {
_data->_memory.reset();
switch (_kind) {
@@ -228,8 +260,8 @@ public:
return consumer.consume(std::move(_data->_static_row));
case kind::clustering_row:
return consumer.consume(std::move(_data->_clustering_row));
case kind::range_tombstone:
return consumer.consume(std::move(_data->_range_tombstone));
case kind::range_tombstone_change:
return consumer.consume(std::move(_data->_range_tombstone_chg));
case kind::partition_start:
return consumer.consume(std::move(_data->_partition_start));
case kind::partition_end:
@@ -246,8 +278,8 @@ public:
return visitor(as_static_row());
case kind::clustering_row:
return visitor(as_clustering_row());
case kind::range_tombstone:
return visitor(as_range_tombstone());
case kind::range_tombstone_change:
return visitor(as_range_tombstone_change());
case kind::partition_start:
return visitor(as_partition_start());
case kind::partition_end:
@@ -273,8 +305,8 @@ public:
return as_static_row().equal(s, other.as_static_row());
case kind::clustering_row:
return as_clustering_row().equal(s, other.as_clustering_row());
case kind::range_tombstone:
return as_range_tombstone().equal(s, other.as_range_tombstone());
case kind::range_tombstone_change:
return as_range_tombstone_change().equal(s, other.as_range_tombstone_change());
case kind::partition_start:
return as_partition_start().equal(s, other.as_partition_start());
case kind::partition_end:
@@ -287,9 +319,16 @@ public:
// merged into one fragment with apply() which represents the sum of
// writes represented by each of the fragments.
// Fragments which have the same position() but are not mergeable
// can be emitted one after the other in the stream.
// and at least one of them is not a range_tombstone_change can be emitted one after the other in the stream.
//
// Undefined for range_tombstone_change.
// Merging range tombstones requires a more complicated handling
// because range_tombstone_change doesn't represent a write on its own, only
with a matching change for the end bound. It's not enough to choose one fragment over another,
// the upper bound of the winning tombstone needs to be taken into account when merging
// later range_tombstone_change fragments in the stream.
bool mergeable_with(const mutation_fragment_v2& mf) const {
return _kind == mf._kind && _kind != kind::range_tombstone;
return _kind == mf._kind && _kind != kind::range_tombstone_change;
}
class printer {


@@ -21,6 +21,7 @@
#include "range_tombstone.hh"
#include "mutation_fragment.hh"
#include "mutation_fragment_v2.hh"
#include <boost/range/algorithm/upper_bound.hpp>
@@ -32,6 +33,10 @@ std::ostream& operator<<(std::ostream& out, const range_tombstone& rt) {
}
}
std::ostream& operator<<(std::ostream& out, const range_tombstone_change& rt) {
return out << "{range_tombstone_change: pos=" << rt.position() << ", " << rt.tombstone() << "}";
}
std::optional<range_tombstone> range_tombstone::apply(const schema& s, range_tombstone&& src)
{
bound_view::compare cmp(s);


@@ -103,7 +103,7 @@ public:
return mutation_fragment_v2(*_new, std::move(*_permit), clustering_row(row.key(), row.tomb(), row.marker(),
transform(std::move(row.cells()), column_kind::regular_column)));
}
mutation_fragment_v2 consume(range_tombstone&& rt) {
mutation_fragment_v2 consume(range_tombstone_change&& rt) {
return mutation_fragment_v2(*_new, std::move(*_permit), std::move(rt));
}
mutation_fragment_v2 consume(partition_start&& ph) {