Files
scylladb/test/pylib/dockerized_service.py
Calle Wilund 3e8a9a0beb pytest: use ephemeral port publish for docker mock servers
Changes dockerized_service to use ephemeral port publish, and
query the published port from podman/docker.
Modifies client code to use slightly changed usage syntax.
2026-03-11 12:32:01 +01:00

122 lines
5.2 KiB
Python

#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import shutil
import itertools
import asyncio
import pathlib
import re
import os
from typing import Callable
class DockerizedServer:
"""class for running an external dockerized service image, typically mock server"""
# pylint: disable=too-many-instance-attributes
newid = itertools.count(start=1).__next__ # Sequential unique id
def __init__(self, image, tmpdir, logfilenamebase,
success_string : Callable[[str, int], bool] | str,
failure_string : Callable[[str, int], bool] | str,
docker_args : Callable[[str, int], list[str]] | list[str] = [],
image_args : Callable[[str, int], list[str]] | list[str] = [],
host = '127.0.0.1',
port = None):
self.image = image
self.host = host
self.tmpdir = tmpdir
self.logfilenamebase = logfilenamebase
self.docker_args: Callable[[str, int], list[str]] = (lambda host,port : docker_args) if isinstance(docker_args, list) else docker_args
self.image_args: Callable[[str, int], list[str]] = (lambda host,port : image_args) if isinstance(image_args, list) else image_args
self.is_success_line = lambda line, port : success_string in line if isinstance(success_string, str) else success_string
self.is_failure_line = lambda line, port : failure_string in line if isinstance(failure_string, str) else failure_string
self.logfile = None
self.port = None
self.proc = None
self.service_port = port
async def start(self):
"""Starts docker image on a random port"""
exe = pathlib.Path(next(exe for exe in [shutil.which(path)
for path in ["podman", "docker"]]
if exe is not None)).resolve()
sid = f"{os.getpid()}-{DockerizedServer.newid()}"
name = f'{self.logfilenamebase}-{sid}'
while True:
logfilename = (pathlib.Path(self.tmpdir) / name).with_suffix(".log")
self.logfile = logfilename.open("wb")
docker_args = self.docker_args(self.host, self.service_port)
image_args = self.image_args(self.host, self.service_port)
args = ["run", "--name", name, "--rm" ]
if self.service_port is None:
args = args + ["-P"]
else:
args = args + ["-p", str(self.service_port)]
args = args + docker_args + [self.image] + image_args
proc = await asyncio.create_subprocess_exec(exe, *args, stderr=self.logfile)
failed = False
# In any sane world we would just pipe stderr to a pipe and launch a background
# task to just readline from there to both check the start message as well as
# add it to the log (preferrably via logger).
# This works fine when doing this in a standalone python script.
# However, for some reason, when run in a pytest fixture, the pipe will fill up,
# without or reader waking up and doing anyhing, and for any test longer than very
# short, we will fill the stderr buffer and hang.
# I cannot figure out how to get around this, so we workaround it
# instead by directing stderr to a log file, and simply repeatedly
# try to read the info from this file until we are happy.
async with asyncio.timeout(120):
done = False
while not done and not failed:
with logfilename.open("r") as f:
for line in f:
if self.is_success_line(line, self.service_port):
print(f'Got start message: {line}')
done = True
break
if self.is_failure_line(line, self.service_port):
print(f'Got fail message: {line}')
failed = True
break
if failed:
self.logfile.close()
await proc.wait()
continue
check_proc = await asyncio.create_subprocess_exec(exe
, *["container", "port", name]
, stdout=asyncio.subprocess.PIPE
)
while True:
data = await check_proc.stdout.readline()
if not data:
break
s = data.decode()
m = re.search(r"\d+\/\w+ -> [\w+\.\[\]\:]+:(\d+)", s)
if m:
self.port = int(m.group(1))
await check_proc.wait()
if not self.port:
proc.kill()
raise RuntimeError("Could not query port from container")
self.proc = proc
break
async def stop(self):
"""Stops docker image"""
if self.proc:
self.proc.terminate()
await self.proc.wait()
if self.logfile:
self.logfile.close()