test: audit: fix parsing of syslog messages

Before this commit, there were following issues with parsing of syslog
messages in audit tests:
 - `line_to_row()` function was never called
 - `line_to_row()` was not prepared for changes introduced in
    scylladb#23099 (i.e. key=value pairs)
 - `line_to_row()` didn't handle newlines in queries
 - `line_to_row()` didn't handle "\\" escaping in queries

 Due to the aforementioned issues, the syslog audit tests were failing.
 This commit fixes all of those issues, by parsing each audit syslog
 message using a regexp.
This commit is contained in:
Andrzej Jackowski
2025-06-23 12:01:56 +02:00
parent c8ab5928a3
commit f85e738b11

View File

@@ -11,6 +11,7 @@ import enum
import itertools
import logging
import os.path
import re
import socket
import socketserver
import tempfile
@@ -256,20 +257,24 @@ class AuditBackendSyslog(AuditBackend):
@override
def get_audit_log_list(self, session, consistency_level):
lines = self.unix_socket_listener.get_lines()
return lines
entries = []
for line in lines:
entries.append(self.line_to_row(line))
return entries
def line_to_row(self, line):
metadata, data = line.split(": ", 1)
elems = list(map(lambda x: x[1:], data.split('", ')))
elems[-1] = elems[-1][:-1]
date = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
data = "".join(data.splitlines()) # Remove newlines
fields = ["node", "category", "cl", "error", "keyspace", "query", "client_ip", "table", "username"]
regexp = ", ".join(f"{field}=\"(?P<{field}>.*)\"" for field in fields)
match = re.match(regexp, data)
node = elems[0].split(":")[0]
source = elems[6].split(":")[0]
# static uuid is used to keep the same format as the table audit
date = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
node = match.group("node").split(":")[0]
statement = match.group("query").replace("\\", "")
source = match.group("client_ip").split(":")[0]
event_time = uuid.UUID("2e4bc246-fea1-11ee-b73f-51b0926539af")
t = self.named_tuple_factory(date, node, event_time, elems[1], elems[2], elems[3] == "true", elems[4], elems[5], source, elems[7], elems[8])
t = self.named_tuple_factory(date, node, event_time, match.group("category"), match.group("cl"), match.group("error") == "true", match.group("keyspace"), statement, source, match.group("table"), match.group("username"))
return t
def update_socket_path(self):