query: add flag to return static row on partition with no rows
A SELECT statement that has clustering key restrictions isn't supposed to return static content if no regular rows matches the restrictions, see #589. However, for the CAS statement we do need to return static content on failure so this patch adds a flag that allows the caller to override this behavior.
This commit is contained in:
@@ -113,9 +113,10 @@ mutation::query(query::result::builder& builder,
|
||||
{
|
||||
auto pb = builder.add_partition(*schema(), key());
|
||||
auto is_reversed = slice.options.contains<query::partition_slice::option::reversed>();
|
||||
auto always_return_static_content = slice.options.contains<query::partition_slice::option::always_return_static_content>();
|
||||
mutation_partition& p = partition();
|
||||
auto limit = std::min(row_limit, slice.partition_row_limit());
|
||||
p.compact_for_query(*schema(), now, slice.row_ranges(*schema(), key()), is_reversed, limit);
|
||||
p.compact_for_query(*schema(), now, slice.row_ranges(*schema(), key()), always_return_static_content, is_reversed, limit);
|
||||
p.query_compacted(pb, *schema(), limit);
|
||||
}
|
||||
|
||||
|
||||
@@ -150,7 +150,7 @@ class compact_mutation_state {
|
||||
bool _empty_partition_in_gc_consumer{};
|
||||
const dht::decorated_key* _dk{};
|
||||
dht::decorated_key _last_dk;
|
||||
bool _has_ck_selector{};
|
||||
bool _return_static_content_on_partition_with_no_rows{};
|
||||
|
||||
std::optional<static_row> _last_static_row;
|
||||
|
||||
@@ -251,7 +251,9 @@ public:
|
||||
void consume_new_partition(const dht::decorated_key& dk) {
|
||||
auto& pk = dk.key();
|
||||
_dk = &dk;
|
||||
_has_ck_selector = has_ck_selector(_slice.row_ranges(_schema, pk));
|
||||
_return_static_content_on_partition_with_no_rows =
|
||||
_slice.options.contains(query::partition_slice::option::always_return_static_content) ||
|
||||
!has_ck_selector(_slice.row_ranges(_schema, pk));
|
||||
_empty_partition = true;
|
||||
_empty_partition_in_gc_consumer = true;
|
||||
_rows_in_current_partition = 0;
|
||||
@@ -392,7 +394,8 @@ public:
|
||||
if (!_empty_partition) {
|
||||
// #589 - Do not add extra row for statics unless we did a CK range-less query.
|
||||
// See comment in query
|
||||
if (_rows_in_current_partition == 0 && _static_row_live && !_has_ck_selector) {
|
||||
if (_rows_in_current_partition == 0 && _static_row_live &&
|
||||
_return_static_content_on_partition_with_no_rows) {
|
||||
++_rows_in_current_partition;
|
||||
}
|
||||
|
||||
|
||||
@@ -932,8 +932,11 @@ mutation_partition::query_compacted(query::result::partition_writer& pw, const s
|
||||
// #589
|
||||
// If ck:s exist, and we do a restriction on them, we either have maching
|
||||
// rows, or return nothing, since cql does not allow "is null".
|
||||
bool return_static_content_on_partition_with_no_rows =
|
||||
pw.slice().options.contains(query::partition_slice::option::always_return_static_content) ||
|
||||
!has_ck_selector(pw.ranges());
|
||||
if (row_count == 0
|
||||
&& (has_ck_selector(pw.ranges())
|
||||
&& (!return_static_content_on_partition_with_no_rows
|
||||
|| !has_any_live_data(s, column_kind::static_column, static_row().get()))) {
|
||||
pw.retract();
|
||||
} else {
|
||||
@@ -1349,6 +1352,7 @@ void mutation_partition::trim_rows(const schema& s,
|
||||
uint32_t mutation_partition::do_compact(const schema& s,
|
||||
gc_clock::time_point query_time,
|
||||
const std::vector<query::clustering_range>& row_ranges,
|
||||
bool always_return_static_content,
|
||||
bool reverse,
|
||||
uint32_t row_limit,
|
||||
can_gc_fn& can_gc)
|
||||
@@ -1395,7 +1399,8 @@ uint32_t mutation_partition::do_compact(const schema& s,
|
||||
|
||||
// #589 - Do not add extra row for statics unless we did a CK range-less query.
|
||||
// See comment in query
|
||||
if (row_count == 0 && static_row_live && !has_ck_selector(row_ranges)) {
|
||||
bool return_static_content_on_partition_with_no_rows = always_return_static_content || !has_ck_selector(row_ranges);
|
||||
if (row_count == 0 && static_row_live && return_static_content_on_partition_with_no_rows) {
|
||||
++row_count;
|
||||
}
|
||||
|
||||
@@ -1416,11 +1421,12 @@ mutation_partition::compact_for_query(
|
||||
const schema& s,
|
||||
gc_clock::time_point query_time,
|
||||
const std::vector<query::clustering_range>& row_ranges,
|
||||
bool always_return_static_content,
|
||||
bool reverse,
|
||||
uint32_t row_limit)
|
||||
{
|
||||
check_schema(s);
|
||||
return do_compact(s, query_time, row_ranges, reverse, row_limit, always_gc);
|
||||
return do_compact(s, query_time, row_ranges, always_return_static_content, reverse, row_limit, always_gc);
|
||||
}
|
||||
|
||||
void mutation_partition::compact_for_compaction(const schema& s,
|
||||
@@ -1431,7 +1437,7 @@ void mutation_partition::compact_for_compaction(const schema& s,
|
||||
query::clustering_range::make_open_ended_both_sides()
|
||||
};
|
||||
|
||||
do_compact(s, compaction_time, all_rows, false, query::max_rows, can_gc);
|
||||
do_compact(s, compaction_time, all_rows, true, false, query::max_rows, can_gc);
|
||||
}
|
||||
|
||||
// Returns true if the mutation_partition represents no writes.
|
||||
@@ -2067,8 +2073,10 @@ uint32_t mutation_querier::consume_end_of_stream() {
|
||||
// #589
|
||||
// If ck:s exist, and we do a restriction on them, we either have maching
|
||||
// rows, or return nothing, since cql does not allow "is null".
|
||||
if (!_live_clustering_rows
|
||||
&& (has_ck_selector(_pw.ranges()) || !_live_data_in_static_row)) {
|
||||
bool return_static_content_on_partition_with_no_rows =
|
||||
_pw.slice().options.contains(query::partition_slice::option::always_return_static_content) ||
|
||||
!has_ck_selector(_pw.ranges());
|
||||
if (!_live_clustering_rows && (!return_static_content_on_partition_with_no_rows || !_live_data_in_static_row)) {
|
||||
_pw.retract();
|
||||
return 0;
|
||||
} else {
|
||||
@@ -2163,7 +2171,9 @@ future<> data_query(
|
||||
}
|
||||
|
||||
void reconcilable_result_builder::consume_new_partition(const dht::decorated_key& dk) {
|
||||
_has_ck_selector = has_ck_selector(_slice.row_ranges(_schema, dk.key()));
|
||||
_return_static_content_on_partition_with_no_rows =
|
||||
_slice.options.contains(query::partition_slice::option::always_return_static_content) ||
|
||||
!has_ck_selector(_slice.row_ranges(_schema, dk.key()));
|
||||
_static_row_is_alive = false;
|
||||
_live_rows = 0;
|
||||
auto is_reversed = _slice.options.contains(query::partition_slice::option::reversed);
|
||||
@@ -2200,7 +2210,7 @@ stop_iteration reconcilable_result_builder::consume(range_tombstone&& rt) {
|
||||
}
|
||||
|
||||
stop_iteration reconcilable_result_builder::consume_end_of_partition() {
|
||||
if (_live_rows == 0 && _static_row_is_alive && !_has_ck_selector) {
|
||||
if (_live_rows == 0 && _static_row_is_alive && _return_static_content_on_partition_with_no_rows) {
|
||||
++_live_rows;
|
||||
// Normally we count only live clustering rows, to guarantee that
|
||||
// the next page fetch won't ask for the same range. However,
|
||||
|
||||
@@ -1350,6 +1350,7 @@ private:
|
||||
uint32_t do_compact(const schema& s,
|
||||
gc_clock::time_point now,
|
||||
const std::vector<query::clustering_range>& row_ranges,
|
||||
bool always_return_static_content,
|
||||
bool reverse,
|
||||
uint32_t row_limit,
|
||||
can_gc_fn&);
|
||||
@@ -1381,7 +1382,8 @@ public:
|
||||
// The row_limit parameter must be > 0.
|
||||
//
|
||||
uint32_t compact_for_query(const schema& s, gc_clock::time_point query_time,
|
||||
const std::vector<query::clustering_range>& row_ranges, bool reversed, uint32_t row_limit);
|
||||
const std::vector<query::clustering_range>& row_ranges, bool always_return_static_content,
|
||||
bool reversed, uint32_t row_limit);
|
||||
|
||||
// Performs the following:
|
||||
// - expires cells based on compaction_time
|
||||
|
||||
@@ -116,7 +116,7 @@ class reconcilable_result_builder {
|
||||
utils::chunked_vector<partition> _result;
|
||||
uint32_t _live_rows{};
|
||||
|
||||
bool _has_ck_selector{};
|
||||
bool _return_static_content_on_partition_with_no_rows{};
|
||||
bool _static_row_is_alive{};
|
||||
uint32_t _total_live_rows = 0;
|
||||
query::result_memory_accounter _memory_accounter;
|
||||
|
||||
@@ -123,8 +123,23 @@ constexpr auto max_rows = std::numeric_limits<uint32_t>::max();
|
||||
// Schema-dependent.
|
||||
class partition_slice {
|
||||
public:
|
||||
enum class option { send_clustering_key, send_partition_key, send_timestamp, send_expiry, reversed, distinct, collections_as_maps, send_ttl,
|
||||
allow_short_read, with_digest, bypass_cache };
|
||||
enum class option {
|
||||
send_clustering_key,
|
||||
send_partition_key,
|
||||
send_timestamp,
|
||||
send_expiry,
|
||||
reversed,
|
||||
distinct,
|
||||
collections_as_maps,
|
||||
send_ttl,
|
||||
allow_short_read,
|
||||
with_digest,
|
||||
bypass_cache,
|
||||
// Normally, we don't return static row if the request has clustering
|
||||
// key restrictions and the partition doesn't have any rows matching
|
||||
// the restrictions, see #589. This flag overrides this behavior.
|
||||
always_return_static_content,
|
||||
};
|
||||
using option_set = enum_set<super_enum<option,
|
||||
option::send_clustering_key,
|
||||
option::send_partition_key,
|
||||
@@ -136,7 +151,8 @@ public:
|
||||
option::send_ttl,
|
||||
option::allow_short_read,
|
||||
option::with_digest,
|
||||
option::bypass_cache>>;
|
||||
option::bypass_cache,
|
||||
option::always_return_static_content>>;
|
||||
clustering_row_ranges _row_ranges;
|
||||
public:
|
||||
column_id_vector static_columns; // TODO: consider using bitmap
|
||||
|
||||
@@ -2590,7 +2590,8 @@ private:
|
||||
const auto& m = m_a_rc.mut;
|
||||
auto mp = mutation_partition(s, m.partition());
|
||||
auto&& ranges = cmd.slice.row_ranges(s, m.key());
|
||||
mp.compact_for_query(s, cmd.timestamp, ranges, is_reversed, limit);
|
||||
bool always_return_static_content = cmd.slice.options.contains<query::partition_slice::option::always_return_static_content>();
|
||||
mp.compact_for_query(s, cmd.timestamp, ranges, always_return_static_content, is_reversed, limit);
|
||||
|
||||
std::optional<clustering_key> ck;
|
||||
if (!mp.clustered_rows().empty()) {
|
||||
@@ -2621,6 +2622,7 @@ private:
|
||||
const primary_key& last_reconciled_row, std::vector<mutation_and_live_row_count>& rp,
|
||||
const std::vector<std::vector<version>>& versions, bool is_reversed) {
|
||||
bool short_reads_allowed = cmd.slice.options.contains<query::partition_slice::option::allow_short_read>();
|
||||
bool always_return_static_content = cmd.slice.options.contains<query::partition_slice::option::always_return_static_content>();
|
||||
primary_key::less_compare cmp(s, is_reversed);
|
||||
std::optional<primary_key> shortest_read;
|
||||
auto num_replicas = versions[0].size();
|
||||
@@ -2656,7 +2658,8 @@ private:
|
||||
std::vector<query::clustering_range> ranges;
|
||||
ranges.emplace_back(is_reversed ? query::clustering_range::make_starting_with(std::move(*shortest_read->clustering))
|
||||
: query::clustering_range::make_ending_with(std::move(*shortest_read->clustering)));
|
||||
it->live_row_count = it->mut.partition().compact_for_query(s, cmd.timestamp, ranges, is_reversed, query::max_rows);
|
||||
it->live_row_count = it->mut.partition().compact_for_query(s, cmd.timestamp, ranges, always_return_static_content,
|
||||
is_reversed, query::max_rows);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -261,7 +261,7 @@ SEASTAR_TEST_CASE(test_counter_mutations) {
|
||||
m = m1;
|
||||
m.apply(m4);
|
||||
m.partition().compact_for_query(*s, gc_clock::now(), { query::clustering_range::make_singular(ck) },
|
||||
false, query::max_rows);
|
||||
false, false, query::max_rows);
|
||||
BOOST_REQUIRE_EQUAL(m.partition().clustered_rows().calculate_size(), 0);
|
||||
BOOST_REQUIRE(m.partition().static_row().empty());
|
||||
|
||||
|
||||
@@ -1527,11 +1527,11 @@ SEASTAR_TEST_CASE(test_trim_rows) {
|
||||
|
||||
auto compact_and_expect_empty = [&] (mutation m, std::vector<query::clustering_range> ranges) {
|
||||
mutation m2 = m;
|
||||
m.partition().compact_for_query(*s, now, ranges, false, query::max_rows);
|
||||
m.partition().compact_for_query(*s, now, ranges, false, false, query::max_rows);
|
||||
BOOST_REQUIRE(m.partition().clustered_rows().empty());
|
||||
|
||||
std::reverse(ranges.begin(), ranges.end());
|
||||
m2.partition().compact_for_query(*s, now, ranges, true, query::max_rows);
|
||||
m2.partition().compact_for_query(*s, now, ranges, false, true, query::max_rows);
|
||||
BOOST_REQUIRE(m2.partition().clustered_rows().empty());
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user