Files
scylladb/test/cluster/dtest/audit_test.py
Dario Mirovic f545ed37bc test: dtest: audit_test.py: fix audit error log detection
`test_insert_failure_doesnt_report_success` test in `test/cluster/dtest/audit_test.py`
has an insert statement that is expected to fail. Dtest environment uses
`FlakyRetryPolicy`, which has `max_retries = 5`. 1 initial fail and 5 retry fails
means we expect 6 error audit logs.

The test failed because `create keyspace ks` failed once, then succeeded on retry.
It allowed the test to proceed properly, but the last part of the test that expects
exactly 6 failed queries actually had 7.

The goal of this patch is to make sure there are exactly 6 = 1 + `max_retries` failed
queries, counting only the query expected to fail. If other queries fail with
successful retry, it's fine. If other queries fail without successful retry, the test
will fail, as it should in such situations. They are not related to this expected
failed insert statement.

Fixes #27322

Closes scylladb/scylladb#27378
2025-12-11 10:17:07 +03:00

1560 lines
73 KiB
Python

#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import copy
import datetime
import enum
import itertools
import logging
import os.path
import re
import socket
import socketserver
import tempfile
import threading
import uuid
from collections import namedtuple
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Any, Optional, override
import pytest
import requests
from cassandra import AlreadyExists, AuthenticationFailed, ConsistencyLevel, InvalidRequest, Unauthorized, Unavailable, WriteFailure
from cassandra.cluster import NoHostAvailable, Session, EXEC_PROFILE_DEFAULT
from cassandra.query import SimpleStatement, named_tuple_factory
from ccmlib.scylla_node import ScyllaNode, NodeError
from dtest_class import Tester, create_ks, wait_for
from tools.assertions import assert_invalid
from tools.cluster import run_rest_api
from tools.data import rows_to_list, run_in_parallel
from test.pylib.rest_client import read_barrier
logger = logging.getLogger(__name__)
class AuditRowMustNotExistError(Exception):
    """Raised when an audit row that is required to be absent is found."""
class AuditTester(Tester):
    """Base dtest class that starts a Scylla cluster with audit_* options set.

    A per-test audit backend "helper" (table / syslog / composite) may rewrite
    the audit settings before the cluster starts.
    """

    # Default audit configuration: table backend, every category, keyspace "ks".
    audit_default_settings = {"audit": "table", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL", "audit_keyspaces": "ks"}
    def prepare( # noqa: PLR0913
        self,
        ordered=False,
        create_keyspace=True,
        use_cache=False,
        nodes=1,
        rf=1,
        protocol_version=None,
        user=None,
        password=None,
        audit_settings=audit_default_settings,
        reload_config=False,
        helper=None,
        **kwargs,
    ):
        """Configure and (re)start the cluster; return a CQL session to node1.

        NOTE(review): ``audit_settings`` defaults to the shared class-level
        dict; it is only read / deep-copied here, but callers must not mutate
        the returned-by-reference default.
        """
        logger.debug(f"Preparing cluster with {nodes} node(s): rf={rf} ordered={ordered} use_cache={use_cache} audit_settings={audit_settings}")
        cluster = self.cluster
        cluster.set_configuration_options(values={"logger_log_level": {"audit": "debug"}})
        # Default to the table backend when the test did not pick one.
        if helper is None:
            self.helper = AuditBackendTable()
        else:
            self.helper = helper
        self.helper.before_cluster_start()
        # Let the backend adjust the settings (e.g. syslog rewrites "table").
        audit_settings = self.helper.update_audit_settings(audit_settings)
        if ordered:
            cluster.set_partitioner("org.apache.cassandra.dht.ByteOrderedPartitioner")
        if use_cache:
            cluster.set_configuration_options(values={"row_cache_size_in_mb": 100})
        jvm_args = kwargs.pop("jvm_args", [])
        cluster.set_configuration_options(values=audit_settings)
        if user:
            # Password auth + authorization with no permission caching.
            config = {"authenticator": "org.apache.cassandra.auth.PasswordAuthenticator", "authorizer": "org.apache.cassandra.auth.CassandraAuthorizer", "permissions_validity_in_ms": 0}
            cluster.set_configuration_options(values=config)
        if reload_config:
            # The cluster is restarted to reload the config file.
            cluster.stop()
            cluster.start(wait_for_binary_proto=True)
        # First call on a fresh cluster: create and start the nodes now.
        if not cluster.nodelist():
            if not isinstance(nodes, dict):
                nodes = [nodes]
            cluster.populate(nodes).start(wait_for_binary_proto=True, jvm_args=jvm_args)
        node1 = cluster.nodelist()[0]
        session = self.patient_cql_connection(node1, protocol_version=protocol_version, user=user, password=password)
        if create_keyspace:
            # Start every test from a clean "ks" keyspace.
            session.execute("DROP KEYSPACE IF EXISTS ks")
            create_ks(session, "ks", rf)
        return session
@dataclass(eq=True, order=True)
class AuditEntry:
    """Expected audit-log entry used by the assertions in this module.

    Field order matters: instances are compared with dataclass ordering and
    constructed positionally as (category, cl, error, ks, statement, table, user).
    """

    category: str  # audit category, e.g. "DML", "DDL", "AUTH"
    cl: str  # consistency level name, e.g. "ONE"
    error: bool  # True when the audited statement failed
    ks: str  # keyspace name
    statement: str  # the audited CQL statement text
    table: str  # table name ("" when not applicable)
    user: str  # user that ran the statement
class AuditBackend:
    """Abstract base for audit-log backends; used as a context manager."""
    def __init__(self) -> None:
        super().__init__()
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        # Default: nothing to tear down.
        pass
    def audit_mode(self) -> str:
        """Return the 'audit' config value this backend corresponds to."""
        raise NotImplementedError
    def before_cluster_start(self):
        """Hook invoked before the cluster starts; default does nothing."""
        pass
    def get_audit_log_dict(self, session, consistency_level):
        """Return a dict mapping audit mode name to a list of audit rows."""
        raise NotImplementedError
class AuditBackendTable(AuditBackend):
    """Audit backend that reads entries from the audit.audit_log table."""

    AUDIT_LOG_QUERY = "SELECT * FROM audit.audit_log"
    audit_default_settings = {"audit": "table", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL", "audit_keyspaces": "ks"}

    def __init__(self):
        super().__init__()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Nothing to tear down for the table backend.
        pass

    @override
    def audit_mode(self) -> str:
        return "table"

    def update_audit_settings(self, audit_settings, modifiers=None):
        """Return a deep copy of audit_settings (or the defaults when falsy)
        with the given modifiers applied on top."""
        merged = copy.deepcopy(audit_settings or self.audit_default_settings)
        merged.update(modifiers or {})
        return merged

    @override
    def get_audit_log_dict(self, session, consistency_level):
        """Return {"table": rows}, rows sorted by (event time, node).

        Rows must come back as named tuples: the tests assert on the column
        order of the audit table.
        """
        assert session.row_factory == named_tuple_factory
        statement = SimpleStatement(self.AUDIT_LOG_QUERY, consistency_level=consistency_level)
        ordered_rows = sorted(session.execute(statement), key=lambda row: (row.event_time.time, row.node))
        return {self.audit_mode(): ordered_rows}
class UnixSockerListener:
    """Unix-datagram sink that captures Scylla's audit syslog output.

    A background ThreadingUnixDatagramServer appends each received datagram to
    ``self.lines``. A small id-based handshake (wait_for_newer_notification_id)
    lets callers flush: we send ourselves a marker datagram and wait until the
    handler has processed it, so all earlier datagrams were handled too.

    NOTE(review): the class name looks like a typo for "UnixSocketListener",
    but it is referenced by this name elsewhere in the file, so it stays.
    """
    class UnixDatagramHandler(socketserver.BaseRequestHandler):
        def handle(self):
            # Runs on the server thread for every received datagram.
            with self.server.parent_instance.condition:
                data = self.request[0].decode("utf-8").strip()
                if data.startswith(self.server.parent_instance.notification_msg):
                    # Flush marker sent by wait_for_newer_notification_id().
                    received_id = int(data[len(self.server.parent_instance.notification_msg):])
                    self.server.parent_instance.notification_id = received_id
                    self.server.parent_instance.condition.notify_all()
                elif data != "Initializing syslog audit backend.":
                    # Regular audit line (the backend's startup banner is dropped).
                    self.server.parent_instance.lines.append(data)
    class UnixDatagramServer(socketserver.ThreadingUnixDatagramServer):
        def __init__(self, socket_path, handler, parent_instance, lock):
            # Backreference so the handler can reach the listener's state.
            self.parent_instance = parent_instance
            self.mutex = lock
            super().__init__(socket_path, handler)
    def __init__(self, socket_path):
        self.socket_path = socket_path
        self.lines = []
        self.server = self.UnixDatagramServer(socket_path, self.UnixDatagramHandler, self, threading.Lock())
        self.server.server_activate()
        self.notification_msg = "Notifying syslog server with id: "
        self.notification_id = 0
        # The condition shares the server's lock, serializing handler vs waiter.
        self.condition = threading.Condition(self.server.mutex)
        self.thread = threading.Thread(target=self.server.serve_forever)
        self.thread.start()
        # Client socket used to send flush markers to ourselves.
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    def wait_for_newer_notification_id(self):
        """Send a flush marker and block until the handler has seen it."""
        with self.condition:
            expected_id = self.notification_id + 1
            self.sock.sendto(bytes(self.notification_msg + str(expected_id), 'utf-8'), self.socket_path)
            self.condition.wait_for(lambda: self.notification_id == expected_id)
    def get_lines(self):
        """Return a snapshot copy of all audit lines received so far."""
        # Make sure all in-progress handle() calls are finished
        self.wait_for_newer_notification_id()
        return copy.deepcopy(self.lines)
    def shutdown(self):
        """Stop the server thread and close both sockets."""
        self.server.shutdown()
        self.thread.join()
        self.server.server_close()
        self.sock.close()
class AuditBackendSyslog(AuditBackend):
    """Audit backend that reads entries from Scylla's syslog unix socket."""

    audit_default_settings = {"audit": "syslog", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL", "audit_keyspaces": "ks"}
    def __init__(self):
        super().__init__()
        # Pick a fresh socket path, then start listening before the cluster
        # starts so no audit datagrams are lost.
        self.update_socket_path()
        self.unix_socket_listener = UnixSockerListener(self.socket_path)
        # Mirrors the column layout of the audit.audit_log table so both
        # backends can be validated with the same assertions.
        self.named_tuple_factory = namedtuple("Row", ["date", "node", "event_time", "category", "consistency", "error", "keyspace_name", "operation", "source", "table_name", "username"])
    def __enter__(self):
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        # Stop the listener, then remove the socket file it was bound to.
        self.unix_socket_listener.shutdown()
        if os.path.exists(self.socket_path):
            os.remove(self.socket_path)
    @override
    def audit_mode(self) -> str:
        return "syslog"
    def update_audit_settings(self, audit_settings, modifiers=None):
        """Return a copy of the settings rewritten for the syslog backend."""
        if modifiers is None:
            modifiers = {}
        new_audit_settings = copy.deepcopy(audit_settings or self.audit_default_settings)
        # This is a hack. The test framework uses "table" as "not none".
        # Appropriate audit mode should be passed from the test itself, and not set here.
        # This converts "table" to its own audit mode, or keeps "none" as is.
        if "audit" in new_audit_settings and new_audit_settings["audit"] == "table":
            new_audit_settings["audit"] = self.audit_mode()
        new_audit_settings["audit_unix_socket_path"] = self.socket_path
        for key in modifiers:
            new_audit_settings[key] = modifiers[key]
        return new_audit_settings
    @override
    def get_audit_log_dict(self, session, consistency_level):
        """Return {"syslog": rows} parsed from the captured syslog lines.

        Rows keep arrival order; each gets a synthetic time-uuid so they sort
        like table-backend rows.
        """
        lines = self.unix_socket_listener.get_lines()
        entries = []
        for idx, line in enumerate(lines):
            entries.append(self.line_to_row(line, idx))
        return { self.audit_mode(): entries }
    def line_to_row(self, line, idx):
        """Parse one syslog line into a named tuple shaped like a table row."""
        metadata, data = line.split(": ", 1)
        data = "".join(data.splitlines()) # Remove newlines
        # Fields appear in this fixed order in the syslog message payload.
        fields = ["node", "category", "cl", "error", "keyspace", "query", "client_ip", "table", "username"]
        regexp = ", ".join(f"{field}=\"(?P<{field}>.*)\"" for field in fields)
        match = re.match(regexp, data)
        # Arbitrary date because we don't really check the field. We just need to fill it with something
        # and make sure it doesn't change during the test (e.g. when the test is running at 23:59:59)
        date = datetime.datetime(2000, 1, 1, 0, 0)
        node = match.group("node").split(":")[0]
        # Strip backslash escapes so statements compare equal to the originals.
        statement = match.group("query").replace("\\", "")
        source = match.group("client_ip").split(":")[0]
        # Synthetic, monotonically increasing "time-uuid" preserving arrival order.
        event_time = uuid.UUID(int=idx)
        t = self.named_tuple_factory(date, node, event_time, match.group("category"), match.group("cl"), match.group("error") == "true", match.group("keyspace"), statement, source, match.group("table"), match.group("username"))
        return t
    def update_socket_path(self):
        # NOTE(review): mktemp only generates a name (the socket bind creates
        # the file); racy in general, presumably acceptable in this test
        # environment — confirm.
        new_socket_path = tempfile.mktemp(prefix="/tmp/scylla-audit-", suffix=".socket")
        self.socket_path = new_socket_path
class AuditBackendComposite(AuditBackend):
    """Audit backend that fans out to both the table and syslog backends."""

    audit_default_settings = {"audit": "table,syslog", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL", "audit_keyspaces": "ks"}
    backends: list[AuditBackend]

    def __init__(self):
        super().__init__()
        self.backends = [AuditBackendTable(), AuditBackendSyslog()]

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Tear down in reverse construction order; keep going even if one fails.
        for backend in self.backends[::-1]:
            try:
                backend.__exit__(exc_type, exc_val, exc_tb)
            except Exception as e:
                logger.error(f"Error while exiting backend: {e}")

    @override
    def audit_mode(self) -> str:
        return ",".join(backend.audit_mode() for backend in self.backends)

    def update_audit_settings(self, audit_settings, modifiers=None):
        """Return a copy of the settings rewritten for the composite mode."""
        settings = copy.deepcopy(audit_settings or self.audit_default_settings)
        # This is a hack. The test framework uses "table" as "not none".
        # The syslog backend may change "table" to "syslog" before this is called.
        # Appropriate audit mode should be passed from the test itself, and not set here.
        # This converts "table" or "syslog" to its own audit mode, or keeps "none" as is.
        for backend in self.backends:
            settings = backend.update_audit_settings(settings)
        if settings.get("audit") in ("table", "syslog"):
            settings["audit"] = self.audit_mode()
        settings.update(modifiers or {})
        return settings

    @override
    def get_audit_log_dict(self, session, consistency_level):
        """Return the union of every backend's {mode: rows} dict.

        Each mode key must be produced by exactly one backend.
        """
        merged: dict[str, list[AuditEntry]] = {}
        for backend in self.backends:
            for mode, backend_rows in backend.get_audit_log_dict(session, consistency_level).items():
                assert mode not in merged
                merged[mode] = backend_rows
        return merged
@pytest.mark.single_node
class TestCQLAudit(AuditTester):
"""
Make sure CQL statements are audited
"""
AUDIT_LOG_QUERY = "SELECT * FROM audit.audit_log"
def deduplicate_audit_entries(self, entries_dict):
    """
    Return a dict mapping audit mode name to its entries with duplicates
    removed, keeping the first occurrence of each and preserving order.
    """
    deduplicated = {}
    for mode, entries in entries_dict.items():
        seen = set()
        kept = []
        for entry in entries:
            # Identity deliberately ignores the date and event_time columns,
            # which differ between otherwise-duplicate rows.
            key = (entry.node, entry.category, entry.consistency, entry.error, entry.keyspace_name, entry.operation, entry.source, entry.table_name, entry.username)
            if key not in seen:
                seen.add(key)
                kept.append(entry)
        deduplicated[mode] = kept
    return deduplicated
def get_audit_log_dict(self, session):
    """
    Return a dict mapping audit mode name to a sorted list of audit rows
    (sorted by event time-uuid with the node as tie breaker), as produced
    by the active audit helper.
    """
    # Multi-node clusters need QUORUM to observe rows written on other nodes.
    cl = ConsistencyLevel.ONE if len(self.cluster.nodelist()) <= 1 else ConsistencyLevel.QUORUM
    log_dict = self.helper.get_audit_log_dict(session, cl)
    logger.debug(f"get_audit_log_dict: {log_dict}")
    return log_dict
# This assert is added just in order to still fail the test if the order of columns is changed, this is an implied assumption
def assert_audit_row_fields(self, row):
    """Fail if the row's column names/order differ from the expected schema."""
    expected_fields = ("date", "node", "event_time", "category", "consistency", "error", "keyspace_name", "operation", "source", "table_name", "username")
    assert list(row._fields) == list(expected_fields)
def assert_audit_row_eq( # noqa: PLR0913
    self,
    row,
    category,
    statement,
    table="",
    ks="ks",
    user="anonymous",
    cl="ONE",
    error=False,
):
    """Assert that an audit row matches the expected field values."""
    # Column order is an implied assumption; verify it first.
    self.assert_audit_row_fields(row)
    node_addresses = [node.address() for node in self.cluster.nodelist()]
    assert row.node in node_addresses
    assert row.category == category
    assert row.consistency == cl
    assert row.error == error
    assert row.keyspace_name == ks
    assert row.operation == statement
    assert row.source == "127.0.0.1"
    assert row.table_name == table
    assert row.username == user
def get_audit_entries_count_dict(self, session) -> dict[str, int]:
    """
    Return a dict mapping audit mode name to the number of audit entries
    for that mode, after filtering out AUTH and USE noise.
    """
    filtered = self.filter_out_noise(self.get_audit_log_dict(session), filter_out_all_auth=True, filter_out_use=True)
    counts = {}
    for mode, rows in filtered.items():
        logger.debug(f"Printing audit {mode} content:")
        for row in rows:
            logger.debug(" %s", row)
        counts[mode] = len(rows)
    return counts
@staticmethod
def token_in_range(token, start_token, end_token):
    """
    Return True when token falls inside [start_token, end_token].

    A range whose end is smaller than its start wraps around the ring, e.g.:
        {
            "start_token": "9216493101126045011",
            "end_token": "-9213846192999778351",
            "endpoints": ["172.19.0.6", "172.19.0.2", "172.19.0.3"]
        }
    """
    if start_token <= end_token:
        return start_token <= token <= end_token
    # Wrapped range: covers [start_token, +inf) and (-inf, end_token].
    return token >= start_token or token <= end_token
def get_partitions_for_token(self, ks, table, token):
    """Return the set of endpoint addresses whose token range contains token."""
    endpoints = set()
    for node in self.cluster.nodelist():
        response = run_rest_api(node, f"/storage_service/describe_ring/{ks}", api_method="GET", params={"table": table})
        for token_range in response.json():
            lo = int(token_range["start_token"])
            hi = int(token_range["end_token"])
            if self.token_in_range(token, lo, hi):
                logger.info(f"Found token {token} in token_range: {token_range}")
                endpoints.update(token_range["endpoints"])
    return endpoints
def get_audit_partitions_for_operation(self, session, operation):
    """
    Return the list of nodes owning the audit_log partition that stores the
    given operation, or None when no such audit row exists.
    """
    logger.info(f"get_audit_partitions_for_operation: {operation}")
    query = "SELECT token(date, node) FROM audit.audit_log WHERE operation = %s ALLOW FILTERING"
    rows = rows_to_list(session.execute(query, [operation]))
    if not rows:
        return None
    partitions = self.get_partitions_for_token("audit", "audit_log", rows[0][0])
    assert len(partitions) > 0
    by_address = {node.address(): node for node in self.cluster.nodelist()}
    return [by_address[p] for p in partitions]
@contextmanager
def assert_exactly_n_audit_entries_were_added(self, session: Session, expected_entries: int):
    """Context manager asserting that each audit mode gained exactly
    ``expected_entries`` rows while the body ran."""
    before = self.get_audit_entries_count_dict(session)
    yield
    after = self.get_audit_entries_count_dict(session)
    assert set(before.keys()) == set(after.keys()), f"audit modes changed (before: {list(before.keys())} after: {list(after.keys())})"
    for mode, count_before in before.items():
        added = after[mode] - count_before
        assert added == expected_entries, f"Expected {expected_entries} new audit entries, but got {added} new entries"
@contextmanager
def assert_no_audit_entries_were_added(self, session):
    """Context manager asserting that no audit rows were added while the
    body ran (per audit mode)."""
    before = self.get_audit_entries_count_dict(session)
    yield
    after = self.get_audit_entries_count_dict(session)
    assert set(before.keys()) == set(after.keys()), f"audit modes changed (before: {list(before.keys())} after: {list(after.keys())})"
    for mode, count_before in before.items():
        count_after = after[mode]
        assert count_before == count_after, f"audit entries count changed (before: {count_before} after: {count_after})"
def execute_and_validate_audit_entry( # noqa: PLR0913
    self,
    session: Session,
    query: Any,
    category: str,
    audit_settings: dict[str, str] = AuditTester.audit_default_settings,
    table: str = "",
    ks: str = "ks",
    cl: str = "ONE",
    user: str = "anonymous",
    expected_error: Any = None,
    bound_values: list[Any] | None = None,
    expect_new_audit_entry: bool = True,
    expected_operation: str | None = None,
    session_for_audit_entry_validation: Session | None = None,
):
    """
    Execute a query and validate that an audit entry was added to the audit
    log table. Use the audit_settings parameter in combination with category
    to determine if the audit entry should be added or not. If the audit
    entry is expected, validate that the audit entry's content is as
    expected.
    """
    # In some cases, provided session does not have access to the audit
    # table. In that case, session_for_audit_entry_validation should be
    # provided.
    if session_for_audit_entry_validation is None:
        session_for_audit_entry_validation = session
    audited_categories = audit_settings["audit_categories"].split(",")
    if expect_new_audit_entry and category in audited_categories:
        operation = expected_operation if expected_operation is not None else query
        expected_entries = [AuditEntry(category, cl, expected_error is not None, ks, operation, table, user)]
    else:
        expected_entries = []
    with self.assert_entries_were_added(session_for_audit_entry_validation, expected_entries):
        if expected_error is not None:
            assert_invalid(session, query, expected=expected_error)
            res = None
        else:
            res = session.execute(query, bound_values)
    return res
def filter_out_noise(self, rows_dict, filter_out_all_auth=False, filter_out_cassandra_auth=False, filter_out_use=False) -> dict[str, list[AuditEntry]]:
    """
    Filter out queries that can appear at random moments of the tests,
    such as LOGINs and USE statements. Filters rows_dict in place and
    returns it.
    """
    for mode in rows_dict:
        kept = rows_dict[mode]
        if filter_out_all_auth:
            kept = [row for row in kept if row.category != "AUTH"]
        if filter_out_cassandra_auth:
            # Keep a row unless it is an AUTH entry for the "cassandra" user.
            kept = [row for row in kept if row.category != "AUTH" or row.username != "cassandra"]
        if filter_out_use:
            kept = [row for row in kept if "USE " not in row.operation]
        rows_dict[mode] = kept
    return rows_dict
@contextmanager
def assert_entries_were_added(self, session: Session, expected_entries: list[AuditEntry], merge_duplicate_rows: bool = True, filter_out_cassandra_auth: bool = False):
    """Context manager asserting that exactly ``expected_entries`` were
    appended to the audit log while the body ran (after noise filtering and
    optional deduplication), and that their contents match.
    """
    # Get audit entries before executing the query, to later compare with
    # audit entries after executing the query.
    set_of_rows_before_dict = dict[str, set[AuditEntry]]()
    rows_before_dict = self.get_audit_log_dict(session)
    for mode, rows_before in rows_before_dict.items():
        set_of_rows_before = set(rows_before)
        # Sanity: the pre-existing log must not contain exact duplicates.
        assert len(set_of_rows_before) == len(rows_before), f"audit {mode} contains duplicate rows: {rows_before}"
        set_of_rows_before_dict[mode] = set_of_rows_before
    yield
    new_rows_dict = dict[str, list[AuditEntry]]()
    def is_number_of_new_rows_correct():
        # Polled by wait_for(): audit rows appear asynchronously, so retry
        # until the expected number of new rows shows up (or timeout).
        rows_after_dict = self.get_audit_log_dict(session)
        set_of_rows_after_dict = dict[str, set[AuditEntry]]()
        for mode, rows_after in rows_after_dict.items():
            set_of_rows_after = set(rows_after)
            assert len(set_of_rows_after) == len(rows_after), f"audit {mode} contains duplicate rows: {rows_after}"
            set_of_rows_after_dict[mode] = set_of_rows_after
        nonlocal new_rows_dict
        for mode, rows_after in rows_after_dict.items():
            rows_before = rows_before_dict[mode]
            # The new rows must form a suffix of the (sorted) audit log.
            new_rows_dict[mode] = rows_after[len(rows_before) :]
            assert set(new_rows_dict[mode]) == set_of_rows_after_dict[mode] - set_of_rows_before_dict[mode], f"new rows are not the last rows in the audit table: rows_after={rows_after}, set_of_rows_after_dict[{mode}]={set_of_rows_after_dict[mode]}, set_of_rows_before_dict[{mode}]={set_of_rows_before_dict[mode]}"
        if merge_duplicate_rows:
            new_rows_dict = self.deduplicate_audit_entries(new_rows_dict)
        # Drop AUTH/USE noise unless the caller explicitly expects such rows.
        auth_not_expected = (len([entry for entry in expected_entries if entry.category == "AUTH"]) == 0)
        use_not_expected = (len([entry for entry in expected_entries if "USE " in entry.statement]) == 0)
        new_rows_dict = self.filter_out_noise(
            new_rows_dict,
            filter_out_all_auth=auth_not_expected,
            filter_out_cassandra_auth=filter_out_cassandra_auth,
            filter_out_use=use_not_expected
        )
        for new_rows in new_rows_dict.values():
            # More rows than expected fails immediately; fewer means "not
            # yet", so keep polling.
            assert len(new_rows) <= len(expected_entries)
            if len(new_rows) != len(expected_entries):
                return False
        return True
    wait_for(is_number_of_new_rows_correct, timeout=60)
    for mode, new_rows in new_rows_dict.items():
        # Compare field-by-field, order-independently (both sides sorted).
        sorted_new_rows = sorted(new_rows, key=lambda row: (row.node, row.category, row.consistency, row.error, row.keyspace_name, row.operation, row.source, row.table_name, row.username))
        assert len(sorted_new_rows) == len(expected_entries)
        for row, entry in zip(sorted_new_rows, sorted(expected_entries)):
            self.assert_audit_row_eq(row, entry.category, entry.statement, entry.table, entry.ks, entry.user, entry.cl, entry.error)
def verify_keyspace(self, audit_settings=None, helper=None):
    """
    Verify auditing of CREATE KEYSPACE, USE KEYSPACE, ALTER KEYSPACE and
    DROP KEYSPACE statements, and that keyspaces not listed in
    audit_keyspaces produce no audit entries.
    """
    # Fix: the body dereferences audit_settings ("audit_keyspaces" lookup and
    # the inner helper's audit_settings["audit_categories"]), so calling with
    # the default (None) raised TypeError. Fall back to the class defaults.
    if audit_settings is None:
        audit_settings = self.audit_default_settings
    session = self.prepare(create_keyspace=False, audit_settings=audit_settings, helper=helper)
    def execute_and_validate_audit_entry(query, category, **kwargs):
        # Bind session and settings once for the calls below.
        return self.execute_and_validate_audit_entry(session, query, category, audit_settings, **kwargs)
    execute_and_validate_audit_entry(
        "CREATE KEYSPACE ks WITH replication = { 'class':'SimpleStrategy', 'replication_factor':1} AND DURABLE_WRITES = true",
        category="DDL",
    )
    execute_and_validate_audit_entry(
        'USE "ks"',
        category="DML",
    )
    execute_and_validate_audit_entry(
        "ALTER KEYSPACE ks WITH replication = { 'class' : 'NetworkTopologyStrategy', 'dc1' : 1 } AND DURABLE_WRITES = false",
        category="DDL",
    )
    execute_and_validate_audit_entry(
        "DROP KEYSPACE ks",
        category="DDL",
    )
    # Test that the audit entries are not added if the keyspace is not
    # specified in the audit_keyspaces setting.
    keyspaces = audit_settings["audit_keyspaces"].split(",") if "audit_keyspaces" in audit_settings else []
    assert "ks2" not in keyspaces
    query_sequence = [
        "CREATE KEYSPACE ks2 WITH replication = { 'class':'SimpleStrategy', 'replication_factor':1} AND DURABLE_WRITES = true",
        'USE "ks2"',
        "ALTER KEYSPACE ks2 WITH replication = { 'class' : 'NetworkTopologyStrategy', 'dc1' : 1 } AND DURABLE_WRITES = false",
        "DROP KEYSPACE ks2",
    ]
    with self.assert_no_audit_entries_were_added(session):
        for query in query_sequence:
            session.execute(query)
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_using_non_existent_keyspace(self, helper_class):
    """
    Test that using a non-existent keyspace generates an audit entry with
    an error field set to True.
    """
    with helper_class() as helper:
        session = self.prepare(create_keyspace=False, helper=helper)
        # ks doesn't exist because create_keyspace=False in prepare
        self.execute_and_validate_audit_entry(
            session,
            'USE "ks"',
            category="DML",
            expected_error=InvalidRequest,
        )
def verify_table(self, audit_settings=AuditTester.audit_default_settings, helper=None, table_prefix="test", overwrite_audit_tables=False):
    """
    Verify auditing of CREATE TABLE, ALTER TABLE, INSERT, SELECT, TRUNCATE
    and DROP TABLE statements, and that tables outside the audited
    keyspaces produce no audit entries.

    When overwrite_audit_tables is True, the "audit_tables" setting is
    pointed at the two generated tables.
    """
    # Randomized suffixes keep table names unique across repeated runs.
    first_table = f"{table_prefix}_1_{uuid.uuid4().hex[:4]}"
    second_table = f"{table_prefix}_2_{uuid.uuid4().hex[:4]}"
    if helper is None:
        helper = AuditBackendTable()
    if overwrite_audit_tables:
        # Fix: mutate a copy. Writing "audit_tables" into the argument would
        # also modify the shared class-level default dict (and any
        # caller-owned dict), leaking the setting into unrelated tests.
        audit_settings = dict(audit_settings)
        audit_settings["audit_tables"] = f"ks.{first_table}, ks.{second_table}"
    session = self.prepare(audit_settings=audit_settings, helper=helper)
    def execute_and_validate_audit_entry(query, category, **kwargs):
        # Bind session and settings once for the many calls below.
        return self.execute_and_validate_audit_entry(session, query, category, audit_settings, **kwargs)
    execute_and_validate_audit_entry(
        f"CREATE TABLE {first_table} (k int PRIMARY KEY, v1 int)",
        category="DDL",
        table=first_table,
    )
    execute_and_validate_audit_entry(
        f"CREATE TABLE {second_table} (k int, c1 int, v1 int, PRIMARY KEY (k, c1)) WITH COMPACT STORAGE",
        category="DDL",
        table=second_table,
    )
    execute_and_validate_audit_entry(
        f"ALTER TABLE {first_table} ADD v2 int",
        category="DDL",
        table=first_table,
    )
    for table in [first_table, second_table]:
        for i in range(10):
            # The two tables have different column layouts.
            if table == first_table:
                columns = "(k, v1, v2)"
            else:
                columns = "(k, c1, v1)"
            execute_and_validate_audit_entry(
                f"INSERT INTO {table} {columns} VALUES ({i}, {i}, {i})",
                category="DML",
                table=f"{table}",
            )
        res = execute_and_validate_audit_entry(
            f"SELECT * FROM {table}",
            category="QUERY",
            table=f"{table}",
        )
        assert sorted(rows_to_list(res)) == [[i, i, i] for i in range(10)], res
        execute_and_validate_audit_entry(
            f"TRUNCATE {table}",
            category="DML",
            table=f"{table}",
        )
        res = execute_and_validate_audit_entry(
            f"SELECT * FROM {table}",
            category="QUERY",
            table=f"{table}",
        )
        assert rows_to_list(res) == [], res
        execute_and_validate_audit_entry(
            f"DROP TABLE {table}",
            category="DDL",
            table=f"{table}",
        )
        # Selecting from the dropped table fails, and no new audit entry is
        # expected for that failure.
        execute_and_validate_audit_entry(
            f"SELECT * FROM {table}",
            category="QUERY",
            table=f"{table}",
            expected_error=InvalidRequest,
            expect_new_audit_entry=False,
        )
    # Test that the audit entries are not added if the keyspace is not
    # specified in the audit_keyspaces setting.
    keyspaces = audit_settings["audit_keyspaces"].split(",") if "audit_keyspaces" in audit_settings else []
    assert "ks2" not in keyspaces
    query_sequence = [
        "CREATE KEYSPACE ks2 WITH replication = { 'class':'SimpleStrategy', 'replication_factor':1} AND DURABLE_WRITES = true",
        f"CREATE TABLE ks2.{first_table} (k int PRIMARY KEY, v1 int)",
        f"ALTER TABLE ks2.{first_table} ADD v2 int",
        f"INSERT INTO ks2.{first_table} (k, v1, v2) VALUES (1, 1, 1)",
        f"SELECT * FROM ks2.{first_table}",
        f"TRUNCATE ks2.{first_table}",
        f"DROP TABLE ks2.{first_table}",
    ]
    with self.assert_no_audit_entries_were_added(session):
        for query in query_sequence:
            session.execute(query)
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_audit_keyspace(self, helper_class):
    """Keyspace DDL auditing with the default audit settings, per backend."""
    with helper_class() as helper:
        self.verify_keyspace(audit_settings=AuditTester.audit_default_settings, helper=helper)
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_audit_keyspace_extra_parameter(self, helper_class):
    """Keyspace DDL auditing when the config carries an unknown extra key."""
    with helper_class() as helper:
        self.verify_keyspace(audit_settings={"audit": "table", "audit_categories": "ADMIN,AUTH,DML,DDL,DCL", "audit_keyspaces": "ks", "extra_parameter": "new"}, helper=helper)
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_audit_keyspace_many_ks(self, helper_class):
    """Keyspace DDL auditing when audit_keyspaces lists several keyspaces."""
    with helper_class() as helper:
        self.verify_keyspace(audit_settings={"audit": "table", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL", "audit_keyspaces": "a,b,c,ks"}, helper=helper)
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_audit_keyspace_table_not_exists(self, helper_class):
    """Keyspace DDL auditing when audit_tables names a non-existent table."""
    with helper_class() as helper:
        self.verify_keyspace(audit_settings={"audit": "table", "audit_categories": "DML,DDL", "audit_keyspaces": "ks", "audit_tables": "ks.fake"}, helper=helper)
def test_audit_type_none(self):
    """
    'audit': None
    CREATE KEYSPACE, USE KEYSPACE, ALTER KEYSPACE, DROP KEYSPACE statements
    check audit KS not created
    """
    audit_settings = {"audit": None, "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL", "audit_keyspaces": "ks"}
    session = self.prepare(create_keyspace=False, audit_settings=audit_settings)
    # With auditing disabled, these statements must simply succeed.
    for statement in (
        "CREATE KEYSPACE ks WITH replication = { 'class':'SimpleStrategy', 'replication_factor':1} AND DURABLE_WRITES = true",
        "USE ks",
        "ALTER KEYSPACE ks WITH replication = { 'class' : 'NetworkTopologyStrategy', 'dc1' : 1 } AND DURABLE_WRITES = false",
        "DROP KEYSPACE ks",
    ):
        session.execute(statement)
    # The audit keyspace must not have been created.
    assert_invalid(session, "use audit;", expected=InvalidRequest)
def test_audit_type_invalid(self):
    """
    'audit': invalid
    check node not started
    """
    self.fixture_dtest_setup.allow_log_errors = True
    cluster = self.cluster
    cluster.set_configuration_options(values={"audit": "invalid", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL", "audit_keyspaces": "ks"})
    # Startup is expected to fail; swallow the resulting start error.
    try:
        cluster.populate(1).start(no_wait=True)
    except (NodeError, RuntimeError):
        pass
    expected_error = r"Startup failed: audit::audit_exception \(Bad configuration: invalid 'audit': invalid\)"
    self.ignore_log_patterns.append(expected_error)
    self.cluster.nodes["node1"].watch_log_for(expected_error)
def test_composite_audit_type_invalid(self):
    """
    'audit': table,syslog,invalid
    check node not started
    """
    self.fixture_dtest_setup.allow_log_errors = True
    cluster = self.cluster
    cluster.set_configuration_options(values={"audit": "table,syslog,invalid", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL", "audit_keyspaces": "ks"})
    # Startup is expected to fail; swallow the resulting start error.
    try:
        cluster.populate(1).start(no_wait=True)
    except (NodeError, RuntimeError):
        pass
    expected_error = r"Startup failed: audit::audit_exception \(Bad configuration: invalid 'audit': invalid\)"
    self.ignore_log_patterns.append(expected_error)
    self.cluster.nodes["node1"].watch_log_for(expected_error)
    # TODO: verify that the syslog file doesn't exist
def test_audit_empty_settings(self):
    """
    'audit': none
    check node started, ks audit not created
    """
    session = self.prepare(create_keyspace=False, audit_settings={"audit": "none"})
    # The audit keyspace must not exist when auditing is disabled.
    assert_invalid(session, "use audit;", expected=InvalidRequest)
def test_composite_audit_empty_settings(self):
    """
    'audit': table,syslog,none
    check node started, ks audit not created
    """
    session = self.prepare(create_keyspace=False, audit_settings={"audit": "table,syslog,none"})
    # "none" in the composite list disables auditing entirely.
    assert_invalid(session, "use audit;", expected=InvalidRequest)
def test_audit_audit_ks(self):
    """
    'audit_keyspaces': 'audit'
    check node started, ks audit created
    """
    audit_settings = {"audit": "table", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL", "audit_keyspaces": "audit"}
    session = self.prepare(create_keyspace=False, audit_settings=audit_settings)
    # Reading the audit log itself must produce a QUERY entry for audit.audit_log.
    self.execute_and_validate_audit_entry(
        session,
        query=self.AUDIT_LOG_QUERY,
        category="QUERY",
        ks="audit",
        table="audit_log",
        audit_settings=audit_settings,
    )
@pytest.mark.single_node
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_audit_categories_invalid(self, helper_class):
    """
    'audit_categories': invalid
    check node not started
    """
    # The helper is entered only for its setup/teardown side effects.
    with helper_class() as helper:
        self.fixture_dtest_setup.allow_log_errors = True
        cluster = self.cluster
        cluster.set_configuration_options(values={"audit": "table", "audit_categories": "INVALID", "audit_keyspaces": "ks"})
        cluster.force_wait_for_cluster_start = False
        # Startup is expected to fail; swallow the resulting start error.
        try:
            cluster.populate(1).start(no_wait=True)
        except NodeError:
            pass
        expected_error = r"Startup failed: audit::audit_exception \(Bad configuration: invalid 'audit_categories': INVALID\)"
        self.ignore_log_patterns.append(expected_error)
        self.cluster.nodes["node1"].watch_log_for(expected_error)
# compact storage is current required for all tests that call verify_table
@pytest.mark.cluster_options(enable_create_table_with_compact_storage=True)
def test_audit_table(self):
    """Run verify_table with the default audit settings."""
    self.verify_table(audit_settings=AuditTester.audit_default_settings, table_prefix="test_audit_table")
@pytest.mark.cluster_options(enable_create_table_with_compact_storage=True)
def test_audit_table_extra_parameter(self):
    """An unknown extra key in the audit settings must not break table auditing."""
    self.verify_table(audit_settings={"audit": "table", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL", "audit_keyspaces": "ks", "extra_parameter": "new"}, table_prefix="test_audit_table_extra_parameter")
@pytest.mark.cluster_options(enable_create_table_with_compact_storage=True)
def test_audit_table_audit_keyspaces_empty(self):
    """Run verify_table with 'audit_keyspaces' omitted from the settings."""
    self.verify_table(audit_settings={"audit": "table", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL"}, table_prefix="test_audit_table_audit_keyspaces_empty", overwrite_audit_tables=True)
@pytest.mark.cluster_options(enable_create_table_with_compact_storage=True)
def test_audit_table_no_ks(self):
    """Same settings as test_audit_table_audit_keyspaces_empty, exercised under a different table prefix."""
    self.verify_table(audit_settings={"audit": "table", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL"}, table_prefix="test_audit_table_no_ks", overwrite_audit_tables=True)
@pytest.mark.cluster_options(enable_create_table_with_compact_storage=True)
def test_audit_categories_part1(self):
    """Run verify_table with only the AUTH, QUERY and DDL categories enabled."""
    self.verify_table(audit_settings={"audit": "table", "audit_categories": "AUTH,QUERY,DDL"}, table_prefix="test_audit_categories_part1", overwrite_audit_tables=True)
@pytest.mark.cluster_options(enable_create_table_with_compact_storage=True)
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_audit_categories_part2(self, helper_class):
    """Run verify_table with DDL, ADMIN, AUTH and DCL categories (note the space after the first comma) on each backend."""
    with helper_class() as helper:
        self.verify_table(audit_settings={"audit": "table", "audit_categories": "DDL, ADMIN,AUTH,DCL", "audit_keyspaces": "ks"}, helper=helper, table_prefix="test_audit_categories_part2")
@pytest.mark.cluster_options(enable_create_table_with_compact_storage=True)
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_audit_categories_part3(self, helper_class):
    """Run verify_table with DDL, ADMIN and AUTH categories on each backend."""
    with helper_class() as helper:
        self.verify_table(audit_settings={"audit": "table", "audit_categories": "DDL, ADMIN,AUTH", "audit_keyspaces": "ks"}, helper=helper, table_prefix="test_audit_categories_part3")
# A password-masking scenario: principal name, initial password, replacement password.
PasswordMaskingCase = namedtuple("PasswordMaskingCase", ["name", "password", "new_password"])

@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_user_password_masking(self, helper_class):
    """
    CREATE USER, ALTER USER, DROP USER statements

    Passwords in the audited operation text must be replaced with '***'.
    """
    with helper_class() as helper:
        session = self.prepare(user="cassandra", password="cassandra", helper=helper)

        def check(query, category, **kwargs):
            return self.execute_and_validate_audit_entry(session, query, category, self.audit_default_settings, **kwargs, user="cassandra", ks="")

        cases = [self.PasswordMaskingCase("user1", "secret", "Secret^%$#@!"), self.PasswordMaskingCase("user2", "", "")]
        for case in cases:
            check(
                f"CREATE USER {case.name} WITH PASSWORD '{case.password}'",
                category="DCL",
                expected_operation=f"CREATE USER {case.name} WITH PASSWORD '***'",
            )
            check(
                f"ALTER USER {case.name} WITH PASSWORD '{case.new_password}'",
                category="DCL",
                expected_operation=f"ALTER USER {case.name} WITH PASSWORD '***'",
            )
            check(
                f"DROP USER {case.name}",
                category="DCL",
            )
def test_negative_audit_records_auth(self):
    """
    Test that failed AUTH attempts are audited.
    """
    session = self.prepare(user="cassandra", password="cassandra")
    expected_entry = AuditEntry(category="AUTH", statement="LOGIN", table="", ks="", user="wrong_user", cl="", error=True)
    with self.assert_entries_were_added(session, [expected_entry], filter_out_cassandra_auth=True):
        # A connection with bad credentials must be refused; the driver
        # surfaces the per-host AuthenticationFailed via NoHostAvailable.
        with pytest.raises(NoHostAvailable) as exc_info:
            self.exclusive_cql_connection(self.cluster.nodelist()[0], user="wrong_user", password="wrong_password")
        failures = list(exc_info.value.errors.values())
        assert len(failures) == 1
        assert isinstance(failures[0], AuthenticationFailed)
def test_negative_audit_records_admin(self):
    """
    Test that failed ADMIN statements are audited.
    """
    session = self.prepare(user="cassandra", password="cassandra")
    session.execute("CREATE ROLE test_role")
    # Attaching a service level that was never created fails with
    # InvalidRequest, yet must still produce an ADMIN entry with error=True.
    stmt = "ATTACH SERVICE_LEVEL test_service_level TO test_role"
    expected_entry = AuditEntry(category="ADMIN", table="", ks="", user="cassandra", cl="ONE", error=True, statement=stmt)
    with self.assert_entries_were_added(session, [expected_entry]):
        assert_invalid(session, stmt, expected=InvalidRequest)
def test_negative_audit_records_ddl(self):
    """
    Test that failed DDL statements are audited.
    """
    session = self.prepare(user="cassandra", password="cassandra")
    # `prepare` already created keyspace ks, so re-creating it raises
    # AlreadyExists; the failure must still be audited as DDL with error=True.
    stmt = "CREATE KEYSPACE ks WITH replication = { 'class':'NetworkTopologyStrategy', 'replication_factor': 3 }"
    expected_entry = AuditEntry(category="DDL", table="", ks="ks", user="cassandra", cl="ONE", error=True, statement=stmt)
    with self.assert_entries_were_added(session, [expected_entry]):
        assert_invalid(session, stmt, expected=AlreadyExists)
def test_negative_audit_records_dml(self):
    """
    Test that failed DML statements are audited.
    """
    audit_settings = {"audit": "table", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL", "audit_keyspaces": "ks"}
    session = self.prepare(user="cassandra", password="cassandra", audit_settings=audit_settings)
    session.execute("CREATE TABLE test1 (k int PRIMARY KEY, v1 int, v2 int)")
    # CL=TWO is expected to be unsatisfiable here, so the insert fails with
    # Unavailable and must be audited as DML with error=True and cl=TWO.
    stmt = SimpleStatement("INSERT INTO ks.test1 (k, v1, v2) VALUES (1, 1, 1)", consistency_level=ConsistencyLevel.TWO)
    expected_entry = AuditEntry(category="DML", table="test1", ks="ks", user="cassandra", cl="TWO", error=True, statement=stmt.query_string)
    with self.assert_entries_were_added(session, [expected_entry]):
        assert_invalid(session, stmt, expected=Unavailable)
def test_negative_audit_records_dcl(self):
    """
    Test that failed DCL statements are audited.
    """
    session = self.prepare(user="cassandra", password="cassandra")
    # Granting to a role that does not exist fails with InvalidRequest;
    # the failure must still be audited as DCL with error=True.
    stmt = "GRANT SELECT ON ALL KEYSPACES TO test_role"
    expected_entry = AuditEntry(category="DCL", table="", ks="", user="cassandra", cl="ONE", error=True, statement=stmt)
    with self.assert_entries_were_added(session, [expected_entry]):
        assert_invalid(session, stmt, expected=InvalidRequest)
def test_negative_audit_records_query(self):
    """
    Test that failed QUERY statements are audited.
    """
    audit_settings = {"audit": "table", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL", "audit_keyspaces": "ks"}
    session = self.prepare(audit_settings=audit_settings)
    session.execute("CREATE TABLE test1 (k int PRIMARY KEY, v int)")
    # CL=TWO is expected to be unsatisfiable here, so the select fails with
    # Unavailable and must be audited as QUERY for the anonymous user.
    stmt = SimpleStatement("SELECT * FROM ks.test1", consistency_level=ConsistencyLevel.TWO)
    expected_entry = AuditEntry(category="QUERY", table="test1", ks="ks", user="anonymous", cl="TWO", error=True, statement=stmt.query_string)
    with self.assert_entries_were_added(session, [expected_entry]):
        assert_invalid(session, stmt, expected=Unavailable)
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_role_password_masking(self, helper_class):
    """
    CREATE ROLE, ALTER ROLE, DROP ROLE statements

    Passwords in the audited operation text must be replaced with '***'.
    """
    with helper_class() as helper:
        session = self.prepare(user="cassandra", password="cassandra", helper=helper)

        def check(query, category, **kwargs):
            return self.execute_and_validate_audit_entry(session, query, category, self.audit_default_settings, **kwargs, user="cassandra", ks="")

        cases = [self.PasswordMaskingCase("role1", "Secret!@#$", "Secret^%$#@!"), self.PasswordMaskingCase("role2", "", "")]
        for case in cases:
            check(
                f"CREATE ROLE {case.name} WITH PASSWORD = '{case.password}'",
                category="DCL",
                expected_operation=f"CREATE ROLE {case.name} WITH PASSWORD = '***'",
            )
            check(
                f"ALTER ROLE {case.name} WITH PASSWORD = '{case.new_password}'",
                category="DCL",
                expected_operation=f"ALTER ROLE {case.name} WITH PASSWORD = '***'",
            )
            check(
                f"DROP ROLE {case.name}",
                category="DCL",
            )
def test_login(self):
    """
    USER LOGIN

    A successful login as a newly created user must produce an AUTH/LOGIN
    audit entry; entries for the built-in cassandra user are filtered out.
    """
    session = self.prepare(user="cassandra", password="cassandra", create_keyspace=False)
    session.execute("CREATE USER test WITH PASSWORD 'test'")
    expected_audit_entries = [AuditEntry(category="AUTH", statement="LOGIN", user="test", table="", ks="", cl="", error=False)]
    with self.assert_entries_were_added(session, expected_audit_entries, filter_out_cassandra_auth=True):
        self.prepare(user="test", password="test", create_keyspace=False)
def test_cassandra_login(self):
    """
    Test user login to default (cassandra) user

    Unlike test_login, cassandra AUTH entries are NOT filtered here, so the
    login of the default user itself must show up in the audit log.
    """
    session = self.prepare(user="cassandra", password="cassandra", create_keyspace=False)
    expected_audit_entries = [AuditEntry(category="AUTH", statement="LOGIN", user="cassandra", table="", ks="", cl="", error=False)]
    with self.assert_entries_were_added(session, expected_audit_entries, filter_out_cassandra_auth=False):
        self.prepare(user="cassandra", password="cassandra", create_keyspace=False)
def test_categories(self):
    """
    Test filtering audit categories

    Only the DML category is enabled, so DDL (CREATE/ALTER/DROP TABLE) and
    QUERY (SELECT) statements must not be audited, while INSERT and
    TRUNCATE must be.
    """
    session = self.prepare(experimental=True, audit_settings={"audit": "table", "audit_categories": "DML", "audit_keyspaces": "ks"})

    def execute_and_validate_audit_entry(query, category, **kwargs):
        return self.execute_and_validate_audit_entry(session, query, category, self.audit_default_settings, **kwargs)

    # DDL is not in the enabled categories: no entries expected.
    with self.assert_no_audit_entries_were_added(session):
        session.execute("CREATE TABLE test1 (k int PRIMARY KEY, v1 int)")
        session.execute("ALTER TABLE test1 ADD v2 int")
    for i in range(10):
        execute_and_validate_audit_entry(
            f"INSERT INTO test1 (k, v1, v2) VALUES ({i}, {i}, {i})",
            category="DML",
            table="test1",
        )
    # QUERY is not in the enabled categories: reading back is not audited.
    with self.assert_no_audit_entries_were_added(session):
        res = sorted(session.execute("SELECT * FROM test1"))
        assert rows_to_list(res) == [[i, i, i] for i in range(10)], res
    # TRUNCATE is classified as DML and therefore audited.
    execute_and_validate_audit_entry(
        "TRUNCATE test1",
        category="DML",
        table="test1",
    )
    with self.assert_no_audit_entries_were_added(session):
        res = session.execute("SELECT * FROM test1")
        assert rows_to_list(res) == [], res
        session.execute("DROP TABLE test1")
def _get_attempt_count(self, session: Session, *, execution_profile=EXEC_PROFILE_DEFAULT, consistency_level: ConsistencyLevel = ConsistencyLevel.ONE) -> int:
    """
    Return how many times the driver will execute a statement at the given
    consistency level: one initial attempt plus the retry policy's retries.
    """
    # dtest env is using FlakyRetryPolicy which has `max_retries` attribute
    profile = session.execution_profile_clone_update(execution_profile, consistency_level=consistency_level)
    max_retries = getattr(profile.retry_policy, "max_retries", None)
    assert max_retries is not None
    return max_retries + 1
def _test_insert_failure_doesnt_report_success_assign_nodes(self, session: Session = None):
    """
    Pick nodes for test_insert_failure_doesnt_report_success.

    Probes inserted keys until it finds a partition whose data replicas and
    audit-log replicas are disjoint, then returns:
      * audit_partition_nodes - the 3 replicas of the audit-log partition,
      * insert_node           - a coordinator that is a replica of neither,
      * kill_node             - a data replica to stop so CL=THREE fails,
      * the CQL text of the insert expected to fail,
      * the expected number of execution attempts (1 + driver retries).

    Returns ([], None, None, None, None) when no suitable key was found
    among the probed range; callers should skip in that case.
    """
    all_nodes: set[ScyllaNode] = set(self.cluster.nodelist())
    assert len(all_nodes) == 7
    address_to_node = {node.address(): node for node in all_nodes}
    node_to_audit_nodes = {}
    # For each node, learn which nodes hold the audit-log partition for
    # statements coordinated by it.
    for index, node in enumerate(all_nodes):
        with self.patient_exclusive_cql_connection(node) as conn:
            stmt = SimpleStatement("INSERT INTO ks.test1 (k, v1) VALUES (1000, 1000)", consistency_level=ConsistencyLevel.THREE)
            conn.execute(stmt)
            audit_nodes = self.get_audit_partitions_for_operation(session, stmt.query_string)
            node_to_audit_nodes[index] = {n.address() for n in audit_nodes}
    all_nodes_addresses = {node.address() for node in all_nodes}
    # the number here is arbitrary
    for i in range(256):
        stmt = SimpleStatement(f"INSERT INTO ks.test1 (k, v1) VALUES ({i}, 1337)", consistency_level=ConsistencyLevel.THREE)
        session.execute(stmt)
        attempt_count = self._get_attempt_count(session, consistency_level=ConsistencyLevel.THREE)
        token = rows_to_list(session.execute(f"SELECT token(k) FROM ks.test1 WHERE k = {i}"))[0][0]
        partitions = self.get_partitions_for_token("ks", "test1", token)
        for index, audit_nodes in node_to_audit_nodes.items():
            if audit_nodes & partitions:
                # Audit replicas overlap data replicas; try the next candidate.
                continue
            insert_node = all_nodes_addresses - partitions - audit_nodes
            if len(all_nodes_addresses) != 7 or len(partitions) != 3 or len(audit_nodes) != 3 or len(insert_node) != 1:
                # pytest.skip() raises Skipped itself; no `raise` needed.
                pytest.skip("Failed to assign nodes for insert failure test")
            audit_partition_nodes = [address_to_node[address] for address in audit_nodes]
            insert_node = address_to_node[insert_node.pop()]
            kill_node = address_to_node[partitions.pop()]
            return audit_partition_nodes, insert_node, kill_node, stmt.query_string, attempt_count
    # Uniform "not assigned" result: the original returned [] for
    # insert_node, which the caller's `is None` check could not detect.
    return [], None, None, None, None
@pytest.mark.exclude_errors("audit - Unexpected exception when writing log with: node_ip")
def test_insert_failure_doesnt_report_success(self):
    """
    Test that if an insert fails, the audit log doesn't report the insert
    as successful.
    The test works by creating a table with RF=3, and then stopping one of
    the nodes. Then, an insert is executed with CL=THREE. This insert will
    fail, since the node that is stopped is a replica for the inserted
    partition. The test verifies that the audit log doesn't report the
    insert as successful and reports unsuccessful inserts as expected
    (exactly 1 + max_retries error entries for the failing statement).
    """
    import time

    cluster_topology = {"datacenter1": {"rack1": 3, "rack2": 2, "rack3": 2}}
    session: Session = self.prepare(nodes=cluster_topology, rf=3)
    with self.patient_exclusive_cql_connection(self.cluster.nodelist()[0]) as conn:
        stmt = SimpleStatement("CREATE TABLE ks.test1 (k int PRIMARY KEY, v1 int)")
        with self.assert_exactly_n_audit_entries_were_added(session, 1):
            conn.execute(stmt)
    audit_partition_nodes, insert_node, node_to_stop, query_to_fail, query_fail_count = self._test_insert_failure_doesnt_report_success_assign_nodes(session=session)
    # Skip BEFORE touching the nodes: on an unsuccessful assignment some of
    # them are None/empty and dereferencing them would raise, not skip.
    if len(audit_partition_nodes) != 3 or node_to_stop is None or insert_node is None:
        pytest.skip("Failed to assign nodes for insert failure test")
    # TODO: remove the loop when scylladb#24473 is fixed
    # We call get_host_id only to cache host_id
    for node in audit_partition_nodes + [insert_node, node_to_stop]:
        node.cluster.manager.get_host_id(node.server_id)
    for audit_node in audit_partition_nodes:
        logger.debug(f"audit_partition_nodes: {audit_node.name} {audit_node.address()}")
    logger.debug(f"node_to_stop: {node_to_stop.name} {node_to_stop.address()}")
    logger.debug(f"insert_node: {insert_node.name} {insert_node.address()}")
    # Sanity check: while all replicas are up the insert succeeds and is
    # audited exactly once.
    with self.patient_exclusive_cql_connection(insert_node) as conn:
        stmt = SimpleStatement("INSERT INTO ks.test1 (k, v1) VALUES (1, 1)", consistency_level=ConsistencyLevel.THREE)
        with self.assert_exactly_n_audit_entries_were_added(session, 1):
            conn.execute(stmt)
    node_to_stop.stop(wait_other_notice=True)
    with self.patient_exclusive_cql_connection(insert_node) as conn:
        # pytest.raises already fails the test when no exception is thrown;
        # the original redundant pytest.fail after execute was dead code.
        with pytest.raises(Unavailable):
            conn.execute(SimpleStatement(query_to_fail, consistency_level=ConsistencyLevel.THREE))
    node_to_stop.start(wait_for_binary_proto=True, wait_other_notice=True)
    rows_dict = dict[str, list[AuditEntry]]()
    timestamp_before = datetime.datetime.now()
    for i in itertools.count(start=1):
        if datetime.datetime.now() - timestamp_before > datetime.timedelta(seconds=60):
            pytest.fail(f"audit log not updated after {i} iterations")
        # Poll roughly ten times per second instead of busy-looping; the
        # log message below already reports elapsed time as i / 10 seconds.
        time.sleep(0.1)
        rows_dict = self.get_audit_log_dict(session)
        # We need to satisfy the end state condition for all audit modes.
        # If any audit mode is not done yet, continue polling.
        all_modes_done = True
        for mode, rows in rows_dict.items():
            rows_with_error = [row for row in rows if row.error and row.operation == query_to_fail]
            if len(rows_with_error) == query_fail_count:
                logger.info(f"audit mode {mode} log updated after {i} iterations ({i / 10}s)")
                assert rows_with_error[0].error is True
                assert rows_with_error[0].consistency == "THREE"
                # We expect the initial insert to be in the audit log.
                # it is executed in _test_insert_failure_doesnt_report_success_assign_nodes
                rows_without_error = [row for row in rows if row.operation == query_to_fail and not row.error]
                assert len(rows_without_error) == 1
            else:
                # An audit mode is not done yet, early exit to continue polling.
                all_modes_done = False
                break
        if all_modes_done:
            break
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_prepare(self, helper_class):
    """Test prepare statement"""
    with helper_class() as helper:
        session = self.prepare(helper=helper)
        session.execute(
            """
            CREATE TABLE cf (
                k varchar PRIMARY KEY,
                c int,
            )
            """
        )
        # Preparing a statement must not by itself produce audit entries...
        with self.assert_no_audit_entries_were_added(session):
            query = "INSERT INTO cf (k, c) VALUES (?, ?);"
            pq = session.prepare(query)
        # ...but executing the prepared statement is audited, with the
        # original (unbound) query text as the recorded operation.
        self.execute_and_validate_audit_entry(
            session,
            pq,
            bound_values=["foo", 4],
            category="DML",
            expected_operation="INSERT INTO cf (k, c) VALUES (?, ?);",
            table="cf",
        )
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_permissions(self, helper_class):
    """Test user permissions"""
    with helper_class() as helper:
        session = self.prepare(user="cassandra", password="cassandra", helper=helper)
        session.execute("CREATE TABLE test1 (k int PRIMARY KEY, v1 int)")
        session.execute("CREATE USER test WITH PASSWORD 'test'")
        # "test" is granted SELECT only - no MODIFY permission.
        session.execute("GRANT SELECT ON ks.test1 TO test")
        session.execute("INSERT INTO test1 (k, v1) VALUES (1, 1)")
        logging.info("Waiting for appling permissions")
        # Read barrier on every node so the permission change is visible
        # cluster-wide before the restricted user connects.
        for srv in self.cluster.manager.running_servers():
            asyncio.run(read_barrier(self.cluster.manager.api, srv.ip_addr))
        test_session = self.patient_cql_connection(self.cluster.nodelist()[0], user="test", password="test")

        def execute_and_validate_audit_entry(query, category, **kwargs):
            # Statements run as "test", but validation reads the audit log
            # through the privileged cassandra session.
            return self.execute_and_validate_audit_entry(test_session, query, category, session_for_audit_entry_validation=session, user="test", **kwargs)

        # Granted: SELECT succeeds and is audited as QUERY.
        execute_and_validate_audit_entry(
            "SELECT * FROM ks.test1",
            category="QUERY",
            table="test1",
        )
        # Not granted: INSERT fails with Unauthorized but is still audited.
        execute_and_validate_audit_entry(
            "INSERT INTO ks.test1 (k, v1) VALUES (2, 2)",
            category="DML",
            table="test1",
            expected_error=Unauthorized,
        )
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_batch(self, helper_class):
    """
    BATCH statement

    Each statement inside a batch must be audited as a separate DML entry.
    """
    with helper_class() as helper:
        session = self.prepare(helper=helper)
        session.execute(
            """
            CREATE TABLE test8 (
                userid text PRIMARY KEY,
                name text,
                password text
            )
            """
        )
        batch_query = SimpleStatement(
            """
            BEGIN BATCH
                INSERT INTO test8 (userid, password, name) VALUES ('user1', 'ch@ngem3b', 'second user');
                UPDATE test8 SET password = 'ps22dhds' WHERE userid = 'user3';
                INSERT INTO test8 (userid, password) VALUES ('user4', 'ch@ngem3c');
                DELETE name FROM test8 WHERE userid = 'user1';
            APPLY BATCH;
            """,
            consistency_level=ConsistencyLevel.QUORUM,
        )
        # NOTE: the audited operation text has the string literals unquoted.
        expected_audit_operations = [
            "INSERT INTO test8 (userid, password, name) VALUES (user1, ch@ngem3b, second user)",
            "UPDATE test8 SET password = ps22dhds WHERE userid = user3",
            "INSERT INTO test8 (userid, password) VALUES (user4, ch@ngem3c)",
            "DELETE name FROM test8 WHERE userid = user1",
        ]
        expected_entries = list(map(lambda query: AuditEntry(category="DML", statement=query, table="test8", ks="ks", user="anonymous", cl="QUORUM", error=False), expected_audit_operations))
        # merge_duplicate_rows=False: every batched statement must appear
        # as its own row.
        with self.assert_entries_were_added(session, expected_entries, merge_duplicate_rows=False):
            session.execute(batch_query)
def test_service_level_statements(self):
    """
    Test auditing service level statements - ones that use the ADMIN audit category.
    """
    audit_settings = {"audit": "table", "audit_categories": "ADMIN"}
    # NOTE(review): jvm_args passes --smp 1 (single shard) to the node;
    # presumably to keep service-level handling deterministic - confirm.
    session = self.prepare(user="cassandra", password="cassandra", audit_settings=audit_settings, jvm_args=["--smp", "1"])
    # Create role to which a service level can be attached.
    session.execute("CREATE ROLE test_role")
    query_sequence = [
        "CREATE SERVICE_LEVEL test_service_level WITH SHARES = 1",
        "ATTACH SERVICE_LEVEL test_service_level TO test_role",
        "DETACH SERVICE_LEVEL FROM test_role",
        "LIST SERVICE_LEVEL test_service_level",
        "LIST ALL SERVICE_LEVELS",
        "LIST ATTACHED SERVICE_LEVEL OF test_role",
        "LIST ALL ATTACHED SERVICE_LEVELS",
        "ALTER SERVICE_LEVEL test_service_level WITH SHARES = 2",
        "DROP SERVICE_LEVEL test_service_level",
    ]
    # Execute previously defined service level statements.
    # Validate that the audit log contains the expected entries.
    for query in query_sequence:
        self.execute_and_validate_audit_entry(session, query, category="ADMIN", audit_settings=audit_settings, ks="", user="cassandra")
    # Create a session with the ADMIN category disabled to validate that
    # the service level statements are not audited in that case.
    session = self.prepare(user="cassandra", password="cassandra", audit_settings={"audit_categories": "QUERY,DML,DDL,DCL"}, reload_config=True)
    # Execute previously defined service level statements.
    # Validate that the audit log does not contain any entries.
    with self.assert_no_audit_entries_were_added(session):
        for query in query_sequence:
            session.execute(query)
class AuditConfigChanger:
class ExpectedResult(enum.Enum):
SUCCESS = 1
FAILURE_UNPARSABLE_VALUE = 2
FAILURE_UNUPDATABLE_PARAM = 3
def __init__(self) -> None:
super().__init__()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass
def change_config(self, test, settings, expected_result):
assert False, "change_config not implemented in AuditConfigChanger"
def verify_change(self, node, param, value, from_mark, expected_result):
current_value = requests.get(f"http://{node.address()}:10000/v2/config/{param}").text.replace('"', "")
match expected_result:
case self.ExpectedResult.SUCCESS:
assert current_value == value, f"response: {current_value}, expected: {value}"
node.watch_log_for(r"Audit configuration is updated", from_mark=from_mark, timeout=60)
case self.ExpectedResult.FAILURE_UNPARSABLE_VALUE:
assert current_value == value, f"response: {current_value}, expected: {value}"
node.watch_log_for(r"Audit configuration update failed ", from_mark=from_mark, timeout=60)
case self.ExpectedResult.FAILURE_UNUPDATABLE_PARAM:
assert current_value != value, f"response: {current_value}, expected different than: {value}"
class AuditSighupConfigChanger(AuditConfigChanger):
    """Change audit config via the manager's config-file update (re-read/SIGHUP path)."""

    def change_config(self, test, settings, expected_result):
        for node in test.cluster.nodelist():
            logger.info(f"Changing config via manager.server_update_config: Node={node.address()} settings={settings}")
            node_mark = node.mark_log()
            # Apply each parameter and wait for the node to re-read its
            # configuration file before moving on.
            for param, value in settings.items():
                param_mark = node.mark_log()
                logger.info(f"server_update_config: param={param} value={value}")
                node.cluster.manager.server_update_config(node.server_id, param, value)
                node.watch_log_for(r"completed re-reading configuration file", from_mark=param_mark, timeout=60)
            # Verify every parameter after the whole batch was applied.
            for param, value in settings.items():
                self.verify_change(node, param, value, node_mark, expected_result)
class AuditCqlConfigChanger(AuditConfigChanger):
    """Change audit config through CQL updates of the system.config table."""

    def change_config(self, test, settings, expected_result):
        for node in test.cluster.nodelist():
            node_mark = node.mark_log()
            for param, value in settings.items():
                with test.patient_exclusive_cql_connection(node) as session:
                    logger.debug(f"Changing config via CQL: Node={node.address()} param={param} value={value}")
                    try:
                        session.execute(f"UPDATE system.config SET value='{value}' where name='{param}';")
                    except WriteFailure:
                        # Only parameters that forbid live update may fail the write.
                        assert expected_result == self.ExpectedResult.FAILURE_UNUPDATABLE_PARAM, f"CQL execution failed but expected_result: {expected_result}"
            for param, value in settings.items():
                self.verify_change(node, param, value, node_mark, expected_result)
@pytest.mark.parametrize("audit_config_changer", [AuditSighupConfigChanger, AuditCqlConfigChanger])
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_config_liveupdate(self, helper_class, audit_config_changer):
    """
    Test liveupdate config changes in audit.
    Liveupdate categories, tables, and keyspaces and confirm proper audit behavior.
    Execute with both audit sinks (Table, Syslog) and both liveupdate methods (SIGHUP, CQL).
    Use multi-node configuration (nodes=3).
    """
    with helper_class() as helper, audit_config_changer() as config_changer:
        # Some scenarios below deliberately produce config-error log lines.
        self.fixture_dtest_setup.allow_log_errors = True
        default_categories = copy.deepcopy(self.audit_default_settings["audit_categories"])
        enabled_but_empty_audit_config = helper.update_audit_settings(self.audit_default_settings, modifiers={"audit_categories": "", "audit_tables": "", "audit_keyspaces": ""})
        session = self.prepare(helper=helper, nodes=3, audit_settings=enabled_but_empty_audit_config)
        session.execute(
            """
            CREATE TABLE test_config_lifeupdate (
                userid text PRIMARY KEY,
                name text,
                password text
            )
            """
        )
        auditted_query = SimpleStatement("INSERT INTO test_config_lifeupdate (userid, password, name) VALUES ('user2', 'password2', 'second user');", consistency_level=ConsistencyLevel.QUORUM)
        expected_new_entries = [AuditEntry(category="DML", statement=auditted_query.query_string, table="test_config_lifeupdate", ks="ks", user="anonymous", cl="QUORUM", error=False)]
        # Started with enabled_but_empty_audit_config config: no auditing
        with self.assert_no_audit_entries_were_added(session):
            session.execute(auditted_query)
        # Config modified with correct categories, multiple tables, and multiple keyspaces: audit works
        config_changer.change_config(self, {"audit_categories": default_categories, "audit_tables": "test.table1,test.table2", "audit_keyspaces": "ks,ks2,ks3"}, expected_result=self.AuditConfigChanger.ExpectedResult.SUCCESS)
        with self.assert_entries_were_added(session, expected_new_entries, merge_duplicate_rows=False):
            session.execute(auditted_query)
        # Config modified with invalid categories: audit works, because previous audit configuration is used
        config_changer.change_config(self, {"audit_categories": "INVALID_CATEGORIES"}, expected_result=self.AuditConfigChanger.ExpectedResult.FAILURE_UNPARSABLE_VALUE)
        with self.assert_entries_were_added(session, expected_new_entries, merge_duplicate_rows=False):
            session.execute(auditted_query)
        # Config modified with valid categories but invalid tables and non-existing keyspaces: no auditing
        config_changer.change_config(
            self, {"audit_categories": default_categories, "audit_tables": "invalid.table.twodots", "audit_keyspaces": "non-existing"}, expected_result=self.AuditConfigChanger.ExpectedResult.FAILURE_UNPARSABLE_VALUE
        )
        with self.assert_no_audit_entries_were_added(session):
            session.execute(auditted_query)
        # Config modified with valid tables and empty keyspaces: audit works
        config_changer.change_config(self, {"audit_tables": "ks.test_config_lifeupdate", "audit_keyspaces": ""}, expected_result=self.AuditConfigChanger.ExpectedResult.SUCCESS)
        with self.assert_entries_were_added(session, expected_new_entries, merge_duplicate_rows=False):
            session.execute(auditted_query)
        # Set empty categories, tables, keyspaces: no auditing
        config_changer.change_config(self, {"audit_categories": "", "audit_tables": "", "audit_keyspaces": ""}, expected_result=self.AuditConfigChanger.ExpectedResult.SUCCESS)
        with self.assert_no_audit_entries_were_added(session):
            session.execute(auditted_query)
@pytest.mark.parametrize("audit_config_changer", [AuditSighupConfigChanger, AuditCqlConfigChanger])
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_config_no_liveupdate(self, helper_class, audit_config_changer):
    """
    Test audit config parameters that don't allow config changes.
    Modification of "audit", "audit_unix_socket_path", and "audit_syslog_write_buffer_size" should be forbidden.
    """
    with helper_class() as helper, audit_config_changer() as config_changer:
        # The rejected updates below may produce error log lines.
        self.fixture_dtest_setup.allow_log_errors = True
        session = self.prepare(helper=helper)
        session.execute(
            """
            CREATE TABLE test_config_no_lifeupdate (
                userid text PRIMARY KEY,
                name text,
                password text
            )
            """
        )
        auditted_query = SimpleStatement("INSERT INTO test_config_no_lifeupdate (userid, password, name) VALUES ('user2', 'password2', 'second user');", consistency_level=ConsistencyLevel.QUORUM)
        expected_new_entries = [AuditEntry(category="DML", statement=auditted_query.query_string, table="test_config_no_lifeupdate", ks="ks", user="anonymous", cl="QUORUM", error=False)]
        # Modifications of "audit", "audit_unix_socket_path", "audit_syslog_write_buffer_size" are forbidden and will fail
        config_changer.change_config(self, {"audit": "none"}, expected_result=self.AuditConfigChanger.ExpectedResult.FAILURE_UNUPDATABLE_PARAM)
        config_changer.change_config(self, {"audit_unix_socket_path": "/path/"}, expected_result=self.AuditConfigChanger.ExpectedResult.FAILURE_UNUPDATABLE_PARAM)
        config_changer.change_config(self, {"audit_syslog_write_buffer_size": "123123123"}, expected_result=self.AuditConfigChanger.ExpectedResult.FAILURE_UNUPDATABLE_PARAM)
        # Despite unsuccessful attempts to change config, audit works as expected
        with self.assert_entries_were_added(session, expected_new_entries, merge_duplicate_rows=False):
            session.execute(auditted_query)
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_parallel_syslog_audit(self, helper_class):
    """
    Test that cluster doesn't fail if multiple queries are audited in parallel
    """
    with helper_class() as helper:
        # Audit both keyspaces so every USE statement below is audited.
        audit_settings = helper.update_audit_settings(self.audit_default_settings, modifiers={"audit_keyspaces": "ks,kss"})
        session = self.prepare(helper=helper, audit_settings=audit_settings)
        session.execute("CREATE KEYSPACE kss WITH replication = { 'class':'NetworkTopologyStrategy', 'replication_factor': 3 }")
        # 2000 interleaved USE statements issued concurrently.
        run_in_parallel([{"func": lambda: session.execute("use ks;")}, {"func": lambda: session.execute("use kss;")}] * 1000)