/*
 * Copyright 2019-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#pragma once

#include
#include "seastarx.hh"
#include
#include

#include "service/migration_manager.hh"
#include "service/client_state.hh"
#include "service_permit.hh"
#include "db/timeout_clock.hh"
#include "db/config.hh"

#include "alternator/error.hh"
#include "stats.hh"
#include "utils/rjson.hh"
#include "utils/updateable_value.hh"
#include "utils/simple_value_with_expiry.hh"

#include "tracing/trace_state.hh"

// Forward declarations of types that this header only mentions in
// declarations (reference members, parameters and return types).
namespace db {
    class system_distributed_keyspace;
}

namespace query {
    class partition_slice;
    class result;
}

namespace cql3::selection {
    class selection;
}

namespace service {
    class storage_proxy;
    class cas_shard;
    class storage_service;
}

namespace cdc {
    class metadata;
}

namespace gms {
    class gossiper;
}

class schema_builder;

namespace alternator {

enum class table_status;
class rmw_operation;
class put_or_delete_item;

// Returns a table's schema, given the storage proxy and a JSON request.
// NOTE(review): presumably the table is the one named in the request, and a
// missing table is reported as an error - confirm against the definition.
schema_ptr get_table(service::storage_proxy& proxy, const rjson::value& request);
// NOTE(review): judging by the name, tells whether ks_name names a keyspace
// created by Alternator (see executor::KEYSPACE_NAME_PREFIX) - confirm.
bool is_alternator_keyspace(const sstring& ks_name);
// Wraps the db::get_tags_of_table and throws if the table is missing the tags extension.
const std::map& get_tags_of_table_or_throw(schema_ptr schema);

// An attribute_path_map object is used to hold data for various attribute
// paths (parsed::path) in a hierarchy of attribute paths. Each attribute path
// has a root attribute, and then modified by member and index operators -
// for example in "a.b[2].c" we have "a" as the root, then ".b" member, then
// "[2]" index, and finally ".c" member.
// Data can be added to an attribute_path_map using the add() function, but
// requires that attributes with data not be *overlapping* or *conflicting*:
//
// 1. Two attribute paths which are identical or an ancestor of one another
//    are considered *overlapping* and not allowed. If a.b.c has data,
//    we can't add more data in a.b.c or any of its descendants like a.b.c.d.
//
// 2. Two attribute paths which need the same parent to have both a member and
//    an index are considered *conflicting* and not allowed. E.g., if a.b has
//    data, you can't add a[1]. The meaning of adding both would be that the
//    attribute a is both a map and an array, which isn't sensible.
//
// These two requirements are common to the two places where Alternator uses
// this abstraction to describe how a hierarchical item is to be transformed:
//
// 1. In ProjectionExpression: for filtering from a full top-level attribute
//    only the parts for which user asked in ProjectionExpression.
//
// 2. In UpdateExpression: for taking the previous value of a top-level
//    attribute, and modifying it based on the instructions the user
//    wrote in UpdateExpression.
//
// attribute_path_map_node is one node of that hierarchy: it is either empty,
// or holds exactly one of a value, a set of named members, or a set of
// indexed elements.
template
class attribute_path_map_node {
public:
    // The type of the data stored at a path that has data.
    using data_t = T;
    // We need the extra unique_ptr<> here because libstdc++ unordered_map
    // doesn't work with incomplete types :-(
    using members_t = std::unordered_map>>;
    // The indexes list is sorted because DynamoDB requires handling writes
    // beyond the end of a list in index order.
    using indexes_t = std::map>>;
    // The prohibition on "overlap" and "conflict" explained above means
    // that at most one of data, members or indexes is present. A disengaged
    // _content is what is_empty() reports as an empty node.
    std::optional> _content;

    // State queries. Each has_*() first checks that _content is engaged, so
    // all three return false for an empty node.
    bool is_empty() const { return !_content; }
    bool has_value() const { return _content && std::holds_alternative(*_content); }
    bool has_members() const { return _content && std::holds_alternative(*_content); }
    bool has_indexes() const { return _content && std::holds_alternative(*_content); }
    // get_members() assumes that has_members() is true
    // The same goes for the other accessors below: they dereference _content
    // without checking it, so the caller must first check has_indexes() or
    // has_value() respectively.
    members_t& get_members() { return std::get(*_content); }
    const members_t& get_members() const { return std::get(*_content); }
    indexes_t& get_indexes() { return std::get(*_content); }
    const indexes_t& get_indexes() const { return std::get(*_content); }
    T& get_value() { return std::get(*_content); }
    const T& get_value() const { return std::get(*_content); }
};

// The top level of the hierarchy: an unordered map whose mapped values are
// attribute_path_map_node objects.
// NOTE(review): presumably keyed by the root (top-level) attribute name -
// confirm.
template
using attribute_path_map = std::unordered_map>;

using attrs_to_get_node = attribute_path_map_node;
// attrs_to_get lists which top-level attributes are needed, and possibly also
// which part of the top-level attribute is really needed (when nested
// attribute paths appeared in the query).
// Most code actually uses optional. There, a disengaged
// optional means we should get all attributes, not specific ones.
using attrs_to_get = attribute_path_map;

namespace parsed {
class expression_cache;
}

// The executor declares one method per supported API operation (create_table,
// put_item, query, ...), plus static helpers for looking up tables and for
// converting query results to JSON.
class executor : public peering_sharded_service {
    // Other services used by the executor. They are held by reference, so
    // the executor does not own them.
    gms::gossiper& _gossiper;
    service::storage_service& _ss;
    service::storage_proxy& _proxy;
    service::migration_manager& _mm;
    db::system_distributed_keyspace& _sdks;
    cdc::metadata& _cdc_metadata;
    // NOTE(review): presumably live-updateable configuration options that
    // select whether failed permission checks are enforced and/or only
    // warned about (compare the parameters of verify_permission()) - confirm.
    utils::updateable_value _enforce_authorization;
    utils::updateable_value _warn_authorization;
    // An smp_service_group to be used for limiting the concurrency when
    // forwarding Alternator requests between shards - if necessary for LWT.
    smp_service_group _ssg;

    // expression_cache is only forward-declared in this header, so it is
    // held through a unique_ptr.
    std::unique_ptr _parsed_expression_cache;

    // describe_table_info_manager is declared here but not defined in this
    // header; it too is held through a unique_ptr.
    struct describe_table_info_manager;
    std::unique_ptr _describe_table_info_manager;

    // NOTE(review): judging by the names, these two maintain the table-size
    // information reported in a table description: the first caches a size
    // (with a time-to-live) on every shard, the second fills the size into
    // table_description - confirm against the definitions.
    future<> cache_newly_calculated_size_on_all_shards(schema_ptr schema, std::uint64_t size_in_bytes, std::chrono::nanoseconds ttl);
    future<> fill_table_size(rjson::value &table_description, schema_ptr schema, bool deleting);
public:
    using client_state = service::client_state;
    // request_return_type is the return type of the executor methods, which
    // can be one of:
    // 1. A string, which is the response body for the request.
    // 2. A body_writer, an asynchronous function (returning future<>) that
    //    takes an output_stream and writes the response body into it.
    // 3. An api_error, which is an error response that should be returned to
    //    the client.
    // The body_writer is used for streaming responses, where the response body
    // is written in chunks to the output_stream. This allows for efficient
    // handling of large responses without needing to allocate a large buffer
    // in memory.
    using body_writer = noncopyable_function(output_stream&&)>;
    using request_return_type = std::variant;
    stats _stats;
    // The metric_groups object holds this stat object's metrics registered
    // as long as the stats object is alive.
    seastar::metrics::metric_groups _metrics;
    // Well-known names.
    // NOTE(review): presumably ATTRS_COLUMN_NAME is the column that stores
    // an item's attributes, KEYSPACE_NAME_PREFIX starts the name of every
    // keyspace Alternator creates, and INTERNAL_TABLE_PREFIX marks table
    // names that refer to internal tables - confirm against their uses.
    static constexpr auto ATTRS_COLUMN_NAME = ":attrs";
    static constexpr auto KEYSPACE_NAME_PREFIX = "alternator_";
    static constexpr std::string_view INTERNAL_TABLE_PREFIX = ".scylla.alternator.";

    // Note that the parameter order (proxy before ss) differs from the
    // declaration order of the corresponding members (_ss before _proxy).
    executor(gms::gossiper& gossiper,
             service::storage_proxy& proxy,
             service::storage_service& ss,
             service::migration_manager& mm,
             db::system_distributed_keyspace& sdks,
             cdc::metadata& cdc_metadata,
             smp_service_group ssg,
             utils::updateable_value default_timeout_in_ms);
    ~executor();

    // The API operations. Every operation receives the state of the client
    // issuing it, a service permit and the JSON request (passed by value);
    // most of them also receive a tracing state. See request_return_type
    // above for what the executor methods produce.
    future create_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
    future describe_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
    future delete_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
    future update_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
    future put_item(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
    future get_item(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
    future delete_item(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
    future update_item(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
    future list_tables(client_state& client_state, service_permit permit, rjson::value request);
    future scan(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
    // Unlike the other operations, describe_endpoints also takes a string
    // named host_header.
    // NOTE(review): presumably the HTTP Host header of the request - confirm.
    future describe_endpoints(client_state& client_state, service_permit permit, rjson::value request, std::string host_header);
    future batch_write_item(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
    future batch_get_item(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
    future query(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request);
    future tag_resource(client_state& client_state, service_permit permit, rjson::value request);
    future untag_resource(client_state& client_state, service_permit permit, rjson::value request);
    future list_tags_of_resource(client_state& client_state, service_permit permit, rjson::value request);
    future update_time_to_live(client_state& client_state, service_permit permit, rjson::value request);
    future describe_time_to_live(client_state& client_state, service_permit permit, rjson::value request);
    future list_streams(client_state& client_state, service_permit permit, rjson::value request);
    future describe_stream(client_state& client_state, service_permit permit, rjson::value request);
    future get_shard_iterator(client_state& client_state, service_permit permit, rjson::value request);
    future get_records(client_state& client_state, tracing::trace_state_ptr, service_permit permit, rjson::value request);
    future describe_continuous_backups(client_state& client_state, service_permit permit, rjson::value request);

    // Asynchronous start-up and shutdown of the executor.
    future<> start();
    future<> stop();

    // NOTE(review): presumably returns the table's name as seen by the
    // client for the given schema - confirm.
    static sstring table_name(const schema&);
    // NOTE(review): presumably the current time plus the timeout held in
    // s_default_timeout_in_ms - confirm against the definition.
    static db::timeout_clock::time_point default_timeout();
private:
    // thread_local: every thread has its own instance of this value.
    static thread_local utils::updateable_value s_default_timeout_in_ms;
public:
    // Two ways of looking up a table's schema: by an explicit table name, or
    // from a JSON request.
    // NOTE(review): how a missing table is reported (null schema_ptr or an
    // exception), and how this differs from get_table() above, cannot be
    // seen from this header - confirm.
    static schema_ptr find_table(service::storage_proxy&, std::string_view table_name);
    static schema_ptr find_table(service::storage_proxy&, const rjson::value& request);

private:
    // rmw_operation is granted access to the private members of executor.
    friend class rmw_operation;

    // Private overload of describe_key_schema(): the unordered_map is passed
    // by pointer and defaults to nullptr. The public overload further below
    // takes it by reference instead.
    static void describe_key_schema(rjson::value& parent, const schema&, std::unordered_map * = nullptr, const std::map *tags = nullptr);
    future fill_table_description(schema_ptr schema, table_status tbl_status, service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit);
    // NOTE(review): as the name says, presumably the part of create_table()
    // that runs on shard 0; it takes client_state by rvalue reference rather
    // than by lvalue reference as create_table() does - confirm why.
    future create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, bool enforce_authorization, bool warn_authorization, const db::tablets_mode_t::mode tablets_mode);

    // Write helpers taking "mutation builders" (put_or_delete_item objects).
    // NOTE(review): presumably do_batch_write() performs the writes of a
    // batch and cas_write() performs the writes for one partition key (dk)
    // through LWT - confirm against the definitions.
    future<> do_batch_write(
        std::vector> mutation_builders,
        service::client_state& client_state,
        tracing::trace_state_ptr trace_state,
        service_permit permit);

    future<> cas_write(schema_ptr schema, service::cas_shard cas_shard, const dht::decorated_key& dk, const std::vector& mutation_builders,
        service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit);

public:
    // Public overload of describe_key_schema(): here the unordered_map is a
    // mandatory reference.
    // NOTE(review): presumably adds the table's key schema to `parent` and
    // records per-attribute information in the map - confirm.
    static void describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map&, const std::map *tags = nullptr);

    // Converts a query result expected to describe a single item.
    // NOTE(review): presumably returns a disengaged optional when the result
    // holds no item, and stores the item's size through the uint64_t pointer
    // when it is not null - confirm.
    static std::optional describe_single_item(schema_ptr,
        const query::partition_slice&,
        const cql3::selection::selection&,
        const query::result&,
        const std::optional&,
        uint64_t* = nullptr);

    // Converts a multi-row selection result to JSON compatible with DynamoDB.
    // For each row, this method calls item_callback, which takes the size of
    // the item as the parameter.
    static future> describe_multi_item(schema_ptr schema,
        const query::partition_slice&& slice,
        shared_ptr selection,
        foreign_ptr> query_result,
        shared_ptr> attrs_to_get,
        noncopyable_function item_callback = {});

    // Lower-level overload of describe_single_item(): works on one row given
    // as a vector and writes into the rjson::value passed by reference.
    // NOTE(review): the meaning of the trailing unnamed bool is not visible
    // in this header - confirm against the definition.
    static void describe_single_item(const cql3::selection::selection&,
        const std::vector&,
        const std::optional&,
        rjson::value&,
        uint64_t* item_length_in_bytes = nullptr,
        bool = false);

    // Helpers connecting table definitions and descriptions to streams.
    // NOTE(review): presumably add_stream_options() applies a request's
    // stream specification to the schema being built (the meaning of its
    // bool result is not visible here), while the supplement_*() functions
    // add further fields to a table description `descr` - confirm.
    static bool add_stream_options(const rjson::value& stream_spec, schema_builder&, service::storage_proxy& sp);
    static void supplement_table_info(rjson::value& descr, const schema& schema, service::storage_proxy& sp);
    static void supplement_table_stream_info(rjson::value& descr, const schema& schema, const service::storage_proxy& sp);
};

// is_big() checks approximately if the given JSON value is "bigger" than
// the given big_size number of bytes. The goal is to *quickly* detect
// oversized JSON that, for example, is too large to be serialized to a
// contiguous string - we don't need an accurate size for that. Moreover,
// as soon as we detect that the JSON is indeed "big", we can return true
// and don't need to continue calculating its exact size.
// For simplicity, we use a recursive implementation. This is fine because
// Alternator limits the depth of JSONs it reads from inputs, and doesn't
// add more than a couple of levels in its own output construction.
bool is_big(const rjson::value& val, int big_size = 100'000);

// Check CQL's Role-Based Access Control (RBAC) permission (MODIFY,
// SELECT, DROP, etc.) on the given table. When permission is denied an
// appropriate user-readable api_error::access_denied is thrown.
// NOTE(review): how enforce_authorization and warn_authorization change this
// behavior is not visible in this header - confirm against the definition.
future<> verify_permission(bool enforce_authorization, bool warn_authorization, const service::client_state&, const schema_ptr&, auth::permission, alternator::stats& stats);

/**
 * Make return type for serializing the object "streamed",
 * i.e. direct to HTTP output stream. Note: only useful for
 * (very) large objects as there are overhead issues with this
 * as well, but for massive lists of return objects this can
 * help avoid large allocations/many re-allocs
 */
executor::body_writer make_streamed(rjson::value&&);

} // namespace alternator