Coverage for tests / helpers.py: 100.000%
156 statements
« prev ^ index » next coverage.py v7.12.1a0.dev1, created at 2025-11-29 20:34 +0000
« prev ^ index » next coverage.py v7.12.1a0.dev1, created at 2025-11-29 20:34 +0000
1# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
2# For details: https://github.com/coveragepy/coveragepy/blob/main/NOTICE.txt
4"""Helpers for coverage.py tests."""
6from __future__ import annotations 1abcdefghijklmnopqrstuvwxyzABCDEF)GH5IJ$KL6MN7OP#QR8ST9UV'WX!YZ(01%234
8import collections 1abcdefghijklmnopqrstuvwxyzABCDEF)GH5IJ$KL6MN7OP#QR8ST9UV'WX!YZ(01%234
9import contextlib 1abcdefghijklmnopqrstuvwxyzABCDEF)GH5IJ$KL6MN7OP#QR8ST9UV'WX!YZ(01%234
10import dis 1abcdefghijklmnopqrstuvwxyzABCDEF)GH5IJ$KL6MN7OP#QR8ST9UV'WX!YZ(01%234
11import io 1abcdefghijklmnopqrstuvwxyzABCDEF)GH5IJ$KL6MN7OP#QR8ST9UV'WX!YZ(01%234
12import locale 1abcdefghijklmnopqrstuvwxyzABCDEF)GH5IJ$KL6MN7OP#QR8ST9UV'WX!YZ(01%234
13import os 1abcdefghijklmnopqrstuvwxyzABCDEF)GH5IJ$KL6MN7OP#QR8ST9UV'WX!YZ(01%234
14import os.path 1abcdefghijklmnopqrstuvwxyzABCDEF)GH5IJ$KL6MN7OP#QR8ST9UV'WX!YZ(01%234
15import re 1abcdefghijklmnopqrstuvwxyzABCDEF)GH5IJ$KL6MN7OP#QR8ST9UV'WX!YZ(01%234
16import shutil 1abcdefghijklmnopqrstuvwxyzABCDEF)GH5IJ$KL6MN7OP#QR8ST9UV'WX!YZ(01%234
17import subprocess 1abcdefghijklmnopqrstuvwxyzABCDEF)GH5IJ$KL6MN7OP#QR8ST9UV'WX!YZ(01%234
18import sys 1abcdefghijklmnopqrstuvwxyzABCDEF)GH5IJ$KL6MN7OP#QR8ST9UV'WX!YZ(01%234
19import textwrap 1abcdefghijklmnopqrstuvwxyzABCDEF)GH5IJ$KL6MN7OP#QR8ST9UV'WX!YZ(01%234
20import warnings 1abcdefghijklmnopqrstuvwxyzABCDEF)GH5IJ$KL6MN7OP#QR8ST9UV'WX!YZ(01%234
22from pathlib import Path 1abcdefghijklmnopqrstuvwxyzABCDEF)GH5IJ$KL6MN7OP#QR8ST9UV'WX!YZ(01%234
23from typing import Any, Callable, NoReturn, TypeVar, cast 1abcdefghijklmnopqrstuvwxyzABCDEF)GH5IJ$KL6MN7OP#QR8ST9UV'WX!YZ(01%234
24from collections.abc import Iterable, Iterator 1abcdefghijklmnopqrstuvwxyzABCDEF)GH5IJ$KL6MN7OP#QR8ST9UV'WX!YZ(01%234
26from coverage import env 1abcdefghijklmnopqrstuvwxyzABCDEF)GH5IJ$KL6MN7OP#QR8ST9UV'WX!YZ(01%234
27from coverage.debug import DebugControl 1abcdefghijklmnopqrstuvwxyzABCDEF)GH5IJ$KL6MN7OP#QR8ST9UV'WX!YZ(01%234
28from coverage.exceptions import CoverageWarning 1abcdefghijklmnopqrstuvwxyzABCDEF)GH5IJ$KL6MN7OP#QR8ST9UV'WX!YZ(01%234
29from coverage.types import TArc 1abcdefghijklmnopqrstuvwxyzABCDEF)GH5IJ$KL6MN7OP#QR8ST9UV'WX!YZ(01%234
def _correct_encoding() -> str:
    """Determine the right encoding to use for subprocesses.

    Returns the encoding reported by `os.device_encoding` for file
    descriptor 1 if there is one, otherwise the locale's encoding.

    """
    # Type checking trick due to "unreachable" being set
    _locale_type_erased: Any = locale

    # `locale.getencoding` only exists on 3.11+; older versions fall back to
    # `locale.getpreferredencoding`.
    encoding = os.device_encoding(1) or (
        _locale_type_erased.getpreferredencoding()
        if sys.version_info < (3, 11)
        else _locale_type_erased.getencoding()
    )
    return encoding
def subprocess_popen(cmd: str) -> subprocess.Popen[bytes]:
    """Run a command in a subprocess.

    `cmd` is a command line, run through the shell.  The child gets a copy of
    our environment with PYTHONIOENCODING set, a pipe for stdin, and a single
    pipe carrying both stdout and stderr.

    Returns the Popen object.

    """
    # Subprocesses are expensive, but convenient, and so may be over-used in
    # the test suite.  Use these lines to get a list of the tests using them:
    if 0:  # pragma: debugging
        pth = "/tmp/processes.txt"  # type: ignore[unreachable]
        with open(pth, "a", encoding="utf-8") as proctxt:
            print(os.getenv("PYTEST_CURRENT_TEST", "unknown"), file=proctxt, flush=True)

    # In some strange cases (PyPy3 in a virtualenv!?) the stdout encoding of
    # the subprocess is set incorrectly to ascii.  Use an environment variable
    # to force the encoding to be the same as ours.
    sub_env = dict(os.environ)
    sub_env["PYTHONIOENCODING"] = _correct_encoding()

    proc = subprocess.Popen(
        cmd,
        shell=True,
        env=sub_env,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    return proc
def run_command(cmd: str) -> tuple[int, str]:
    """Run a command in a subprocess.

    Waits for the command to finish.

    Returns the exit status code and the combined stdout and stderr.

    """
    proc = subprocess_popen(cmd)
    output, _ = proc.communicate()
    status = proc.returncode

    # Get the output, and canonicalize it to strings with newlines.
    output_str = output.decode(_correct_encoding()).replace("\r", "")
    return status, output_str
# $set_env.py: COVERAGE_DIS - Disassemble test code to /tmp/dis
SHOW_DIS = bool(int(os.getenv("COVERAGE_DIS", "0")))


def make_file(
    filename: str,
    text: str = "",
    bytes: bytes = b"",
    newline: str | None = None,
) -> str:
    """Create a file for testing.

    `filename` is the relative path to the file, including directories if
    desired, which will be created if need be.

    `text` is the text content to create in the file, or `bytes` are the
    bytes to write.  If `bytes` is non-empty it is written as-is and `text`
    is not used for the file contents.  Otherwise `text` is dedented and
    written as UTF-8.

    If `newline` is provided, it is a string that will be used as the line
    endings in the created file, otherwise the line endings are as provided
    in `text`.

    Returns `filename`.

    """
    # pylint: disable=redefined-builtin     # bytes
    if bytes:
        data = bytes
    else:
        text = textwrap.dedent(text)
        if newline:
            text = text.replace("\n", newline)
        data = text.encode("utf-8")

    # Make sure the directories are available.
    dirs, basename = os.path.split(filename)
    if dirs:
        os.makedirs(dirs, exist_ok=True)

    # Create the file.
    with open(filename, "wb") as f:
        f.write(data)

    if text and basename.endswith(".py") and SHOW_DIS:  # pragma: debugging
        os.makedirs("/tmp/dis", exist_ok=True)
        with open(f"/tmp/dis/{basename}.dis", "w", encoding="utf-8") as fdis:
            print(f"# {os.path.abspath(filename)}", file=fdis)
            cur_test = os.getenv("PYTEST_CURRENT_TEST", "unknown")
            print(f"# PYTEST_CURRENT_TEST = {cur_test}", file=fdis)
            kwargs = {}
            if env.PYVERSION >= (3, 13):
                kwargs["show_offsets"] = True
            try:
                dis.dis(text, file=fdis, **kwargs)
            except Exception as exc:
                # Some tests make .py files that aren't Python, so dis will
                # fail, which is expected.
                print(f"#! {exc!r}", file=fdis)

    # For debugging, enable this to show the contents of files created.
    if 0:  # pragma: debugging
        print(f" ───┬──┤ {filename} ├───────────────────────")  # type: ignore[unreachable]
        for lineno, line in enumerate(data.splitlines(), start=1):
            print(f"{lineno:6}│ {line.rstrip().decode()}")
        print()

    return filename
def nice_file(*fparts: str) -> str:
    """Canonicalize the file name composed of the parts in `fparts`.

    The parts are joined, then symlinks are resolved, the path is made
    absolute, and its case is normalized for the OS.

    """
    fname = os.path.join(*fparts)
    return os.path.normcase(os.path.abspath(os.path.realpath(fname)))
def os_sep(s: str) -> str:
    """Replace slashes in `s` with the correct separator for the OS."""
    return s.replace("/", os.sep)
class CheckUniqueFilenames:
    """Asserts the uniqueness of file names passed to a function."""

    def __init__(self, wrapped: Callable[..., Any]) -> None:
        # Every first argument seen so far by `wrapper`.
        self.filenames: set[str] = set()
        # The original callable that `wrapper` delegates to.
        self.wrapped = wrapped

    @classmethod
    def hook(cls, obj: Any, method_name: str) -> CheckUniqueFilenames:
        """Replace a method with our checking wrapper.

        The method must take a string as a first argument. That argument
        will be checked for uniqueness across all the calls to this method.

        The values don't have to be file names actually, just strings, but
        we only use it for filename arguments.

        Returns the new `CheckUniqueFilenames` instance.

        """
        method = getattr(obj, method_name)
        hook = cls(method)
        setattr(obj, method_name, hook.wrapper)
        return hook

    def wrapper(self, filename: str, *args: Any, **kwargs: Any) -> Any:
        """The replacement method.  Check that we don't have dupes.

        Raises an AssertionError if `filename` has been seen before,
        otherwise records it and returns the wrapped callable's result.

        """
        assert filename not in self.filenames, (
            f"File name {filename!r} passed to {self.wrapped!r} twice"
        )
        self.filenames.add(filename)
        return self.wrapped(filename, *args, **kwargs)
def re_lines(pat: str, text: str, match: bool = True) -> list[str]:
    """Return a list of lines selected by `pat` in the string `text`.

    `pat` is a regex that is searched for in each line of `text`.

    If `match` is false, the selection is inverted: only the non-matching
    lines are included.

    Returns a list, the selected lines, without line endings.

    """
    assert len(pat) < 200, "It's super-easy to swap the arguments to re_lines"
    return [line for line in text.splitlines() if bool(re.search(pat, line)) == match]
def re_lines_text(pat: str, text: str, match: bool = True) -> str:
    """Return the multi-line text of lines selected by `pat`.

    The lines are selected as `re_lines` does, and each is terminated with
    a newline in the returned string.

    """
    return "".join(line + "\n" for line in re_lines(pat, text, match=match))
def re_line(pat: str, text: str) -> str:
    """Return the one line in `text` that matches regex `pat`.

    Raises an AssertionError if more than one, or less than one, line matches.

    """
    lines = re_lines(pat, text)
    assert len(lines) == 1, (
        f"Expected exactly one line matching {pat!r}, found {len(lines)}: {lines!r}"
    )
    return lines[0]
def remove_tree(dirname: str) -> None:
    """Remove a directory tree.

    It's fine for the directory to not exist in the first place.
    """
    if os.path.exists(dirname):
        shutil.rmtree(dirname)
# Map chars to numbers for arcz_to_arcs
_arcz_map = {".": -1}
_arcz_map.update({c: ord(c) - ord("0") for c in "123456789"})
_arcz_map.update({c: 10 + ord(c) - ord("A") for c in "ABCDEFGHIJKLMNOPQRSTUVWXYZ"})


def arcz_to_arcs(arcz: str) -> list[TArc]:
    """Convert a compact textual representation of arcs to a list of pairs.

    The text has space-separated pairs of letters.  Period is -1, 1-9 are
    1-9, A-Z are 10 through 35. The resulting list is sorted regardless of
    the order of the input pairs.

        ".1 12 2." --> [(-1,1), (1,2), (2,-1)]

    Minus signs can be included in the pairs:

        "-11 12 2-5" --> [(-1,1), (1,2), (2,-5)]

    Each pair must be two characters, or three characters with one minus
    sign before either character; anything else raises an AssertionError.
    A character that isn't in the map raises a KeyError.

    """
    # The `type: ignore[misc]` here are to suppress "Unpacking a string is
    # disallowed".
    a: str
    b: str

    arcs = []
    for pair in arcz.split():
        asgn = bsgn = 1
        if len(pair) == 2:
            a, b = pair  # type: ignore[misc]
        else:
            assert len(pair) == 3
            if pair[0] == "-":
                _, a, b = pair  # type: ignore[misc]
                asgn = -1
            else:
                assert pair[1] == "-"
                a, _, b = pair  # type: ignore[misc]
                bsgn = -1
        arcs.append((asgn * _arcz_map[a], bsgn * _arcz_map[b]))
    return sorted(arcs)
@contextlib.contextmanager
def change_dir(new_dir: str | Path) -> Iterator[None]:
    """Change directory, and then change back.

    Use as a context manager, it will return to the original
    directory at the end of the block, even if the block raises.

    """
    old_dir = os.getcwd()
    os.chdir(str(new_dir))
    try:
        yield
    finally:
        os.chdir(old_dir)
T = TypeVar("T")


def assert_count_equal(
    a: Iterable[T] | None,
    b: Iterable[T] | None,
) -> None:
    """
    A pytest-friendly implementation of assertCountEqual.

    Assert that `a` and `b` have the same elements, but maybe in different order.
    This only works for hashable elements.

    Raises an AssertionError if either argument is None, or if the element
    counts differ.
    """
    assert a is not None
    assert b is not None
    # list() makes Counter count the iterated elements even when given a
    # mapping, which Counter would otherwise read as element -> count.
    assert collections.Counter(list(a)) == collections.Counter(list(b))
def get_coverage_warnings(warns: Iterable[warnings.WarningMessage]) -> list[str]:
    """Extract the text of CoverageWarnings.

    Warnings in `warns` whose category is not a `CoverageWarning` subclass
    are skipped.  The text is the first argument of each warning.

    """
    return [
        cast(Warning, w.message).args[0]
        for w in warns
        if issubclass(w.category, CoverageWarning)
    ]
def assert_coverage_warnings(
    warns: Iterable[warnings.WarningMessage],
    *msgs: str | re.Pattern[str],
) -> None:
    """
    Assert that the CoverageWarning's in `warns` have `msgs` as messages.

    Each msg can be a string compared for equality, or a compiled regex used to
    search the text.  For string comparisons, anything in the actual warning
    from "; see " onward is ignored.

    There must be exactly as many `msgs` as CoverageWarnings, in the same order.
    """
    actuals = get_coverage_warnings(warns)
    assert msgs  # don't call this without some messages.
    assert len(msgs) == len(actuals)
    for actual, expected in zip(actuals, msgs):
        if hasattr(expected, "search"):
            assert expected.search(actual), f"{actual!r} didn't match {expected!r}"
        else:
            # Drop the "; see <something>" suffix before comparing.
            actual = actual.partition("; see ")[0]
            assert actual == expected
@contextlib.contextmanager
def swallow_warnings(
    message: str = r".",
    category: type[Warning] = CoverageWarning,
) -> Iterator[None]:
    """Swallow particular warnings.

    `message` is a regex and `category` a warning class, both passed to
    `warnings.filterwarnings` to choose the warnings to ignore.

    It's OK if they happen, or if they don't happen. Just ignore them.
    """
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=category, message=message)
        yield
class FailingProxy:
    """A proxy for another object, but one method will fail a few times before working."""

    def __init__(self, obj: Any, methname: str, fails: list[Exception]) -> None:
        """Create the failing proxy.

        `obj` is the object to proxy.  `methname` is the method that will fail
        a few times.  `fails` are the exceptions to fail with. Once used up,
        the method will proxy correctly.

        The `fails` list is kept by reference and consumed in place.

        """
        self.obj = obj
        self.methname = methname
        self.fails = fails

    def __getattr__(self, name: str) -> Any:
        # Only called for attributes not found on the proxy itself.  A pending
        # failure is used up when the method is looked up, not when the
        # returned callable is invoked.
        if name == self.methname and self.fails:
            meth = self._make_failing_method(self.fails[0])
            del self.fails[0]
        else:
            meth = getattr(self.obj, name)
        return meth

    def _make_failing_method(self, exc: Exception) -> Callable[..., NoReturn]:
        """Return a function that will raise `exc`."""

        def _meth(*args: Any, **kwargs: Any) -> NoReturn:
            raise exc

        return _meth
class DebugControlString(DebugControl):
    """A `DebugControl` that writes to a StringIO, for testing."""

    def __init__(self, options: Iterable[str]) -> None:
        # Keep our own reference to the stream so `get_output` can read it.
        self.io = io.StringIO()
        super().__init__(options, self.io)

    def get_output(self) -> str:
        """Get the output text from the `DebugControl`."""
        return self.io.getvalue()
def all_our_source_files() -> Iterable[tuple[Path, str]]:
    """Iterate over all of our own source files.

    This is used in tests that need a bunch of Python code to analyze, so we
    might as well use our own source code as the subject.

    Only the `*.py` files directly in each listed directory are read, not
    those in nested directories.  The directories are relative to the parent
    of the directory containing this file, and each must exist.

    Produces a stream of (filename, file contents) tuples.
    """
    cov_dir = Path(__file__).parent.parent
    # To run against all the files in the tox venvs:
    #   for source_file in cov_dir.rglob("*.py"):
    for sub in [".", "ci", "coverage", "lab", "tests"]:
        assert (cov_dir / sub).is_dir()
        for source_file in (cov_dir / sub).glob("*.py"):
            yield (source_file, source_file.read_text(encoding="utf-8"))