test.py: refactor resource_gather.py

Refactor resource_gather.py to not create the initial cgroup when the process it's already in it. This will allow not going deeper, creating again and again the same cgroup with each test.py execution when the terminal isn't closed.
Add creation of own event loop in case it's not exists. This needed to be able to work with
test.py that creates loop and with pytest that not create loop.
This commit is contained in:
Andrei Chekun
2025-04-09 12:11:35 +02:00
parent f279625f59
commit ca615af407

View File

@@ -36,6 +36,7 @@ if TYPE_CHECKING:
from test.pylib.suite.base import Test as TestPyTest
logger = logging.getLogger(__name__)
@lru_cache(maxsize=None)
def get_cgroup() -> Path:
@@ -45,7 +46,10 @@ def get_cgroup() -> Path:
# Extract the relative cgroup for the process and make it absolute and add where the test.py process should be
# placed in.
# This can be used to manipulate the cgroup's controllers
return Path(f"/sys/fs/cgroup/{cgroup_info[0].strip().split(':')[-1]}/initial")
cgroup_name = Path(f"/sys/fs/cgroup/{cgroup_info[0].strip().split(':')[-1]}")
if cgroup_name.stem != 'resource_gather':
cgroup_name = cgroup_name / 'resource_gather'
return cgroup_name
CGROUP_INITIAL = get_cgroup()
@@ -54,6 +58,13 @@ CGROUP_TESTS = CGROUP_INITIAL.parent / 'tests'
class ResourceGather(ABC):
def __init__(self, test: TestPyTest):
# get the event loop for the current thread or create a new one if there's none
try:
self.loop = asyncio.get_running_loop()
self.own_loop = False
except RuntimeError:
self.loop = asyncio.new_event_loop()
self.own_loop = True
self.test = test
self.db_path = self.test.suite.log_dir / DEFAULT_DB_NAME
standardized_name = self.test.shortname.replace("/", "_")
@@ -62,6 +73,10 @@ class ResourceGather(ABC):
)
self.logger = logging.getLogger(__name__)
def __del__(self):
if self.own_loop:
self.loop.close()
def make_cgroup(self) -> None:
pass
@@ -83,7 +98,7 @@ class ResourceGather(ABC):
class ResourceGatherOff(ResourceGather):
def cgroup_monitor(self, test_event: Event) -> Task:
return asyncio.create_task(no_monitor())
return self.loop.create_task(no_monitor())
class ResourceGatherOn(ResourceGather):
@@ -109,11 +124,14 @@ class ResourceGatherOn(ResourceGather):
test_metrics.time_start = datetime.fromtimestamp(self.test.time_start)
test_metrics.time_end = datetime.fromtimestamp(self.test.time_end)
test_metrics.success = self.test.success
with open(self.cgroup_path / 'memory.peak', 'r') as file:
test_metrics.memory_peak = file.read()
memory_peak = self.cgroup_path / 'memory.peak'
if memory_peak.exists():
with open(memory_peak, 'r') as file:
test_metrics.memory_peak = file.read()
if (self.cgroup_path / 'cpu.stat').exists():
with open(self.cgroup_path / 'cpu.stat', 'r', ) as file:
cpu_stat = self.cgroup_path / 'cpu.stat'
if cpu_stat.exists():
with open(cpu_stat, 'r', ) as file:
self._parse_cpu_stat(file, test_metrics)
return test_metrics
@@ -123,15 +141,21 @@ class ResourceGatherOn(ResourceGather):
def put_process_to_cgroup(self) -> None:
super().put_process_to_cgroup()
pid = os.getpid()
with open(self.cgroup_path / 'cgroup.procs', "a") as cgroup:
cgroup.write(str(pid))
try:
pid = os.getpid()
with open(self.cgroup_path / 'cgroup.procs', "a") as cgroup:
cgroup.write(str(pid))
except Exception as e:
logger.warning('Test %s is not moved to cgroup: %s', self.test, e)
def remove_cgroup(self) -> None:
os.rmdir(self.cgroup_path)
try:
os.rmdir(self.cgroup_path)
except OSError as e:
logger.warning(f'Can\'t delete cgroup directory: {e.strerror}' )
def cgroup_monitor(self, test_event: Event) -> Task:
return asyncio.create_task(self._monitor_cgroup(test_event))
return self.loop.create_task(self._monitor_cgroup(test_event))
async def _monitor_cgroup(self, test_event: Event) -> None:
"""Continuously monitors CPU and memory utilization."""
@@ -201,28 +225,31 @@ def setup_cgroup(is_required: bool) -> None:
subprocess.run(['sudo', 'chown', '-R', f"{getpass.getuser()}:{getpass.getuser()}", '/sys/fs/cgroup'],
check=True)
configured = False
for directory in [CGROUP_INITIAL, CGROUP_TESTS]:
if directory.exists():
os.rmdir(directory)
directory.mkdir()
with open(CGROUP_INITIAL.parent / 'cgroup.procs') as f:
processes = [x.strip() for x in f.readlines()]
for process in processes:
with open(CGROUP_INITIAL / 'cgroup.procs', "w") as f:
f.write(str(process))
if not directory.exists():
directory.mkdir()
else:
configured = True
with open(CGROUP_INITIAL.parent / 'cgroup.controllers', "r") as f:
controllers = f.readline()
controllers = " ".join(map(lambda x: f"+{x}", controllers.split(" ")))
if not configured:
with open(CGROUP_INITIAL.parent / 'cgroup.procs') as f:
processes = [line.strip() for line in f.readlines()]
with open(CGROUP_INITIAL.parent / 'cgroup.subtree_control', "w") as f:
f.write(controllers)
for process in processes:
with open(CGROUP_INITIAL / 'cgroup.procs', "w") as f:
f.write(str(process))
with open(CGROUP_TESTS / 'cgroup.subtree_control', "w") as f:
f.write(controllers)
with open(CGROUP_INITIAL.parent / 'cgroup.controllers', "r") as f:
controllers = f.readline()
controllers = " ".join(map(lambda x: f"+{x}", controllers.split(" ")))
with open(CGROUP_INITIAL.parent / 'cgroup.subtree_control', "w") as f:
f.write(controllers)
with open(CGROUP_TESTS / 'cgroup.subtree_control', "w") as f:
f.write(controllers)
async def monitor_resources(cancel_event: Event, stop_event: Event, tmpdir: Path) -> None: