Coverage for tests / helpers.py: 100.000%
156 statements
« prev ^ index » next coverage.py v7.12.1a0.dev1, created at 2025-11-30 17:57 +0000
« prev ^ index » next coverage.py v7.12.1a0.dev1, created at 2025-11-30 17:57 +0000
1# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
2# For details: https://github.com/coveragepy/coveragepy/blob/main/NOTICE.txt
4"""Helpers for coverage.py tests."""
6from __future__ import annotations 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234
8import collections 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234
9import contextlib 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234
10import dis 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234
11import io 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234
12import locale 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234
13import os 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234
14import os.path 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234
15import re 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234
16import shutil 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234
17import subprocess 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234
18import sys 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234
19import textwrap 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234
20import warnings 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234
22from pathlib import Path 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234
23from typing import Any, Callable, NoReturn, TypeVar, cast 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234
24from collections.abc import Iterable, Iterator 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234
26from coverage import env 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234
27from coverage.debug import DebugControl 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234
28from coverage.exceptions import CoverageWarning 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234
29from coverage.types import TArc 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234
def _correct_encoding() -> str:
    """Work out which encoding to use when talking to subprocesses.

    The encoding reported for file descriptor 1 is used if there is one.
    Otherwise the locale's encoding is used, obtained with
    `locale.getpreferredencoding` before Python 3.11 and with
    `locale.getencoding` from 3.11 on.
    """
    # Type checking trick due to "unreachable" being set
    _locale_type_erased: Any = locale

    device_encoding = os.device_encoding(1)
    if device_encoding:
        return device_encoding

    if sys.version_info < (3, 11):
        locale_encoding = _locale_type_erased.getpreferredencoding()
    else:
        locale_encoding = _locale_type_erased.getencoding()
    return locale_encoding
def subprocess_popen(cmd: str) -> subprocess.Popen[bytes]:
    """Start `cmd` running in a shell subprocess.

    The child's stdin and stdout are pipes, and its stderr is merged into
    its stdout.  The child gets a copy of our environment, with
    PYTHONIOENCODING set to the result of `_correct_encoding`.

    Returns the Popen object.  The command is not waited for.

    """
    # Subprocesses are expensive, but convenient, and so may be over-used in
    # the test suite.  Use these lines to get a list of the tests using them:
    if 0:  # pragma: debugging
        pth = "/tmp/processes.txt"  # type: ignore[unreachable]
        with open(pth, "a", encoding="utf-8") as proctxt:
            print(os.getenv("PYTEST_CURRENT_TEST", "unknown"), file=proctxt, flush=True)

    # In some strange cases (PyPy3 in a virtualenv!?) the stdout encoding of
    # the subprocess is set incorrectly to ascii.  Use an environment variable
    # to force the encoding to be the same as ours.
    child_env = {**os.environ, "PYTHONIOENCODING": _correct_encoding()}

    return subprocess.Popen(
        cmd,
        shell=True,
        env=child_env,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
def run_command(cmd: str) -> tuple[int, str]:
    """Run `cmd` in a subprocess and wait for it to finish.

    Returns a pair: the exit status code, and the text the command wrote
    (stdout and stderr combined) with all carriage returns removed.

    """
    proc = subprocess_popen(cmd)
    raw_output = proc.communicate()[0]

    # Decode the bytes, then canonicalize to strings with plain newlines.
    decoded = raw_output.decode(_correct_encoding())
    return proc.returncode, decoded.replace("\r", "")
# $set_env.py: COVERAGE_DIS - Disassemble test code to /tmp/dis
SHOW_DIS = bool(int(os.getenv("COVERAGE_DIS", "0")))


def make_file(
    filename: str,
    text: str = "",
    bytes: bytes = b"",
    newline: str | None = None,
) -> str:
    """Write a file for a test to use.

    `filename` is the path of the file to write.  If it includes directories
    that don't exist yet, they are created first.

    The content is `bytes` if any are given, written exactly as provided.
    Otherwise the content is `text`, dedented and then encoded as UTF-8.

    `newline`, if provided, is substituted for every newline character in the
    dedented `text`.  It has no effect on `bytes`.

    Returns `filename`.

    """
    # pylint: disable=redefined-builtin # bytes
    if not bytes:
        text = textwrap.dedent(text)
        if newline:
            text = newline.join(text.split("\n"))
        contents = text.encode("utf-8")
    else:
        contents = bytes

    # The parent directories have to exist before the file can be opened.
    parent, basename = os.path.split(filename)
    if parent:
        os.makedirs(parent, exist_ok=True)

    # Always write in binary mode so the content lands byte-for-byte.
    with open(filename, "wb") as outfile:
        outfile.write(contents)

    if text and basename.endswith(".py") and SHOW_DIS:  # pragma: debugging
        os.makedirs("/tmp/dis", exist_ok=True)
        with open(f"/tmp/dis/{basename}.dis", "w", encoding="utf-8") as fdis:
            print(f"# {os.path.abspath(filename)}", file=fdis)
            cur_test = os.getenv("PYTEST_CURRENT_TEST", "unknown")
            print(f"# PYTEST_CURRENT_TEST = {cur_test}", file=fdis)
            dis_options = {"show_offsets": True} if env.PYVERSION >= (3, 13) else {}
            try:
                dis.dis(text, file=fdis, **dis_options)
            except Exception as exc:
                # Some tests make .py files that aren't Python, so dis will
                # fail, which is expected.
                print(f"#! {exc!r}", file=fdis)

    # For debugging, enable this to show the contents of files created.
    if 0:  # pragma: debugging
        print(f" ───┬──┤ {filename} ├───────────────────────")  # type: ignore[unreachable]
        for lineno, line in enumerate(contents.splitlines(), start=1):
            print(f"{lineno:6}│ {line.rstrip().decode()}")
        print()

    return filename
def nice_file(*fparts: str) -> str:
    """Join `fparts` into one path and return it in canonical form.

    The joined path is passed through `os.path.realpath`, `os.path.abspath`
    and `os.path.normcase`, in that order.
    """
    joined = os.path.join(*fparts)
    resolved = os.path.realpath(joined)
    absolute = os.path.abspath(resolved)
    return os.path.normcase(absolute)
def os_sep(s: str) -> str:
    """Return `s` with every forward slash swapped for this OS's separator."""
    return os.sep.join(s.split("/"))
class CheckUniqueFilenames:
    """Wraps a callable, asserting that its first argument is never repeated."""

    def __init__(self, wrapped: Callable[..., Any]) -> None:
        # `wrapped` is what actually gets called; `filenames` remembers every
        # first argument that has been passed through `wrapper` so far.
        self.wrapped = wrapped
        self.filenames: set[str] = set()

    @classmethod
    def hook(cls, obj: Any, method_name: str) -> CheckUniqueFilenames:
        """Install a checking wrapper over `obj`'s method named `method_name`.

        The method must take a string as its first argument.  That argument
        is checked for uniqueness across all the calls made through the
        wrapper.

        The values don't actually have to be file names, any strings will
        do, but we only use this for filename arguments.

        Returns the new `CheckUniqueFilenames` instance.

        """
        checker = cls(getattr(obj, method_name))
        setattr(obj, method_name, checker.wrapper)
        return checker

    def wrapper(self, filename: str, *args: Any, **kwargs: Any) -> Any:
        """The replacement method: refuse duplicates, then call the original."""
        assert filename not in self.filenames, (
            f"File name {filename!r} passed to {self.wrapped!r} twice"
        )
        self.filenames.add(filename)
        return self.wrapped(filename, *args, **kwargs)
def re_lines(pat: str, text: str, match: bool = True) -> list[str]:
    """Pick out the lines of `text` that the regex `pat` is found in.

    With `match` false the choice is flipped, and only the lines where `pat`
    is not found are kept.

    Returns the chosen lines as a list, with their line endings removed.

    """
    assert len(pat) < 200, "It's super-easy to swap the arguments to re_lines"
    selected = []
    for line in text.splitlines():
        found = re.search(pat, line) is not None
        if found == match:
            selected.append(line)
    return selected
def re_lines_text(pat: str, text: str, match: bool = True) -> str:
    """Like `re_lines`, but return one string with a newline after each line."""
    chosen = re_lines(pat, text, match=match)
    return "".join(f"{line}\n" for line in chosen)
def re_line(pat: str, text: str) -> str:
    """Find the single line of `text` that the regex `pat` is found in.

    Raises an AssertionError unless exactly one line matches.

    """
    matching = re_lines(pat, text)
    assert len(matching) == 1
    return matching[0]
def remove_tree(dirname: str) -> None:
    """Delete the directory `dirname` and everything under it.

    Nothing happens, and nothing is raised, if `dirname` doesn't exist.
    """
    if not os.path.exists(dirname):
        return
    shutil.rmtree(dirname)
# Map chars to numbers for arcz_to_arcs: "." is -1, "1"-"9" are 1-9, and
# "A"-"Z" are 10-35.  There is deliberately no entry for "0".
_arcz_map = {".": -1}
_arcz_map.update({c: ord(c) - ord("0") for c in "123456789"})
_arcz_map.update({c: 10 + ord(c) - ord("A") for c in "ABCDEFGHIJKLMNOPQRSTUVWXYZ"})


def arcz_to_arcs(arcz: str) -> list[TArc]:
    """Convert a compact textual representation of arcs to a list of pairs.

    The text has whitespace-separated pairs of characters.  Period is -1,
    1-9 are 1-9, A-Z are 10 through 35.  The resulting list is sorted
    regardless of the order of the input pairs.

        ".1 12 2." --> [(-1,1), (1,2), (2,-1)]

    A minus sign can be put in front of either character of a pair (but not
    both) to negate that number:

        "-11 12 2-5" --> [(-1,1), (1,2), (2,-5)]

    Pairs are separated by whitespace only: commas are not accepted.

    Raises an AssertionError for a pair that isn't two characters, or three
    characters with a minus sign in the first or second position.  Raises a
    KeyError for a character that has no number assigned.

    """
    # The `type: ignore[misc]` here are to suppress "Unpacking a string is
    # disallowed".
    a: str
    b: str
    arcs = []
    for pair in arcz.split():
        asgn = bsgn = 1
        if len(pair) == 2:
            a, b = pair  # type: ignore[misc]
        else:
            assert len(pair) == 3, f"Bad arcz pair {pair!r}: need 2 or 3 characters"
            if pair[0] == "-":
                _, a, b = pair  # type: ignore[misc]
                asgn = -1
            else:
                assert pair[1] == "-", f"Bad arcz pair {pair!r}: misplaced minus sign"
                a, _, b = pair  # type: ignore[misc]
                bsgn = -1
        arcs.append((asgn * _arcz_map[a], bsgn * _arcz_map[b]))
    return sorted(arcs)
@contextlib.contextmanager
def change_dir(new_dir: str | Path) -> Iterator[None]:
    """Make `new_dir` the current directory for the duration of a block.

    Use as a context manager.  When the block ends, whether normally or with
    an exception, the directory that was current beforehand is restored.

    """
    original_dir = os.getcwd()
    os.chdir(str(new_dir))
    try:
        yield
    finally:
        # Runs on every exit path, so the caller's directory is never lost.
        os.chdir(original_dir)
T = TypeVar("T")


def assert_count_equal(
    a: Iterable[T] | None,
    b: Iterable[T] | None,
) -> None:
    """
    A pytest-friendly stand-in for unittest's assertCountEqual.

    Asserts that `a` and `b` hold the same elements the same number of times,
    in any order.  Neither may be None.  The elements have to be hashable.
    """
    assert a is not None
    assert b is not None
    # Going through list() means each argument is counted element by element,
    # whatever kind of iterable it is.
    counts_a = collections.Counter(list(a))
    counts_b = collections.Counter(list(b))
    assert counts_a == counts_b
def get_coverage_warnings(warns: Iterable[warnings.WarningMessage]) -> list[str]:
    """Return the first argument of each CoverageWarning found in `warns`.

    Recorded warnings of any other category are skipped.
    """
    return [
        cast(Warning, w.message).args[0]
        for w in warns
        if issubclass(w.category, CoverageWarning)
    ]
def assert_coverage_warnings(
    warns: Iterable[warnings.WarningMessage],
    *msgs: str | re.Pattern[str],
) -> None:
    """
    Assert that the CoverageWarnings in `warns` are exactly `msgs`, in order.

    A msg with a `search` attribute (a compiled regex) is searched for in the
    whole warning text.  Any other msg is compared for equality against the
    warning text up to, but not including, the first "; see ".
    """
    actuals = get_coverage_warnings(warns)
    assert msgs  # don't call this without some messages.
    assert len(msgs) == len(actuals)
    for actual, expected in zip(actuals, msgs):
        if hasattr(expected, "search"):
            assert expected.search(actual), f"{actual!r} didn't match {expected!r}"
            continue
        # Plain strings are checked against the text before any "; see " tail.
        prefix, _, _ = actual.partition("; see ")
        assert prefix == expected
@contextlib.contextmanager
def swallow_warnings(
    message: str = r".",
    category: type[Warning] = CoverageWarning,
) -> Iterator[None]:
    """Ignore certain warnings for the duration of a block.

    Inside the block, an "ignore" filter is in place for warnings of
    `category` whose text matches the regex `message`.  It doesn't matter
    whether any such warning is actually issued.  The filter is installed
    under `warnings.catch_warnings()`.
    """
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", message=message, category=category)
        yield
class FailingProxy:
    """Stands in for another object, with one method that fails a few times first."""

    def __init__(self, obj: Any, methname: str, fails: list[Exception]) -> None:
        """Create the failing proxy.

        `obj` is the object being proxied.  `methname` names the method that
        will fail.  `fails` lists the exceptions to fail with, in order.  When
        the list has been used up, the method is proxied like any other
        attribute.

        The `fails` list is consumed in place, one exception each time
        `methname` is looked up on the proxy.

        """
        self.obj = obj
        self.methname = methname
        self.fails = fails

    def __getattr__(self, name: str) -> Any:
        # Anything but a still-failing `methname` goes straight to the target.
        if name != self.methname or not self.fails:
            return getattr(self.obj, name)
        return self._make_failing_method(self.fails.pop(0))

    def _make_failing_method(self, exc: Exception) -> Callable[..., NoReturn]:
        """Build a function that accepts any arguments and raises `exc`."""

        def _meth(*args: Any, **kwargs: Any) -> NoReturn:
            raise exc

        return _meth
class DebugControlString(DebugControl):
    """A `DebugControl` whose output is collected in a StringIO, for testing."""

    def __init__(self, options: Iterable[str]) -> None:
        # Keep our own reference to the buffer so `get_output` can read it.
        buffer = io.StringIO()
        self.io = buffer
        super().__init__(options, buffer)

    def get_output(self) -> str:
        """Return all the text written to the StringIO so far."""
        return self.io.getvalue()
def all_our_source_files() -> Iterable[tuple[Path, str]]:
    """Iterate over all of our own source files.

    Some tests need a good amount of Python code to analyze, and our own
    source code is as good a subject as any.

    The directory two levels up from this file is treated as the project
    root.  The *.py files directly inside the root and its "ci", "coverage",
    "lab" and "tests" directories are read as UTF-8.  Each of those
    directories must exist.

    Produces a stream of (file path, file contents) tuples.
    """
    cov_dir = Path(__file__).parents[1]
    # To run against all the files in the tox venvs:
    #   for source_file in cov_dir.rglob("*.py"):
    for sub in [".", "ci", "coverage", "lab", "tests"]:
        subdir = cov_dir / sub
        assert subdir.is_dir()
        yield from (
            (source_file, source_file.read_text(encoding="utf-8"))
            for source_file in subdir.glob("*.py")
        )