Compare commits
22 Commits
dani-tweig
...
dani-tweig
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5078a054d6 | ||
|
|
8cc510a51b | ||
|
|
cc71077e33 | ||
|
|
381faa2649 | ||
|
|
3032d8ccbf | ||
|
|
2596d1577b | ||
|
|
57af69e15f | ||
|
|
029837a4a1 | ||
|
|
14f3832749 | ||
|
|
0c62635f05 | ||
|
|
8919e0abab | ||
|
|
b1e36c868c | ||
|
|
7fd1ff8d79 | ||
|
|
064a239180 | ||
|
|
dc6e4c0d97 | ||
|
|
244635ebd8 | ||
|
|
bd3d4ed417 | ||
|
|
58decef509 | ||
|
|
fa9e8b7ed0 | ||
|
|
54e250a6f1 | ||
|
|
e6ff34046f | ||
|
|
8dbe351888 |
15
.github/ISSUE_TEMPLATE.md
vendored
15
.github/ISSUE_TEMPLATE.md
vendored
@@ -1,15 +0,0 @@
|
||||
This is Scylla's bug tracker, to be used for reporting bugs only.
|
||||
If you have a question about Scylla, and not a bug, please ask it in
|
||||
our mailing-list at scylladb-dev@googlegroups.com or in our slack channel.
|
||||
|
||||
- [] I have read the disclaimer above, and I am reporting a suspected malfunction in Scylla.
|
||||
|
||||
*Installation details*
|
||||
Scylla version (or git commit hash):
|
||||
Cluster size:
|
||||
OS (RHEL/CentOS/Ubuntu/AWS AMI):
|
||||
|
||||
*Hardware details (for performance issues)* Delete if unneeded
|
||||
Platform (physical/VM/cloud instance type/docker):
|
||||
Hardware: sockets= cores= hyperthreading= memory=
|
||||
Disks: (SSD/HDD, count)
|
||||
86
.github/ISSUE_TEMPLATE/bug_report.yml
vendored
Normal file
86
.github/ISSUE_TEMPLATE/bug_report.yml
vendored
Normal file
@@ -0,0 +1,86 @@
|
||||
name: "Report a bug"
|
||||
description: "File a bug report."
|
||||
title: "[Bug]: "
|
||||
type: "bug"
|
||||
labels: bug
|
||||
body:
|
||||
- type: checkboxes
|
||||
id: terms
|
||||
attributes:
|
||||
label: Code of Conduct
|
||||
description: "This is Scylla's bug tracker, to be used for reporting bugs only.
|
||||
If you have a question about Scylla, and not a bug, please ask it in
|
||||
our forum at https://forum.scylladb.com/ or in our slack channel https://slack.scylladb.com/ "
|
||||
options:
|
||||
- label: I have read the disclaimer above and am reporting a suspected malfunction in Scylla.
|
||||
required: true
|
||||
|
||||
- type: input
|
||||
id: product-version
|
||||
attributes:
|
||||
label: product version
|
||||
description: Scylla version (or git commit hash)
|
||||
placeholder: ex. scylla-6.1.1
|
||||
validations:
|
||||
required: true
|
||||
|
||||
- type: input
|
||||
id: cluster-size
|
||||
attributes:
|
||||
label: Cluster Size
|
||||
validations:
|
||||
required: true
|
||||
|
||||
- type: input
|
||||
id: os
|
||||
attributes:
|
||||
label: OS
|
||||
placeholder: RHEL/CentOS/Ubuntu/AWS AMI
|
||||
validations:
|
||||
required: true
|
||||
|
||||
- type: textarea
|
||||
id: additional-data
|
||||
attributes:
|
||||
label: Additional Environmental Data
|
||||
#description:
|
||||
placeholder: Add additional data
|
||||
value: "Platform (physical/VM/cloud instance type/docker):\n
|
||||
Hardware: sockets= cores= hyperthreading= memory=\n
|
||||
Disks: (SSD/HDD, count)"
|
||||
validations:
|
||||
required: false
|
||||
|
||||
- type: textarea
|
||||
id: reproducer-steps
|
||||
attributes:
|
||||
label: Reproduction Steps
|
||||
placeholder: Describe how to reproduce the problem
|
||||
value: "The steps to reproduce the problem are:"
|
||||
validations:
|
||||
required: true
|
||||
|
||||
- type: textarea
|
||||
id: the-problem
|
||||
attributes:
|
||||
label: What is the problem?
|
||||
placeholder: Describe the problem you found
|
||||
value: "The problem is that"
|
||||
validations:
|
||||
required: true
|
||||
|
||||
- type: textarea
|
||||
id: what-happened
|
||||
attributes:
|
||||
label: Expected behavior?
|
||||
placeholder: Describe what should have happened
|
||||
value: "I expected that "
|
||||
validations:
|
||||
required: true
|
||||
|
||||
- type: textarea
|
||||
id: logs
|
||||
attributes:
|
||||
label: Relevant log output
|
||||
description: Please copy and paste any relevant log output. This will be automatically formatted into code, so no need for backticks.
|
||||
render: shell
|
||||
3
.github/scripts/label_promoted_commits.py
vendored
3
.github/scripts/label_promoted_commits.py
vendored
@@ -54,7 +54,8 @@ def main():
|
||||
# Print commit information
|
||||
for commit in commits:
|
||||
print(f'Commit sha is: {commit.sha}')
|
||||
match = pr_pattern.search(commit.commit.message)
|
||||
pr_last_line = commit.commit.message.splitlines()[-1]
|
||||
match = pr_pattern.search(pr_last_line)
|
||||
if match:
|
||||
pr_number = int(match.group(1))
|
||||
if pr_number in processed_prs:
|
||||
|
||||
@@ -49,7 +49,7 @@ jobs:
|
||||
GITHUB_TOKEN: ${{ secrets.AUTO_BACKPORT_TOKEN }}
|
||||
run: python .github/scripts/label_promoted_commits.py --commits ${{ github.event.before }}..${{ github.sha }} --repository ${{ github.repository }} --ref ${{ github.ref }}
|
||||
- name: Run auto-backport.py when promotion completed
|
||||
if: github.event_name == 'push'
|
||||
if: github.event_name == 'push' && github.ref == 'refs/heads/${{ env.DEFAULT_BRANCH }}'
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.AUTO_BACKPORT_TOKEN }}
|
||||
run: python .github/scripts/auto-backport.py --repo ${{ github.repository }} --base-branch ${{ github.ref }} --commits ${{ github.event.before }}..${{ github.sha }}
|
||||
|
||||
@@ -1044,8 +1044,9 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'utils/multiprecision_int.cc',
|
||||
'utils/gz/crc_combine.cc',
|
||||
'utils/gz/crc_combine_table.cc',
|
||||
'utils/s3/client.cc',
|
||||
'utils/s3/aws_error.cc',
|
||||
'utils/s3/client.cc',
|
||||
'utils/s3/retry_strategy.cc',
|
||||
'gms/version_generator.cc',
|
||||
'gms/versioned_value.cc',
|
||||
'gms/gossiper.cc',
|
||||
|
||||
@@ -41,6 +41,7 @@
|
||||
#include "cql3/functions/functions.hh"
|
||||
#include "types/user.hh"
|
||||
#include "view_info.hh"
|
||||
#include "validation.hh"
|
||||
#include "index/secondary_index_manager.hh"
|
||||
#include "cql3/functions/user_function.hh"
|
||||
#include "cql3/functions/user_aggregate.hh"
|
||||
@@ -255,6 +256,11 @@ future<std::vector<description>> table(const data_dictionary::database& db, cons
|
||||
if (!table) {
|
||||
throw exceptions::invalid_request_exception(format("Table '{}' not found in keyspace '{}'", name, ks));
|
||||
}
|
||||
|
||||
auto s = validation::validate_column_family(db, ks, name);
|
||||
if (s->is_view()) {
|
||||
throw exceptions::invalid_request_exception("Cannot use DESC TABLE on materialized View");
|
||||
}
|
||||
|
||||
auto schema = table->schema();
|
||||
auto idxs = table->get_index_manager().list_indexes();
|
||||
|
||||
7
test.py
7
test.py
@@ -36,6 +36,7 @@ from scripts import coverage # type: ignore
|
||||
from test.pylib.artifact_registry import ArtifactRegistry
|
||||
from test.pylib.host_registry import HostRegistry
|
||||
from test.pylib.pool import Pool
|
||||
from test.pylib.s3_proxy import S3ProxyServer
|
||||
from test.pylib.s3_server_mock import MockS3Server
|
||||
from test.pylib.resource_gather import setup_cgroup, run_resource_watcher, get_resource_gather
|
||||
from test.pylib.util import LogPrefixAdapter
|
||||
@@ -1553,6 +1554,12 @@ async def run_all_tests(signaled: asyncio.Event, options: argparse.Namespace) ->
|
||||
await mock_s3_server.start()
|
||||
TestSuite.artifacts.add_exit_artifact(None, mock_s3_server.stop)
|
||||
|
||||
minio_uri = "http://" + os.environ[ms.ENV_ADDRESS] + ":" + os.environ[ms.ENV_PORT]
|
||||
proxy_s3_server = S3ProxyServer(await hosts.lease_host(), 9002, minio_uri, 3, int(time.time()),
|
||||
LogPrefixAdapter(logging.getLogger('s3_proxy'), {'prefix': 's3_proxy'}))
|
||||
await proxy_s3_server.start()
|
||||
TestSuite.artifacts.add_exit_artifact(None, proxy_s3_server.stop)
|
||||
|
||||
console.print_start_blurb()
|
||||
max_failures = options.max_failures
|
||||
failed = 0
|
||||
|
||||
@@ -20,11 +20,13 @@
|
||||
#include <seastar/util/closeable.hh>
|
||||
|
||||
using namespace seastar;
|
||||
using namespace std::chrono_literals;
|
||||
using namespace std::string_view_literals;
|
||||
|
||||
enum class failure_policy : uint8_t {
|
||||
SUCCESS = 0,
|
||||
RETRYABLE_FAILURE = 1,
|
||||
NONRETRYABLE_FAILURE = 2,
|
||||
NEVERENDING_RETRYABLE_FAILURE = 3,
|
||||
};
|
||||
|
||||
static uint16_t get_port() {
|
||||
@@ -97,23 +99,34 @@ SEASTAR_THREAD_TEST_CASE(test_multipart_upload_file_retryable_success) {
|
||||
const size_t remainder_size = part_size / 2;
|
||||
const size_t total_size = 4 * part_size + remainder_size;
|
||||
const size_t memory_size = part_size;
|
||||
BOOST_REQUIRE_EXCEPTION(test_client_upload_file(seastar_test::get_name(), failure_policy::RETRYABLE_FAILURE, total_size, memory_size), storage_io_error, [](const storage_io_error& e) {
|
||||
return e.code().value() == EIO;
|
||||
});
|
||||
BOOST_REQUIRE_NO_THROW(test_client_upload_file(seastar_test::get_name(), failure_policy::RETRYABLE_FAILURE, total_size, memory_size));
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_file_failure) {
|
||||
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_file_failure_1) {
|
||||
const size_t part_size = 5_MiB;
|
||||
const size_t remainder_size = part_size / 2;
|
||||
const size_t total_size = 4 * part_size + remainder_size;
|
||||
const size_t memory_size = part_size;
|
||||
BOOST_REQUIRE_EXCEPTION(test_client_upload_file(seastar_test::get_name(), failure_policy::NONRETRYABLE_FAILURE, total_size, memory_size),
|
||||
storage_io_error,
|
||||
[](const storage_io_error& e) { return e.code().value() == EIO; });
|
||||
BOOST_REQUIRE_EXCEPTION(test_client_upload_file(seastar_test::get_name(), failure_policy::NEVERENDING_RETRYABLE_FAILURE, total_size, memory_size),
|
||||
storage_io_error, [](const storage_io_error& e) {
|
||||
return e.code().value() == EIO && e.what() == "S3 request failed. Code: 1. Reason: We encountered an internal error. Please try again."sv;
|
||||
});
|
||||
}
|
||||
|
||||
void do_test_client_multipart_upload(failure_policy policy) {
|
||||
const sstring name(fmt::format("/{}/testobject-{}", "test", ::getpid()));
|
||||
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_file_failure_2) {
|
||||
const size_t part_size = 5_MiB;
|
||||
const size_t remainder_size = part_size / 2;
|
||||
const size_t total_size = 4 * part_size + remainder_size;
|
||||
const size_t memory_size = part_size;
|
||||
BOOST_REQUIRE_EXCEPTION(test_client_upload_file(seastar_test::get_name(), failure_policy::NONRETRYABLE_FAILURE, total_size, memory_size), storage_io_error,
|
||||
[](const storage_io_error& e) {
|
||||
return e.code().value() == EIO && e.what() == "S3 request failed. Code: 2. Reason: Something went terribly wrong"sv;
|
||||
});
|
||||
}
|
||||
|
||||
void do_test_client_multipart_upload(failure_policy policy, bool is_jumbo = false) {
|
||||
const sstring name(fmt::format("/{}/testobject-{}-{}", "test", is_jumbo ? "jumbo" : "large", ::getpid()));
|
||||
|
||||
register_policy(name, policy);
|
||||
testlog.info("Make client");
|
||||
semaphore mem(16 << 20);
|
||||
@@ -121,7 +134,7 @@ void do_test_client_multipart_upload(failure_policy policy) {
|
||||
auto close_client = deferred_close(*cln);
|
||||
|
||||
testlog.info("Upload object");
|
||||
auto out = output_stream<char>(cln->make_upload_sink(name));
|
||||
auto out = output_stream<char>(is_jumbo ? cln->make_upload_jumbo_sink(name, 3) : cln->make_upload_sink(name));
|
||||
auto close_stream = deferred_close(out);
|
||||
|
||||
static constexpr unsigned chunk_size = 1000;
|
||||
@@ -139,13 +152,38 @@ SEASTAR_THREAD_TEST_CASE(test_multipart_upload_sink_success) {
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_sink_retryable_success) {
|
||||
BOOST_REQUIRE_EXCEPTION(do_test_client_multipart_upload(failure_policy::RETRYABLE_FAILURE), storage_io_error, [](const storage_io_error& e) {
|
||||
return e.code().value() == EIO;
|
||||
BOOST_REQUIRE_NO_THROW(do_test_client_multipart_upload(failure_policy::RETRYABLE_FAILURE));
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_sink_failure_1) {
|
||||
BOOST_REQUIRE_EXCEPTION(do_test_client_multipart_upload(failure_policy::NEVERENDING_RETRYABLE_FAILURE), storage_io_error, [](const storage_io_error& e) {
|
||||
return e.code().value() == EIO && e.what() == "S3 request failed. Code: 1. Reason: We encountered an internal error. Please try again."sv;
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_sink_failure) {
|
||||
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_sink_failure_2) {
|
||||
BOOST_REQUIRE_EXCEPTION(do_test_client_multipart_upload(failure_policy::NONRETRYABLE_FAILURE), storage_io_error, [](const storage_io_error& e) {
|
||||
return e.code().value() == EIO;
|
||||
return e.code().value() == EIO && e.what() == "S3 request failed. Code: 2. Reason: Something went terribly wrong"sv;
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_jumbo_sink_success) {
|
||||
BOOST_REQUIRE_NO_THROW(do_test_client_multipart_upload(failure_policy::SUCCESS, true));
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_jumbo_sink_retryable_success) {
|
||||
BOOST_REQUIRE_NO_THROW(do_test_client_multipart_upload(failure_policy::RETRYABLE_FAILURE, true));
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_jumbo_sink_failure_1) {
|
||||
BOOST_REQUIRE_EXCEPTION(
|
||||
do_test_client_multipart_upload(failure_policy::NEVERENDING_RETRYABLE_FAILURE, true), storage_io_error, [](const storage_io_error& e) {
|
||||
return e.code().value() == EIO && e.what() == "S3 request failed. Code: 1. Reason: We encountered an internal error. Please try again."sv;
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_multipart_upload_jumbo_sink_failure_2) {
|
||||
BOOST_REQUIRE_EXCEPTION(do_test_client_multipart_upload(failure_policy::NONRETRYABLE_FAILURE, true), storage_io_error, [](const storage_io_error& e) {
|
||||
return e.code().value() == EIO && e.what() == "S3 request failed. Code: 2. Reason: Something went terribly wrong"sv;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -47,35 +47,31 @@ build_xml_response(const std::string& exception, const std::string& message, con
|
||||
BOOST_AUTO_TEST_CASE(TestXmlErrorPayload) {
|
||||
std::string message = "Test Message";
|
||||
std::string requestId = "Request Id";
|
||||
aws::aws_error error = aws::aws_error::parse(build_xml_response("IncompleteSignatureException", message, requestId));
|
||||
auto error = aws::aws_error::parse(build_xml_response("IncompleteSignatureException", message, requestId)).value();
|
||||
BOOST_REQUIRE_EQUAL(aws::aws_error_type::INCOMPLETE_SIGNATURE, error.get_error_type());
|
||||
BOOST_REQUIRE_EQUAL(message, error.get_error_message());
|
||||
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
|
||||
|
||||
error = aws::aws_error::parse(build_xml_response("InternalFailure", message, requestId, message_style::plural));
|
||||
error = aws::aws_error::parse(build_xml_response("InternalFailure", message, requestId, message_style::plural)).value();
|
||||
BOOST_REQUIRE_EQUAL(aws::aws_error_type::INTERNAL_FAILURE, error.get_error_type());
|
||||
BOOST_REQUIRE_EQUAL(message, error.get_error_message());
|
||||
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::yes);
|
||||
|
||||
error = aws::aws_error::parse(build_xml_response("IDontExist", message, requestId, message_style::plural));
|
||||
error = aws::aws_error::parse(build_xml_response("IDontExist", message, requestId, message_style::plural)).value();
|
||||
BOOST_REQUIRE_EQUAL(aws::aws_error_type::UNKNOWN, error.get_error_type());
|
||||
BOOST_REQUIRE_EQUAL(message, error.get_error_message());
|
||||
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
|
||||
|
||||
error = aws::aws_error::parse("");
|
||||
BOOST_REQUIRE_EQUAL(aws::aws_error_type::OK, error.get_error_type());
|
||||
BOOST_REQUIRE_EQUAL("", error.get_error_message());
|
||||
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
|
||||
auto no_error = aws::aws_error::parse("");
|
||||
BOOST_REQUIRE_EQUAL(no_error.has_value(), false);
|
||||
|
||||
error =
|
||||
no_error =
|
||||
aws::aws_error::parse("Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.");
|
||||
BOOST_REQUIRE_EQUAL(aws::aws_error_type::OK, error.get_error_type());
|
||||
BOOST_REQUIRE_EQUAL("", error.get_error_message());
|
||||
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
|
||||
BOOST_REQUIRE_EQUAL(no_error.has_value(), false);
|
||||
|
||||
std::string response = " ";
|
||||
response += build_xml_response("InternalFailure", message, requestId, message_style::singular);
|
||||
error = aws::aws_error::parse(response);
|
||||
error = aws::aws_error::parse(response).value();
|
||||
BOOST_REQUIRE_EQUAL(aws::aws_error_type::INTERNAL_FAILURE, error.get_error_type());
|
||||
BOOST_REQUIRE_EQUAL(message, error.get_error_message());
|
||||
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::yes);
|
||||
@@ -86,13 +82,13 @@ BOOST_AUTO_TEST_CASE(TestErrorsWithPrefixParse) {
|
||||
std::string exceptionPrefix = "blahblahblah#";
|
||||
std::string requestId = "Request Id";
|
||||
for (const auto& [exception, err] : aws::aws_error::get_errors()) {
|
||||
auto error = aws::aws_error::parse(build_xml_response(exceptionPrefix + std::string(exception), message, requestId));
|
||||
auto error = aws::aws_error::parse(build_xml_response(exceptionPrefix + std::string(exception), message, requestId)).value();
|
||||
BOOST_REQUIRE_EQUAL(err.get_error_type(), error.get_error_type());
|
||||
BOOST_REQUIRE_EQUAL(message, error.get_error_message());
|
||||
BOOST_REQUIRE_EQUAL(err.is_retryable(), error.is_retryable());
|
||||
}
|
||||
|
||||
auto error = aws::aws_error::parse(build_xml_response(exceptionPrefix + "IDon'tExist", "JunkMessage", requestId));
|
||||
auto error = aws::aws_error::parse(build_xml_response(exceptionPrefix + "IDon'tExist", "JunkMessage", requestId)).value();
|
||||
BOOST_REQUIRE_EQUAL(aws::aws_error_type::UNKNOWN, error.get_error_type());
|
||||
BOOST_REQUIRE_EQUAL("JunkMessage", error.get_error_message());
|
||||
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
|
||||
@@ -102,12 +98,12 @@ BOOST_AUTO_TEST_CASE(TestErrorsWithoutPrefixParse) {
|
||||
std::string message = "Test Message";
|
||||
std::string requestId = "Request Id";
|
||||
for (const auto& [exception, err] : aws::aws_error::get_errors()) {
|
||||
auto error = aws::aws_error::parse(build_xml_response(std::string(exception), message, requestId));
|
||||
auto error = aws::aws_error::parse(build_xml_response(std::string(exception), message, requestId)).value();
|
||||
BOOST_REQUIRE_EQUAL(err.get_error_type(), error.get_error_type());
|
||||
BOOST_REQUIRE_EQUAL(message, error.get_error_message());
|
||||
BOOST_REQUIRE_EQUAL(err.is_retryable(), error.is_retryable());
|
||||
}
|
||||
auto error = aws::aws_error::parse(build_xml_response("IDon'tExist", "JunkMessage", requestId));
|
||||
auto error = aws::aws_error::parse(build_xml_response("IDon'tExist", "JunkMessage", requestId)).value();
|
||||
BOOST_REQUIRE_EQUAL(aws::aws_error_type::UNKNOWN, error.get_error_type());
|
||||
BOOST_REQUIRE_EQUAL("JunkMessage", error.get_error_message());
|
||||
BOOST_REQUIRE_EQUAL(error.is_retryable(), aws::retryable::no);
|
||||
|
||||
@@ -31,6 +31,8 @@
|
||||
#include "sstables/checksum_utils.hh"
|
||||
#include "gc_clock.hh"
|
||||
|
||||
using namespace std::string_view_literals;
|
||||
|
||||
// The test can be run on real AWS-S3 bucket. For that, create a bucket with
|
||||
// permissive enough policy and then run the test with env set respectively
|
||||
// E.g. like this
|
||||
@@ -43,7 +45,21 @@
|
||||
// export AWS_SESSION_TOKEN=${aws_session_token}
|
||||
// export AWS_DEFAULT_REGION="us-east-2"
|
||||
|
||||
s3::endpoint_config_ptr make_minio_config() {
|
||||
static shared_ptr<s3::client> make_proxy_client(semaphore& mem) {
|
||||
s3::endpoint_config cfg = {
|
||||
.port = std::stoul(tests::getenv_safe("PROXY_S3_SERVER_PORT")),
|
||||
.use_https = false,
|
||||
.aws = {{
|
||||
.access_key_id = tests::getenv_safe("AWS_ACCESS_KEY_ID"),
|
||||
.secret_access_key = tests::getenv_safe("AWS_SECRET_ACCESS_KEY"),
|
||||
.session_token = ::getenv("AWS_SESSION_TOKEN") ? : "",
|
||||
.region = ::getenv("AWS_DEFAULT_REGION") ? : "local",
|
||||
}},
|
||||
};
|
||||
return s3::client::make(tests::getenv_safe("PROXY_S3_SERVER_HOST"), make_lw_shared<s3::endpoint_config>(std::move(cfg)), mem);
|
||||
}
|
||||
|
||||
static shared_ptr<s3::client> make_minio_client(semaphore& mem) {
|
||||
s3::endpoint_config cfg = {
|
||||
.port = std::stoul(tests::getenv_safe("S3_SERVER_PORT_FOR_TEST")),
|
||||
.use_https = ::getenv("AWS_DEFAULT_REGION") != nullptr,
|
||||
@@ -54,21 +70,23 @@ s3::endpoint_config_ptr make_minio_config() {
|
||||
.region = ::getenv("AWS_DEFAULT_REGION") ? : "local",
|
||||
}},
|
||||
};
|
||||
return make_lw_shared<s3::endpoint_config>(std::move(cfg));
|
||||
return s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_lw_shared<s3::endpoint_config>(std::move(cfg)), mem);
|
||||
}
|
||||
|
||||
using client_maker_function = std::function<shared_ptr<s3::client>(semaphore&)>;
|
||||
|
||||
/*
|
||||
* Tests below expect minio server to be running on localhost
|
||||
* with the bucket named env['S3_BUCKET_FOR_TEST'] created with
|
||||
* unrestricted anonymous read-write access
|
||||
*/
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_put_get_object) {
|
||||
void client_put_get_object(const client_maker_function& client_maker) {
|
||||
const sstring name(fmt::format("/{}/testobject-{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid()));
|
||||
|
||||
testlog.info("Make client\n");
|
||||
semaphore mem(16<<20);
|
||||
auto cln = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
|
||||
semaphore mem(16 << 20);
|
||||
auto cln = client_maker(mem);
|
||||
auto close_client = deferred_close(*cln);
|
||||
|
||||
testlog.info("Put object {}\n", name);
|
||||
@@ -102,6 +120,14 @@ SEASTAR_THREAD_TEST_CASE(test_client_put_get_object) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_put_get_object_minio) {
|
||||
client_put_get_object(make_minio_client);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_put_get_object_proxy) {
|
||||
client_put_get_object(make_proxy_client);
|
||||
}
|
||||
|
||||
static auto deferred_delete_object(shared_ptr<s3::client> client, sstring name) {
|
||||
return seastar::defer([client, name] {
|
||||
testlog.info("Delete object: {}\n", name);
|
||||
@@ -109,12 +135,12 @@ static auto deferred_delete_object(shared_ptr<s3::client> client, sstring name)
|
||||
});
|
||||
}
|
||||
|
||||
void do_test_client_multipart_upload(bool with_copy_upload) {
|
||||
void do_test_client_multipart_upload(const client_maker_function& client_maker, bool with_copy_upload) {
|
||||
const sstring name(fmt::format("/{}/test{}object-{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), with_copy_upload ? "jumbo" : "large", ::getpid()));
|
||||
|
||||
testlog.info("Make client\n");
|
||||
semaphore mem(16<<20);
|
||||
auto cln = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
|
||||
auto cln = client_maker(mem);
|
||||
auto close_client = deferred_close(*cln);
|
||||
|
||||
testlog.info("Upload object (with copy = {})\n", with_copy_upload);
|
||||
@@ -161,21 +187,29 @@ void do_test_client_multipart_upload(bool with_copy_upload) {
|
||||
}
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_multipart_upload) {
|
||||
do_test_client_multipart_upload(false);
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_multipart_upload_minio) {
|
||||
do_test_client_multipart_upload(make_minio_client, false);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_multipart_copy_upload) {
|
||||
do_test_client_multipart_upload(true);
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_multipart_upload_proxy) {
|
||||
do_test_client_multipart_upload(make_proxy_client, false);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_multipart_upload_fallback) {
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_multipart_copy_upload_minio) {
|
||||
do_test_client_multipart_upload(make_minio_client, true);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_multipart_copy_upload_proxy) {
|
||||
do_test_client_multipart_upload(make_proxy_client, true);
|
||||
}
|
||||
|
||||
void client_multipart_upload_fallback(const client_maker_function& client_maker) {
|
||||
const sstring name(fmt::format("/{}/testfbobject-{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid()));
|
||||
|
||||
testlog.info("Make client");
|
||||
semaphore mem(0);
|
||||
mem.broken(); // so that any attempt to use it throws
|
||||
auto cln = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
|
||||
auto cln = client_maker(mem);
|
||||
auto close_client = deferred_close(*cln);
|
||||
|
||||
testlog.info("Upload object");
|
||||
@@ -197,9 +231,17 @@ SEASTAR_THREAD_TEST_CASE(test_client_multipart_upload_fallback) {
|
||||
BOOST_REQUIRE_EQUAL(to_sstring(std::move(res)), to_sstring(std::move(data)));
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_multipart_upload_fallback_minio) {
|
||||
client_multipart_upload_fallback(make_minio_client);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_multipart_upload_fallback_proxy) {
|
||||
client_multipart_upload_fallback(make_proxy_client);
|
||||
}
|
||||
|
||||
using with_remainder_t = bool_class<class with_remainder_tag>;
|
||||
|
||||
future<> test_client_upload_file(std::string_view test_name, size_t total_size, size_t memory_size) {
|
||||
future<> test_client_upload_file(const client_maker_function& client_maker, std::string_view test_name, size_t total_size, size_t memory_size) {
|
||||
tmpdir tmp;
|
||||
const auto file_path = tmp.path() / "test";
|
||||
|
||||
@@ -231,9 +273,7 @@ future<> test_client_upload_file(std::string_view test_name, size_t total_size,
|
||||
|
||||
// 2. upload the file to s3
|
||||
semaphore mem{memory_size};
|
||||
auto client = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"),
|
||||
make_minio_config(),
|
||||
mem);
|
||||
auto client = client_maker(mem);
|
||||
co_await client->upload_file(file_path, object_name);
|
||||
// 3. retrieve the object from s3 and retrieve the object from S3 and
|
||||
// compare it with the pattern
|
||||
@@ -262,34 +302,56 @@ future<> test_client_upload_file(std::string_view test_name, size_t total_size,
|
||||
co_await client->close();
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_client_upload_file_multi_part_without_remainder) {
|
||||
SEASTAR_TEST_CASE(test_client_upload_file_multi_part_without_remainder_minio) {
|
||||
const size_t part_size = 5_MiB;
|
||||
const size_t total_size = 4 * part_size;
|
||||
const size_t memory_size = part_size;
|
||||
co_await test_client_upload_file(seastar_test::get_name(), total_size, memory_size);
|
||||
co_await test_client_upload_file(make_minio_client, seastar_test::get_name(), total_size, memory_size);
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_client_upload_file_multi_part_with_remainder) {
|
||||
SEASTAR_TEST_CASE(test_client_upload_file_multi_part_without_remainder_proxy) {
|
||||
const size_t part_size = 5_MiB;
|
||||
const size_t total_size = 4 * part_size;
|
||||
const size_t memory_size = part_size;
|
||||
co_await test_client_upload_file(make_proxy_client, seastar_test::get_name(), total_size, memory_size);
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_client_upload_file_multi_part_with_remainder_minio) {
|
||||
const size_t part_size = 5_MiB;
|
||||
const size_t remainder_size = part_size / 2;
|
||||
const size_t total_size = 4 * part_size + remainder_size;
|
||||
const size_t memory_size = part_size;
|
||||
co_await test_client_upload_file(seastar_test::get_name(), total_size, memory_size);
|
||||
co_await test_client_upload_file(make_minio_client, seastar_test::get_name(), total_size, memory_size);
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_client_upload_file_single_part) {
|
||||
SEASTAR_TEST_CASE(test_client_upload_file_multi_part_with_remainder_proxy) {
|
||||
const size_t part_size = 5_MiB;
|
||||
const size_t remainder_size = part_size / 2;
|
||||
const size_t total_size = 4 * part_size + remainder_size;
|
||||
const size_t memory_size = part_size;
|
||||
co_await test_client_upload_file(make_proxy_client, seastar_test::get_name(), total_size, memory_size);
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_client_upload_file_single_part_minio) {
|
||||
const size_t part_size = 5_MiB;
|
||||
const size_t total_size = part_size / 2;
|
||||
const size_t memory_size = part_size;
|
||||
co_await test_client_upload_file(seastar_test::get_name(), total_size, memory_size);
|
||||
co_await test_client_upload_file(make_minio_client, seastar_test::get_name(), total_size, memory_size);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_readable_file) {
|
||||
SEASTAR_TEST_CASE(test_client_upload_file_single_part_proxy) {
|
||||
const size_t part_size = 5_MiB;
|
||||
const size_t total_size = part_size / 2;
|
||||
const size_t memory_size = part_size;
|
||||
co_await test_client_upload_file(make_proxy_client, seastar_test::get_name(), total_size, memory_size);
|
||||
}
|
||||
|
||||
void client_readable_file(const client_maker_function& client_maker) {
|
||||
const sstring name(fmt::format("/{}/testroobject-{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid()));
|
||||
|
||||
testlog.info("Make client\n");
|
||||
semaphore mem(16<<20);
|
||||
auto cln = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
|
||||
auto cln = client_maker(mem);
|
||||
auto close_client = deferred_close(*cln);
|
||||
|
||||
testlog.info("Put object {}\n", name);
|
||||
@@ -326,12 +388,20 @@ SEASTAR_THREAD_TEST_CASE(test_client_readable_file) {
|
||||
BOOST_REQUIRE_EQUAL(to_sstring(std::move(buf)), sstring("67890ABC"));
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_readable_file_stream) {
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_readable_file_minio) {
|
||||
client_readable_file(make_minio_client);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_readable_file_proxy) {
|
||||
client_readable_file(make_proxy_client);
|
||||
}
|
||||
|
||||
void client_readable_file_stream(const client_maker_function& client_maker) {
|
||||
const sstring name(fmt::format("/{}/teststreamobject-{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid()));
|
||||
|
||||
testlog.info("Make client\n");
|
||||
semaphore mem(16<<20);
|
||||
auto cln = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
|
||||
auto cln = client_maker(mem);
|
||||
auto close_client = deferred_close(*cln);
|
||||
|
||||
testlog.info("Put object {}\n", name);
|
||||
@@ -350,11 +420,20 @@ SEASTAR_THREAD_TEST_CASE(test_client_readable_file_stream) {
|
||||
BOOST_REQUIRE_EQUAL(res, sample);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_put_get_tagging) {
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_readable_file_stream_minio) {
|
||||
client_readable_file_stream(make_minio_client);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_readable_file_stream_proxy) {
|
||||
client_readable_file_stream(make_proxy_client);
|
||||
}
|
||||
|
||||
void client_put_get_tagging(const client_maker_function& client_maker) {
|
||||
const sstring name(fmt::format("/{}/testobject-{}",
|
||||
tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid()));
|
||||
semaphore mem(16<<20);
|
||||
auto client = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
|
||||
auto client = client_maker(mem);
|
||||
|
||||
auto close_client = deferred_close(*client);
|
||||
auto data = sstring("1234567890ABCDEF").release();
|
||||
client->put_object(name, std::move(data)).get();
|
||||
@@ -379,6 +458,14 @@ SEASTAR_THREAD_TEST_CASE(test_client_put_get_tagging) {
|
||||
}
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_put_get_tagging_minio) {
|
||||
client_put_get_tagging(make_minio_client);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_put_get_tagging_proxy) {
|
||||
client_put_get_tagging(make_proxy_client);
|
||||
}
|
||||
|
||||
static std::unordered_set<sstring> populate_bucket(shared_ptr<s3::client> client, sstring bucket, sstring prefix, int nr_objects) {
|
||||
std::unordered_set<sstring> names;
|
||||
|
||||
@@ -392,11 +479,11 @@ static std::unordered_set<sstring> populate_bucket(shared_ptr<s3::client> client
|
||||
return names;
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_list_objects) {
|
||||
void client_list_objects(const client_maker_function& client_maker) {
|
||||
const sstring bucket = tests::getenv_safe("S3_BUCKET_FOR_TEST");
|
||||
const sstring prefix(fmt::format("testprefix-{}/", ::getpid()));
|
||||
semaphore mem(16<<20);
|
||||
auto client = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
|
||||
auto client = client_maker(mem);
|
||||
auto close_client = deferred_close(*client);
|
||||
|
||||
// Put extra object to check list-by-prefix filters it out
|
||||
@@ -416,11 +503,19 @@ SEASTAR_THREAD_TEST_CASE(test_client_list_objects) {
|
||||
BOOST_REQUIRE(names.empty());
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_list_objects_incomplete) {
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_list_objects_minio) {
|
||||
client_list_objects(make_minio_client);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_list_objects_proxy) {
|
||||
client_list_objects(make_proxy_client);
|
||||
}
|
||||
|
||||
void client_list_objects_incomplete(const client_maker_function& client_maker) {
|
||||
const sstring bucket = tests::getenv_safe("S3_BUCKET_FOR_TEST");
|
||||
const sstring prefix(fmt::format("testprefix-{}/", ::getpid()));
|
||||
semaphore mem(16<<20);
|
||||
auto client = s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_minio_config(), mem);
|
||||
auto client = client_maker(mem);
|
||||
auto close_client = deferred_close(*client);
|
||||
|
||||
populate_bucket(client, bucket, prefix, 8);
|
||||
@@ -432,3 +527,57 @@ SEASTAR_THREAD_TEST_CASE(test_client_list_objects_incomplete) {
|
||||
auto de = lister.get().get();
|
||||
close_lister.close_now();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_list_objects_incomplete_minio) {
|
||||
client_list_objects_incomplete(make_minio_client);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_list_objects_incomplete_proxy) {
|
||||
client_list_objects_incomplete(make_proxy_client);
|
||||
}
|
||||
|
||||
void client_broken_bucket(const client_maker_function& client_maker) {
|
||||
const sstring name(fmt::format("/{}/testobject-{}", "NO_BUCKET", ::getpid()));
|
||||
semaphore mem(16 << 20);
|
||||
auto client = client_maker(mem);
|
||||
|
||||
auto close_client = deferred_close(*client);
|
||||
auto data = sstring("1234567890ABCDEF").release();
|
||||
BOOST_REQUIRE_EXCEPTION(client->put_object(name, std::move(data)).get(), storage_io_error, [](const storage_io_error& e) {
|
||||
return e.code().value() == EIO && e.what() == "S3 request failed. Code: 100. Reason: The specified bucket is not valid."sv;
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_broken_bucket_minio) {
|
||||
client_broken_bucket(make_minio_client);
|
||||
}
|
||||
|
||||
void client_missing_prefix(const client_maker_function& client_maker) {
|
||||
const sstring name(fmt::format("/{}/testobject-{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid()));
|
||||
semaphore mem(16 << 20);
|
||||
auto client = client_maker(mem);
|
||||
|
||||
auto close_client = deferred_close(*client);
|
||||
BOOST_REQUIRE_EXCEPTION(client->get_object_size(name).get(), storage_io_error, [](const storage_io_error& e) {
|
||||
return e.code().value() == ENOENT && e.what() == "S3 request failed. Code: 117. Reason: HTTP code: 404 Not Found"sv;
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_missing_prefix_minio) {
|
||||
client_missing_prefix(make_minio_client);
|
||||
}
|
||||
|
||||
void client_access_missing_object(const client_maker_function& client_maker) {
|
||||
const sstring name(fmt::format("/{}/testobject-{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid()));
|
||||
semaphore mem(16 << 20);
|
||||
auto client = client_maker(mem);
|
||||
|
||||
auto close_client = deferred_close(*client);
|
||||
BOOST_REQUIRE_EXCEPTION(client->get_object_tagging(name).get(), storage_io_error, [](const storage_io_error& e) {
|
||||
return e.code().value() == ENOENT && e.what() == "S3 request failed. Code: 133. Reason: The specified key does not exist."sv;
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_client_access_missing_object_minio) {
|
||||
client_access_missing_object(make_minio_client);
|
||||
}
|
||||
|
||||
@@ -1399,7 +1399,6 @@ def test_view_forbids_write(cql, mv1):
|
||||
# "TABLE" in their name - DROP TABLE, ALTER TABLE, and DESC TABLE. There
|
||||
# are identical operations with "MATERIALIZED VIEW" in their name, which
|
||||
# should be used instead.
|
||||
@pytest.mark.xfail(reason="issue #21026")
|
||||
def test_view_forbids_table_ops(cql, mv1):
|
||||
with pytest.raises(InvalidRequest, match='Cannot use'):
|
||||
cql.execute(f'DROP TABLE {mv1}')
|
||||
@@ -1409,7 +1408,7 @@ def test_view_forbids_table_ops(cql, mv1):
|
||||
# that DESC TABLE cannot be used on a view and that DESC MATERIALIZED VIEW
|
||||
# should be used instead - it just reports that the table is "not found".
|
||||
# Reproduces #21026 (DESC TABLE was allowed on a view):
|
||||
with pytest.raises(InvalidRequest):
|
||||
with pytest.raises(InvalidRequest, match='Cannot use'):
|
||||
cql.execute(f'DESC TABLE {mv1}')
|
||||
|
||||
# A materialized view cannot have its own materialized views, nor secondary
|
||||
|
||||
256
test/pylib/s3_proxy.py
Normal file
256
test/pylib/s3_proxy.py
Normal file
@@ -0,0 +1,256 @@
|
||||
#!/usr/bin/python3
|
||||
#
|
||||
# Copyright (C) 2024-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
#
|
||||
|
||||
# S3 proxy server to inject retryable errors for fuzzy testing.
|
||||
|
||||
import os
|
||||
import random
|
||||
import sys
|
||||
import asyncio
|
||||
|
||||
import requests
|
||||
import threading
|
||||
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
import uuid
|
||||
from functools import partial
|
||||
from collections import OrderedDict
|
||||
from requests import Response
|
||||
from typing_extensions import Optional
|
||||
|
||||
sys.path.insert(0, os.path.dirname(__file__))
|
||||
|
||||
|
||||
class Policy:
|
||||
def __init__(self, max_retries: int):
|
||||
self.should_forward: bool = True
|
||||
self.should_fail: bool = False
|
||||
self.error_count: int = 0
|
||||
self.max_errors: int = random.choice(list(range(1, max_retries)))
|
||||
|
||||
|
||||
class LRUCache:
|
||||
lock = threading.Lock()
|
||||
|
||||
def __init__(self, capacity: int):
|
||||
self.cache = OrderedDict()
|
||||
self.capacity = capacity
|
||||
|
||||
def get(self, key: str) -> Optional[Policy]:
|
||||
with self.lock:
|
||||
if key not in self.cache:
|
||||
return None
|
||||
self.cache.move_to_end(key)
|
||||
return self.cache[key]
|
||||
|
||||
def put(self, key: str, value: Policy) -> None:
|
||||
with self.lock:
|
||||
if key in self.cache:
|
||||
self.cache.move_to_end(key)
|
||||
self.cache[key] = value
|
||||
if len(self.cache) > self.capacity:
|
||||
self.cache.popitem(last=False)
|
||||
|
||||
|
||||
# Simple proxy between s3 client and minio to randomly inject errors and simulate cases when the request succeeds but the wire got "broken"
|
||||
def true_or_false():
|
||||
return random.choice([True, False])
|
||||
|
||||
|
||||
class InjectingHandler(BaseHTTPRequestHandler):
|
||||
retryable_codes = list((408, 419, 429, 440)) + list(range(500, 599))
|
||||
error_names = list(("InternalFailureException",
|
||||
"InternalFailure",
|
||||
"InternalServerError",
|
||||
"InternalError",
|
||||
"RequestExpiredException",
|
||||
"RequestExpired",
|
||||
"ServiceUnavailableException",
|
||||
"ServiceUnavailableError",
|
||||
"ServiceUnavailable",
|
||||
"RequestThrottledException",
|
||||
"RequestThrottled",
|
||||
"ThrottlingException",
|
||||
"ThrottledException",
|
||||
"Throttling",
|
||||
"SlowDownException",
|
||||
"SlowDown",
|
||||
"RequestTimeTooSkewedException",
|
||||
"RequestTimeTooSkewed",
|
||||
"RequestTimeoutException",
|
||||
"RequestTimeout"))
|
||||
|
||||
def __init__(self, policies, logger, minio_uri, max_retries, *args, **kwargs):
|
||||
self.minio_uri = minio_uri
|
||||
self.policies = policies
|
||||
self.logger = logger
|
||||
self.max_retries = max_retries
|
||||
super().__init__(*args, **kwargs)
|
||||
self.close_connection = False
|
||||
|
||||
def log_error(self, format, *args):
|
||||
if self.logger:
|
||||
self.logger.info("%s - - [%s] %s\n" %
|
||||
(self.client_address[0],
|
||||
self.log_date_time_string(),
|
||||
format % args))
|
||||
else:
|
||||
sys.stderr.write("%s - - [%s] %s\n" %
|
||||
(self.address_string(),
|
||||
self.log_date_time_string(),
|
||||
format % args))
|
||||
|
||||
def log_message(self, format, *args):
|
||||
# Just don't be too verbose
|
||||
if not self.logger:
|
||||
sys.stderr.write("%s - - [%s] %s\n" %
|
||||
(self.address_string(),
|
||||
self.log_date_time_string(),
|
||||
format % args))
|
||||
|
||||
def parsed_qs(self):
|
||||
parsed_url = urlparse(self.path)
|
||||
query_components = parse_qs(parsed_url.query)
|
||||
for key in parsed_url.query.split('&'):
|
||||
if '=' not in key:
|
||||
query_components[key] = ['']
|
||||
return query_components
|
||||
|
||||
def get_policy(self):
|
||||
policy = self.policies.get(self.path)
|
||||
if policy is None:
|
||||
policy = Policy(self.max_retries)
|
||||
policy.should_forward = true_or_false()
|
||||
if policy.should_forward:
|
||||
policy.should_fail = true_or_false()
|
||||
else:
|
||||
policy.should_fail = True
|
||||
self.policies.put(self.path, policy)
|
||||
|
||||
# Unfortunately MPU completion retry on already completed upload would introduce flakiness to unit tests, for example `s3_test`
|
||||
if self.command == "POST" and "uploadId" in self.parsed_qs():
|
||||
policy.should_forward = not policy.should_fail
|
||||
|
||||
return policy
|
||||
|
||||
def get_retryable_http_codes(self):
|
||||
return random.choice(self.retryable_codes), random.choice(self.error_names)
|
||||
|
||||
def respond_with_error(self):
|
||||
code, error_name = self.get_retryable_http_codes()
|
||||
self.send_response(code)
|
||||
self.send_header('Content-Type', 'text/plain; charset=utf-8')
|
||||
self.send_header('Connection', 'keep-alive')
|
||||
req_uuid = str(uuid.uuid4())
|
||||
response = f"""<?xml version="1.0" encoding="UTF-8"?>
|
||||
|
||||
<Error>
|
||||
<Code>{error_name}</Code>
|
||||
<Message>Minio proxy injected error. Client should retry.</Message>
|
||||
<RequestId>{req_uuid}</RequestId>
|
||||
<HostId>Uuag1LuByRx9e6j5Onimru9pO4ZVKnJ2Qz7/C1NPcfTWAtRPfTaOFg==</HostId>
|
||||
</Error>""".encode('utf-8')
|
||||
self.send_header('Content-Length', str(len(response)))
|
||||
self.end_headers()
|
||||
self.wfile.write(response)
|
||||
|
||||
def process_request(self):
|
||||
try:
|
||||
policy = self.get_policy()
|
||||
|
||||
if policy.error_count >= policy.max_errors:
|
||||
policy.should_fail = False
|
||||
policy.should_forward = True
|
||||
|
||||
response = Response()
|
||||
body = None
|
||||
|
||||
content_length = self.headers['Content-Length']
|
||||
if content_length:
|
||||
body = self.rfile.read(int(content_length))
|
||||
|
||||
if policy.should_forward:
|
||||
target_url = self.minio_uri + self.path
|
||||
headers = {key: value for key, value in self.headers.items()}
|
||||
response = requests.request(self.command, target_url, headers=headers, data=body)
|
||||
|
||||
if policy.should_fail:
|
||||
policy.error_count += 1
|
||||
self.respond_with_error()
|
||||
else:
|
||||
self.send_response(response.status_code)
|
||||
for key, value in response.headers.items():
|
||||
if key.upper() != 'CONTENT-LENGTH':
|
||||
self.send_header(key, value)
|
||||
|
||||
if self.command == 'HEAD':
|
||||
self.send_header("Content-Length", response.headers['Content-Length'])
|
||||
else:
|
||||
self.send_header("Content-Length", str(len(response.content)))
|
||||
self.end_headers()
|
||||
self.wfile.write(response.content)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
def do_GET(self):
|
||||
self.process_request()
|
||||
|
||||
def do_POST(self):
|
||||
self.process_request()
|
||||
|
||||
def do_PUT(self):
|
||||
self.process_request()
|
||||
|
||||
def do_DELETE(self):
|
||||
self.process_request()
|
||||
|
||||
def do_HEAD(self):
|
||||
self.process_request()
|
||||
|
||||
|
||||
# Proxy server to setup `ThreadingHTTPServer` instance with custom request handler (see above), managing requests state
|
||||
# in the `self.req_states`, adding custom logger, etc. This server will be started automatically from `test.py`. In
|
||||
# addition, it is possible just to start this server using another script - `start_s3_proxy.py` to run it locally to
|
||||
# provide proxy between tests and minio
|
||||
class S3ProxyServer:
|
||||
def __init__(self, host: str, port: int, minio_uri: str, max_retries: int, seed: int, logger=None):
|
||||
print(f'Setting minio proxy random seed to {seed}')
|
||||
random.seed(seed)
|
||||
self.req_states = LRUCache(10000)
|
||||
handler = partial(InjectingHandler, self.req_states, logger, minio_uri, max_retries)
|
||||
self.server = ThreadingHTTPServer((host, port), handler)
|
||||
self.server_thread = None
|
||||
self.server.request_queue_size = 1000
|
||||
self.server.timeout = 10000
|
||||
self.server.socket.settimeout(10000)
|
||||
self.server.socket.listen(1000)
|
||||
self.is_running = False
|
||||
os.environ['PROXY_S3_SERVER_PORT'] = f'{port}'
|
||||
os.environ['PROXY_S3_SERVER_HOST'] = host
|
||||
|
||||
async def start(self):
|
||||
if not self.is_running:
|
||||
print(f'Starting S3 proxy server on {self.server.server_address}')
|
||||
loop = asyncio.get_running_loop()
|
||||
self.server_thread = loop.run_in_executor(None, self.server.serve_forever)
|
||||
self.is_running = True
|
||||
|
||||
async def stop(self):
|
||||
if self.is_running:
|
||||
print('Stopping S3 proxy server')
|
||||
self.server.shutdown()
|
||||
await self.server_thread
|
||||
self.is_running = False
|
||||
|
||||
async def run(self):
|
||||
try:
|
||||
await self.start()
|
||||
while self.is_running:
|
||||
await asyncio.sleep(1)
|
||||
except Exception as e:
|
||||
print(f"Server error: {e}")
|
||||
await self.stop()
|
||||
@@ -25,6 +25,7 @@ class Policy(Enum):
|
||||
SUCCESS = 0
|
||||
RETRYABLE_FAILURE = 1
|
||||
NONRETRYABLE_FAILURE = 2
|
||||
NEVERENDING_RETRYABLE_FAILURE = 3
|
||||
|
||||
|
||||
class LRUCache:
|
||||
@@ -113,7 +114,6 @@ class InjectingHandler(BaseHTTPRequestHandler):
|
||||
put_data = self.rfile.read(int(content_length))
|
||||
self.send_response(200)
|
||||
self.send_header('Content-Type', 'text/plain; charset=utf-8')
|
||||
self.send_header('Content-Length', '0')
|
||||
self.send_header('Connection', 'keep-alive')
|
||||
query_components = self.parsed_qs()
|
||||
if 'Key' in query_components and 'Policy' in query_components:
|
||||
@@ -122,11 +122,28 @@ class InjectingHandler(BaseHTTPRequestHandler):
|
||||
self.send_header('ETag', "SomeTag_" + query_components.get("partNumber")[0])
|
||||
else:
|
||||
self.send_header('ETag', "SomeTag")
|
||||
|
||||
if self.headers['x-amz-copy-source']:
|
||||
response_body = """<CopyPartResult>
|
||||
<LastModified>2011-04-11T20:34:56.000Z</LastModified>
|
||||
<ETag>"9b2cf535f27731c974343645a3985328"</ETag>
|
||||
</CopyPartResult>""".encode('utf-8')
|
||||
self.send_header('Content-Length', str(len(response_body)))
|
||||
self.end_headers()
|
||||
self.wfile.write(response_body)
|
||||
return
|
||||
|
||||
self.send_header('Content-Length', '0')
|
||||
self.end_headers()
|
||||
|
||||
# Processes DELETE method, does nothing except providing in the response expected headers and response code
|
||||
def do_DELETE(self):
|
||||
self.send_response(204)
|
||||
query_components = self.parsed_qs()
|
||||
if 'uploadId' in query_components and self.policies.get(
|
||||
urlparse(self.path).path) == Policy.NONRETRYABLE_FAILURE:
|
||||
self.send_response(404)
|
||||
else:
|
||||
self.send_response(204)
|
||||
self.send_header('Content-Type', 'text/plain; charset=utf-8')
|
||||
self.send_header('Content-Length', '0')
|
||||
self.send_header('Connection', 'keep-alive')
|
||||
@@ -153,9 +170,10 @@ class InjectingHandler(BaseHTTPRequestHandler):
|
||||
<Key>Example-Object</Key>
|
||||
<ETag>"3858f62230ac3c915f300c664312c11f-9"</ETag>
|
||||
</CompleteMultipartUploadResult>"""
|
||||
case Policy.RETRYABLE_FAILURE:
|
||||
# should succeed on retry
|
||||
self.policies.put(path, Policy.SUCCESS)
|
||||
case Policy.RETRYABLE_FAILURE | Policy.NEVERENDING_RETRYABLE_FAILURE:
|
||||
if self.policies.get(path) == Policy.RETRYABLE_FAILURE:
|
||||
# should succeed on retry
|
||||
self.policies.put(path, Policy.SUCCESS)
|
||||
return """<?xml version="1.0" encoding="UTF-8"?>
|
||||
|
||||
<Error>
|
||||
|
||||
28
test/pylib/start_s3_proxy.py
Executable file
28
test/pylib/start_s3_proxy.py
Executable file
@@ -0,0 +1,28 @@
|
||||
#!/usr/bin/python3
|
||||
import asyncio
|
||||
import signal
|
||||
import argparse
|
||||
import time
|
||||
|
||||
from s3_proxy import S3ProxyServer
|
||||
|
||||
|
||||
async def run():
|
||||
parser = argparse.ArgumentParser(description="Start S3 proxy server")
|
||||
parser.add_argument('--host', default='127.0.0.1')
|
||||
parser.add_argument('--port', type=int, default=9002)
|
||||
parser.add_argument('--minio-uri', default="http://127.0.0.1:9000")
|
||||
parser.add_argument('--max-retries', type=int, default=5)
|
||||
parser.add_argument('--rnd-seed', type=int, default=int(time.time()))
|
||||
args = parser.parse_args()
|
||||
server = S3ProxyServer(args.host, args.port, args.minio_uri, args.max_retries, args.rnd_seed)
|
||||
|
||||
print('Starting S3 proxy server')
|
||||
await server.start()
|
||||
signal.sigwait({signal.SIGINT, signal.SIGTERM})
|
||||
print('Stopping S3 proxy server')
|
||||
await server.stop()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
asyncio.run(run())
|
||||
@@ -47,7 +47,8 @@ target_sources(utils
|
||||
uuid.cc
|
||||
aws_sigv4.cc
|
||||
s3/aws_error.cc
|
||||
s3/client.cc)
|
||||
s3/client.cc
|
||||
s3/retry_strategy.cc)
|
||||
target_include_directories(utils
|
||||
PUBLIC
|
||||
${CMAKE_SOURCE_DIR}
|
||||
|
||||
@@ -24,11 +24,9 @@ aws_error::aws_error(aws_error_type error_type, std::string&& error_message, ret
|
||||
: _type(error_type), _message(std::move(error_message)), _is_retryable(is_retryable) {
|
||||
}
|
||||
|
||||
aws_error aws_error::parse(seastar::sstring&& body) {
|
||||
aws_error ret_val;
|
||||
|
||||
std::optional<aws_error> aws_error::parse(seastar::sstring&& body) {
|
||||
if (body.empty()) {
|
||||
return ret_val;
|
||||
return {};
|
||||
}
|
||||
|
||||
auto doc = std::make_unique<rapidxml::xml_document<>>();
|
||||
@@ -36,7 +34,7 @@ aws_error aws_error::parse(seastar::sstring&& body) {
|
||||
doc->parse<0>(body.data());
|
||||
} catch (const rapidxml::parse_error&) {
|
||||
// Most likely not an XML which is possible, just return
|
||||
return ret_val;
|
||||
return {};
|
||||
}
|
||||
|
||||
const auto* error_node = doc->first_node("Error");
|
||||
@@ -48,11 +46,12 @@ aws_error aws_error::parse(seastar::sstring&& body) {
|
||||
}
|
||||
|
||||
if (!error_node) {
|
||||
return ret_val;
|
||||
return {};
|
||||
}
|
||||
|
||||
const auto* code_node = error_node->first_node("Code");
|
||||
const auto* message_node = error_node->first_node("Message");
|
||||
aws_error ret_val;
|
||||
|
||||
if (code_node && message_node) {
|
||||
std::string code = code_node->value();
|
||||
@@ -75,6 +74,58 @@ aws_error aws_error::parse(seastar::sstring&& body) {
|
||||
return ret_val;
|
||||
}
|
||||
|
||||
aws_error aws_error::from_http_code(seastar::http::reply::status_type http_code) {
|
||||
const auto& all_errors = get_errors();
|
||||
aws_error ret_val;
|
||||
switch (http_code) {
|
||||
case seastar::http::reply::status_type::unauthorized:
|
||||
ret_val = all_errors.at("HTTP_UNAUTHORIZED");
|
||||
break;
|
||||
case seastar::http::reply::status_type::forbidden:
|
||||
ret_val = all_errors.at("HTTP_FORBIDDEN");
|
||||
break;
|
||||
case seastar::http::reply::status_type::not_found:
|
||||
ret_val = all_errors.at("HTTP_NOT_FOUND");
|
||||
break;
|
||||
case seastar::http::reply::status_type::too_many_requests:
|
||||
ret_val = all_errors.at("HTTP_TOO_MANY_REQUESTS");
|
||||
break;
|
||||
case seastar::http::reply::status_type::internal_server_error:
|
||||
ret_val = all_errors.at("HTTP_INTERNAL_SERVER_ERROR");
|
||||
break;
|
||||
case seastar::http::reply::status_type::bandwidth_limit_exceeded:
|
||||
ret_val = all_errors.at("HTTP_BANDWIDTH_LIMIT_EXCEEDED");
|
||||
break;
|
||||
case seastar::http::reply::status_type::service_unavailable:
|
||||
ret_val = all_errors.at("HTTP_SERVICE_UNAVAILABLE");
|
||||
break;
|
||||
case seastar::http::reply::status_type::request_timeout:
|
||||
ret_val = all_errors.at("HTTP_REQUEST_TIMEOUT");
|
||||
break;
|
||||
case seastar::http::reply::status_type::page_expired:
|
||||
ret_val = all_errors.at("HTTP_PAGE_EXPIRED");
|
||||
break;
|
||||
case seastar::http::reply::status_type::login_timeout:
|
||||
ret_val = all_errors.at("HTTP_LOGIN_TIMEOUT");
|
||||
break;
|
||||
case seastar::http::reply::status_type::gateway_timeout:
|
||||
ret_val = all_errors.at("HTTP_GATEWAY_TIMEOUT");
|
||||
break;
|
||||
case seastar::http::reply::status_type::network_connect_timeout:
|
||||
ret_val = all_errors.at("HTTP_NETWORK_CONNECT_TIMEOUT");
|
||||
break;
|
||||
case seastar::http::reply::status_type::network_read_timeout:
|
||||
ret_val = all_errors.at("HTTP_NETWORK_READ_TIMEOUT");
|
||||
break;
|
||||
default:
|
||||
ret_val = {aws_error_type::UNKNOWN,
|
||||
"Unknown server error has been encountered.",
|
||||
retryable{seastar::http::reply::classify_status(http_code) == seastar::http::reply::status_class::server_error}};
|
||||
}
|
||||
ret_val._message = seastar::format("{} HTTP code: {}", ret_val._message, http_code);
|
||||
return ret_val;
|
||||
}
|
||||
|
||||
const aws_errors& aws_error::get_errors() {
|
||||
static const std::unordered_map<std::string_view, const aws_error> aws_error_map{
|
||||
{"IncompleteSignature", aws_error(aws_error_type::INCOMPLETE_SIGNATURE, retryable::no)},
|
||||
@@ -132,7 +183,29 @@ const aws_errors& aws_error::get_errors() {
|
||||
{"RequestTimeTooSkewedException", aws_error(aws_error_type::REQUEST_TIME_TOO_SKEWED, retryable::yes)},
|
||||
{"RequestTimeTooSkewed", aws_error(aws_error_type::REQUEST_TIME_TOO_SKEWED, retryable::yes)},
|
||||
{"RequestTimeoutException", aws_error(aws_error_type::REQUEST_TIMEOUT, retryable::yes)},
|
||||
{"RequestTimeout", aws_error(aws_error_type::REQUEST_TIMEOUT, retryable::yes)}};
|
||||
{"RequestTimeout", aws_error(aws_error_type::REQUEST_TIMEOUT, retryable::yes)},
|
||||
{"HTTP_UNAUTHORIZED", aws_error(aws_error_type::HTTP_UNAUTHORIZED, retryable::no)},
|
||||
{"HTTP_FORBIDDEN", aws_error(aws_error_type::HTTP_FORBIDDEN, retryable::no)},
|
||||
{"HTTP_NOT_FOUND", aws_error(aws_error_type::HTTP_NOT_FOUND, retryable::no)},
|
||||
{"HTTP_TOO_MANY_REQUESTS", aws_error(aws_error_type::HTTP_TOO_MANY_REQUESTS, retryable::yes)},
|
||||
{"HTTP_INTERNAL_SERVER_ERROR", aws_error(aws_error_type::HTTP_INTERNAL_SERVER_ERROR, retryable::yes)},
|
||||
{"HTTP_BANDWIDTH_LIMIT_EXCEEDED", aws_error(aws_error_type::HTTP_BANDWIDTH_LIMIT_EXCEEDED, retryable::yes)},
|
||||
{"HTTP_SERVICE_UNAVAILABLE", aws_error(aws_error_type::HTTP_SERVICE_UNAVAILABLE, retryable::yes)},
|
||||
{"HTTP_REQUEST_TIMEOUT", aws_error(aws_error_type::HTTP_REQUEST_TIMEOUT, retryable::yes)},
|
||||
{"HTTP_PAGE_EXPIRED", aws_error(aws_error_type::HTTP_PAGE_EXPIRED, retryable::yes)},
|
||||
{"HTTP_LOGIN_TIMEOUT", aws_error(aws_error_type::HTTP_LOGIN_TIMEOUT, retryable::yes)},
|
||||
{"HTTP_GATEWAY_TIMEOUT", aws_error(aws_error_type::HTTP_GATEWAY_TIMEOUT, retryable::yes)},
|
||||
{"HTTP_NETWORK_CONNECT_TIMEOUT", aws_error(aws_error_type::HTTP_NETWORK_CONNECT_TIMEOUT, retryable::yes)},
|
||||
{"HTTP_NETWORK_READ_TIMEOUT", aws_error(aws_error_type::HTTP_NETWORK_READ_TIMEOUT, retryable::yes)},
|
||||
{"NoSuchUpload", aws_error(aws_error_type::NO_SUCH_UPLOAD, retryable::no)},
|
||||
{"BucketAlreadyOwnedByYou", aws_error(aws_error_type::BUCKET_ALREADY_OWNED_BY_YOU, retryable::no)},
|
||||
{"ObjectAlreadyInActiveTierError", aws_error(aws_error_type::OBJECT_ALREADY_IN_ACTIVE_TIER, retryable::no)},
|
||||
{"NoSuchBucket", aws_error(aws_error_type::NO_SUCH_BUCKET, retryable::no)},
|
||||
{"NoSuchKey", aws_error(aws_error_type::NO_SUCH_KEY, retryable::no)},
|
||||
{"ObjectNotInActiveTierError", aws_error(aws_error_type::OBJECT_NOT_IN_ACTIVE_TIER, retryable::no)},
|
||||
{"BucketAlreadyExists", aws_error(aws_error_type::BUCKET_ALREADY_EXISTS, retryable::no)},
|
||||
{"InvalidObjectState", aws_error(aws_error_type::INVALID_OBJECT_STATE, retryable::no)}};
|
||||
return aws_error_map;
|
||||
}
|
||||
|
||||
} // namespace aws
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/sstring.hh>
|
||||
#include <seastar/http/reply.hh>
|
||||
#include <seastar/util/bool_class.hh>
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
@@ -49,8 +50,32 @@ enum class aws_error_type : uint8_t {
|
||||
CLIENT_SIGNING_FAILURE = 101,
|
||||
USER_CANCELLED = 102,
|
||||
ENDPOINT_RESOLUTION_FAILURE = 103,
|
||||
// HTTP errors without additional information in the response body
|
||||
HTTP_UNAUTHORIZED = 115,
|
||||
HTTP_FORBIDDEN = 116,
|
||||
HTTP_NOT_FOUND = 117,
|
||||
HTTP_TOO_MANY_REQUESTS = 118,
|
||||
HTTP_INTERNAL_SERVER_ERROR = 119,
|
||||
HTTP_BANDWIDTH_LIMIT_EXCEEDED = 120,
|
||||
HTTP_SERVICE_UNAVAILABLE = 121,
|
||||
HTTP_REQUEST_TIMEOUT = 122,
|
||||
HTTP_PAGE_EXPIRED = 123,
|
||||
HTTP_LOGIN_TIMEOUT = 124,
|
||||
HTTP_GATEWAY_TIMEOUT = 125,
|
||||
HTTP_NETWORK_CONNECT_TIMEOUT = 126,
|
||||
HTTP_NETWORK_READ_TIMEOUT = 127,
|
||||
SERVICE_EXTENSION_START_RANGE = 128,
|
||||
OK = 255 // No error set
|
||||
// S3 specific
|
||||
BUCKET_ALREADY_EXISTS = 129,
|
||||
BUCKET_ALREADY_OWNED_BY_YOU = 130,
|
||||
INVALID_OBJECT_STATE = 131,
|
||||
NO_SUCH_BUCKET = 132,
|
||||
NO_SUCH_KEY = 133,
|
||||
NO_SUCH_UPLOAD = 134,
|
||||
OBJECT_ALREADY_IN_ACTIVE_TIER = 135,
|
||||
OBJECT_NOT_IN_ACTIVE_TIER = 136,
|
||||
// No error set
|
||||
OK = 255
|
||||
};
|
||||
|
||||
class aws_error;
|
||||
@@ -69,8 +94,28 @@ public:
|
||||
[[nodiscard]] const std::string& get_error_message() const { return _message; }
|
||||
[[nodiscard]] aws_error_type get_error_type() const { return _type; }
|
||||
[[nodiscard]] retryable is_retryable() const { return _is_retryable; }
|
||||
static aws_error parse(seastar::sstring&& body);
|
||||
static std::optional<aws_error> parse(seastar::sstring&& body);
|
||||
static aws_error from_http_code(seastar::http::reply::status_type http_code);
|
||||
static const aws_errors& get_errors();
|
||||
};
|
||||
|
||||
class aws_exception : public std::exception {
|
||||
aws_error _error;
|
||||
|
||||
public:
|
||||
explicit aws_exception(const aws_error& error) noexcept : _error(error) {}
|
||||
explicit aws_exception(aws_error&& error) noexcept : _error(std::move(error)) {}
|
||||
|
||||
const char* what() const noexcept override { return _error.get_error_message().c_str(); }
|
||||
|
||||
const aws_error& error() const noexcept { return _error; }
|
||||
};
|
||||
|
||||
} // namespace aws
|
||||
|
||||
template <>
|
||||
struct fmt::formatter<aws::aws_error_type> : fmt::formatter<string_view> {
|
||||
auto format(const aws::aws_error_type& error_type, fmt::format_context& ctx) const {
|
||||
return fmt::format_to(ctx.out(), "{}", static_cast<unsigned>(error_type));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -24,6 +24,7 @@
|
||||
#include <seastar/core/iostream.hh>
|
||||
#include <seastar/core/on_internal_error.hh>
|
||||
#include <seastar/core/pipe.hh>
|
||||
#include <seastar/core/sleep.hh>
|
||||
#include <seastar/core/units.hh>
|
||||
#include <seastar/core/temporary_buffer.hh>
|
||||
#include <seastar/coroutine/exception.hh>
|
||||
@@ -81,20 +82,12 @@ future<> ignore_reply(const http::reply& rep, input_stream<char>&& in_) {
|
||||
co_await util::skip_entire_stream(in);
|
||||
}
|
||||
|
||||
static future<> look_for_errors(const http::reply&, input_stream<char>&& in_) {
|
||||
auto in = std::move(in_);
|
||||
auto body = co_await util::read_entire_stream_contiguous(in);
|
||||
auto possible_error = aws::aws_error::parse(std::move(body));
|
||||
if (possible_error.get_error_type() != aws::aws_error_type::OK) {
|
||||
throw std::system_error(std::error_code(EIO, std::generic_category()), possible_error.get_error_message());
|
||||
}
|
||||
}
|
||||
|
||||
client::client(std::string host, endpoint_config_ptr cfg, semaphore& mem, global_factory gf, private_tag)
|
||||
client::client(std::string host, endpoint_config_ptr cfg, semaphore& mem, global_factory gf, private_tag, std::unique_ptr<aws::retry_strategy> rs)
|
||||
: _host(std::move(host))
|
||||
, _cfg(std::move(cfg))
|
||||
, _gf(std::move(gf))
|
||||
, _memory(mem)
|
||||
, _retry_strategy(std::move(rs))
|
||||
{
|
||||
}
|
||||
|
||||
@@ -214,50 +207,100 @@ inline bool is_redirect_status(http::reply::status_type st) {
|
||||
return st_i >= 300 && st_i < 400;
|
||||
}
|
||||
|
||||
future<> map_s3_client_exception(std::exception_ptr ex) {
|
||||
storage_io_error map_s3_client_exception(std::exception_ptr ex) {
|
||||
seastar::memory::scoped_critical_alloc_section alloc;
|
||||
|
||||
try {
|
||||
std::rethrow_exception(std::move(ex));
|
||||
} catch (const aws::aws_exception& e) {
|
||||
int error_code;
|
||||
switch (e.error().get_error_type()) {
|
||||
case aws::aws_error_type::HTTP_NOT_FOUND:
|
||||
case aws::aws_error_type::RESOURCE_NOT_FOUND:
|
||||
case aws::aws_error_type::NO_SUCH_BUCKET:
|
||||
case aws::aws_error_type::NO_SUCH_KEY:
|
||||
case aws::aws_error_type::NO_SUCH_UPLOAD:
|
||||
error_code = ENOENT;
|
||||
break;
|
||||
case aws::aws_error_type::HTTP_FORBIDDEN:
|
||||
case aws::aws_error_type::HTTP_UNAUTHORIZED:
|
||||
case aws::aws_error_type::ACCESS_DENIED:
|
||||
error_code = EACCES;
|
||||
break;
|
||||
default:
|
||||
error_code = EIO;
|
||||
}
|
||||
return {error_code, format("S3 request failed. Code: {}. Reason: {}", e.error().get_error_type(), e.what())};
|
||||
} catch (const httpd::unexpected_status_error& e) {
|
||||
auto status = e.status();
|
||||
|
||||
if (is_redirect_status(status) || status == http::reply::status_type::not_found) {
|
||||
return make_exception_future<>(storage_io_error(ENOENT, format("S3 object doesn't exist ({})", status)));
|
||||
return {ENOENT, format("S3 object doesn't exist ({})", status)};
|
||||
}
|
||||
if (status == http::reply::status_type::forbidden || status == http::reply::status_type::unauthorized) {
|
||||
return make_exception_future<>(storage_io_error(EACCES, format("S3 access denied ({})", status)));
|
||||
return {EACCES, format("S3 access denied ({})", status)};
|
||||
}
|
||||
|
||||
return make_exception_future<>(storage_io_error(EIO, format("S3 request failed with ({})", status)));
|
||||
return {EIO, format("S3 request failed with ({})", status)};
|
||||
} catch (...) {
|
||||
auto e = std::current_exception();
|
||||
return make_exception_future<>(storage_io_error(EIO, format("S3 error ({})", e)));
|
||||
return {EIO, format("S3 error ({})", e)};
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
future<> client::do_make_request(group_client& gc, http::request req, http::experimental::client::reply_handler handle, std::optional<http::reply::status_type> expected_opt, seastar::abort_source* as) {
|
||||
// TODO: the http client does not check abort status on entry, and if
|
||||
// we're already aborted when we get here we will paradoxally not be
|
||||
future<> client::do_retryable_request(group_client& gc, http::request req, http::experimental::client::reply_handler handler, seastar::abort_source* as) const {
|
||||
// TODO: the http client does not check abort status on entry, and if
|
||||
// we're already aborted when we get here we will paradoxally not be
|
||||
// interrupted, because no registration etc will be done. So do a quick
|
||||
// preemptive check already.
|
||||
if (as && as->abort_requested()) {
|
||||
return make_exception_future<>(as->abort_requested_exception_ptr());
|
||||
co_await coroutine::return_exception_ptr(as->abort_requested_exception_ptr());
|
||||
}
|
||||
uint32_t retries = 0;
|
||||
std::exception_ptr e;
|
||||
aws::aws_exception request_ex{aws::aws_error{aws::aws_error_type::OK, aws::retryable::yes}};
|
||||
while (true) {
|
||||
try {
|
||||
e = {};
|
||||
co_return co_await (as ? gc.http.make_request(req, handler, *as, std::nullopt) : gc.http.make_request(req, handler, std::nullopt));
|
||||
} catch (const aws::aws_exception& ex) {
|
||||
e = std::current_exception();
|
||||
request_ex = ex;
|
||||
}
|
||||
|
||||
if (!_retry_strategy->should_retry(request_ex.error(), retries)) {
|
||||
break;
|
||||
}
|
||||
co_await seastar::sleep(_retry_strategy->delay_before_retry(request_ex.error(), retries));
|
||||
++retries;
|
||||
}
|
||||
|
||||
if (e) {
|
||||
throw map_s3_client_exception(e);
|
||||
}
|
||||
auto expected = expected_opt.value_or(http::reply::status_type::ok);
|
||||
return (as
|
||||
? gc.http.make_request(std::move(req), std::move(handle), *as, expected)
|
||||
: gc.http.make_request(std::move(req), std::move(handle), expected)
|
||||
).handle_exception([] (auto ex) {
|
||||
return map_s3_client_exception(std::move(ex));
|
||||
});
|
||||
}
|
||||
|
||||
future<> client::make_request(http::request req, http::experimental::client::reply_handler handle, std::optional<http::reply::status_type> expected, seastar::abort_source* as) {
|
||||
authorize(req);
|
||||
auto& gc = find_or_create_client();
|
||||
return do_make_request(gc, std::move(req), std::move(handle), expected, as);
|
||||
return do_retryable_request(
|
||||
gc, std::move(req), [handler = std::move(handle), expected = expected.value_or(http::reply::status_type::ok)](const http::reply& rep, input_stream<char>&& in) mutable -> future<> {
|
||||
auto payload = std::move(in);
|
||||
auto status_class = http::reply::classify_status(rep._status);
|
||||
|
||||
if (status_class != http::reply::status_class::informational && status_class != http::reply::status_class::success) {
|
||||
std::optional<aws::aws_error> possible_error = aws::aws_error::parse(co_await util::read_entire_stream_contiguous(payload));
|
||||
if (possible_error) {
|
||||
co_await coroutine::return_exception(aws::aws_exception(std::move(possible_error.value())));
|
||||
}
|
||||
co_await coroutine::return_exception(aws::aws_exception(aws::aws_error::from_http_code(rep._status)));
|
||||
}
|
||||
|
||||
if (rep._status != expected) {
|
||||
co_await coroutine::return_exception(httpd::unexpected_status_error(rep._status));
|
||||
}
|
||||
co_await handler(rep, std::move(payload));
|
||||
}, as);
|
||||
}
|
||||
|
||||
future<> client::make_request(http::request req, reply_handler_ext handle_ex, std::optional<http::reply::status_type> expected, seastar::abort_source* as) {
|
||||
@@ -266,7 +309,7 @@ future<> client::make_request(http::request req, reply_handler_ext handle_ex, st
|
||||
auto handle = [&gc, handle = std::move(handle_ex)] (const http::reply& rep, input_stream<char>&& in) {
|
||||
return handle(gc, rep, std::move(in));
|
||||
};
|
||||
return do_make_request(gc, std::move(req), std::move(handle), expected, as);
|
||||
return make_request(std::move(req), std::move(handle), expected, as);
|
||||
}
|
||||
|
||||
future<> client::get_object_header(sstring object_name, http::experimental::client::reply_handler handler, seastar::abort_source* as) {
|
||||
@@ -714,7 +757,13 @@ future<> client::multipart_upload::abort_upload() {
|
||||
s3l.trace("DELETE upload {}", _upload_id);
|
||||
auto req = http::request::make("DELETE", _client->_host, _object_name);
|
||||
req.query_parameters["uploadId"] = std::exchange(_upload_id, ""); // now upload_started() returns false
|
||||
co_await _client->make_request(std::move(req), ignore_reply, http::reply::status_type::no_content);
|
||||
co_await _client->make_request(std::move(req), ignore_reply, http::reply::status_type::no_content)
|
||||
.handle_exception([this](const std::exception_ptr& ex) -> future<> {
|
||||
// Here we discard whatever exception is thrown when aborting multipart upload since we don't care about cleanly aborting it since there are other
|
||||
// means to clean up dangling parts, for example `rclone cleanup` or S3 bucket's Lifecycle Management Policy
|
||||
s3l.warn("Failed to abort multipart upload. Object: '{}'. Reason: {})", _object_name, ex);
|
||||
co_return;
|
||||
});
|
||||
}
|
||||
|
||||
future<> client::multipart_upload::finalize_upload() {
|
||||
@@ -734,7 +783,24 @@ future<> client::multipart_upload::finalize_upload() {
|
||||
});
|
||||
// If this request fails, finalize_upload() throws, the upload should then
|
||||
// be aborted in .close() method
|
||||
co_await _client->make_request(std::move(req), look_for_errors);
|
||||
co_await _client->make_request(std::move(req), [](const http::reply& rep, input_stream<char>&& in) -> future<> {
|
||||
auto payload = std::move(in);
|
||||
auto status_class = http::reply::classify_status(rep._status);
|
||||
std::optional<aws::aws_error> possible_error = aws::aws_error::parse(co_await util::read_entire_stream_contiguous(payload));
|
||||
if (possible_error) {
|
||||
co_await coroutine::return_exception(aws::aws_exception(std::move(possible_error.value())));
|
||||
}
|
||||
|
||||
if (status_class != http::reply::status_class::informational && status_class != http::reply::status_class::success) {
|
||||
co_await coroutine::return_exception(aws::aws_exception(aws::aws_error::from_http_code(rep._status)));
|
||||
}
|
||||
|
||||
if (rep._status != http::reply::status_type::ok) {
|
||||
co_await coroutine::return_exception(httpd::unexpected_status_error(rep._status));
|
||||
}
|
||||
// If we reach this point it means the request succeeded. However, the body payload was already consumed, so no response handler was invoked. At
|
||||
// this point it is ok since we are not interested in parsing this particular response
|
||||
});
|
||||
_upload_id = ""; // now upload_started() returns false
|
||||
}
|
||||
|
||||
@@ -1057,8 +1123,8 @@ class client::do_upload_file : private multipart_upload {
|
||||
if (_tag) {
|
||||
req._headers["x-amz-tagging"] = seastar::format("{}={}", _tag->key, _tag->value);
|
||||
}
|
||||
req.write_body("bin", len, [f = std::move(f), &progress = _progress] (output_stream<char>&& out_) mutable {
|
||||
auto input = make_file_input_stream(std::move(f), input_stream_options());
|
||||
req.write_body("bin", len, [f = std::move(f), &progress = _progress] (output_stream<char>&& out_) {
|
||||
auto input = make_file_input_stream(f, input_stream_options());
|
||||
auto output = std::move(out_);
|
||||
return copy_to(std::move(input), std::move(output), _transmit_size, progress);
|
||||
});
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
#include <filesystem>
|
||||
#include "utils/lister.hh"
|
||||
#include "utils/s3/creds.hh"
|
||||
#include "retry_strategy.hh"
|
||||
#include "utils/s3/client_fwd.hh"
|
||||
|
||||
using namespace seastar;
|
||||
@@ -77,6 +78,7 @@ class client : public enable_shared_from_this<client> {
|
||||
using global_factory = std::function<shared_ptr<client>(std::string)>;
|
||||
global_factory _gf;
|
||||
semaphore& _memory;
|
||||
std::unique_ptr<aws::retry_strategy> _retry_strategy;
|
||||
|
||||
struct private_tag {};
|
||||
|
||||
@@ -87,12 +89,11 @@ class client : public enable_shared_from_this<client> {
|
||||
future<> make_request(http::request req, http::experimental::client::reply_handler handle = ignore_reply, std::optional<http::reply::status_type> expected = std::nullopt, seastar::abort_source* = nullptr);
|
||||
using reply_handler_ext = noncopyable_function<future<>(group_client&, const http::reply&, input_stream<char>&& body)>;
|
||||
future<> make_request(http::request req, reply_handler_ext handle, std::optional<http::reply::status_type> expected = std::nullopt, seastar::abort_source* = nullptr);
|
||||
future<> do_make_request(group_client&, http::request req, http::experimental::client::reply_handler handle, std::optional<http::reply::status_type> expected = std::nullopt, seastar::abort_source* = nullptr);
|
||||
|
||||
future<> do_retryable_request(group_client& gc, http::request req, http::experimental::client::reply_handler handler, seastar::abort_source* as = nullptr) const;
|
||||
future<> get_object_header(sstring object_name, http::experimental::client::reply_handler handler, seastar::abort_source* = nullptr);
|
||||
public:
|
||||
|
||||
explicit client(std::string host, endpoint_config_ptr cfg, semaphore& mem, global_factory gf, private_tag);
|
||||
client(std::string host, endpoint_config_ptr cfg, semaphore& mem, global_factory gf, private_tag, std::unique_ptr<aws::retry_strategy> rs = std::make_unique<aws::default_retry_strategy>());
|
||||
static shared_ptr<client> make(std::string endpoint, endpoint_config_ptr cfg, semaphore& memory, global_factory gf = {});
|
||||
|
||||
future<uint64_t> get_object_size(sstring object_name, seastar::abort_source* = nullptr);
|
||||
|
||||
47
utils/s3/retry_strategy.cc
Normal file
47
utils/s3/retry_strategy.cc
Normal file
@@ -0,0 +1,47 @@
|
||||
/*
|
||||
* Copyright (C) 2024-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#include "retry_strategy.hh"
|
||||
#include "aws_error.hh"
|
||||
#include "utils/log.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
namespace aws {
|
||||
|
||||
static logging::logger rs_logger("default_retry_strategy");
|
||||
|
||||
default_retry_strategy::default_retry_strategy(unsigned max_retries, unsigned scale_factor) : _max_retries(max_retries), _scale_factor(scale_factor) {
|
||||
}
|
||||
|
||||
bool default_retry_strategy::should_retry(const aws_error& error, unsigned attempted_retries) const {
|
||||
if (attempted_retries >= _max_retries) {
|
||||
rs_logger.error("Retries exhausted. Retry# {}", attempted_retries);
|
||||
return false;
|
||||
}
|
||||
bool should_retry = error.is_retryable() == retryable::yes;
|
||||
if (should_retry) {
|
||||
rs_logger.debug("S3 client request failed. Reason: {}. Retry# {}", error.get_error_message(), attempted_retries);
|
||||
} else {
|
||||
rs_logger.error("S3 client encountered non-retryable error. Reason: {}. Code: {}. Retry# {}",
|
||||
error.get_error_message(),
|
||||
std::to_underlying(error.get_error_type()),
|
||||
attempted_retries);
|
||||
}
|
||||
return should_retry;
|
||||
}
|
||||
|
||||
std::chrono::milliseconds default_retry_strategy::delay_before_retry(const aws_error&, unsigned attempted_retries) const {
|
||||
if (attempted_retries == 0) {
|
||||
return 0ms;
|
||||
}
|
||||
|
||||
return std::chrono::milliseconds((1UL << attempted_retries) * _scale_factor);
|
||||
}
|
||||
|
||||
} // namespace aws
|
||||
42
utils/s3/retry_strategy.hh
Normal file
42
utils/s3/retry_strategy.hh
Normal file
@@ -0,0 +1,42 @@
|
||||
/*
|
||||
* Copyright (C) 2024-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
#include <chrono>
|
||||
|
||||
namespace aws {
|
||||
|
||||
class aws_error;
|
||||
|
||||
class retry_strategy {
|
||||
public:
|
||||
virtual ~retry_strategy() = default;
|
||||
// Returns true if the error can be retried given the error and the number of times already tried.
|
||||
[[nodiscard]] virtual bool should_retry(const aws_error& error, unsigned attempted_retries) const = 0;
|
||||
|
||||
// Calculates the time in milliseconds the client should wait before attempting another request based on the error and attemptedRetries count.
|
||||
[[nodiscard]] virtual std::chrono::milliseconds delay_before_retry(const aws_error& error, unsigned attempted_retries) const = 0;
|
||||
|
||||
[[nodiscard]] virtual unsigned get_max_retries() const = 0;
|
||||
};
|
||||
|
||||
class default_retry_strategy : public retry_strategy {
|
||||
unsigned _max_retries;
|
||||
unsigned _scale_factor;
|
||||
|
||||
public:
|
||||
explicit default_retry_strategy(unsigned max_retries = 10, unsigned scale_factor = 25);
|
||||
|
||||
[[nodiscard]] bool should_retry(const aws_error& error, unsigned attempted_retries) const override;
|
||||
|
||||
[[nodiscard]] std::chrono::milliseconds delay_before_retry(const aws_error& error, unsigned attempted_retries) const override;
|
||||
|
||||
[[nodiscard]] unsigned get_max_retries() const override { return _max_retries; }
|
||||
};
|
||||
|
||||
} // namespace aws
|
||||
Reference in New Issue
Block a user