audit: split startup into construction and storage phases

The table-based audit backend needs Raft to create its keyspace,
but the audit service must exist earlier so that CQL paths don't
silently skip auditing.

Split startup into two phases: construction and storage
initialization.  Queries arriving between the two phases are
logged as errors.

This is a refactoring commit and the split sections will be
moved later in this patch series.

Refs SCYLLADB-1615
This commit is contained in:
Andrzej Jackowski
2026-04-16 17:24:48 +02:00
parent 1616c71bf0
commit bc67dd0b82
4 changed files with 48 additions and 19 deletions

View File

@@ -194,17 +194,30 @@ future<> audit::start_audit(const db::config& cfg, sharded<locator::shared_token
std::move(audited_keyspaces),
std::move(audited_tables),
std::move(audited_categories),
std::cref(cfg))
.then([&cfg] {
if (!audit_instance().local_is_initialized()) {
return make_ready_future<>();
}
return audit_instance().invoke_on_all([&cfg] (audit& local_audit) {
return local_audit.start(cfg);
std::cref(cfg));
}
future<> audit::start_storage(const db::config& cfg) {
if (!audit_instance().local_is_initialized()) {
return make_ready_future<>();
}
return audit_instance().invoke_on_all([&cfg] (audit& local_audit) {
return local_audit._storage_helper_ptr->start(cfg).then([&local_audit] {
local_audit._storage_running = true;
});
});
}
future<> audit::stop_storage() {
if (!audit_instance().local_is_initialized()) {
return make_ready_future<>();
}
return audit_instance().invoke_on_all([] (audit& local_audit) {
local_audit._storage_running = false;
return local_audit._storage_helper_ptr->stop();
});
}
future<> audit::stop_audit() {
if (!audit_instance().local_is_initialized()) {
return make_ready_future<>();
@@ -223,14 +236,6 @@ audit_info_ptr audit::create_audit_info(statement_category cat, const sstring& k
return std::make_unique<audit_info>(cat, keyspace, table, batch);
}
future<> audit::start(const db::config& cfg) {
return _storage_helper_ptr->start(cfg);
}
future<> audit::stop() {
return _storage_helper_ptr->stop();
}
future<> audit::shutdown() {
return make_ready_future<>();
}
@@ -241,6 +246,12 @@ future<> audit::log(const audit_info& audit_info, const service::client_state& c
const sstring& username = client_state.user() ? client_state.user()->name.value_or(anonymous_username) : no_username;
socket_address client_ip = client_state.get_client_address().addr();
socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr();
if (!_storage_running) {
on_internal_error_noexcept(logger, fmt::format("Audit log dropped (storage not ready): node_ip {} category {} cl {} error {} keyspace {} query '{}' client_ip {} table {} username {}",
node_ip, audit_info.category_string(), cl, error, audit_info.keyspace(),
audit_info.query(), client_ip, audit_info.table(), username));
return make_ready_future<>();
}
if (logger.is_enabled(logging::log_level::debug)) {
logger.debug("Log written: node_ip {} category {} cl {} error {} keyspace {} query '{}' client_ip {} table {} username {}",
node_ip, audit_info.category_string(), cl, error, audit_info.keyspace(),
@@ -286,6 +297,11 @@ future<> inspect(const audit_info_alternator& ai, const service::client_state& c
future<> audit::log_login(const sstring& username, socket_address client_ip, bool error) noexcept {
socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr();
if (!_storage_running) {
on_internal_error_noexcept(logger, fmt::format("Audit login log dropped (storage not ready): node_ip {} client_ip {} username {} error {}",
node_ip, client_ip, username, error ? "true" : "false"));
return make_ready_future<>();
}
if (logger.is_enabled(logging::log_level::debug)) {
logger.debug("Login log written: node_ip {}, client_ip {}, username {}, error {}",
node_ip, client_ip, username, error ? "true" : "false");

View File

@@ -141,6 +141,7 @@ private:
category_set _audited_categories;
std::unique_ptr<storage_helper> _storage_helper_ptr;
bool _storage_running = false;
const db::config& _cfg;
utils::observer<sstring> _cfg_keyspaces_observer;
@@ -163,6 +164,8 @@ public:
return audit_instance().local();
}
static future<> start_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm);
static future<> start_storage(const db::config& cfg);
static future<> stop_storage();
static future<> stop_audit();
static audit_info_ptr create_audit_info(statement_category cat, const sstring& keyspace, const sstring& table, bool batch = false);
audit(locator::shared_token_metadata& stm,
@@ -174,8 +177,6 @@ public:
category_set&& audited_categories,
const db::config& cfg);
~audit();
future<> start(const db::config& cfg);
future<> stop();
future<> shutdown();
bool should_log(const audit_info& audit_info) const;
bool will_log(statement_category cat, std::string_view keyspace = {}, std::string_view table = {}) const;

12
main.cc
View File

@@ -2357,8 +2357,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
startlog.info("Verifying that all of the tablet keyspaces use rack list replication factors");
db.local().check_rack_list_everywhere(cfg->enforce_rack_list());
// Start audit service after join_cluster so that the table-based audit backend
// can properly create its keyspace and table.
checkpoint(stop_signal, "starting audit service");
audit::audit::start_audit(*cfg, token_metadata, qp, mm).handle_exception([&] (auto&& e) {
startlog.error("audit start failed: {}", e);
@@ -2367,6 +2365,16 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
audit::audit::stop_audit().get();
});
// The table-based audit backend needs Raft (via join_cluster)
// to create its keyspace and table.
checkpoint(stop_signal, "starting audit storage");
audit::audit::start_storage(*cfg).handle_exception([&] (auto&& e) {
startlog.error("audit storage start failed: {}", e);
}).get();
auto audit_storage_stop = defer([] {
audit::audit::stop_storage().get();
});
// Semantic validation of sstable compression parameters from config.
// Adding here (i.e., after `join_cluster`) to ensure that the
// required SSTABLE_COMPRESSION_DICTS cluster feature has been negotiated.

View File

@@ -371,9 +371,13 @@ int scylla_simple_query_main(int argc, char** argv) {
audit::audit::start_audit(env.local_db().get_config(), env.get_shared_token_metadata(), env.qp(), env.migration_manager()).handle_exception([&] (auto&& e) {
fmt::print("audit start failed: {}", e);
}).get();
audit::audit::start_storage(env.local_db().get_config()).get();
auto audit_stop = defer([] {
audit::audit::stop_audit().get();
});
auto audit_storage_stop = defer([] {
audit::audit::stop_storage().get();
});
auto results = do_cql_test(env, cfg);
aggregated_perf_results agg(results);
std::cout << agg << std::endl;