Coverage for coverage / sysmon.py: 24.232%
223 statements
« prev ^ index » next coverage.py v7.12.1a0.dev1, created at 2025-11-30 17:57 +0000
« prev ^ index » next coverage.py v7.12.1a0.dev1, created at 2025-11-30 17:57 +0000
1# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
2# For details: https://github.com/coveragepy/coveragepy/blob/main/NOTICE.txt
4"""Callback functions and support for sys.monitoring data collection."""
6from __future__ import annotations 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
8import collections 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
9import functools 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
10import inspect 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
11import os 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
12import os.path 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
13import sys 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
14import threading 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
15import traceback 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
16from dataclasses import dataclass 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
17from types import CodeType 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
18from typing import Any, Callable, NewType, Optional, cast 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
20from coverage import env 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
21from coverage.bytecode import TBranchTrails, always_jumps, branch_trails 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
22from coverage.debug import short_filename, short_stack 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
23from coverage.exceptions import NoSource, NotPython 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
24from coverage.misc import isolate_module 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
25from coverage.parser import PythonParser 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
26from coverage.types import ( 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
27 AnyCallable,
28 TFileDisposition,
29 TLineNo,
30 TOffset,
31 Tracer,
32 TShouldStartContextFn,
33 TShouldTraceFn,
34 TTraceData,
35 TTraceFileData,
36 TWarnFn,
37)
39# Only needed for some of the commented-out logging:
40# from coverage.debug import ppformat
42os = isolate_module(os) 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
44# pylint: disable=unused-argument
46# $set_env.py: COVERAGE_SYSMON_LOG - Log sys.monitoring activity
47LOG = bool(int(os.getenv("COVERAGE_SYSMON_LOG", 0))) 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
49# $set_env.py: COVERAGE_SYSMON_STATS - Collect sys.monitoring stats
50COLLECT_STATS = bool(int(os.getenv("COVERAGE_SYSMON_STATS", 0))) 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
52# This module will be imported in all versions of Python, but only used in 3.12+
53# It will be type-checked for 3.12, but not for earlier versions.
54sys_monitoring = getattr(sys, "monitoring", None) 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
56DISABLE_TYPE = NewType("DISABLE_TYPE", object) 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
57MonitorReturn = Optional[DISABLE_TYPE] 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
58DISABLE = cast(MonitorReturn, getattr(sys_monitoring, "DISABLE", None)) 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
61if LOG: # pragma: debugging 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
63 class LoggingWrapper:
64 """Wrap a namespace to log all its functions."""
66 def __init__(self, wrapped: Any, namespace: str) -> None:
67 self.wrapped = wrapped
68 self.namespace = namespace
70 def __getattr__(self, name: str) -> Callable[..., Any]:
71 def _wrapped(*args: Any, **kwargs: Any) -> Any:
72 log(f"{self.namespace}.{name}{args}{kwargs}")
73 return getattr(self.wrapped, name)(*args, **kwargs)
75 return _wrapped
77 sys_monitoring = LoggingWrapper(sys_monitoring, "sys.monitoring")
78 assert sys_monitoring is not None 1KLM
80 short_stack = functools.partial(
81 short_stack,
82 full=True,
83 short_filenames=True,
84 frame_ids=True,
85 )
86 seen_threads: set[int] = set()
88 def log(msg: str) -> None:
89 """Write a message to our detailed debugging log(s)."""
90 # Thread ids are reused across processes?
91 # Make a shorter number more likely to be unique.
92 pid = os.getpid()
93 tid = cast(int, threading.current_thread().ident)
94 tslug = f"{(pid * tid) % 9_999_991:07d}"
95 if tid not in seen_threads:
96 seen_threads.add(tid)
97 log(f"New thread {tid} {tslug}:\n{short_stack()}")
98 # log_seq = int(os.getenv("PANSEQ", "0"))
99 # root = f"/tmp/pan.{log_seq:03d}"
100 for filename in [
101 "/tmp/foo.out",
102 # f"{root}.out",
103 # f"{root}-{pid}.out",
104 # f"{root}-{pid}-{tslug}.out",
105 ]:
106 with open(filename, "a", encoding="utf-8") as f:
107 try:
108 print(f"{pid}:{tslug}: {msg}", file=f, flush=True)
109 except UnicodeError:
110 print(f"{pid}:{tslug}: {ascii(msg)}", file=f, flush=True)
112 def arg_repr(arg: Any) -> str:
113 """Make a customized repr for logged values."""
114 if isinstance(arg, CodeType):
115 return (
116 f"<code @{id(arg):#x}"
117 + f" name={arg.co_name},"
118 + f" file={short_filename(arg.co_filename)!r}#{arg.co_firstlineno}>"
119 )
120 return repr(arg)
122 def panopticon(*names: str | None) -> AnyCallable:
123 """Decorate a function to log its calls."""
125 def _decorator(method: AnyCallable) -> AnyCallable:
126 @functools.wraps(method)
127 def _wrapped(self: Any, *args: Any) -> Any:
128 try:
129 # log(f"{method.__name__}() stack:\n{short_stack()}")
130 args_reprs = []
131 for name, arg in zip(names, args):
132 if name is None:
133 continue
134 args_reprs.append(f"{name}={arg_repr(arg)}")
135 log(f"{id(self):#x}:{method.__name__}({', '.join(args_reprs)})")
136 ret = method(self, *args)
137 # log(f" end {id(self):#x}:{method.__name__}({', '.join(args_reprs)})")
138 return ret
139 except Exception as exc:
140 log(f"!!{exc.__class__.__name__}: {exc}")
141 if 1:
142 log("".join(traceback.format_exception(exc)))
143 try:
144 assert sys_monitoring is not None
145 sys_monitoring.set_events(sys.monitoring.COVERAGE_ID, 0)
146 except ValueError:
147 # We might have already shut off monitoring.
148 log("oops, shutting off events with disabled tool id")
149 raise
151 return _wrapped
153 return _decorator
155else:
157 def log(msg: str) -> None: 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
158 """Write a message to our detailed debugging log(s), but not really."""
160 def panopticon(*names: str | None) -> AnyCallable: 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
161 """Decorate a function to log its calls, but not really."""
163 def _decorator(meth: AnyCallable) -> AnyCallable: 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
164 return meth 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
166 return _decorator 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
169@dataclass 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
170class CodeInfo: 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
171 """The information we want about each code object."""
173 tracing: bool 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
174 file_data: TTraceFileData | None 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
175 byte_to_line: dict[TOffset, TLineNo] | None 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
177 # Keys are start instruction offsets for branches.
178 # Values are dicts:
179 # {
180 # (from_line, to_line): {offset, offset, ...},
181 # (from_line, to_line): {offset, offset, ...},
182 # }
183 branch_trails: TBranchTrails 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
185 # Always-jumps are bytecode offsets that do no work but move
186 # to another offset.
187 always_jumps: dict[TOffset, TOffset] 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
190def bytes_to_lines(code: CodeType) -> dict[TOffset, TLineNo]: 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
191 """Make a dict mapping byte code offsets to line numbers."""
192 b2l = {}
193 for bstart, bend, lineno in code.co_lines():
194 if lineno is not None:
195 for boffset in range(bstart, bend, 2):
196 b2l[boffset] = lineno
197 return b2l
200class SysMonitor(Tracer): 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
201 """Python implementation of the raw data tracer for PEP669 implementations."""
203 # One of these will be used across threads. Be careful.
205 def __init__(self, tool_id: int) -> None: 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
206 # Attributes set from the collector:
207 self.data: TTraceData
208 self.trace_arcs = False
209 self.should_trace: TShouldTraceFn
210 self.should_trace_cache: dict[str, TFileDisposition | None]
211 # TODO: should_start_context and switch_context are unused!
212 # Change tests/testenv.py:DYN_CONTEXTS when this is updated.
213 self.should_start_context: TShouldStartContextFn | None = None
214 self.switch_context: Callable[[str | None], None] | None = None
215 self.lock_data: Callable[[], None]
216 self.unlock_data: Callable[[], None]
217 # TODO: warn is unused.
218 self.warn: TWarnFn
220 self.myid = tool_id
222 # Map id(code_object) -> CodeInfo
223 self.code_infos: dict[int, CodeInfo] = {}
224 # A list of code_objects, just to keep them alive so that id's are
225 # useful as identity.
226 self.code_objects: list[CodeType] = []
228 # Map filename:__name__ -> set(id(code_object))
229 self.filename_code_ids: dict[str, set[int]] = collections.defaultdict(set)
231 self.sysmon_on = False
232 self.lock = threading.Lock()
234 self.stats: dict[str, int] | None = None
235 if COLLECT_STATS:
236 self.stats = dict.fromkeys(
237 "starts start_tracing returns line_lines line_arcs branches branch_trails".split(),
238 0,
239 )
241 self._activity = False
243 def __repr__(self) -> str: 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
244 points = sum(len(v) for v in self.data.values())
245 files = len(self.data)
246 return f"<SysMonitor at {id(self):#x}: {points} data points in {files} files>"
248 @panopticon() 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
249 def start(self) -> None: 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
250 """Start this Tracer."""
251 with self.lock:
252 assert sys_monitoring is not None
253 sys_monitoring.use_tool_id(self.myid, "coverage.py")
254 register = functools.partial(sys_monitoring.register_callback, self.myid)
255 events = sys.monitoring.events
257 sys_monitoring.set_events(self.myid, events.PY_START)
258 register(events.PY_START, self.sysmon_py_start)
259 if self.trace_arcs:
260 register(events.PY_RETURN, self.sysmon_py_return)
261 register(events.LINE, self.sysmon_line_arcs)
262 if env.PYBEHAVIOR.branch_right_left:
263 register(
264 events.BRANCH_RIGHT, # type:ignore[attr-defined]
265 self.sysmon_branch_either,
266 )
267 register(
268 events.BRANCH_LEFT,
269 self.sysmon_branch_either,
270 )
271 else:
272 register(events.LINE, self.sysmon_line_lines)
273 sys_monitoring.restart_events()
274 self.sysmon_on = True
276 @panopticon() 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
277 def stop(self) -> None: 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
278 """Stop this Tracer."""
279 with self.lock: 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJ
280 if not self.sysmon_on: 280 ↛ 283line 280 didn't jump to line 283 because the condition on line 280 was never true1abcdefghijklmnopqrstuvwxyzABCDEFGHIJ
281 # In forking situations, we might try to stop when we are not
282 # started. Do nothing in that case.
283 return
284 assert sys_monitoring is not None 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJ
285 sys_monitoring.set_events(self.myid, 0) 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJ
286 self.sysmon_on = False 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJ
287 sys_monitoring.free_tool_id(self.myid) 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJ
289 if LOG: # pragma: debugging
290 items = sorted(
291 self.filename_code_ids.items(),
292 key=lambda item: len(item[1]),
293 reverse=True,
294 )
295 code_objs = sum(len(code_ids) for _, code_ids in items)
296 dupes = code_objs - len(items)
297 if dupes:
298 log(f"==== Duplicate code objects: {dupes} duplicates, {code_objs} total")
299 for filename, code_ids in items:
300 if len(code_ids) > 1:
301 log(f"{len(code_ids):>5} objects: {filename}")
302 else:
303 log("==== Duplicate code objects: none")
305 @panopticon() 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
306 def post_fork(self) -> None: 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
307 """The process has forked, clean up as needed."""
308 self.stop()
310 def activity(self) -> bool: 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
311 """Has there been any activity?"""
312 return self._activity 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJ
314 def reset_activity(self) -> None: 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
315 """Reset the activity() flag."""
316 self._activity = False 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJ
318 def get_stats(self) -> dict[str, int] | None: 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
319 """Return a dictionary of statistics, or None."""
320 return self.stats
322 @panopticon("code", "@") 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
323 def sysmon_py_start(self, code: CodeType, instruction_offset: TOffset) -> MonitorReturn: 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
324 """Handle sys.monitoring.events.PY_START events."""
325 self._activity = True
326 if self.stats is not None:
327 self.stats["starts"] += 1
329 if code.co_name == "__annotate__":
330 # Type annotation code objects don't execute, ignore them.
331 return DISABLE
333 # Entering a new frame. Decide if we should trace in this file.
334 code_info = self.code_infos.get(id(code))
335 tracing_code: bool | None = None
336 file_data: TTraceFileData | None = None
337 if code_info is not None:
338 tracing_code = code_info.tracing
339 file_data = code_info.file_data
341 if tracing_code is None:
342 filename = code.co_filename
343 disp = self.should_trace_cache.get(filename)
344 if disp is None:
345 frame = inspect.currentframe()
346 if frame is not None:
347 frame = inspect.currentframe().f_back # type: ignore[union-attr]
348 if LOG: # pragma: debugging
349 # @panopticon adds a frame.
350 frame = frame.f_back # type: ignore[union-attr]
351 disp = self.should_trace(filename, frame) # type: ignore[arg-type]
352 self.should_trace_cache[filename] = disp
354 tracing_code = disp.trace
355 if tracing_code:
356 tracename = disp.source_filename
357 assert tracename is not None
358 self.lock_data()
359 try:
360 if tracename not in self.data:
361 self.data[tracename] = set()
362 finally:
363 self.unlock_data()
364 file_data = self.data[tracename]
365 b2l = bytes_to_lines(code)
366 else:
367 file_data = None
368 b2l = None
370 code_info = CodeInfo(
371 tracing=tracing_code,
372 file_data=file_data,
373 byte_to_line=b2l,
374 branch_trails={},
375 always_jumps={},
376 )
377 self.code_infos[id(code)] = code_info
378 self.code_objects.append(code)
380 if tracing_code:
381 if self.stats is not None:
382 self.stats["start_tracing"] += 1
383 events = sys.monitoring.events
384 with self.lock:
385 if self.sysmon_on:
386 assert sys_monitoring is not None
387 local_events = events.PY_RETURN | events.PY_RESUME | events.LINE
388 if self.trace_arcs:
389 assert env.PYBEHAVIOR.branch_right_left
390 local_events |= (
391 events.BRANCH_RIGHT # type:ignore[attr-defined]
392 | events.BRANCH_LEFT
393 )
394 sys_monitoring.set_local_events(self.myid, code, local_events)
396 if LOG: # pragma: debugging
397 if code.co_filename not in {"<string>"}:
398 self.filename_code_ids[f"{code.co_filename}:{code.co_name}"].add(
399 id(code)
400 )
402 return DISABLE
404 @panopticon("code", "@", None) 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
405 def sysmon_py_return( 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
406 self,
407 code: CodeType,
408 instruction_offset: TOffset,
409 retval: object,
410 ) -> MonitorReturn:
411 """Handle sys.monitoring.events.PY_RETURN events for branch coverage."""
412 if self.stats is not None:
413 self.stats["returns"] += 1
414 code_info = self.code_infos.get(id(code))
415 # code_info is not None and code_info.file_data is not None, since we
416 # wouldn't have enabled this event if they were.
417 last_line = code_info.byte_to_line.get(instruction_offset) # type: ignore
418 if last_line is not None:
419 arc = (last_line, -code.co_firstlineno)
420 code_info.file_data.add(arc) # type: ignore
421 # log(f"adding {arc=}")
422 return DISABLE
424 @panopticon("code", "line") 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
425 def sysmon_line_lines(self, code: CodeType, line_number: TLineNo) -> MonitorReturn: 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
426 """Handle sys.monitoring.events.LINE events for line coverage."""
427 if self.stats is not None:
428 self.stats["line_lines"] += 1
429 code_info = self.code_infos.get(id(code))
430 # It should be true that code_info is not None and code_info.file_data
431 # is not None, since we wouldn't have enabled this event if they were.
432 # But somehow code_info can be None here, so we have to check.
433 if code_info is not None and code_info.file_data is not None:
434 code_info.file_data.add(line_number) # type: ignore
435 # log(f"adding {line_number=}")
436 return DISABLE
438 @panopticon("code", "line") 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
439 def sysmon_line_arcs(self, code: CodeType, line_number: TLineNo) -> MonitorReturn: 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
440 """Handle sys.monitoring.events.LINE events for branch coverage."""
441 if self.stats is not None:
442 self.stats["line_arcs"] += 1
443 code_info = self.code_infos[id(code)]
444 # code_info is not None and code_info.file_data is not None, since we
445 # wouldn't have enabled this event if they were.
446 arc = (line_number, line_number)
447 code_info.file_data.add(arc) # type: ignore
448 # log(f"adding {arc=}")
449 return DISABLE
451 @panopticon("code", "@", "@") 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
452 def sysmon_branch_either( 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
453 self, code: CodeType, instruction_offset: TOffset, destination_offset: TOffset
454 ) -> MonitorReturn:
455 """Handle BRANCH_RIGHT and BRANCH_LEFT events."""
456 if self.stats is not None:
457 self.stats["branches"] += 1
458 code_info = self.code_infos[id(code)]
459 # code_info is not None and code_info.file_data is not None, since we
460 # wouldn't have enabled this event if they were.
461 if not code_info.branch_trails:
462 if self.stats is not None:
463 self.stats["branch_trails"] += 1
464 multiline_map = get_multiline_map(code.co_filename)
465 code_info.branch_trails = branch_trails(code, multiline_map=multiline_map)
466 code_info.always_jumps = always_jumps(code)
467 # log(f"branch_trails for {code}:\n{ppformat(code_info.branch_trails)}")
468 added_arc = False
469 dest_info = code_info.branch_trails.get(instruction_offset)
471 # Re-map the destination offset through always-jumps to deal with NOP etc.
472 dests = {destination_offset}
473 while (dest := code_info.always_jumps.get(destination_offset)) is not None:
474 destination_offset = dest
475 dests.add(destination_offset)
477 # log(f"dest_info = {ppformat(dest_info)}")
478 if dest_info is not None:
479 for arc, offsets in dest_info.items():
480 if arc is None:
481 continue
482 if dests & offsets:
483 code_info.file_data.add(arc) # type: ignore
484 # log(f"adding {arc=}")
485 added_arc = True
486 break
488 if not added_arc:
489 # This could be an exception jumping from line to line.
490 assert code_info.byte_to_line is not None
491 l1 = code_info.byte_to_line.get(instruction_offset)
492 if l1 is not None:
493 l2 = code_info.byte_to_line.get(destination_offset)
494 if l2 is not None and l1 != l2:
495 arc = (l1, l2)
496 code_info.file_data.add(arc) # type: ignore
497 # log(f"adding unforeseen {arc=}")
499 return DISABLE
502@functools.lru_cache(maxsize=20) 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
503def get_multiline_map(filename: str) -> dict[TLineNo, TLineNo]: 1NOPQRSTUVWXYZ0123456789!#$%'()abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM
504 """Get a PythonParser for the given filename, cached."""
505 try:
506 parser = PythonParser(filename=filename)
507 parser.parse_source()
508 except NotPython:
509 # The file was not Python. This can happen when the code object refers
510 # to an original non-Python source file, like a Jinja template.
511 # In that case, just return an empty map, which might lead to slightly
512 # wrong branch coverage, but we don't have any better option.
513 return {}
514 except NoSource:
515 # This can happen if open() in python.py fails.
516 return {}
517 return parser.multiline_map