Coverage for coverage / pytracer.py: 14.783%

164 statements  

« prev     ^ index     » next       coverage.py v7.12.1a0.dev1, created at 2025-11-30 17:57 +0000

1# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0 

2# For details: https://github.com/coveragepy/coveragepy/blob/main/NOTICE.txt 

3 

4"""Raw data collector for coverage.py.""" 

5 

6from __future__ import annotations 1abcpqrstuvwxyzABdCeDfEgFhGiHjIkJlKmLnMo

7 

8import atexit 1abcpqrstuvwxyzABdCeDfEgFhGiHjIkJlKmLnMo

9import dis 1abcpqrstuvwxyzABdCeDfEgFhGiHjIkJlKmLnMo

10import itertools 1abcpqrstuvwxyzABdCeDfEgFhGiHjIkJlKmLnMo

11import sys 1abcpqrstuvwxyzABdCeDfEgFhGiHjIkJlKmLnMo

12import threading 1abcpqrstuvwxyzABdCeDfEgFhGiHjIkJlKmLnMo

13from types import FrameType, ModuleType 1abcpqrstuvwxyzABdCeDfEgFhGiHjIkJlKmLnMo

14from typing import Any, Callable, cast 1abcpqrstuvwxyzABdCeDfEgFhGiHjIkJlKmLnMo

15 

16from coverage import env 1abcpqrstuvwxyzABdCeDfEgFhGiHjIkJlKmLnMo

17from coverage.types import ( 1abcpqrstuvwxyzABdCeDfEgFhGiHjIkJlKmLnMo

18 TArc, 

19 TFileDisposition, 

20 TLineNo, 

21 Tracer, 

22 TShouldStartContextFn, 

23 TShouldTraceFn, 

24 TTraceData, 

25 TTraceFileData, 

26 TTraceFn, 

27 TWarnFn, 

28) 

29 

# Module-level aliases for the two set types used with cast() in _trace().
# Using `cast(set[TLineNo], ...)` directly inside _trace() produced strange
# behavior on PyPy 3.10 for reasons that are not understood; naming the types
# here and referring to the names below avoids the problem.
# See https://github.com/coveragepy/coveragepy/issues/1902
set_TLineNo = set[TLineNo]
set_TArc = set[TArc]


# Opcode numbers, in a comparison-friendly (plain int) form, used to tell
# real returns apart from generator yields.
# PYVERSIONS: RESUME is new in Python3.11
RETURN_VALUE = dis.opmap["RETURN_VALUE"]
RESUME = dis.opmap.get("RESUME")
if RESUME is not None:
    # RESUME exists: the yield opcodes are never consulted, so give them a
    # value that cannot match a real opcode.
    YIELD_VALUE = -1
    YIELD_FROM = -1
    YIELD_FROM_OFFSET = -1
else:
    # No RESUME opcode: yields are detected by looking at the yield opcodes.
    YIELD_VALUE = dis.opmap["YIELD_VALUE"]
    YIELD_FROM = dis.opmap["YIELD_FROM"]
    YIELD_FROM_OFFSET = 0 if env.PYPY else 2

# When running meta-coverage, this file can try to trace itself, which confuses
# everything.  Don't trace ourselves.  Trailing "c"/"o" characters are stripped
# so that a compiled file name (.pyc/.pyo) still matches the .py source name.

THIS_FILE = __file__.rstrip("co")

53 

54 

class PyTracer(Tracer):
    """Python implementation of the raw data tracer."""

    # Because of poor implementations of trace-function-manipulating tools,
    # the Python trace function must be kept very simple.  In particular, there
    # must be only one function ever set as the trace function, both through
    # sys.settrace, and as the return value from the trace function.  Put
    # another way, the trace function must always return itself.  It cannot
    # swap in other functions, or return None to avoid tracing a particular
    # frame.
    #
    # The trace manipulator that introduced this restriction is DecoratorTools,
    # which sets a trace function, and then later restores the pre-existing one
    # by calling sys.settrace with a function it found in the current frame.
    #
    # Systems that use DecoratorTools (or similar trace manipulations) must use
    # PyTracer to get accurate results.  The command-line --timid argument is
    # used to force the use of this tracer.

    # Class-wide counter handing out a distinct id to each tracer instance.
    tracer_ids = itertools.count()

    def __init__(self) -> None:
        """Set up per-tracer state.

        Several attributes are only declared here (annotation without a
        value): they are expected to be assigned by the collector before the
        tracer is started.
        """
        # Which tracer are we?
        self.id = next(self.tracer_ids)

        # Attributes set from the collector:
        self.data: TTraceData
        self.trace_arcs = False
        self.should_trace: TShouldTraceFn
        self.should_trace_cache: dict[str, TFileDisposition | None]
        self.should_start_context: TShouldStartContextFn | None = None
        self.switch_context: Callable[[str | None], None] | None = None
        self.lock_data: Callable[[], None]
        self.unlock_data: Callable[[], None]
        self.warn: TWarnFn

        # The threading module to use, if any.
        self.threading: ModuleType | None = None

        # State for the frame currently being executed.  cur_file_data is None
        # when the current frame's file is not being traced.
        self.cur_file_data: TTraceFileData | None = None
        self.last_line: TLineNo = 0
        self.cur_file_name: str | None = None
        self.context: str | None = None
        self.started_context = False

        # The data_stack parallels the Python call stack. Each entry is
        # information about an active frame, a four-element tuple:
        #   [0] The TTraceFileData for this frame's file. Could be None if we
        #       aren't tracing this frame.
        #   [1] The current file name for the frame. None if we aren't tracing
        #       this frame.
        #   [2] The last line number executed in this frame.
        #   [3] Boolean: did this frame start a new context?
        self.data_stack: list[tuple[TTraceFileData | None, str | None, TLineNo, bool]] = []
        self.thread: threading.Thread | None = None
        self.stopped = False
        self._activity = False

        self.in_atexit = False
        # On exit, self.in_atexit = True
        atexit.register(setattr, self, "in_atexit", True)

        # Cache a bound method on the instance, so that we don't have to
        # re-create a bound method object all the time.
        self._cached_bound_method_trace: TTraceFn = self._trace

    def __repr__(self) -> str:
        """Summarize how much data has been collected so far.

        Reads `self.data`, so it is only usable once that attribute has been
        assigned.
        """
        points = sum(len(v) for v in self.data.values())
        files = len(self.data)
        return f"<PyTracer at {id(self):#x}: {points} data points in {files} files>"

    def log(self, marker: str, *args: Any) -> None:
        """For hard-core logging of what this tracer is doing.

        Appends one line to /tmp/debug_trace.txt: the marker, this tracer's id,
        the current data_stack depth, and the str() of each extra argument.
        The `if 0:` sections are debugging switches to be flipped by hand.
        """
        with open("/tmp/debug_trace.txt", "a", encoding="utf-8") as f:
            f.write(f"{marker} {self.id}[{len(self.data_stack)}]")
            if 0:  # if you want thread ids..
                f.write(  # type: ignore[unreachable]
                    ".{:x}.{:x}".format(
                        self.thread.ident,
                        self.threading.current_thread().ident,
                    )
                )
            f.write(" {}".format(" ".join(map(str, args))))
            if 0:  # if you want callers..
                f.write(" | ")  # type: ignore[unreachable]
                stack = " / ".join(
                    (fname or "???").rpartition("/")[-1] for _, fname, _, _ in self.data_stack
                )
                f.write(stack)
            f.write("\n")

    def _trace(
        self,
        frame: FrameType,
        event: str,
        arg: Any,  # pylint: disable=unused-argument
        lineno: TLineNo | None = None,  # pylint: disable=unused-argument
    ) -> TTraceFn | None:
        """The trace function passed to sys.settrace.

        Handles the "call", "line" and "return" events; any other event is
        ignored.  `arg` and `lineno` are accepted but unused.

        Returns the cached bound method for this function so the same trace
        function stays installed, or None when the frame belongs to this very
        file or when the tracer has been stopped and has just uninstalled
        itself.
        """

        if THIS_FILE in frame.f_code.co_filename:
            return None

        # f = frame; code = f.f_code
        # self.log(":", f"{code.co_filename} {f.f_lineno} {code.co_name}()", event)

        if self.stopped and sys.gettrace() == self._cached_bound_method_trace:  # pylint: disable=comparison-with-callable
            # The PyTracer.stop() method has been called, possibly by another
            # thread, let's deactivate ourselves now.
            if 0:
                f = frame  # type: ignore[unreachable]
                self.log("---\nX", f.f_code.co_filename, f.f_lineno)
                while f:
                    self.log(">", f.f_code.co_filename, f.f_lineno, f.f_code.co_name, f.f_trace)
                    f = f.f_back
            sys.settrace(None)
            try:
                self.cur_file_data, self.cur_file_name, self.last_line, self.started_context = (
                    self.data_stack.pop()
                )
            except IndexError:
                self.log(
                    "Empty stack!",
                    frame.f_code.co_filename,
                    frame.f_lineno,
                    frame.f_code.co_name,
                )
            return None

        # if event != "call" and frame.f_code.co_filename != self.cur_file_name:
        #     self.log("---\n*", frame.f_code.co_filename, self.cur_file_name, frame.f_lineno)

        if event == "call":
            # Should we start a new context?
            if self.should_start_context and self.context is None:
                context_maybe = self.should_start_context(frame)  # pylint: disable=not-callable
                if context_maybe is not None:
                    self.context = context_maybe
                    started_context = True
                    assert self.switch_context is not None
                    self.switch_context(self.context)  # pylint: disable=not-callable
                else:
                    started_context = False
            else:
                started_context = False
            self.started_context = started_context

            # Entering a new frame.  Decide if we should trace in this file.
            self._activity = True
            self.data_stack.append(
                (
                    self.cur_file_data,
                    self.cur_file_name,
                    self.last_line,
                    started_context,
                ),
            )

            # Improve tracing performance: when calling a function, both caller
            # and callee are often within the same file. if that's the case, we
            # don't have to re-check whether to trace the corresponding
            # function (which is a little bit expensive since it involves
            # dictionary lookups). This optimization is only correct if we
            # didn't start a context.
            filename = frame.f_code.co_filename
            if filename != self.cur_file_name or started_context:
                self.cur_file_name = filename
                disp = self.should_trace_cache.get(filename)
                if disp is None:
                    disp = self.should_trace(filename, frame)
                    self.should_trace_cache[filename] = disp

                self.cur_file_data = None
                if disp.trace:
                    tracename = disp.source_filename
                    assert tracename is not None
                    self.lock_data()
                    try:
                        if tracename not in self.data:
                            self.data[tracename] = set()
                    finally:
                        self.unlock_data()
                    self.cur_file_data = self.data[tracename]
                else:
                    frame.f_trace_lines = False
            elif self.cur_file_data is None:
                # Same file as the caller, and that file isn't traced.  Compare
                # against None explicitly: an empty set means the file *is*
                # traced but has no recorded data at the moment, and line
                # events must stay enabled for it.
                frame.f_trace_lines = False

            # The call event is really a "start frame" event, and happens for
            # function calls and re-entering generators.  The f_lasti field is
            # -1 for calls, and a real offset for generators.  Use <0 as the
            # line number for calls, and the real line number for generators.
            if RESUME is not None:
                # The current opcode is guaranteed to be RESUME. The argument
                # determines what kind of resume it is.
                oparg = frame.f_code.co_code[frame.f_lasti + 1]
                real_call = (oparg == 0)  # fmt: skip
            else:
                real_call = (getattr(frame, "f_lasti", -1) < 0)  # fmt: skip
            if real_call:
                self.last_line = -frame.f_code.co_firstlineno
            else:
                self.last_line = frame.f_lineno

        elif event == "line":
            # Record an executed line.
            if self.cur_file_data is not None:
                flineno: TLineNo = frame.f_lineno

                if self.trace_arcs:
                    cast(set_TArc, self.cur_file_data).add((self.last_line, flineno))
                else:
                    cast(set_TLineNo, self.cur_file_data).add(flineno)
                self.last_line = flineno

        elif event == "return":
            # As in the "call" branch, test for None rather than truthiness so
            # an empty data set still gets its exit arc recorded.
            if self.trace_arcs and self.cur_file_data is not None:
                # Record an arc leaving the function, but beware that a
                # "return" event might just mean yielding from a generator.
                code = frame.f_code.co_code
                lasti = frame.f_lasti
                if RESUME is not None:
                    if len(code) == lasti + 2:
                        # A return from the end of a code object is a real return.
                        real_return = True
                    else:
                        # It is a real return if we aren't going to resume next.
                        if env.PYBEHAVIOR.lasti_is_yield:
                            lasti += 2
                        real_return = code[lasti] != RESUME
                else:
                    if code[lasti] == RETURN_VALUE:
                        real_return = True
                    elif code[lasti] == YIELD_VALUE:
                        real_return = False
                    elif len(code) <= lasti + YIELD_FROM_OFFSET:
                        real_return = True
                    elif code[lasti + YIELD_FROM_OFFSET] == YIELD_FROM:
                        real_return = False
                    else:
                        real_return = True
                if real_return:
                    first = frame.f_code.co_firstlineno
                    cast(set_TArc, self.cur_file_data).add((self.last_line, -first))

            # Leaving this function, pop the filename stack.
            self.cur_file_data, self.cur_file_name, self.last_line, self.started_context = (
                self.data_stack.pop()
            )
            # Leaving a context?
            if self.started_context:
                assert self.switch_context is not None
                self.context = None
                self.switch_context(None)  # pylint: disable=not-callable

        return self._cached_bound_method_trace

    def start(self) -> TTraceFn:
        """Start this Tracer.

        Return a Python function suitable for use with sys.settrace().

        """
        self.stopped = False
        if self.threading:
            # Remember the thread we were first started on; stop() compares
            # against it.
            if self.thread is None:
                self.thread = self.threading.current_thread()

        sys.settrace(self._cached_bound_method_trace)
        return self._cached_bound_method_trace

    def stop(self) -> None:
        """Stop this Tracer.

        Only sets the stop flag; the trace function uninstalls itself the next
        time it runs.  Warns (slug "trace-changed") if the installed trace
        function is no longer ours, unless the warning is suppressed.
        """
        # Get the active tracer callback before setting the stop flag to be
        # able to detect if the tracer was changed prior to stopping it.
        tf = sys.gettrace()

        # Set the stop flag. The actual call to sys.settrace(None) will happen
        # in the self._trace callback itself to make sure to call it from the
        # right thread.
        self.stopped = True

        if self.threading:
            assert self.thread is not None
            if self.thread.ident != self.threading.current_thread().ident:
                # Called on a different thread than started us: we can't unhook
                # ourselves, but we've set the flag that we should stop, so we
                # won't do any more tracing.
                # self.log("~", "stopping on different threads")
                return

        # PyPy clears the trace function before running atexit functions,
        # so don't warn if we are in atexit on PyPy and the trace function
        # has changed to None.  Metacoverage also messes this up, so don't
        # warn if we are measuring ourselves.
        suppress_warning = (env.PYPY and self.in_atexit and tf is None) or env.METACOV
        if self.warn and not suppress_warning:
            if tf != self._cached_bound_method_trace:  # pylint: disable=comparison-with-callable
                self.warn(
                    "Trace function changed, data is likely wrong: "
                    + f"{tf!r} != {self._cached_bound_method_trace!r}",
                    slug="trace-changed",
                )

    def activity(self) -> bool:
        """Has there been any activity?"""
        return self._activity

    def reset_activity(self) -> None:
        """Reset the activity() flag."""
        self._activity = False

    def get_stats(self) -> dict[str, int] | None:
        """Return a dictionary of statistics, or None."""
        return None