From bc67dd0b822e6442dc5ba1f74ab473f481264ea0 Mon Sep 17 00:00:00 2001 From: Andrzej Jackowski Date: Thu, 16 Apr 2026 17:24:48 +0200 Subject: [PATCH] audit: split startup into construction and storage phases The table-based audit backend needs Raft to create its keyspace, but the audit service must exist earlier so that CQL paths don't silently skip auditing. Split startup into two phases: construction and storage initialization. Audit records for queries arriving between the two phases are dropped and reported as internal errors. This is a refactoring commit and the split sections will be moved later in this patch series. Refs SCYLLADB-1615 --- audit/audit.cc | 46 +++++++++++++++++++++++----------- audit/audit.hh | 5 ++-- main.cc | 12 +++++++-- test/perf/perf_simple_query.cc | 4 +++ 4 files changed, 48 insertions(+), 19 deletions(-) diff --git a/audit/audit.cc b/audit/audit.cc index 0c85f7a260..1fcc133e55 100644 --- a/audit/audit.cc +++ b/audit/audit.cc @@ -194,17 +194,30 @@ future<> audit::start_audit(const db::config& cfg, sharded(); - } - return audit_instance().invoke_on_all([&cfg] (audit& local_audit) { - return local_audit.start(cfg); + std::cref(cfg)); +} + +future<> audit::start_storage(const db::config& cfg) { + if (!audit_instance().local_is_initialized()) { + return make_ready_future<>(); + } + return audit_instance().invoke_on_all([&cfg] (audit& local_audit) { + return local_audit._storage_helper_ptr->start(cfg).then([&local_audit] { + local_audit._storage_running = true; }); }); } +future<> audit::stop_storage() { + if (!audit_instance().local_is_initialized()) { + return make_ready_future<>(); + } + return audit_instance().invoke_on_all([] (audit& local_audit) { + local_audit._storage_running = false; + return local_audit._storage_helper_ptr->stop(); + }); +} + future<> audit::stop_audit() { if (!audit_instance().local_is_initialized()) { return make_ready_future<>(); @@ -223,14 +236,6 @@ audit_info_ptr audit::create_audit_info(statement_category cat, const sstring& k return 
std::make_unique(cat, keyspace, table, batch); } -future<> audit::start(const db::config& cfg) { - return _storage_helper_ptr->start(cfg); -} - -future<> audit::stop() { - return _storage_helper_ptr->stop(); -} - future<> audit::shutdown() { return make_ready_future<>(); } @@ -241,6 +246,12 @@ future<> audit::log(const audit_info& audit_info, const service::client_state& c const sstring& username = client_state.user() ? client_state.user()->name.value_or(anonymous_username) : no_username; socket_address client_ip = client_state.get_client_address().addr(); socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr(); + if (!_storage_running) { + on_internal_error_noexcept(logger, fmt::format("Audit log dropped (storage not ready): node_ip {} category {} cl {} error {} keyspace {} query '{}' client_ip {} table {} username {}", + node_ip, audit_info.category_string(), cl, error, audit_info.keyspace(), + audit_info.query(), client_ip, audit_info.table(), username)); + return make_ready_future<>(); + } if (logger.is_enabled(logging::log_level::debug)) { logger.debug("Log written: node_ip {} category {} cl {} error {} keyspace {} query '{}' client_ip {} table {} username {}", node_ip, audit_info.category_string(), cl, error, audit_info.keyspace(), @@ -286,6 +297,11 @@ future<> inspect(const audit_info_alternator& ai, const service::client_state& c future<> audit::log_login(const sstring& username, socket_address client_ip, bool error) noexcept { socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr(); + if (!_storage_running) { + on_internal_error_noexcept(logger, fmt::format("Audit login log dropped (storage not ready): node_ip {} client_ip {} username {} error {}", + node_ip, client_ip, username, error ? 
"true" : "false")); + return make_ready_future<>(); + } if (logger.is_enabled(logging::log_level::debug)) { logger.debug("Login log written: node_ip {}, client_ip {}, username {}, error {}", node_ip, client_ip, username, error ? "true" : "false"); diff --git a/audit/audit.hh b/audit/audit.hh index 2b482d2047..b5833c8189 100644 --- a/audit/audit.hh +++ b/audit/audit.hh @@ -141,6 +141,7 @@ private: category_set _audited_categories; std::unique_ptr _storage_helper_ptr; + bool _storage_running = false; const db::config& _cfg; utils::observer _cfg_keyspaces_observer; @@ -163,6 +164,8 @@ public: return audit_instance().local(); } static future<> start_audit(const db::config& cfg, sharded& stm, sharded& qp, sharded& mm); + static future<> start_storage(const db::config& cfg); + static future<> stop_storage(); static future<> stop_audit(); static audit_info_ptr create_audit_info(statement_category cat, const sstring& keyspace, const sstring& table, bool batch = false); audit(locator::shared_token_metadata& stm, @@ -174,8 +177,6 @@ public: category_set&& audited_categories, const db::config& cfg); ~audit(); - future<> start(const db::config& cfg); - future<> stop(); future<> shutdown(); bool should_log(const audit_info& audit_info) const; bool will_log(statement_category cat, std::string_view keyspace = {}, std::string_view table = {}) const; diff --git a/main.cc b/main.cc index 630231e23b..33cd206691 100644 --- a/main.cc +++ b/main.cc @@ -2357,8 +2357,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl startlog.info("Verifying that all of the tablet keyspaces use rack list replication factors"); db.local().check_rack_list_everywhere(cfg->enforce_rack_list()); - // Start audit service after join_cluster so that the table-based audit backend - // can properly create its keyspace and table. 
checkpoint(stop_signal, "starting audit service"); audit::audit::start_audit(*cfg, token_metadata, qp, mm).handle_exception([&] (auto&& e) { startlog.error("audit start failed: {}", e); @@ -2367,6 +2365,16 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl audit::audit::stop_audit().get(); }); + // The table-based audit backend needs Raft (via join_cluster) + // to create its keyspace and table. + checkpoint(stop_signal, "starting audit storage"); + audit::audit::start_storage(*cfg).handle_exception([&] (auto&& e) { + startlog.error("audit storage start failed: {}", e); + }).get(); + auto audit_storage_stop = defer([] { + audit::audit::stop_storage().get(); + }); + // Semantic validation of sstable compression parameters from config. // Adding here (i.e., after `join_cluster`) to ensure that the // required SSTABLE_COMPRESSION_DICTS cluster feature has been negotiated. diff --git a/test/perf/perf_simple_query.cc b/test/perf/perf_simple_query.cc index 9f47bef948..0ff6575a39 100644 --- a/test/perf/perf_simple_query.cc +++ b/test/perf/perf_simple_query.cc @@ -371,9 +371,13 @@ int scylla_simple_query_main(int argc, char** argv) { audit::audit::start_audit(env.local_db().get_config(), env.get_shared_token_metadata(), env.qp(), env.migration_manager()).handle_exception([&] (auto&& e) { fmt::print("audit start failed: {}", e); }).get(); + audit::audit::start_storage(env.local_db().get_config()).get(); auto audit_stop = defer([] { audit::audit::stop_audit().get(); }); + auto audit_storage_stop = defer([] { + audit::audit::stop_storage().get(); + }); auto results = do_cql_test(env, cfg); aggregated_perf_results agg(results); std::cout << agg << std::endl;