Merge 'sstables/storage_manager: fix race between object storage config update and keyspace creation' from Dimitrios Symonidis

Previously, config_updater used a serialized_action to trigger update_config() when object_storage_endpoints changed. Because serialized_action::trigger() always schedules the action as a new reactor task (via semaphore::wait().then()), there was a window between the config value becoming visible to the REST API and update_config() actually running. This allowed a concurrent CREATE KEYSPACE to see the new endpoint via is_known_endpoint() before storage_manager had registered it in _object_storage_endpoints.

Now config observers run synchronously in a reactor turn and must not suspend. Split the previous monolithic async update_config() coroutine into two phases:

- Sync (in the observer, never suspends): storage_manager::_object_storage_endpoints is updated in place; for already-instantiated clients, update_config_sync atomically swaps in the new config
- Async (per-client gate): background fibers finish the work that can't run in the observer — S3 refreshes credentials under _creds_sem; GCS drains and closes the replaced client.

Config reloads triggered by SIGHUP are applied on shard 0 and then broadcast to all other shards. An rwlock has also been introduced to make sure that the configuration has been propagated to all cores. This guarantees that a client requesting a config via the REST API will see a consistent snapshot.

Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-757
Fixes: [28141](https://github.com/scylladb/scylladb/issues/28141)

Closes scylladb/scylladb#28950

* github.com:scylladb/scylladb:
  test/object_store: verify object storage client creation and live reconfiguration
  sstables/utils/s3: split config update into sync and async parts
  test_config: improve logging for wait_for_config API
  db: introduce read-write lock to synchronize config updates with REST API
This commit is contained in:
Botond Dénes
2026-04-16 10:20:43 +03:00
12 changed files with 183 additions and 71 deletions

View File

@@ -82,15 +82,16 @@ void set_config(std::shared_ptr < api_registry_builder20 > rb, http_context& ctx
});
});
cs::find_config_id.set(r, [&cfg] (const_req r) {
auto id = r.get_path_param("id");
for (auto&& cfg_ref : cfg.values()) {
auto&& cfg = cfg_ref.get();
if (id == cfg.name()) {
return cfg.value_as_json();
}
cs::find_config_id.set(r, [&cfg] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
auto id = req->get_path_param("id");
auto value = co_await cfg.value_as_json_string_for_name(id);
if (!value) {
throw bad_param_exception(sstring("No such config entry: ") + id);
}
throw bad_param_exception(sstring("No such config entry: ") + id);
//value is already a json string
json::json_return_type ret{json::json_void()};
ret._res = std::move(*value);
co_return ret;
});
sp::get_rpc_timeout.set(r, [&cfg](const_req req) {

View File

@@ -7,6 +7,7 @@
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
*/
#include <optional>
#include <unordered_map>
#include <sstream>
@@ -19,6 +20,7 @@
#include <seastar/core/coroutine.hh>
#include <seastar/core/format.hh>
#include <seastar/core/smp.hh>
#include <seastar/core/sstring.hh>
#include <seastar/json/json_elements.hh>
#include <seastar/util/log.hh>
@@ -1752,6 +1754,23 @@ void db::config::maybe_in_workdir(named_value<string_list>& tos, const char* sub
}
}
// Look up the config entry named `name` and return its value rendered as a
// JSON string, or std::nullopt when no entry with that name exists.
// NOTE(review): the lambda captures `this`, so shard 0 iterates this object's
// values(); presumably every shard's config instance is kept identical by
// broadcast_to_all_shards() under the write lock — confirm that assumption
// holds for all callers.
future<std::optional<sstring>> db::config::value_as_json_string_for_name(sstring name) const {
// Config reloads triggered by SIGHUP are applied on shard 0 and then
// broadcast to all other shards. We read the value on shard 0 under
// a read lock to guarantee a consistent snapshot — the SIGHUP handler
// holds the write lock across the entire reload-and-broadcast sequence.
co_return co_await smp::submit_to(0, [this, name = std::move(name)] () -> future<std::optional<sstring>> {
// Read lock pairs with the write lock taken by lock_for_config_update();
// held until this lambda completes.
auto lock = co_await _config_update_lock.hold_read_lock();
// Linear scan over all registered config entries; names are unique, so
// the first match is the only match.
for (auto&& cfg_ref : values()) {
auto&& c = cfg_ref.get();
if (name == c.name()) {
// value_as_json() wraps the serialized string in a json_return_type;
// _res is the raw JSON string payload.
co_return c.value_as_json()._res;
}
}
co_return std::nullopt;
});
}
const sstring db::config::default_tls_priority("SECURE128:-VERS-TLS1.0");
template <>

View File

@@ -12,6 +12,7 @@
#include <unordered_map>
#include <seastar/core/sstring.hh>
#include <seastar/core/rwlock.hh>
#include <seastar/util/program-options.hh>
#include <seastar/util/log.hh>
@@ -179,6 +180,16 @@ public:
static fs::path get_conf_dir();
static fs::path get_conf_sub(fs::path);
// Acquire the write half of _config_update_lock for the duration of a
// config reload (the SIGHUP handler holds it across read_config() and
// broadcast_to_all_shards()). The lock is released when the returned
// holder is destroyed.
future<rwlock::holder> lock_for_config_update() {
return _config_update_lock.hold_write_lock();
};
// Look up a config entry by name and return its JSON representation as a string.
// Runs on shard 0 under a read lock so the result is consistent with
// any in-progress SIGHUP reload + broadcast_to_all_shards() sequence.
// Returns std::nullopt if no config entry with the given name exists.
future<std::optional<sstring>> value_as_json_string_for_name(sstring name) const;
using string_map = std::unordered_map<sstring, sstring>;
//program_options::string_map;
using string_list = std::vector<sstring>;
@@ -656,6 +667,13 @@ private:
void maybe_in_workdir(named_value<string_list>&, const char*);
std::shared_ptr<db::extensions> _extensions;
// Read-write lock used to synchronize config updates (SIGHUP reload +
// broadcast to all shards) with config value readers.
// The SIGHUP handler holds the write lock across read_config() +
// broadcast_to_all_shards(). Readers acquire the read lock on shard 0
// via value_as_json_string_for_name() so they always see a consistent snapshot.
mutable rwlock _config_update_lock;
};
}

View File

@@ -270,6 +270,7 @@ private:
_pending = false;
try {
startlog.info("re-reading configuration file");
auto lock = _cfg.lock_for_config_update().get();
read_config(_opts, _cfg).get();
_cfg.broadcast_to_all_shards().get();
startlog.info("completed re-reading configuration file");

View File

@@ -6,14 +6,18 @@
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
*/
#include <exception>
#include <string>
#include <optional>
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/iostream.hh>
#include <fmt/core.h>
#include "utils/log.hh"
#include "db/object_storage_endpoint_param.hh"
#include "object_storage_client.hh"
@@ -30,6 +34,8 @@ using namespace seastar;
using namespace sstables;
using namespace utils;
static logging::logger osclog("object_storage_client");
sstables::object_name::object_name(std::string_view bucket, std::string_view prefix, std::string_view type)
: _name(fmt::format("/{}/{}/{}", bucket, prefix, type))
@@ -108,9 +114,9 @@ public:
future<> upload_file(std::filesystem::path path, object_name name, utils::upload_progress& up, seastar::abort_source* as) override {
return _client->upload_file(std::move(path), name.str(), up, as);
}
future<> update_config(const db::object_storage_endpoint_param& ep) override {
void update_config_sync(const db::object_storage_endpoint_param& ep) override {
auto& epc = ep.get_s3_storage();
return _client->update_config(epc.region, epc.iam_role_arn);
_client->update_config_sync(epc.region, epc.iam_role_arn);
}
future<> close() override {
return _client->close();
@@ -137,6 +143,7 @@ class gs_client_wrapper : public sstables::object_storage_client {
shared_ptr<gcp::storage::client> _client;
semaphore& _memory;
std::function<shared_ptr<gcp::storage::client>()> _shard_client;
seastar::gate _config_update_gate;
public:
gs_client_wrapper(const db::object_storage_endpoint_param& ep, semaphore& memory, shard_client_factory cf)
: _client(make_gcs_client(ep, memory))
@@ -297,12 +304,30 @@ public:
}
}
future<> update_config(const db::object_storage_endpoint_param& ep) override {
auto client = std::exchange(_client, make_gcs_client(ep, _memory));
co_await client->close();
void update_config_sync(const db::object_storage_endpoint_param& ep) override {
if (_config_update_gate.is_closed()) {
osclog.info("config update gate is closed");
return;
}
osclog.info("Updating GCS client config");
auto holder = _config_update_gate.hold();
auto new_client = make_gcs_client(ep, _memory);
auto old_client = std::exchange(_client, std::move(new_client));
(void)old_client->close()
.handle_exception([](std::exception_ptr ex) {
osclog.error("Failed to close old GCS client during config update: {}", ex);
})
.finally([old_client = std::move(old_client), h = std::move(holder)] {
osclog.info("Old GCS client cleanup done, use_count={}", old_client.use_count());
});
}
future<> close() override {
return _client->close();
osclog.info("Closing GCS client...");
co_await _config_update_gate.close();
co_await _client->close();
osclog.info("Closed GCS client");
}
};

View File

@@ -83,7 +83,7 @@ public:
virtual future<> upload_file(std::filesystem::path path, object_name, utils::upload_progress& up, seastar::abort_source* = nullptr) = 0;
virtual future<> update_config(const db::object_storage_endpoint_param&) = 0;
virtual void update_config_sync(const db::object_storage_endpoint_param&) = 0;
virtual future<> close() = 0;
};

View File

@@ -9,6 +9,7 @@
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/coroutine/switch_to.hh>
#include <unordered_map>
#include <unordered_set>
#include "utils/log.hh"
#include "sstables/sstables_manager.hh"
#include "sstables/sstable_directory.hh"
@@ -83,7 +84,7 @@ storage_manager::object_storage_endpoint::object_storage_endpoint(db::object_sto
storage_manager::storage_manager(const db::config& cfg, config stm_cfg)
: _object_storage_clients_memory(stm_cfg.object_storage_clients_memory)
, _config_updater(std::make_unique<config_updater>(cfg, *this))
, _config_updater(std::make_unique<config_updater_sync>(cfg, *this))
{
for (auto& e : cfg.object_storage_endpoints()) {
_object_storage_endpoints.emplace(std::make_pair(e.key(), e));
@@ -99,10 +100,6 @@ storage_manager::storage_manager(const db::config& cfg, config stm_cfg)
}
future<> storage_manager::stop() {
if (_config_updater) {
co_await _config_updater->action.join();
}
for (auto ep : _object_storage_endpoints) {
if (ep.second.client != nullptr) {
co_await ep.second.client->close();
@@ -110,34 +107,6 @@ future<> storage_manager::stop() {
}
}
future<> storage_manager::update_config(const db::config& cfg) {
// Updates S3 client configurations if the endpoint is already known and
// removes the entries that are not present in the new configuration.
// Even though we remove obsolete S3 clients from this map, each IO
// holds a shared_ptr to the client, so the clients will be kept alive for
// as long as needed.
// This was split in two loops to guarantee the code is exception safe with
// regards to _s3_endpoints content.
std::unordered_set<sstring> updates;
for (auto& e : cfg.object_storage_endpoints()) {
auto endpoint = e.key();
updates.insert(endpoint);
auto [it, added] = _object_storage_endpoints.try_emplace(endpoint, e);
if (!added) {
if (it->second.client != nullptr) {
co_await it->second.client->update_config(e);
}
it->second.cfg = e;
}
}
std::erase_if(_object_storage_endpoints, [&updates](const auto& e) {
return !updates.contains(e.first);
});
co_return;
}
auto storage_manager::get_endpoint(const sstring& endpoint) -> object_storage_endpoint& {
auto found = _object_storage_endpoints.find(endpoint);
@@ -172,11 +141,31 @@ std::vector<sstring> storage_manager::endpoints(sstring type) const noexcept {
}) | std::views::keys | std::ranges::to<std::vector>();
}
storage_manager::config_updater::config_updater(const db::config& cfg, storage_manager& sstm)
: action([&sstm, &cfg] () mutable {
return sstm.update_config(cfg);
})
, observer(cfg.object_storage_endpoints.observe(action.make_observer()))
storage_manager::config_updater_sync::config_updater_sync(const db::config& cfg, storage_manager& sstm)
: observer(cfg.object_storage_endpoints.observe([&sstm, &cfg] (const std::vector<db::object_storage_endpoint_param>&) {
// Sync part: runs atomically in the current reactor turn
// Update _object_storage_endpoints so that any subsequent call to
// get_endpoint_client() immediately sees the new configuration.
// Each client's update_config_sync() spawns its own async background
// work (credential refresh for S3, old client close for GCS) into a
// client-local gated fiber.
std::unordered_set<sstring> updates;
for (auto& e : cfg.object_storage_endpoints()) {
auto endpoint = e.key();
smlogger.info("config_updater: endpoint={}, config={}", endpoint, e);
updates.insert(endpoint);
auto [it, added] = sstm._object_storage_endpoints.try_emplace(endpoint, e);
if (!added) {
if (it->second.client) {
it->second.client->update_config_sync(e);
}
it->second.cfg = e;
}
}
std::erase_if(sstm._object_storage_endpoints, [&updates](const auto& e) {
return !updates.contains(e.first);
});
}))
{}
sstables::sstable::version_types sstables_manager::get_highest_supported_format() const noexcept {

View File

@@ -62,10 +62,9 @@ struct sstable_snapshot_metadata {
};
class storage_manager : public peering_sharded_service<storage_manager> {
struct config_updater {
serialized_action action;
struct config_updater_sync {
utils::observer<std::vector<db::object_storage_endpoint_param>> observer;
config_updater(const db::config& cfg, storage_manager&);
config_updater_sync(const db::config& cfg, storage_manager&);
};
struct object_storage_endpoint {
@@ -76,10 +75,9 @@ class storage_manager : public peering_sharded_service<storage_manager> {
semaphore _object_storage_clients_memory;
std::unordered_map<sstring, object_storage_endpoint> _object_storage_endpoints;
std::unique_ptr<config_updater> _config_updater;
std::unique_ptr<config_updater_sync> _config_updater;
seastar::metrics::metric_groups metrics;
future<> update_config(const db::config&);
object_storage_endpoint& get_endpoint(const sstring& ep);
public:

View File

@@ -17,6 +17,7 @@ from test.cqlpy.rest_api import scylla_inject_error
from test.cluster.test_config import wait_for_config
from test.cluster.util import new_test_keyspace
from test.pylib.tablets import get_all_tablet_replicas
from test.pylib.skip_types import skip_bug
logger = logging.getLogger(__name__)
@@ -292,11 +293,10 @@ async def test_get_object_store_endpoints(manager: ManagerClient, config_with_fu
@pytest.mark.asyncio
async def test_create_keyspace_after_config_update(manager: ManagerClient, object_storage):
print('Trying to create a keyspace with an endpoint not configured in object_storage_endpoints should trip storage_manager::is_known_endpoint()')
server = await manager.server_add()
cql = manager.get_cql()
print('Trying to create a keyspace with an endpoint not configured in object_storage_endpoints should trip storage_manager::is_known_endpoint()')
endpoint = 'http://a:456'
endpoint = object_storage.address
replication_opts = format_tuples({'class': 'NetworkTopologyStrategy',
'replication_factor': '1'})
storage_opts = format_tuples(type=f'{object_storage.type}',
@@ -308,11 +308,50 @@ async def test_create_keyspace_after_config_update(manager: ManagerClient, objec
f' REPLICATION = {replication_opts} AND STORAGE = {storage_opts};'))
print('Update config with a new endpoint and SIGHUP Scylla to reload configuration')
new_endpoint = MinioServer.create_conf(endpoint, 'region')
await manager.server_update_config(server.server_id, 'object_storage_endpoints', new_endpoint)
await wait_for_config(manager, server, 'object_storage_endpoints', {endpoint: '{ "type": "s3", "aws_region": "region", "iam_role_arn": "" }'})
objconf = object_storage.create_endpoint_conf()
await manager.server_update_config(server.server_id, 'object_storage_endpoints', objconf)
ep = objconf[0]
if ep['type'] == 's3':
expected_conf = f'{{ "type": "s3", "aws_region": "{ep["aws_region"]}", "iam_role_arn": "{ep["iam_role_arn"]}" }}'
else:
expected_conf = f'{{ "type": "gs", "credentials_file": "{ep["credentials_file"]}" }}'
await wait_for_config(manager, server, 'object_storage_endpoints', {ep['name']: expected_conf})
print('Passing a known endpoint will make the CREATE KEYSPACE stmt to succeed')
cql.execute((f'CREATE KEYSPACE random_ks WITH'
f' REPLICATION = {replication_opts} AND STORAGE = {storage_opts};'))
print('Create a table, insert data and flush the keyspace to force object_storage_client creation')
cql.execute(f'CREATE TABLE random_ks.test (name text PRIMARY KEY, value int);')
await cql.run_async(f"INSERT INTO random_ks.test (name, value) VALUES ('test_key', 123);")
await manager.api.flush_keyspace(server.ip_addr, 'random_ks')
res = cql.execute(f"SELECT value FROM random_ks.test WHERE name = 'test_key';")
assert res.one().value == 123, f'Unexpected value after flush: {res.one().value}'
# Now that a live object_storage_client exists for this endpoint, push a
# config update that modifies the endpoint parameters. This exercises the
# update_config_sync path on an already-instantiated client
print('Push a config update to reconfigure the live object_storage_client')
updated_objconf = object_storage.create_endpoint_conf()
updated_ep = updated_objconf[0]
if updated_ep['type'] == 's3':
updated_ep['aws_region'] = 'updated-region'
updated_expected_conf = f'{{ "type": "s3", "aws_region": "{updated_ep["aws_region"]}", "iam_role_arn": "{updated_ep["iam_role_arn"]}" }}'
else:
updated_ep['credentials_file'] = ''
updated_expected_conf = f'{{ "type": "gs", "credentials_file": "{updated_ep["credentials_file"]}" }}'
skip_bug("https://scylladb.atlassian.net/browse/SCYLLADB-1559")
await manager.server_update_config(server.server_id, 'object_storage_endpoints', updated_objconf)
await wait_for_config(manager, server, 'object_storage_endpoints', {updated_ep['name']: updated_expected_conf})
print('Verify the reconfigured client still works: insert more data and flush')
await cql.run_async(f"INSERT INTO random_ks.test (name, value) VALUES ('after_reconfig', 456);")
await manager.api.flush_keyspace(server.ip_addr, 'random_ks')
res = cql.execute(f"SELECT value FROM random_ks.test WHERE name = 'after_reconfig';")
assert res.one().value == 456, f'Unexpected value after reconfiguration flush: {res.one().value}'
print('Verify all data is intact')
rows = {r.name: r.value for r in cql.execute(f'SELECT * FROM random_ks.test;')}
assert rows == {'test_key': 123, 'after_reconfig': 456}, f'Unexpected table content: {rows}'

View File

@@ -17,7 +17,7 @@ async def wait_for_config(manager, server, config_name, value):
async def config_value_equal():
await read_barrier(manager.api, server.ip_addr)
resp = await manager.api.get_config(server.ip_addr, config_name)
logging.info(f"Obtained config via REST api - config_name={config_name} value={value}")
logging.info(f"Obtained config via REST api - config_name={config_name} response={resp} expected value={value}")
if resp == value:
return True
return None

View File

@@ -120,7 +120,13 @@ client::client(std::string host, endpoint_config_ptr cfg, semaphore& mem, global
}
}
future<> client::update_config(std::string region, std::string ira) {
void client::update_config_sync(std::string region, std::string ira) {
if (_config_update_gate.is_closed()) {
s3l.info("config update gate is closed");
return;
}
auto holder = _config_update_gate.hold();
endpoint_config new_cfg = {
.port = _cfg->port,
.use_https = _cfg->use_https,
@@ -128,10 +134,19 @@ future<> client::update_config(std::string region, std::string ira) {
.role_arn = std::move(ira),
};
_cfg = make_lw_shared<endpoint_config>(std::move(new_cfg));
auto units = co_await get_units(_creds_sem, 1);
_creds_provider_chain.invalidate_credentials();
_credentials = {};
_creds_update_timer.rearm(lowres_clock::now());
(void)get_units(_creds_sem, 1).then_wrapped([this, h = std::move(holder)](future<semaphore_units<>> f) {
try {
s3l.info("Invalidating credentials");
auto units = f.get();
_creds_provider_chain.invalidate_credentials();
_credentials = {};
_creds_update_timer.rearm(lowres_clock::now());
} catch (...) {
s3l.error("Failed to refresh credentials during config update: {}", std::current_exception());
}
});
}
shared_ptr<client> client::make(std::string endpoint, endpoint_config_ptr cfg, semaphore& mem, global_factory gf) {
@@ -1800,6 +1815,9 @@ file client::make_readable_file(sstring object_name, seastar::abort_source* as)
}
future<> client::close() {
s3l.info("Closing S3 client...");
co_await _config_update_gate.close();
{
auto units = co_await get_units(_creds_sem, 1);
_creds_invalidation_timer.cancel();
@@ -1808,6 +1826,8 @@ future<> client::close() {
co_await coroutine::parallel_for_each(_https, [] (auto& it) -> future<> {
co_await it.second.http.close();
});
s3l.info("Closed S3 client");
}
client::bucket_lister::bucket_lister(shared_ptr<client> client, sstring bucket, sstring prefix, size_t objects_per_page, size_t entries_batch)

View File

@@ -9,6 +9,7 @@
#pragma once
#include <seastar/core/file.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/metrics_registration.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/shared_ptr.hh>
@@ -116,6 +117,7 @@ class client : public enable_shared_from_this<client> {
timer<seastar::lowres_clock> _creds_update_timer;
aws_credentials _credentials;
aws::aws_credentials_provider_chain _creds_provider_chain;
seastar::gate _config_update_gate;
struct io_stats {
uint64_t ops = 0;
@@ -220,7 +222,7 @@ public:
upload_progress& up,
seastar::abort_source* = nullptr);
future<> update_config(std::string reg, std::string ira);
void update_config_sync(std::string reg, std::string ira);
struct handle {
std::string _host;