diff --git a/test/pylib/resource_gather.py b/test/pylib/resource_gather.py index 3f9b56fce4..dd5ff22cc0 100644 --- a/test/pylib/resource_gather.py +++ b/test/pylib/resource_gather.py @@ -36,6 +36,7 @@ if TYPE_CHECKING: from test.pylib.suite.base import Test as TestPyTest +logger = logging.getLogger(__name__) @lru_cache(maxsize=None) def get_cgroup() -> Path: @@ -45,7 +46,10 @@ def get_cgroup() -> Path: # Extract the relative cgroup for the process and make it absolute and add where the test.py process should be # placed in. # This can be used to manipulate the cgroup's controllers - return Path(f"/sys/fs/cgroup/{cgroup_info[0].strip().split(':')[-1]}/initial") + cgroup_name = Path(f"/sys/fs/cgroup/{cgroup_info[0].strip().split(':')[-1]}") + if cgroup_name.stem != 'resource_gather': + cgroup_name = cgroup_name / 'resource_gather' + return cgroup_name CGROUP_INITIAL = get_cgroup() @@ -54,6 +58,13 @@ CGROUP_TESTS = CGROUP_INITIAL.parent / 'tests' class ResourceGather(ABC): def __init__(self, test: TestPyTest): + # get the event loop for the current thread or create a new one if there's none + try: + self.loop = asyncio.get_running_loop() + self.own_loop = False + except RuntimeError: + self.loop = asyncio.new_event_loop() + self.own_loop = True self.test = test self.db_path = self.test.suite.log_dir / DEFAULT_DB_NAME standardized_name = self.test.shortname.replace("/", "_") @@ -62,6 +73,10 @@ class ResourceGather(ABC): ) self.logger = logging.getLogger(__name__) + def __del__(self): + if self.own_loop: + self.loop.close() + def make_cgroup(self) -> None: pass @@ -83,7 +98,7 @@ class ResourceGather(ABC): class ResourceGatherOff(ResourceGather): def cgroup_monitor(self, test_event: Event) -> Task: - return asyncio.create_task(no_monitor()) + return self.loop.create_task(no_monitor()) class ResourceGatherOn(ResourceGather): @@ -109,11 +124,14 @@ class ResourceGatherOn(ResourceGather): test_metrics.time_start = datetime.fromtimestamp(self.test.time_start) test_metrics.time_end = datetime.fromtimestamp(self.test.time_end) test_metrics.success = self.test.success - with open(self.cgroup_path / 'memory.peak', 'r') as file: - test_metrics.memory_peak = file.read() + memory_peak = self.cgroup_path / 'memory.peak' + if memory_peak.exists(): + with open(memory_peak, 'r') as file: + test_metrics.memory_peak = file.read() - if (self.cgroup_path / 'cpu.stat').exists(): - with open(self.cgroup_path / 'cpu.stat', 'r', ) as file: + cpu_stat = self.cgroup_path / 'cpu.stat' + if cpu_stat.exists(): + with open(cpu_stat, 'r', ) as file: self._parse_cpu_stat(file, test_metrics) return test_metrics @@ -123,15 +141,21 @@ class ResourceGatherOn(ResourceGather): def put_process_to_cgroup(self) -> None: super().put_process_to_cgroup() - pid = os.getpid() - with open(self.cgroup_path / 'cgroup.procs', "a") as cgroup: - cgroup.write(str(pid)) + try: + pid = os.getpid() + with open(self.cgroup_path / 'cgroup.procs', "a") as cgroup: + cgroup.write(str(pid)) + except Exception as e: + logger.warning('Test %s is not moved to cgroup: %s', self.test, e) def remove_cgroup(self) -> None: - os.rmdir(self.cgroup_path) + try: + os.rmdir(self.cgroup_path) + except OSError as e: + logger.warning(f'Can\'t delete cgroup directory: {e.strerror}' ) def cgroup_monitor(self, test_event: Event) -> Task: - return asyncio.create_task(self._monitor_cgroup(test_event)) + return self.loop.create_task(self._monitor_cgroup(test_event)) async def _monitor_cgroup(self, test_event: Event) -> None: """Continuously monitors CPU and memory utilization.""" @@ -201,28 +225,31 @@ def setup_cgroup(is_required: bool) -> None: subprocess.run(['sudo', 'chown', '-R', f"{getpass.getuser()}:{getpass.getuser()}", '/sys/fs/cgroup'], check=True) + configured = False for directory in [CGROUP_INITIAL, CGROUP_TESTS]: - if directory.exists(): - os.rmdir(directory) - directory.mkdir() - - with open(CGROUP_INITIAL.parent / 'cgroup.procs') as f: - processes = [x.strip() for x in f.readlines()] - - for process in processes: - with open(CGROUP_INITIAL / 'cgroup.procs', "w") as f: - f.write(str(process)) + if not directory.exists(): + directory.mkdir() + else: + configured = True - with open(CGROUP_INITIAL.parent / 'cgroup.controllers', "r") as f: - controllers = f.readline() - controllers = " ".join(map(lambda x: f"+{x}", controllers.split(" "))) + if not configured: + with open(CGROUP_INITIAL.parent / 'cgroup.procs') as f: + processes = [line.strip() for line in f.readlines()] - with open(CGROUP_INITIAL.parent / 'cgroup.subtree_control', "w") as f: - f.write(controllers) + for process in processes: + with open(CGROUP_INITIAL / 'cgroup.procs', "w") as f: + f.write(str(process)) - with open(CGROUP_TESTS / 'cgroup.subtree_control', "w") as f: - f.write(controllers) + with open(CGROUP_INITIAL.parent / 'cgroup.controllers', "r") as f: + controllers = f.readline() + controllers = " ".join(map(lambda x: f"+{x}", controllers.split(" "))) + + with open(CGROUP_INITIAL.parent / 'cgroup.subtree_control', "w") as f: + f.write(controllers) + + with open(CGROUP_TESTS / 'cgroup.subtree_control', "w") as f: + f.write(controllers) async def monitor_resources(cancel_event: Event, stop_event: Event, tmpdir: Path) -> None: