diff --git a/audit/audit.cc b/audit/audit.cc index a1b3feb7a6..34d53074b8 100644 --- a/audit/audit.cc +++ b/audit/audit.cc @@ -228,27 +228,55 @@ future<> audit::shutdown() { return make_ready_future<>(); } -future<> audit::log(const audit_info* audit_info, service::query_state& query_state, const cql3::query_options& options, bool error) { - const service::client_state& client_state = query_state.get_client_state(); - socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr(); - db::consistency_level cl = options.get_consistency(); +future<> audit::log(const audit_info& audit_info, const service::client_state& client_state, std::optional cl, bool error) { thread_local static sstring no_username("undefined"); static const sstring anonymous_username("anonymous"); const sstring& username = client_state.user() ? client_state.user()->name.value_or(anonymous_username) : no_username; socket_address client_ip = client_state.get_client_address().addr(); + socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr(); if (logger.is_enabled(logging::log_level::debug)) { logger.debug("Log written: node_ip {} category {} cl {} error {} keyspace {} query '{}' client_ip {} table {} username {}", - node_ip, audit_info->category_string(), cl, error, audit_info->keyspace(), - audit_info->query(), client_ip, audit_info->table(), username); + node_ip, audit_info.category_string(), cl, error, audit_info.keyspace(), + audit_info.query(), client_ip, audit_info.table(), username); } - return futurize_invoke(std::mem_fn(&storage_helper::write), _storage_helper_ptr, audit_info, node_ip, client_ip, cl, username, error) + return futurize_invoke(std::mem_fn(&storage_helper::write), _storage_helper_ptr, &audit_info, node_ip, client_ip, cl, username, error) .handle_exception([audit_info, node_ip, client_ip, cl, username, error] (auto ep) { logger.error("Unexpected exception when writing log with: node_ip {} category {} cl {} error {} keyspace {} query '{}' client_ip {} table {} username {} exception {}", - node_ip, audit_info->category_string(), cl, error, audit_info->keyspace(), - audit_info->query(), client_ip, audit_info->table(),username, ep); + node_ip, audit_info.category_string(), cl, error, audit_info.keyspace(), + audit_info.query(), client_ip, audit_info.table(), username, ep); }); } +static future<> maybe_log(const audit_info& audit_info, const service::client_state& client_state, std::optional cl, bool error) { + if(audit::audit_instance().local_is_initialized() && audit::local_audit_instance().should_log(audit_info)) { + return audit::local_audit_instance().log(audit_info, client_state, cl, error); + } + return make_ready_future<>(); +} + +static future<> inspect(const audit_info& audit_info, const service::query_state& query_state, const cql3::query_options& options, bool error) { + return maybe_log(audit_info, query_state.get_client_state(), options.get_consistency(), error); +} + +future<> inspect(shared_ptr statement, const service::query_state& query_state, const cql3::query_options& options, bool error) { + const auto audit_info = statement->get_audit_info(); + if (audit_info == nullptr) { + return make_ready_future<>(); + } + if (audit_info->batch()) { + cql3::statements::batch_statement* batch = dynamic_cast(statement.get()); + return do_for_each(batch->statements().begin(), batch->statements().end(), [&query_state, &options, error] (auto&& m) { + return inspect(m.statement, query_state, options, error); + }); + } else { + return inspect(*audit_info, query_state, options, error); + } +} + +future<> inspect(const audit_info_alternator& ai, const service::client_state& client_state, bool error) { + return maybe_log(static_cast(ai), client_state, ai.get_cl(), error); +} + future<> audit::log_login(const sstring& username, socket_address client_ip, bool error) noexcept { socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr(); if (logger.is_enabled(logging::log_level::debug)) { @@ -262,24 +290,6 @@ future<> audit::log_login(const sstring& username, socket_address client_ip, boo }); } -future<> inspect(shared_ptr statement, service::query_state& query_state, const cql3::query_options& options, bool error) { - auto audit_info = statement->get_audit_info(); - if (!audit_info) { - return make_ready_future<>(); - } - if (audit_info->batch()) { - cql3::statements::batch_statement* batch = static_cast(statement.get()); - return do_for_each(batch->statements().begin(), batch->statements().end(), [&query_state, &options, error] (auto&& m) { - return inspect(m.statement, query_state, options, error); - }); - } else { - if (audit::local_audit_instance().should_log(audit_info)) { - return audit::local_audit_instance().log(audit_info, query_state, options, error); - } - return make_ready_future<>(); - } -} - future<> inspect_login(const sstring& username, socket_address client_ip, bool error) { if (!audit::audit_instance().local_is_initialized() || !audit::local_audit_instance().should_log_login()) { return make_ready_future<>(); @@ -292,13 +302,21 @@ bool audit::should_log_table(const sstring& keyspace, const sstring& name) const return keyspace_it != _audited_tables.cend() && keyspace_it->second.find(name) != keyspace_it->second.cend(); } -bool audit::should_log(const audit_info* audit_info) const { - return _audited_categories.contains(audit_info->category()) - && (_audited_keyspaces.find(audit_info->keyspace()) != _audited_keyspaces.cend() - || should_log_table(audit_info->keyspace(), audit_info->table()) - || audit_info->category() == statement_category::AUTH - || audit_info->category() == statement_category::ADMIN - || audit_info->category() == statement_category::DCL); +bool audit::should_log(const audit_info& audit_info) const { + return will_log(audit_info.category(), audit_info.keyspace(), audit_info.table()); +} + +bool audit::will_log(statement_category cat, std::string_view keyspace, std::string_view table) const { + // If keyspace is empty (e.g., ListTables, or batch operations spanning + // multiple tables), the operation cannot be filtered by keyspace/table, + // so it is logged whenever the category matches. + return _audited_categories.contains(cat) + && (keyspace.empty() + || _audited_keyspaces.find(sstring(keyspace)) != _audited_keyspaces.cend() + || should_log_table(sstring(keyspace), sstring(table)) + || cat == statement_category::AUTH + || cat == statement_category::ADMIN + || cat == statement_category::DCL); } template diff --git a/audit/audit.hh b/audit/audit.hh index 32ded3a87b..3663949153 100644 --- a/audit/audit.hh +++ b/audit/audit.hh @@ -10,14 +10,15 @@ #include "seastarx.hh" #include "utils/log.hh" #include "utils/observable.hh" -#include "db/consistency_level.hh" -#include "locator/token_metadata_fwd.hh" +#include "service/client_state.hh" +#include "db/consistency_level_type.hh" #include #include #include "enum_set.hh" #include +#include namespace db { @@ -70,12 +71,15 @@ using category_set = enum_set>; -class audit_info final { +// Holds the audit metadata for a single request: the operation category, +// target keyspace/table, and the query string to be logged. +class audit_info { +protected: statement_category _category; sstring _keyspace; sstring _table; sstring _query; - bool _batch; + bool _batch; // used only for unpacking batches in CQL, not relevant for Alternator public: audit_info(statement_category cat, sstring keyspace, sstring table, bool batch) : _category(cat) @@ -83,8 +87,17 @@ public: , _table(std::move(table)) , _batch(batch) { } - void set_query_string(const std::string_view& query_string) { - _query = sstring(query_string); + // 'operation' is for the cases where the query string does not contain it, like with Alternator + audit_info& set_query_string(std::string_view query_string, std::string_view operation = {}) { + return set_query_string(sstring(query_string), sstring(operation)); + } + audit_info& set_query_string(const sstring& query_string, const sstring& operation = "") { + if(!operation.empty()) { + _query = operation + "|" + query_string; + } else { + _query = query_string; + } + return *this; } const sstring& keyspace() const { return _keyspace; } const sstring& table() const { return _table; } @@ -96,6 +109,23 @@ public: using audit_info_ptr = std::unique_ptr; +// Audit info for Alternator requests. +// Unlike CQL, where the consistency level is available from query_options and +// passed separately to audit::log(), Alternator has no query_options, so we +// store the CL inside the audit_info object. +// Consistency level is optional: only data read/write operations (GetItem, +// PutItem, Query, Scan, etc.) have a meaningful CL. Schema operations and +// metadata queries pass std::nullopt. +class audit_info_alternator final : public audit_info { + std::optional _cl; +public: + audit_info_alternator(statement_category cat, sstring keyspace, sstring table, std::optional cl = std::nullopt) + : audit_info(cat, std::move(keyspace), std::move(table), false), _cl(cl) + {} + + std::optional get_cl() const { return _cl; } +}; + class storage_helper; class audit final : public seastar::async_sharded_service { @@ -142,13 +172,15 @@ public: future<> start(const db::config& cfg); future<> stop(); future<> shutdown(); - bool should_log(const audit_info* audit_info) const; + bool should_log(const audit_info& audit_info) const; + bool will_log(statement_category cat, std::string_view keyspace = {}, std::string_view table = {}) const; bool should_log_login() const { return _audited_categories.contains(statement_category::AUTH); } - future<> log(const audit_info* audit_info, service::query_state& query_state, const cql3::query_options& options, bool error); + future<> log(const audit_info& audit_info, const service::client_state& client_state, std::optional cl, bool error); future<> log_login(const sstring& username, socket_address client_ip, bool error) noexcept; }; -future<> inspect(shared_ptr statement, service::query_state& query_state, const cql3::query_options& options, bool error); +future<> inspect(const audit_info_alternator& audit_info, const service::client_state& client_state, bool error); +future<> inspect(shared_ptr statement, const service::query_state& query_state, const cql3::query_options& options, bool error); future<> inspect_login(const sstring& username, socket_address client_ip, bool error); diff --git a/audit/audit_cf_storage_helper.cc b/audit/audit_cf_storage_helper.cc index cc56c9f807..1b3e0f7718 100644 --- a/audit/audit_cf_storage_helper.cc +++ b/audit/audit_cf_storage_helper.cc @@ -129,7 +129,7 @@ future<> audit_cf_storage_helper::stop() { future<> audit_cf_storage_helper::write(const audit_info* audit_info, socket_address node_ip, socket_address client_ip, - db::consistency_level cl, + std::optional cl, const sstring& username, bool error) { return _table.insert(_qp, _mm, _dummy_query_state, make_data, audit_info, node_ip, client_ip, cl, username, error); @@ -145,7 +145,7 @@ future<> audit_cf_storage_helper::write_login(const sstring& username, cql3::query_options audit_cf_storage_helper::make_data(const audit_info* audit_info, socket_address node_ip, socket_address client_ip, - db::consistency_level cl, + std::optional cl, const sstring& username, bool error) { auto time = std::chrono::system_clock::now(); @@ -154,7 +154,7 @@ cql3::query_options audit_cf_storage_helper::make_data(const audit_info* audit_i auto date = millis_since_epoch / ticks_per_day * ticks_per_day; thread_local static int64_t last_nanos = 0; auto time_id = utils::UUID_gen::get_time_UUID(table_helper::make_monotonic_UUID_tp(last_nanos, time)); - auto consistency_level = fmt::format("{}", cl); + auto consistency_level = cl ? format("{}", *cl) : sstring(""); std::vector values { cql3::raw_value::make_value(timestamp_type->decompose(date)), cql3::raw_value::make_value(inet_addr_type->decompose(node_ip.addr())), diff --git a/audit/audit_cf_storage_helper.hh b/audit/audit_cf_storage_helper.hh index 63d3edcbc4..5d1313df8a 100644 --- a/audit/audit_cf_storage_helper.hh +++ b/audit/audit_cf_storage_helper.hh @@ -37,7 +37,7 @@ class audit_cf_storage_helper : public storage_helper { static cql3::query_options make_data(const audit_info* audit_info, socket_address node_ip, socket_address client_ip, - db::consistency_level cl, + std::optional cl, const sstring& username, bool error); static cql3::query_options make_login_data(socket_address node_ip, @@ -55,7 +55,7 @@ public: virtual future<> write(const audit_info* audit_info, socket_address node_ip, socket_address client_ip, - db::consistency_level cl, + std::optional cl, const sstring& username, bool error) override; virtual future<> write_login(const sstring& username, diff --git a/audit/audit_composite_storage_helper.cc b/audit/audit_composite_storage_helper.cc index 81ed6ae891..6107a6ab77 100644 --- a/audit/audit_composite_storage_helper.cc +++ b/audit/audit_composite_storage_helper.cc @@ -42,7 +42,7 @@ future<> audit_composite_storage_helper::stop() { future<> audit_composite_storage_helper::write(const audit_info* audit_info, socket_address node_ip, socket_address client_ip, - db::consistency_level cl, + std::optional cl, const sstring& username, bool error) { return seastar::parallel_for_each( diff --git a/audit/audit_composite_storage_helper.hh b/audit/audit_composite_storage_helper.hh index f5343aa111..7bac0600da 100644 --- a/audit/audit_composite_storage_helper.hh +++ b/audit/audit_composite_storage_helper.hh @@ -25,7 +25,7 @@ public: virtual future<> write(const audit_info* audit_info, socket_address node_ip, socket_address client_ip, - db::consistency_level cl, + std::optional cl, const sstring& username, bool error) override; virtual future<> write_login(const sstring& username, diff --git a/audit/audit_syslog_storage_helper.cc b/audit/audit_syslog_storage_helper.cc index 0ea97ab97e..4d00a35131 100644 --- a/audit/audit_syslog_storage_helper.cc +++ b/audit/audit_syslog_storage_helper.cc @@ -101,18 +101,19 @@ future<> audit_syslog_storage_helper::stop() { future<> audit_syslog_storage_helper::write(const audit_info* audit_info, socket_address node_ip, socket_address client_ip, - db::consistency_level cl, + std::optional cl, const sstring& username, bool error) { auto now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); tm time; localtime_r(&now, &time); + auto cl_str = cl ? format("{}", *cl) : sstring(""); sstring msg = seastar::format(R"(<{}>{:%h %e %T} scylla-audit: node="{}", category="{}", cl="{}", error="{}", keyspace="{}", query="{}", client_ip="{}", table="{}", username="{}")", LOG_NOTICE | LOG_USER, time, node_ip, audit_info->category_string(), - cl, + cl_str, (error ? "true" : "false"), audit_info->keyspace(), json_escape(audit_info->query()), diff --git a/audit/audit_syslog_storage_helper.hh b/audit/audit_syslog_storage_helper.hh index 485d3fa6b7..8b11c56d6b 100644 --- a/audit/audit_syslog_storage_helper.hh +++ b/audit/audit_syslog_storage_helper.hh @@ -35,7 +35,7 @@ public: virtual future<> write(const audit_info* audit_info, socket_address node_ip, socket_address client_ip, - db::consistency_level cl, + std::optional cl, const sstring& username, bool error) override; virtual future<> write_login(const sstring& username, diff --git a/audit/storage_helper.hh b/audit/storage_helper.hh index 39560faa14..6599219b0b 100644 --- a/audit/storage_helper.hh +++ b/audit/storage_helper.hh @@ -22,7 +22,7 @@ public: virtual future<> write(const audit_info* audit_info, socket_address node_ip, socket_address client_ip, - db::consistency_level cl, + std::optional cl, const sstring& username, bool error) = 0; virtual future<> write_login(const sstring& username,