From dec4e5659ba01525febab070fbaebf0a6f68111c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 17 Jan 2022 15:15:51 +0200 Subject: [PATCH] test/boost/mutation_test: simplify test_compaction_data_stream_split test This test has very elaborate infrastructure essentially duplicating mutation, mutation::apply() and mutation::operator==. Drop all this extra code and use mutations directly instead. This makes migrating the test to v2 easier. --- test/boost/mutation_test.cc | 732 +++--------------------------------- 1 file changed, 50 insertions(+), 682 deletions(-) diff --git a/test/boost/mutation_test.cc b/test/boost/mutation_test.cc index ee42976ace..46ee0d79d6 100644 --- a/test/boost/mutation_test.cc +++ b/test/boost/mutation_test.cc @@ -2148,176 +2148,6 @@ SEASTAR_THREAD_TEST_CASE(test_collection_compaction) { namespace { -struct cell_summary { - api::timestamp_type timestamp; -}; - -struct collection_summary { - tombstone tomb; - std::vector> cells; -}; - -using value_summary = std::variant; - -using row_summary = std::map; - -struct static_row_summary { - row_summary cells; -}; - -struct clustering_row_summary { - clustering_key key; - row_marker marker; - row_tombstone tomb; - row_summary cells; - - explicit clustering_row_summary(clustering_key key) : key(std::move(key)) - { } - clustering_row_summary(clustering_key key, row_marker marker, row_tombstone tomb, row_summary cells) - : key(std::move(key)), marker(marker), tomb(tomb), cells(std::move(cells)) - { } -}; - -class clustering_fragment_summary { -public: - class tri_cmp; - class less_cmp; - -private: - std::variant _value; - -public: - clustering_fragment_summary(clustering_row_summary cr) : _value(std::move(cr)) { } - clustering_fragment_summary(range_tombstone rt) : _value(std::move(rt)) { } - - const clustering_key_prefix& key() const { - return std::visit(make_visitor( - [] (const clustering_row_summary& cr) -> const clustering_key& { - return cr.key; - }, - [] (const range_tombstone& rt) -> const clustering_key& { - return rt.start; - }), - _value); - } - position_in_partition_view position() const { - return std::visit(make_visitor( - [] (const clustering_row_summary& cr) { - return position_in_partition_view::for_key(cr.key); - }, - [] (const range_tombstone& rt) { - return rt.position(); - }), - _value); - } - bool is_range_tombstone() const { - return std::holds_alternative(_value); - } - bool is_clustering_row() const { - return std::holds_alternative(_value); - } - const range_tombstone& as_range_tombstone() const { - return std::get(_value); - } - const clustering_row_summary& as_clustering_row() const { - return std::get(_value); - } - range_tombstone& as_range_tombstone() { - return std::get(_value); - } - clustering_row_summary& as_clustering_row() { - return std::get(_value); - } -}; - -class clustering_fragment_summary::tri_cmp { - position_in_partition::tri_compare _pos_tri_cmp; - bound_view::tri_compare _bv_cmp; - - std::strong_ordering rt_tri_cmp(const range_tombstone& a, const range_tombstone& b) const { - auto start_bound_cmp = _pos_tri_cmp(a.position(), b.position()); - if (start_bound_cmp != 0) { - return start_bound_cmp; - } - // Range tombstones can have the same start position. In this case use - // the end bound to decide who's "less". - return _bv_cmp(a.end_bound(), b.end_bound()); - } - -public: - explicit tri_cmp(const schema& schema) : _pos_tri_cmp(schema), _bv_cmp(schema) { } - - std::strong_ordering operator()(const clustering_fragment_summary& a, const clustering_fragment_summary& b) const { - if (const auto res = _pos_tri_cmp(a.position(), b.position()); res != 0) { - return res; - } - if (a.is_range_tombstone() && b.is_range_tombstone()) { - return rt_tri_cmp(a.as_range_tombstone(), b.as_range_tombstone()); - } - // Sort range tombstones before clustering rows - if (a.is_range_tombstone() || b.is_range_tombstone()) { - return int(b.is_range_tombstone()) <=> int(a.is_range_tombstone()); - } - return std::strong_ordering::equal; // two clustering rows - } -}; - -class clustering_fragment_summary::less_cmp { - clustering_fragment_summary::tri_cmp _tri_cmp; -public: - explicit less_cmp(const schema& schema) : _tri_cmp(schema) { } - bool operator()(const clustering_fragment_summary& a, const clustering_fragment_summary& b) const { - return _tri_cmp(a, b) < 0; - } -}; - -using collection_element_tri_cmp_type = std::function&, const std::pair&)>; - -collection_element_tri_cmp_type -collection_element_tri_cmp(const abstract_type& type) { - return visit(type, make_visitor( - [] (const collection_type_impl& ctype) -> collection_element_tri_cmp_type { - return [tri_cmp = serialized_tri_compare(ctype.name_comparator()->as_tri_comparator())] - (const std::pair& a, const std::pair& b) { - return tri_cmp(a.first, b.first); - }; - }, - [] (const user_type_impl& utype) -> collection_element_tri_cmp_type { - return [] (const std::pair& a, const std::pair& b) { - auto ai = deserialize_field_index(a.first); - auto bi = deserialize_field_index(b.first); - return ai <=> bi; - }; - }, - [] (const abstract_type& o) -> collection_element_tri_cmp_type { - BOOST_FAIL(format("collection_element_tri_cmp: unknown type {}", o.name())); - __builtin_unreachable(); - } - )); -} - -struct partition_summary { - dht::decorated_key key; - tombstone tomb; - std::optional static_row; - std::set clustering_fragments; - - partition_summary(const schema& s, dht::decorated_key dk) - : key(std::move(dk)) - , clustering_fragments(clustering_fragment_summary::less_cmp(s)) { - } - partition_summary( - dht::decorated_key dk, - tombstone tomb, - std::optional static_row, - std::set clustering_fragments) - : key(std::move(dk)) - , tomb(tomb) - , static_row(std::move(static_row)) - , clustering_fragments(std::move(clustering_fragments)) { - } -}; - template class basic_compacted_fragments_consumer_base { const schema& _schema; @@ -2326,8 +2156,8 @@ class basic_compacted_fragments_consumer_base { std::function _get_max_purgeable; api::timestamp_type _max_purgeable; - std::vector _partition_summaries; - std::optional _partition_summary; + std::vector _mutations; + std::optional _mutation; private: bool can_gc(tombstone t) { @@ -2351,21 +2181,18 @@ private: cell.deletion_time() < _gc_before && can_gc(tombstone(cell.timestamp(), cell.deletion_time())); } - value_summary examine_cell(const column_definition& cdef, const atomic_cell_or_collection& cell_or_collection, const row_tombstone& tomb) { + void examine_cell(const column_definition& cdef, const atomic_cell_or_collection& cell_or_collection, const row_tombstone& tomb) { if (cdef.type->is_atomic()) { auto cell = cell_or_collection.as_atomic_cell(cdef); if constexpr (OnlyPurged) { BOOST_REQUIRE(!cell.is_covered_by(tomb.tomb(), cdef.is_counter())); } BOOST_REQUIRE_EQUAL(is_cell_purgeable(cell), OnlyPurged); - return cell_summary{cell.timestamp()}; } else if (cdef.type->is_collection() || cdef.type->is_user_type()) { auto cell = cell_or_collection.as_collection_mutation(); - collection_summary summary; cell.with_deserialized(*cdef.type, [&] (collection_mutation_view_description m_view) { BOOST_REQUIRE(m_view.tomb.timestamp == api::missing_timestamp || m_view.tomb.timestamp > tomb.tomb().timestamp || is_tombstone_purgeable(m_view.tomb) == OnlyPurged); - summary.tomb = m_view.tomb; auto t = m_view.tomb; t.apply(tomb.tomb()); for (const auto& [key, cell] : m_view.cells) { @@ -2373,19 +2200,16 @@ private: BOOST_REQUIRE(!cell.is_covered_by(t, false)); } BOOST_REQUIRE_EQUAL(is_cell_purgeable(cell), OnlyPurged); - summary.cells.emplace_back(std::pair(key, cell_summary{cell.timestamp()})); } }); - return std::move(summary); + } else { + throw std::runtime_error(fmt::format("Cannot check cell {} of unknown type {}", cdef.name_as_text(), cdef.type->name())); } - throw std::runtime_error(fmt::format("Cannot check cell {} of unknown type {}", cdef.name_as_text(), cdef.type->name())); } - row_summary examine_row(column_kind kind, const row& r, const row_tombstone& tomb) { - row_summary cr; + void examine_row(column_kind kind, const row& r, const row_tombstone& tomb) { r.for_each_cell([&, this, kind] (column_id id, const atomic_cell_or_collection& cell) { - cr.emplace(id, examine_cell(_schema.column_at(kind, id), cell, tomb)); + examine_cell(_schema.column_at(kind, id), cell, tomb); }); - return cr; } public: @@ -2398,23 +2222,23 @@ public: } void consume_new_partition(const dht::decorated_key& dk) { _max_purgeable = _get_max_purgeable(dk); - BOOST_REQUIRE(!_partition_summary); - _partition_summary.emplace(_schema, dk); + BOOST_REQUIRE(!_mutation); + _mutation.emplace(_schema.shared_from_this(), dk); } void consume(tombstone t) { BOOST_REQUIRE(t); BOOST_REQUIRE_EQUAL(is_tombstone_purgeable(t), OnlyPurged); - BOOST_REQUIRE(_partition_summary); - _partition_summary->tomb = t; + BOOST_REQUIRE(_mutation); + _mutation->partition().apply(t); } stop_iteration consume(static_row&& sr, tombstone tomb, bool is_live) { BOOST_REQUIRE(!OnlyPurged || !is_live); - auto compacted_cells = examine_row(column_kind::static_column, sr.cells(), row_tombstone(tomb)); + examine_row(column_kind::static_column, sr.cells(), row_tombstone(tomb)); - BOOST_REQUIRE(_partition_summary); - _partition_summary->static_row.emplace(static_row_summary{std::move(compacted_cells)}); + BOOST_REQUIRE(_mutation); + _mutation->partition().static_row().apply(_schema, column_kind::static_column, std::move(sr.cells())); return stop_iteration::no; } @@ -2427,512 +2251,47 @@ public: if (cr.tomb().regular()) { BOOST_REQUIRE_EQUAL(is_tombstone_purgeable(cr.tomb()), OnlyPurged); } - auto compacted_cells = examine_row(column_kind::regular_column, cr.cells(), tomb); + examine_row(column_kind::regular_column, cr.cells(), tomb); - BOOST_REQUIRE(_partition_summary); - _partition_summary->clustering_fragments.emplace(clustering_row_summary{cr.key(), cr.marker(), cr.tomb(), std::move(compacted_cells)}); + BOOST_REQUIRE(_mutation); + auto& dr = _mutation->partition().clustered_row(_schema, std::move(cr.key())); + dr.apply(_schema, std::move(cr).as_deletable_row()); return stop_iteration::no; } stop_iteration consume(range_tombstone&& rt) { BOOST_REQUIRE_EQUAL(is_tombstone_purgeable(rt.tomb), OnlyPurged); - BOOST_REQUIRE(_partition_summary); - _partition_summary->clustering_fragments.emplace(rt); + BOOST_REQUIRE(_mutation); + _mutation->partition().apply_row_tombstone(_schema, std::move(rt)); return stop_iteration::no; } stop_iteration consume_end_of_partition() { - BOOST_REQUIRE(_partition_summary); - _partition_summaries.emplace_back(std::move(*_partition_summary)); - _partition_summary.reset(); + BOOST_REQUIRE(_mutation); + _mutations.emplace_back(std::move(*_mutation)); + _mutation.reset(); return stop_iteration::no; } - std::vector consume_end_of_stream() { - BOOST_REQUIRE(!_partition_summary); + std::vector consume_end_of_stream() { + BOOST_REQUIRE(!_mutation); - return _partition_summaries; + return _mutations; } }; using survived_compacted_fragments_consumer = basic_compacted_fragments_consumer_base; using purged_compacted_fragments_consumer = basic_compacted_fragments_consumer_base; -template -/// Iterates two ordered ranges in a lockstep. -/// -/// For two ranges: -/// [1, 2, 4, 6, 7, 8] -/// [1, 3, 6, 7] -/// The iterator will dereference to: -/// {1, 1} -/// {2, null} -/// {null, 3} -/// {4, null} -/// {6, 6} -/// {7, 7} -/// {8, null} -/// FIXME: not a proper iterator as the iterated-over range is predetermined at -/// construction time. Good enough for the purposes of this test. -class lockstep_ordered_iterator { -public: - using underlying_pointer = typename std::iterator_traits::pointer; - using iterator_category = std::forward_iterator_tag; - using difference_type = std::ptrdiff_t; - using value_type = std::pair; - using pointer = value_type*; - using reference = value_type&; - -private: - ForwardIt _it1; - ForwardIt _end1; - ForwardIt _it2; - ForwardIt _end2; - TriCompare _tri_cmp; - mutable std::optional _current_value; - -private: - void materialize() const { - if (_current_value) { - return; - } - _current_value.emplace(nullptr, nullptr); - if (_it1 == _end1 || _it2 == _end2) { - if (_it1 != _end1) { - _current_value->first = &*_it1; - } else { - _current_value->second = &*_it2; - } - return; - } - const auto res = _tri_cmp(*_it1, *_it2); - if (res < 0) { - _current_value->first = &*_it1; - } else if (res == 0) { - _current_value->first = &*_it1; - _current_value->second = &*_it2; - } else { // res > 0 - _current_value->second = &*_it2; - } - } - reference dereference() const { - materialize(); - return *_current_value; - } - -public: - lockstep_ordered_iterator(ForwardIt it1, ForwardIt end1, ForwardIt it2, ForwardIt end2, TriCompare tri_cmp) - : _it1(it1) - , _end1(end1) - , _it2(it2) - , _end2(end2) - , _tri_cmp(std::move(tri_cmp)) { - } - - bool operator==(const lockstep_ordered_iterator& o) const { - return _it1 == o._it1 && _end1 == o._end1 && _it2 == o._it2 && _end2 == o._end2; - } - bool operator!=(const lockstep_ordered_iterator& o) const { - return !(*this == o); - } - pointer operator->() const { - return &dereference(); - } - reference operator*() const { - return dereference(); - } - lockstep_ordered_iterator operator++(int) { - auto it = *this; - ++(*this); - return it; - } - lockstep_ordered_iterator& operator++() { - const auto [v1, v2] = dereference(); - if (v1) { - ++_it1; - } - if (v2) { - ++_it2; - } - _current_value.reset(); - return *this; - } -}; - -template -auto iterate_over_in_ordered_lockstep(Container& a, Container& b, TriCompare tri_cmp) { - using iterator = decltype(std::begin(a)); - return boost::iterator_range>{ - lockstep_ordered_iterator(std::begin(a), std::end(a), std::begin(b), std::end(b), tri_cmp), - lockstep_ordered_iterator(std::end(a), std::end(a), std::end(b), std::end(b), tri_cmp)}; -} - -template -void merge_container( - Container a, - Container b, - OutputIt oit, - std::function tri_cmp, - std::function merge_func) { - for (auto [v1, v2] : iterate_over_in_ordered_lockstep(a, b, tri_cmp)) { - if (v1 && v2) { - *oit++ = merge_func(std::move(*v1), std::move(*v2)); - } else { - if (v1) { - *oit++ = std::move(*v1); - } - if (v2) { - *oit++ = std::move(*v2); - } - } - } -} - -row_summary merge(const schema& schema, column_kind kind, row_summary a, row_summary b) { - row_summary merged; - merge_container( - std::move(a), - std::move(b), - std::inserter(merged, merged.end()), - [] (const std::pair& a, const std::pair& b) -> std::strong_ordering { - return a.first <=> b.first; - }, - [&schema, kind] (std::pair a, std::pair b) { - const auto& cdef = schema.column_at(kind, a.first); - BOOST_REQUIRE(cdef.type->is_multi_cell() && (cdef.type->is_collection() || cdef.type->is_user_type())); - - BOOST_REQUIRE(std::holds_alternative(a.second)); - BOOST_REQUIRE(std::holds_alternative(b.second)); - auto& collection_a = std::get(a.second); - auto& collection_b = std::get(b.second); - - auto tomb = collection_a.tomb; - tomb.apply(collection_b.tomb); - std::vector> merged; - for (auto [v1, v2] : iterate_over_in_ordered_lockstep(collection_a.cells, collection_b.cells, collection_element_tri_cmp(*cdef.type))) { - // Individual cells cannot be present in both collections. - BOOST_REQUIRE(!v1 || !v2); - if (v1) { - merged.emplace_back(std::move(*v1)); - } else { - merged.emplace_back(std::move(*v2)); - } - } - return std::pair(a.first, collection_summary{tomb, std::move(merged)}); - }); - return merged; -} - -std::optional merge(const schema& schema, std::optional a, std::optional b) { - if (!a && !b) { - return {}; - } - if (!a || !b) { - return a ? std::move(a) : std::move(b); - } - return static_row_summary{merge(schema, column_kind::static_column, std::move(a->cells), std::move(b->cells))}; -} - -clustering_row_summary merge(const schema& schema, clustering_row_summary a, clustering_row_summary b) { - if (!a.marker.is_missing() || !b.marker.is_missing()) { - BOOST_REQUIRE(a.marker.is_missing() != b.marker.is_missing()); - } - if (a.tomb.regular() || b.tomb.regular()) { - BOOST_REQUIRE(bool(a.tomb.regular()) != bool(b.tomb.regular())); - } - return clustering_row_summary{ - std::move(a.key), - (a.marker.is_missing() ? b.marker : a.marker), - (a.tomb.regular() ? a.tomb : b.tomb), - merge(schema, column_kind::regular_column, std::move(a.cells), std::move(b.cells))}; -} - -std::set merge( - const schema& s, - std::set a, - std::set b) { - std::set merged{clustering_fragment_summary::less_cmp(s)}; - merge_container( - std::move(a), - std::move(b), - std::inserter(merged, merged.end()), - clustering_fragment_summary::tri_cmp(s), - [&s] (clustering_fragment_summary a, clustering_fragment_summary b) -> clustering_fragment_summary { - BOOST_REQUIRE_EQUAL(a.is_range_tombstone(), b.is_range_tombstone()); - if (a.is_range_tombstone()) { - // No need to merge range tombstones. - return a; - } - return merge(s, std::move(a.as_clustering_row()), std::move(b.as_clustering_row())); - }); - return merged; -} - -std::vector merge(const schema& s, std::vector a, std::vector b) { - std::vector merged; - merge_container( - std::move(a), - std::move(b), - std::back_inserter(merged), - [&s] (const partition_summary& a, const partition_summary& b) { - return a.key.tri_compare(s, b.key); - }, - [&s] (partition_summary a, partition_summary b) { - if (a.tomb || b.tomb) { - BOOST_REQUIRE(bool(a.tomb) != bool(b.tomb)); - } - return partition_summary{ - a.key, - (a.tomb ? a.tomb : b.tomb), - merge(s, std::move(a.static_row), std::move(b.static_row)), - merge(s, std::move(a.clustering_fragments), std::move(b.clustering_fragments))}; - }); - return merged; -} - -cell_summary summarize_cell(const atomic_cell_view& cell) { - return cell_summary{cell.timestamp()}; -} - -row_summary summarize_row(const schema& schema, column_kind kind, const row& r) { - row_summary summary; - r.for_each_cell([&] (column_id id, const atomic_cell_or_collection& cell_or_collection) { - auto cdef = schema.column_at(kind, id); - if (cdef.type->is_atomic()) { - summary.emplace(id, summarize_cell(cell_or_collection.as_atomic_cell(cdef))); - } else if (cdef.type->is_collection() || cdef.type->is_user_type()) { - auto cell = cell_or_collection.as_collection_mutation(); - collection_summary collection; - cell.with_deserialized(*cdef.type, [&] (collection_mutation_view_description m_view) { - collection.tomb = m_view.tomb; - for (const auto& [key, cell] : m_view.cells) { - collection.cells.emplace_back(key, summarize_cell(cell)); - } - }); - summary.emplace(id, std::move(collection)); - } else { - throw std::runtime_error(fmt::format("Cannot summarize cell {} of unknown type {}", cdef.name_as_text(), cdef.type->name())); - } - }); - return summary; -} - -partition_summary summarize_mutation(const mutation& m) { - const auto& schema = *m.schema(); - std::set clustering_fragments{clustering_fragment_summary::less_cmp(schema)}; - for (const auto& entry : m.partition().clustered_rows()) { - const auto& r = entry.row(); - clustering_fragments.emplace(clustering_row_summary(entry.key(), r.marker(), r.deleted_at(), - summarize_row(schema, column_kind::regular_column, r.cells()))); - } - for (auto& rt : m.partition().row_tombstones()) { - clustering_fragments.insert(rt.tombstone()); - } - return partition_summary( - m.decorated_key(), - m.partition().partition_tombstone(), - m.partition().static_row().empty() ? - std::nullopt : - std::optional(static_row_summary{summarize_row(schema, column_kind::static_column, m.partition().static_row().get())}), - std::move(clustering_fragments)); -} - -std::vector summarize_mutations(const std::vector& mutations) { - std::vector summaries; - summaries.reserve(mutations.size()); - std::transform(mutations.cbegin(), mutations.cend(), std::back_inserter(summaries), summarize_mutation); - return summaries; -} - -struct stats { - size_t partitions = 0; - size_t partition_tombstones = 0; - size_t static_rows = 0; - size_t static_cells = 0; - size_t clustering_rows = 0; - size_t row_markers = 0; - size_t row_tombstones = 0; - size_t clustering_cells = 0; - size_t range_tombstones = 0; -}; - -std::ostream& operator<<(std::ostream& os, const stats& s) { - os << "stats{"; - os << "partitions=" << s.partitions; - os << ", partition_tombstones=" << s.partition_tombstones; - os << ", static_rows=" << s.static_rows; - os << ", static_cells=" << s.static_cells; - os << ", clustering_rows=" << s.clustering_rows; - os << ", row_markers=" << s.row_markers; - os << ", row_tombstones=" << s.row_tombstones; - os << ", clustering_cells=" << s.clustering_cells; - os << ", range_tombstones=" << s.range_tombstones; - os << "}"; - return os; -} - -stats create_stats(const std::vector& summaries) { - stats s; - - s.partitions = summaries.size(); - for (const auto& summary : summaries) { - s.partition_tombstones += size_t(bool(summary.tomb)); - if (summary.static_row) { - ++s.static_rows; - s.static_cells += summary.static_row->cells.size(); - } - - for (const auto& cf : summary.clustering_fragments) { - if (cf.is_range_tombstone()) { - ++s.range_tombstones; - } else { - const auto& cr = cf.as_clustering_row(); - ++s.clustering_rows; - s.row_markers += size_t{!cr.marker.is_missing()}; - s.row_tombstones += size_t{bool(cr.tomb.regular())}; - s.clustering_cells += cr.cells.size(); - } - } - } - - return s; -} - -void check_row_summaries(const schema& schema, column_kind kind, const row_summary& actual, const row_summary& expected, tombstone tomb) { - auto column_tri_cmp = [] (const std::pair& a, const std::pair& b) { - return a.first - b.first; - }; - for (const auto& [actual_column, expected_column] : iterate_over_in_ordered_lockstep(actual, expected, column_tri_cmp)) { - BOOST_REQUIRE(expected_column); - const auto [expected_column_id, expected_cell_or_collection] = *expected_column; - if (!actual_column) { - std::visit(make_visitor( - [&] (const cell_summary& cell) { - BOOST_REQUIRE_LE(cell.timestamp, tomb.timestamp); - }, - [&] (const collection_summary& collection) { - BOOST_REQUIRE_LE(collection.tomb.timestamp, tomb.timestamp); - auto t = collection.tomb; - t.apply(tomb); - for (const auto& [key, cell] : collection.cells) { - BOOST_REQUIRE_LE(cell.timestamp, t.timestamp); - } - }), - expected_cell_or_collection); - continue; - } - const auto [actual_column_id, actual_cell_or_collection] = *actual_column; - BOOST_REQUIRE_EQUAL(actual_cell_or_collection.index(), expected_cell_or_collection.index()); - - if (std::holds_alternative(expected_cell_or_collection)) { - auto expected_cell = std::get(expected_cell_or_collection); - auto actual_cell = std::get(actual_cell_or_collection); - BOOST_REQUIRE_EQUAL(actual_cell.timestamp, expected_cell.timestamp); - } else { - auto cdef = schema.column_at(kind, expected_column_id); - auto expected_collection = std::get(expected_cell_or_collection); - auto actual_collection = std::get(actual_cell_or_collection); - auto t = expected_collection.tomb; - if (!actual_collection.tomb) { - BOOST_REQUIRE_LE(actual_collection.tomb.timestamp, tomb.timestamp); - } - t.apply(tomb); - - assert(cdef.type->is_multi_cell() && (cdef.type->is_collection() || cdef.type->is_user_type())); - for (auto [actual_element, expected_element] : iterate_over_in_ordered_lockstep(actual_collection.cells, expected_collection.cells, - collection_element_tri_cmp(*cdef.type))) { - BOOST_REQUIRE(expected_element); - if (actual_element) { - BOOST_REQUIRE_EQUAL(actual_element->second.timestamp, expected_element->second.timestamp); - } else { - BOOST_REQUIRE_LE(expected_element->second.timestamp, t.timestamp); - } - } - } - } -} - -void check_clustering_row_summaries(const schema& schema, const clustering_row_summary& actual, const clustering_row_summary& expected, - tombstone tomb) { - if (expected.marker.is_missing()) { - BOOST_REQUIRE(actual.marker.is_missing()); - } else { - // actual is allowed to be missing the marker only if it is - // covered by a tombstone. - BOOST_REQUIRE( - (actual.marker.timestamp() == expected.marker.timestamp()) || - (expected.marker.timestamp() <= tomb.timestamp)); - } - if (expected.tomb.regular()) { - // actual is allowed to be missing the row tombstone only - // if it is covered by a higher level tombstone. - BOOST_REQUIRE( - (actual.tomb == expected.tomb) || - (expected.tomb.tomb().timestamp <= tomb.timestamp)); - } else { - BOOST_REQUIRE(!expected.tomb.tomb()); - } - check_row_summaries(schema, column_kind::regular_column, actual.cells, expected.cells, tomb); -} - -void check_clustering_summaries(const schema& schema, const partition_summary& actual, const partition_summary& expected) { - range_tombstone_accumulator range_tombstones(schema); - range_tombstones.set_partition_tombstone(expected.tomb); - - for (auto [actual_frag, expected_frag] : iterate_over_in_ordered_lockstep(actual.clustering_fragments, expected.clustering_fragments, - clustering_fragment_summary::tri_cmp(schema))) { - // actual cannot have a position that is not in expected, this would - // mean that a new fragment appeared from thin air while compacting. - BOOST_REQUIRE(expected_frag); - - if (expected_frag->is_clustering_row()) { - BOOST_REQUIRE(!actual_frag || actual_frag->is_clustering_row()); - const auto& cre = expected_frag->as_clustering_row(); - auto tomb = cre.tomb; - tomb.apply(range_tombstones.tombstone_for_row(cre.key)); - check_clustering_row_summaries(schema, actual_frag ? actual_frag->as_clustering_row() : clustering_row_summary(cre.key), cre, tomb.tomb()); - } else { - const auto& rte = expected_frag->as_range_tombstone(); - range_tombstones.apply(expected_frag->as_range_tombstone()); - if (actual_frag) { - BOOST_REQUIRE(actual_frag->is_range_tombstone()); - BOOST_REQUIRE_EQUAL(actual_frag->as_range_tombstone().tomb.timestamp, rte.tomb.timestamp); - } else { - BOOST_REQUIRE_LE(expected_frag->as_range_tombstone().tomb.timestamp, expected.tomb.timestamp); - } - } - } -} - -// Ensure no data was lost in the split. The survived atoms merged with the -// purged atoms should be equivalent to the original (expected) atoms. -// Only atoms that were erased due to being covered by tombstones are allowed -// to be missing. -void check_partition_summaries(const schema& schema, const std::vector& actual, const std::vector& expected) { - BOOST_CHECK_EQUAL(actual.size(), expected.size()); - - for (auto actual_it = actual.cbegin(), expected_it = expected.cbegin(); actual_it != actual.cend() || expected_it != expected.cend(); - ++actual_it, ++expected_it) { - BOOST_REQUIRE(actual_it->key.equal(schema, expected_it->key)); - BOOST_REQUIRE_EQUAL(actual_it->tomb.timestamp, expected_it->tomb.timestamp); - - if (expected_it->static_row) { - check_row_summaries(schema, column_kind::static_column, actual_it->static_row.value_or(static_row_summary{}).cells, - expected_it->static_row->cells, expected_it->tomb); - } - - check_clustering_summaries(schema, *actual_it, *expected_it); - } -} - void run_compaction_data_stream_split_test(const schema& schema, reader_permit permit, gc_clock::time_point query_time, - const std::vector& mutations) { - const auto expected_mutations_summary = summarize_mutations(mutations); + std::vector mutations) { + auto never_gc = std::function([] (tombstone) { return false; }); + for (auto& mut : mutations) { + mut.partition().compact_for_compaction(schema, never_gc, mut.decorated_key(), query_time); + } - testlog.info("Original data: {}", create_stats(expected_mutations_summary)); - - auto reader = make_flat_mutation_reader_from_mutations(schema.shared_from_this(), std::move(permit), std::move(mutations)); + auto reader = make_flat_mutation_reader_from_mutations(schema.shared_from_this(), std::move(permit), mutations); auto close_reader = deferred_close(reader); auto get_max_purgeable = [] (const dht::decorated_key&) { return api::max_timestamp; @@ -2945,16 +2304,25 @@ void run_compaction_data_stream_split_test(const schema& schema, reader_permit p survived_compacted_fragments_consumer(schema, query_time, get_max_purgeable), purged_compacted_fragments_consumer(schema, query_time, get_max_purgeable)); - auto [survived_partitions, purged_partitions] = reader.consume(std::move(consumer)).get0(); + auto [survived_muts, purged_muts] = reader.consume(std::move(consumer)).get0(); - testlog.info("Survived data: {}", create_stats(survived_partitions)); - testlog.info("Purged data: {}", create_stats(purged_partitions)); - - auto merged_partition_summaries = merge(schema, std::move(survived_partitions), std::move(purged_partitions)); - - testlog.info("Merged data: {}", create_stats(merged_partition_summaries)); - - check_partition_summaries(schema, merged_partition_summaries, expected_mutations_summary); + auto survived_muts_it = survived_muts.begin(); + const auto survived_muts_end = survived_muts.end(); + auto purged_muts_it = purged_muts.begin(); + const auto purged_muts_end = purged_muts.end(); + for (const auto& expected_mut : mutations) { + const auto& dkey = expected_mut.decorated_key(); + auto actual_mut = mutation(schema.shared_from_this(), dkey); + if (survived_muts_it != survived_muts_end && survived_muts_it->decorated_key().equal(schema, dkey)) { + actual_mut.apply(*survived_muts_it++); + } + if (purged_muts_it != purged_muts_end && purged_muts_it->decorated_key().equal(schema, dkey)) { + actual_mut.apply(*purged_muts_it++); + } + BOOST_REQUIRE_EQUAL(actual_mut, expected_mut); + } + BOOST_REQUIRE(survived_muts_it == survived_muts_end); + BOOST_REQUIRE(purged_muts_it == purged_muts_end); } } // anonymous namespace