Files
scylladb/test/pylib/db/writer.py
Evgeniy Naydanov 8ea49092b7 test.py: metrics: add host_id suffix to .db file
CI can run several test.py sessions on different machines (builders) for
one build and, and to be not overwritten, .db file with metrics need to
have some unique name: add host_id as we already do for .xml report in
run_pytest()

Also add host_id columns to metric tables in case we will somehow
aggregate .db files.
2025-08-19 11:33:11 +00:00

211 lines
6.2 KiB
Python

#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import datetime
import sqlite3
import os
from typing import List
from multiprocessing import Lock
from contextlib import contextmanager
from attr import AttrsInstance, asdict
from test import HOST_ID
TESTS_TABLE = 'tests'
METRICS_TABLE = 'test_metrics'
SYSTEM_RESOURCE_METRICS_TABLE = 'system_resource_metrics'
CGROUP_MEMORY_METRICS_TABLE = 'cgroup_memory_metrics'
DEFAULT_DB_NAME = f'sqlite_{HOST_ID}.db'
DATE_TIME_TEMPLATE = '%Y-%m-%d %H:%M:%S.%f'
create_table = [
f'''
CREATE TABLE IF NOT EXISTS {TESTS_TABLE} (
id INTEGER PRIMARY KEY,
host_id VARCHAR(5) NOT NULL,
architecture VARCHAR(15) NOT NULL,
directory VARCHAR(255),
mode VARCHAR(15) NOT NULL,
run_id INTEGER,
test_name VARCHAR(255) NOT NULL
);
''',
f'''
CREATE TABLE IF NOT EXISTS {METRICS_TABLE} (
id INTEGER PRIMARY KEY,
test_id INT NOT NULL,
host_id VARCHAR(5) NOT NULL,
user_sec REAL,
system_sec REAL,
usage_sec REAL,
memory_peak INTEGER,
time_taken REAL,
time_start DATETIME,
time_end DATETIME,
success BOOLEAN,
FOREIGN KEY(test_id) REFERENCES {TESTS_TABLE}(id)
);
''',
f'''
CREATE TABLE IF NOT EXISTS {SYSTEM_RESOURCE_METRICS_TABLE} (
id INTEGER PRIMARY KEY,
host_id VARCHAR(5) NOT NULL,
memory REAL,
cpu REAL,
timestamp DATETIME
);
''',
f'''
CREATE TABLE IF NOT EXISTS {CGROUP_MEMORY_METRICS_TABLE} (
id INTEGER PRIMARY KEY,
test_id INT NOT NULL,
host_id VARCHAR(5) NOT NULL,
memory REAL,
timestamp DATETIME,
FOREIGN KEY(test_id) REFERENCES {TESTS_TABLE}(id)
);
'''
]
def adapt_datetime_iso(val):
"""Adapt datetime.datetime to timezone-naive ISO 8601 date."""
return val.isoformat()
sqlite3.register_adapter(datetime.datetime, adapt_datetime_iso)
class SQLiteWriter:
_lock = Lock()
def __init__(self, database_path):
"""
Initializes the SQLWriter object.
Args:
database_path: Path to the SQLite database file.
"""
self.database_path = database_path
self.pid = os.getpid()
self._connection = None
self._cursor = None
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('PRAGMA foreign_keys=ON')
cursor.execute('PRAGMA synchronous=off')
for table in create_table:
cursor.execute(table)
conn.commit()
@contextmanager
def get_connection(self):
"""
Context manager for getting a database connection.
Ensures proper handling of connections per process and automatic closing.
"""
current_pid = os.getpid()
# If we're in a new process or don't have a connection, create one
if self._connection is None or self.pid != current_pid:
if self._connection is not None:
self._connection.close()
self._connection = sqlite3.connect(
self.database_path,
detect_types=sqlite3.PARSE_DECLTYPES,
timeout=30
)
self.pid = current_pid
try:
yield self._connection
except Exception as e:
self._connection.rollback()
raise e
@contextmanager
def get_cursor(self):
"""
Context manager for getting a database cursor.
Ensures proper transaction handling and automatic commits.
"""
with self.get_connection() as conn:
cursor = conn.cursor()
try:
yield cursor
conn.commit()
except Exception as e:
conn.rollback()
raise e
def write_row(self, model, table_name: str) -> int:
"""
Inserts a single row of data into the specified table.
Args:
model: A AttrsInstance object with a data to insert.
table_name: Name of the table where data is being written.
Return:
int: Returns the ID of the inserted record
"""
with self._lock:
with self.get_cursor() as cursor:
data = asdict(model)
columns = ', '.join(data.keys())
placeholders = ', '.join(['?'] * len(data))
values = tuple(data.values())
sql_query = f'INSERT INTO {table_name} ({columns}) VALUES ({placeholders})'
cursor.execute(sql_query, values)
return cursor.lastrowid
def write_multiple_rows(self, data_list: List[AttrsInstance], table_name: str) -> None:
"""
Inserts multiple rows of data into the specified table.
Args:
data_list: A list of AttrsInstance objects, each representing a row of data.
table_name: Name of the table where data is being written.
"""
for model in data_list:
self.write_row(model, table_name)
def write_row_if_not_exist(self, model, table_name: str) -> int:
"""
Writes a row to the table if it doesn't exist, otherwise returns the existing row's ID.
Args:
model: A AttrsInstance object with a data to insert.
table_name: Name of the table where data is being written.
Return:
int: Returns the ID of the existing or newly inserted record
"""
with self._lock:
with self.get_cursor() as cursor:
data = asdict(model)
values = tuple(data.values())
# Construct the SQL query to retrieve the ID if the record exists
select_query = f"""
SELECT id FROM {table_name} WHERE {
' AND '.join([f"{col} = ?" for col in data.keys()])
}
"""
cursor.execute(select_query, values)
existing_row = cursor.fetchone()
if existing_row:
return existing_row[0]
else:
return self.write_row(model, table_name)