Coverage for coverage / sysmon.py: 24.232%

223 statements  

« prev     ^ index     » next       coverage.py v7.12.1a0.dev1, created at 2025-11-30 17:57 +0000

1# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0 

2# For details: https://github.com/coveragepy/coveragepy/blob/main/NOTICE.txt 

3 

4"""Callback functions and support for sys.monitoring data collection.""" 

5 

6from __future__ import annotations 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM

7 

8import collections 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM

9import functools 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM

10import inspect 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM

11import os 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM

12import os.path 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM

13import sys 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM

14import threading 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM

15import traceback 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM

16from dataclasses import dataclass 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM

17from types import CodeType 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM

18from typing import Any, Callable, NewType, Optional, cast 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM

19 

20from coverage import env 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM

21from coverage.bytecode import TBranchTrails, always_jumps, branch_trails 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM

22from coverage.debug import short_filename, short_stack 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM

23from coverage.exceptions import NoSource, NotPython 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM

24from coverage.misc import isolate_module 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM

25from coverage.parser import PythonParser 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM

26from coverage.types import ( 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM

27 AnyCallable, 

28 TFileDisposition, 

29 TLineNo, 

30 TOffset, 

31 Tracer, 

32 TShouldStartContextFn, 

33 TShouldTraceFn, 

34 TTraceData, 

35 TTraceFileData, 

36 TWarnFn, 

37) 

38 

39# Only needed for some of the commented-out logging: 

40# from coverage.debug import ppformat 

41 

42os = isolate_module(os) 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM

43 

44# pylint: disable=unused-argument 

45 

def _env_flag(name: str) -> bool:
    """Read an integer-valued environment switch as a bool (unset means off)."""
    return bool(int(os.getenv(name, 0)))


# $set_env.py: COVERAGE_SYSMON_LOG - Log sys.monitoring activity
LOG = _env_flag("COVERAGE_SYSMON_LOG")

# $set_env.py: COVERAGE_SYSMON_STATS - Collect sys.monitoring stats
COLLECT_STATS = _env_flag("COVERAGE_SYSMON_STATS")

# This module is imported on every Python version, but is only used on 3.12+.
# It is type-checked for 3.12, not for earlier versions, so look the
# sys.monitoring namespace up dynamically: it is None where it doesn't exist.
sys_monitoring = getattr(sys, "monitoring", None)

# The type of the sentinel a callback returns to switch its event off, and the
# return type shared by all of the callbacks below.
DISABLE_TYPE = NewType("DISABLE_TYPE", object)
MonitorReturn = Optional[DISABLE_TYPE]
# None when sys.monitoring is unavailable.
DISABLE = cast(MonitorReturn, getattr(sys_monitoring, "DISABLE", None))

59 

60 

if LOG:  # pragma: debugging

    class LoggingWrapper:
        """Proxy for a namespace that logs every function called through it."""

        def __init__(self, wrapped: Any, namespace: str) -> None:
            self.wrapped = wrapped
            self.namespace = namespace

        def __getattr__(self, name: str) -> Callable[..., Any]:
            # Only reached for names not set in __init__, so every attribute
            # of the wrapped namespace is handed back as a logging callable.
            def _wrapped(*args: Any, **kwargs: Any) -> Any:
                log(f"{self.namespace}.{name}{args}{kwargs}")
                target = getattr(self.wrapped, name)
                return target(*args, **kwargs)

            return _wrapped

    sys_monitoring = LoggingWrapper(sys_monitoring, "sys.monitoring")
    assert sys_monitoring is not None

    # Pre-bind the stack formatting options used by the log messages.
    short_stack = functools.partial(
        short_stack,
        full=True,
        short_filenames=True,
        frame_ids=True,
    )
    # Thread ids that have already been announced in the log.
    seen_threads: set[int] = set()

    def log(msg: str) -> None:
        """Append `msg` to the debugging log file(s), tagged by process and thread."""
        # Thread ids are reused across processes?
        # Make a shorter number more likely to be unique.
        process_id = os.getpid()
        thread_id = cast(int, threading.current_thread().ident)
        thread_slug = f"{(process_id * thread_id) % 9_999_991:07d}"
        if thread_id not in seen_threads:
            # Record the thread first so the nested log() call doesn't recurse forever.
            seen_threads.add(thread_id)
            log(f"New thread {thread_id} {thread_slug}:\n{short_stack()}")
        prefix = f"{process_id}:{thread_slug}: "
        # log_seq = int(os.getenv("PANSEQ", "0"))
        # root = f"/tmp/pan.{log_seq:03d}"
        destinations = [
            "/tmp/foo.out",
            # f"{root}.out",
            # f"{root}-{process_id}.out",
            # f"{root}-{process_id}-{thread_slug}.out",
        ]
        for destination in destinations:
            with open(destination, "a", encoding="utf-8") as out:
                try:
                    print(f"{prefix}{msg}", file=out, flush=True)
                except UnicodeError:
                    # Fall back to an ASCII-safe rendering of the message.
                    print(f"{prefix}{ascii(msg)}", file=out, flush=True)

    def arg_repr(arg: Any) -> str:
        """Return the repr used for a logged argument; code objects get a compact form."""
        if not isinstance(arg, CodeType):
            return repr(arg)
        location = f"{short_filename(arg.co_filename)!r}#{arg.co_firstlineno}"
        return f"<code @{id(arg):#x} name={arg.co_name}, file={location}>"

    def panopticon(*names: str | None) -> AnyCallable:
        """Build a decorator that logs each call of the decorated method.

        `names` label the positional arguments in order; a None entry means
        that argument is left out of the log line.
        """

        def _decorator(method: AnyCallable) -> AnyCallable:
            @functools.wraps(method)
            def _wrapped(self: Any, *args: Any) -> Any:
                try:
                    # log(f"{method.__name__}() stack:\n{short_stack()}")
                    args_reprs = [
                        f"{name}={arg_repr(arg)}"
                        for name, arg in zip(names, args)
                        if name is not None
                    ]
                    log(f"{id(self):#x}:{method.__name__}({', '.join(args_reprs)})")
                    return method(self, *args)
                except Exception as exc:
                    log(f"!!{exc.__class__.__name__}: {exc}")
                    log("".join(traceback.format_exception(exc)))
                    # Turn the events off before re-raising.
                    try:
                        assert sys_monitoring is not None
                        sys_monitoring.set_events(sys.monitoring.COVERAGE_ID, 0)
                    except ValueError:
                        # We might have already shut off monitoring.
                        log("oops, shutting off events with disabled tool id")
                    raise

            return _wrapped

        return _decorator

else:

    def log(msg: str) -> None:
        """Stand-in for the debugging logger: accepts a message and does nothing."""

    def panopticon(*names: str | None) -> AnyCallable:
        """Stand-in for the logging decorator: the method is returned unchanged."""

        def _decorator(meth: AnyCallable) -> AnyCallable:
            return meth

        return _decorator

167 

168 

@dataclass
class CodeInfo:
    """Per-code-object bookkeeping kept by the tracer."""

    # Whether events from this code object are being recorded.
    tracing: bool
    # Where measured data for this code object's file is collected, if any.
    file_data: TTraceFileData | None
    # Bytecode offset -> line number, if any.
    byte_to_line: dict[TOffset, TLineNo] | None

    # Branch trails, keyed by the instruction offset a branch starts at.
    # Each value is a dict of this shape:
    #   {
    #       (from_line, to_line): {offset, offset, ...},
    #       (from_line, to_line): {offset, offset, ...},
    #   }
    branch_trails: TBranchTrails

    # Always-jumps: bytecode offsets that do no work except move on to
    # another offset.  Maps each such offset to the offset it moves to.
    always_jumps: dict[TOffset, TOffset]

188 

189 

190def bytes_to_lines(code: CodeType) -> dict[TOffset, TLineNo]: 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM

191 """Make a dict mapping byte code offsets to line numbers.""" 

192 b2l = {} 

193 for bstart, bend, lineno in code.co_lines(): 

194 if lineno is not None: 

195 for boffset in range(bstart, bend, 2): 

196 b2l[boffset] = lineno 

197 return b2l 

198 

199 

class SysMonitor(Tracer):
    """Python implementation of the raw data tracer for PEP669 implementations.

    `start()` claims a sys.monitoring tool id and registers the `sysmon_*`
    methods as event callbacks.  Each callback records into the collector's
    data and returns DISABLE.
    """

    # One of these will be used across threads.  Be careful.

    def __init__(self, tool_id: int) -> None:
        """Create a tracer that will use `tool_id` as its sys.monitoring tool id."""
        # Attributes set from the collector.  The annotation-only ones do not
        # exist on the instance until the collector assigns them.
        self.data: TTraceData
        self.trace_arcs = False
        self.should_trace: TShouldTraceFn
        self.should_trace_cache: dict[str, TFileDisposition | None]
        # TODO: should_start_context and switch_context are unused!
        # Change tests/testenv.py:DYN_CONTEXTS when this is updated.
        self.should_start_context: TShouldStartContextFn | None = None
        self.switch_context: Callable[[str | None], None] | None = None
        self.lock_data: Callable[[], None]
        self.unlock_data: Callable[[], None]
        # TODO: warn is unused.
        self.warn: TWarnFn

        self.myid = tool_id

        # Map id(code_object) -> CodeInfo
        self.code_infos: dict[int, CodeInfo] = {}
        # A list of code_objects, just to keep them alive so that id's are
        # useful as identity.
        self.code_objects: list[CodeType] = []

        # Map filename:__name__ -> set(id(code_object))
        self.filename_code_ids: dict[str, set[int]] = collections.defaultdict(set)

        # True between start() and stop(); guarded by self.lock.
        self.sysmon_on = False
        self.lock = threading.Lock()

        # Event counters, only kept when COLLECT_STATS is set.
        self.stats: dict[str, int] | None = None
        if COLLECT_STATS:
            self.stats = dict.fromkeys(
                "starts start_tracing returns line_lines line_arcs branches branch_trails".split(),
                0,
            )

        self._activity = False

    def __repr__(self) -> str:
        # `data` is only assigned by the collector, so it may not exist yet.
        # Don't let repr() raise for a tracer that hasn't been configured.
        data = getattr(self, "data", None) or {}
        points = sum(len(v) for v in data.values())
        files = len(data)
        return f"<SysMonitor at {id(self):#x}: {points} data points in {files} files>"

    @panopticon()
    def start(self) -> None:
        """Start this Tracer."""
        with self.lock:
            assert sys_monitoring is not None
            sys_monitoring.use_tool_id(self.myid, "coverage.py")
            register = functools.partial(sys_monitoring.register_callback, self.myid)
            events = sys.monitoring.events

            # Only PY_START is enabled globally.  The other events are turned
            # on per code object by sysmon_py_start.
            sys_monitoring.set_events(self.myid, events.PY_START)
            register(events.PY_START, self.sysmon_py_start)
            if self.trace_arcs:
                register(events.PY_RETURN, self.sysmon_py_return)
                register(events.LINE, self.sysmon_line_arcs)
                if env.PYBEHAVIOR.branch_right_left:
                    register(
                        events.BRANCH_RIGHT,  # type:ignore[attr-defined]
                        self.sysmon_branch_either,
                    )
                    register(
                        events.BRANCH_LEFT,
                        self.sysmon_branch_either,
                    )
            else:
                register(events.LINE, self.sysmon_line_lines)
            sys_monitoring.restart_events()
            self.sysmon_on = True

    @panopticon()
    def stop(self) -> None:
        """Stop this Tracer."""
        with self.lock:
            if not self.sysmon_on:
                # In forking situations, we might try to stop when we are not
                # started. Do nothing in that case.
                return
            assert sys_monitoring is not None
            sys_monitoring.set_events(self.myid, 0)
            self.sysmon_on = False
            sys_monitoring.free_tool_id(self.myid)

        if LOG:  # pragma: debugging
            # Report names that were seen with more than one code object.
            items = sorted(
                self.filename_code_ids.items(),
                key=lambda item: len(item[1]),
                reverse=True,
            )
            code_objs = sum(len(code_ids) for _, code_ids in items)
            dupes = code_objs - len(items)
            if dupes:
                log(f"==== Duplicate code objects: {dupes} duplicates, {code_objs} total")
                for filename, code_ids in items:
                    if len(code_ids) > 1:
                        log(f"{len(code_ids):>5} objects: {filename}")
            else:
                log("==== Duplicate code objects: none")

    @panopticon()
    def post_fork(self) -> None:
        """The process has forked, clean up as needed."""
        self.stop()

    def activity(self) -> bool:
        """Has there been any activity?"""
        return self._activity

    def reset_activity(self) -> None:
        """Reset the activity() flag."""
        self._activity = False

    def get_stats(self) -> dict[str, int] | None:
        """Return a dictionary of statistics, or None."""
        return self.stats

    @panopticon("code", "@")
    def sysmon_py_start(self, code: CodeType, instruction_offset: TOffset) -> MonitorReturn:
        """Handle sys.monitoring.events.PY_START events.

        The first time a code object is seen, decide whether its file should
        be traced, remember the decision in `self.code_infos`, and if so turn
        on the local events for that code object.  Always returns DISABLE.
        """
        self._activity = True
        if self.stats is not None:
            self.stats["starts"] += 1

        if code.co_name == "__annotate__":
            # Type annotation code objects don't execute, ignore them.
            return DISABLE

        # Entering a new frame.  Decide if we should trace in this file.
        code_info = self.code_infos.get(id(code))
        tracing_code: bool | None = None
        file_data: TTraceFileData | None = None
        if code_info is not None:
            tracing_code = code_info.tracing
            file_data = code_info.file_data

        if tracing_code is None:
            filename = code.co_filename
            disp = self.should_trace_cache.get(filename)
            if disp is None:
                frame = inspect.currentframe()
                if frame is not None:
                    # Step out of this callback to the frame being started.
                    frame = frame.f_back
                    if LOG:  # pragma: debugging
                        # @panopticon adds a frame.
                        frame = frame.f_back  # type: ignore[union-attr]
                disp = self.should_trace(filename, frame)  # type: ignore[arg-type]
                self.should_trace_cache[filename] = disp

            tracing_code = disp.trace
            if tracing_code:
                tracename = disp.source_filename
                assert tracename is not None
                # Make sure the file has an entry in the shared data.
                self.lock_data()
                try:
                    if tracename not in self.data:
                        self.data[tracename] = set()
                finally:
                    self.unlock_data()
                file_data = self.data[tracename]
                b2l = bytes_to_lines(code)
            else:
                file_data = None
                b2l = None

            code_info = CodeInfo(
                tracing=tracing_code,
                file_data=file_data,
                byte_to_line=b2l,
                branch_trails={},
                always_jumps={},
            )
            self.code_infos[id(code)] = code_info
            self.code_objects.append(code)

            if tracing_code:
                if self.stats is not None:
                    self.stats["start_tracing"] += 1
                events = sys.monitoring.events
                with self.lock:
                    # Don't enable anything if stop() has already run.
                    if self.sysmon_on:
                        assert sys_monitoring is not None
                        local_events = events.PY_RETURN | events.PY_RESUME | events.LINE
                        if self.trace_arcs:
                            assert env.PYBEHAVIOR.branch_right_left
                            local_events |= (
                                events.BRANCH_RIGHT  # type:ignore[attr-defined]
                                | events.BRANCH_LEFT
                            )
                        sys_monitoring.set_local_events(self.myid, code, local_events)

        if LOG:  # pragma: debugging
            if code.co_filename not in {"<string>"}:
                self.filename_code_ids[f"{code.co_filename}:{code.co_name}"].add(
                    id(code)
                )

        return DISABLE

    @panopticon("code", "@", None)
    def sysmon_py_return(
        self,
        code: CodeType,
        instruction_offset: TOffset,
        retval: object,
    ) -> MonitorReturn:
        """Handle sys.monitoring.events.PY_RETURN events for branch coverage.

        Records an arc from the returning line to the negated first line
        number of the code object.
        """
        if self.stats is not None:
            self.stats["returns"] += 1
        code_info = self.code_infos.get(id(code))
        # PY_RETURN is only enabled for code objects we decided to trace, so
        # the CodeInfo and its data should exist.  sysmon_line_lines notes
        # that the lookup can come back None anyway, so skip the event rather
        # than raise inside the monitored program.
        if code_info is None or code_info.byte_to_line is None or code_info.file_data is None:
            return DISABLE
        last_line = code_info.byte_to_line.get(instruction_offset)
        if last_line is not None:
            arc = (last_line, -code.co_firstlineno)
            code_info.file_data.add(arc)  # type: ignore
            # log(f"adding {arc=}")
        return DISABLE

    @panopticon("code", "line")
    def sysmon_line_lines(self, code: CodeType, line_number: TLineNo) -> MonitorReturn:
        """Handle sys.monitoring.events.LINE events for line coverage."""
        if self.stats is not None:
            self.stats["line_lines"] += 1
        code_info = self.code_infos.get(id(code))
        # Neither code_info nor code_info.file_data should be None, since we
        # wouldn't have enabled this event if either were.
        # But somehow code_info can be None here, so we have to check.
        if code_info is not None and code_info.file_data is not None:
            code_info.file_data.add(line_number)  # type: ignore
            # log(f"adding {line_number=}")
        return DISABLE

    @panopticon("code", "line")
    def sysmon_line_arcs(self, code: CodeType, line_number: TLineNo) -> MonitorReturn:
        """Handle sys.monitoring.events.LINE events for branch coverage.

        The executed line is recorded as an arc from the line to itself.
        """
        if self.stats is not None:
            self.stats["line_arcs"] += 1
        code_info = self.code_infos[id(code)]
        # Neither code_info nor code_info.file_data should be None, since we
        # wouldn't have enabled this event if either were.
        arc = (line_number, line_number)
        code_info.file_data.add(arc)  # type: ignore
        # log(f"adding {arc=}")
        return DISABLE

    @panopticon("code", "@", "@")
    def sysmon_branch_either(
        self, code: CodeType, instruction_offset: TOffset, destination_offset: TOffset
    ) -> MonitorReturn:
        """Handle BRANCH_RIGHT and BRANCH_LEFT events."""
        if self.stats is not None:
            self.stats["branches"] += 1
        code_info = self.code_infos[id(code)]
        # Neither code_info nor code_info.file_data should be None, since we
        # wouldn't have enabled this event if either were.
        if not code_info.branch_trails:
            # Compute the branch trails on the first branch event for this code.
            if self.stats is not None:
                self.stats["branch_trails"] += 1
            multiline_map = get_multiline_map(code.co_filename)
            code_info.branch_trails = branch_trails(code, multiline_map=multiline_map)
            code_info.always_jumps = always_jumps(code)
            # log(f"branch_trails for {code}:\n{ppformat(code_info.branch_trails)}")
        added_arc = False
        dest_info = code_info.branch_trails.get(instruction_offset)

        # Re-map the destination offset through always-jumps to deal with NOP etc.
        dests = {destination_offset}
        while (dest := code_info.always_jumps.get(destination_offset)) is not None:
            destination_offset = dest
            dests.add(destination_offset)

        # log(f"dest_info = {ppformat(dest_info)}")
        if dest_info is not None:
            # Record the first arc whose offsets include one of the destinations.
            for arc, offsets in dest_info.items():
                if arc is None:
                    continue
                if dests & offsets:
                    code_info.file_data.add(arc)  # type: ignore
                    # log(f"adding {arc=}")
                    added_arc = True
                    break

        if not added_arc:
            # This could be an exception jumping from line to line.
            assert code_info.byte_to_line is not None
            l1 = code_info.byte_to_line.get(instruction_offset)
            if l1 is not None:
                l2 = code_info.byte_to_line.get(destination_offset)
                if l2 is not None and l1 != l2:
                    arc = (l1, l2)
                    code_info.file_data.add(arc)  # type: ignore
                    # log(f"adding unforeseen {arc=}")

        return DISABLE

500 

501 

@functools.lru_cache(maxsize=20)
def get_multiline_map(filename: str) -> dict[TLineNo, TLineNo]:
    """Return the parser's `multiline_map` for `filename`, cached per filename.

    An empty map is returned when the file can't be used:

    - NotPython: the file was not Python.  This can happen when the code
      object refers to an original non-Python source file, like a Jinja
      template.  An empty map might lead to slightly wrong branch coverage,
      but we don't have any better option.
    - NoSource: this can happen if open() in python.py fails.
    """
    try:
        parser = PythonParser(filename=filename)
        parser.parse_source()
    except (NotPython, NoSource):
        return {}
    else:
        return parser.multiline_map