/* * Copyright (C) 2025-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #include #include #include #include #include #include #include #include #include "db/config.hh" #include "transport/generic_server.hh" #include "test/perf/perf.hh" seastar::logger plog("perf"); struct test_config { unsigned duration; unsigned concurrency; uint64_t requests_per_connection; unsigned request_size; unsigned response_size; std::string server_host; uint16_t server_port; }; class test_connection : public generic_server::connection { const test_config& _conf; uint64_t _requests; public: test_connection(generic_server::server& server, connected_socket&& fd, named_semaphore& sem, semaphore_units initial_sem_units, const test_config& conf) : generic_server::connection(server, std::move(fd), sem, std::move(initial_sem_units)) , _conf(conf) , _requests(0) { } virtual void handle_error(future<>&& f) override { try { f.get(); } catch (const std::exception& ex) { plog.error("Error during processing request: {}", ex); } } virtual future<> process_request() override { co_await _read_buf.read_exactly(_conf.request_size); co_await _write_buf.write(temporary_buffer(_conf.response_size)); co_await _write_buf.flush(); // simulate that it take 2 exchanges to establish logical connection // this is important for performance as server disables cpu concurrency // limiting code after that if (++_requests == 1) { on_connection_ready(); } } }; class test_server : public generic_server::server { const test_config& _conf; public: virtual shared_ptr make_connection(socket_address server_addr, connected_socket&& fd, socket_address addr, named_semaphore& sem, semaphore_units initial_sem_units) override { return make_shared(*this, std::move(fd), sem, std::move(initial_sem_units), _conf); } scheduling_group get_scheduling_group_for_new_connection() const override { return current_scheduling_group(); } test_server(const test_config& conf) : generic_server::server("test_server", plog, generic_server::config{utils::updateable_value(std::numeric_limits::max())}) , _conf(conf) {} }; struct tester { test_config conf; test_server server; tester(const test_config& cfg) : conf(cfg) , server(conf) {} socket_address addr() { return socket_address(net::inet_address(conf.server_host), conf.server_port); } future<> start() { return server.listen(addr(), nullptr, false, false, {}); } future<> run() { return seastar::async([&] { auto results = time_parallel([&] -> future<> { connected_socket sock = co_await seastar::connect(addr()); auto out = sock.output(); auto in = sock.input(); for (uint64_t i = 0; i <= conf.requests_per_connection; i++) { co_await out.write(temporary_buffer(conf.request_size)); co_await out.flush(); co_await in.read_exactly(conf.response_size); } co_await out.close(); co_await in.close(); }, conf.concurrency, conf.duration, 0 ,true, conf.requests_per_connection); // Technically, we're measuring the client side here, // but since it's a single process with the server, both sides are included. std::cout << aggregated_perf_results(results) << std::endl; }).or_terminate(); } future<> stop() { co_await server.stop(); } }; int main(int argc, char** argv) { namespace bpo = boost::program_options; app_template app; app.add_options() ("duration", bpo::value()->default_value(10), "seconds to run") ("concurrency", bpo::value()->default_value(200), "clients per shard") ("requests-per-connection", bpo::value()->default_value(1024), "number of requests issued before closing the connection and making new one") ("request-size", bpo::value()->default_value(1024), "request size") ("response-size", bpo::value()->default_value(1024), "response size") ("server-host", bpo::value()->default_value("127.0.0.1"), "server address, defaults to localhost") ("server-port", bpo::value()->default_value(1234), "server port") ; return app.run(argc, argv, [&app] () -> future<> { test_config conf; conf.duration = app.configuration()["duration"].as(); conf.concurrency = app.configuration()["concurrency"].as(); conf.requests_per_connection = app.configuration()["requests-per-connection"].as(); conf.request_size = app.configuration()["request-size"].as(); conf.response_size = app.configuration()["response-size"].as(); conf.server_host = app.configuration()["server-host"].as(); conf.server_port = app.configuration()["server-port"].as(); sharded test; plog.info("Starting"); co_await test.start(std::cref(conf)); co_await test.invoke_on_all(&tester::start); try { plog.info("Running"); co_await test.invoke_on_all(&tester::run); } catch (...) { plog.error("Error running: {}", std::current_exception()); } co_await test.stop(); }); }