/* * Copyright (C) 2023-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #include #include #include #include #include #include #include "test/lib/test_utils.hh" #include "test/lib/random_utils.hh" #include "utils/s3/client.hh" #include "utils/estimated_histogram.hh" seastar::logger plog("perf"); class tester { std::chrono::seconds _duration; std::string _object_name; size_t _object_size; semaphore _mem; shared_ptr _client; utils::estimated_histogram _latencies; unsigned _errors = 0; unsigned _part_size_mb; bool _remove_file; static s3::endpoint_config_ptr make_config(unsigned sockets) { s3::endpoint_config cfg; cfg.port = 443; cfg.use_https = true; cfg.region = tests::getenv_safe("AWS_DEFAULT_REGION"); cfg.max_connections = sockets; return make_lw_shared(std::move(cfg)); } static constexpr unsigned chunk_size = 1000; std::chrono::steady_clock::time_point now() const { return std::chrono::steady_clock::now(); } public: tester(std::chrono::seconds dur, unsigned sockets, unsigned part_size, sstring object_name, size_t obj_size) : _duration(dur) , _object_name(std::move(object_name)) , _object_size(obj_size) , _mem(memory::stats().total_memory()) , _client(s3::client::make(tests::getenv_safe("S3_SERVER_ADDRESS_FOR_TEST"), make_config(sockets), _mem)) , _part_size_mb(part_size) , _remove_file(false) {} private: future<> make_temporary_file() { _object_name = fmt::format("/{}/perfobject-{}-{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), ::getpid(), this_shard_id()); _remove_file = true; plog.debug("Creating {} of {} bytes", _object_name, _object_size); auto out = output_stream(_client->make_upload_sink(_object_name)); std::exception_ptr ex; try { auto rnd = tests::random::get_bytes(chunk_size); uint64_t written = 0; do { co_await out.write(reinterpret_cast(rnd.begin()), std::min(_object_size - written, rnd.size())); written += rnd.size(); } while (written < _object_size); co_await out.flush(); } catch (...) { ex = std::current_exception(); plog.error("Cannot write object: {}", ex); } co_await out.close(); if (ex) { co_await coroutine::return_exception_ptr(std::move(ex)); } } public: future<> start() { if (_object_name.empty()) { co_await make_temporary_file(); } else { _object_size = co_await _client->get_object_size(_object_name); } } future<> run_contiguous_get() { plog.info("Running GET-s"); auto until = now() + _duration; uint64_t off = 0; do { auto start = now(); try { co_await _client->get_object_contiguous(_object_name, s3::range{off, chunk_size}); off = (off + chunk_size) % (_object_size - chunk_size); _latencies.add(std::chrono::duration_cast(now() - start).count()); } catch (...) { _errors++; } } while (now() < until); } future<> run_download(bool is_chunked) { plog.info("Downloading with input_stream"); auto in = input_stream(is_chunked ? _client->make_chunked_download_source(_object_name, s3::range{0, _object_size}) : _client->make_download_source(_object_name, s3::range{0, _object_size})); auto start = now(); uint64_t sz = 0; co_await in.consume([&sz] (auto buf) { if (buf.empty()) { return make_ready_future>(stop_consuming(std::move(buf))); } sz += buf.size(); return make_ready_future>(continue_consuming()); }); co_await in.close(); auto time = std::chrono::duration_cast>(now() - start); plog.info("Downloaded {}MB in {}s, speed {}MB/s", sz >> 20, time.count(), (sz >> 20) / time.count()); } future<> run_upload() { plog.info("Uploading"); auto file_name = fs::path(_object_name); auto sz = co_await seastar::file_size(file_name.native()); _object_name = fmt::format("/{}/{}", tests::getenv_safe("S3_BUCKET_FOR_TEST"), file_name.filename().native()); _remove_file = true; auto start = now(); co_await _client->upload_file(file_name, _object_name, {}, _part_size_mb << 20); auto time = std::chrono::duration_cast>(now() - start); plog.info("Uploaded {}MB in {}s, speed {}MB/s", sz >> 20, time.count(), (sz >> 20) / time.count()); } future<> stop() { if (_remove_file) { plog.debug("Removing {}", _object_name); co_await _client->delete_object(_object_name); } co_await _client->close(); auto print_percentiles = [] (const utils::estimated_histogram& hist) { return format("min: {:-6d}, 50%: {:-6d}, 90%: {:-6d}, 99%: {:-6d}, 99.9%: {:-6d}, max: {:-6d} [ms]", hist.percentile(0), hist.percentile(0.5), hist.percentile(0.9), hist.percentile(0.99), hist.percentile(0.999), hist.percentile(1.0) ); }; plog.info("requests total: {:5}, errors: {:5}; latencies: {}", _latencies._count, _errors, print_percentiles(_latencies)); } }; int main(int argc, char** argv) { namespace bpo = boost::program_options; app_template app; app.add_options() ("operation", bpo::value()->required(), "which test to perform (options: upload, get, download)") ("duration", bpo::value()->default_value(10), "seconds to run") ("sockets", bpo::value()->default_value(1), "maximum number of socket for http client") ("part_size_mb", bpo::value()->default_value(5), "part size") ("object_name", bpo::value()->default_value(""), "use given object/file name") ("object_size", bpo::value()->default_value(1 << 20), "size of test object") ; return app.run(argc, argv, [&app] () -> future<> { auto dur = std::chrono::seconds(app.configuration()["duration"].as()); auto sks = app.configuration()["sockets"].as(); auto part_size = app.configuration()["part_size_mb"].as(); auto oname = app.configuration()["object_name"].as(); auto osz = app.configuration()["object_size"].as(); auto operation = app.configuration()["operation"].as(); sharded test; plog.info("Creating"); co_await test.start(dur, sks, part_size, oname, osz); try { plog.info("Starting"); co_await test.invoke_on_all(&tester::start); plog.info("Running"); if (operation == "upload") { co_await test.invoke_on_all(&tester::run_upload); } else if (operation == "get") { co_await test.invoke_on_all(&tester::run_contiguous_get); } else if (operation == "download") { co_await test.invoke_on_all([](auto& tester) { return tester.run_download(false); }); } else if (operation == "chunked_download") { co_await test.invoke_on_all([](auto& tester) { return tester.run_download(true); }); } else { throw std::runtime_error(format("Unknown operation {}", operation)); } } catch (...) { plog.error("Error running: {}", std::current_exception()); } plog.info("Stopping (and printing results)"); co_await test.stop(); }); }