diff --git a/mutation_partition.cc b/mutation_partition.cc index b1fc090d89..76675a68d4 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -23,6 +23,70 @@ #include "mutation_partition.hh" #include "mutation_partition_applier.hh" +template +struct reversion_traits; + +template<> +struct reversion_traits { + template + static auto begin(Container& c) { + return c.begin(); + } + + template + static auto end(Container& c) { + return c.end(); + } + + template + static typename Container::iterator erase_and_dispose(Container& c, + typename Container::iterator begin, + typename Container::iterator end, + Disposer disposer) + { + return c.erase_and_dispose(begin, end, std::move(disposer)); + } + + template + static boost::iterator_range maybe_reverse( + Container& c, boost::iterator_range r) + { + return r; + } +}; + +template<> +struct reversion_traits { + template + static auto begin(Container& c) { + return c.rbegin(); + } + + template + static auto end(Container& c) { + return c.rend(); + } + + template + static typename Container::reverse_iterator erase_and_dispose(Container& c, + typename Container::reverse_iterator begin, + typename Container::reverse_iterator end, + Disposer disposer) + { + return typename Container::reverse_iterator( + c.erase_and_dispose(end.base(), begin.base(), disposer) + ); + } + + template + static boost::iterator_range maybe_reverse( + Container& c, boost::iterator_range r) + { + using reverse_iterator = typename Container::reverse_iterator; + return boost::make_iterator_range(reverse_iterator(r.end()), reverse_iterator(r.begin())); + } +}; + mutation_partition::mutation_partition(const mutation_partition& x) : _tombstone(x._tombstone) , _static_row(x._static_row) @@ -570,11 +634,55 @@ row::find_cell(column_id id) const { } } -uint32_t mutation_partition::do_compact(const schema& s, gc_clock::time_point query_time, - const std::vector& row_ranges, uint32_t row_limit, api::timestamp_type max_purgeable) +template +boost::iterator_range +unconst(Container& c, boost::iterator_range r) { + return boost::make_iterator_range( + c.erase(r.begin(), r.begin()), + c.erase(r.end(), r.end()) + ); +} + +template +void mutation_partition::trim_rows(const schema& s, + const std::vector& row_ranges, + Func&& func) +{ + static_assert(std::is_same>::value, "Bad func signature"); + + bool stop = false; + auto last = reversion_traits::begin(_rows); + auto deleter = current_deleter(); + + for (auto&& row_range : row_ranges) { + if (stop) { + break; + } + + auto it_range = reversion_traits::maybe_reverse(_rows, unconst(_rows, range(s, row_range))); + last = reversion_traits::erase_and_dispose(_rows, last, it_range.begin(), deleter); + + while (last != it_range.end()) { + rows_entry& e = *last; + if (func(e) == stop_iteration::yes) { + stop = true; + break; + } + ++last; + } + } + + reversion_traits::erase_and_dispose(_rows, last, reversion_traits::end(_rows), deleter); +} + +uint32_t mutation_partition::do_compact(const schema& s, + gc_clock::time_point query_time, + const std::vector& row_ranges, + bool reverse, + uint32_t row_limit, + api::timestamp_type max_purgeable) { assert(row_limit > 0); - bool stop = false; auto gc_before = query_time - s.gc_grace_seconds(); @@ -583,45 +691,38 @@ uint32_t mutation_partition::do_compact(const schema& s, gc_clock::time_point qu uint32_t row_count = 0; - auto last = _rows.begin(); - for (auto&& row_range : row_ranges) { - if (stop) { - break; - } + auto row_callback = [&] (rows_entry& e) { + deletable_row& row = e.row(); - auto it_range = range(s, row_range); - last = _rows.erase_and_dispose(last, it_range.begin(), current_deleter()); + tombstone tomb = tombstone_for_row(s, e); - while (last != it_range.end()) { - rows_entry& e = *last; - deletable_row& row = e.row(); + bool is_live = row.cells().compact_and_expire(s, column_kind::regular_column, tomb, query_time, max_purgeable, gc_before); + is_live |= row.marker().compact_and_expire(tomb, query_time, max_purgeable, gc_before); - tombstone tomb = tombstone_for_row(s, e); - - bool is_live = row.cells().compact_and_expire(s, column_kind::regular_column, tomb, query_time, max_purgeable, gc_before); - is_live |= row.marker().compact_and_expire(tomb, query_time, max_purgeable, gc_before); - - // when row_limit is reached, do not exit immediately, - // iterate to the next live_row instead to include trailing - // tombstones in the mutation. This is how Origin deals with - // https://issues.apache.org/jira/browse/CASSANDRA-8933 - if (is_live) { - if (row_count == row_limit) { - stop = true; - break; - } - ++row_count; + // when row_limit is reached, do not exit immediately, + // iterate to the next live_row instead to include trailing + // tombstones in the mutation. This is how Origin deals with + // https://issues.apache.org/jira/browse/CASSANDRA-8933 + if (is_live) { + if (row_count == row_limit) { + return stop_iteration::yes; } - ++last; + ++row_count; } + + return stop_iteration::no; + }; + + if (reverse) { + trim_rows(s, row_ranges, row_callback); + } else { + trim_rows(s, row_ranges, row_callback); } if (row_count == 0 && static_row_live) { ++row_count; } - _rows.erase_and_dispose(last, _rows.end(), current_deleter()); - auto can_purge_tombstone = [&] (const tombstone& t) { return t.timestamp < max_purgeable && t.deletion_time < gc_before; }; @@ -648,9 +749,10 @@ mutation_partition::compact_for_query( const schema& s, gc_clock::time_point query_time, const std::vector& row_ranges, + bool reverse, uint32_t row_limit) { - return do_compact(s, query_time, row_ranges, row_limit, api::max_timestamp); + return do_compact(s, query_time, row_ranges, reverse, row_limit, api::max_timestamp); } void mutation_partition::compact_for_compaction(const schema& s, @@ -660,7 +762,7 @@ void mutation_partition::compact_for_compaction(const schema& s, query::clustering_range::make_open_ended_both_sides() }; - do_compact(s, compaction_time, all_rows, query::max_rows, max_purgeable); + do_compact(s, compaction_time, all_rows, false, query::max_rows, max_purgeable); } // Returns true if there is no live data or tombstones. diff --git a/mutation_partition.hh b/mutation_partition.hh index 2f0d7ffc67..4b8231b9e6 100644 --- a/mutation_partition.hh +++ b/mutation_partition.hh @@ -575,9 +575,21 @@ public: // Same guarantees as for apply(const schema&, const mutation_partition&). void apply(const schema& schema, mutation_partition_view); private: - uint32_t do_compact(const schema& s, gc_clock::time_point now, - const std::vector& row_ranges, uint32_t row_limit, + uint32_t do_compact(const schema& s, + gc_clock::time_point now, + const std::vector& row_ranges, + bool reverse, + uint32_t row_limit, api::timestamp_type max_purgeable); + + // Calls func for each row entry inside row_ranges until func returns stop_iteration::yes. + // Removes all entries for which func didn't return stop_iteration::no or wasn't called at all. + // If reversed is true, func will be called on entries in reverse order. In that case row_ranges + // must be already in reverse order. + template + void trim_rows(const schema& s, + const std::vector& row_ranges, + Func&& func); public: // Performs the following: // - throws out data which doesn't belong to row_ranges @@ -596,7 +608,7 @@ public: // The row_limit parameter must be > 0. // uint32_t compact_for_query(const schema& s, gc_clock::time_point query_time, - const std::vector& row_ranges, uint32_t row_limit); + const std::vector& row_ranges, bool reversed, uint32_t row_limit); // Performs the following: // - expires cells based on compaction_time diff --git a/mutation_query.cc b/mutation_query.cc index d07d6c20e0..e35c90de64 100644 --- a/mutation_query.cc +++ b/mutation_query.cc @@ -146,8 +146,10 @@ mutation_query(const mutation_source& source, return consume(state.reader, [&state] (mutation&& m) { // FIXME: Make data sources respect row_ranges so that we don't have to filter them out here. auto is_distinct = state.slice.options.contains(query::partition_slice::option::distinct); + auto is_reversed = state.slice.options.contains(query::partition_slice::option::reversed); auto limit = !is_distinct ? state.limit : 1; - auto rows_left = m.partition().compact_for_query(*m.schema(), state.query_time, state.slice.row_ranges, limit); + auto rows_left = m.partition().compact_for_query(*m.schema(), state.query_time, state.slice.row_ranges, + is_reversed, limit); state.limit -= rows_left; // NOTE: We must return all columns, regardless of what's in diff --git a/partition_slice_builder.cc b/partition_slice_builder.cc index 7617d308ca..cdfd419bb5 100644 --- a/partition_slice_builder.cc +++ b/partition_slice_builder.cc @@ -120,3 +120,9 @@ partition_slice_builder::with_static_column(bytes name) { _static_columns->push_back(def->id); return *this; } + +partition_slice_builder& +partition_slice_builder::reversed() { + _options.set(); + return *this; +} diff --git a/partition_slice_builder.hh b/partition_slice_builder.hh index 2cbe6f0d86..05f17325a8 100644 --- a/partition_slice_builder.hh +++ b/partition_slice_builder.hh @@ -50,6 +50,7 @@ public: partition_slice_builder& with_regular_column(bytes name); partition_slice_builder& with_no_regular_columns(); partition_slice_builder& with_range(query::clustering_range range); + partition_slice_builder& reversed(); query::partition_slice build(); }; diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 229fc28bad..493ddf1fab 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -2603,7 +2603,8 @@ public: // unfreeze -> trim -> freeze mutation m = p.mut().unfreeze(_schema); static const std::vector all(1, query::clustering_range::make_open_ended_both_sides()); - auto rc = m.partition().compact_for_query(*_schema, _cmd->timestamp, all, limit_left); + bool is_reversed = _cmd->slice.options.contains(query::partition_slice::option::reversed); + auto rc = m.partition().compact_for_query(*_schema, _cmd->timestamp, all, is_reversed, limit_left); partitions.push_back(partition(rc, freeze(m))); row_count += rc; } else { diff --git a/tests/mutation_query_test.cc b/tests/mutation_query_test.cc index 6931afe395..b7e1f1970e 100644 --- a/tests/mutation_query_test.cc +++ b/tests/mutation_query_test.cc @@ -171,6 +171,201 @@ SEASTAR_TEST_CASE(test_cells_are_expired_according_to_query_timestamp) { }); } +SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { + return seastar::async([] { + auto s = make_schema(); + auto now = gc_clock::now(); + + mutation m1(partition_key::from_single_value(*s, "key1"), s); + + m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("A")), "v1", bytes("A_v1"), 1); + m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("B")), "v1", bytes("B_v1"), 1); + m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("C")), "v1", bytes("C_v1"), 1); + m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("D")), "v1", bytes("D_v1"), 1); + m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("E")), "v1", bytes("E_v1"), 1); + + auto src = make_source({m1}); + + { + auto slice = partition_slice_builder(*s) + .reversed() + .build(); + + reconcilable_result result = mutation_query(src, + query::full_partition_range, slice, 3, now).get0(); + + assert_that(to_result_set(result, s, slice)) + .has_size(3) + .has(a_row() + .with_column("pk", bytes("key1")) + .with_column("ck", bytes("E")) + .with_column("v1", bytes("E_v1"))) + .has(a_row() + .with_column("pk", bytes("key1")) + .with_column("ck", bytes("D")) + .with_column("v1", bytes("D_v1"))) + .has(a_row() + .with_column("pk", bytes("key1")) + .with_column("ck", bytes("C")) + .with_column("v1", bytes("C_v1"))); + } + + { + auto slice = partition_slice_builder(*s) + .with_range(query::clustering_range::make_singular( + clustering_key_prefix::from_single_value(*s, bytes("E")))) + .with_range(query::clustering_range::make_singular( + clustering_key_prefix::from_single_value(*s, bytes("D")))) + .with_range(query::clustering_range::make_singular( + clustering_key_prefix::from_single_value(*s, bytes("C")))) + .reversed() + .build(); + + reconcilable_result result = mutation_query(src, + query::full_partition_range, slice, 3, now).get0(); + + assert_that(to_result_set(result, s, slice)) + .has_size(3) + .has(a_row() + .with_column("pk", bytes("key1")) + .with_column("ck", bytes("E")) + .with_column("v1", bytes("E_v1"))) + .has(a_row() + .with_column("pk", bytes("key1")) + .with_column("ck", bytes("D")) + .with_column("v1", bytes("D_v1"))) + .has(a_row() + .with_column("pk", bytes("key1")) + .with_column("ck", bytes("C")) + .with_column("v1", bytes("C_v1"))); + } + + { + auto slice = partition_slice_builder(*s) + .with_range(query::clustering_range( + {clustering_key_prefix::from_single_value(*s, bytes("C"))}, + {clustering_key_prefix::from_single_value(*s, bytes("E"))})) + .reversed() + .build(); + + { + reconcilable_result result = mutation_query(src, + query::full_partition_range, slice, 10, now).get0(); + + assert_that(to_result_set(result, s, slice)) + .has_size(3) + .has(a_row() + .with_column("pk", bytes("key1")) + .with_column("ck", bytes("E")) + .with_column("v1", bytes("E_v1"))) + .has(a_row() + .with_column("pk", bytes("key1")) + .with_column("ck", bytes("D")) + .with_column("v1", bytes("D_v1"))) + .has(a_row() + .with_column("pk", bytes("key1")) + .with_column("ck", bytes("C")) + .with_column("v1", bytes("C_v1"))); + } + + { + reconcilable_result result = mutation_query(src, + query::full_partition_range, slice, 1, now).get0(); + + assert_that(to_result_set(result, s, slice)) + .has_size(1) + .has(a_row() + .with_column("pk", bytes("key1")) + .with_column("ck", bytes("E")) + .with_column("v1", bytes("E_v1"))); + } + + { + reconcilable_result result = mutation_query(src, + query::full_partition_range, slice, 2, now).get0(); + + assert_that(to_result_set(result, s, slice)) + .has_size(2) + .has(a_row() + .with_column("pk", bytes("key1")) + .with_column("ck", bytes("E")) + .with_column("v1", bytes("E_v1"))) + .has(a_row() + .with_column("pk", bytes("key1")) + .with_column("ck", bytes("D")) + .with_column("v1", bytes("D_v1"))); + } + } + + { + auto slice = partition_slice_builder(*s) + .with_range(query::clustering_range::make_singular( + clustering_key_prefix::from_single_value(*s, bytes("E")))) + .with_range(query::clustering_range::make_singular( + clustering_key_prefix::from_single_value(*s, bytes("D")))) + .with_range(query::clustering_range::make_singular( + clustering_key_prefix::from_single_value(*s, bytes("C")))) + .reversed() + .build(); + + reconcilable_result result = mutation_query(src, + query::full_partition_range, slice, 2, now).get0(); + + assert_that(to_result_set(result, s, slice)) + .has_size(2) + .has(a_row() + .with_column("pk", bytes("key1")) + .with_column("ck", bytes("E")) + .with_column("v1", bytes("E_v1"))) + .has(a_row() + .with_column("pk", bytes("key1")) + .with_column("ck", bytes("D")) + .with_column("v1", bytes("D_v1"))); + } + + { + auto slice = partition_slice_builder(*s) + .with_range(query::clustering_range::make_singular( + clustering_key_prefix::from_single_value(*s, bytes("E")))) + .with_range(query::clustering_range::make_singular( + clustering_key_prefix::from_single_value(*s, bytes("C")))) + .reversed() + .build(); + + reconcilable_result result = mutation_query(src, + query::full_partition_range, slice, 3, now).get0(); + + assert_that(to_result_set(result, s, slice)) + .has_size(2) + .has(a_row() + .with_column("pk", bytes("key1")) + .with_column("ck", bytes("E")) + .with_column("v1", bytes("E_v1"))) + .has(a_row() + .with_column("pk", bytes("key1")) + .with_column("ck", bytes("C")) + .with_column("v1", bytes("C_v1"))); + } + + { + auto slice = partition_slice_builder(*s) + .with_range(query::clustering_range::make_singular( + clustering_key_prefix::from_single_value(*s, bytes("B")))) + .reversed() + .build(); + + reconcilable_result result = mutation_query(src, + query::full_partition_range, slice, 3, now).get0(); + + assert_that(to_result_set(result, s, slice)) + .has_only(a_row() + .with_column("pk", bytes("key1")) + .with_column("ck", bytes("B")) + .with_column("v1", bytes("B_v1"))); + } + }); +} + SEASTAR_TEST_CASE(test_query_when_partition_tombstone_covers_live_cells) { return seastar::async([] { auto s = make_schema();