Coverage for coverage / pytracer.py: 14.783%
164 statements
« prev ^ index » next coverage.py v7.12.1a0.dev1, created at 2025-11-30 17:57 +0000
« prev ^ index » next coverage.py v7.12.1a0.dev1, created at 2025-11-30 17:57 +0000
1# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
2# For details: https://github.com/coveragepy/coveragepy/blob/main/NOTICE.txt
4"""Raw data collector for coverage.py."""
6from __future__ import annotations 1abcpqrstuvwxyzABdCeDfEgFhGiHjIkJlKmLnMo
8import atexit 1abcpqrstuvwxyzABdCeDfEgFhGiHjIkJlKmLnMo
9import dis 1abcpqrstuvwxyzABdCeDfEgFhGiHjIkJlKmLnMo
10import itertools 1abcpqrstuvwxyzABdCeDfEgFhGiHjIkJlKmLnMo
11import sys 1abcpqrstuvwxyzABdCeDfEgFhGiHjIkJlKmLnMo
12import threading 1abcpqrstuvwxyzABdCeDfEgFhGiHjIkJlKmLnMo
13from types import FrameType, ModuleType 1abcpqrstuvwxyzABdCeDfEgFhGiHjIkJlKmLnMo
14from typing import Any, Callable, cast 1abcpqrstuvwxyzABdCeDfEgFhGiHjIkJlKmLnMo
16from coverage import env 1abcpqrstuvwxyzABdCeDfEgFhGiHjIkJlKmLnMo
17from coverage.types import ( 1abcpqrstuvwxyzABdCeDfEgFhGiHjIkJlKmLnMo
18 TArc,
19 TFileDisposition,
20 TLineNo,
21 Tracer,
22 TShouldStartContextFn,
23 TShouldTraceFn,
24 TTraceData,
25 TTraceFileData,
26 TTraceFn,
27 TWarnFn,
28)
# Workaround for https://github.com/coveragepy/coveragepy/issues/1902:
# writing `cast(set[TLineNo], ...)` directly inside the _trace() function
# gave strange behavior on PyPy 3.10, for reasons that are not understood.
# Building the parameterized types once, here at module level, and using
# these names in _trace() fixes the problem.
set_TArc = set[TArc]
set_TLineNo = set[TLineNo]
38# We need the YIELD_VALUE opcode below, in a comparison-friendly form.
39# PYVERSIONS: RESUME is new in Python3.11
40RESUME = dis.opmap.get("RESUME") 1abcpqrstuvwxyzABdCeDfEgFhGiHjIkJlKmLnMo
41RETURN_VALUE = dis.opmap["RETURN_VALUE"] 1abcpqrstuvwxyzABdCeDfEgFhGiHjIkJlKmLnMo
42if RESUME is None: 1abcpqrstuvwxyzABdCeDfEgFhGiHjIkJlKmLnMo
43 YIELD_VALUE = dis.opmap["YIELD_VALUE"] 1abc
44 YIELD_FROM = dis.opmap["YIELD_FROM"] 1abc
45 YIELD_FROM_OFFSET = 0 if env.PYPY else 2 1abc
46else:
47 YIELD_VALUE = YIELD_FROM = YIELD_FROM_OFFSET = -1 1pqrstuvwxyzABdCeDfEgFhGiHjIkJlKmLnMo
# When running meta-coverage, this file can try to trace itself, which confuses
# everything. Don't trace ourselves.
#
# THIS_FILE is the ".py" path of this module.  If __file__ names a compiled
# file (".pyc" / ".pyo"), drop just that one trailing letter.  This is done
# with an explicit suffix test because str.rstrip("co") removes *every*
# trailing "c" or "o" character rather than a suffix.
THIS_FILE = __file__[:-1] if __file__.endswith((".pyc", ".pyo")) else __file__
class PyTracer(Tracer):
    """Python implementation of the raw data tracer.

    `start()` installs this object's `_trace` method with `sys.settrace()`.
    `_trace` then handles "call", "line" and "return" events and records what
    was executed into `data`: a mapping from file name to a set of line
    numbers, or to a set of `(from_line, to_line)` arcs when `trace_arcs` is
    true.
    """

    # Because of poor implementations of trace-function-manipulating tools,
    # the Python trace function must be kept very simple.  In particular, there
    # must be only one function ever set as the trace function, both through
    # sys.settrace, and as the return value from the trace function.  Put
    # another way, the trace function must always return itself.  It cannot
    # swap in other functions, or return None to avoid tracing a particular
    # frame.
    #
    # The trace manipulator that introduced this restriction is DecoratorTools,
    # which sets a trace function, and then later restores the pre-existing one
    # by calling sys.settrace with a function it found in the current frame.
    #
    # Systems that use DecoratorTools (or similar trace manipulations) must use
    # PyTracer to get accurate results.  The command-line --timid argument is
    # used to force the use of this tracer.

    # Shared counter used to give each tracer instance a distinct `id`.
    tracer_ids = itertools.count()

    def __init__(self) -> None:
        """Create a tracer; the collector fills in the remaining attributes."""
        # Which tracer are we?
        self.id = next(self.tracer_ids)

        # Attributes set from the collector:
        self.data: TTraceData
        self.trace_arcs = False
        self.should_trace: TShouldTraceFn
        self.should_trace_cache: dict[str, TFileDisposition | None]
        self.should_start_context: TShouldStartContextFn | None = None
        self.switch_context: Callable[[str | None], None] | None = None
        self.lock_data: Callable[[], None]
        self.unlock_data: Callable[[], None]
        self.warn: TWarnFn

        # The threading module to use, if any.
        self.threading: ModuleType | None = None

        # State for the frame currently executing.  cur_file_data is None when
        # the current frame is not being traced.
        self.cur_file_data: TTraceFileData | None = None
        self.last_line: TLineNo = 0
        self.cur_file_name: str | None = None
        self.context: str | None = None
        self.started_context = False

        # The data_stack parallels the Python call stack.  Each entry is
        # information about an active frame, a four-element tuple:
        #   [0] The TTraceData for this frame's file.  Could be None if we
        #       aren't tracing this frame.
        #   [1] The current file name for the frame.  None if we aren't tracing
        #       this frame.
        #   [2] The last line number executed in this frame.
        #   [3] Boolean: did this frame start a new context?
        self.data_stack: list[tuple[TTraceFileData | None, str | None, TLineNo, bool]] = []
        self.thread: threading.Thread | None = None
        self.stopped = False
        self._activity = False

        self.in_atexit = False
        # On exit, self.in_atexit = True
        atexit.register(setattr, self, "in_atexit", True)

        # Cache a bound method on the instance, so that we don't have to
        # re-create a bound method object all the time.
        self._cached_bound_method_trace: TTraceFn = self._trace

    def __repr__(self) -> str:
        """Return a short summary: data point count and file count."""
        # `data` is only annotated in __init__ and assigned later, so fall back
        # to an empty mapping rather than raising AttributeError from repr().
        data = getattr(self, "data", None) or {}
        points = sum(len(v) for v in data.values())
        files = len(data)
        return f"<PyTracer at {id(self):#x}: {points} data points in {files} files>"

    def log(self, marker: str, *args: Any) -> None:
        """For hard-core logging of what this tracer is doing.

        Appends one line to /tmp/debug_trace.txt made of `marker`, this
        tracer's id, the current data_stack depth, and the str() of each of
        `args`.  The `if 0:` sections are manual switches for extra detail.
        """
        with open("/tmp/debug_trace.txt", "a", encoding="utf-8") as f:
            f.write(f"{marker} {self.id}[{len(self.data_stack)}]")
            if 0:  # if you want thread ids..
                f.write(  # type: ignore[unreachable]
                    ".{:x}.{:x}".format(
                        self.thread.ident,
                        self.threading.current_thread().ident,
                    )
                )
            f.write(" {}".format(" ".join(map(str, args))))
            if 0:  # if you want callers..
                f.write(" | ")  # type: ignore[unreachable]
                stack = " / ".join(
                    (fname or "???").rpartition("/")[-1] for _, fname, _, _ in self.data_stack
                )
                f.write(stack)
            f.write("\n")

    def _trace(
        self,
        frame: FrameType,
        event: str,
        arg: Any,  # pylint: disable=unused-argument
        lineno: TLineNo | None = None,  # pylint: disable=unused-argument
    ) -> TTraceFn | None:
        """The trace function passed to sys.settrace.

        Handles three events:

        - "call": push the current frame state onto data_stack, possibly start
          a new context, and decide whether the new frame's file is traced.
        - "line": record the line (or the arc from the previous line) in the
          current file's data.
        - "return": when measuring arcs, record the function-exit arc unless
          the frame is only yielding, then pop data_stack.

        Returns the cached bound `_trace` method so tracing continues, or None
        for frames in this very file and once the tracer has been stopped.
        """

        if THIS_FILE in frame.f_code.co_filename:
            return None

        # f = frame; code = f.f_code
        # self.log(":", f"{code.co_filename} {f.f_lineno} {code.co_name}()", event)

        if self.stopped and sys.gettrace() == self._cached_bound_method_trace:  # pylint: disable=comparison-with-callable
            # The PyTracer.stop() method has been called, possibly by another
            # thread, let's deactivate ourselves now.
            if 0:
                f = frame  # type: ignore[unreachable]
                self.log("---\nX", f.f_code.co_filename, f.f_lineno)
                while f:
                    self.log(">", f.f_code.co_filename, f.f_lineno, f.f_code.co_name, f.f_trace)
                    f = f.f_back
            sys.settrace(None)
            try:
                self.cur_file_data, self.cur_file_name, self.last_line, self.started_context = (
                    self.data_stack.pop()
                )
            except IndexError:
                self.log(
                    "Empty stack!",
                    frame.f_code.co_filename,
                    frame.f_lineno,
                    frame.f_code.co_name,
                )
            return None

        # if event != "call" and frame.f_code.co_filename != self.cur_file_name:
        #     self.log("---\n*", frame.f_code.co_filename, self.cur_file_name, frame.f_lineno)

        if event == "call":
            # Should we start a new context?  Only consider it when no context
            # is currently active.
            started_context = False
            if self.should_start_context and self.context is None:
                context_maybe = self.should_start_context(frame)  # pylint: disable=not-callable
                if context_maybe is not None:
                    self.context = context_maybe
                    started_context = True
                    assert self.switch_context is not None
                    self.switch_context(self.context)  # pylint: disable=not-callable
            self.started_context = started_context

            # Entering a new frame.  Decide if we should trace in this file.
            self._activity = True
            self.data_stack.append(
                (
                    self.cur_file_data,
                    self.cur_file_name,
                    self.last_line,
                    started_context,
                ),
            )

            # Improve tracing performance: when calling a function, both caller
            # and callee are often within the same file.  If that's the case, we
            # don't have to re-check whether to trace the corresponding
            # function (which is a little bit expensive since it involves
            # dictionary lookups).  This optimization is only correct if we
            # didn't start a context.
            filename = frame.f_code.co_filename
            if filename != self.cur_file_name or started_context:
                self.cur_file_name = filename
                disp = self.should_trace_cache.get(filename)
                if disp is None:
                    disp = self.should_trace(filename, frame)
                    self.should_trace_cache[filename] = disp

                self.cur_file_data = None
                if disp.trace:
                    tracename = disp.source_filename
                    assert tracename is not None
                    self.lock_data()
                    try:
                        if tracename not in self.data:
                            self.data[tracename] = set()
                    finally:
                        self.unlock_data()
                    self.cur_file_data = self.data[tracename]
                else:
                    frame.f_trace_lines = False
            elif self.cur_file_data is None:
                # Same file as the caller, and the caller isn't traced.  Test
                # for None explicitly: an empty set still means "traced file",
                # and must not switch off line events for this frame.
                frame.f_trace_lines = False

            # The call event is really a "start frame" event, and happens for
            # function calls and re-entering generators.  The f_lasti field is
            # -1 for calls, and a real offset for generators.  Use <0 as the
            # line number for calls, and the real line number for generators.
            if RESUME is not None:
                # The current opcode is guaranteed to be RESUME.  The argument
                # determines what kind of resume it is.
                oparg = frame.f_code.co_code[frame.f_lasti + 1]
                real_call = (oparg == 0)  # fmt: skip
            else:
                real_call = (getattr(frame, "f_lasti", -1) < 0)  # fmt: skip
            if real_call:
                self.last_line = -frame.f_code.co_firstlineno
            else:
                self.last_line = frame.f_lineno

        elif event == "line":
            # Record an executed line.
            if self.cur_file_data is not None:
                flineno: TLineNo = frame.f_lineno

                if self.trace_arcs:
                    cast(set_TArc, self.cur_file_data).add((self.last_line, flineno))
                else:
                    cast(set_TLineNo, self.cur_file_data).add(flineno)
                self.last_line = flineno

        elif event == "return":
            # As in the "line" branch, "is not None" (not truthiness) decides
            # whether this frame is traced, so an empty set still gets its
            # exit arc recorded.
            if self.trace_arcs and self.cur_file_data is not None:
                # Record an arc leaving the function, but beware that a
                # "return" event might just mean yielding from a generator.
                code = frame.f_code.co_code
                lasti = frame.f_lasti
                if RESUME is not None:
                    if len(code) == lasti + 2:
                        # A return from the end of a code object is a real return.
                        real_return = True
                    else:
                        # It is a real return if we aren't going to resume next.
                        if env.PYBEHAVIOR.lasti_is_yield:
                            lasti += 2
                        real_return = code[lasti] != RESUME
                else:
                    if code[lasti] == RETURN_VALUE:
                        real_return = True
                    elif code[lasti] == YIELD_VALUE:
                        real_return = False
                    elif len(code) <= lasti + YIELD_FROM_OFFSET:
                        real_return = True
                    elif code[lasti + YIELD_FROM_OFFSET] == YIELD_FROM:
                        real_return = False
                    else:
                        real_return = True
                if real_return:
                    first = frame.f_code.co_firstlineno
                    cast(set_TArc, self.cur_file_data).add((self.last_line, -first))

            # Leaving this function, pop the filename stack.
            self.cur_file_data, self.cur_file_name, self.last_line, self.started_context = (
                self.data_stack.pop()
            )
            # Leaving a context?
            if self.started_context:
                assert self.switch_context is not None
                self.context = None
                self.switch_context(None)  # pylint: disable=not-callable

        return self._cached_bound_method_trace

    def start(self) -> TTraceFn:
        """Start this Tracer.

        Return a Python function suitable for use with sys.settrace().

        """
        self.stopped = False
        # Remember the thread we were first started on, so that stop() can
        # tell whether it is being called from that same thread.
        if self.threading and self.thread is None:
            self.thread = self.threading.current_thread()

        sys.settrace(self._cached_bound_method_trace)
        return self._cached_bound_method_trace

    def stop(self) -> None:
        """Stop this Tracer.

        Only sets the `stopped` flag; the trace function is actually removed
        by `_trace` the next time it runs.  Warns through `self.warn` if the
        active trace function is no longer ours.
        """
        # Get the active tracer callback before setting the stop flag to be
        # able to detect if the tracer was changed prior to stopping it.
        tf = sys.gettrace()

        # Set the stop flag.  The actual call to sys.settrace(None) will happen
        # in the self._trace callback itself to make sure to call it from the
        # right thread.
        self.stopped = True

        if self.threading:
            assert self.thread is not None
            if self.thread.ident != self.threading.current_thread().ident:
                # Called on a different thread than started us: we can't unhook
                # ourselves, but we've set the flag that we should stop, so we
                # won't do any more tracing.
                # self.log("~", "stopping on different threads")
                return

        # PyPy clears the trace function before running atexit functions,
        # so don't warn if we are in atexit on PyPy and the trace function
        # has changed to None.  Metacoverage also messes this up, so don't
        # warn if we are measuring ourselves.
        suppress_warning = (env.PYPY and self.in_atexit and tf is None) or env.METACOV
        if self.warn and not suppress_warning:
            if tf != self._cached_bound_method_trace:  # pylint: disable=comparison-with-callable
                self.warn(
                    "Trace function changed, data is likely wrong: "
                    + f"{tf!r} != {self._cached_bound_method_trace!r}",
                    slug="trace-changed",
                )

    def activity(self) -> bool:
        """Has there been any activity?"""
        return self._activity

    def reset_activity(self) -> None:
        """Reset the activity() flag."""
        self._activity = False

    def get_stats(self) -> dict[str, int] | None:
        """Return a dictionary of statistics, or None."""
        return None