From ca615af407370b8d9e1bed484f51cd3325877d20 Mon Sep 17 00:00:00 2001 From: Andrei Chekun Date: Wed, 9 Apr 2025 12:11:35 +0200 Subject: [PATCH] test.py: refactor resource_gather.py Refactor resource_gather.py to not create the initial cgroup when the process is already in it. This avoids going deeper and creating the same cgroup again and again with each test.py execution when the terminal isn't closed. Add creation of its own event loop in case one does not exist. This is needed to work both with test.py, which creates a loop, and with pytest, which does not create a loop. --- test/pylib/resource_gather.py | 83 +++++++++++++++++++++++------------ 1 file changed, 55 insertions(+), 28 deletions(-) diff --git a/test/pylib/resource_gather.py b/test/pylib/resource_gather.py index 3f9b56fce4..dd5ff22cc0 100644 --- a/test/pylib/resource_gather.py +++ b/test/pylib/resource_gather.py @@ -36,6 +36,7 @@ if TYPE_CHECKING: from test.pylib.suite.base import Test as TestPyTest +logger = logging.getLogger(__name__) @lru_cache(maxsize=None) def get_cgroup() -> Path: @@ -45,7 +46,10 @@ def get_cgroup() -> Path: # Extract the relative cgroup for the process and make it absolute and add where the test.py process should be # placed in. 
# This can be used to manipulate the cgroup's controllers - return Path(f"/sys/fs/cgroup/{cgroup_info[0].strip().split(':')[-1]}/initial") + cgroup_name = Path(f"/sys/fs/cgroup/{cgroup_info[0].strip().split(':')[-1]}") + if cgroup_name.stem != 'resource_gather': + cgroup_name = cgroup_name / 'resource_gather' + return cgroup_name CGROUP_INITIAL = get_cgroup() @@ -54,6 +58,13 @@ CGROUP_TESTS = CGROUP_INITIAL.parent / 'tests' class ResourceGather(ABC): def __init__(self, test: TestPyTest): + # get the event loop for the current thread or create a new one if there's none + try: + self.loop = asyncio.get_running_loop() + self.own_loop = False + except RuntimeError: + self.loop = asyncio.new_event_loop() + self.own_loop = True self.test = test self.db_path = self.test.suite.log_dir / DEFAULT_DB_NAME standardized_name = self.test.shortname.replace("/", "_") @@ -62,6 +73,10 @@ class ResourceGather(ABC): ) self.logger = logging.getLogger(__name__) + def __del__(self): + if self.own_loop: + self.loop.close() + def make_cgroup(self) -> None: pass @@ -83,7 +98,7 @@ class ResourceGather(ABC): class ResourceGatherOff(ResourceGather): def cgroup_monitor(self, test_event: Event) -> Task: - return asyncio.create_task(no_monitor()) + return self.loop.create_task(no_monitor()) class ResourceGatherOn(ResourceGather): @@ -109,11 +124,14 @@ class ResourceGatherOn(ResourceGather): test_metrics.time_start = datetime.fromtimestamp(self.test.time_start) test_metrics.time_end = datetime.fromtimestamp(self.test.time_end) test_metrics.success = self.test.success - with open(self.cgroup_path / 'memory.peak', 'r') as file: - test_metrics.memory_peak = file.read() + memory_peak = self.cgroup_path / 'memory.peak' + if memory_peak.exists(): + with open(memory_peak, 'r') as file: + test_metrics.memory_peak = file.read() - if (self.cgroup_path / 'cpu.stat').exists(): - with open(self.cgroup_path / 'cpu.stat', 'r', ) as file: + cpu_stat = self.cgroup_path / 'cpu.stat' + if cpu_stat.exists(): + with 
open(cpu_stat, 'r', ) as file: self._parse_cpu_stat(file, test_metrics) return test_metrics @@ -123,15 +141,21 @@ class ResourceGatherOn(ResourceGather): def put_process_to_cgroup(self) -> None: super().put_process_to_cgroup() - pid = os.getpid() - with open(self.cgroup_path / 'cgroup.procs', "a") as cgroup: - cgroup.write(str(pid)) + try: + pid = os.getpid() + with open(self.cgroup_path / 'cgroup.procs', "a") as cgroup: + cgroup.write(str(pid)) + except Exception as e: + logger.warning('Test %s is not moved to cgroup: %s', self.test, e) def remove_cgroup(self) -> None: - os.rmdir(self.cgroup_path) + try: + os.rmdir(self.cgroup_path) + except OSError as e: + logger.warning(f'Can\'t delete cgroup directory: {e.strerror}' ) def cgroup_monitor(self, test_event: Event) -> Task: - return asyncio.create_task(self._monitor_cgroup(test_event)) + return self.loop.create_task(self._monitor_cgroup(test_event)) async def _monitor_cgroup(self, test_event: Event) -> None: """Continuously monitors CPU and memory utilization.""" @@ -201,28 +225,31 @@ def setup_cgroup(is_required: bool) -> None: subprocess.run(['sudo', 'chown', '-R', f"{getpass.getuser()}:{getpass.getuser()}", '/sys/fs/cgroup'], check=True) + configured = False for directory in [CGROUP_INITIAL, CGROUP_TESTS]: - if directory.exists(): - os.rmdir(directory) - directory.mkdir() - - with open(CGROUP_INITIAL.parent / 'cgroup.procs') as f: - processes = [x.strip() for x in f.readlines()] - - for process in processes: - with open(CGROUP_INITIAL / 'cgroup.procs', "w") as f: - f.write(str(process)) + if not directory.exists(): + directory.mkdir() + else: + configured = True - with open(CGROUP_INITIAL.parent / 'cgroup.controllers', "r") as f: - controllers = f.readline() - controllers = " ".join(map(lambda x: f"+{x}", controllers.split(" "))) + if not configured: + with open(CGROUP_INITIAL.parent / 'cgroup.procs') as f: + processes = [line.strip() for line in f.readlines()] - with open(CGROUP_INITIAL.parent / 
'cgroup.subtree_control', "w") as f: - f.write(controllers) + for process in processes: + with open(CGROUP_INITIAL / 'cgroup.procs', "w") as f: + f.write(str(process)) - with open(CGROUP_TESTS / 'cgroup.subtree_control', "w") as f: - f.write(controllers) + with open(CGROUP_INITIAL.parent / 'cgroup.controllers', "r") as f: + controllers = f.readline() + controllers = " ".join(map(lambda x: f"+{x}", controllers.split(" "))) + + with open(CGROUP_INITIAL.parent / 'cgroup.subtree_control', "w") as f: + f.write(controllers) + + with open(CGROUP_TESTS / 'cgroup.subtree_control', "w") as f: + f.write(controllers) async def monitor_resources(cancel_event: Event, stop_event: Event, tmpdir: Path) -> None: