s3: make connections_per_shard live-updateable

Wire the object_storage_connections_per_shard config option as
LiveUpdate so it can be changed at runtime without restart. When
the value changes, the storage_manager observer propagates it to
all existing S3 clients, which rebalance their connection pools
under the rebalance semaphore.
This commit is contained in:
Ernest Zaslavsky
2026-05-13 18:19:10 +03:00
parent b9e1dcc0fe
commit 86f678a592
7 changed files with 42 additions and 1 deletions

View File

@@ -1705,7 +1705,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, ldap_bind_passwd(this, "ldap_bind_passwd", value_status::Used, "", "Password used by LDAPRoleManager for binding to LDAP server.")
, saslauthd_socket_path(this, "saslauthd_socket_path", value_status::Used, "", "UNIX domain socket on which saslauthd is listening.")
, object_storage_endpoints(this, "object_storage_endpoints", liveness::LiveUpdate, value_status::Used, {}, "Object storage endpoints configuration.")
, object_storage_connections_per_shard(this, "object_storage_connections_per_shard", value_status::Used, 128,
, object_storage_connections_per_shard(this, "object_storage_connections_per_shard", liveness::LiveUpdate, value_status::Used, 128,
"Maximum number of object storage connections per shard. "
"Connections are distributed proportionally across scheduling groups based on their shares.")
, error_injections_at_startup(this, "error_injections_at_startup", error_injection_value_status, {}, "List of error injections that should be enabled on startup.")

View File

@@ -118,6 +118,9 @@ public:
auto& epc = ep.get_s3_storage();
_client->update_config_sync(epc.region, epc.iam_role_arn);
}
void update_connections_per_shard(unsigned connections_per_shard) override {
_client->update_connections_per_shard(connections_per_shard);
}
future<> close() override {
return _client->close();
}
@@ -323,6 +326,9 @@ public:
osclog.info("Old GCS client cleanup done, use_count={}", old_client.use_count());
});
}
void update_connections_per_shard(unsigned) override {
// GCS client does not support per-scheduling-group connection budgeting
}
future<> close() override {
osclog.info("Closing GCS client...");
co_await _config_update_gate.close();

View File

@@ -85,6 +85,7 @@ public:
virtual future<> upload_file(std::filesystem::path path, object_name, utils::upload_progress& up, seastar::abort_source* = nullptr) = 0;
virtual void update_config_sync(const db::object_storage_endpoint_param&) = 0;
virtual void update_connections_per_shard(unsigned) = 0;
virtual future<> close() = 0;
};

View File

@@ -86,6 +86,7 @@ storage_manager::storage_manager(const db::config& cfg, config stm_cfg)
: _object_storage_clients_memory(stm_cfg.object_storage_clients_memory)
, _connections_per_shard(cfg.object_storage_connections_per_shard())
, _config_updater(std::make_unique<config_updater_sync>(cfg, *this))
, _connections_updater(std::make_unique<connections_updater_sync>(cfg, *this))
{
for (auto& e : cfg.object_storage_endpoints()) {
_object_storage_endpoints.emplace(std::make_pair(e.key(), e));
@@ -169,6 +170,18 @@ storage_manager::config_updater_sync::config_updater_sync(const db::config& cfg,
}))
{}
storage_manager::connections_updater_sync::connections_updater_sync(const db::config& cfg, storage_manager& sstm)
: observer(cfg.object_storage_connections_per_shard.observe([&sstm] (unsigned new_value) {
smlogger.info("connections_updater: updating connections_per_shard to {}", new_value);
sstm._connections_per_shard = new_value;
for (auto& [endpoint, ep] : sstm._object_storage_endpoints) {
if (ep.client) {
ep.client->update_connections_per_shard(new_value);
}
}
}))
{}
sstables::sstable::version_types sstables_manager::get_highest_supported_format() const noexcept {
if (_features.ms_sstable) {
return sstable_version_types::ms;

View File

@@ -67,6 +67,11 @@ class storage_manager : public peering_sharded_service<storage_manager> {
config_updater_sync(const db::config& cfg, storage_manager&);
};
struct connections_updater_sync {
utils::observer<unsigned> observer;
connections_updater_sync(const db::config& cfg, storage_manager&);
};
struct object_storage_endpoint {
db::object_storage_endpoint_param cfg;
shared_ptr<object_storage_client> client;
@@ -77,6 +82,7 @@ class storage_manager : public peering_sharded_service<storage_manager> {
unsigned _connections_per_shard;
std::unordered_map<sstring, object_storage_endpoint> _object_storage_endpoints;
std::unique_ptr<config_updater_sync> _config_updater;
std::unique_ptr<connections_updater_sync> _connections_updater;
seastar::metrics::metric_groups metrics;
object_storage_endpoint& get_endpoint(const sstring& ep);

View File

@@ -150,6 +150,20 @@ void client::update_config_sync(std::string region, std::string ira) {
});
}
void client::update_connections_per_shard(unsigned connections_per_shard) {
if (_config_update_gate.is_closed()) {
s3l.info("config update gate is closed");
return;
}
auto holder = _config_update_gate.hold();
(void)with_semaphore(_rebalance_sem, 1, [this, connections_per_shard] {
_cfg->connections_per_shard = connections_per_shard;
return rebalance_connections();
}).handle_exception([holder = std::move(holder)](auto ex) {
s3l.warn("Failed to rebalance connections after config update: {}", ex);
});
}
shared_ptr<client> client::make(std::string endpoint, endpoint_config_ptr cfg, semaphore& mem, global_factory gf) {
return seastar::make_shared<client>(std::move(endpoint), std::move(cfg), mem, std::move(gf), private_tag{});
}

View File

@@ -230,6 +230,7 @@ public:
seastar::abort_source* = nullptr);
void update_config_sync(std::string reg, std::string ira);
void update_connections_per_shard(unsigned connections_per_shard);
struct handle {
std::string _host;