Files
scylladb/test/pylib/lcov_utils.py
Avi Kivity f3eade2f62 treewide: relicense to ScyllaDB-Source-Available-1.0
Drop the AGPL license in favor of a source-available license.
See the blog post [1] for details.

[1] https://www.scylladb.com/2024/12/18/why-were-moving-to-a-source-available-license/
2024-12-18 17:45:13 +02:00

1205 lines
47 KiB
Python

# -*- coding: utf-8 -*-
#
# Copyright (C) 2024-present ScyllaDB
#
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import unidiff
from typing import (
List,
OrderedDict as OrderedDictType,
Tuple,
Callable,
Union,
TextIO,
Self,
Optional,
Mapping,
)
from collections import OrderedDict
from pathlib import Path
import unidiff.patch
from unidiff import PatchSet, PatchedFile
from unidiff.patch import Hunk, Line
import copy
from itertools import repeat, accumulate
# TN: test name
# SF: source file path
# FN: line number,function name
# FNF: number functions found
# FNH: number hit
# BRDA: branch data: line, block, (expressions,count)+
# BRF: branches found
# DA: line number, hit count
# LF: lines found
# LH: lines hit.
# This monkey patching is done for two main reasons:
# 1. In order to allow deep copying of the objects (which is the main purpose)
# 2. For efficiency, instead of creating a function that will compare the strings
# every time.
class MakeLcovRouter(type):
def __new__(cls, name, bases, dct):
routes = {}
dct_keys = list(dct.keys())
for match in dct["routes"]:
keys = [key for key in dct_keys if key.endswith("_" + match)]
assert len(keys) == 1
routes[match] = dct[keys[0]]
dct["routes"] = routes
def route(self, route, *args, **kwargs):
return self.routes[route](self, *args, **kwargs)
dct["route"] = route
return super().__new__(cls, name, bases, dct)
class LcovRecord(metaclass = MakeLcovRouter):
routes = [
"TN",
"SF",
"FN",
"FNDA",
"FNF",
"FNH",
"BRDA",
"BRF",
"BRH",
"DA",
"LF",
"LH",
"end_of_record",
]
def __init__(self) -> None:
self._test_name: Optional[str] = None
self.source_file: Optional[Path] = None
self.line_hits: dict[int, int] = dict()
self.function_hits: dict[Tuple(int, str), int] = dict()
self.functions_to_lines: dict[str, int] = dict()
self.branch_hits: dict[Tuple[int, int, int], Optional[int]] = dict()
self.sealed: bool = False
self.FNF = None
self.FNH = None
self.BRF = None
self.BRH = None
self.LF = None
self.LH = None
def empty(self):
return (
len(self.line_hits) + len(self.function_hits) + len(self.branch_hits)
) == 0
@property
def test_name(self):
return self._test_name if self._test_name else ""
@property
def functions_found(self):
return len(self.function_hits)
@property
def functions_hit(self):
return sum([int(hits and hits > 0) for hits in self.function_hits.values()])
@property
def branches_found(self):
return len(self.branch_hits)
@property
def branches_hit(self):
return sum([int(bool(hits and hits > 0)) for hits in self.branch_hits.values()])
@property
def lines_found(self):
return len(self.line_hits)
@property
def lines_hit(self):
return sum([int(hits and hits > 0) for hits in self.line_hits.values()])
def add(self, type_str: str, fields: List[str]):
assert not self.sealed
return self.route(type_str, fields)
def add_TN(self, fields: List[str]) -> bool:
assert self._test_name is None
self._test_name = fields[0]
return False
def add_SF(self, fields: List[str]) -> bool:
assert self.source_file is None
self.source_file = Path(fields[0])
return False
def add_FN(self, fields: List[str]) -> bool:
line, func_name = fields
line = int(line)
assert func_name not in self.functions_to_lines
self.functions_to_lines[func_name] = line
self.function_hits.setdefault((line, func_name), 0)
return False
def add_FNDA(self, fields: List[str]) -> bool:
hits, func_name = fields
line = self.functions_to_lines[func_name]
hits = int(hits)
self.function_hits[(line, func_name)] = hits
return False
def add_FNF(self, fields: List[str]) -> bool:
self.FNF = int(fields[0])
return False
def add_FNH(self, fields: List[str]) -> bool:
self.FNH = int(fields[0])
return False
def add_BRDA(self, fields: List[str]) -> bool:
line, block, branch, count = fields
line = int(line)
block = int(block)
branch = int(branch)
count = int(count) if count != "-" else None
self.branch_hits.setdefault((line, block, branch), count)
return False
def add_BRF(self, fields: List[str]) -> bool:
self.BRF = int(fields[0])
return False
def add_BRH(self, fields: List[str]) -> bool:
self.BRH = int(fields[0])
return False
def add_DA(self, fields: List[str]) -> bool:
line, hits = fields
line = int(line)
hits = int(hits)
self.line_hits.setdefault(line, hits)
return False
def add_LF(self, fields: List[str]) -> bool:
self.LF = int(fields[0])
return False
def add_LH(self, fields: List[str]) -> bool:
self.LH = int(fields[0])
return False
def add_end_of_record(self, fields: List[str]) -> bool:
self.sealed = True
self.validate_integrity()
self._refresh_functions_to_lines()
return True
def remove_lines(self, line_numbers: List[int]):
self.validate_integrity()
for line_number in line_numbers:
if line_number in self.line_hits:
del self.line_hits[line_number]
functions_to_remove = list(
{
(line, func_name)
for func_name, line in self.functions_to_lines.items()
if line in line_numbers
}
)
for key in functions_to_remove:
del self.function_hits[key]
del self.functions_to_lines[key[1]]
branches_to_remove = [
branch for branch in self.branch_hits if branch[0] in line_numbers
]
for branch in branches_to_remove:
del self.branch_hits[branch]
self.validate_integrity()
def remove_line(self, line_number: int):
self.remove_lines([line_number])
def remove_branches(self, branch_line_numbers: List[int]):
branch_keys_to_remove = {
key for key in self.branch_hits.keys() if key[0] in branch_line_numbers
}
for branch_to_remove in branch_keys_to_remove:
del self.branch_hits[branch_to_remove]
def remove_branch(self, branch_line):
self.remove_branches([branch_line])
def validate_integrity(self):
assert (
len(set(self.functions_to_lines.values()) - set(self.line_hits.keys())) == 0
)
assert (
len(
set([x[0] for x in self.branch_hits.keys()])
- set(self.line_hits.keys())
)
== 0
)
def get_lines(self) -> set[int]:
return set(self.line_hits.keys())
def filter_lines(self, lines: List[int]):
self.remove_lines(self.get_lines().difference(set(lines)))
self._refresh_functions_to_lines()
def remap_lines(self, lines_mapping: Mapping[int, int]):
# Validate that no two lines are mapped to the same target
assert len(lines_mapping) == len(set(lines_mapping.values()))
# First filter all the None mapped lines
lines_to_keep = self.get_lines().intersection(set(lines_mapping.keys()))
self.filter_lines(lines_to_keep)
line_hits = self.line_hits
self.line_hits = dict()
for line, hits in line_hits.items():
new_line = lines_mapping[line]
self.line_hits[new_line] = hits
function_hits = self.function_hits
self.function_hits = dict()
for (line, func_name), hits in function_hits.items():
new_key = (lines_mapping[line], func_name)
self.function_hits[new_key] = hits
branch_hits = self.branch_hits
self.branch_hits = dict()
for (line, block, branch), count in branch_hits.items():
new_key = (lines_mapping[line], block, branch)
self.branch_hits[new_key] = count
def transform_line_hitrates(self, transform: Callable[[Optional[int]], int]):
for line in self.line_hits.keys():
self.line_hits[line] = transform(self.line_hits[line])
def transform_function_hitrates(self, transform: Callable[[Optional[int]], int]):
for key in self.function_hits.keys():
self.function_hits[key] = transform(self.function_hits[key])
def transform_branch_hitrates(self, transform: Callable[[Optional[int]], int]):
for key in self.branch_hits.keys():
self.branch_hits[key] = transform(self.branch_hits[key])
def transform_hitrates(self, transform: Callable[[Optional[int]], int]):
self.transform_line_hitrates(transform)
self.transform_function_hitrates(transform)
self.transform_branch_hitrates(transform)
def _get_branches_line_hitrate(self) -> Mapping[int, int]:
this_branch_line_hits = dict()
for key in self.branch_hits.keys():
this_branch_line_hits.setdefault(key[0], 0)
if self.branch_hits[key] is not None:
this_branch_line_hits[key[0]] += self.branch_hits[key]
return this_branch_line_hits
def _refresh_functions_to_lines(self):
self.functions_to_lines = {
func: line for line, func in self.function_hits.keys()
}
def union(self, other: Self) -> Self:
"""an in place version of the union operation,
the semantics are that the hitrates are combined for
shared lines, functions and branches.
Arguments:
other {Self} -- the other component to union with
"""
for line in other.line_hits.keys():
if line not in self.line_hits:
self.line_hits[line] = other.line_hits[line]
else:
self.line_hits[line] += other.line_hits[line]
for key in other.function_hits.keys():
if key not in self.function_hits:
self.function_hits[key] = other.function_hits[key]
else:
self.function_hits[key] += other.function_hits[key]
self._refresh_functions_to_lines()
for key in other.branch_hits.keys():
if key not in self.branch_hits or self.branch_hits[key] is None:
self.branch_hits[key] = other.branch_hits[key]
elif other.branch_hits[key] is not None:
self.branch_hits[key] += other.branch_hits[key]
return self
def intersection(self, other: Self) -> Self:
"""an in place version of the intersection operation.
The semantics are, everything that is covered by both components
is kept and merged, everything that is covered by only one component is
removed from coverage completely (not just the hitrate).
The coverage is measured in lines, and functions but for brunches it is
measured in lines since there is not much meaning to only partial branch
instead the specific branch that should be removed is set to None.
Note: Self intersection is not an identity, it will remove any uncovered
lines, functions and (line) branches completely.
Arguments:
other {Self} -- the other component to intersect with
"""
covered_lines = dict()
lines_to_merge = self.get_lines().intersection(other.get_lines())
lines_to_merge = [
line
for line in lines_to_merge
if self.line_hits[line] > 0 and other.line_hits[line] > 0
]
if other == self:
for line in lines_to_merge:
covered_lines[line] = self.line_hits[line]
else:
for line in lines_to_merge:
covered_lines[line] = self.line_hits[line] + other.line_hits[line]
self.line_hits = covered_lines
covered_functions = dict()
functions_to_merge = set(self.function_hits.keys()).intersection(
set(other.function_hits.keys())
)
functions_to_merge = [
(line, func_name)
for line, func_name in functions_to_merge
if self.function_hits[(line, func_name)] > 0
and other.function_hits[(line, func_name)] > 0
]
if other == self:
for key in functions_to_merge:
covered_functions[key] = self.function_hits[key]
else:
for key in functions_to_merge:
covered_functions[key] = (
self.function_hits[key] + other.function_hits[key]
)
self.function_hits = covered_functions
self._refresh_functions_to_lines()
covered_branches = dict()
# for branches, count hits per line
this_branch_line_hits = dict()
for key in self.branch_hits.keys():
if self.branch_hits[key] is not None:
this_branch_line_hits.setdefault(key[0], 0)
this_branch_line_hits[key[0]] += self.branch_hits[key]
other_branch_line_hits = dict()
for key in other.branch_hits.keys():
if other.branch_hits[key] is not None:
other_branch_line_hits.setdefault(key[0], 0)
other_branch_line_hits[key[0]] += other.branch_hits[key]
this_branch_line_hits = {
key
for key in this_branch_line_hits.keys()
if this_branch_line_hits[key] > 0
}
other_branch_line_hits = {
key
for key in other_branch_line_hits.keys()
if other_branch_line_hits[key] > 0
}
branches_lines_to_merge = this_branch_line_hits.intersection(
other_branch_line_hits
)
for key in self.branch_hits.keys():
if key[0] not in branches_lines_to_merge:
continue
this_hits = self.branch_hits[key]
other_hits = other.branch_hits[key]
if this_hits is None and other_hits is None:
covered_branches[key] = None
elif this_hits is None:
covered_branches[key] = other_hits
elif other_hits is None:
covered_branches[key] = this_hits
else:
covered_branches[key] = this_hits + other_hits
self.branch_hits = covered_branches
return self
def difference(self, other: Self) -> Self:
"""an in place version of the difference operation
The semantics are everything that is covered by this component
but not the other. For branches it is calculated per line, not
per branch so it can be that some uncovered branches are still
indicated but not complete lines that are not evaluated.
For branches:
If the branch is only covered by this component, the hit rate will
be preserved, if it is covered by both or neither it will be None and if only covered
by other it will be 0.
Arguments:
other {Self} -- the other component to intersect with
"""
# first remove every line that is not covered by self (at all)
self.remove_lines([line for line, hits in self.line_hits.items() if hits <= 0])
# remove every line that is covered by both
this_covered_lines = {
key for key in self.line_hits.keys() if self.line_hits[key] > 0
}
other_covered_lines = {
key for key in other.line_hits.keys() if other.line_hits[key] > 0
}
lines_to_remove = this_covered_lines.intersection(other_covered_lines)
self.remove_lines(lines_to_remove)
self._refresh_functions_to_lines()
# first remove every function that is not covered by self (at all)
for key in list(self.function_hits.keys()):
if self.function_hits[key] <= 0:
del self.function_hits[key]
this_covered_functions = {
key for key in self.function_hits.keys() if self.function_hits[key] > 0
}
other_covered_functions = {
key for key in other.function_hits.keys() if other.function_hits[key] > 0
}
# the remove all functions that are covered by both
functions_to_remove = this_covered_functions.intersection(
other_covered_functions
)
for key in functions_to_remove:
del self.function_hits[key]
self._refresh_functions_to_lines()
# first remove every line that is not hit at all
branch_line_hits = self._get_branches_line_hitrate()
branches_lines_with_hits = {
line for key, line in branch_line_hits.items() if branch_line_hits[key] > 0
}
for key in list(self.branch_hits.keys()):
if key[0] not in branch_line_hits:
del self.branch_hits[key]
for key, hits in list(self.branch_hits.items()):
covered_by_this = bool(self.branch_hits[key])
covered_by_other = (
bool(other.branch_hits[key]) if key in other.branch_hits else False
)
covered_by_both = covered_by_this and covered_by_other
# covered by both
if covered_by_both:
self.branch_hits[key] = None
elif covered_by_this: # Only covered by this
pass
elif covered_by_other:
self.branch_hits[key] = 0
else: # covered by neither
self.branch_hits[key] = None
return self
def symmetric_difference(self, other: Self) -> Self:
"""an in place version of the symmetric difference operation
Arguments:
other {Self} -- the other component to intersect with
"""
intersection = self.__and__(other)
self.difference(intersection).union(other.__sub__(intersection))
return self
def __and__(self, other: Self):
new_component = copy.deepcopy(self)
new_component.intersection(other)
return new_component
def __or__(self, other: Self):
new_component = copy.deepcopy(self)
new_component.union(other)
return new_component
def __sub__(self, other: Self):
new_component = copy.deepcopy(self)
new_component.difference(other)
return new_component
def __xor__(self, other: Self):
new_component = copy.deepcopy(self)
new_component.symmetric_difference(other)
return new_component
def write(self, f: TextIO):
# Test Name
f.write("TN:" + self.test_name + "\n")
# Source File
f.write("SF:" + str(self.source_file) + "\n")
# functions
if self.functions_found > 0:
functions_and_lines = list(self.function_hits.keys())
functions_and_lines.sort()
[f.write(f"FN:{line},{func}\n") for line, func in functions_and_lines]
[
f.write(f"FNDA:{int(self.function_hits[key])},{key[1]}\n")
for key in functions_and_lines
]
# Function data is outputted regardless to function mapping (?)
f.write(f"FNF:{self.functions_found}\n")
f.write(f"FNH:{self.functions_hit}\n")
# branches
if self.branches_found > 0:
sorted_branches = list(self.branch_hits.keys())
sorted_branches.sort()
for key in sorted_branches:
line, block, branch = key
count = self.branch_hits[key]
f.write(
f"BRDA:{line},{block},{branch},{count if count is not None else '-'}\n"
)
f.write(f"BRF:{self.branches_found}\n")
f.write(f"BRH:{self.branches_hit}\n")
# lines
if self.lines_found > 0:
[f.write(f"DA:{line},{count}\n") for line, count in self.line_hits.items()]
f.write(f"LF:{self.lines_found}\n")
f.write(f"LH:{self.lines_hit}\n")
f.write("end_of_record\n")
@staticmethod
def get_type_and_fields(line: str) -> Tuple[str, List[str]]:
parts = line.split(":", maxsplit = 1)
if len(parts) == 1:
parts.append("")
type_str = parts[0].strip()
fields = [parts[1].strip()] if type_str == "SF" else [field.strip() for field in parts[1].split(",")]
return type_str, fields
def add_line(self, line: str) -> bool:
type_str, fields = self.get_type_and_fields(line)
return self.add(type_str, fields)
def __eq__(self, other: Self):
if isinstance(other, type(self)):
if self.test_name != other.test_name:
return False
if self.source_file != other.source_file:
return False
elif self.line_hits != other.line_hits:
return False
elif self.function_hits != other.function_hits:
return False
elif self.branch_hits != other.branch_hits:
return False
else:
return True
else:
return False
class LcovFile:
LCOV_EXCL_LINE_DEFAULT = "LCOV_EXCL_LINE"
LCOV_EXCL_START_DEFAULT = "LCOV_EXCL_START"
LCOV_EXCL_STOP_DEFAULT = "LCOV_EXCL_STOP"
LCOV_EXCL_BR_LINE_DEFAULT = "LCOV_EXCL_BR_LINE"
LCOV_EXCL_BR_START_DEFAULT = "LCOV_EXCL_BR_START"
LCOV_EXCL_BR_STOP_DEFAULT = "LCOV_EXCL_BR_STOP"
EMPTY_LCOV_PSEUDO_FILE = Path("this_lcov_is_empty")
def __init__(
self,
coverage_file: Optional[Path] = None,
filter_by_tags: bool = False,
LCOV_EXCL_LINE = LCOV_EXCL_LINE_DEFAULT,
LCOV_EXCL_START = LCOV_EXCL_START_DEFAULT,
LCOV_EXCL_STOP = LCOV_EXCL_STOP_DEFAULT,
LCOV_EXCL_BR_LINE = LCOV_EXCL_BR_LINE_DEFAULT,
LCOV_EXCL_BR_START = LCOV_EXCL_BR_START_DEFAULT,
LCOV_EXCL_BR_STOP = LCOV_EXCL_BR_STOP_DEFAULT,
):
self.records: OrderedDictType[Tuple[str, Path], LcovRecord] = OrderedDict()
self.filter_by_tags = filter_by_tags
self.LCOV_EXCL_LINE = LCOV_EXCL_LINE
self.LCOV_EXCL_START = LCOV_EXCL_START
self.LCOV_EXCL_STOP = LCOV_EXCL_STOP
self.LCOV_EXCL_BR_LINE = LCOV_EXCL_BR_LINE
self.LCOV_EXCL_BR_START = LCOV_EXCL_BR_START
self.LCOV_EXCL_BR_STOP = LCOV_EXCL_BR_STOP
if coverage_file:
self.load(coverage_file)
def __eq__(self, other: Self):
if isinstance(other, type(self)):
return dict(self.records.items()) == dict(other.records.items())
else:
return False
def load(self, coverage_file: Path):
with open(coverage_file, "r") as f:
current_record = None
while l := f.readline():
if not l.strip():
continue
if current_record is None:
current_record = LcovRecord()
try:
if current_record.add_line(l):
if (
current_record.source_file
!= LcovFile.EMPTY_LCOV_PSEUDO_FILE
):
self._add_record(current_record)
current_record = None
except AssertionError as e:
raise RuntimeError(
f"assertion in loading {coverage_file}, {current_record.source_file}",
e,
)
if self.filter_by_tags:
self.filter_by_source_tags(
self.LCOV_EXCL_LINE,
self.LCOV_EXCL_START,
self.LCOV_EXCL_STOP,
self.LCOV_EXCL_BR_LINE,
self.LCOV_EXCL_BR_START,
self.LCOV_EXCL_BR_STOP,
)
return self
# This copy of the function is to avoid deep copy
# when we know that the record is going not to be
# used anywhere else after this call.
def _add_record(self, record: LcovRecord):
if record.empty():
return
key = (record.test_name, record.source_file)
if key in self.records:
self.records[key].union(record)
else:
self.records[key] = record
def add_record(self, record: LcovRecord):
if record.empty():
return
key = (record.test_name, record.source_file)
if key in self.records:
self.records[key].union(record)
else:
self.records[key] = copy.deepcopy(record)
@staticmethod
def write_empty(target_file: Path, as_covered = False):
empty_lcov = LcovFile()
record = LcovRecord()
record.source_file = LcovFile.EMPTY_LCOV_PSEUDO_FILE
record.line_hits[1] = int(as_covered)
empty_lcov._add_record(record)
empty_lcov.write(target_file)
def write(
self,
target_file: Path,
generate_empty = False,
incompatible_empty = False,
as_covered = False,
):
"""Writes the content of this object to an lcov trace file.
target_file - The target file to write into
generate_empty - If to write even if the content is empty
incompatible_empty - entirely empty lcov traces are not compatible with lcov and htmlgen commands, this is fine
for library operations that use only this object, however, if lcov or htmlgen are to run on
an entirely empty file, they are going to fail on format issues, those tools expect at least
one legal record in a trace file. If True, will generate a truly empty file, else, will generate
a file with a record pointing to a non existing pseudo file.
as_covered - this parameter is only meaningful when incompatible_empty is False and generate_empty is True,
if True, will generate a pseudo record that appears as fully covered (100%), else, the pseudo record will
be fully uncovered (0%)
"""
self.prune()
if not generate_empty:
assert (
not self.empty()
), "Writing an empty lcov trace will result in a trace which is incompatible with lcov tools"
if generate_empty and (not incompatible_empty) and self.empty():
LcovFile.write_empty(target_file = target_file, as_covered = as_covered)
else:
with open(target_file, "w") as f:
[record.write(f) for record in self.records.values()]
def filter_files(self, files_to_keep: List[Path]):
for key_to_remove in [
key for key in self.records.keys() if key[1] not in files_to_keep
]:
del self.records[key_to_remove]
def filter_lines(self, file: Path, lines_to_keep: List[int]):
for record in [
record for key, record in self.records.items() if key[1] == file
]:
record.filter_lines(lines_to_keep)
def _remap_to_patch(
self, patch_file: Union[Path, unidiff.PatchSet], patch_fn: Path
):
patch: unidiff.PatchSet = (
unidiff.PatchSet.from_filename(patch_file)
if isinstance(patch_file, Path)
else patch_file
)
patched_files: List[unidiff.PatchedFile] = (
patch.added_files + patch.modified_files
)
# 1. remove all files that were not patched
self.filter_files(
[Path(pf.target_file).relative_to("b/") for pf in patched_files]
)
# 2. for every file keep only the lines that were patched
# There is going to be only one new record because all lines belongs to this file.
record_by_source = dict()
for record in self.records.values():
record_by_source.setdefault(record.source_file, []).append(record)
self.records.clear()
for patched_file in patched_files:
source_file = Path(patched_file.target_file).relative_to("b/")
if not source_file in record_by_source:
continue
lines_remap = {
line.target_line_no: line.diff_line_no
for hunk in patched_file
for line in hunk
if line.is_added
}
for record in record_by_source[source_file]:
record.remap_lines(lines_remap)
if not record.empty():
record.source_file = patch_fn
self._add_record(record)
def remap_to_patches(self, patch_files: List[Path]):
patches = prepare_patches_for_lcov(patch_files)
prototype = copy.deepcopy(self)
self.records.clear()
for patch, patch_fn in zip(patches, patch_files):
remapped = copy.deepcopy(prototype)
remapped._remap_to_patch(patch, patch_fn)
self.union(remapped)
self.prune()
return self
def coverage_report(
self,
f: TextIO,
include_branches: bool = True,
files_to_include: Union[Path, List[Path], None] = None,
colors = False,
):
if isinstance(files_to_include, Path):
files_to_include = [files_to_include]
for test, files_map in self.records.items():
for file, record in files_map.items():
if files_to_include is not None and not file in files_to_include:
continue
if record.empty():
continue
# We would like to print the following:
# |Line number|Line data|line text
# This will maintain conformance with the html presentation
LINE_NUMBERS_WIDTH = 10
LINE_DATA_WIDTH = 12
table_head_format = f"| {{: <{LINE_NUMBERS_WIDTH}}}| {{: <{LINE_DATA_WIDTH}}}| Source code"
table_line_format = (
f"|{{: >{LINE_NUMBERS_WIDTH}}} |{{: >{LINE_DATA_WIDTH}}} : {{}}"
)
with open(file, "r") as f_src:
line = 0
while l := f_src.readline():
line += 1
if line == 1:
headline = f"coverage data for: {file}, test: {test}"
f.write(headline + "\n")
f.write("-" * len(headline) + "\n\n")
f.write(
table_head_format.format("Line", "Line Data") + "\n"
)
if line in record.lines.line_hits:
f.write(
table_line_format.format(
line, record.lines.line_hits[line], l
)
)
else:
f.write(table_line_format.format(line, "", l))
f.write("\n\n")
f.write("Summary\n-------\n")
f.write("Lines Hit: ")
if record.lines.lines_found > 0:
f.write(
f"{record.lines.lines_hit}/{record.lines.lines_found} ({(record.lines.lines_hit/record.lines.lines_found)*100:.2f}%)\n"
)
else:
f.write("No information found.\n")
f.write("Functions Hit: ")
if record.functions.functions_found > 0:
f.write(
f"{record.functions.functions_hit}/{record.functions.functions_found} ({(record.functions.functions_hit/record.functions.functions_found)*100:.2f}%)\n"
)
else:
f.write("No information found.\n")
f.write("Branches Hit: ")
if record.branches.branches_found > 0:
f.write(
f"{record.branches.branches_hit}/{record.branches.branches_found} ({(record.branches.branches_hit/record.branches.branches_found)*100:.2f}%)\n"
)
else:
f.write("No information found.\n")
# Defaults are set according to `man geninfo`:
# The following markers are recognized by geninfo:
# LCOV_EXCL_LINE
# Lines containing this marker will be excluded.
# LCOV_EXCL_START
# Marks the beginning of an excluded section. The current line is part of this section.
# LCOV_EXCL_STOP
# Marks the end of an excluded section. The current line not part of this section.
# LCOV_EXCL_BR_LINE
# Lines containing this marker will be excluded from branch coverage.
# LCOV_EXCL_BR_START
# Marks the beginning of a section which is excluded from branch coverage. The current line is part of this section.
# LCOV_EXCL_BR_STOP
# Marks the end of a section which is excluded from branch coverage. The current line not part of this section.
def filter_by_source_tags(
self,
LCOV_EXCL_LINE = "LCOV_EXCL_LINE",
LCOV_EXCL_START = "LCOV_EXCL_START",
LCOV_EXCL_STOP = "LCOV_EXCL_STOP",
LCOV_EXCL_BR_LINE = "LCOV_EXCL_BR_LINE",
LCOV_EXCL_BR_START = "LCOV_EXCL_BR_START",
LCOV_EXCL_BR_STOP = "LCOV_EXCL_BR_STOP",
):
assert not (bool(LCOV_EXCL_START) ^ bool(LCOV_EXCL_STOP))
assert not (bool(LCOV_EXCL_BR_START) ^ bool(LCOV_EXCL_BR_STOP))
# for each source file, create a map of excludes and then apply them to the file.
def make_exclusion_lists(f: Path):
lines_to_exclude = set() # line numbers to exclude
branches_to_exclude = set() # branch line numbers to exclude
line_section_open = None
branch_section_open = None
with open(f, "r") as src:
current_line = 1
while l := src.readline():
if LCOV_EXCL_LINE and LCOV_EXCL_LINE in l:
lines_to_exclude.add(current_line)
if LCOV_EXCL_BR_LINE and LCOV_EXCL_BR_LINE in l:
branches_to_exclude.add(current_line)
if (
LCOV_EXCL_START
and (LCOV_EXCL_START in l)
and not line_section_open
):
line_section_open = current_line
if LCOV_EXCL_STOP and (LCOV_EXCL_STOP in l) and line_section_open:
lines_to_exclude.update(range(line_section_open, current_line))
line_section_open = None
if (
LCOV_EXCL_BR_START
and (LCOV_EXCL_BR_START in l)
and not branch_section_open
):
branch_section_open = current_line
if (
LCOV_EXCL_BR_STOP
and (LCOV_EXCL_BR_STOP in l)
and branch_section_open
):
branches_to_exclude.update(
range(branch_section_open, current_line)
)
branch_section_open = None
current_line += 1
return lines_to_exclude, branches_to_exclude
records_by_source = {}
for key, record in self.records.items():
records_by_source.setdefault(key[1], [])
records_by_source[key[1]].append(record)
for f, records in records_by_source.items():
lines_to_exclude, branches_to_exclude = make_exclusion_lists(f)
record: LcovRecord
for record in records:
record.remove_lines(lines_to_exclude)
record.remove_branches(branches_to_exclude)
self.prune()
def intersection(self, other: Self):
common_records = set(self.records.keys()).intersection(
set(other.records.keys())
)
old_records = self.records()
self.records = OrderedDict()
for key in common_records():
self.records[key] = old_records[key].intersection(other.records[key])
self.prune()
return self
def union(self, other: Self):
for key, val in other.records.items():
if key not in self.records:
self.records[key] = copy.deepcopy(val)
else:
self.records[key].union(val)
self.prune()
return self
def difference(self, other: Self):
common_records = set(self.records.keys()).intersection(
set(other.records.keys())
)
for key in common_records:
self.records[key].difference(other.records[key])
self.prune()
return self
def symmetric_difference(self, other: Self) -> Self:
"""an in place version of the symmetric difference operation
Arguments:
other {Self} -- the other component to intersect with
"""
intersection = self.__and__(other)
self.difference(intersection).union(other.__sub__(intersection))
return self
def __and__(self, other: Self):
new_component = copy.deepcopy(self)
new_component.intersection(other)
return new_component
def __or__(self, other: Self):
new_component = copy.deepcopy(self)
new_component.union(other)
return new_component
def __sub__(self, other: Self):
new_component = copy.deepcopy(self)
new_component.difference(other)
return new_component
def __xor__(self, other: Self):
new_component = copy.deepcopy(self)
new_component.symmetric_difference(other)
return new_component
def prune(self):
keys_to_prune = [key for key, val in self.records.items() if val.empty()]
for key in keys_to_prune:
del self.records[key]
def empty(self):
if len(self.records) == 0:
return True
else:
if not next(iter(self.records.values())).empty():
return False
else:
# this is a little bit dangerous because of the recursion,
# if prune() doesn't clear every empty record due to a bug
# we might get into an infinite recursion.
# However, this is the cleanest way to code it.
self.prune()
return self.empty()
def tag_with_test(self, test_name: str, from_test: Optional[str] = None):
# TODO: we should probably error out or normalize the string in order to preserve
# lcov compatibility
if test_name == from_test:
return
all_compatible_records = [
key for key in self.records.keys() if not from_test or key[0] == test_name
]
records_to_change = [self.records[key] for key in all_compatible_records]
for key in all_compatible_records:
del self.records[key]
for record in records_to_change:
record._test_name = test_name
self._add_record(record)
def tag_with_test_map(self, tests_map: Mapping[str, str]):
for from_test, to_test in tests_map.items():
self.tag_with_test(from_test, to_test)
def prepare_patches_for_lcov_old(patches: List[Path]) -> List[unidiff.PatchSet]:
"""Takes a list of patches paths that are assumed to be applied in their
order of appearance in the list and adjusts them to have target_line_no and
files in the final source code.
Args:
patches (List[Path]): An ordered list of patches paths
"""
patchsets: List[unidiff.PatchSet] = [
unidiff.PatchSet.from_filename(f) for f in patches
]
def eliminate_rename(
source_file: str, dest_file: str, patchsets: List[unidiff.PatchSet]
):
new_src_file = "a/" + dest_file[2:]
old_dest_file = "b/" + source_file[2:]
for patch in patchsets:
for patched_file in patch:
patched_file: unidiff.PatchedFile = patched_file
if patched_file.target_file == old_dest_file:
patched_file.source_file = new_src_file
patched_file.target_file = dest_file
def remove_file(f: str, patchsets: List[unidiff.PatchSet]):
target_to_remove = "b/" + f[2:]
for patch in patchsets:
files_to_remove = []
for patched_file in patch:
patched_file: unidiff.PatchedFile = patched_file
if patched_file.target_file == target_to_remove:
files_to_remove.append(patched_file)
[patch.remove(f) for f in files_to_remove]
def update_line_removed(line_num, source_file, patchsets):
for patch in patchsets:
for patched_file in patch:
patched_file: unidiff.PatchedFile = patched_file
if patched_file.source_file == source_file:
for line in patched_file:
line: unidiff.patch.Line = line
if line.target_line_no > line_num:
line.target_line_no -= 1
def update_line_added(line_num, target_file, patchsets):
for patch in patchsets:
for patched_file in patch:
patched_file: unidiff.PatchedFile = patched_file
if patched_file.target_file == target_file:
for line in patched_file:
line: unidiff.patch.Line = line
if line.target_line_no >= line_num:
line.target_line_no += 1
for idx, patch in enumerate(patchsets):
for patched_file in patch:
patched_file: unidiff.PatchedFile = patched_file
if patched_file.is_removed_file:
remove_file(patched_file.source_file, patchsets[:idx])
elif patched_file.is_rename:
# Make the semantics as if the file was actually never renamed (since in the final source it will be as if the file is
# already in it renamed form)
eliminate_rename(
patched_file.source_file,
patched_file.target_file,
patchsets[: idx + 1],
)
for hunk in patched_file:
hunk: unidiff.Hunk = hunk
for line in hunk:
line: unidiff.patch.Line = line
if line.is_removed:
update_line_removed(
line.source_line_no,
patched_file.source_file,
patchsets[:idx],
)
if line.is_added:
update_line_added(
line.target_line_no,
patched_file.target_file,
patchsets[:idx],
)
return patchsets
def prepare_patches_for_lcov(patches: List[Path]) -> List[unidiff.PatchSet]:
"""Takes a list of patches paths that are assumed to be applied in their
order of appearance in the list and adjusts them to have target_line_no and
files in the final source code.
Args:
patches (List[Path]): An ordered list of patches paths
"""
# rough algorithm-
# incrementally map patches to the latest file so eventually only contain
# patch data of the latest file. each file is getting handled separately.
# it calls for some kind of induction - assume that patches until n-1 are referring to
# the source file of patch n and make them refer to the target file of patch n.
patchsets: List[unidiff.PatchSet] = [
unidiff.PatchSet.from_filename(f) for f in patches
]
for n in range(1, len(patchsets)):
patches_adjustment_step(patchsets[: n + 1])
return patchsets
def patches_adjustment_step(patchlist: List[PatchSet]):
main_patch = patchlist[-1]
patches = patchlist[:-1]
# 1. if file is removed remove it from all patches
removed_files: list[PatchedFile] = list(main_patch.removed_files)
removed_files = [x.path for x in removed_files]
for patch in patches:
files_to_remove = [
f
for f in (list(patch.modified_files) + list(patch.added_files))
if f.path in removed_files
]
[patch.remove(f) for f in files_to_remove]
renamed_files = [f for f in main_patch.modified_files if f.is_rename]
def maybe_remove_prefix(path: str):
if path.startswith("a/") or path.startswith("b/"):
return path[2:]
return path
renames = {maybe_remove_prefix(f.source_file): f.target_file for f in renamed_files}
# 2. if file was renamed, change the patches targets to the new file name
for patch in patches:
files: List[PatchedFile] = list(patch.added_files) + list(patch.modified_files)
for f in files:
target = maybe_remove_prefix(f.target_file)
if target in renames:
f.target_file = renames[target]
# 3. invariant, every patch in the previous patches only contain the most recent change,
# make the invariant true also for the patches with this patch applied.
patched_files_by_target = {}
for patch in patches:
for file in patch:
file: PatchedFile = file
patched_files_by_target.setdefault(file.target_file, [])
patched_files_by_target[file.target_file].append(file)
for file in list(main_patch.modified_files):
removed_lines: List[int] = [
line.source_line_no for hunk in file for line in hunk if line.is_removed
]
added_lines: List[int] = [
line.target_line_no for hunk in file for line in hunk if line.is_added
]
removed_lines.sort()
added_lines.sort()
hunks: List[Hunk] = [
hunk
for patched_file in patched_files_by_target.get(file.target_file, [])
for hunk in patched_file
]
# remove replaced/removed lines from previous patches
[
hunk.remove(line)
for hunk in hunks
for line in list(hunk)
if line.target_line_no in removed_lines
]
lines: List[Line] = [line for hunk in hunks for line in hunk if line.is_added]
lines.sort(key = lambda l: l.target_line_no)
def apply_transform(transform):
transform_iter = iter(transform)
current_transform = next(transform_iter, (0, 0))
for line in lines[::-1]:
while current_transform[0] > line.target_line_no:
current_transform = next(transform_iter, (0, 0))
if current_transform == (0, 0):
break
line.target_line_no += current_transform[1]
removed_transform = list(zip(removed_lines, accumulate(repeat(-1))))[::-1]
apply_transform(removed_transform)
added_transform = list(zip(added_lines, accumulate(repeat(1))))
added_transform = list([(x - y + 1, y) for x, y in added_transform])[::-1]
apply_transform(added_transform)
# 4. TODO: Cleanup
# Remove hunks that don't have added or deleted lines, remove files that don't
# have any hunks. It is a performance optimization hence it is not implemented right
# now.