The `test_insert_failure_doesnt_report_success` test in `test/cluster/dtest/audit_test.py` contains an insert statement that is expected to fail. The dtest environment uses `FlakyRetryPolicy`, which has `max_retries = 5`. One initial failure plus 5 failed retries means we expect 6 error audit logs. The test failed because `create keyspace ks` failed once and then succeeded on retry. That allowed the test to proceed properly, but the last part of the test, which expects exactly 6 failed queries, actually saw 7. The goal of this patch is to make sure there are exactly 6 = 1 + `max_retries` failed queries, counting only the query that is expected to fail. If other queries fail and then succeed on retry, that is fine. If other queries fail without a successful retry, the test will fail, as it should in such situations; those failures are not related to the insert statement that is expected to fail. Fixes #27322 Closes scylladb/scylladb#27378
1560 lines
73 KiB
Python
1560 lines
73 KiB
Python
#
|
|
# Copyright (C) 2025-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
|
|
import asyncio
|
|
import copy
|
|
import datetime
|
|
import enum
|
|
import itertools
|
|
import logging
|
|
import os.path
|
|
import re
|
|
import socket
|
|
import socketserver
|
|
import tempfile
|
|
import threading
|
|
import uuid
|
|
from collections import namedtuple
|
|
from contextlib import contextmanager
|
|
from dataclasses import dataclass
|
|
from typing import Any, Optional, override
|
|
|
|
import pytest
|
|
import requests
|
|
from cassandra import AlreadyExists, AuthenticationFailed, ConsistencyLevel, InvalidRequest, Unauthorized, Unavailable, WriteFailure
|
|
from cassandra.cluster import NoHostAvailable, Session, EXEC_PROFILE_DEFAULT
|
|
from cassandra.query import SimpleStatement, named_tuple_factory
|
|
from ccmlib.scylla_node import ScyllaNode, NodeError
|
|
|
|
from dtest_class import Tester, create_ks, wait_for
|
|
from tools.assertions import assert_invalid
|
|
from tools.cluster import run_rest_api
|
|
from tools.data import rows_to_list, run_in_parallel
|
|
|
|
from test.pylib.rest_client import read_barrier
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AuditRowMustNotExistError(Exception):
    """Exception type for audit tests.

    NOTE(review): judging by the name, it signals that an audit row was found
    where none is allowed; nothing in the visible part of the file raises it.
    """
|
|
|
|
|
|
class AuditTester(Tester):
    """Base tester that brings up a cluster configured for auditing."""

    # Audit configuration used when a test does not pass its own settings.
    audit_default_settings = {"audit": "table", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL", "audit_keyspaces": "ks"}

    def prepare(  # noqa: PLR0913
        self,
        ordered=False,
        create_keyspace=True,
        use_cache=False,
        nodes=1,
        rf=1,
        protocol_version=None,
        user=None,
        password=None,
        audit_settings=audit_default_settings,
        reload_config=False,
        helper=None,
        **kwargs,
    ):
        """Configure the cluster for auditing, start it if needed and return a CQL session.

        The audit backend helper (``AuditBackendTable`` unless ``helper`` is
        given) is stored on ``self.helper`` and is asked to adjust
        ``audit_settings`` before they are written to the cluster
        configuration.  When ``user`` is set, password authentication and
        authorization are enabled.  When ``create_keyspace`` is true, keyspace
        ``ks`` is dropped (if present) and re-created with replication
        factor ``rf``.  ``jvm_args`` may be supplied through ``kwargs`` and is
        forwarded to the cluster start.
        """
        logger.debug(f"Preparing cluster with {nodes} node(s): rf={rf} ordered={ordered} use_cache={use_cache} audit_settings={audit_settings}")

        cluster = self.cluster
        jvm_args = kwargs.pop("jvm_args", [])

        cluster.set_configuration_options(values={"logger_log_level": {"audit": "debug"}})

        self.helper = AuditBackendTable() if helper is None else helper
        self.helper.before_cluster_start()
        effective_settings = self.helper.update_audit_settings(audit_settings)

        if ordered:
            cluster.set_partitioner("org.apache.cassandra.dht.ByteOrderedPartitioner")

        if use_cache:
            cluster.set_configuration_options(values={"row_cache_size_in_mb": 100})

        cluster.set_configuration_options(values=effective_settings)

        if user:
            auth_config = {"authenticator": "org.apache.cassandra.auth.PasswordAuthenticator", "authorizer": "org.apache.cassandra.auth.CassandraAuthorizer", "permissions_validity_in_ms": 0}
            cluster.set_configuration_options(values=auth_config)

        if reload_config:
            # The cluster is restarted to reload the config file.
            cluster.stop()
            cluster.start(wait_for_binary_proto=True)

        if not cluster.nodelist():
            # populate() receives a dict unchanged; any other value is wrapped in a list.
            node_spec = nodes if isinstance(nodes, dict) else [nodes]
            cluster.populate(node_spec).start(wait_for_binary_proto=True, jvm_args=jvm_args)

        first_node = cluster.nodelist()[0]
        session = self.patient_cql_connection(first_node, protocol_version=protocol_version, user=user, password=password)
        if create_keyspace:
            session.execute("DROP KEYSPACE IF EXISTS ks")
            create_ks(session, "ks", rf)
        return session
|
|
|
|
|
|
# ``eq`` is already on by default for dataclasses; ``order=True`` adds the
# field-by-field comparisons that ``sorted(expected_entries)`` relies on.
@dataclass(order=True)
class AuditEntry:
    """One expected audit log entry, as described by a test.

    Field order matters: it defines both the positional constructor
    arguments and the sort order.
    """

    category: str  # audit category, e.g. "DML" or "DDL"
    cl: str  # consistency level name, e.g. "ONE"
    error: bool  # whether the audited statement is expected to have failed
    ks: str  # keyspace name
    statement: str  # text of the audited statement
    table: str  # table name ("" when not applicable)
    user: str  # user name
|
|
|
|
class AuditBackend:
    """Interface of an audit backend helper used by the audit tests.

    A backend is usable as a context manager; subclasses override the hooks
    below.  The methods that raise ``NotImplementedError`` must be provided
    by every concrete backend.
    """

    def __init__(self) -> None:
        super().__init__()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Nothing to release by default; exceptions are not suppressed.
        pass

    def audit_mode(self) -> str:
        """Return the value of the ``audit`` setting this backend stands for."""
        raise NotImplementedError

    def before_cluster_start(self):
        """Hook invoked before the cluster is started; a no-op by default."""
        pass

    def update_audit_settings(self, audit_settings, modifiers=None):
        """Return the audit settings adjusted for this backend.

        Declared here because the test framework calls it on every backend;
        each concrete backend supplies its own implementation.
        """
        raise NotImplementedError

    def get_audit_log_dict(self, session, consistency_level):
        """Return a dictionary mapping audit mode name to its list of audit log rows."""
        raise NotImplementedError
|
|
|
|
|
|
class AuditBackendTable(AuditBackend):
    """Audit backend that reads audit entries from the ``audit.audit_log`` table."""

    AUDIT_LOG_QUERY = "SELECT * FROM audit.audit_log"

    # Settings used when the caller passes no (or empty) audit settings.
    audit_default_settings = {"audit": "table", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL", "audit_keyspaces": "ks"}

    def __init__(self):
        super().__init__()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # The table backend holds no resources that need releasing.
        return None

    @override
    def audit_mode(self) -> str:
        return "table"

    def update_audit_settings(self, audit_settings, modifiers=None):
        """Return a deep copy of the settings with ``modifiers`` applied on top.

        Falls back to ``audit_default_settings`` when ``audit_settings`` is
        falsy.  The input dictionaries are not modified.
        """
        overrides = {} if modifiers is None else modifiers
        merged = copy.deepcopy(audit_settings or self.audit_default_settings)
        for key in overrides:
            merged[key] = overrides[key]
        return merged

    @override
    def get_audit_log_dict(self, session, consistency_level):
        """Read the audit table and return ``{audit mode: rows}``.

        The rows are sorted by event time (time-uuid), with the node as tie
        breaker.
        """
        # Named tuples are required so the tests can verify the order in which
        # the fields are returned; they make assumptions about it.
        assert session.row_factory == named_tuple_factory
        statement = SimpleStatement(self.AUDIT_LOG_QUERY, consistency_level=consistency_level)
        ordered_rows = sorted(session.execute(statement), key=lambda row: (row.event_time.time, row.node))
        return {self.audit_mode(): ordered_rows}
|
|
|
|
|
|
class UnixSockerListener:
    """Collect text datagrams sent to a Unix domain socket.

    A threading datagram server runs on a background thread and appends every
    received message to ``lines``.  Two kinds of messages are not stored: the
    "Initializing syslog audit backend." banner, and notification messages the
    listener sends to itself to learn that the server has caught up.
    """

    class UnixDatagramHandler(socketserver.BaseRequestHandler):
        def handle(self):
            listener = self.server.parent_instance
            # All shared state is touched only while holding the condition's lock.
            with listener.condition:
                data = self.request[0].decode("utf-8").strip()
                if data.startswith(listener.notification_msg):
                    # Self-sent marker: record its id and wake up the waiter.
                    received_id = int(data[len(listener.notification_msg):])
                    listener.notification_id = received_id
                    listener.condition.notify_all()
                elif data != "Initializing syslog audit backend.":
                    listener.lines.append(data)

    class UnixDatagramServer(socketserver.ThreadingUnixDatagramServer):
        def __init__(self, socket_path, handler, parent_instance, lock):
            # Set before binding so that handlers can always reach the listener.
            self.parent_instance = parent_instance
            self.mutex = lock
            super().__init__(socket_path, handler)

    def __init__(self, socket_path):
        """Bind a datagram server to ``socket_path`` and start serving in a thread."""
        self.socket_path = socket_path
        self.lines = []
        self.server = self.UnixDatagramServer(socket_path, self.UnixDatagramHandler, self, threading.Lock())
        self.server.server_activate()

        self.notification_msg = "Notifying syslog server with id: "
        self.notification_id = 0
        self.condition = threading.Condition(self.server.mutex)

        self.thread = threading.Thread(target=self.server.serve_forever)
        self.thread.start()

        # Client socket used to send notification markers to our own server.
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)

    def wait_for_newer_notification_id(self):
        """Send a marker datagram to the server and block until it has been handled."""
        with self.condition:
            expected_id = self.notification_id + 1
            self.sock.sendto(bytes(self.notification_msg + str(expected_id), 'utf-8'), self.socket_path)
            self.condition.wait_for(lambda: self.notification_id == expected_id)

    def get_lines(self):
        """Return a copy of the lines received so far."""
        # Make sure all in-progress handle() calls are finished
        self.wait_for_newer_notification_id()
        # Snapshot under the same lock the handlers hold while appending, so a
        # concurrent append cannot interleave with the copy.  The elements are
        # immutable strings, so a shallow copy is a full copy.
        with self.condition:
            return list(self.lines)

    def shutdown(self):
        """Stop the server thread and close both sockets."""
        self.server.shutdown()
        self.thread.join()
        self.server.server_close()
        self.sock.close()
|
|
|
|
|
|
class AuditBackendSyslog(AuditBackend):
    """Audit backend that receives audit lines over a Unix datagram socket.

    The lines are parsed into named tuples with the same field names as the
    rows of the audit table, so the same assertions work for both backends.
    """

    audit_default_settings = {"audit": "syslog", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL", "audit_keyspaces": "ks"}

    # Keys expected in the payload of every audit line, in order of appearance.
    _LINE_FIELDS = ("node", "category", "cl", "error", "keyspace", "query", "client_ip", "table", "username")
    # Compiled once here rather than rebuilt for every parsed line.
    _LINE_PATTERN = re.compile(", ".join(f"{field}=\"(?P<{field}>.*)\"" for field in _LINE_FIELDS))

    def __init__(self):
        super().__init__()
        self.update_socket_path()
        self.unix_socket_listener = UnixSockerListener(self.socket_path)
        self.named_tuple_factory = namedtuple("Row", ["date", "node", "event_time", "category", "consistency", "error", "keyspace_name", "operation", "source", "table_name", "username"])

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Stop the listener first, then remove the socket file it was bound to.
        self.unix_socket_listener.shutdown()
        if os.path.exists(self.socket_path):
            os.remove(self.socket_path)

    @override
    def audit_mode(self) -> str:
        return "syslog"

    def update_audit_settings(self, audit_settings, modifiers=None):
        """Return a copy of the settings adapted to the syslog backend.

        Falls back to ``audit_default_settings`` when ``audit_settings`` is
        falsy, adds the socket path, and applies ``modifiers`` last.
        """
        if modifiers is None:
            modifiers = {}
        new_audit_settings = copy.deepcopy(audit_settings or self.audit_default_settings)
        # This is a hack. The test framework uses "table" as "not none".
        # Appropriate audit mode should be passed from the test itself, and not set here.
        # This converts "table" to its own audit mode, or keeps "none" as is.
        if "audit" in new_audit_settings and new_audit_settings["audit"] == "table":
            new_audit_settings["audit"] = self.audit_mode()
        new_audit_settings["audit_unix_socket_path"] = self.socket_path
        for key in modifiers:
            new_audit_settings[key] = modifiers[key]
        return new_audit_settings

    @override
    def get_audit_log_dict(self, session, consistency_level):
        """Return ``{audit mode: rows}`` built from the lines received so far.

        The rows keep the order in which the listener stored the lines; each
        gets a synthetic ``event_time`` derived from its position.  ``session``
        and ``consistency_level`` are not used by this backend.
        """
        lines = self.unix_socket_listener.get_lines()
        entries = [self.line_to_row(line, idx) for idx, line in enumerate(lines)]
        return { self.audit_mode(): entries }

    def line_to_row(self, line, idx):
        """Parse one audit line into a row tuple.

        ``idx`` is the position of the line and becomes the integer value of
        the synthetic ``event_time`` UUID.

        Raises:
            ValueError: if the line has no ``": "`` separator or its payload
                does not match the expected ``key="value"`` layout.
        """
        # Everything up to the first ": " is metadata that is not needed here.
        _, data = line.split(": ", 1)
        data = "".join(data.splitlines())  # Remove newlines
        match = self._LINE_PATTERN.match(data)
        if match is None:
            # Fail with the offending line instead of an AttributeError on None.
            raise ValueError(f"Unparsable syslog audit line: {line!r}")

        # Arbitrary date because we don't really check the field. We just need to fill it with something
        # and make sure it doesn't change during the test (e.g. when the test is running at 23:59:59)
        date = datetime.datetime(2000, 1, 1, 0, 0)

        # Keep only the part before the first ":" of the node and client values.
        node = match.group("node").split(":")[0]
        # Backslashes are stripped from the logged statement text.
        statement = match.group("query").replace("\\", "")
        source = match.group("client_ip").split(":")[0]
        event_time = uuid.UUID(int=idx)
        return self.named_tuple_factory(
            date,
            node,
            event_time,
            match.group("category"),
            match.group("cl"),
            match.group("error") == "true",
            match.group("keyspace"),
            statement,
            source,
            match.group("table"),
            match.group("username"),
        )

    def update_socket_path(self):
        """Pick a fresh socket path under /tmp and store it in ``self.socket_path``."""
        # NOTE(review): tempfile.mktemp only returns a name and is race-prone;
        # it is kept because the socket server presumably needs a path that
        # does not exist yet in order to bind - confirm before replacing.
        new_socket_path = tempfile.mktemp(prefix="/tmp/scylla-audit-", suffix=".socket")
        self.socket_path = new_socket_path
|
|
|
|
|
|
class AuditBackendComposite(AuditBackend):
    """Audit backend that combines the table and syslog backends."""

    audit_default_settings = {"audit": "table,syslog", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL", "audit_keyspaces": "ks"}

    backends: list[AuditBackend]

    def __init__(self):
        super().__init__()
        self.backends = [AuditBackendTable(), AuditBackendSyslog()]

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Exit the backends in reverse creation order; a failure in one is
        # logged and does not prevent the others from being exited.
        for backend in self.backends[::-1]:
            try:
                backend.__exit__(exc_type, exc_val, exc_tb)
            except Exception as e:
                logger.error(f"Error while exiting backend: {e}")

    @override
    def audit_mode(self) -> str:
        modes = (backend.audit_mode() for backend in self.backends)
        return ",".join(modes)

    def update_audit_settings(self, audit_settings, modifiers=None):
        """Return a copy of the settings processed by every wrapped backend.

        Falls back to ``audit_default_settings`` when ``audit_settings`` is
        falsy; ``modifiers`` are applied last.
        """
        overrides = {} if modifiers is None else modifiers
        settings = copy.deepcopy(audit_settings or self.audit_default_settings)
        # This is a hack. The test framework uses "table" as "not none".
        # The syslog backend may change "table" to "syslog" before this is called.
        # Appropriate audit mode should be passed from the test itself, and not set here.
        # This converts "table" or "syslog" to its own audit mode, or keeps "none" as is.
        for backend in self.backends:
            settings = backend.update_audit_settings(settings)
        if settings.get("audit") in ("table", "syslog"):
            settings["audit"] = self.audit_mode()
        for key in overrides:
            settings[key] = overrides[key]
        return settings

    @override
    def get_audit_log_dict(self, session, consistency_level):
        """Return the union of the per-mode row lists of all wrapped backends.

        Each backend contributes its own ``{audit mode: rows}`` entries; a mode
        reported by more than one backend fails the assertion.
        """
        rows_by_mode: dict[str, list[AuditEntry]] = {}
        for backend in self.backends:
            for mode, rows in backend.get_audit_log_dict(session, consistency_level).items():
                assert mode not in rows_by_mode
                rows_by_mode[mode] = rows
        return rows_by_mode
|
|
|
|
|
|
@pytest.mark.single_node
|
|
class TestCQLAudit(AuditTester):
|
|
"""
|
|
Make sure CQL statements are audited
|
|
"""
|
|
|
|
AUDIT_LOG_QUERY = "SELECT * FROM audit.audit_log"
|
|
|
|
def deduplicate_audit_entries(self, entries_dict):
    """
    Return a new dictionary mapping each audit mode to its entries with
    duplicates removed.

    Two entries are duplicates when they agree on every field except the
    date and the event time.  The first occurrence is kept and the original
    order is preserved.
    """

    def identity_of(entry):
        return (entry.node, entry.category, entry.consistency, entry.error, entry.keyspace_name, entry.operation, entry.source, entry.table_name, entry.username)

    result: dict[str, list[AuditEntry]] = {}
    for mode, entries in entries_dict.items():
        seen = set()
        kept: list[AuditEntry] = []
        for entry in entries:
            key = identity_of(entry)
            if key not in seen:
                seen.add(key)
                kept.append(entry)
        result[mode] = kept
    return result
|
|
|
|
def get_audit_log_dict(self, session):
    """
    Fetch the audit log from the configured backend helper.

    Returns the helper's dictionary mapping audit mode name to its list of
    audit log rows.  QUORUM is requested on clusters with more than one
    node, ONE otherwise.
    """
    if len(self.cluster.nodelist()) > 1:
        consistency_level = ConsistencyLevel.QUORUM
    else:
        consistency_level = ConsistencyLevel.ONE
    log_dict = self.helper.get_audit_log_dict(session, consistency_level)
    logger.debug(f"get_audit_log_dict: {log_dict}")
    return log_dict
|
|
|
|
# Guard for an implied assumption: the test must still fail if the order of
# the audit columns ever changes.
def assert_audit_row_fields(self, row):
    """Assert that ``row`` exposes exactly the expected fields, in the expected order."""
    expected_fields = ("date", "node", "event_time", "category", "consistency", "error", "keyspace_name", "operation", "source", "table_name", "username")
    assert tuple(row._fields) == expected_fields
|
|
|
|
def assert_audit_row_eq(  # noqa: PLR0913
    self,
    row,
    category,
    statement,
    table="",
    ks="ks",
    user="anonymous",
    cl="ONE",
    error=False,
):
    """Assert that an audit ``row`` matches the expected values.

    The row must have the expected field layout, come from one of the
    cluster's nodes and have 127.0.0.1 as its source; every other field is
    compared with the corresponding argument.
    """
    self.assert_audit_row_fields(row)

    node_addresses = [node.address() for node in self.cluster.nodelist()]
    assert row.node in node_addresses

    assert row.category == category
    assert row.consistency == cl
    assert row.error == error
    assert row.keyspace_name == ks
    assert row.operation == statement
    assert row.source == "127.0.0.1"
    assert row.table_name == table
    assert row.username == user
|
|
|
|
def get_audit_entries_count_dict(self, session) -> dict[str, int]:
    """
    Return a dictionary mapping audit mode name to the number of audit log
    entries for that mode.

    AUTH entries and USE statements are filtered out before counting; the
    remaining rows are written to the debug log.
    """
    rows_by_mode = self.filter_out_noise(self.get_audit_log_dict(session), filter_out_all_auth=True, filter_out_use=True)
    for mode, rows in rows_by_mode.items():
        logger.debug(f"Printing audit {mode} content:")
        for row in rows:
            logger.debug(" %s", row)
    counts = {}
    for mode, rows in rows_by_mode.items():
        counts[mode] = len(rows)
    return counts
|
|
|
|
@staticmethod
|
|
def token_in_range(token, start_token, end_token):
|
|
if end_token < start_token:
|
|
# the case where the end_token circles back to negative values
|
|
# {
|
|
# "start_token": "9216493101126045011",
|
|
# "end_token": "-9213846192999778351",
|
|
# "endpoints": [
|
|
# "172.19.0.6",
|
|
# "172.19.0.2",
|
|
# "172.19.0.3"
|
|
# ]
|
|
# }
|
|
return token >= start_token or token <= end_token
|
|
else:
|
|
return token >= start_token and token <= end_token
|
|
|
|
def get_partitions_for_token(self, ks, table, token):
    """Return the set of endpoints of every ring range that contains ``token``.

    Every node of the cluster is asked for its view of the ring of ``ks``
    (for ``table``) through the REST API, and the endpoints of all matching
    ranges are merged.
    """
    endpoints = set()
    for node in self.cluster.nodelist():
        response = run_rest_api(node, f"/storage_service/describe_ring/{ks}", api_method="GET", params={"table": table})
        for token_range in response.json():
            start_token = int(token_range["start_token"])
            end_token = int(token_range["end_token"])
            if not self.token_in_range(token, start_token, end_token):
                continue
            logger.info(f"Found token {token} in token_range: {token_range}")
            endpoints.update(token_range["endpoints"])
    return endpoints
|
|
|
|
def get_audit_partitions_for_operation(self, session, operation):
    """Return the cluster nodes that hold the audit row of ``operation``.

    Looks up the token of the first audit row whose operation equals
    ``operation`` and maps the endpoints owning that token to node objects.
    Returns None when no such audit row exists.
    """
    logger.info(f"get_audit_partitions_for_operation: {operation}")
    query = "SELECT token(date, node) FROM audit.audit_log WHERE operation = %s ALLOW FILTERING"
    rows = rows_to_list(session.execute(query, [operation]))
    if len(rows) == 0:
        return None

    token = rows[0][0]
    endpoints = self.get_partitions_for_token("audit", "audit_log", token)
    assert len(endpoints) > 0

    nodes_by_address = {node.address(): node for node in self.cluster.nodelist()}
    return [nodes_by_address[endpoint] for endpoint in endpoints]
|
|
|
|
@contextmanager
def assert_exactly_n_audit_entries_were_added(self, session: Session, expected_entries: int):
    """Context manager asserting that its body adds exactly ``expected_entries`` audit entries.

    The entry counts are taken per audit mode before and after the body; the
    set of modes must not change and every mode must have grown by exactly
    ``expected_entries``.
    """
    before = self.get_audit_entries_count_dict(session)
    yield
    after = self.get_audit_entries_count_dict(session)

    assert set(before) == set(after), f"audit modes changed (before: {list(before)} after: {list(after)})"
    for mode in before:
        added = after[mode] - before[mode]
        assert after[mode] == before[mode] + expected_entries, f"Expected {expected_entries} new audit entries, but got {added} new entries"
|
|
|
|
@contextmanager
def assert_no_audit_entries_were_added(self, session):
    """Context manager asserting that its body adds no audit entries.

    The entry counts are taken per audit mode before and after the body; the
    set of modes and every count must be unchanged.
    """
    before = self.get_audit_entries_count_dict(session)
    yield
    after = self.get_audit_entries_count_dict(session)

    assert set(before) == set(after), f"audit modes changed (before: {list(before)} after: {list(after)})"
    for mode in before:
        count_before, count_after = before[mode], after[mode]
        assert count_before == count_after, f"audit entries count changed (before: {count_before} after: {count_after})"
|
|
|
|
def execute_and_validate_audit_entry(  # noqa: PLR0913
    self,
    session: Session,
    query: Any,
    category: str,
    audit_settings: dict[str, str] = AuditTester.audit_default_settings,
    table: str = "",
    ks: str = "ks",
    cl: str = "ONE",
    user: str = "anonymous",
    expected_error: Any = None,
    bound_values: list[Any] | None = None,
    expect_new_audit_entry: bool = True,
    expected_operation: str | None = None,
    session_for_audit_entry_validation: Session | None = None,
):
    """
    Run a query and check the audit log for the entry it should produce.

    An entry is expected only when ``category`` is listed in the
    ``audit_categories`` of ``audit_settings`` and ``expect_new_audit_entry``
    is true; otherwise the query must add no entry at all.  The content of an
    expected entry is validated as well.  When ``expected_error`` is given,
    the query must fail with that error and the entry is expected to be
    flagged as an error.

    Returns the query result, or None when an error was expected.
    """

    # In some cases, provided session does not have access to the audit
    # table. In that case, session_for_audit_entry_validation should be
    # provided.
    validation_session = session if session_for_audit_entry_validation is None else session_for_audit_entry_validation

    audited_categories = audit_settings["audit_categories"].split(",")
    expected_entries = []
    if category in audited_categories and expect_new_audit_entry:
        # The audited statement text defaults to the query itself.
        operation = expected_operation if expected_operation is not None else query
        failed = expected_error is not None
        expected_entries.append(AuditEntry(category, cl, failed, ks, operation, table, user))

    with self.assert_entries_were_added(validation_session, expected_entries):
        if expected_error is not None:
            assert_invalid(session, query, expected=expected_error)
            res = None
        else:
            res = session.execute(query, bound_values)

    return res
|
|
|
|
# Drop entries that can show up at random moments of a test, such as LOGINs
# and USE statements.
def filter_out_noise(self, rows_dict, filter_out_all_auth=False, filter_out_cassandra_auth=False, filter_out_use=False) -> dict[str, list[AuditEntry]]:
    """Filter the row lists of ``rows_dict`` in place and return the same dictionary.

    filter_out_all_auth: drop every row of category AUTH.
    filter_out_cassandra_auth: drop AUTH rows of the user "cassandra".
    filter_out_use: drop rows whose operation contains "USE ".
    """
    # Enabled rejection rules, applied one after another in this order.
    rejectors = []
    if filter_out_all_auth:
        rejectors.append(lambda row: row.category == "AUTH")
    if filter_out_cassandra_auth:
        rejectors.append(lambda row: row.category == "AUTH" and row.username == "cassandra")
    if filter_out_use:
        rejectors.append(lambda row: "USE " in row.operation)

    for mode in list(rows_dict):
        kept = rows_dict[mode]
        for rejects in rejectors:
            kept = [row for row in kept if not rejects(row)]
        rows_dict[mode] = kept
    return rows_dict
|
|
|
|
@contextmanager
def assert_entries_were_added(self, session: Session, expected_entries: list[AuditEntry], merge_duplicate_rows: bool = True, filter_out_cassandra_auth: bool = False):
    """Context manager asserting that its body adds exactly ``expected_entries`` to the audit log.

    The audit log is read before the body runs.  Afterwards it is polled
    (through ``wait_for``, with a 60 second timeout) until, for every audit
    mode, the number of new rows equals the number of expected entries; the
    new rows are then compared field by field with the expected entries.

    session: session used to read the audit log.
    expected_entries: entries the body is expected to add (may be empty).
    merge_duplicate_rows: collapse new rows that differ only in date and
        event time before counting them.
    filter_out_cassandra_auth: ignore AUTH rows of the "cassandra" user.

    AUTH rows and USE statements are ignored unless ``expected_entries``
    itself contains such an entry.
    """
    # Get audit entries before executing the query, to later compare with
    # audit entries after executing the query.
    set_of_rows_before_dict = dict[str, set[AuditEntry]]()
    rows_before_dict = self.get_audit_log_dict(session)
    for mode, rows_before in rows_before_dict.items():
        set_of_rows_before = set(rows_before)
        assert len(set_of_rows_before) == len(rows_before), f"audit {mode} contains duplicate rows: {rows_before}"
        set_of_rows_before_dict[mode] = set_of_rows_before
    # The body of the ``with`` statement runs here.
    yield

    # Filled in by the polling function below; holds the new rows per mode.
    new_rows_dict = dict[str, list[AuditEntry]]()
    def is_number_of_new_rows_correct():
        # Polled by wait_for: returns True once every mode has exactly as
        # many new rows as there are expected entries.
        rows_after_dict = self.get_audit_log_dict(session)
        set_of_rows_after_dict = dict[str, set[AuditEntry]]()
        for mode, rows_after in rows_after_dict.items():
            set_of_rows_after = set(rows_after)
            assert len(set_of_rows_after) == len(rows_after), f"audit {mode} contains duplicate rows: {rows_after}"
            set_of_rows_after_dict[mode] = set_of_rows_after

        # new_rows_dict is rebound below, so it must be declared nonlocal.
        nonlocal new_rows_dict
        for mode, rows_after in rows_after_dict.items():
            rows_before = rows_before_dict[mode]
            # The new rows are taken to be the tail of the log; the assert
            # verifies that this tail is exactly the set difference.
            new_rows_dict[mode] = rows_after[len(rows_before) :]
            assert set(new_rows_dict[mode]) == set_of_rows_after_dict[mode] - set_of_rows_before_dict[mode], f"new rows are not the last rows in the audit table: rows_after={rows_after}, set_of_rows_after_dict[{mode}]={set_of_rows_after_dict[mode]}, set_of_rows_before_dict[{mode}]={set_of_rows_before_dict[mode]}"

        if merge_duplicate_rows:
            new_rows_dict = self.deduplicate_audit_entries(new_rows_dict)

        # AUTH rows and USE statements count as noise unless the caller
        # explicitly expects one of them.
        auth_not_expected = (len([entry for entry in expected_entries if entry.category == "AUTH"]) == 0)
        use_not_expected = (len([entry for entry in expected_entries if "USE " in entry.statement]) == 0)

        new_rows_dict = self.filter_out_noise(
            new_rows_dict,
            filter_out_all_auth=auth_not_expected,
            filter_out_cassandra_auth=filter_out_cassandra_auth,
            filter_out_use=use_not_expected
        )

        for new_rows in new_rows_dict.values():
            # More rows than expected is a failure right away; fewer rows
            # means "not yet", so the caller polls again.
            assert len(new_rows) <= len(expected_entries)
            if len(new_rows) != len(expected_entries):
                return False

        return True

    # NOTE(review): what wait_for does on timeout is not visible here; the
    # length assert below still catches a mismatch if it returns normally.
    wait_for(is_number_of_new_rows_correct, timeout=60)
    for mode, new_rows in new_rows_dict.items():
        # Sort rows and expected entries so they can be compared pairwise.
        # NOTE(review): the row sort key starts with the node while AuditEntry
        # sorts by category first - presumably equivalent for the entries used
        # by the tests; confirm for multi-node, multi-entry cases.
        sorted_new_rows = sorted(new_rows, key=lambda row: (row.node, row.category, row.consistency, row.error, row.keyspace_name, row.operation, row.source, row.table_name, row.username))
        assert len(sorted_new_rows) == len(expected_entries)
        for row, entry in zip(sorted_new_rows, sorted(expected_entries)):
            self.assert_audit_row_eq(row, entry.category, entry.statement, entry.table, entry.ks, entry.user, entry.cl, entry.error)
|
|
|
|
def verify_keyspace(self, audit_settings=None, helper=None):
    """
    CREATE KEYSPACE, USE KEYSPACE, ALTER KEYSPACE, DROP KEYSPACE statements

    Runs the four statements against keyspace ``ks`` and validates the audit
    entry of each one, then runs the same statements against ``ks2`` and
    checks that no audit entry is added for them.

    audit_settings: audit configuration; ``self.audit_default_settings`` is
        used when it is None.
    helper: audit backend helper passed on to ``prepare``.
    """
    # The settings are subscripted below, so the None default has to be
    # resolved to a real dictionary first.
    if audit_settings is None:
        audit_settings = self.audit_default_settings

    session = self.prepare(create_keyspace=False, audit_settings=audit_settings, helper=helper)

    def execute_and_validate_audit_entry(query, category, **kwargs):
        return self.execute_and_validate_audit_entry(session, query, category, audit_settings, **kwargs)

    audited_statements = [
        ("CREATE KEYSPACE ks WITH replication = { 'class':'SimpleStrategy', 'replication_factor':1} AND DURABLE_WRITES = true", "DDL"),
        ('USE "ks"', "DML"),
        ("ALTER KEYSPACE ks WITH replication = { 'class' : 'NetworkTopologyStrategy', 'dc1' : 1 } AND DURABLE_WRITES = false", "DDL"),
        ("DROP KEYSPACE ks", "DDL"),
    ]
    for query, category in audited_statements:
        execute_and_validate_audit_entry(query, category=category)

    # Test that the audit entries are not added if the keyspace is not
    # specified in the audit_keyspaces setting.
    keyspaces = audit_settings["audit_keyspaces"].split(",") if "audit_keyspaces" in audit_settings else []
    assert "ks2" not in keyspaces
    query_sequence = [
        "CREATE KEYSPACE ks2 WITH replication = { 'class':'SimpleStrategy', 'replication_factor':1} AND DURABLE_WRITES = true",
        'USE "ks2"',
        "ALTER KEYSPACE ks2 WITH replication = { 'class' : 'NetworkTopologyStrategy', 'dc1' : 1 } AND DURABLE_WRITES = false",
        "DROP KEYSPACE ks2",
    ]

    with self.assert_no_audit_entries_were_added(session):
        for query in query_sequence:
            session.execute(query)
|
|
|
|
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_using_non_existent_keyspace(self, helper_class):
    """
    Test that using a non-existent keyspace generates an audit entry with
    the error field set to True.
    """
    # ks doesn't exist because create_keyspace=False in prepare
    use_statement = 'USE "ks"'

    with helper_class() as helper:
        session = self.prepare(create_keyspace=False, helper=helper)
        self.execute_and_validate_audit_entry(session, use_statement, category="DML", expected_error=InvalidRequest)
|
|
|
|
def verify_table(self, audit_settings=AuditTester.audit_default_settings, helper=None, table_prefix="test", overwrite_audit_tables=False):
    """
    CREATE TABLE, ALTER TABLE, TRUNCATE TABLE, DROP TABLE statements

    Creates two randomly-suffixed tables in keyspace ``ks``, runs DDL, DML
    and QUERY statements against them and validates the audit entry of each
    statement.  Finally runs similar statements in keyspace ``ks2`` and
    checks that they add no audit entries.

    audit_settings: audit configuration; never modified by this method.
    helper: audit backend helper; ``AuditBackendTable`` when None.
    table_prefix: prefix of the generated table names.
    overwrite_audit_tables: when true, ``audit_tables`` is set to the two
        generated tables in the settings used by this run.
    """
    first_table = f"{table_prefix}_1_{uuid.uuid4().hex[:4]}"
    second_table = f"{table_prefix}_2_{uuid.uuid4().hex[:4]}"

    if helper is None:
        helper = AuditBackendTable()

    if overwrite_audit_tables:
        # Work on a copy: the default argument is the shared class-level
        # dictionary, and assigning into it would leak the random table names
        # into every later user of the defaults.
        audit_settings = {**audit_settings, "audit_tables": f"ks.{first_table}, ks.{second_table}"}

    session = self.prepare(audit_settings=audit_settings, helper=helper)

    def execute_and_validate_audit_entry(query, category, **kwargs):
        return self.execute_and_validate_audit_entry(session, query, category, audit_settings, **kwargs)

    execute_and_validate_audit_entry(
        f"CREATE TABLE {first_table} (k int PRIMARY KEY, v1 int)",
        category="DDL",
        table=first_table,
    )
    execute_and_validate_audit_entry(
        f"CREATE TABLE {second_table} (k int, c1 int, v1 int, PRIMARY KEY (k, c1)) WITH COMPACT STORAGE",
        category="DDL",
        table=second_table,
    )
    execute_and_validate_audit_entry(
        f"ALTER TABLE {first_table} ADD v2 int",
        category="DDL",
        table=first_table,
    )

    for table in [first_table, second_table]:
        # The two tables have different column sets.
        columns = "(k, v1, v2)" if table == first_table else "(k, c1, v1)"
        for i in range(10):
            execute_and_validate_audit_entry(
                f"INSERT INTO {table} {columns} VALUES ({i}, {i}, {i})",
                category="DML",
                table=table,
            )

        res = execute_and_validate_audit_entry(
            f"SELECT * FROM {table}",
            category="QUERY",
            table=table,
        )
        assert sorted(rows_to_list(res)) == [[i, i, i] for i in range(10)], res

        execute_and_validate_audit_entry(
            f"TRUNCATE {table}",
            category="DML",
            table=table,
        )

        res = execute_and_validate_audit_entry(
            f"SELECT * FROM {table}",
            category="QUERY",
            table=table,
        )
        assert rows_to_list(res) == [], res

        execute_and_validate_audit_entry(
            f"DROP TABLE {table}",
            category="DDL",
            table=table,
        )

        # Selecting from the dropped table must fail and add no audit entry.
        execute_and_validate_audit_entry(
            f"SELECT * FROM {table}",
            category="QUERY",
            table=table,
            expected_error=InvalidRequest,
            expect_new_audit_entry=False,
        )

    # Test that the audit entries are not added if the keyspace is not
    # specified in the audit_keyspaces setting.
    keyspaces = audit_settings["audit_keyspaces"].split(",") if "audit_keyspaces" in audit_settings else []
    assert "ks2" not in keyspaces
    query_sequence = [
        "CREATE KEYSPACE ks2 WITH replication = { 'class':'SimpleStrategy', 'replication_factor':1} AND DURABLE_WRITES = true",
        f"CREATE TABLE ks2.{first_table} (k int PRIMARY KEY, v1 int)",
        f"ALTER TABLE ks2.{first_table} ADD v2 int",
        f"INSERT INTO ks2.{first_table} (k, v1, v2) VALUES (1, 1, 1)",
        f"SELECT * FROM ks2.{first_table}",
        f"TRUNCATE ks2.{first_table}",
        f"DROP TABLE ks2.{first_table}",
    ]

    with self.assert_no_audit_entries_were_added(session):
        for query in query_sequence:
            session.execute(query)
|
|
|
|
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_audit_keyspace(self, helper_class):
    """Keyspace statements are audited with the default audit settings."""
    default_settings = AuditTester.audit_default_settings
    with helper_class() as helper:
        self.verify_keyspace(audit_settings=default_settings, helper=helper)
|
|
|
|
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_audit_keyspace_extra_parameter(self, helper_class):
    """Keyspace statements are audited when the settings carry an additional "extra_parameter" key."""
    settings = {"audit": "table", "audit_categories": "ADMIN,AUTH,DML,DDL,DCL", "audit_keyspaces": "ks", "extra_parameter": "new"}
    with helper_class() as helper:
        self.verify_keyspace(audit_settings=settings, helper=helper)
|
|
|
|
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_audit_keyspace_many_ks(self, helper_class):
    """Keyspace statements are audited when ``ks`` is one of several audited keyspaces."""
    settings = {"audit": "table", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL", "audit_keyspaces": "a,b,c,ks"}
    with helper_class() as helper:
        self.verify_keyspace(audit_settings=settings, helper=helper)
|
|
|
|
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_audit_keyspace_table_not_exists(self, helper_class):
    """Keyspace statements are audited when ``audit_tables`` names the table ``ks.fake``."""
    settings = {"audit": "table", "audit_categories": "DML,DDL", "audit_keyspaces": "ks", "audit_tables": "ks.fake"}
    with helper_class() as helper:
        self.verify_keyspace(audit_settings=settings, helper=helper)
|
|
|
|
def test_audit_type_none(self):
    """
    'audit': None

    Executes CREATE KEYSPACE, USE, ALTER KEYSPACE and DROP KEYSPACE statements,
    then checks that the audit keyspace was not created (`use audit;` is invalid).
    """
    settings = {"audit": None, "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL", "audit_keyspaces": "ks"}
    session = self.prepare(create_keyspace=False, audit_settings=settings)

    keyspace_lifecycle = (
        "CREATE KEYSPACE ks WITH replication = { 'class':'SimpleStrategy', 'replication_factor':1} AND DURABLE_WRITES = true",
        "USE ks",
        "ALTER KEYSPACE ks WITH replication = { 'class' : 'NetworkTopologyStrategy', 'dc1' : 1 } AND DURABLE_WRITES = false",
        "DROP KEYSPACE ks",
    )
    for statement in keyspace_lifecycle:
        session.execute(statement)

    assert_invalid(session, "use audit;", expected=InvalidRequest)
|
|
|
|
def test_audit_type_invalid(self):
    """
    'audit': invalid

    The node is not expected to start; the test waits for the matching
    startup error in node1's log.
    """
    startup_error = r"Startup failed: audit::audit_exception \(Bad configuration: invalid 'audit': invalid\)"
    bad_settings = {"audit": "invalid", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL", "audit_keyspaces": "ks"}

    self.fixture_dtest_setup.allow_log_errors = True

    cluster = self.cluster
    cluster.set_configuration_options(values=bad_settings)
    try:
        cluster.populate(1).start(no_wait=True)
    except (NodeError, RuntimeError):
        # A failure to start is tolerated here; the log check below is the actual verification.
        pass

    self.ignore_log_patterns.append(startup_error)
    self.cluster.nodes["node1"].watch_log_for(startup_error)
|
|
|
|
def test_composite_audit_type_invalid(self):
    """
    'audit': table,syslog,invalid

    The node is not expected to start; the test waits for the matching
    startup error in node1's log.
    """
    startup_error = r"Startup failed: audit::audit_exception \(Bad configuration: invalid 'audit': invalid\)"
    bad_settings = {"audit": "table,syslog,invalid", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL", "audit_keyspaces": "ks"}

    self.fixture_dtest_setup.allow_log_errors = True

    cluster = self.cluster
    cluster.set_configuration_options(values=bad_settings)
    try:
        cluster.populate(1).start(no_wait=True)
    except (NodeError, RuntimeError):
        # A failure to start is tolerated here; the log check below is the actual verification.
        pass

    self.ignore_log_patterns.append(startup_error)
    self.cluster.nodes["node1"].watch_log_for(startup_error)
|
|
|
|
# TODO: verify that the syslog file doesn't exist
def test_audit_empty_settings(self):
    """
    'audit': none

    The node starts and the audit keyspace is not created (`use audit;` is invalid).
    """
    settings = {"audit": "none"}
    session = self.prepare(create_keyspace=False, audit_settings=settings)
    assert_invalid(session, "use audit;", expected=InvalidRequest)
|
|
|
|
def test_composite_audit_empty_settings(self):
    """
    'audit': table,syslog,none

    The node starts and the audit keyspace is not created (`use audit;` is invalid).
    """
    settings = {"audit": "table,syslog,none"}
    session = self.prepare(create_keyspace=False, audit_settings=settings)
    assert_invalid(session, "use audit;", expected=InvalidRequest)
|
|
|
|
def test_audit_audit_ks(self):
    """
    'audit_keyspaces': 'audit'

    The node starts, and running the audit-log query is itself validated as a
    QUERY entry for `audit.audit_log`.
    """
    settings = {"audit": "table", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL", "audit_keyspaces": "audit"}
    session = self.prepare(create_keyspace=False, audit_settings=settings)

    self.execute_and_validate_audit_entry(
        session,
        query=self.AUDIT_LOG_QUERY,
        category="QUERY",
        ks="audit",
        table="audit_log",
        audit_settings=settings,
    )
|
|
|
|
@pytest.mark.single_node
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_audit_categories_invalid(self, helper_class):
    """
    'audit_categories': invalid

    The node is not expected to start; the test waits for the matching
    startup error in node1's log.
    """
    startup_error = r"Startup failed: audit::audit_exception \(Bad configuration: invalid 'audit_categories': INVALID\)"
    bad_settings = {"audit": "table", "audit_categories": "INVALID", "audit_keyspaces": "ks"}

    # The helper is only entered/exited here; its value is not used by the test body.
    with helper_class():
        self.fixture_dtest_setup.allow_log_errors = True

        cluster = self.cluster
        cluster.set_configuration_options(values=bad_settings)
        cluster.force_wait_for_cluster_start = False

        try:
            cluster.populate(1).start(no_wait=True)
        except NodeError:
            # A failure to start is tolerated here; the log check below is the actual verification.
            pass

        self.ignore_log_patterns.append(startup_error)
        self.cluster.nodes["node1"].watch_log_for(startup_error)
|
|
|
|
# compact storage is currently required for all tests that call verify_table
@pytest.mark.cluster_options(enable_create_table_with_compact_storage=True)
def test_audit_table(self):
    """Run `verify_table` with the default audit settings."""
    default_settings = AuditTester.audit_default_settings
    self.verify_table(audit_settings=default_settings, table_prefix="test_audit_table")
|
|
|
|
@pytest.mark.cluster_options(enable_create_table_with_compact_storage=True)
def test_audit_table_extra_parameter(self):
    """Run `verify_table` with settings that carry an additional `extra_parameter` key."""
    settings = {
        "audit": "table",
        "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL",
        "audit_keyspaces": "ks",
        "extra_parameter": "new",
    }
    self.verify_table(audit_settings=settings, table_prefix="test_audit_table_extra_parameter")
|
|
|
|
@pytest.mark.cluster_options(enable_create_table_with_compact_storage=True)
def test_audit_table_audit_keyspaces_empty(self):
    """Run `verify_table` without `audit_keyspaces`, asking it to overwrite the audited tables."""
    settings = {"audit": "table", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL"}
    self.verify_table(
        audit_settings=settings,
        table_prefix="test_audit_table_audit_keyspaces_empty",
        overwrite_audit_tables=True,
    )
|
|
|
|
@pytest.mark.cluster_options(enable_create_table_with_compact_storage=True)
def test_audit_table_no_ks(self):
    """Run `verify_table` with no `audit_keyspaces` entry, asking it to overwrite the audited tables."""
    settings = {"audit": "table", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL"}
    self.verify_table(
        audit_settings=settings,
        table_prefix="test_audit_table_no_ks",
        overwrite_audit_tables=True,
    )
|
|
|
|
@pytest.mark.cluster_options(enable_create_table_with_compact_storage=True)
def test_audit_categories_part1(self):
    """Run `verify_table` with only the AUTH, QUERY and DDL categories enabled."""
    settings = {"audit": "table", "audit_categories": "AUTH,QUERY,DDL"}
    self.verify_table(
        audit_settings=settings,
        table_prefix="test_audit_categories_part1",
        overwrite_audit_tables=True,
    )
|
|
|
|
@pytest.mark.cluster_options(enable_create_table_with_compact_storage=True)
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_audit_categories_part2(self, helper_class):
    """Run `verify_table` with the category list "DDL, ADMIN,AUTH,DCL" (note the embedded space)."""
    settings = {"audit": "table", "audit_categories": "DDL, ADMIN,AUTH,DCL", "audit_keyspaces": "ks"}
    with helper_class() as backend_helper:
        self.verify_table(audit_settings=settings, helper=backend_helper, table_prefix="test_audit_categories_part2")
|
|
|
|
@pytest.mark.cluster_options(enable_create_table_with_compact_storage=True)
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_audit_categories_part3(self, helper_class):
    """Run `verify_table` with the category list "DDL, ADMIN,AUTH" (note the embedded space)."""
    settings = {"audit": "table", "audit_categories": "DDL, ADMIN,AUTH", "audit_keyspaces": "ks"}
    with helper_class() as backend_helper:
        self.verify_table(audit_settings=settings, helper=backend_helper, table_prefix="test_audit_categories_part3")
|
|
|
|
# One password-masking test case: the user/role name, the password used on
# CREATE, and the password used on the subsequent ALTER.
PasswordMaskingCase = namedtuple("PasswordMaskingCase", ["name", "password", "new_password"])
|
|
|
|
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_user_password_masking(self, helper_class):
    """
    CREATE USER, ALTER USER, DROP USER statements

    For CREATE and ALTER the expected audited operation has the password
    replaced by '***'.
    """
    with helper_class() as helper:
        session = self.prepare(user="cassandra", password="cassandra", helper=helper)

        def run_dcl(query, **extra):
            # Every statement here is a DCL statement issued by the `cassandra` user with no keyspace.
            return self.execute_and_validate_audit_entry(session, query, "DCL", self.audit_default_settings, **extra, user="cassandra", ks="")

        cases = [
            self.PasswordMaskingCase("user1", "secret", "Secret^%$#@!"),
            self.PasswordMaskingCase("user2", "", ""),
        ]
        for case in cases:
            username = case.name
            run_dcl(
                f"CREATE USER {username} WITH PASSWORD '{case.password}'",
                expected_operation=f"CREATE USER {username} WITH PASSWORD '***'",
            )
            run_dcl(
                f"ALTER USER {username} WITH PASSWORD '{case.new_password}'",
                expected_operation=f"ALTER USER {username} WITH PASSWORD '***'",
            )
            run_dcl(f"DROP USER {username}")
|
|
|
|
def test_negative_audit_records_auth(self):
    """
    Test that failed AUTH attempts are audited.
    """
    session = self.prepare(user="cassandra", password="cassandra")

    failed_login = AuditEntry(category="AUTH", statement="LOGIN", table="", ks="", user="wrong_user", cl="", error=True)
    with self.assert_entries_were_added(session, [failed_login], filter_out_cassandra_auth=True):
        first_node = self.cluster.nodelist()[0]
        try:
            self.exclusive_cql_connection(first_node, user="wrong_user", password="wrong_password")
        except NoHostAvailable as exc:
            # Exactly one per-host error is expected, and it must be an authentication failure.
            host_errors = list(exc.errors.values())
            assert len(host_errors) == 1
            assert isinstance(host_errors[0], AuthenticationFailed)
        else:
            # Connecting with wrong credentials must not succeed.
            pytest.fail()
|
|
|
|
def test_negative_audit_records_admin(self):
    """
    Test that failed ADMIN statements are audited.
    """
    session = self.prepare(user="cassandra", password="cassandra")
    session.execute("CREATE ROLE test_role")

    failing_stmt = "ATTACH SERVICE_LEVEL test_service_level TO test_role"
    error_entry = AuditEntry(category="ADMIN", statement=failing_stmt, table="", ks="", user="cassandra", cl="ONE", error=True)

    with self.assert_entries_were_added(session, [error_entry]):
        assert_invalid(session, failing_stmt, expected=InvalidRequest)
|
|
|
|
def test_negative_audit_records_ddl(self):
    """
    Test that failed DDL statements are audited.
    """
    session = self.prepare(user="cassandra", password="cassandra")

    failing_stmt = "CREATE KEYSPACE ks WITH replication = { 'class':'NetworkTopologyStrategy', 'replication_factor': 3 }"
    error_entry = AuditEntry(category="DDL", statement=failing_stmt, table="", ks="ks", user="cassandra", cl="ONE", error=True)

    with self.assert_entries_were_added(session, [error_entry]):
        assert_invalid(session, failing_stmt, expected=AlreadyExists)
|
|
|
|
def test_negative_audit_records_dml(self):
    """
    Test that failed DML statements are audited.
    """
    settings = {"audit": "table", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL", "audit_keyspaces": "ks"}
    session = self.prepare(user="cassandra", password="cassandra", audit_settings=settings)
    session.execute("CREATE TABLE test1 (k int PRIMARY KEY, v1 int, v2 int)")

    # The insert is issued at CL=TWO and is expected to fail with Unavailable.
    failing_stmt = SimpleStatement("INSERT INTO ks.test1 (k, v1, v2) VALUES (1, 1, 1)", consistency_level=ConsistencyLevel.TWO)
    error_entry = AuditEntry(category="DML", statement=failing_stmt.query_string, table="test1", ks="ks", user="cassandra", cl="TWO", error=True)

    with self.assert_entries_were_added(session, [error_entry]):
        assert_invalid(session, failing_stmt, expected=Unavailable)
|
|
|
|
def test_negative_audit_records_dcl(self):
    """
    Test that failed DCL statements are audited.
    """
    session = self.prepare(user="cassandra", password="cassandra")

    failing_stmt = "GRANT SELECT ON ALL KEYSPACES TO test_role"
    error_entry = AuditEntry(category="DCL", statement=failing_stmt, table="", ks="", user="cassandra", cl="ONE", error=True)

    with self.assert_entries_were_added(session, [error_entry]):
        assert_invalid(session, failing_stmt, expected=InvalidRequest)
|
|
|
|
def test_negative_audit_records_query(self):
    """
    Test that failed QUERY statements are audited.
    """
    settings = {"audit": "table", "audit_categories": "ADMIN,AUTH,QUERY,DML,DDL,DCL", "audit_keyspaces": "ks"}
    session = self.prepare(audit_settings=settings)
    session.execute("CREATE TABLE test1 (k int PRIMARY KEY, v int)")

    # The select is issued at CL=TWO and is expected to fail with Unavailable.
    failing_stmt = SimpleStatement("SELECT * FROM ks.test1", consistency_level=ConsistencyLevel.TWO)
    error_entry = AuditEntry(category="QUERY", statement=failing_stmt.query_string, table="test1", ks="ks", user="anonymous", cl="TWO", error=True)

    with self.assert_entries_were_added(session, [error_entry]):
        assert_invalid(session, failing_stmt, expected=Unavailable)
|
|
|
|
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_role_password_masking(self, helper_class):
    """
    CREATE ROLE, ALTER ROLE, DROP ROLE statements

    For CREATE and ALTER the expected audited operation has the password
    replaced by '***'.
    """
    with helper_class() as helper:
        session = self.prepare(user="cassandra", password="cassandra", helper=helper)

        def run_dcl(query, **extra):
            # Every statement here is a DCL statement issued by the `cassandra` user with no keyspace.
            return self.execute_and_validate_audit_entry(session, query, "DCL", self.audit_default_settings, **extra, user="cassandra", ks="")

        cases = [
            self.PasswordMaskingCase("role1", "Secret!@#$", "Secret^%$#@!"),
            self.PasswordMaskingCase("role2", "", ""),
        ]
        for case in cases:
            role_name = case.name
            run_dcl(
                f"CREATE ROLE {role_name} WITH PASSWORD = '{case.password}'",
                expected_operation=f"CREATE ROLE {role_name} WITH PASSWORD = '***'",
            )
            run_dcl(
                f"ALTER ROLE {role_name} WITH PASSWORD = '{case.new_password}'",
                expected_operation=f"ALTER ROLE {role_name} WITH PASSWORD = '***'",
            )
            run_dcl(f"DROP ROLE {role_name}")
|
|
|
|
def test_login(self):
    """
    USER LOGIN

    Creates a `test` user and checks that connecting as that user adds a
    successful AUTH/LOGIN audit entry.
    """
    admin_session = self.prepare(user="cassandra", password="cassandra", create_keyspace=False)
    admin_session.execute("CREATE USER test WITH PASSWORD 'test'")

    login_entry = AuditEntry(category="AUTH", statement="LOGIN", user="test", table="", ks="", cl="", error=False)

    with self.assert_entries_were_added(admin_session, [login_entry], filter_out_cassandra_auth=True):
        self.prepare(user="test", password="test", create_keyspace=False)
|
|
|
|
def test_cassandra_login(self):
    """
    Test user login to default (cassandra) user
    """
    session = self.prepare(user="cassandra", password="cassandra", create_keyspace=False)
    login_entry = AuditEntry(category="AUTH", statement="LOGIN", user="cassandra", table="", ks="", cl="", error=False)

    # The cassandra user's own AUTH entries must not be filtered out, since that is what is being checked.
    with self.assert_entries_were_added(session, [login_entry], filter_out_cassandra_auth=False):
        self.prepare(user="cassandra", password="cassandra", create_keyspace=False)
|
|
|
|
def test_categories(self):
    """
    Test filtering audit categories

    Only the DML category is enabled: CREATE/ALTER/SELECT must add no audit
    entries, while INSERT and TRUNCATE are validated as DML entries.
    """
    session = self.prepare(experimental=True, audit_settings={"audit": "table", "audit_categories": "DML", "audit_keyspaces": "ks"})

    def run_dml_on_test1(query):
        return self.execute_and_validate_audit_entry(session, query, "DML", self.audit_default_settings, table="test1")

    with self.assert_no_audit_entries_were_added(session):
        for ddl in ("CREATE TABLE test1 (k int PRIMARY KEY, v1 int)", "ALTER TABLE test1 ADD v2 int"):
            session.execute(ddl)

    row_count = 10
    for i in range(row_count):
        run_dml_on_test1(f"INSERT INTO test1 (k, v1, v2) VALUES ({i}, {i}, {i})")

    with self.assert_no_audit_entries_were_added(session):
        res = sorted(session.execute("SELECT * FROM test1"))
        assert rows_to_list(res) == [[i, i, i] for i in range(row_count)], res

    run_dml_on_test1("TRUNCATE test1")

    with self.assert_no_audit_entries_were_added(session):
        res = session.execute("SELECT * FROM test1")
        assert rows_to_list(res) == [], res

    session.execute("DROP TABLE test1")
|
|
|
|
def _get_attempt_count(self, session: Session, *, execution_profile=EXEC_PROFILE_DEFAULT, consistency_level: ConsistencyLevel = ConsistencyLevel.ONE) -> int:
    """
    Return the total number of attempts for one statement: 1 + the retry policy's `max_retries`.

    The retry policy is read from a clone of `execution_profile` updated with
    `consistency_level`. Asserts that the policy has a `max_retries` attribute.
    """
    # dtest env is using FlakyRetryPolicy which has `max_retries` attribute
    profile = session.execution_profile_clone_update(execution_profile, consistency_level=consistency_level)
    max_retries = getattr(profile.retry_policy, "max_retries", None)
    assert max_retries is not None
    return 1 + max_retries
|
|
|
|
def _test_insert_failure_doesnt_report_success_assign_nodes(self, session: Session = None):
    """
    Pick the nodes and the statement used by `test_insert_failure_doesnt_report_success`.

    Returns a 5-tuple
    `(audit_partition_nodes, insert_node, kill_node, query_string, attempt_count)`:

    - `audit_partition_nodes`: nodes reported by `get_audit_partitions_for_operation`
      that do not overlap with the nodes of the chosen data partition,
    - `insert_node`: the single node that is in neither of those two sets,
    - `kill_node`: one node taken from the data partition's nodes,
    - `query_string`: the INSERT (already executed once successfully here) whose
      partition was chosen,
    - `attempt_count`: 1 + `max_retries` of the retry policy at CL=THREE.

    If no suitable key is found among the tried keys, returns
    `([], None, None, None, None)`. The test is skipped when a candidate is found
    but the node sets do not have the expected sizes (7 / 3 / 3 / 1).
    """
    all_nodes: set[ScyllaNode] = set(self.cluster.nodelist())
    assert len(all_nodes) == 7

    address_to_node = {node.address(): node for node in all_nodes}
    node_to_audit_nodes = {}

    for index, node in enumerate(all_nodes):
        with self.patient_exclusive_cql_connection(node) as conn:
            stmt = SimpleStatement("INSERT INTO ks.test1 (k, v1) VALUES (1000, 1000)", consistency_level=ConsistencyLevel.THREE)
            conn.execute(stmt)
            audit_nodes = self.get_audit_partitions_for_operation(session, stmt.query_string)
            node_to_audit_nodes[index] = {audit_node.address() for audit_node in audit_nodes}

    all_nodes_addresses = {node.address() for node in all_nodes}

    # The attempt count depends only on the session's retry policy, so compute
    # it once instead of on every iteration of the search loop below.
    attempt_count = self._get_attempt_count(session, consistency_level=ConsistencyLevel.THREE)

    # the number here is arbitrary
    for i in range(256):
        stmt = SimpleStatement(f"INSERT INTO ks.test1 (k, v1) VALUES ({i}, 1337)", consistency_level=ConsistencyLevel.THREE)
        session.execute(stmt)

        token = rows_to_list(session.execute(f"SELECT token(k) FROM ks.test1 WHERE k = {i}"))[0][0]
        partitions = self.get_partitions_for_token("ks", "test1", token)

        for audit_nodes in node_to_audit_nodes.values():
            if audit_nodes & partitions:
                # The audit nodes overlap with the data partition's nodes; try the next candidate.
                continue

            insert_node = all_nodes_addresses - partitions - audit_nodes
            if len(all_nodes_addresses) != 7 or len(partitions) != 3 or len(audit_nodes) != 3 or len(insert_node) != 1:
                raise pytest.skip("Failed to assign nodes for insert failure test")
            audit_partition_nodes = [address_to_node[address] for address in audit_nodes]
            insert_node = address_to_node[insert_node.pop()]
            kill_node = address_to_node[partitions.pop()]
            return audit_partition_nodes, insert_node, kill_node, stmt.query_string, attempt_count

    # Nothing suitable found. `insert_node` is reported as None (not an empty
    # list) so that the caller's `insert_node is None` check actually matches.
    return [], None, None, None, None
|
|
|
|
@pytest.mark.exclude_errors("audit - Unexpected exception when writing log with: node_ip")
def test_insert_failure_doesnt_report_success(self):
    """
    Test that if an insert fails, the audit log doesn't report the insert
    as successful.

    The test works by creating a table with RF=3, and then stopping one of
    the nodes. Then, an insert is executed with CL=THREE. This insert will
    fail, since the node that is stopped is a replica for the inserted
    partition. The test verifies that the audit log doesn't report the
    insert as successful and reports unsuccessful inserts as expected.

    Only audit rows whose operation equals the failing query are counted, and
    exactly `query_fail_count` (1 + the retry policy's `max_retries`) failed
    rows are expected for every audit mode.
    """
    cluster_topology = {"datacenter1": {"rack1": 3, "rack2": 2, "rack3": 2}}
    session: Session = self.prepare(nodes=cluster_topology, rf=3)

    with self.patient_exclusive_cql_connection(self.cluster.nodelist()[0]) as conn:
        stmt = SimpleStatement("CREATE TABLE ks.test1 (k int PRIMARY KEY, v1 int)")
        with self.assert_exactly_n_audit_entries_were_added(session, 1):
            conn.execute(stmt)

    audit_partition_nodes, insert_node, node_to_stop, query_to_fail, query_fail_count = self._test_insert_failure_doesnt_report_success_assign_nodes(session=session)

    # Check the assignment result BEFORE touching any node: when the helper
    # could not assign nodes it returns placeholders, and dereferencing them
    # below would raise AttributeError instead of skipping the test.
    if len(audit_partition_nodes) != 3 or node_to_stop is None or insert_node is None:
        raise pytest.skip("Failed to assign nodes for insert failure test")

    # TODO: remove the loop when scylladb#24473 is fixed
    # We call get_host_id only to cache host_id
    for node in audit_partition_nodes + [insert_node] + [node_to_stop]:
        node.cluster.manager.get_host_id(node.server_id)

    for audit_node in audit_partition_nodes:
        logger.debug(f"audit_paritition_nodes: {audit_node.name} {audit_node.address()}")
    logger.debug(f"node_to_stop: {node_to_stop.name} {node_to_stop.address()}")
    logger.debug(f"insert_node: {insert_node.name} {insert_node.address()}")

    # Sanity check: with all nodes up, an insert through insert_node adds exactly one audit entry.
    with self.patient_exclusive_cql_connection(insert_node) as conn:
        stmt = SimpleStatement("INSERT INTO ks.test1 (k, v1) VALUES (1, 1)", consistency_level=ConsistencyLevel.THREE)
        with self.assert_exactly_n_audit_entries_were_added(session, 1):
            conn.execute(stmt)

    node_to_stop.stop(wait_other_notice=True)
    with self.patient_exclusive_cql_connection(insert_node) as conn:
        with pytest.raises(Unavailable):
            stmt = SimpleStatement(query_to_fail, consistency_level=ConsistencyLevel.THREE)
            conn.execute(stmt)
            pytest.fail("Expected insert to fail")
    node_to_stop.start(wait_for_binary_proto=True, wait_other_notice=True)

    # Poll the audit log until every audit mode shows the expected number of
    # failed rows for the failing query, or give up after 60 seconds.
    # NOTE(review): there is no explicit sleep in this loop, so the "(i / 10)s"
    # figure in the log message below is only an estimate — confirm.
    timestamp_before = datetime.datetime.now()
    for i in itertools.count(start=1):
        if datetime.datetime.now() - timestamp_before > datetime.timedelta(seconds=60):
            pytest.fail(f"audit log not updated after {i} iterations")

        rows_dict = self.get_audit_log_dict(session)
        # We need to satisfy the end state condition for all audit modes.
        # If any audit mode is not done yet, continue polling.
        all_modes_done = True
        for mode, rows in rows_dict.items():
            rows_with_error = [row for row in rows if row.error and row.operation == query_to_fail]
            if len(rows_with_error) == query_fail_count:
                logger.info(f"audit mode {mode} log updated after {i} iterations ({i / 10}s)")
                assert rows_with_error[0].error is True
                assert rows_with_error[0].consistency == "THREE"

                # We expect the initial insert to be in the audit log.
                # it is executed in _test_insert_failure_doesnt_report_success_assign_nodes
                rows_without_error = [row for row in rows if row.operation == query_to_fail and not row.error]
                assert len(rows_without_error) == 1
            else:
                # An audit mode is not done yet, early exit to continue polling.
                all_modes_done = False
                break
        if all_modes_done:
            break
|
|
|
|
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_prepare(self, helper_class):
    """
    Test prepare statement

    Preparing the statement must add no audit entries; executing it with bound
    values is validated as a DML entry whose operation is the unbound query text.
    """
    insert_cql = "INSERT INTO cf (k, c) VALUES (?, ?);"

    with helper_class() as helper:
        session = self.prepare(helper=helper)

        session.execute(
            """
            CREATE TABLE cf (
                k varchar PRIMARY KEY,
                c int,
            )
            """
        )

        with self.assert_no_audit_entries_were_added(session):
            prepared_insert = session.prepare(insert_cql)

        self.execute_and_validate_audit_entry(
            session,
            prepared_insert,
            bound_values=["foo", 4],
            category="DML",
            expected_operation=insert_cql,
            table="cf",
        )
|
|
|
|
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_permissions(self, helper_class):
    """
    Test user permissions

    A `test` user is granted SELECT only on ks.test1. Its SELECT is validated
    as a QUERY entry, and its INSERT is validated as a DML entry with
    `expected_error=Unauthorized`.
    """
    with helper_class() as helper:
        admin_session = self.prepare(user="cassandra", password="cassandra", helper=helper)
        setup_statements = (
            "CREATE TABLE test1 (k int PRIMARY KEY, v1 int)",
            "CREATE USER test WITH PASSWORD 'test'",
            "GRANT SELECT ON ks.test1 TO test",
            "INSERT INTO test1 (k, v1) VALUES (1, 1)",
        )
        for statement in setup_statements:
            admin_session.execute(statement)

        logging.info("Waiting for appling permissions")
        # Run a read barrier against every running server before the restricted user connects.
        for srv in self.cluster.manager.running_servers():
            asyncio.run(read_barrier(self.cluster.manager.api, srv.ip_addr))

        user_session = self.patient_cql_connection(self.cluster.nodelist()[0], user="test", password="test")

        def run_as_test_user(query, category, **extra):
            # Statements run on the `test` user's session; audit entries are validated through the admin session.
            return self.execute_and_validate_audit_entry(user_session, query, category, session_for_audit_entry_validation=admin_session, user="test", **extra)

        run_as_test_user("SELECT * FROM ks.test1", category="QUERY", table="test1")
        run_as_test_user("INSERT INTO ks.test1 (k, v1) VALUES (2, 2)", category="DML", table="test1", expected_error=Unauthorized)
|
|
|
|
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_batch(self, helper_class):
    """
    BATCH statement

    Executes one batch of four statements at CL=QUORUM and expects one DML
    audit entry per statement in the batch.
    """
    with helper_class() as helper:
        session = self.prepare(helper=helper)

        session.execute(
            """
            CREATE TABLE test8 (
                userid text PRIMARY KEY,
                name text,
                password text
            )
            """
        )

        batch_query = SimpleStatement(
            """
            BEGIN BATCH
                INSERT INTO test8 (userid, password, name) VALUES ('user1', 'ch@ngem3b', 'second user');
                UPDATE test8 SET password = 'ps22dhds' WHERE userid = 'user3';
                INSERT INTO test8 (userid, password) VALUES ('user4', 'ch@ngem3c');
                DELETE name FROM test8 WHERE userid = 'user1';
            APPLY BATCH;
            """,
            consistency_level=ConsistencyLevel.QUORUM,
        )

        # The expected operations are spelled without the quotes around string values.
        audited_operations = (
            "INSERT INTO test8 (userid, password, name) VALUES (user1, ch@ngem3b, second user)",
            "UPDATE test8 SET password = ps22dhds WHERE userid = user3",
            "INSERT INTO test8 (userid, password) VALUES (user4, ch@ngem3c)",
            "DELETE name FROM test8 WHERE userid = user1",
        )
        expected_entries = [
            AuditEntry(category="DML", statement=operation, table="test8", ks="ks", user="anonymous", cl="QUORUM", error=False)
            for operation in audited_operations
        ]

        with self.assert_entries_were_added(session, expected_entries, merge_duplicate_rows=False):
            session.execute(batch_query)
|
|
|
|
def test_service_level_statements(self):
    """
    Test auditing service level statements - ones that use the ADMIN audit category.
    """
    admin_only_settings = {"audit": "table", "audit_categories": "ADMIN"}
    session = self.prepare(user="cassandra", password="cassandra", audit_settings=admin_only_settings, jvm_args=["--smp", "1"])

    # A role is needed so that a service level can be attached to it.
    session.execute("CREATE ROLE test_role")

    service_level_statements = (
        "CREATE SERVICE_LEVEL test_service_level WITH SHARES = 1",
        "ATTACH SERVICE_LEVEL test_service_level TO test_role",
        "DETACH SERVICE_LEVEL FROM test_role",
        "LIST SERVICE_LEVEL test_service_level",
        "LIST ALL SERVICE_LEVELS",
        "LIST ATTACHED SERVICE_LEVEL OF test_role",
        "LIST ALL ATTACHED SERVICE_LEVELS",
        "ALTER SERVICE_LEVEL test_service_level WITH SHARES = 2",
        "DROP SERVICE_LEVEL test_service_level",
    )

    # With ADMIN enabled, each statement is executed and validated as an ADMIN audit entry.
    for statement in service_level_statements:
        self.execute_and_validate_audit_entry(session, statement, category="ADMIN", audit_settings=admin_only_settings, ks="", user="cassandra")

    # Reload the configuration with the ADMIN category left out ...
    session = self.prepare(user="cassandra", password="cassandra", audit_settings={"audit_categories": "QUERY,DML,DDL,DCL"}, reload_config=True)

    # ... and check that the very same statements no longer add audit entries.
    with self.assert_no_audit_entries_were_added(session):
        for statement in service_level_statements:
            session.execute(statement)
|
|
|
|
class AuditConfigChanger:
|
|
class ExpectedResult(enum.Enum):
|
|
SUCCESS = 1
|
|
FAILURE_UNPARSABLE_VALUE = 2
|
|
FAILURE_UNUPDATABLE_PARAM = 3
|
|
|
|
def __init__(self) -> None:
|
|
super().__init__()
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
pass
|
|
|
|
def change_config(self, test, settings, expected_result):
|
|
assert False, "change_config not implemented in AuditConfigChanger"
|
|
|
|
def verify_change(self, node, param, value, from_mark, expected_result):
|
|
current_value = requests.get(f"http://{node.address()}:10000/v2/config/{param}").text.replace('"', "")
|
|
match expected_result:
|
|
case self.ExpectedResult.SUCCESS:
|
|
assert current_value == value, f"response: {current_value}, expected: {value}"
|
|
node.watch_log_for(r"Audit configuration is updated", from_mark=from_mark, timeout=60)
|
|
case self.ExpectedResult.FAILURE_UNPARSABLE_VALUE:
|
|
assert current_value == value, f"response: {current_value}, expected: {value}"
|
|
node.watch_log_for(r"Audit configuration update failed ", from_mark=from_mark, timeout=60)
|
|
case self.ExpectedResult.FAILURE_UNUPDATABLE_PARAM:
|
|
assert current_value != value, f"response: {current_value}, expected different than: {value}"
|
|
|
|
class AuditSighupConfigChanger(AuditConfigChanger):
    """Changes configuration through `manager.server_update_config`, one parameter at a time."""

    def change_config(self, test, settings, expected_result):
        """
        Update every parameter in `settings` on each node, then verify each one.

        After each single update the node log is watched for the
        "completed re-reading configuration file" message.
        """
        for node in test.cluster.nodelist():
            logger.info(f"Changing config via manager.server_update_config: Node={node.address()} settings={settings}")
            # Verification watches the log from before the first update on this node.
            node_mark = node.mark_log()
            pending = list(settings.items())

            for name, new_value in pending:
                update_mark = node.mark_log()
                logger.info(f"server_update_config: param={name} value={new_value}")
                node.cluster.manager.server_update_config(node.server_id, name, new_value)
                node.watch_log_for(r"completed re-reading configuration file", from_mark=update_mark, timeout=60)

            for name, new_value in pending:
                self.verify_change(node, name, new_value, node_mark, expected_result)
|
|
|
|
class AuditCqlConfigChanger(AuditConfigChanger):
    """Changes configuration by updating the `system.config` table over CQL."""

    def change_config(self, test, settings, expected_result):
        """
        Update every parameter in `settings` on each node via CQL, then verify each one.

        A `WriteFailure` from the UPDATE is accepted only when `expected_result`
        is `FAILURE_UNUPDATABLE_PARAM`.
        """
        for node in test.cluster.nodelist():
            node_mark = node.mark_log()

            for name, new_value in settings.items():
                # A fresh exclusive connection to this node is opened for each parameter.
                with test.patient_exclusive_cql_connection(node) as session:
                    logger.debug(f"Changing config via CQL: Node={node.address()} param={name} value={new_value}")
                    try:
                        session.execute(f"UPDATE system.config SET value='{new_value}' where name='{name}';")
                    except WriteFailure:
                        assert expected_result == self.ExpectedResult.FAILURE_UNUPDATABLE_PARAM, f"CQL execution failed but expected_result: {expected_result}"

            for name, new_value in settings.items():
                self.verify_change(node, name, new_value, node_mark, expected_result)
|
|
|
|
@pytest.mark.parametrize("audit_config_changer", [AuditSighupConfigChanger, AuditCqlConfigChanger])
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_config_liveupdate(self, helper_class, audit_config_changer):
    """
    Test liveupdate config changes in audit.
    Liveupdate categories, tables, and keyspaces and confirm proper audit behavior.
    Execute with every parametrized audit helper and both liveupdate methods (SIGHUP, CQL).
    Use multi-node configuration (nodes=3).
    """
    with helper_class() as helper, audit_config_changer() as config_changer:
        self.fixture_dtest_setup.allow_log_errors = True

        outcome = self.AuditConfigChanger.ExpectedResult
        default_categories = copy.deepcopy(self.audit_default_settings["audit_categories"])
        empty_filters = {"audit_categories": "", "audit_tables": "", "audit_keyspaces": ""}
        enabled_but_empty_audit_config = helper.update_audit_settings(self.audit_default_settings, modifiers=empty_filters)

        session = self.prepare(helper=helper, nodes=3, audit_settings=enabled_but_empty_audit_config)

        session.execute(
            """
            CREATE TABLE test_config_lifeupdate (
                userid text PRIMARY KEY,
                name text,
                password text
            )
            """
        )

        auditted_query = SimpleStatement("INSERT INTO test_config_lifeupdate (userid, password, name) VALUES ('user2', 'password2', 'second user');", consistency_level=ConsistencyLevel.QUORUM)
        expected_new_entries = [AuditEntry(category="DML", statement=auditted_query.query_string, table="test_config_lifeupdate", ks="ks", user="anonymous", cl="QUORUM", error=False)]

        def expect_not_audited():
            with self.assert_no_audit_entries_were_added(session):
                session.execute(auditted_query)

        def expect_audited():
            with self.assert_entries_were_added(session, expected_new_entries, merge_duplicate_rows=False):
                session.execute(auditted_query)

        # Started with the enabled-but-empty audit config: no auditing.
        expect_not_audited()

        # Correct categories, multiple tables and multiple keyspaces: audit works.
        config_changer.change_config(self, {"audit_categories": default_categories, "audit_tables": "test.table1,test.table2", "audit_keyspaces": "ks,ks2,ks3"}, expected_result=outcome.SUCCESS)
        expect_audited()

        # Invalid categories: audit still works, because the previous audit configuration is used.
        config_changer.change_config(self, {"audit_categories": "INVALID_CATEGORIES"}, expected_result=outcome.FAILURE_UNPARSABLE_VALUE)
        expect_audited()

        # Valid categories but invalid tables and non-existing keyspaces: no auditing.
        config_changer.change_config(
            self, {"audit_categories": default_categories, "audit_tables": "invalid.table.twodots", "audit_keyspaces": "non-existing"}, expected_result=outcome.FAILURE_UNPARSABLE_VALUE
        )
        expect_not_audited()

        # Valid tables and empty keyspaces: audit works.
        config_changer.change_config(self, {"audit_tables": "ks.test_config_lifeupdate", "audit_keyspaces": ""}, expected_result=outcome.SUCCESS)
        expect_audited()

        # Empty categories, tables and keyspaces: no auditing.
        config_changer.change_config(self, {"audit_categories": "", "audit_tables": "", "audit_keyspaces": ""}, expected_result=outcome.SUCCESS)
        expect_not_audited()
|
|
|
|
@pytest.mark.parametrize("audit_config_changer", [AuditSighupConfigChanger, AuditCqlConfigChanger])
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_config_no_liveupdate(self, helper_class, audit_config_changer):
    """
    Check that audit parameters which cannot be changed at runtime reject updates.

    Changing "audit", "audit_unix_socket_path" or "audit_syslog_write_buffer_size"
    is expected to end with FAILURE_UNUPDATABLE_PARAM, and an audited INSERT
    executed afterwards is still expected to produce its audit entry.
    """
    with helper_class() as helper, audit_config_changer() as config_changer:
        # NOTE(review): presumably the rejected config changes leave errors in the node log - confirm.
        self.fixture_dtest_setup.allow_log_errors = True
        session = self.prepare(helper=helper)

        session.execute(
            """
            CREATE TABLE test_config_no_lifeupdate (
                userid text PRIMARY KEY,
                name text,
                password text
            )
            """
        )

        insert_stmt = SimpleStatement(
            "INSERT INTO test_config_no_lifeupdate (userid, password, name) VALUES ('user2', 'password2', 'second user');",
            consistency_level=ConsistencyLevel.QUORUM,
        )
        expected_rows = [
            AuditEntry(
                category="DML",
                statement=insert_stmt.query_string,
                table="test_config_no_lifeupdate",
                ks="ks",
                user="anonymous",
                cl="QUORUM",
                error=False,
            )
        ]

        # Each of these parameters is not live-updatable, so every attempt must be rejected.
        rejected = self.AuditConfigChanger.ExpectedResult.FAILURE_UNUPDATABLE_PARAM
        forbidden_changes = (
            {"audit": "none"},
            {"audit_unix_socket_path": "/path/"},
            {"audit_syslog_write_buffer_size": "123123123"},
        )
        for forbidden_change in forbidden_changes:
            config_changer.change_config(self, forbidden_change, expected_result=rejected)

        # Despite the unsuccessful attempts to change the config, audit works as expected.
        with self.assert_entries_were_added(session, expected_rows, merge_duplicate_rows=False):
            session.execute(insert_stmt)
|
|
|
|
@pytest.mark.parametrize("helper_class", [AuditBackendTable, AuditBackendSyslog, AuditBackendComposite])
def test_parallel_syslog_audit(self, helper_class):
    """
    Verify that the cluster doesn't fail when many audited queries run in parallel.
    """
    with helper_class() as helper:
        # Audit both keyspaces that the parallel statements switch between.
        audit_settings = helper.update_audit_settings(
            self.audit_default_settings,
            modifiers={"audit_keyspaces": "ks,kss"},
        )
        session = self.prepare(helper=helper, audit_settings=audit_settings)

        session.execute("CREATE KEYSPACE kss WITH replication = { 'class':'NetworkTopologyStrategy', 'replication_factor': 3 }")

        # Two alternating USE statements, repeated 1000 times (2000 jobs in total).
        use_statement_jobs = [
            {"func": lambda: session.execute("use ks;")},
            {"func": lambda: session.execute("use kss;")},
        ]
        run_in_parallel(use_statement_jobs * 1000)
|