The universalasync.wrap function doesn't preserve type information, which confuses the VS Code Pylance plugin and makes code navigation hard. In this commit we fix the problem by adding a typed wrapper around universalasync.wrap. Fixes: scylladb/scylladb#26639
170 lines
7.0 KiB
Python
170 lines
7.0 KiB
Python
# Copyright 2023-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
|
|
from __future__ import annotations
|
|
|
|
from test.pylib.util import universalasync_typed_wrap
|
|
import asyncio
|
|
import logging
|
|
import re
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING
|
|
|
|
import pytest
|
|
|
|
if TYPE_CHECKING:
|
|
from asyncio import AbstractEventLoop
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from typing import Callable
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@universalasync_typed_wrap
|
|
class ScyllaLogFile:
|
|
"""Browse a Scylla log file.
|
|
|
|
Based on scylla-ccm implementation of log browsing.
|
|
"""
|
|
def __init__(self, thread_pool: ThreadPoolExecutor, logfile_path: str | Path):
|
|
self.thread_pool = thread_pool # used for asynchronous IO operations
|
|
self.file = Path(logfile_path)
|
|
if not self.file.is_file():
|
|
pytest.fail(f"Log file {self.file.name} does not exist")
|
|
|
|
async def _run_in_executor[T, **P](self,
|
|
func: Callable[P, T],
|
|
*args: P.args,
|
|
loop: AbstractEventLoop | None = None) -> T:
|
|
if loop is None:
|
|
loop = asyncio.get_running_loop()
|
|
return await loop.run_in_executor(self.thread_pool, func, *args)
|
|
|
|
async def mark(self) -> int:
|
|
"""Return "a mark" to the current position of this node Scylla log.
|
|
|
|
This is for use with the from_mark parameter of watch_log_for method, allowing to watch the log from
|
|
the position when this method was called.
|
|
"""
|
|
return (await self._run_in_executor(self.file.stat)).st_size
|
|
|
|
async def wait_for(self,
|
|
*exprs: str | re.Pattern[str],
|
|
from_mark: int | None = None,
|
|
timeout: float = 600) -> tuple[int, list[tuple[str, re.Match[str]]]]:
|
|
"""Wait until all patterns appear in the log until the timeout expires.
|
|
|
|
The `exprs` are regular expressions.
|
|
|
|
If `from_mark` argument is given, the log is searched from that position, otherwise from the beginning.
|
|
|
|
The `timeout` (in seconds) may be long, because successful tests will not wait for it. Note, however, that
|
|
long timeouts will make xfailing tests slow.
|
|
|
|
Return a tuple with the last read position and list of tuples with matched lines and re.Match instances.
|
|
"""
|
|
logger.debug("Waiting for log message(s): %s", exprs)
|
|
|
|
loop = asyncio.get_running_loop()
|
|
|
|
exprs = [re.compile(pattern) for pattern in exprs]
|
|
matches = []
|
|
|
|
with self.file.open(encoding="utf-8") as log_file:
|
|
if from_mark is not None:
|
|
await self._run_in_executor(log_file.seek, from_mark, loop=loop)
|
|
|
|
async with asyncio.timeout(timeout):
|
|
line = ""
|
|
while exprs:
|
|
# Because it may take time for the log message to be flushed, and sometimes we may want to look
|
|
# for messages about various delayed events, this function doesn't give up when it reaches
|
|
# the end of file, and rather retries until a given timeout.
|
|
line += await self._run_in_executor(log_file.readline, loop=loop)
|
|
if line:
|
|
for pattern in exprs.copy():
|
|
if match := pattern.search(line):
|
|
logger.debug("Found log message: %s", line)
|
|
matches.append((line, match))
|
|
exprs.remove(pattern)
|
|
if line[-1] == "\n":
|
|
line = ""
|
|
else:
|
|
await asyncio.sleep(0.01)
|
|
|
|
return await self._run_in_executor(log_file.tell, loop=loop), matches
|
|
|
|
async def grep(self,
|
|
expr: str | re.Pattern[str],
|
|
filter_expr: str | re.Pattern[str] | None = None,
|
|
from_mark: int | None = None,
|
|
max_count: int = 0) -> list[tuple[str, re.Match[str]]]:
|
|
"""Search the log for lines matching the regular expression.
|
|
|
|
If `filter_expr` argument is given, only lines which do not match it are returned.
|
|
|
|
If `from_mark` argument is given, the log is searched from that position, otherwise from the beginning.
|
|
|
|
If `max_count` is greater than 0, return list of matches when it'll reach `max_count` length.
|
|
|
|
Return a list of tuples (line, match), where line is the full line from the log, and match is the re.Match[str]
|
|
object for the matching expression.
|
|
"""
|
|
loop = asyncio.get_running_loop()
|
|
|
|
expr = re.compile(expr)
|
|
filter_func = re.compile(filter_expr).search if filter_expr else lambda _: False
|
|
matches = []
|
|
|
|
with self.file.open(encoding="utf-8") as log_file:
|
|
if from_mark:
|
|
await self._run_in_executor(log_file.seek, from_mark, loop=loop)
|
|
while line := await self._run_in_executor(log_file.readline, loop=loop):
|
|
if match := not filter_func(line) and expr.search(line):
|
|
matches.append((line, match))
|
|
if len(matches) == max_count:
|
|
break
|
|
|
|
return matches
|
|
|
|
async def grep_for_errors(self,
|
|
distinct_errors: bool = False,
|
|
from_mark: int | None = None) -> list[str] | list[list[str]]:
|
|
"""Search the log for messages with ERROR level.
|
|
|
|
If `distinct_errors` is False then collect all messages after found one till a first message with INFO level.
|
|
Otherwise, add just matched line to a return list.
|
|
|
|
If `from_mark` argument is given, the log is searched from that position, otherwise from the beginning.
|
|
|
|
Return a list of error messages. Error message can be just one line or a list of lines.
|
|
"""
|
|
loop = asyncio.get_running_loop()
|
|
|
|
# Each line in scylla-*.log starts with log level, so, use re.match to search from the beginning of each line.
|
|
error_pattern = re.compile(r"ERROR\b")
|
|
info_pattern = re.compile(r"INFO\b")
|
|
matches = []
|
|
|
|
with self.file.open(encoding="utf-8") as log_file:
|
|
if from_mark:
|
|
await self._run_in_executor(log_file.seek, from_mark, loop=loop)
|
|
line = await self._run_in_executor(log_file.readline, loop=loop)
|
|
while line:
|
|
if error_pattern.match(line):
|
|
if distinct_errors:
|
|
if line not in matches:
|
|
matches.append(line)
|
|
else:
|
|
matches.append([line])
|
|
while line := await self._run_in_executor(log_file.readline, loop=loop):
|
|
if info_pattern.match(line):
|
|
break
|
|
matches[-1].append(line)
|
|
continue
|
|
line = await self._run_in_executor(log_file.readline, loop=loop)
|
|
|
|
return matches
|