Files
scylladb/alternator/executor_read.cc
Radosław Cybulski 74b523ea20 treewide: fix spelling errors.
Fix various spelling errors.

Closes scylladb/scylladb#29574
2026-04-21 18:20:26 +03:00

1958 lines
104 KiB
C++

/*
* Copyright 2019-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
*/
// This file implements the Alternator read operations: GetItem, BatchGetItem,
// Query (including vector search) and Scan.
// Public entry points:
// * executor::get_item()
// * executor::batch_get_item()
// * executor::scan()
// * executor::query()
// Major internal functions:
// * do_query(): the common code for Query and Scan, except vector search.
// * query_vector(): the vector-search code path for Query with VectorSearch.
// and a number of helper functions for parsing common parameters of read
// requests such as TableName, IndexName, Select, FilterExpression,
// ConsistentRead, ProjectionExpression, and more.
#include "alternator/executor.hh"
#include "alternator/executor_util.hh"
#include "alternator/conditions.hh"
#include "alternator/expressions.hh"
#include "alternator/consumed_capacity.hh"
#include "alternator/serialization.hh"
#include "alternator/attribute_path.hh"
#include "auth/permission.hh"
#include "cql3/selection/selection.hh"
#include "cql3/result_set.hh"
#include "query/query-request.hh"
#include "schema/schema.hh"
#include "service/client_state.hh"
#include "service/pager/query_pagers.hh"
#include "service/storage_proxy.hh"
#include "index/secondary_index.hh"
#include "utils/assert.hh"
#include "utils/overloaded_functor.hh"
#include "utils/error_injection.hh"
#include "vector_search/vector_store_client.hh"
#include <seastar/core/abort_on_expiry.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <boost/range/algorithm/find_end.hpp>
#include <charconv>
#include <stdexcept>
using namespace std::chrono_literals;
namespace alternator {
extern logging::logger elogger; // from executor.cc
// make_streamed_with_extra_array() is a variant of make_streamed(), which
// builds a streaming response (a function writing to an output stream) from a
// JSON object (rjson::value) but adds to it at the end an additional array.
// The extra array is given a separate chunked_vector to avoid putting it
// inside the rjson::value - because RapidJSON does contiguous allocations for
// arrays which we want to avoid for potentially long arrays in Query/Scan
// responses (see #23535).
// If we ever fix RapidJSON to avoid contiguous allocations for arrays, or
// replace it entirely (#24458), we can remove this function and the function
// rjson::print_with_extra_array() which it calls.
// Parameters:
//   value      - the JSON object to print; moved into the returned writer.
//   array_name - the name passed to rjson::print_with_extra_array() for the
//                extra array.
//   array      - the elements of the extra array; moved into the returned
//                writer.
// Returns a body_writer which, given an output stream, prints the response,
// closes the stream, and only then rethrows a printing error (if any).
static body_writer make_streamed_with_extra_array(rjson::value&& value,
std::string array_name, utils::chunked_vector<rjson::value>&& array) {
// The lambda is "mutable" because the captured value is moved out below.
// NOTE(review): this is a coroutine lambda with captures - presumably the
// caller keeps the returned body_writer alive until the returned future
// resolves; confirm against the callers of body_writer.
return [value = std::move(value), array_name = std::move(array_name), array = std::move(array)](output_stream<char>&& _out) mutable -> future<> {
// Take ownership of the stream so that it lives in the coroutine frame.
auto out = std::move(_out);
// A printing failure is remembered rather than propagated immediately,
// so that the stream is still closed below.
std::exception_ptr ex;
try {
co_await rjson::print_with_extra_array(value, array_name, array, out);
} catch (...) {
ex = std::current_exception();
}
// Note: if close() itself throws, that exception propagates and the
// remembered printing error (if any) is dropped.
co_await out.close();
co_await rjson::destroy_gently(std::move(value));
// TODO: can/should we also destroy the array gently?
if (ex) {
co_await coroutine::return_exception_ptr(std::move(ex));
}
};
}
// select_type represents how the Select parameter of Query/Scan selects what
// to return. It is also used by calculate_attrs_to_get() to know whether to
// return no attributes (count), or specific attributes.
// regular    - return all attributes or the specifically requested ones
//              (parse_select() returns it for ALL_ATTRIBUTES and
//              SPECIFIC_ATTRIBUTES).
// count      - only count the items (Select=COUNT).
// projection - return the projected attributes of an index
//              (Select=ALL_PROJECTED_ATTRIBUTES).
enum class select_type { regular, count, projection };
// Check according to the request's "ConsistentRead" field, which consistency
// level we need to use for the read. The field can be True for strongly
// consistent reads, or False for eventually consistent reads, or if this
// field is absent, we default to eventually consistent reads.
// In Scylla, eventually-consistent reads are implemented as consistency
// level LOCAL_ONE, and strongly-consistent reads as LOCAL_QUORUM.
// Maps the request's optional "ConsistentRead" member to a consistency level.
// A missing or JSON-null member selects LOCAL_ONE; a boolean selects
// LOCAL_QUORUM (true) or LOCAL_ONE (false). Any other JSON type is rejected
// with api_error::validation.
static db::consistency_level get_read_consistency(const rjson::value& request) {
    const rjson::value* flag = rjson::find(request, "ConsistentRead");
    // Absent and null are treated alike: fall back to the eventually
    // consistent default.
    if (!flag || flag->IsNull()) {
        return db::consistency_level::LOCAL_ONE;
    }
    if (!flag->IsBool()) {
        throw api_error::validation("ConsistentRead flag must be a boolean");
    }
    if (flag->GetBool()) {
        return db::consistency_level::LOCAL_QUORUM;
    }
    return db::consistency_level::LOCAL_ONE;
}
// attrs_to_get saves for each top-level attribute an attrs_to_get_node,
// a hierarchy of subparts that need to be kept. The following function
// calculate_attrs_to_get() takes either AttributesToGet or
// ProjectionExpression parameters (having both is *not* allowed),
// and returns the list of cells we need to read, or a disengaged optional
// when *all* attributes are to be returned.
// However, in our current implementation, only top-level attributes are
// stored as separate cells - a nested document is stored serialized together
// (as JSON) in the same cell. So this function returns a map - each key is the
// top-level attribute we will need to read, and the value for each
// top-level attribute is the partial hierarchy (struct hierarchy_filter)
// that we will need to extract from that serialized JSON.
// For example, if ProjectionExpression lists a.b and a.c[2], we
// return one top-level attribute name, "a", with the value "{b, c[2]}".
// Parameters:
//   req                     - the request JSON; AttributesToGet,
//                             ProjectionExpression and ExpressionAttributeNames
//                             are read from it.
//   parsed_expression_cache - used to parse ProjectionExpression.
//   used_attribute_names    - passed on to resolve_projection_expression().
//   select                  - select_type::count short-circuits to an empty
//                             (engaged) map.
// Returns an engaged map of the attributes to read, or std::nullopt when
// neither AttributesToGet nor ProjectionExpression is given.
// Throws api_error::validation when both parameters are given, when
// AttributesToGet is empty, or when ProjectionExpression fails to parse.
static std::optional<attrs_to_get> calculate_attrs_to_get(const rjson::value& req, parsed::expression_cache& parsed_expression_cache, std::unordered_set<std::string>& used_attribute_names, select_type select = select_type::regular) {
    if (select == select_type::count) {
        // An engaged but empty map means "retrieve nothing", unlike a
        // disengaged optional which means "retrieve everything".
        return attrs_to_get();
    }
    // FIXME: also need to handle select_type::projection
    const bool with_attributes_to_get = req.HasMember("AttributesToGet");
    const bool with_projection = req.HasMember("ProjectionExpression");
    if (with_attributes_to_get && with_projection) {
        throw api_error::validation(
                format("GetItem does not allow both ProjectionExpression and AttributesToGet to be given together"));
    }

    if (with_attributes_to_get) {
        const rjson::value& requested = req["AttributesToGet"];
        attrs_to_get wanted;
        for (auto name = requested.Begin(); name != requested.End(); ++name) {
            attribute_path_map_add("AttributesToGet", wanted, rjson::to_string(*name));
            validate_attr_name_length("AttributesToGet", name->GetStringLength(), false);
        }
        if (wanted.empty()) {
            throw api_error::validation("Empty AttributesToGet is not allowed. Consider using Select=COUNT instead.");
        }
        return wanted;
    }

    if (!with_projection) {
        // Neither parameter was given: read everything.
        return std::nullopt;
    }

    const rjson::value& projection = req["ProjectionExpression"];
    const rjson::value* attribute_names = rjson::find(req, "ExpressionAttributeNames");
    std::vector<parsed::path> paths;
    try {
        paths = parsed_expression_cache.parse_projection_expression(rjson::to_string_view(projection));
    } catch (const expressions_syntax_error& e) {
        // Report a syntax error in the expression as a validation error.
        throw api_error::validation(e.what());
    }
    resolve_projection_expression(paths, attribute_names, used_attribute_names);
    attrs_to_get wanted;
    for (const parsed::path& path : paths) {
        attribute_path_map_add("ProjectionExpression", wanted, path);
    }
    return wanted;
}
// get_table_or_view() is similar to get_table(), except it returns either
// a table or a materialized view from which to read, based on the TableName
// and optional IndexName in the request. Only requests like Query and Scan
// which allow IndexName should use this function.
// The kind of table a request was resolved to by get_table_or_view():
// base - the table itself (no IndexName given, or an internal table);
// gsi / lsi - the view backing a global / local secondary index.
// vector_index is never assigned by get_table_or_view() itself -
// presumably it is set by the vector-search code path; verify there.
enum class table_or_view_type { base, lsi, gsi, vector_index };
// Resolves the schema that a Query or Scan request should read from.
//
// Parameters:
//   proxy   - used only to reach the data dictionary for schema lookups.
//   request - the request JSON; TableName and the optional IndexName are used.
// Returns the schema together with its kind: base when no IndexName is given
// (or when try_get_internal_table() recognizes the table), gsi or lsi when
// IndexName names a secondary index.
// Throws api_error::validation for a non-string IndexName, or for an
// IndexName which is not a GSI/LSI of an existing table, and
// api_error::resource_not_found when the table itself does not exist.
static std::pair<schema_ptr, table_or_view_type>
get_table_or_view(service::storage_proxy& proxy, const rjson::value& request) {
    table_or_view_type type = table_or_view_type::base;
    std::string table_name = get_table_name(request);
    // Tables recognized by try_get_internal_table() are returned as-is;
    // IndexName is not consulted for them.
    if (schema_ptr s = try_get_internal_table(proxy.data_dictionary(), table_name)) {
        return {s, type};
    }

    std::string keyspace_name = executor::KEYSPACE_NAME_PREFIX + table_name;
    const rjson::value* index_name = rjson::find(request, "IndexName");
    // When IndexName is given, table_name is replaced by the name of the
    // index's view, and the base table's name is kept here for error messages.
    std::string orig_table_name;
    if (index_name) {
        if (index_name->IsString()) {
            orig_table_name = std::move(table_name);
            table_name = view_name(orig_table_name, rjson::to_string_view(*index_name));
            type = table_or_view_type::gsi;
        } else {
            // NOTE(review): to_string_view() is applied here to a value known
            // not to be a string - confirm it behaves sensibly in that case.
            throw api_error::validation(
                    fmt::format("Non-string IndexName '{}'", rjson::to_string_view(*index_name)));
        }
        // If no tables for global indexes were found, the index may be local
        if (!proxy.data_dictionary().has_schema(keyspace_name, table_name)) {
            type = table_or_view_type::lsi;
            table_name = lsi_name(orig_table_name, rjson::to_string_view(*index_name));
        }
    }

    try {
        return { proxy.data_dictionary().find_schema(keyspace_name, table_name), type };
    } catch (data_dictionary::no_such_column_family&) {
        if (!index_name) {
            throw api_error::resource_not_found(
                    fmt::format("Requested resource not found: Table: {} not found", table_name));
        }
        // DynamoDB returns a different error depending on whether the
        // base table doesn't exist (ResourceNotFoundException) or it
        // does exist but the index does not (ValidationException).
        auto base_table = proxy.data_dictionary().try_find_table(keyspace_name, orig_table_name);
        if (!base_table) {
            throw api_error::resource_not_found(
                    fmt::format("Requested resource not found: Table: {} not found", orig_table_name));
        }
        // If the given IndexName is a vector index, not a GSI or LSI,
        // give a more helpful message than just an index not found.
        if (base_table->schema()->has_index(rjson::to_sstring(*index_name))) {
            // The table name is now properly quoted on both sides (the
            // closing quote used to be missing).
            throw api_error::validation(
                    fmt::format("IndexName '{}' is a vector index for table '{}', so VectorSearch is mandatory in Query.", rjson::to_string_view(*index_name), orig_table_name));
        }
        throw api_error::validation(
                fmt::format("Requested resource not found: Index '{}' for table '{}'", rjson::to_string_view(*index_name), orig_table_name));
    }
}
// Parse the "Select" parameter of a Scan or Query operation, throwing a
// ValidationException in various forbidden combinations of options and
// finally returning one of three options:
// 1. regular - the default scan behavior of returning all or specific
// attributes ("ALL_ATTRIBUTES" or "SPECIFIC_ATTRIBUTES").
// 2. count - just count the items ("COUNT")
// 3. projection - return projected attributes ("ALL_PROJECTED_ATTRIBUTES")
// A ValidationException is thrown when recognizing an invalid combination
// of options - such as ALL_PROJECTED_ATTRIBUTES for a base table, or
// SPECIFIC_ATTRIBUTES without ProjectionExpression or AttributesToGet.
// Parameters:
//   request    - the Scan or Query request; its Select, AttributesToGet and
//                ProjectionExpression members are examined.
//   table_type - whether the request reads a base table or an index.
// Returns the select_type implied by the request.
// Throws api_error::validation for a non-string or unknown Select value and
// for the forbidden combinations spelled out below.
static select_type parse_select(const rjson::value& request, table_or_view_type table_type) {
    const rjson::value* select_value = rjson::find(request, "Select");
    // True when the request names specific attributes, in either the old
    // (AttributesToGet) or the new (ProjectionExpression) style.
    const bool names_attributes = request.HasMember("AttributesToGet") || request.HasMember("ProjectionExpression");
    const bool on_base_table = table_type == table_or_view_type::base;

    if (!select_value) {
        // Without "Select", naming attributes implies SPECIFIC_ATTRIBUTES.
        if (names_attributes) {
            return select_type::regular;
        }
        // Otherwise the default is ALL_ATTRIBUTES on a base table and
        // ALL_PROJECTED_ATTRIBUTES on an index, as explicitly documented
        // in the DynamoDB API reference.
        if (on_base_table) {
            return select_type::regular;
        }
        return select_type::projection;
    }

    if (!select_value->IsString()) {
        throw api_error::validation("Select parameter must be a string");
    }
    const std::string_view choice = rjson::to_string_view(*select_value);

    if (choice == "SPECIFIC_ATTRIBUTES") {
        if (!names_attributes) {
            throw api_error::validation("Select=SPECIFIC_ATTRIBUTES requires AttributesToGet or ProjectionExpression");
        }
        return select_type::regular;
    }
    // Every remaining Select value is incompatible with naming attributes.
    if (names_attributes) {
        throw api_error::validation("AttributesToGet or ProjectionExpression require Select to be either SPECIFIC_ATTRIBUTES or missing");
    }
    if (choice == "COUNT") {
        return select_type::count;
    }
    if (choice == "ALL_ATTRIBUTES") {
        // FIXME: when we support projections (#5036), if this is a GSI and
        // not all attributes are projected to it, we should throw.
        return select_type::regular;
    }
    if (choice == "ALL_PROJECTED_ATTRIBUTES") {
        if (on_base_table) {
            throw api_error::validation("ALL_PROJECTED_ATTRIBUTES only allowed for indexes");
        }
        return select_type::projection;
    }
    throw api_error::validation(fmt::format("Unknown Select value '{}'. Allowed choices: ALL_ATTRIBUTES, SPECIFIC_ATTRIBUTES, ALL_PROJECTED_ATTRIBUTES, COUNT",
            choice));
}
// "filter" represents a condition that can be applied to individual items
// read by a Query or Scan operation, to decide whether to keep the item.
// A filter is constructed from a Query or Scan request. This uses the
// relevant fields in the query (FilterExpression or QueryFilter/ScanFilter +
// ConditionalOperator). These fields are pre-checked and pre-parsed as much
// as possible, to ensure that later checking of many items is efficient.
class filter {
private:
// Holding QueryFilter/ScanFilter + ConditionalOperator:
struct conditions_filter {
// True unless ConditionalOperator is OR (the constructor sets it that
// way); passed to verify_condition() by check().
bool require_all;
// A copy of the request's QueryFilter/ScanFilter member.
rjson::value conditions;
};
// Holding a parsed FilterExpression:
struct expression_filter {
parsed::condition_expression expression;
};
// Disengaged when the request specified no filter at all, in which case
// check() accepts every item and filters_on() is always false.
std::optional<std::variant<conditions_filter, expression_filter>> _imp;
public:
// Filtering for Scan and Query are very similar, but there are some
// small differences, especially the names of the request attributes.
enum class request_type { SCAN, QUERY };
// Note that a filter does not store pointers to the query used to
// construct it.
filter(parsed::expression_cache& parsed_expression_cache, const rjson::value& request, request_type rt,
std::unordered_set<std::string>& used_attribute_names,
std::unordered_set<std::string>& used_attribute_values);
// Returns true if the given item passes the filter (always true when
// there is no filter).
bool check(const rjson::value& item) const;
// Returns true if the filter refers to the given attribute.
bool filters_on(std::string_view attribute) const;
// for_filters_on() runs the given function on the attributes that the
// filter works on. It may run for the same attribute more than once if
// used more than once in the filter.
void for_filters_on(const noncopyable_function<void(std::string_view)>& func) const;
// True if the request specified a filter. The conversion is deliberately
// implicit: do_query() copy-initializes a bool from a filter.
operator bool() const { return bool(_imp); }
};
// Builds a filter from a Scan or Query request.
// Parameters:
//   parsed_expression_cache - used to parse FilterExpression.
//   request                 - the request; FilterExpression, ScanFilter or
//                             QueryFilter, ConditionalOperator and the
//                             ExpressionAttribute* members are read from it.
//   rt                      - selects the name of the legacy filter member.
//   used_attribute_names / used_attribute_values - passed on to
//                             resolve_condition_expression().
// Throws api_error::validation on forbidden parameter combinations, on a
// non-string or empty FilterExpression, and on expression syntax errors.
// The filter is left disengaged when the request has neither kind of filter.
filter::filter(parsed::expression_cache& parsed_expression_cache, const rjson::value& request, request_type rt,
std::unordered_set<std::string>& used_attribute_names,
std::unordered_set<std::string>& used_attribute_values) {
const rjson::value* expression = rjson::find(request, "FilterExpression");
// The legacy filter member is named differently in Scan and in Query.
const char* conditions_attribute = (rt == request_type::SCAN) ? "ScanFilter" : "QueryFilter";
const rjson::value* conditions = rjson::find(request, conditions_attribute);
auto conditional_operator = get_conditional_operator(request);
// ConditionalOperator only makes sense together with a non-empty legacy
// filter.
if (conditional_operator != conditional_operator_type::MISSING &&
(!conditions || (conditions->IsObject() && conditions->GetObject().ObjectEmpty()))) {
throw api_error::validation(
format("'ConditionalOperator' parameter cannot be specified for missing or empty {}",
conditions_attribute));
}
// Because of this check, at most one of the two branches below runs.
if (expression && conditions) {
throw api_error::validation(
format("FilterExpression and {} are not allowed together", conditions_attribute));
}
if (expression) {
if (!expression->IsString()) {
throw api_error::validation("FilterExpression must be a string");
}
if (expression->GetStringLength() == 0) {
throw api_error::validation("FilterExpression must not be empty");
}
// FilterExpression is a new-style parameter, AttributesToGet an
// old-style one; mixing them is rejected.
if (rjson::find(request, "AttributesToGet")) {
throw api_error::validation("Cannot use both old-style and new-style parameters in same request: FilterExpression and AttributesToGet");
}
try {
auto parsed = parsed_expression_cache.parse_condition_expression(rjson::to_string_view(*expression), "FilterExpression");
const rjson::value* expression_attribute_names = rjson::find(request, "ExpressionAttributeNames");
const rjson::value* expression_attribute_values = rjson::find(request, "ExpressionAttributeValues");
resolve_condition_expression(parsed,
expression_attribute_names, expression_attribute_values,
used_attribute_names, used_attribute_values);
_imp = expression_filter { std::move(parsed) };
} catch(expressions_syntax_error& e) {
// Report a syntax error in the expression as a validation error.
throw api_error::validation(e.what());
}
}
if (conditions) {
// The mirror image of the check above: an old-style filter may not be
// mixed with the new-style ProjectionExpression.
if (rjson::find(request, "ProjectionExpression")) {
throw api_error::validation(format("Cannot use both old-style and new-style parameters in same request: {} and ProjectionExpression", conditions_attribute));
}
// Anything other than OR (including a missing ConditionalOperator)
// requires all conditions to hold.
bool require_all = conditional_operator != conditional_operator_type::OR;
// The conditions are copied, so the filter does not point into request.
_imp = conditions_filter { require_all, rjson::copy(*conditions) };
}
}
// Returns whether the given item passes the filter. A request without any
// filter accepts every item.
bool filter::check(const rjson::value& item) const {
    if (!_imp.has_value()) {
        return true;
    }
    const auto legacy_check = [&item] (const conditions_filter& legacy) -> bool {
        return verify_condition(legacy.conditions, legacy.require_all, &item);
    };
    const auto expression_check = [&item] (const expression_filter& modern) -> bool {
        return verify_condition_expression(modern.expression, &item);
    };
    // Dispatch on whichever kind of filter the request specified.
    return std::visit(overloaded_functor { legacy_check, expression_check }, *_imp);
}
bool filter::filters_on(std::string_view attribute) const {
if (!_imp) {
return false;
}
return std::visit(overloaded_functor {
[&] (const conditions_filter& f) -> bool {
for (auto it = f.conditions.MemberBegin(); it != f.conditions.MemberEnd(); ++it) {
if (rjson::to_string_view(it->name) == attribute) {
return true;
}
}
return false;
},
[&] (const expression_filter& f) -> bool {
return condition_expression_on(f.expression, attribute);
}
}, *_imp);
}
void filter::for_filters_on(const noncopyable_function<void(std::string_view)>& func) const {
if (_imp) {
std::visit(overloaded_functor {
[&] (const conditions_filter& f) -> void {
for (auto it = f.conditions.MemberBegin(); it != f.conditions.MemberEnd(); ++it) {
func(rjson::to_string_view(it->name));
}
},
[&] (const expression_filter& f) -> void {
return for_condition_expression_on(f.expression, func);
}
}, *_imp);
}
}
class describe_items_visitor {
typedef std::vector<const column_definition*> columns_t;
const columns_t& _columns;
const std::optional<attrs_to_get>& _attrs_to_get;
std::unordered_set<std::string> _extra_filter_attrs;
const filter& _filter;
typename columns_t::const_iterator _column_it;
rjson::value _item;
// _items is a chunked_vector<rjson::value> instead of a RapidJson array
// (rjson::value) because unfortunately RapidJson arrays are stored
// contiguously in memory, and cause large allocations when a Query/Scan
// returns a long list of short items (issue #23535).
utils::chunked_vector<rjson::value> _items;
size_t _scanned_count;
public:
describe_items_visitor(const columns_t& columns, const std::optional<attrs_to_get>& attrs_to_get, filter& filter)
: _columns(columns)
, _attrs_to_get(attrs_to_get)
, _filter(filter)
, _column_it(columns.begin())
, _item(rjson::empty_object())
, _scanned_count(0)
{
// _filter.check() may need additional attributes not listed in
// _attrs_to_get (i.e., not requested as part of the output).
// We list those in _extra_filter_attrs. We will include them in
// the JSON but take them out before finally returning the JSON.
if (_attrs_to_get) {
_filter.for_filters_on([&] (std::string_view attr) {
std::string a(attr); // no heterogeneous maps searches :-(
if (!_attrs_to_get->contains(a)) {
_extra_filter_attrs.emplace(std::move(a));
}
});
}
}
void start_row() {
_column_it = _columns.begin();
}
void accept_value(managed_bytes_view_opt result_bytes_view) {
if (!result_bytes_view) {
++_column_it;
return;
}
result_bytes_view->with_linearized([this] (bytes_view bv) {
std::string column_name = (*_column_it)->name_as_text();
if (column_name != executor::ATTRS_COLUMN_NAME) {
if (!_attrs_to_get || _attrs_to_get->contains(column_name) || _extra_filter_attrs.contains(column_name)) {
if (!_item.HasMember(column_name.c_str())) {
rjson::add_with_string_name(_item, column_name, rjson::empty_object());
}
rjson::value& field = _item[column_name.c_str()];
rjson::add_with_string_name(field, type_to_string((*_column_it)->type), json_key_column_value(bv, **_column_it));
}
} else {
auto deserialized = attrs_type()->deserialize(bv);
auto keys_and_values = value_cast<map_type_impl::native_type>(deserialized);
for (auto entry : keys_and_values) {
std::string attr_name = value_cast<sstring>(entry.first);
if (!_attrs_to_get || _attrs_to_get->contains(attr_name) || _extra_filter_attrs.contains(attr_name)) {
bytes value = value_cast<bytes>(entry.second);
// Even if _attrs_to_get asked to keep only a part of a
// top-level attribute, we keep the entire attribute
// at this stage, because the item filter might still
// need the other parts (it was easier for us to keep
// extra_filter_attrs at top-level granularity). We'll
// filter the unneeded parts after item filtering.
rjson::add_with_string_name(_item, attr_name, deserialize_item(value));
}
}
}
});
++_column_it;
}
void end_row() {
if (_filter.check(_item)) {
// As noted above, we kept entire top-level attributes listed in
// _attrs_to_get. We may need to only keep parts of them.
if (_attrs_to_get) {
for (const auto& attr: *_attrs_to_get) {
// If !attr.has_value() it means we were asked not to keep
// attr entirely, but just parts of it.
if (!attr.second.has_value()) {
rjson::value* toplevel= rjson::find(_item, attr.first);
if (toplevel && !hierarchy_filter(*toplevel, attr.second)) {
rjson::remove_member(_item, attr.first);
}
}
}
}
// Remove the extra attributes _extra_filter_attrs which we had
// to add just for the filter, and not requested to be returned:
for (const auto& attr : _extra_filter_attrs) {
rjson::remove_member(_item, attr);
}
_items.push_back(std::move(_item));
}
_item = rjson::empty_object();
++_scanned_count;
}
utils::chunked_vector<rjson::value> get_items() && {
return std::move(_items);
}
size_t get_scanned_count() {
return _scanned_count;
}
};
// describe_items() returns a JSON object that includes members "Count"
// and "ScannedCount", but *not* "Items" - that is returned separately
// as a chunked_vector to avoid large contiguous allocations which
// RapidJSON does of its array. The caller should add "Items" to the
// returned JSON object if needed, or print it separately.
// The returned chunked_vector (the items) is std::optional<>, because
// the user may have requested only to count items, and not return any
// items - which is different from returning an empty list of items.
// Parameters:
//   selection    - supplies the columns, in the order their values are
//                  visited.
//   result_set   - the rows to convert.
//   attrs_to_get - the attributes requested for output (see
//                  calculate_attrs_to_get()).
//   filter       - the item filter to apply to each row.
// Returns a tuple of: the JSON object holding "Count" and "ScannedCount",
// the optional list of items, and the number of items that passed the
// filter (the same number reported as "Count").
static future<std::tuple<rjson::value, std::optional<utils::chunked_vector<rjson::value>>, size_t>> describe_items(
const cql3::selection::selection& selection,
std::unique_ptr<cql3::result_set> result_set,
std::optional<attrs_to_get>&& attrs_to_get,
filter&& filter) {
// The visitor keeps references to attrs_to_get and filter, both of which
// are used again (or must stay alive) until the visit below completes.
describe_items_visitor visitor(selection.get_columns(), attrs_to_get, filter);
co_await result_set->visit_gently(visitor);
// Read the scanned count before the items are moved out of the visitor.
auto scanned_count = visitor.get_scanned_count();
utils::chunked_vector<rjson::value> items = std::move(visitor).get_items();
rjson::value items_descr = rjson::empty_object();
auto size = items.size();
rjson::add(items_descr, "Count", rjson::value(size));
rjson::add(items_descr, "ScannedCount", rjson::value(scanned_count));
// If attrs_to_get && attrs_to_get->empty(), this means the user asked not
// to get any attributes (i.e., a Scan or Query with Select=COUNT) and we
// shouldn't return "Items" at all.
// TODO: consider optimizing the case of Select=COUNT without a filter.
// In that case, we currently build a list of empty items and here drop
// it. We could just count the items and not bother with the empty items.
// (However, remember that when we do have a filter, we need the items).
std::optional<utils::chunked_vector<rjson::value>> opt_items;
if (!attrs_to_get || !attrs_to_get->empty()) {
opt_items = std::move(items);
}
co_return std::tuple(std::move(items_descr), std::move(opt_items), size);
}
// Encodes a paging state as a JSON object with one member per key column,
// each of the form {type: value}. do_query() returns this object as the
// response's "LastEvaluatedKey".
// For tables outside Alternator's own keyspaces, the position's region and
// bound weight are added as well when the position is not exactly at a row.
static rjson::value encode_paging_state(const schema& schema, const service::pager::paging_state& paging_state) {
    rjson::value last_evaluated_key = rjson::empty_object();

    // Adds the member {column name: {column type: value}} for one key column.
    const auto add_key_column = [&last_evaluated_key] (const column_definition& column, const auto& raw_value) {
        const auto& column_name = column.name_as_text();
        rjson::add_with_string_name(last_evaluated_key, std::string_view(column_name), rjson::empty_object());
        rjson::value& key_entry = last_evaluated_key[column_name];
        rjson::add_with_string_name(key_entry, type_to_string(column.type), json_key_column_value(raw_value, column));
    };

    // Partition key: components and column definitions are walked in step.
    std::vector<bytes> pk_components = paging_state.get_partition_key().explode();
    auto pk_component = pk_components.begin();
    for (const column_definition& column : schema.partition_key_columns()) {
        add_key_column(column, *pk_component);
        ++pk_component;
    }

    auto pos = paging_state.get_position_in_partition();
    if (pos.has_key()) {
        // Alternator itself allows at most one column in clustering key, but
        // user can use Alternator api to access system tables which might have
        // multiple clustering key columns. So we need to handle that case here.
        auto ck_column = schema.clustering_key_columns().begin();
        for (const auto& ck_component : pos.key().explode()) {
            add_key_column(*ck_column, ck_component);
            ++ck_column;
        }
    }

    // To avoid possible conflicts (and thus having to reserve these names) we
    // avoid adding the weight and region fields of the position to the paging
    // state. Alternator will never need these as it doesn't have range
    // tombstones (the only thing that can generate a position other than at(row)).
    // We conditionally include these fields when reading CQL tables through alternator.
    const bool exactly_at_row = pos.has_key() && pos.get_bound_weight() == bound_weight::equal;
    if (!is_alternator_keyspace(schema.ks_name()) && !exactly_at_row) {
        rjson::add_with_string_name(last_evaluated_key, scylla_paging_region, rjson::empty_object());
        rjson::add(last_evaluated_key[scylla_paging_region.data()], "S", rjson::from_string(fmt::to_string(pos.region())));
        rjson::add_with_string_name(last_evaluated_key, scylla_paging_weight, rjson::empty_object());
        rjson::add(last_evaluated_key[scylla_paging_weight.data()], "N", static_cast<int>(pos.get_bound_weight()));
    }
    return last_evaluated_key;
}
// RapidJSON allocates arrays contiguously in memory, so we want to avoid
// returning a large number of items as a single rapidjson array, and use
// a chunked_vector instead. The following constant is an arbitrary cutoff
// point for when to switch from a rapidjson array to a chunked_vector.
// do_query() compares the number of returned items against this cutoff with
// ">=", so a response with exactly this many items already takes the
// chunked_vector (streamed) path.
static constexpr int max_items_for_rapidjson_array = 256;
// do_query() reads one page of results and builds the response for it.
// Parameters:
//   proxy               - storage proxy used for the read.
//   table_schema        - schema of the table or view being read.
//   exclusive_start_key - if not null, the key to resume reading after
//                         (converted below into a paging state).
//   partition_ranges    - the partition ranges to read.
//   ck_bounds           - the clustering ranges to read.
//   attrs_to_get        - attributes to return (see calculate_attrs_to_get()).
//   limit               - page size passed to the pager.
//   cl                  - consistency level of the read.
//   filter              - item filter applied to the rows that were read.
//   custom_opts         - extra partition-slice options; the "reversed"
//                         option is also inspected here.
//   client_state, trace_state, permit - used to build the query state.
//   stats               - updated with the filtering statistics.
//   enforce_authorization, warn_authorization - passed to verify_permission().
// Returns the response: a JSON object with "Count", "ScannedCount",
// optionally "Items" and "LastEvaluatedKey", either as a string or as a
// streaming writer when it is large.
static future<executor::request_return_type> do_query(service::storage_proxy& proxy,
schema_ptr table_schema,
const rjson::value* exclusive_start_key,
dht::partition_range_vector partition_ranges,
std::vector<query::clustering_range> ck_bounds,
std::optional<attrs_to_get> attrs_to_get,
uint32_t limit,
db::consistency_level cl,
filter filter,
query::partition_slice::option_set custom_opts,
service::client_state& client_state,
alternator::stats& stats,
tracing::trace_state_ptr trace_state,
service_permit permit,
bool enforce_authorization,
bool warn_authorization) {
lw_shared_ptr<service::pager::paging_state> old_paging_state = nullptr;
tracing::trace(trace_state, "Performing a database query");
// Reverse the schema and the clustering bounds as the underlying code expects
// reversed queries in the native reversed format.
auto query_schema = table_schema;
const bool reversed = custom_opts.contains<query::partition_slice::option::reversed>();
if (reversed) {
query_schema = table_schema->get_reversed();
std::reverse(ck_bounds.begin(), ck_bounds.end());
for (auto& bound : ck_bounds) {
bound = query::reverse(bound);
}
}
// Convert ExclusiveStartKey into a paging state from which the pager
// continues. Without a clustering key, the position is the partition start.
if (exclusive_start_key) {
partition_key pk = pk_from_json(*exclusive_start_key, table_schema);
auto pos = position_in_partition::for_partition_start();
if (table_schema->clustering_key_size() > 0) {
pos = pos_from_json(*exclusive_start_key, table_schema);
}
old_paging_state = make_lw_shared<service::pager::paging_state>(pk, pos, query::max_partitions, query_id::create_null_id(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 0);
}
co_await verify_permission(enforce_authorization, warn_authorization, client_state, table_schema, auth::permission::SELECT, stats);
// All regular and static columns are read, regardless of attrs_to_get;
// the selection of attributes happens later, in describe_items().
auto regular_columns =
table_schema->regular_columns() | std::views::transform(&column_definition::id)
| std::ranges::to<query::column_id_vector>();
auto static_columns =
table_schema->static_columns() | std::views::transform(&column_definition::id)
| std::ranges::to<query::column_id_vector>();
auto selection = cql3::selection::selection::wildcard(table_schema);
query::partition_slice::option_set opts = selection->get_query_options();
opts.add(custom_opts);
auto partition_slice = query::partition_slice(std::move(ck_bounds), std::move(static_columns), std::move(regular_columns), opts);
auto command = ::make_lw_shared<query::read_command>(query_schema->id(), query_schema->version(), partition_slice, proxy.get_max_result_size(partition_slice),
query::tombstone_limit(proxy.get_tombstone_limit()));
elogger.trace("Executing read query (reversed {}): table schema {}, query schema {}", partition_slice.is_reversed(), table_schema->version(), query_schema->version());
auto query_state_ptr = std::make_unique<service::query_state>(client_state, trace_state, std::move(permit));
// FIXME: should be moved above, set on opts, so get_max_result_size knows it?
command->slice.options.set<query::partition_slice::option::allow_short_read>();
auto query_options = std::make_unique<cql3::query_options>(cl, std::vector<cql3::raw_value>{});
// Wrap the options once more to attach the paging state to resume from.
query_options = std::make_unique<cql3::query_options>(std::move(query_options), std::move(old_paging_state));
auto p = service::pager::query_pagers::pager(proxy, query_schema, selection, *query_state_ptr, *query_options, command, std::move(partition_ranges), nullptr);
std::unique_ptr<cql3::result_set> rs = co_await p->fetch_page(limit, gc_clock::now(), executor::default_timeout());
// A paging state is recorded only if there is more to read.
if (!p->is_exhausted()) {
rs->get_metadata().set_paging_state(p->state());
}
auto paging_state = rs->get_metadata().paging_state();
// Remember whether there is a filter before it is moved away below.
bool has_filter = filter;
auto [items_descr, opt_items, size] = co_await describe_items(*selection, std::move(rs), std::move(attrs_to_get), std::move(filter));
if (paging_state) {
rjson::add(items_descr, "LastEvaluatedKey", encode_paging_state(*table_schema, *paging_state));
}
if (has_filter) {
stats.cql_stats.filtered_rows_read_total += p->stats().rows_read_total;
// update our "filtered_rows_matched_total" for all the rows matched, despite the filter
stats.cql_stats.filtered_rows_matched_total += size;
}
if (opt_items) {
if (opt_items->size() >= max_items_for_rapidjson_array) {
// There are many items, better print the JSON and the array of
// items (opt_items) separately to avoid RapidJSON's contiguous
// allocation of arrays.
co_return make_streamed_with_extra_array(std::move(items_descr), "Items", std::move(*opt_items));
}
// There aren't many items in the chunked vector opt_items,
// let's just insert them into the JSON object and print the
// full JSON normally.
rjson::value items_json = rjson::empty_array();
for (auto& item : *opt_items) {
rjson::push_back(items_json, std::move(item));
}
rjson::add(items_descr, "Items", std::move(items_json));
}
// Even with few items (or none at all), the response may be big enough
// to be worth streaming rather than building as one string.
if (is_big(items_descr)) {
co_return make_streamed(std::move(items_descr));
}
co_return rjson::print(std::move(items_descr));
}
// Returns the token at which the given segment starts, when the token space
// is split into total_segments segments of equal width.
// Requires total_segments > 1 and 0 <= segment < total_segments (checked
// with throwing_assert).
static dht::token token_for_segment(int segment, int total_segments) {
    throwing_assert(total_segments > 1 && segment >= 0 && segment < total_segments);
    // The arithmetic is done on unsigned 64-bit values: the offset from the
    // minimal token is added modulo 2^64 and only then handed to
    // from_int64().
    const uint64_t segment_width = std::numeric_limits<uint64_t>::max() / total_segments;
    const uint64_t offset = segment_width * segment;
    return dht::token::from_int64(std::numeric_limits<int64_t>::min() + offset);
}
// Builds the partition range a parallel Scan reads for segment `segment` out
// of `total_segments`:
//  * with a single segment, the range is open-ended on both sides;
//  * the first segment has no lower bound and ends at the token where
//    segment 1 begins;
//  * the last segment starts at its own token and has no upper bound;
//  * every other segment runs from its own token to the token where the
//    following segment begins.
// NOTE(review): the upper bound is ring_position::ending_at(T) marked
// non-inclusive while the next segment's lower bound is
// ring_position::starting_at(T) marked inclusive. Confirm that a partition
// whose token is exactly T cannot be returned by both segments.
static dht::partition_range get_range_for_segment(int segment, int total_segments) {
    if (total_segments == 1) {
        return dht::partition_range::make_open_ended_both_sides();
    }
    // Inclusive lower bound located at the token where segment `seg` begins.
    auto lower_bound_at = [total_segments] (int seg) {
        return dht::partition_range::bound(
                dht::ring_position::starting_at(token_for_segment(seg, total_segments)));
    };
    // Non-inclusive upper bound located at the token where segment `seg` begins.
    auto upper_bound_at = [total_segments] (int seg) {
        return dht::partition_range::bound(
                dht::ring_position::ending_at(token_for_segment(seg, total_segments)), false);
    };
    if (segment == 0) {
        return dht::partition_range::make_ending_with(upper_bound_at(1));
    }
    if (segment == total_segments - 1) {
        return dht::partition_range::make_starting_with(lower_bound_at(segment));
    }
    // Compute the bounds in a fixed order: start first, then end.
    auto from = lower_bound_at(segment);
    auto to = upper_bound_at(segment + 1);
    return dht::partition_range::make(std::move(from), std::move(to));
}
// Implements the Alternator Scan operation.
//
// The request parameters are validated here and the actual read is then
// delegated to do_query(), over either the whole table or - for a parallel
// scan - the partition range belonging to the requested segment.
// Validation failures detected directly in this function are returned as a
// ready future holding an api_error::validation.
//
// Request parameters handled here:
//  * TableName / IndexName (through get_table_or_view()).
//  * ConsistentRead (through get_read_consistency()); a consistency level
//    other than LOCAL_ONE is rejected when scanning a GSI.
//  * Segment and TotalSegments: either both present or both absent. Segment
//    must be in [0, TotalSegments) and TotalSegments at most 1000000.
//  * ExclusiveStartKey: for a parallel scan it must fall inside the range of
//    the requested segment.
//  * Limit: must be greater than 0. Defaults to the largest uint32_t value,
//    and larger requested values are clamped to it.
//  * Select, the attributes to return, the filter, and the
//    ExpressionAttributeNames / ExpressionAttributeValues usage check.
//
// `audit_info` is passed to maybe_audit() for this request.
future<executor::request_return_type> executor::scan(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request, std::unique_ptr<audit::audit_info_alternator>& audit_info) {
    _stats.api_operations.scan++;
    elogger.trace("Scanning {}", request);

    auto [schema, table_type] = get_table_or_view(_proxy, request);
    db::consistency_level cl = get_read_consistency(request);

    maybe_audit(audit_info, audit::statement_category::QUERY, schema->ks_name(), schema->cf_name(), "Scan", request, cl);
    tracing::add_alternator_table_name(trace_state, schema->cf_name());
    get_stats_from_schema(_proxy, *schema)->api_operations.scan++;

    auto segment = get_int_attribute(request, "Segment");
    auto total_segments = get_int_attribute(request, "TotalSegments");
    if (segment || total_segments) {
        if (!segment || !total_segments) {
            return make_ready_future<request_return_type>(api_error::validation(
                    "Both Segment and TotalSegments attributes need to be present for a parallel scan"));
        }
        if (*segment < 0 || *segment >= *total_segments) {
            return make_ready_future<request_return_type>(api_error::validation(
                    "Segment must be non-negative and less than TotalSegments"));
        }
        if (*total_segments < 0 || *total_segments > 1000000) {
            return make_ready_future<request_return_type>(api_error::validation(
                    "TotalSegments must be non-negative and less or equal to 1000000"));
        }
    }

    rjson::value* exclusive_start_key = rjson::find(request, "ExclusiveStartKey");

    if (table_type == table_or_view_type::gsi && cl != db::consistency_level::LOCAL_ONE) {
        return make_ready_future<request_return_type>(api_error::validation(
                "Consistent reads are not allowed on global indexes (GSI)"));
    }

    rjson::value* limit_json = rjson::find(request, "Limit");
    uint32_t limit = std::numeric_limits<uint32_t>::max();
    if (limit_json) {
        // GetUint64() returns a 64-bit value. Converting it directly to
        // uint32_t would silently wrap (2^32 would become 0 and be rejected
        // below as a zero limit, 2^32+1 would become 1), so clamp oversized
        // values to the largest limit we can represent instead.
        const uint64_t requested_limit = limit_json->GetUint64();
        limit = requested_limit > std::numeric_limits<uint32_t>::max()
                ? std::numeric_limits<uint32_t>::max()
                : static_cast<uint32_t>(requested_limit);
    }
    if (limit == 0) {
        return make_ready_future<request_return_type>(api_error::validation("Limit must be greater than 0"));
    }

    select_type select = parse_select(request, table_type);

    std::unordered_set<std::string> used_attribute_names;
    std::unordered_set<std::string> used_attribute_values;
    auto attrs_to_get = calculate_attrs_to_get(request, *_parsed_expression_cache, used_attribute_names, select);

    dht::partition_range_vector partition_ranges;
    if (segment) {
        auto range = get_range_for_segment(*segment, *total_segments);
        if (exclusive_start_key) {
            // A paging key handed back by one segment must not be used to
            // resume a different segment.
            auto ring_pos = dht::ring_position{dht::decorate_key(*schema, pk_from_json(*exclusive_start_key, schema))};
            if (!range.contains(ring_pos, dht::ring_position_comparator(*schema))) {
                return make_ready_future<request_return_type>(api_error::validation(
                        format("The provided starting key is invalid: Invalid ExclusiveStartKey. Please use ExclusiveStartKey "
                               "with correct Segment. TotalSegments: {} Segment: {}", *total_segments, *segment)));
            }
        }
        partition_ranges.push_back(range);
    } else {
        partition_ranges.push_back(dht::partition_range::make_open_ended_both_sides());
    }
    std::vector<query::clustering_range> ck_bounds{query::clustering_range::make_open_ended_both_sides()};

    filter filter(*_parsed_expression_cache, request, filter::request_type::SCAN, used_attribute_names, used_attribute_values);
    // Note: Unlike Query, Scan does allow a filter on the key attributes.
    // For some *specific* cases of key filtering, such as an equality test on
    // the partition key or a comparison operator on the sort key, we could
    // have optimized the filtering by modifying partition_ranges and/or
    // ck_bounds. We haven't done this optimization yet.

    const rjson::value* expression_attribute_names = rjson::find(request, "ExpressionAttributeNames");
    const rjson::value* expression_attribute_values = rjson::find(request, "ExpressionAttributeValues");
    verify_all_are_used(expression_attribute_names, used_attribute_names, "ExpressionAttributeNames", "Scan");
    verify_all_are_used(expression_attribute_values, used_attribute_values, "ExpressionAttributeValues", "Scan");

    return do_query(_proxy, schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl,
            std::move(filter), query::partition_slice::option_set(), client_state, _stats, trace_state, std::move(permit), _enforce_authorization, _warn_authorization);
}
// Converts one KeyConditions entry on the hash key into the partition range
// it selects.
// `comp_definition` is the entry's ComparisonOperator and `attrs` its
// AttributeValueList. Only the EQ operator with exactly one value is
// accepted; anything else results in an api_error::validation exception.
static dht::partition_range calculate_pk_bound(schema_ptr schema, const column_definition& pk_cdef, const rjson::value& comp_definition, const rjson::value& attrs) {
    if (get_comparison_operator(comp_definition) != comparison_operator_type::EQ) {
        throw api_error::validation(format("Hash key can only be restricted with equality operator (EQ). {} not supported.", comp_definition));
    }
    if (attrs.Size() != 1) {
        throw api_error::validation(format("A single attribute is required for a hash key EQ restriction: {}", attrs));
    }
    // Decode the typed JSON value into the serialized key bytes, then build
    // the single-partition range for the resulting decorated key.
    partition_key key = partition_key::from_singular_bytes(*schema, get_key_from_typed_value(attrs[0], pk_cdef));
    return dht::partition_range(dht::decorate_key(*schema, key));
}
// Builds the clustering range matching every key that begins with the prefix
// `target`. `ck` is the clustering key already built from that same prefix
// and serves as the inclusive lower bound.
// The exclusive upper bound is obtained by taking the last byte of the prefix
// that is not 0xFF, incrementing it, and dropping everything after it. When
// no such byte is found the range has no upper bound.
// `target` is modified in place. The parameter `t` is not used.
static query::clustering_range get_clustering_range_for_begins_with(bytes&& target, const clustering_key& ck, schema_ptr schema, data_type t) {
    // find_end() with a "not equal" predicate yields the position of the
    // last byte that differs from 0xFF, or end() if there is none.
    auto last_non_ff = boost::range::find_end(target, bytes("\xFF"), std::not_equal_to<bytes::value_type>());
    if (last_non_ff == target.end()) {
        return query::clustering_range::make_starting_with(query::clustering_range::bound(ck));
    }
    ++*last_non_ff;
    target.resize(std::distance(target.begin(), last_non_ff) + 1);
    clustering_key prefix_successor = clustering_key::from_single_value(*schema, target);
    return query::clustering_range::make(query::clustering_range::bound(ck), query::clustering_range::bound(prefix_successor, false));
}
// Converts one KeyConditions entry on the sort key into the clustering range
// it selects.
// `comp_definition` is the entry's ComparisonOperator and `attrs` its
// AttributeValueList, which must hold two values for BETWEEN and one value
// for every other operator. Supported operators are EQ, LE, LT, GE, GT,
// BETWEEN and BEGINS_WITH; any other operator, a wrong number of values, or
// BEGINS_WITH with a non-empty prefix on a key type not compatible with
// utf8_type results in an api_error::validation exception.
static query::clustering_range calculate_ck_bound(schema_ptr schema, const column_definition& ck_cdef, const rjson::value& comp_definition, const rjson::value& attrs) {
    using ck_range = query::clustering_range;
    const auto op = get_comparison_operator(comp_definition);
    const size_t wanted_values = op == comparison_operator_type::BETWEEN ? 2 : 1;
    if (attrs.Size() != wanted_values) {
        throw api_error::validation(format("{} arguments expected for a sort key restriction: {}", wanted_values, attrs));
    }
    // The first value is needed by every operator, so decode it up front.
    bytes first_raw = get_key_from_typed_value(attrs[0], ck_cdef);
    clustering_key first_key = clustering_key::from_single_value(*schema, first_raw);
    switch (op) {
    case comparison_operator_type::EQ:
        return ck_range(first_key);
    case comparison_operator_type::LE:
        return ck_range::make_ending_with(ck_range::bound(first_key));
    case comparison_operator_type::LT:
        return ck_range::make_ending_with(ck_range::bound(first_key, false));
    case comparison_operator_type::GE:
        return ck_range::make_starting_with(ck_range::bound(first_key));
    case comparison_operator_type::GT:
        return ck_range::make_starting_with(ck_range::bound(first_key, false));
    case comparison_operator_type::BETWEEN: {
        // Both ends of a BETWEEN range are inclusive.
        bytes second_raw = get_key_from_typed_value(attrs[1], ck_cdef);
        clustering_key second_key = clustering_key::from_single_value(*schema, second_raw);
        return ck_range::make(ck_range::bound(first_key), ck_range::bound(second_key));
    }
    case comparison_operator_type::BEGINS_WITH: {
        // An empty prefix matches every key.
        if (first_raw.empty()) {
            return ck_range::make_open_ended_both_sides();
        }
        // The range starts at the given prefix and ends (non-inclusively) at
        // the prefix "incremented" by one at its end. This only makes sense
        // for key types compatible with utf8_type, so other types are rejected.
        if (!ck_cdef.type->is_compatible_with(*utf8_type)) {
            throw api_error::validation(fmt::format("BEGINS_WITH operator cannot be applied to type {}", type_to_string(ck_cdef.type)));
        }
        return get_clustering_range_for_begins_with(std::move(first_raw), first_key, schema, ck_cdef.type);
    }
    default:
        throw api_error::validation(format("Operator {} not supported for sort key", comp_definition));
    }
}
// Calculates primary key bounds from KeyConditions.
// `conditions` is the KeyConditions object of a Query request: each member
// is named after an attribute and holds a ComparisonOperator and an
// AttributeValueList. The result is a pair of
//  * the partition ranges - exactly one, derived from the mandatory
//    condition on the hash key, and
//  * the clustering ranges - exactly one, derived from the condition on the
//    first clustering key column, or open-ended on both sides when there is
//    no such condition.
// Throws api_error::validation when the hash key condition is missing, a key
// is restricted more than once, or there are more conditions than key
// columns that can be restricted.
static std::pair<dht::partition_range_vector, std::vector<query::clustering_range>>
calculate_bounds_conditions(schema_ptr schema, const rjson::value& conditions) {
    // The key columns a condition may refer to: the first partition key
    // column and, if the table has one, the first clustering key column.
    const column_definition& hash_cdef = schema->partition_key_columns().front();
    const column_definition* sort_cdef = schema->clustering_key_size() > 0 ? &schema->clustering_key_columns().front() : nullptr;

    dht::partition_range_vector partition_ranges;
    std::vector<query::clustering_range> ck_bounds;
    for (auto member = conditions.MemberBegin(); member != conditions.MemberEnd(); ++member) {
        const sstring attr_name = rjson::to_sstring(member->name);
        const rjson::value& comp_definition = rjson::get(member->value, "ComparisonOperator");
        const rjson::value& attr_list = rjson::get(member->value, "AttributeValueList");
        if (attr_name == hash_cdef.name_as_text()) {
            if (!partition_ranges.empty()) {
                throw api_error::validation("Currently only a single restriction per key is allowed");
            }
            partition_ranges.push_back(calculate_pk_bound(schema, hash_cdef, comp_definition, attr_list));
        }
        if (sort_cdef && attr_name == sort_cdef->name_as_text()) {
            if (!ck_bounds.empty()) {
                throw api_error::validation("Currently only a single restriction per key is allowed");
            }
            ck_bounds.push_back(calculate_ck_bound(schema, *sort_cdef, comp_definition, attr_list));
        }
    }

    // A query's conditions must be on the hash key, and optionally also on
    // the sort key if the table has one. Conditions naming any other
    // attribute are detected below by counting.
    if (partition_ranges.empty()) {
        throw api_error::validation(format("Query missing condition on hash key '{}'", hash_cdef.name_as_text()));
    }
    const auto condition_count = conditions.MemberCount();
    if (!sort_cdef) {
        if (condition_count != 1) {
            throw api_error::validation("Only one condition allowed in table with only hash key");
        }
    } else if (condition_count == 2 && ck_bounds.empty()) {
        throw api_error::validation(format("Query missing condition on sort key '{}'", sort_cdef->name_as_text()));
    } else if (condition_count > 2) {
        throw api_error::validation("Only one or two conditions allowed in table with hash key and sort key");
    }

    if (ck_bounds.empty()) {
        ck_bounds.push_back(query::clustering_range::make_open_ended_both_sides());
    }
    return {std::move(partition_ranges), std::move(ck_bounds)};
}
// Returns the name of the top-level column that a path operand of a
// KeyConditionExpression refers to.
//  * A nested attribute path is rejected with a ValidationException.
//  * A name starting with '#' is a reference into ExpressionAttributeNames:
//    it is recorded in used_attribute_names and replaced by the string it
//    maps to. A missing ExpressionAttributeNames, a missing entry or a
//    non-string entry results in a ValidationException.
// Note that the returned string_view may refer to data owned by the given
// parsed::value or by expression_attribute_names.
static std::string_view get_toplevel(const parsed::value& v,
        const rjson::value* expression_attribute_names,
        std::unordered_set<std::string>& used_attribute_names)
{
    const parsed::path& attr_path = std::get<parsed::path>(v._value);
    if (attr_path.has_operators()) {
        throw api_error::validation("KeyConditionExpression does not support nested attributes");
    }
    std::string_view column_name = attr_path.root();
    const bool is_name_reference = column_name.size() > 0 && column_name[0] == '#';
    if (!is_name_reference) {
        // A plain attribute name needs no resolution.
        return column_name;
    }
    used_attribute_names.emplace(column_name);
    if (!expression_attribute_names) {
        throw api_error::validation(
                fmt::format("ExpressionAttributeNames missing, entry '{}' required by KeyConditionExpression",
                        column_name));
    }
    const rjson::value* resolved = rjson::find(*expression_attribute_names, column_name);
    if (!resolved || !resolved->IsString()) {
        throw api_error::validation(
                fmt::format("ExpressionAttributeNames missing entry '{}' required by KeyConditionExpression",
                        column_name));
    }
    return rjson::to_string_view(*resolved);
}
// Returns the serialized key bytes of a constant operand appearing in a
// KeyConditionExpression.
// The operand was originally parsed as a reference (:name) into
// ExpressionAttributeValues, but by the time this function is called -
// after resolve_value() - it has already been replaced by the JSON value it
// refers to. That value is decoded according to the type of `column`, and a
// ValidationException is thrown if it has the wrong type or is otherwise
// malformed.
static bytes get_constant_value(const parsed::value& v,
        const column_definition& column)
{
    const auto& resolved_constant = std::get<parsed::constant>(v._value);
    const auto& literal = std::get<parsed::constant::literal>(resolved_constant._value);
    return get_key_from_typed_value(*literal, column);
}
// condition_expression_and_list flattens a condition_expression made only
// of ANDed primitive conditions into `conditions`, appending a pointer to
// each primitive condition in the order they are encountered. This is what
// KeyConditionExpression needs, because it may not use OR or NOT: if the
// given expression is negated, or contains an OR of more than one condition,
// a ValidationException is thrown.
// The pointers refer into `condition_expression`, which must therefore
// outlive `conditions`.
static void condition_expression_and_list(
        const parsed::condition_expression& condition_expression,
        std::vector<const parsed::primitive_condition*>& conditions)
{
    if (condition_expression._negated) {
        throw api_error::validation("KeyConditionExpression cannot use NOT");
    }
    // Leaf: remember the primitive condition itself.
    auto on_primitive = [&conditions] (const parsed::primitive_condition& primitive) {
        conditions.push_back(&primitive);
    };
    // Inner node: reject a genuine OR, then recurse into every child.
    auto on_list = [&conditions] (const parsed::condition_expression::condition_list& sublist) {
        const bool is_genuine_or = sublist.op == '|' && sublist.conditions.size() > 1;
        if (is_genuine_or) {
            throw api_error::validation("KeyConditionExpression cannot use OR");
        }
        for (const parsed::condition_expression& child : sublist.conditions) {
            condition_expression_and_list(child, conditions);
        }
    };
    std::visit(overloaded_functor { on_primitive, on_list }, condition_expression._expression);
}
// Calculates primary key bounds from KeyConditionExpression.
//
// `expression` must be a non-empty JSON string. It is parsed as a condition
// expression, its references are resolved against expression_attribute_names
// and expression_attribute_values (the references that were used are
// recorded in used_attribute_names and used_attribute_values), and the
// resulting one or two ANDed conditions are converted into a pair of:
//  * the partition ranges - exactly one, derived from the mandatory '='
//    condition on the partition key, and
//  * the clustering ranges - exactly one, derived from the optional
//    condition on the first clustering key column, or open-ended on both
//    sides when there is no such condition.
//
// Throws api_error::validation for a malformed or unsupported key condition,
// and std::logic_error for operand counts that the parser is not expected to
// produce.
static std::pair<dht::partition_range_vector, std::vector<query::clustering_range>>
calculate_bounds_condition_expression(schema_ptr schema,
const rjson::value& expression,
const rjson::value* expression_attribute_values,
std::unordered_set<std::string>& used_attribute_values,
const rjson::value* expression_attribute_names,
std::unordered_set<std::string>& used_attribute_names,
parsed::expression_cache& parsed_expression_cache)
{
if (!expression.IsString()) {
throw api_error::validation("KeyConditionExpression must be a string");
}
if (expression.GetStringLength() == 0) {
throw api_error::validation("KeyConditionExpression must not be empty");
}
// We parse the KeyConditionExpression with the same parser we use for
// ConditionExpression. But KeyConditionExpression only supports a subset
// of the ConditionExpression features, so we have many additional
// verifications below that the key condition is legal. Briefly, a valid
// key condition must contain a single partition key and a single
// sort-key range.
parsed::condition_expression p;
try {
p = parsed_expression_cache.parse_condition_expression(rjson::to_string_view(expression), "KeyConditionExpression");
} catch(expressions_syntax_error& e) {
// Report a syntax error to the client as a validation error.
throw api_error::validation(e.what());
}
resolve_condition_expression(p,
expression_attribute_names, expression_attribute_values,
used_attribute_names, used_attribute_values);
// Flatten the expression into its ANDed primitive conditions. This throws
// if the expression uses NOT or OR.
std::vector<const parsed::primitive_condition*> conditions;
condition_expression_and_list(p, conditions);
if (conditions.size() < 1 || conditions.size() > 2) {
throw api_error::validation(
"KeyConditionExpression syntax error: must have 1 or 2 conditions");
}
// Scylla allows us to have an (equality) constraint on the partition key
// pk_cdef, and a range constraint on the *first* clustering key ck_cdef.
// Note that this is also good enough for our GSI implementation - the
// GSI's user-specified sort key will be the first clustering key.
// FIXME: In the case described in issue #5320 (base and GSI both have
// just hash key - but different ones), this may allow the user to Query
// using the base key which isn't officially part of the GSI.
const column_definition& pk_cdef = schema->partition_key_columns().front();
const column_definition* ck_cdef = schema->clustering_key_size() > 0 ?
&schema->clustering_key_columns().front() : nullptr;
dht::partition_range_vector partition_ranges;
std::vector<query::clustering_range> ck_bounds;
for (const parsed::primitive_condition* condp : conditions) {
const parsed::primitive_condition& cond = *condp;
// In all comparison operators, one operand must be a column name,
// the other is a constant (value reference). We remember which is
// which in toplevel_ind, and also the column name in key (not just
// for comparison operators).
std::string_view key;
int toplevel_ind;
switch (cond._values.size()) {
case 1: {
// The only legal single-value condition is a begins_with() function,
// and it must have two parameters - a top-level attribute and a
// value reference.
const parsed::value::function_call *f = std::get_if<parsed::value::function_call>(&cond._values[0]._value);
if (!f) {
throw api_error::validation("KeyConditionExpression cannot be just a value");
}
if (f->_function_name != "begins_with") {
throw api_error::validation(
fmt::format("KeyConditionExpression function '{}' not supported",f->_function_name));
}
if (f->_parameters.size() != 2 || !f->_parameters[0].is_path() ||
!f->_parameters[1].is_constant()) {
throw api_error::validation(
"KeyConditionExpression begins_with() takes attribute and value");
}
key = get_toplevel(f->_parameters[0], expression_attribute_names, used_attribute_names);
// There is no operand index for a function call: the begins_with()
// handling below reads the function's parameters directly.
toplevel_ind = -1;
break;
}
case 2:
// A binary comparison: the column may be written on either side.
if (cond._values[0].is_path() && cond._values[1].is_constant()) {
toplevel_ind = 0;
} else if (cond._values[1].is_path() && cond._values[0].is_constant()) {
toplevel_ind = 1;
} else {
throw api_error::validation("KeyConditionExpression must compare attribute with constant");
}
key = get_toplevel(cond._values[toplevel_ind], expression_attribute_names, used_attribute_names);
break;
case 3:
// Only BETWEEN has three operands. First must be a column name,
// two other must be value references (constants):
if (cond._op != parsed::primitive_condition::type::BETWEEN) {
// Shouldn't happen unless we have a bug in the parser
throw std::logic_error(format("Wrong number of values {} in primitive_condition", cond._values.size()));
}
if (cond._values[0].is_path() && cond._values[1].is_constant() && cond._values[2].is_constant()) {
toplevel_ind = 0;
key = get_toplevel(cond._values[0], expression_attribute_names, used_attribute_names);
} else {
throw api_error::validation("KeyConditionExpression must compare attribute with constants");
}
break;
default:
// Shouldn't happen unless we have a bug in the parser
throw std::logic_error(format("Wrong number of values {} in primitive_condition", cond._values.size()));
}
if (cond._op == parsed::primitive_condition::type::IN) {
throw api_error::validation("KeyConditionExpression does not support IN operator");
} else if (cond._op == parsed::primitive_condition::type::NE) {
throw api_error::validation("KeyConditionExpression does not support NE operator");
} else if (cond._op == parsed::primitive_condition::type::EQ) {
// the EQ operator (=) is the only one which can be used for both
// the partition key and sort key:
if (sstring(key) == pk_cdef.name_as_text()) {
if (!partition_ranges.empty()) {
throw api_error::validation(
"KeyConditionExpression allows only one condition for each key");
}
// !toplevel_ind turns 0 into 1 and 1 into 0, i.e., it is the index
// of the operand which is the constant.
bytes raw_value = get_constant_value(cond._values[!toplevel_ind], pk_cdef);
partition_key pk = partition_key::from_singular_bytes(*schema, std::move(raw_value));
auto decorated_key = dht::decorate_key(*schema, pk);
partition_ranges.push_back(dht::partition_range(decorated_key));
} else if (ck_cdef && sstring(key) == ck_cdef->name_as_text()) {
if (!ck_bounds.empty()) {
throw api_error::validation(
"KeyConditionExpression allows only one condition for each key");
}
bytes raw_value = get_constant_value(cond._values[!toplevel_ind], *ck_cdef);
clustering_key ck = clustering_key::from_single_value(*schema, raw_value);
ck_bounds.push_back(query::clustering_range(ck));
} else {
throw api_error::validation(
fmt::format("KeyConditionExpression condition on non-key attribute {}", key));
}
continue;
}
// If we're still here, it's any other operator besides EQ, and these
// are allowed *only* on the clustering key:
if (sstring(key) == pk_cdef.name_as_text()) {
throw api_error::validation(
fmt::format("KeyConditionExpression only '=' condition is supported on partition key {}", key));
} else if (!ck_cdef || sstring(key) != ck_cdef->name_as_text()) {
throw api_error::validation(
fmt::format("KeyConditionExpression condition on non-key attribute {}", key));
}
if (!ck_bounds.empty()) {
throw api_error::validation(
"KeyConditionExpression allows only one condition for each key");
}
if (cond._op == parsed::primitive_condition::type::BETWEEN) {
// BETWEEN is inclusive on both ends.
clustering_key ck1 = clustering_key::from_single_value(*schema,
get_constant_value(cond._values[1], *ck_cdef));
clustering_key ck2 = clustering_key::from_single_value(*schema,
get_constant_value(cond._values[2], *ck_cdef));
ck_bounds.push_back(query::clustering_range::make(
query::clustering_range::bound(ck1), query::clustering_range::bound(ck2)));
continue;
} else if (cond._values.size() == 1) {
// We already verified above that in this case the condition can only
// be a function call to begins_with(), with the first parameter being
// the key and the second the value reference.
bytes raw_value = get_constant_value(
std::get<parsed::value::function_call>(cond._values[0]._value)._parameters[1], *ck_cdef);
if (!ck_cdef->type->is_compatible_with(*utf8_type)) {
// begins_with() supported on bytes and strings (both stored
// in the database as strings) but not on numbers.
throw api_error::validation(
fmt::format("KeyConditionExpression begins_with() not supported on type {}",
type_to_string(ck_cdef->type)));
} else if (raw_value.empty()) {
// An empty prefix matches every sort key.
ck_bounds.push_back(query::clustering_range::make_open_ended_both_sides());
} else {
clustering_key ck = clustering_key::from_single_value(*schema, raw_value);
ck_bounds.push_back(get_clustering_range_for_begins_with(std::move(raw_value), ck, schema, ck_cdef->type));
}
continue;
}
// All remaining operators have one value reference parameter in index
// !toplevel_ind. Note how toplevel_ind==1 reverses the direction of
// an inequality.
bytes raw_value = get_constant_value(cond._values[!toplevel_ind], *ck_cdef);
clustering_key ck = clustering_key::from_single_value(*schema, raw_value);
if ((cond._op == parsed::primitive_condition::type::LT && toplevel_ind == 0) ||
(cond._op == parsed::primitive_condition::type::GT && toplevel_ind == 1)) {
// key < constant
ck_bounds.push_back(query::clustering_range::make_ending_with(query::clustering_range::bound(ck, false)));
} else if ((cond._op == parsed::primitive_condition::type::GT && toplevel_ind == 0) ||
(cond._op == parsed::primitive_condition::type::LT && toplevel_ind == 1)) {
// key > constant
ck_bounds.push_back(query::clustering_range::make_starting_with(query::clustering_range::bound(ck, false)));
} else if ((cond._op == parsed::primitive_condition::type::LE && toplevel_ind == 0) ||
(cond._op == parsed::primitive_condition::type::GE && toplevel_ind == 1)) {
// key <= constant
ck_bounds.push_back(query::clustering_range::make_ending_with(query::clustering_range::bound(ck)));
} else if ((cond._op == parsed::primitive_condition::type::GE && toplevel_ind == 0) ||
(cond._op == parsed::primitive_condition::type::LE && toplevel_ind == 1)) {
// key >= constant
ck_bounds.push_back(query::clustering_range::make_starting_with(query::clustering_range::bound(ck)));
}
// NOTE(review): an operator matching none of the branches above adds no
// bound and is not reported - presumably the parser cannot produce one
// here; confirm.
}
if (partition_ranges.empty()) {
throw api_error::validation(
format("KeyConditionExpression requires a condition on partition key {}", pk_cdef.name_as_text()));
}
if (ck_bounds.empty()) {
// No condition on the sort key: read the whole partition.
ck_bounds.push_back(query::clustering_range::make_open_ended_both_sides());
}
return {std::move(partition_ranges), std::move(ck_bounds)};
}
static future<executor::request_return_type> query_vector(
service::storage_proxy& proxy,
vector_search::vector_store_client& vsc,
rjson::value request,
service::client_state& client_state,
tracing::trace_state_ptr trace_state,
service_permit permit,
bool enforce_authorization,
bool warn_authorization,
alternator::stats& stats,
parsed::expression_cache& parsed_expr_cache) {
schema_ptr base_schema = get_table(proxy, request);
get_stats_from_schema(proxy, *base_schema)->api_operations.query++;
tracing::add_alternator_table_name(trace_state, base_schema->cf_name());
// If vector search is requested, IndexName must be given and must
// refer to a vector index - not to a GSI or LSI.
const rjson::value* index_name_v = rjson::find(request, "IndexName");
if (!index_name_v || !index_name_v->IsString()) {
co_return api_error::validation(
"VectorSearch requires IndexName referring to a vector index");
}
std::string_view index_name = rjson::to_string_view(*index_name_v);
int dimensions = 0;
bool is_vector = false;
for (const index_metadata& im : base_schema->indices()) {
if (im.name() == index_name) {
const auto& opts = im.options();
// The only secondary index we expect to see is a vector index.
// We also expect it to have a valid "dimensions".
auto it = opts.find(db::index::secondary_index::custom_class_option_name);
if (it == opts.end() || it->second != "vector_index") {
on_internal_error(elogger, fmt::format("IndexName '{}' is a secondary index but not a vector index.", index_name));
}
it = opts.find("dimensions");
if (it != opts.end()) {
try {
dimensions = std::stoi(it->second);
} catch (std::logic_error&) {}
}
throwing_assert(dimensions > 0);
is_vector = true;
break;
}
}
if (!is_vector) {
co_return api_error::validation(
format("VectorSearch IndexName '{}' is not a vector index.", index_name));
}
// QueryVector is required inside VectorSearch.
const rjson::value* vector_search = rjson::find(request, "VectorSearch");
if (!vector_search || !vector_search->IsObject()) {
co_return api_error::validation(
"VectorSearch requires a VectorSearch parameter");
}
const rjson::value* query_vector = rjson::find(*vector_search, "QueryVector");
if (!query_vector || !query_vector->IsObject()) {
co_return api_error::validation(
"VectorSearch requires a QueryVector parameter");
}
// QueryVector should be a DynamoDB value, which must be of type "L"
// (a list), containing only elements of type "N" (numbers). The number
// of these elements must be exactly the "dimensions" defined for this
// vector index. We'll now validate all these assumptions and parse
// all the numbers in the vector into an std::vector<float> query_vec -
// the type that ann() wants.
const rjson::value* qv_list = rjson::find(*query_vector, "L");
if (!qv_list || !qv_list->IsArray()) {
co_return api_error::validation(
"VectorSearch QueryVector must be a list of numbers");
}
const auto& arr = qv_list->GetArray();
if ((int)arr.Size() != dimensions) {
co_return api_error::validation(
format("VectorSearch QueryVector length {} does not match index Dimensions {}",
arr.Size(), dimensions));
}
std::vector<float> query_vec;
query_vec.reserve(arr.Size());
for (const rjson::value& elem : arr) {
if (!elem.IsObject()) {
co_return api_error::validation(
"VectorSearch QueryVector must contain only numbers");
}
const rjson::value* n_val = rjson::find(elem, "N");
if (!n_val || !n_val->IsString()) {
co_return api_error::validation(
"VectorSearch QueryVector must contain only numbers");
}
std::string_view num_str = rjson::to_string_view(*n_val);
float f;
auto [ptr, ec] = std::from_chars(num_str.data(), num_str.data() + num_str.size(), f);
if (ec != std::errc{} || ptr != num_str.data() + num_str.size() || !std::isfinite(f)) {
co_return api_error::validation(
format("VectorSearch QueryVector element '{}' is not a valid number", num_str));
}
query_vec.push_back(f);
}
// Limit is mandatory for vector search: it defines k, the number of
// nearest neighbors to return.
const rjson::value* limit_json = rjson::find(request, "Limit");
if (!limit_json || !limit_json->IsUint()) {
co_return api_error::validation("VectorSearch requires a positive integer Limit parameter");
}
uint32_t limit = limit_json->GetUint();
if (limit == 0) {
co_return api_error::validation("Limit must be greater than 0");
}
// Consistent reads are not supported for vector search, just like GSI.
if (get_read_consistency(request) != db::consistency_level::LOCAL_ONE) {
co_return api_error::validation(
"Consistent reads are not allowed on vector indexes");
}
// Pagination (ExclusiveStartKey) is not supported for vector search.
if (rjson::find(request, "ExclusiveStartKey")) {
co_return api_error::validation(
"VectorSearch does not support pagination (ExclusiveStartKey)");
}
// ScanIndexForward is not supported for vector search: the ordering of
// results is determined by vector distance, not by the sort key.
if (rjson::find(request, "ScanIndexForward")) {
co_return api_error::validation(
"VectorSearch does not support ScanIndexForward");
}
std::unordered_set<std::string> used_attribute_names;
std::unordered_set<std::string> used_attribute_values;
// Parse the Select parameter and determine which attributes to return.
// For a vector index, the default Select is ALL_ATTRIBUTES (full items).
// ALL_PROJECTED_ATTRIBUTES is significantly more efficient because it
// returns what the vector store returned without looking up additional
// base-table data. (Currently only the primary key attributes are projected,
// but in the future we'll implement projecting additional attributes into
// the vector index - these additional attributes will also be usable for
// filtering.) COUNT returns only the count without items.
select_type select = parse_select(request, table_or_view_type::vector_index);
std::optional<alternator::attrs_to_get> attrs_to_get_opt;
if (select == select_type::projection) {
// ALL_PROJECTED_ATTRIBUTES for a vector index: return only key attributes.
alternator::attrs_to_get key_attrs;
for (const column_definition& cdef : base_schema->partition_key_columns()) {
attribute_path_map_add("Select", key_attrs, cdef.name_as_text());
}
for (const column_definition& cdef : base_schema->clustering_key_columns()) {
attribute_path_map_add("Select", key_attrs, cdef.name_as_text());
}
attrs_to_get_opt = std::move(key_attrs);
} else {
attrs_to_get_opt = calculate_attrs_to_get(request, parsed_expr_cache, used_attribute_names, select);
}
// QueryFilter (the old-style API) is not supported for vector search Queries.
if (rjson::find(request, "QueryFilter")) {
co_return api_error::validation(
"VectorSearch does not support QueryFilter; use FilterExpression instead");
}
// FilterExpression: post-filter the vector search results by any attribute.
filter flt(parsed_expr_cache, request, filter::request_type::QUERY,
used_attribute_names, used_attribute_values);
const rjson::value* expression_attribute_names = rjson::find(request, "ExpressionAttributeNames");
verify_all_are_used(expression_attribute_names, used_attribute_names, "ExpressionAttributeNames", "Query");
const rjson::value* expression_attribute_values = rjson::find(request, "ExpressionAttributeValues");
verify_all_are_used(expression_attribute_values, used_attribute_values, "ExpressionAttributeValues", "Query");
// Verify the user has SELECT permission on the base table, as we
// do for every type of read operation after validating the input
// parameters.
co_await verify_permission(enforce_authorization, warn_authorization,
client_state, base_schema, auth::permission::SELECT, stats);
// Query the vector store for the approximate nearest neighbors.
auto timeout = executor::default_timeout();
abort_on_expiry aoe(timeout);
rjson::value pre_filter = rjson::empty_object(); // TODO, implement
auto pkeys_result = co_await vsc.ann(
base_schema->ks_name(), std::string(index_name), base_schema,
std::move(query_vec), limit, pre_filter, aoe.abort_source());
if (!pkeys_result.has_value()) {
const sstring error_msg = std::visit(vector_search::error_visitor{}, pkeys_result.error());
co_return api_error::validation(error_msg);
}
const std::vector<vector_search::primary_key>& pkeys = pkeys_result.value();
// For SELECT=COUNT with no filter: skip fetching from the base table and
// just return the count of candidates returned by the vector store.
// If a filter is present, fall through to the base-table fetch to apply it.
if (select == select_type::count && !flt) {
rjson::value response = rjson::empty_object();
rjson::add(response, "Count", rjson::value(static_cast<int>(pkeys.size())));
rjson::add(response, "ScannedCount", rjson::value(static_cast<int>(pkeys.size())));
co_return rjson::print(std::move(response));
}
// For SELECT=ALL_PROJECTED_ATTRIBUTES with no filter: skip fetching from
// the base table and build items directly from the key columns returned by
// the vector store. If a filter is present, fall through to the base-table
// fetch to apply it.
if (select == select_type::projection && !flt) {
rjson::value items_json = rjson::empty_array();
for (const auto& pkey : pkeys) {
rjson::value item = rjson::empty_object();
std::vector<bytes> exploded_pk = pkey.partition.key().explode();
auto exploded_pk_it = exploded_pk.begin();
for (const column_definition& cdef : base_schema->partition_key_columns()) {
rjson::value key_val = rjson::empty_object();
rjson::add_with_string_name(key_val, type_to_string(cdef.type), json_key_column_value(*exploded_pk_it, cdef));
rjson::add_with_string_name(item, std::string_view(cdef.name_as_text()), std::move(key_val));
++exploded_pk_it;
}
if (base_schema->clustering_key_size() > 0) {
std::vector<bytes> exploded_ck = pkey.clustering.explode();
auto exploded_ck_it = exploded_ck.begin();
for (const column_definition& cdef : base_schema->clustering_key_columns()) {
rjson::value key_val = rjson::empty_object();
rjson::add_with_string_name(key_val, type_to_string(cdef.type), json_key_column_value(*exploded_ck_it, cdef));
rjson::add_with_string_name(item, std::string_view(cdef.name_as_text()), std::move(key_val));
++exploded_ck_it;
}
}
rjson::push_back(items_json, std::move(item));
}
rjson::value response = rjson::empty_object();
rjson::add(response, "Count", rjson::value(static_cast<int>(items_json.Size())));
rjson::add(response, "ScannedCount", rjson::value(static_cast<int>(pkeys.size())));
rjson::add(response, "Items", std::move(items_json));
co_return rjson::print(std::move(response));
}
// TODO: For SELECT=SPECIFIC_ATTRIBUTES, if they are part of the projected
// attributes, we should use the above optimized code path - not fall through
// to the read from the base table as below as we need to do if the specific
// attributes contain non-projected columns.
// Fetch the matching items from the base table and build the response.
// When a filter is present, we always fetch the full item so that all
// attributes are available for filter evaluation, regardless of the
// projection required for the final response.
auto selection = cql3::selection::selection::wildcard(base_schema);
auto regular_columns = base_schema->regular_columns()
| std::views::transform(&column_definition::id)
| std::ranges::to<query::column_id_vector>();
auto attrs_to_get = ::make_shared<const std::optional<alternator::attrs_to_get>>(
flt ? std::nullopt : std::move(attrs_to_get_opt));
rjson::value items_json = rjson::empty_array();
int matched_count = 0;
// Query each primary key individually, in the order returned by the
// vector store, to preserve vector-distance ordering in the response.
// FIXME: do this more efficiently with a batched read that preserves ordering.
for (const auto& pkey : pkeys) {
std::vector<query::clustering_range> bounds{
base_schema->clustering_key_size() > 0
? query::clustering_range::make_singular(pkey.clustering)
: query::clustering_range::make_open_ended_both_sides()};
auto partition_slice = query::partition_slice(std::move(bounds), {},
regular_columns, selection->get_query_options());
auto command = ::make_lw_shared<query::read_command>(
base_schema->id(), base_schema->version(), partition_slice,
proxy.get_max_result_size(partition_slice),
query::tombstone_limit(proxy.get_tombstone_limit()));
service::storage_proxy::coordinator_query_result qr =
co_await proxy.query(base_schema, command,
{dht::partition_range(pkey.partition)},
db::consistency_level::LOCAL_ONE,
service::storage_proxy::coordinator_query_options(
timeout, permit, client_state, trace_state));
auto opt_item = describe_single_item(base_schema, partition_slice,
*selection, *qr.query_result, *attrs_to_get);
if (opt_item && (!flt || flt.check(*opt_item))) {
++matched_count;
if (select != select_type::count) {
if (select == select_type::projection) {
// A filter caused us to fall through here instead of
// taking the projection early-exit above. Reconstruct
// the key-only item from the full item we fetched.
rjson::value key_item = rjson::empty_object();
for (const column_definition& cdef : base_schema->partition_key_columns()) {
if (const rjson::value* v = rjson::find(*opt_item, cdef.name_as_text())) {
rjson::add_with_string_name(key_item, cdef.name_as_text(), rjson::copy(*v));
}
}
for (const column_definition& cdef : base_schema->clustering_key_columns()) {
if (const rjson::value* v = rjson::find(*opt_item, cdef.name_as_text())) {
rjson::add_with_string_name(key_item, cdef.name_as_text(), rjson::copy(*v));
}
}
rjson::push_back(items_json, std::move(key_item));
} else {
// When a filter caused us to fetch the full item, apply the
// requested projection (attrs_to_get_opt) before returning it.
// This mirrors describe_items_visitor::end_row() which removes
// extra filter attributes from the returned item.
if (flt && attrs_to_get_opt) {
for (const auto& [attr_name, subpath] : *attrs_to_get_opt) {
if (!subpath.has_value()) {
if (rjson::value* toplevel = rjson::find(*opt_item, attr_name)) {
if (!hierarchy_filter(*toplevel, subpath)) {
rjson::remove_member(*opt_item, attr_name);
}
}
}
}
std::vector<std::string> to_remove;
for (auto it = opt_item->MemberBegin(); it != opt_item->MemberEnd(); ++it) {
std::string key(it->name.GetString(), it->name.GetStringLength());
if (!attrs_to_get_opt->contains(key)) {
to_remove.push_back(std::move(key));
}
}
for (const auto& key : to_remove) {
rjson::remove_member(*opt_item, key);
}
}
rjson::push_back(items_json, std::move(*opt_item));
}
}
}
}
rjson::value response = rjson::empty_object();
if (select == select_type::count) {
rjson::add(response, "Count", rjson::value(matched_count));
} else {
rjson::add(response, "Count", rjson::value(static_cast<int>(items_json.Size())));
rjson::add(response, "Items", std::move(items_json));
}
rjson::add(response, "ScannedCount", rjson::value(static_cast<int>(pkeys.size())));
co_return rjson::print(std::move(response));
}
// Handles the DynamoDB Query operation.
//
// A request carrying "VectorSearch" is dispatched to query_vector(). Any
// other request is validated here - read consistency (not allowed to be
// consistent on a GSI), Limit, key conditions, filter, Select/projection and
// the use of every ExpressionAttributeNames/Values entry - and is then
// handed over to do_query().
//
// Some validation failures are returned as a ready future holding an
// api_error, while others (both or neither of KeyConditions and
// KeyConditionExpression given) are thrown as an api_error.
future<executor::request_return_type> executor::query(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request, std::unique_ptr<audit::audit_info_alternator>& audit_info) {
    _stats.api_operations.query++;
    elogger.trace("Querying {}", request);
    if (rjson::find(request, "VectorSearch")) {
        // If vector search is requested, we have a separate code path.
        // IndexName must be given and must refer to a vector index - not
        // to a GSI or LSI as the code below assumes.
        return query_vector(_proxy, _vsc, std::move(request), client_state, trace_state, std::move(permit),
                _enforce_authorization, _warn_authorization, _stats, *_parsed_expression_cache);
    }
    auto [schema, table_type] = get_table_or_view(_proxy, request);
    db::consistency_level cl = get_read_consistency(request);
    maybe_audit(audit_info, audit::statement_category::QUERY, schema->ks_name(), schema->cf_name(), "Query", request, cl);
    get_stats_from_schema(_proxy, *schema)->api_operations.query++;
    tracing::add_alternator_table_name(trace_state, schema->cf_name());
    rjson::value* exclusive_start_key = rjson::find(request, "ExclusiveStartKey");
    if (table_type == table_or_view_type::gsi && cl != db::consistency_level::LOCAL_ONE) {
        return make_ready_future<request_return_type>(api_error::validation(
                "Consistent reads are not allowed on global indexes (GSI)"));
    }
    // The Limit is read as a 64-bit value. Assigning it directly to a
    // uint32_t would silently wrap around: 2^32 would become 0 (and be
    // rejected as non-positive) and 2^32+5 would become 5. So instead we
    // reject only a real zero, and clamp anything too large to the maximum,
    // which is also what we use when no Limit is given at all.
    uint32_t limit = std::numeric_limits<uint32_t>::max();
    if (const rjson::value* limit_json = rjson::find(request, "Limit")) {
        const uint64_t requested_limit = limit_json->GetUint64();
        if (requested_limit == 0) {
            return make_ready_future<request_return_type>(api_error::validation("Limit must be greater than 0"));
        }
        if (requested_limit < limit) {
            limit = static_cast<uint32_t>(requested_limit);
        }
    }
    const bool forward = get_bool_attribute(request, "ScanIndexForward", true);
    rjson::value* key_conditions = rjson::find(request, "KeyConditions");
    rjson::value* key_condition_expression = rjson::find(request, "KeyConditionExpression");
    // These sets collect the ExpressionAttributeNames/Values entries which
    // were referenced while parsing the different expressions, so that we
    // can later verify that none was left unused.
    std::unordered_set<std::string> used_attribute_values;
    std::unordered_set<std::string> used_attribute_names;
    if (key_conditions && key_condition_expression) {
        throw api_error::validation("Query does not allow both "
                "KeyConditions and KeyConditionExpression to be given together");
    } else if (!key_conditions && !key_condition_expression) {
        throw api_error::validation("Query must have one of "
                "KeyConditions or KeyConditionExpression");
    }
    const rjson::value* expression_attribute_names = rjson::find(request, "ExpressionAttributeNames");
    const rjson::value* expression_attribute_values = rjson::find(request, "ExpressionAttributeValues");
    // exactly one of key_conditions or key_condition_expression
    auto [partition_ranges, ck_bounds] = key_conditions
                ? calculate_bounds_conditions(schema, *key_conditions)
                : calculate_bounds_condition_expression(schema, *key_condition_expression,
                        expression_attribute_values,
                        used_attribute_values,
                        expression_attribute_names,
                        used_attribute_names, *_parsed_expression_cache);
    filter filter(*_parsed_expression_cache, request, filter::request_type::QUERY,
            used_attribute_names, used_attribute_values);
    // A query is not allowed to filter on the partition key or the sort key.
    for (const column_definition& cdef : schema->partition_key_columns()) { // just one
        if (filter.filters_on(cdef.name_as_text())) {
            return make_ready_future<request_return_type>(api_error::validation(
                    format("QueryFilter can only contain non-primary key attributes: Partition key attribute: {}", cdef.name_as_text())));
        }
    }
    for (const column_definition& cdef : schema->clustering_key_columns()) {
        if (filter.filters_on(cdef.name_as_text())) {
            return make_ready_future<request_return_type>(api_error::validation(
                    format("QueryFilter can only contain non-primary key attributes: Sort key attribute: {}", cdef.name_as_text())));
        }
        // FIXME: this "break" can avoid listing some clustering key columns
        // we added for GSIs just because they existed in the base table -
        // but not in all cases. We still have issue #5320.
        break;
    }
    select_type select = parse_select(request, table_type);
    auto attrs_to_get = calculate_attrs_to_get(request, *_parsed_expression_cache, used_attribute_names, select);
    // Only now, after every expression was parsed, do we know which of the
    // ExpressionAttributeNames/Values entries were actually used.
    verify_all_are_used(expression_attribute_names, used_attribute_names, "ExpressionAttributeNames", "Query");
    verify_all_are_used(expression_attribute_values, used_attribute_values, "ExpressionAttributeValues", "Query");
    query::partition_slice::option_set opts;
    opts.set_if<query::partition_slice::option::reversed>(!forward);
    return do_query(_proxy, schema, exclusive_start_key, std::move(partition_ranges), std::move(ck_bounds), std::move(attrs_to_get), limit, cl,
            std::move(filter), opts, client_state, _stats, std::move(trace_state), std::move(permit), _enforce_authorization, _warn_authorization);
}
// Translates a query result holding any number of rows into a vector of
// DynamoDB-compatible JSON items, one per row and in result-set order.
// If item_callback is set, it is invoked once per row with the size of
// that row's item, in bytes, as reported by describe_single_item().
// The coroutine may yield after every converted row.
static future<std::vector<rjson::value>> describe_multi_item(schema_ptr schema,
        const query::partition_slice&& slice,
        shared_ptr<cql3::selection::selection> selection,
        foreign_ptr<lw_shared_ptr<query::result>> query_result,
        shared_ptr<const std::optional<attrs_to_get>> attrs_to_get,
        noncopyable_function<void(uint64_t)> item_callback) {
    cql3::selection::result_set_builder rs_builder(*selection, gc_clock::now());
    query::result_view::consume(*query_result, slice,
            cql3::selection::result_set_builder::visitor(rs_builder, *schema, *selection));
    auto rows = rs_builder.build();

    std::vector<rjson::value> items;
    for (auto& row : rows->rows()) {
        // Build the item in place, at the back of the output vector.
        rjson::value& item = items.emplace_back(rjson::empty_object());
        uint64_t item_bytes = 0;
        describe_single_item(*selection, row, *attrs_to_get, item, &item_bytes);
        if (item_callback) {
            item_callback(item_bytes);
        }
        co_await coroutine::maybe_yield();
    }
    co_return items;
}
// describe_item() builds the response body of a GetItem request: the item
// produced by describe_single_item(), if any, is placed under an "Item"
// member of an otherwise empty JSON object. The consumed capacity is then
// added to that response if needed, and the consumed half units are added
// to "metric". This function is specific to GetItem - other callers should
// use describe_single_item() directly.
static rjson::value describe_item(schema_ptr schema,
        const query::partition_slice& slice,
        const cql3::selection::selection& selection,
        const query::result& query_result,
        const std::optional<attrs_to_get>& attrs_to_get,
        consumed_capacity_counter& consumed_capacity,
        uint64_t& metric) {
    rjson::value response = rjson::empty_object();
    // describe_single_item() also accumulates the item's size into the
    // capacity counter's _total_bytes.
    if (std::optional<rjson::value> found = describe_single_item(std::move(schema), slice, selection,
            query_result, attrs_to_get, &consumed_capacity._total_bytes)) {
        rjson::add(response, "Item", std::move(*found));
    }
    consumed_capacity.add_consumed_capacity_to_response_if_needed(response);
    metric += consumed_capacity.get_half_units();
    return response;
}
// Handles the DynamoDB GetItem operation: reads the single item identified
// by the request's "Key" from the requested table, and returns the response
// built by describe_item(). SELECT permission on the table is verified
// after the request was parsed and before the read is issued. Global and
// per-table statistics (operation count, latency, RCU, item size) are
// updated along the way.
future<executor::request_return_type> executor::get_item(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request, std::unique_ptr<audit::audit_info_alternator>& audit_info) {
    _stats.api_operations.get_item++;
    const auto started_at = std::chrono::steady_clock::now();
    elogger.trace("Getting item {}", request);

    schema_ptr schema = get_table(_proxy, request);
    lw_shared_ptr<stats> table_stats = get_stats_from_schema(_proxy, *schema);
    table_stats->api_operations.get_item++;
    tracing::add_alternator_table_name(trace_state, schema->cf_name());

    rjson::value& key_json = request["Key"];
    db::consistency_level cl = get_read_consistency(request);
    maybe_audit(audit_info, audit::statement_category::QUERY, schema->ks_name(), schema->cf_name(), "GetItem", request, cl);

    // Translate the key into a single-partition range and, if the table has
    // a clustering key, a single-row clustering range.
    partition_key item_pk = pk_from_json(key_json, schema);
    dht::partition_range_vector partition_ranges{dht::partition_range(dht::decorate_key(*schema, item_pk))};
    std::vector<query::clustering_range> ck_ranges;
    if (schema->clustering_key_size() > 0) {
        ck_ranges.push_back(query::clustering_range::make_singular(ck_from_json(key_json, schema)));
    } else {
        ck_ranges.push_back(query::clustering_range::make_open_ended_both_sides());
    }
    check_key(key_json, schema);

    //TODO(sarna): It would be better to fetch only some attributes of the map, not all
    auto selection = cql3::selection::selection::wildcard(schema);
    auto regular_column_ids = schema->regular_columns()
            | std::views::transform(&column_definition::id)
            | std::ranges::to<query::column_id_vector>();
    auto partition_slice = query::partition_slice(std::move(ck_ranges), {}, std::move(regular_column_ids), selection->get_query_options());
    auto command = ::make_lw_shared<query::read_command>(
            schema->id(), schema->version(), partition_slice,
            _proxy.get_max_result_size(partition_slice),
            query::tombstone_limit(_proxy.get_tombstone_limit()));

    std::unordered_set<std::string> used_names;
    auto projection = calculate_attrs_to_get(request, *_parsed_expression_cache, used_names);
    verify_all_are_used(rjson::find(request, "ExpressionAttributeNames"), used_names, "ExpressionAttributeNames", "GetItem");
    rcu_consumed_capacity_counter capacity(request, cl == db::consistency_level::LOCAL_QUORUM);

    co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, schema, auth::permission::SELECT, _stats);
    service::storage_proxy::coordinator_query_result qr = co_await _proxy.query(
            schema, std::move(command), std::move(partition_ranges), cl,
            service::storage_proxy::coordinator_query_options(executor::default_timeout(), std::move(permit), client_state, trace_state));

    // The latency is recorded as soon as the read completes, before the
    // result is converted to JSON.
    table_stats->api_operations.get_item_latency.mark(std::chrono::steady_clock::now() - started_at);
    _stats.api_operations.get_item_latency.mark(std::chrono::steady_clock::now() - started_at);

    uint64_t consumed_half_units = 0;
    rjson::value response = describe_item(schema, partition_slice, *selection, *qr.query_result, projection, capacity, consumed_half_units);
    table_stats->rcu_half_units_total += consumed_half_units;
    _stats.rcu_half_units_total += consumed_half_units;
    // Update item size metrics only if we found an item.
    if (qr.query_result->row_count().value_or(0) > 0) {
        table_stats->operation_sizes.get_item_op_size_kb.add(bytes_to_kb_ceil(capacity._total_bytes));
    }
    co_return rjson::print(std::move(response));
}
// Handles the DynamoDB BatchGetItem operation: reads multiple items,
// possibly from several tables, and returns them in "Responses", keyed by
// table name.
//
// The request is processed in three phases:
// 1. Every entry of "RequestItems" is parsed and validated, and the
//    requested keys are grouped per table and per partition. SELECT
//    permission is then verified for each table. Nothing is read before
//    all of this succeeded.
// 2. One read is started for each distinct partition, all in parallel.
// 3. The reads are awaited one by one. The items of a successful read are
//    appended to "Responses"; the keys of a failed read are appended to
//    "UnprocessedKeys". Only if no read succeeded is the last caught
//    exception propagated to the caller.
//
// If the request asked for it, "ConsumedCapacity" holds one entry per table.
// The response is streamed if is_big() says it is big.
future<executor::request_return_type> executor::batch_get_item(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request, std::unique_ptr<audit::audit_info_alternator>& audit_info) {
// FIXME: In this implementation, an unbounded batch size can cause
// unbounded response JSON object to be buffered in memory, unbounded
// parallelism of the requests, and unbounded amount of non-preemptable
// work in the following loops. So we should limit the batch size, and/or
// the response size, as DynamoDB does.
_stats.api_operations.batch_get_item++;
rjson::value& request_items = request["RequestItems"];
auto start_time = std::chrono::steady_clock::now();
// We need to validate all the parameters before starting any asynchronous
// query, and fail the entire request on any parse error. So we parse all
// the input into our own vector "requests", each element a table_requests
// listing all the request aimed at a single table. For efficiency, inside
// each table_requests we further group together all reads going to the
// same partition, so we can later send them together.
bool should_add_rcu = rcu_consumed_capacity_counter::should_add_capacity(request);
struct table_requests {
schema_ptr schema;
db::consistency_level cl;
::shared_ptr<const std::optional<alternator::attrs_to_get>> attrs_to_get;
// clustering_keys keeps a sorted set of clustering keys. It must
// be sorted for the read below (see #10827). Additionally each
// clustering key is mapped to the original rjson::value "Key".
using clustering_keys = std::map<clustering_key, rjson::value*, clustering_key::less_compare>;
// All the requested keys of this table, grouped by partition key.
std::unordered_map<partition_key, clustering_keys, partition_key::hashing, partition_key::equality> requests;
table_requests(schema_ptr s)
: schema(std::move(s))
, requests(8, partition_key::hashing(*schema), partition_key::equality(*schema))
{}
// Parses "key" and registers it under its partition. Throws a
// validation error if the same key was already registered for this
// table. Only a pointer to "key" is kept, so "key" must outlive this
// object.
void add(rjson::value& key) {
auto pk = pk_from_json(key, schema);
auto it = requests.find(pk);
if (it == requests.end()) {
it = requests.emplace(pk, clustering_key::less_compare(*schema)).first;
}
auto ck = ck_from_json(key, schema);
if (auto [_, inserted] = it->second.emplace(ck, &key); !inserted) {
throw api_error::validation("Provided list of item keys contains duplicates");
}
}
};
std::vector<table_requests> requests;
// batch_size counts distinct partitions (i.e., the number of reads we are
// going to send), not individual keys.
uint batch_size = 0;
for (auto it = request_items.MemberBegin(); it != request_items.MemberEnd(); ++it) {
table_requests rs(get_table_from_batch_request(_proxy, it));
tracing::add_alternator_table_name(trace_state, rs.schema->cf_name());
rs.cl = get_read_consistency(it->value);
std::unordered_set<std::string> used_attribute_names;
rs.attrs_to_get = ::make_shared<const std::optional<attrs_to_get>>(calculate_attrs_to_get(it->value, *_parsed_expression_cache, used_attribute_names));
const rjson::value* expression_attribute_names = rjson::find(it->value, "ExpressionAttributeNames");
verify_all_are_used(expression_attribute_names, used_attribute_names,"ExpressionAttributeNames", "GetItem");
auto& keys = (it->value)["Keys"];
for (rjson::value& key : keys.GetArray()) {
rs.add(key);
check_key(key, rs.schema);
}
batch_size += rs.requests.size();
requests.emplace_back(std::move(rs));
}
// Permissions are verified only after the whole request was parsed
// successfully, and before any read is started.
for (const table_requests& tr : requests) {
co_await verify_permission(_enforce_authorization, _warn_authorization, client_state, tr.schema, auth::permission::SELECT, _stats);
}
_stats.api_operations.batch_get_item_batch_total += batch_size;
_stats.api_operations.batch_get_item_histogram.add(batch_size);
// If we got here, all "requests" are valid, so let's start the
// requests for the different partitions all in parallel.
std::vector<future<std::vector<rjson::value>>> response_futures;
// One accumulator per table, sized up-front and never resized afterwards,
// so that the callbacks below may safely hold references to its elements.
std::vector<uint64_t> consumed_rcu_half_units_per_table(requests.size());
for (size_t i = 0; i < requests.size(); i++) {
const table_requests& rs = requests[i];
bool is_quorum = rs.cl == db::consistency_level::LOCAL_QUORUM;
lw_shared_ptr<stats> per_table_stats = get_stats_from_schema(_proxy, *rs.schema);
per_table_stats->api_operations.batch_get_item_histogram.add(rs.requests.size());
for (const auto& [pk, cks] : rs.requests) {
dht::partition_range_vector partition_ranges{dht::partition_range(dht::decorate_key(*rs.schema, pk))};
std::vector<query::clustering_range> bounds;
if (rs.schema->clustering_key_size() == 0) {
bounds.push_back(query::clustering_range::make_open_ended_both_sides());
} else {
// "cks" is a sorted map, so the singular ranges are added in
// clustering-key order.
for (auto& ck : cks) {
bounds.push_back(query::clustering_range::make_singular(ck.first));
}
}
auto regular_columns =
rs.schema->regular_columns() | std::views::transform(&column_definition::id)
| std::ranges::to<query::column_id_vector>();
auto selection = cql3::selection::selection::wildcard(rs.schema);
auto partition_slice = query::partition_slice(std::move(bounds), {}, std::move(regular_columns), selection->get_query_options());
auto command = ::make_lw_shared<query::read_command>(rs.schema->id(), rs.schema->version(), partition_slice, _proxy.get_max_result_size(partition_slice),
query::tombstone_limit(_proxy.get_tombstone_limit()));
command->allow_limit = db::allow_per_partition_rate_limit::yes;
// Called once per returned row with the item's size: accumulates the
// table's consumed RCU half units and updates the item-size metric.
const auto item_callback = [is_quorum, per_table_stats, &rcus_per_table = consumed_rcu_half_units_per_table[i]](uint64_t size) {
rcus_per_table += rcu_consumed_capacity_counter::get_half_units(size, is_quorum);
// Update item size only if the item exists.
if (size > 0) {
per_table_stats->operation_sizes.batch_get_item_op_size_kb.add(bytes_to_kb_ceil(size));
}
};
future<std::vector<rjson::value>> f = _proxy.query(rs.schema, std::move(command), std::move(partition_ranges), rs.cl,
service::storage_proxy::coordinator_query_options(executor::default_timeout(), permit, client_state, trace_state)).then(
[schema = rs.schema, partition_slice = std::move(partition_slice), selection = std::move(selection), attrs_to_get = rs.attrs_to_get, item_callback = std::move(item_callback)] (service::storage_proxy::coordinator_query_result qr) mutable {
utils::get_local_injector().inject("alternator_batch_get_item", [] { throw std::runtime_error("batch_get_item injection"); });
return describe_multi_item(std::move(schema), std::move(partition_slice), std::move(selection), std::move(qr.query_result), std::move(attrs_to_get), std::move(item_callback));
});
response_futures.push_back(std::move(f));
}
}
// Wait for all requests to complete, and then return the response.
// In case of full failure (no reads succeeded), an arbitrary error
// from one of the operations will be returned.
bool some_succeeded = false;
std::exception_ptr eptr;
std::set<sstring> table_names; // for auditing
// FIXME: will_log() here doesn't pass keyspace/table, so keyspace-level audit
// filtering is bypassed — a batch spanning multiple tables is audited as a whole.
bool should_audit = _audit.local_is_initialized() && _audit.local().will_log(audit::statement_category::QUERY);
rjson::value response = rjson::empty_object();
rjson::add(response, "Responses", rjson::empty_object());
rjson::add(response, "UnprocessedKeys", rjson::empty_object());
// The futures were pushed in the same nested order (tables, then the
// partitions of each table) in which the loops below iterate, so advancing
// fut_it once per partition pairs each future with the keys it read.
auto fut_it = response_futures.begin();
rjson::value consumed_capacity = rjson::empty_array();
for (size_t i = 0; i < requests.size(); i++) {
const table_requests& rs = requests[i];
std::string table = rs.schema->cf_name();
if (should_audit) {
table_names.insert(table);
}
for (const auto& [_, cks] : rs.requests) {
auto& fut = *fut_it;
++fut_it;
try {
std::vector<rjson::value> results = co_await std::move(fut);
some_succeeded = true;
if (!response["Responses"].HasMember(table)) {
rjson::add_with_string_name(response["Responses"], table, rjson::empty_array());
}
for (rjson::value& json : results) {
rjson::push_back(response["Responses"][table], std::move(json));
}
} catch(...) {
// Remember the failure; it is only propagated if nothing succeeded.
eptr = std::current_exception();
// This read of potentially several rows in one partition,
// failed. We need to add the row key(s) to UnprocessedKeys.
if (!response["UnprocessedKeys"].HasMember(table)) {
// Add the table's entry in UnprocessedKeys. Need to copy
// all the table's parameters from the request except the
// Keys field, which we start empty and then build below.
rjson::add_with_string_name(response["UnprocessedKeys"], table, rjson::empty_object());
rjson::value& unprocessed_item = response["UnprocessedKeys"][table];
rjson::value& request_item = request_items[table];
for (auto it = request_item.MemberBegin(); it != request_item.MemberEnd(); ++it) {
if (it->name != "Keys") {
rjson::add_with_string_name(unprocessed_item,
rjson::to_string_view(it->name), rjson::copy(it->value));
}
}
rjson::add_with_string_name(unprocessed_item, "Keys", rjson::empty_array());
}
// ck.second points at the key's JSON inside the original request;
// it is moved out of the request and into the response.
for (auto& ck : cks) {
rjson::push_back(response["UnprocessedKeys"][table]["Keys"], std::move(*ck.second));
}
}
}
// All the reads of table i were awaited above, so its RCU accumulator
// is final at this point.
uint64_t rcu_half_units = consumed_rcu_half_units_per_table[i];
_stats.rcu_half_units_total += rcu_half_units;
lw_shared_ptr<stats> per_table_stats = get_stats_from_schema(_proxy, *rs.schema);
per_table_stats->rcu_half_units_total += rcu_half_units;
if (should_add_rcu) {
rjson::value entry = rjson::empty_object();
rjson::add(entry, "TableName", table);
// The counter is kept in half units; convert to whole capacity units.
rjson::add(entry, "CapacityUnits", rcu_half_units*0.5);
rjson::push_back(consumed_capacity, std::move(entry));
}
}
if (should_add_rcu) {
rjson::add(response, "ConsumedCapacity", std::move(consumed_capacity));
}
elogger.trace("Unprocessed keys: {}", response["UnprocessedKeys"]);
// NOTE: Each table in the batch has its own CL (set by get_read_consistency()),
// but the audit entry records a single CL for the whole batch. We use ANY as a
// placeholder to indicate "mixed / not applicable".
// FIXME: Auditing is executed only for a complete success
maybe_audit(audit_info, audit::statement_category::QUERY, "",
print_names_for_audit(table_names), "BatchGetItem", request, db::consistency_level::ANY);
// A partial failure is reported through UnprocessedKeys; only a total
// failure fails the whole request.
if (!some_succeeded && eptr) {
co_await coroutine::return_exception_ptr(std::move(eptr));
}
auto duration = std::chrono::steady_clock::now() - start_time;
_stats.api_operations.batch_get_item_latency.mark(duration);
for (const table_requests& rs : requests) {
lw_shared_ptr<stats> per_table_stats = get_stats_from_schema(_proxy, *rs.schema);
per_table_stats->api_operations.batch_get_item_latency.mark(duration);
}
if (is_big(response)) {
co_return make_streamed(std::move(response));
} else {
co_return rjson::print(std::move(response));
}
}
} // namespace alternator