audit/alternator: Refactor in preparation for auditing Alternator

Prepare API in audit for auditing Alternator.
The API provides an externally-callable functions `inspect()`,
for both CQL and Alternator.
Both variants of the function would unpack parameters and merge into
calling a common `maybe_log()`, which can then call `log()` when
conditions are met.
Also, while I was at it, (const) references were favoured over raw
pointers.

The Alternator audit_info subclass (audit_info_alternator) carries an
optional consistency level — only data read/write operations have a
meaningful CL, while DDL and metadata queries store an empty string
in the audit table and syslog (matching the existing write_login
behavior). The storage helpers are updated accordingly.

Add a will_log(category, keyspace, table) method that checks whether
an operation should be audited (category check AND keyspace/table
filtering) without requiring a constructed audit_info object.
should_log() delegates to will_log().
This commit is contained in:
Piotr Szymaniak
2026-01-02 14:12:04 +01:00
parent 3d0582d51e
commit 9646ee05bd
9 changed files with 105 additions and 54 deletions

View File

@@ -228,27 +228,55 @@ future<> audit::shutdown() {
return make_ready_future<>();
}
future<> audit::log(const audit_info* audit_info, service::query_state& query_state, const cql3::query_options& options, bool error) {
const service::client_state& client_state = query_state.get_client_state();
socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr();
db::consistency_level cl = options.get_consistency();
future<> audit::log(const audit_info& audit_info, const service::client_state& client_state, std::optional<db::consistency_level> cl, bool error) {
thread_local static sstring no_username("undefined");
static const sstring anonymous_username("anonymous");
const sstring& username = client_state.user() ? client_state.user()->name.value_or(anonymous_username) : no_username;
socket_address client_ip = client_state.get_client_address().addr();
socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr();
if (logger.is_enabled(logging::log_level::debug)) {
logger.debug("Log written: node_ip {} category {} cl {} error {} keyspace {} query '{}' client_ip {} table {} username {}",
node_ip, audit_info->category_string(), cl, error, audit_info->keyspace(),
audit_info->query(), client_ip, audit_info->table(), username);
node_ip, audit_info.category_string(), cl, error, audit_info.keyspace(),
audit_info.query(), client_ip, audit_info.table(), username);
}
return futurize_invoke(std::mem_fn(&storage_helper::write), _storage_helper_ptr, audit_info, node_ip, client_ip, cl, username, error)
return futurize_invoke(std::mem_fn(&storage_helper::write), _storage_helper_ptr, &audit_info, node_ip, client_ip, cl, username, error)
.handle_exception([audit_info, node_ip, client_ip, cl, username, error] (auto ep) {
logger.error("Unexpected exception when writing log with: node_ip {} category {} cl {} error {} keyspace {} query '{}' client_ip {} table {} username {} exception {}",
node_ip, audit_info->category_string(), cl, error, audit_info->keyspace(),
audit_info->query(), client_ip, audit_info->table(),username, ep);
node_ip, audit_info.category_string(), cl, error, audit_info.keyspace(),
audit_info.query(), client_ip, audit_info.table(), username, ep);
});
}
static future<> maybe_log(const audit_info& audit_info, const service::client_state& client_state, std::optional<db::consistency_level> cl, bool error) {
if(audit::audit_instance().local_is_initialized() && audit::local_audit_instance().should_log(audit_info)) {
return audit::local_audit_instance().log(audit_info, client_state, cl, error);
}
return make_ready_future<>();
}
static future<> inspect(const audit_info& audit_info, const service::query_state& query_state, const cql3::query_options& options, bool error) {
return maybe_log(audit_info, query_state.get_client_state(), options.get_consistency(), error);
}
future<> inspect(shared_ptr<cql3::cql_statement> statement, const service::query_state& query_state, const cql3::query_options& options, bool error) {
const auto audit_info = statement->get_audit_info();
if (audit_info == nullptr) {
return make_ready_future<>();
}
if (audit_info->batch()) {
cql3::statements::batch_statement* batch = dynamic_cast<cql3::statements::batch_statement*>(statement.get());
return do_for_each(batch->statements().begin(), batch->statements().end(), [&query_state, &options, error] (auto&& m) {
return inspect(m.statement, query_state, options, error);
});
} else {
return inspect(*audit_info, query_state, options, error);
}
}
future<> inspect(const audit_info_alternator& ai, const service::client_state& client_state, bool error) {
return maybe_log(static_cast<const audit_info&>(ai), client_state, ai.get_cl(), error);
}
future<> audit::log_login(const sstring& username, socket_address client_ip, bool error) noexcept {
socket_address node_ip = _token_metadata.get()->get_topology().my_address().addr();
if (logger.is_enabled(logging::log_level::debug)) {
@@ -262,24 +290,6 @@ future<> audit::log_login(const sstring& username, socket_address client_ip, boo
});
}
future<> inspect(shared_ptr<cql3::cql_statement> statement, service::query_state& query_state, const cql3::query_options& options, bool error) {
auto audit_info = statement->get_audit_info();
if (!audit_info) {
return make_ready_future<>();
}
if (audit_info->batch()) {
cql3::statements::batch_statement* batch = static_cast<cql3::statements::batch_statement*>(statement.get());
return do_for_each(batch->statements().begin(), batch->statements().end(), [&query_state, &options, error] (auto&& m) {
return inspect(m.statement, query_state, options, error);
});
} else {
if (audit::local_audit_instance().should_log(audit_info)) {
return audit::local_audit_instance().log(audit_info, query_state, options, error);
}
return make_ready_future<>();
}
}
future<> inspect_login(const sstring& username, socket_address client_ip, bool error) {
if (!audit::audit_instance().local_is_initialized() || !audit::local_audit_instance().should_log_login()) {
return make_ready_future<>();
@@ -292,13 +302,21 @@ bool audit::should_log_table(const sstring& keyspace, const sstring& name) const
return keyspace_it != _audited_tables.cend() && keyspace_it->second.find(name) != keyspace_it->second.cend();
}
bool audit::should_log(const audit_info* audit_info) const {
return _audited_categories.contains(audit_info->category())
&& (_audited_keyspaces.find(audit_info->keyspace()) != _audited_keyspaces.cend()
|| should_log_table(audit_info->keyspace(), audit_info->table())
|| audit_info->category() == statement_category::AUTH
|| audit_info->category() == statement_category::ADMIN
|| audit_info->category() == statement_category::DCL);
bool audit::should_log(const audit_info& audit_info) const {
return will_log(audit_info.category(), audit_info.keyspace(), audit_info.table());
}
bool audit::will_log(statement_category cat, std::string_view keyspace, std::string_view table) const {
// If keyspace is empty (e.g., ListTables, or batch operations spanning
// multiple tables), the operation cannot be filtered by keyspace/table,
// so it is logged whenever the category matches.
return _audited_categories.contains(cat)
&& (keyspace.empty()
|| _audited_keyspaces.find(sstring(keyspace)) != _audited_keyspaces.cend()
|| should_log_table(sstring(keyspace), sstring(table))
|| cat == statement_category::AUTH
|| cat == statement_category::ADMIN
|| cat == statement_category::DCL);
}
template<class T>

View File

@@ -10,14 +10,15 @@
#include "seastarx.hh"
#include "utils/log.hh"
#include "utils/observable.hh"
#include "db/consistency_level.hh"
#include "locator/token_metadata_fwd.hh"
#include "service/client_state.hh"
#include "db/consistency_level_type.hh"
#include <seastar/core/sharded.hh>
#include <seastar/util/log.hh>
#include "enum_set.hh"
#include <memory>
#include <optional>
namespace db {
@@ -70,12 +71,15 @@ using category_set = enum_set<super_enum<statement_category, statement_category:
statement_category::AUTH,
statement_category::ADMIN>>;
class audit_info final {
// Holds the audit metadata for a single request: the operation category,
// target keyspace/table, and the query string to be logged.
class audit_info {
protected:
statement_category _category;
sstring _keyspace;
sstring _table;
sstring _query;
bool _batch;
bool _batch; // used only for unpacking batches in CQL, not relevant for Alternator
public:
audit_info(statement_category cat, sstring keyspace, sstring table, bool batch)
: _category(cat)
@@ -83,8 +87,17 @@ public:
, _table(std::move(table))
, _batch(batch)
{ }
void set_query_string(const std::string_view& query_string) {
_query = sstring(query_string);
// 'operation' is for the cases where the query string does not contain it, like with Alternator
audit_info& set_query_string(std::string_view query_string, std::string_view operation = {}) {
return set_query_string(sstring(query_string), sstring(operation));
}
audit_info& set_query_string(const sstring& query_string, const sstring& operation = "") {
if(!operation.empty()) {
_query = operation + "|" + query_string;
} else {
_query = query_string;
}
return *this;
}
const sstring& keyspace() const { return _keyspace; }
const sstring& table() const { return _table; }
@@ -96,6 +109,23 @@ public:
using audit_info_ptr = std::unique_ptr<audit_info>;
// Audit info for Alternator requests.
// Unlike CQL, where the consistency level is available from query_options and
// passed separately to audit::log(), Alternator has no query_options, so we
// store the CL inside the audit_info object.
// Consistency level is optional: only data read/write operations (GetItem,
// PutItem, Query, Scan, etc.) have a meaningful CL. Schema operations and
// metadata queries pass std::nullopt.
class audit_info_alternator final : public audit_info {
std::optional<db::consistency_level> _cl;
public:
audit_info_alternator(statement_category cat, sstring keyspace, sstring table, std::optional<db::consistency_level> cl = std::nullopt)
: audit_info(cat, std::move(keyspace), std::move(table), false), _cl(cl)
{}
std::optional<db::consistency_level> get_cl() const { return _cl; }
};
class storage_helper;
class audit final : public seastar::async_sharded_service<audit> {
@@ -142,13 +172,15 @@ public:
future<> start(const db::config& cfg);
future<> stop();
future<> shutdown();
bool should_log(const audit_info* audit_info) const;
bool should_log(const audit_info& audit_info) const;
bool will_log(statement_category cat, std::string_view keyspace = {}, std::string_view table = {}) const;
bool should_log_login() const { return _audited_categories.contains(statement_category::AUTH); }
future<> log(const audit_info* audit_info, service::query_state& query_state, const cql3::query_options& options, bool error);
future<> log(const audit_info& audit_info, const service::client_state& client_state, std::optional<db::consistency_level> cl, bool error);
future<> log_login(const sstring& username, socket_address client_ip, bool error) noexcept;
};
future<> inspect(shared_ptr<cql3::cql_statement> statement, service::query_state& query_state, const cql3::query_options& options, bool error);
future<> inspect(const audit_info_alternator& audit_info, const service::client_state& client_state, bool error);
future<> inspect(shared_ptr<cql3::cql_statement> statement, const service::query_state& query_state, const cql3::query_options& options, bool error);
future<> inspect_login(const sstring& username, socket_address client_ip, bool error);

View File

@@ -129,7 +129,7 @@ future<> audit_cf_storage_helper::stop() {
future<> audit_cf_storage_helper::write(const audit_info* audit_info,
socket_address node_ip,
socket_address client_ip,
db::consistency_level cl,
std::optional<db::consistency_level> cl,
const sstring& username,
bool error) {
return _table.insert(_qp, _mm, _dummy_query_state, make_data, audit_info, node_ip, client_ip, cl, username, error);
@@ -145,7 +145,7 @@ future<> audit_cf_storage_helper::write_login(const sstring& username,
cql3::query_options audit_cf_storage_helper::make_data(const audit_info* audit_info,
socket_address node_ip,
socket_address client_ip,
db::consistency_level cl,
std::optional<db::consistency_level> cl,
const sstring& username,
bool error) {
auto time = std::chrono::system_clock::now();
@@ -154,7 +154,7 @@ cql3::query_options audit_cf_storage_helper::make_data(const audit_info* audit_i
auto date = millis_since_epoch / ticks_per_day * ticks_per_day;
thread_local static int64_t last_nanos = 0;
auto time_id = utils::UUID_gen::get_time_UUID(table_helper::make_monotonic_UUID_tp(last_nanos, time));
auto consistency_level = fmt::format("{}", cl);
auto consistency_level = cl ? format("{}", *cl) : sstring("");
std::vector<cql3::raw_value> values {
cql3::raw_value::make_value(timestamp_type->decompose(date)),
cql3::raw_value::make_value(inet_addr_type->decompose(node_ip.addr())),

View File

@@ -37,7 +37,7 @@ class audit_cf_storage_helper : public storage_helper {
static cql3::query_options make_data(const audit_info* audit_info,
socket_address node_ip,
socket_address client_ip,
db::consistency_level cl,
std::optional<db::consistency_level> cl,
const sstring& username,
bool error);
static cql3::query_options make_login_data(socket_address node_ip,
@@ -55,7 +55,7 @@ public:
virtual future<> write(const audit_info* audit_info,
socket_address node_ip,
socket_address client_ip,
db::consistency_level cl,
std::optional<db::consistency_level> cl,
const sstring& username,
bool error) override;
virtual future<> write_login(const sstring& username,

View File

@@ -42,7 +42,7 @@ future<> audit_composite_storage_helper::stop() {
future<> audit_composite_storage_helper::write(const audit_info* audit_info,
socket_address node_ip,
socket_address client_ip,
db::consistency_level cl,
std::optional<db::consistency_level> cl,
const sstring& username,
bool error) {
return seastar::parallel_for_each(

View File

@@ -25,7 +25,7 @@ public:
virtual future<> write(const audit_info* audit_info,
socket_address node_ip,
socket_address client_ip,
db::consistency_level cl,
std::optional<db::consistency_level> cl,
const sstring& username,
bool error) override;
virtual future<> write_login(const sstring& username,

View File

@@ -101,18 +101,19 @@ future<> audit_syslog_storage_helper::stop() {
future<> audit_syslog_storage_helper::write(const audit_info* audit_info,
socket_address node_ip,
socket_address client_ip,
db::consistency_level cl,
std::optional<db::consistency_level> cl,
const sstring& username,
bool error) {
auto now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
tm time;
localtime_r(&now, &time);
auto cl_str = cl ? format("{}", *cl) : sstring("");
sstring msg = seastar::format(R"(<{}>{:%h %e %T} scylla-audit: node="{}", category="{}", cl="{}", error="{}", keyspace="{}", query="{}", client_ip="{}", table="{}", username="{}")",
LOG_NOTICE | LOG_USER,
time,
node_ip,
audit_info->category_string(),
cl,
cl_str,
(error ? "true" : "false"),
audit_info->keyspace(),
json_escape(audit_info->query()),

View File

@@ -35,7 +35,7 @@ public:
virtual future<> write(const audit_info* audit_info,
socket_address node_ip,
socket_address client_ip,
db::consistency_level cl,
std::optional<db::consistency_level> cl,
const sstring& username,
bool error) override;
virtual future<> write_login(const sstring& username,

View File

@@ -22,7 +22,7 @@ public:
virtual future<> write(const audit_info* audit_info,
socket_address node_ip,
socket_address client_ip,
db::consistency_level cl,
std::optional<db::consistency_level> cl,
const sstring& username,
bool error) = 0;
virtual future<> write_login(const sstring& username,