From 71714fdc0ea01e861832ffa7ae3f5fc3a3da9f8b Mon Sep 17 00:00:00 2001 From: Dimitrios Symonidis Date: Thu, 5 Mar 2026 21:27:30 +0100 Subject: [PATCH 1/4] db: introduce read-write lock to synchronize config updates with REST API Config is reloaded from SIGHUP on shard 0 and broadcast to all shards under a write lock. REST API callers reading find_config_id acquire a read lock via value_as_json_string_for_name() and are guaranteed a consistent snapshot even when a reload is in progress. --- api/config.cc | 17 +++++++++-------- db/config.cc | 19 +++++++++++++++++++ db/config.hh | 18 ++++++++++++++++++ main.cc | 1 + 4 files changed, 47 insertions(+), 8 deletions(-) diff --git a/api/config.cc b/api/config.cc index f855d54d68..eb93df0d62 100644 --- a/api/config.cc +++ b/api/config.cc @@ -82,15 +82,16 @@ void set_config(std::shared_ptr < api_registry_builder20 > rb, http_context& ctx }); }); - cs::find_config_id.set(r, [&cfg] (const_req r) { - auto id = r.get_path_param("id"); - for (auto&& cfg_ref : cfg.values()) { - auto&& cfg = cfg_ref.get(); - if (id == cfg.name()) { - return cfg.value_as_json(); - } + cs::find_config_id.set(r, [&cfg] (std::unique_ptr req) -> future { + auto id = req->get_path_param("id"); + auto value = co_await cfg.value_as_json_string_for_name(id); + if (!value) { + throw bad_param_exception(sstring("No such config entry: ") + id); } - throw bad_param_exception(sstring("No such config entry: ") + id); + //value is already a json string + json::json_return_type ret{json::json_void()}; + ret._res = std::move(*value); + co_return ret; }); sp::get_rpc_timeout.set(r, [&cfg](const_req req) { diff --git a/db/config.cc b/db/config.cc index 3ab564b3d4..b24d62b65b 100644 --- a/db/config.cc +++ b/db/config.cc @@ -7,6 +7,7 @@ * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 */ +#include #include #include @@ -19,6 +20,7 @@ #include #include +#include #include #include #include @@ -1725,6 +1727,23 @@ void db::config::maybe_in_workdir(named_value& tos, const char* sub } } +future> db::config::value_as_json_string_for_name(sstring name) const { + // Config reloads triggered by SIGHUP are applied on shard 0 and then + // broadcast to all other shards. We read the value on shard 0 under + // a read lock to guarantee a consistent snapshot — the SIGHUP handler + // holds the write lock across the entire reload-and-broadcast sequence. + co_return co_await smp::submit_to(0, [this, name = std::move(name)] () -> future> { + auto lock = co_await _config_update_lock.hold_read_lock(); + for (auto&& cfg_ref : values()) { + auto&& c = cfg_ref.get(); + if (name == c.name()) { + co_return c.value_as_json()._res; + } + } + co_return std::nullopt; + }); +} + const sstring db::config::default_tls_priority("SECURE128:-VERS-TLS1.0"); template <> diff --git a/db/config.hh b/db/config.hh index 24a572664c..52e9019028 100644 --- a/db/config.hh +++ b/db/config.hh @@ -12,6 +12,7 @@ #include #include +#include #include #include @@ -179,6 +180,16 @@ public: static fs::path get_conf_dir(); static fs::path get_conf_sub(fs::path); + future lock_for_config_update() { + return _config_update_lock.hold_write_lock(); + }; + + // Look up a config entry by name and return its JSON representation as a string. + // Runs on shard 0 under a read lock so the result is consistent with + // any in-progress SIGHUP reload + broadcast_to_all_shards() sequence. + // Returns std::nullopt if no config entry with the given name exists. + future> value_as_json_string_for_name(sstring name) const; + using string_map = std::unordered_map; //program_options::string_map; using string_list = std::vector; @@ -657,6 +668,13 @@ private: void maybe_in_workdir(named_value&, const char*); std::shared_ptr _extensions; + + // Read-write lock used to synchronize config updates (SIGHUP reload + + // broadcast to all shards) with config value readers. + // The SIGHUP handler holds the write lock across read_config() + + // broadcast_to_all_shards(). Readers acquire the read lock on shard 0 + // via value_as_json_string_for_name() so they always see a consistent snapshot. + mutable rwlock _config_update_lock; }; } diff --git a/main.cc b/main.cc index 44678c836a..95f5990b33 100644 --- a/main.cc +++ b/main.cc @@ -270,6 +270,7 @@ private: _pending = false; try { startlog.info("re-reading configuration file"); + auto lock = _cfg.lock_for_config_update().get(); read_config(_opts, _cfg).get(); _cfg.broadcast_to_all_shards().get(); startlog.info("completed re-reading configuration file"); From a958da0ab9051e7b2bd774007ab8ca899bb634b0 Mon Sep 17 00:00:00 2001 From: Dimitrios Symonidis Date: Thu, 5 Mar 2026 10:08:35 +0100 Subject: [PATCH 2/4] test_config: improve logging for wait_for_config API --- test/cluster/test_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/cluster/test_config.py b/test/cluster/test_config.py index 7223feed8a..771893cf8e 100644 --- a/test/cluster/test_config.py +++ b/test/cluster/test_config.py @@ -17,7 +17,7 @@ async def wait_for_config(manager, server, config_name, value): async def config_value_equal(): await read_barrier(manager.api, server.ip_addr) resp = await manager.api.get_config(server.ip_addr, config_name) - logging.info(f"Obtained config via REST api - config_name={config_name} value={value}") + logging.info(f"Obtained config via REST api - config_name={config_name} response={resp} expected value={value}") if resp == value: return True return None From 24a7b146faff165bfc1310f350716b6e29f9c97a Mon Sep 17 00:00:00 2001 From: Dimitrios Symonidis Date: Wed, 15 Apr 2026 11:58:46 +0200 Subject: [PATCH 3/4] sstables/utils/s3: split config update into sync and async parts Config observers run synchronously in a reactor turn and must not suspend. Split the previous monolithic async update_config() coroutine into two phases: Sync (runs in the observer, never suspends): - S3: atomically swap _cfg (lw_shared_ptr) and set a credentials refresh flag. - GCS: install a freshly constructed client; stash the old one for async cleanup. - storage_manager: update _object_storage_endpoints and fire the async cleanup via a gate-guarded background fiber. Async (gate-guarded background fiber): - S3: acquire _creds_sem, invalidate and rearm credentials only if the refresh flag is set. - GCS: drain and close stashed old clients. --- sstables/object_storage_client.cc | 37 +++++++++++++++--- sstables/object_storage_client.hh | 2 +- sstables/sstables_manager.cc | 65 +++++++++++++------------------ sstables/sstables_manager.hh | 8 ++-- utils/s3/client.cc | 30 +++++++++++--- utils/s3/client.hh | 4 +- 6 files changed, 90 insertions(+), 56 deletions(-) diff --git a/sstables/object_storage_client.cc b/sstables/object_storage_client.cc index c1b7605090..28e82efbd3 100644 --- a/sstables/object_storage_client.cc +++ b/sstables/object_storage_client.cc @@ -6,14 +6,18 @@ * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 */ +#include #include #include #include +#include #include #include #include +#include "utils/log.hh" + #include "db/object_storage_endpoint_param.hh" #include "object_storage_client.hh" @@ -30,6 +34,8 @@ using namespace seastar; using namespace sstables; using namespace utils; +static logging::logger osclog("object_storage_client"); + sstables::object_name::object_name(std::string_view bucket, std::string_view prefix, std::string_view type) : _name(fmt::format("/{}/{}/{}", bucket, prefix, type)) @@ -108,9 +114,9 @@ public: future<> upload_file(std::filesystem::path path, object_name name, utils::upload_progress& up, seastar::abort_source* as) override { return _client->upload_file(std::move(path), name.str(), up, as); } - future<> update_config(const db::object_storage_endpoint_param& ep) override { + void update_config_sync(const db::object_storage_endpoint_param& ep) override { auto& epc = ep.get_s3_storage(); - return _client->update_config(epc.region, epc.iam_role_arn); + _client->update_config_sync(epc.region, epc.iam_role_arn); } future<> close() override { return _client->close(); @@ -137,6 +143,7 @@ class gs_client_wrapper : public sstables::object_storage_client { shared_ptr _client; semaphore& _memory; std::function()> _shard_client; + seastar::gate _config_update_gate; public: gs_client_wrapper(const db::object_storage_endpoint_param& ep, semaphore& memory, shard_client_factory cf) : _client(make_gcs_client(ep, memory)) @@ -297,12 +304,30 @@ public: } } - future<> update_config(const db::object_storage_endpoint_param& ep) override { - auto client = std::exchange(_client, make_gcs_client(ep, _memory)); - co_await client->close(); + void update_config_sync(const db::object_storage_endpoint_param& ep) override { + if (_config_update_gate.is_closed()) { + osclog.info("config update gate is closed"); + return; + } + + osclog.info("Updating GCS client config"); + + auto holder = _config_update_gate.hold(); + auto new_client = make_gcs_client(ep, _memory); + auto old_client = std::exchange(_client, std::move(new_client)); + (void)old_client->close() + .handle_exception([](std::exception_ptr ex) { + osclog.error("Failed to close old GCS client during config update: {}", ex); + }) + .finally([old_client = std::move(old_client), h = std::move(holder)] { + osclog.info("Old GCS client cleanup done, use_count={}", old_client.use_count()); + }); } future<> close() override { - return _client->close(); + osclog.info("Closing GCS client..."); + co_await _config_update_gate.close(); + co_await _client->close(); + osclog.info("Closed GCS client"); } }; diff --git a/sstables/object_storage_client.hh b/sstables/object_storage_client.hh index f6f5885ca2..16ed1ba449 100644 --- a/sstables/object_storage_client.hh +++ b/sstables/object_storage_client.hh @@ -83,7 +83,7 @@ public: virtual future<> upload_file(std::filesystem::path path, object_name, utils::upload_progress& up, seastar::abort_source* = nullptr) = 0; - virtual future<> update_config(const db::object_storage_endpoint_param&) = 0; + virtual void update_config_sync(const db::object_storage_endpoint_param&) = 0; virtual future<> close() = 0; }; diff --git a/sstables/sstables_manager.cc b/sstables/sstables_manager.cc index 1888608f05..05657d99a7 100644 --- a/sstables/sstables_manager.cc +++ b/sstables/sstables_manager.cc @@ -9,6 +9,7 @@ #include #include #include +#include #include "utils/log.hh" #include "sstables/sstables_manager.hh" #include "sstables/sstable_directory.hh" @@ -83,7 +84,7 @@ storage_manager::object_storage_endpoint::object_storage_endpoint(db::object_sto storage_manager::storage_manager(const db::config& cfg, config stm_cfg) : _object_storage_clients_memory(stm_cfg.object_storage_clients_memory) - , _config_updater(std::make_unique(cfg, *this)) + , _config_updater(std::make_unique(cfg, *this)) { for (auto& e : cfg.object_storage_endpoints()) { _object_storage_endpoints.emplace(std::make_pair(e.key(), e)); @@ -99,10 +100,6 @@ storage_manager::storage_manager(const db::config& cfg, config stm_cfg) } future<> storage_manager::stop() { - if (_config_updater) { - co_await _config_updater->action.join(); - } - for (auto ep : _object_storage_endpoints) { if (ep.second.client != nullptr) { co_await ep.second.client->close(); @@ -110,34 +107,6 @@ future<> storage_manager::stop() { } } -future<> storage_manager::update_config(const db::config& cfg) { - // Updates S3 client configurations if the endpoint is already known and - // removes the entries that are not present in the new configuration. - // Even though we remove obsolete S3 clients from this map, each IO - // holds a shared_ptr to the client, so the clients will be kept alive for - // as long as needed. - // This was split in two loops to guarantee the code is exception safe with - // regards to _s3_endpoints content. - std::unordered_set updates; - for (auto& e : cfg.object_storage_endpoints()) { - auto endpoint = e.key(); - updates.insert(endpoint); - - auto [it, added] = _object_storage_endpoints.try_emplace(endpoint, e); - if (!added) { - if (it->second.client != nullptr) { - co_await it->second.client->update_config(e); - } - it->second.cfg = e; - } - } - - std::erase_if(_object_storage_endpoints, [&updates](const auto& e) { - return !updates.contains(e.first); - }); - - co_return; -} auto storage_manager::get_endpoint(const sstring& endpoint) -> object_storage_endpoint& { auto found = _object_storage_endpoints.find(endpoint); @@ -172,11 +141,31 @@ std::vector storage_manager::endpoints(sstring type) const noexcept { }) | std::views::keys | std::ranges::to(); } -storage_manager::config_updater::config_updater(const db::config& cfg, storage_manager& sstm) - : action([&sstm, &cfg] () mutable { - return sstm.update_config(cfg); - }) - , observer(cfg.object_storage_endpoints.observe(action.make_observer())) +storage_manager::config_updater_sync::config_updater_sync(const db::config& cfg, storage_manager& sstm) + : observer(cfg.object_storage_endpoints.observe([&sstm, &cfg] (const std::vector&) { + // Sync part: runs atomically in the current reactor turn + // Update _object_storage_endpoints so that any subsequent call to + // get_endpoint_client() immediately sees the new configuration. + // Each client's update_config_sync() spawns its own async background + // work (credential refresh for S3, old client close for GCS) into a + // client-local gated fiber. + std::unordered_set updates; + for (auto& e : cfg.object_storage_endpoints()) { + auto endpoint = e.key(); + smlogger.info("config_updater: endpoint={}, config={}", endpoint, e); + updates.insert(endpoint); + auto [it, added] = sstm._object_storage_endpoints.try_emplace(endpoint, e); + if (!added) { + if (it->second.client) { + it->second.client->update_config_sync(e); + } + it->second.cfg = e; + } + } + std::erase_if(sstm._object_storage_endpoints, [&updates](const auto& e) { + return !updates.contains(e.first); + }); + })) {} sstables::sstable::version_types sstables_manager::get_highest_supported_format() const noexcept { diff --git a/sstables/sstables_manager.hh b/sstables/sstables_manager.hh index a56b64dfab..be0fde9459 100644 --- a/sstables/sstables_manager.hh +++ b/sstables/sstables_manager.hh @@ -62,10 +62,9 @@ struct sstable_snapshot_metadata { }; class storage_manager : public peering_sharded_service { - struct config_updater { - serialized_action action; + struct config_updater_sync { utils::observer> observer; - config_updater(const db::config& cfg, storage_manager&); + config_updater_sync(const db::config& cfg, storage_manager&); }; struct object_storage_endpoint { @@ -76,10 +75,9 @@ class storage_manager : public peering_sharded_service { semaphore _object_storage_clients_memory; std::unordered_map _object_storage_endpoints; - std::unique_ptr _config_updater; + std::unique_ptr _config_updater; seastar::metrics::metric_groups metrics; - future<> update_config(const db::config&); object_storage_endpoint& get_endpoint(const sstring& ep); public: diff --git a/utils/s3/client.cc b/utils/s3/client.cc index 3b09078783..d33e4cba14 100644 --- a/utils/s3/client.cc +++ b/utils/s3/client.cc @@ -120,7 +120,13 @@ client::client(std::string host, endpoint_config_ptr cfg, semaphore& mem, global } } -future<> client::update_config(std::string region, std::string ira) { +void client::update_config_sync(std::string region, std::string ira) { + if (_config_update_gate.is_closed()) { + s3l.info("config update gate is closed"); + return; + } + auto holder = _config_update_gate.hold(); + endpoint_config new_cfg = { .port = _cfg->port, .use_https = _cfg->use_https, @@ -128,10 +134,19 @@ future<> client::update_config(std::string region, std::string ira) { .role_arn = std::move(ira), }; _cfg = make_lw_shared(std::move(new_cfg)); - auto units = co_await get_units(_creds_sem, 1); - _creds_provider_chain.invalidate_credentials(); - _credentials = {}; - _creds_update_timer.rearm(lowres_clock::now()); + + (void)get_units(_creds_sem, 1).then_wrapped([this, h = std::move(holder)](future> f) { + try { + s3l.info("Invalidating credentials"); + + auto units = f.get(); + _creds_provider_chain.invalidate_credentials(); + _credentials = {}; + _creds_update_timer.rearm(lowres_clock::now()); + } catch (...) { + s3l.error("Failed to refresh credentials during config update: {}", std::current_exception()); + } + }); } shared_ptr client::make(std::string endpoint, endpoint_config_ptr cfg, semaphore& mem, global_factory gf) { @@ -1800,6 +1815,9 @@ file client::make_readable_file(sstring object_name, seastar::abort_source* as) } future<> client::close() { + s3l.info("Closing S3 client..."); + + co_await _config_update_gate.close(); { auto units = co_await get_units(_creds_sem, 1); _creds_invalidation_timer.cancel(); @@ -1808,6 +1826,8 @@ future<> client::close() { co_await coroutine::parallel_for_each(_https, [] (auto& it) -> future<> { co_await it.second.http.close(); }); + + s3l.info("Closed S3 client"); } client::bucket_lister::bucket_lister(shared_ptr client, sstring bucket, sstring prefix, size_t objects_per_page, size_t entries_batch) diff --git a/utils/s3/client.hh b/utils/s3/client.hh index 87e19f316d..32ab551b50 100644 --- a/utils/s3/client.hh +++ b/utils/s3/client.hh @@ -9,6 +9,7 @@ #pragma once #include +#include #include #include #include @@ -116,6 +117,7 @@ class client : public enable_shared_from_this { timer _creds_update_timer; aws_credentials _credentials; aws::aws_credentials_provider_chain _creds_provider_chain; + seastar::gate _config_update_gate; struct io_stats { uint64_t ops = 0; @@ -220,7 +222,7 @@ public: upload_progress& up, seastar::abort_source* = nullptr); - future<> update_config(std::string reg, std::string ira); + void update_config_sync(std::string reg, std::string ira); struct handle { std::string _host; From ca003680a72e41cc0ace407e1ec154e83c0ed5bf Mon Sep 17 00:00:00 2001 From: Dimitrios Symonidis Date: Wed, 15 Apr 2026 14:28:39 +0200 Subject: [PATCH 4/4] test/object_store: verify object storage client creation and live reconfiguration --- test/cluster/object_store/test_basic.py | 51 ++++++++++++++++++++++--- 1 file changed, 45 insertions(+), 6 deletions(-) diff --git a/test/cluster/object_store/test_basic.py b/test/cluster/object_store/test_basic.py index de87fc841b..2ebad23309 100644 --- a/test/cluster/object_store/test_basic.py +++ b/test/cluster/object_store/test_basic.py @@ -17,6 +17,7 @@ from test.cqlpy.rest_api import scylla_inject_error from test.cluster.test_config import wait_for_config from test.cluster.util import new_test_keyspace from test.pylib.tablets import get_all_tablet_replicas +from test.pylib.skip_types import skip_bug logger = logging.getLogger(__name__) @@ -292,11 +293,10 @@ async def test_get_object_store_endpoints(manager: ManagerClient, config_with_fu @pytest.mark.asyncio async def test_create_keyspace_after_config_update(manager: ManagerClient, object_storage): + print('Trying to create a keyspace with an endpoint not configured in object_storage_endpoints should trip storage_manager::is_known_endpoint()') server = await manager.server_add() cql = manager.get_cql() - - print('Trying to create a keyspace with an endpoint not configured in object_storage_endpoints should trip storage_manager::is_known_endpoint()') - endpoint = 'http://a:456' + endpoint = object_storage.address replication_opts = format_tuples({'class': 'NetworkTopologyStrategy', 'replication_factor': '1'}) storage_opts = format_tuples(type=f'{object_storage.type}', @@ -308,11 +308,50 @@ async def test_create_keyspace_after_config_update(manager: ManagerClient, objec f' REPLICATION = {replication_opts} AND STORAGE = {storage_opts};')) print('Update config with a new endpoint and SIGHUP Scylla to reload configuration') - new_endpoint = MinioServer.create_conf(endpoint, 'region') - await manager.server_update_config(server.server_id, 'object_storage_endpoints', new_endpoint) - await wait_for_config(manager, server, 'object_storage_endpoints', {endpoint: '{ "type": "s3", "aws_region": "region", "iam_role_arn": "" }'}) + objconf = object_storage.create_endpoint_conf() + await manager.server_update_config(server.server_id, 'object_storage_endpoints', objconf) + ep = objconf[0] + if ep['type'] == 's3': + expected_conf = f'{{ "type": "s3", "aws_region": "{ep["aws_region"]}", "iam_role_arn": "{ep["iam_role_arn"]}" }}' + else: + expected_conf = f'{{ "type": "gs", "credentials_file": "{ep["credentials_file"]}" }}' + await wait_for_config(manager, server, 'object_storage_endpoints', {ep['name']: expected_conf}) print('Passing a known endpoint will make the CREATE KEYSPACE stmt to succeed') cql.execute((f'CREATE KEYSPACE random_ks WITH' f' REPLICATION = {replication_opts} AND STORAGE = {storage_opts};')) + print('Create a table, insert data and flush the keyspace to force object_storage_client creation') + cql.execute(f'CREATE TABLE random_ks.test (name text PRIMARY KEY, value int);') + await cql.run_async(f"INSERT INTO random_ks.test (name, value) VALUES ('test_key', 123);") + await manager.api.flush_keyspace(server.ip_addr, 'random_ks') + res = cql.execute(f"SELECT value FROM random_ks.test WHERE name = 'test_key';") + assert res.one().value == 123, f'Unexpected value after flush: {res.one().value}' + + # Now that a live object_storage_client exists for this endpoint, push a + # config update that modifies the endpoint parameters. This exercises the + # update_config_sync path on an already-instantiated client + print('Push a config update to reconfigure the live object_storage_client') + updated_objconf = object_storage.create_endpoint_conf() + updated_ep = updated_objconf[0] + if updated_ep['type'] == 's3': + updated_ep['aws_region'] = 'updated-region' + updated_expected_conf = f'{{ "type": "s3", "aws_region": "{updated_ep["aws_region"]}", "iam_role_arn": "{updated_ep["iam_role_arn"]}" }}' + else: + updated_ep['credentials_file'] = '' + updated_expected_conf = f'{{ "type": "gs", "credentials_file": "{updated_ep["credentials_file"]}" }}' + skip_bug("https://scylladb.atlassian.net/browse/SCYLLADB-1559") + + await manager.server_update_config(server.server_id, 'object_storage_endpoints', updated_objconf) + await wait_for_config(manager, server, 'object_storage_endpoints', {updated_ep['name']: updated_expected_conf}) + + print('Verify the reconfigured client still works: insert more data and flush') + await cql.run_async(f"INSERT INTO random_ks.test (name, value) VALUES ('after_reconfig', 456);") + await manager.api.flush_keyspace(server.ip_addr, 'random_ks') + res = cql.execute(f"SELECT value FROM random_ks.test WHERE name = 'after_reconfig';") + assert res.one().value == 456, f'Unexpected value after reconfiguration flush: {res.one().value}' + + print('Verify all data is intact') + rows = {r.name: r.value for r in cql.execute(f'SELECT * FROM random_ks.test;')} + assert rows == {'test_key': 123, 'after_reconfig': 456}, f'Unexpected table content: {rows}' +