diff --git a/db/config.cc b/db/config.cc index 25d28f1baa..81289d01d1 100644 --- a/db/config.cc +++ b/db/config.cc @@ -1254,6 +1254,8 @@ db::config::config(std::shared_ptr exts) "Time period in seconds after which unused schema versions will be evicted from the local schema registry cache. Default is 1 second.") , max_concurrent_requests_per_shard(this, "max_concurrent_requests_per_shard", liveness::LiveUpdate, value_status::Used, std::numeric_limits::max(), "Maximum number of concurrent requests a single shard can handle before it starts shedding extra load. By default, no requests will be shed.") + , uninitialized_connections_semaphore_cpu_concurrency(this, "uninitialized_connections_semaphore_cpu_concurrency", liveness::LiveUpdate, value_status::Used, 8, + "Maximum number of new concurrent connections from drivers that a single shard can be processing before it starts throttling incomming connections. This limit applies only to new connections excluding the ones blocked on network IO; connections that are ready to serve requests are not affected. By default the limit is 8.") , cdc_dont_rewrite_streams(this, "cdc_dont_rewrite_streams", value_status::Used, false, "Disable rewriting streams from cdc_streams_descriptions to cdc_streams_descriptions_v2. Should not be necessary, but the procedure is expensive and prone to failures; this config option is left as a backdoor in case some user requires manual intervention.") , strict_allow_filtering(this, "strict_allow_filtering", liveness::LiveUpdate, value_status::Used, strict_allow_filtering_default(), "Match Cassandra in requiring ALLOW FILTERING on slow queries. Can be true, false, or warn. When false, Scylla accepts some slow queries even without ALLOW FILTERING that Cassandra rejects. Warn is same as false, but with warning.") diff --git a/db/config.hh b/db/config.hh index fd939dbb16..467af02979 100644 --- a/db/config.hh +++ b/db/config.hh @@ -432,6 +432,7 @@ public: named_value user_defined_function_contiguous_allocation_limit_bytes; named_value schema_registry_grace_period; named_value max_concurrent_requests_per_shard; + named_value uninitialized_connections_semaphore_cpu_concurrency; named_value cdc_dont_rewrite_streams; named_value strict_allow_filtering; named_value strict_is_not_null_in_views; diff --git a/generic_server.cc b/generic_server.cc index 2b5f710321..a8e9a5d515 100644 --- a/generic_server.cc +++ b/generic_server.cc @@ -19,8 +19,9 @@ namespace generic_server { -connection::connection(server& server, connected_socket&& fd) - : _server{server} +connection::connection(server& server, connected_socket&& fd, named_semaphore& sem, semaphore_units initial_sem_units) + : _conns_cpu_concurrency{sem, std::move(initial_sem_units), false} + , _server{server} , _fd{std::move(fd)} , _read_buf(_fd.input()) , _write_buf(_fd.output()) @@ -150,6 +151,8 @@ future<> connection::shutdown() } config::config(const db::config& cfg) + : uninitialized_connections_semaphore_cpu_concurrency( + cfg.uninitialized_connections_semaphore_cpu_concurrency) { } @@ -161,7 +164,21 @@ config::config(uint32_t uninitialized_connections_semaphore_cpu_concurrency) server::server(const sstring& server_name, logging::logger& logger, config cfg) : _server_name{server_name} , _logger{logger} + , _conns_cpu_concurrency(cfg.uninitialized_connections_semaphore_cpu_concurrency) + , _prev_conns_cpu_concurrency(_conns_cpu_concurrency) + , _conns_cpu_concurrency_semaphore(_conns_cpu_concurrency, named_semaphore_exception_factory{"connections cpu concurrency semaphore"}) { + _conns_cpu_concurrency.observe([this] (const uint32_t &concurrency) { + if (concurrency == _prev_conns_cpu_concurrency) { + return; + } + if (concurrency > _prev_conns_cpu_concurrency) { + _conns_cpu_concurrency_semaphore.signal(concurrency - _prev_conns_cpu_concurrency); + } else { + _conns_cpu_concurrency_semaphore.consume(_prev_conns_cpu_concurrency - concurrency); + } + _prev_conns_cpu_concurrency = concurrency; + }); } server::~server() @@ -256,7 +273,9 @@ future<> server::do_accepts(int which, bool keepalive, socket_address server_add auto addr = std::move(cs_sa.remote_address); fd.set_nodelay(true); fd.set_keepalive(keepalive); - auto conn = make_connection(server_addr, std::move(fd), std::move(addr)); + auto conn = make_connection(server_addr, std::move(fd), std::move(addr), + _conns_cpu_concurrency_semaphore, {}); + // Move the processing into the background. (void)futurize_invoke([this, conn] { return advertise_new_connection(conn); // Notify any listeners about new connection. diff --git a/generic_server.hh b/generic_server.hh index 50b8150672..ad40c45fb1 100644 --- a/generic_server.hh +++ b/generic_server.hh @@ -12,7 +12,9 @@ #include "utils/log.hh" #include "seastarx.hh" +#include "utils/updateable_value.hh" +#include #include #include @@ -41,6 +43,12 @@ public: using connection_process_loop = noncopyable_function ()>; using execute_under_tenant_type = noncopyable_function (connection_process_loop)>; bool _tenant_switch = false; + struct cpu_concurrency_t { + named_semaphore& semaphore; + semaphore_units units; + bool stopped; + }; + cpu_concurrency_t _conns_cpu_concurrency; execute_under_tenant_type _execute_under_current_tenant = no_tenant(); protected: server& _server; @@ -54,7 +62,7 @@ protected: private: future<> process_until_tenant_switch(); public: - connection(server& server, connected_socket&& fd); + connection(server& server, connected_socket&& fd, named_semaphore& sem, semaphore_units initial_sem_units); virtual ~connection(); virtual future<> process(); @@ -115,7 +123,10 @@ protected: std::list _gentle_iterators; std::vector _listeners; shared_ptr _credentials; - +private: + utils::updateable_value _conns_cpu_concurrency; + uint32_t _prev_conns_cpu_concurrency; + named_semaphore _conns_cpu_concurrency_semaphore; public: server(const sstring& server_name, logging::logger& logger, config cfg); @@ -140,7 +151,7 @@ public: future<> do_accepts(int which, bool keepalive, socket_address server_addr); protected: - virtual seastar::shared_ptr make_connection(socket_address server_addr, connected_socket&& fd, socket_address addr) = 0; + virtual seastar::shared_ptr make_connection(socket_address server_addr, connected_socket&& fd, socket_address addr, named_semaphore& sem, semaphore_units initial_sem_units) = 0; virtual future<> advertise_new_connection(shared_ptr conn); diff --git a/redis/server.cc b/redis/server.cc index 32db15df60..07c13b27ba 100644 --- a/redis/server.cc +++ b/redis/server.cc @@ -14,6 +14,8 @@ #include "db/consistency_level_type.hh" +#include +#include #include #include #include @@ -39,8 +41,8 @@ redis_server::redis_server(seastar::sharded& qp, auth::s } shared_ptr -redis_server::make_connection(socket_address server_addr, connected_socket&& fd, socket_address addr) { - auto conn = make_shared(*this, server_addr, std::move(fd), std::move(addr)); +redis_server::make_connection(socket_address server_addr, connected_socket&& fd, socket_address addr, named_semaphore& sem, semaphore_units initial_sem_units) { + auto conn = make_shared(*this, server_addr, std::move(fd), std::move(addr), sem, std::move(initial_sem_units)); ++_stats._connects; ++_stats._connections; return conn; @@ -60,8 +62,8 @@ future redis_server::connection::process_request_one(redis }); } -redis_server::connection::connection(redis_server& server, socket_address server_addr, connected_socket&& fd, socket_address addr) - : generic_server::connection(server, std::move(fd)) +redis_server::connection::connection(redis_server& server, socket_address server_addr, connected_socket&& fd, socket_address addr, named_semaphore& sem, semaphore_units initial_sem_units) + : generic_server::connection(server, std::move(fd), sem, std::move(initial_sem_units)) , _server(server) , _server_addr(server_addr) , _options(server._config._read_consistency_level, server._config._write_consistency_level, server._config._timeout_config, server._auth_service, addr, server._total_redis_db_count) diff --git a/redis/server.hh b/redis/server.hh index a18394535e..99063857d7 100644 --- a/redis/server.hh +++ b/redis/server.hh @@ -88,7 +88,7 @@ private: >; static thread_local execution_stage_type _process_request_stage; public: - connection(redis_server& server, socket_address server_addr, connected_socket&& fd, socket_address addr); + connection(redis_server& server, socket_address server_addr, connected_socket&& fd, socket_address addr, named_semaphore& sem, semaphore_units initial_sem_units); virtual ~connection(); future<> process_request() override; void handle_error(future<>&& f) override; @@ -99,7 +99,7 @@ private: future process_request_internal(); }; - virtual shared_ptr make_connection(socket_address server_addr, connected_socket&& fd, socket_address addr) override; + virtual shared_ptr make_connection(socket_address server_addr, connected_socket&& fd, socket_address addr, named_semaphore& sem, semaphore_units initial_sem_units) override; future<> unadvertise_connection(shared_ptr conn) override; }; } diff --git a/test/boost/generic_server_test.cc b/test/boost/generic_server_test.cc index 553a4b5a35..de0b918d61 100644 --- a/test/boost/generic_server_test.cc +++ b/test/boost/generic_server_test.cc @@ -26,7 +26,7 @@ class test_server : public server { public: test_server(const db::config& cfg) : server("test_server", test_logger, config(cfg)) {}; protected: - [[noreturn]] shared_ptr make_connection(socket_address, connected_socket&&, socket_address) override { + [[noreturn]] shared_ptr make_connection(socket_address, connected_socket&&, socket_address, named_semaphore& sem, semaphore_units initial_sem_units) override { SCYLLA_ASSERT(false); } }; diff --git a/transport/server.cc b/transport/server.cc index 703f9f8b23..272bdbf605 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -11,6 +11,7 @@ #include "cql3/statements/batch_statement.hh" #include "cql3/statements/modification_statement.hh" #include +#include #include "types/collection.hh" #include "types/list.hh" #include "types/set.hh" @@ -318,8 +319,8 @@ cql_server::cql_server(distributed& qp, auth::service& au cql_server::~cql_server() = default; shared_ptr -cql_server::make_connection(socket_address server_addr, connected_socket&& fd, socket_address addr) { - auto conn = make_shared(*this, server_addr, std::move(fd), std::move(addr)); +cql_server::make_connection(socket_address server_addr, connected_socket&& fd, socket_address addr, named_semaphore& sem, semaphore_units initial_sem_units) { + auto conn = make_shared(*this, server_addr, std::move(fd), std::move(addr),sem, std::move(initial_sem_units)); ++_stats.connects; ++_stats.connections; return conn; @@ -609,8 +610,8 @@ future>> }); } -cql_server::connection::connection(cql_server& server, socket_address server_addr, connected_socket&& fd, socket_address addr) - : generic_server::connection{server, std::move(fd)} +cql_server::connection::connection(cql_server& server, socket_address server_addr, connected_socket&& fd, socket_address addr, named_semaphore& sem, semaphore_units initial_sem_units) + : generic_server::connection{server, std::move(fd), sem, std::move(initial_sem_units)} , _server(server) , _server_addr(server_addr) , _client_state(service::client_state::external_tag{}, server._auth_service, &server._sl_controller, server.timeout_config(), addr) diff --git a/transport/server.hh b/transport/server.hh index ed159f1bb9..c02a20b042 100644 --- a/transport/server.hh +++ b/transport/server.hh @@ -245,7 +245,7 @@ private: service_permit>; static thread_local execution_stage_type _process_request_stage; public: - connection(cql_server& server, socket_address server_addr, connected_socket&& fd, socket_address addr); + connection(cql_server& server, socket_address server_addr, connected_socket&& fd, socket_address addr, named_semaphore& sem, semaphore_units initial_sem_units); virtual ~connection(); future<> process_request() override; void handle_error(future<>&& f) override; @@ -336,7 +336,7 @@ private: friend class type_codec; private: - virtual shared_ptr make_connection(socket_address server_addr, connected_socket&& fd, socket_address addr) override; + virtual shared_ptr make_connection(socket_address server_addr, connected_socket&& fd, socket_address addr, named_semaphore& sem, semaphore_units initial_sem_units) override; future<> advertise_new_connection(shared_ptr conn) override; future<> unadvertise_connection(shared_ptr conn) override;