From 00a55abdd6a04bcfe3ced3ae1333d9d13b103cb2 Mon Sep 17 00:00:00 2001
From: Eliran Sinvani
Date: Sun, 26 Nov 2023 13:08:36 +0200
Subject: [PATCH] code coverage: Add libraries for coverage handling

Coverage handling is divided into three steps:
1. Generating profiling data from a run of an instrumented file
   (which this patch doesn't cover).
2. Processing the profiling data, which involves indexing the profiles
   and producing the data in some format that can be manipulated and
   unified.
3. Generating reports based on this data.

This patch deals with the last two steps by providing a CLI and
libraries for that purpose.

It adds two libraries:
1. `coverage_utils.py` - a library for manipulating coverage data. It
   also contains a CLI for the (assumed) most common operations that
   are needed in order to eventually generate coverage reports.
2. `lcov_utils.py` - a library for handling lcov format data, a
   textual format containing source-dependent coverage data.

An example of such manipulation is the `coverage diff` operation,
which performs a set-like difference:
cov_a - cov_b = diff, where diff is an lcov-formatted file containing
coverage data for code covered in cov_a that is not covered at all in
cov_b.

The main goal of the libraries and the CLI is to provide a unified way
to handle coverage data that is easily scriptable and extensible. This
will pave the way for automating coverage processing and reporting in
test.py and in Jenkins pipelines (for example, to also process dtest
or SCT coverage reports).

Signed-off-by: Eliran Sinvani
---
 test/pylib/coverage_utils.py | 1583 ++++++++++++++++++++++++++++++++++
 test/pylib/lcov_utils.py | 1202 ++++++++++++++++++++++++++
 2 files changed, 2785 insertions(+)
 create mode 100755 test/pylib/coverage_utils.py
 create mode 100644 test/pylib/lcov_utils.py

diff --git a/test/pylib/coverage_utils.py b/test/pylib/coverage_utils.py
new file mode 100755
index 0000000000..c9d8cc2f35
--- /dev/null
+++ b/test/pylib/coverage_utils.py
@@ -0,0 +1,1583 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2024-present ScyllaDB
+#
+#
+# SPDX-License-Identifier: AGPL-3.0-or-later
+#
+import argparse
+import asyncio
+import multiprocessing
+from pathlib import Path, PurePath
+from typing import (
+    Union,
+    List,
+    Dict,
+    Iterable,
+    Optional,
+    Coroutine,
+    Mapping,
+    Any,
+    Self,
+)
+import re
+from enum import Enum
+from asyncio.subprocess import PIPE, DEVNULL
+from asyncio import Semaphore
+from collections.abc import Iterable as IterableType
+import os
+from collections import namedtuple
+from itertools import repeat
+from functools import wraps, partial
+import logging
+import sys
+import inspect
+
+# So the module can be imported outside of this directory
+sys.path.insert(0, os.path.dirname(__file__))
+import lcov_utils
+
+del sys.path[0]
+import concurrent.futures
+from urllib.parse import quote, unquote
+
+# NOTE: Many of the functions in this file use the form: func(*, param1, param2, ...)
+# This is intentional, in order to prevent calling those functions with positional
+# arguments until the API stabilizes.
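+#
+# A typical end-to-end flow through the CLI defined below might look like the
+# following (an illustrative sketch only; the paths and globs are hypothetical):
+#
+#   ./coverage_utils.py llvm-profiles merge build/prof/*.profraw build/prof/merged
+#   ./coverage_utils.py llvm-profiles to-lcov -b build/release build/prof/merged/*.profdata
+#   ./coverage_utils.py lcov-tools genhtml -o coverage_report build/prof/merged/*.info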
+ +# Type definitions for annotation +PathLike = Union[Path, str] +ConcurrencyParam = Optional[Union[int, Semaphore]] +LoggerType = Union[logging.Logger, logging.LoggerAdapter] + +# Logging support for debugging +COVERAGE_TOOLS_LOGGER = logging.getLogger("coverage_utils.py") +COVERAGE_TOOLS_LOGGER.addHandler(logging.NullHandler()) +COVERAGE_TOOLS_LOGGER.propagate = False + +# Utility functions for tracing +_tid = 0 + + +def unique_trace_id(): + global _tid + _tid += 1 + return _tid + + +def trace_function_call( + func, sig: inspect.Signature, __logger__: LoggerType, *args, **kwargs +): + """Extracts the parameters of a function and traces the + call into logger (With trace level) + """ + logger = __logger__ + tid = unique_trace_id() + bound = sig.bind(*args, **kwargs) + non_defaults = list(bound.arguments.keys()) + bound.apply_defaults() + log_str = f"Function call ({tid}):\n{func.__qualname__}(" + for param, value in bound.arguments.items(): + log_str += ( + f"\n\t{param} " + + ("" if param in non_defaults else "(default)") + + f"= {value}," + ) + log_str += ")" + logger.debug(log_str) + return tid + + +def traced_func(f): + """ + A tracing helper which logs function name and parameters so it + is easy to debug this module. + It is quite a modest one: + If the function gets a logger parameter, it will use it to log the function name and params (if it is not None). + Else it becomes a no-op. + The logging is done with debug level for logs and error level for errors (only if debug is enabled in the logger) + """ + sig = inspect.signature(f) + has_logger_param = "logger" in sig.parameters + # if the function takes a logger param + # use it. + if not has_logger_param: + return f + else: + + def extract_logger(*args, **kwargs): + bound = sig.bind(*args, **kwargs) + bound.apply_defaults() + return bound.arguments["logger"] + + if inspect.isasyncgenfunction(f): + + @wraps(f) + async def trace_wrapper(*args, **kwargs): + logger: LoggerType = extract_logger(*args, **kwargs) + if not logger or not logger.isEnabledFor(logging.DEBUG): + return await f(*args, **kwargs) + else: + try: + tid = trace_function_call(f, sig, logger, *args, **kwargs) + return await f(*args, **kwargs) + except Exception as e: + logger.error( + f"Function {f.__qualname__} ({tid})exception:\n{e}" + ) + raise e + + return trace_wrapper + else: + + @wraps(f) + def trace_wrapper(*args, **kwargs): + logger: LoggerType = extract_logger(*args, **kwargs) + if not logger or not logger.isEnabledFor(logging.DEBUG): + return f(*args, **kwargs) + else: + tid = trace_function_call(f, sig, logger, *args, **kwargs) + try: + return f(*args, **kwargs) + except Exception as e: + logger.error( + f"Function {f.__qualname__} ({tid})exception:\n{e}" + ) + raise e + + return trace_wrapper + + +_create_subprocess_exec_sig = inspect.signature(asyncio.create_subprocess_exec) + + +@wraps(asyncio.create_subprocess_exec) +async def create_subprocess_exec(*args, logger = COVERAGE_TOOLS_LOGGER, **kwargs): + if not logger or not logger.isEnabledFor(logging.DEBUG): + return await asyncio.create_subprocess_exec(*args, **kwargs) + tid = trace_function_call( + asyncio.create_subprocess_exec, + _create_subprocess_exec_sig, + logger, + *args, + **kwargs, + ) + bound = _create_subprocess_exec_sig.bind(*args, **kwargs) + bound.apply_defaults() + logger.debug( + f"asyncio.create_subprocess_exec ({tid}) going to run:\n" + f"{bound.arguments['program']} {' '.join([str(arg) for arg in bound.arguments['args']])}" + ) + try: + return await 
asyncio.create_subprocess_exec(*args, **kwargs) + except Exception as e: + logger.error(f"Function asyncio.create_subprocess_exec ({tid})exception:\n{e}") + raise e + + +_create_subprocess_shell_sig = inspect.signature(asyncio.create_subprocess_shell) + + +@wraps(asyncio.create_subprocess_shell) +async def create_subprocess_shell(*args, logger = COVERAGE_TOOLS_LOGGER, **kwargs): + if not logger or not logger.isEnabledFor(logging.DEBUG): + return await asyncio.create_subprocess_shell(*args, **kwargs) + tid = trace_function_call( + asyncio.create_subprocess_shell, + _create_subprocess_shell_sig, + logger, + *args, + **kwargs, + ) + bound = _create_subprocess_shell_sig.bind(*args, **kwargs) + bound.apply_defaults() + logger.debug( + f"asyncio.create_subprocess_shell ({tid}) going to run:\n" + f"{bound.arguments['cmd']}" + ) + try: + return await asyncio.create_subprocess_shell(*args, **kwargs) + except Exception as e: + logger.error(f"Function asyncio.create_subprocess_shell ({tid})exception:\n{e}") + raise e + + +# A set of commands to be used by the FileType enumeration +CONSUME_FIRST_INPUT_FIELD = "cut -d ',' -f1" +IS_PROFILED_COMMAND = ( + "test `eu-readelf -S {} | grep -E '__llvm_cov(map|fun)*[ ]+PROGBITS' | wc -l` -ge 2" +) +IS_DEBUG_ONLY_COMMAND = "eu-readelf -S {} | grep '.text[ ]*NOBITS'" + + +class FileType(Enum): + EXEC_BIN = (re.compile(r"ELF .* executable"), 0) + # This will never match + DEBUG_ONLY_EXEC_BIN = (re.compile(r"(?!x)x"), 1) + # This will never match + PROFILED_EXEC_BIN = (re.compile(r"(?!x)x"), 2) + # This will never match + DEBUG_ONLY_PROFILED_EXEC_BIN = (re.compile(r"(?!x)x"), 3) + EXEC_SO = (re.compile(r"ELF .* shared object"), 4) + # This will never match + DEBUG_ONLY_EXEC_SO = (re.compile(r"(?!x)x"), 5) + # This will never match + PROFILED_EXEC_SO = (re.compile(r"(?!x)x"), 6) + # This will never match + DEBUG_ONLY_PROFILED_EXEC_SO = (re.compile(r"(?!x)x"), 7) + RAW_PROFILE = (re.compile(r".*LLVM raw profile data.*"), 8) + INDEXED_PROFILE = (re.compile(r".*LLVM indexed profile data.*"), 9) + HTML = (re.compile(r".*HTML document.*"), 10) + # keep this definition last so every unrecognized enum will get this + UNRECOGNIZED = (re.compile(r".*"), 100) + + @staticmethod + async def get_file_description(f: PathLike) -> str: + """Gets the file type description from the shell `file` utility + Args: + f (PathLike): _description_ + + Raises: + RuntimeError: if `file` ends with an error + + Returns: + str: file description + """ + f = Path(f) + proc = await create_subprocess_shell( + " | ".join([f"file -b {f}", CONSUME_FIRST_INPUT_FIELD]), + stdout = PIPE, + stderr = PIPE, + ) + stdout, stderr = await proc.communicate() + if proc.returncode != 0: + raise RuntimeError( + f"Type info for file {f} couldn't be retrieved: {stderr}" + ) + return stdout.decode().strip() + + @staticmethod + @traced_func + async def is_profiled(f: PathLike) -> bool: + """Checks if the file is profiled by poking into + the sections in the elf + + Args: + f (PathLike): The file to check for profile + + Returns: + bool: True if the file is profiled, False otherwise + """ + proc = await create_subprocess_shell( + IS_PROFILED_COMMAND.format(f), stdout = DEVNULL, stderr = DEVNULL + ) + await proc.wait() + return proc.returncode == 0 + + @staticmethod + @traced_func + async def is_debug_only(f: PathLike) -> bool: + """Checks if the elf file is debug only (no code) by poking into + the sections in the elf + + Args: + f (PathLike): The file to check for profile + + Returns: + bool: True if the file is 
debug only, False otherwise
+        """
+        proc = await create_subprocess_shell(
+            IS_DEBUG_ONLY_COMMAND.format(f), stdout = DEVNULL, stderr = DEVNULL
+        )
+        await proc.wait()
+        return proc.returncode == 0
+
+    @staticmethod
+    async def get_file_type(f: PathLike) -> "FileType":
+        """Tries to classify the type of a file (one of FileType).
+
+        Args:
+            f (PathLike): The path to the file to be classified
+
+        Returns:
+            FileType: One of FileType; if the file couldn't be recognized,
+            FileType.UNRECOGNIZED will be returned.
+        """
+        file_description = await FileType.get_file_description(f)
+        for ft in FileType:
+            if m := ft.value[0].fullmatch(file_description):
+                if ft in ELF_TYPES:
+                    profiled = "PROFILED_" if await FileType.is_profiled(f) else ""
+                    debug_only = (
+                        "DEBUG_ONLY_" if await FileType.is_debug_only(f) else ""
+                    )
+                    return FileType[debug_only + profiled + ft.name]
+                return ft
+        return FileType.UNRECOGNIZED
+
+
+ELF_TYPES = [FileType.EXEC_BIN, FileType.EXEC_SO]
+PROFILED_ELF_TYPES = [FileType.PROFILED_EXEC_BIN, FileType.PROFILED_EXEC_SO]
+
+# Convenience functions
+
+@traced_func
+async def gather_limited_concurrency(
+    *args,
+    semaphore: Semaphore,
+    logger: LoggerType = COVERAGE_TOOLS_LOGGER,
+    **kwargs,
+) -> Iterable[Any]:
+    """A wrapper around asyncio.gather that limits concurrency for the submitted tasks.
+    This function takes the same parameters as asyncio.gather, with an additional mandatory
+    `semaphore` param that limits the tasks' concurrency.
+    Note: The function looks a bit convoluted, but that is a nuance that arises from the
+    semaphore: since asyncio.gather doesn't stop already submitted tasks, it can deadlock
+    on the semaphore, so we must make sure that in case of a failure all tasks are
+    terminated unconditionally.
+    """
+
+    async def with_semaphore(coro: Coroutine):
+        async with semaphore:
+            return await coro
+
+    coros = [asyncio.ensure_future(with_semaphore(coro)) for coro in args]
+    try:
+        return await asyncio.gather(*coros, **kwargs)
+    finally:
+        [coro.cancel() for coro in coros]
+
+GET_BIN_ID_SHELL_CMD = "eu-readelf -n {} | grep 'Build ID:' | head -1"
+@traced_func
+async def get_binary_id(
+    *, path: PathLike, logger: LoggerType = COVERAGE_TOOLS_LOGGER
+) -> Optional[str]:
+    """A function to get the build id of an ELF file
+
+    Args:
+        path (PathLike): A path to the file whose id to extract
+        logger (LoggerType, optional): The logger to which to log information. Defaults to COVERAGE_TOOLS_LOGGER.
+
+    Raises:
+        RuntimeError: If the readelf command fails for some reason
+
+    Returns:
+        Optional[str]: The found id if it exists, else None
+    """
+    cmd = GET_BIN_ID_SHELL_CMD.format(path)
+    proc = await create_subprocess_shell(cmd, stdout = PIPE, stderr = PIPE)
+    stdout, stderr = await proc.communicate()
+    if proc.returncode != 0:
+        raise RuntimeError(f"could not read {path} build id: {stderr.decode()}")
+    id_fields = stdout.decode().splitlines()
+    if len(id_fields) == 0:
+        return None
+    return id_fields[-1].strip().split(" ")[-1]
+
+@traced_func
+async def get_binary_ids_map(
+    *,
+    paths: Iterable[PathLike],
+    filter: Optional[Iterable[FileType]] = None,
+    with_types: bool = False,
+    semaphore: Semaphore = Semaphore(1),
+    logger: LoggerType = COVERAGE_TOOLS_LOGGER,
+) -> Union[Mapping[Path, str], Mapping[Path, tuple[str, FileType]]]:
+    """Maps given files to their build ids; for directories it recursively finds the ELF
+    files contained in them and maps those.
+    Args:
+        paths (Iterable[PathLike]): paths to single files or directories, or an iterable of such.
+            For every path given:
+            1. if it is a file, its id will be mapped
+            2. if it is a directory, the directory will be scanned recursively for ELF files
+               and their ids will be mapped
+        semaphore (Semaphore, optional): A concurrency limiter for the operation. Defaults to Semaphore(1) (no concurrency).
+        logger (LoggerType, optional): The logger to which to log information. Defaults to COVERAGE_TOOLS_LOGGER.
+    Raises:
+        FileNotFoundError: in case some of the paths given for the id scanning
+        don't exist.
+
+    Returns:
+        Mapping[Path, str]: A mapping from the ELF files (Path) to their ids. It is assumed the id is unique; however,
+        duplicate files, or stripped and unstripped versions of the same file, will have the same build id.
+    """
+
+    paths = [Path(p) for p in paths]
+    dont_exist = [f for f in paths if not f.exists()]
+    if len(dont_exist) > 0:
+        err = f"some of the paths for id mappings don't exist: {dont_exist}"
+        logger.error(err)
+        raise FileNotFoundError(err)
+    dirs = list({path for path in paths if path.is_dir()})
+    files = {path for path in paths if not path.is_dir()}
+    files_per_dir = [
+        [f for f in dir.rglob("*") if f.is_file() and os.access(f, os.X_OK)]
+        for dir in dirs
+    ]
+    files.update({f for dirfiles in files_per_dir for f in dirfiles})
+    files = list(files)
+    types = await gather_limited_concurrency(
+        *(FileType.get_file_type(f) for f in files), semaphore = semaphore, logger = logger
+    )
+    if filter:
+        filter = list(filter)
+        # keep files and their types aligned when filtering
+        files_and_types = [(f, ft) for f, ft in zip(files, types) if ft in filter]
+        files = [f for f, _ in files_and_types]
+        types = [ft for _, ft in files_and_types]
+    build_ids = await gather_limited_concurrency(
+        *(get_binary_id(path = f, logger = logger) for f in files),
+        semaphore = semaphore,
+        logger = logger,
+    )
+    if with_types:
+        files_to_ids_map = {
+            file: id_and_type for id_and_type, file in zip(zip(build_ids, types), files)
+        }
+    else:
+        files_to_ids_map = {file: build_id for build_id, file in zip(build_ids, files)}
+
+    return files_to_ids_map
+
+
+GET_PROFILED_BINARIES_SHELL_CMD = (
+    "llvm-profdata show --binary-ids {} | sed -n '/^Binary IDs:/,/^.*:/p' | tail -n +2"
+)
+
+
+@traced_func
+async def get_profiled_binary_ids(
+    *, path: PathLike, logger: LoggerType = COVERAGE_TOOLS_LOGGER
+) -> List[str]:
+    """For a given file (assumed to be an llvm profile, either raw or indexed), get the profiled binary ids.
+    There can be more than one if this is a merged profile.
+
+    Args:
+        path (PathLike): A path to the llvm profile
+        logger (LoggerType, optional): logger to which to log information. Defaults to COVERAGE_TOOLS_LOGGER.
+
+    Raises:
+        RuntimeError: If llvm-profdata fails for some reason, or if no profiled binary id could be found
+        in the profile.
+
+    Returns:
+        List[str]: A list of binary ids profiled in this profile.
+ """ + + proc = await create_subprocess_shell( + GET_PROFILED_BINARIES_SHELL_CMD.format(path), + stdout = PIPE, + stderr = PIPE, + stdin = DEVNULL, + ) + stdout, stderr = await proc.communicate() + if proc.returncode != 0: + raise RuntimeError( + f"Could not get profiled file ids from {path}: {stderr.decode()}" + ) + info = [l.strip() for l in stdout.decode().splitlines()] + if len(info) == 0: + raise RuntimeError( + f"Could not get profiled file ids from {path}: No values found" + ) + return info + + +# The best way to merge profiles is by the file build id that they map, somewhen in the future, +# it might also be desirable to merge profiles from different binaries, but lcov format does it better +# as it is source dependant so for now we will stick to it. +# if more than one id is contained in one of the files, it is going to be merged only with files that contains +# the same composition of ids. +MergeProfilesResult = namedtuple( + "MergeProfilesResult", ["generated_profiles", "error_undeleted_files", "errors"] +) + + +# TODO: Add a "smart" option which will remap symbols and avoid collision in the index +@traced_func +async def merge_profiles( + *, + profiles: Iterable[PathLike], + path_for_merged: PathLike = Path(), + sparse: bool = True, + clear_on_success: bool = False, + semaphore: Semaphore = Semaphore(1), + logger: Union[logging.Logger, logging.LoggerAdapter] = COVERAGE_TOOLS_LOGGER, +) -> MergeProfilesResult: + """A function which takes a list of profiles and merges them by exact profiled binaries match. + The llvm toolchain collection already contains: "llvm profdata merge" however, this function is still necessary + because merging profdata files that contains same named symbols but different binaries are + ambiguous, the ambiguity is only "solved" when merging lcov files since lcov maps lines of code instead of symbols. + + Args: + profiles (Iterable[PathLike]): A list of profiles to merge + path_for_merged (PathLike, optional): A path to a directory for the merged files. Defaults to Path(). + clear_on_success (bool, optional): Remove the original profiles on success. Defaults to False. + semaphore (ConcurrencyParam, optional): concurrency limitation for the operation. Defaults to Semaphore(1) (no concurrency). + logger (Union[logging.Logger, logging.LoggerAdapter], optional): The logger to which log information. Defaults to COVERAGE_TOOLS_LOGGER. + + Returns: + MergeProfilesResult: A result containing the new profiles list, undeleted files due to errors (if clear_on_success is True) + and a list of errors that happened during merge. + It is the user responsibility to the result for errors and act accordingly. 
+ """ + profiles = [Path(p) for p in profiles] + path_for_merged = Path(path_for_merged) + profile_ids = await gather_limited_concurrency( + *(get_profiled_binary_ids(path = profile, logger = logger) for profile in profiles), + semaphore = semaphore, + logger = logger, + ) + [ids.sort() for ids in profile_ids] + profile_merge_map = {} + for ids, profile in zip(profile_ids, profiles): + profile_merge_map.setdefault(tuple(ids), set()).add(profile) + + async def do_merge_profile( + ids: Iterable[str], profiles: Iterable[PathLike] + ): + destination_profile = path_for_merged / ("_".join(ids) + ".profdata") + params = ( + ["merge"] + + (["--sparse"] if sparse else []) + + list(profiles) + + ["-o", destination_profile] + ) + logger.debug( + f"running command: llvm-profdata {' '.join([str(p) for p in params])}" + ) + proc = await create_subprocess_exec( + "llvm-profdata", *params, stderr = PIPE, stdout = DEVNULL + ) + _, stderr = await proc.communicate() + error_undeleted_files = [] + if proc.returncode != 0: + if clear_on_success: + error_undeleted_files.extend(profiles) + return MergeProfilesResult( + [], + error_undeleted_files, + [RuntimeError(f"Could not merge {profiles}: {stderr.decode()}")], + ) + if clear_on_success: + [profile.unlink() for profile in profiles] + return MergeProfilesResult([destination_profile], [], []) + + merging_tasks = [ + do_merge_profile(ids, profiles) for ids, profiles in profile_merge_map.items() + ] + merging_results = await gather_limited_concurrency(*merging_tasks, semaphore = semaphore) + return MergeProfilesResult( + sum([mpr.generated_profiles for mpr in merging_results], []), + sum([mpr.error_undeleted_files for mpr in merging_results], []), + sum([mpr.errors for mpr in merging_results], []), + ) + + +@traced_func +async def profdata_to_lcov( + *, + profiles: Iterable[PathLike], + excludes: Iterable[str] = [], + compilation_dir: Union[PathLike, None] = None, + known_file_ids: Dict[PathLike, str] = {}, + id_search_paths: Iterable[PathLike] = [], + clear_on_success: bool = False, + update_known_ids: bool = True, + semaphore: Semaphore = Semaphore(1), + logger: LoggerType = COVERAGE_TOOLS_LOGGER, +): + """A function to convert an indexed profiles to lcov files. + + Args: + profiles (Iterable[PathLike]): The profiles to be converted to lcov format + excludes (Iterable[str], optional): A list of regex file filter to exclude from the conversion, + an example might be library source code that is not interesting and can bias the coverage + metrics. Defaults to []. + compilation_dir (Union[PathLike, None], optional): The path from which the compilation executed, + this is for files that have been compiled with relative directories embedded. Defaults to None. + known_file_ids (Dict[PathLike, str], optional): A map of known binary file ids to use for the + conversion. Defaults to {}. + id_search_paths (Iterable[PathLike], optional): A list of paths to search binaries in order + to facilitate the conversion. Defaults to []. + clear_on_success (bool, optional): Whether to remove the llvm profiles on successful conversion or not. + Defaults to False. + update_known_ids (bool, optional): Whether to update the known ids map given in `known_file_ids` by + the user. Defaults to True. + semaphore (Semaphore, optional): Concurrency limitation for the operation. Defaults to Semaphore(1) (no concurrency). + logger (LoggerType, optional): logger to which log information. Defaults to COVERAGE_TOOLS_LOGGER. 
+
+    Raises:
+        RuntimeError: If some of the binaries needed for the conversion couldn't be found (i.e. were neither
+        present in `known_file_ids` nor contained in any of `id_search_paths`), or if the conversion itself
+        failed for some reason.
+
+    """
+    found_ids = await get_binary_ids_map(
+        paths = id_search_paths, filter = PROFILED_ELF_TYPES, semaphore = semaphore, logger = logger
+    )
+    if not update_known_ids:
+        known_file_ids = dict(known_file_ids)
+
+    known_file_ids.update(found_ids)
+    excludes = list(excludes)
+    exclude_params = sum(map(lambda exclude: ["-ignore-filename-regex", exclude], excludes), [])
+    profiles = [Path(p) for p in profiles]
+    per_profile_ids = await gather_limited_concurrency(
+        *(get_profiled_binary_ids(path = profile, logger = logger) for profile in profiles),
+        semaphore = semaphore,
+        logger = logger,
+    )
+    # validate that we know all of the files that created the profiles
+    profile_ids_set = set()
+    [profile_ids_set.update(ids) for ids in per_profile_ids]
+    known_ids_set = set(known_file_ids.values())
+    id_to_files_map = {v: k for k, v in known_file_ids.items()}
+    if not profile_ids_set.issubset(known_ids_set):
+        missing_ids = profile_ids_set.difference(known_ids_set)
+        raise RuntimeError(
+            f"Some of the profiles contain ids whose files are not known: {missing_ids}"
+        )
+    constant_conversion_params = ["export", "--format", "lcov"] + exclude_params
+    if compilation_dir is not None:
+        constant_conversion_params += ["--compilation-dir", Path(compilation_dir)]
+    constant_conversion_params += ["-instr-profile"]
+
+    async def do_profdata_to_lcov(profile, profile_ids):
+        objects_params = " -object ".join(
+            [str(id_to_files_map[id]) for id in profile_ids]
+        ).split(" ")
+        conversion_params = constant_conversion_params + [profile] + objects_params
+        with open(profile.with_suffix(".info"), "w") as lcov_file:
+            logger.debug(
+                f"command: llvm-cov {' '.join([str(p) for p in conversion_params])}"
+            )
+            proc = await create_subprocess_exec(
+                "llvm-cov", *conversion_params, stdout = lcov_file, stderr = PIPE
+            )
+            _, stderr = await proc.communicate()
+            if proc.returncode != 0:
+                raise RuntimeError(
+                    f"Failed to create {profile.with_suffix('.info')} : {stderr.decode()}"
+                )
+        if clear_on_success:
+            profile.unlink()
+
+    conversion_tasks = [
+        do_profdata_to_lcov(profile, profile_ids)
+        for profile, profile_ids in zip(profiles, per_profile_ids)
+    ]
+    await gather_limited_concurrency(*conversion_tasks, semaphore = semaphore)
+
+
+LCOV_INCLUDE_BRANCH_DATA_PARAMS = ["--rc", "lcov_branch_coverage=1"]
+LCOV_TAG_WITH_TEST_SHELL_CMD = (
+    "sed 's/^TN:.*/TN:{test_name}/g' {input_lcov} > {output_lcov}"
+)
+
+@traced_func
+async def lcov_combine_traces(
+    *,
+    lcovs: Iterable[PathLike],
+    output_lcov: Optional[PathLike] = None,
+    test_tag: Optional[str] = None,
+    clear_on_success: bool = False,
+    files_per_chunk: Union[int, None] = None,
+    semaphore: Semaphore = Semaphore(1),
+    logger: LoggerType = COVERAGE_TOOLS_LOGGER,
+):
+    """A function to combine lcov traces. The main advantage of this function over just running the lcov
+    command from the command line is that it can parallelize the process, especially when a lot of lcov
+    files are merged.
+
+    Args:
+        lcovs (Iterable[PathLike]): A list of source lcov trace files to merge
+        output_lcov (Optional[PathLike], optional): the final output lcov file; if None, the merged
+            LcovFile object is returned instead of being written. Defaults to None.
+        test_tag (Optional[str], optional): A test name with which to tag all records in the result. Defaults to None.
+        clear_on_success (bool, optional): Whether to remove the source lcov files on success. Defaults to False.
+ files_per_chunk (Union[int, None], optional): How many files to combine per parallel task. Defaults to None. + concurrency (ConcurrencyParam, optional): A concurrency limiting parameter for the execution. Defaults to None. + logger (LoggerType, optional): A logger to which log information. Defaults to COVERAGE_TOOLS_LOGGER. + + Raises: + RuntimeError: If one of the parallel merges fails for any reason. + """ + loop = asyncio.get_running_loop() + + lcovs = [Path(lcov) for lcov in lcovs] + + if files_per_chunk is None or files_per_chunk > len(lcovs): + files_per_chunk = len(lcovs) + + def merge_lcovs(lcov_spec: List[Union[lcov_utils.LcovFile, Path]]): + lcov_objs: List[lcov_utils.LcovFile] = [] + if (len(lcov_spec) == 1) and isinstance(lcov_spec[0], lcov_utils.LcovFile): + return lcov_spec[0] + for lcov in lcov_spec: + if isinstance(lcov, Path): + lcov_objs.append(lcov_utils.LcovFile(lcov)) + else: + lcov_objs.append(lcov) + lcov_result = lcov_utils.LcovFile() + + for lcov in lcov_objs: + if test_tag: + lcov.tag_with_test(test_tag) + lcov_result.union(lcov) + return lcov_result + + files_to_merge = lcovs + # Consume all of the available concurrency in the semaphore + concurrency = 0 + while not semaphore.locked(): + await semaphore.acquire() + concurrency += 1 + try: + with concurrent.futures.ThreadPoolExecutor(concurrency) as executor: + while len(files_to_merge) > 1: + files_to_merge = [ + files_to_merge[i : i + files_per_chunk] + for i in range(0, len(files_to_merge), files_per_chunk) + ] + merge_funcs = [partial(merge_lcovs, chunk) for chunk in files_to_merge] + # We use "normal" gather here since we have the concurrency limited by the executor + files_to_merge = await asyncio.gather( + *(loop.run_in_executor(executor, func) for func in merge_funcs) + ) + result: List[lcov_utils.LcovFile] = await loop.run_in_executor( + executor, partial(merge_lcovs, files_to_merge) + ) + if output_lcov: + result.write(output_lcov) + if clear_on_success: + for lcov in lcovs: + if isinstance(lcov, Path): + lcov.unlink() + else: + return result + finally: + # Release all consumed concurrency back into the semaphore + for _ in range(concurrency): + semaphore.release() + +@traced_func +async def html_fixup(*, html_dir: Path): + """Fix genhtml generated links, there is a bug in genhtml where it doesn't properly encode links to + files names that contain url illegal characters + """ + html_files = [f for f in html_dir.rglob("*.html") if f.is_file()] + href_re = re.compile(r'href=".*\.html">') + for html_file in html_files: + with open(html_file, "r") as f: + content = f.read() + hrefs = href_re.findall(content) + hrefs_to_replace = {} + for href in hrefs: + if href in hrefs_to_replace: + continue + new_href = 'href="' + quote(unquote(href[6:-2])) + href[-2:] + if new_href != href: + hrefs_to_replace[href] = new_href + if len(hrefs_to_replace) > 0: + for old_href, new_href in hrefs_to_replace.items(): + content = content.replace(old_href, new_href) + with open(html_file, "w") as f: + f.write(content) + + +@traced_func +async def merge_profiles_cmd(args): + profiles = [Path(p) for p in args.profiles] + result_path = Path(args.result_path) + if not result_path.exists(): + result_path.mkdir(parents = True, exist_ok = True) + await merge_profiles( + profiles = profiles, + path_for_merged = result_path, + clear_on_success = args.clear_on_success, + semaphore = args.concurrency, + logger = COVERAGE_TOOLS_LOGGER, + ) + + +@traced_func +async def prof_to_lcov_cmd(args): + known_ids = await get_binary_ids_map( + paths 
= [Path(bsp) for bsp in args.binary_search_path], + filter = PROFILED_ELF_TYPES, + semaphore = args.concurrency, + logger = COVERAGE_TOOLS_LOGGER, + ) + profiles = [Path(p) for p in args.profiles] + excludes = set() if args.excludes is None else set(args.excludes) + if args.excludes_file: + excludes.update( + { + line + for line in open(args.excludes_file, "r").read().split("\n") + if line and not line.startswith("#") + } + ) + excludes = list(excludes) + await profdata_to_lcov( + profiles = profiles, + excludes = excludes, + compilation_dir = args.compilation_dir, + known_file_ids = known_ids, + clear_on_success = args.clear_on_success, + semaphore = args.concurrency, + logger = COVERAGE_TOOLS_LOGGER, + ) + + +@traced_func +async def merge_lcov_files_cmd(args): + output_trace = Path(args.output_trace) + lcovs = [Path(lcov) for lcov in args.lcov_files] + merged: lcov_utils.LcovFile = await lcov_combine_traces( + lcovs = lcovs, + test_tag = args.testname, + clear_on_success = args.clear_on_success, + files_per_chunk = args.files_per_chunk, + semaphore = args.concurrency, + logger = COVERAGE_TOOLS_LOGGER, + ) + if args.filter: + exclude_line = args.exclude_line if args.exclude_line else None + exclude_start = args.exclude_start if args.exclude_start else None + exclude_end = args.exclude_end if args.exclude_end else None + exclude_branch = args.exclude_branch if args.exclude_branch else None + exclude_branch_start = ( + args.exclude_branch_start if args.exclude_branch_start else None + ) + exclude_branch_end = ( + args.exclude_branch_end if args.exclude_branch_end else None + ) + assert not (bool(exclude_start) ^ bool(exclude_end)) + assert not (bool(exclude_branch_start) ^ bool(exclude_branch_end)) + if exclude_start: + assert exclude_start != exclude_end + if exclude_branch_start: + assert exclude_branch_start != exclude_branch_end + merged.filter_by_source_tags( + LCOV_EXCL_LINE = exclude_line, + LCOV_EXCL_START = exclude_start, + LCOV_EXCL_STOP = exclude_end, + LCOV_EXCL_BR_LINE = exclude_branch, + LCOV_EXCL_BR_START = exclude_branch_start, + LCOV_EXCL_BR_STOP = exclude_branch_end, + ) + merged.write(output_trace) + if args.clear_on_success: + [lcov.unlink() for lcov in lcovs if lcov.exists()] + + +@traced_func +async def list_build_ids_cmd(args): + paths = [Path(p) for p in args.paths] + file_to_id_map = await get_binary_ids_map( + paths = paths, + with_types = True, + semaphore = args.concurrency, + logger = COVERAGE_TOOLS_LOGGER + ) + max_id_len = ( + max( + *[ + len(id) if id is not None else len("Not found") + for (id,_) in file_to_id_map.values() + ] + ) + + 5 + ) + fmt_str = f"{{id: <{max_id_len}}}{{file}}{' '*5}({{ftype}})" + for file, (id, ftype) in file_to_id_map.items(): + print( + fmt_str.format( + id = id if id is not None else "Not Found", + file = str(file), + ftype = ftype.name, + ) + ) + +@traced_func +async def coverage_diff_cmd(args): + diff_trace = await lcov_combine_traces( + lcovs = args.diff_tracefiles, + files_per_chunk = args.files_per_chunk, + clear_on_success = False, + semaphore = args.concurrency, + logger = COVERAGE_TOOLS_LOGGER, + ) + base_lcov = lcov_utils.LcovFile(args.base_tracefile) + base_lcov.difference(diff_trace) + base_lcov.write(Path(args.output_trace)) + + +@traced_func +async def coverage_intersection_cmd(args): + trace_files = list(args.tracefiles) + result = lcov_utils.LcovFile(trace_files[0]) + for trace_file in trace_files[1:]: + result.intersection(lcov_utils.LcovFile(trace_file)) + result.write(args.output_trace) + + +@traced_func +async 
def coverage_symmetric_diff_cmd(args): + result = lcov_utils.LcovFile(args.tracefiles[0]) + result.symmetric_difference(lcov_utils.LcovFile(args.tracefiles[1])) + result.write(args.output_trace) + + +async def patch_coverage_cmd(args): + base_commit = args.base_commit + proc = await create_subprocess_shell("git diff --quiet") + await proc.wait() + output_dir: Path = args.output_dir + dirty = bool(proc.returncode != 0) + if base_commit: # if we were given base commits we should generate the patches + + def get_next_numbered_patch_name(): + last_split = (list(sorted(output_dir.glob("*.patch")))[-1]).stem + last_split_len = len(last_split) + next_fn = str(int(last_split) + 1) + next_fn = ("0" * (last_split_len - len(next_fn))) + next_fn + return next_fn + + if args.merge: + proc = await create_subprocess_shell( + "git show --summary HEAD | grep -q ^Merge" + ) + await proc.wait() + assert ( + proc.returncode == 0 + ), "Head is not a merge commit but --merge/-m option was given" + if dirty: + assert ( + args.dirty + ), "The git repository has modified files but --dirty option not given" + output_dir.mkdir(parents = True, exist_ok = True) + if args.clear_output_dir: + [f.unlink() for f in list(output_dir.glob("*.patch"))] + else: + assert ( + len(list(output_dir.glob("*.patch"))) == 0 + ), f"{output_dir} is not empty, patches should be created in a directory that doesn't contain any *.patch files" + coverage_commit = "" + # if this is a merge commit we should: + # 1. generate the commits leading up to HEAD^ from the fork point + # 2. generate the diff commit between HEAD^ and HEAD + if args.merge: + coverage_commit = f"HEAD^{args.merge_parent}" + proc = await create_subprocess_shell( + f"git merge-base {base_commit} {coverage_commit}", stdout = PIPE + ) + stdout, _ = await proc.communicate() + assert proc.returncode == 0, "Couldn't determine the fork point of HEAD" + base_commit = stdout.decode().strip() + + proc = await create_subprocess_shell( + f"git log -m --first-parent -p -z --reverse --format=medium {base_commit}..{coverage_commit} | sed 's/\\x00/\\x01\\n/g' | csplit -s --prefix=\"{output_dir}/\" --suffix=\"%02d.patch\" --suppress-matched - $'/\\x01/' '{{*}}'" + ) + await proc.wait() + proc = await create_subprocess_shell( + f"git log -m --first-parent --reverse --format=medium {base_commit}..{coverage_commit} --format=\"%s\" | sed -E 's/(.{1,60}).*/\1/g'", + stdout = PIPE, + ) + stdout, _ = await proc.communicate() + names = stdout.decode().splitlines() + if args.merge: + next_patch = output_dir / f"{get_next_numbered_patch_name()}.patch" + proc = await create_subprocess_shell( + f"git log HEAD -1 --format=medium > {next_patch}" + ) + await proc.wait() + proc = await create_subprocess_shell( + f"git diff {coverage_commit}..HEAD >> {next_patch}" + ) + await proc.wait() + proc = await create_subprocess_shell( + f"git log HEAD -1 --format=medium --format=\"%s\" | sed -E 's/(.{1,60}).*/\1/g'", + stdout = PIPE, + ) + stdout, _ = await proc.communicate() + merge_name = stdout.decode().splitlines()[0] + names.append(merge_name) + + # git log --format="%s" | sed -E 's/(.{1,60}).*/\1/g' + names = [name.replace("/", "\\") for name in names] + patches = list(output_dir.glob("*.patch")) + patches.sort() + dirty_name = get_next_numbered_patch_name() + patches = [ + f.rename(f.with_stem(f.stem + " - " + p_name)) + for f, p_name in zip(patches, names) + ] + if dirty: + dirty_name = dirty_name + " - uncommitted changes.patch" + dirty_file = output_dir / dirty_name + proc = await 
create_subprocess_shell(f"git diff > '{dirty_file}'") + await proc.wait() + patches.append(dirty_file) + else: # Patches already generated we should only map them + patches = list(output_dir.glob("*.patch")) + patches.sort() + remapped_tracefile = lcov_utils.LcovFile(args.tracefile) + remapped_tracefile.remap_to_patches(patches) + if args.pseudo_patch_for_uncovered: + covered_patches = {key[1] for key in remapped_tracefile.records.keys()} + uncovered_patches = set(patches) - covered_patches + for uncovered_patch in uncovered_patches: + preamble = "This patch either touched only unprofiled code, was overridden entirely by a later patch or only removed lines" + pseudo_patch = uncovered_patch.with_suffix(".patch.uncovered") + proc = await create_subprocess_shell( + f"cat <(echo '{preamble}') <(cat '{uncovered_patch}') > '{pseudo_patch}'" + ) + await proc.wait() + pseudo_record = lcov_utils.LcovRecord() + pseudo_record.source_file = pseudo_patch + pseudo_record.line_hits[1] = 1 + remapped_tracefile.add_record(pseudo_record) + remapped_tracefile.write(args.output_trace) + + +async def html_fixup_cmd(args): + await html_fixup(html_dir = args.html_dir) + + +async def genhtml_cmd(args): + genhtml_options = ["genhtml", "--output-directory", f"'{args.output_dir}'"] + if not args.verbose: + genhtml_options.append("--quiet") + if args.title is not None: + genhtml_options.extend(["--title", f"'{args.title}'"]) + if not args.no_legend: + genhtml_options.append("--legend") + if not args.no_function_coverage: + genhtml_options.append("--function-coverage") + if not args.no_branch_coverage: + genhtml_options.append("--branch-coverage") + if not args.no_cpp_demangle: + genhtml_options.append("--demangle-cpp") + if args.ignore_errors: + genhtml_options.extend(["--ignore-errors", "'source'"]) + if args.negate: + genhtml_options.append("--missed") + genhtml_options.extend(["--rc", f"'genhtml_hi_limit={args.high_limit}'"]) + genhtml_options.extend(["--rc", f"'genhtml_med_limit={args.med_limit}'"]) + genhtml_options.extend([str(f) for f in args.tracefiles]) + proc = await create_subprocess_shell(" ".join(genhtml_options)) + await proc.wait() + await html_fixup(html_dir = args.output_dir) + + +def recursively_print_help(p: argparse.ArgumentParser, indent = 0): + help_lines = p.format_help().splitlines() + help_lines = [("\t" * indent) + l for l in help_lines] + print("\n".join(help_lines)) + subparsers_actions = [ + action + for action in p._actions + if isinstance(action, argparse._SubParsersAction) + ] + sp_action: argparse._SubParsersAction + for sp_action in subparsers_actions: + for sp in sp_action.choices.values(): + recursively_print_help(sp, indent = indent + 1) + + +async def print_help(args): + recursively_print_help(args.parser) + + +async def main(): + root = logging.getLogger() + root.setLevel(logging.DEBUG) + handler = logging.StreamHandler(sys.stderr) + formatter = logging.Formatter( + "%(asctime)s - %(filename)s:%(lineno)d - %(module)s:%(funcName)s - %(name)s - %(levelname)s - %(message)s" + ) + handler.setFormatter(formatter) + COVERAGE_TOOLS_LOGGER.addHandler(handler) + root.addHandler(handler) + COVERAGE_TOOLS_LOGGER.setLevel(logging.DEBUG) + parser = argparse.ArgumentParser( + description = "A collection of tools to handle llvm coverage data", + formatter_class = argparse.ArgumentDefaultsHelpFormatter, + ) + parser.add_argument( + "--log-level", + choices = logging._nameToLevel.keys(), + action = "store", + default = logging._levelToName[logging.INFO], + help = f"logging level of the tool", + 
) + parser.add_argument( + "--concurrency", + action = "store", + default = max(int(multiprocessing.cpu_count() * 0.75), 1), + type = int, + help = "The concurrency to use for parallel operations.", + ) + parser.set_defaults(func = print_help, parser = parser) + subparsers = parser.add_subparsers() + llvm_profile_commands = subparsers.add_parser( + "llvm-profiles", help = "Handle and manipulate llvm profiles" + ) + llvm_profile_commands.set_defaults(func = print_help, parser = llvm_profile_commands) + llvm_commands_subparsers = llvm_profile_commands.add_subparsers() + + # List build ids can help determine the right build id for parsing llvm profiles + list_build_ids_parser = llvm_commands_subparsers.add_parser( + "list-build-ids", help = "list build ids for the given paths" + ) + list_build_ids_parser.add_argument( + "paths", + metavar = "PATH", + help = "A folder to be searched for (recursively) or a specific file to get an build id for", + action = "store", + type = Path, + nargs = "+", + ) + list_build_ids_parser.set_defaults(func = list_build_ids_cmd) + + # Convert raw profiles into a profdata + raw_to_indexed_parser = llvm_commands_subparsers.add_parser( + "merge", + help = "merge and convert raw and indexed profiles to a unified profile", + formatter_class = argparse.ArgumentDefaultsHelpFormatter, + ) + raw_to_indexed_parser.add_argument( + "profiles", + metavar = "PROFILE", + help = "a raw or indexed profile to merge into the output merged profile", + action = "store", + type = Path, + nargs = "+", + ) + raw_to_indexed_parser.add_argument( + "result_path", + metavar = "PROFDATA", + type = Path, + help = "The path to which the merged indexed data will saved", + ) + raw_to_indexed_parser.add_argument( + "-c", + "--clear-on-success", + action = "store_true", + default = False, + help = "Wether to clear the raw profiles on success", + ) + raw_to_indexed_parser.set_defaults(func = merge_profiles_cmd) + + # Convert indexed profiles into lcov trace files + prof_to_lcov_parser = llvm_commands_subparsers.add_parser( + "to-lcov", + help = "convert indexed profiles into lcov trace files", + formatter_class = argparse.ArgumentDefaultsHelpFormatter, + ) + prof_to_lcov_parser.add_argument( + "profiles", + metavar = "PROFILE", + help = "an indexed profile to be converted to an lcov trace file", + action = "store", + type = Path, + nargs = "+", + ) + prof_to_lcov_parser.add_argument( + "-c", + "--clear-on-success", + action = "store_true", + default = False, + help = "Wether to clear the indexed profiles on success", + ) + prof_to_lcov_parser.add_argument( + "--exclude", + "-e", + action = "append", + type = str, + dest = "excludes", + help = "Regex patterns for excluding files from coverage", + ) + prof_to_lcov_parser.add_argument( + "--excludes-file", + "--ef", + action = "store", + type = Path, + help = "A file containing a list of regexes to exclude", + ) + prof_to_lcov_parser.add_argument( + "-b", + "--binary-search-path", + type = Path, + action = "append", + required = True, + help = "The path or paths to search for binaries to use in the conversion.", + ) + prof_to_lcov_parser.add_argument( + "--compilation-dir", + action = "store", + type = Path, + default = Path(), + help = "The compilation directory for the binaries (for files compiled with relative paths mapping)", + ) + prof_to_lcov_parser.set_defaults(func = prof_to_lcov_cmd) + + lcov_trace_commands = subparsers.add_parser( + "lcov-tools", help = "Handle and manipulate lcov tracefiles" + ) + lcov_trace_commands_subparsers = 
lcov_trace_commands.add_subparsers() + lcov_trace_commands.set_defaults(func = print_help, parser = lcov_trace_commands) + merge_lcov_files_parser = lcov_trace_commands_subparsers.add_parser( + "union", + help = "Merges several (or single) lcov file into another trace file. If testname is given, the resulting lcov file will be tagged with " + "this name, else if will just merge the files similarly to 'lcov -a...' command. Files can also be filtered (see 'man lcovrc')", + formatter_class = argparse.ArgumentDefaultsHelpFormatter, + ) + merge_lcov_files_parser.add_argument( + "lcov_files", + metavar = "TRACE_FILE", + help = "an lcov trace file to be merged into the output trace file", + action = "store", + type = Path, + nargs = "+", + ) + merge_lcov_files_parser.add_argument( + "output_trace", + metavar = "OUTPUT_TRACE_FILE", + type = Path, + help = "The path to which the merged lcov trace data will saved", + ) + merge_lcov_files_parser.add_argument( + "-c", + "--clear-on-success", + action = "store_true", + default = False, + help = "Whether to clear the original lcov files upon success", + ) + merge_lcov_files_parser.add_argument( + "--testname", + action = "store", + type = str, + default = None, + help = "An optional testname to tag all records in the output", + ) + merge_lcov_files_parser.add_argument( + "--files-per-chunk", + action = "store", + type = int, + default = 4, + help = "The maximal number of files to merge at once (for performance tweaking)", + ) + merge_lcov_files_parser.add_argument( + "--filter", + "-f", + action = "store_true", + help = "Apply filter to the result (see: 'man lcovrc')", + ) + merge_lcov_files_parser.add_argument( + "--exclude-line", + "--el", + action = "store", + type = str, + help = "Tag for line exclusion, empty string for None (when --filter is given)", + default = lcov_utils.LcovFile.LCOV_EXCL_LINE_DEFAULT, + ) + merge_lcov_files_parser.add_argument( + "--exclude-start", + "--es", + action = "store", + type = str, + help = "Tag for line exclusion block start, empty string for None, (when --filter is given)", + default = lcov_utils.LcovFile.LCOV_EXCL_START_DEFAULT, + ) + merge_lcov_files_parser.add_argument( + "--exclude-end", + "--ee", + action = "store", + type = str, + help = "Tag for line exclusion block end, empty string for None (when --filter is given)", + default = lcov_utils.LcovFile.LCOV_EXCL_STOP_DEFAULT, + ) + merge_lcov_files_parser.add_argument( + "--exclude-branch", + "--eb", + action = "store", + type = str, + help = "Tag branch exclusion, empty string for None (when --filter is given)", + default = lcov_utils.LcovFile.LCOV_EXCL_BR_LINE_DEFAULT, + ) + merge_lcov_files_parser.add_argument( + "--exclude-branch-start", + "--ebs", + action = "store", + type = str, + help = "Tag for branch exclusion block start, empty string for None (when --filter is given)", + default = lcov_utils.LcovFile.LCOV_EXCL_BR_START_DEFAULT, + ) + merge_lcov_files_parser.add_argument( + "--exclude-branch-end", + "--ebe", + action = "store", + type = str, + help = "Tag for branch exclusion block end, empty string for None (when --filter is given)", + default = lcov_utils.LcovFile.LCOV_EXCL_BR_STOP_DEFAULT, + ) + merge_lcov_files_parser.set_defaults(func = merge_lcov_files_cmd) + + lcov_diff_parser = lcov_trace_commands_subparsers.add_parser( + "diff", + help = "computes the diff between two or more coverage files (lines that are covered by first but not others)", + formatter_class = argparse.ArgumentDefaultsHelpFormatter, + ) + lcov_diff_parser.add_argument( + 
"--output-trace", + "-o", + help = "The output file to write the result into", + type = Path, + required = True, + ) + lcov_diff_parser.add_argument( + "--files-per-chunk", + action = "store", + type = int, + default = 4, + help = "The max number of files to merge at once (for performance tweaking)", + ) + lcov_diff_parser.add_argument( + "base_tracefile", + action = "store", + type = Path, + help = "The base line trace - the file which we want to diff with all others", + ) + lcov_diff_parser.add_argument( + "diff_tracefiles", + action = "store", + type = Path, + nargs = "+", + help = "The tracefiles to subtracted from the base trace", + ) + lcov_diff_parser.set_defaults(func = coverage_diff_cmd) + + lcov_intersection_parser = lcov_trace_commands_subparsers.add_parser( + "intersection", + help = "computes the intersection between two or more coverage files (lines that are covered by all trace files)", + formatter_class = argparse.ArgumentDefaultsHelpFormatter, + ) + lcov_intersection_parser.add_argument( + "--output-trace", + "-o", + help = "The output file to write the result into", + type = Path, + required = True, + ) + lcov_intersection_parser.add_argument( + "--files-per-chunk", + action = "store", + type = int, + default = 4, + help = "The max number of files to merge at once (for performance tweaking)", + ) + lcov_intersection_parser.add_argument( + "tracefiles", + action = "store", + type = Path, + nargs = "+", + help = "The tracefiles to subtract from the base trace", + ) + lcov_intersection_parser.set_defaults(func = coverage_intersection_cmd) + + lcov_symmetric_diff_parser = lcov_trace_commands_subparsers.add_parser( + "symmetric-dff", + help = "computes the symmetric difference between two traces (line covered by either trace but not both)", + formatter_class = argparse.ArgumentDefaultsHelpFormatter, + ) + lcov_symmetric_diff_parser.add_argument( + "--output-trace", + "-o", + help = "The output file to write the result into", + type = Path, + required = True, + ) + lcov_symmetric_diff_parser.add_argument( + "tracefiles", + action = "store", + type = Path, + nargs = 2, + help = "The tracefiles to be subtracted", + ) + lcov_symmetric_diff_parser.set_defaults(func = coverage_symmetric_diff_cmd) + + patch_coverage_parser = lcov_trace_commands_subparsers.add_parser( + "git-patch-coverage", + help = "Transform a a source coverage tracefile into a patch coverage tracefile", + formatter_class = argparse.ArgumentDefaultsHelpFormatter, + ) + patch_coverage_parser.add_argument( + "--output-trace", + "-o", + help = "The output file to write the result into", + type = Path, + required = True, + ) + patch_coverage_parser.add_argument( + "--tracefile", + "-t", + type = Path, + action = "store", + help = "The tracefile to transform", + ) + patch_coverage_parser.add_argument( + "--base-commit", + "-b", + type = str, + default = None, + action = "store", + help = "The base commit for the patch coverage", + ) + patch_coverage_parser.add_argument( + "--output-dir", + "-d", + type = Path, + required = True, + action = "store", + help = "The directory to create the patches in", + ) + patch_coverage_parser.add_argument( + "--dirty", + action = "store_true", + help = "Wether to include a final meta patch which is the uncommitted changes to the environment," + "if this parameter is not given and there are uncommitted changes in the repo, the command will fail. 
(not including untracked files)", + ) + patch_coverage_parser.add_argument( + "--clear-output-dir", + "-c", + action = "store_true", + help = "Remove any previous .patch file in output dir", + ) + patch_coverage_parser.add_argument( + "--pseudo-patch-for-uncovered", + "-p", + action = "store_true", + help = "Create a pseudo patch for uncovered patch files and make them appear as 100%% covered", + ) + patch_coverage_parser.add_argument( + "--merge", "-m", action = "store_true", help = "Coverage report for merge commit" + ) + patch_coverage_parser.add_argument( + "--merge-parent", + "--mp", + type = int, + default = 2, + action = "store", + choices = [1, 2], + help = "The parent of the merge commit to generate coverege for", + ) + patch_coverage_parser.set_defaults(func = patch_coverage_cmd) + + genhtml_parser = lcov_trace_commands_subparsers.add_parser( + "genhtml", + help = "Generate and fixup html pages", + formatter_class = argparse.ArgumentDefaultsHelpFormatter, + ) + genhtml_parser.add_argument( + "--output-dir", + "-o", + type = Path, + required = True, + action = "store", + help = "The folder in which to generate the html report", + ) + genhtml_parser.add_argument( + "--verbose", + "-v", + action = "store_true", + help = "Output everything instead of just warnings and errors", + ) + genhtml_parser.add_argument( + "--title", + "-t", + action = "store", + type = str, + default = None, + help = "The title of the coverage run for example: 'Unit tests run'", + ) + genhtml_parser.add_argument( + "--no-legend", "--nl", action = "store_true", help = "Don't create legend" + ) + genhtml_parser.add_argument( + "--no-function-coverage", + "--nf", + action = "store_true", + help = "Don't report function coverage", + ) + genhtml_parser.add_argument( + "--no-branch-coverage", + "--nb", + action = "store_true", + help = "Don't report branch coverage", + ) + genhtml_parser.add_argument( + "--no-cpp-demangle", + "--nd", + action = "store_true", + help = "Don't demangle function names", + ) + genhtml_parser.add_argument( + "--ignore-errors", + "-i", + action = "store_true", + help = "Ignore 'source wasn't found' errors", + ) + genhtml_parser.add_argument( + "--negate", + "-n", + action = "store_true", + help = "Make the report missed coverage centric instead of coverage centric", + ) + genhtml_parser.add_argument( + "--high-limit", + "--hl", + action = "store", + type = int, + default = 90, + help = "The high limit for reporting (high coverage)", + ) + genhtml_parser.add_argument( + "--med-limit", + "--ml", + action = "store", + type = int, + default = 75, + help = "The medium limit for reporting (medium coverage)", + ) + genhtml_parser.add_argument( + "tracefiles", + metavar = "tracefile", + action = "store", + type = Path, + nargs = "+", + help = "The tracefiles to generate the reports from", + ) + genhtml_parser.set_defaults(func = genhtml_cmd) + html_fixup_parser = lcov_trace_commands_subparsers.add_parser( + "html-fixup", + help = "Fix genhtml broken links for non standard file name (for example that contains '#')", + formatter_class = argparse.ArgumentDefaultsHelpFormatter, + ) + html_fixup_parser.add_argument( + "--html-dir", + "-d", + type = Path, + required = True, + action = "store", + help = "The folder containing the html report generated by htmlgen", + ) + + html_fixup_parser.set_defaults(func = html_fixup_cmd) + + help_parser = subparsers.add_parser("help", help = "Print a full help message") + help_parser.set_defaults(func = print_help, parser = parser) + args = parser.parse_args() + 
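+    # Replace the numeric --concurrency value with a Semaphore shared by all
+    # subcommands, which bounds the number of parallel subprocesses they spawn.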
args.concurrency = Semaphore(args.concurrency) + loglevel = logging._nameToLevel[args.log_level] + COVERAGE_TOOLS_LOGGER.setLevel(loglevel) + [handler.setLevel(loglevel) for handler in COVERAGE_TOOLS_LOGGER.handlers] + await args.func(args) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/test/pylib/lcov_utils.py b/test/pylib/lcov_utils.py new file mode 100644 index 0000000000..91236c4f30 --- /dev/null +++ b/test/pylib/lcov_utils.py @@ -0,0 +1,1202 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2024-present ScyllaDB +# +# +# SPDX-License-Identifier: AGPL-3.0-or-later +# +import unidiff +from typing import ( + List, + OrderedDict as OrderedDictType, + Tuple, + Callable, + Union, + TextIO, + Self, + Optional, + Mapping, +) +from collections import OrderedDict +from pathlib import Path +import unidiff.patch +from unidiff import PatchSet, PatchedFile +from unidiff.patch import Hunk, Line +import copy +from itertools import repeat, accumulate + +# TN: test name +# SF: source file path +# FN: line number,function name +# FNF: number functions found +# FNH: number hit +# BRDA: branch data: line, block, (expressions,count)+ +# BRF: branches found +# DA: line number, hit count +# LF: lines found +# LH: lines hit. + + +# This monkey patching is done for two main reasons: +# 1. In order to allow deep copying of the objects (which is the main purpose) +# 2. For efficiency, instead of creating a function that will compare the strings +# every time. +class MakeLcovRouter(type): + def __new__(cls, name, bases, dct): + routes = {} + dct_keys = list(dct.keys()) + for match in dct["routes"]: + keys = [key for key in dct_keys if key.endswith("_" + match)] + assert len(keys) == 1 + routes[match] = dct[keys[0]] + dct["routes"] = routes + + def route(self, route, *args, **kwargs): + return self.routes[route](self, *args, **kwargs) + + dct["route"] = route + return super().__new__(cls, name, bases, dct) + + +class LcovRecord(metaclass = MakeLcovRouter): + routes = [ + "TN", + "SF", + "FN", + "FNDA", + "FNF", + "FNH", + "BRDA", + "BRF", + "BRH", + "DA", + "LF", + "LH", + "end_of_record", + ] + + def __init__(self) -> None: + self._test_name: Optional[str] = None + self.source_file: Optional[Path] = None + self.line_hits: dict[int, int] = dict() + + self.function_hits: dict[Tuple(int, str), int] = dict() + self.functions_to_lines: dict[str, int] = dict() + self.branch_hits: dict[Tuple[int, int, int], Optional[int]] = dict() + self.sealed: bool = False + self.FNF = None + self.FNH = None + self.BRF = None + self.BRH = None + self.LF = None + self.LH = None + + def empty(self): + return ( + len(self.line_hits) + len(self.function_hits) + len(self.branch_hits) + ) == 0 + + @property + def test_name(self): + return self._test_name if self._test_name else "" + + @property + def functions_found(self): + return len(self.function_hits) + + @property + def functions_hit(self): + return sum([int(hits and hits > 0) for hits in self.function_hits.values()]) + + @property + def branches_found(self): + return len(self.branch_hits) + + @property + def branches_hit(self): + return sum([int(bool(hits and hits > 0)) for hits in self.branch_hits.values()]) + + @property + def lines_found(self): + return len(self.line_hits) + + @property + def lines_hit(self): + return sum([int(hits and hits > 0) for hits in self.line_hits.values()]) + + def add(self, type_str: str, fields: List[str]): + assert not self.sealed + return self.route(type_str, fields) + + def add_TN(self, fields: List[str]) -> bool: + assert 
self._test_name is None
+        self._test_name = fields[0]
+        return False
+
+    def add_SF(self, fields: List[str]) -> bool:
+        assert self.source_file is None
+        self.source_file = Path(fields[0])
+        return False
+
+    def add_FN(self, fields: List[str]) -> bool:
+        line, func_name = fields
+        line = int(line)
+        assert func_name not in self.functions_to_lines
+        self.functions_to_lines[func_name] = line
+        self.function_hits.setdefault((line, func_name), 0)
+        return False
+
+    def add_FNDA(self, fields: List[str]) -> bool:
+        hits, func_name = fields
+        line = self.functions_to_lines[func_name]
+        hits = int(hits)
+        self.function_hits[(line, func_name)] = hits
+        return False
+
+    def add_FNF(self, fields: List[str]) -> bool:
+        self.FNF = int(fields[0])
+        return False
+
+    def add_FNH(self, fields: List[str]) -> bool:
+        self.FNH = int(fields[0])
+        return False
+
+    def add_BRDA(self, fields: List[str]) -> bool:
+        line, block, branch, count = fields
+        line = int(line)
+        block = int(block)
+        branch = int(branch)
+        count = int(count) if count != "-" else None
+        self.branch_hits.setdefault((line, block, branch), count)
+        return False
+
+    def add_BRF(self, fields: List[str]) -> bool:
+        self.BRF = int(fields[0])
+        return False
+
+    def add_BRH(self, fields: List[str]) -> bool:
+        self.BRH = int(fields[0])
+        return False
+
+    def add_DA(self, fields: List[str]) -> bool:
+        line, hits = fields
+        line = int(line)
+        hits = int(hits)
+        self.line_hits.setdefault(line, hits)
+        return False
+
+    def add_LF(self, fields: List[str]) -> bool:
+        self.LF = int(fields[0])
+        return False
+
+    def add_LH(self, fields: List[str]) -> bool:
+        self.LH = int(fields[0])
+        return False
+
+    def add_end_of_record(self, fields: List[str]) -> bool:
+        self.sealed = True
+        self.validate_integrity()
+        self._refresh_functions_to_lines()
+        return True
+
+    def remove_lines(self, line_numbers: List[int]):
+        self.validate_integrity()
+        for line_number in line_numbers:
+            if line_number in self.line_hits:
+                del self.line_hits[line_number]
+        functions_to_remove = list(
+            {
+                (line, func_name)
+                for func_name, line in self.functions_to_lines.items()
+                if line in line_numbers
+            }
+        )
+        for key in functions_to_remove:
+            del self.function_hits[key]
+            del self.functions_to_lines[key[1]]
+        branches_to_remove = [
+            branch for branch in self.branch_hits if branch[0] in line_numbers
+        ]
+        for branch in branches_to_remove:
+            del self.branch_hits[branch]
+        self.validate_integrity()
+
+    def remove_line(self, line_number: int):
+        self.remove_lines([line_number])
+
+    def remove_branches(self, branch_line_numbers: List[int]):
+        branch_keys_to_remove = {
+            key for key in self.branch_hits.keys() if key[0] in branch_line_numbers
+        }
+        for branch_to_remove in branch_keys_to_remove:
+            del self.branch_hits[branch_to_remove]
+
+    def remove_branch(self, branch_line: int):
+        self.remove_branches([branch_line])
+
+    def validate_integrity(self):
+        assert (
+            len(set(self.functions_to_lines.values()) - set(self.line_hits.keys())) == 0
+        )
+        assert (
+            len(
+                set([x[0] for x in self.branch_hits.keys()])
+                - set(self.line_hits.keys())
+            )
+            == 0
+        )
+
+    def get_lines(self) -> set[int]:
+        return set(self.line_hits.keys())
+
+    def filter_lines(self, lines: List[int]):
+        self.remove_lines(self.get_lines().difference(set(lines)))
+        self._refresh_functions_to_lines()
+
+    def remap_lines(self, lines_mapping: Mapping[int, int]):
+        # Validate that no two lines are mapped to the same target
+        assert len(lines_mapping) == len(set(lines_mapping.values()))
+        # First, drop all the lines that have no mapping
+        
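# Illustrative example (hypothetical numbers): with
+        # lines_mapping = {10: 1, 12: 2}, a record whose line_hits are
+        # {10: 3, 11: 1, 12: 0} first drops line 11 (it has no mapping) and
+        # ends up with line_hits == {1: 3, 2: 0}.
+        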
lines_to_keep = self.get_lines().intersection(set(lines_mapping.keys()))
+        self.filter_lines(lines_to_keep)
+        line_hits = self.line_hits
+        self.line_hits = dict()
+        for line, hits in line_hits.items():
+            new_line = lines_mapping[line]
+            self.line_hits[new_line] = hits
+        function_hits = self.function_hits
+        self.function_hits = dict()
+        for (line, func_name), hits in function_hits.items():
+            new_key = (lines_mapping[line], func_name)
+            self.function_hits[new_key] = hits
+        branch_hits = self.branch_hits
+        self.branch_hits = dict()
+        for (line, block, branch), count in branch_hits.items():
+            new_key = (lines_mapping[line], block, branch)
+            self.branch_hits[new_key] = count
+
+    def transform_line_hitrates(self, transform: Callable[[Optional[int]], int]):
+        for line in self.line_hits.keys():
+            self.line_hits[line] = transform(self.line_hits[line])
+
+    def transform_function_hitrates(self, transform: Callable[[Optional[int]], int]):
+        for key in self.function_hits.keys():
+            self.function_hits[key] = transform(self.function_hits[key])
+
+    def transform_branch_hitrates(self, transform: Callable[[Optional[int]], int]):
+        for key in self.branch_hits.keys():
+            self.branch_hits[key] = transform(self.branch_hits[key])
+
+    def transform_hitrates(self, transform: Callable[[Optional[int]], int]):
+        self.transform_line_hitrates(transform)
+        self.transform_function_hitrates(transform)
+        self.transform_branch_hitrates(transform)
+
+    def _get_branches_line_hitrate(self) -> Mapping[int, int]:
+        this_branch_line_hits = dict()
+        for key in self.branch_hits.keys():
+            this_branch_line_hits.setdefault(key[0], 0)
+            if self.branch_hits[key] is not None:
+                this_branch_line_hits[key[0]] += self.branch_hits[key]
+        return this_branch_line_hits
+
+    def _refresh_functions_to_lines(self):
+        self.functions_to_lines = {
+            func: line for line, func in self.function_hits.keys()
+        }
+
+    def union(self, other: Self) -> Self:
+        """An in-place version of the union operation.
+        The semantics are that the hit counts are combined for
+        shared lines, functions and branches.
+        Arguments:
+            other {Self} -- the other component to union with
+        """
+        for line in other.line_hits.keys():
+            if line not in self.line_hits:
+                self.line_hits[line] = other.line_hits[line]
+            else:
+                self.line_hits[line] += other.line_hits[line]
+        for key in other.function_hits.keys():
+            if key not in self.function_hits:
+                self.function_hits[key] = other.function_hits[key]
+            else:
+                self.function_hits[key] += other.function_hits[key]
+        self._refresh_functions_to_lines()
+        for key in other.branch_hits.keys():
+            if key not in self.branch_hits or self.branch_hits[key] is None:
+                self.branch_hits[key] = other.branch_hits[key]
+            elif other.branch_hits[key] is not None:
+                self.branch_hits[key] += other.branch_hits[key]
+        return self
+
+    def intersection(self, other: Self) -> Self:
+        """An in-place version of the intersection operation.
+        The semantics are: everything that is covered by both components
+        is kept and merged; everything that is covered by only one component
+        is removed from coverage completely (not just its hit count).
+        Coverage is measured per line and per function, but for branches it
+        is measured per line, since a partially covered branch carries little
+        meaning on its own; instead, a specific branch that should be removed
+        has its count set to None.
+        Note: Self intersection is not an identity, it will remove any uncovered
+        lines, functions and (line) branches completely.
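+        Example (illustrative, hypothetical numbers): if self.line_hits is
+        {1: 2, 2: 0} and other.line_hits is {1: 3, 3: 1}, the result keeps
+        only line 1, with a combined count of 5; lines 2 and 3 are removed
+        entirely.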
+        Arguments:
+            other {Self} -- the other component to intersect with
+        """
+        covered_lines = dict()
+        lines_to_merge = self.get_lines().intersection(other.get_lines())
+        lines_to_merge = [
+            line
+            for line in lines_to_merge
+            if self.line_hits[line] > 0 and other.line_hits[line] > 0
+        ]
+        if other == self:
+            for line in lines_to_merge:
+                covered_lines[line] = self.line_hits[line]
+        else:
+            for line in lines_to_merge:
+                covered_lines[line] = self.line_hits[line] + other.line_hits[line]
+        self.line_hits = covered_lines
+        covered_functions = dict()
+        functions_to_merge = set(self.function_hits.keys()).intersection(
+            set(other.function_hits.keys())
+        )
+        functions_to_merge = [
+            (line, func_name)
+            for line, func_name in functions_to_merge
+            if self.function_hits[(line, func_name)] > 0
+            and other.function_hits[(line, func_name)] > 0
+        ]
+        if other == self:
+            for key in functions_to_merge:
+                covered_functions[key] = self.function_hits[key]
+        else:
+            for key in functions_to_merge:
+                covered_functions[key] = (
+                    self.function_hits[key] + other.function_hits[key]
+                )
+        self.function_hits = covered_functions
+        self._refresh_functions_to_lines()
+        covered_branches = dict()
+        # for branches, count hits per line
+        this_branch_line_hits = dict()
+        for key in self.branch_hits.keys():
+            if self.branch_hits[key] is not None:
+                this_branch_line_hits.setdefault(key[0], 0)
+                this_branch_line_hits[key[0]] += self.branch_hits[key]
+        other_branch_line_hits = dict()
+        for key in other.branch_hits.keys():
+            if other.branch_hits[key] is not None:
+                other_branch_line_hits.setdefault(key[0], 0)
+                other_branch_line_hits[key[0]] += other.branch_hits[key]
+        this_branch_line_hits = {
+            key
+            for key in this_branch_line_hits.keys()
+            if this_branch_line_hits[key] > 0
+        }
+        other_branch_line_hits = {
+            key
+            for key in other_branch_line_hits.keys()
+            if other_branch_line_hits[key] > 0
+        }
+        branches_lines_to_merge = this_branch_line_hits.intersection(
+            other_branch_line_hits
+        )
+        for key in self.branch_hits.keys():
+            if key[0] not in branches_lines_to_merge:
+                continue
+            this_hits = self.branch_hits[key]
+            # the other record may not have this exact branch key at all;
+            # treat a missing key like a never-evaluated branch
+            other_hits = other.branch_hits.get(key)
+            if this_hits is None and other_hits is None:
+                covered_branches[key] = None
+            elif this_hits is None:
+                covered_branches[key] = other_hits
+            elif other_hits is None:
+                covered_branches[key] = this_hits
+            else:
+                covered_branches[key] = this_hits + other_hits
+        self.branch_hits = covered_branches
+        return self
+
+    def difference(self, other: Self) -> Self:
+        """An in-place version of the difference operation.
+        The semantics are: keep everything that is covered by this component
+        but not by the other. For branches the calculation is done per line,
+        not per branch, so some uncovered branches may still be indicated on
+        lines that remain, while whole lines whose branches are never
+        evaluated are dropped.
+        Per branch:
+        If the branch is covered only by this component, its hit count is
+        preserved; if it is covered by both or by neither, it is set to None;
+        and if it is covered only by the other component, it is set to 0.
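+        Example (illustrative, hypothetical numbers): if self.line_hits is
+        {1: 2, 2: 5} and other.line_hits is {2: 1}, the result keeps only
+        line 1 with its count of 2, since line 2 is covered by both
+        components.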
+        Arguments:
+            other {Self} -- the other component to subtract
+        """
+        # first remove every line that is not covered by self (at all)
+        self.remove_lines([line for line, hits in self.line_hits.items() if hits <= 0])
+        # remove every line that is covered by both
+        this_covered_lines = {
+            key for key in self.line_hits.keys() if self.line_hits[key] > 0
+        }
+        other_covered_lines = {
+            key for key in other.line_hits.keys() if other.line_hits[key] > 0
+        }
+        lines_to_remove = this_covered_lines.intersection(other_covered_lines)
+        self.remove_lines(lines_to_remove)
+        self._refresh_functions_to_lines()
+        # first remove every function that is not covered by self (at all)
+        for key in list(self.function_hits.keys()):
+            if self.function_hits[key] <= 0:
+                del self.function_hits[key]
+        this_covered_functions = {
+            key for key in self.function_hits.keys() if self.function_hits[key] > 0
+        }
+        other_covered_functions = {
+            key for key in other.function_hits.keys() if other.function_hits[key] > 0
+        }
+        # then remove all functions that are covered by both
+        functions_to_remove = this_covered_functions.intersection(
+            other_covered_functions
+        )
+        for key in functions_to_remove:
+            del self.function_hits[key]
+        self._refresh_functions_to_lines()
+        # first remove every branch line that is not hit at all
+        branch_line_hits = self._get_branches_line_hitrate()
+        branch_lines_with_hits = {
+            line for line, hits in branch_line_hits.items() if hits > 0
+        }
+        for key in list(self.branch_hits.keys()):
+            if key[0] not in branch_lines_with_hits:
+                del self.branch_hits[key]
+        for key, hits in list(self.branch_hits.items()):
+            covered_by_this = bool(self.branch_hits[key])
+            covered_by_other = (
+                bool(other.branch_hits[key]) if key in other.branch_hits else False
+            )
+            covered_by_both = covered_by_this and covered_by_other
+            # covered by both
+            if covered_by_both:
+                self.branch_hits[key] = None
+            elif covered_by_this:  # Only covered by this
+                pass
+            elif covered_by_other:
+                self.branch_hits[key] = 0
+            else:  # covered by neither
+                self.branch_hits[key] = None
+        return self
+
+    def symmetric_difference(self, other: Self) -> Self:
+        """An in-place version of the symmetric difference operation.
+        Arguments:
+            other {Self} -- the other component to take the symmetric difference with
+        """
+        intersection = self.__and__(other)
+        self.difference(intersection).union(other.__sub__(intersection))
+        return self
+
+    def __and__(self, other: Self):
+        new_component = copy.deepcopy(self)
+        new_component.intersection(other)
+        return new_component
+
+    def __or__(self, other: Self):
+        new_component = copy.deepcopy(self)
+        new_component.union(other)
+        return new_component
+
+    def __sub__(self, other: Self):
+        new_component = copy.deepcopy(self)
+        new_component.difference(other)
+        return new_component
+
+    def __xor__(self, other: Self):
+        new_component = copy.deepcopy(self)
+        new_component.symmetric_difference(other)
+        return new_component
+
+    def write(self, f: TextIO):
+        # Test Name
+        f.write("TN:" + self.test_name + "\n")
+        # Source File
+        f.write("SF:" + str(self.source_file) + "\n")
+        # functions
+        if self.functions_found > 0:
+            functions_and_lines = list(self.function_hits.keys())
+            functions_and_lines.sort()
+            for line, func in functions_and_lines:
+                f.write(f"FN:{line},{func}\n")
+            for key in functions_and_lines:
+                f.write(f"FNDA:{int(self.function_hits[key])},{key[1]}\n")
+        # Function summary data is written regardless of the function mapping (?)
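+        # Note: FNF/FNH below are recomputed from the in-memory data via the
+        # functions_found/functions_hit properties; the FNF/FNH values parsed
+        # from the input trace are not reused here.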
+        f.write(f"FNF:{self.functions_found}\n")
+        f.write(f"FNH:{self.functions_hit}\n")
+        # branches
+        if self.branches_found > 0:
+            sorted_branches = list(self.branch_hits.keys())
+            sorted_branches.sort()
+            for key in sorted_branches:
+                line, block, branch = key
+                count = self.branch_hits[key]
+                f.write(
+                    f"BRDA:{line},{block},{branch},{count if count is not None else '-'}\n"
+                )
+        f.write(f"BRF:{self.branches_found}\n")
+        f.write(f"BRH:{self.branches_hit}\n")
+        # lines
+        if self.lines_found > 0:
+            for line, count in self.line_hits.items():
+                f.write(f"DA:{line},{count}\n")
+        f.write(f"LF:{self.lines_found}\n")
+        f.write(f"LH:{self.lines_hit}\n")
+        f.write("end_of_record\n")
+
+    @staticmethod
+    def get_type_and_fields(line: str) -> Tuple[str, List[str]]:
+        parts = line.split(":", maxsplit = 1)
+        if len(parts) == 1:
+            parts.append("")
+        return parts[0].strip(), [field.strip() for field in parts[1].split(",")]
+
+    def add_line(self, line: str) -> bool:
+        type_str, fields = self.get_type_and_fields(line)
+        return self.add(type_str, fields)
+
+    def __eq__(self, other: Self):
+        if not isinstance(other, type(self)):
+            return False
+        return (
+            self.test_name == other.test_name
+            and self.source_file == other.source_file
+            and self.line_hits == other.line_hits
+            and self.function_hits == other.function_hits
+            and self.branch_hits == other.branch_hits
+        )
+
+
+class LcovFile:
+    LCOV_EXCL_LINE_DEFAULT = "LCOV_EXCL_LINE"
+    LCOV_EXCL_START_DEFAULT = "LCOV_EXCL_START"
+    LCOV_EXCL_STOP_DEFAULT = "LCOV_EXCL_STOP"
+    LCOV_EXCL_BR_LINE_DEFAULT = "LCOV_EXCL_BR_LINE"
+    LCOV_EXCL_BR_START_DEFAULT = "LCOV_EXCL_BR_START"
+    LCOV_EXCL_BR_STOP_DEFAULT = "LCOV_EXCL_BR_STOP"
+    EMPTY_LCOV_PSEUDO_FILE = Path("this_lcov_is_empty")
+
+    def __init__(
+        self,
+        coverage_file: Optional[Path] = None,
+        filter_by_tags: bool = False,
+        LCOV_EXCL_LINE = LCOV_EXCL_LINE_DEFAULT,
+        LCOV_EXCL_START = LCOV_EXCL_START_DEFAULT,
+        LCOV_EXCL_STOP = LCOV_EXCL_STOP_DEFAULT,
+        LCOV_EXCL_BR_LINE = LCOV_EXCL_BR_LINE_DEFAULT,
+        LCOV_EXCL_BR_START = LCOV_EXCL_BR_START_DEFAULT,
+        LCOV_EXCL_BR_STOP = LCOV_EXCL_BR_STOP_DEFAULT,
+    ):
+        self.records: OrderedDictType[Tuple[str, Path], LcovRecord] = OrderedDict()
+        self.filter_by_tags = filter_by_tags
+        self.LCOV_EXCL_LINE = LCOV_EXCL_LINE
+        self.LCOV_EXCL_START = LCOV_EXCL_START
+        self.LCOV_EXCL_STOP = LCOV_EXCL_STOP
+        self.LCOV_EXCL_BR_LINE = LCOV_EXCL_BR_LINE
+        self.LCOV_EXCL_BR_START = LCOV_EXCL_BR_START
+        self.LCOV_EXCL_BR_STOP = LCOV_EXCL_BR_STOP
+        if coverage_file:
+            self.load(coverage_file)
+
+    def __eq__(self, other: Self):
+        if isinstance(other, type(self)):
+            return dict(self.records.items()) == dict(other.records.items())
+        else:
+            return False
+
+    def load(self, coverage_file: Path):
+        with open(coverage_file, "r") as f:
+            current_record = None
+            while l := f.readline():
+                if not l.strip():
+                    continue
+                if current_record is None:
+                    current_record = LcovRecord()
+                try:
+                    if current_record.add_line(l):
+                        if (
+                            current_record.source_file
+                            != LcovFile.EMPTY_LCOV_PSEUDO_FILE
+                        ):
+                            self._add_record(current_record)
+                        current_record = None
+                except AssertionError as e:
+                    raise RuntimeError(
+                        f"assertion in loading {coverage_file}, {current_record.source_file}",
+                        e,
+                    )
+        if self.filter_by_tags:
+            self.filter_by_source_tags(
+                self.LCOV_EXCL_LINE,
+                self.LCOV_EXCL_START,
+                self.LCOV_EXCL_STOP,
+                self.LCOV_EXCL_BR_LINE,
+                self.LCOV_EXCL_BR_START,
+                self.LCOV_EXCL_BR_STOP,
+            )
+        return self
+
+    # This 
copy of the function exists to avoid a deep copy
+    # when we know that the record is not going to be
+    # used anywhere else after this call.
+    def _add_record(self, record: LcovRecord):
+        if record.empty():
+            return
+        key = (record.test_name, record.source_file)
+        if key in self.records:
+            self.records[key].union(record)
+        else:
+            self.records[key] = record
+
+    def add_record(self, record: LcovRecord):
+        if record.empty():
+            return
+        key = (record.test_name, record.source_file)
+        if key in self.records:
+            self.records[key].union(record)
+        else:
+            self.records[key] = copy.deepcopy(record)
+
+    @staticmethod
+    def write_empty(target_file: Path, as_covered = False):
+        empty_lcov = LcovFile()
+        record = LcovRecord()
+        record.source_file = LcovFile.EMPTY_LCOV_PSEUDO_FILE
+        record.line_hits[1] = int(as_covered)
+        empty_lcov._add_record(record)
+        empty_lcov.write(target_file)
+
+    def write(
+        self,
+        target_file: Path,
+        generate_empty = False,
+        incompatible_empty = False,
+        as_covered = False,
+    ):
+        """Writes the content of this object to an lcov trace file.
+        target_file - The target file to write into
+        generate_empty - Whether to write even if the content is empty
+        incompatible_empty - Entirely empty lcov traces are not compatible with the lcov and genhtml
+                             commands. This is fine for library operations that use only this object;
+                             however, if lcov or genhtml are run on an entirely empty file, they will
+                             fail on format issues, since those tools expect at least one legal record
+                             in a trace file. If True, a truly empty file is generated; else, a file
+                             with a record pointing to a non existing pseudo file is generated.
+        as_covered - Only meaningful when incompatible_empty is False and generate_empty is True.
+                     If True, the pseudo record appears as fully covered (100%); else, the pseudo
+                     record is fully uncovered (0%)
+        """
+        self.prune()
+        if not generate_empty:
+            assert (
+                not self.empty()
+            ), "Writing an empty lcov trace will result in a trace which is incompatible with lcov tools"
+        if generate_empty and (not incompatible_empty) and self.empty():
+            LcovFile.write_empty(target_file = target_file, as_covered = as_covered)
+        else:
+            with open(target_file, "w") as f:
+                for record in self.records.values():
+                    record.write(f)
+
+    def filter_files(self, files_to_keep: List[Path]):
+        for key_to_remove in [
+            key for key in self.records.keys() if key[1] not in files_to_keep
+        ]:
+            del self.records[key_to_remove]
+
+    def filter_lines(self, file: Path, lines_to_keep: List[int]):
+        for record in [
+            record for key, record in self.records.items() if key[1] == file
+        ]:
+            record.filter_lines(lines_to_keep)
+
+    def _remap_to_patch(
+        self, patch_file: Union[Path, unidiff.PatchSet], patch_fn: Path
+    ):
+        patch: unidiff.PatchSet = (
+            unidiff.PatchSet.from_filename(patch_file)
+            if isinstance(patch_file, Path)
+            else patch_file
+        )
+        patched_files: List[unidiff.PatchedFile] = (
+            patch.added_files + patch.modified_files
+        )
+        # 1. remove all files that were not patched
+        self.filter_files(
+            [Path(pf.target_file).relative_to("b/") for pf in patched_files]
+        )
+        # 2. for every file keep only the lines that were patched
+        # There is going to be only one new record because all of the lines belong to this file.
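+        # Illustrative example (hypothetical numbers): if a "+" line of the
+        # patch ends up as line 42 of the final source and that "+" line is
+        # line 7 of the patch file itself, then lines_remap (built below)
+        # maps 42 -> 7, so the remapped record reports coverage against
+        # patch line numbers rather than source line numbers.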
+        record_by_source = dict()
+        for record in self.records.values():
+            record_by_source.setdefault(record.source_file, []).append(record)
+        self.records.clear()
+        for patched_file in patched_files:
+            source_file = Path(patched_file.target_file).relative_to("b/")
+            if source_file not in record_by_source:
+                continue
+            lines_remap = {
+                line.target_line_no: line.diff_line_no
+                for hunk in patched_file
+                for line in hunk
+                if line.is_added
+            }
+            for record in record_by_source[source_file]:
+                record.remap_lines(lines_remap)
+                if not record.empty():
+                    record.source_file = patch_fn
+                    self._add_record(record)
+
+    def remap_to_patches(self, patch_files: List[Path]):
+        patches = prepare_patches_for_lcov(patch_files)
+        prototype = copy.deepcopy(self)
+        self.records.clear()
+        for patch, patch_fn in zip(patches, patch_files):
+            remapped = copy.deepcopy(prototype)
+            remapped._remap_to_patch(patch, patch_fn)
+            self.union(remapped)
+        self.prune()
+        return self
+
+    def coverage_report(
+        self,
+        f: TextIO,
+        include_branches: bool = True,
+        files_to_include: Union[Path, List[Path], None] = None,
+        colors = False,
+    ):
+        if isinstance(files_to_include, Path):
+            files_to_include = [files_to_include]
+        for (test, file), record in self.records.items():
+            if files_to_include is not None and file not in files_to_include:
+                continue
+            if record.empty():
+                continue
+            # We would like to print the following:
+            # |Line number|Line data|line text
+            # This will maintain conformance with the html presentation
+            LINE_NUMBERS_WIDTH = 10
+            LINE_DATA_WIDTH = 12
+            table_head_format = f"| {{: <{LINE_NUMBERS_WIDTH}}}| {{: <{LINE_DATA_WIDTH}}}| Source code"
+            table_line_format = (
+                f"|{{: >{LINE_NUMBERS_WIDTH}}} |{{: >{LINE_DATA_WIDTH}}} : {{}}"
+            )
+            with open(file, "r") as f_src:
+                line = 0
+                while l := f_src.readline():
+                    line += 1
+                    if line == 1:
+                        headline = f"coverage data for: {file}, test: {test}"
+                        f.write(headline + "\n")
+                        f.write("-" * len(headline) + "\n\n")
+                        f.write(
+                            table_head_format.format("Line", "Line Data") + "\n"
+                        )
+                    if line in record.line_hits:
+                        f.write(
+                            table_line_format.format(
+                                line, record.line_hits[line], l
+                            )
+                        )
+                    else:
+                        f.write(table_line_format.format(line, "", l))
+            f.write("\n\n")
+            f.write("Summary\n-------\n")
+            f.write("Lines Hit: ")
+            if record.lines_found > 0:
+                f.write(
+                    f"{record.lines_hit}/{record.lines_found} ({(record.lines_hit/record.lines_found)*100:.2f}%)\n"
+                )
+            else:
+                f.write("No information found.\n")
+            f.write("Functions Hit: ")
+            if record.functions_found > 0:
+                f.write(
+                    f"{record.functions_hit}/{record.functions_found} ({(record.functions_hit/record.functions_found)*100:.2f}%)\n"
+                )
+            else:
+                f.write("No information found.\n")
+            f.write("Branches Hit: ")
+            if record.branches_found > 0:
+                f.write(
+                    f"{record.branches_hit}/{record.branches_found} ({(record.branches_hit/record.branches_found)*100:.2f}%)\n"
+                )
+            else:
+                f.write("No information found.\n")
+
+    # Defaults are set according to `man geninfo`:
+    # The following markers are recognized by geninfo:
+
+    # LCOV_EXCL_LINE
+    #        Lines containing this marker will be excluded.
+    # LCOV_EXCL_START
+    #        Marks the beginning of an excluded section. The current line is part of this section.
+    # LCOV_EXCL_STOP
+    #        Marks the end of an excluded section. The current line is not part of this section.
+    # LCOV_EXCL_BR_LINE
+    #        Lines containing this marker will be excluded from branch coverage.
+    # LCOV_EXCL_BR_START
+    #        Marks the beginning of a section which is excluded from branch coverage. The current line is part of this section.
+    # LCOV_EXCL_BR_STOP
+    #        Marks the end of a section which is excluded from branch coverage. The current line is not part of this section.
+    def filter_by_source_tags(
+        self,
+        LCOV_EXCL_LINE = "LCOV_EXCL_LINE",
+        LCOV_EXCL_START = "LCOV_EXCL_START",
+        LCOV_EXCL_STOP = "LCOV_EXCL_STOP",
+        LCOV_EXCL_BR_LINE = "LCOV_EXCL_BR_LINE",
+        LCOV_EXCL_BR_START = "LCOV_EXCL_BR_START",
+        LCOV_EXCL_BR_STOP = "LCOV_EXCL_BR_STOP",
+    ):
+        assert not (bool(LCOV_EXCL_START) ^ bool(LCOV_EXCL_STOP))
+        assert not (bool(LCOV_EXCL_BR_START) ^ bool(LCOV_EXCL_BR_STOP))
+
+        # for each source file, create a map of excludes and then apply them to the file.
+        def make_exclusion_lists(f: Path):
+            lines_to_exclude = set()  # line numbers to exclude
+            branches_to_exclude = set()  # branch line numbers to exclude
+            line_section_open = None
+            branch_section_open = None
+            with open(f, "r") as src:
+                current_line = 1
+                while l := src.readline():
+                    if LCOV_EXCL_LINE and LCOV_EXCL_LINE in l:
+                        lines_to_exclude.add(current_line)
+                    if LCOV_EXCL_BR_LINE and LCOV_EXCL_BR_LINE in l:
+                        branches_to_exclude.add(current_line)
+                    if (
+                        LCOV_EXCL_START
+                        and (LCOV_EXCL_START in l)
+                        and not line_section_open
+                    ):
+                        line_section_open = current_line
+                    if LCOV_EXCL_STOP and (LCOV_EXCL_STOP in l) and line_section_open:
+                        lines_to_exclude.update(range(line_section_open, current_line))
+                        line_section_open = None
+                    if (
+                        LCOV_EXCL_BR_START
+                        and (LCOV_EXCL_BR_START in l)
+                        and not branch_section_open
+                    ):
+                        branch_section_open = current_line
+                    if (
+                        LCOV_EXCL_BR_STOP
+                        and (LCOV_EXCL_BR_STOP in l)
+                        and branch_section_open
+                    ):
+                        branches_to_exclude.update(
+                            range(branch_section_open, current_line)
+                        )
+                        branch_section_open = None
+                    current_line += 1
+            return lines_to_exclude, branches_to_exclude
+
+        records_by_source = {}
+        for key, record in self.records.items():
+            records_by_source.setdefault(key[1], [])
+            records_by_source[key[1]].append(record)
+
+        for f, records in records_by_source.items():
+            lines_to_exclude, branches_to_exclude = make_exclusion_lists(f)
+            record: LcovRecord
+            for record in records:
+                record.remove_lines(lines_to_exclude)
+                record.remove_branches(branches_to_exclude)
+        self.prune()
+
+    def intersection(self, other: Self):
+        common_records = set(self.records.keys()).intersection(
+            set(other.records.keys())
+        )
+        old_records = self.records
+        self.records = OrderedDict()
+        for key in common_records:
+            self.records[key] = old_records[key].intersection(other.records[key])
+        self.prune()
+        return self
+
+    def union(self, other: Self):
+        for key, val in other.records.items():
+            if key not in self.records:
+                self.records[key] = copy.deepcopy(val)
+            else:
+                self.records[key].union(val)
+        self.prune()
+        return self
+
+    def difference(self, other: Self):
+        common_records = set(self.records.keys()).intersection(
+            set(other.records.keys())
+        )
+        for key in common_records:
+            self.records[key].difference(other.records[key])
+        self.prune()
+        return self
+
+    def symmetric_difference(self, other: Self) -> Self:
+        """An in-place version of the symmetric difference operation.
+        Arguments:
+            other {Self} -- the other component to take the symmetric difference with
+        """
+        intersection = self.__and__(other)
+        self.difference(intersection).union(other.__sub__(intersection))
+        return self
+
+    def __and__(self, other: Self):
+        
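# Like the LcovRecord operators, the operators below are non-mutating:
+        # they deep-copy self and then apply the in-place operation.
+        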
new_component = copy.deepcopy(self)
+        new_component.intersection(other)
+        return new_component
+
+    def __or__(self, other: Self):
+        new_component = copy.deepcopy(self)
+        new_component.union(other)
+        return new_component
+
+    def __sub__(self, other: Self):
+        new_component = copy.deepcopy(self)
+        new_component.difference(other)
+        return new_component
+
+    def __xor__(self, other: Self):
+        new_component = copy.deepcopy(self)
+        new_component.symmetric_difference(other)
+        return new_component
+
+    def prune(self):
+        keys_to_prune = [key for key, val in self.records.items() if val.empty()]
+        for key in keys_to_prune:
+            del self.records[key]
+
+    def empty(self):
+        if len(self.records) == 0:
+            return True
+        else:
+            if not next(iter(self.records.values())).empty():
+                return False
+            else:
+                # this is a little bit dangerous because of the recursion,
+                # if prune() doesn't clear every empty record due to a bug
+                # we might get into an infinite recursion.
+                # However, this is the cleanest way to code it.
+                self.prune()
+                return self.empty()
+
+    def tag_with_test(self, test_name: str, from_test: Optional[str] = None):
+        # TODO: we should probably error out or normalize the string in order to preserve
+        # lcov compatibility
+        if test_name == from_test:
+            return
+        all_compatible_records = [
+            key for key in self.records.keys() if not from_test or key[0] == from_test
+        ]
+        records_to_change = [self.records[key] for key in all_compatible_records]
+        for key in all_compatible_records:
+            del self.records[key]
+        for record in records_to_change:
+            record._test_name = test_name
+            self._add_record(record)
+
+    def tag_with_test_map(self, tests_map: Mapping[str, str]):
+        for from_test, to_test in tests_map.items():
+            self.tag_with_test(to_test, from_test = from_test)
+
+
+def prepare_patches_for_lcov_old(patches: List[Path]) -> List[unidiff.PatchSet]:
+    """Takes a list of patch paths that are assumed to be applied in their
+    order of appearance in the list and adjusts them so that their
+    target_line_no values and file names refer to the final source code.
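+
+    Illustrative example: if the first patch adds a file and a later patch
+    renames it, the returned PatchSets all refer to the renamed path.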
+
+    Args:
+        patches (List[Path]): An ordered list of patch paths
+    """
+
+    patchsets: List[unidiff.PatchSet] = [
+        unidiff.PatchSet.from_filename(f) for f in patches
+    ]
+
+    def eliminate_rename(
+        source_file: str, dest_file: str, patchsets: List[unidiff.PatchSet]
+    ):
+        new_src_file = "a/" + dest_file[2:]
+        old_dest_file = "b/" + source_file[2:]
+        for patch in patchsets:
+            for patched_file in patch:
+                patched_file: unidiff.PatchedFile = patched_file
+                if patched_file.target_file == old_dest_file:
+                    patched_file.source_file = new_src_file
+                    patched_file.target_file = dest_file
+
+    def remove_file(f: str, patchsets: List[unidiff.PatchSet]):
+        target_to_remove = "b/" + f[2:]
+        for patch in patchsets:
+            files_to_remove = []
+            for patched_file in patch:
+                patched_file: unidiff.PatchedFile = patched_file
+                if patched_file.target_file == target_to_remove:
+                    files_to_remove.append(patched_file)
+            for file_to_remove in files_to_remove:
+                patch.remove(file_to_remove)
+
+    def update_line_removed(line_num, source_file, patchsets):
+        for patch in patchsets:
+            for patched_file in patch:
+                patched_file: unidiff.PatchedFile = patched_file
+                if patched_file.source_file == source_file:
+                    for hunk in patched_file:
+                        for line in hunk:
+                            line: unidiff.patch.Line = line
+                            # removed lines have no target_line_no
+                            if line.target_line_no is not None and line.target_line_no > line_num:
+                                line.target_line_no -= 1
+
+    def update_line_added(line_num, target_file, patchsets):
+        for patch in patchsets:
+            for patched_file in patch:
+                patched_file: unidiff.PatchedFile = patched_file
+                if patched_file.target_file == target_file:
+                    for hunk in patched_file:
+                        for line in hunk:
+                            line: unidiff.patch.Line = line
+                            if line.target_line_no is not None and line.target_line_no >= line_num:
+                                line.target_line_no += 1
+
+    for idx, patch in enumerate(patchsets):
+        for patched_file in patch:
+            patched_file: unidiff.PatchedFile = patched_file
+            if patched_file.is_removed_file:
+                remove_file(patched_file.source_file, patchsets[:idx])
+            elif patched_file.is_rename:
+                # Make the semantics as if the file was actually never renamed (since in the final source it will be as if the file is
+                # already in its renamed form)
+                eliminate_rename(
+                    patched_file.source_file,
+                    patched_file.target_file,
+                    patchsets[: idx + 1],
+                )
+            for hunk in patched_file:
+                hunk: unidiff.Hunk = hunk
+                for line in hunk:
+                    line: unidiff.patch.Line = line
+                    if line.is_removed:
+                        update_line_removed(
+                            line.source_line_no,
+                            patched_file.source_file,
+                            patchsets[:idx],
+                        )
+                    if line.is_added:
+                        update_line_added(
+                            line.target_line_no,
+                            patched_file.target_file,
+                            patchsets[:idx],
+                        )
+    return patchsets
+
+
+def prepare_patches_for_lcov(patches: List[Path]) -> List[unidiff.PatchSet]:
+    """Takes a list of patch paths that are assumed to be applied in their
+    order of appearance in the list and adjusts them so that their
+    target_line_no values and file names refer to the final source code.
+
+    Args:
+        patches (List[Path]): An ordered list of patch paths
+    """
+    # Rough algorithm:
+    # Incrementally remap earlier patches onto the latest file state, so that
+    # eventually every patch contains data relative to the final file. Each
+    # file is handled separately. It works by induction: assume that patches
+    # up to n-1 refer to the source state of patch n, then adjust them to
+    # refer to the target state of patch n.
+
+    patchsets: List[unidiff.PatchSet] = [
+        unidiff.PatchSet.from_filename(f) for f in patches
+    ]
+    for n in range(1, len(patchsets)):
+        patches_adjustment_step(patchsets[: n + 1])
+    return patchsets
+
+
+def patches_adjustment_step(patchlist: List[PatchSet]):
+    main_patch = patchlist[-1]
+    patches = patchlist[:-1]
+    # 1. 
If a file is removed, remove it from all earlier patches
+    removed_files = [x.path for x in main_patch.removed_files]
+    for patch in patches:
+        files_to_remove = [
+            f
+            for f in (list(patch.modified_files) + list(patch.added_files))
+            if f.path in removed_files
+        ]
+        for file_to_remove in files_to_remove:
+            patch.remove(file_to_remove)
+    renamed_files = [f for f in main_patch.modified_files if f.is_rename]
+
+    def maybe_remove_prefix(path: str):
+        if path.startswith("a/") or path.startswith("b/"):
+            return path[2:]
+        return path
+
+    renames = {maybe_remove_prefix(f.source_file): f.target_file for f in renamed_files}
+    # 2. If a file was renamed, change the earlier patches' targets to the new file name
+    for patch in patches:
+        files: List[PatchedFile] = list(patch.added_files) + list(patch.modified_files)
+        for f in files:
+            target = maybe_remove_prefix(f.target_file)
+            if target in renames:
+                f.target_file = renames[target]
+    # 3. Invariant: every one of the previous patches contains only the most recent
+    # change; make the invariant hold also with this patch applied.
+    patched_files_by_target = {}
+    for patch in patches:
+        for file in patch:
+            file: PatchedFile = file
+            patched_files_by_target.setdefault(file.target_file, [])
+            patched_files_by_target[file.target_file].append(file)
+    for file in list(main_patch.modified_files):
+        removed_lines: List[int] = [
+            line.source_line_no for hunk in file for line in hunk if line.is_removed
+        ]
+        added_lines: List[int] = [
+            line.target_line_no for hunk in file for line in hunk if line.is_added
+        ]
+        removed_lines.sort()
+        added_lines.sort()
+        hunks: List[Hunk] = [
+            hunk
+            for patched_file in patched_files_by_target.get(file.target_file, [])
+            for hunk in patched_file
+        ]
+        # remove replaced/removed lines from previous patches
+        for hunk in hunks:
+            for line in list(hunk):
+                if line.target_line_no in removed_lines:
+                    hunk.remove(line)
+        lines: List[Line] = [line for hunk in hunks for line in hunk if line.is_added]
+        lines.sort(key = lambda l: l.target_line_no)
+
+        def apply_transform(transform):
+            # transform is a list of (threshold, delta) pairs in descending
+            # threshold order; every added line with target_line_no >= threshold
+            # is shifted by the matching (cumulative) delta.
+            transform_iter = iter(transform)
+            current_transform = next(transform_iter, (0, 0))
+            for line in lines[::-1]:
+                while current_transform[0] > line.target_line_no:
+                    current_transform = next(transform_iter, (0, 0))
+                    if current_transform == (0, 0):
+                        break
+                line.target_line_no += current_transform[1]
+
+        # every removed line shifts all the lines after it up by one (cumulative)
+        removed_transform = list(zip(removed_lines, accumulate(repeat(-1))))[::-1]
+        apply_transform(removed_transform)
+        # every added line shifts all the lines after it down by one (cumulative);
+        # the thresholds are translated back into pre-insertion coordinates so they
+        # can be compared against the not-yet-shifted line numbers
+        added_transform = list(zip(added_lines, accumulate(repeat(1))))
+        added_transform = list([(x - y + 1, y) for x, y in added_transform])[::-1]
+        apply_transform(added_transform)
+    # 4. TODO: Cleanup
+    # Remove hunks that don't have added or deleted lines, and remove files that
+    # don't have any hunks. It is only a performance optimization, hence it is not
+    # implemented right now.
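+
+
+# Usage sketch (illustrative; the file names are hypothetical):
+#
+#     base = LcovFile(Path("base.info"))
+#     run = LcovFile(Path("run.info"))
+#     missed = base - run   # coverage present in base but absent from run
+#     if not missed.empty():
+#         missed.write(Path("missed.info"))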