Before this change, new connections were handled in the default scheduling group (`main`), because before the user is authenticated we do not know which service level should be used. With the new `sl:driver` service level, creation of new connections can be moved to `sl:driver`. We switch the service level as early as possible, in `do_accepts`. There is a possibility that `sl:driver` will not exist yet — for instance, in specific upgrade cases, or if it was removed. Therefore, we also switch to `sl:driver` after a connection is accepted. Refs: scylladb/scylladb#24411
156 lines
5.9 KiB
C++
156 lines
5.9 KiB
C++
/*
 * Copyright (C) 2025-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */
#include <cstdint>
|
|
#include <seastar/core/app-template.hh>
|
|
#include <seastar/coroutine/parallel_for_each.hh>
|
|
#include <seastar/core/temporary_buffer.hh>
|
|
#include <seastar/net/socket_defs.hh>
|
|
#include <seastar/core/future.hh>
|
|
#include <seastar/core/shared_ptr.hh>
|
|
#include <seastar/core/thread.hh>
|
|
|
|
#include "db/config.hh"
|
|
#include "transport/generic_server.hh"
|
|
#include "test/perf/perf.hh"
|
|
|
|
// Logger used throughout this benchmark (both server and client sides).
seastar::logger plog("perf");
|
|
|
|
// Benchmark parameters, populated from command-line options in main()
// and shared by the client and server sides.
struct test_config {
    unsigned duration;                  // seconds to run
    unsigned concurrency;               // concurrent client connections per shard
    uint64_t requests_per_connection;   // requests issued before the connection is closed and reopened
    unsigned request_size;              // bytes written per request
    unsigned response_size;             // bytes written per response
    std::string server_host;            // address the server listens on
    uint16_t server_port;               // port the server listens on
};
|
|
|
|
class test_connection : public generic_server::connection {
|
|
const test_config& _conf;
|
|
uint64_t _requests;
|
|
|
|
public:
|
|
test_connection(generic_server::server& server, connected_socket&& fd, named_semaphore& sem, semaphore_units<named_semaphore_exception_factory> initial_sem_units, const test_config& conf)
|
|
: generic_server::connection(server, std::move(fd), sem, std::move(initial_sem_units))
|
|
, _conf(conf)
|
|
, _requests(0) {
|
|
}
|
|
|
|
virtual void handle_error(future<>&& f) override {
|
|
try {
|
|
f.get();
|
|
} catch (const std::exception& ex) {
|
|
plog.error("Error during processing request: {}", ex);
|
|
}
|
|
}
|
|
|
|
virtual future<> process_request() override {
|
|
co_await _read_buf.read_exactly(_conf.request_size);
|
|
co_await _write_buf.write(temporary_buffer<char>(_conf.response_size));
|
|
co_await _write_buf.flush();
|
|
// simulate that it take 2 exchanges to establish logical connection
|
|
// this is important for performance as server disables cpu concurrency
|
|
// limiting code after that
|
|
if (++_requests == 1) {
|
|
on_connection_ready();
|
|
}
|
|
}
|
|
};
|
|
|
|
class test_server : public generic_server::server {
|
|
const test_config& _conf;
|
|
public:
|
|
virtual shared_ptr<generic_server::connection> make_connection(socket_address server_addr, connected_socket&& fd, socket_address addr, named_semaphore& sem, semaphore_units<named_semaphore_exception_factory> initial_sem_units) override {
|
|
return make_shared<test_connection>(*this, std::move(fd), sem, std::move(initial_sem_units), _conf);
|
|
}
|
|
|
|
scheduling_group get_scheduling_group_for_new_connection() const override { return current_scheduling_group(); }
|
|
|
|
test_server(const test_config& conf)
|
|
: generic_server::server("test_server", plog,
|
|
generic_server::config{utils::updateable_value<uint32_t>(std::numeric_limits<uint32_t>::max())})
|
|
, _conf(conf) {}
|
|
};
|
|
|
|
struct tester {
|
|
test_config conf;
|
|
test_server server;
|
|
|
|
tester(const test_config& cfg) : conf(cfg) , server(conf) {}
|
|
|
|
socket_address addr() {
|
|
return socket_address(net::inet_address(conf.server_host), conf.server_port);
|
|
}
|
|
|
|
future<> start() {
|
|
return server.listen(addr(), nullptr, false, false, {});
|
|
}
|
|
|
|
future<> run() {
|
|
return seastar::async([&] {
|
|
auto results = time_parallel([&] -> future<> {
|
|
connected_socket sock = co_await seastar::connect(addr());
|
|
auto out = sock.output();
|
|
auto in = sock.input();
|
|
for (uint64_t i = 0; i <= conf.requests_per_connection; i++) {
|
|
co_await out.write(temporary_buffer<char>(conf.request_size));
|
|
co_await out.flush();
|
|
co_await in.read_exactly(conf.response_size);
|
|
}
|
|
co_await out.close();
|
|
co_await in.close();
|
|
}, conf.concurrency, conf.duration, 0 ,true, conf.requests_per_connection);
|
|
|
|
// Technically, we're measuring the client side here,
|
|
// but since it's a single process with the server, both sides are included.
|
|
std::cout << aggregated_perf_results(results) << std::endl;
|
|
}).or_terminate();
|
|
}
|
|
|
|
future<> stop() {
|
|
co_await server.stop();
|
|
}
|
|
};
|
|
|
|
int main(int argc, char** argv) {
|
|
namespace bpo = boost::program_options;
|
|
app_template app;
|
|
app.add_options()
|
|
("duration", bpo::value<unsigned>()->default_value(10), "seconds to run")
|
|
("concurrency", bpo::value<unsigned>()->default_value(200), "clients per shard")
|
|
("requests-per-connection", bpo::value<uint64_t>()->default_value(1024), "number of requests issued before closing the connection and making new one")
|
|
("request-size", bpo::value<unsigned>()->default_value(1024), "request size")
|
|
("response-size", bpo::value<unsigned>()->default_value(1024), "response size")
|
|
("server-host", bpo::value<std::string>()->default_value("127.0.0.1"), "server address, defaults to localhost")
|
|
("server-port", bpo::value<uint16_t>()->default_value(1234), "server port")
|
|
;
|
|
return app.run(argc, argv, [&app] () -> future<> {
|
|
test_config conf;
|
|
conf.duration = app.configuration()["duration"].as<unsigned>();
|
|
conf.concurrency = app.configuration()["concurrency"].as<unsigned>();
|
|
conf.requests_per_connection = app.configuration()["requests-per-connection"].as<uint64_t>();
|
|
conf.request_size = app.configuration()["request-size"].as<unsigned>();
|
|
conf.response_size = app.configuration()["response-size"].as<unsigned>();
|
|
conf.server_host = app.configuration()["server-host"].as<std::string>();
|
|
conf.server_port = app.configuration()["server-port"].as<uint16_t>();
|
|
|
|
sharded<tester> test;
|
|
plog.info("Starting");
|
|
co_await test.start(std::cref(conf));
|
|
co_await test.invoke_on_all(&tester::start);
|
|
try {
|
|
plog.info("Running");
|
|
co_await test.invoke_on_all(&tester::run);
|
|
} catch (...) {
|
|
plog.error("Error running: {}", std::current_exception());
|
|
}
|
|
co_await test.stop();
|
|
});
|
|
}
|