mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-23 01:50:35 +00:00
Merge 'Alternator: Add vector search support' from Nadav Har'El
This series adds support for vector search in Alternator, based on the existing implementation in CQL. The series adds APIs for `CreateTable` and `UpdateTable` to add or remove vector indexes on Alternator tables, `DescribeTable` to list them and check the indexing status, and `Query` to perform a vector search - which contacts the vector store for the actual ANN (approximate nearest neighbor) search. Correct functionality of these features depends on some features of the vector store that were already done (see https://github.com/scylladb/vector-store/pull/394).

This initial implementation is fully functional, and can already be useful, but we do not yet support all the features we hope to eventually support. Here are things that we have **not** done yet, and plan to do later in follow-up pull requests:

1. Support a new optimized vector type ("V") - in addition to the "list of numbers" type supported in this version.
2. Allow choosing a different similarity function when creating an index, via SimilarityFunction in the VectorIndex definition.
3. Allow choosing quantization (f32/f16/bf16/i8/b1) to ask the vector index to compress stored vectors.
4. Support oversampling and rescoring, defined per-index and per-query.
5. Support HNSW tuning parameters — maximum_node_connections, construction_beam_width, search_beam_width.
6. Support pre-filtering over key columns, which are available at the vector store, by sending the filter to the vector store (translated from DynamoDB filter syntax to the vector store's filter syntax). A decision still needs to be made whether this will use KeyConditionExpression or FilterExpression. This version supports only post-filtering (with `FilterExpression`).
7. Support projecting non-key attributes into the index (Projection=INCLUDE and Projection=ALL), and then (a) pre-filtering using these attributes, and (b) efficiently returning these attributes (using Select=ALL_PROJECTED_ATTRIBUTES, which today returns just the key columns).
8. Optimize the performance of `Query`, which today is inefficient for Select=ALL_ATTRIBUTES because it serially retrieves the matching items one at a time.
9. Return the similarity scores with the items (the design proposes ReturnVectorSearchSimilarity).
10. Add more vector-search-specific metrics, beyond the metric we already have counting Query requests. For example, separate latency and request-count metrics for vector-search Queries (distinct from GSI/LSI queries), and a metric accumulating the total Limit (K) across all vector search queries.
11. Consider how (and if at all) we want to run the tests in test/alternator/test_vector.py that need the vector store in the CI. Currently they are skipped in CI and only run manually (with `test/alternator/run --vs test_vector`).
12. Support the UpdateTable 'Update' operation to modify index parameters. Only some can be modified, e.g., Oversampling.
13. Support "local indexes" (a separate index for each partition).
14. Make sure that vector search and Streams can be enabled concurrently on the same table - both need CDC, but we need to verify that one doesn't confuse the other or disable options that the other needs. We can only do this after we have Alternator Streams running on tablets (since the vector store requires tablets).

Testing the new Alternator vector search end-to-end requires running both Scylla and the vector store together. We will have such end-to-end tests in the vector store repository (see https://github.com/scylladb/vector-store/pull/392), but this pull request also adds many end-to-end tests written in Python, which can be run with the command `test/alternator/run --vs test_vector.py`. The `--vs` option tells the run script to run both Scylla and the vector store (currently assumed to be in `.../vector-store/target/release/vector-store`).

About 65% of the tests in this pull request check supported syntax and error paths, so they can run without the vector store, while about 35% of the tests perform actual Query operations and require the vector store to be running. Currently, the tests that require the vector store are not run by CI, but they can easily be re-run manually with `test/alternator/run --vs test_vector.py`. In total, this series includes 78 functional tests in 2200 lines of Python code.

This series also includes documentation for the new Alternator feature and the new APIs introduced. You can see a more detailed design document here: https://docs.google.com/document/d/1cxLI7n-AgV5hhH1DTyU_Es8_f-t8Acql-1f58eQjZLY/edit

Two patches in this series split the huge alternator/executor.cc, after this series continued to grow it and it reached a whopping 7,000 lines. These patches are just a reorganization of code, with no functional changes. But it's time that we finally do this (Refs #5783); we can't just continue to grow executor.cc with no end...
Closes scylladb/scylladb#29046

* github.com:scylladb/scylladb:
  - test/alternator: add option to "run" script to run with vector search
  - alternator: document vector search
  - test/alternator: fix retries in new_dynamodb_session
  - test/alternator: test for allowed characters in attribute names
  - test/alternator: tests for vector index support
  - alternator, vector: add validation of non-finite numbers in Query
  - alternator: Query: improve error message when VectorSearch is missing
  - alternator: add per-table metrics for vector query
  - alternator: clean up duplicated code
  - alternator: fix default Select of Query
  - alternator: split executor.cc even more
  - alternator: split alternator/executor.cc
  - alternator: validate vector index attribute values on write
  - alternator: DescribeTable for vector index: add IndexStatus and Backfilling
  - alternator: implement Query with a vector index
  - alternator: fix bug in describe_multi_item()
  - alternator: prevent adding GSI conflicting with a vector index
  - alternator: implement UpdateTable with a vector index
  - alternator: implement DescribeTable with a vector index
  - alternator: implement CreateTable with a vector index
  - alternator: reject empty attribute names
  - cdc: fix on_pre_create_column_families to create CDC log for vector search
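To make the new API surface concrete, here is a rough Python sketch (matching the style of the PR's test suite) of how a vector-search `Query` request body could fit together. The parameter names `VectorSearch` and `QueryVector` are assumptions based on the description and design document, not a confirmed final API; `TableName`, `IndexName`, `Limit`, `Select`, and `FilterExpression` are standard DynamoDB `Query` parameters.

```python
# Hypothetical request shape for a vector-search Query against an Alternator
# table. "VectorSearch"/"QueryVector" are assumed names taken from the design
# document; treat them as illustrative, not the final API.
def make_vector_query_request(table, index, query_vector, k, filter_expr=None):
    request = {
        "TableName": table,
        "IndexName": index,
        # The vector whose nearest neighbors we want; the ANN search itself
        # is delegated to the external vector store.
        "VectorSearch": {"QueryVector": query_vector},
        # Limit doubles as K, the number of nearest neighbors to return.
        "Limit": k,
        "Select": "ALL_ATTRIBUTES",
    }
    if filter_expr is not None:
        # Only post-filtering is supported in this version (item 6 above).
        request["FilterExpression"] = filter_expr
    return request

req = make_vector_query_request("chunks", "embedding_index", [0.1, 0.2, 0.3], k=5)
```

Such a dict would be the JSON body of the `Query` call; the per-query metrics and the total-Limit (K) accumulation mentioned in item 10 would be derived from exactly these fields.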
@@ -9,6 +9,8 @@ target_sources(alternator
  controller.cc
  server.cc
  executor.cc
  executor_read.cc
  executor_util.cc
  stats.cc
  serialization.cc
  expressions.cc
253
alternator/attribute_path.hh
Normal file
@@ -0,0 +1,253 @@
/*
 * Copyright 2019-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
 */

#pragma once

#include <map>
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>
#include <variant>

#include "utils/rjson.hh"
#include "utils/overloaded_functor.hh"
#include "alternator/error.hh"
#include "alternator/expressions_types.hh"

namespace alternator {

// An attribute_path_map object is used to hold data for various attribute
// paths (parsed::path) in a hierarchy of attribute paths. Each attribute path
// has a root attribute, which is then modified by member and index operators -
// for example in "a.b[2].c" we have "a" as the root, then the ".b" member, then
// the "[2]" index, and finally the ".c" member.
// Data can be added to an attribute_path_map using the add() function, which
// requires that attributes with data not be *overlapping* or *conflicting*:
//
// 1. Two attribute paths which are identical or an ancestor of one another
//    are considered *overlapping* and not allowed. If a.b.c has data,
//    we can't add more data in a.b.c or any of its descendants like a.b.c.d.
//
// 2. Two attribute paths which need the same parent to have both a member and
//    an index are considered *conflicting* and not allowed. E.g., if a.b has
//    data, you can't add a[1]. The meaning of adding both would be that the
//    attribute a is both a map and an array, which isn't sensible.
//
// These two requirements are common to the two places where Alternator uses
// this abstraction to describe how a hierarchical item is to be transformed:
//
// 1. In ProjectionExpression: for filtering from a full top-level attribute
//    only the parts which the user asked for in ProjectionExpression.
//
// 2. In UpdateExpression: for taking the previous value of a top-level
//    attribute, and modifying it based on the instructions the user
//    wrote in UpdateExpression.

template<typename T>
class attribute_path_map_node {
public:
    using data_t = T;
    // We need the extra unique_ptr<> here because libstdc++ unordered_map
    // doesn't work with incomplete types :-(
    using members_t = std::unordered_map<std::string, std::unique_ptr<attribute_path_map_node<T>>>;
    // The indexes list is sorted because DynamoDB requires handling writes
    // beyond the end of a list in index order.
    using indexes_t = std::map<unsigned, std::unique_ptr<attribute_path_map_node<T>>>;
    // The prohibition on "overlap" and "conflict" explained above means
    // that only one of data, members or indexes is non-empty.
    std::optional<std::variant<data_t, members_t, indexes_t>> _content;

    bool is_empty() const { return !_content; }
    bool has_value() const { return _content && std::holds_alternative<data_t>(*_content); }
    bool has_members() const { return _content && std::holds_alternative<members_t>(*_content); }
    bool has_indexes() const { return _content && std::holds_alternative<indexes_t>(*_content); }
    // get_members() assumes that has_members() is true
    members_t& get_members() { return std::get<members_t>(*_content); }
    const members_t& get_members() const { return std::get<members_t>(*_content); }
    indexes_t& get_indexes() { return std::get<indexes_t>(*_content); }
    const indexes_t& get_indexes() const { return std::get<indexes_t>(*_content); }
    T& get_value() { return std::get<T>(*_content); }
    const T& get_value() const { return std::get<T>(*_content); }
};

template<typename T>
using attribute_path_map = std::unordered_map<std::string, attribute_path_map_node<T>>;

using attrs_to_get_node = attribute_path_map_node<std::monostate>;
// attrs_to_get lists which top-level attributes are needed, and possibly also
// which part of each top-level attribute is really needed (when nested
// attribute paths appeared in the query).
// Most code actually uses optional<attrs_to_get>. There, a disengaged
// optional means we should get all attributes, not specific ones.
using attrs_to_get = attribute_path_map<std::monostate>;

// hierarchy_filter() takes a given JSON value and drops the parts which
// weren't asked to be kept. It modifies the given JSON value, or returns
// false to signify that the entire object should be dropped.
// Note that the JSON value is assumed to be encoded using the DynamoDB
// conventions - i.e., it is really a map whose key is a type string,
// and whose value is the real object.
template<typename T>
bool hierarchy_filter(rjson::value& val, const attribute_path_map_node<T>& h) {
    if (!val.IsObject() || val.MemberCount() != 1) {
        // This shouldn't happen. We shouldn't have stored malformed objects.
        // But today Alternator does not validate the structure of nested
        // documents before storing them, so this can happen on read.
        throw api_error::internal(format("Malformed value object read: {}", val));
    }
    const char* type = val.MemberBegin()->name.GetString();
    rjson::value& v = val.MemberBegin()->value;
    if (h.has_members()) {
        const auto& members = h.get_members();
        if (type[0] != 'M' || !v.IsObject()) {
            // If v is not an object (dictionary, map), none of the members
            // can match.
            return false;
        }
        rjson::value newv = rjson::empty_object();
        for (auto it = v.MemberBegin(); it != v.MemberEnd(); ++it) {
            std::string attr = rjson::to_string(it->name);
            auto x = members.find(attr);
            if (x != members.end()) {
                if (x->second) {
                    // Only a part of this attribute is to be filtered, do it.
                    if (hierarchy_filter(it->value, *x->second)) {
                        // Because newv started empty and attrs are unique
                        // (keys of v), we can use add() here.
                        rjson::add_with_string_name(newv, attr, std::move(it->value));
                    }
                } else {
                    // The entire attribute is to be kept
                    rjson::add_with_string_name(newv, attr, std::move(it->value));
                }
            }
        }
        if (newv.MemberCount() == 0) {
            return false;
        }
        v = newv;
    } else if (h.has_indexes()) {
        const auto& indexes = h.get_indexes();
        if (type[0] != 'L' || !v.IsArray()) {
            return false;
        }
        rjson::value newv = rjson::empty_array();
        const auto& a = v.GetArray();
        for (unsigned i = 0; i < v.Size(); i++) {
            auto x = indexes.find(i);
            if (x != indexes.end()) {
                if (x->second) {
                    if (hierarchy_filter(a[i], *x->second)) {
                        rjson::push_back(newv, std::move(a[i]));
                    }
                } else {
                    // The entire attribute is to be kept
                    rjson::push_back(newv, std::move(a[i]));
                }
            }
        }
        if (newv.Size() == 0) {
            return false;
        }
        v = newv;
    }
    return true;
}

// Add a path to an attribute_path_map. Throws a validation error if the path
// "overlaps" with one already in the filter (one is a sub-path of the other)
// or "conflicts" with it (both a member and an index are requested).
template<typename T>
void attribute_path_map_add(const char* source, attribute_path_map<T>& map, const parsed::path& p, T value = {}) {
    using node = attribute_path_map_node<T>;
    // The first step is to look for the top-level attribute (p.root()):
    auto it = map.find(p.root());
    if (it == map.end()) {
        if (p.has_operators()) {
            it = map.emplace(p.root(), node {std::nullopt}).first;
        } else {
            (void) map.emplace(p.root(), node {std::move(value)}).first;
            // Value inserted for top-level node. We're done.
            return;
        }
    } else if (!p.has_operators()) {
        // If p is top-level and we already have it or a part of it
        // in map, it's a forbidden overlapping path.
        throw api_error::validation(fmt::format(
            "Invalid {}: two document paths overlap at {}", source, p.root()));
    } else if (it->second.has_value()) {
        // If we're here, it != map.end() && p.has_operators() && it->second.has_value().
        // This means the top-level attribute already has a value, and we're
        // trying to add a non-top-level value. It's an overlap.
        throw api_error::validation(fmt::format("Invalid {}: two document paths overlap at {}", source, p.root()));
    }
    node* h = &it->second;
    // The second step is to walk h from the top-level node to the inner node
    // where we're supposed to insert the value:
    for (const auto& op : p.operators()) {
        std::visit(overloaded_functor {
            [&] (const std::string& member) {
                if (h->is_empty()) {
                    *h = node {typename node::members_t()};
                } else if (h->has_indexes()) {
                    throw api_error::validation(format("Invalid {}: two document paths conflict at {}", source, p));
                } else if (h->has_value()) {
                    throw api_error::validation(format("Invalid {}: two document paths overlap at {}", source, p));
                }
                typename node::members_t& members = h->get_members();
                auto it = members.find(member);
                if (it == members.end()) {
                    it = members.insert({member, std::make_unique<node>()}).first;
                }
                h = it->second.get();
            },
            [&] (unsigned index) {
                if (h->is_empty()) {
                    *h = node {typename node::indexes_t()};
                } else if (h->has_members()) {
                    throw api_error::validation(format("Invalid {}: two document paths conflict at {}", source, p));
                } else if (h->has_value()) {
                    throw api_error::validation(format("Invalid {}: two document paths overlap at {}", source, p));
                }
                typename node::indexes_t& indexes = h->get_indexes();
                auto it = indexes.find(index);
                if (it == indexes.end()) {
                    it = indexes.insert({index, std::make_unique<node>()}).first;
                }
                h = it->second.get();
            }
        }, op);
    }
    // Finally, insert the value in the node h.
    if (h->is_empty()) {
        *h = node {std::move(value)};
    } else {
        throw api_error::validation(format("Invalid {}: two document paths overlap at {}", source, p));
    }
}

// A very simplified version of the above function for the special case of
// adding only a top-level attribute. It's not only simpler, we also use a
// different error message, referring to a "duplicate attribute" instead of
// "overlapping paths". DynamoDB also has this distinction (errors in
// AttributesToGet refer to duplicates, not overlaps, but errors in
// ProjectionExpression refer to overlap - even if it's an exact duplicate).
template<typename T>
void attribute_path_map_add(const char* source, attribute_path_map<T>& map, const std::string& attr, T value = {}) {
    using node = attribute_path_map_node<T>;
    auto it = map.find(attr);
    if (it == map.end()) {
        map.emplace(attr, node {std::move(value)});
    } else {
        throw api_error::validation(fmt::format(
            "Invalid {}: Duplicate attribute: {}", source, attr));
    }
}

} // namespace alternator
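The overlap/conflict rules that `attribute_path_map_add()` enforces can be sketched in a few lines of Python. This is a hypothetical, simplified model for illustration only (not the actual implementation): a path is a tuple of steps, where a `str` step is a member (`a.b`) and an `int` step is an index (`a[1]`).

```python
# Minimal model of the overlap/conflict rules: a node may hold a value, OR
# member children, OR index children - never more than one of the three.
def add_path(tree, path):
    node = tree
    for step in path:
        if "value" in node:
            # An ancestor of this path already holds data: overlap.
            raise ValueError("two document paths overlap")
        kind = "members" if isinstance(step, str) else "indexes"
        other = "indexes" if kind == "members" else "members"
        if other in node:
            # The same parent would be both a map and a list: conflict.
            raise ValueError("two document paths conflict")
        node = node.setdefault(kind, {}).setdefault(step, {})
    if node:
        # This node already has data or descendants: overlap.
        raise ValueError("two document paths overlap")
    node["value"] = True

tree = {}
add_path(tree, ("a", "b", "c"))   # ok
add_path(tree, ("a", "b", "d"))   # ok: sibling member, no overlap
for bad in [("a", "b"), ("a", "b", "c", "e"), ("a", 1)]:
    try:
        add_path(tree, bad)
        raise AssertionError("expected rejection")
    except ValueError as e:
        print(bad, "->", e)
```

The three rejected paths demonstrate, in order: an ancestor of an existing data path, a descendant of one, and a member/index conflict on `a`.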
@@ -18,6 +18,7 @@
#include "service/memory_limiter.hh"
#include "auth/service.hh"
#include "service/qos/service_level_controller.hh"
#include "vector_search/vector_store_client.hh"

using namespace seastar;

@@ -35,6 +36,7 @@ controller::controller(
        sharded<service::memory_limiter>& memory_limiter,
        sharded<auth::service>& auth_service,
        sharded<qos::service_level_controller>& sl_controller,
        sharded<vector_search::vector_store_client>& vsc,
        const db::config& config,
        seastar::scheduling_group sg)
    : protocol_server(sg)
@@ -47,6 +49,7 @@ controller::controller(
    , _memory_limiter(memory_limiter)
    , _auth_service(auth_service)
    , _sl_controller(sl_controller)
    , _vsc(vsc)
    , _config(config)
{
}
@@ -92,7 +95,7 @@ future<> controller::start_server() {
        return cfg.alternator_timeout_in_ms;
    };
    _executor.start(std::ref(_gossiper), std::ref(_proxy), std::ref(_ss), std::ref(_mm), std::ref(_sys_dist_ks),
-            sharded_parameter(get_cdc_metadata, std::ref(_cdc_gen_svc)), _ssg.value(),
+            sharded_parameter(get_cdc_metadata, std::ref(_cdc_gen_svc)), std::ref(_vsc), _ssg.value(),
            sharded_parameter(get_timeout_in_ms, std::ref(_config))).get();
    _server.start(std::ref(_executor), std::ref(_proxy), std::ref(_gossiper), std::ref(_auth_service), std::ref(_sl_controller)).get();
    // Note: from this point on, if start_server() throws for any reason,
@@ -43,6 +43,10 @@ namespace qos {
class service_level_controller;
}

namespace vector_search {
class vector_store_client;
}

namespace alternator {

// This is the official DynamoDB API version.
@@ -65,6 +69,7 @@ class controller : public protocol_server {
    sharded<service::memory_limiter>& _memory_limiter;
    sharded<auth::service>& _auth_service;
    sharded<qos::service_level_controller>& _sl_controller;
    sharded<vector_search::vector_store_client>& _vsc;
    const db::config& _config;

    std::vector<socket_address> _listen_addresses;
@@ -83,6 +88,7 @@ public:
        sharded<service::memory_limiter>& memory_limiter,
        sharded<auth::service>& auth_service,
        sharded<qos::service_level_controller>& sl_controller,
        sharded<vector_search::vector_store_client>& vsc,
        const db::config& config,
        seastar::scheduling_group sg);
File diff suppressed because it is too large
@@ -11,6 +11,7 @@
#include <seastar/core/future.hh>
#include "audit/audit.hh"
#include "seastarx.hh"
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include <seastar/util/noncopyable_function.hh>

@@ -21,13 +22,16 @@
#include "db/config.hh"

#include "alternator/error.hh"
#include "stats.hh"
#include "alternator/attribute_path.hh"
#include "alternator/stats.hh"
#include "alternator/executor_util.hh"

#include "utils/rjson.hh"
#include "utils/updateable_value.hh"
#include "utils/simple_value_with_expiry.hh"

#include "tracing/trace_state.hh"


namespace db {
class system_distributed_keyspace;
}
@@ -51,6 +55,10 @@ namespace service {
class storage_service;
}

namespace vector_search {
class vector_store_client;
}

namespace cdc {
class metadata;
}
@@ -63,82 +71,13 @@ class gossiper;

class schema_builder;


namespace alternator {

enum class table_status;
class rmw_operation;
class put_or_delete_item;

schema_ptr get_table(service::storage_proxy& proxy, const rjson::value& request);
bool is_alternator_keyspace(const sstring& ks_name);
// Wraps the db::get_tags_of_table and throws if the table is missing the tags extension.
const std::map<sstring, sstring>& get_tags_of_table_or_throw(schema_ptr schema);

// An attribute_path_map object is used to hold data for various attributes
// paths (parsed::path) in a hierarchy of attribute paths. Each attribute path
// has a root attribute, and then modified by member and index operators -
// for example in "a.b[2].c" we have "a" as the root, then ".b" member, then
// "[2]" index, and finally ".c" member.
// Data can be added to an attribute_path_map using the add() function, but
// requires that attributes with data not be *overlapping* or *conflicting*:
//
// 1. Two attribute paths which are identical or an ancestor of one another
//    are considered *overlapping* and not allowed. If a.b.c has data,
//    we can't add more data in a.b.c or any of its descendants like a.b.c.d.
//
// 2. Two attribute paths which need the same parent to have both a member and
//    an index are considered *conflicting* and not allowed. E.g., if a.b has
//    data, you can't add a[1]. The meaning of adding both would be that the
//    attribute a is both a map and an array, which isn't sensible.
//
// These two requirements are common to the two places where Alternator uses
// this abstraction to describe how a hierarchical item is to be transformed:
//
// 1. In ProjectExpression: for filtering from a full top-level attribute
//    only the parts for which user asked in ProjectionExpression.
//
// 2. In UpdateExpression: for taking the previous value of a top-level
//    attribute, and modifying it based on the instructions in the user
//    wrote in UpdateExpression.

template<typename T>
class attribute_path_map_node {
public:
    using data_t = T;
    // We need the extra unique_ptr<> here because libstdc++ unordered_map
    // doesn't work with incomplete types :-(
    using members_t = std::unordered_map<std::string, std::unique_ptr<attribute_path_map_node<T>>>;
    // The indexes list is sorted because DynamoDB requires handling writes
    // beyond the end of a list in index order.
    using indexes_t = std::map<unsigned, std::unique_ptr<attribute_path_map_node<T>>>;
    // The prohibition on "overlap" and "conflict" explained above means
    // That only one of data, members or indexes is non-empty.
    std::optional<std::variant<data_t, members_t, indexes_t>> _content;

    bool is_empty() const { return !_content; }
    bool has_value() const { return _content && std::holds_alternative<data_t>(*_content); }
    bool has_members() const { return _content && std::holds_alternative<members_t>(*_content); }
    bool has_indexes() const { return _content && std::holds_alternative<indexes_t>(*_content); }
    // get_members() assumes that has_members() is true
    members_t& get_members() { return std::get<members_t>(*_content); }
    const members_t& get_members() const { return std::get<members_t>(*_content); }
    indexes_t& get_indexes() { return std::get<indexes_t>(*_content); }
    const indexes_t& get_indexes() const { return std::get<indexes_t>(*_content); }
    T& get_value() { return std::get<T>(*_content); }
    const T& get_value() const { return std::get<T>(*_content); }
};

template<typename T>
using attribute_path_map = std::unordered_map<std::string, attribute_path_map_node<T>>;

using attrs_to_get_node = attribute_path_map_node<std::monostate>;
// attrs_to_get lists which top-level attribute are needed, and possibly also
// which part of the top-level attribute is really needed (when nested
// attribute paths appeared in the query).
// Most code actually uses optional<attrs_to_get>. There, a disengaged
// optional means we should get all attributes, not specific ones.
using attrs_to_get = attribute_path_map<std::monostate>;

namespace parsed {
class expression_cache;
}
@@ -150,6 +89,7 @@ class executor : public peering_sharded_service<executor> {
    service::migration_manager& _mm;
    db::system_distributed_keyspace& _sdks;
    cdc::metadata& _cdc_metadata;
    vector_search::vector_store_client& _vsc;
    utils::updateable_value<bool> _enforce_authorization;
    utils::updateable_value<bool> _warn_authorization;
    seastar::sharded<audit::audit>& _audit;
@@ -177,7 +117,6 @@ public:
    // is written in chunks to the output_stream. This allows for efficient
    // handling of large responses without needing to allocate a large buffer
    // in memory.
    using body_writer = noncopyable_function<future<>(output_stream<char>&&)>;
    using request_return_type = std::variant<std::string, body_writer, api_error>;
    stats _stats;
    // The metric_groups object holds this stat object's metrics registered
@@ -193,6 +132,7 @@ public:
        service::migration_manager& mm,
        db::system_distributed_keyspace& sdks,
        cdc::metadata& cdc_metadata,
        vector_search::vector_store_client& vsc,
        smp_service_group ssg,
        utils::updateable_value<uint32_t> default_timeout_in_ms);
    ~executor();
@@ -225,15 +165,9 @@ public:
    future<> start();
    future<> stop();

    static sstring table_name(const schema&);
    static db::timeout_clock::time_point default_timeout();
private:
    static thread_local utils::updateable_value<uint32_t> s_default_timeout_in_ms;
public:
    static schema_ptr find_table(service::storage_proxy&, std::string_view table_name);
    static schema_ptr find_table(service::storage_proxy&, const rjson::value& request);

private:
    friend class rmw_operation;

    // Helper to set up auditing for an Alternator operation. Checks whether
@@ -247,7 +181,6 @@ private:
        const rjson::value& request,
        std::optional<db::consistency_level> cl = std::nullopt);

    static void describe_key_schema(rjson::value& parent, const schema&, std::unordered_map<std::string,std::string> * = nullptr, const std::map<sstring, sstring> *tags = nullptr);
    future<rjson::value> fill_table_description(schema_ptr schema, table_status tbl_status, service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit);
    future<executor::request_return_type> create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, bool enforce_authorization,
        bool warn_authorization, const db::tablets_mode_t::mode tablets_mode, std::unique_ptr<audit::audit_info_alternator>& audit_info);
@@ -263,62 +196,11 @@ private:
        tracing::trace_state_ptr trace_state, service_permit permit);

public:
    static void describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map<std::string,std::string>&, const std::map<sstring, sstring> *tags = nullptr);

    static std::optional<rjson::value> describe_single_item(schema_ptr,
        const query::partition_slice&,
        const cql3::selection::selection&,
        const query::result&,
        const std::optional<attrs_to_get>&,
        uint64_t* = nullptr);

    // Converts a multi-row selection result to JSON compatible with DynamoDB.
    // For each row, this method calls item_callback, which takes the size of
    // the item as the parameter.
    static future<std::vector<rjson::value>> describe_multi_item(schema_ptr schema,
        const query::partition_slice&& slice,
        shared_ptr<cql3::selection::selection> selection,
        foreign_ptr<lw_shared_ptr<query::result>> query_result,
        shared_ptr<const std::optional<attrs_to_get>> attrs_to_get,
        noncopyable_function<void(uint64_t)> item_callback = {});

    static void describe_single_item(const cql3::selection::selection&,
        const std::vector<managed_bytes_opt>&,
        const std::optional<attrs_to_get>&,
        rjson::value&,
        uint64_t* item_length_in_bytes = nullptr,
        bool = false);

    static bool add_stream_options(const rjson::value& stream_spec, schema_builder&, service::storage_proxy& sp);
    static void supplement_table_info(rjson::value& descr, const schema& schema, service::storage_proxy& sp);
    static void supplement_table_stream_info(rjson::value& descr, const schema& schema, const service::storage_proxy& sp);
};

// is_big() checks approximately if the given JSON value is "bigger" than
// the given big_size number of bytes. The goal is to *quickly* detect
// oversized JSON that, for example, is too large to be serialized to a
// contiguous string - we don't need an accurate size for that. Moreover,
// as soon as we detect that the JSON is indeed "big", we can return true
// and don't need to continue calculating its exact size.
// For simplicity, we use a recursive implementation. This is fine because
// Alternator limits the depth of JSONs it reads from inputs, and doesn't
// add more than a couple of levels in its own output construction.
bool is_big(const rjson::value& val, int big_size = 100'000);

// Check CQL's Role-Based Access Control (RBAC) permission (MODIFY,
// SELECT, DROP, etc.) on the given table. When permission is denied an
// appropriate user-readable api_error::access_denied is thrown.
future<> verify_permission(bool enforce_authorization, bool warn_authorization, const service::client_state&, const schema_ptr&, auth::permission, alternator::stats& stats);

/**
 * Make return type for serializing the object "streamed",
 * i.e. direct to HTTP output stream. Note: only useful for
 * (very) large objects as there are overhead issues with this
 * as well, but for massive lists of return objects this can
 * help avoid large allocations/many re-allocs
 */
executor::body_writer make_streamed(rjson::value&&);

// returns table creation time in seconds since epoch for `db_clock`
double get_table_creation_time(const schema &schema);

@@ -344,5 +226,4 @@ arn_parts parse_arn(std::string_view arn, std::string_view arn_field_name, std::

// The format is ks1|ks2|ks3... and table1|table2|table3...
sstring print_names_for_audit(const std::set<sstring>& names);

}
1957  alternator/executor_read.cc  Normal file
File diff suppressed because it is too large
559  alternator/executor_util.cc  Normal file
@@ -0,0 +1,559 @@
/*
 * Copyright 2019-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
 */

#include "alternator/executor_util.hh"
#include "alternator/executor.hh"
#include "alternator/error.hh"
#include "auth/resource.hh"
#include "auth/service.hh"
#include "cdc/log.hh"
#include "data_dictionary/data_dictionary.hh"
#include "db/tags/utils.hh"
#include "replica/database.hh"
#include "cql3/selection/selection.hh"
#include "cql3/result_set.hh"
#include "serialization.hh"
#include "service/storage_proxy.hh"
#include "types/map.hh"
#include <fmt/format.h>

namespace alternator {

extern logging::logger elogger; // from executor.cc

std::optional<int> get_int_attribute(const rjson::value& value, std::string_view attribute_name) {
    const rjson::value* attribute_value = rjson::find(value, attribute_name);
    if (!attribute_value) {
        return {};
    }
    if (!attribute_value->IsInt()) {
        throw api_error::validation(fmt::format("Expected integer value for attribute {}, got: {}",
                attribute_name, *attribute_value));
    }
    return attribute_value->GetInt();
}

std::string get_string_attribute(const rjson::value& value, std::string_view attribute_name, const char* default_return) {
    const rjson::value* attribute_value = rjson::find(value, attribute_name);
    if (!attribute_value) {
        return default_return;
    }
    if (!attribute_value->IsString()) {
        throw api_error::validation(fmt::format("Expected string value for attribute {}, got: {}",
                attribute_name, *attribute_value));
    }
    return rjson::to_string(*attribute_value);
}

bool get_bool_attribute(const rjson::value& value, std::string_view attribute_name, bool default_return) {
    const rjson::value* attribute_value = rjson::find(value, attribute_name);
    if (!attribute_value) {
        return default_return;
    }
    if (!attribute_value->IsBool()) {
        throw api_error::validation(fmt::format("Expected boolean value for attribute {}, got: {}",
                attribute_name, *attribute_value));
    }
    return attribute_value->GetBool();
}

std::optional<std::string> find_table_name(const rjson::value& request) {
    const rjson::value* table_name_value = rjson::find(request, "TableName");
    if (!table_name_value) {
        return std::nullopt;
    }
    if (!table_name_value->IsString()) {
        throw api_error::validation("Non-string TableName field in request");
    }
    std::string table_name = rjson::to_string(*table_name_value);
    return table_name;
}

std::string get_table_name(const rjson::value& request) {
    auto name = find_table_name(request);
    if (!name) {
        throw api_error::validation("Missing TableName field in request");
    }
    return *name;
}

schema_ptr find_table(service::storage_proxy& proxy, const rjson::value& request) {
    auto table_name = find_table_name(request);
    if (!table_name) {
        return nullptr;
    }
    return find_table(proxy, *table_name);
}

schema_ptr find_table(service::storage_proxy& proxy, std::string_view table_name) {
    try {
        return proxy.data_dictionary().find_schema(sstring(executor::KEYSPACE_NAME_PREFIX) + sstring(table_name), table_name);
    } catch (data_dictionary::no_such_column_family&) {
        // DynamoDB returns a validation error when the table name is
        // invalid, even when the table also does not exist.
        validate_table_name(table_name);

        throw api_error::resource_not_found(
                fmt::format("Requested resource not found: Table: {} not found", table_name));
    }
}

schema_ptr get_table(service::storage_proxy& proxy, const rjson::value& request) {
    auto schema = find_table(proxy, request);
    if (!schema) {
        // If we get here, the name was missing: a syntax error or a
        // missing table would have thrown above. Slow path, but just call
        // get_table_name() to generate the exception.
        get_table_name(request);
    }
    return schema;
}

map_type attrs_type() {
    static thread_local auto t = map_type_impl::get_instance(utf8_type, bytes_type, true);
    return t;
}

const std::map<sstring, sstring>& get_tags_of_table_or_throw(schema_ptr schema) {
    auto tags_ptr = db::get_tags_of_table(schema);
    if (tags_ptr) {
        return *tags_ptr;
    } else {
        throw api_error::validation(format("Table {} does not have valid tagging information", schema->ks_name()));
    }
}

bool is_alternator_keyspace(std::string_view ks_name) {
    return ks_name.starts_with(executor::KEYSPACE_NAME_PREFIX);
}

// This tag is set on a GSI when the user did not specify a range key, causing
// Alternator to add the base table's range key as a spurious range key. It is
// used by describe_key_schema() to suppress reporting that key.
extern const sstring SPURIOUS_RANGE_KEY_ADDED_TO_GSI_AND_USER_DIDNT_SPECIFY_RANGE_KEY_TAG_KEY;

void describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map<std::string, std::string>* attribute_types, const std::map<sstring, sstring>* tags) {
    rjson::value key_schema = rjson::empty_array();
    const bool ignore_range_keys_as_spurious = tags != nullptr && tags->contains(SPURIOUS_RANGE_KEY_ADDED_TO_GSI_AND_USER_DIDNT_SPECIFY_RANGE_KEY_TAG_KEY);

    for (const column_definition& cdef : schema.partition_key_columns()) {
        rjson::value key = rjson::empty_object();
        rjson::add(key, "AttributeName", rjson::from_string(cdef.name_as_text()));
        rjson::add(key, "KeyType", "HASH");
        rjson::push_back(key_schema, std::move(key));
        if (attribute_types) {
            (*attribute_types)[cdef.name_as_text()] = type_to_string(cdef.type);
        }
    }
    if (!ignore_range_keys_as_spurious) {
        // NOTE: the user-requested key (there can be at most one) always
        // comes first. There might be more keys following it, which were
        // added internally but not requested by the user, so we ignore them.
        for (const column_definition& cdef : schema.clustering_key_columns()) {
            rjson::value key = rjson::empty_object();
            rjson::add(key, "AttributeName", rjson::from_string(cdef.name_as_text()));
            rjson::add(key, "KeyType", "RANGE");
            rjson::push_back(key_schema, std::move(key));
            if (attribute_types) {
                (*attribute_types)[cdef.name_as_text()] = type_to_string(cdef.type);
            }
            break;
        }
    }
    rjson::add(parent, "KeySchema", std::move(key_schema));
}

// Check if the given string has valid characters for a table name, i.e. only
// a-z, A-Z, 0-9, _ (underscore), - (dash), . (dot). Note that this function
// does not check the length of the name - instead, use validate_table_name()
// to validate both the characters and the length.
static bool valid_table_name_chars(std::string_view name) {
    for (auto c : name) {
        if ((c < 'a' || c > 'z') &&
            (c < 'A' || c > 'Z') &&
            (c < '0' || c > '9') &&
            c != '_' &&
            c != '-' &&
            c != '.') {
            return false;
        }
    }
    return true;
}

std::string view_name(std::string_view table_name, std::string_view index_name, const std::string& delim, bool validate_len) {
    if (index_name.length() < 3) {
        throw api_error::validation("IndexName must be at least 3 characters long");
    }
    if (!valid_table_name_chars(index_name)) {
        throw api_error::validation(
                fmt::format("IndexName '{}' must satisfy regular expression pattern: [a-zA-Z0-9_.-]+", index_name));
    }
    std::string ret = std::string(table_name) + delim + std::string(index_name);
    if (validate_len && ret.length() > max_auxiliary_table_name_length) {
        throw api_error::validation(
                fmt::format("The total length of TableName ('{}') and IndexName ('{}') cannot exceed {} characters",
                        table_name, index_name, max_auxiliary_table_name_length - delim.size()));
    }
    return ret;
}

std::string gsi_name(std::string_view table_name, std::string_view index_name, bool validate_len) {
    return view_name(table_name, index_name, ":", validate_len);
}

std::string lsi_name(std::string_view table_name, std::string_view index_name, bool validate_len) {
    return view_name(table_name, index_name, "!:", validate_len);
}

void check_key(const rjson::value& key, const schema_ptr& schema) {
    if (key.MemberCount() != (schema->clustering_key_size() == 0 ? 1 : 2)) {
        throw api_error::validation("Given key attribute not in schema");
    }
}

void verify_all_are_used(const rjson::value* field,
        const std::unordered_set<std::string>& used, const char* field_name, const char* operation) {
    if (!field) {
        return;
    }
    for (auto it = field->MemberBegin(); it != field->MemberEnd(); ++it) {
        if (!used.contains(rjson::to_string(it->name))) {
            throw api_error::validation(
                    format("{} has spurious '{}', not used in {}",
                            field_name, rjson::to_string_view(it->name), operation));
        }
    }
}

// This function increments the authorization_failures counter, and may also
// log a warn-level message and/or throw an access_denied exception, depending
// on how enforce_authorization and warn_authorization are set.
// Note that if enforce_authorization is false, this function returns
// without throwing, so a caller that doesn't want to continue after an
// authorization error must explicitly return after calling this function.
static void authorization_error(stats& stats, bool enforce_authorization, bool warn_authorization, std::string msg) {
    stats.authorization_failures++;
    if (enforce_authorization) {
        if (warn_authorization) {
            elogger.warn("alternator_warn_authorization=true: {}", msg);
        }
        throw api_error::access_denied(std::move(msg));
    } else {
        if (warn_authorization) {
            elogger.warn("If you set alternator_enforce_authorization=true the following will be enforced: {}", msg);
        }
    }
}

future<> verify_permission(
        bool enforce_authorization,
        bool warn_authorization,
        const service::client_state& client_state,
        const schema_ptr& schema,
        auth::permission permission_to_check,
        stats& stats) {
    if (!enforce_authorization && !warn_authorization) {
        co_return;
    }
    // Unfortunately, the fix for issue #23218 did not modify the function
    // that we use here - check_has_permission(). So if we want to allow
    // writes to internal tables (from try_get_internal_table()) only to a
    // superuser, we need to explicitly check it here.
    if (permission_to_check == auth::permission::MODIFY && is_internal_keyspace(schema->ks_name())) {
        if (!client_state.user() ||
            !client_state.user()->name ||
            !co_await client_state.get_auth_service()->underlying_role_manager().is_superuser(*client_state.user()->name)) {
            sstring username = "<anonymous>";
            if (client_state.user() && client_state.user()->name) {
                username = client_state.user()->name.value();
            }
            authorization_error(stats, enforce_authorization, warn_authorization, fmt::format(
                    "Write access denied on internal table {}.{} to role {} because it is not a superuser",
                    schema->ks_name(), schema->cf_name(), username));
            co_return;
        }
    }
    auto resource = auth::make_data_resource(schema->ks_name(), schema->cf_name());
    if (!client_state.user() || !client_state.user()->name ||
        !co_await client_state.check_has_permission(auth::command_desc(permission_to_check, resource))) {
        sstring username = "<anonymous>";
        if (client_state.user() && client_state.user()->name) {
            username = client_state.user()->name.value();
        }
        // Using exceptions for errors makes this function faster in the
        // success path (when the operation is allowed).
        authorization_error(stats, enforce_authorization, warn_authorization, fmt::format(
                "{} access on table {}.{} is denied to role {}, client address {}",
                auth::permissions::to_string(permission_to_check),
                schema->ks_name(), schema->cf_name(), username, client_state.get_client_address()));
    }
}

// Similar to verify_permission() above, but just for CREATE operations.
// Those do not operate on any specific table, so require permissions on
// ALL KEYSPACES instead of on a specific table.
future<> verify_create_permission(bool enforce_authorization, bool warn_authorization, const service::client_state& client_state, stats& stats) {
    if (!enforce_authorization && !warn_authorization) {
        co_return;
    }
    auto resource = auth::resource(auth::resource_kind::data);
    if (!co_await client_state.check_has_permission(auth::command_desc(auth::permission::CREATE, resource))) {
        sstring username = "<anonymous>";
        if (client_state.user() && client_state.user()->name) {
            username = client_state.user()->name.value();
        }
        authorization_error(stats, enforce_authorization, warn_authorization, fmt::format(
                "CREATE access on ALL KEYSPACES is denied to role {}", username));
    }
}

schema_ptr try_get_internal_table(const data_dictionary::database& db, std::string_view table_name) {
    size_t it = table_name.find(executor::INTERNAL_TABLE_PREFIX);
    if (it != 0) {
        return schema_ptr{};
    }
    table_name.remove_prefix(executor::INTERNAL_TABLE_PREFIX.size());
    size_t delim = table_name.find_first_of('.');
    if (delim == std::string_view::npos) {
        return schema_ptr{};
    }
    std::string_view ks_name = table_name.substr(0, delim);
    table_name.remove_prefix(ks_name.size() + 1);
    // Only internal keyspaces can be accessed, to avoid leaking data.
    auto ks = db.try_find_keyspace(ks_name);
    if (!ks || !ks->is_internal()) {
        return schema_ptr{};
    }
    try {
        return db.find_schema(ks_name, table_name);
    } catch (data_dictionary::no_such_column_family&) {
        // DynamoDB returns a validation error when the table name is
        // invalid, even when the table also does not exist.
        validate_table_name(table_name);
        throw api_error::resource_not_found(
                fmt::format("Requested resource not found: Internal table: {}.{} not found", ks_name, table_name));
    }
}

schema_ptr get_table_from_batch_request(const service::storage_proxy& proxy, const rjson::value::ConstMemberIterator& batch_request) {
    sstring table_name = rjson::to_sstring(batch_request->name); // JSON keys are always strings
    try {
        return proxy.data_dictionary().find_schema(sstring(executor::KEYSPACE_NAME_PREFIX) + table_name, table_name);
    } catch (data_dictionary::no_such_column_family&) {
        // DynamoDB returns a validation error when the table name is
        // invalid, even when the table also does not exist.
        validate_table_name(table_name);
        throw api_error::resource_not_found(format("Requested resource not found: Table: {} not found", table_name));
    }
}

lw_shared_ptr<stats> get_stats_from_schema(service::storage_proxy& sp, const schema& schema) {
    try {
        replica::table& table = sp.local_db().find_column_family(schema.id());
        if (!table.get_stats().alternator_stats) {
            table.get_stats().alternator_stats = seastar::make_shared<table_stats>(schema.ks_name(), schema.cf_name());
        }
        return table.get_stats().alternator_stats->_stats;
    } catch (std::runtime_error&) {
        // If we're here, the table we are currently working on was deleted
        // before the operation completed. Returning a temporary object is
        // fine: if the table was deleted, so will be its metrics.
        return make_lw_shared<stats>();
    }
}

void describe_single_item(const cql3::selection::selection& selection,
        const std::vector<managed_bytes_opt>& result_row,
        const std::optional<attrs_to_get>& attrs_to_get,
        rjson::value& item,
        uint64_t* item_length_in_bytes,
        bool include_all_embedded_attributes)
{
    const auto& columns = selection.get_columns();
    auto column_it = columns.begin();
    for (const managed_bytes_opt& cell : result_row) {
        if (!cell) {
            ++column_it;
            continue;
        }
        std::string column_name = (*column_it)->name_as_text();
        if (column_name != executor::ATTRS_COLUMN_NAME) {
            if (item_length_in_bytes) {
                (*item_length_in_bytes) += column_name.length() + cell->size();
            }
            if (!attrs_to_get || attrs_to_get->contains(column_name)) {
                // item is expected to start empty, and column names are
                // unique, so add() makes sense.
                rjson::add_with_string_name(item, column_name, rjson::empty_object());
                rjson::value& field = item[column_name.c_str()];
                cell->with_linearized([&] (bytes_view linearized_cell) {
                    rjson::add_with_string_name(field, type_to_string((*column_it)->type), json_key_column_value(linearized_cell, **column_it));
                });
            }
        } else {
            auto deserialized = attrs_type()->deserialize(*cell);
            auto keys_and_values = value_cast<map_type_impl::native_type>(deserialized);
            for (auto entry : keys_and_values) {
                std::string attr_name = value_cast<sstring>(entry.first);
                if (item_length_in_bytes) {
                    (*item_length_in_bytes) += attr_name.length();
                }
                if (include_all_embedded_attributes || !attrs_to_get || attrs_to_get->contains(attr_name)) {
                    bytes value = value_cast<bytes>(entry.second);
                    if (item_length_in_bytes && value.length()) {
                        // ScyllaDB uses one extra byte compared to DynamoDB for the bytes length
                        (*item_length_in_bytes) += value.length() - 1;
                    }
                    rjson::value v = deserialize_item(value);
                    if (attrs_to_get) {
                        auto it = attrs_to_get->find(attr_name);
                        if (it != attrs_to_get->end()) {
                            // attrs_to_get may have asked for only part of
                            // this attribute. hierarchy_filter() modifies v,
                            // and returns false when nothing is to be kept.
                            if (!hierarchy_filter(v, it->second)) {
                                continue;
                            }
                        }
                    }
                    // item is expected to start empty, and attribute
                    // names are unique, so add() makes sense.
                    rjson::add_with_string_name(item, attr_name, std::move(v));
                } else if (item_length_in_bytes) {
                    (*item_length_in_bytes) += value_cast<bytes>(entry.second).length() - 1;
                }
            }
        }
        ++column_it;
    }
}

std::optional<rjson::value> describe_single_item(schema_ptr schema,
        const query::partition_slice& slice,
        const cql3::selection::selection& selection,
        const query::result& query_result,
        const std::optional<attrs_to_get>& attrs_to_get,
        uint64_t* item_length_in_bytes) {
    rjson::value item = rjson::empty_object();

    cql3::selection::result_set_builder builder(selection, gc_clock::now());
    query::result_view::consume(query_result, slice, cql3::selection::result_set_builder::visitor(builder, *schema, selection));

    auto result_set = builder.build();
    if (result_set->empty()) {
        if (item_length_in_bytes) {
            // An empty result is counted as having a minimal length (1 byte).
            (*item_length_in_bytes) += 1;
        }
        // If there is no matching item, we're supposed to return an empty
        // object without an Item member - not one with an empty Item member.
        return {};
    }
    if (result_set->size() > 1) {
        // If the result set contains multiple rows, the code should have
        // called describe_multi_item(), not this function.
        throw std::logic_error("describe_single_item() asked to describe multiple items");
    }
    describe_single_item(selection, *result_set->rows().begin(), attrs_to_get, item, item_length_in_bytes);
    return item;
}

static void check_big_array(const rjson::value& val, int& size_left);
static void check_big_object(const rjson::value& val, int& size_left);

// For simplicity, we use a recursive implementation. This is fine because
// Alternator limits the depth of JSONs it reads from inputs, and doesn't
// add more than a couple of levels in its own output construction.
bool is_big(const rjson::value& val, int big_size) {
    if (val.IsString()) {
        return ssize_t(val.GetStringLength()) > big_size;
    } else if (val.IsObject()) {
        check_big_object(val, big_size);
        return big_size < 0;
    } else if (val.IsArray()) {
        check_big_array(val, big_size);
        return big_size < 0;
    }
    return false;
}

static void check_big_array(const rjson::value& val, int& size_left) {
    // Assume a fixed size of 10 bytes for each number, boolean, etc., or
    // beginning of a sub-object. This doesn't have to be accurate.
    size_left -= 10 * val.Size();
    for (const auto& v : val.GetArray()) {
        if (size_left < 0) {
            return;
        }
        // Note that we avoid recursive calls for the leaves (anything except
        // array or object) because usually those greatly outnumber the trunk.
        if (v.IsString()) {
            size_left -= v.GetStringLength();
        } else if (v.IsObject()) {
            check_big_object(v, size_left);
        } else if (v.IsArray()) {
            check_big_array(v, size_left);
        }
    }
}

static void check_big_object(const rjson::value& val, int& size_left) {
    size_left -= 10 * val.MemberCount();
    for (const auto& m : val.GetObject()) {
        if (size_left < 0) {
            return;
        }
        size_left -= m.name.GetStringLength();
        if (m.value.IsString()) {
            size_left -= m.value.GetStringLength();
        } else if (m.value.IsObject()) {
            check_big_object(m.value, size_left);
        } else if (m.value.IsArray()) {
            check_big_array(m.value, size_left);
        }
    }
}

void validate_table_name(std::string_view name, const char* source) {
    if (name.length() < 3 || name.length() > max_table_name_length) {
        throw api_error::validation(
                format("{} must be at least 3 characters long and at most {} characters long", source, max_table_name_length));
    }
    if (!valid_table_name_chars(name)) {
        throw api_error::validation(
                format("{} must satisfy regular expression pattern: [a-zA-Z0-9_.-]+", source));
    }
}

void validate_cdc_log_name_length(std::string_view table_name) {
    if (cdc::log_name(table_name).length() > max_auxiliary_table_name_length) {
        // CDC will add cdc_log_suffix ("_scylla_cdc_log") to the table name
        // to create its log table, and this will exceed the maximum allowed
        // length. To provide a more helpful error message, we assume that
        // cdc::log_name() always adds a suffix of the same length.
        int suffix_len = cdc::log_name(table_name).length() - table_name.length();
        throw api_error::validation(fmt::format("Streams or vector search cannot be enabled on a table whose name is longer than {} characters: {}",
                max_auxiliary_table_name_length - suffix_len, table_name));
    }
}

body_writer make_streamed(rjson::value&& value) {
    return [value = std::move(value)](output_stream<char>&& _out) mutable -> future<> {
        auto out = std::move(_out);
        std::exception_ptr ex;
        try {
            co_await rjson::print(value, out);
        } catch (...) {
            ex = std::current_exception();
        }
        co_await out.close();
        co_await rjson::destroy_gently(std::move(value));
        if (ex) {
            co_await coroutine::return_exception_ptr(std::move(ex));
        }
    };
}

} // namespace alternator
247  alternator/executor_util.hh  Normal file
@@ -0,0 +1,247 @@
/*
 * Copyright 2019-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
 */

// This header file, and the implementation file executor_util.cc, contain
// various utility functions that are reused in many different operations
// (API requests) across Alternator's code - in files such as executor.cc,
// executor_read.cc, streams.cc, ttl.cc, and more. These utility functions
// include things like extracting and validating pieces from a JSON request,
// checking permissions, constructing auxiliary table names, and more.

#pragma once

#include <map>
#include <optional>
#include <string>
#include <string_view>
#include <unordered_map>
#include <unordered_set>

#include <seastar/core/future.hh>
#include <seastar/util/noncopyable_function.hh>

#include "utils/rjson.hh"
#include "schema/schema_fwd.hh"
#include "types/types.hh"
#include "auth/permission.hh"
#include "alternator/stats.hh"
#include "alternator/attribute_path.hh"
#include "utils/managed_bytes.hh"

namespace query { class partition_slice; class result; }
namespace cql3::selection { class selection; }
namespace data_dictionary { class database; }
namespace service { class storage_proxy; class client_state; }

namespace alternator {

/// The body_writer is used for streaming responses - where the response body
/// is written in chunks to the output_stream. This allows for efficient
/// handling of large responses without needing to allocate a large buffer in
/// memory. It is one of the variants of executor::request_return_type.
using body_writer = noncopyable_function<future<>(output_stream<char>&&)>;

/// Get the value of an integer attribute, or an empty optional if it is
/// missing. If the attribute exists but is not an integer, a descriptive
/// api_error is thrown.
std::optional<int> get_int_attribute(const rjson::value& value, std::string_view attribute_name);

/// Get the value of a string attribute, or a default value if it is missing.
/// If the attribute exists but is not a string, a descriptive api_error is
/// thrown.
std::string get_string_attribute(const rjson::value& value, std::string_view attribute_name, const char* default_return);

/// Get the value of a boolean attribute, or a default value if it is missing.
/// If the attribute exists but is not a bool, a descriptive api_error is
/// thrown.
bool get_bool_attribute(const rjson::value& value, std::string_view attribute_name, bool default_return);

/// Extract the table name from a request.
/// Most requests expect the table's name to be listed in a "TableName" field.
/// get_table_name() returns the name, or throws an api_error in case the
/// table name is missing or not a string.
std::string get_table_name(const rjson::value& request);

/// find_table_name() is like get_table_name() except that it returns an
/// optional table name - it returns an empty optional when the TableName
/// is missing from the request, instead of throwing as get_table_name()
/// does. However, find_table_name() still throws if a TableName exists but
/// is not a string.
std::optional<std::string> find_table_name(const rjson::value& request);

/// Extract the table schema from a request.
/// Many requests expect the table's name to be listed in a "TableName" field
/// and need to look it up as an existing table. The get_table() function
/// does this, with the appropriate validation and api_error in case the table
/// name is missing, invalid or the table doesn't exist. If everything is
/// successful, it returns the table's schema.
schema_ptr get_table(service::storage_proxy& proxy, const rjson::value& request);

/// This find_table() variant is like get_table() except that it returns a
/// nullptr instead of throwing if the request does not mention a TableName.
/// In other cases of errors (i.e., a table is mentioned but doesn't exist)
/// this function throws too.
schema_ptr find_table(service::storage_proxy& proxy, const rjson::value& request);

/// This find_table() variant is like the previous one except that it takes
/// the table name directly instead of a request object. It is used in cases
/// where we already have the table name extracted from the request.
schema_ptr find_table(service::storage_proxy& proxy, std::string_view table_name);

// We would have liked to support table names up to 255 bytes, like DynamoDB.
// But Scylla creates a directory whose name is the table's name plus 33
// bytes (dash and UUID), and since directory names are limited to 255 bytes,
// we need to limit table names to 222 bytes, instead of 255. See issue #4480.
// We actually have two limits here:
// * max_table_name_length is the limit that Alternator will impose on names
//   of new Alternator tables.
// * max_auxiliary_table_name_length is the potentially higher absolute limit
//   that Scylla imposes on the names of auxiliary tables that Alternator
//   wants to create internally - i.e. materialized views or CDC log tables.
// The second limit might mean that it is not possible to add a GSI to an
// existing table, because the name of the new auxiliary table may go over
// the limit. The second limit is also one of the reasons why the first limit
// is set lower than 222 - to have room to enable streams, which add the
// extra suffix "_scylla_cdc_log" to the table name.
inline constexpr int max_table_name_length = 192;
inline constexpr int max_auxiliary_table_name_length = 222;

/// validate_table_name() validates the TableName parameter in a request - it
|
||||
/// should be called in CreateTable, and in other requests only when noticing
|
||||
/// that the named table doesn't exist.
|
||||
/// The DynamoDB developer guide, https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.NamingRules
|
||||
/// specifies that table "names must be between 3 and 255 characters long and
|
||||
/// can contain only the following characters: a-z, A-Z, 0-9, _ (underscore),
|
||||
/// - (dash), . (dot)". However, Alternator only allows max_table_name_length
|
||||
/// characters (see above) - not 255.
|
||||
/// validate_table_name() throws the appropriate api_error if this validation
|
||||
/// fails.
|
||||
void validate_table_name(std::string_view name, const char* source = "TableName");

/// Validate that a CDC log table could be created for the base table with a
/// given table_name, and if not, throw a user-visible api_error::validation.
/// It is not possible to create a CDC log table if the table name is so long
/// that adding the 15-character suffix "_scylla_cdc_log" (cdc_log_suffix)
/// makes it go over max_auxiliary_table_name_length.
/// Note that if max_table_name_length is set to less than 207 (which is
/// max_auxiliary_table_name_length-15), then this function will never
/// fail. However, it's still important to call it in UpdateTable, in case
/// we have pre-existing tables with names longer than this, to avoid #24598.
void validate_cdc_log_name_length(std::string_view table_name);

/// Checks if a keyspace, given by its name, is an Alternator keyspace.
/// This just checks if the name begins with executor::KEYSPACE_NAME_PREFIX,
/// a prefix that all keyspaces created by Alternator's CreateTable use.
bool is_alternator_keyspace(std::string_view ks_name);

/// Wraps db::get_tags_of_table() and throws api_error::validation if the
/// table is missing the tags extension.
const std::map<sstring, sstring>& get_tags_of_table_or_throw(schema_ptr schema);

/// Returns a type object representing the type of the ":attrs" column used
/// by Alternator to store all non-key attributes. This type is a map from
/// string (attribute name) to bytes (serialized attribute value).
map_type attrs_type();

// In DynamoDB index names are local to a table, while in Scylla, materialized
// view names are global (in a keyspace). So we need to compose a unique name
// for the view taking into account both the table's name and the index name.
// We concatenate the table and index name separated by a delim character
// (a character not allowed by DynamoDB in ordinary table names, default: ":").
// The downside of this approach is that it limits the sum of the lengths,
// instead of each component individually as DynamoDB does.
// The view_name() function assumes the table_name has already been validated
// but validates the legality of index_name and the combination of both.
std::string view_name(std::string_view table_name, std::string_view index_name,
        const std::string& delim = ":", bool validate_len = true);
std::string gsi_name(std::string_view table_name, std::string_view index_name,
        bool validate_len = true);
std::string lsi_name(std::string_view table_name, std::string_view index_name,
        bool validate_len = true);

/// After calling pk_from_json() and ck_from_json() to extract the pk and ck
/// components of a key, and if that succeeded, call check_key() to further
/// check that the key doesn't have any spurious components.
void check_key(const rjson::value& key, const schema_ptr& schema);

/// Fail with api_error::validation if the expression has unused attribute
/// names or values. This is how DynamoDB behaves, so we do too.
void verify_all_are_used(const rjson::value* field,
        const std::unordered_set<std::string>& used,
        const char* field_name,
        const char* operation);

/// Check CQL's Role-Based Access Control (RBAC) permission (MODIFY,
/// SELECT, DROP, etc.) on the given table. When permission is denied an
/// appropriate user-readable api_error::access_denied is thrown.
future<> verify_permission(bool enforce_authorization, bool warn_authorization, const service::client_state&, const schema_ptr&, auth::permission, stats& stats);

/// Similar to verify_permission() above, but just for CREATE operations.
/// Those do not operate on any specific table, so require permissions on
/// ALL KEYSPACES instead of any specific table.
future<> verify_create_permission(bool enforce_authorization, bool warn_authorization, const service::client_state&, stats& stats);

// Sets a KeySchema JSON array inside the given parent object describing the
// key attributes of the given schema as HASH or RANGE keys. Additionally,
// adds mappings from key attribute names to their DynamoDB type string into
// attribute_types.
void describe_key_schema(rjson::value& parent, const schema&, std::unordered_map<std::string, std::string>* attribute_types = nullptr, const std::map<sstring, sstring>* tags = nullptr);

/// is_big() checks approximately if the given JSON value is "bigger" than
/// the given big_size number of bytes. The goal is to *quickly* detect
/// oversized JSON that, for example, is too large to be serialized to a
/// contiguous string - we don't need an accurate size for that. Moreover,
/// as soon as we detect that the JSON is indeed "big", we can return true
/// and don't need to continue calculating its exact size.
bool is_big(const rjson::value& val, int big_size = 100'000);

/// try_get_internal_table() handles the special case that the given table_name
/// begins with INTERNAL_TABLE_PREFIX (".scylla.alternator."). In that case,
/// this function assumes that the rest of the name refers to an internal
/// Scylla table (e.g., system table) and returns the schema of that table -
/// or an exception if it doesn't exist. Otherwise, if table_name does not
/// start with INTERNAL_TABLE_PREFIX, this function returns an empty schema_ptr
/// and the caller should look for a normal Alternator table with that name.
schema_ptr try_get_internal_table(const data_dictionary::database& db, std::string_view table_name);

/// get_table_from_batch_request() is used by batch write/read operations to
/// look up the schema for a table named in a batch request, by the JSON member
/// name (which is the table name in a BatchWriteItem or BatchGetItem request).
schema_ptr get_table_from_batch_request(const service::storage_proxy& proxy, const rjson::value::ConstMemberIterator& batch_request);

/// Returns (or lazily creates) the per-table stats object for the given schema.
/// If the table has been deleted, returns a temporary stats object.
lw_shared_ptr<stats> get_stats_from_schema(service::storage_proxy& sp, const schema& schema);

/// Writes one item's attributes into `item` from the given selection result
/// row. If include_all_embedded_attributes is true, all attributes from the
/// ATTRS_COLUMN map column are included regardless of attrs_to_get.
void describe_single_item(const cql3::selection::selection&,
        const std::vector<managed_bytes_opt>&,
        const std::optional<attrs_to_get>&,
        rjson::value&,
        uint64_t* item_length_in_bytes = nullptr,
        bool include_all_embedded_attributes = false);

/// Converts a single result row to a JSON item, or returns an empty optional
/// if the result is empty.
std::optional<rjson::value> describe_single_item(schema_ptr,
        const query::partition_slice&,
        const cql3::selection::selection&,
        const query::result&,
        const std::optional<attrs_to_get>&,
        uint64_t* item_length_in_bytes = nullptr);

/// Make a body_writer (function that can write output incrementally to the
/// HTTP stream) from the given JSON object.
/// Note: only useful for (very) large objects as there are overhead issues
/// with this as well, but for massive lists of return objects this can
/// help avoid large allocations/many re-allocs.
body_writer make_streamed(rjson::value&&);

} // namespace alternator
@@ -744,7 +744,7 @@ void validate_attr_name_length(std::string_view supplementary_context, size_t at
     constexpr const size_t DYNAMODB_NONKEY_ATTR_NAME_SIZE_MAX = 65535;

     const size_t max_length = is_key ? DYNAMODB_KEY_ATTR_NAME_SIZE_MAX : DYNAMODB_NONKEY_ATTR_NAME_SIZE_MAX;
-    if (attr_name_length > max_length) {
+    if (attr_name_length > max_length || attr_name_length == 0) {
         std::string error_msg;
         if (!error_msg_prefix.empty()) {
             error_msg += error_msg_prefix;
@@ -754,7 +754,11 @@ void validate_attr_name_length(std::string_view supplementary_context, size_t at
             error_msg += supplementary_context;
             error_msg += " - ";
         }
-        error_msg += fmt::format("Attribute name is too large, must be less than {} bytes", std::to_string(max_length + 1));
+        if (attr_name_length == 0) {
+            error_msg += "Empty attribute name";
+        } else {
+            error_msg += fmt::format("Attribute name is too large, must be less than {} bytes", std::to_string(max_length + 1));
+        }
         throw api_error::validation(error_msg);
     }
 }

@@ -264,7 +264,7 @@ private:
     }
 };

-executor::body_writer compress(response_compressor::compression_type ct, const db::config& cfg, executor::body_writer&& bw) {
+body_writer compress(response_compressor::compression_type ct, const db::config& cfg, body_writer&& bw) {
     return [bw = std::move(bw), ct, level = cfg.alternator_response_gzip_compression_level()](output_stream<char>&& out) mutable -> future<> {
         output_stream_options opts;
         opts.trim_to_size = true;
@@ -287,7 +287,7 @@ executor::body_writer compress(response_compressor::compression_type ct, const d
     };
 }

-future<std::unique_ptr<http::reply>> response_compressor::generate_reply(std::unique_ptr<http::reply> rep, sstring accept_encoding, const char* content_type, executor::body_writer&& body_writer) {
+future<std::unique_ptr<http::reply>> response_compressor::generate_reply(std::unique_ptr<http::reply> rep, sstring accept_encoding, const char* content_type, body_writer&& body_writer) {
     response_compressor::compression_type ct = find_compression(accept_encoding, std::numeric_limits<size_t>::max());
     if (ct != response_compressor::compression_type::none) {
         rep->add_header("Content-Encoding", get_encoding_name(ct));

@@ -85,7 +85,7 @@ public:
     future<std::unique_ptr<http::reply>> generate_reply(std::unique_ptr<http::reply> rep,
         sstring accept_encoding, const char* content_type, std::string&& response_body);
     future<std::unique_ptr<http::reply>> generate_reply(std::unique_ptr<http::reply> rep,
-        sstring accept_encoding, const char* content_type, executor::body_writer&& body_writer);
+        sstring accept_encoding, const char* content_type, body_writer&& body_writer);
 };

 }

@@ -14,12 +14,12 @@
 #include "types/concrete_types.hh"
 #include "types/json_utils.hh"
 #include "mutation/position_in_partition.hh"
+#include "alternator/executor_util.hh"

 static logging::logger slogger("alternator-serialization");

 namespace alternator {

-bool is_alternator_keyspace(const sstring& ks_name);

 type_info type_info_from_string(std::string_view type) {
     static thread_local const std::unordered_map<std::string_view, type_info> type_infos = {

@@ -8,6 +8,7 @@

 #include "alternator/server.hh"
 #include "audit/audit.hh"
+#include "alternator/executor_util.hh"
 #include "gms/application_state.hh"
 #include "utils/log.hh"
 #include <fmt/ranges.h>
@@ -143,7 +144,7 @@ public:
         return _response_compressor.generate_reply(std::move(rep), std::move(accept_encoding),
             REPLY_CONTENT_TYPE, std::move(str));
     },
-    [&] (executor::body_writer&& body_writer) {
+    [&] (body_writer&& body_writer) {
         return _response_compressor.generate_reply(std::move(rep), std::move(accept_encoding),
             REPLY_CONTENT_TYPE, std::move(body_writer));
     },

@@ -34,6 +34,7 @@

 #include "executor.hh"
 #include "streams.hh"
+#include "alternator/executor_util.hh"
 #include "data_dictionary/data_dictionary.hh"
 #include "utils/rjson.hh"

@@ -282,7 +283,7 @@ future<alternator::executor::request_return_type> alternator::executor::list_str
     auto arn = stream_arn{ i->schema(), cdc::get_base_table(db.real_database(), *i->schema()) };
     rjson::add(new_entry, "StreamArn", arn);
     rjson::add(new_entry, "StreamLabel", rjson::from_string(stream_label(*s)));
-    rjson::add(new_entry, "TableName", rjson::from_string(cdc::base_name(table_name(*s))));
+    rjson::add(new_entry, "TableName", rjson::from_string(cdc::base_name(s->cf_name())));
     rjson::push_back(streams, std::move(new_entry));
     --limit;
 }
@@ -883,7 +884,7 @@ future<executor::request_return_type> executor::describe_stream(client_state& cl

     rjson::add(stream_desc, "StreamArn", stream_arn);
     rjson::add(stream_desc, "StreamViewType", type);
-    rjson::add(stream_desc, "TableName", rjson::from_string(table_name(*bs)));
+    rjson::add(stream_desc, "TableName", rjson::from_string(bs->cf_name()));

     describe_key_schema(stream_desc, *bs);

@@ -44,6 +44,7 @@
 #include "cql3/query_options.hh"
 #include "cql3/column_identifier.hh"
 #include "alternator/executor.hh"
+#include "alternator/executor_util.hh"
 #include "alternator/controller.hh"
 #include "alternator/serialization.hh"
 #include "alternator/ttl_tag.hh"

@@ -195,7 +195,7 @@ public:
     for (auto sp : cfms) {
         const auto& schema = *sp;

-        if (!schema.cdc_options().enabled()) {
+        if (!cdc_enabled(schema)) {
             continue;
         }

@@ -1438,6 +1438,8 @@ alternator = [
     'alternator/controller.cc',
     'alternator/server.cc',
     'alternator/executor.cc',
+    'alternator/executor_read.cc',
+    'alternator/executor_util.cc',
     'alternator/stats.cc',
     'alternator/serialization.cc',
     'alternator/expressions.cc',

@@ -151,4 +151,5 @@ attribute a, modifying only a.b[3].c, and then writing back a.
 compatibility
 new-apis
 network
+vector-search
 ```
322
docs/alternator/vector-search.md
Normal file
@@ -0,0 +1,322 @@
# Alternator Vector Search

## Introduction

Alternator vector search is a ScyllaDB extension to the DynamoDB-compatible
API that enables _approximate nearest neighbor_ (ANN) search on numeric
vectors stored as item attributes.

In a typical use case, each item in a table contains a high-dimensional
embedding vector (e.g., produced by a machine-learning model), and a query
asks for the _k_ items whose stored vectors are closest to a given query
vector. This kind of similarity search is a building block for
recommendation engines, semantic text search, image retrieval, and other
AI/ML workloads.

Because this feature does not exist in Amazon DynamoDB, all applications
that use it must be written specifically for Alternator.

For a broader introduction to vector search concepts and terminology, see the
[Vector Search Concepts](https://cloud.docs.scylladb.com/stable/vector-search/vector-search-concepts.html)
and
[Vector Search Glossary](https://cloud.docs.scylladb.com/stable/vector-search/vector-search-glossary.html)
sections of the ScyllaDB Cloud documentation.

## Overview

The workflow has three steps:

1. **Create** a table (or update an existing one) with one or more
   _vector indexes_.
2. **Write** items that include the indexed vector attribute, just like any
   other list attribute.
3. **Query** using the `VectorSearch` parameter to retrieve the _k_ nearest
   neighbors.
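
As a sketch of step 2: the indexed vector is written like any ordinary list-of-numbers attribute. The helper below is illustrative (the `make_item` name and the `id`/`embedding` attributes are assumptions, matching the examples later in this document); boto3's resource API represents DynamoDB numbers as `Decimal`:

```python
from decimal import Decimal

def make_item(item_id, vector):
    # boto3's resource API requires DynamoDB numbers as Decimal,
    # so each vector element is converted before the write.
    return {'id': item_id, 'embedding': [Decimal(str(x)) for x in vector]}

item = make_item('doc-1', [0.1, -0.3, 0.7])
# Given a real table whose vector index covers 'embedding' (Dimensions=3):
# table.put_item(Item=item)
```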

## API extensions

### CreateTable — VectorIndexes parameter

A new optional parameter `VectorIndexes` can be passed to `CreateTable`.
It is a list of vector index definitions, each specifying:

| Field | Type | Description |
|-------|------|-------------|
| `IndexName` | String | Unique name for this vector index. Follows the same naming rules as table names: 3–192 characters, matching the regex `[a-zA-Z0-9._-]+`. |
| `VectorAttribute` | Structure | Describes the attribute to index (see below). |
| `Projection` | Structure | Optional. Specifies which attributes are projected into the vector index (see below). |

**VectorAttribute fields:**

| Field | Type | Description |
|-------|------|-------------|
| `AttributeName` | String | The item attribute that holds the vector. It must not be a key column. |
| `Dimensions` | Integer | The fixed size of the vector (number of elements). |

**Projection fields:**

The optional `Projection` parameter is identical to the one used for DynamoDB
GSIs and LSIs, and specifies which attributes are stored in the vector index and
returned when `Select=ALL_PROJECTED_ATTRIBUTES` is used in a vector search query:

| `ProjectionType` | Description |
|-----------------|-------------|
| `KEYS_ONLY` | Only the primary key attributes of the base table (hash key and range key if present) are projected into the index. This is the default when `Projection` is omitted. |
| `ALL` | All table attributes are projected into the index. *(Not yet supported.)* |
| `INCLUDE` | The primary key attributes plus the additional non-key attributes listed in `NonKeyAttributes` are projected. *(Not yet supported.)* |

> **Note:** Currently only `ProjectionType=KEYS_ONLY` is implemented. Specifying
> `ProjectionType=ALL` or `ProjectionType=INCLUDE` returns a `ValidationException`.
> Since `KEYS_ONLY` is also the default, omitting `Projection` entirely is
> equivalent to specifying `{'ProjectionType': 'KEYS_ONLY'}`.

Example (using boto3):
```python
table = dynamodb.create_table(
    TableName='my-table',
    KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'}],
    AttributeDefinitions=[{'AttributeName': 'id', 'AttributeType': 'S'}],
    BillingMode='PAY_PER_REQUEST',
    VectorIndexes=[
        {
            'IndexName': 'embedding-index',
            'VectorAttribute': {'AttributeName': 'embedding', 'Dimensions': 1536},
        }
    ],
)
```

**Constraints:**
- A vector index may not share a name with another vector index, a GSI, or an LSI on the same table.
- The target attribute must not be a key column or an index key column.
- `Dimensions` must be a positive integer up to the implementation maximum.
- Vector indexes require ScyllaDB to operate with tablets (not vnodes).
- Multiple vector indexes can be created on the same table in a single `CreateTable` call.

---

### UpdateTable — VectorIndexUpdates parameter

A new optional parameter `VectorIndexUpdates` can be passed to `UpdateTable`
to add or remove a vector index after the table is created. At most one
index operation (Create or Delete) may be requested per call.

Each element of the list is an object with exactly one of the following keys:

**Create:**
```json
{
    "Create": {
        "IndexName": "my-vector-index",
        "VectorAttribute": {"AttributeName": "embedding", "Dimensions": 1536},
        "Projection": {"ProjectionType": "KEYS_ONLY"}
    }
}
```

The `Projection` field in the `Create` action is optional and accepts the same
values as the `Projection` field in `CreateTable`'s `VectorIndexes` (see above).
Currently only `ProjectionType=KEYS_ONLY` is supported; it is also the default
when `Projection` is omitted.

**Delete:**
```json
{
    "Delete": {
        "IndexName": "my-vector-index"
    }
}
```

The same constraints as `CreateTable`'s `VectorIndexes` apply.
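
These request shapes can be built programmatically. A minimal sketch (the `make_vector_index_update` helper name is an assumption; `client` would be a boto3 DynamoDB client pointed at Alternator):

```python
def make_vector_index_update(action, index_name, attribute=None, dimensions=None):
    # Build one element of the VectorIndexUpdates list for UpdateTable.
    if action == 'Create':
        return {'Create': {
            'IndexName': index_name,
            'VectorAttribute': {'AttributeName': attribute, 'Dimensions': dimensions},
        }}
    return {'Delete': {'IndexName': index_name}}

create_update = make_vector_index_update('Create', 'embedding-index', 'embedding', 1536)
delete_update = make_vector_index_update('Delete', 'embedding-index')
# client.update_table(TableName='my-table', VectorIndexUpdates=[create_update])
```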

---

### DescribeTable — VectorIndexes in the response

`DescribeTable` (and `CreateTable`'s response) returns a `VectorIndexes`
field in the `TableDescription` object when the table has at least one
vector index. The structure mirrors the `CreateTable` input: a list of
objects each containing `IndexName`, `VectorAttribute`
(`AttributeName` + `Dimensions`), and `Projection` (`ProjectionType`).
Currently `Projection` always contains `{"ProjectionType": "KEYS_ONLY"}`
because that is the only supported projection type.

Each vector index entry also includes status fields that mirror the standard
behavior of `GlobalSecondaryIndexes` in DynamoDB:

| Field | Type | Description |
|-------|------|-------------|
| `IndexStatus` | String | `"ACTIVE"` when the vector store has finished building the index and it is fully operational. `"CREATING"` while the index is still being built (the vector store service is still initializing or has not yet been discovered). |
| `Backfilling` | Boolean | Present and `true` only when `IndexStatus` is `"CREATING"` and the vector store is actively performing the initial scan of the base table (bootstrapping). When the initial scan has not yet started, or has already completed, this field is absent. |

When creating a vector index on a non-empty table (via `UpdateTable`), data
already in the table is picked up by the vector store through a full-table
scan (backfill). During this period `IndexStatus` will be `"CREATING"` and
`Backfilling` will be `true`. Once the scan completes, the vector store
transitions to monitoring CDC for ongoing changes, and `IndexStatus` becomes
`"ACTIVE"`.

**Type enforcement before and after index creation differs:**

- **Pre-existing items:** when the backfill scan encounters an item whose
  indexed attribute is not a list of exactly the right number of numbers
  (e.g., it is a string, a list of the wrong length, or contains
  non-numeric elements), that item is silently skipped and not indexed.
  The item remains in the base table unchanged.

- **New writes once the index exists:** any attempt to write a value to the
  indexed attribute that is not a list of exactly `Dimensions` numbers —
  where each number is representable as a 32-bit float — is rejected with a
  `ValidationException`. This applies to `PutItem`, `UpdateItem`, and
  `BatchWriteItem`. A missing value for the indexed attribute is always
  allowed; such items simply are not indexed.

> **Important:** Applications must wait until `IndexStatus` is `"ACTIVE"` before
> issuing `Query` requests against a vector index. Queries on a vector index
> whose `IndexStatus` is still `"CREATING"` may fail. This applies both when
> adding a vector index to an existing table via `UpdateTable` **and** when
> creating a new table with a `VectorIndexes` parameter in `CreateTable` — even
> though the new table starts empty, the vector store still needs a short
> initialization period before it can serve queries.
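
A minimal polling sketch of this wait (the helper name and the `describe_fn` callback are illustrative; in practice `describe_fn` would wrap something like `client.describe_table(TableName=...)['Table']`):

```python
import time

def wait_for_vector_index_active(describe_fn, index_name, timeout=120.0, poll=1.0):
    # Poll DescribeTable output until the named vector index reports ACTIVE.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        table_desc = describe_fn()
        for index in table_desc.get('VectorIndexes', []):
            if index['IndexName'] == index_name and index.get('IndexStatus') == 'ACTIVE':
                return True
        time.sleep(poll)
    return False
```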

---

### Query — VectorSearch parameter

To perform a nearest-neighbor search, pass the `VectorSearch` parameter
to `Query`. When this parameter is present the request is interpreted as a
vector search rather than a standard key-condition query.

**VectorSearch fields:**

| Field | Type | Description |
|-------|------|-------------|
| `QueryVector` | AttributeValue (list `L`) | The query vector as a DynamoDB `AttributeValue` of type `L`. Every element must be of type `N` (number). |

Example:
```python
response = table.query(
    IndexName='embedding-index',
    Limit=10,
    VectorSearch={
        'QueryVector': {'L': [{'N': '0.1'}, {'N': '-0.3'}, {'N': '0.7'}, ...]},
    },
)
```

**Requirements:**

| Parameter | Details |
|-----------|---------|
| `IndexName` | Required. Must name a vector index on this table (not a GSI or LSI). |
| `VectorSearch.QueryVector` | Required. A DynamoDB `AttributeValue` of type `L`; all elements must be of type `N` (number). |
| QueryVector length | Must match the `Dimensions` configured for the named vector index. |
| `Limit` | Required. Defines _k_ — how many nearest neighbors to return. Must be a positive integer. |

**Differences from standard Query:**

Vector search reinterprets several standard `Query` parameters in a fundamentally
different way, and explicitly rejects others that have no meaningful interpretation:

- **`Limit` means top-k, not page size.** In a standard Query, `Limit` caps
  the number of items examined per page, and you page through results with
  `ExclusiveStartKey`. In vector search, `Limit` defines _k_: the ANN
  algorithm runs once and returns exactly the _k_ nearest neighbors. There
  is no natural "next page" — each page would require a full re-run of the
  search — so **`ExclusiveStartKey` is rejected**.

- **Results are ordered by vector distance, not by sort key.** A standard
  Query returns rows in sort-key order; `ScanIndexForward=false` reverses
  that order. Vector search always returns results ordered by their distance
  to `QueryVector` (nearest first). Having `ScanIndexForward` specify sort-key
  direction has no meaning here, so **`ScanIndexForward` is rejected**.

- **Eventual consistency only.** The vector store is an external service fed
  asynchronously from ScyllaDB via CDC. Like GSIs, vector indexes can never
  reflect writes instantly, so strongly-consistent reads are impossible.
  **`ConsistentRead=true` is rejected.**

- **No key condition.** A standard Query requires a `KeyConditionExpression`
  to select which partition to read. Vector search queries the vector store
  globally across all partitions of the table. `KeyConditions` and
  `KeyConditionExpression` are therefore not applicable and are silently
  ignored. (Local vector indexes, which would scope the search to a single
  partition and use `KeyConditionExpression`, are not yet supported.)

**Select parameter:**

The standard DynamoDB `Select` parameter is supported for vector search queries
and controls which attributes are returned for each matching item:

| `Select` value | Behavior |
|----------------|----------|
| `ALL_PROJECTED_ATTRIBUTES` (default) | Return only the attributes projected to the vector index. Currently, only the primary key attributes (hash key, and range key if present) are projected; support for configuring additional projected attributes is not yet implemented. Note that the vector attribute itself is **not** included: the vector store may not retain the original floating-point values (e.g., it may quantize them), so the authoritative copy lives only in the base table. This is the most efficient option because Scylla can return the results directly from the vector store without an additional fetch from the base table. |
| `ALL_ATTRIBUTES` | Return all attributes of each matching item, fetched from the base table. |
| `SPECIFIC_ATTRIBUTES` | Return only the attributes named in `ProjectionExpression` or `AttributesToGet`. |
| `COUNT` | Return only the count of matching items; no `Items` list is included in the response. |

When neither `Select` nor `ProjectionExpression`/`AttributesToGet` is specified,
`Select` defaults to `ALL_PROJECTED_ATTRIBUTES`. When `ProjectionExpression` or
`AttributesToGet` is present without an explicit `Select`, it implies
`SPECIFIC_ATTRIBUTES`. Using `ProjectionExpression` or `AttributesToGet`
together with an explicit `Select` other than `SPECIFIC_ATTRIBUTES` is an error.

**Note on performance:** Unlike a DynamoDB LSI, a vector index allows you to
read non-projected attributes (e.g., with `ALL_ATTRIBUTES` or
`SPECIFIC_ATTRIBUTES` requesting a non-key column). However, doing so requires
an additional read from the base table for each result — similar to reading
through a secondary index (rather than a materialized view) in CQL — and is
therefore significantly slower than returning only projected attributes with
`ALL_PROJECTED_ATTRIBUTES`. For latency-sensitive applications, prefer
`ALL_PROJECTED_ATTRIBUTES` or limiting `SPECIFIC_ATTRIBUTES` to key columns.

**FilterExpression:**

Vector search supports `FilterExpression` for post-filtering results. This
works the same way as `FilterExpression` on a standard DynamoDB `Query`: after
the ANN search, the filter is applied to each candidate item and only matching
items are returned.

**Important:** filtering happens _after_ the `Limit` nearest neighbors have
already been selected by the vector index. If the filter discards some of
those candidates, the response may contain **fewer than `Limit` items**. The
server does not automatically fetch additional neighbors to replace filtered-out
items. This is identical to how `FilterExpression` interacts with `Limit` in a
standard DynamoDB `Query`.

The response always includes two count fields:

| Field | Description |
|-------|-------------|
| `ScannedCount` | The number of candidates returned by the vector index (always equal to `Limit`, unless the table contains fewer than `Limit` items). |
| `Count` | The number of items that passed the `FilterExpression` (or equal to `ScannedCount` when no filter is present). |
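
Putting this together, a post-filtered vector search request might be built as follows (a sketch of the `Query` kwargs only; the `make_filtered_vector_query` helper and the `category` attribute are illustrative, not part of the API):

```python
def make_filtered_vector_query(vector, k, category):
    # Build Query kwargs: the vector index selects the k nearest neighbors
    # first, then FilterExpression discards non-matching candidates,
    # possibly leaving fewer than k items in the response.
    return {
        'IndexName': 'embedding-index',
        'Limit': k,
        'VectorSearch': {'QueryVector': {'L': [{'N': str(x)} for x in vector]}},
        'FilterExpression': 'category = :c',
        'ExpressionAttributeValues': {':c': category},
    }

request = make_filtered_vector_query([0.1, -0.3, 0.7], 10, 'news')
# response = table.query(**request)
# response['ScannedCount'] counts candidates; response['Count'] counts
# the items that passed the filter.
```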
|
||||
|
||||
**Interaction with `Select`:**

- `Select=ALL_ATTRIBUTES`: Each candidate item is fetched from the
  base table, the filter is evaluated against all its attributes, and only
  matching items are returned. `Count` reflects the number of items that passed
  the filter.

- `Select=SPECIFIC_ATTRIBUTES`: Each candidate item is fetched from the base
  table — including any attributes needed by the filter expression, even if
  those attributes are not listed in `ProjectionExpression` — and the filter is
  applied. Only the projected attributes are returned in the response; filter
  attributes that were not requested are not included in the returned items.

- `Select=COUNT`: The candidate items are still fetched from the base table and
  the filter is evaluated for each one, but no `Items` list is returned. `Count`
  reflects the number of items that passed the filter; `ScannedCount` is the
  total number of candidates examined. This is useful for counting matches
  without transferring item data to the client.

- `Select=ALL_PROJECTED_ATTRIBUTES` (default): When no filter is present this is the most
  efficient mode — results are returned directly from the vector store without
  any base-table reads. When a `FilterExpression` is present, however, the full
  item must be fetched from the base table to evaluate the filter, and only the
  projected (key) attributes are returned for items that pass.
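The rules above boil down to a simple decision: does the query need to read each candidate from the base table, or can it be answered from the vector store alone? A minimal sketch, under the assumption (from the text above) that a `FilterExpression` always forces a base-table read and that key-only projections do not:

```python
def needs_base_table_read(select, has_filter, wants_nonkey_attrs=False):
    """Illustrative summary of the Select rules above; not Scylla's code.
    Returns True when each ANN candidate must be fetched from the base
    table before it can be returned (the slow path)."""
    if has_filter:
        return True     # the full item is needed to evaluate the filter
    if select == 'ALL_ATTRIBUTES':
        return True     # non-projected attributes live only in the base table
    if select == 'SPECIFIC_ATTRIBUTES':
        return wants_nonkey_attrs   # key-only projections come from the index
    # ALL_PROJECTED_ATTRIBUTES (and COUNT with no filter) can be served
    # directly from the vector store.
    return False
```

For example, the default mode with no filter is the fast path, while adding any `FilterExpression` makes every mode pay the per-item base-table read.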
> **Note:** `QueryFilter` (the legacy non-expression filter API) is **not**
> supported for vector search queries and will be rejected with a
> `ValidationException`. Use `FilterExpression` instead.
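Putting the pieces together, a client-side vector query with post-filtering might be built like this. The `VectorSearch`/`QueryVector` shape matches the tests in this series; the table, index, and attribute names are hypothetical, and since boto3's bundled API model does not know the `VectorSearch` parameter, a real call needs client-side parameter validation disabled (as the test suite's conftest does with `botocore.client.Config(parameter_validation=False)`):

```python
# Hypothetical names, for illustration only. A live call would be:
#   table.query(**query_kwargs)
# with a boto3 resource created with parameter_validation=False.
query_kwargs = {
    'IndexName': 'vind',
    'VectorSearch': {'QueryVector': [0.1, 0.2, 0.3]},
    'Limit': 10,
    # Post-filtering: applied after the 10 nearest neighbors are chosen,
    # so fewer than 10 items may come back.
    'FilterExpression': 'category = :c',
    'ExpressionAttributeValues': {':c': 'books'},
}
```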
main.cc

@@ -2609,7 +2609,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl

     api::set_server_service_levels(ctx, cql_server_ctl, qp).get();

-    alternator::controller alternator_ctl(gossiper, proxy, ss, mm, sys_dist_ks, cdc_generation_service, service_memory_limiter, auth_service, sl_controller, *cfg, dbcfg.statement_scheduling_group);
+    alternator::controller alternator_ctl(gossiper, proxy, ss, mm, sys_dist_ks, cdc_generation_service, service_memory_limiter, auth_service, sl_controller, vector_store_client, *cfg, dbcfg.statement_scheduling_group);

     // Register at_exit last, so that storage_service::drain_on_shutdown will be called first
     auto do_drain = defer_verbose_shutdown("local storage", [&ss] {
@@ -154,8 +154,7 @@ def new_dynamodb_session(request, dynamodb, get_valid_alternator_role):
     conf = botocore.client.Config(parameter_validation=False)
     if request.config.getoption('aws'):
         return boto3.resource('dynamodb', config=conf)
     if host.hostname == 'localhost':
         conf = conf.merge(botocore.client.Config(retries={"max_attempts": 0}, read_timeout=300))
     user, secret = get_valid_alternator_role(dynamodb.meta.client._endpoint.host, role=user)
     region_name = dynamodb.meta.client.meta.region_name
     return ses.resource('dynamodb', endpoint_url=dynamodb.meta.client._endpoint.host, verify=host.scheme != 'http',
@@ -6,6 +6,7 @@ import run

 import os
 import requests
+import glob

 # When tests are to be run against AWS (the "--aws" option), it is not
 # necessary to start Scylla at all. All we need to do is to run pytest.

@@ -83,6 +84,8 @@ def run_alternator_cmd(pid, dir):
         ]
     else:
         cmd += ['--alternator-port', '8000']
+    if '--vs' in sys.argv:
+        cmd += ['--vector-store-primary-uri', f'http://{ip}:6080']
     cmd += extra_scylla_options

     for i in remove_scylla_options:
@@ -111,6 +114,49 @@ if '--https' in sys.argv:
 else:
     alternator_url=f"http://{ip}:8000"

+# If the "--vs" option is given, also run the vector store process.
+# The vector store is run in its own temporary directory, but runs
+# on the same IP address as Scylla (otherwise, the first of the two
+# which we will run will not know where to find the second).
+def run_vector_store_cmd(pid, dir):
+    global ip # same IP as Scylla, see comment above
+    print('Booting Vector Store on ' + ip + ' in ' + dir + '...')
+    with open(f'{dir}/.password', 'w') as f:
+        print('cassandra', file=f)
+    env = {
+        'VECTOR_STORE_URI': f'{ip}:6080',
+        'VECTOR_STORE_SCYLLADB_URI': f'{ip}:9042',
+        'VECTOR_STORE_SCYLLADB_USERNAME': 'cassandra',
+        'VECTOR_STORE_SCYLLADB_PASSWORD_FILE': f'{dir}/.password',
+    }
+    global vector_store_executable # set by the code below
+    cmd = [
+        vector_store_executable
+    ]
+    return (cmd, env)
+
+if '--vs' in sys.argv:
+    sys.argv.remove('--vs')
+    # Find the vector-store executable. We look for it in the vector-store/
+    # directory next to the Scylla working directory taking the newest built
+    # executable, but it can also be specified by the user by setting the
+    # VECTOR_STORE environment variable to the path of the executable.
+    if os.getenv('VECTOR_STORE'):
+        vector_store_executable = os.path.abspath(os.getenv('VECTOR_STORE'))
+    else:
+        vector_store_dir = os.path.join(os.path.dirname(run.source_path), 'vector-store')
+        vector_stores = glob.glob(os.path.join(vector_store_dir, 'target/*/vector-store'))
+        if not vector_stores:
+            print(f"Can't find a compiled Vector Store in {vector_store_dir}.\nPlease build Vector Store or set VECTOR_STORE to the path of a Vector Store executable.")
+            exit(1)
+        vector_store_executable = max(vector_stores, key=os.path.getmtime)
+    if not os.access(vector_store_executable, os.X_OK):
+        print(f"Cannot execute '{vector_store_executable}'.\nPlease set VECTOR_STORE to the path of a Vector Store executable.")
+        exit(1)
+    print(f"Vector Store is: {vector_store_executable}.")
+    run.run_with_temporary_dir(run_vector_store_cmd)
+
+
 # Wait for both CQL and Alternator APIs to become responsive. We obviously
 # need the Alternator API to test Alternator, but currently we also need
 # CQL for setting up authentication.
@@ -1672,6 +1672,22 @@ def test_gsi_query_select_1(test_table_gsi_1):
         Select='SPECIFIC_ATTRIBUTES',
         AttributesToGet=['y'],
         KeyConditions={'c': {'AttributeValueList': [c], 'ComparisonOperator': 'EQ'}})
+    assert_index_query(test_table_gsi_1, 'hello', expected_items,
+        Select='SPECIFIC_ATTRIBUTES',
+        ProjectionExpression='y',
+        KeyConditions={'c': {'AttributeValueList': [c], 'ComparisonOperator': 'EQ'}})
+    # If AttributesToGet or ProjectionExpression are used, SPECIFIC_ATTRIBUTES
+    # is implied, and can be omitted.
+    assert_index_query(test_table_gsi_1, 'hello', expected_items,
+        AttributesToGet=['y'],
+        KeyConditions={'c': {'AttributeValueList': [c], 'ComparisonOperator': 'EQ'}})
+    assert_index_query(test_table_gsi_1, 'hello', expected_items,
+        ProjectionExpression='y',
+        KeyConditions={'c': {'AttributeValueList': [c], 'ComparisonOperator': 'EQ'}})
+    assert_index_query(test_table_gsi_1, 'hello', expected_items,
+        ProjectionExpression='#name',
+        ExpressionAttributeNames={'#name': 'y'},
+        KeyConditions={'c': {'AttributeValueList': [c], 'ComparisonOperator': 'EQ'}})
     assert not 'Items' in test_table_gsi_1.query(ConsistentRead=False,
         IndexName='hello',
         Select='COUNT',
@@ -1714,13 +1730,24 @@ def test_gsi_query_select_2(dynamodb):
         Select='ALL_ATTRIBUTES',
         KeyConditions={'x': {'AttributeValueList': [x], 'ComparisonOperator': 'EQ'}})
     # SPECIFIC_ATTRIBUTES (with AttributesToGet / ProjectionExpression)
-    # is allowed for the projected attributes, but not for unprojected
-    # attributes.
+    # is allowed for the projected attributes, but not allowed for
+    # unprojected attributes:
     expected_items = [{'a': z['a']} for z in items if z['x'] == x]
     assert_index_query(table, 'hello', expected_items,
         Select='SPECIFIC_ATTRIBUTES',
         AttributesToGet=['a'],
         KeyConditions={'x': {'AttributeValueList': [x], 'ComparisonOperator': 'EQ'}})
+    # Requesting an unprojected attribute 'b' via AttributesToGet or
+    # ProjectionExpression returns an explicit error, not silent nothing.
+    with pytest.raises(ClientError, match='ValidationException.*project'):
+        table.query(IndexName='hello',
+            Select='SPECIFIC_ATTRIBUTES',
+            AttributesToGet=['b'],
+            KeyConditions={'x': {'AttributeValueList': [x], 'ComparisonOperator': 'EQ'}})
+    with pytest.raises(ClientError, match='ValidationException.*project'):
+        table.query(IndexName='hello',
+            ProjectionExpression='b',
+            KeyConditions={'x': {'AttributeValueList': [x], 'ComparisonOperator': 'EQ'}})
     # Select=COUNT is also allowed, and doesn't return item content
     assert not 'Items' in table.query(ConsistentRead=False,
         IndexName='hello',
@@ -878,3 +878,13 @@ def test_many_attributes(test_table_s):
         AttributeUpdates={key: {'Value': more_attributes[key], 'Action': 'PUT'} for key in more_attributes.keys()})
     item = {**item, **more_attributes}
     assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item'] == item
+
+# Test that attribute names can contain basically any character - even things
+# like backslashes, quotes, spaces, newlines, and even null (!).
+def test_attribute_allowed_chars(test_table_s):
+    p = random_string()
+    s = bytes(range(256)).decode('latin-1')
+    for chars in ['abc', ' ', "-\\\"'_.:/#&", s]:
+        test_table_s.update_item(Key={'p': p},
+            AttributeUpdates={chars: {'Value': chars, 'Action': 'PUT'}})
+        assert test_table_s.get_item(Key={'p': p}, ConsistentRead=True)['Item'][chars] == chars
@@ -84,6 +84,29 @@ def test_limit_attribute_length_nonkey_bad(test_table_s):
         ExpressionAttributeNames={'#name': too_long_name},
         ExpressionAttributeValues={':val': 1})

+# Empty attribute name is also not allowed. Reproduces SCYLLADB-1069.
+# We have similar tests for empty keys in test_item.py::test_{put,update}_item_empty_key.
+def test_limit_attribute_length_nonkey_empty(test_table_s):
+    p = random_string()
+    with pytest.raises(ClientError, match='ValidationException.*Empty attribute name'):
+        test_table_s.put_item(Item={'p': p, '': 1})
+    with pytest.raises(ClientError, match='ValidationException.*Empty attribute name'):
+        test_table_s.get_item(Key={'p': p}, ProjectionExpression='#name',
+            ExpressionAttributeNames={'#name': ''})
+    with pytest.raises(ClientError, match='ValidationException.*Empty attribute name'):
+        test_table_s.get_item(Key={'p': p}, AttributesToGet=[''])
+    with pytest.raises(ClientError, match='ValidationException.*Empty attribute name'):
+        test_table_s.update_item(Key={'p': p}, AttributeUpdates={'': {'Value': 2, 'Action': 'PUT'}})
+    with pytest.raises(ClientError, match='ValidationException.*Empty attribute name'):
+        test_table_s.update_item(Key={'p': p}, UpdateExpression='SET #name = :val',
+            ExpressionAttributeNames={'#name': ''},
+            ExpressionAttributeValues={':val': 3})
+    with pytest.raises(ClientError, match='ValidationException.*Empty attribute name'):
+        test_table_s.update_item(Key={'p': p}, UpdateExpression='SET a = :val',
+            ConditionExpression='#name = :val',
+            ExpressionAttributeNames={'#name': ''},
+            ExpressionAttributeValues={':val': 1})
+
 # Attribute length test 3: Test that *key* (hash and range) attribute names
 # up to 255 characters are allowed. In the test below we'll see that larger
 # sizes aren't allowed.
@@ -35,6 +35,7 @@ from botocore.exceptions import ClientError

 from test.alternator.test_cql_rbac import new_dynamodb, new_role
 from test.alternator.util import random_string, new_test_table, is_aws, scylla_config_read, scylla_config_temporary, get_signed_request
+from test.alternator.test_vector import vs

 # Fixture for checking if we are able to test Scylla metrics. Scylla metrics
 # are not available on AWS (of course), but may also not be available for

@@ -396,6 +397,29 @@ def test_scan_operations(test_table_s, metrics):
         test_table_s.query(Limit=1, KeyConditionExpression='p=:p',
                            ExpressionAttributeValues={':p': 'dog'})

+def test_table_scan_operations(test_table_s, metrics):
+    with check_table_increases_operation(metrics, ['Query', 'Scan'], test_table_s.name):
+        test_table_s.scan(Limit=1)
+        test_table_s.query(Limit=1, KeyConditionExpression='p=:p',
+                           ExpressionAttributeValues={':p': 'dog'})
+
+# Test counter for Query with VectorSearch: both global and per-table.
+def test_query_vector_operations(vs, metrics):
+    with new_test_table(vs,
+        KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
+        AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}],
+        VectorIndexes=[{'IndexName': 'vind',
+            'VectorAttribute': {'AttributeName': 'v', 'Dimensions': 3}}]) as table:
+        with check_increases_operation(metrics, ['Query']):
+            with check_table_increases_operation(metrics, ['Query'], table.name):
+                # The vector store may or may not be configured; either way
+                # the Query counter must be incremented.
+                try:
+                    table.query(IndexName='vind',
+                        VectorSearch={'QueryVector': [1, 2, 3]}, Limit=1)
+                except ClientError:
+                    pass
+
 # Test counters for DescribeEndpoints:
 def test_describe_endpoints_operations(dynamodb, metrics):
     with check_increases_operation(metrics, ['DescribeEndpoints']):
test/alternator/test_vector.py (new file, 2208 lines; diff suppressed because it is too large)
@@ -297,6 +297,36 @@ struct vector_store_client::impl {
         return _primary_uris.empty() && _secondary_uris.empty();
     }

+    auto get_index_status(keyspace_name keyspace, index_name name, abort_source& as)
+            -> future<vector_store_client::index_status> {
+        using index_status = vector_store_client::index_status;
+        if (is_disabled()) {
+            co_return index_status::creating;
+        }
+        auto path = format("/api/v1/indexes/{}/{}/status", keyspace, name);
+        auto resp = co_await request(operation_type::GET, std::move(path), std::nullopt, as);
+        if (!resp || resp->status != status_type::ok) {
+            co_return index_status::creating;
+        }
+        try {
+            auto json = rjson::parse(response_content_to_sstring(resp->content));
+            const auto* status = rjson::find(json, "status");
+            if (!status || !status->IsString()) {
+                co_return index_status::creating;
+            }
+            auto sv = rjson::to_string_view(*status);
+            if (sv == "SERVING") {
+                co_return index_status::serving;
+            }
+            if (sv == "BOOTSTRAPPING") {
+                co_return index_status::backfilling;
+            }
+            co_return index_status::creating;
+        } catch (...) {
+            co_return index_status::creating;
+        }
+    }
+
     auto ann(keyspace_name keyspace, index_name name, schema_ptr schema, vs_vector vs_vector, limit limit, const rjson::value& filter, abort_source& as)
             -> future<std::expected<primary_keys, ann_error>> {
         if (is_disabled()) {

@@ -376,6 +406,10 @@ auto vector_store_client::is_disabled() const -> bool {
     return _impl->is_disabled();
 }

+auto vector_store_client::get_index_status(keyspace_name keyspace, index_name name, abort_source& as) -> future<index_status> {
+    return _impl->get_index_status(std::move(keyspace), std::move(name), as);
+}
+
 auto vector_store_client::ann(keyspace_name keyspace, index_name name, schema_ptr schema, vs_vector vs_vector, limit limit, const rjson::value& filter, abort_source& as)
         -> future<std::expected<primary_keys, ann_error>> {
     return _impl->ann(keyspace, name, schema, vs_vector, limit, filter, as);
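The decision logic of `get_index_status` above is deliberately conservative: any transport failure, non-OK response, parse error, or unknown status string degrades to `creating`. A compact summary of that mapping (illustrative Python mirroring the C++ logic, not Scylla's code):

```python
def map_index_status(response_ok, body):
    """Mirror of get_index_status: only the vector store's SERVING and
    BOOTSTRAPPING statuses are mapped; any failure or unknown value is
    reported conservatively as 'creating'."""
    if not response_ok:
        return 'creating'       # vector store unreachable or error status
    status = body.get('status')
    if status == 'SERVING':
        return 'serving'        # initial scan complete, fully operational
    if status == 'BOOTSTRAPPING':
        return 'backfilling'    # initial full scan of the base table
    return 'creating'           # missing, non-string, or unknown status
```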
@@ -74,6 +74,21 @@ public:
     /// Check if the vector_store_client is disabled.
     auto is_disabled() const -> bool;

+    /// The operational status of a single vector index, as reported by the vector store.
+    enum class index_status {
+        /// The index is not yet ready: initializing, not yet discovered, or the
+        /// vector store is unreachable.
+        creating,
+        /// The index is performing the initial full scan of the base table
+        /// (backfilling). Queries may be served but results are incomplete.
+        backfilling,
+        /// The index has completed the initial scan and is fully operational.
+        serving,
+    };
+
+    /// Query the vector store for the current status of a specific vector index.
+    auto get_index_status(keyspace_name keyspace, index_name name, abort_source& as) -> future<index_status>;
+
     /// Request the vector store service for the primary keys of the nearest neighbors
     auto ann(keyspace_name keyspace, index_name name, schema_ptr schema, vs_vector vs_vector, limit limit, const rjson::value& filter, abort_source& as)
         -> future<std::expected<primary_keys, ann_error>>;