diff --git a/alternator/executor.cc b/alternator/executor.cc index 30b597400e..a032e4c678 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -341,7 +341,7 @@ static void set_table_creation_time(std::map& tags_map, db_clo tags_map[TABLE_CREATION_TIME_TAG_KEY] = std::to_string(tm); } -static double get_table_creation_time(const schema &schema) { +double get_table_creation_time(const schema &schema) { auto time = db::find_tag(schema, TABLE_CREATION_TIME_TAG_KEY); if (time) { try { @@ -1236,9 +1236,91 @@ static std::pair parse_key_schema(const rjson::value& return {hash_key, range_key}; } +arn_parts parse_arn(std::string_view arn, std::string_view arn_field_name, std::string_view type_name, std::string_view expected_postfix) { + // Expected ARN format arn:partition:service:region:account-id:resource-type/resource-id + // So (old arn produced by scylla for internal purposes) arn:scylla:alternator:${KEYSPACE_NAME}:scylla:table/${TABLE_NAME} + // or (KCL ready new arn) arn:aws:dynamodb:us-east-1:797456418907:table/${KEYSPACE_NAME}@${TABLE_NAME}/stream/2025-12-18T17:38:48.952 + // According to Amazon account-id must be a number, but we don't check for it here. + // postfix is part of the string after table name, including the separator, e.g. "/stream/2025-12-18T17:38:48.952" in the second example above + // if expected_postfix is empty, then we reject ARN with any postfix + // otherwise we require that postfix starts with expected_postfix and we return it + + if (!arn.starts_with("arn:")) { + throw api_error::access_denied(fmt::format("{}: Invalid {} ARN `{}` - missing `arn:` prefix", arn_field_name, type_name, arn)); + } + + // skip to the resource part + // we don't require ARNs to follow KCL requirements, except for number of parts. + auto index = arn.find(':'); // skip arn + bool is_scylla_arn = false; + std::string_view keyspace_name; + if (index != std::string_view::npos) { + auto start = index + 1; + index = arn.find(':', index + 1); // skip partition + if (index != std::string_view::npos) { + if (arn.substr(start, index - start) == "scylla") { + is_scylla_arn = true; + } + index = arn.find(':', index + 1); // skip service + if (index != std::string_view::npos) { + start = index + 1; + index = arn.find(':', index + 1); // skip region + if (index != std::string_view::npos) { + if (is_scylla_arn) { + keyspace_name = arn.substr(start, index - start); + } + index = arn.find(':', index + 1); // skip account-id + } + } + } + } + if (index == std::string_view::npos) { + throw api_error::access_denied(fmt::format("{}: Invalid {} ARN `{}` - missing arn:::: prefix", arn_field_name, type_name, arn)); + } + + // index is at last `:` before `table/` + // this looks like a valid ARN up to this point + // as a sanity check make sure that what follows is a resource-type "table/" + if (arn.substr(index + 1, 6) != "table/") { + throw api_error::validation(fmt::format("{}: Invalid {} ARN `{}` - resource-type is not `table/`", arn_field_name, type_name, arn)); + } + index += 7; // move past ":table/" + + // Amazon's spec allows both ':' and '/' as resource separators + auto end_index = arn.substr(index).find_first_of("/:"); + if (end_index == std::string_view::npos) { + end_index = arn.length(); + } + else { + end_index += index; // adjust end_index to be relative to the start of the string, not to the start of the table name + } + auto table_name = arn.substr(index, end_index - index); + if (!is_scylla_arn) { + auto separator_pos = table_name.find_first_of("@"); + if (separator_pos == std::string_view::npos) { + throw api_error::validation(fmt::format("{}: Invalid {} ARN `{}` - missing separator `@`", arn_field_name, type_name, arn)); + } + keyspace_name = table_name.substr(0, separator_pos); + table_name = table_name.substr(separator_pos + 1); + } + index = end_index; + + // caller might require a specific postfix after the table name + // so for example in arn:aws:dynamodb:us-east-1:797456418907:table/dynamodb_streams_verification_table_rc/stream/2025-12-18T17:38:48.952 + // specific postfix could be "/stream/" to make sure ARN is for a stream, not for the table itself + // postfix will contain leading separator + std::string_view postfix = arn.substr(index); + + if (postfix.empty() == expected_postfix.empty() && postfix.starts_with(expected_postfix)) { + // we will end here if + // - postfix and expected_postfix are both empty (thus `starts_with` will check against empty string and return true) + // - both are not empty and postfix starts with expected_postfix + return { keyspace_name, table_name, postfix }; + } + throw api_error::validation(fmt::format("{}: Invalid {} ARN `{}` - expected `{}` after table name `{}`", arn_field_name, type_name, arn, expected_postfix, table_name)); +} + static schema_ptr get_table_from_arn(service::storage_proxy& proxy, std::string_view arn) { - // Expected format: arn:scylla:alternator:${KEYSPACE_NAME}:scylla:table/${TABLE_NAME}; - constexpr size_t prefix_size = sizeof("arn:scylla:alternator:") - 1; // NOTE: This code returns AccessDeniedException if it's problematic to parse or recognize an arn. // Technically, a properly formatted, but nonexistent arn *should* return AccessDeniedException, // while an incorrectly formatted one should return ValidationException. @@ -1250,22 +1332,12 @@ static schema_ptr get_table_from_arn(service::storage_proxy& proxy, std::string_ // concluding that an error is an error and code which uses tagging // must be ready for handling AccessDeniedException instances anyway. try { - size_t keyspace_end = arn.find_first_of(':', prefix_size); - std::string_view keyspace_name = arn.substr(prefix_size, keyspace_end - prefix_size); - size_t table_start = arn.find_first_of('/'); - std::string_view table_name = arn.substr(table_start + 1); - if (table_name.find('/') != std::string_view::npos) { - // A table name cannot contain a '/' - if it does, it's not a - // table ARN, it may be an index. DynamoDB returns a - // ValidationException in that case - see #10786. - throw api_error::validation(fmt::format("ResourceArn '{}' is not a valid table ARN", table_name)); - } - // FIXME: remove sstring creation once find_schema gains a view-based interface - return proxy.data_dictionary().find_schema(sstring(keyspace_name), sstring(table_name)); + auto parts = parse_arn(arn, "ResourceArn", "table", ""); + return proxy.data_dictionary().find_schema(parts.keyspace_name, parts.table_name); } catch (const data_dictionary::no_such_column_family& e) { - throw api_error::resource_not_found(fmt::format("ResourceArn '{}' not found", arn)); + throw api_error::resource_not_found(fmt::format("ResourceArn: Invalid table ARN `{}` - not found", arn)); } catch (const std::out_of_range& e) { - throw api_error::access_denied("Incorrect resource identifier"); + throw api_error::access_denied(fmt::format("ResourceArn: Invalid table ARN `{}` - {}", arn, e.what())); } } diff --git a/alternator/executor.hh b/alternator/executor.hh index 6a50140bdf..91a51df4d4 100644 --- a/alternator/executor.hh +++ b/alternator/executor.hh @@ -301,4 +301,27 @@ future<> verify_permission(bool enforce_authorization, bool warn_authorization, */ executor::body_writer make_streamed(rjson::value&&); +// returns table creation time in seconds since epoch for `db_clock` +double get_table_creation_time(const schema &schema); + +// result of parsing ARN (Amazon Resource Name) +// ARN format is `arn::::://` +// we ignore partition, service and account-id +// resource-type must be string "table" +// resource-id will be returned as table_name +// region will be returned as keyspace_name +// postfix is a string after resource-id and will be returned as is (whole), including separator. +struct arn_parts { + std::string_view keyspace_name; + std::string_view table_name; + std::string_view postfix; +}; +// arn - arn to parse +// arn_field_name - identifier of the ARN, used only when reporting an error (in error messages), for example "Incorrect resource identifier ``" +// type_name - used only when reporting an error (in error messages), for example "... is not a valid ARN ..." +// expected_postfix - optional filter of postfix value (part of ARN after resource-id, including separator, see comments for struct arn_parts). +// If is empty - then postfix value must be empty as well +// if not empty - postfix value must start with expected_postfix, but might be longer +arn_parts parse_arn(std::string_view arn, std::string_view arn_field_name, std::string_view type_name, std::string_view expected_postfix); + } diff --git a/alternator/streams.cc b/alternator/streams.cc index d18a0d88ad..5a3282818f 100644 --- a/alternator/streams.cc +++ b/alternator/streams.cc @@ -92,44 +92,97 @@ static sstring stream_label(const schema& log_schema) { } namespace alternator { +// stream arn has certain format (see https://docs.aws.amazon.com/IAM/latest/UserGuide/reference-arns.html) +// we need to follow it as Kinesis Client Library does check +// NOTE: we're holding inside a name of cdc log table, not a user table +class stream_arn { + std::string _arn; + std::string_view _table_name; + std::string_view _keyspace_name; +public: + // ARN to get table name from + stream_arn(std::string arn) : _arn(std::move(arn)) { + auto parts = parse_arn(_arn, "StreamArn", "stream", "/stream/"); + _table_name = parts.table_name; + _keyspace_name = parts.keyspace_name; + } + // NOTE: it must be a schema of a CDC log table, not a base table, because that's what we are encoding in ARN and returning to users. + // we need base schema for creation time + stream_arn(schema_ptr s, schema_ptr base_schema) { + auto creation_time = get_table_creation_time(*base_schema); + auto now = std::chrono::system_clock::time_point{ std::chrono::duration_cast(std::chrono::duration(creation_time)) }; -// stream arn _has_ to be 37 or more characters long. ugh... -// see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_DescribeStream.html#API_streams_DescribeStream_RequestSyntax + // KCL checks for arn / aws / dynamodb and account-id being a number + _arn = fmt::format("arn:aws:dynamodb:us-east-1:000000000000:table/{}@{}/stream/{:%FT%T}", s->ks_name(), s->cf_name(), now); + auto table_index = std::string_view{"arn:aws:dynamodb:us-east-1:000000000000:table/"}.size(); + auto x1 = _arn.find("@", table_index); + auto x2 = _arn.find("/", x1); + _table_name = std::string_view{ _arn }.substr(x1 + 1, x2 - (x1 + 1)); + _keyspace_name = std::string_view{ _arn }.substr(table_index, x1 - table_index); + } + + std::string_view unparsed() const { return _arn; } + std::string_view table_name() const { return _table_name; } + std::string_view keyspace_name() const { return _keyspace_name; } + friend std::ostream& operator<<(std::ostream& os, const stream_arn& arn) { + os << arn._arn; + return os; + } +}; + +// NOTE: this will return schema for cdc log table, not the base table. +static schema_ptr get_schema_from_arn(service::storage_proxy& proxy, const stream_arn& arn) +{ + if (!cdc::is_log_name(arn.table_name())) { + throw api_error::resource_not_found(fmt::format("{} as found in ARN {} is not a valid name for a CDC table", arn.table_name(), arn.unparsed())); + } + try { + return proxy.data_dictionary().find_schema(arn.keyspace_name(), arn.table_name()); + } catch(data_dictionary::no_such_column_family&) { + throw api_error::resource_not_found(fmt::format("`{}` is not a valid StreamArn - table {} not found", arn.unparsed(), arn.table_name())); + } +} + +// ShardId. Must be between 28 and 65 characters inclusive. // UUID is 36 bytes as string (including dashes). -// Prepend a version/type marker -> 37 -class stream_arn : public utils::UUID { +// Prepend a version/type marker (`S`) -> 37 +class stream_shard_id : public utils::UUID { public: using UUID = utils::UUID; static constexpr char marker = 'S'; - stream_arn() = default; - stream_arn(const UUID& uuid) + stream_shard_id() = default; + stream_shard_id(const UUID& uuid) : UUID(uuid) {} - stream_arn(const table_id& tid) + stream_shard_id(const table_id& tid) : UUID(tid.uuid()) {} - stream_arn(std::string_view v) + stream_shard_id(std::string_view v) : UUID(v.substr(1)) { if (v[0] != marker) { throw std::invalid_argument(std::string(v)); } } - friend std::ostream& operator<<(std::ostream& os, const stream_arn& arn) { + friend std::ostream& operator<<(std::ostream& os, const stream_shard_id& arn) { const UUID& uuid = arn; return os << marker << uuid; } - friend std::istream& operator>>(std::istream& is, stream_arn& arn) { + friend std::istream& operator>>(std::istream& is, stream_shard_id& arn) { std::string s; is >> s; - arn = stream_arn(s); + arn = stream_shard_id(s); return is; } }; } // namespace alternator +template +struct rapidjson::internal::TypeHelper + : public from_string_helper +{}; template struct rapidjson::internal::TypeHelper : public from_string_helper @@ -141,7 +194,7 @@ future alternator::executor::list_str _stats.api_operations.list_streams++; auto limit = rjson::get_opt(request, "Limit").value_or(100); - auto streams_start = rjson::get_opt(request, "ExclusiveStartStreamArn"); + auto streams_start = rjson::get_opt(request, "ExclusiveStartStreamArn"); auto table = find_table(_proxy, request); auto db = _proxy.data_dictionary(); @@ -190,7 +243,7 @@ future alternator::executor::list_str auto ret = rjson::empty_object(); auto streams = rjson::empty_array(); - std::optional last; + std::optional last; for (;limit > 0 && i != e; ++i) { auto s = i->schema(); @@ -204,7 +257,8 @@ future alternator::executor::list_str rjson::value new_entry = rjson::empty_object(); last = i->schema()->id(); - rjson::add(new_entry, "StreamArn", *last); + auto arn = stream_arn{ i->schema(), cdc::get_base_table(db.real_database(), *i->schema()) }; + rjson::add(new_entry, "StreamArn", arn); rjson::add(new_entry, "StreamLabel", rjson::from_string(stream_label(*s))); rjson::add(new_entry, "TableName", rjson::from_string(cdc::base_name(table_name(*s)))); rjson::push_back(streams, std::move(new_entry)); @@ -459,12 +513,11 @@ future executor::describe_stream(client_state& cl // I.e. unparsable arn -> error. auto stream_arn = rjson::get(request, "StreamArn"); - schema_ptr schema, bs; + schema_ptr bs; auto db = _proxy.data_dictionary(); + auto schema = get_schema_from_arn(_proxy, stream_arn); try { - auto cf = db.find_column_family(table_id(stream_arn)); - schema = cf.schema(); bs = cdc::get_base_table(db.real_database(), *schema); } catch (...) { } @@ -504,7 +557,7 @@ future executor::describe_stream(client_state& cl stream_view_type type = cdc_options_to_steam_view_type(opts); - rjson::add(stream_desc, "StreamArn", alternator::stream_arn(schema->id())); + rjson::add(stream_desc, "StreamArn", stream_arn); rjson::add(stream_desc, "StreamViewType", type); rjson::add(stream_desc, "TableName", rjson::from_string(table_name(*bs))); @@ -736,12 +789,9 @@ future executor::get_shard_iterator(client_state& auto stream_arn = rjson::get(request, "StreamArn"); auto db = _proxy.data_dictionary(); - schema_ptr schema = nullptr; std::optional sid; - + auto schema = get_schema_from_arn(_proxy, stream_arn); try { - auto cf = db.find_column_family(table_id(stream_arn)); - schema = cf.schema(); sid = rjson::get(request, "ShardId"); } catch (...) { } @@ -776,7 +826,7 @@ future executor::get_shard_iterator(client_state& break; } - shard_iterator iter(stream_arn, *sid, threshold, inclusive_of_threshold); + shard_iterator iter(schema->id().uuid(), *sid, threshold, inclusive_of_threshold); auto ret = rjson::empty_object(); rjson::add(ret, "ShardIterator", iter); @@ -1156,7 +1206,7 @@ void executor::supplement_table_stream_info(rjson::value& descr, const schema& s if (opts.enabled()) { auto db = sp.data_dictionary(); auto cf = db.find_table(schema.ks_name(), cdc::log_name(schema.cf_name())); - stream_arn arn(cf.schema()->id()); + stream_arn arn(cf.schema(), cdc::get_base_table(db.real_database(), *cf.schema())); rjson::add(descr, "LatestStreamArn", arn); rjson::add(descr, "LatestStreamLabel", rjson::from_string(stream_label(*cf.schema()))); diff --git a/test/boost/alternator_unit_test.cc b/test/boost/alternator_unit_test.cc index 713209e863..e8e4e14e8b 100644 --- a/test/boost/alternator_unit_test.cc +++ b/test/boost/alternator_unit_test.cc @@ -13,9 +13,11 @@ #include "utils/base64.hh" #include "utils/rjson.hh" #include "alternator/serialization.hh" +#include "alternator/error.hh" #include "alternator/expressions.hh" #include "cdc/generation.hh" +#include "alternator/executor.hh" #include #include #include @@ -24,6 +26,60 @@ namespace alternator { const cdc::stream_id& find_parent_shard_in_previous_generation(db_clock::time_point prev_timestamp, const utils::chunked_vector& prev_streams, const cdc::stream_id& child); } +BOOST_AUTO_TEST_CASE(test_extract_table_name_from_arn_simple_new_format) { + std::string_view arn = "arn:aws:dynamodb:us-east-1:797456418907:table/ks_space@dynamodb_streams_verification_table_rc/stream/2025-12-18T17:38:48.952"; + + auto parts = alternator::parse_arn(arn, "StreamArn", "stream", "/stream/"); + BOOST_REQUIRE_EQUAL(parts.table_name, "dynamodb_streams_verification_table_rc"); + BOOST_REQUIRE_EQUAL(parts.keyspace_name, "ks_space"); + BOOST_REQUIRE_EQUAL(parts.postfix, "/stream/2025-12-18T17:38:48.952"); +} + +BOOST_AUTO_TEST_CASE(test_extract_table_name_from_arn_simple_old_format) { + std::string_view arn = "arn:scylla:service:region:account-id:table/resource"; + + auto parts = alternator::parse_arn(arn, "ResourceArn", "table", ""); + BOOST_REQUIRE_EQUAL(parts.table_name, "resource"); + BOOST_REQUIRE_EQUAL(parts.keyspace_name, "region"); + BOOST_REQUIRE_EQUAL(parts.postfix, ""); +} + +BOOST_AUTO_TEST_CASE(test_extract_table_name_from_arn_no_table) { + std::string_view arn = "arn:aws:dynamodb:us-east-1:797456418907:foo/dynamodb_streams_verification_table_rc/stream/2025-12-18T17:38:48.952"; + + BOOST_REQUIRE_THROW(alternator::parse_arn(arn, "", "", ""), alternator::api_error); +} + +BOOST_AUTO_TEST_CASE(test_extract_table_name_from_arn_no_keyspace) { + std::string_view arn = "arn:aws:dynamodb:us-east-1:797456418907:table/dynamodb_streams_verification_table_rc"; + + BOOST_REQUIRE_THROW(alternator::parse_arn(arn, "", "", ""), alternator::api_error); +} + +BOOST_AUTO_TEST_CASE(test_extract_table_name_from_arn_wrong_postfix) { + std::string_view arn = "arn:aws:dynamodb:us-east-1:797456418907:table/dynamodb_streams_verification_table_rc/stream/2025-12-18T17:38:48.952"; + + BOOST_REQUIRE_THROW(alternator::parse_arn(arn, "", "", "/cakes"), alternator::api_error); +} + +BOOST_AUTO_TEST_CASE(test_extract_table_name_from_arn_not_enough_colons_1) { + std::string_view arn = "arn:aws:dynamodb:us-east-1:797456418907"; + + BOOST_REQUIRE_THROW(alternator::parse_arn(arn, "", "", ""), alternator::api_error); +} + +BOOST_AUTO_TEST_CASE(test_extract_table_name_from_arn_not_enough_colons_2) { + std::string_view arn = "arn"; + + BOOST_REQUIRE_THROW(alternator::parse_arn(arn, "", "", ""), alternator::api_error); +} + +BOOST_AUTO_TEST_CASE(test_extract_table_name_from_arn_empty) { + std::string_view arn = ""; + + BOOST_REQUIRE_THROW(alternator::parse_arn(arn, "", "", ""), alternator::api_error); +} + static std::map strings { {"", ""}, {"a", "YQ=="},