Compare commits

...

22 Commits

Author SHA1 Message Date
Dani Tweig
5078a054d6 github: bug_report.yml: Improve bug report template structure
V1: Perform a yaml "face lift" on the old bug report md template, making bug reporting more efficient.

Add dedicated textarea fields for problem description and expected behavior
Include pre-filled placeholders to guide issue reporting
Add formatted log output section with shell syntax highlighting

V2: Updated Scylla's contact details and performed some code cleanup.
2024-11-14 12:47:06 +02:00
Dani Tweig
8cc510a51b Update bug_report.yml
Perform a yaml "face lift" on the old bug report md template.
In addition to the previously collected data, ask for reproduction steps and a description of the problem.

Making bug reporting more efficient.
2024-11-11 16:03:53 +02:00
Yaron Kaikov
cc71077e33 .github/scripts/label_promoted_commits.py: only match the Close tag in the last line in the commit message
When a backport PR is promoted to the release branch, we automatically close the backport PR (since GitHub will only close the one based on the default branch) and update the labels in the original PRs

In a situation when we have multiple `closes` prefixes, the script will use the first one (which is not the correct one), see 3ddb61c90e

Fixing this by always using the last line with the `closes` prefix

Closes scylladb/scylladb#21498
2024-11-11 11:04:33 +02:00
Dani Tweig
381faa2649 Rename .github/ISSUE_TEMPLATE.md to .github/ISSUE_TEMPLATE/bug_report.yml
GitHub issue template process has changed.
The issue template file should be replaced and renamed.

Closes scylladb/scylladb#21518
2024-11-11 11:00:38 +02:00
Nikita Kurashkin
3032d8ccbf add check to refuse usage of DESC TABLE on a materialized view
Fixes #21026

Closes scylladb/scylladb#21500
2024-11-11 10:23:30 +02:00
Yaron Kaikov
2596d1577b ./github/workflows/add-label-when-promoted.yaml: Run auto-backport only on default branch
In https://github.com/scylladb/scylladb/pull/21496#event-15221789614
```
scylladbbot force-pushed the backport/21459/to-6.1 branch from 414691c to 59a4ccd Compare 2 days ago
```

Backport automation is triggered by `push`, but it should also only run from the `master` branch (or the `enterprise` branch for Enterprise), so we verify this by also checking the default branch.

Fixes: https://github.com/scylladb/scylladb/issues/21514

Closes scylladb/scylladb#21515
2024-11-11 09:16:35 +02:00
Pavel Emelyanov
57af69e15f Merge 'Add retries to the S3 client' from Ernest Zaslavsky
1. Add a `retry_strategy` interface and a default implementation of an exponential back-off retry strategy.
2. Add new S3-related errors; also introduce additional errors to describe pure HTTP errors that have no additional information in the body.
3. Add retries to the S3 client; all retries are coordinated by an instance of `retry_strategy`. In case of an error, also parse the response body in an attempt to retrieve the additional, more focused error information suggested by AWS. See https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html. Introduce `aws_exception` to carry the original `aws_error`.
4. Discard whatever exception is thrown in `abort_upload` when aborting a multipart upload: we don't care about aborting cleanly, since there are other means to clean up dangling parts, for example `rclone cleanup` or an S3 bucket's Lifecycle Management Policy.
5. Add tests to cover retries and retry exhaustion. Also add tests for jumbo upload.
6. Add an S3 proxy which is used to randomly inject retryable S3 errors to test the "retry" part of the S3 client. Switch `s3_test` to use the S3 proxy. The `s3_test` suite surfaced a `put_object` problem that caused a segmentation fault when retrying; it is now fixed.
7. Extend the `s3_test` to use both `minio` and `proxy` configurations.
8. Add parameter to the proxy to seed the error injection randomization to make it replayable.

fixes: #20611
fixes: #20613

Closes scylladb/scylladb#21054

* github.com:scylladb/scylladb:
  aws_errors: Make error messages more verbose.
  test: Make the minio proxy randomization re-playable
  test/boost/s3_test: add error injection scenarios to existing test suite
  test: Switch `s3_test` to use proxy
  test: Add more tests
  client: Stop returning error on `DELETE` in multipart upload abortion
  client: Fix sigsegv when retrying
  client: Add retries
  client: Adjust `map_s3_client_exception` to return exception instance
  aws_errors: Change aws_error::parse to return std::optional<>
  aws_errors: Add http errors mapping into aws_error
  client: Add aws_exception mapping
  aws_error: Add `aws_exception` to carry original `aws_error`
  aws_errors: Add new error codes
  client: Introduce retry strategy
2024-11-11 08:35:55 +03:00
Ernest Zaslavsky
029837a4a1 aws_errors: Make error messages more verbose.
Add more information to the error messages to make the failure reason clearer. Also add tests to check exceptions propagated from s3 client failure.
2024-11-07 21:01:25 +02:00
Ernest Zaslavsky
14f3832749 test: Make the minio proxy randomization re-playable
Provide a seed for the proxy randomization. The idea is that `test.py` initializes the seed from `/dev/urandom` and prints it on startup; if some tests fail, the developer can replay them locally with the same seed (if the failure didn't reproduce otherwise) by running `start_s3_proxy.py` and passing the aforementioned seed via the `--rnd-seed` command line argument.
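A minimal sketch of the replayable-seed idea, assuming hypothetical helper names (the real logic lives in `test.py` and `start_s3_proxy.py`):

```python
import os
import random

def make_seed() -> int:
    # Derive the seed from the OS entropy pool and print it, so a
    # failing run can later be replayed with the exact same seed.
    seed = int.from_bytes(os.urandom(4), "little")
    print(f"s3 proxy error-injection seed: {seed}")
    return seed

def error_plan(seed: int, n_requests: int) -> list[bool]:
    # Deterministic inject/don't-inject decisions: the same seed
    # always yields the same sequence, making the run replayable.
    rng = random.Random(seed)
    return [rng.random() < 0.3 for _ in range(n_requests)]
```

Passing the printed seed back in (cf. `--rnd-seed`) reproduces the exact same injection sequence.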
2024-11-07 21:01:25 +02:00
Ernest Zaslavsky
0c62635f05 test/boost/s3_test: add error injection scenarios to existing test suite
Add variants of existing S3 tests that route through a proxy instead of connecting directly to MinIO. The proxy allows injecting errors to validate error handling and recovery mechanisms under failure conditions.
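A toy sketch of the error-injecting proxy idea; the class and its parameters are illustrative, not the actual `s3_proxy` implementation:

```python
import random

RETRYABLE_STATUS = 500  # e.g. InternalError, which AWS marks retryable

class InjectingProxy:
    """Stand-in for the test proxy: forward each request, but randomly
    replace the response with a retryable S3 error body."""

    def __init__(self, rng: random.Random, failure_rate: float = 0.3):
        self.rng = rng
        self.failure_rate = failure_rate

    def handle(self, forward):
        # Inject a retryable error with probability failure_rate,
        # otherwise pass the request through to the real backend.
        if self.rng.random() < self.failure_rate:
            return (RETRYABLE_STATUS,
                    "<Error><Code>InternalError</Code></Error>")
        return forward()
```

Routing existing tests through such a proxy exercises the client's retry path without changing the tests themselves.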
2024-11-07 21:01:25 +02:00
Ernest Zaslavsky
8919e0abab test: Switch s3_test to use proxy
Switch `s3_test` to use the S3 proxy, which randomly injects retryable S3 errors to exercise the "retry" part of the S3 client.
Fix `put_object` to make it retryable.
2024-11-07 21:01:25 +02:00
Ernest Zaslavsky
b1e36c868c test: Add more tests
Add tests to cover retries and retry exhaustion. Also add tests for jumbo upload.
2024-11-07 21:01:25 +02:00
Ernest Zaslavsky
7fd1ff8d79 client: Stop returning error on DELETE in multipart upload abortion
Discard whatever exception is thrown in `abort_upload` when aborting a multipart upload: we don't care about aborting cleanly, since there are other means to clean up dangling parts, for example `rclone cleanup` or an S3 bucket's Lifecycle Management Policy.
2024-11-07 21:01:25 +02:00
Ernest Zaslavsky
064a239180 client: Fix sigsegv when retrying
Stop moving the `file` into `make_file_input_stream`, since a retry will try to use it again.
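The shape of the fix, sketched in Python with hypothetical names: construct the consumable resource fresh on every attempt, instead of handing a single moved-from object to the retry loop:

```python
def upload_with_retries(open_stream, send, attempts: int = 3):
    """Open a fresh input stream per attempt; reusing one stream object
    across attempts is the bug class the commit fixes."""
    last_exc = None
    for _ in range(attempts):
        stream = open_stream()  # new stream each try, never reused
        try:
            return send(stream)
        except IOError as exc:
            last_exc = exc
    raise last_exc
```

The C++ fix is analogous: keep the `file` alive instead of moving it into the stream, so a retry can open a new stream from it.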
2024-11-07 21:01:25 +02:00
Ernest Zaslavsky
dc6e4c0d97 client: Add retries
Add retries to the S3 client; all retries are coordinated by an instance of `retry_strategy`. In case of an error, also parse the response body in an attempt to retrieve the additional, more focused error information suggested by AWS. See https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html.

Also move the expected HTTP status check into `make_s3_error_handler`, since the `http::client::make_request` call is made with `nullopt`: we want to manage all AWS error handling in the S3 client, preventing the HTTP client from validating the status and failing before we have a chance to analyze the error properly.
2024-11-07 21:01:25 +02:00
Ernest Zaslavsky
244635ebd8 client: Adjust map_s3_client_exception to return exception instance
"Unfuturize" `map_s3_client_exception`: the retryable client is going to be implemented with coroutines, so no `future` is needed here, and returning one would just add an unnecessary `co_await`.
2024-11-07 21:01:25 +02:00
Ernest Zaslavsky
bd3d4ed417 aws_errors: Change aws_error::parse to return std::optional<>
Change aws_error::parse to return std::optional<> to signify that no error was found in the response body
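A Python analogue of the new contract, with `None` standing in for an empty `std::optional<>` (the parsing details here are illustrative):

```python
import re
from typing import Optional

def parse_aws_error(body: str) -> Optional[dict]:
    """Return None when the response body contains no recognizable
    error, instead of a sentinel "OK" error object."""
    code = re.search(r"<Code>(.*?)</Code>", body)
    if code is None:
        return None  # no error found in the body
    message = re.search(r"<Message>(.*?)</Message>", body)
    return {"code": code.group(1),
            "message": message.group(1) if message else ""}
```

Callers then test for presence explicitly rather than comparing against a special OK error type.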
2024-11-07 21:01:25 +02:00
Ernest Zaslavsky
58decef509 aws_errors: Add http errors mapping into aws_error
Add http errors mapping into aws_error since the retry strategy is going to operate on aws_error and should not be aware of HTTP status codes
2024-11-07 21:01:25 +02:00
Ernest Zaslavsky
fa9e8b7ed0 client: Add aws_exception mapping
Map `aws_exception` in `map_s3_client_exception`; this will be needed in retryable client calls to remap the newly added AWS errors to `storage_io_error`.
2024-11-07 21:01:25 +02:00
Ernest Zaslavsky
54e250a6f1 aws_error: Add aws_exception to carry original aws_error
Add `aws_exception` to carry the original `aws_error` for proper error handling in the retryable S3 client
2024-11-07 21:01:25 +02:00
Ernest Zaslavsky
e6ff34046f aws_errors: Add new error codes
Add new S3-related errors; also introduce additional errors to describe pure HTTP errors that have no additional information in the body
2024-11-07 21:01:25 +02:00
Ernest Zaslavsky
8dbe351888 client: Introduce retry strategy
Add a `retry_strategy` interface and a default implementation of an exponential back-off retry strategy
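A sketch of an exponential back-off strategy of the kind described, in Python with illustrative parameter names (the actual `retry_strategy` is a C++ interface):

```python
import random

def backoff_delays(base_ms: int = 25, cap_ms: int = 20_000, retries: int = 5):
    """Yield one delay (in ms) per retry attempt: the ceiling grows as
    base * 2**attempt, capped at cap_ms, with full jitter applied."""
    rng = random.Random()
    for attempt in range(retries):
        ceiling = min(cap_ms, base_ms * (2 ** attempt))
        yield rng.uniform(0, ceiling)  # jitter spreads concurrent retries
```

The jitter keeps many clients that failed at the same moment from retrying in lockstep.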
2024-11-07 21:01:25 +02:00
21 changed files with 980 additions and 135 deletions


@@ -1,15 +0,0 @@
This is Scylla's bug tracker, to be used for reporting bugs only.
If you have a question about Scylla, and not a bug, please ask it in
our mailing-list at scylladb-dev@googlegroups.com or in our slack channel.
- [] I have read the disclaimer above, and I am reporting a suspected malfunction in Scylla.
*Installation details*
Scylla version (or git commit hash):
Cluster size:
OS (RHEL/CentOS/Ubuntu/AWS AMI):
*Hardware details (for performance issues)* Delete if unneeded
Platform (physical/VM/cloud instance type/docker):
Hardware: sockets= cores= hyperthreading= memory=
Disks: (SSD/HDD, count)

86
.github/ISSUE_TEMPLATE/bug_report.yml vendored Normal file

@@ -0,0 +1,86 @@
name: "Report a bug"
description: "File a bug report."
title: "[Bug]: "
type: "bug"
labels: bug
body:
- type: checkboxes
id: terms
attributes:
label: Code of Conduct
description: "This is Scylla's bug tracker, to be used for reporting bugs only.
If you have a question about Scylla, and not a bug, please ask it in
our forum at https://forum.scylladb.com/ or in our slack channel https://slack.scylladb.com/ "
options:
- label: I have read the disclaimer above and am reporting a suspected malfunction in Scylla.
required: true
- type: input
id: product-version
attributes:
label: product version
description: Scylla version (or git commit hash)
placeholder: ex. scylla-6.1.1
validations:
required: true
- type: input
id: cluster-size
attributes:
label: Cluster Size
validations:
required: true
- type: input
id: os
attributes:
label: OS
placeholder: RHEL/CentOS/Ubuntu/AWS AMI
validations:
required: true
- type: textarea
id: additional-data
attributes:
label: Additional Environmental Data
#description:
placeholder: Add additional data
value: "Platform (physical/VM/cloud instance type/docker):\n
Hardware: sockets= cores= hyperthreading= memory=\n
Disks: (SSD/HDD, count)"
validations:
required: false
- type: textarea
id: reproducer-steps
attributes:
label: Reproduction Steps
placeholder: Describe how to reproduce the problem
value: "The steps to reproduce the problem are:"
validations:
required: true
- type: textarea
id: the-problem
attributes:
label: What is the problem?
placeholder: Describe the problem you found
value: "The problem is that"
validations:
required: true
- type: textarea
id: what-happened
attributes:
label: Expected behavior?
placeholder: Describe what should have happened
value: "I expected that "
validations:
required: true
- type: textarea
id: logs
attributes:
label: Relevant log output
description: Please copy and paste any relevant log output. This will be automatically formatted into code, so no need for backticks.
render: shell


@@ -54,7 +54,8 @@ def main():
# Print commit information
for commit in commits:
print(f'Commit sha is: {commit.sha}')
match = pr_pattern.search(commit.commit.message)
pr_last_line = commit.commit.message.splitlines()[-1]
match = pr_pattern.search(pr_last_line)
if match:
pr_number = int(match.group(1))
if pr_number in processed_prs:


@@ -49,7 +49,7 @@ jobs:
GITHUB_TOKEN: ${{ secrets.AUTO_BACKPORT_TOKEN }}
run: python .github/scripts/label_promoted_commits.py --commits ${{ github.event.before }}..${{ github.sha }} --repository ${{ github.repository }} --ref ${{ github.ref }}
- name: Run auto-backport.py when promotion completed
if: github.event_name == 'push'
if: github.event_name == 'push' && github.ref == 'refs/heads/${{ env.DEFAULT_BRANCH }}'
env:
GITHUB_TOKEN: ${{ secrets.AUTO_BACKPORT_TOKEN }}
run: python .github/scripts/auto-backport.py --repo ${{ github.repository }} --base-branch ${{ github.ref }} --commits ${{ github.event.before }}..${{ github.sha }}


@@ -1044,8 +1044,9 @@ scylla_core = (['message/messaging_service.cc',
'utils/multiprecision_int.cc',
'utils/gz/crc_combine.cc',
'utils/gz/crc_combine_table.cc',
'utils/s3/client.cc',
'utils/s3/aws_error.cc',
'utils/s3/client.cc',
'utils/s3/retry_strategy.cc',
'gms/version_generator.cc',
'gms/versioned_value.cc',
'gms/gossiper.cc',


@@ -41,6 +41,7 @@
#include "cql3/functions/functions.hh"
#include "types/user.hh"
#include "view_info.hh"
#include "validation.hh"
#include "index/secondary_index_manager.hh"
#include "cql3/functions/user_function.hh"
#include "cql3/functions/user_aggregate.hh"
@@ -255,6 +256,11 @@ future<std::vector<description>> table(const data_dictionary::database& db, cons
if (!table) {
throw exceptions::invalid_request_exception(format("Table '{}' not found in keyspace '{}'", name, ks));
}
auto s = validation::validate_column_family(db, ks, name);
if (s->is_view()) {
throw exceptions::invalid_request_exception("Cannot use DESC TABLE on materialized View");
}
auto schema = table->schema();
auto idxs = table->get_index_manager().list_indexes();


@@ -36,6 +36,7 @@ from scripts import coverage # type: ignore
from test.pylib.artifact_registry import ArtifactRegistry
from test.pylib.host_registry import HostRegistry
from test.pylib.pool import Pool
from test.pylib.s3_proxy import S3ProxyServer
from test.pylib.s3_server_mock import MockS3Server
from test.pylib.resource_gather import setup_cgroup, run_resource_watcher, get_resource_gather
from test.pylib.util import LogPrefixAdapter
@@ -1553,6 +1554,12 @@ async def run_all_tests(signaled: asyncio.Event, options: argparse.Namespace) ->
await mock_s3_server.start()
TestSuite.artifacts.add_exit_artifact(None, mock_s3_server.stop)
minio_uri = "http://" + os.environ[ms.ENV_ADDRESS] + ":" + os.environ[ms.ENV_PORT]
proxy_s3_server = S3ProxyServer(await hosts.lease_host(), 9002, minio_uri, 3, int(time.time()),
LogPrefixAdapter(logging.getLogger('s3_proxy'), {'prefix': 's3_proxy'}))
await proxy_s3_server.start()
TestSuite.artifacts.add_exit_artifact(None, proxy_s3_server.stop)
console.print_start_blurb()
max_failures = options.max_failures
failed = 0


@@ -20,11 +20,13 @@
#include <seastar/util/closeable.hh>
using namespace seastar;
using namespace std::chrono_literals;
using namespace std::string_view_literals;
enum class failure_policy : uint8_t {
SUCCESS = 0,
RETRYABLE_FAILURE = 1,
NONRETRYABLE_FAILURE = 2,
NEVERENDING_RETRYABLE_FAILURE = 3,
};
static uint16_t get_port() {
@@ -97,23 +99,34 @@ SEASTAR_THREAD_TEST_CASE(test_multipart_upload_file_retryable_success) {
const size_t remainder_size = part_size / 2;
const size_t total_size = 4 * part_size + remainder_size;
const size_t memory_size = part_size;
BOOST_REQUIRE_EXCEPTION(test_client_upload_file(seastar_test::get_name(), failure_policy::RETRYABLE_FAILURE, total_size, memory_size), storage_io_error, [](const storage_io_error& e) {
return e.code().value() == EIO;
});
BOOST_REQUIRE_NO_THROW(test_client_upload_file(seastar_test::get_name(), failure_policy::RETRYABLE_FAILURE, total_size, memory_size));
}
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_file_failure) {
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_file_failure_1) {
const size_t part_size = 5_MiB;
const size_t remainder_size = part_size / 2;
const size_t total_size = 4 * part_size + remainder_size;
const size_t memory_size = part_size;
BOOST_REQUIRE_EXCEPTION(test_client_upload_file(seastar_test::get_name(), failure_policy::NONRETRYABLE_FAILURE, total_size, memory_size),
storage_io_error,
[](const storage_io_error& e) { return e.code().value() == EIO; });
BOOST_REQUIRE_EXCEPTION(test_client_upload_file(seastar_test::get_name(), failure_policy::NEVERENDING_RETRYABLE_FAILURE, total_size, memory_size),
storage_io_error, [](const storage_io_error& e) {
return e.code().value() == EIO && e.what() == "S3 request failed. Code: 1. Reason: We encountered an internal error. Please try again."sv;
});
}
void do_test_client_multipart_upload(failure_policy policy) {
const sstring name(fmt::format("/{}/testobject-{}", "test", ::getpid()));
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_file_failure_2) {
const size_t part_size = 5_MiB;
const size_t remainder_size = part_size / 2;
const size_t total_size = 4 * part_size + remainder_size;
const size_t memory_size = part_size;
BOOST_REQUIRE_EXCEPTION(test_client_upload_file(seastar_test::get_name(), failure_policy::NONRETRYABLE_FAILURE, total_size, memory_size), storage_io_error,
[](const storage_io_error& e) {
return e.code().value() == EIO && e.what() == "S3 request failed. Code: 2. Reason: Something went terribly wrong"sv;
});
}
void do_test_client_multipart_upload(failure_policy policy, bool is_jumbo = false) {
const sstring name(fmt::format("/{}/testobject-{}-{}", "test", is_jumbo ? "jumbo" : "large", ::getpid()));
register_policy(name, policy);
testlog.info("Make client");
semaphore mem(16 << 20);
@@ -121,7 +134,7 @@ void do_test_client_multipart_upload(failure_policy policy) {
auto close_client = deferred_close(*cln);
testlog.info("Upload object");
auto out = output_stream<char>(cln->make_upload_sink(name));
auto out = output_stream<char>(is_jumbo ? cln->make_upload_jumbo_sink(name, 3) : cln->make_upload_sink(name));
auto close_stream = deferred_close(out);
static constexpr unsigned chunk_size = 1000;
@@ -139,13 +152,38 @@ SEASTAR_THREAD_TEST_CASE(test_multipart_upload_sink_success) {
}
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_sink_retryable_success) {
BOOST_REQUIRE_EXCEPTION(do_test_client_multipart_upload(failure_policy::RETRYABLE_FAILURE), storage_io_error, [](const storage_io_error& e) {
return e.code().value() == EIO;
BOOST_REQUIRE_NO_THROW(do_test_client_multipart_upload(failure_policy::RETRYABLE_FAILURE));
}
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_sink_failure_1) {
BOOST_REQUIRE_EXCEPTION(do_test_client_multipart_upload(failure_policy::NEVERENDING_RETRYABLE_FAILURE), storage_io_error, [](const storage_io_error& e) {
return e.code().value() == EIO && e.what() == "S3 request failed. Code: 1. Reason: We encountered an internal error. Please try again."sv;
});
}
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_sink_failure) {
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_sink_failure_2) {
BOOST_REQUIRE_EXCEPTION(do_test_client_multipart_upload(failure_policy::NONRETRYABLE_FAILURE), storage_io_error, [](const storage_io_error& e) {
return e.code().value() == EIO;
return e.code().value() == EIO && e.what() == "S3 request failed. Code: 2. Reason: Something went terribly wrong"sv;
});
}
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_jumbo_sink_success) {
BOOST_REQUIRE_NO_THROW(do_test_client_multipart_upload(failure_policy::SUCCESS, true));
}
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_jumbo_sink_retryable_success) {
BOOST_REQUIRE_NO_THROW(do_test_client_multipart_upload(failure_policy::RETRYABLE_FAILURE, true));
}
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_jumbo_sink_failure_1) {
BOOST_REQUIRE_EXCEPTION(
do_test_client_multipart_upload(failure_policy::NEVERENDING_RETRYABLE_FAILURE, true), storage_io_error, [](const storage_io_error& e) {
return e.code().value() == EIO && e.what() == "S3 request failed. Code: 1. Reason: We encountered an internal error. Please try again."sv;
});
}
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_jumbo_sink_failure_2) {
BOOST_REQUIRE_EXCEPTION(do_test_client_multipart_upload(failure_policy::NONRETRYABLE_FAILURE, true), storage_io_error, [](const storage_io_error& e) {
return e.code().value() == EIO && e.what() == "S3 request failed. Code: 2. Reason: Something went terribly wrong"sv;
});
}


@@ -47,35 +47,31 @@ build_xml_response(const std::string& exception, const std::string& message, con
BOOST_AUTO_TEST_CASE(TestXmlErrorPayload) {
std::string message = "Test Message";
std::string requestId = "Request Id";
aws::aws_error error = aws::aws_error::parse(build_xml_response("IncompleteSignatureException", message, requestId));
auto error = aws::aws_error::parse(build_xml_response("IncompleteSignatureException", message, requestId)).value();
BOOST_REQUIRE_EQUAL(aws::aws_error_type::INCOMPLETE_SIGNATURE, error.get_error_type());
BOOST_REQUIRE_EQUAL(message, error.get_error_message());
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
error = aws::aws_error::parse(build_xml_response("InternalFailure", message, requestId, message_style::plural));
error = aws::aws_error::parse(build_xml_response("InternalFailure", message, requestId, message_style::plural)).value();
BOOST_REQUIRE_EQUAL(aws::aws_error_type::INTERNAL_FAILURE, error.get_error_type());
BOOST_REQUIRE_EQUAL(message, error.get_error_message());
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::yes);
error = aws::aws_error::parse(build_xml_response("IDontExist", message, requestId, message_style::plural));
error = aws::aws_error::parse(build_xml_response("IDontExist", message, requestId, message_style::plural)).value();
BOOST_REQUIRE_EQUAL(aws::aws_error_type::UNKNOWN, error.get_error_type());
BOOST_REQUIRE_EQUAL(message, error.get_error_message());
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
error = aws::aws_error::parse("");
BOOST_REQUIRE_EQUAL(aws::aws_error_type::OK, error.get_error_type());
BOOST_REQUIRE_EQUAL("", error.get_error_message());
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
auto no_error = aws::aws_error::parse("");
BOOST_REQUIRE_EQUAL(no_error.has_value(), false);
error =
no_error =
aws::aws_error::parse("Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.");
BOOST_REQUIRE_EQUAL(aws::aws_error_type::OK, error.get_error_type());
BOOST_REQUIRE_EQUAL("", error.get_error_message());
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
BOOST_REQUIRE_EQUAL(no_error.has_value(), false);
std::string response = " ";
response += build_xml_response("InternalFailure", message, requestId, message_style::singular);
error = aws::aws_error::parse(response);
error = aws::aws_error::parse(response).value();
BOOST_REQUIRE_EQUAL(aws::aws_error_type::INTERNAL_FAILURE, error.get_error_type());
BOOST_REQUIRE_EQUAL(message, error.get_error_message());
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::yes);
@@ -86,13 +82,13 @@ BOOST_AUTO_TEST_CASE(TestErrorsWithPrefixParse) {
std::string exceptionPrefix = "blahblahblah#";
std::string requestId = "Request Id";
for (const auto& [exception, err] : aws::aws_error::get_errors()) {
auto error = aws::aws_error::parse(build_xml_response(exceptionPrefix + std::string(exception), message, requestId));
auto error = aws::aws_error::parse(build_xml_response(exceptionPrefix + std::string(exception), message, requestId)).value();
BOOST_REQUIRE_EQUAL(err.get_error_type(), error.get_error_type());
BOOST_REQUIRE_EQUAL(message, error.get_error_message());
BOOST_REQUIRE_EQUAL(err.is_retryable(), error.is_retryable());
}
auto error = aws::aws_error::parse(build_xml_response(exceptionPrefix + "IDon'tExist", "JunkMessage", requestId));
auto error = aws::aws_error::parse(build_xml_response(exceptionPrefix + "IDon'tExist", "JunkMessage", requestId)).value();
BOOST_REQUIRE_EQUAL(aws::aws_error_type::UNKNOWN, error.get_error_type());
BOOST_REQUIRE_EQUAL("JunkMessage", error.get_error_message());
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
@@ -102,12 +98,12 @@ BOOST_AUTO_TEST_CASE(TestErrorsWithoutPrefixParse) {
std::string message = "Test Message";
std::string requestId = "Request Id";
for (const auto& [exception, err] : aws::aws_error::get_errors()) {
auto error = aws::aws_error::parse(build_xml_response(std::string(exception), message, requestId));
auto error = aws::aws_error::parse(build_xml_response(std::string(exception), message, requestId)).value();
BOOST_REQUIRE_EQUAL(err.get_error_type(), error.get_error_type());
BOOST_REQUIRE_EQUAL(message, error.get_error_message());
BOOST_REQUIRE_EQUAL(err.is_retryable(), error.is_retryable());
}
auto error = aws::aws_error::parse(build_xml_response("IDon'tExist", "JunkMessage", requestId));
auto error = aws::aws_error::parse(build_xml_response("IDon'tExist", "JunkMessage", requestId)).value();
BOOST_REQUIRE_EQUAL(aws::aws_error_type::UNKNOWN, error.get_error_type());
BOOST_REQUIRE_EQUAL("JunkMessage", error.get_error_message());
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);


@@ -31,6 +31,8 @@
#include "sstables/checksum_utils.hh"
#include "gc_clock.hh"
using namespace std::string_view_literals;
// The test can be run on real AWS-S3 bucket. For that, create a bucket with
// permissive enough policy and then run the test with env set respectively
// E.g. like this
@@ -43,7 +45,21 @@
// export AWS_SESSION_TOKEN=${aws_session_token}
// export AWS_DEFAULT_REGION="us-east-2"
s3::endpoint_config_ptr make_minio_config() {
static shared_ptr<s3::client> make_proxy_client(semaphore& mem) {
s3::endpoint_config cfg = {
.port = std::stoul(tests::getenv_safe("PROXY_S3_SERVER_PORT")),
.use_https = false,
.aws = {{
.access_key_id = tests::getenv_safe("AWS_ACCESS_KEY_ID"),
.secret_access_key = tests::getenv_safe("AWS_SECRET_ACCESS_KEY"),
.session_token = ::getenv("AWS_SESSION_TOKEN") ? : "",
.region = ::getenv("AWS_DEFAULT_REGION") ? : "local",
}},
};
return s3::client::make(tests::getenv_safe("PROXY_S3_SERVER_HOST"), make_lw_shared<s3::endpoint_config>(std::move(cfg)), mem);
}
static shared_ptr<s3::client> make_minio_client(semaphore& mem) {
s3::endpoint_config cfg = {
.port = std::stoul(tests::getenv_safe("S3_SERVER_PORT_FOR_TEST")),
.use_https = ::getenv("AWS_DEFAULT_REGION") != nullptr,
@@ -54,21 +70,23 @@ s3::endpoint_config_ptr make_minio_config() {
.region = ::getenv("AWS_DEFAULT_REGION") ? : "local",
}},
};
return make_lw_shared<s3::endpoint_config>(std::move(cfg));
return s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_lw_shared<s3::endpoint_config>(std::move(cfg)), mem);
}
using client_maker_function = std::function<shared_ptr<s3::client>(semaphore&)>;
/*
* Tests below expect minio server to be running on localhost
* with the bucket named env['S3_BUCKET_FOR_TEST'] created with
* unrestricted anonymous read-write access
*/
SEASTAR_THREAD_TEST_CASE(test_client_put_get_object) {
void client_put_get_object(const client_maker_function& client_maker) {
const sstring name(fmt::format("/{}/testobject-{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid()));
testlog.info("Make client\n");
semaphore mem(16<<20);
auto cln = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
semaphore mem(16 << 20);
auto cln = client_maker(mem);
auto close_client = deferred_close(*cln);
testlog.info("Put object {}\n", name);
@@ -102,6 +120,14 @@ SEASTAR_THREAD_TEST_CASE(test_client_put_get_object) {
});
}
SEASTAR_THREAD_TEST_CASE(test_client_put_get_object_minio) {
client_put_get_object(make_minio_client);
}
SEASTAR_THREAD_TEST_CASE(test_client_put_get_object_proxy) {
client_put_get_object(make_proxy_client);
}
static auto deferred_delete_object(shared_ptr<s3::client> client, sstring name) {
return seastar::defer([client, name] {
testlog.info("Delete object: {}\n", name);
@@ -109,12 +135,12 @@ static auto deferred_delete_object(shared_ptr<s3::client> client, sstring name)
});
}
void do_test_client_multipart_upload(bool with_copy_upload) {
void do_test_client_multipart_upload(const client_maker_function& client_maker, bool with_copy_upload) {
const sstring name(fmt::format("/{}/test{}object-{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), with_copy_upload ? "jumbo" : "large", ::getpid()));
testlog.info("Make client\n");
semaphore mem(16<<20);
auto cln = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
auto cln = client_maker(mem);
auto close_client = deferred_close(*cln);
testlog.info("Upload object (with copy = {})\n", with_copy_upload);
@@ -161,21 +187,29 @@ void do_test_client_multipart_upload(bool with_copy_upload) {
}
}
SEASTAR_THREAD_TEST_CASE(test_client_multipart_upload) {
do_test_client_multipart_upload(false);
SEASTAR_THREAD_TEST_CASE(test_client_multipart_upload_minio) {
do_test_client_multipart_upload(make_minio_client, false);
}
SEASTAR_THREAD_TEST_CASE(test_client_multipart_copy_upload) {
do_test_client_multipart_upload(true);
SEASTAR_THREAD_TEST_CASE(test_client_multipart_upload_proxy) {
do_test_client_multipart_upload(make_proxy_client, false);
}
SEASTAR_THREAD_TEST_CASE(test_client_multipart_upload_fallback) {
SEASTAR_THREAD_TEST_CASE(test_client_multipart_copy_upload_minio) {
do_test_client_multipart_upload(make_minio_client, true);
}
SEASTAR_THREAD_TEST_CASE(test_client_multipart_copy_upload_proxy) {
do_test_client_multipart_upload(make_proxy_client, true);
}
void client_multipart_upload_fallback(const client_maker_function& client_maker) {
const sstring name(fmt::format("/{}/testfbobject-{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid()));
testlog.info("Make client");
semaphore mem(0);
mem.broken(); // so that any attempt to use it throws
auto cln = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
auto cln = client_maker(mem);
auto close_client = deferred_close(*cln);
testlog.info("Upload object");
@@ -197,9 +231,17 @@ SEASTAR_THREAD_TEST_CASE(test_client_multipart_upload_fallback) {
BOOST_REQUIRE_EQUAL(to_sstring(std::move(res)), to_sstring(std::move(data)));
}
SEASTAR_THREAD_TEST_CASE(test_client_multipart_upload_fallback_minio) {
client_multipart_upload_fallback(make_minio_client);
}
SEASTAR_THREAD_TEST_CASE(test_client_multipart_upload_fallback_proxy) {
client_multipart_upload_fallback(make_proxy_client);
}
using with_remainder_t = bool_class<class with_remainder_tag>;
future<> test_client_upload_file(std::string_view test_name, size_t total_size, size_t memory_size) {
future<> test_client_upload_file(const client_maker_function& client_maker, std::string_view test_name, size_t total_size, size_t memory_size) {
tmpdir tmp;
const auto file_path = tmp.path() / "test";
@@ -231,9 +273,7 @@ future<> test_client_upload_file(std::string_view test_name, size_t total_size,
// 2. upload the file to s3
semaphore mem{memory_size};
auto client = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"),
make_minio_config(),
mem);
auto client = client_maker(mem);
co_await client->upload_file(file_path, object_name);
// 3. retrieve the object from s3 and retrieve the object from S3 and
// compare it with the pattern
@@ -262,34 +302,56 @@ future<> test_client_upload_file(std::string_view test_name, size_t total_size,
co_await client->close();
}
SEASTAR_TEST_CASE(test_client_upload_file_multi_part_without_remainder) {
SEASTAR_TEST_CASE(test_client_upload_file_multi_part_without_remainder_minio) {
const size_t part_size = 5_MiB;
const size_t total_size = 4 * part_size;
const size_t memory_size = part_size;
co_await test_client_upload_file(seastar_test::get_name(), total_size, memory_size);
co_await test_client_upload_file(make_minio_client, seastar_test::get_name(), total_size, memory_size);
}
SEASTAR_TEST_CASE(test_client_upload_file_multi_part_with_remainder) {
SEASTAR_TEST_CASE(test_client_upload_file_multi_part_without_remainder_proxy) {
const size_t part_size = 5_MiB;
const size_t total_size = 4 * part_size;
const size_t memory_size = part_size;
co_await test_client_upload_file(make_proxy_client, seastar_test::get_name(), total_size, memory_size);
}
SEASTAR_TEST_CASE(test_client_upload_file_multi_part_with_remainder_minio) {
const size_t part_size = 5_MiB;
const size_t remainder_size = part_size / 2;
const size_t total_size = 4 * part_size + remainder_size;
const size_t memory_size = part_size;
co_await test_client_upload_file(seastar_test::get_name(), total_size, memory_size);
co_await test_client_upload_file(make_minio_client, seastar_test::get_name(), total_size, memory_size);
}
SEASTAR_TEST_CASE(test_client_upload_file_single_part) {
SEASTAR_TEST_CASE(test_client_upload_file_multi_part_with_remainder_proxy) {
const size_t part_size = 5_MiB;
const size_t remainder_size = part_size / 2;
const size_t total_size = 4 * part_size + remainder_size;
const size_t memory_size = part_size;
co_await test_client_upload_file(make_proxy_client, seastar_test::get_name(), total_size, memory_size);
}
SEASTAR_TEST_CASE(test_client_upload_file_single_part_minio) {
const size_t part_size = 5_MiB;
const size_t total_size = part_size / 2;
const size_t memory_size = part_size;
co_await test_client_upload_file(seastar_test::get_name(), total_size, memory_size);
co_await test_client_upload_file(make_minio_client, seastar_test::get_name(), total_size, memory_size);
}
SEASTAR_THREAD_TEST_CASE(test_client_readable_file) {
SEASTAR_TEST_CASE(test_client_upload_file_single_part_proxy) {
const size_t part_size = 5_MiB;
const size_t total_size = part_size / 2;
const size_t memory_size = part_size;
co_await test_client_upload_file(make_proxy_client, seastar_test::get_name(), total_size, memory_size);
}
void client_readable_file(const client_maker_function& client_maker) {
const sstring name(fmt::format("/{}/testroobject-{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid()));
testlog.info("Make client\n");
semaphore mem(16<<20);
auto cln = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
auto cln = client_maker(mem);
auto close_client = deferred_close(*cln);
testlog.info("Put object {}\n", name);
@@ -326,12 +388,20 @@ SEASTAR_THREAD_TEST_CASE(test_client_readable_file) {
BOOST_REQUIRE_EQUAL(to_sstring(std::move(buf)), sstring("67890ABC"));
}
SEASTAR_THREAD_TEST_CASE(test_client_readable_file_stream) {
SEASTAR_THREAD_TEST_CASE(test_client_readable_file_minio) {
client_readable_file(make_minio_client);
}
SEASTAR_THREAD_TEST_CASE(test_client_readable_file_proxy) {
client_readable_file(make_proxy_client);
}
void client_readable_file_stream(const client_maker_function& client_maker) {
const sstring name(fmt::format("/{}/teststreamobject-{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid()));
testlog.info("Make client\n");
semaphore mem(16<<20);
auto cln = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
auto cln = client_maker(mem);
auto close_client = deferred_close(*cln);
testlog.info("Put object {}\n", name);
@@ -350,11 +420,20 @@ SEASTAR_THREAD_TEST_CASE(test_client_readable_file_stream) {
BOOST_REQUIRE_EQUAL(res, sample);
}
SEASTAR_THREAD_TEST_CASE(test_client_put_get_tagging) {
SEASTAR_THREAD_TEST_CASE(test_client_readable_file_stream_minio) {
client_readable_file_stream(make_minio_client);
}
SEASTAR_THREAD_TEST_CASE(test_client_readable_file_stream_proxy) {
client_readable_file_stream(make_proxy_client);
}
void client_put_get_tagging(const client_maker_function& client_maker) {
const sstring name(fmt::format("/{}/testobject-{}",
tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid()));
semaphore mem(16<<20);
auto client = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
auto client = client_maker(mem);
auto close_client = deferred_close(*client);
auto data = sstring("1234567890ABCDEF").release();
client->put_object(name, std::move(data)).get();
@@ -379,6 +458,14 @@ SEASTAR_THREAD_TEST_CASE(test_client_put_get_tagging) {
}
}
SEASTAR_THREAD_TEST_CASE(test_client_put_get_tagging_minio) {
client_put_get_tagging(make_minio_client);
}
SEASTAR_THREAD_TEST_CASE(test_client_put_get_tagging_proxy) {
client_put_get_tagging(make_proxy_client);
}
static std::unordered_set<sstring> populate_bucket(shared_ptr<s3::client> client, sstring bucket, sstring prefix, int nr_objects) {
std::unordered_set<sstring> names;
@@ -392,11 +479,11 @@ static std::unordered_set<sstring> populate_bucket(shared_ptr<s3::client> client
return names;
}
SEASTAR_THREAD_TEST_CASE(test_client_list_objects) {
void client_list_objects(const client_maker_function& client_maker) {
const sstring bucket = tests::getenv_safe("S3_BUCKET_FOR_TEST");
const sstring prefix(fmt::format("testprefix-{}/", ::getpid()));
semaphore mem(16<<20);
auto client = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
auto client = client_maker(mem);
auto close_client = deferred_close(*client);
// Put extra object to check list-by-prefix filters it out
@@ -416,11 +503,19 @@ SEASTAR_THREAD_TEST_CASE(test_client_list_objects) {
BOOST_REQUIRE(names.empty());
}
SEASTAR_THREAD_TEST_CASE(test_client_list_objects_incomplete) {
SEASTAR_THREAD_TEST_CASE(test_client_list_objects_minio) {
client_list_objects(make_minio_client);
}
SEASTAR_THREAD_TEST_CASE(test_client_list_objects_proxy) {
client_list_objects(make_proxy_client);
}
void client_list_objects_incomplete(const client_maker_function& client_maker) {
const sstring bucket = tests::getenv_safe("S3_BUCKET_FOR_TEST");
const sstring prefix(fmt::format("testprefix-{}/", ::getpid()));
semaphore mem(16<<20);
auto client = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
auto client = client_maker(mem);
auto close_client = deferred_close(*client);
populate_bucket(client, bucket, prefix, 8);
@@ -432,3 +527,57 @@ SEASTAR_THREAD_TEST_CASE(test_client_list_objects_incomplete) {
auto de = lister.get().get();
close_lister.close_now();
}
SEASTAR_THREAD_TEST_CASE(test_client_list_objects_incomplete_minio) {
client_list_objects_incomplete(make_minio_client);
}
SEASTAR_THREAD_TEST_CASE(test_client_list_objects_incomplete_proxy) {
client_list_objects_incomplete(make_proxy_client);
}
void client_broken_bucket(const client_maker_function& client_maker) {
const sstring name(fmt::format("/{}/testobject-{}", "NO_BUCKET", ::getpid()));
semaphore mem(16 << 20);
auto client = client_maker(mem);
auto close_client = deferred_close(*client);
auto data = sstring("1234567890ABCDEF").release();
BOOST_REQUIRE_EXCEPTION(client->put_object(name, std::move(data)).get(), storage_io_error, [](const storage_io_error& e) {
return e.code().value() == EIO && e.what() == "S3 request failed. Code: 100. Reason: The specified bucket is not valid."sv;
});
}
SEASTAR_THREAD_TEST_CASE(test_client_broken_bucket_minio) {
client_broken_bucket(make_minio_client);
}
void client_missing_prefix(const client_maker_function& client_maker) {
const sstring name(fmt::format("/{}/testobject-{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid()));
semaphore mem(16 << 20);
auto client = client_maker(mem);
auto close_client = deferred_close(*client);
BOOST_REQUIRE_EXCEPTION(client->get_object_size(name).get(), storage_io_error, [](const storage_io_error& e) {
return e.code().value() == ENOENT && e.what() == "S3 request failed. Code: 117. Reason: HTTP code: 404 Not Found"sv;
});
}
SEASTAR_THREAD_TEST_CASE(test_client_missing_prefix_minio) {
client_missing_prefix(make_minio_client);
}
void client_access_missing_object(const client_maker_function& client_maker) {
const sstring name(fmt::format("/{}/testobject-{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid()));
semaphore mem(16 << 20);
auto client = client_maker(mem);
auto close_client = deferred_close(*client);
BOOST_REQUIRE_EXCEPTION(client->get_object_tagging(name).get(), storage_io_error, [](const storage_io_error& e) {
return e.code().value() == ENOENT && e.what() == "S3 request failed. Code: 133. Reason: The specified key does not exist."sv;
});
}
SEASTAR_THREAD_TEST_CASE(test_client_access_missing_object_minio) {
client_access_missing_object(make_minio_client);
}
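The diff above refactors each test body into a helper taking a `client_maker_function`, then registers thin `_minio` and `_proxy` wrappers so every scenario runs against both backends. A minimal Python sketch of the same factory-parameterization pattern (all names here are hypothetical stand-ins, not the repo's API):

```python
from typing import Callable

# Hypothetical stand-ins for the two client factories used by the C++ tests.
def make_minio_client() -> str:
    return "minio"

def make_proxy_client() -> str:
    return "proxy"

# The shared test body takes a client factory, mirroring client_maker_function.
def client_roundtrip(client_maker: Callable[[], str]) -> str:
    client = client_maker()
    # ... exercise put/get against `client` here ...
    return client

# One thin wrapper per backend, as in the _minio/_proxy test pairs above.
def test_roundtrip_minio() -> str:
    return client_roundtrip(make_minio_client)

def test_roundtrip_proxy() -> str:
    return client_roundtrip(make_proxy_client)
```

The point of the refactor is that the test logic lives in one place and only the factory varies per backend.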


@@ -1399,7 +1399,6 @@ def test_view_forbids_write(cql, mv1):
# "TABLE" in their name - DROP TABLE, ALTER TABLE, and DESC TABLE. There
# are identical operations with "MATERIALIZED VIEW" in their name, which
# should be used instead.
@pytest.mark.xfail(reason="issue #21026")
def test_view_forbids_table_ops(cql, mv1):
with pytest.raises(InvalidRequest, match='Cannot use'):
cql.execute(f'DROP TABLE {mv1}')
@@ -1409,7 +1408,7 @@ def test_view_forbids_table_ops(cql, mv1):
# that DESC TABLE cannot be used on a view and that DESC MATERIALIZED VIEW
# should be used instead - it just reports that the table is "not found".
# Reproduces #21026 (DESC TABLE was allowed on a view):
with pytest.raises(InvalidRequest):
with pytest.raises(InvalidRequest, match='Cannot use'):
cql.execute(f'DESC TABLE {mv1}')
# A materialized view cannot have its own materialized views, nor secondary

test/pylib/s3_proxy.py Normal file

@@ -0,0 +1,256 @@
#!/usr/bin/python3
#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
# S3 proxy server to inject retryable errors for fuzzy testing.
import os
import random
import sys
import asyncio
import requests
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.parse import urlparse, parse_qs
import uuid
from functools import partial
from collections import OrderedDict
from requests import Response
from typing_extensions import Optional
sys.path.insert(0, os.path.dirname(__file__))
class Policy:
def __init__(self, max_retries: int):
self.should_forward: bool = True
self.should_fail: bool = False
self.error_count: int = 0
self.max_errors: int = random.choice(list(range(1, max_retries)))
class LRUCache:
lock = threading.Lock()
def __init__(self, capacity: int):
self.cache = OrderedDict()
self.capacity = capacity
def get(self, key: str) -> Optional[Policy]:
with self.lock:
if key not in self.cache:
return None
self.cache.move_to_end(key)
return self.cache[key]
def put(self, key: str, value: Policy) -> None:
with self.lock:
if key in self.cache:
self.cache.move_to_end(key)
self.cache[key] = value
if len(self.cache) > self.capacity:
self.cache.popitem(last=False)
# Simple proxy between the S3 client and MinIO that randomly injects errors and simulates cases where the request succeeds but the wire gets "broken"
def true_or_false():
return random.choice([True, False])
class InjectingHandler(BaseHTTPRequestHandler):
retryable_codes = list((408, 419, 429, 440)) + list(range(500, 599))
error_names = list(("InternalFailureException",
"InternalFailure",
"InternalServerError",
"InternalError",
"RequestExpiredException",
"RequestExpired",
"ServiceUnavailableException",
"ServiceUnavailableError",
"ServiceUnavailable",
"RequestThrottledException",
"RequestThrottled",
"ThrottlingException",
"ThrottledException",
"Throttling",
"SlowDownException",
"SlowDown",
"RequestTimeTooSkewedException",
"RequestTimeTooSkewed",
"RequestTimeoutException",
"RequestTimeout"))
def __init__(self, policies, logger, minio_uri, max_retries, *args, **kwargs):
self.minio_uri = minio_uri
self.policies = policies
self.logger = logger
self.max_retries = max_retries
super().__init__(*args, **kwargs)
self.close_connection = False
def log_error(self, format, *args):
if self.logger:
self.logger.info("%s - - [%s] %s\n" %
(self.client_address[0],
self.log_date_time_string(),
format % args))
else:
sys.stderr.write("%s - - [%s] %s\n" %
(self.address_string(),
self.log_date_time_string(),
format % args))
def log_message(self, format, *args):
# Just don't be too verbose
if not self.logger:
sys.stderr.write("%s - - [%s] %s\n" %
(self.address_string(),
self.log_date_time_string(),
format % args))
def parsed_qs(self):
parsed_url = urlparse(self.path)
query_components = parse_qs(parsed_url.query)
for key in parsed_url.query.split('&'):
if '=' not in key:
query_components[key] = ['']
return query_components
def get_policy(self):
policy = self.policies.get(self.path)
if policy is None:
policy = Policy(self.max_retries)
policy.should_forward = true_or_false()
if policy.should_forward:
policy.should_fail = true_or_false()
else:
policy.should_fail = True
self.policies.put(self.path, policy)
# Unfortunately, retrying MPU completion on an already-completed upload would introduce flakiness into unit tests, for example `s3_test`
if self.command == "POST" and "uploadId" in self.parsed_qs():
policy.should_forward = not policy.should_fail
return policy
def get_retryable_http_codes(self):
return random.choice(self.retryable_codes), random.choice(self.error_names)
def respond_with_error(self):
code, error_name = self.get_retryable_http_codes()
self.send_response(code)
self.send_header('Content-Type', 'text/plain; charset=utf-8')
self.send_header('Connection', 'keep-alive')
req_uuid = str(uuid.uuid4())
response = f"""<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>{error_name}</Code>
<Message>Minio proxy injected error. Client should retry.</Message>
<RequestId>{req_uuid}</RequestId>
<HostId>Uuag1LuByRx9e6j5Onimru9pO4ZVKnJ2Qz7/C1NPcfTWAtRPfTaOFg==</HostId>
</Error>""".encode('utf-8')
self.send_header('Content-Length', str(len(response)))
self.end_headers()
self.wfile.write(response)
def process_request(self):
try:
policy = self.get_policy()
if policy.error_count >= policy.max_errors:
policy.should_fail = False
policy.should_forward = True
response = Response()
body = None
content_length = self.headers['Content-Length']
if content_length:
body = self.rfile.read(int(content_length))
if policy.should_forward:
target_url = self.minio_uri + self.path
headers = {key: value for key, value in self.headers.items()}
response = requests.request(self.command, target_url, headers=headers, data=body)
if policy.should_fail:
policy.error_count += 1
self.respond_with_error()
else:
self.send_response(response.status_code)
for key, value in response.headers.items():
if key.upper() != 'CONTENT-LENGTH':
self.send_header(key, value)
if self.command == 'HEAD':
self.send_header("Content-Length", response.headers['Content-Length'])
else:
self.send_header("Content-Length", str(len(response.content)))
self.end_headers()
self.wfile.write(response.content)
except Exception as e:
print(e)
def do_GET(self):
self.process_request()
def do_POST(self):
self.process_request()
def do_PUT(self):
self.process_request()
def do_DELETE(self):
self.process_request()
def do_HEAD(self):
self.process_request()
# Proxy server wrapping a `ThreadingHTTPServer` instance with the custom request handler above. It keeps
# per-request state in `self.req_states`, accepts a custom logger, etc. This server is started automatically
# from `test.py`; alternatively, it can be started standalone via `start_s3_proxy.py` to provide a local
# proxy between the tests and MinIO.
class S3ProxyServer:
def __init__(self, host: str, port: int, minio_uri: str, max_retries: int, seed: int, logger=None):
print(f'Setting minio proxy random seed to {seed}')
random.seed(seed)
self.req_states = LRUCache(10000)
handler = partial(InjectingHandler, self.req_states, logger, minio_uri, max_retries)
self.server = ThreadingHTTPServer((host, port), handler)
self.server_thread = None
self.server.request_queue_size = 1000
self.server.timeout = 10000
self.server.socket.settimeout(10000)
self.server.socket.listen(1000)
self.is_running = False
os.environ['PROXY_S3_SERVER_PORT'] = f'{port}'
os.environ['PROXY_S3_SERVER_HOST'] = host
async def start(self):
if not self.is_running:
print(f'Starting S3 proxy server on {self.server.server_address}')
loop = asyncio.get_running_loop()
self.server_thread = loop.run_in_executor(None, self.server.serve_forever)
self.is_running = True
async def stop(self):
if self.is_running:
print('Stopping S3 proxy server')
self.server.shutdown()
await self.server_thread
self.is_running = False
async def run(self):
try:
await self.start()
while self.is_running:
await asyncio.sleep(1)
except Exception as e:
print(f"Server error: {e}")
await self.stop()
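The proxy's core idea is a per-path error budget: `get_policy` caches a `Policy` per request path, and `process_request` keeps injecting retryable errors until `error_count` reaches `max_errors`, after which the request is forwarded normally. A stripped-down sketch of that mechanism (simplified names; the real handler also forwards to MinIO and randomizes forwarding):

```python
import random

class Policy:
    """Per-path error-injection budget, as in s3_proxy.Policy."""
    def __init__(self, max_retries: int):
        self.error_count = 0
        # Inject between 1 and max_retries - 1 errors before letting requests through.
        self.max_errors = random.randrange(1, max_retries)

def handle_request(policy: Policy) -> int:
    """Return an HTTP status: inject a retryable 503 until the
    per-path budget is exhausted, then serve the request."""
    if policy.error_count >= policy.max_errors:
        return 200
    policy.error_count += 1
    return 503

def request_with_retries(policy: Policy, attempts: int) -> int:
    """A retrying client: succeeds once the injection budget runs out."""
    status = 503
    for _ in range(attempts):
        status = handle_request(policy)
        if status == 200:
            break
    return status
```

Because the budget is bounded, a client that retries at least `max_retries` times is guaranteed to succeed, which is what makes the fault injection safe for deterministic tests.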


@@ -25,6 +25,7 @@ class Policy(Enum):
SUCCESS = 0
RETRYABLE_FAILURE = 1
NONRETRYABLE_FAILURE = 2
NEVERENDING_RETRYABLE_FAILURE = 3
class LRUCache:
@@ -113,7 +114,6 @@ class InjectingHandler(BaseHTTPRequestHandler):
put_data = self.rfile.read(int(content_length))
self.send_response(200)
self.send_header('Content-Type', 'text/plain; charset=utf-8')
self.send_header('Content-Length', '0')
self.send_header('Connection', 'keep-alive')
query_components = self.parsed_qs()
if 'Key' in query_components and 'Policy' in query_components:
@@ -122,11 +122,28 @@ class InjectingHandler(BaseHTTPRequestHandler):
self.send_header('ETag', "SomeTag_" + query_components.get("partNumber")[0])
else:
self.send_header('ETag', "SomeTag")
if self.headers['x-amz-copy-source']:
response_body = """<CopyPartResult>
<LastModified>2011-04-11T20:34:56.000Z</LastModified>
<ETag>"9b2cf535f27731c974343645a3985328"</ETag>
</CopyPartResult>""".encode('utf-8')
self.send_header('Content-Length', str(len(response_body)))
self.end_headers()
self.wfile.write(response_body)
return
self.send_header('Content-Length', '0')
self.end_headers()
# Processes the DELETE method; does nothing except provide the expected headers and response code
def do_DELETE(self):
self.send_response(204)
query_components = self.parsed_qs()
if 'uploadId' in query_components and self.policies.get(
urlparse(self.path).path) == Policy.NONRETRYABLE_FAILURE:
self.send_response(404)
else:
self.send_response(204)
self.send_header('Content-Type', 'text/plain; charset=utf-8')
self.send_header('Content-Length', '0')
self.send_header('Connection', 'keep-alive')
@@ -153,9 +170,10 @@ class InjectingHandler(BaseHTTPRequestHandler):
<Key>Example-Object</Key>
<ETag>"3858f62230ac3c915f300c664312c11f-9"</ETag>
</CompleteMultipartUploadResult>"""
case Policy.RETRYABLE_FAILURE:
# should succeed on retry
self.policies.put(path, Policy.SUCCESS)
case Policy.RETRYABLE_FAILURE | Policy.NEVERENDING_RETRYABLE_FAILURE:
if self.policies.get(path) == Policy.RETRYABLE_FAILURE:
# should succeed on retry
self.policies.put(path, Policy.SUCCESS)
return """<?xml version="1.0" encoding="UTF-8"?>
<Error>

test/pylib/start_s3_proxy.py Executable file

@@ -0,0 +1,28 @@
#!/usr/bin/python3
import asyncio
import signal
import argparse
import time
from s3_proxy import S3ProxyServer
async def run():
parser = argparse.ArgumentParser(description="Start S3 proxy server")
parser.add_argument('--host', default='127.0.0.1')
parser.add_argument('--port', type=int, default=9002)
parser.add_argument('--minio-uri', default="http://127.0.0.1:9000")
parser.add_argument('--max-retries', type=int, default=5)
parser.add_argument('--rnd-seed', type=int, default=int(time.time()))
args = parser.parse_args()
server = S3ProxyServer(args.host, args.port, args.minio_uri, args.max_retries, args.rnd_seed)
print('Starting S3 proxy server')
await server.start()
signal.sigwait({signal.SIGINT, signal.SIGTERM})
print('Stopping S3 proxy server')
await server.stop()
if __name__ == '__main__':
asyncio.run(run())


@@ -47,7 +47,8 @@ target_sources(utils
uuid.cc
aws_sigv4.cc
s3/aws_error.cc
s3/client.cc)
s3/client.cc
s3/retry_strategy.cc)
target_include_directories(utils
PUBLIC
${CMAKE_SOURCE_DIR}


@@ -24,11 +24,9 @@ aws_error::aws_error(aws_error_type error_type, std::string&& error_message, ret
: _type(error_type), _message(std::move(error_message)), _is_retryable(is_retryable) {
}
aws_error aws_error::parse(seastar::sstring&& body) {
aws_error ret_val;
std::optional<aws_error> aws_error::parse(seastar::sstring&& body) {
if (body.empty()) {
return ret_val;
return {};
}
auto doc = std::make_unique<rapidxml::xml_document<>>();
@@ -36,7 +34,7 @@ aws_error aws_error::parse(seastar::sstring&& body) {
doc->parse<0>(body.data());
} catch (const rapidxml::parse_error&) {
// Most likely not an XML which is possible, just return
return ret_val;
return {};
}
const auto* error_node = doc->first_node("Error");
@@ -48,11 +46,12 @@ aws_error aws_error::parse(seastar::sstring&& body) {
}
if (!error_node) {
return ret_val;
return {};
}
const auto* code_node = error_node->first_node("Code");
const auto* message_node = error_node->first_node("Message");
aws_error ret_val;
if (code_node && message_node) {
std::string code = code_node->value();
@@ -75,6 +74,58 @@ aws_error aws_error::parse(seastar::sstring&& body) {
return ret_val;
}
aws_error aws_error::from_http_code(seastar::http::reply::status_type http_code) {
const auto& all_errors = get_errors();
aws_error ret_val;
switch (http_code) {
case seastar::http::reply::status_type::unauthorized:
ret_val = all_errors.at("HTTP_UNAUTHORIZED");
break;
case seastar::http::reply::status_type::forbidden:
ret_val = all_errors.at("HTTP_FORBIDDEN");
break;
case seastar::http::reply::status_type::not_found:
ret_val = all_errors.at("HTTP_NOT_FOUND");
break;
case seastar::http::reply::status_type::too_many_requests:
ret_val = all_errors.at("HTTP_TOO_MANY_REQUESTS");
break;
case seastar::http::reply::status_type::internal_server_error:
ret_val = all_errors.at("HTTP_INTERNAL_SERVER_ERROR");
break;
case seastar::http::reply::status_type::bandwidth_limit_exceeded:
ret_val = all_errors.at("HTTP_BANDWIDTH_LIMIT_EXCEEDED");
break;
case seastar::http::reply::status_type::service_unavailable:
ret_val = all_errors.at("HTTP_SERVICE_UNAVAILABLE");
break;
case seastar::http::reply::status_type::request_timeout:
ret_val = all_errors.at("HTTP_REQUEST_TIMEOUT");
break;
case seastar::http::reply::status_type::page_expired:
ret_val = all_errors.at("HTTP_PAGE_EXPIRED");
break;
case seastar::http::reply::status_type::login_timeout:
ret_val = all_errors.at("HTTP_LOGIN_TIMEOUT");
break;
case seastar::http::reply::status_type::gateway_timeout:
ret_val = all_errors.at("HTTP_GATEWAY_TIMEOUT");
break;
case seastar::http::reply::status_type::network_connect_timeout:
ret_val = all_errors.at("HTTP_NETWORK_CONNECT_TIMEOUT");
break;
case seastar::http::reply::status_type::network_read_timeout:
ret_val = all_errors.at("HTTP_NETWORK_READ_TIMEOUT");
break;
default:
ret_val = {aws_error_type::UNKNOWN,
"Unknown server error has been encountered.",
retryable{seastar::http::reply::classify_status(http_code) == seastar::http::reply::status_class::server_error}};
}
ret_val._message = seastar::format("{} HTTP code: {}", ret_val._message, http_code);
return ret_val;
}
const aws_errors& aws_error::get_errors() {
static const std::unordered_map<std::string_view, const aws_error> aws_error_map{
{"IncompleteSignature", aws_error(aws_error_type::INCOMPLETE_SIGNATURE, retryable::no)},
@@ -132,7 +183,29 @@ const aws_errors& aws_error::get_errors() {
{"RequestTimeTooSkewedException", aws_error(aws_error_type::REQUEST_TIME_TOO_SKEWED, retryable::yes)},
{"RequestTimeTooSkewed", aws_error(aws_error_type::REQUEST_TIME_TOO_SKEWED, retryable::yes)},
{"RequestTimeoutException", aws_error(aws_error_type::REQUEST_TIMEOUT, retryable::yes)},
{"RequestTimeout", aws_error(aws_error_type::REQUEST_TIMEOUT, retryable::yes)}};
{"RequestTimeout", aws_error(aws_error_type::REQUEST_TIMEOUT, retryable::yes)},
{"HTTP_UNAUTHORIZED", aws_error(aws_error_type::HTTP_UNAUTHORIZED, retryable::no)},
{"HTTP_FORBIDDEN", aws_error(aws_error_type::HTTP_FORBIDDEN, retryable::no)},
{"HTTP_NOT_FOUND", aws_error(aws_error_type::HTTP_NOT_FOUND, retryable::no)},
{"HTTP_TOO_MANY_REQUESTS", aws_error(aws_error_type::HTTP_TOO_MANY_REQUESTS, retryable::yes)},
{"HTTP_INTERNAL_SERVER_ERROR", aws_error(aws_error_type::HTTP_INTERNAL_SERVER_ERROR, retryable::yes)},
{"HTTP_BANDWIDTH_LIMIT_EXCEEDED", aws_error(aws_error_type::HTTP_BANDWIDTH_LIMIT_EXCEEDED, retryable::yes)},
{"HTTP_SERVICE_UNAVAILABLE", aws_error(aws_error_type::HTTP_SERVICE_UNAVAILABLE, retryable::yes)},
{"HTTP_REQUEST_TIMEOUT", aws_error(aws_error_type::HTTP_REQUEST_TIMEOUT, retryable::yes)},
{"HTTP_PAGE_EXPIRED", aws_error(aws_error_type::HTTP_PAGE_EXPIRED, retryable::yes)},
{"HTTP_LOGIN_TIMEOUT", aws_error(aws_error_type::HTTP_LOGIN_TIMEOUT, retryable::yes)},
{"HTTP_GATEWAY_TIMEOUT", aws_error(aws_error_type::HTTP_GATEWAY_TIMEOUT, retryable::yes)},
{"HTTP_NETWORK_CONNECT_TIMEOUT", aws_error(aws_error_type::HTTP_NETWORK_CONNECT_TIMEOUT, retryable::yes)},
{"HTTP_NETWORK_READ_TIMEOUT", aws_error(aws_error_type::HTTP_NETWORK_READ_TIMEOUT, retryable::yes)},
{"NoSuchUpload", aws_error(aws_error_type::NO_SUCH_UPLOAD, retryable::no)},
{"BucketAlreadyOwnedByYou", aws_error(aws_error_type::BUCKET_ALREADY_OWNED_BY_YOU, retryable::no)},
{"ObjectAlreadyInActiveTierError", aws_error(aws_error_type::OBJECT_ALREADY_IN_ACTIVE_TIER, retryable::no)},
{"NoSuchBucket", aws_error(aws_error_type::NO_SUCH_BUCKET, retryable::no)},
{"NoSuchKey", aws_error(aws_error_type::NO_SUCH_KEY, retryable::no)},
{"ObjectNotInActiveTierError", aws_error(aws_error_type::OBJECT_NOT_IN_ACTIVE_TIER, retryable::no)},
{"BucketAlreadyExists", aws_error(aws_error_type::BUCKET_ALREADY_EXISTS, retryable::no)},
{"InvalidObjectState", aws_error(aws_error_type::INVALID_OBJECT_STATE, retryable::no)}};
return aws_error_map;
}
} // namespace aws
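The new `from_http_code` path and the `HTTP_*` entries added to `aws_error_map` together define a classification: bare HTTP statuses with no XML error body are mapped to named errors, with 401/403/404 non-retryable and throttling/timeout statuses plus server errors retryable. A small Python sketch of that classification table (the sets below paraphrase the C++ map; they are illustrative, not the repo's API):

```python
# Non-retryable bare HTTP statuses: HTTP_UNAUTHORIZED, HTTP_FORBIDDEN, HTTP_NOT_FOUND.
NON_RETRYABLE = {401, 403, 404}
# Retryable throttling/timeout statuses: request timeout, page expired,
# too many requests, login timeout.
RETRYABLE = {408, 419, 429, 440}

def is_retryable(http_code: int) -> bool:
    """Mirror the retryable flag assigned by aws_error::from_http_code."""
    if http_code in NON_RETRYABLE:
        return False
    if http_code in RETRYABLE:
        return True
    # Default branch: only server errors (5xx) are worth retrying.
    return 500 <= http_code <= 599
```

This matches the intent of the default case in `from_http_code`, where an unknown status is retryable only when `classify_status` puts it in the server-error class.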


@@ -9,6 +9,7 @@
#pragma once
#include <seastar/core/sstring.hh>
#include <seastar/http/reply.hh>
#include <seastar/util/bool_class.hh>
#include <string>
#include <string_view>
@@ -49,8 +50,32 @@ enum class aws_error_type : uint8_t {
CLIENT_SIGNING_FAILURE = 101,
USER_CANCELLED = 102,
ENDPOINT_RESOLUTION_FAILURE = 103,
// HTTP errors without additional information in the response body
HTTP_UNAUTHORIZED = 115,
HTTP_FORBIDDEN = 116,
HTTP_NOT_FOUND = 117,
HTTP_TOO_MANY_REQUESTS = 118,
HTTP_INTERNAL_SERVER_ERROR = 119,
HTTP_BANDWIDTH_LIMIT_EXCEEDED = 120,
HTTP_SERVICE_UNAVAILABLE = 121,
HTTP_REQUEST_TIMEOUT = 122,
HTTP_PAGE_EXPIRED = 123,
HTTP_LOGIN_TIMEOUT = 124,
HTTP_GATEWAY_TIMEOUT = 125,
HTTP_NETWORK_CONNECT_TIMEOUT = 126,
HTTP_NETWORK_READ_TIMEOUT = 127,
SERVICE_EXTENSION_START_RANGE = 128,
OK = 255 // No error set
// S3 specific
BUCKET_ALREADY_EXISTS = 129,
BUCKET_ALREADY_OWNED_BY_YOU = 130,
INVALID_OBJECT_STATE = 131,
NO_SUCH_BUCKET = 132,
NO_SUCH_KEY = 133,
NO_SUCH_UPLOAD = 134,
OBJECT_ALREADY_IN_ACTIVE_TIER = 135,
OBJECT_NOT_IN_ACTIVE_TIER = 136,
// No error set
OK = 255
};
class aws_error;
@@ -69,8 +94,28 @@ public:
[[nodiscard]] const std::string& get_error_message() const { return _message; }
[[nodiscard]] aws_error_type get_error_type() const { return _type; }
[[nodiscard]] retryable is_retryable() const { return _is_retryable; }
static aws_error parse(seastar::sstring&& body);
static std::optional<aws_error> parse(seastar::sstring&& body);
static aws_error from_http_code(seastar::http::reply::status_type http_code);
static const aws_errors& get_errors();
};
class aws_exception : public std::exception {
aws_error _error;
public:
explicit aws_exception(const aws_error& error) noexcept : _error(error) {}
explicit aws_exception(aws_error&& error) noexcept : _error(std::move(error)) {}
const char* what() const noexcept override { return _error.get_error_message().c_str(); }
const aws_error& error() const noexcept { return _error; }
};
} // namespace aws
template <>
struct fmt::formatter<aws::aws_error_type> : fmt::formatter<string_view> {
auto format(const aws::aws_error_type& error_type, fmt::format_context& ctx) const {
return fmt::format_to(ctx.out(), "{}", static_cast<unsigned>(error_type));
}
};


@@ -24,6 +24,7 @@
#include <seastar/core/iostream.hh>
#include <seastar/core/on_internal_error.hh>
#include <seastar/core/pipe.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/units.hh>
#include <seastar/core/temporary_buffer.hh>
#include <seastar/coroutine/exception.hh>
@@ -81,20 +82,12 @@ future<> ignore_reply(const http::reply& rep, input_stream<char>&& in_) {
co_await util::skip_entire_stream(in);
}
static future<> look_for_errors(const http::reply&, input_stream<char>&& in_) {
auto in = std::move(in_);
auto body = co_await util::read_entire_stream_contiguous(in);
auto possible_error = aws::aws_error::parse(std::move(body));
if (possible_error.get_error_type() != aws::aws_error_type::OK) {
throw std::system_error(std::error_code(EIO, std::generic_category()), possible_error.get_error_message());
}
}
client::client(std::string host, endpoint_config_ptr cfg, semaphore& mem, global_factory gf, private_tag)
client::client(std::string host, endpoint_config_ptr cfg, semaphore& mem, global_factory gf, private_tag, std::unique_ptr<aws::retry_strategy> rs)
: _host(std::move(host))
, _cfg(std::move(cfg))
, _gf(std::move(gf))
, _memory(mem)
, _retry_strategy(std::move(rs))
{
}
@@ -214,50 +207,100 @@ inline bool is_redirect_status(http::reply::status_type st) {
return st_i >= 300 && st_i < 400;
}
future<> map_s3_client_exception(std::exception_ptr ex) {
storage_io_error map_s3_client_exception(std::exception_ptr ex) {
seastar::memory::scoped_critical_alloc_section alloc;
try {
std::rethrow_exception(std::move(ex));
} catch (const aws::aws_exception& e) {
int error_code;
switch (e.error().get_error_type()) {
case aws::aws_error_type::HTTP_NOT_FOUND:
case aws::aws_error_type::RESOURCE_NOT_FOUND:
case aws::aws_error_type::NO_SUCH_BUCKET:
case aws::aws_error_type::NO_SUCH_KEY:
case aws::aws_error_type::NO_SUCH_UPLOAD:
error_code = ENOENT;
break;
case aws::aws_error_type::HTTP_FORBIDDEN:
case aws::aws_error_type::HTTP_UNAUTHORIZED:
case aws::aws_error_type::ACCESS_DENIED:
error_code = EACCES;
break;
default:
error_code = EIO;
}
return {error_code, format("S3 request failed. Code: {}. Reason: {}", e.error().get_error_type(), e.what())};
} catch (const httpd::unexpected_status_error& e) {
auto status = e.status();
if (is_redirect_status(status) || status == http::reply::status_type::not_found) {
return make_exception_future<>(storage_io_error(ENOENT, format("S3 object doesn't exist ({})", status)));
return {ENOENT, format("S3 object doesn't exist ({})", status)};
}
if (status == http::reply::status_type::forbidden || status == http::reply::status_type::unauthorized) {
return make_exception_future<>(storage_io_error(EACCES, format("S3 access denied ({})", status)));
return {EACCES, format("S3 access denied ({})", status)};
}
return make_exception_future<>(storage_io_error(EIO, format("S3 request failed with ({})", status)));
return {EIO, format("S3 request failed with ({})", status)};
} catch (...) {
auto e = std::current_exception();
return make_exception_future<>(storage_io_error(EIO, format("S3 error ({})", e)));
return {EIO, format("S3 error ({})", e)};
}
}
future<> client::do_make_request(group_client& gc, http::request req, http::experimental::client::reply_handler handle, std::optional<http::reply::status_type> expected_opt, seastar::abort_source* as) {
future<> client::do_retryable_request(group_client& gc, http::request req, http::experimental::client::reply_handler handler, seastar::abort_source* as) const {
// TODO: the http client does not check abort status on entry, and if
// we're already aborted when we get here we will paradoxically not be
// interrupted, because no registration etc will be done. So do a quick
// preemptive check already.
if (as && as->abort_requested()) {
return make_exception_future<>(as->abort_requested_exception_ptr());
co_await coroutine::return_exception_ptr(as->abort_requested_exception_ptr());
}
uint32_t retries = 0;
std::exception_ptr e;
aws::aws_exception request_ex{aws::aws_error{aws::aws_error_type::OK, aws::retryable::yes}};
while (true) {
try {
e = {};
co_return co_await (as ? gc.http.make_request(req, handler, *as, std::nullopt) : gc.http.make_request(req, handler, std::nullopt));
} catch (const aws::aws_exception& ex) {
e = std::current_exception();
request_ex = ex;
}
if (!_retry_strategy->should_retry(request_ex.error(), retries)) {
break;
}
co_await seastar::sleep(_retry_strategy->delay_before_retry(request_ex.error(), retries));
++retries;
}
if (e) {
throw map_s3_client_exception(e);
}
auto expected = expected_opt.value_or(http::reply::status_type::ok);
return (as
? gc.http.make_request(std::move(req), std::move(handle), *as, expected)
: gc.http.make_request(std::move(req), std::move(handle), expected)
).handle_exception([] (auto ex) {
return map_s3_client_exception(std::move(ex));
});
}
future<> client::make_request(http::request req, http::experimental::client::reply_handler handle, std::optional<http::reply::status_type> expected, seastar::abort_source* as) {
authorize(req);
auto& gc = find_or_create_client();
return do_make_request(gc, std::move(req), std::move(handle), expected, as);
return do_retryable_request(
gc, std::move(req), [handler = std::move(handle), expected = expected.value_or(http::reply::status_type::ok)](const http::reply& rep, input_stream<char>&& in) mutable -> future<> {
auto payload = std::move(in);
auto status_class = http::reply::classify_status(rep._status);
if (status_class != http::reply::status_class::informational && status_class != http::reply::status_class::success) {
std::optional<aws::aws_error> possible_error = aws::aws_error::parse(co_await util::read_entire_stream_contiguous(payload));
if (possible_error) {
co_await coroutine::return_exception(aws::aws_exception(std::move(possible_error.value())));
}
co_await coroutine::return_exception(aws::aws_exception(aws::aws_error::from_http_code(rep._status)));
}
if (rep._status != expected) {
co_await coroutine::return_exception(httpd::unexpected_status_error(rep._status));
}
co_await handler(rep, std::move(payload));
}, as);
}
future<> client::make_request(http::request req, reply_handler_ext handle_ex, std::optional<http::reply::status_type> expected, seastar::abort_source* as) {
@@ -266,7 +309,7 @@ future<> client::make_request(http::request req, reply_handler_ext handle_ex, st
auto handle = [&gc, handle = std::move(handle_ex)] (const http::reply& rep, input_stream<char>&& in) {
return handle(gc, rep, std::move(in));
};
return do_make_request(gc, std::move(req), std::move(handle), expected, as);
return make_request(std::move(req), std::move(handle), expected, as);
}
future<> client::get_object_header(sstring object_name, http::experimental::client::reply_handler handler, seastar::abort_source* as) {
@@ -714,7 +757,13 @@ future<> client::multipart_upload::abort_upload() {
s3l.trace("DELETE upload {}", _upload_id);
auto req = http::request::make("DELETE", _client->_host, _object_name);
req.query_parameters["uploadId"] = std::exchange(_upload_id, ""); // now upload_started() returns false
co_await _client->make_request(std::move(req), ignore_reply, http::reply::status_type::no_content);
co_await _client->make_request(std::move(req), ignore_reply, http::reply::status_type::no_content)
.handle_exception([this](const std::exception_ptr& ex) -> future<> {
// Discard whatever exception is thrown while aborting the multipart upload: a clean abort is not required here,
// because dangling parts can be cleaned up by other means, for example `rclone cleanup` or the S3 bucket's Lifecycle Management Policy
s3l.warn("Failed to abort multipart upload. Object: '{}'. Reason: {}", _object_name, ex);
co_return;
});
}
future<> client::multipart_upload::finalize_upload() {
@@ -734,7 +783,24 @@ future<> client::multipart_upload::finalize_upload() {
});
// If this request fails, finalize_upload() throws, the upload should then
// be aborted in .close() method
co_await _client->make_request(std::move(req), look_for_errors);
co_await _client->make_request(std::move(req), [](const http::reply& rep, input_stream<char>&& in) -> future<> {
auto payload = std::move(in);
auto status_class = http::reply::classify_status(rep._status);
std::optional<aws::aws_error> possible_error = aws::aws_error::parse(co_await util::read_entire_stream_contiguous(payload));
if (possible_error) {
co_await coroutine::return_exception(aws::aws_exception(std::move(possible_error.value())));
}
if (status_class != http::reply::status_class::informational && status_class != http::reply::status_class::success) {
co_await coroutine::return_exception(aws::aws_exception(aws::aws_error::from_http_code(rep._status)));
}
if (rep._status != http::reply::status_type::ok) {
co_await coroutine::return_exception(httpd::unexpected_status_error(rep._status));
}
// If we reach this point the request succeeded. The body payload has already been consumed, however, so no
// response handler is invoked; that is acceptable here because we are not interested in parsing this particular response
});
_upload_id = ""; // now upload_started() returns false
}
@@ -1057,8 +1123,8 @@ class client::do_upload_file : private multipart_upload {
if (_tag) {
req._headers["x-amz-tagging"] = seastar::format("{}={}", _tag->key, _tag->value);
}
req.write_body("bin", len, [f = std::move(f), &progress = _progress] (output_stream<char>&& out_) mutable {
auto input = make_file_input_stream(std::move(f), input_stream_options());
req.write_body("bin", len, [f = std::move(f), &progress = _progress] (output_stream<char>&& out_) {
auto input = make_file_input_stream(f, input_stream_options());
auto output = std::move(out_);
return copy_to(std::move(input), std::move(output), _transmit_size, progress);
});


@@ -17,6 +17,7 @@
#include <filesystem>
#include "utils/lister.hh"
#include "utils/s3/creds.hh"
#include "retry_strategy.hh"
#include "utils/s3/client_fwd.hh"
using namespace seastar;
@@ -77,6 +78,7 @@ class client : public enable_shared_from_this<client> {
using global_factory = std::function<shared_ptr<client>(std::string)>;
global_factory _gf;
semaphore& _memory;
std::unique_ptr<aws::retry_strategy> _retry_strategy;
struct private_tag {};
@@ -87,12 +89,11 @@ class client : public enable_shared_from_this<client> {
future<> make_request(http::request req, http::experimental::client::reply_handler handle = ignore_reply, std::optional<http::reply::status_type> expected = std::nullopt, seastar::abort_source* = nullptr);
using reply_handler_ext = noncopyable_function<future<>(group_client&, const http::reply&, input_stream<char>&& body)>;
future<> make_request(http::request req, reply_handler_ext handle, std::optional<http::reply::status_type> expected = std::nullopt, seastar::abort_source* = nullptr);
future<> do_make_request(group_client&, http::request req, http::experimental::client::reply_handler handle, std::optional<http::reply::status_type> expected = std::nullopt, seastar::abort_source* = nullptr);
future<> do_retryable_request(group_client& gc, http::request req, http::experimental::client::reply_handler handler, seastar::abort_source* as = nullptr) const;
future<> get_object_header(sstring object_name, http::experimental::client::reply_handler handler, seastar::abort_source* = nullptr);
public:
explicit client(std::string host, endpoint_config_ptr cfg, semaphore& mem, global_factory gf, private_tag);
client(std::string host, endpoint_config_ptr cfg, semaphore& mem, global_factory gf, private_tag, std::unique_ptr<aws::retry_strategy> rs = std::make_unique<aws::default_retry_strategy>());
static shared_ptr<client> make(std::string endpoint, endpoint_config_ptr cfg, semaphore& memory, global_factory gf = {});
future<uint64_t> get_object_size(sstring object_name, seastar::abort_source* = nullptr);


@@ -0,0 +1,47 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "retry_strategy.hh"
#include "aws_error.hh"
#include "utils/log.hh"
using namespace std::chrono_literals;
namespace aws {
static logging::logger rs_logger("default_retry_strategy");
default_retry_strategy::default_retry_strategy(unsigned max_retries, unsigned scale_factor) : _max_retries(max_retries), _scale_factor(scale_factor) {
}
bool default_retry_strategy::should_retry(const aws_error& error, unsigned attempted_retries) const {
if (attempted_retries >= _max_retries) {
rs_logger.error("Retries exhausted. Retry# {}", attempted_retries);
return false;
}
bool should_retry = error.is_retryable() == retryable::yes;
if (should_retry) {
rs_logger.debug("S3 client request failed. Reason: {}. Retry# {}", error.get_error_message(), attempted_retries);
} else {
rs_logger.error("S3 client encountered non-retryable error. Reason: {}. Code: {}. Retry# {}",
error.get_error_message(),
std::to_underlying(error.get_error_type()),
attempted_retries);
}
return should_retry;
}
std::chrono::milliseconds default_retry_strategy::delay_before_retry(const aws_error&, unsigned attempted_retries) const {
if (attempted_retries == 0) {
return 0ms;
}
return std::chrono::milliseconds((1UL << attempted_retries) * _scale_factor);
}
} // namespace aws


@@ -0,0 +1,42 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include <chrono>
namespace aws {
class aws_error;
class retry_strategy {
public:
virtual ~retry_strategy() = default;
// Returns true if the request can be retried, given the error and the number of retries already attempted.
[[nodiscard]] virtual bool should_retry(const aws_error& error, unsigned attempted_retries) const = 0;
// Calculates the time in milliseconds the client should wait before attempting another request, based on the error and the attempted_retries count.
[[nodiscard]] virtual std::chrono::milliseconds delay_before_retry(const aws_error& error, unsigned attempted_retries) const = 0;
[[nodiscard]] virtual unsigned get_max_retries() const = 0;
};
class default_retry_strategy : public retry_strategy {
unsigned _max_retries;
unsigned _scale_factor;
public:
explicit default_retry_strategy(unsigned max_retries = 10, unsigned scale_factor = 25);
[[nodiscard]] bool should_retry(const aws_error& error, unsigned attempted_retries) const override;
[[nodiscard]] std::chrono::milliseconds delay_before_retry(const aws_error& error, unsigned attempted_retries) const override;
[[nodiscard]] unsigned get_max_retries() const override { return _max_retries; }
};
} // namespace aws