Coverage for tests / helpers.py: 100.000%

156 statements  

« prev     ^ index     » next       coverage.py v7.12.1a0.dev1, created at 2025-11-30 17:57 +0000

1# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0 

2# For details: https://github.com/coveragepy/coveragepy/blob/main/NOTICE.txt 

3 

4"""Helpers for coverage.py tests.""" 

5 

6from __future__ import annotations 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

7 

8import collections 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

9import contextlib 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

10import dis 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

11import io 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

12import locale 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

13import os 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

14import os.path 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

15import re 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

16import shutil 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

17import subprocess 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

18import sys 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

19import textwrap 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

20import warnings 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

21 

22from pathlib import Path 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

23from typing import Any, Callable, NoReturn, TypeVar, cast 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

24from collections.abc import Iterable, Iterator 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

25 

26from coverage import env 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

27from coverage.debug import DebugControl 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

28from coverage.exceptions import CoverageWarning 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

29from coverage.types import TArc 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

30 

31 

32def _correct_encoding() -> str: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

33 """Determine the right encoding to use for subprocesses.""" 

34 # Type checking trick due to "unreachable" being set 

35 _locale_type_erased: Any = locale 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

36 

37 encoding = os.device_encoding(1) or ( 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

38 _locale_type_erased.getpreferredencoding() 

39 if sys.version_info < (3, 11) 

40 else _locale_type_erased.getencoding() 

41 ) 

42 return encoding 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

43 

44 

45def subprocess_popen(cmd: str) -> subprocess.Popen[bytes]: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

46 """Run a command in a subprocess. 

47 

48 Returns the Popen object. 

49 

50 """ 

51 # Subprocesses are expensive, but convenient, and so may be over-used in 

52 # the test suite. Use these lines to get a list of the tests using them: 

53 if 0: # pragma: debugging 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

54 pth = "/tmp/processes.txt" # type: ignore[unreachable] 

55 with open(pth, "a", encoding="utf-8") as proctxt: 

56 print(os.getenv("PYTEST_CURRENT_TEST", "unknown"), file=proctxt, flush=True) 

57 

58 # In some strange cases (PyPy3 in a virtualenv!?) the stdout encoding of 

59 # the subprocess is set incorrectly to ascii. Use an environment variable 

60 # to force the encoding to be the same as ours. 

61 sub_env = dict(os.environ) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

62 sub_env["PYTHONIOENCODING"] = _correct_encoding() 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

63 

64 proc = subprocess.Popen( 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

65 cmd, 

66 shell=True, 

67 env=sub_env, 

68 stdin=subprocess.PIPE, 

69 stdout=subprocess.PIPE, 

70 stderr=subprocess.STDOUT, 

71 ) 

72 return proc 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

73 

74 

75def run_command(cmd: str) -> tuple[int, str]: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

76 """Run a command in a subprocess. 

77 

78 Returns the exit status code and the combined stdout and stderr. 

79 

80 """ 

81 proc = subprocess_popen(cmd) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

82 output, _ = proc.communicate() 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

83 status = proc.returncode 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

84 

85 # Get the output, and canonicalize it to strings with newlines. 

86 output_str = output.decode(_correct_encoding()).replace("\r", "") 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

87 return status, output_str 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

88 

89 

90# $set_env.py: COVERAGE_DIS - Disassemble test code to /tmp/dis 

91SHOW_DIS = bool(int(os.getenv("COVERAGE_DIS", "0"))) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

92 

93 

94def make_file( 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019

95 filename: str, 

96 text: str = "", 

97 bytes: bytes = b"", 

98 newline: str | None = None, 

99) -> str: 

100 """Create a file for testing. 

101 

102 `filename` is the relative path to the file, including directories if 

103 desired, which will be created if need be. 

104 

105 `text` is the text content to create in the file, or `bytes` are the 

106 bytes to write. 

107 

108 If `newline` is provided, it is a string that will be used as the line 

109 endings in the created file, otherwise the line endings are as provided 

110 in `text`. 

111 

112 Returns `filename`. 

113 

114 """ 

115 # pylint: disable=redefined-builtin # bytes 

116 if bytes: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

117 data = bytes 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234

118 else: 

119 text = textwrap.dedent(text) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

120 if newline: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

121 text = text.replace("\n", newline) 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234

122 data = text.encode("utf-8") 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

123 

124 # Make sure the directories are available. 

125 dirs, basename = os.path.split(filename) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

126 if dirs: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

127 os.makedirs(dirs, exist_ok=True) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

128 

129 # Create the file. 

130 with open(filename, "wb") as f: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

131 f.write(data) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

132 

133 if text and basename.endswith(".py") and SHOW_DIS: # pragma: debugging 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

134 os.makedirs("/tmp/dis", exist_ok=True) 

135 with open(f"/tmp/dis/{basename}.dis", "w", encoding="utf-8") as fdis: 

136 print(f"# {os.path.abspath(filename)}", file=fdis) 

137 cur_test = os.getenv("PYTEST_CURRENT_TEST", "unknown") 

138 print(f"# PYTEST_CURRENT_TEST = {cur_test}", file=fdis) 

139 kwargs = {} 

140 if env.PYVERSION >= (3, 13): 

141 kwargs["show_offsets"] = True 

142 try: 

143 dis.dis(text, file=fdis, **kwargs) 

144 except Exception as exc: 

145 # Some tests make .py files that aren't Python, so dis will 

146 # fail, which is expected. 

147 print(f"#! {exc!r}", file=fdis) 

148 

149 # For debugging, enable this to show the contents of files created. 

150 if 0: # pragma: debugging 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

151 print(f" ───┬──┤ {filename} ├───────────────────────") # type: ignore[unreachable] 

152 for lineno, line in enumerate(data.splitlines(), start=1): 

153 print(f"{lineno:6}│ {line.rstrip().decode()}") 

154 print() 

155 

156 return filename 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

157 

158 

159def nice_file(*fparts: str) -> str: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

160 """Canonicalize the file name composed of the parts in `fparts`.""" 

161 fname = os.path.join(*fparts) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

162 return os.path.normcase(os.path.abspath(os.path.realpath(fname))) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

163 

164 

165def os_sep(s: str) -> str: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

166 """Replace slashes in `s` with the correct separator for the OS.""" 

167 return s.replace("/", os.sep) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

168 

169 

170class CheckUniqueFilenames: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

171 """Asserts the uniqueness of file names passed to a function.""" 

172 

173 def __init__(self, wrapped: Callable[..., Any]) -> None: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

174 self.filenames: set[str] = set() 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

175 self.wrapped = wrapped 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

176 

177 @classmethod 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

178 def hook(cls, obj: Any, method_name: str) -> CheckUniqueFilenames: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

179 """Replace a method with our checking wrapper. 

180 

181 The method must take a string as a first argument. That argument 

182 will be checked for uniqueness across all the calls to this method. 

183 

184 The values don't have to be file names actually, just strings, but 

185 we only use it for filename arguments. 

186 

187 """ 

188 method = getattr(obj, method_name) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

189 hook = cls(method) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

190 setattr(obj, method_name, hook.wrapper) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

191 return hook 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

192 

193 def wrapper(self, filename: str, *args: Any, **kwargs: Any) -> Any: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

194 """The replacement method. Check that we don't have dupes.""" 

195 assert filename not in self.filenames, ( 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

196 f"File name {filename!r} passed to {self.wrapped!r} twice" 

197 ) 

198 self.filenames.add(filename) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

199 return self.wrapped(filename, *args, **kwargs) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

200 

201 

202def re_lines(pat: str, text: str, match: bool = True) -> list[str]: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

203 """Return a list of lines selected by `pat` in the string `text`. 

204 

205 If `match` is false, the selection is inverted: only the non-matching 

206 lines are included. 

207 

208 Returns a list, the selected lines, without line endings. 

209 

210 """ 

211 assert len(pat) < 200, "It's super-easy to swap the arguments to re_lines" 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

212 return [l for l in text.splitlines() if bool(re.search(pat, l)) == match] 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

213 

214 

215def re_lines_text(pat: str, text: str, match: bool = True) -> str: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

216 """Return the multi-line text of lines selected by `pat`.""" 

217 return "".join(l + "\n" for l in re_lines(pat, text, match=match)) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

218 

219 

220def re_line(pat: str, text: str) -> str: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

221 """Return the one line in `text` that matches regex `pat`. 

222 

223 Raises an AssertionError if more than one, or less than one, line matches. 

224 

225 """ 

226 lines = re_lines(pat, text) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

227 assert len(lines) == 1 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

228 return lines[0] 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

229 

230 

231def remove_tree(dirname: str) -> None: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

232 """Remove a directory tree. 

233 

234 It's fine for the directory to not exist in the first place. 

235 """ 

236 if os.path.exists(dirname): 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

237 shutil.rmtree(dirname) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

238 

239 

240# Map chars to numbers for arcz_to_arcs 

241_arcz_map = {".": -1} 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

242_arcz_map.update({c: ord(c) - ord("0") for c in "123456789"}) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

243_arcz_map.update({c: 10 + ord(c) - ord("A") for c in "ABCDEFGHIJKLMNOPQRSTUVWXYZ"}) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

244 

245 

246def arcz_to_arcs(arcz: str) -> list[TArc]: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

247 """Convert a compact textual representation of arcs to a list of pairs. 

248 

249 The text has space-separated pairs of letters. Period is -1, 1-9 are 

250 1-9, A-Z are 10 through 36. The resulting list is sorted regardless of 

251 the order of the input pairs. 

252 

253 ".1 12 2." --> [(-1,1), (1,2), (2,-1)] 

254 

255 Minus signs can be included in the pairs: 

256 

257 "-11, 12, 2-5" --> [(-1,1), (1,2), (2,-5)] 

258 

259 """ 

260 # The `type: ignore[misc]` here are to suppress "Unpacking a string is 

261 # disallowed". 

262 a: str 

263 b: str 

264 arcs = [] 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

265 for pair in arcz.split(): 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

266 asgn = bsgn = 1 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJ6KL7MN5OPQRST8UVWXYZ019234

267 if len(pair) == 2: 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJ6KL7MN5OPQRST8UVWXYZ019234

268 a, b = pair # type: ignore[misc] 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJ6KL7MN5OPQRST8UVWXYZ019234

269 else: 

270 assert len(pair) == 3 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMN5OPQRSTUVWXYZ01234

271 if pair[0] == "-": 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMN5OPQRSTUVWXYZ01234

272 _, a, b = pair # type: ignore[misc] 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234

273 asgn = -1 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234

274 else: 

275 assert pair[1] == "-" 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMN5OPQRSTUVWXYZ01234

276 a, _, b = pair # type: ignore[misc] 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMN5OPQRSTUVWXYZ01234

277 bsgn = -1 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMN5OPQRSTUVWXYZ01234

278 arcs.append((asgn * _arcz_map[a], bsgn * _arcz_map[b])) 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJ6KL7MN5OPQRST8UVWXYZ019234

279 return sorted(arcs) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

280 

281 

282@contextlib.contextmanager 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

283def change_dir(new_dir: str | Path) -> Iterator[None]: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

284 """Change directory, and then change back. 

285 

286 Use as a context manager, it will return to the original 

287 directory at the end of the block. 

288 

289 """ 

290 old_dir = os.getcwd() 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

291 os.chdir(str(new_dir)) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

292 try: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

293 yield 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

294 finally: 

295 os.chdir(old_dir) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

296 

297 

298T = TypeVar("T") 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

299 

300 

301def assert_count_equal( 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

302 a: Iterable[T] | None, 

303 b: Iterable[T] | None, 

304) -> None: 

305 """ 

306 A pytest-friendly implementation of assertCountEqual. 

307 

308 Assert that `a` and `b` have the same elements, but maybe in different order. 

309 This only works for hashable elements. 

310 """ 

311 assert a is not None 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

312 assert b is not None 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

313 assert collections.Counter(list(a)) == collections.Counter(list(b)) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

314 

315 

316def get_coverage_warnings(warns: Iterable[warnings.WarningMessage]) -> list[str]: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

317 """Extract the text of CoverageWarnings.""" 

318 warns = [w for w in warns if issubclass(w.category, CoverageWarning)] 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

319 texts = [cast(Warning, w.message).args[0] for w in warns] 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

320 return texts 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

321 

322 

323def assert_coverage_warnings( 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

324 warns: Iterable[warnings.WarningMessage], 

325 *msgs: str | re.Pattern[str], 

326) -> None: 

327 """ 

328 Assert that the CoverageWarning's in `warns` have `msgs` as messages. 

329 

330 Each msg can be a string compared for equality, or a compiled regex used to 

331 search the text. 

332 """ 

333 actuals = get_coverage_warnings(warns) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

334 assert msgs # don't call this without some messages. 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

335 assert len(msgs) == len(actuals) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

336 for actual, expected in zip(actuals, msgs): 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

337 if hasattr(expected, "search"): 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

338 assert expected.search(actual), f"{actual!r} didn't match {expected!r}" 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UVWX'YZ019234

339 else: 

340 actual = actual.partition("; see ")[0] 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

341 assert actual == expected 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

342 

343 

344@contextlib.contextmanager 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

345def swallow_warnings( 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

346 message: str = r".", 

347 category: type[Warning] = CoverageWarning, 

348) -> Iterator[None]: 

349 """Swallow particular warnings. 

350 

351 It's OK if they happen, or if they don't happen. Just ignore them. 

352 """ 

353 with warnings.catch_warnings(): 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

354 warnings.filterwarnings("ignore", category=category, message=message) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

355 yield 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

356 

357 

358class FailingProxy: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

359 """A proxy for another object, but one method will fail a few times before working.""" 

360 

361 def __init__(self, obj: Any, methname: str, fails: list[Exception]) -> None: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

362 """Create the failing proxy. 

363 

364 `obj` is the object to proxy. `methname` is the method that will fail 

365 a few times. `fails` are the exceptions to fail with. Once used up, 

366 the method will proxy correctly. 

367 

368 """ 

369 self.obj = obj 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

370 self.methname = methname 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

371 self.fails = fails 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

372 

373 def __getattr__(self, name: str) -> Any: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

374 if name == self.methname and self.fails: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

375 meth = self._make_failing_method(self.fails[0]) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

376 del self.fails[0] 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

377 else: 

378 meth = getattr(self.obj, name) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

379 return meth 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

380 

381 def _make_failing_method(self, exc: Exception) -> Callable[..., NoReturn]: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

382 """Return a function that will raise `exc`.""" 

383 

384 def _meth(*args: Any, **kwargs: Any) -> NoReturn: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

385 raise exc 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

386 

387 return _meth 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

388 

389 

390class DebugControlString(DebugControl): 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

391 """A `DebugControl` that writes to a StringIO, for testing.""" 

392 

393 def __init__(self, options: Iterable[str]) -> None: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

394 self.io = io.StringIO() 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

395 super().__init__(options, self.io) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

396 

397 def get_output(self) -> str: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

398 """Get the output text from the `DebugControl`.""" 

399 return self.io.getvalue() 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

400 

401 

402def all_our_source_files() -> Iterable[tuple[Path, str]]: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

403 """Iterate over all of our own source files. 

404 

405 This is used in tests that need a bunch of Python code to analyze, so we 

406 might as well use our own source code as the subject. 

407 

408 Produces a stream of (filename, file contents) tuples. 

409 """ 

410 cov_dir = Path(__file__).parent.parent 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

411 # To run against all the files in the tox venvs: 

412 # for source_file in cov_dir.rglob("*.py"): 

413 for sub in [".", "ci", "coverage", "lab", "tests"]: 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

414 assert (cov_dir / sub).is_dir() 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

415 for source_file in (cov_dir / sub).glob("*.py"): 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234

416 yield (source_file, source_file.read_text(encoding="utf-8")) 1abcdefghijklmnopqrstuvwxyzABCDEF!GH#IJ6KL7MN5OP$QR%ST8UV(WX'YZ)019234