Merge 'query: allow replica to provide arbitrary continue position' from Botond Dénes

Currently, we use the last row in the query result set as the position from which the query is continued on the next page. Since only live rows make it into the query result set, this forces the query to stop on a live row on the replica; otherwise, any dead rows or tombstones processed after the last live row would have to be re-processed on the next page (and the saved reader would have to be thrown away due to a position mismatch). This requirement to stop on a live row is problematic with datasets that have lots of dead rows or tombstones, especially if these form a prefix. In the extreme case, a query can time out before it processes a single live row, and the dataset becomes effectively unreadable until compaction gets rid of the tombstones.
This series prepares the way for the solution: it allows the replica to determine the position from which the query should continue on the next page. This position can be that of a dead row, if the query stopped on a dead row. For now, the replica supplies the same position that would have been obtained by looking at the last row in the result set; this series merely introduces the infrastructure for transferring a position together with the query result, and prepares the paging logic to make use of this position. If the coordinator is not prepared for the new field, it simply falls back to the old way of looking at the last row in the result set. Since, for now, this is still the same as the content of the new field, there is no problem in mixed clusters.

Refs: https://github.com/scylladb/scylla/issues/3672
Refs: https://github.com/scylladb/scylla/issues/7689
Refs: https://github.com/scylladb/scylla/issues/7933

Tests: manual upgrade test.
I wrote a data set with:
```
./scylla-bench -mode=write -workload=sequential -replication-factor=3 -nodes 127.0.0.1,127.0.0.2,127.0.0.3 -clustering-row-count=10000 -clustering-row-size=8096 -partition-count=1000
```
This creates large, 80MB partitions, which should fill many pages if read in full. Then I started a read workload:
```
./scylla-bench -mode=read -workload=uniform -replication-factor=3 -nodes 127.0.0.1,127.0.0.2,127.0.0.3 -clustering-row-count=10000 -duration=10m -rows-per-request=9000 -page-size=100
```
I confirmed that paging was happening as expected, then upgraded the nodes one by one to this PR (while the read load was ongoing). I observed no read errors or any other errors in the logs.

Closes #10829

* github.com:scylladb/scylla:
  query: have replica provide the last position
  idl/query: add last_position to query_result
  multishard_mutation_query: propagate compaction state to result builder
  multishard_mutation_query: defer creating result builder until needed
  querier: use full_position instead of ad-hoc struct
  querier: rely on compactor for position tracking
  mutation_compactor: add current_full_position() convenience accessor
  mutation_compactor: s/_last_clustering_pos/_last_pos/
  mutation_compactor: add state accessor to compact_mutation
  introduce full_position
  idl: move position_in_partition into own header
  service/paging: use position_in_partition instead of clustering_key for last row
  alternator/serialization: extract value object parsing logic
  service/pagers/query_pagers.cc: fix indentation
  position_in_partition: add to_string(partition_region) and parse_partition_region()
  mutation_fragment.hh: move operator<<(partition_region) to position_in_partition.hh
This commit is contained in:
Avi Kivity
2022-06-27 12:23:21 +03:00
30 changed files with 599 additions and 409 deletions


@@ -3605,9 +3605,9 @@ static rjson::value encode_paging_state(const schema& schema, const service::pag
rjson::add_with_string_name(key_entry, type_to_string(cdef.type), json_key_column_value(*exploded_pk_it, cdef));
++exploded_pk_it;
}
auto ck = paging_state.get_clustering_key();
if (ck) {
auto exploded_ck = ck->explode();
auto pos = paging_state.get_position_in_partition();
if (pos.has_key()) {
auto exploded_ck = pos.key().explode();
auto exploded_ck_it = exploded_ck.begin();
for (const column_definition& cdef : schema.clustering_key_columns()) {
rjson::add_with_string_name(last_evaluated_key, std::string_view(cdef.name_as_text()), rjson::empty_object());
@@ -3616,6 +3616,10 @@ static rjson::value encode_paging_state(const schema& schema, const service::pag
++exploded_ck_it;
}
}
rjson::add_with_string_name(last_evaluated_key, scylla_paging_region, rjson::empty_object());
rjson::add(last_evaluated_key[scylla_paging_region.data()], "S", rjson::from_string(to_string(pos.region())));
rjson::add_with_string_name(last_evaluated_key, scylla_paging_weight, rjson::empty_object());
rjson::add(last_evaluated_key[scylla_paging_weight.data()], "N", static_cast<int>(pos.get_bound_weight()));
return last_evaluated_key;
}
@@ -3639,11 +3643,11 @@ static future<executor::request_return_type> do_query(service::storage_proxy& pr
if (exclusive_start_key) {
partition_key pk = pk_from_json(*exclusive_start_key, schema);
std::optional<clustering_key> ck;
auto pos = position_in_partition(position_in_partition::partition_start_tag_t());
if (schema->clustering_key_size() > 0) {
ck = ck_from_json(*exclusive_start_key, schema);
pos = pos_from_json(*exclusive_start_key, schema);
}
paging_state = make_lw_shared<service::pager::paging_state>(pk, ck, query::max_partitions, utils::UUID(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 0);
paging_state = make_lw_shared<service::pager::paging_state>(pk, pos, query::max_partitions, utils::UUID(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 0);
}
auto regular_columns = boost::copy_range<query::column_id_vector>(


@@ -14,6 +14,7 @@
#include "rapidjson/writer.h"
#include "concrete_types.hh"
#include "cql3/type_json.hh"
#include "position_in_partition.hh"
static logging::logger slogger("alternator-serialization");
@@ -162,31 +163,42 @@ bytes get_key_column_value(const rjson::value& item, const column_definition& co
}
// Parses the JSON encoding for a key value, which is a map with a single
// entry, whose key is the type (expected to match the key column's type)
// and the value is the encoded value.
bytes get_key_from_typed_value(const rjson::value& key_typed_value, const column_definition& column) {
// entry whose key is the type and the value is the encoded value.
// If this type does not match the desired "type_str", an api_error::validation
// error is thrown (the "name" parameter is the name of the column which will
// be mentioned in the exception message).
// If the type does match, a reference to the encoded value is returned.
static const rjson::value& get_typed_value(const rjson::value& key_typed_value, std::string_view type_str, std::string_view name, std::string_view value_name) {
if (!key_typed_value.IsObject() || key_typed_value.MemberCount() != 1 ||
!key_typed_value.MemberBegin()->value.IsString()) {
throw api_error::validation(
format("Malformed value object for key column {}: {}",
column.name_as_text(), key_typed_value));
format("Malformed value object for {} {}: {}",
value_name, name, key_typed_value));
}
auto it = key_typed_value.MemberBegin();
if (it->name != type_to_string(column.type)) {
if (rjson::to_string_view(it->name) != type_str) {
throw api_error::validation(
format("Type mismatch: expected type {} for key column {}, got type {}",
type_to_string(column.type), column.name_as_text(), it->name));
format("Type mismatch: expected type {} for {} {}, got type {}",
type_str, value_name, name, it->name));
}
std::string_view value_view = rjson::to_string_view(it->value);
return it->value;
}
// Parses the JSON encoding for a key value, which is a map with a single
// entry, whose key is the type (expected to match the key column's type)
// and the value is the encoded value.
bytes get_key_from_typed_value(const rjson::value& key_typed_value, const column_definition& column) {
auto& value = get_typed_value(key_typed_value, type_to_string(column.type), column.name_as_text(), "key column");
std::string_view value_view = rjson::to_string_view(value);
if (value_view.empty()) {
throw api_error::validation(
format("The AttributeValue for a key attribute cannot contain an empty string value. Key: {}", column.name_as_text()));
}
if (column.type == bytes_type) {
return rjson::base64_decode(it->value);
return rjson::base64_decode(value);
} else {
return column.type->from_string(rjson::to_string_view(it->value));
return column.type->from_string(value_view);
}
}
@@ -237,6 +249,36 @@ clustering_key ck_from_json(const rjson::value& item, schema_ptr schema) {
return clustering_key::from_exploded(raw_ck);
}
position_in_partition pos_from_json(const rjson::value& item, schema_ptr schema) {
auto ck = ck_from_json(item, schema);
const auto region_item = rjson::find(item, scylla_paging_region);
const auto weight_item = rjson::find(item, scylla_paging_weight);
if (bool(region_item) != bool(weight_item)) {
throw api_error::validation("Malformed value object: region and weight have to be either both missing or both present");
}
partition_region region;
bound_weight weight;
if (region_item) {
auto region_view = rjson::to_string_view(get_typed_value(*region_item, "S", scylla_paging_region, "key region"));
auto weight_view = rjson::to_string_view(get_typed_value(*weight_item, "N", scylla_paging_weight, "key weight"));
region = parse_partition_region(region_view);
if (weight_view == "-1") {
weight = bound_weight::before_all_prefixed;
} else if (weight_view == "0") {
weight = bound_weight::equal;
} else if (weight_view == "1") {
weight = bound_weight::after_all_prefixed;
} else {
throw std::runtime_error(fmt::format("Invalid value for weight: {}", weight_view));
}
return position_in_partition(region, weight, region == partition_region::clustered ? std::optional(std::move(ck)) : std::nullopt);
}
if (ck.is_empty()) {
return position_in_partition(position_in_partition::partition_start_tag_t());
}
return position_in_partition::for_key(std::move(ck));
}
big_decimal unwrap_number(const rjson::value& v, std::string_view diagnostic) {
if (!v.IsObject() || v.MemberCount() != 1) {
throw api_error::validation(format("{}: invalid number object", diagnostic));


@@ -17,6 +17,8 @@
#include "utils/rjson.hh"
#include "utils/big_decimal.hh"
class position_in_partition;
namespace alternator {
enum class alternator_type : int8_t {
@@ -33,6 +35,9 @@ struct type_representation {
data_type dtype;
};
inline constexpr std::string_view scylla_paging_region(":scylla:paging:region");
inline constexpr std::string_view scylla_paging_weight(":scylla:paging:weight");
type_info type_info_from_string(std::string_view type);
type_representation represent_type(alternator_type atype);
@@ -47,6 +52,7 @@ rjson::value json_key_column_value(bytes_view cell, const column_definition& col
partition_key pk_from_json(const rjson::value& item, schema_ptr schema);
clustering_key ck_from_json(const rjson::value& item, schema_ptr schema);
position_in_partition pos_from_json(const rjson::value& item, schema_ptr schema);
// If v encodes a number (i.e., it is a {"N": [...]}), returns an object representing it. Otherwise,
// raises ValidationException with diagnostic.


@@ -1135,6 +1135,7 @@ idls = ['idl/gossip_digest.idl.hh',
'idl/forward_request.idl.hh',
'idl/replica_exception.idl.hh',
'idl/per_partition_rate_limit_info.idl.hh',
'idl/position_in_partition.idl.hh',
]
rusts = [


@@ -989,9 +989,9 @@ lw_shared_ptr<const service::pager::paging_state> indexed_table_select_statement
return paging_state;
}
auto&& last_partition_and_clustering_key = result_view.get_last_partition_and_clustering_key();
auto& last_base_pk = std::get<0>(last_partition_and_clustering_key);
auto& last_base_ck = std::get<1>(last_partition_and_clustering_key);
auto&& last_pos = result_view.get_last_position();
auto& last_base_pk = last_pos.partition;
auto* last_base_ck = last_pos.position.has_key() ? &last_pos.position.key() : nullptr;
bytes_opt indexed_column_value = _used_index_restrictions->value_for(*cdef, options);

full_position.hh (new file, 40 lines)

@@ -0,0 +1,40 @@
/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include "keys.hh"
#include "position_in_partition.hh"
struct full_position;
struct full_position_view {
const partition_key_view partition;
const position_in_partition_view position;
full_position_view(const full_position&);
full_position_view(const partition_key&, const position_in_partition_view);
};
struct full_position {
partition_key partition;
position_in_partition position;
full_position(full_position_view);
full_position(partition_key, position_in_partition);
operator full_position_view() {
return full_position_view(partition, position);
}
};
inline full_position_view::full_position_view(const full_position& fp) : partition(fp.partition), position(fp.position) { }
inline full_position_view::full_position_view(const partition_key& pk, const position_in_partition_view pos) : partition(pk), position(pos) { }
inline full_position::full_position(full_position_view fpv) : partition(fpv.partition), position(fpv.position) { }
inline full_position::full_position(partition_key pk, position_in_partition pos) : partition(std::move(pk)), position(std::move(pos)) { }


@@ -6,6 +6,19 @@ enum class read_repair_decision : uint8_t {
};
}
enum class bound_weight : int8_t {
before_all_prefixed = -1,
equal = 0,
after_all_prefixed = 1,
};
enum class partition_region : uint8_t {
partition_start,
static_row,
clustered,
partition_end,
};
namespace service {
namespace pager {
class paging_state {
@@ -18,6 +31,8 @@ class paging_state {
uint32_t get_rows_fetched_for_last_partition_low_bits() [[version 3.1]] = 0;
uint32_t get_remaining_high_bits() [[version 4.3]] = 0;
uint32_t get_rows_fetched_for_last_partition_high_bits() [[version 4.3]] = 0;
bound_weight get_clustering_key_weight() [[version 5.1]] = bound_weight::equal;
partition_region get_partition_region() [[version 5.1]] = partition_region::clustered;
};
}
}


@@ -10,25 +10,6 @@ class repair_hash {
uint64_t hash;
};
enum class bound_weight : int8_t {
before_all_prefixed = -1,
equal = 0,
after_all_prefixed = 1,
};
enum class partition_region : uint8_t {
partition_start,
static_row,
clustered,
partition_end,
};
class position_in_partition {
partition_region get_type();
bound_weight get_bound_weight();
std::optional<clustering_key_prefix> get_clustering_key_prefix();
};
struct partition_key_and_mutation_fragments {
partition_key get_key();
std::list<frozen_mutation_fragment> get_mutation_fragments();


idl/position_in_partition.idl.hh (new file, 31 lines)
@@ -0,0 +1,31 @@
/*
* Copyright 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
enum class bound_weight : int8_t {
before_all_prefixed = -1,
equal = 0,
after_all_prefixed = 1,
};
enum class partition_region : uint8_t {
partition_start,
static_row,
clustered,
partition_end,
};
class position_in_partition {
partition_region get_type();
bound_weight get_bound_weight();
std::optional<clustering_key_prefix> get_clustering_key_prefix();
};
struct full_position {
partition_key partition;
position_in_partition position;
};


@@ -27,6 +27,7 @@ class qr_partition stub [[writable]] {
class query_result stub [[writable]] {
utils::chunked_vector<qr_partition> partitions; // in ring order
std::optional<full_position> last_position [[version 5.1]];
};
enum class digest_algorithm : uint8_t {


@@ -44,6 +44,7 @@
#include "service/raft/messaging.hh"
#include "replica/exceptions.hh"
#include "serializer.hh"
#include "full_position.hh"
#include "db/per_partition_rate_limit_info.hh"
#include "idl/consistency_level.dist.hh"
#include "idl/tracing.dist.hh"
@@ -59,6 +60,7 @@
#include "idl/gossip_digest.dist.hh"
#include "idl/read_command.dist.hh"
#include "idl/range.dist.hh"
#include "idl/position_in_partition.dist.hh"
#include "idl/partition_checksum.dist.hh"
#include "idl/query.dist.hh"
#include "idl/cache_temperature.dist.hh"
@@ -89,6 +91,7 @@
#include "idl/gossip_digest.dist.impl.hh"
#include "idl/read_command.dist.impl.hh"
#include "idl/range.dist.impl.hh"
#include "idl/position_in_partition.dist.impl.hh"
#include "idl/query.dist.impl.hh"
#include "idl/cache_temperature.dist.impl.hh"
#include "idl/mutation.dist.impl.hh"


@@ -205,7 +205,7 @@ class read_context : public reader_lifecycle_policy_v2 {
dismantle_buffer_stats dismantle_combined_buffer(flat_mutation_reader_v2::tracked_buffer combined_buffer, const dht::decorated_key& pkey);
dismantle_buffer_stats dismantle_compaction_state(detached_compaction_state compaction_state);
future<> save_reader(shard_id shard, const dht::decorated_key& last_pkey, const std::optional<clustering_key_prefix>& last_ckey);
future<> save_reader(shard_id shard, const dht::decorated_key& last_pkey, position_in_partition_view last_pos);
public:
read_context(distributed<replica::database>& db, schema_ptr s, const query::read_command& cmd, const dht::partition_range_vector& ranges,
@@ -275,7 +275,7 @@ public:
future<> lookup_readers(db::timeout_clock::time_point timeout) noexcept;
future<> save_readers(flat_mutation_reader_v2::tracked_buffer unconsumed_buffer, detached_compaction_state compaction_state,
std::optional<clustering_key_prefix> last_ckey) noexcept;
position_in_partition last_pos) noexcept;
future<> stop();
};
@@ -474,10 +474,10 @@ read_context::dismantle_buffer_stats read_context::dismantle_compaction_state(de
return stats;
}
future<> read_context::save_reader(shard_id shard, const dht::decorated_key& last_pkey, const std::optional<clustering_key_prefix>& last_ckey) {
return do_with(std::exchange(_readers[shard], {}), [this, shard, &last_pkey, &last_ckey] (reader_meta& rm) mutable {
future<> read_context::save_reader(shard_id shard, const dht::decorated_key& last_pkey, position_in_partition_view last_pos) {
return do_with(std::exchange(_readers[shard], {}), [this, shard, &last_pkey, last_pos] (reader_meta& rm) mutable {
return _db.invoke_on(shard, [this, query_uuid = _cmd.query_uuid, query_ranges = _ranges, &rm,
&last_pkey, &last_ckey, gts = tracing::global_trace_state_ptr(_trace_state)] (replica::database& db) mutable {
&last_pkey, last_pos, gts = tracing::global_trace_state_ptr(_trace_state)] (replica::database& db) mutable {
try {
auto rparts = rm.rparts.release(); // avoid another round-trip when destroying rparts
auto reader_opt = rparts->permit.semaphore().unregister_inactive_read(std::move(*rparts->handle));
@@ -518,7 +518,7 @@ future<> read_context::save_reader(shard_id shard, const dht::decorated_key& las
std::move(*reader),
std::move(rparts->permit),
last_pkey,
last_ckey);
position_in_partition(last_pos));
db.get_querier_cache().insert(query_uuid, std::move(querier), gts.get());
@@ -584,7 +584,7 @@ future<> read_context::lookup_readers(db::timeout_clock::time_point timeout) noe
}
future<> read_context::save_readers(flat_mutation_reader_v2::tracked_buffer unconsumed_buffer, detached_compaction_state compaction_state,
std::optional<clustering_key_prefix> last_ckey) noexcept {
position_in_partition last_pos) noexcept {
if (_cmd.query_uuid == utils::UUID{}) {
co_return;
}
@@ -597,10 +597,10 @@ future<> read_context::save_readers(flat_mutation_reader_v2::tracked_buffer unco
const auto cs_stats = dismantle_compaction_state(std::move(compaction_state));
tracing::trace(_trace_state, "Dismantled compaction state: {}", cs_stats);
co_await parallel_for_each(boost::irange(0u, smp::count), [this, &last_pkey, &last_ckey] (shard_id shard) {
co_await parallel_for_each(boost::irange(0u, smp::count), [this, &last_pkey, &last_pos] (shard_id shard) {
auto& rm = _readers[shard];
if (rm.state == reader_state::successful_lookup || rm.state == reader_state::saving) {
return save_reader(shard, last_pkey, last_ckey);
return save_reader(shard, last_pkey, last_pos);
}
return make_ready_future<>();
@@ -615,15 +615,13 @@ using compact_for_result_state = compact_for_query_state_v2<ResultType::only_liv
template <typename ResultBuilder>
requires std::is_nothrow_move_constructible_v<typename ResultBuilder::result_type>
struct page_consume_result {
std::optional<clustering_key_prefix> last_ckey;
typename ResultBuilder::result_type result;
flat_mutation_reader_v2::tracked_buffer unconsumed_fragments;
lw_shared_ptr<compact_for_result_state<ResultBuilder>> compaction_state;
page_consume_result(std::optional<clustering_key_prefix>&& ckey, typename ResultBuilder::result_type&& result, flat_mutation_reader_v2::tracked_buffer&& unconsumed_fragments,
page_consume_result(typename ResultBuilder::result_type&& result, flat_mutation_reader_v2::tracked_buffer&& unconsumed_fragments,
lw_shared_ptr<compact_for_result_state<ResultBuilder>>&& compaction_state) noexcept
: last_ckey(std::move(ckey))
, result(std::move(result))
: result(std::move(result))
, unconsumed_fragments(std::move(unconsumed_fragments))
, compaction_state(std::move(compaction_state)) {
}
@@ -699,7 +697,7 @@ future<page_consume_result<ResultBuilder>> read_page(
const query::read_command& cmd,
const dht::partition_range_vector& ranges,
tracing::trace_state_ptr trace_state,
ResultBuilder&& result_builder) {
noncopyable_function<ResultBuilder(const compact_for_result_state<ResultBuilder>&)> result_builder_factory) {
auto compaction_state = make_lw_shared<compact_for_result_state<ResultBuilder>>(*s, cmd.timestamp, cmd.slice, cmd.get_row_limit(),
cmd.partition_limit);
@@ -710,11 +708,11 @@ future<page_consume_result<ResultBuilder>> read_page(
}
// Use coroutine::as_future to prevent an exception on timeout.
auto f = co_await coroutine::as_future(query::consume_page(reader, compaction_state, cmd.slice, std::move(result_builder), cmd.get_row_limit(),
auto f = co_await coroutine::as_future(query::consume_page(reader, compaction_state, cmd.slice, result_builder_factory(*compaction_state), cmd.get_row_limit(),
cmd.partition_limit, cmd.timestamp));
if (!f.failed()) {
// no exceptions are thrown in this block
auto [ckey, result] = std::move(f).get0();
auto result = std::move(f).get0();
const auto& cstats = compaction_state->stats();
tracing::trace(trace_state, "Page stats: {} partition(s), {} static row(s) ({} live, {} dead), {} clustering row(s) ({} live, {} dead) and {} range tombstone(s)",
cstats.partitions,
@@ -728,7 +726,7 @@ future<page_consume_result<ResultBuilder>> read_page(
auto buffer = reader.detach_buffer();
co_await reader.close();
// page_consume_result cannot fail so there's no risk of double-closing reader.
co_return page_consume_result<ResultBuilder>(std::move(ckey), std::move(result), std::move(buffer), std::move(compaction_state));
co_return page_consume_result<ResultBuilder>(std::move(result), std::move(buffer), std::move(compaction_state));
}
co_await reader.close();
@@ -743,17 +741,16 @@ future<typename ResultBuilder::result_type> do_query(
const dht::partition_range_vector& ranges,
tracing::trace_state_ptr trace_state,
db::timeout_clock::time_point timeout,
ResultBuilder&& result_builder_) {
noncopyable_function<ResultBuilder(const compact_for_result_state<ResultBuilder>&)> result_builder_factory) {
auto ctx = seastar::make_shared<read_context>(db, s, cmd, ranges, trace_state, timeout);
// "capture" result_builder so it won't be released if we yield.
auto result_builder = std::move(result_builder_);
// Use coroutine::as_future to prevent an exception on timeout.
auto f = co_await coroutine::as_future(ctx->lookup_readers(timeout).then([&] {
return read_page<ResultBuilder>(ctx, s, cmd, ranges, trace_state, std::move(result_builder));
auto f = co_await coroutine::as_future(ctx->lookup_readers(timeout).then([&, result_builder_factory = std::move(result_builder_factory)] () mutable {
return read_page<ResultBuilder>(ctx, s, cmd, ranges, trace_state, std::move(result_builder_factory));
}).then([&] (page_consume_result<ResultBuilder> r) -> future<typename ResultBuilder::result_type> {
if (r.compaction_state->are_limits_reached() || r.result.is_short_read()) {
co_await ctx->save_readers(std::move(r.unconsumed_fragments), std::move(*r.compaction_state).detach_state(), std::move(r.last_ckey));
co_await ctx->save_readers(std::move(r.unconsumed_fragments), std::move(*r.compaction_state).detach_state(),
position_in_partition(r.compaction_state->current_position()));
}
co_return std::move(r.result);
}));
@@ -772,7 +769,7 @@ static future<std::tuple<foreign_ptr<lw_shared_ptr<typename ResultBuilder::resul
const dht::partition_range_vector& ranges,
tracing::trace_state_ptr trace_state,
db::timeout_clock::time_point timeout,
std::function<ResultBuilder(query::result_memory_accounter&&)> result_builder_factory) {
std::function<ResultBuilder(query::result_memory_accounter&&, const compact_for_result_state<ResultBuilder>&)> result_builder_factory) {
if (cmd.get_row_limit() == 0 || cmd.slice.partition_row_limit() == 0 || cmd.partition_limit == 0) {
co_return std::tuple(
make_foreign(make_lw_shared<typename ResultBuilder::result_type>()),
@@ -786,9 +783,10 @@ static future<std::tuple<foreign_ptr<lw_shared_ptr<typename ResultBuilder::resul
try {
auto accounter = co_await local_db.get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, short_read_allowed);
auto result_builder = result_builder_factory(std::move(accounter));
auto result = co_await do_query<ResultBuilder>(db, s, cmd, ranges, std::move(trace_state), timeout, std::move(result_builder));
auto result = co_await do_query<ResultBuilder>(db, s, cmd, ranges, std::move(trace_state), timeout,
[result_builder_factory, accounter = std::move(accounter)] (const compact_for_result_state<ResultBuilder>& compaction_state) mutable {
return result_builder_factory(std::move(accounter), compaction_state);
});
++stats.total_reads;
stats.short_mutation_queries += bool(result.is_short_read());
@@ -829,12 +827,15 @@ public:
static constexpr emit_only_live_rows only_live = emit_only_live_rows::yes;
private:
const compact_for_result_state<data_query_result_builder>& _compaction_state;
std::unique_ptr<query::result::builder> _res_builder;
query_result_builder _builder;
public:
data_query_result_builder(const schema& s, const query::partition_slice& slice, query::result_options opts, query::result_memory_accounter&& accounter)
: _res_builder(std::make_unique<query::result::builder>(slice, opts, std::move(accounter)))
data_query_result_builder(const schema& s, const query::partition_slice& slice, query::result_options opts,
query::result_memory_accounter&& accounter, const compact_for_result_state<data_query_result_builder>& compaction_state)
: _compaction_state(compaction_state)
, _res_builder(std::make_unique<query::result::builder>(slice, opts, std::move(accounter)))
, _builder(s, *_res_builder) { }
void consume_new_partition(const dht::decorated_key& dk) { _builder.consume_new_partition(dk); }
@@ -845,6 +846,9 @@ public:
stop_iteration consume_end_of_partition() { return _builder.consume_end_of_partition(); }
result_type consume_end_of_stream() {
_builder.consume_end_of_stream();
if (_compaction_state.are_limits_reached() || _res_builder->is_short_read()) {
return _res_builder->build(_compaction_state.current_full_position());
}
return _res_builder->build();
}
};
@@ -861,7 +865,7 @@ future<std::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_tempera
schema_ptr query_schema = cmd.slice.is_reversed() ? table_schema->make_reversed() : table_schema;
return do_query_on_all_shards<mutation_query_result_builder>(db, query_schema, cmd, ranges, std::move(trace_state), timeout,
[table_schema, &cmd] (query::result_memory_accounter&& accounter) {
[table_schema, &cmd] (query::result_memory_accounter&& accounter, const compact_for_result_state<mutation_query_result_builder>& compaction_state) {
return mutation_query_result_builder(*table_schema, cmd.slice, std::move(accounter));
});
}
@@ -877,7 +881,7 @@ future<std::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>>
schema_ptr query_schema = cmd.slice.is_reversed() ? table_schema->make_reversed() : table_schema;
return do_query_on_all_shards<data_query_result_builder>(db, query_schema, cmd, ranges, std::move(trace_state), timeout,
[table_schema, &cmd, opts] (query::result_memory_accounter&& accounter) {
return data_query_result_builder(*table_schema, cmd.slice, opts, std::move(accounter));
[table_schema, &cmd, opts] (query::result_memory_accounter&& accounter, const compact_for_result_state<data_query_result_builder>& compaction_state) {
return data_query_result_builder(*table_schema, cmd.slice, opts, std::move(accounter), compaction_state);
});
}


@@ -12,6 +12,7 @@
#include "mutation_fragment.hh"
#include "range_tombstone_assembler.hh"
#include "tombstone_gc.hh"
#include "full_position.hh"
static inline bool has_ck_selector(const query::clustering_row_ranges& ranges) {
// Like PK range, an empty row range, should be considered an "exclude all" restriction
@@ -160,7 +161,7 @@ class compact_mutation_state {
bool _return_static_content_on_partition_with_no_rows{};
std::optional<static_row> _last_static_row;
position_in_partition _last_clustering_pos;
position_in_partition _last_pos;
// Currently active tombstone, can be different than the tombstone emitted to
// the regular consumer (_current_emitted_tombstone) because even purged
// tombstone that are not emitted are still applied to data when compacting.
@@ -285,7 +286,7 @@ public:
, _partition_limit(partition_limit)
, _partition_row_limit(_slice.options.contains(query::partition_slice::option::distinct) ? 1 : slice.partition_row_limit())
, _last_dk({dht::token(), partition_key::make_empty()})
, _last_clustering_pos(position_in_partition::before_all_clustered_rows())
, _last_pos(position_in_partition::end_of_partition_tag_t())
{
static_assert(!sstable_compaction(), "This constructor cannot be used for sstable compaction.");
}
@@ -298,7 +299,7 @@ public:
, _can_gc([this] (tombstone t) { return can_gc(t); })
, _slice(s.full_slice())
, _last_dk({dht::token(), partition_key::make_empty()})
, _last_clustering_pos(position_in_partition::before_all_clustered_rows())
, _last_pos(position_in_partition::end_of_partition_tag_t())
, _collector(std::make_unique<mutation_compactor_garbage_collector>(_schema))
{
static_assert(sstable_compaction(), "This constructor can only be used for sstable compaction.");
@@ -320,7 +321,7 @@ public:
_max_purgeable = api::missing_timestamp;
_gc_before = std::nullopt;
_last_static_row.reset();
_last_clustering_pos = position_in_partition::before_all_clustered_rows();
_last_pos = position_in_partition(position_in_partition::partition_start_tag_t());
_effective_tombstone = {};
_current_emitted_tombstone = {};
_current_emitted_gc_tombstone = {};
@@ -349,6 +350,7 @@ public:
requires CompactedFragmentsConsumerV2<Consumer> && CompactedFragmentsConsumerV2<GCConsumer>
stop_iteration consume(static_row&& sr, Consumer& consumer, GCConsumer& gc_consumer) {
_last_static_row = static_row(_schema, sr);
_last_pos = position_in_partition(position_in_partition::static_row_tag_t());
auto current_tombstone = _partition_tombstone;
if constexpr (sstable_compaction()) {
_collector->start_collecting_static_row();
@@ -380,7 +382,7 @@ public:
requires CompactedFragmentsConsumerV2<Consumer> && CompactedFragmentsConsumerV2<GCConsumer>
stop_iteration consume(clustering_row&& cr, Consumer& consumer, GCConsumer& gc_consumer) {
if (!sstable_compaction()) {
_last_clustering_pos = cr.position();
_last_pos = cr.position();
}
auto current_tombstone = std::max(_partition_tombstone, _effective_tombstone);
auto t = cr.tomb();
@@ -447,7 +449,7 @@ public:
requires CompactedFragmentsConsumerV2<Consumer> && CompactedFragmentsConsumerV2<GCConsumer>
stop_iteration consume(range_tombstone_change&& rtc, Consumer& consumer, GCConsumer& gc_consumer) {
if (!sstable_compaction()) {
_last_clustering_pos = rtc.position();
_last_pos = rtc.position();
}
++_stats.range_tombstones;
do_consume(std::move(rtc), consumer, gc_consumer);
@@ -458,7 +460,7 @@ public:
requires CompactedFragmentsConsumerV2<Consumer> && CompactedFragmentsConsumerV2<GCConsumer>
stop_iteration consume_end_of_partition(Consumer& consumer, GCConsumer& gc_consumer) {
if (_effective_tombstone) {
auto rtc = range_tombstone_change(position_in_partition::after_key(_last_clustering_pos), tombstone{});
auto rtc = range_tombstone_change(position_in_partition::after_key(_last_pos), tombstone{});
// do_consume() overwrites _effective_tombstone with {}, so save and restore it.
auto prev_tombstone = _effective_tombstone;
do_consume(std::move(rtc), consumer, gc_consumer);
@@ -507,6 +509,19 @@ public:
return _dk;
}
// Only updated when SSTableCompaction == compact_for_sstables::no.
// Only meaningful if compaction has started already (current_partition() != nullptr).
position_in_partition_view current_position() const {
return _last_pos;
}
std::optional<full_position> current_full_position() const {
if (!_dk) {
return {};
}
return full_position(_dk->key(), _last_pos);
}
/// Reset limits and query-time to the new page's ones and re-emit the
/// partition-header and static row if there are clustering rows or range
/// tombstones left in the partition.
@@ -533,7 +548,7 @@ public:
consume(*std::exchange(_last_static_row, {}), consumer, nc);
}
if (_effective_tombstone) {
auto rtc = range_tombstone_change(position_in_partition_view::after_key(_last_clustering_pos), _effective_tombstone);
auto rtc = range_tombstone_change(position_in_partition_view::after_key(_last_pos), _effective_tombstone);
do_consume(std::move(rtc), consumer, nc);
}
}
@@ -552,7 +567,7 @@ public:
detached_compaction_state detach_state() && {
partition_start ps(std::move(_last_dk), _partition_tombstone);
if (_effective_tombstone) {
return {std::move(ps), std::move(_last_static_row), range_tombstone_change(position_in_partition_view::after_key(_last_clustering_pos), _effective_tombstone)};
return {std::move(ps), std::move(_last_static_row), range_tombstone_change(position_in_partition_view::after_key(_last_pos), _effective_tombstone)};
} else {
return {std::move(ps), std::move(_last_static_row), std::optional<range_tombstone_change>{}};
}
@@ -619,6 +634,10 @@ public:
auto consume_end_of_stream() {
return _state->consume_end_of_stream(_consumer, _gc_consumer);
}
lw_shared_ptr<compact_mutation_state<OnlyLive, SSTableCompaction>> get_state() {
return _state;
}
};
template<emit_only_live_rows only_live, typename Consumer>

View File

@@ -38,14 +38,31 @@ operator<<(std::ostream& os, const partition_end& eop) {
return os << "{partition_end}";
}
std::ostream& operator<<(std::ostream& out, partition_region r) {
std::string_view to_string(partition_region r) {
switch (r) {
case partition_region::partition_start: out << "partition_start"; break;
case partition_region::static_row: out << "static_row"; break;
case partition_region::clustered: out << "clustered"; break;
case partition_region::partition_end: out << "partition_end"; break;
case partition_region::partition_start: return "partition_start";
case partition_region::static_row: return "static_row";
case partition_region::clustered: return "clustered";
case partition_region::partition_end: return "partition_end";
}
return out;
}
partition_region parse_partition_region(std::string_view s) {
if (s == "partition_start") {
return partition_region::partition_start;
} else if (s == "static_row") {
return partition_region::static_row;
} else if (s == "clustered") {
return partition_region::clustered;
} else if (s == "partition_end") {
return partition_region::partition_end;
} else {
throw std::runtime_error(fmt::format("Invalid value for partition_region: {}", s));
}
}
std::ostream& operator<<(std::ostream& out, partition_region r) {
return out << to_string(r);
}
std::ostream& operator<<(std::ostream& os, position_in_partition_view::printer p) {

View File

@@ -525,7 +525,6 @@ inline position_in_partition_view partition_end::position() const
return position_in_partition_view(position_in_partition_view::end_of_partition_tag_t());
}
std::ostream& operator<<(std::ostream&, partition_region);
std::ostream& operator<<(std::ostream&, mutation_fragment::kind);

View File

@@ -2202,6 +2202,7 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa
query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size });
auto consumer = compact_for_query_v2<emit_only_live_rows::yes, query_result_builder>(*s, gc_clock::time_point::min(), slice, max_rows,
max_partitions, query_result_builder(*s, builder));
auto compaction_state = consumer.get_state();
const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? consume_in_reverse::legacy_half_reverse : consume_in_reverse::no;
// FIXME: frozen_mutation::consume supports only forward consumers
@@ -2225,7 +2226,7 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa
if (r.is_short_read()) {
builder.mark_as_short_read();
}
co_return builder.build();
co_return builder.build(compaction_state->current_full_position());
}
query::result
@@ -2233,9 +2234,10 @@ query_mutation(mutation&& m, const query::partition_slice& slice, uint64_t row_l
query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size });
auto consumer = compact_for_query_v2<emit_only_live_rows::yes, query_result_builder>(*m.schema(), now, slice, row_limit,
query::max_partitions, query_result_builder(*m.schema(), builder));
auto compaction_state = consumer.get_state();
const auto reverse = slice.options.contains(query::partition_slice::option::reversed) ? consume_in_reverse::legacy_half_reverse : consume_in_reverse::no;
std::move(m).consume(consumer, reverse);
return builder.build();
return builder.build(compaction_state->current_full_position());
}
class counter_write_query_result_builder {

View File

@@ -80,6 +80,10 @@ enum class partition_region : uint8_t {
partition_end,
};
std::ostream& operator<<(std::ostream&, partition_region);
std::string_view to_string(partition_region);
partition_region parse_partition_region(std::string_view);
class position_in_partition_view {
friend class position_in_partition;

View File

@@ -43,10 +43,10 @@ static sstring cannot_use_reason(can_use cu)
}
static bool ring_position_matches(const schema& s, const dht::partition_range& range, const query::partition_slice& slice,
const position_view& pos) {
full_position_view pos) {
const auto is_reversed = slice.is_reversed();
const auto expected_start = dht::ring_position_view(*pos.partition_key);
const auto expected_start = dht::ring_position(dht::decorate_key(s, pos.partition));
// If there are no clustering columns or the select is distinct we don't
// have clustering rows at all. In this case we can be sure we won't have
// anything more in the last page's partition and thus the start bound is
@@ -54,7 +54,7 @@ static bool ring_position_matches(const schema& s, const dht::partition_range& r
// inclusive.
const auto expected_inclusiveness = s.clustering_key_size() > 0 &&
!slice.options.contains<query::partition_slice::option::distinct>() &&
pos.clustering_key;
pos.position.region() == partition_region::clustered;
const auto comparator = dht::ring_position_comparator(s);
if (is_reversed && !range.is_singular()) {
@@ -66,8 +66,8 @@ static bool ring_position_matches(const schema& s, const dht::partition_range& r
return start && comparator(start->value(), expected_start) == 0 && start->is_inclusive() == expected_inclusiveness;
}
static bool clustering_position_matches(const schema& s, const query::partition_slice& slice, const position_view& pos) {
const auto& row_ranges = slice.row_ranges(s, pos.partition_key->key());
static bool clustering_position_matches(const schema& s, const query::partition_slice& slice, full_position_view pos) {
const auto& row_ranges = slice.row_ranges(s, pos.partition);
if (row_ranges.empty()) {
// This is a valid slice on the last page of a query with
@@ -77,7 +77,7 @@ static bool clustering_position_matches(const schema& s, const query::partition_
return true;
}
if (!pos.clustering_key) {
if (pos.position.region() != partition_region::clustered) {
// We stopped at a non-clustering position so the partition's clustering
// row ranges should be the default row ranges.
return &row_ranges == &slice.default_row_ranges();
@@ -94,7 +94,7 @@ static bool clustering_position_matches(const schema& s, const query::partition_
if (!start) {
return false;
}
return !start->is_inclusive() && eq(start->value(), *pos.clustering_key);
return !start->is_inclusive() && eq(start->value(), pos.position.key());
}
static bool ranges_match(const schema& s, const dht::partition_range& original_range, const dht::partition_range& new_range) {
@@ -156,17 +156,16 @@ static can_use can_be_used_for_page(const Querier& q, const schema& s, const dht
return can_use::no_schema_version_mismatch;
}
const auto pos = q.current_position();
if (!pos.partition_key) {
const auto pos_opt = q.current_position();
if (!pos_opt) {
// There was nothing read so far, so we assume we are ok.
return can_use::yes;
}
if (!ring_position_matches(s, range, slice, pos)) {
if (!ring_position_matches(s, range, slice, *pos_opt)) {
return can_use::no_ring_pos_mismatch;
}
if (!clustering_position_matches(s, slice, pos)) {
if (!clustering_position_matches(s, slice, *pos_opt)) {
return can_use::no_clustering_pos_mismatch;
}
return can_use::yes;

View File

@@ -13,6 +13,7 @@
#include "mutation_compactor.hh"
#include "reader_concurrency_semaphore.hh"
#include "readers/mutation_source.hh"
#include "full_position.hh"
#include <boost/intrusive/set.hpp>
@@ -20,42 +21,6 @@
namespace query {
template <typename Consumer>
class clustering_position_tracker {
Consumer _consumer;
lw_shared_ptr<std::optional<clustering_key_prefix>> _last_ckey;
public:
clustering_position_tracker(Consumer&& consumer, lw_shared_ptr<std::optional<clustering_key_prefix>> last_ckey)
: _consumer(std::forward<Consumer>(consumer))
, _last_ckey(std::move(last_ckey)) {
}
void consume_new_partition(const dht::decorated_key& dk) {
_last_ckey->reset();
_consumer.consume_new_partition(dk);
}
void consume(tombstone t) {
_consumer.consume(t);
}
stop_iteration consume(static_row&& sr, tombstone t, bool is_live) {
return _consumer.consume(std::move(sr), std::move(t), is_live);
}
stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_live) {
*_last_ckey = cr.key();
return _consumer.consume(std::move(cr), std::move(t), is_live);
}
stop_iteration consume(range_tombstone_change&& rtc) {
return _consumer.consume(std::move(rtc));
}
stop_iteration consume_end_of_partition() {
return _consumer.consume_end_of_partition();
}
auto consume_end_of_stream() {
return _consumer.consume_end_of_stream();
}
};
/// Consume a page worth of data from the reader.
///
/// Uses `compaction_state` for compacting the fragments and `consumer` for
@@ -77,23 +42,12 @@ auto consume_page(flat_mutation_reader_v2& reader,
const auto next_fragment_region = next_fragment ? next_fragment->position().region() : partition_region::partition_end;
compaction_state->start_new_page(row_limit, partition_limit, query_time, next_fragment_region, consumer);
auto last_ckey = make_lw_shared<std::optional<clustering_key_prefix>>();
auto reader_consumer = compact_for_query_v2<OnlyLive, clustering_position_tracker<Consumer>>(
compaction_state,
clustering_position_tracker(std::move(consumer), last_ckey));
auto reader_consumer = compact_for_query_v2<OnlyLive, Consumer>(compaction_state, std::move(consumer));
return reader.consume(std::move(reader_consumer)).then([last_ckey] (auto&&... results) mutable {
static_assert(sizeof...(results) <= 1);
return make_ready_future<std::tuple<std::optional<clustering_key_prefix>, std::decay_t<decltype(results)>...>>(std::tuple(std::move(*last_ckey), std::move(results)...));
});
return reader.consume(std::move(reader_consumer));
});
}
struct position_view {
const dht::decorated_key* partition_key;
const clustering_key_prefix* clustering_key;
};
class querier_base {
friend class querier_utils;
@@ -143,7 +97,7 @@ public:
return _slice->options.contains(query::partition_slice::option::reversed);
}
virtual position_view current_position() const = 0;
virtual std::optional<full_position_view> current_position() const = 0;
dht::partition_ranges_view ranges() const {
return _query_ranges;
@@ -180,7 +134,6 @@ public:
template <emit_only_live_rows OnlyLive>
class querier : public querier_base {
lw_shared_ptr<compact_for_query_state_v2<OnlyLive>> _compaction_state;
std::optional<clustering_key_prefix> _last_ckey;
public:
querier(const mutation_source& ms,
@@ -206,8 +159,7 @@ public:
gc_clock::time_point query_time,
tracing::trace_state_ptr trace_ptr = {}) {
return ::query::consume_page(std::get<flat_mutation_reader_v2>(_reader), _compaction_state, *_slice, std::move(consumer), row_limit,
partition_limit, query_time).then([this, trace_ptr = std::move(trace_ptr)] (auto&& results) {
_last_ckey = std::get<std::optional<clustering_key>>(std::move(results));
partition_limit, query_time).then_wrapped([this, trace_ptr = std::move(trace_ptr)] (auto&& fut) {
const auto& cstats = _compaction_state->stats();
tracing::trace(trace_ptr, "Page stats: {} partition(s), {} static row(s) ({} live, {} dead), {} clustering row(s) ({} live, {} dead) and {} range tombstone(s)",
cstats.partitions,
@@ -218,21 +170,16 @@ public:
cstats.clustering_rows.live,
cstats.clustering_rows.dead,
cstats.range_tombstones);
constexpr auto size = std::tuple_size<std::decay_t<decltype(results)>>::value;
static_assert(size <= 2);
if constexpr (size == 1) {
return make_ready_future<>();
} else {
auto result = std::get<1>(std::move(results));
return make_ready_future<std::decay_t<decltype(result)>>(std::move(result));
}
return std::move(fut);
});
}
virtual position_view current_position() const override {
virtual std::optional<full_position_view> current_position() const override {
const dht::decorated_key* dk = _compaction_state->current_partition();
const clustering_key_prefix* clustering_key = _last_ckey ? &*_last_ckey : nullptr;
return {dk, clustering_key};
if (!dk) {
return {};
}
return full_position_view(dk->key(), _compaction_state->current_position());
}
};
@@ -251,7 +198,7 @@ using mutation_querier = querier<emit_only_live_rows::no>;
class shard_mutation_querier : public querier_base {
std::unique_ptr<const dht::partition_range_vector> _query_ranges;
dht::decorated_key _nominal_pkey;
std::optional<clustering_key_prefix> _nominal_ckey;
position_in_partition _nominal_pos;
private:
shard_mutation_querier(
@@ -261,11 +208,11 @@ private:
flat_mutation_reader_v2 reader,
reader_permit permit,
dht::decorated_key nominal_pkey,
std::optional<clustering_key_prefix> nominal_ckey)
position_in_partition nominal_pos)
: querier_base(permit, std::move(reader_range), std::move(reader_slice), std::move(reader), *query_ranges)
, _query_ranges(std::move(query_ranges))
, _nominal_pkey(std::move(nominal_pkey))
, _nominal_ckey(std::move(nominal_ckey)) {
, _nominal_pos(std::move(nominal_pos)) {
}
@@ -277,13 +224,13 @@ public:
flat_mutation_reader_v2 reader,
reader_permit permit,
dht::decorated_key nominal_pkey,
std::optional<clustering_key_prefix> nominal_ckey)
position_in_partition nominal_pos)
: shard_mutation_querier(std::make_unique<const dht::partition_range_vector>(std::move(query_ranges)), std::move(reader_range),
std::move(reader_slice), std::move(reader), std::move(permit), std::move(nominal_pkey), std::move(nominal_ckey)) {
std::move(reader_slice), std::move(reader), std::move(permit), std::move(nominal_pkey), std::move(nominal_pos)) {
}
virtual position_view current_position() const override {
return {&_nominal_pkey, _nominal_ckey ? &*_nominal_ckey : nullptr};
virtual std::optional<full_position_view> current_position() const override {
return full_position_view(_nominal_pkey.key(), _nominal_pos);
}
lw_shared_ptr<const dht::partition_range> reader_range() && {

View File

@@ -13,14 +13,17 @@
#include "query-result.hh"
#include "digest_algorithm.hh"
#include "full_position.hh"
#include "idl/uuid.dist.hh"
#include "idl/keys.dist.hh"
#include "idl/position_in_partition.dist.hh"
#include "idl/query.dist.hh"
#include "serializer_impl.hh"
#include "serialization_visitors.hh"
#include "idl/uuid.dist.impl.hh"
#include "idl/keys.dist.impl.hh"
#include "idl/position_in_partition.dist.impl.hh"
#include "idl/query.dist.impl.hh"
namespace query {
@@ -190,8 +193,10 @@ public:
return std::make_tuple(ps.size(), rows);
}
std::tuple<partition_key, std::optional<clustering_key>>
get_last_partition_and_clustering_key() const {
full_position get_last_position() const {
if (_v.last_position()) {
return *_v.last_position();
}
auto ps = _v.partitions();
assert(!ps.empty());
auto pit = ps.begin();
@@ -201,16 +206,19 @@ public:
}
auto p = *pit;
auto rs = p.rows();
std::optional<clustering_key> last_row;
auto pos = position_in_partition(position_in_partition::partition_start_tag_t());
if (!rs.empty()) {
auto rit = rs.begin();
auto rnext = rit;
while (++rnext != rs.end()) {
rit = rnext;
}
last_row = (*rit).key();
const auto& key_opt = (*rit).key();
if (key_opt) {
pos = position_in_partition::for_key(*key_opt);
}
}
return { p.key().value(), std::move(last_row) };
return { p.key().value(), std::move(pos) };
}
};

View File

@@ -14,13 +14,16 @@
#include "query-result.hh"
#include "digest_algorithm.hh"
#include "digester.hh"
#include "full_position.hh"
#include "idl/uuid.dist.hh"
#include "idl/keys.dist.hh"
#include "idl/position_in_partition.dist.hh"
#include "idl/query.dist.hh"
#include "serializer_impl.hh"
#include "serialization_visitors.hh"
#include "idl/uuid.dist.impl.hh"
#include "idl/keys.dist.impl.hh"
#include "idl/position_in_partition.dist.impl.hh"
#include "idl/query.dist.impl.hh"
namespace query {
@@ -162,14 +165,19 @@ public:
_partition_count, _last_modified);
}
result build() {
std::move(_w).end_partitions().end_query_result();
result build(std::optional<full_position> last_pos = {}) {
auto after_partitions = std::move(_w).end_partitions();
if (last_pos) {
std::move(after_partitions).write_last_position(*last_pos).end_query_result();
} else {
std::move(after_partitions).skip_last_position().end_query_result();
}
switch (_request) {
case result_request::only_result:
return result(std::move(_out), _short_read, _row_count, _partition_count, std::move(_memory_accounter).done());
case result_request::only_digest: {
bytes_ostream buf;
ser::writer_of_query_result<bytes_ostream>(buf).start_partitions().end_partitions().end_query_result();
ser::writer_of_query_result<bytes_ostream>(buf).start_partitions().end_partitions().skip_last_position().end_query_result();
return result(std::move(buf), result_digest(_digest.finalize_array()), _last_modified, _short_read, {}, {});
}
case result_request::result_and_digest:

View File

@@ -317,7 +317,7 @@ void result::ensure_counts() {
result::result()
: result([] {
bytes_ostream out;
ser::writer_of_query_result<bytes_ostream>(out).skip_partitions().end_query_result();
ser::writer_of_query_result<bytes_ostream>(out).skip_partitions().skip_last_position().end_query_result();
return out;
}(), short_read::no, 0, 0)
{ }
@@ -354,8 +354,11 @@ foreign_ptr<lw_shared_ptr<query::result>> result_merger::get() {
short_read is_short_read;
uint32_t partition_count = 0;
std::optional<full_position> last_position;
for (auto&& r : _partial) {
result_view::do_with(*r, [&] (result_view rv) {
last_position.reset();
for (auto&& pv : rv._v.partitions()) {
auto rows = pv.rows();
// If rows.empty(), then there's a static row, or there wouldn't be a partition
@@ -375,17 +378,25 @@ foreign_ptr<lw_shared_ptr<query::result>> result_merger::get() {
return;
}
}
last_position = rv._v.last_position();
});
if (r->is_short_read()) {
is_short_read = short_read::yes;
last_position.reset();
break;
}
if (row_count >= _max_rows || partition_count >= _max_partitions) {
last_position.reset();
break;
}
}
std::move(partitions).end_partitions().end_query_result();
auto after_partitions = std::move(partitions).end_partitions();
if (last_position) {
std::move(after_partitions).write_last_position(*last_position).end_query_result();
} else {
std::move(after_partitions).skip_last_position().end_query_result();
}
return make_foreign(make_lw_shared<query::result>(std::move(w), is_short_read, row_count, partition_count));
}

View File

@@ -45,6 +45,7 @@
#include <cfloat>
#include <algorithm>
#include "idl/position_in_partition.dist.hh"
#include "idl/partition_checksum.dist.hh"
logging::logger rlogger("repair");

View File

@@ -44,6 +44,7 @@
#include "db/system_keyspace.hh"
#include "service/storage_proxy.hh"
#include "db/batchlog_manager.hh"
#include "idl/position_in_partition.dist.hh"
#include "idl/partition_checksum.dist.hh"
#include "readers/empty_v2.hh"
#include "readers/evictable.hh"

View File

@@ -2112,6 +2112,11 @@ table::query(schema_ptr s,
}
}
std::optional<full_position> last_pos;
if (querier_opt && querier_opt->current_position()) {
last_pos.emplace(*querier_opt->current_position());
}
if (!saved_querier || (querier_opt && !querier_opt->are_limits_reached() && !qs.builder.is_short_read())) {
co_await querier_opt->close();
querier_opt = {};
@@ -2120,7 +2125,7 @@ table::query(schema_ptr s,
*saved_querier = std::move(querier_opt);
}
co_return make_lw_shared<query::result>(qs.builder.build());
co_return make_lw_shared<query::result>(qs.builder.build(std::move(last_pos)));
}
future<reconcilable_result>

View File

@@ -34,7 +34,9 @@ service::pager::paging_state::paging_state(partition_key pk,
std::optional<db::read_repair_decision> query_read_repair_decision,
uint32_t rows_fetched_for_last_partition_low_bits,
uint32_t rem_high_bits,
uint32_t rows_fetched_for_last_partition_high_bits)
uint32_t rows_fetched_for_last_partition_high_bits,
bound_weight ck_weight,
partition_region region)
: _partition_key(std::move(pk))
, _clustering_key(std::move(ck))
, _remaining_low_bits(rem_low_bits)
@@ -43,20 +45,24 @@ service::pager::paging_state::paging_state(partition_key pk,
, _query_read_repair_decision(query_read_repair_decision)
, _rows_fetched_for_last_partition_low_bits(rows_fetched_for_last_partition_low_bits)
, _remaining_high_bits(rem_high_bits)
, _rows_fetched_for_last_partition_high_bits(rows_fetched_for_last_partition_high_bits) {
}
, _rows_fetched_for_last_partition_high_bits(rows_fetched_for_last_partition_high_bits)
, _ck_weight(ck_weight)
, _region(region)
{ }
service::pager::paging_state::paging_state(partition_key pk,
std::optional<clustering_key> ck,
position_in_partition_view pos,
uint64_t rem,
utils::UUID query_uuid,
replicas_per_token_range last_replicas,
std::optional<db::read_repair_decision> query_read_repair_decision,
uint64_t rows_fetched_for_last_partition)
: paging_state(std::move(pk), std::move(ck), static_cast<uint32_t>(rem), query_uuid, std::move(last_replicas), query_read_repair_decision,
: paging_state(std::move(pk), pos.has_key() ? std::optional(pos.key()) : std::nullopt, static_cast<uint32_t>(rem), query_uuid, std::move(last_replicas), query_read_repair_decision,
static_cast<uint32_t>(rows_fetched_for_last_partition), static_cast<uint32_t>(rem >> 32),
static_cast<uint32_t>(rows_fetched_for_last_partition >> 32)) {
}
static_cast<uint32_t>(rows_fetched_for_last_partition >> 32),
pos.get_bound_weight(),
pos.region())
{ }
lw_shared_ptr<service::pager::paging_state> service::pager::paging_state::deserialize(
bytes_opt data) {

View File

@@ -17,6 +17,7 @@
#include "utils/UUID.hh"
#include "dht/i_partitioner.hh"
#include "db/read_repair_decision.hh"
#include "position_in_partition.hh"
namespace service {
@@ -36,8 +37,11 @@ private:
uint32_t _rows_fetched_for_last_partition_low_bits;
uint32_t _remaining_high_bits;
uint32_t _rows_fetched_for_last_partition_high_bits;
bound_weight _ck_weight = bound_weight::equal;
partition_region _region = partition_region::partition_start;
public:
// IDL ctor
paging_state(partition_key pk,
std::optional<clustering_key> ck,
uint32_t rem,
@@ -46,10 +50,12 @@ public:
std::optional<db::read_repair_decision> query_read_repair_decision,
uint32_t rows_fetched_for_last_partition,
uint32_t remaining_ext,
uint32_t rows_fetched_for_last_partition_high_bits);
uint32_t rows_fetched_for_last_partition_high_bits,
bound_weight ck_weight,
partition_region region);
paging_state(partition_key pk,
std::optional<clustering_key> ck,
position_in_partition_view pos,
uint64_t rem,
utils::UUID reader_recall_uuid,
replicas_per_token_range last_replicas,
@@ -60,8 +66,11 @@ public:
_partition_key = std::move(pk);
}
// sets position to at the given clustering key
void set_clustering_key(clustering_key ck) {
_clustering_key = std::move(ck);
_ck_weight = bound_weight::equal;
_region = partition_region::clustered;
}
void set_remaining(uint64_t remaining) {
@@ -77,10 +86,27 @@ public:
}
/**
* Clustering key in last partition. I.e. first, next, row
*
* Use \ref get_position_in_partition() instead.
*/
const std::optional<clustering_key>& get_clustering_key() const {
return _clustering_key;
}
/**
* Weight of last processed position, see \ref get_position_in_partition()
*/
bound_weight get_clustering_key_weight() const {
return _ck_weight;
}
/**
* Partition region of last processed position, see \ref get_position_in_partition()
*/
partition_region get_partition_region() const {
return _region;
}
position_in_partition_view get_position_in_partition() const {
return position_in_partition_view(_region, _ck_weight, _clustering_key ? &*_clustering_key : nullptr);
}
/**
* Max remaining rows to fetch in total.
* I.e. initial row_limit - #rows returned so far.

View File

@@ -62,7 +62,7 @@ protected:
uint64_t _per_partition_limit;
std::optional<partition_key> _last_pkey;
std::optional<clustering_key> _last_ckey;
position_in_partition _last_pos;
std::optional<utils::UUID> _query_uuid;
shared_ptr<service::storage_proxy> _proxy;

View File

@@ -40,183 +40,187 @@ static bool has_clustering_keys(const schema& s, const query::read_command& cmd)
&& !cmd.slice.options.contains<query::partition_slice::option::distinct>();
}
query_pager::query_pager(service::storage_proxy& p, schema_ptr s,
shared_ptr<const cql3::selection::selection> selection,
service::query_state& state,
const cql3::query_options& options,
lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector ranges)
: _has_clustering_keys(has_clustering_keys(*s, *cmd))
, _max(cmd->get_row_limit())
, _per_partition_limit(cmd->slice.partition_row_limit())
, _last_pos(position_in_partition::partition_start_tag_t())
, _proxy(p.shared_from_this())
, _schema(std::move(s))
, _selection(selection)
, _state(state)
, _options(options)
, _cmd(std::move(cmd))
, _ranges(std::move(ranges))
{}
future<result<service::storage_proxy::coordinator_query_result>> query_pager::do_fetch_page(uint32_t page_size, gc_clock::time_point now, db::timeout_clock::time_point timeout) {
auto state = _options.get_paging_state();
// Most callers should set this but we want to make sure, as results
// won't be paged without it.
_cmd->slice.options.set<query::partition_slice::option::allow_short_read>();
// Override this, to make sure we use the value appropriate for paging
// (with allow_short_read set).
_cmd->max_result_size = _proxy->get_max_result_size(_cmd->slice);
if (!_last_pkey && state) {
_max = state->get_remaining();
_last_pkey = state->get_partition_key();
_last_ckey = state->get_clustering_key();
_last_pos = state->get_position_in_partition();
_query_uuid = state->get_query_uuid();
_last_replicas = state->get_last_replicas();
_query_read_repair_decision = state->get_query_read_repair_decision();
_rows_fetched_for_last_partition = state->get_rows_fetched_for_last_partition();
}
_cmd->is_first_page = query::is_first_page(!_query_uuid);
if (!_query_uuid) {
_query_uuid = utils::make_random_uuid();
}
_cmd->query_uuid = *_query_uuid;
qlogger.trace("fetch_page query id {}", _cmd->query_uuid);
if (_last_pkey) {
auto dpk = dht::decorate_key(*_schema, *_last_pkey);
dht::ring_position lo(dpk);
auto reversed = _cmd->slice.options.contains<query::partition_slice::option::reversed>();
qlogger.trace("PKey={}, CKey={}, reversed={}", dpk, _last_ckey, reversed);
qlogger.trace("PKey={}, Pos={}, reversed={}", dpk, _last_pos, reversed);
// Note: we're assuming both that the ranges are checked
// and "cql-compliant", and that storage_proxy will process
// the ranges in order
//
// If the original query has singular restrictions like "col in (x, y, z)",
// we will eventually generate an empty range. This is ok, because empty range == nothing,
// which is what we thus mean.
auto modify_ranges = [reversed](auto& ranges, auto& lo, bool inclusive, const auto& cmp) {
typedef typename std::remove_reference_t<decltype(ranges)>::value_type range_type;
typedef typename range_type::bound bound_type;
bool found = false;
auto i = ranges.begin();
while (i != ranges.end()) {
bool contains = i->contains(lo, cmp);
if (contains) {
found = true;
}
bool remove = !found
|| (contains && !inclusive && (i->is_singular()
|| (reversed && i->start() && cmp(i->start()->value(), lo) == 0)
|| (!reversed && i->end() && cmp(i->end()->value(), lo) == 0)))
;
if (remove) {
qlogger.trace("Remove range {}", *i);
i = ranges.erase(i);
continue;
}
if (contains) {
auto r = reversed && !i->is_singular()
? range_type(i->start(), bound_type{ lo, inclusive })
: range_type( bound_type{ lo, inclusive }, i->end(), i->is_singular())
;
qlogger.trace("Modify range {} -> {}", *i, r);
*i = std::move(r);
}
++i;
}
qlogger.trace("Result ranges {}", ranges);
};
// last ck can be empty depending on whether we
// deserialized state or not. This case means "last page ended on
// something-not-bound-by-clustering" (i.e. a static row, alone)
const bool has_ck = _has_clustering_keys && _last_ckey;
const bool has_ck = _has_clustering_keys && _last_pos.region() == partition_region::clustered;
// If we have no clustering keys, it should mean we only have one row
// per PK. Thus we can just bypass the last one.
modify_ranges(_ranges, lo, has_ck, dht::ring_position_comparator(*_schema));
if (has_ck) {
query::clustering_row_ranges row_ranges = _cmd->slice.default_row_ranges();
clustering_key_prefix ckp = clustering_key_prefix::from_exploded(*_schema, _last_ckey->explode(*_schema));
query::trim_clustering_row_ranges_to(*_schema, row_ranges, ckp, reversed);
_cmd->slice.set_range(*_schema, *_last_pkey, row_ranges);
position_in_partition_view next_pos = _last_pos;
if (_last_pos.has_key()) {
next_pos = reversed ? position_in_partition_view::before_key(_last_pos) : position_in_partition_view::after_key(_last_pos);
}
query::trim_clustering_row_ranges_to(*_schema, row_ranges, next_pos, reversed);
_cmd->slice.set_range(*_schema, *_last_pkey, row_ranges);
}
auto max_rows = max_rows_to_fetch(page_size);

// We always need PK so we can determine where to start next.
_cmd->slice.options.set<query::partition_slice::option::send_partition_key>();
// don't add empty bytes (cks) unless we have to
if (_has_clustering_keys) {
    _cmd->slice.options.set<query::partition_slice::option::send_clustering_key>();
}
_cmd->set_row_limit(max_rows);
maybe_adjust_per_partition_limit(page_size);

qlogger.debug("Fetching {}, page size={}, max_rows={}",
        _cmd->cf_id, page_size, max_rows);

auto ranges = _ranges;
auto command = ::make_lw_shared<query::read_command>(*_cmd);
return _proxy->query_result(_schema,
        std::move(command),
        std::move(ranges),
        _options.get_consistency(),
        {timeout, _state.get_permit(), _state.get_client_state(), _state.get_trace_state(), std::move(_last_replicas), _query_read_repair_decision});
}

future<> query_pager::fetch_page(cql3::selection::result_set_builder& builder, uint32_t page_size, gc_clock::time_point now, db::timeout_clock::time_point timeout) {
    return fetch_page_result(builder, page_size, now, timeout)
            .then(utils::result_into_future<result<>>);
}

future<result<>> query_pager::fetch_page_result(cql3::selection::result_set_builder& builder, uint32_t page_size, gc_clock::time_point now, db::timeout_clock::time_point timeout) {
    return do_fetch_page(page_size, now, timeout).then(utils::result_wrap([this, &builder, page_size, now] (service::storage_proxy::coordinator_query_result qr) {
        _last_replicas = std::move(qr.last_replicas);
        _query_read_repair_decision = qr.read_repair_decision;
        return builder.with_thread_if_needed([this, &builder, page_size, now, qr = std::move(qr)] () mutable -> result<> {
            handle_result(cql3::selection::result_set_builder::visitor(builder, *_schema, *_selection),
                    std::move(qr.query_result), page_size, now);
            return bo::success();
        });
    }));
}

future<std::unique_ptr<cql3::result_set>> query_pager::fetch_page(uint32_t page_size,
        gc_clock::time_point now, db::timeout_clock::time_point timeout) {
    return fetch_page_result(page_size, now, timeout)
            .then(utils::result_into_future<result<std::unique_ptr<cql3::result_set>>>);
}

future<result<std::unique_ptr<cql3::result_set>>> query_pager::fetch_page_result(uint32_t page_size,
        gc_clock::time_point now, db::timeout_clock::time_point timeout) {
    return do_with(
            cql3::selection::result_set_builder(*_selection, now,
                    _options.get_cql_serialization_format()),
            [this, page_size, now, timeout](auto& builder) {
                return this->fetch_page_result(builder, page_size, now, timeout).then(utils::result_wrap([&builder] {
                    return builder.with_thread_if_needed([&builder] () -> result<std::unique_ptr<cql3::result_set>> {
                        return bo::success(builder.build());
                    });
                }));
            });
}
future<cql3::result_generator> query_pager::fetch_page_generator(uint32_t page_size, gc_clock::time_point now, db::timeout_clock::time_point timeout, cql3::cql_stats& stats) {
return fetch_page_generator_result(page_size, now, timeout, stats)
@@ -349,74 +353,77 @@ public:
}
};
template<typename Visitor>
requires query::ResultVisitor<Visitor>
void query_pager::handle_result(
        Visitor&& visitor,
        const foreign_ptr<lw_shared_ptr<query::result>>& results,
        uint32_t page_size, gc_clock::time_point now) {

    auto update_slice = [&] (const partition_key& last_pkey) {
        // refs #752, when doing aggregate queries we will re-use same
        // slice repeatedly. Since "specific ck ranges" only deal with
        // a single extra range, we must clear out the old one
        // Even if it was not so of course, leaving junk in the slice
        // is bad.
        _cmd->slice.clear_range(*_schema, last_pkey);
    };

    auto view = query::result_view(*results);

    _last_pos = position_in_partition(position_in_partition::partition_start_tag_t());

    uint64_t row_count;
    if constexpr (!std::is_same_v<std::decay_t<Visitor>, noop_visitor>) {
        query_result_visitor<Visitor> v(std::forward<Visitor>(visitor));
        view.consume(_cmd->slice, v);

        if (_last_pkey) {
            update_slice(*_last_pkey);
        }

        row_count = v.total_rows - v.dropped_rows;
        _max = _max - row_count;
        _exhausted = (v.total_rows < page_size && !results->is_short_read() && v.dropped_rows == 0) || _max == 0;
        // If per partition limit is defined, we need to accumulate rows fetched for last partition key if the key matches
        if (_cmd->slice.partition_row_limit() < query::max_rows_if_set) {
            if (_last_pkey && v.last_pkey && _last_pkey->equal(*_schema, *v.last_pkey)) {
                _rows_fetched_for_last_partition += v.last_partition_row_count;
            } else {
                _rows_fetched_for_last_partition = v.last_partition_row_count;
            }
        }

        _last_pkey = v.last_pkey;
        if (v.last_ckey) {
            _last_pos = position_in_partition::for_key(*v.last_ckey);
        }
    } else {
        row_count = results->row_count() ? *results->row_count() : std::get<1>(view.count_partitions_and_rows());
        _max = _max - row_count;
        _exhausted = (row_count < page_size && !results->is_short_read()) || _max == 0;

        if (!_exhausted && row_count > 0) {
            if (_last_pkey) {
                update_slice(*_last_pkey);
            }
            auto last_pos = view.get_last_position();
            _last_pkey = last_pos.partition;
            _last_pos = last_pos.position;
        }
    }

    qlogger.debug("Fetched {} rows, max_remain={} {}", row_count, _max, _exhausted ? "(exh)" : "");
    if (_last_pkey) {
        qlogger.debug("Last partition key: {}", *_last_pkey);
    }
    if (_has_clustering_keys && _last_pos.region() == partition_region::clustered) {
        qlogger.debug("Last clustering pos: {}", _last_pos);
    }
}

lw_shared_ptr<const paging_state> query_pager::state() const {
    return make_lw_shared<paging_state>(_last_pkey.value_or(partition_key::make_empty()), _last_pos, _exhausted ? 0 : _max, _cmd->query_uuid, _last_replicas, _query_read_repair_decision, _rows_fetched_for_last_partition);
}


@@ -547,7 +547,8 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
// which is a valid paging state as well, and should return
// no rows.
paging_state = make_lw_shared<service::pager::paging_state>(partition_key::make_empty(),
        position_in_partition_view(position_in_partition_view::partition_start_tag_t()),
        paging_state->get_remaining(), paging_state->get_query_uuid(),
paging_state->get_last_replicas(), paging_state->get_query_read_repair_decision(),
paging_state->get_rows_fetched_for_last_partition());
@@ -561,7 +562,8 @@ SEASTAR_TEST_CASE(test_simple_index_paging) {
{
// An artificial paging state with an empty key pair is also valid and is expected
// not to return rows (since no row matches an empty partition key)
auto paging_state = make_lw_shared<service::pager::paging_state>(partition_key::make_empty(),
        position_in_partition_view(position_in_partition_view::partition_start_tag_t()),
1, utils::make_random_uuid(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 1);
auto qo = std::make_unique<cql3::query_options>(db::consistency_level::LOCAL_ONE, std::vector<cql3::raw_value>{},
cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()});