mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-23 01:50:35 +00:00
In this commit we create cas_shard in select_statement and pass it to the sp::query_result function.
176 lines
6.4 KiB
C++
176 lines
6.4 KiB
C++
/*
|
|
* Copyright (C) 2015-present ScyllaDB
|
|
*
|
|
* Modified by ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: (LicenseRef-ScyllaDB-Source-Available-1.0 and Apache-2.0)
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include "paging_state.hh"
|
|
#include "cql3/result_set.hh"
|
|
#include "cql3/selection/selection.hh"
|
|
#include "service/query_state.hh"
|
|
#include "exceptions/coordinator_result.hh"
|
|
#include "service/cas_shard.hh"
|
|
|
|
namespace service {
|
|
|
|
class storage_proxy;
|
|
class storage_proxy_coordinator_query_options;
|
|
class storage_proxy_coordinator_query_result;
|
|
|
|
namespace pager {
|
|
|
|
/// Callable type that query_pager keeps in its _query_function member and
/// that the query_pager constructor accepts as an optional override
/// (query_function_override, defaulting to an empty function).
///
/// The callable is given the storage proxy, the schema, the read command,
/// the partition ranges (as an rvalue reference), the consistency level, the
/// coordinator query options and an optional CAS shard. It returns a future
/// of a coordinator_result wrapping storage_proxy_coordinator_query_result.
///
/// NOTE(review): when and how often the pager invokes this callable is not
/// visible in this header -- confirm against the implementation file.
using query_function = std::function<
        future<exceptions::coordinator_result<service::storage_proxy_coordinator_query_result>>(
                service::storage_proxy& sp,
                schema_ptr schema,
                lw_shared_ptr<query::read_command> cmd,
                dht::partition_range_vector&& partition_ranges,
                db::consistency_level cl,
                service::storage_proxy_coordinator_query_options optional_params,
                std::optional<service::cas_shard> cas_shard)>;
|
|
|
|
/**
|
|
* Perform a query, paging it by page of a given size.
|
|
*
|
|
* This is essentially an iterator of pages. Each call to fetchPage() will
|
|
* return the next page (i.e. the next list of rows) and isExhausted()
|
|
* indicates whether there is more page to fetch. The pageSize will
|
|
* either be in term of cells or in term of CQL3 row, depending on the
|
|
* parameters of the command we page.
|
|
*
|
|
* Please note that the pager might page within rows, so there is no guarantee
|
|
* that successive pages won't return the same row (though with different
|
|
* columns every time).
|
|
*
|
|
* Also, there is no guarantee that fetchPage() won't return an empty list,
|
|
* even if isExhausted() return false (but it is guaranteed to return an empty
|
|
* list *if* isExhausted() return true). Indeed, isExhausted() does *not*
|
|
* trigger a query so in some (fairly rare) case we might not know the paging
|
|
* is done even though it is.
|
|
*/
|
|
class query_pager {
|
|
public:
|
|
template<typename T = void>
|
|
using result = exceptions::coordinator_result<T>;
|
|
|
|
struct stats {
|
|
// Total number of rows read by this pager, based on all pages it fetched
|
|
size_t rows_read_total = 0;
|
|
};
|
|
|
|
protected:
|
|
// remember if we use clustering. if not, each partition == one row
|
|
const bool _has_clustering_keys;
|
|
bool _exhausted = false;
|
|
uint64_t _max;
|
|
uint64_t _per_partition_limit;
|
|
|
|
std::optional<partition_key> _last_pkey;
|
|
position_in_partition _last_pos;
|
|
std::optional<query_id> _query_uuid;
|
|
|
|
shared_ptr<service::storage_proxy> _proxy;
|
|
schema_ptr _query_schema;
|
|
shared_ptr<const cql3::selection::selection> _selection;
|
|
service::query_state& _state;
|
|
const cql3::query_options& _options;
|
|
lw_shared_ptr<query::read_command> _cmd;
|
|
dht::partition_range_vector _ranges;
|
|
std::optional<service::cas_shard> _cas_shard;
|
|
paging_state::replicas_per_token_range _last_replicas;
|
|
std::optional<db::read_repair_decision> _query_read_repair_decision;
|
|
uint64_t _rows_fetched_for_last_partition = 0;
|
|
stats _stats;
|
|
|
|
query_function _query_function;
|
|
|
|
public:
|
|
query_pager(service::storage_proxy& p, schema_ptr query_schema, shared_ptr<const cql3::selection::selection> selection,
|
|
service::query_state& state,
|
|
const cql3::query_options& options,
|
|
lw_shared_ptr<query::read_command> cmd,
|
|
dht::partition_range_vector ranges,
|
|
std::optional<service::cas_shard> cas_shard,
|
|
query_function query_function_override = {});
|
|
virtual ~query_pager() {}
|
|
|
|
/**
|
|
* Fetches the next page.
|
|
*
|
|
* @param pageSize the maximum number of elements to return in the next page.
|
|
* @return the page of result.
|
|
*/
|
|
future<std::unique_ptr<cql3::result_set>> fetch_page(uint32_t page_size, gc_clock::time_point, db::timeout_clock::time_point timeout);
|
|
future<result<std::unique_ptr<cql3::result_set>>> fetch_page_result(uint32_t page_size, gc_clock::time_point, db::timeout_clock::time_point timeout);
|
|
|
|
/**
|
|
* For more than one page.
|
|
*/
|
|
future<> fetch_page(cql3::selection::result_set_builder&, uint32_t page_size, gc_clock::time_point, db::timeout_clock::time_point timeout);
|
|
virtual future<result<>> fetch_page_result(cql3::selection::result_set_builder&, uint32_t page_size, gc_clock::time_point, db::timeout_clock::time_point timeout);
|
|
|
|
future<cql3::result_generator> fetch_page_generator(uint32_t page_size, gc_clock::time_point now, db::timeout_clock::time_point timeout, cql3::cql_stats& stats);
|
|
future<result<cql3::result_generator>> fetch_page_generator_result(uint32_t page_size, gc_clock::time_point now, db::timeout_clock::time_point timeout, cql3::cql_stats& stats);
|
|
|
|
/**
|
|
* Whether or not this pager is exhausted, i.e. whether or not a call to
|
|
* fetchPage may return more result.
|
|
*
|
|
* @return whether the pager is exhausted.
|
|
*/
|
|
bool is_exhausted() const {
|
|
return _exhausted;
|
|
}
|
|
|
|
/**
|
|
* The maximum number of cells/CQL3 row that we may still have to return.
|
|
* In other words, that's the initial user limit minus what we've already
|
|
* returned (note that it's not how many we *will* return, just the upper
|
|
* limit on it).
|
|
*/
|
|
uint64_t max_remaining() const {
|
|
return _max;
|
|
}
|
|
|
|
/**
|
|
* Get the current state (snapshot) of the pager. The state can allow to restart the
|
|
* paging on another host from where we are at this point.
|
|
*
|
|
* @return the current paging state. If the pager is exhausted, the result is a valid pointer
|
|
* to a paging_state instance which will return 0 on calling get_remaining() on it.
|
|
*/
|
|
lw_shared_ptr<const paging_state> state() const;
|
|
|
|
const stats& stats() const {
|
|
return _stats;
|
|
}
|
|
|
|
protected:
|
|
template<typename Base>
|
|
class query_result_visitor;
|
|
|
|
future<result<service::storage_proxy_coordinator_query_result>>
|
|
do_fetch_page(uint32_t page_size, gc_clock::time_point now, db::timeout_clock::time_point timeout);
|
|
|
|
template<typename Visitor>
|
|
requires query::ResultVisitor<Visitor>
|
|
void handle_result(Visitor&& visitor,
|
|
const foreign_ptr<lw_shared_ptr<query::result>>& results,
|
|
uint32_t page_size, gc_clock::time_point now);
|
|
|
|
virtual uint64_t max_rows_to_fetch(uint32_t page_size) {
|
|
return std::min(_max, static_cast<uint64_t>(page_size));
|
|
}
|
|
|
|
virtual void maybe_adjust_per_partition_limit(uint32_t page_size) const { }
|
|
};
|
|
|
|
}
|
|
}
|
|
|