test/boost/mutation_test: simplify test_compaction_data_stream_split test

This test has very elaborate infrastructure essentially duplicating
mutation, mutation::apply() and mutation::operator==. Drop all this
extra code and use mutations directly instead. This makes migrating the
test to v2 easier.
This commit is contained in:
Botond Dénes
2022-01-17 15:15:51 +02:00
parent 2941803da0
commit dec4e5659b

View File

@@ -2148,176 +2148,6 @@ SEASTAR_THREAD_TEST_CASE(test_collection_compaction) {
namespace {
struct cell_summary {
api::timestamp_type timestamp;
};
struct collection_summary {
tombstone tomb;
std::vector<std::pair<bytes, cell_summary>> cells;
};
using value_summary = std::variant<cell_summary, collection_summary>;
using row_summary = std::map<column_id, value_summary>;
struct static_row_summary {
row_summary cells;
};
struct clustering_row_summary {
clustering_key key;
row_marker marker;
row_tombstone tomb;
row_summary cells;
explicit clustering_row_summary(clustering_key key) : key(std::move(key))
{ }
clustering_row_summary(clustering_key key, row_marker marker, row_tombstone tomb, row_summary cells)
: key(std::move(key)), marker(marker), tomb(tomb), cells(std::move(cells))
{ }
};
class clustering_fragment_summary {
public:
class tri_cmp;
class less_cmp;
private:
std::variant<clustering_row_summary, range_tombstone> _value;
public:
clustering_fragment_summary(clustering_row_summary cr) : _value(std::move(cr)) { }
clustering_fragment_summary(range_tombstone rt) : _value(std::move(rt)) { }
const clustering_key_prefix& key() const {
return std::visit(make_visitor(
[] (const clustering_row_summary& cr) -> const clustering_key& {
return cr.key;
},
[] (const range_tombstone& rt) -> const clustering_key& {
return rt.start;
}),
_value);
}
position_in_partition_view position() const {
return std::visit(make_visitor(
[] (const clustering_row_summary& cr) {
return position_in_partition_view::for_key(cr.key);
},
[] (const range_tombstone& rt) {
return rt.position();
}),
_value);
}
bool is_range_tombstone() const {
return std::holds_alternative<range_tombstone>(_value);
}
bool is_clustering_row() const {
return std::holds_alternative<clustering_row_summary>(_value);
}
const range_tombstone& as_range_tombstone() const {
return std::get<range_tombstone>(_value);
}
const clustering_row_summary& as_clustering_row() const {
return std::get<clustering_row_summary>(_value);
}
range_tombstone& as_range_tombstone() {
return std::get<range_tombstone>(_value);
}
clustering_row_summary& as_clustering_row() {
return std::get<clustering_row_summary>(_value);
}
};
class clustering_fragment_summary::tri_cmp {
position_in_partition::tri_compare _pos_tri_cmp;
bound_view::tri_compare _bv_cmp;
std::strong_ordering rt_tri_cmp(const range_tombstone& a, const range_tombstone& b) const {
auto start_bound_cmp = _pos_tri_cmp(a.position(), b.position());
if (start_bound_cmp != 0) {
return start_bound_cmp;
}
// Range tombstones can have the same start position. In this case use
// the end bound to decide who's "less".
return _bv_cmp(a.end_bound(), b.end_bound());
}
public:
explicit tri_cmp(const schema& schema) : _pos_tri_cmp(schema), _bv_cmp(schema) { }
std::strong_ordering operator()(const clustering_fragment_summary& a, const clustering_fragment_summary& b) const {
if (const auto res = _pos_tri_cmp(a.position(), b.position()); res != 0) {
return res;
}
if (a.is_range_tombstone() && b.is_range_tombstone()) {
return rt_tri_cmp(a.as_range_tombstone(), b.as_range_tombstone());
}
// Sort range tombstones before clustering rows
if (a.is_range_tombstone() || b.is_range_tombstone()) {
return int(b.is_range_tombstone()) <=> int(a.is_range_tombstone());
}
return std::strong_ordering::equal; // two clustering rows
}
};
class clustering_fragment_summary::less_cmp {
clustering_fragment_summary::tri_cmp _tri_cmp;
public:
explicit less_cmp(const schema& schema) : _tri_cmp(schema) { }
bool operator()(const clustering_fragment_summary& a, const clustering_fragment_summary& b) const {
return _tri_cmp(a, b) < 0;
}
};
using collection_element_tri_cmp_type = std::function<std::strong_ordering(const std::pair<bytes, cell_summary>&, const std::pair<bytes, cell_summary>&)>;
collection_element_tri_cmp_type
collection_element_tri_cmp(const abstract_type& type) {
return visit(type, make_visitor(
[] (const collection_type_impl& ctype) -> collection_element_tri_cmp_type {
return [tri_cmp = serialized_tri_compare(ctype.name_comparator()->as_tri_comparator())]
(const std::pair<bytes, cell_summary>& a, const std::pair<bytes, cell_summary>& b) {
return tri_cmp(a.first, b.first);
};
},
[] (const user_type_impl& utype) -> collection_element_tri_cmp_type {
return [] (const std::pair<bytes, cell_summary>& a, const std::pair<bytes, cell_summary>& b) {
auto ai = deserialize_field_index(a.first);
auto bi = deserialize_field_index(b.first);
return ai <=> bi;
};
},
[] (const abstract_type& o) -> collection_element_tri_cmp_type {
BOOST_FAIL(format("collection_element_tri_cmp: unknown type {}", o.name()));
__builtin_unreachable();
}
));
}
struct partition_summary {
dht::decorated_key key;
tombstone tomb;
std::optional<static_row_summary> static_row;
std::set<clustering_fragment_summary, clustering_fragment_summary::less_cmp> clustering_fragments;
partition_summary(const schema& s, dht::decorated_key dk)
: key(std::move(dk))
, clustering_fragments(clustering_fragment_summary::less_cmp(s)) {
}
partition_summary(
dht::decorated_key dk,
tombstone tomb,
std::optional<static_row_summary> static_row,
std::set<clustering_fragment_summary, clustering_fragment_summary::less_cmp> clustering_fragments)
: key(std::move(dk))
, tomb(tomb)
, static_row(std::move(static_row))
, clustering_fragments(std::move(clustering_fragments)) {
}
};
template <bool OnlyPurged>
class basic_compacted_fragments_consumer_base {
const schema& _schema;
@@ -2326,8 +2156,8 @@ class basic_compacted_fragments_consumer_base {
std::function<api::timestamp_type(const dht::decorated_key&)> _get_max_purgeable;
api::timestamp_type _max_purgeable;
std::vector<partition_summary> _partition_summaries;
std::optional<partition_summary> _partition_summary;
std::vector<mutation> _mutations;
std::optional<mutation> _mutation;
private:
bool can_gc(tombstone t) {
@@ -2351,21 +2181,18 @@ private:
cell.deletion_time() < _gc_before &&
can_gc(tombstone(cell.timestamp(), cell.deletion_time()));
}
value_summary examine_cell(const column_definition& cdef, const atomic_cell_or_collection& cell_or_collection, const row_tombstone& tomb) {
void examine_cell(const column_definition& cdef, const atomic_cell_or_collection& cell_or_collection, const row_tombstone& tomb) {
if (cdef.type->is_atomic()) {
auto cell = cell_or_collection.as_atomic_cell(cdef);
if constexpr (OnlyPurged) {
BOOST_REQUIRE(!cell.is_covered_by(tomb.tomb(), cdef.is_counter()));
}
BOOST_REQUIRE_EQUAL(is_cell_purgeable(cell), OnlyPurged);
return cell_summary{cell.timestamp()};
} else if (cdef.type->is_collection() || cdef.type->is_user_type()) {
auto cell = cell_or_collection.as_collection_mutation();
collection_summary summary;
cell.with_deserialized(*cdef.type, [&] (collection_mutation_view_description m_view) {
BOOST_REQUIRE(m_view.tomb.timestamp == api::missing_timestamp || m_view.tomb.timestamp > tomb.tomb().timestamp ||
is_tombstone_purgeable(m_view.tomb) == OnlyPurged);
summary.tomb = m_view.tomb;
auto t = m_view.tomb;
t.apply(tomb.tomb());
for (const auto& [key, cell] : m_view.cells) {
@@ -2373,19 +2200,16 @@ private:
BOOST_REQUIRE(!cell.is_covered_by(t, false));
}
BOOST_REQUIRE_EQUAL(is_cell_purgeable(cell), OnlyPurged);
summary.cells.emplace_back(std::pair(key, cell_summary{cell.timestamp()}));
}
});
return std::move(summary);
} else {
throw std::runtime_error(fmt::format("Cannot check cell {} of unknown type {}", cdef.name_as_text(), cdef.type->name()));
}
throw std::runtime_error(fmt::format("Cannot check cell {} of unknown type {}", cdef.name_as_text(), cdef.type->name()));
}
row_summary examine_row(column_kind kind, const row& r, const row_tombstone& tomb) {
row_summary cr;
void examine_row(column_kind kind, const row& r, const row_tombstone& tomb) {
r.for_each_cell([&, this, kind] (column_id id, const atomic_cell_or_collection& cell) {
cr.emplace(id, examine_cell(_schema.column_at(kind, id), cell, tomb));
examine_cell(_schema.column_at(kind, id), cell, tomb);
});
return cr;
}
public:
@@ -2398,23 +2222,23 @@ public:
}
void consume_new_partition(const dht::decorated_key& dk) {
_max_purgeable = _get_max_purgeable(dk);
BOOST_REQUIRE(!_partition_summary);
_partition_summary.emplace(_schema, dk);
BOOST_REQUIRE(!_mutation);
_mutation.emplace(_schema.shared_from_this(), dk);
}
void consume(tombstone t) {
BOOST_REQUIRE(t);
BOOST_REQUIRE_EQUAL(is_tombstone_purgeable(t), OnlyPurged);
BOOST_REQUIRE(_partition_summary);
_partition_summary->tomb = t;
BOOST_REQUIRE(_mutation);
_mutation->partition().apply(t);
}
stop_iteration consume(static_row&& sr, tombstone tomb, bool is_live) {
BOOST_REQUIRE(!OnlyPurged || !is_live);
auto compacted_cells = examine_row(column_kind::static_column, sr.cells(), row_tombstone(tomb));
examine_row(column_kind::static_column, sr.cells(), row_tombstone(tomb));
BOOST_REQUIRE(_partition_summary);
_partition_summary->static_row.emplace(static_row_summary{std::move(compacted_cells)});
BOOST_REQUIRE(_mutation);
_mutation->partition().static_row().apply(_schema, column_kind::static_column, std::move(sr.cells()));
return stop_iteration::no;
}
@@ -2427,512 +2251,47 @@ public:
if (cr.tomb().regular()) {
BOOST_REQUIRE_EQUAL(is_tombstone_purgeable(cr.tomb()), OnlyPurged);
}
auto compacted_cells = examine_row(column_kind::regular_column, cr.cells(), tomb);
examine_row(column_kind::regular_column, cr.cells(), tomb);
BOOST_REQUIRE(_partition_summary);
_partition_summary->clustering_fragments.emplace(clustering_row_summary{cr.key(), cr.marker(), cr.tomb(), std::move(compacted_cells)});
BOOST_REQUIRE(_mutation);
auto& dr = _mutation->partition().clustered_row(_schema, std::move(cr.key()));
dr.apply(_schema, std::move(cr).as_deletable_row());
return stop_iteration::no;
}
stop_iteration consume(range_tombstone&& rt) {
BOOST_REQUIRE_EQUAL(is_tombstone_purgeable(rt.tomb), OnlyPurged);
BOOST_REQUIRE(_partition_summary);
_partition_summary->clustering_fragments.emplace(rt);
BOOST_REQUIRE(_mutation);
_mutation->partition().apply_row_tombstone(_schema, std::move(rt));
return stop_iteration::no;
}
stop_iteration consume_end_of_partition() {
BOOST_REQUIRE(_partition_summary);
_partition_summaries.emplace_back(std::move(*_partition_summary));
_partition_summary.reset();
BOOST_REQUIRE(_mutation);
_mutations.emplace_back(std::move(*_mutation));
_mutation.reset();
return stop_iteration::no;
}
std::vector<partition_summary> consume_end_of_stream() {
BOOST_REQUIRE(!_partition_summary);
std::vector<mutation> consume_end_of_stream() {
BOOST_REQUIRE(!_mutation);
return _partition_summaries;
return _mutations;
}
};
using survived_compacted_fragments_consumer = basic_compacted_fragments_consumer_base<false>;
using purged_compacted_fragments_consumer = basic_compacted_fragments_consumer_base<true>;
template <typename ForwardIt, typename TriCompare>
/// Iterates two ordered ranges in a lockstep.
///
/// For two ranges:
/// [1, 2, 4, 6, 7, 8]
/// [1, 3, 6, 7]
/// The iterator will dereference to:
/// {1, 1}
/// {2, null}
/// {null, 3}
/// {4, null}
/// {6, 6}
/// {7, 7}
/// {8, null}
/// FIXME: not a proper iterator as the iterated-over range is predetermined at
/// construction time. Good enough for the purposes of this test.
class lockstep_ordered_iterator {
public:
using underlying_pointer = typename std::iterator_traits<ForwardIt>::pointer;
using iterator_category = std::forward_iterator_tag;
using difference_type = std::ptrdiff_t;
using value_type = std::pair<underlying_pointer, underlying_pointer>;
using pointer = value_type*;
using reference = value_type&;
private:
ForwardIt _it1;
ForwardIt _end1;
ForwardIt _it2;
ForwardIt _end2;
TriCompare _tri_cmp;
mutable std::optional<value_type> _current_value;
private:
void materialize() const {
if (_current_value) {
return;
}
_current_value.emplace(nullptr, nullptr);
if (_it1 == _end1 || _it2 == _end2) {
if (_it1 != _end1) {
_current_value->first = &*_it1;
} else {
_current_value->second = &*_it2;
}
return;
}
const auto res = _tri_cmp(*_it1, *_it2);
if (res < 0) {
_current_value->first = &*_it1;
} else if (res == 0) {
_current_value->first = &*_it1;
_current_value->second = &*_it2;
} else { // res > 0
_current_value->second = &*_it2;
}
}
reference dereference() const {
materialize();
return *_current_value;
}
public:
lockstep_ordered_iterator(ForwardIt it1, ForwardIt end1, ForwardIt it2, ForwardIt end2, TriCompare tri_cmp)
: _it1(it1)
, _end1(end1)
, _it2(it2)
, _end2(end2)
, _tri_cmp(std::move(tri_cmp)) {
}
bool operator==(const lockstep_ordered_iterator& o) const {
return _it1 == o._it1 && _end1 == o._end1 && _it2 == o._it2 && _end2 == o._end2;
}
bool operator!=(const lockstep_ordered_iterator& o) const {
return !(*this == o);
}
pointer operator->() const {
return &dereference();
}
reference operator*() const {
return dereference();
}
lockstep_ordered_iterator operator++(int) {
auto it = *this;
++(*this);
return it;
}
lockstep_ordered_iterator& operator++() {
const auto [v1, v2] = dereference();
if (v1) {
++_it1;
}
if (v2) {
++_it2;
}
_current_value.reset();
return *this;
}
};
template <typename Container, typename TriCompare>
auto iterate_over_in_ordered_lockstep(Container& a, Container& b, TriCompare tri_cmp) {
using iterator = decltype(std::begin(a));
return boost::iterator_range<lockstep_ordered_iterator<iterator, TriCompare>>{
lockstep_ordered_iterator<iterator, TriCompare>(std::begin(a), std::end(a), std::begin(b), std::end(b), tri_cmp),
lockstep_ordered_iterator<iterator, TriCompare>(std::end(a), std::end(a), std::end(b), std::end(b), tri_cmp)};
}
template <typename Container, typename OutputIt>
void merge_container(
Container a,
Container b,
OutputIt oit,
std::function<std::strong_ordering (const typename Container::value_type&, const typename Container::value_type&)> tri_cmp,
std::function<typename Container::value_type(typename Container::value_type, typename Container::value_type)> merge_func) {
for (auto [v1, v2] : iterate_over_in_ordered_lockstep(a, b, tri_cmp)) {
if (v1 && v2) {
*oit++ = merge_func(std::move(*v1), std::move(*v2));
} else {
if (v1) {
*oit++ = std::move(*v1);
}
if (v2) {
*oit++ = std::move(*v2);
}
}
}
}
row_summary merge(const schema& schema, column_kind kind, row_summary a, row_summary b) {
row_summary merged;
merge_container(
std::move(a),
std::move(b),
std::inserter(merged, merged.end()),
[] (const std::pair<const column_id, value_summary>& a, const std::pair<const column_id, value_summary>& b) -> std::strong_ordering {
return a.first <=> b.first;
},
[&schema, kind] (std::pair<const column_id, value_summary> a, std::pair<const column_id, value_summary> b) {
const auto& cdef = schema.column_at(kind, a.first);
BOOST_REQUIRE(cdef.type->is_multi_cell() && (cdef.type->is_collection() || cdef.type->is_user_type()));
BOOST_REQUIRE(std::holds_alternative<collection_summary>(a.second));
BOOST_REQUIRE(std::holds_alternative<collection_summary>(b.second));
auto& collection_a = std::get<collection_summary>(a.second);
auto& collection_b = std::get<collection_summary>(b.second);
auto tomb = collection_a.tomb;
tomb.apply(collection_b.tomb);
std::vector<std::pair<bytes, cell_summary>> merged;
for (auto [v1, v2] : iterate_over_in_ordered_lockstep(collection_a.cells, collection_b.cells, collection_element_tri_cmp(*cdef.type))) {
// Individual cells cannot be present in both collections.
BOOST_REQUIRE(!v1 || !v2);
if (v1) {
merged.emplace_back(std::move(*v1));
} else {
merged.emplace_back(std::move(*v2));
}
}
return std::pair(a.first, collection_summary{tomb, std::move(merged)});
});
return merged;
}
std::optional<static_row_summary> merge(const schema& schema, std::optional<static_row_summary> a, std::optional<static_row_summary> b) {
if (!a && !b) {
return {};
}
if (!a || !b) {
return a ? std::move(a) : std::move(b);
}
return static_row_summary{merge(schema, column_kind::static_column, std::move(a->cells), std::move(b->cells))};
}
clustering_row_summary merge(const schema& schema, clustering_row_summary a, clustering_row_summary b) {
if (!a.marker.is_missing() || !b.marker.is_missing()) {
BOOST_REQUIRE(a.marker.is_missing() != b.marker.is_missing());
}
if (a.tomb.regular() || b.tomb.regular()) {
BOOST_REQUIRE(bool(a.tomb.regular()) != bool(b.tomb.regular()));
}
return clustering_row_summary{
std::move(a.key),
(a.marker.is_missing() ? b.marker : a.marker),
(a.tomb.regular() ? a.tomb : b.tomb),
merge(schema, column_kind::regular_column, std::move(a.cells), std::move(b.cells))};
}
std::set<clustering_fragment_summary, clustering_fragment_summary::less_cmp> merge(
const schema& s,
std::set<clustering_fragment_summary, clustering_fragment_summary::less_cmp> a,
std::set<clustering_fragment_summary, clustering_fragment_summary::less_cmp> b) {
std::set<clustering_fragment_summary, clustering_fragment_summary::less_cmp> merged{clustering_fragment_summary::less_cmp(s)};
merge_container(
std::move(a),
std::move(b),
std::inserter(merged, merged.end()),
clustering_fragment_summary::tri_cmp(s),
[&s] (clustering_fragment_summary a, clustering_fragment_summary b) -> clustering_fragment_summary {
BOOST_REQUIRE_EQUAL(a.is_range_tombstone(), b.is_range_tombstone());
if (a.is_range_tombstone()) {
// No need to merge range tombstones.
return a;
}
return merge(s, std::move(a.as_clustering_row()), std::move(b.as_clustering_row()));
});
return merged;
}
std::vector<partition_summary> merge(const schema& s, std::vector<partition_summary> a, std::vector<partition_summary> b) {
std::vector<partition_summary> merged;
merge_container(
std::move(a),
std::move(b),
std::back_inserter(merged),
[&s] (const partition_summary& a, const partition_summary& b) {
return a.key.tri_compare(s, b.key);
},
[&s] (partition_summary a, partition_summary b) {
if (a.tomb || b.tomb) {
BOOST_REQUIRE(bool(a.tomb) != bool(b.tomb));
}
return partition_summary{
a.key,
(a.tomb ? a.tomb : b.tomb),
merge(s, std::move(a.static_row), std::move(b.static_row)),
merge(s, std::move(a.clustering_fragments), std::move(b.clustering_fragments))};
});
return merged;
}
cell_summary summarize_cell(const atomic_cell_view& cell) {
return cell_summary{cell.timestamp()};
}
row_summary summarize_row(const schema& schema, column_kind kind, const row& r) {
row_summary summary;
r.for_each_cell([&] (column_id id, const atomic_cell_or_collection& cell_or_collection) {
auto cdef = schema.column_at(kind, id);
if (cdef.type->is_atomic()) {
summary.emplace(id, summarize_cell(cell_or_collection.as_atomic_cell(cdef)));
} else if (cdef.type->is_collection() || cdef.type->is_user_type()) {
auto cell = cell_or_collection.as_collection_mutation();
collection_summary collection;
cell.with_deserialized(*cdef.type, [&] (collection_mutation_view_description m_view) {
collection.tomb = m_view.tomb;
for (const auto& [key, cell] : m_view.cells) {
collection.cells.emplace_back(key, summarize_cell(cell));
}
});
summary.emplace(id, std::move(collection));
} else {
throw std::runtime_error(fmt::format("Cannot summarize cell {} of unknown type {}", cdef.name_as_text(), cdef.type->name()));
}
});
return summary;
}
partition_summary summarize_mutation(const mutation& m) {
const auto& schema = *m.schema();
std::set<clustering_fragment_summary, clustering_fragment_summary::less_cmp> clustering_fragments{clustering_fragment_summary::less_cmp(schema)};
for (const auto& entry : m.partition().clustered_rows()) {
const auto& r = entry.row();
clustering_fragments.emplace(clustering_row_summary(entry.key(), r.marker(), r.deleted_at(),
summarize_row(schema, column_kind::regular_column, r.cells())));
}
for (auto& rt : m.partition().row_tombstones()) {
clustering_fragments.insert(rt.tombstone());
}
return partition_summary(
m.decorated_key(),
m.partition().partition_tombstone(),
m.partition().static_row().empty() ?
std::nullopt :
std::optional(static_row_summary{summarize_row(schema, column_kind::static_column, m.partition().static_row().get())}),
std::move(clustering_fragments));
}
std::vector<partition_summary> summarize_mutations(const std::vector<mutation>& mutations) {
std::vector<partition_summary> summaries;
summaries.reserve(mutations.size());
std::transform(mutations.cbegin(), mutations.cend(), std::back_inserter(summaries), summarize_mutation);
return summaries;
}
struct stats {
size_t partitions = 0;
size_t partition_tombstones = 0;
size_t static_rows = 0;
size_t static_cells = 0;
size_t clustering_rows = 0;
size_t row_markers = 0;
size_t row_tombstones = 0;
size_t clustering_cells = 0;
size_t range_tombstones = 0;
};
std::ostream& operator<<(std::ostream& os, const stats& s) {
os << "stats{";
os << "partitions=" << s.partitions;
os << ", partition_tombstones=" << s.partition_tombstones;
os << ", static_rows=" << s.static_rows;
os << ", static_cells=" << s.static_cells;
os << ", clustering_rows=" << s.clustering_rows;
os << ", row_markers=" << s.row_markers;
os << ", row_tombstones=" << s.row_tombstones;
os << ", clustering_cells=" << s.clustering_cells;
os << ", range_tombstones=" << s.range_tombstones;
os << "}";
return os;
}
stats create_stats(const std::vector<partition_summary>& summaries) {
stats s;
s.partitions = summaries.size();
for (const auto& summary : summaries) {
s.partition_tombstones += size_t(bool(summary.tomb));
if (summary.static_row) {
++s.static_rows;
s.static_cells += summary.static_row->cells.size();
}
for (const auto& cf : summary.clustering_fragments) {
if (cf.is_range_tombstone()) {
++s.range_tombstones;
} else {
const auto& cr = cf.as_clustering_row();
++s.clustering_rows;
s.row_markers += size_t{!cr.marker.is_missing()};
s.row_tombstones += size_t{bool(cr.tomb.regular())};
s.clustering_cells += cr.cells.size();
}
}
}
return s;
}
void check_row_summaries(const schema& schema, column_kind kind, const row_summary& actual, const row_summary& expected, tombstone tomb) {
auto column_tri_cmp = [] (const std::pair<const column_id, value_summary>& a, const std::pair<const column_id, value_summary>& b) {
return a.first - b.first;
};
for (const auto& [actual_column, expected_column] : iterate_over_in_ordered_lockstep(actual, expected, column_tri_cmp)) {
BOOST_REQUIRE(expected_column);
const auto [expected_column_id, expected_cell_or_collection] = *expected_column;
if (!actual_column) {
std::visit(make_visitor(
[&] (const cell_summary& cell) {
BOOST_REQUIRE_LE(cell.timestamp, tomb.timestamp);
},
[&] (const collection_summary& collection) {
BOOST_REQUIRE_LE(collection.tomb.timestamp, tomb.timestamp);
auto t = collection.tomb;
t.apply(tomb);
for (const auto& [key, cell] : collection.cells) {
BOOST_REQUIRE_LE(cell.timestamp, t.timestamp);
}
}),
expected_cell_or_collection);
continue;
}
const auto [actual_column_id, actual_cell_or_collection] = *actual_column;
BOOST_REQUIRE_EQUAL(actual_cell_or_collection.index(), expected_cell_or_collection.index());
if (std::holds_alternative<cell_summary>(expected_cell_or_collection)) {
auto expected_cell = std::get<cell_summary>(expected_cell_or_collection);
auto actual_cell = std::get<cell_summary>(actual_cell_or_collection);
BOOST_REQUIRE_EQUAL(actual_cell.timestamp, expected_cell.timestamp);
} else {
auto cdef = schema.column_at(kind, expected_column_id);
auto expected_collection = std::get<collection_summary>(expected_cell_or_collection);
auto actual_collection = std::get<collection_summary>(actual_cell_or_collection);
auto t = expected_collection.tomb;
if (!actual_collection.tomb) {
BOOST_REQUIRE_LE(actual_collection.tomb.timestamp, tomb.timestamp);
}
t.apply(tomb);
assert(cdef.type->is_multi_cell() && (cdef.type->is_collection() || cdef.type->is_user_type()));
for (auto [actual_element, expected_element] : iterate_over_in_ordered_lockstep(actual_collection.cells, expected_collection.cells,
collection_element_tri_cmp(*cdef.type))) {
BOOST_REQUIRE(expected_element);
if (actual_element) {
BOOST_REQUIRE_EQUAL(actual_element->second.timestamp, expected_element->second.timestamp);
} else {
BOOST_REQUIRE_LE(expected_element->second.timestamp, t.timestamp);
}
}
}
}
}
void check_clustering_row_summaries(const schema& schema, const clustering_row_summary& actual, const clustering_row_summary& expected,
tombstone tomb) {
if (expected.marker.is_missing()) {
BOOST_REQUIRE(actual.marker.is_missing());
} else {
// actual is allowed to be missing the marker only if it is
// covered by a tombstone.
BOOST_REQUIRE(
(actual.marker.timestamp() == expected.marker.timestamp()) ||
(expected.marker.timestamp() <= tomb.timestamp));
}
if (expected.tomb.regular()) {
// actual is allowed to be missing the row tombstone only
// if it is covered by a higher level tombstone.
BOOST_REQUIRE(
(actual.tomb == expected.tomb) ||
(expected.tomb.tomb().timestamp <= tomb.timestamp));
} else {
BOOST_REQUIRE(!expected.tomb.tomb());
}
check_row_summaries(schema, column_kind::regular_column, actual.cells, expected.cells, tomb);
}
void check_clustering_summaries(const schema& schema, const partition_summary& actual, const partition_summary& expected) {
range_tombstone_accumulator range_tombstones(schema);
range_tombstones.set_partition_tombstone(expected.tomb);
for (auto [actual_frag, expected_frag] : iterate_over_in_ordered_lockstep(actual.clustering_fragments, expected.clustering_fragments,
clustering_fragment_summary::tri_cmp(schema))) {
// actual cannot have a position that is not in expected, this would
// mean that a new fragment appeared from thin air while compacting.
BOOST_REQUIRE(expected_frag);
if (expected_frag->is_clustering_row()) {
BOOST_REQUIRE(!actual_frag || actual_frag->is_clustering_row());
const auto& cre = expected_frag->as_clustering_row();
auto tomb = cre.tomb;
tomb.apply(range_tombstones.tombstone_for_row(cre.key));
check_clustering_row_summaries(schema, actual_frag ? actual_frag->as_clustering_row() : clustering_row_summary(cre.key), cre, tomb.tomb());
} else {
const auto& rte = expected_frag->as_range_tombstone();
range_tombstones.apply(expected_frag->as_range_tombstone());
if (actual_frag) {
BOOST_REQUIRE(actual_frag->is_range_tombstone());
BOOST_REQUIRE_EQUAL(actual_frag->as_range_tombstone().tomb.timestamp, rte.tomb.timestamp);
} else {
BOOST_REQUIRE_LE(expected_frag->as_range_tombstone().tomb.timestamp, expected.tomb.timestamp);
}
}
}
}
// Ensure no data was lost in the split. The survived atoms merged with the
// purged atoms should be equivalent to the original (expected) atoms.
// Only atoms that were erased due to being covered by tombstones are allowed
// to be missing.
void check_partition_summaries(const schema& schema, const std::vector<partition_summary>& actual, const std::vector<partition_summary>& expected) {
BOOST_CHECK_EQUAL(actual.size(), expected.size());
for (auto actual_it = actual.cbegin(), expected_it = expected.cbegin(); actual_it != actual.cend() || expected_it != expected.cend();
++actual_it, ++expected_it) {
BOOST_REQUIRE(actual_it->key.equal(schema, expected_it->key));
BOOST_REQUIRE_EQUAL(actual_it->tomb.timestamp, expected_it->tomb.timestamp);
if (expected_it->static_row) {
check_row_summaries(schema, column_kind::static_column, actual_it->static_row.value_or(static_row_summary{}).cells,
expected_it->static_row->cells, expected_it->tomb);
}
check_clustering_summaries(schema, *actual_it, *expected_it);
}
}
void run_compaction_data_stream_split_test(const schema& schema, reader_permit permit, gc_clock::time_point query_time,
const std::vector<mutation>& mutations) {
const auto expected_mutations_summary = summarize_mutations(mutations);
std::vector<mutation> mutations) {
auto never_gc = std::function<bool(tombstone)>([] (tombstone) { return false; });
for (auto& mut : mutations) {
mut.partition().compact_for_compaction(schema, never_gc, mut.decorated_key(), query_time);
}
testlog.info("Original data: {}", create_stats(expected_mutations_summary));
auto reader = make_flat_mutation_reader_from_mutations(schema.shared_from_this(), std::move(permit), std::move(mutations));
auto reader = make_flat_mutation_reader_from_mutations(schema.shared_from_this(), std::move(permit), mutations);
auto close_reader = deferred_close(reader);
auto get_max_purgeable = [] (const dht::decorated_key&) {
return api::max_timestamp;
@@ -2945,16 +2304,25 @@ void run_compaction_data_stream_split_test(const schema& schema, reader_permit p
survived_compacted_fragments_consumer(schema, query_time, get_max_purgeable),
purged_compacted_fragments_consumer(schema, query_time, get_max_purgeable));
auto [survived_partitions, purged_partitions] = reader.consume(std::move(consumer)).get0();
auto [survived_muts, purged_muts] = reader.consume(std::move(consumer)).get0();
testlog.info("Survived data: {}", create_stats(survived_partitions));
testlog.info("Purged data: {}", create_stats(purged_partitions));
auto merged_partition_summaries = merge(schema, std::move(survived_partitions), std::move(purged_partitions));
testlog.info("Merged data: {}", create_stats(merged_partition_summaries));
check_partition_summaries(schema, merged_partition_summaries, expected_mutations_summary);
auto survived_muts_it = survived_muts.begin();
const auto survived_muts_end = survived_muts.end();
auto purged_muts_it = purged_muts.begin();
const auto purged_muts_end = purged_muts.end();
for (const auto& expected_mut : mutations) {
const auto& dkey = expected_mut.decorated_key();
auto actual_mut = mutation(schema.shared_from_this(), dkey);
if (survived_muts_it != survived_muts_end && survived_muts_it->decorated_key().equal(schema, dkey)) {
actual_mut.apply(*survived_muts_it++);
}
if (purged_muts_it != purged_muts_end && purged_muts_it->decorated_key().equal(schema, dkey)) {
actual_mut.apply(*purged_muts_it++);
}
BOOST_REQUIRE_EQUAL(actual_mut, expected_mut);
}
BOOST_REQUIRE(survived_muts_it == survived_muts_end);
BOOST_REQUIRE(purged_muts_it == purged_muts_end);
}
} // anonymous namespace