diff --git a/alternator/CMakeLists.txt b/alternator/CMakeLists.txt index 80edc23bc1..515e598b09 100644 --- a/alternator/CMakeLists.txt +++ b/alternator/CMakeLists.txt @@ -10,6 +10,7 @@ target_sources(alternator server.cc executor.cc executor_read.cc + executor_util.cc stats.cc serialization.cc expressions.cc diff --git a/alternator/executor.cc b/alternator/executor.cc index b4c151b13f..fa7e58c5ea 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -9,6 +9,7 @@ #include #include #include "alternator/executor.hh" +#include "alternator/executor_util.hh" #include "alternator/consumed_capacity.hh" #include "auth/permission.hh" #include "auth/resource.hh" @@ -68,6 +69,7 @@ #include "index/secondary_index.hh" #include "alternator/ttl_tag.hh" #include "vector_search/vector_store_client.hh" +#include "utils/simple_value_with_expiry.hh" using namespace std::chrono_literals; @@ -121,7 +123,7 @@ extern const sstring TTL_TAG_KEY("system:ttl_attribute"); // the SPURIOUS_RANGE_KEY_ADDED_TO_GSI_AND_USER_DIDNT_SPECIFY_RANGE_KEY_TAG_KEY is set the user didn't specify any key and // base table's keys were added as range keys. In all other cases either the first key is the user specified key, // following ones are base table's keys added as needed or range key list will be empty. 
-static const sstring SPURIOUS_RANGE_KEY_ADDED_TO_GSI_AND_USER_DIDNT_SPECIFY_RANGE_KEY_TAG_KEY("system:spurious_range_key_added_to_gsi_and_user_didnt_specify_range_key"); +extern const sstring SPURIOUS_RANGE_KEY_ADDED_TO_GSI_AND_USER_DIDNT_SPECIFY_RANGE_KEY_TAG_KEY("system:spurious_range_key_added_to_gsi_and_user_didnt_specify_range_key"); // The following tags also have the "system:" prefix but are NOT used // by Alternator to store table properties - only the user ever writes to @@ -180,49 +182,12 @@ void executor::maybe_audit( static lw_shared_ptr create_keyspace_metadata(std::string_view keyspace_name, service::storage_proxy& sp, gms::gossiper& gossiper, api::timestamp_type, const std::map& tags_map, const gms::feature_service& feat, const db::tablets_mode_t::mode tablets_mode); -map_type attrs_type() { - static thread_local auto t = map_type_impl::get_instance(utf8_type, bytes_type, true); - return t; -} - static const column_definition& attrs_column(const schema& schema) { const column_definition* cdef = schema.get_column_definition(bytes(executor::ATTRS_COLUMN_NAME)); throwing_assert(cdef); return *cdef; } - -lw_shared_ptr get_stats_from_schema(service::storage_proxy& sp, const schema& schema) { - try { - replica::table& table = sp.local_db().find_column_family(schema.id()); - if (!table.get_stats().alternator_stats) { - table.get_stats().alternator_stats = seastar::make_shared(schema.ks_name(), schema.cf_name()); - } - return table.get_stats().alternator_stats->_stats; - } catch (std::runtime_error&) { - // If we're here it means that a table we are currently working on was deleted before the - // operation completed, returning a temporary object is fine, if the table get deleted so will its metrics - return make_lw_shared(); - } -} - -executor::body_writer make_streamed(rjson::value&& value) { - return [value = std::move(value)](output_stream&& _out) mutable -> future<> { - auto out = std::move(_out); - std::exception_ptr ex; - try { - co_await 
rjson::print(value, out); - } catch (...) { - ex = std::current_exception(); - } - co_await out.close(); - co_await rjson::destroy_gently(std::move(value)); - if (ex) { - co_await coroutine::return_exception_ptr(std::move(ex)); - } - }; -} - // This function throws api_error::validation if input value is not an object. static void validate_is_object(const rjson::value& value, const char* caller) { if (!value.IsObject()) { @@ -361,212 +326,6 @@ void executor::supplement_table_info(rjson::value& descr, const schema& schema, executor::supplement_table_stream_info(descr, schema, sp); } -// We would have liked to support table names up to 255 bytes, like DynamoDB. -// But Scylla creates a directory whose name is the table's name plus 33 -// bytes (dash and UUID), and since directory names are limited to 255 bytes, -// we need to limit table names to 222 bytes, instead of 255. -// See https://github.com/scylladb/scylla/issues/4480 -// We actually have two limits here, -// * max_table_name_length is the limit that Alternator will impose on names -// of new Alternator tables. -// * max_auxiliary_table_name_length is the potentially higher absolute limit -// that Scylla imposes on the names of auxiliary tables that Alternator -// wants to create internally - i.e. materialized views or CDC log tables. -// The second limit might mean that it is not possible to add a GSI to an -// existing table, because the name of the new auxiliary table may go over -// the limit. The second limit is also one of the reasons why the first limit -// is set lower than 222 - to have room to enable streams which add the extra -// suffix "_scylla_cdc_log" to the table name. 
-static constexpr int max_table_name_length = 192; -static constexpr int max_auxiliary_table_name_length = 222; - -static bool valid_table_name_chars(std::string_view name) { - for (auto c : name) { - if ((c < 'a' || c > 'z') && - (c < 'A' || c > 'Z') && - (c < '0' || c > '9') && - c != '_' && - c != '-' && - c != '.') { - return false; - } - } - return true; -} - -// validate_table_name() validates the TableName parameter in a request - it -// should only be called in CreateTable or when a request looking for an -// existing table failed to find it. validate_table_name() throws the -// appropriate api_error if this validation fails. -// The DynamoDB developer guide, https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.NamingRules -// specifies that table "names must be between 3 and 255 characters long and -// can contain only the following characters: a-z, A-Z, 0-9, _ (underscore), -// - (dash), . (dot)". However, Alternator only allows max_table_name_length -// characters (see above) - not 255. -static void validate_table_name(std::string_view name, const char* source="TableName") { - if (name.length() < 3 || name.length() > max_table_name_length) { - throw api_error::validation( - format("{} must be at least 3 characters long and at most {} characters long", source, max_table_name_length)); - } - if (!valid_table_name_chars(name)) { - throw api_error::validation( - format("{} must satisfy regular expression pattern: [a-zA-Z0-9_.-]+", source)); - } -} - -// Validate that a CDC log table could be created for the base table with a -// given table_name, and if not, throw a user-visible api_error::validation. -// It is not possible to create a CDC log table if the table name is so long -// that adding the 15-character suffix "_scylla_cdc_log" (cdc_log_suffix) -// makes it go over max_auxiliary_table_name_length. 
-// Note that if max_table_name_length is set to less than 207 (which is -// max_auxiliary_table_name_length-15), then this function will never -// fail. However, it's still important to call it in UpdateTable, in case -// we have pre-existing tables with names longer than this to avoid #24598. -static void validate_cdc_log_name_length(std::string_view table_name) { - if (cdc::log_name(table_name).length() > max_auxiliary_table_name_length) { - // CDC will add cdc_log_suffix ("_scylla_cdc_log") to the table name - // to create its log table, and this will exceed the maximum allowed - // length. To provide a more helpful error message, we assume that - // cdc::log_name() always adds a suffix of the same length. - int suffix_len = cdc::log_name(table_name).length() - table_name.length(); - throw api_error::validation(fmt::format("Streams or vector search cannot be enabled on a table whose name is longer than {} characters: {}", - max_auxiliary_table_name_length - suffix_len, table_name)); - } -} - -// In DynamoDB index names are local to a table, while in Scylla, materialized -// view names are global (in a keyspace). So we need to compose a unique name -// for the view taking into account both the table's name and the index name. -// We concatenate the table and index name separated by a delim character -// (a character not allowed by DynamoDB in ordinary table names, default: ":"). -// The downside of this approach is that it limits the sum of the lengths, -// instead of each component individually as DynamoDB does. -// The view_name() function assumes the table_name has already been validated -// but validates the legality of index_name and the combination of both. 
-std::string view_name(std::string_view table_name, std::string_view index_name, const std::string& delim, bool validate_len) { - if (index_name.length() < 3) { - throw api_error::validation("IndexName must be at least 3 characters long"); - } - if (!valid_table_name_chars(index_name)) { - throw api_error::validation( - fmt::format("IndexName '{}' must satisfy regular expression pattern: [a-zA-Z0-9_.-]+", index_name)); - } - std::string ret = std::string(table_name) + delim + std::string(index_name); - if (ret.length() > max_auxiliary_table_name_length && validate_len) { - throw api_error::validation( - fmt::format("The total length of TableName ('{}') and IndexName ('{}') cannot exceed {} characters", - table_name, index_name, max_auxiliary_table_name_length - delim.size())); - } - return ret; -} - -std::string gsi_name(std::string_view table_name, std::string_view index_name, bool validate_len) { - return view_name(table_name, index_name, ":", validate_len); -} - -std::string lsi_name(std::string_view table_name, std::string_view index_name, bool validate_len) { - return view_name(table_name, index_name, "!:", validate_len); -} - -/** Extract table name from a request. - * Most requests expect the table's name to be listed in a "TableName" field. - * This convenience function returns the name or api_error in case the - * table name is missing or not a string. 
- */ -static std::optional find_table_name(const rjson::value& request) { - const rjson::value* table_name_value = rjson::find(request, "TableName"); - if (!table_name_value) { - return std::nullopt; - } - if (!table_name_value->IsString()) { - throw api_error::validation("Non-string TableName field in request"); - } - std::string table_name = rjson::to_string(*table_name_value); - return table_name; -} - -std::string get_table_name(const rjson::value& request) { - auto name = find_table_name(request); - if (!name) { - throw api_error::validation("Missing TableName field in request"); - } - return *name; -} - -/** Extract table schema from a request. - * Many requests expect the table's name to be listed in a "TableName" field - * and need to look it up as an existing table. This convenience function - * does this, with the appropriate validation and api_error in case the table - * name is missing, invalid or the table doesn't exist. If everything is - * successful, it returns the table's schema. - */ -schema_ptr executor::find_table(service::storage_proxy& proxy, const rjson::value& request) { - auto table_name = find_table_name(request); - if (!table_name) { - return nullptr; - } - return find_table(proxy, *table_name); -} - -schema_ptr executor::find_table(service::storage_proxy& proxy, std::string_view table_name) { - try { - return proxy.data_dictionary().find_schema(sstring(executor::KEYSPACE_NAME_PREFIX) + sstring(table_name), table_name); - } catch(data_dictionary::no_such_column_family&) { - // DynamoDB returns validation error even when table does not exist - // and the table name is invalid. 
- validate_table_name(table_name); - - throw api_error::resource_not_found( - fmt::format("Requested resource not found: Table: {} not found", table_name)); - } -} - -schema_ptr get_table(service::storage_proxy& proxy, const rjson::value& request) { - auto schema = executor::find_table(proxy, request); - if (!schema) { - // if we get here then the name was missing, since syntax or missing actual CF - // checks throw. Slow path, but just call get_table_name to generate exception. - get_table_name(request); - } - return schema; -} - -// try_get_internal_table() handles the special case that the given table_name -// begins with INTERNAL_TABLE_PREFIX (".scylla.alternator."). In that case, -// this function assumes that the rest of the name refers to an internal -// Scylla table (e.g., system table) and returns the schema of that table - -// or an exception if it doesn't exist. Otherwise, if table_name does not -// start with INTERNAL_TABLE_PREFIX, this function returns an empty schema_ptr -// and the caller should look for a normal Alternator table with that name. -schema_ptr try_get_internal_table(data_dictionary::database db, std::string_view table_name) { - size_t it = table_name.find(executor::INTERNAL_TABLE_PREFIX); - if (it != 0) { - return schema_ptr{}; - } - table_name.remove_prefix(executor::INTERNAL_TABLE_PREFIX.size()); - size_t delim = table_name.find_first_of('.'); - if (delim == std::string_view::npos) { - return schema_ptr{}; - } - std::string_view ks_name = table_name.substr(0, delim); - table_name.remove_prefix(ks_name.size() + 1); - // Only internal keyspaces can be accessed to avoid leakage - auto ks = db.try_find_keyspace(ks_name); - if (!ks || !ks->is_internal()) { - return schema_ptr{}; - } - try { - return db.find_schema(ks_name, table_name); - } catch (data_dictionary::no_such_column_family&) { - // DynamoDB returns validation error even when table does not exist - // and the table name is invalid. 
- validate_table_name(table_name); - throw api_error::resource_not_found( - fmt::format("Requested resource not found: Internal table: {}.{} not found", ks_name, table_name)); - } -} - // get_table_for_write() is similar to get_table(), but additionally, if the // configuration allows this, may also allow writing to system table with // prefix INTERNAL_TABLE_PREFIX. See also get_table_or_view() in @@ -583,90 +342,7 @@ static schema_ptr get_table_for_write(service::storage_proxy& proxy, const rjson } return s; } - return executor::find_table(proxy, table_name); -} - -// Convenience function for getting the value of a string attribute, or a -// default value if it is missing. If the attribute exists, but is not a -// string, a descriptive api_error is thrown. -static std::string get_string_attribute(const rjson::value& value, std::string_view attribute_name, const char* default_return) { - const rjson::value* attribute_value = rjson::find(value, attribute_name); - if (!attribute_value) - return default_return; - if (!attribute_value->IsString()) { - throw api_error::validation(fmt::format("Expected string value for attribute {}, got: {}", - attribute_name, value)); - } - return rjson::to_string(*attribute_value); -} - -// Convenience function for getting the value of a boolean attribute, or a -// default value if it is missing. If the attribute exists, but is not a -// bool, a descriptive api_error is thrown. -bool get_bool_attribute(const rjson::value& value, std::string_view attribute_name, bool default_return) { - const rjson::value* attribute_value = rjson::find(value, attribute_name); - if (!attribute_value) { - return default_return; - } - if (!attribute_value->IsBool()) { - throw api_error::validation(fmt::format("Expected boolean value for attribute {}, got: {}", - attribute_name, value)); - } - return attribute_value->GetBool(); -} - -// Convenience function for getting the value of an integer attribute, or -// an empty optional if it is missing. 
If the attribute exists, but is not -// an integer, a descriptive api_error is thrown. -std::optional get_int_attribute(const rjson::value& value, std::string_view attribute_name) { - const rjson::value* attribute_value = rjson::find(value, attribute_name); - if (!attribute_value) - return {}; - if (!attribute_value->IsInt()) { - throw api_error::validation(fmt::format("Expected integer value for attribute {}, got: {}", - attribute_name, value)); - } - return attribute_value->GetInt(); -} - -// Sets a KeySchema object inside the given JSON parent describing the key -// attributes of the given schema as being either HASH or RANGE keys. -// Additionally, adds to a given map mappings between the key attribute -// names and their type (as a DynamoDB type string). -void executor::describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map* attribute_types, const std::map *tags) { - rjson::value key_schema = rjson::empty_array(); - const bool ignore_range_keys_as_spurious = tags != nullptr && tags->contains(SPURIOUS_RANGE_KEY_ADDED_TO_GSI_AND_USER_DIDNT_SPECIFY_RANGE_KEY_TAG_KEY); - - for (const column_definition& cdef : schema.partition_key_columns()) { - rjson::value key = rjson::empty_object(); - rjson::add(key, "AttributeName", rjson::from_string(cdef.name_as_text())); - rjson::add(key, "KeyType", "HASH"); - rjson::push_back(key_schema, std::move(key)); - if (attribute_types) { - (*attribute_types)[cdef.name_as_text()] = type_to_string(cdef.type); - } - } - if (!ignore_range_keys_as_spurious) { - // NOTE: user requested key (there can be at most one) will always come first. - // There might be more keys following it, which were added, but those were - // not requested by the user, so we ignore them. 
- for (const column_definition& cdef : schema.clustering_key_columns()) { - rjson::value key = rjson::empty_object(); - rjson::add(key, "AttributeName", rjson::from_string(cdef.name_as_text())); - rjson::add(key, "KeyType", "RANGE"); - rjson::push_back(key_schema, std::move(key)); - if (attribute_types) { - (*attribute_types)[cdef.name_as_text()] = type_to_string(cdef.type); - } - break; - } - } - rjson::add(parent, "KeySchema", std::move(key_schema)); - -} - -void executor::describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map& attribute_types, const std::map *tags) { - describe_key_schema(parent, schema, &attribute_types, tags); + return find_table(proxy, table_name); } static rjson::value generate_arn_for_table(const schema& schema) { @@ -849,7 +525,7 @@ future executor::fill_table_description(schema_ptr schema, table_s rjson::add(table_description, "CreationDateTime", rjson::value(creation_timestamp)); std::unordered_map key_attribute_types; // Add base table's KeySchema and collect types for AttributeDefinitions: - executor::describe_key_schema(table_description, *schema, key_attribute_types, tags_ptr); + describe_key_schema(table_description, *schema, &key_attribute_types, tags_ptr); if (!t.views().empty()) { rjson::value gsi_array = rjson::empty_array(); rjson::value lsi_array = rjson::empty_array(); @@ -865,7 +541,7 @@ future executor::fill_table_description(schema_ptr schema, table_s rjson::add(view_entry, "IndexName", rjson::from_string(index_name)); rjson::add(view_entry, "IndexArn", generate_arn_for_index(*schema, index_name)); // Add index's KeySchema and collect types for AttributeDefinitions: - executor::describe_key_schema(view_entry, *vptr, key_attribute_types, db::get_tags_of_table(vptr)); + describe_key_schema(view_entry, *vptr, &key_attribute_types, db::get_tags_of_table(vptr)); // Add projection type rjson::value projection = rjson::empty_object(); rjson::add(projection, "ProjectionType", "ALL"); @@ -965,14 +641,6 
@@ future executor::fill_table_description(schema_ptr schema, table_s co_return table_description; } -bool is_alternator_keyspace(const sstring& ks_name) { - return ks_name.find(executor::KEYSPACE_NAME_PREFIX) == 0; -} - -sstring executor::table_name(const schema& s) { - return s.cf_name(); -} - future executor::describe_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request, std::unique_ptr& audit_info) { _stats.api_operations.describe_table++; elogger.trace("Describing table {}", request); @@ -991,91 +659,6 @@ future executor::describe_table(client_state& cli co_return rjson::print(std::move(response)); } -// This function increments the authorization_failures counter, and may also -// log a warn-level message and/or throw an access_denied exception, depending -// on what enforce_authorization and warn_authorization are set to. -// Note that if enforce_authorization is false, this function will return -// without throwing. So a caller that doesn't want to continue after an -// authorization_error must explicitly return after calling this function. -static void authorization_error(alternator::stats& stats, bool enforce_authorization, bool warn_authorization, std::string msg) { - stats.authorization_failures++; - if (enforce_authorization) { - if (warn_authorization) { - elogger.warn("alternator_warn_authorization=true: {}", msg); - } - throw api_error::access_denied(std::move(msg)); - } else { - if (warn_authorization) { - elogger.warn("If you set alternator_enforce_authorization=true the following will be enforced: {}", msg); - } - } -} - -// Check CQL's Role-Based Access Control (RBAC) permission_to_check (MODIFY, -// SELECT, DROP, etc.) on the given table. When permission is denied an -// appropriate user-readable api_error::access_denied is thrown. 
-future<> verify_permission( - bool enforce_authorization, - bool warn_authorization, - const service::client_state& client_state, - const schema_ptr& schema, - auth::permission permission_to_check, - alternator::stats& stats) { - if (!enforce_authorization && !warn_authorization) { - co_return; - } - // Unfortunately, the fix for issue #23218 did not modify the function - // that we use here - check_has_permissions(). So if we want to allow - // writes to internal tables (from try_get_internal_table()) only to a - // superuser, we need to explicitly check it here. - if (permission_to_check == auth::permission::MODIFY && is_internal_keyspace(schema->ks_name())) { - if (!client_state.user() || - !client_state.user()->name || - !co_await client_state.get_auth_service()->underlying_role_manager().is_superuser(*client_state.user()->name)) { - sstring username = ""; - if (client_state.user() && client_state.user()->name) { - username = client_state.user()->name.value(); - } - authorization_error(stats, enforce_authorization, warn_authorization, fmt::format( - "Write access denied on internal table {}.{} to role {} because it is not a superuser", - schema->ks_name(), schema->cf_name(), username)); - co_return; - } - } - auto resource = auth::make_data_resource(schema->ks_name(), schema->cf_name()); - if (!client_state.user() || !client_state.user()->name || - !co_await client_state.check_has_permission(auth::command_desc(permission_to_check, resource))) { - sstring username = ""; - if (client_state.user() && client_state.user()->name) { - username = client_state.user()->name.value(); - } - // Using exceptions for errors makes this function faster in the - // success path (when the operation is allowed). 
- authorization_error(stats, enforce_authorization, warn_authorization, fmt::format( - "{} access on table {}.{} is denied to role {}, client address {}", - auth::permissions::to_string(permission_to_check), - schema->ks_name(), schema->cf_name(), username, client_state.get_client_address())); - } -} - -// Similar to verify_permission() above, but just for CREATE operations. -// Those do not operate on any specific table, so require permissions on -// ALL KEYSPACES instead of any specific table. -static future<> verify_create_permission(bool enforce_authorization, bool warn_authorization, const service::client_state& client_state, alternator::stats& stats) { - if (!enforce_authorization && !warn_authorization) { - co_return; - } - auto resource = auth::resource(auth::resource_kind::data); - if (!co_await client_state.check_has_permission(auth::command_desc(auth::permission::CREATE, resource))) { - sstring username = ""; - if (client_state.user() && client_state.user()->name) { - username = client_state.user()->name.value(); - } - authorization_error(stats, enforce_authorization, warn_authorization, fmt::format( - "CREATE access on ALL KEYSPACES is denied to role {}", username)); - } -} - future executor::delete_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request, std::unique_ptr& audit_info) { _stats.api_operations.delete_table++; elogger.trace("Deleting table {}", request); @@ -1476,15 +1059,6 @@ static void update_tags_map(const rjson::value& tags, std::map validate_tags(tags_map); } -const std::map& get_tags_of_table_or_throw(schema_ptr schema) { - auto tags_ptr = db::get_tags_of_table(schema); - if (tags_ptr) { - return *tags_ptr; - } else { - throw api_error::validation(format("Table {} does not have valid tagging information", schema->ks_name())); - } -} - future executor::tag_resource(client_state& client_state, service_permit permit, rjson::value request, std::unique_ptr& audit_info) { 
_stats.api_operations.tag_resource++; @@ -2645,15 +2219,6 @@ public: } }; -// After calling pk_from_json() and ck_from_json() to extract the pk and ck -// components of a key, and if that succeeded, call check_key() to further -// check that the key doesn't have any spurious components. -void check_key(const rjson::value& key, const schema_ptr& schema) { - if (key.MemberCount() != (schema->clustering_key_size() == 0 ? 1 : 2)) { - throw api_error::validation("Given key attribute not in schema"); - } -} - // Verify that a value parsed from the user input is legal. In particular, // we check that the value is not an empty set, string or bytes - which is // (somewhat artificially) forbidden by DynamoDB. @@ -3103,7 +2668,7 @@ std::optional rmw_operation::apply(foreign_ptrrow_count()) { auto selection = cql3::selection::selection::wildcard(_schema); uint64_t item_length = 0; - auto previous_item = executor::describe_single_item(_schema, slice, *selection, *qr, {}, &item_length); + auto previous_item = describe_single_item(_schema, slice, *selection, *qr, {}, &item_length); if (_consumed_capacity._total_bytes < item_length) { _consumed_capacity._total_bytes = item_length; } @@ -3189,7 +2754,7 @@ static future> get_previous_item( command->allow_limit = db::allow_per_partition_rate_limit::yes; return proxy.query(schema, command, to_partition_ranges(*schema, pk), cl, service::storage_proxy::coordinator_query_options(executor::default_timeout(), std::move(permit), client_state)).then( [schema, command, selection = std::move(selection), &item_length] (service::storage_proxy::coordinator_query_result qr) { - auto previous_item = executor::describe_single_item(schema, command->slice, *selection, *qr.query_result, {}, &item_length); + auto previous_item = describe_single_item(schema, command->slice, *selection, *qr.query_result, {}, &item_length); if (previous_item) { return make_ready_future>(std::make_unique(std::move(*previous_item))); } else { @@ -3313,22 +2878,6 @@ static 
bool check_needs_read_before_write(const parsed::condition_expression& co return !condition_expression.empty(); } -// Fail the expression if it has unused attribute names or values. This is -// how DynamoDB behaves, so we do too. -void verify_all_are_used(const rjson::value* field, - const std::unordered_set& used, const char* field_name, const char* operation) { - if (!field) { - return; - } - for (auto it = field->MemberBegin(); it != field->MemberEnd(); ++it) { - if (!used.contains(rjson::to_string(it->name))) { - throw api_error::validation( - format("{} has spurious '{}', not used in {}", - field_name, rjson::to_string_view(it->name), operation)); - } - } -} - class put_item_operation : public rmw_operation { private: put_or_delete_item _mutation_builder; @@ -3557,18 +3106,6 @@ future executor::delete_item(client_state& client co_return res; } -schema_ptr get_table_from_batch_request(const service::storage_proxy& proxy, const rjson::value::ConstMemberIterator& batch_request) { - sstring table_name = rjson::to_sstring(batch_request->name); // JSON keys are always strings - try { - return proxy.data_dictionary().find_schema(sstring(executor::KEYSPACE_NAME_PREFIX) + table_name, table_name); - } catch(data_dictionary::no_such_column_family&) { - // DynamoDB returns validation error even when table does not exist - // and the table name is invalid. - validate_table_name(table_name); - throw api_error::resource_not_found(format("Requested resource not found: Table: {} not found", table_name)); - } -} - using primary_key = std::pair; struct primary_key_hash { schema_ptr _s; @@ -3942,117 +3479,6 @@ static const std::string_view get_item_type_string(const rjson::value& v) { return rjson::to_string_view(mem.name); } -/** - * Helper routine to extract data when we already have - * row, etc etc. - * - * Note: include_all_embedded_attributes means we should - * include all values in the `ATTRS_COLUMN_NAME` map column. 
- * - * We could change the behaviour to simply include all values - * from this column if the `ATTRS_COLUMN_NAME` is explicit in - * `attrs_to_get`, but I am scared to do that now in case - * there is some corner case in existing code. - * - * Explicit bool means we can be sure all previous calls are - * as before. - */ -void executor::describe_single_item(const cql3::selection::selection& selection, - const std::vector& result_row, - const std::optional& attrs_to_get, - rjson::value& item, - uint64_t* item_length_in_bytes, - bool include_all_embedded_attributes) -{ - const auto& columns = selection.get_columns(); - auto column_it = columns.begin(); - for (const managed_bytes_opt& cell : result_row) { - if (!cell) { - ++column_it; - continue; - } - std::string column_name = (*column_it)->name_as_text(); - if (column_name != executor::ATTRS_COLUMN_NAME) { - if (item_length_in_bytes) { - (*item_length_in_bytes) += column_name.length() + cell->size(); - } - if (!attrs_to_get || attrs_to_get->contains(column_name)) { - // item is expected to start empty, and column_name are unique - // so add() makes sense - rjson::add_with_string_name(item, column_name, rjson::empty_object()); - rjson::value& field = item[column_name.c_str()]; - cell->with_linearized([&] (bytes_view linearized_cell) { - rjson::add_with_string_name(field, type_to_string((*column_it)->type), json_key_column_value(linearized_cell, **column_it)); - }); - } - } else { - auto deserialized = attrs_type()->deserialize(*cell); - auto keys_and_values = value_cast(deserialized); - for (auto entry : keys_and_values) { - std::string attr_name = value_cast(entry.first); - if (item_length_in_bytes) { - (*item_length_in_bytes) += attr_name.length(); - } - if (include_all_embedded_attributes || !attrs_to_get || attrs_to_get->contains(attr_name)) { - bytes value = value_cast(entry.second); - if (item_length_in_bytes && value.length()) { - // ScyllaDB uses one extra byte compared to DynamoDB for the bytes length - 
(*item_length_in_bytes) += value.length() - 1; - } - rjson::value v = deserialize_item(value); - if (attrs_to_get) { - auto it = attrs_to_get->find(attr_name); - if (it != attrs_to_get->end()) { - // attrs_to_get may have asked for only part of - // this attribute. hierarchy_filter() modifies v, - // and returns false when nothing is to be kept. - if (!hierarchy_filter(v, it->second)) { - continue; - } - } - } - // item is expected to start empty, and attribute - // names are unique so add() makes sense - rjson::add_with_string_name(item, attr_name, std::move(v)); - } else if (item_length_in_bytes) { - (*item_length_in_bytes) += value_cast(entry.second).length() - 1; - } - } - } - ++column_it; - } -} - -std::optional executor::describe_single_item(schema_ptr schema, - const query::partition_slice& slice, - const cql3::selection::selection& selection, - const query::result& query_result, - const std::optional& attrs_to_get, - uint64_t* item_length_in_bytes) { - rjson::value item = rjson::empty_object(); - - cql3::selection::result_set_builder builder(selection, gc_clock::now()); - query::result_view::consume(query_result, slice, cql3::selection::result_set_builder::visitor(builder, *schema, selection)); - - auto result_set = builder.build(); - if (result_set->empty()) { - if (item_length_in_bytes) { - // empty results is counted as having a minimal length (e.g. 1 byte). - (*item_length_in_bytes) += 1; - } - // If there is no matching item, we're supposed to return an empty - // object without an Item member - not one with an empty Item member - return {}; - } - if (result_set->size() > 1) { - // If the result set contains multiple rows, the code should have - // called describe_multi_item(), not this function. 
- throw std::logic_error("describe_single_item() asked to describe multiple items"); - } - describe_single_item(selection, *result_set->rows().begin(), attrs_to_get, item, item_length_in_bytes); - return item; -} - static bool check_needs_read_before_write(const parsed::value& v) { return std::visit(overloaded_functor { [&] (const parsed::constant& c) -> bool { @@ -4840,60 +4266,6 @@ future executor::update_item(client_state& client co_return res; } -static void check_big_object(const rjson::value& val, int& size_left); -static void check_big_array(const rjson::value& val, int& size_left); - -bool is_big(const rjson::value& val, int big_size) { - if (val.IsString()) { - return ssize_t(val.GetStringLength()) > big_size; - } else if (val.IsObject()) { - check_big_object(val, big_size); - return big_size < 0; - } else if (val.IsArray()) { - check_big_array(val, big_size); - return big_size < 0; - } - return false; -} - -static void check_big_array(const rjson::value& val, int& size_left) { - // Assume a fixed size of 10 bytes for each number, boolean, etc., or - // beginning of a sub-object. This doesn't have to be accurate. - size_left -= 10 * val.Size(); - for (const auto& v : val.GetArray()) { - if (size_left < 0) { - return; - } - // Note that we avoid recursive calls for the leaves (anything except - // array or object) because usually those greatly outnumber the trunk. 
- if (v.IsString()) { - size_left -= v.GetStringLength(); - } else if (v.IsObject()) { - check_big_object(v, size_left); - } else if (v.IsArray()) { - check_big_array(v, size_left); - } - } -} - -static void check_big_object(const rjson::value& val, int& size_left) { - size_left -= 10 * val.MemberCount(); - for (const auto& m : val.GetObject()) { - if (size_left < 0) { - return; - } - size_left -= m.name.GetStringLength(); - if (m.value.IsString()) { - size_left -= m.value.GetStringLength(); - } else if (m.value.IsObject()) { - check_big_object(m.value, size_left); - } else if (m.value.IsArray()) { - check_big_array(m.value, size_left); - } - } -} - - future executor::list_tables(client_state& client_state, service_permit permit, rjson::value request, std::unique_ptr& audit_info) { _stats.api_operations.list_tables++; elogger.trace("Listing tables {}", request); diff --git a/alternator/executor.hh b/alternator/executor.hh index 524354d1f8..3de07fd7b0 100644 --- a/alternator/executor.hh +++ b/alternator/executor.hh @@ -11,6 +11,7 @@ #include #include "audit/audit.hh" #include "seastarx.hh" +#include #include #include @@ -21,13 +22,16 @@ #include "db/config.hh" #include "alternator/error.hh" -#include "stats.hh" +#include "alternator/attribute_path.hh" +#include "alternator/stats.hh" +#include "alternator/executor_util.hh" + #include "utils/rjson.hh" #include "utils/updateable_value.hh" -#include "utils/simple_value_with_expiry.hh" #include "tracing/trace_state.hh" + namespace db { class system_distributed_keyspace; } @@ -67,7 +71,6 @@ class gossiper; class schema_builder; -#include "alternator/attribute_path.hh" namespace alternator { @@ -75,11 +78,6 @@ enum class table_status; class rmw_operation; class put_or_delete_item; -schema_ptr get_table(service::storage_proxy& proxy, const rjson::value& request); -bool is_alternator_keyspace(const sstring& ks_name); -// Wraps the db::get_tags_of_table and throws if the table is missing the tags extension. 
-const std::map& get_tags_of_table_or_throw(schema_ptr schema); - namespace parsed { class expression_cache; } @@ -119,7 +117,6 @@ public: // is written in chunks to the output_stream. This allows for efficient // handling of large responses without needing to allocate a large buffer // in memory. - using body_writer = noncopyable_function(output_stream&&)>; using request_return_type = std::variant; stats _stats; // The metric_groups object holds this stat object's metrics registered @@ -168,15 +165,9 @@ public: future<> start(); future<> stop(); - static sstring table_name(const schema&); static db::timeout_clock::time_point default_timeout(); private: static thread_local utils::updateable_value s_default_timeout_in_ms; -public: - static schema_ptr find_table(service::storage_proxy&, std::string_view table_name); - static schema_ptr find_table(service::storage_proxy&, const rjson::value& request); - -private: friend class rmw_operation; // Helper to set up auditing for an Alternator operation. 
Checks whether @@ -190,7 +181,6 @@ private: const rjson::value& request, std::optional cl = std::nullopt); - static void describe_key_schema(rjson::value& parent, const schema&, std::unordered_map * = nullptr, const std::map *tags = nullptr); future fill_table_description(schema_ptr schema, table_status tbl_status, service::client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit); future create_table_on_shard0(service::client_state&& client_state, tracing::trace_state_ptr trace_state, rjson::value request, bool enforce_authorization, bool warn_authorization, const db::tablets_mode_t::mode tablets_mode, std::unique_ptr& audit_info); @@ -206,62 +196,11 @@ private: tracing::trace_state_ptr trace_state, service_permit permit); public: - static void describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map&, const std::map *tags = nullptr); - - static std::optional describe_single_item(schema_ptr, - const query::partition_slice&, - const cql3::selection::selection&, - const query::result&, - const std::optional&, - uint64_t* = nullptr); - - // Converts a multi-row selection result to JSON compatible with DynamoDB. - // For each row, this method calls item_callback, which takes the size of - // the item as the parameter. 
- static future> describe_multi_item(schema_ptr schema, - const query::partition_slice&& slice, - shared_ptr selection, - foreign_ptr> query_result, - shared_ptr> attrs_to_get, - noncopyable_function item_callback = {}); - - static void describe_single_item(const cql3::selection::selection&, - const std::vector&, - const std::optional&, - rjson::value&, - uint64_t* item_length_in_bytes = nullptr, - bool = false); - static bool add_stream_options(const rjson::value& stream_spec, schema_builder&, service::storage_proxy& sp); static void supplement_table_info(rjson::value& descr, const schema& schema, service::storage_proxy& sp); static void supplement_table_stream_info(rjson::value& descr, const schema& schema, const service::storage_proxy& sp); }; -// is_big() checks approximately if the given JSON value is "bigger" than -// the given big_size number of bytes. The goal is to *quickly* detect -// oversized JSON that, for example, is too large to be serialized to a -// contiguous string - we don't need an accurate size for that. Moreover, -// as soon as we detect that the JSON is indeed "big", we can return true -// and don't need to continue calculating its exact size. -// For simplicity, we use a recursive implementation. This is fine because -// Alternator limits the depth of JSONs it reads from inputs, and doesn't -// add more than a couple of levels in its own output construction. -bool is_big(const rjson::value& val, int big_size = 100'000); - -// Check CQL's Role-Based Access Control (RBAC) permission (MODIFY, -// SELECT, DROP, etc.) on the given table. When permission is denied an -// appropriate user-readable api_error::access_denied is thrown. -future<> verify_permission(bool enforce_authorization, bool warn_authorization, const service::client_state&, const schema_ptr&, auth::permission, alternator::stats& stats); - -/** - * Make return type for serializing the object "streamed", - * i.e. direct to HTTP output stream. 
Note: only useful for - * (very) large objects as there are overhead issues with this - * as well, but for massive lists of return objects this can - * help avoid large allocations/many re-allocs - */ -executor::body_writer make_streamed(rjson::value&&); - // returns table creation time in seconds since epoch for `db_clock` double get_table_creation_time(const schema &schema); @@ -287,25 +226,4 @@ arn_parts parse_arn(std::string_view arn, std::string_view arn_field_name, std:: // The format is ks1|ks2|ks3... and table1|table2|table3... sstring print_names_for_audit(const std::set& names); - -map_type attrs_type(); -lw_shared_ptr get_stats_from_schema(service::storage_proxy& sp, const schema& schema); -std::string view_name(std::string_view table_name, std::string_view index_name, - const std::string& delim = ":", bool validate_len = true); -std::string gsi_name(std::string_view table_name, std::string_view index_name, - bool validate_len = true); -std::string lsi_name(std::string_view table_name, std::string_view index_name, - bool validate_len = true); -std::string get_table_name(const rjson::value& request); -schema_ptr try_get_internal_table(data_dictionary::database db, std::string_view table_name); -std::optional get_int_attribute(const rjson::value& value, std::string_view attribute_name); -bool get_bool_attribute(const rjson::value& value, std::string_view attribute_name, bool default_return); -void check_key(const rjson::value& key, const schema_ptr& schema); -schema_ptr get_table_from_batch_request(const service::storage_proxy& proxy, const rjson::value::ConstMemberIterator& batch_request); -void verify_all_are_used( - const rjson::value* field, - const std::unordered_set& used, - const char* field_name, - const char* operation); - } diff --git a/alternator/executor_read.cc b/alternator/executor_read.cc index 05bc63a2b7..1ba20f6aef 100644 --- a/alternator/executor_read.cc +++ b/alternator/executor_read.cc @@ -21,6 +21,7 @@ // ConsistentRead, 
ProjectionExpression, and more. #include "alternator/executor.hh" +#include "alternator/executor_util.hh" #include "alternator/conditions.hh" #include "alternator/expressions.hh" #include "alternator/consumed_capacity.hh" @@ -61,7 +62,7 @@ extern logging::logger elogger; // from executor.cc // If we ever fix RapidJSON to avoid contiguous allocations for arrays, or // replace it entirely (#24458), we can remove this function and the function // rjson::print_with_extra_array() which it calls. -static executor::body_writer make_streamed_with_extra_array(rjson::value&& value, +static body_writer make_streamed_with_extra_array(rjson::value&& value, std::string array_name, utils::chunked_vector&& array) { return [value = std::move(value), array_name = std::move(array_name), array = std::move(array)](output_stream&& _out) mutable -> future<> { auto out = std::move(_out); @@ -1470,7 +1471,7 @@ static future query_vector( db::consistency_level::LOCAL_ONE, service::storage_proxy::coordinator_query_options( timeout, permit, client_state, trace_state)); - auto opt_item = executor::describe_single_item(base_schema, partition_slice, + auto opt_item = describe_single_item(base_schema, partition_slice, *selection, *qr.query_result, *attrs_to_get); if (opt_item && (!flt || flt.check(*opt_item))) { ++matched_count; @@ -1535,7 +1536,7 @@ static future query_vector( db::consistency_level::LOCAL_ONE, service::storage_proxy::coordinator_query_options( timeout, permit, client_state, trace_state)); - auto opt_item = executor::describe_single_item(base_schema, partition_slice, + auto opt_item = describe_single_item(base_schema, partition_slice, *selection, *qr.query_result, *attrs_to_get); if (opt_item && (!flt || flt.check(*opt_item))) { ++matched_count; @@ -1686,7 +1687,10 @@ future executor::query(client_state& client_state std::move(filter), opts, client_state, _stats, std::move(trace_state), std::move(permit), _enforce_authorization, _warn_authorization); } -future> 
executor::describe_multi_item(schema_ptr schema, +// Converts a multi-row selection result to JSON compatible with DynamoDB. +// For each row, this method calls item_callback, which takes the size of +// the item as the parameter. +static future> describe_multi_item(schema_ptr schema, const query::partition_slice&& slice, shared_ptr selection, foreign_ptr> query_result, @@ -1719,7 +1723,7 @@ static rjson::value describe_item(schema_ptr schema, const std::optional& attrs_to_get, consumed_capacity_counter& consumed_capacity, uint64_t& metric) { - std::optional opt_item = executor::describe_single_item(std::move(schema), slice, selection, std::move(query_result), attrs_to_get, &consumed_capacity._total_bytes); + std::optional opt_item = describe_single_item(std::move(schema), slice, selection, std::move(query_result), attrs_to_get, &consumed_capacity._total_bytes); rjson::value item_descr = rjson::empty_object(); if (opt_item) { rjson::add(item_descr, "Item", std::move(*opt_item)); @@ -1916,7 +1920,7 @@ future executor::batch_get_item(client_state& cli rjson::value consumed_capacity = rjson::empty_array(); for (size_t i = 0; i < requests.size(); i++) { const table_requests& rs = requests[i]; - std::string table = table_name(*rs.schema); + std::string table = rs.schema->cf_name(); if (should_audit) { table_names.insert(table); } diff --git a/alternator/executor_util.cc b/alternator/executor_util.cc new file mode 100644 index 0000000000..f7d9897d32 --- /dev/null +++ b/alternator/executor_util.cc @@ -0,0 +1,559 @@ +/* + * Copyright 2019-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 + */ + +#include "alternator/executor_util.hh" +#include "alternator/executor.hh" +#include "alternator/error.hh" +#include "auth/resource.hh" +#include "auth/service.hh" +#include "cdc/log.hh" +#include "data_dictionary/data_dictionary.hh" +#include "db/tags/utils.hh" +#include "replica/database.hh" +#include "cql3/selection/selection.hh" 
+#include "cql3/result_set.hh" +#include "serialization.hh" +#include "service/storage_proxy.hh" +#include "types/map.hh" +#include + +namespace alternator { + +extern logging::logger elogger; // from executor.cc + +std::optional get_int_attribute(const rjson::value& value, std::string_view attribute_name) { + const rjson::value* attribute_value = rjson::find(value, attribute_name); + if (!attribute_value) + return {}; + if (!attribute_value->IsInt()) { + throw api_error::validation(fmt::format("Expected integer value for attribute {}, got: {}", + attribute_name, value)); + } + return attribute_value->GetInt(); +} + +std::string get_string_attribute(const rjson::value& value, std::string_view attribute_name, const char* default_return) { + const rjson::value* attribute_value = rjson::find(value, attribute_name); + if (!attribute_value) + return default_return; + if (!attribute_value->IsString()) { + throw api_error::validation(fmt::format("Expected string value for attribute {}, got: {}", + attribute_name, value)); + } + return rjson::to_string(*attribute_value); +} + +bool get_bool_attribute(const rjson::value& value, std::string_view attribute_name, bool default_return) { + const rjson::value* attribute_value = rjson::find(value, attribute_name); + if (!attribute_value) { + return default_return; + } + if (!attribute_value->IsBool()) { + throw api_error::validation(fmt::format("Expected boolean value for attribute {}, got: {}", + attribute_name, value)); + } + return attribute_value->GetBool(); +} + +std::optional find_table_name(const rjson::value& request) { + const rjson::value* table_name_value = rjson::find(request, "TableName"); + if (!table_name_value) { + return std::nullopt; + } + if (!table_name_value->IsString()) { + throw api_error::validation("Non-string TableName field in request"); + } + std::string table_name = rjson::to_string(*table_name_value); + return table_name; +} + +std::string get_table_name(const rjson::value& request) { + auto name = 
find_table_name(request); + if (!name) { + throw api_error::validation("Missing TableName field in request"); + } + return *name; +} + +schema_ptr find_table(service::storage_proxy& proxy, const rjson::value& request) { + auto table_name = find_table_name(request); + if (!table_name) { + return nullptr; + } + return find_table(proxy, *table_name); +} + +schema_ptr find_table(service::storage_proxy& proxy, std::string_view table_name) { + try { + return proxy.data_dictionary().find_schema(sstring(executor::KEYSPACE_NAME_PREFIX) + sstring(table_name), table_name); + } catch(data_dictionary::no_such_column_family&) { + // DynamoDB returns validation error even when table does not exist + // and the table name is invalid. + validate_table_name(table_name); + + throw api_error::resource_not_found( + fmt::format("Requested resource not found: Table: {} not found", table_name)); + } +} + +schema_ptr get_table(service::storage_proxy& proxy, const rjson::value& request) { + auto schema = find_table(proxy, request); + if (!schema) { + // if we get here then the name was missing, since syntax or missing actual CF + // checks throw. Slow path, but just call get_table_name to generate exception. + get_table_name(request); + } + return schema; +} + +map_type attrs_type() { + static thread_local auto t = map_type_impl::get_instance(utf8_type, bytes_type, true); + return t; +} + +const std::map& get_tags_of_table_or_throw(schema_ptr schema) { + auto tags_ptr = db::get_tags_of_table(schema); + if (tags_ptr) { + return *tags_ptr; + } else { + throw api_error::validation(format("Table {} does not have valid tagging information", schema->ks_name())); + } +} + +bool is_alternator_keyspace(std::string_view ks_name) { + return ks_name.starts_with(executor::KEYSPACE_NAME_PREFIX); +} + +// This tag is set on a GSI when the user did not specify a range key, causing +// Alternator to add the base table's range key as a spurious range key. 
// This tag is set on a GSI when the user did not specify a range key, causing
// Alternator to add the base table's range key as a spurious range key. It is
// used by describe_key_schema() to suppress reporting that key.
// Defined in executor.cc; declared here so describe_key_schema() can read it.
extern const sstring SPURIOUS_RANGE_KEY_ADDED_TO_GSI_AND_USER_DIDNT_SPECIFY_RANGE_KEY_TAG_KEY;

// Build the DynamoDB-style "KeySchema" array for the given table or view
// schema and add it to "parent". The partition-key columns are reported as
// HASH keys; at most one clustering column is reported as a RANGE key.
// If attribute_types is non-null, it is also filled with the DynamoDB type
// string of every reported key attribute. If tags is non-null and contains
// the "spurious range key" tag (see above), the range key is suppressed
// because Alternator, not the user, added it.
void describe_key_schema(rjson::value& parent, const schema& schema, std::unordered_map<std::string, std::string>* attribute_types, const std::map<sstring, sstring>* tags) {
    rjson::value key_schema = rjson::empty_array();
    const bool ignore_range_keys_as_spurious = tags != nullptr && tags->contains(SPURIOUS_RANGE_KEY_ADDED_TO_GSI_AND_USER_DIDNT_SPECIFY_RANGE_KEY_TAG_KEY);

    for (const column_definition& cdef : schema.partition_key_columns()) {
        rjson::value key = rjson::empty_object();
        rjson::add(key, "AttributeName", rjson::from_string(cdef.name_as_text()));
        rjson::add(key, "KeyType", "HASH");
        rjson::push_back(key_schema, std::move(key));
        if (attribute_types) {
            (*attribute_types)[cdef.name_as_text()] = type_to_string(cdef.type);
        }
    }
    if (!ignore_range_keys_as_spurious) {
        // NOTE: user requested key (there can be at most one) will always come first.
        // There might be more keys following it, which were added, but those were
        // not requested by the user, so we ignore them.
        for (const column_definition& cdef : schema.clustering_key_columns()) {
            rjson::value key = rjson::empty_object();
            rjson::add(key, "AttributeName", rjson::from_string(cdef.name_as_text()));
            rjson::add(key, "KeyType", "RANGE");
            rjson::push_back(key_schema, std::move(key));
            if (attribute_types) {
                (*attribute_types)[cdef.name_as_text()] = type_to_string(cdef.type);
            }
            // Only the first clustering column is reported - see NOTE above.
            break;
        }
    }
    rjson::add(parent, "KeySchema", std::move(key_schema));
}

// Check if the given string has valid characters for a table name, i.e. only
// a-z, A-Z, 0-9, _ (underscore), - (dash), . (dot). Note that this function
// does not check the length of the name - instead, use validate_table_name()
// to validate both the characters and the length.
static bool valid_table_name_chars(std::string_view name) {
    for (auto c : name) {
        if ((c < 'a' || c > 'z') &&
            (c < 'A' || c > 'Z') &&
            (c < '0' || c > '9') &&
            c != '_' &&
            c != '-' &&
            c != '.') {
            return false;
        }
    }
    return true;
}

// Compose the internal name of the materialized view backing an index:
// table name + delim + index name. Validates the index name's length
// (at least 3 characters) and character set, and - when validate_len is
// true - the combined name's length against max_auxiliary_table_name_length.
std::string view_name(std::string_view table_name, std::string_view index_name, const std::string& delim, bool validate_len) {
    if (index_name.length() < 3) {
        throw api_error::validation("IndexName must be at least 3 characters long");
    }
    if (!valid_table_name_chars(index_name)) {
        throw api_error::validation(
            fmt::format("IndexName '{}' must satisfy regular expression pattern: [a-zA-Z0-9_.-]+", index_name));
    }
    std::string ret = std::string(table_name) + delim + std::string(index_name);
    if (ret.length() > max_auxiliary_table_name_length && validate_len) {
        // Report the limit on table+index length, i.e. the overall limit
        // minus the delimiter we inserted between them.
        throw api_error::validation(
            fmt::format("The total length of TableName ('{}') and IndexName ('{}') cannot exceed {} characters",
                table_name, index_name, max_auxiliary_table_name_length - delim.size()));
    }
    return ret;
}

// Name of the view backing a Global Secondary Index ("<table>:<index>").
std::string gsi_name(std::string_view table_name, std::string_view index_name, bool validate_len) {
    return view_name(table_name, index_name, ":", validate_len);
}

// Name of the view backing a Local Secondary Index ("<table>!:<index>").
// The "!" distinguishes LSI view names from GSI view names.
std::string lsi_name(std::string_view table_name, std::string_view index_name, bool validate_len) {
    return view_name(table_name, index_name, "!:", validate_len);
}

// After calling pk_from_json() and ck_from_json() to extract the pk and ck
// components of a key, and if that succeeded, call check_key() to further
// check that the key doesn't have any spurious components: the key object
// must have exactly one member per key column of the schema.
void check_key(const rjson::value& key, const schema_ptr& schema) {
    if (key.MemberCount() != (schema->clustering_key_size() == 0 ? 1 : 2)) {
        throw api_error::validation("Given key attribute not in schema");
    }
}

// Fail the request if the given field (e.g. ExpressionAttributeNames or
// ExpressionAttributeValues) has entries that were never used in the
// operation's expressions. This is how DynamoDB behaves, so we do too.
// A null field is trivially fine.
void verify_all_are_used(const rjson::value* field,
        const std::unordered_set<std::string>& used, const char* field_name, const char* operation) {
    if (!field) {
        return;
    }
    for (auto it = field->MemberBegin(); it != field->MemberEnd(); ++it) {
        if (!used.contains(rjson::to_string(it->name))) {
            throw api_error::validation(
                format("{} has spurious '{}', not used in {}",
                    field_name, rjson::to_string_view(it->name), operation));
        }
    }
}

// This function increments the authorization_failures counter, and may also
// log a warn-level message and/or throw an access_denied exception, depending
// on what enforce_authorization and warn_authorization are set to.
// Note that if enforce_authorization is false, this function will return
// without throwing. So a caller that doesn't want to continue after an
// authorization_error must explicitly return after calling this function.
static void authorization_error(stats& stats, bool enforce_authorization, bool warn_authorization, std::string msg) {
    stats.authorization_failures++;
    if (enforce_authorization) {
        if (warn_authorization) {
            elogger.warn("alternator_warn_authorization=true: {}", msg);
        }
        throw api_error::access_denied(std::move(msg));
    } else {
        if (warn_authorization) {
            elogger.warn("If you set alternator_enforce_authorization=true the following will be enforced: {}", msg);
        }
    }
}

// Check CQL's Role-Based Access Control (RBAC) permission (MODIFY, SELECT,
// DROP, etc.) on the given table for the current user. When permission is
// denied an api_error::access_denied is thrown (or only a warning logged -
// see authorization_error() above). A no-op when both enforce_authorization
// and warn_authorization are off, keeping the common path cheap.
future<> verify_permission(
        bool enforce_authorization,
        bool warn_authorization,
        const service::client_state& client_state,
        const schema_ptr& schema,
        auth::permission permission_to_check,
        stats& stats) {
    if (!enforce_authorization && !warn_authorization) {
        co_return;
    }
    // Unfortunately, the fix for issue #23218 did not modify the function
    // that we use here - check_has_permissions(). So if we want to allow
    // writes to internal tables (from try_get_internal_table()) only to a
    // superuser, we need to explicitly check it here.
    if (permission_to_check == auth::permission::MODIFY && is_internal_keyspace(schema->ks_name())) {
        if (!client_state.user() ||
            !client_state.user()->name ||
            !co_await client_state.get_auth_service()->underlying_role_manager().is_superuser(*client_state.user()->name)) {
            sstring username = "";
            if (client_state.user() && client_state.user()->name) {
                username = client_state.user()->name.value();
            }
            authorization_error(stats, enforce_authorization, warn_authorization, fmt::format(
                "Write access denied on internal table {}.{} to role {} because it is not a superuser",
                schema->ks_name(), schema->cf_name(), username));
            // authorization_error() may return (warn-only mode); do not
            // fall through to the generic RBAC check below in that case.
            co_return;
        }
    }
    auto resource = auth::make_data_resource(schema->ks_name(), schema->cf_name());
    if (!client_state.user() || !client_state.user()->name ||
        !co_await client_state.check_has_permission(auth::command_desc(permission_to_check, resource))) {
        sstring username = "";
        if (client_state.user() && client_state.user()->name) {
            username = client_state.user()->name.value();
        }
        // Using exceptions for errors makes this function faster in the
        // success path (when the operation is allowed).
        authorization_error(stats, enforce_authorization, warn_authorization, fmt::format(
            "{} access on table {}.{} is denied to role {}, client address {}",
            auth::permissions::to_string(permission_to_check),
            schema->ks_name(), schema->cf_name(), username, client_state.get_client_address()));
    }
}

// Similar to verify_permission() above, but just for CREATE operations.
// Those do not operate on any specific table, so require permissions on
// ALL KEYSPACES instead of any specific table.
future<> verify_create_permission(bool enforce_authorization, bool warn_authorization, const service::client_state& client_state, stats& stats) {
    if (!enforce_authorization && !warn_authorization) {
        co_return;
    }
    // CREATE is checked against the root data resource (ALL KEYSPACES),
    // not a specific table.
    auto resource = auth::resource(auth::resource_kind::data);
    if (!co_await client_state.check_has_permission(auth::command_desc(auth::permission::CREATE, resource))) {
        sstring username = "";
        if (client_state.user() && client_state.user()->name) {
            username = client_state.user()->name.value();
        }
        authorization_error(stats, enforce_authorization, warn_authorization, fmt::format(
            "CREATE access on ALL KEYSPACES is denied to role {}", username));
    }
}

// If table_name refers to an internal table (".scylla.alternator.<ks>.<cf>"
// style names beginning with INTERNAL_TABLE_PREFIX), return its schema.
// Returns a null schema_ptr when the name is not an internal-table name at
// all, or names a keyspace that is not internal (to avoid leaking access to
// user keyspaces through this path). Throws resource_not_found when the
// prefix and keyspace are valid but the table itself does not exist.
schema_ptr try_get_internal_table(const data_dictionary::database& db, std::string_view table_name) {
    size_t it = table_name.find(executor::INTERNAL_TABLE_PREFIX);
    if (it != 0) {
        return schema_ptr{};
    }
    table_name.remove_prefix(executor::INTERNAL_TABLE_PREFIX.size());
    size_t delim = table_name.find_first_of('.');
    if (delim == std::string_view::npos) {
        return schema_ptr{};
    }
    std::string_view ks_name = table_name.substr(0, delim);
    table_name.remove_prefix(ks_name.size() + 1);
    // Only internal keyspaces can be accessed to avoid leakage
    auto ks = db.try_find_keyspace(ks_name);
    if (!ks || !ks->is_internal()) {
        return schema_ptr{};
    }
    try {
        return db.find_schema(ks_name, table_name);
    } catch (data_dictionary::no_such_column_family&) {
        // DynamoDB returns validation error even when table does not exist
        // and the table name is invalid.
        validate_table_name(table_name);
        throw api_error::resource_not_found(
            fmt::format("Requested resource not found: Internal table: {}.{} not found", ks_name, table_name));
    }
}

// Resolve the schema for one table entry of a BatchGetItem/BatchWriteItem
// request, where the table name is the JSON member's key.
schema_ptr get_table_from_batch_request(const service::storage_proxy& proxy, const rjson::value::ConstMemberIterator& batch_request) {
    sstring table_name = rjson::to_sstring(batch_request->name); // JSON keys are always strings
    try {
        return proxy.data_dictionary().find_schema(sstring(executor::KEYSPACE_NAME_PREFIX) + table_name, table_name);
    } catch(data_dictionary::no_such_column_family&) {
        // DynamoDB returns validation error even when table does not exist
        // and the table name is invalid.
        validate_table_name(table_name);
        throw api_error::resource_not_found(format("Requested resource not found: Table: {} not found", table_name));
    }
}

// Return the per-table Alternator stats object for the given schema,
// creating and registering it on the table on first use.
lw_shared_ptr<stats> get_stats_from_schema(service::storage_proxy& sp, const schema& schema) {
    try {
        replica::table& table = sp.local_db().find_column_family(schema.id());
        if (!table.get_stats().alternator_stats) {
            // NOTE(review): the explicit template argument of make_shared
            // (the wrapper type whose member is _stats) was lost in this
            // view of the diff - confirm against alternator/stats.hh.
            table.get_stats().alternator_stats = seastar::make_shared(schema.ks_name(), schema.cf_name());
        }
        return table.get_stats().alternator_stats->_stats;
    } catch (std::runtime_error&) {
        // If we're here it means that a table we are currently working on was deleted before the
        // operation completed, returning a temporary object is fine, if the table gets deleted so will its metrics
        return make_lw_shared<stats>();
    }
}

/**
 * Helper routine to extract data when we already have
 * row, etc etc.
 *
 * Fills "item" (expected to start as an empty JSON object) with one
 * attribute per selected column of "result_row", in DynamoDB JSON format.
 * Top-level CQL columns (the key columns) are emitted directly; the
 * ATTRS_COLUMN_NAME map column is unpacked into individual attributes.
 * If attrs_to_get is set, only the listed attributes (possibly filtered
 * to sub-paths via hierarchy_filter()) are emitted. If item_length_in_bytes
 * is non-null, it is incremented by the item's DynamoDB-style length,
 * which is used for ConsumedCapacity accounting - note this counts ALL
 * attributes, including ones filtered out by attrs_to_get.
 *
 * Note: include_all_embedded_attributes means we should
 * include all values in the `ATTRS_COLUMN_NAME` map column.
 *
 * We could change the behaviour to simply include all values
 * from this column if the `ATTRS_COLUMN_NAME` is explicit in
 * `attrs_to_get`, but I am scared to do that now in case
 * there is some corner case in existing code.
 *
 * Explicit bool means we can be sure all previous calls are
 * as before.
 */
void describe_single_item(const cql3::selection::selection& selection,
    const std::vector<managed_bytes_opt>& result_row,
    const std::optional<attrs_to_get>& attrs_to_get,
    rjson::value& item,
    uint64_t* item_length_in_bytes,
    bool include_all_embedded_attributes)
{
    const auto& columns = selection.get_columns();
    auto column_it = columns.begin();
    for (const managed_bytes_opt& cell : result_row) {
        if (!cell) {
            // Null cell: this column has no value in this row; skip it but
            // keep column_it in step with the row.
            ++column_it;
            continue;
        }
        std::string column_name = (*column_it)->name_as_text();
        if (column_name != executor::ATTRS_COLUMN_NAME) {
            // A real CQL column (a key column): emit it as a
            // {"<type>": <value>} JSON object under its name.
            if (item_length_in_bytes) {
                (*item_length_in_bytes) += column_name.length() + cell->size();
            }
            if (!attrs_to_get || attrs_to_get->contains(column_name)) {
                // item is expected to start empty, and column_name are unique
                // so add() makes sense
                rjson::add_with_string_name(item, column_name, rjson::empty_object());
                rjson::value& field = item[column_name.c_str()];
                cell->with_linearized([&] (bytes_view linearized_cell) {
                    rjson::add_with_string_name(field, type_to_string((*column_it)->type), json_key_column_value(linearized_cell, **column_it));
                });
            }
        } else {
            // The ":attrs" map column: each map entry is one item attribute,
            // stored as a serialized DynamoDB JSON value.
            auto deserialized = attrs_type()->deserialize(*cell);
            auto keys_and_values = value_cast<map_type_impl::native_type>(deserialized);
            for (auto entry : keys_and_values) {
                std::string attr_name = value_cast<sstring>(entry.first);
                if (item_length_in_bytes) {
                    (*item_length_in_bytes) += attr_name.length();
                }
                if (include_all_embedded_attributes || !attrs_to_get || attrs_to_get->contains(attr_name)) {
                    bytes value = value_cast<bytes>(entry.second);
                    if (item_length_in_bytes && value.length()) {
                        // ScyllaDB uses one extra byte compared to DynamoDB for the bytes length
                        (*item_length_in_bytes) += value.length() - 1;
                    }
                    rjson::value v = deserialize_item(value);
                    if (attrs_to_get) {
                        auto it = attrs_to_get->find(attr_name);
                        if (it != attrs_to_get->end()) {
                            // attrs_to_get may have asked for only part of
                            // this attribute. hierarchy_filter() modifies v,
                            // and returns false when nothing is to be kept.
                            if (!hierarchy_filter(v, it->second)) {
                                continue;
                            }
                        }
                    }
                    // item is expected to start empty, and attribute
                    // names are unique so add() makes sense
                    rjson::add_with_string_name(item, attr_name, std::move(v));
                } else if (item_length_in_bytes) {
                    // Attribute not requested: still counted towards the
                    // item's length (same -1 adjustment as above).
                    (*item_length_in_bytes) += value_cast<bytes>(entry.second).length() - 1;
                }
            }
        }
        ++column_it;
    }
}

// Convert a single-row query result into a DynamoDB JSON item.
// Returns nullopt when the result is empty (caller returns a response with
// no "Item" member, per DynamoDB semantics). It is a logic error to call
// this with a multi-row result - use describe_multi_item() for those.
// If item_length_in_bytes is non-null it is incremented by the item's
// length (an empty result counts as 1 byte).
std::optional<rjson::value> describe_single_item(schema_ptr schema,
    const query::partition_slice& slice,
    const cql3::selection::selection& selection,
    const query::result& query_result,
    const std::optional<attrs_to_get>& attrs_to_get,
    uint64_t* item_length_in_bytes) {
    rjson::value item = rjson::empty_object();

    cql3::selection::result_set_builder builder(selection, gc_clock::now());
    query::result_view::consume(query_result, slice, cql3::selection::result_set_builder::visitor(builder, *schema, selection));

    auto result_set = builder.build();
    if (result_set->empty()) {
        if (item_length_in_bytes) {
            // empty results is counted as having a minimal length (e.g. 1 byte).
            (*item_length_in_bytes) += 1;
        }
        // If there is no matching item, we're supposed to return an empty
        // object without an Item member - not one with an empty Item member
        return {};
    }
    if (result_set->size() > 1) {
        // If the result set contains multiple rows, the code should have
        // called describe_multi_item(), not this function.
        throw std::logic_error("describe_single_item() asked to describe multiple items");
    }
    describe_single_item(selection, *result_set->rows().begin(), attrs_to_get, item, item_length_in_bytes);
    return item;
}

static void check_big_array(const rjson::value& val, int& size_left);
static void check_big_object(const rjson::value& val, int& size_left);

// For simplicity, we use a recursive implementation. This is fine because
// Alternator limits the depth of JSONs it reads from inputs, and doesn't
// add more than a couple of levels in its own output construction.
+bool is_big(const rjson::value& val, int big_size) { + if (val.IsString()) { + return ssize_t(val.GetStringLength()) > big_size; + } else if (val.IsObject()) { + check_big_object(val, big_size); + return big_size < 0; + } else if (val.IsArray()) { + check_big_array(val, big_size); + return big_size < 0; + } + return false; +} + +static void check_big_array(const rjson::value& val, int& size_left) { + // Assume a fixed size of 10 bytes for each number, boolean, etc., or + // beginning of a sub-object. This doesn't have to be accurate. + size_left -= 10 * val.Size(); + for (const auto& v : val.GetArray()) { + if (size_left < 0) { + return; + } + // Note that we avoid recursive calls for the leaves (anything except + // array or object) because usually those greatly outnumber the trunk. + if (v.IsString()) { + size_left -= v.GetStringLength(); + } else if (v.IsObject()) { + check_big_object(v, size_left); + } else if (v.IsArray()) { + check_big_array(v, size_left); + } + } +} + +static void check_big_object(const rjson::value& val, int& size_left) { + size_left -= 10 * val.MemberCount(); + for (const auto& m : val.GetObject()) { + if (size_left < 0) { + return; + } + size_left -= m.name.GetStringLength(); + if (m.value.IsString()) { + size_left -= m.value.GetStringLength(); + } else if (m.value.IsObject()) { + check_big_object(m.value, size_left); + } else if (m.value.IsArray()) { + check_big_array(m.value, size_left); + } + } +} + +void validate_table_name(std::string_view name, const char* source) { + if (name.length() < 3 || name.length() > max_table_name_length) { + throw api_error::validation( + format("{} must be at least 3 characters long and at most {} characters long", source, max_table_name_length)); + } + if (!valid_table_name_chars(name)) { + throw api_error::validation( + format("{} must satisfy regular expression pattern: [a-zA-Z0-9_.-]+", source)); + } +} + +void validate_cdc_log_name_length(std::string_view table_name) { + if 
(cdc::log_name(table_name).length() > max_auxiliary_table_name_length) { + // CDC will add cdc_log_suffix ("_scylla_cdc_log") to the table name + // to create its log table, and this will exceed the maximum allowed + // length. To provide a more helpful error message, we assume that + // cdc::log_name() always adds a suffix of the same length. + int suffix_len = cdc::log_name(table_name).length() - table_name.length(); + throw api_error::validation(fmt::format("Streams or vector search cannot be enabled on a table whose name is longer than {} characters: {}", + max_auxiliary_table_name_length - suffix_len, table_name)); + } +} + +body_writer make_streamed(rjson::value&& value) { + return [value = std::move(value)](output_stream&& _out) mutable -> future<> { + auto out = std::move(_out); + std::exception_ptr ex; + try { + co_await rjson::print(value, out); + } catch (...) { + ex = std::current_exception(); + } + co_await out.close(); + co_await rjson::destroy_gently(std::move(value)); + if (ex) { + co_await coroutine::return_exception_ptr(std::move(ex)); + } + }; +} + +} // namespace alternator diff --git a/alternator/executor_util.hh b/alternator/executor_util.hh new file mode 100644 index 0000000000..c60b1b05ad --- /dev/null +++ b/alternator/executor_util.hh @@ -0,0 +1,247 @@ +/* + * Copyright 2019-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 + */ + +// This header file, and the implementation file executor_util.cc, contain +// various utility functions that are reused in many different operations +// (API requests) across Alternator's code - in files such as executor.cc, +// executor_read.cc, streams.cc, ttl.cc, and more. These utility functions +// include things like extracting and validating pieces from a JSON request, +// checking permissions, constructing auxiliary table names, and more. 
+ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "utils/rjson.hh" +#include "schema/schema_fwd.hh" +#include "types/types.hh" +#include "auth/permission.hh" +#include "alternator/stats.hh" +#include "alternator/attribute_path.hh" +#include "utils/managed_bytes.hh" + +namespace query { class partition_slice; class result; } +namespace cql3::selection { class selection; } +namespace data_dictionary { class database; } +namespace service { class storage_proxy; class client_state; } + +namespace alternator { + +/// The body_writer is used for streaming responses - where the response body +/// is written in chunks to the output_stream. This allows for efficient +/// handling of large responses without needing to allocate a large buffer in +/// memory. It is one of the variants of executor::request_return_type. +using body_writer = noncopyable_function(output_stream&&)>; + +/// Get the value of an integer attribute, or an empty optional if it is +/// missing. If the attribute exists, but is not an integer, a descriptive +/// api_error is thrown. +std::optional get_int_attribute(const rjson::value& value, std::string_view attribute_name); + +/// Get the value of a string attribute, or a default value if it is missing. +/// If the attribute exists, but is not a string, a descriptive api_error is +/// thrown. +std::string get_string_attribute(const rjson::value& value, std::string_view attribute_name, const char* default_return); + +/// Get the value of a boolean attribute, or a default value if it is missing. +/// If the attribute exists, but is not a bool, a descriptive api_error is +/// thrown. +bool get_bool_attribute(const rjson::value& value, std::string_view attribute_name, bool default_return); + +/// Extract table name from a request. +/// Most requests expect the table's name to be listed in a "TableName" field. 
+/// get_table_name() returns the name or api_error in case the table name is +/// missing or not a string. +std::string get_table_name(const rjson::value& request); + +/// find_table_name() is like get_table_name() except that it returns an +/// optional table name - it returns an empty optional when the TableName +/// is missing from the request, instead of throwing as get_table_name() +/// does. However, find_table_name() still throws if a TableName exists but +/// is not a string. +std::optional find_table_name(const rjson::value& request); + +/// Extract table schema from a request. +/// Many requests expect the table's name to be listed in a "TableName" field +/// and need to look it up as an existing table. The get_table() function +/// does this, with the appropriate validation and api_error in case the table +/// name is missing, invalid or the table doesn't exist. If everything is +/// successful, it returns the table's schema. +schema_ptr get_table(service::storage_proxy& proxy, const rjson::value& request); + +/// This find_table() variant is like get_table() excepts that it returns a +/// nullptr instead of throwing if the request does not mention a TableName. +/// In other cases of errors (i.e., a table is mentioned but doesn't exist) +/// this function throws too. +schema_ptr find_table(service::storage_proxy& proxy, const rjson::value& request); + +/// This find_table() variant is like the previous one except that it takes +/// the table name directly instead of a request object. It is used in cases +/// where we already have the table name extracted from the request. +schema_ptr find_table(service::storage_proxy& proxy, std::string_view table_name); + +// We would have liked to support table names up to 255 bytes, like DynamoDB. +// But Scylla creates a directory whose name is the table's name plus 33 +// bytes (dash and UUID), and since directory names are limited to 255 bytes, +// we need to limit table names to 222 bytes, instead of 255. 
See issue #4480. +// We actually have two limits here, +// * max_table_name_length is the limit that Alternator will impose on names +// of new Alternator tables. +// * max_auxiliary_table_name_length is the potentially higher absolute limit +// that Scylla imposes on the names of auxiliary tables that Alternator +// wants to create internally - i.e. materialized views or CDC log tables. +// The second limit might mean that it is not possible to add a GSI to an +// existing table, because the name of the new auxiliary table may go over +// the limit. The second limit is also one of the reasons why the first limit +// is set lower than 222 - to have room to enable streams which add the extra +// suffix "_scylla_cdc_log" to the table name. +inline constexpr int max_table_name_length = 192; +inline constexpr int max_auxiliary_table_name_length = 222; + +/// validate_table_name() validates the TableName parameter in a request - it +/// should be called in CreateTable, and in other requests only when noticing +/// that the named table doesn't exist. +/// The DynamoDB developer guide, https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.NamingRules +/// specifies that table "names must be between 3 and 255 characters long and +/// can contain only the following characters: a-z, A-Z, 0-9, _ (underscore), +/// - (dash), . (dot)". However, Alternator only allows max_table_name_length +/// characters (see above) - not 255. +/// validate_table_name() throws the appropriate api_error if this validation +/// fails. +void validate_table_name(std::string_view name, const char* source = "TableName"); + +/// Validate that a CDC log table could be created for the base table with a +/// given table_name, and if not, throw a user-visible api_error::validation. 
+/// It is not possible to create a CDC log table if the table name is so long
+/// that adding the 15-character suffix "_scylla_cdc_log" (cdc_log_suffix)
+/// makes it go over max_auxiliary_table_name_length.
+/// Note that if max_table_name_length is set to less than 207 (which is
+/// max_auxiliary_table_name_length-15), then this function will never
+/// fail. However, it's still important to call it in UpdateTable, in case
+/// we have pre-existing tables with names longer than this to avoid #24598.
+void validate_cdc_log_name_length(std::string_view table_name);
+
+/// Checks if a keyspace, given by its name, is an Alternator keyspace.
+/// This just checks if the name begins with executor::KEYSPACE_NAME_PREFIX,
+/// a prefix that all keyspaces created by Alternator's CreateTable use.
+bool is_alternator_keyspace(std::string_view ks_name);
+
+/// Wraps db::get_tags_of_table() and throws api_error::validation if the
+/// table is missing the tags extension.
+const std::map& get_tags_of_table_or_throw(schema_ptr schema);
+
+/// Returns a type object representing the type of the ":attrs" column used
+/// by Alternator to store all non-key attributes. This type is a map from
+/// string (attribute name) to bytes (serialized attribute value).
+map_type attrs_type();
+
+// In DynamoDB index names are local to a table, while in Scylla, materialized
+// view names are global (in a keyspace). So we need to compose a unique name
+// for the view taking into account both the table's name and the index name.
+// We concatenate the table and index name separated by a delim character
+// (a character not allowed by DynamoDB in ordinary table names, default: ":").
+// The downside of this approach is that it limits the sum of the lengths,
+// instead of each component individually as DynamoDB does.
+// The view_name() function assumes the table_name has already been validated
+// but validates the legality of index_name and the combination of both.
+std::string view_name(std::string_view table_name, std::string_view index_name,
+        const std::string& delim = ":", bool validate_len = true);
+std::string gsi_name(std::string_view table_name, std::string_view index_name,
+        bool validate_len = true);
+std::string lsi_name(std::string_view table_name, std::string_view index_name,
+        bool validate_len = true);
+
+/// After calling pk_from_json() and ck_from_json() to extract the pk and ck
+/// components of a key, and if that succeeded, call check_key() to further
+/// check that the key doesn't have any spurious components.
+void check_key(const rjson::value& key, const schema_ptr& schema);
+
+/// Fail with api_error::validation if the expression has unused attribute
+/// names or values. This is how DynamoDB behaves, so we do too.
+void verify_all_are_used(const rjson::value* field,
+        const std::unordered_set& used,
+        const char* field_name,
+        const char* operation);
+
+/// Check CQL's Role-Based Access Control (RBAC) permission (MODIFY,
+/// SELECT, DROP, etc.) on the given table. When permission is denied an
+/// appropriate user-readable api_error::access_denied is thrown.
+future<> verify_permission(bool enforce_authorization, bool warn_authorization, const service::client_state&, const schema_ptr&, auth::permission, stats& stats);
+
+/// Similar to verify_permission() above, but just for CREATE operations.
+/// Those do not operate on any specific table, so require permissions on
+/// ALL KEYSPACES instead of any specific table.
+future<> verify_create_permission(bool enforce_authorization, bool warn_authorization, const service::client_state&, stats& stats);
+
+// Sets a KeySchema JSON array inside the given parent object describing the
+// key attributes of the given schema as HASH or RANGE keys. Additionally,
+// adds mappings from key attribute names to their DynamoDB type string into
+// attribute_types.
+void describe_key_schema(rjson::value& parent, const schema&, std::unordered_map* attribute_types = nullptr, const std::map* tags = nullptr); + +/// is_big() checks approximately if the given JSON value is "bigger" than +/// the given big_size number of bytes. The goal is to *quickly* detect +/// oversized JSON that, for example, is too large to be serialized to a +/// contiguous string - we don't need an accurate size for that. Moreover, +/// as soon as we detect that the JSON is indeed "big", we can return true +/// and don't need to continue calculating its exact size. +bool is_big(const rjson::value& val, int big_size = 100'000); + +/// try_get_internal_table() handles the special case that the given table_name +/// begins with INTERNAL_TABLE_PREFIX (".scylla.alternator."). In that case, +/// this function assumes that the rest of the name refers to an internal +/// Scylla table (e.g., system table) and returns the schema of that table - +/// or an exception if it doesn't exist. Otherwise, if table_name does not +/// start with INTERNAL_TABLE_PREFIX, this function returns an empty schema_ptr +/// and the caller should look for a normal Alternator table with that name. +schema_ptr try_get_internal_table(const data_dictionary::database& db, std::string_view table_name); + +/// get_table_from_batch_request() is used by batch write/read operations to +/// look up the schema for a table named in a batch request, by the JSON member +/// name (which is the table name in a BatchWriteItem or BatchGetItem request). +schema_ptr get_table_from_batch_request(const service::storage_proxy& proxy, const rjson::value::ConstMemberIterator& batch_request); + +/// Returns (or lazily creates) the per-table stats object for the given schema. +/// If the table has been deleted, returns a temporary stats object. 
+lw_shared_ptr get_stats_from_schema(service::storage_proxy& sp, const schema& schema); + +/// Writes one item's attributes into `item` from the given selection result +/// row. If include_all_embedded_attributes is true, all attributes from the +/// ATTRS_COLUMN map column are included regardless of attrs_to_get. +void describe_single_item(const cql3::selection::selection&, + const std::vector&, + const std::optional&, + rjson::value&, + uint64_t* item_length_in_bytes = nullptr, + bool include_all_embedded_attributes = false); + +/// Converts a single result row to a JSON item, or returns an empty optional +/// if the result is empty. +std::optional describe_single_item(schema_ptr, + const query::partition_slice&, + const cql3::selection::selection&, + const query::result&, + const std::optional&, + uint64_t* item_length_in_bytes = nullptr); + +/// Make a body_writer (function that can write output incrementally to the +/// HTTP stream) from the given JSON object. +/// Note: only useful for (very) large objects as there are overhead issues +/// with this as well, but for massive lists of return objects this can +/// help avoid large allocations/many re-allocs. 
+body_writer make_streamed(rjson::value&&); + +} // namespace alternator diff --git a/alternator/http_compression.cc b/alternator/http_compression.cc index fd1315b111..88541248ba 100644 --- a/alternator/http_compression.cc +++ b/alternator/http_compression.cc @@ -264,7 +264,7 @@ private: } }; -executor::body_writer compress(response_compressor::compression_type ct, const db::config& cfg, executor::body_writer&& bw) { +body_writer compress(response_compressor::compression_type ct, const db::config& cfg, body_writer&& bw) { return [bw = std::move(bw), ct, level = cfg.alternator_response_gzip_compression_level()](output_stream&& out) mutable -> future<> { output_stream_options opts; opts.trim_to_size = true; @@ -287,7 +287,7 @@ executor::body_writer compress(response_compressor::compression_type ct, const d }; } -future> response_compressor::generate_reply(std::unique_ptr rep, sstring accept_encoding, const char* content_type, executor::body_writer&& body_writer) { +future> response_compressor::generate_reply(std::unique_ptr rep, sstring accept_encoding, const char* content_type, body_writer&& body_writer) { response_compressor::compression_type ct = find_compression(accept_encoding, std::numeric_limits::max()); if (ct != response_compressor::compression_type::none) { rep->add_header("Content-Encoding", get_encoding_name(ct)); diff --git a/alternator/http_compression.hh b/alternator/http_compression.hh index c5bbb7720f..8124151be3 100644 --- a/alternator/http_compression.hh +++ b/alternator/http_compression.hh @@ -85,7 +85,7 @@ public: future> generate_reply(std::unique_ptr rep, sstring accept_encoding, const char* content_type, std::string&& response_body); future> generate_reply(std::unique_ptr rep, - sstring accept_encoding, const char* content_type, executor::body_writer&& body_writer); + sstring accept_encoding, const char* content_type, body_writer&& body_writer); }; } diff --git a/alternator/serialization.cc b/alternator/serialization.cc index 
ff16564b12..850b6dcfa3 100644 --- a/alternator/serialization.cc +++ b/alternator/serialization.cc @@ -14,12 +14,12 @@ #include "types/concrete_types.hh" #include "types/json_utils.hh" #include "mutation/position_in_partition.hh" +#include "alternator/executor_util.hh" static logging::logger slogger("alternator-serialization"); namespace alternator { -bool is_alternator_keyspace(const sstring& ks_name); type_info type_info_from_string(std::string_view type) { static thread_local const std::unordered_map type_infos = { diff --git a/alternator/server.cc b/alternator/server.cc index 762b0234bb..68f7cacf7a 100644 --- a/alternator/server.cc +++ b/alternator/server.cc @@ -8,6 +8,7 @@ #include "alternator/server.hh" #include "audit/audit.hh" +#include "alternator/executor_util.hh" #include "gms/application_state.hh" #include "utils/log.hh" #include @@ -143,7 +144,7 @@ public: return _response_compressor.generate_reply(std::move(rep), std::move(accept_encoding), REPLY_CONTENT_TYPE, std::move(str)); }, - [&] (executor::body_writer&& body_writer) { + [&] (body_writer&& body_writer) { return _response_compressor.generate_reply(std::move(rep), std::move(accept_encoding), REPLY_CONTENT_TYPE, std::move(body_writer)); }, diff --git a/alternator/streams.cc b/alternator/streams.cc index d6fa5475bd..4713f1d7cb 100644 --- a/alternator/streams.cc +++ b/alternator/streams.cc @@ -30,6 +30,7 @@ #include "gms/feature_service.hh" #include "executor.hh" +#include "alternator/executor_util.hh" #include "data_dictionary/data_dictionary.hh" #include "utils/rjson.hh" @@ -262,7 +263,7 @@ future alternator::executor::list_str auto arn = stream_arn{ i->schema(), cdc::get_base_table(db.real_database(), *i->schema()) }; rjson::add(new_entry, "StreamArn", arn); rjson::add(new_entry, "StreamLabel", rjson::from_string(stream_label(*s))); - rjson::add(new_entry, "TableName", rjson::from_string(cdc::base_name(table_name(*s)))); + rjson::add(new_entry, "TableName", 
rjson::from_string(cdc::base_name(s->cf_name()))); rjson::push_back(streams, std::move(new_entry)); --limit; } @@ -565,7 +566,7 @@ future executor::describe_stream(client_state& cl rjson::add(stream_desc, "StreamArn", stream_arn); rjson::add(stream_desc, "StreamViewType", type); - rjson::add(stream_desc, "TableName", rjson::from_string(table_name(*bs))); + rjson::add(stream_desc, "TableName", rjson::from_string(bs->cf_name())); describe_key_schema(stream_desc, *bs); diff --git a/alternator/ttl.cc b/alternator/ttl.cc index f61b57f5f7..3f3c57c00b 100644 --- a/alternator/ttl.cc +++ b/alternator/ttl.cc @@ -44,6 +44,7 @@ #include "cql3/query_options.hh" #include "cql3/column_identifier.hh" #include "alternator/executor.hh" +#include "alternator/executor_util.hh" #include "alternator/controller.hh" #include "alternator/serialization.hh" #include "alternator/ttl_tag.hh" diff --git a/configure.py b/configure.py index d1ce4ef2f8..8b37f42156 100755 --- a/configure.py +++ b/configure.py @@ -1439,6 +1439,7 @@ alternator = [ 'alternator/server.cc', 'alternator/executor.cc', 'alternator/executor_read.cc', + 'alternator/executor_util.cc', 'alternator/stats.cc', 'alternator/serialization.cc', 'alternator/expressions.cc',