diff --git a/db/config.cc b/db/config.cc index 2c1e4fa428..0c4d846bcc 100644 --- a/db/config.cc +++ b/db/config.cc @@ -1705,7 +1705,7 @@ db::config::config(std::shared_ptr exts) , ldap_bind_passwd(this, "ldap_bind_passwd", value_status::Used, "", "Password used by LDAPRoleManager for binding to LDAP server.") , saslauthd_socket_path(this, "saslauthd_socket_path", value_status::Used, "", "UNIX domain socket on which saslauthd is listening.") , object_storage_endpoints(this, "object_storage_endpoints", liveness::LiveUpdate, value_status::Used, {}, "Object storage endpoints configuration.") - , object_storage_connections_per_shard(this, "object_storage_connections_per_shard", value_status::Used, 128, + , object_storage_connections_per_shard(this, "object_storage_connections_per_shard", liveness::LiveUpdate, value_status::Used, 128, "Maximum number of object storage connections per shard. " "Connections are distributed proportionally across scheduling groups based on their shares.") , error_injections_at_startup(this, "error_injections_at_startup", error_injection_value_status, {}, "List of error injections that should be enabled on startup.") diff --git a/sstables/object_storage_client.cc b/sstables/object_storage_client.cc index 0b1dd88ef2..8c5bafa682 100644 --- a/sstables/object_storage_client.cc +++ b/sstables/object_storage_client.cc @@ -118,6 +118,9 @@ public: auto& epc = ep.get_s3_storage(); _client->update_config_sync(epc.region, epc.iam_role_arn); } + void update_connections_per_shard(unsigned connections_per_shard) override { + _client->update_connections_per_shard(connections_per_shard); + } future<> close() override { return _client->close(); } @@ -323,6 +326,9 @@ public: osclog.info("Old GCS client cleanup done, use_count={}", old_client.use_count()); }); } + void update_connections_per_shard(unsigned) override { + // GCS client does not support per-scheduling-group connection budgeting + } future<> close() override { osclog.info("Closing GCS client..."); co_await _config_update_gate.close(); diff --git a/sstables/object_storage_client.hh b/sstables/object_storage_client.hh index 85914f259a..86a9f6e310 100644 --- a/sstables/object_storage_client.hh +++ b/sstables/object_storage_client.hh @@ -85,6 +85,7 @@ public: virtual future<> upload_file(std::filesystem::path path, object_name, utils::upload_progress& up, seastar::abort_source* = nullptr) = 0; virtual void update_config_sync(const db::object_storage_endpoint_param&) = 0; + virtual void update_connections_per_shard(unsigned) = 0; virtual future<> close() = 0; }; diff --git a/sstables/sstables_manager.cc b/sstables/sstables_manager.cc index bed86a5a0b..4208ad7d54 100644 --- a/sstables/sstables_manager.cc +++ b/sstables/sstables_manager.cc @@ -86,6 +86,7 @@ storage_manager::storage_manager(const db::config& cfg, config stm_cfg) : _object_storage_clients_memory(stm_cfg.object_storage_clients_memory) , _connections_per_shard(cfg.object_storage_connections_per_shard()) , _config_updater(std::make_unique(cfg, *this)) + , _connections_updater(std::make_unique(cfg, *this)) { for (auto& e : cfg.object_storage_endpoints()) { _object_storage_endpoints.emplace(std::make_pair(e.key(), e)); @@ -169,6 +170,18 @@ storage_manager::config_updater_sync::config_updater_sync(const db::config& cfg, })) {} +storage_manager::connections_updater_sync::connections_updater_sync(const db::config& cfg, storage_manager& sstm) + : observer(cfg.object_storage_connections_per_shard.observe([&sstm] (unsigned new_value) { + smlogger.info("connections_updater: updating connections_per_shard to {}", new_value); + sstm._connections_per_shard = new_value; + for (auto& [endpoint, ep] : sstm._object_storage_endpoints) { + if (ep.client) { + ep.client->update_connections_per_shard(new_value); + } + } + })) +{} + sstables::sstable::version_types sstables_manager::get_highest_supported_format() const noexcept { if (_features.ms_sstable) { return sstable_version_types::ms; diff --git a/sstables/sstables_manager.hh b/sstables/sstables_manager.hh index 70460d108c..a731d90181 100644 --- a/sstables/sstables_manager.hh +++ b/sstables/sstables_manager.hh @@ -67,6 +67,11 @@ class storage_manager : public peering_sharded_service { config_updater_sync(const db::config& cfg, storage_manager&); }; + struct connections_updater_sync { + utils::observer observer; + connections_updater_sync(const db::config& cfg, storage_manager&); + }; + struct object_storage_endpoint { db::object_storage_endpoint_param cfg; shared_ptr client; @@ -77,6 +82,7 @@ class storage_manager : public peering_sharded_service { unsigned _connections_per_shard; std::unordered_map _object_storage_endpoints; std::unique_ptr _config_updater; + std::unique_ptr _connections_updater; seastar::metrics::metric_groups metrics; object_storage_endpoint& get_endpoint(const sstring& ep); diff --git a/utils/s3/client.cc b/utils/s3/client.cc index faabe9840c..ec959b292b 100644 --- a/utils/s3/client.cc +++ b/utils/s3/client.cc @@ -150,6 +150,20 @@ void client::update_config_sync(std::string region, std::string ira) { }); } +void client::update_connections_per_shard(unsigned connections_per_shard) { + if (_config_update_gate.is_closed()) { + s3l.info("config update gate is closed"); + return; + } + auto holder = _config_update_gate.hold(); + (void)with_semaphore(_rebalance_sem, 1, [this, connections_per_shard] { + _cfg->connections_per_shard = connections_per_shard; + return rebalance_connections(); + }).handle_exception([holder = std::move(holder)](auto ex) { + s3l.warn("Failed to rebalance connections after config update: {}", ex); + }); +} + shared_ptr client::make(std::string endpoint, endpoint_config_ptr cfg, semaphore& mem, global_factory gf) { return seastar::make_shared(std::move(endpoint), std::move(cfg), mem, std::move(gf), private_tag{}); } diff --git a/utils/s3/client.hh b/utils/s3/client.hh index ccb3e34aa3..7032af61e6 100644 --- a/utils/s3/client.hh +++ b/utils/s3/client.hh @@ -230,6 +230,7 @@ public: seastar::abort_source* = nullptr); void update_config_sync(std::string reg, std::string ira); + void update_connections_per_shard(unsigned connections_per_shard); struct handle { std::string _host;