Merge "Make mutation queries respect reversed order" from Tomasz

"Affects CQL statements like the following one:

   select * from table order by ck desc;

Fixes #480."
This commit is contained in:
Avi Kivity
2015-10-22 15:08:19 +03:00
7 changed files with 357 additions and 38 deletions

View File

@@ -23,6 +23,70 @@
#include "mutation_partition.hh"
#include "mutation_partition_applier.hh"
template<bool reversed>
struct reversion_traits;
template<>
struct reversion_traits<false> {
template <typename Container>
static auto begin(Container& c) {
return c.begin();
}
template <typename Container>
static auto end(Container& c) {
return c.end();
}
template <typename Container, typename Disposer>
static typename Container::iterator erase_and_dispose(Container& c,
typename Container::iterator begin,
typename Container::iterator end,
Disposer disposer)
{
return c.erase_and_dispose(begin, end, std::move(disposer));
}
template <typename Container>
static boost::iterator_range<typename Container::iterator> maybe_reverse(
Container& c, boost::iterator_range<typename Container::iterator> r)
{
return r;
}
};
template<>
struct reversion_traits<true> {
template <typename Container>
static auto begin(Container& c) {
return c.rbegin();
}
template <typename Container>
static auto end(Container& c) {
return c.rend();
}
template <typename Container, typename Disposer>
static typename Container::reverse_iterator erase_and_dispose(Container& c,
typename Container::reverse_iterator begin,
typename Container::reverse_iterator end,
Disposer disposer)
{
return typename Container::reverse_iterator(
c.erase_and_dispose(end.base(), begin.base(), disposer)
);
}
template <typename Container>
static boost::iterator_range<typename Container::reverse_iterator> maybe_reverse(
Container& c, boost::iterator_range<typename Container::iterator> r)
{
using reverse_iterator = typename Container::reverse_iterator;
return boost::make_iterator_range(reverse_iterator(r.end()), reverse_iterator(r.begin()));
}
};
mutation_partition::mutation_partition(const mutation_partition& x)
: _tombstone(x._tombstone)
, _static_row(x._static_row)
@@ -570,11 +634,55 @@ row::find_cell(column_id id) const {
}
}
uint32_t mutation_partition::do_compact(const schema& s, gc_clock::time_point query_time,
const std::vector<query::clustering_range>& row_ranges, uint32_t row_limit, api::timestamp_type max_purgeable)
template <typename Container>
boost::iterator_range<typename Container::iterator>
unconst(Container& c, boost::iterator_range<typename Container::const_iterator> r) {
return boost::make_iterator_range(
c.erase(r.begin(), r.begin()),
c.erase(r.end(), r.end())
);
}
template<bool reversed, typename Func>
void mutation_partition::trim_rows(const schema& s,
const std::vector<query::clustering_range>& row_ranges,
Func&& func)
{
static_assert(std::is_same<stop_iteration, std::result_of_t<Func(rows_entry&)>>::value, "Bad func signature");
bool stop = false;
auto last = reversion_traits<reversed>::begin(_rows);
auto deleter = current_deleter<rows_entry>();
for (auto&& row_range : row_ranges) {
if (stop) {
break;
}
auto it_range = reversion_traits<reversed>::maybe_reverse(_rows, unconst(_rows, range(s, row_range)));
last = reversion_traits<reversed>::erase_and_dispose(_rows, last, it_range.begin(), deleter);
while (last != it_range.end()) {
rows_entry& e = *last;
if (func(e) == stop_iteration::yes) {
stop = true;
break;
}
++last;
}
}
reversion_traits<reversed>::erase_and_dispose(_rows, last, reversion_traits<reversed>::end(_rows), deleter);
}
uint32_t mutation_partition::do_compact(const schema& s,
gc_clock::time_point query_time,
const std::vector<query::clustering_range>& row_ranges,
bool reverse,
uint32_t row_limit,
api::timestamp_type max_purgeable)
{
assert(row_limit > 0);
bool stop = false;
auto gc_before = query_time - s.gc_grace_seconds();
@@ -583,45 +691,38 @@ uint32_t mutation_partition::do_compact(const schema& s, gc_clock::time_point qu
uint32_t row_count = 0;
auto last = _rows.begin();
for (auto&& row_range : row_ranges) {
if (stop) {
break;
}
auto row_callback = [&] (rows_entry& e) {
deletable_row& row = e.row();
auto it_range = range(s, row_range);
last = _rows.erase_and_dispose(last, it_range.begin(), current_deleter<rows_entry>());
tombstone tomb = tombstone_for_row(s, e);
while (last != it_range.end()) {
rows_entry& e = *last;
deletable_row& row = e.row();
bool is_live = row.cells().compact_and_expire(s, column_kind::regular_column, tomb, query_time, max_purgeable, gc_before);
is_live |= row.marker().compact_and_expire(tomb, query_time, max_purgeable, gc_before);
tombstone tomb = tombstone_for_row(s, e);
bool is_live = row.cells().compact_and_expire(s, column_kind::regular_column, tomb, query_time, max_purgeable, gc_before);
is_live |= row.marker().compact_and_expire(tomb, query_time, max_purgeable, gc_before);
// when row_limit is reached, do not exit immediately,
// iterate to the next live_row instead to include trailing
// tombstones in the mutation. This is how Origin deals with
// https://issues.apache.org/jira/browse/CASSANDRA-8933
if (is_live) {
if (row_count == row_limit) {
stop = true;
break;
}
++row_count;
// when row_limit is reached, do not exit immediately,
// iterate to the next live_row instead to include trailing
// tombstones in the mutation. This is how Origin deals with
// https://issues.apache.org/jira/browse/CASSANDRA-8933
if (is_live) {
if (row_count == row_limit) {
return stop_iteration::yes;
}
++last;
++row_count;
}
return stop_iteration::no;
};
if (reverse) {
trim_rows<true>(s, row_ranges, row_callback);
} else {
trim_rows<false>(s, row_ranges, row_callback);
}
if (row_count == 0 && static_row_live) {
++row_count;
}
_rows.erase_and_dispose(last, _rows.end(), current_deleter<rows_entry>());
auto can_purge_tombstone = [&] (const tombstone& t) {
return t.timestamp < max_purgeable && t.deletion_time < gc_before;
};
@@ -648,9 +749,10 @@ mutation_partition::compact_for_query(
const schema& s,
gc_clock::time_point query_time,
const std::vector<query::clustering_range>& row_ranges,
bool reverse,
uint32_t row_limit)
{
return do_compact(s, query_time, row_ranges, row_limit, api::max_timestamp);
return do_compact(s, query_time, row_ranges, reverse, row_limit, api::max_timestamp);
}
void mutation_partition::compact_for_compaction(const schema& s,
@@ -660,7 +762,7 @@ void mutation_partition::compact_for_compaction(const schema& s,
query::clustering_range::make_open_ended_both_sides()
};
do_compact(s, compaction_time, all_rows, query::max_rows, max_purgeable);
do_compact(s, compaction_time, all_rows, false, query::max_rows, max_purgeable);
}
// Returns true if there is no live data or tombstones.

View File

@@ -575,9 +575,21 @@ public:
// Same guarantees as for apply(const schema&, const mutation_partition&).
void apply(const schema& schema, mutation_partition_view);
private:
uint32_t do_compact(const schema& s, gc_clock::time_point now,
const std::vector<query::clustering_range>& row_ranges, uint32_t row_limit,
uint32_t do_compact(const schema& s,
gc_clock::time_point now,
const std::vector<query::clustering_range>& row_ranges,
bool reverse,
uint32_t row_limit,
api::timestamp_type max_purgeable);
// Calls func for each row entry inside row_ranges until func returns stop_iteration::yes.
// Removes all entries for which func didn't return stop_iteration::no or wasn't called at all.
// If reversed is true, func will be called on entries in reverse order. In that case row_ranges
// must be already in reverse order.
template<bool reversed, typename Func>
void trim_rows(const schema& s,
const std::vector<query::clustering_range>& row_ranges,
Func&& func);
public:
// Performs the following:
// - throws out data which doesn't belong to row_ranges
@@ -596,7 +608,7 @@ public:
// The row_limit parameter must be > 0.
//
uint32_t compact_for_query(const schema& s, gc_clock::time_point query_time,
const std::vector<query::clustering_range>& row_ranges, uint32_t row_limit);
const std::vector<query::clustering_range>& row_ranges, bool reversed, uint32_t row_limit);
// Performs the following:
// - expires cells based on compaction_time

View File

@@ -146,8 +146,10 @@ mutation_query(const mutation_source& source,
return consume(state.reader, [&state] (mutation&& m) {
// FIXME: Make data sources respect row_ranges so that we don't have to filter them out here.
auto is_distinct = state.slice.options.contains(query::partition_slice::option::distinct);
auto is_reversed = state.slice.options.contains(query::partition_slice::option::reversed);
auto limit = !is_distinct ? state.limit : 1;
auto rows_left = m.partition().compact_for_query(*m.schema(), state.query_time, state.slice.row_ranges, limit);
auto rows_left = m.partition().compact_for_query(*m.schema(), state.query_time, state.slice.row_ranges,
is_reversed, limit);
state.limit -= rows_left;
// NOTE: We must return all columns, regardless of what's in

View File

@@ -120,3 +120,9 @@ partition_slice_builder::with_static_column(bytes name) {
_static_columns->push_back(def->id);
return *this;
}
partition_slice_builder&
partition_slice_builder::reversed() {
_options.set<query::partition_slice::option::reversed>();
return *this;
}

View File

@@ -50,6 +50,7 @@ public:
partition_slice_builder& with_regular_column(bytes name);
partition_slice_builder& with_no_regular_columns();
partition_slice_builder& with_range(query::clustering_range range);
partition_slice_builder& reversed();
query::partition_slice build();
};

View File

@@ -2603,7 +2603,8 @@ public:
// unfreeze -> trim -> freeze
mutation m = p.mut().unfreeze(_schema);
static const std::vector<query::clustering_range> all(1, query::clustering_range::make_open_ended_both_sides());
auto rc = m.partition().compact_for_query(*_schema, _cmd->timestamp, all, limit_left);
bool is_reversed = _cmd->slice.options.contains(query::partition_slice::option::reversed);
auto rc = m.partition().compact_for_query(*_schema, _cmd->timestamp, all, is_reversed, limit_left);
partitions.push_back(partition(rc, freeze(m)));
row_count += rc;
} else {

View File

@@ -171,6 +171,201 @@ SEASTAR_TEST_CASE(test_cells_are_expired_according_to_query_timestamp) {
});
}
SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
return seastar::async([] {
auto s = make_schema();
auto now = gc_clock::now();
mutation m1(partition_key::from_single_value(*s, "key1"), s);
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("A")), "v1", bytes("A_v1"), 1);
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("B")), "v1", bytes("B_v1"), 1);
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("C")), "v1", bytes("C_v1"), 1);
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("D")), "v1", bytes("D_v1"), 1);
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("E")), "v1", bytes("E_v1"), 1);
auto src = make_source({m1});
{
auto slice = partition_slice_builder(*s)
.reversed()
.build();
reconcilable_result result = mutation_query(src,
query::full_partition_range, slice, 3, now).get0();
assert_that(to_result_set(result, s, slice))
.has_size(3)
.has(a_row()
.with_column("pk", bytes("key1"))
.with_column("ck", bytes("E"))
.with_column("v1", bytes("E_v1")))
.has(a_row()
.with_column("pk", bytes("key1"))
.with_column("ck", bytes("D"))
.with_column("v1", bytes("D_v1")))
.has(a_row()
.with_column("pk", bytes("key1"))
.with_column("ck", bytes("C"))
.with_column("v1", bytes("C_v1")));
}
{
auto slice = partition_slice_builder(*s)
.with_range(query::clustering_range::make_singular(
clustering_key_prefix::from_single_value(*s, bytes("E"))))
.with_range(query::clustering_range::make_singular(
clustering_key_prefix::from_single_value(*s, bytes("D"))))
.with_range(query::clustering_range::make_singular(
clustering_key_prefix::from_single_value(*s, bytes("C"))))
.reversed()
.build();
reconcilable_result result = mutation_query(src,
query::full_partition_range, slice, 3, now).get0();
assert_that(to_result_set(result, s, slice))
.has_size(3)
.has(a_row()
.with_column("pk", bytes("key1"))
.with_column("ck", bytes("E"))
.with_column("v1", bytes("E_v1")))
.has(a_row()
.with_column("pk", bytes("key1"))
.with_column("ck", bytes("D"))
.with_column("v1", bytes("D_v1")))
.has(a_row()
.with_column("pk", bytes("key1"))
.with_column("ck", bytes("C"))
.with_column("v1", bytes("C_v1")));
}
{
auto slice = partition_slice_builder(*s)
.with_range(query::clustering_range(
{clustering_key_prefix::from_single_value(*s, bytes("C"))},
{clustering_key_prefix::from_single_value(*s, bytes("E"))}))
.reversed()
.build();
{
reconcilable_result result = mutation_query(src,
query::full_partition_range, slice, 10, now).get0();
assert_that(to_result_set(result, s, slice))
.has_size(3)
.has(a_row()
.with_column("pk", bytes("key1"))
.with_column("ck", bytes("E"))
.with_column("v1", bytes("E_v1")))
.has(a_row()
.with_column("pk", bytes("key1"))
.with_column("ck", bytes("D"))
.with_column("v1", bytes("D_v1")))
.has(a_row()
.with_column("pk", bytes("key1"))
.with_column("ck", bytes("C"))
.with_column("v1", bytes("C_v1")));
}
{
reconcilable_result result = mutation_query(src,
query::full_partition_range, slice, 1, now).get0();
assert_that(to_result_set(result, s, slice))
.has_size(1)
.has(a_row()
.with_column("pk", bytes("key1"))
.with_column("ck", bytes("E"))
.with_column("v1", bytes("E_v1")));
}
{
reconcilable_result result = mutation_query(src,
query::full_partition_range, slice, 2, now).get0();
assert_that(to_result_set(result, s, slice))
.has_size(2)
.has(a_row()
.with_column("pk", bytes("key1"))
.with_column("ck", bytes("E"))
.with_column("v1", bytes("E_v1")))
.has(a_row()
.with_column("pk", bytes("key1"))
.with_column("ck", bytes("D"))
.with_column("v1", bytes("D_v1")));
}
}
{
auto slice = partition_slice_builder(*s)
.with_range(query::clustering_range::make_singular(
clustering_key_prefix::from_single_value(*s, bytes("E"))))
.with_range(query::clustering_range::make_singular(
clustering_key_prefix::from_single_value(*s, bytes("D"))))
.with_range(query::clustering_range::make_singular(
clustering_key_prefix::from_single_value(*s, bytes("C"))))
.reversed()
.build();
reconcilable_result result = mutation_query(src,
query::full_partition_range, slice, 2, now).get0();
assert_that(to_result_set(result, s, slice))
.has_size(2)
.has(a_row()
.with_column("pk", bytes("key1"))
.with_column("ck", bytes("E"))
.with_column("v1", bytes("E_v1")))
.has(a_row()
.with_column("pk", bytes("key1"))
.with_column("ck", bytes("D"))
.with_column("v1", bytes("D_v1")));
}
{
auto slice = partition_slice_builder(*s)
.with_range(query::clustering_range::make_singular(
clustering_key_prefix::from_single_value(*s, bytes("E"))))
.with_range(query::clustering_range::make_singular(
clustering_key_prefix::from_single_value(*s, bytes("C"))))
.reversed()
.build();
reconcilable_result result = mutation_query(src,
query::full_partition_range, slice, 3, now).get0();
assert_that(to_result_set(result, s, slice))
.has_size(2)
.has(a_row()
.with_column("pk", bytes("key1"))
.with_column("ck", bytes("E"))
.with_column("v1", bytes("E_v1")))
.has(a_row()
.with_column("pk", bytes("key1"))
.with_column("ck", bytes("C"))
.with_column("v1", bytes("C_v1")));
}
{
auto slice = partition_slice_builder(*s)
.with_range(query::clustering_range::make_singular(
clustering_key_prefix::from_single_value(*s, bytes("B"))))
.reversed()
.build();
reconcilable_result result = mutation_query(src,
query::full_partition_range, slice, 3, now).get0();
assert_that(to_result_set(result, s, slice))
.has_only(a_row()
.with_column("pk", bytes("key1"))
.with_column("ck", bytes("B"))
.with_column("v1", bytes("B_v1")));
}
});
}
SEASTAR_TEST_CASE(test_query_when_partition_tombstone_covers_live_cells) {
return seastar::async([] {
auto s = make_schema();