Merge 'audit: write out to both table and syslog' from Dario Mirovic

This patch adds support for multiple audit log outputs.

If only one audit log output is enabled, the behavior does not change.
If multiple audit log outputs are enabled, then the `audit_composite_storage_helper` class is used. It has a collection
of `storage_helper` objects.

Performance testing shows that read query throughput and auth request throughput are consistent even at high reactor utilization. It can also be observed that read query latency increases a bit.

Read query ops = 60k/s
AUTH ops = 200/s

| Audit Mode | QUERY latency (p99) | Δ% vs none |
|------------|---------------------|------------|
| none | 777 | 0 |
|table| 801 | +3.09% |
|syslog | 803 | +3.35% |
|table,syslog | 818 | +5.28% |

Read query ops = 50k/s
AUTH ops = 200/s

| Audit Mode | QUERY latency (p99) | Δ% vs none |
|------------|---------------------|------------|
| none | 643 | 0 |
|table| 647 | +0.62% |
|syslog | 648 | +0.78% |
|table,syslog | 656 | +2.02% |

Detailed performance results are in the following Confluence document: [Audit performance impact test](https://scylladb.atlassian.net/wiki/spaces/RND/pages/148308005/Audit+performance+impact+test)

Fixes #26022

Backport:

The decision is to not backport for now. After making sure it works on the latest release, and if there is a need, we can do it.

Closes scylladb/scylladb#26613

* github.com:scylladb/scylladb:
  test: dtest: audit_test.py: add AuditBackendComposite
  test: dtest: audit_test.py: group logs in dict per audit mode
  audit: write out to both table and syslog
  audit: move storage helper creation from `audit::start` to `audit::audit`
  audit: fix formatting in `audit::start_audit`
  audit: unify `create_audit` and `start_audit`
This commit is contained in:
Botond Dénes
2025-11-17 15:04:15 +02:00
11 changed files with 411 additions and 147 deletions

View File

@@ -5,6 +5,7 @@ target_sources(scylla_audit
PRIVATE
audit.cc
audit_cf_storage_helper.cc
audit_composite_storage_helper.cc
audit_syslog_storage_helper.cc)
target_include_directories(scylla_audit
PUBLIC

View File

@@ -13,9 +13,11 @@
#include "cql3/statements/batch_statement.hh"
#include "cql3/statements/modification_statement.hh"
#include "storage_helper.hh"
#include "audit_cf_storage_helper.hh"
#include "audit_syslog_storage_helper.hh"
#include "audit_composite_storage_helper.hh"
#include "audit.hh"
#include "../db/config.hh"
#include "utils/class_registrator.hh"
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>
@@ -26,6 +28,47 @@ namespace audit {
logging::logger logger("audit");
// Parses the comma-separated `audit` config value into the set of enabled
// audit modes ("table" and/or "syslog"). Entries are trimmed of whitespace.
// Returns an empty set (auditing disabled) when the value is empty or when
// any listed mode is "none" — so "table,syslog,none" disables auditing.
// Throws audit_exception for any mode other than "table", "syslog" or "none".
// Note: entries after a "none" are not validated, because the function
// returns as soon as "none" is seen.
static std::set<sstring> parse_audit_modes(const sstring& data) {
    std::set<sstring> result;
    if (!data.empty()) {
        std::vector<sstring> audit_modes;
        boost::split(audit_modes, data, boost::is_any_of(","));
        // Defensive check; splitting a non-empty string normally yields at
        // least one element.
        if (audit_modes.empty()) {
            return {};
        }
        for (sstring& audit_mode : audit_modes) {
            boost::trim(audit_mode);
            if (audit_mode == "none") {
                return {};
            }
            if (audit_mode != "table" && audit_mode != "syslog") {
                throw audit_exception(fmt::format("Bad configuration: invalid 'audit': {}", audit_mode));
            }
            result.insert(std::move(audit_mode));
        }
    }
    return result;
}
// Creates the storage helper(s) for the given set of audit modes.
// Preconditions (asserted): the set is non-empty and contains only modes
// already validated by parse_audit_modes ("table" and/or "syslog").
// A single mode yields its concrete helper directly; multiple modes are
// wrapped in an audit_composite_storage_helper that forwards every audit
// record to all of them.
static std::unique_ptr<storage_helper> create_storage_helper(const std::set<sstring>& audit_modes, cql3::query_processor& qp, service::migration_manager& mm) {
    SCYLLA_ASSERT(!audit_modes.empty() && !audit_modes.contains("none"));
    std::vector<std::unique_ptr<storage_helper>> helpers;
    for (const sstring& audit_mode : audit_modes) {
        if (audit_mode == "table") {
            helpers.emplace_back(std::make_unique<audit_cf_storage_helper>(qp, mm));
        } else if (audit_mode == "syslog") {
            helpers.emplace_back(std::make_unique<audit_syslog_storage_helper>(qp, mm));
        }
    }
    SCYLLA_ASSERT(!helpers.empty());
    if (helpers.size() == 1) {
        // Only one output enabled: skip the composite indirection.
        return std::move(helpers.front());
    }
    return std::make_unique<audit_composite_storage_helper>(std::move(helpers));
}
static sstring category_to_string(statement_category category)
{
switch (category) {
@@ -103,7 +146,9 @@ static std::set<sstring> parse_audit_keyspaces(const sstring& data) {
}
audit::audit(locator::shared_token_metadata& token_metadata,
sstring&& storage_helper_name,
cql3::query_processor& qp,
service::migration_manager& mm,
std::set<sstring>&& audit_modes,
std::set<sstring>&& audited_keyspaces,
std::map<sstring, std::set<sstring>>&& audited_tables,
category_set&& audited_categories,
@@ -112,28 +157,21 @@ audit::audit(locator::shared_token_metadata& token_metadata,
, _audited_keyspaces(std::move(audited_keyspaces))
, _audited_tables(std::move(audited_tables))
, _audited_categories(std::move(audited_categories))
, _storage_helper_class_name(std::move(storage_helper_name))
, _cfg(cfg)
, _cfg_keyspaces_observer(cfg.audit_keyspaces.observe([this] (sstring const& new_value){ update_config<std::set<sstring>>(new_value, parse_audit_keyspaces, _audited_keyspaces); }))
, _cfg_tables_observer(cfg.audit_tables.observe([this] (sstring const& new_value){ update_config<std::map<sstring, std::set<sstring>>>(new_value, parse_audit_tables, _audited_tables); }))
, _cfg_categories_observer(cfg.audit_categories.observe([this] (sstring const& new_value){ update_config<category_set>(new_value, parse_audit_categories, _audited_categories); }))
{ }
{
_storage_helper_ptr = create_storage_helper(std::move(audit_modes), qp, mm);
}
audit::~audit() = default;
future<> audit::create_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm) {
sstring storage_helper_name;
if (cfg.audit() == "table") {
storage_helper_name = "audit_cf_storage_helper";
} else if (cfg.audit() == "syslog") {
storage_helper_name = "audit_syslog_storage_helper";
} else if (cfg.audit() == "none") {
// Audit is off
future<> audit::start_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm) {
std::set<sstring> audit_modes = parse_audit_modes(cfg.audit());
if (audit_modes.empty()) {
logger.info("Audit is disabled");
return make_ready_future<>();
} else {
throw audit_exception(fmt::format("Bad configuration: invalid 'audit': {}", cfg.audit()));
}
category_set audited_categories = parse_audit_categories(cfg.audit_categories());
std::map<sstring, std::set<sstring>> audited_tables = parse_audit_tables(cfg.audit_tables());
@@ -143,19 +181,20 @@ future<> audit::create_audit(const db::config& cfg, sharded<locator::shared_toke
cfg.audit(), cfg.audit_categories(), cfg.audit_keyspaces(), cfg.audit_tables());
return audit_instance().start(std::ref(stm),
std::move(storage_helper_name),
std::ref(qp),
std::ref(mm),
std::move(audit_modes),
std::move(audited_keyspaces),
std::move(audited_tables),
std::move(audited_categories),
std::cref(cfg));
}
future<> audit::start_audit(const db::config& cfg, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm) {
if (!audit_instance().local_is_initialized()) {
return make_ready_future<>();
}
return audit_instance().invoke_on_all([&cfg, &qp, &mm] (audit& local_audit) {
return local_audit.start(cfg, qp.local(), mm.local());
std::cref(cfg))
.then([&cfg] {
if (!audit_instance().local_is_initialized()) {
return make_ready_future<>();
}
return audit_instance().invoke_on_all([&cfg] (audit& local_audit) {
return local_audit.start(cfg);
});
});
}
@@ -181,15 +220,7 @@ audit_info_ptr audit::create_no_audit_info() {
return audit_info_ptr();
}
future<> audit::start(const db::config& cfg, cql3::query_processor& qp, service::migration_manager& mm) {
try {
_storage_helper_ptr = create_object<storage_helper>(_storage_helper_class_name, qp, mm);
} catch (no_such_class& e) {
logger.error("Can't create audit storage helper {}: not supported", _storage_helper_class_name);
throw;
} catch (...) {
throw;
}
future<> audit::start(const db::config& cfg) {
return _storage_helper_ptr->start(cfg);
}

View File

@@ -102,7 +102,6 @@ class audit final : public seastar::async_sharded_service<audit> {
std::map<sstring, std::set<sstring>> _audited_tables;
category_set _audited_categories;
sstring _storage_helper_class_name;
std::unique_ptr<storage_helper> _storage_helper_ptr;
const db::config& _cfg;
@@ -125,18 +124,20 @@ public:
static audit& local_audit_instance() {
return audit_instance().local();
}
static future<> create_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm);
static future<> start_audit(const db::config& cfg, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm);
static future<> start_audit(const db::config& cfg, sharded<locator::shared_token_metadata>& stm, sharded<cql3::query_processor>& qp, sharded<service::migration_manager>& mm);
static future<> stop_audit();
static audit_info_ptr create_audit_info(statement_category cat, const sstring& keyspace, const sstring& table);
static audit_info_ptr create_no_audit_info();
audit(locator::shared_token_metadata& stm, sstring&& storage_helper_name,
audit(locator::shared_token_metadata& stm,
cql3::query_processor& qp,
service::migration_manager& mm,
std::set<sstring>&& audit_modes,
std::set<sstring>&& audited_keyspaces,
std::map<sstring, std::set<sstring>>&& audited_tables,
category_set&& audited_categories,
const db::config& cfg);
~audit();
future<> start(const db::config& cfg, cql3::query_processor& qp, service::migration_manager& mm);
future<> start(const db::config& cfg);
future<> stop();
future<> shutdown();
bool should_log(const audit_info* audit_info) const;

View File

@@ -11,7 +11,6 @@
#include "cql3/query_processor.hh"
#include "data_dictionary/keyspace_metadata.hh"
#include "utils/UUID_gen.hh"
#include "utils/class_registrator.hh"
#include "cql3/query_options.hh"
#include "cql3/statements/ks_prop_defs.hh"
#include "service/migration_manager.hh"
@@ -198,7 +197,4 @@ cql3::query_options audit_cf_storage_helper::make_login_data(socket_address node
return cql3::query_options(cql3::default_cql_config, db::consistency_level::ONE, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT);
}
using registry = class_registrator<storage_helper, audit_cf_storage_helper, cql3::query_processor&, service::migration_manager&>;
static registry registrator1("audit_cf_storage_helper");
}

View File

@@ -0,0 +1,68 @@
/*
* Copyright (C) 2025 ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <seastar/core/loop.hh>
#include <seastar/core/future-util.hh>
#include "audit/audit_composite_storage_helper.hh"
#include "utils/class_registrator.hh"
namespace audit {

// Composite storage helper: owns a collection of concrete storage helpers
// (e.g. the "table" and "syslog" backends) and fans every operation out to
// all of them. Instantiated only when more than one audit mode is enabled.
audit_composite_storage_helper::audit_composite_storage_helper(std::vector<std::unique_ptr<storage_helper>>&& storage_helpers)
    : _storage_helpers(std::move(storage_helpers))
{}

// Starts all wrapped helpers concurrently; resolves once every helper has
// started. NOTE(review): `cfg` is captured by reference — assumes the caller
// keeps the config alive until the returned future resolves; confirm against
// the call site.
future<> audit_composite_storage_helper::start(const db::config& cfg) {
    auto res = seastar::parallel_for_each(
        _storage_helpers,
        [&cfg] (std::unique_ptr<storage_helper>& h) {
            return h->start(cfg);
        }
    );
    return res;
}

// Stops all wrapped helpers concurrently.
future<> audit_composite_storage_helper::stop() {
    auto res = seastar::parallel_for_each(
        _storage_helpers,
        [] (std::unique_ptr<storage_helper>& h) {
            return h->stop();
        }
    );
    return res;
}

// Writes one audit record to every wrapped helper concurrently.
// NOTE(review): `username` is captured by reference — assumes the caller
// keeps it alive for the duration of the write; verify against the caller.
future<> audit_composite_storage_helper::write(const audit_info* audit_info,
        socket_address node_ip,
        socket_address client_ip,
        db::consistency_level cl,
        const sstring& username,
        bool error) {
    return seastar::parallel_for_each(
        _storage_helpers,
        [audit_info, node_ip, client_ip, cl, &username, error](std::unique_ptr<storage_helper>& h) {
            return h->write(audit_info, node_ip, client_ip, cl, username, error);
        }
    );
}

// Writes one login event to every wrapped helper concurrently.
// Same lifetime assumption for `username` as in write() above.
future<> audit_composite_storage_helper::write_login(const sstring& username,
        socket_address node_ip,
        socket_address client_ip,
        bool error) {
    return seastar::parallel_for_each(
        _storage_helpers,
        [&username, node_ip, client_ip, error](std::unique_ptr<storage_helper>& h) {
            return h->write_login(username, node_ip, client_ip, error);
        }
    );
}

} // namespace audit

View File

@@ -0,0 +1,37 @@
/*
* Copyright (C) 2025 ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include "audit/audit.hh"
#include <seastar/core/future.hh>
#include "storage_helper.hh"
namespace audit {

// Fans audit output out to several concrete storage helpers (e.g. the
// "table" and "syslog" backends) so multiple audit modes can be enabled
// at once. Every virtual operation is forwarded to all wrapped helpers.
class audit_composite_storage_helper : public storage_helper {
    // Owned backends; each call below is forwarded to every element.
    std::vector<std::unique_ptr<storage_helper>> _storage_helpers;
public:
    explicit audit_composite_storage_helper(std::vector<std::unique_ptr<storage_helper>>&&);
    virtual ~audit_composite_storage_helper() = default;
    // Starts all wrapped helpers.
    virtual future<> start(const db::config& cfg) override;
    // Stops all wrapped helpers.
    virtual future<> stop() override;
    // Writes one audit record to all wrapped helpers.
    virtual future<> write(const audit_info* audit_info,
        socket_address node_ip,
        socket_address client_ip,
        db::consistency_level cl,
        const sstring& username,
        bool error) override;
    // Writes one login event to all wrapped helpers.
    virtual future<> write_login(const sstring& username,
        socket_address node_ip,
        socket_address client_ip,
        bool error) override;
};

} // namespace audit

View File

@@ -21,7 +21,6 @@
#include <fmt/chrono.h>
#include "cql3/query_processor.hh"
#include "utils/class_registrator.hh"
namespace cql3 {
@@ -143,7 +142,4 @@ future<> audit_syslog_storage_helper::write_login(const sstring& username,
co_await syslog_send_helper(msg.c_str());
}
using registry = class_registrator<storage_helper, audit_syslog_storage_helper, cql3::query_processor&, service::migration_manager&>;
static registry registrator1("audit_syslog_storage_helper");
}

View File

@@ -1196,6 +1196,7 @@ scylla_core = (['message/messaging_service.cc',
'table_helper.cc',
'audit/audit.cc',
'audit/audit_cf_storage_helper.cc',
'audit/audit_composite_storage_helper.cc',
'audit/audit_syslog_storage_helper.cc',
'tombstone_gc_options.cc',
'tombstone_gc.cc',

View File

@@ -1366,9 +1366,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
auto destroy_tracing = defer_verbose_shutdown("tracing instance", [&tracing] {
tracing.stop().get();
});
audit::audit::create_audit(*cfg, token_metadata).handle_exception([&] (auto&& e) {
startlog.error("audit creation failed: {}", e);
}).get();
stop_signal.check();
ctx.http_server.server().invoke_on_all([] (auto& server) { server.set_content_streaming(true); }).get();
@@ -2507,7 +2504,9 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
seastar::set_abort_on_ebadf(cfg->abort_on_ebadf());
api::set_server_done(ctx).get();
audit::audit::start_audit(*cfg, qp, mm).get();
audit::audit::start_audit(*cfg, token_metadata, qp, mm).handle_exception([&] (auto&& e) {
startlog.error("audit start failed: {}", e);
}).get();
auto audit_stop = defer([] {
audit::audit::stop_audit().get();
});

View File

@@ -126,10 +126,13 @@ class AuditBackend:
def __exit__(self, exc_type, exc_val, exc_tb):
pass
def audit_mode(self) -> str:
raise NotImplementedError
def before_cluster_start(self):
pass
def get_audit_log_list(self, session, consistency_level):
def get_audit_log_dict(self, session, consistency_level):
raise NotImplementedError
@@ -144,6 +147,10 @@ class AuditBackendTable(AuditBackend):
def __exit__(self, exc_type, exc_val, exc_tb):
pass
@override
def audit_mode(self) -> str:
return "table"
def update_audit_settings(self, audit_settings, modifiers=None):
if modifiers is None:
modifiers = {}
@@ -153,10 +160,10 @@ class AuditBackendTable(AuditBackend):
return new_audit_settings
@override
def get_audit_log_list(self, session, consistency_level):
def get_audit_log_dict(self, session, consistency_level):
"""_summary_
returns a sorted list of audit log, the logs are sorted by the event times (time-uuid)
with the node as tie breaker.
returns a dictionary mapping audit mode name to a sorted list of audit log,
the logs are sorted by the event times (time-uuid) with the node as tie breaker.
"""
# We would like to have named tuples as results so we can verify the
# order in which the fields are returned as the tests make assumptions about this.
@@ -164,7 +171,7 @@ class AuditBackendTable(AuditBackend):
res = session.execute(SimpleStatement(self.AUDIT_LOG_QUERY, consistency_level=consistency_level))
res_list = list(res)
res_list.sort(key=lambda row: (row.event_time.time, row.node))
return res_list
return { self.audit_mode(): res_list }
class UnixSockerListener:
@@ -236,24 +243,35 @@ class AuditBackendSyslog(AuditBackend):
if os.path.exists(self.socket_path):
os.remove(self.socket_path)
@override
def audit_mode(self) -> str:
return "syslog"
def update_audit_settings(self, audit_settings, modifiers=None):
if modifiers is None:
modifiers = {}
new_audit_settings = copy.deepcopy(audit_settings or self.audit_default_settings)
# This is a hack. The test framework uses "table" as "not none".
# Appropriate audit mode should be passed from the test itself, and not set here.
# This converts "table" to its own audit mode, or keeps "none" as is.
if "audit" in new_audit_settings and new_audit_settings["audit"] == "table":
new_audit_settings["audit"] = "syslog"
new_audit_settings["audit"] = self.audit_mode()
new_audit_settings["audit_unix_socket_path"] = self.socket_path
for key in modifiers:
new_audit_settings[key] = modifiers[key]
return new_audit_settings
@override
def get_audit_log_list(self, session, consistency_level):
def get_audit_log_dict(self, session, consistency_level):
"""_summary_
returns a dictionary mapping audit mode name to a sorted list of audit log,
the logs are sorted by the event times (time-uuid) with the node as tie breaker.
"""
lines = self.unix_socket_listener.get_lines()
entries = []
for idx, line in enumerate(lines):
entries.append(self.line_to_row(line, idx))
return entries
return { self.audit_mode(): entries }
def line_to_row(self, line, idx):
metadata, data = line.split(": ", 1)
@@ -278,6 +296,57 @@ class AuditBackendSyslog(AuditBackend):
self.socket_path = new_socket_path
class AuditBackendComposite(AuditBackend):
    """Test backend that drives the "table" and "syslog" backends together,
    for exercising the composite audit mode (audit: "table,syslog").
    """

    audit_default_settings = {"audit": "table,syslog", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL", "audit_keyspaces": "ks"}

    # Wrapped concrete backends, in setup order.
    backends: list[AuditBackend]

    def __init__(self):
        super().__init__()
        self.backends = [AuditBackendTable(), AuditBackendSyslog()]

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Tear down in reverse setup order; log and continue on per-backend
        # failures so the remaining backends still get cleaned up.
        for backend in reversed(self.backends):
            try:
                backend.__exit__(exc_type, exc_val, exc_tb)
            except Exception as e:
                logger.error(f"Error while exiting backend: {e}")

    @override
    def audit_mode(self) -> str:
        # Comma-separated combination of the wrapped backends' modes,
        # e.g. "table,syslog".
        return ",".join([backend.audit_mode() for backend in self.backends])

    def update_audit_settings(self, audit_settings, modifiers=None):
        """Builds the cluster audit settings for the composite mode: each
        wrapped backend applies its own adjustments first, then the combined
        "audit" mode is forced; `modifiers` entries override the result.
        """
        if modifiers is None:
            modifiers = {}
        new_audit_settings = copy.deepcopy(audit_settings or self.audit_default_settings)
        # This is a hack. The test framework uses "table" as "not none".
        # The syslog backend may change "table" to "syslog" before this is called.
        # Appropriate audit mode should be passed from the test itself, and not set here.
        # This converts "table" or "syslog" to its own audit mode, or keeps "none" as is.
        for backend in self.backends:
            new_audit_settings = backend.update_audit_settings(new_audit_settings)
        if "audit" in new_audit_settings and (new_audit_settings["audit"] == "table" or new_audit_settings["audit"] == "syslog"):
            new_audit_settings["audit"] = self.audit_mode()
        for key in modifiers:
            new_audit_settings[key] = modifiers[key]
        return new_audit_settings

    @override
    def get_audit_log_dict(self, session, consistency_level):
        """Returns a dictionary mapping audit mode name to a sorted list of
        audit log entries, merged from all wrapped backends; the logs are
        sorted by the event times (time-uuid) with the node as tie breaker.
        """
        rows_dict = dict[str, list[AuditEntry]]()
        for backend in self.backends:
            backend_rows_dict = backend.get_audit_log_dict(session, consistency_level)
            for mode, backend_rows in backend_rows_dict.items():
                # Every wrapped backend must report a distinct mode key.
                assert mode not in rows_dict
                rows_dict[mode] = backend_rows
        return rows_dict
@pytest.mark.single_node
class TestCQLAudit(AuditTester):
"""
@@ -286,33 +355,36 @@ class TestCQLAudit(AuditTester):
AUDIT_LOG_QUERY = "SELECT * FROM audit.audit_log"
def deduplicate_audit_entries(self, entries):
def deduplicate_audit_entries(self, entries_dict):
"""
Returns a list of audit entries with duplicate entries removed.
Returns a dictionary mapping audit mode name to a list of audit entries with duplicate entries removed.
"""
unique = set()
deduplicated_entries = []
deduplicated_entries_dict = dict[str, list[AuditEntry]]()
for entry in entries:
fields_subset = (entry.node, entry.category, entry.consistency, entry.error, entry.keyspace_name, entry.operation, entry.source, entry.table_name, entry.username)
for mode, entries in entries_dict.items():
unique = set()
deduplicated_entries = list[AuditEntry]()
for entry in entries:
fields_subset = (entry.node, entry.category, entry.consistency, entry.error, entry.keyspace_name, entry.operation, entry.source, entry.table_name, entry.username)
if fields_subset in unique:
continue
if fields_subset in unique:
continue
unique.add(fields_subset)
deduplicated_entries.append(entry)
unique.add(fields_subset)
deduplicated_entries.append(entry)
deduplicated_entries_dict[mode] = deduplicated_entries
return deduplicated_entries
return deduplicated_entries_dict
def get_audit_log_list(self, session):
def get_audit_log_dict(self, session):
"""_summary_
returns a sorted list of audit log, the logs are sorted by the event times (time-uuid)
with the node as tie breaker.
returns a dictionary mapping audit mode name to a sorted list of audit log,
the logs are sorted by the event times (time-uuid) with the node as tie breaker.
"""
consistency_level = ConsistencyLevel.QUORUM if len(self.cluster.nodelist()) > 1 else ConsistencyLevel.ONE
log_list = self.helper.get_audit_log_list(session, consistency_level)
logger.debug(f"get_audit_log_list: {log_list}")
return log_list
log_dict = self.helper.get_audit_log_dict(session, consistency_level)
logger.debug(f"get_audit_log_dict: {log_dict}")
return log_dict
# This assert is added just in order to still fail the test if the order of columns is changed, this is an implied assumption
def assert_audit_row_fields(self, row):
@@ -341,13 +413,17 @@ class TestCQLAudit(AuditTester):
assert row.table_name == table
assert row.username == user
def get_audit_entries_count(self, session):
res_list = self.get_audit_log_list(session)
res_list = self.filter_out_noise(res_list, filter_out_all_auth=True, filter_out_use=True)
logger.debug("Printing audit table content:")
for row in res_list:
logger.debug(" %s", row)
return len(res_list)
def get_audit_entries_count_dict(self, session) -> dict[str, int]:
"""_summary_
returns a dictionary mapping audit mode name to the count of audit log entries for that mode.
"""
reg_dict = self.get_audit_log_dict(session)
reg_dict = self.filter_out_noise(reg_dict, filter_out_all_auth=True, filter_out_use=True)
for mode, reg_list in reg_dict.items():
logger.debug(f"Printing audit {mode} content:")
for row in reg_list:
logger.debug(" %s", row)
return { mode: len(reg_list) for mode, reg_list in reg_dict.items() }
@staticmethod
def token_in_range(token, start_token, end_token):
@@ -395,17 +471,23 @@ class TestCQLAudit(AuditTester):
@contextmanager
def assert_exactly_n_audit_entries_were_added(self, session: Session, expected_entries: int):
count_before = self.get_audit_entries_count(session)
counts_before = self.get_audit_entries_count_dict(session)
yield
count_after = self.get_audit_entries_count(session)
assert count_after == count_before + expected_entries, f"Expected {expected_entries} new audit entries, but got {count_after - count_before} new entries"
counts_after = self.get_audit_entries_count_dict(session)
assert set(counts_before.keys()) == set(counts_after.keys()), f"audit modes changed (before: {list(counts_before.keys())} after: {list(counts_after.keys())})"
for mode, count_before in counts_before.items():
count_after = counts_after[mode]
assert count_after == count_before + expected_entries, f"Expected {expected_entries} new audit entries, but got {count_after - count_before} new entries"
@contextmanager
def assert_no_audit_entries_were_added(self, session):
count_before = self.get_audit_entries_count(session)
counts_before = self.get_audit_entries_count_dict(session)
yield
count_after = self.get_audit_entries_count(session)
assert count_before == count_after, f"audit entries count changed (before: {count_before} after: {count_after})"
counts_after = self.get_audit_entries_count_dict(session)
assert set(counts_before.keys()) == set(counts_after.keys()), f"audit modes changed (before: {list(counts_before.keys())} after: {list(counts_after.keys())})"
for mode, count_before in counts_before.items():
count_after = counts_after[mode]
assert count_before == count_after, f"audit entries count changed (before: {count_before} after: {count_after})"
def execute_and_validate_audit_entry( # noqa: PLR0913
self,
@@ -456,56 +538,70 @@ class TestCQLAudit(AuditTester):
# Filter out queries that can appear in random moments of the tests,
# such as LOGINs and USE statements.
def filter_out_noise(self, rows, filter_out_all_auth=False, filter_out_cassandra_auth=False, filter_out_use=False):
if filter_out_all_auth:
rows = [row for row in rows if row.category != "AUTH"]
if filter_out_cassandra_auth:
rows = [row for row in rows if not (row.category == "AUTH" and row.username == "cassandra")]
if filter_out_use:
rows = [row for row in rows if "USE " not in row.operation]
return rows
def filter_out_noise(self, rows_dict, filter_out_all_auth=False, filter_out_cassandra_auth=False, filter_out_use=False) -> dict[str, list[AuditEntry]]:
for mode, rows in rows_dict.items():
if filter_out_all_auth:
rows = [row for row in rows if row.category != "AUTH"]
if filter_out_cassandra_auth:
rows = [row for row in rows if not (row.category == "AUTH" and row.username == "cassandra")]
if filter_out_use:
rows = [row for row in rows if "USE " not in row.operation]
rows_dict[mode] = rows
return rows_dict
@contextmanager
def assert_entries_were_added(self, session: Session, expected_entries: list[AuditEntry], merge_duplicate_rows: bool = True, filter_out_cassandra_auth: bool = False):
# Get audit entries before executing the query, to later compare with
# audit entries after executing the query.
rows_before = self.get_audit_log_list(session)
set_of_rows_before = set(rows_before)
assert len(set_of_rows_before) == len(rows_before), f"audit table contains duplicate rows: {rows_before}"
set_of_rows_before_dict = dict[str, set[AuditEntry]]()
rows_before_dict = self.get_audit_log_dict(session)
for mode, rows_before in rows_before_dict.items():
set_of_rows_before = set(rows_before)
assert len(set_of_rows_before) == len(rows_before), f"audit {mode} contains duplicate rows: {rows_before}"
set_of_rows_before_dict[mode] = set_of_rows_before
yield
new_rows = []
new_rows_dict = dict[str, list[AuditEntry]]()
def is_number_of_new_rows_correct():
rows_after = self.get_audit_log_list(session)
set_of_rows_after = set(rows_after)
assert len(set_of_rows_after) == len(rows_after), f"audit table contains duplicate rows: {rows_after}"
rows_after_dict = self.get_audit_log_dict(session)
set_of_rows_after_dict = dict[str, set[AuditEntry]]()
for mode, rows_after in rows_after_dict.items():
set_of_rows_after = set(rows_after)
assert len(set_of_rows_after) == len(rows_after), f"audit {mode} contains duplicate rows: {rows_after}"
set_of_rows_after_dict[mode] = set_of_rows_after
nonlocal new_rows
new_rows = rows_after[len(rows_before) :]
assert set(new_rows) == set_of_rows_after - set_of_rows_before, f"new rows are not the last rows in the audit table: rows_after={rows_after}, set_of_rows_after={set_of_rows_after}, set_of_rows_before={set_of_rows_before}"
nonlocal new_rows_dict
for mode, rows_after in rows_after_dict.items():
rows_before = rows_before_dict[mode]
new_rows_dict[mode] = rows_after[len(rows_before) :]
assert set(new_rows_dict[mode]) == set_of_rows_after_dict[mode] - set_of_rows_before_dict[mode], f"new rows are not the last rows in the audit table: rows_after={rows_after}, set_of_rows_after_dict[{mode}]={set_of_rows_after_dict[mode]}, set_of_rows_before_dict[{mode}]={set_of_rows_before_dict[mode]}"
if merge_duplicate_rows:
new_rows = self.deduplicate_audit_entries(new_rows)
new_rows_dict = self.deduplicate_audit_entries(new_rows_dict)
auth_not_expected = (len([entry for entry in expected_entries if entry.category == "AUTH"]) == 0)
use_not_expected = (len([entry for entry in expected_entries if "USE " in entry.statement]) == 0)
new_rows = self.filter_out_noise(
new_rows,
new_rows_dict = self.filter_out_noise(
new_rows_dict,
filter_out_all_auth=auth_not_expected,
filter_out_cassandra_auth=filter_out_cassandra_auth,
filter_out_use=use_not_expected
)
assert len(new_rows) <= len(expected_entries)
return len(new_rows) == len(expected_entries)
for new_rows in new_rows_dict.values():
assert len(new_rows) <= len(expected_entries)
if len(new_rows) != len(expected_entries):
return False
return True
wait_for(is_number_of_new_rows_correct, timeout=60)
sorted_new_rows = sorted(new_rows, key=lambda row: (row.node, row.category, row.consistency, row.error, row.keyspace_name, row.operation, row.source, row.table_name, row.username))
assert len(sorted_new_rows) == len(expected_entries)
for row, entry in zip(sorted_new_rows, sorted(expected_entries)):
self.assert_audit_row_eq(row, entry.category, entry.statement, entry.table, entry.ks, entry.user, entry.cl, entry.error)
for mode, new_rows in new_rows_dict.items():
sorted_new_rows = sorted(new_rows, key=lambda row: (row.node, row.category, row.consistency, row.error, row.keyspace_name, row.operation, row.source, row.table_name, row.username))
assert len(sorted_new_rows) == len(expected_entries)
for row, entry in zip(sorted_new_rows, sorted(expected_entries)):
self.assert_audit_row_eq(row, entry.category, entry.statement, entry.table, entry.ks, entry.user, entry.cl, entry.error)
def verify_keyspace(self, audit_settings=None, helper=None):
"""
@@ -548,7 +644,7 @@ class TestCQLAudit(AuditTester):
for query in query_sequence:
session.execute(query)
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog])
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_using_non_existent_keyspace(self, helper_class):
"""
Test that using a non-existent keyspace generates an audit entry with an
@@ -664,22 +760,22 @@ class TestCQLAudit(AuditTester):
for query in query_sequence:
session.execute(query)
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog])
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_audit_keyspace(self, helper_class):
with helper_class() as helper:
self.verify_keyspace(audit_settings=AuditTester.audit_default_settings, helper=helper)
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog])
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_audit_keyspace_extra_parameter(self, helper_class):
with helper_class() as helper:
self.verify_keyspace(audit_settings={"audit": "table", "audit_categories": "ADMIN,AUTH,DML,DDL,DCL", "audit_keyspaces": "ks", "extra_parameter": "new"}, helper=helper)
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog])
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_audit_keyspace_many_ks(self, helper_class):
with helper_class() as helper:
self.verify_keyspace(audit_settings={"audit": "table", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL", "audit_keyspaces": "a,b,c,ks"}, helper=helper)
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog])
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_audit_keyspace_table_not_exists(self, helper_class):
with helper_class() as helper:
self.verify_keyspace(audit_settings={"audit": "table", "audit_categories": "DML,DDL", "audit_keyspaces": "ks", "audit_tables": "ks.fake"}, helper=helper)
@@ -726,6 +822,28 @@ class TestCQLAudit(AuditTester):
self.ignore_log_patterns.append(expected_error)
self.cluster.nodes["node1"].watch_log_for(expected_error)
def test_composite_audit_type_invalid(self):
    """
    'audit': table,syslog,invalid
    check node not started

    An unknown entry in the comma-separated 'audit' option must abort
    startup with a configuration error; the node never comes up.
    """
    # Startup is expected to fail, so log errors must not fail the test harness.
    self.fixture_dtest_setup.allow_log_errors = True
    expected_error = r"Startup failed: audit::audit_exception \(Bad configuration: invalid 'audit': invalid\)"
    bad_settings = {"audit": "table,syslog,invalid", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL", "audit_keyspaces": "ks"}
    self.cluster.set_configuration_options(values=bad_settings)
    try:
        # no_wait: the node is expected to die during startup, don't block on it.
        self.cluster.populate(1).start(no_wait=True)
    except (NodeError, RuntimeError):
        # Startup failure surfaces as either exception depending on the harness path.
        pass
    self.ignore_log_patterns.append(expected_error)
    self.cluster.nodes["node1"].watch_log_for(expected_error)
    # TODO: verify that the syslog file doesn't exist
def test_audit_empty_settings(self):
"""
@@ -735,6 +853,14 @@ class TestCQLAudit(AuditTester):
session = self.prepare(create_keyspace=False, audit_settings={"audit": "none"})
assert_invalid(session, "use audit;", expected=InvalidRequest)
def test_composite_audit_empty_settings(self):
    """
    'audit': table,syslog,none
    check node started, ks audit not created

    A 'none' entry disables auditing even when combined with real
    backends, so the audit keyspace must not exist.
    """
    audit_settings = {"audit": "table,syslog,none"}
    session = self.prepare(create_keyspace=False, audit_settings=audit_settings)
    # The audit keyspace was never created, so USE must be rejected.
    assert_invalid(session, "use audit;", expected=InvalidRequest)
def test_audit_audit_ks(self):
"""
'audit_keyspaces': 'audit'
@@ -746,7 +872,7 @@ class TestCQLAudit(AuditTester):
self.execute_and_validate_audit_entry(session, query=self.AUDIT_LOG_QUERY, category="QUERY", ks="audit", table="audit_log", audit_settings=audit_settings)
@pytest.mark.single_node
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog])
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_audit_categories_invalid(self, helper_class):
"""
'audit_categories': invalid
@@ -792,20 +918,20 @@ class TestCQLAudit(AuditTester):
self.verify_table(audit_settings={"audit": "table", "audit_categories": "AUTH,QUERY,DDL"}, table_prefix="test_audit_categories_part1", overwrite_audit_tables=True)
@pytest.mark.cluster_options(enable_create_table_with_compact_storage=True)
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog])
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_audit_categories_part2(self, helper_class):
with helper_class() as helper:
self.verify_table(audit_settings={"audit": "table", "audit_categories": "DDL, ADMIN,AUTH,DCL", "audit_keyspaces": "ks"}, helper=helper, table_prefix="test_audit_categories_part2")
@pytest.mark.cluster_options(enable_create_table_with_compact_storage=True)
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog])
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_audit_categories_part3(self, helper_class):
with helper_class() as helper:
self.verify_table(audit_settings={"audit": "table", "audit_categories": "DDL, ADMIN,AUTH", "audit_keyspaces": "ks"}, helper=helper, table_prefix="test_audit_categories_part3")
PasswordMaskingCase = namedtuple("PasswordMaskingCase", ["name", "password", "new_password"])
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog])
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_user_password_masking(self, helper_class):
"""
CREATE USER, ALTER USER, DROP USER statements
@@ -922,7 +1048,7 @@ class TestCQLAudit(AuditTester):
with self.assert_entries_were_added(session, [expected_entry]):
assert_invalid(session, stmt, expected=Unavailable)
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog])
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_role_password_masking(self, helper_class):
"""
CREATE ROLE, ALTER ROLE, DROP ROLE statements
@@ -1094,26 +1220,35 @@ class TestCQLAudit(AuditTester):
pytest.fail("Expected insert to fail")
node_to_stop.start(wait_for_binary_proto=True, wait_other_notice=True)
rows = []
rows_dict = dict[str, list[AuditEntry]]()
timestamp_before = datetime.datetime.now()
for i in itertools.count(start=1):
if datetime.datetime.now() - timestamp_before > datetime.timedelta(seconds=60):
pytest.fail(f"audit log not updated after {i} iterations")
rows = self.get_audit_log_list(session)
rows_with_error = list(filter(lambda r: r.error, rows))
if len(rows_with_error) == 6:
logger.info(f"audit log updated after {i} iterations ({i / 10}s)")
assert rows_with_error[0].error is True
assert rows_with_error[0].consistency == "THREE"
rows_dict = self.get_audit_log_dict(session)
# We need to satisfy the end state condition for all audit modes.
# If any audit mode is not done yet, continue polling.
all_modes_done = True
for mode, rows in rows_dict.items():
rows_with_error = list(filter(lambda r: r.error, rows))
if len(rows_with_error) == 6:
logger.info(f"audit mode {mode} log updated after {i} iterations ({i / 10}s)")
assert rows_with_error[0].error is True
assert rows_with_error[0].consistency == "THREE"
# We expect the initial insert to be in the audit log.
# it is executed in _test_insert_failure_doesnt_report_success_assign_nodes
rows_without_error = [row for row in rows if row.operation == query_to_fail and not row.error]
assert len(rows_without_error) == 1
# We expect the initial insert to be in the audit log.
# it is executed in _test_insert_failure_doesnt_report_success_assign_nodes
rows_without_error = [row for row in rows if row.operation == query_to_fail and not row.error]
assert len(rows_without_error) == 1
else:
# An audit mode is not done yet, early exit to continue polling.
all_modes_done = False
break
if all_modes_done:
break
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog])
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_prepare(self, helper_class):
"""Test prepare statement"""
with helper_class() as helper:
@@ -1141,7 +1276,7 @@ class TestCQLAudit(AuditTester):
table="cf",
)
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog])
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_permissions(self, helper_class):
"""Test user permissions"""
@@ -1173,7 +1308,7 @@ class TestCQLAudit(AuditTester):
expected_error=Unauthorized,
)
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog])
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_batch(self, helper_class):
"""
BATCH statement
@@ -1309,7 +1444,7 @@ class TestCQLAudit(AuditTester):
self.verify_change(node, param, settings[param], mark, expected_result)
@pytest.mark.parametrize("audit_config_changer", [AuditSighupConfigChanger, AuditCqlConfigChanger])
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog])
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_config_liveupdate(self, helper_class, audit_config_changer):
"""
Test liveupdate config changes in audit.
@@ -1370,7 +1505,7 @@ class TestCQLAudit(AuditTester):
session.execute(auditted_query)
@pytest.mark.parametrize("audit_config_changer", [AuditSighupConfigChanger, AuditCqlConfigChanger])
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog])
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_config_no_liveupdate(self, helper_class, audit_config_changer):
"""
Test audit config parameters that don't allow config changes.
@@ -1402,7 +1537,7 @@ class TestCQLAudit(AuditTester):
with self.assert_entries_were_added(session, expected_new_entries, merge_duplicate_rows=False):
session.execute(auditted_query)
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog])
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_parallel_syslog_audit(self, helper_class):
"""
Test that cluster doesn't fail if multiple queries are audited in parallel

View File

@@ -395,10 +395,9 @@ int scylla_simple_query_main(int argc, char** argv) {
cfg.stop_on_error = app.configuration()["stop-on-error"].as<bool>();
cfg.timeout = app.configuration()["timeout"].as<std::string>();
cfg.bypass_cache = app.configuration().contains("bypass-cache");
audit::audit::create_audit(env.local_db().get_config(), env.get_shared_token_metadata()).handle_exception([&] (auto&& e) {
fmt::print("audit creation failed: {}", e);
audit::audit::start_audit(env.local_db().get_config(), env.get_shared_token_metadata(), env.qp(), env.migration_manager()).handle_exception([&] (auto&& e) {
fmt::print("audit start failed: {}", e);
}).get();
audit::audit::start_audit(env.local_db().get_config(), env.qp(), env.migration_manager()).get();
auto audit_stop = defer([] {
audit::audit::stop_audit().get();
});