Coverage for tests / test_concurrency.py: 100.000%

308 statements  

« prev     ^ index     » next       coverage.py v7.12.1a0.dev1, created at 2025-11-30 17:57 +0000

1# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0 

2# For details: https://github.com/coveragepy/coveragepy/blob/main/NOTICE.txt 

3 

4"""Tests for concurrency libraries.""" 

5 

6from __future__ import annotations 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

7 

8import glob 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

9import multiprocessing 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

10import os 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

11import pathlib 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

12import random 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

13import re 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

14import sys 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

15import threading 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

16import time 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

17 

18from types import ModuleType 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

19from collections.abc import Iterable 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

20 

21import pytest 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

22 

23import coverage 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

24from coverage import env 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

25from coverage.data import line_counts 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

26from coverage.exceptions import ConfigError 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

27from coverage.files import abs_file 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

28from coverage.misc import import_local_file 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

29 

30from tests import testenv 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

31from tests.coveragetest import CoverageTest 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

32 

33 

34# These libraries aren't always available, we'll skip tests if they aren't. 

35 

36try: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

37 import eventlet 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

38except ImportError: 1DpCo6Sld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

39 eventlet = None 1DpCo6Sld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

40 

41try: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

42 import gevent 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

43except ImportError: 1DpCo6Sld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

44 gevent = None 1DpCo6Sld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

45 

46try: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

47 import greenlet 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

48except ImportError: 1DpCo6Sld3ia0YI%me4jb1QJ'nf5kc2RK(

49 greenlet = None 1DpCo6Sld3ia0YI%me4jb1QJ'nf5kc2RK(

50 

51 

52def measurable_line(l: str) -> bool: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

53 """Is this a line of code coverage will measure? 

54 

55 Not blank, not a comment, and not "else" 

56 """ 

57 l = l.strip() 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

58 if not l: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

59 return False 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

60 if l.startswith("#"): 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

61 return False 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

62 if l.startswith("else:"): 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

63 return False 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

64 return True 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

65 

66 

67def line_count(s: str) -> int: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

68 """How many measurable lines are in `s`?""" 

69 return len(list(filter(measurable_line, s.splitlines()))) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

70 

71 

72def print_simple_annotation(code: str, linenos: Iterable[int]) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

73 """Print the lines in `code` with X for each line number in `linenos`.""" 

74 for lineno, line in enumerate(code.splitlines(), start=1): 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT

75 print(" {} {}".format("X" if lineno in linenos else " ", line)) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT

76 

77 

78class LineCountTest(CoverageTest): 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

79 """Test the helpers here.""" 

80 

81 run_in_temp_dir = False 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

82 

83 def test_line_count(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

84 CODE = """ 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

85 # Hey there! 

86 x = 1 

87 if x: 

88 print("hello") 

89 else: 

90 print("bye") 

91 

92 print("done") 

93 """ 

94 

95 assert line_count(CODE) == 5 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

96 

97 

98# The code common to all the concurrency models. 

99SUM_RANGE_Q = """ 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

100 # Above this will be imports defining queue and threading. 

101 

102 class Producer(threading.Thread): 

103 def __init__(self, limit, q): 

104 threading.Thread.__init__(self) 

105 self.limit = limit 

106 self.q = q 

107 

108 def run(self): 

109 for i in range(self.limit): 

110 self.q.put(i) 

111 self.q.put(None) 

112 

113 class Consumer(threading.Thread): 

114 def __init__(self, q, qresult): 

115 threading.Thread.__init__(self) 

116 self.q = q 

117 self.qresult = qresult 

118 

119 def run(self): 

120 sum = 0 

121 while "no peephole".upper(): 

122 i = self.q.get() 

123 if i is None: 

124 break 

125 sum += i 

126 self.qresult.put(sum) 

127 

128 def sum_range(limit): 

129 q = queue.Queue() 

130 qresult = queue.Queue() 

131 c = Consumer(q, qresult) 

132 p = Producer(limit, q) 

133 c.start() 

134 p.start() 

135 

136 p.join() 

137 c.join() 

138 return qresult.get() 

139 

140 # Below this will be something using sum_range. 

141 """ 

142 

143PRINT_SUM_RANGE = """ 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

144 print(sum_range({QLIMIT})) 

145 """ 

146 

147# Import the things to use threads. 

148THREAD = """ 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

149 import threading 

150 import queue 

151 """ 

152 

153# Import the things to use eventlet. 

154EVENTLET = """ 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

155 import eventlet.green.threading as threading 

156 import eventlet.queue as queue 

157 """ 

158 

159# Import the things to use gevent. 

160GEVENT = """ 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

161 from gevent import monkey 

162 monkey.patch_thread() 

163 import threading 

164 import gevent.queue as queue 

165 """ 

166 

167# Uncomplicated code that doesn't use any of the concurrency stuff, to test 

168# the simple case under each of the regimes. 

169SIMPLE = """ 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

170 total = 0 

171 for i in range({QLIMIT}): 

172 total += i 

173 print(total) 

174 """ 

175 

176 

177def cant_trace_msg(concurrency: str, the_module: ModuleType | None) -> str | None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

178 """What might coverage.py say about a concurrency setting and imported module?""" 

179 # In the concurrency choices, "multiprocessing" doesn't count, so remove it. 

180 if "multiprocessing" in concurrency: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

181 parts = concurrency.split(",") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

182 parts.remove("multiprocessing") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

183 concurrency = ",".join(parts) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

184 

185 if testenv.SYS_MON and concurrency: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

186 expected_out = f"Can't use core=sysmon: it doesn't support concurrency={concurrency}" 187)30%41'52(

187 elif the_module is None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

188 # We don't even have the underlying module installed, we expect 

189 # coverage to alert us to this fact. 

190 expected_out = f"Couldn't trace with concurrency={concurrency}, the module isn't installed." 1DpCo6SldiaYImejbQJnfkcRKqrT

191 elif testenv.C_TRACER or concurrency == "thread" or concurrency == "": 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

192 expected_out = None 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

193 else: 

194 expected_out = ( 1ysUztVAuWBvXhgPqrT

195 f"Can't support concurrency={concurrency} with {testenv.REQUESTED_TRACER_CLASS}, " 

196 + "only threads are supported." 

197 ) 

198 return expected_out 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

199 

200 

201class ConcurrencyTest(CoverageTest): 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

202 """Tests of the concurrency support in coverage.py.""" 

203 

204 QLIMIT = 1000 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

205 

206 def try_some_code( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(

207 self, 

208 code: str, 

209 concurrency: str, 

210 the_module: ModuleType, 

211 expected_out: str | None = None, 

212 ) -> None: 

213 """Run some concurrency testing code and see that it was all covered. 

214 

215 `code` is the Python code to execute. `concurrency` is the name of 

216 the concurrency regime to test it under. `the_module` is the imported 

217 module that must be available for this to work at all. `expected_out` 

218 is the text we expect the code to produce. 

219 

220 """ 

221 self.make_file("try_it.py", code) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

222 

223 cmd = f"coverage run --concurrency={concurrency} try_it.py" 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

224 _, out = self.run_command_status(cmd) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

225 

226 expected_cant_trace = cant_trace_msg(concurrency, the_module) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

227 

228 if expected_cant_trace is not None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

229 assert expected_cant_trace in out 1ysUztVAuWBvXDpCo6Sh8g7P)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

230 pytest.skip(f"Can't test: {expected_cant_trace}") 1ysUztVAuWBvXDpCo6Sh8g7P)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

231 else: 

232 # We can fully measure the code if we are using the C tracer, which 

233 # can support all the concurrency, or if we are using threads. 

234 if expected_out is None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT

235 expected_out = "%d\n" % (sum(range(self.QLIMIT))) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT

236 print(code) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT

237 assert out == expected_out 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT

238 

239 # Read the coverage file and see that try_it.py has all its lines 

240 # executed. 

241 data = coverage.CoverageData(".coverage") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT

242 data.read() 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT

243 

244 # If the test fails, it's helpful to see this info: 

245 fname = abs_file("try_it.py") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT

246 linenos = data.lines(fname) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT

247 assert linenos is not None 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT

248 print(f"{len(linenos)}: {linenos}") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT

249 print_simple_annotation(code, linenos) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT

250 

251 lines = line_count(code) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT

252 assert line_counts(data)["try_it.py"] == lines 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT

253 

254 @pytest.mark.skipif( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

255 not testenv.CAN_MEASURE_THREADS, reason="Can't measure threads with this core." 

256 ) 

257 def test_threads(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

258 code = (THREAD + SUM_RANGE_Q + PRINT_SUM_RANGE).format(QLIMIT=self.QLIMIT) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT

259 self.try_some_code(code, "thread", threading) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT

260 

261 @pytest.mark.skipif( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

262 not testenv.CAN_MEASURE_THREADS, reason="Can't measure threads with this core." 

263 ) 

264 def test_threads_simple_code(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

265 code = SIMPLE.format(QLIMIT=self.QLIMIT) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT

266 self.try_some_code(code, "thread", threading) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT

267 

268 def test_eventlet(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

269 code = (EVENTLET + SUM_RANGE_Q + PRINT_SUM_RANGE).format(QLIMIT=self.QLIMIT) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

270 self.try_some_code(code, "eventlet", eventlet) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

271 

272 def test_eventlet_simple_code(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

273 code = SIMPLE.format(QLIMIT=self.QLIMIT) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

274 self.try_some_code(code, "eventlet", eventlet) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

275 

276 # https://github.com/coveragepy/coveragepy/issues/663 

277 @pytest.mark.skipif(env.WINDOWS, reason="gevent has problems on Windows: #663") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

278 def test_gevent(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

279 code = (GEVENT + SUM_RANGE_Q + PRINT_SUM_RANGE).format(QLIMIT=self.QLIMIT) 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

280 self.try_some_code(code, "gevent", gevent) 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

281 

282 def test_gevent_simple_code(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

283 code = SIMPLE.format(QLIMIT=self.QLIMIT) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

284 self.try_some_code(code, "gevent", gevent) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

285 

286 def test_greenlet(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

287 GREENLET = """\ 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

288 from greenlet import greenlet 

289 

290 def test1(x, y): 

291 z = gr2.switch(x+y) 

292 print(z) 

293 

294 def test2(u): 

295 print(u) 

296 gr1.switch(42) 

297 

298 gr1 = greenlet(test1) 

299 gr2 = greenlet(test2) 

300 gr1.switch("hello", " world") 

301 """ 

302 self.try_some_code(GREENLET, "greenlet", greenlet, "hello world\n42\n") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

303 

304 def test_greenlet_simple_code(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

305 code = SIMPLE.format(QLIMIT=self.QLIMIT) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

306 self.try_some_code(code, "greenlet", greenlet) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

307 

308 def test_bug_330(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

309 BUG_330 = """\ 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

310 from weakref import WeakKeyDictionary 

311 import eventlet 

312 

313 def do(): 

314 eventlet.sleep(.01) 

315 

316 gts = WeakKeyDictionary() 

317 for _ in range(100): 

318 gts[eventlet.spawn(do)] = True 

319 eventlet.sleep(.005) 

320 

321 eventlet.sleep(.1) 

322 print(len(gts)) 

323 """ 

324 self.try_some_code(BUG_330, "eventlet", eventlet, "0\n") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

325 

326 # Sometimes a test fails due to inherent randomness. Try more times. 

327 @pytest.mark.skipif( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

328 not testenv.CAN_MEASURE_THREADS, reason="Can't measure threads with this core." 

329 ) 

330 @pytest.mark.flaky(max_runs=3) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

331 def test_threads_with_gevent(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

332 self.make_file( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT

333 "both.py", 

334 """\ 

335 import queue 

336 import threading 

337 

338 import gevent 

339 

340 def work1(q): 

341 q.put(1) 

342 

343 def gwork(q): 

344 gevent.spawn(work1, q).join() 

345 q.put(None) 

346 print("done") 

347 

348 q = queue.Queue() 

349 t = threading.Thread(target=gwork, args=(q,)) 

350 t.start() 

351 t.join() 

352 

353 answer = q.get() 

354 assert answer == 1 

355 """, 

356 ) 

357 _, out = self.run_command_status("coverage run --concurrency=thread,gevent both.py") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT

358 if gevent is None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT

359 assert "Couldn't trace with concurrency=gevent, the module isn't installed.\n" in out 1DpCo6SldiaYImejbQJnfkcRKqrT

360 pytest.skip("Can't run test without gevent installed.") 1DpCo6SldiaYImejbQJnfkcRKqrT

361 if not testenv.C_TRACER: 1LyEs#UMzFt$VNAGu9WOBHv!XxhwgZP

362 assert testenv.PY_TRACER 1ysUztVAuWBvXhgP

363 assert out == ( 1ysUztVAuWBvXhgP

364 "Can't support concurrency=gevent with PyTracer, only threads are supported.\n" 

365 ) 

366 pytest.skip(f"Can't run gevent with {testenv.REQUESTED_TRACER_CLASS}.") 1ysUztVAuWBvXhgP

367 

368 assert out == "done\n" 1LE#MF$NG9OH!xwZ

369 

370 out = self.run_command("coverage report -m") 1LE#MF$NG9OH!xwZ

371 last_line = self.squeezed_lines(out)[-1] 1LE#MF$NG9OH!xwZ

372 assert re.search(r"TOTAL \d+ 0 100%", last_line) 1LE#MF$NG9OH!xwZ

373 

374 def test_bad_concurrency(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

375 with pytest.raises(ConfigError, match="Unknown concurrency choices: nothing"): 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

376 self.command_line("run --concurrency=nothing prog.py") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

377 

378 def test_bad_concurrency_in_config(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

379 self.make_file(".coveragerc", "[run]\nconcurrency = nothing\n") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

380 with pytest.raises(ConfigError, match="Unknown concurrency choices: nothing"): 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

381 self.command_line("run prog.py") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

382 

383 def test_no_multiple_light_concurrency(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

384 with pytest.raises(ConfigError, match="Conflicting concurrency settings: eventlet, gevent"): 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

385 self.command_line("run --concurrency=gevent,eventlet prog.py") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

386 

387 def test_no_multiple_light_concurrency_in_config(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

388 self.make_file(".coveragerc", "[run]\nconcurrency = gevent, eventlet\n") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

389 with pytest.raises(ConfigError, match="Conflicting concurrency settings: eventlet, gevent"): 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

390 self.command_line("run prog.py") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

391 

392 def test_multiprocessing_needs_config_file(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

393 with pytest.raises(ConfigError, match="multiprocessing requires a configuration file"): 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

394 self.command_line("run --concurrency=multiprocessing prog.py") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

395 

396 

397class WithoutConcurrencyModuleTest(CoverageTest): 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

398 """Tests of what happens if the requested concurrency isn't installed.""" 

399 

400 @pytest.mark.parametrize("module", ["eventlet", "gevent", "greenlet"]) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

401 def test_missing_module(self, module: str) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

402 self.make_file("prog.py", "a = 1") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

403 sys.modules[module] = None # type: ignore[assignment] 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

404 msg = rf"Couldn't trace with concurrency={module}, the module isn't installed." 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

405 with pytest.raises(ConfigError, match=msg): 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

406 self.command_line(f"run --concurrency={module} prog.py") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

407 

408 

409SQUARE_OR_CUBE_WORK = """ 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

410 def work(x): 

411 # Use different lines in different subprocesses. 

412 if x % 2: 

413 y = x*x 

414 else: 

415 y = x*x*x 

416 return y 

417 """ 

418 

419SUM_RANGE_WORK = """ 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

420 def work(x): 

421 return sum_range((x+1)*100) 

422 """ 

423 

424MULTI_CODE = """ 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

425 # Above this will be a definition of work(). 

426 import multiprocessing 

427 import os 

428 import time 

429 import sys 

430 

431 def process_worker_main(args): 

432 # Need to pause, or the tasks go too quickly, and some processes 

433 # in the pool don't get any work, and then don't record data. 

434 ret = work(*args) 

435 time.sleep(0.1) 

436 return os.getpid(), ret 

437 

438 if __name__ == "__main__": # pragma: no branch 

439 # This if is on a single line so we can get 100% coverage 

440 # even if we have no arguments. 

441 if len(sys.argv) > 1: multiprocessing.set_start_method(sys.argv[1]) 

442 pool = multiprocessing.Pool({NPROCS}) 

443 inputs = [(x,) for x in range({UPTO})] 

444 outputs = pool.imap_unordered(process_worker_main, inputs) 

445 pids = set() 

446 total = 0 

447 for pid, sq in outputs: 

448 pids.add(pid) 

449 total += sq 

450 print(f"{{len(pids)}} pids, {{total = }}") 

451 pool.close() 

452 pool.join() 

453 """ 

454 

455 

456@pytest.fixture(params=["fork", "spawn"], name="start_method") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

457def start_method_fixture(request: pytest.FixtureRequest) -> str: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

458 """Parameterized fixture to choose the start_method for multiprocessing.""" 

459 start_method: str = request.param 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

460 if start_method not in multiprocessing.get_all_start_methods(): 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

461 # Windows doesn't support "fork". 

462 pytest.skip(f"start_method={start_method} not supported here") 1#U$V9W!X6SZP)YI%QJ'RK(T

463 return start_method 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

464 

465 

466# Sometimes a test fails due to inherent randomness. Try more times. 

467# @pytest.mark.flaky(max_runs=30) 

468class MultiprocessingTest(CoverageTest): 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

469 """Test support of the multiprocessing module.""" 

470 

471 def try_multiprocessing_code( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(

472 self, 

473 code: str, 

474 expected_out: str | None, 

475 the_module: ModuleType, 

476 nprocs: int, 

477 start_method: str, 

478 concurrency: str = "multiprocessing", 

479 args: str = "", 

480 ) -> None: 

481 """Run code using multiprocessing, it should produce `expected_out`.""" 

482 self.make_file("multi.py", code) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

483 self.make_file( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

484 ".coveragerc", 

485 f"""\ 

486 [run] 

487 concurrency = {concurrency} 

488 source = . 

489 """, 

490 ) 

491 

492 cmd = f"coverage run {args} multi.py {start_method}" 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

493 _, out = self.run_command_status(cmd) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

494 expected_cant_trace = cant_trace_msg(concurrency, the_module) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

495 

496 if expected_cant_trace is not None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

497 assert expected_cant_trace in out 1ysUztVAuWBvXDpCo6Sh8g7P)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

498 pytest.skip(f"Can't test: {expected_cant_trace}") 1ysUztVAuWBvXDpCo6Sh8g7P)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

499 else: 

500 assert out.rstrip() == expected_out 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

501 assert len(glob.glob(".coverage.*")) == nprocs + 1 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

502 

503 out = self.run_command("coverage combine") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

504 out_lines = out.splitlines() 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

505 assert len(out_lines) == nprocs + 1 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

506 assert all( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

507 re.fullmatch( 

508 r"(Combined data file|Skipping duplicate data) \.coverage\..*\.\d+\.X\w{6}x", 

509 line, 

510 ) 

511 for line in out_lines 

512 ) 

513 assert len(glob.glob(".coverage.*")) == 0 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

514 out = self.run_command("coverage report -m") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

515 

516 last_line = self.squeezed_lines(out)[-1] 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

517 assert re.search(r"TOTAL \d+ 0 100%", last_line) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

518 

519 def test_multiprocessing_simple(self, start_method: str) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

520 nprocs = 3 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

521 upto = 30 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

522 code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

523 total = sum(x * x if x % 2 else x * x * x for x in range(upto)) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

524 expected_out = f"{nprocs} pids, {total = }" 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

525 self.try_multiprocessing_code( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

526 code, 

527 expected_out, 

528 threading, 

529 nprocs, 

530 start_method=start_method, 

531 ) 

532 

533 def test_multiprocessing_append(self, start_method: str) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

534 nprocs = 3 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

535 upto = 30 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

536 code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

537 total = sum(x * x if x % 2 else x * x * x for x in range(upto)) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

538 expected_out = f"{nprocs} pids, total = {total}" 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

539 self.try_multiprocessing_code( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

540 code, 

541 expected_out, 

542 threading, 

543 nprocs, 

544 args="--append", 

545 start_method=start_method, 

546 ) 

547 

548 def test_multiprocessing_and_gevent(self, start_method: str) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

549 nprocs = 3 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

550 upto = 30 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

551 code = (SUM_RANGE_WORK + EVENTLET + SUM_RANGE_Q + MULTI_CODE).format( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

552 NPROCS=nprocs, UPTO=upto 

553 ) 

554 total = sum(sum(range((x + 1) * 100)) for x in range(upto)) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

555 expected_out = f"{nprocs} pids, total = {total}" 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

556 self.try_multiprocessing_code( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

557 code, 

558 expected_out, 

559 eventlet, 

560 nprocs, 

561 concurrency="multiprocessing,eventlet", 

562 start_method=start_method, 

563 ) 

564 

565 @pytest.mark.skipif( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

566 not testenv.CAN_MEASURE_BRANCHES, reason="Can't measure branches with this core" 

567 ) 

568 def test_multiprocessing_with_branching(self, start_method: str) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

569 nprocs = 3 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(

570 upto = 30 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(

571 code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto) 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(

572 total = sum(x * x if x % 2 else x * x * x for x in range(upto)) 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(

573 expected_out = f"{nprocs} pids, total = {total}" 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(

574 self.make_file("multi.py", code) 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(

575 self.make_file( 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(

576 "multi.rc", 

577 """\ 

578 [run] 

579 concurrency = multiprocessing 

580 branch = True 

581 omit = */site-packages/* 

582 """, 

583 ) 

584 

585 out = self.run_command(f"coverage run --rcfile=multi.rc multi.py {start_method}") 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(

586 assert out.rstrip() == expected_out 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(

587 

588 out = self.run_command("coverage combine -q") # sneak in a test of -q 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(

589 assert out == "" 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(

590 out = self.run_command("coverage report -m") 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(

591 

592 last_line = self.squeezed_lines(out)[-1] 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(

593 assert re.search(r"TOTAL \d+ 0 \d+ 0 100%", last_line) 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(

594 

595 def test_multiprocessing_bootstrap_error_handling(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

596 # An exception during bootstrapping will be reported. 

597 self.make_file( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

598 "multi.py", 

599 """\ 

600 import multiprocessing 

601 if __name__ == "__main__": 

602 with multiprocessing.Manager(): 

603 pass 

604 """, 

605 ) 

606 self.make_file( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

607 ".coveragerc", 

608 """\ 

609 [run] 

610 concurrency = multiprocessing 

611 _crash = _bootstrap 

612 """, 

613 ) 

614 out = self.run_command("coverage run multi.py", status=1) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

615 assert "Exception during multiprocessing bootstrap init" in out 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

616 assert "RuntimeError: Crashing because called by _bootstrap" in out 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

617 

618 def test_bug_890(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

619 # chdir in multiprocessing shouldn't keep us from finding the 

620 # .coveragerc file. 

621 self.make_file( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

622 "multi.py", 

623 """\ 

624 import multiprocessing, os, os.path 

625 if __name__ == "__main__": 

626 if not os.path.exists("./tmp"): os.mkdir("./tmp") 

627 os.chdir("./tmp") 

628 with multiprocessing.Manager(): 

629 pass 

630 print("ok") 

631 """, 

632 ) 

633 self.make_file( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

634 ".coveragerc", 

635 """\ 

636 [run] 

637 concurrency = multiprocessing 

638 """, 

639 ) 

640 out = self.run_command("coverage run multi.py") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

641 assert out.splitlines()[-1] == "ok" 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

642 

643 

644@pytest.mark.skipif(not testenv.SETTRACE_CORE, reason="gettrace is not supported with this core.") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

645def test_coverage_stop_in_threads() -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

646 has_started_coverage = [] 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT

647 has_stopped_coverage = [] 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT

648 

649 def run_thread() -> None: # pragma: nested 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT

650 """Check that coverage is stopping properly in threads.""" 

651 deadline = time.time() + 5 

652 ident = threading.current_thread().ident 

653 if sys.gettrace() is not None: 

654 has_started_coverage.append(ident) 

655 while sys.gettrace() is not None: 

656 # Wait for coverage to stop 

657 time.sleep(0.01) 

658 if time.time() > deadline: 

659 return 

660 has_stopped_coverage.append(ident) 

661 

662 cov = coverage.Coverage() 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT

663 with cov.collect(): 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT

664 t = threading.Thread(target=run_thread) 1ysUztVAuWBvXpoShgPdaIebJfcKqrT

665 t.start() 1ysUztVAuWBvXpoShgPdaIebJfcKqrT

666 

667 time.sleep(0.1) 1ysUztVAuWBvXpoShgPdaIebJfcKqrT

668 t.join() 1LyEsUMzFtVNAGuWOBHvXDpCoSxhwgPldiaImejbQJnfkcRKqrT

669 

670 assert has_started_coverage == [t.ident] 1LyEsUMzFtVNAGuWOBHvXDpCoSxhwgPldiaImejbQJnfkcRKqrT

671 assert has_stopped_coverage == [t.ident] 1LyEsUMzFtVNAGuWOBHvXDpCoSxhwgPldiaImejbQJnfkcRKqrT

672 

673 

674def test_thread_safe_save_data(tmp_path: pathlib.Path) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

675 # Non-regression test for: https://github.com/coveragepy/coveragepy/issues/581 

676 

677 # Create some Python modules and put them in the path 

678 modules_dir = tmp_path / "test_modules" 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

679 modules_dir.mkdir() 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

680 module_names = [f"m{i:03d}" for i in range(1000)] 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

681 for module_name in module_names: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

682 (modules_dir / (module_name + ".py")).write_text("def f(): pass\n", encoding="utf-8") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

683 

684 # Shared variables for threads 

685 should_run = [True] 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

686 imported = [] 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

687 

688 old_dir = os.getcwd() 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

689 os.chdir(modules_dir) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

690 try: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

691 # Make sure that all dummy modules can be imported. 

692 for module_name in module_names: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

693 import_local_file(module_name) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

694 

695 def random_load() -> None: # pragma: nested 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

696 """Import modules randomly to stress coverage.""" 

697 while should_run[0]: 

698 module_name = random.choice(module_names) 

699 mod = import_local_file(module_name) 

700 mod.f() 

701 imported.append(mod) 

702 

703 # Spawn some threads with coverage enabled and attempt to read the 

704 # results right after stopping coverage collection with the threads 

705 # still running. 

706 duration = 0.01 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

707 for _ in range(3): 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

708 cov = coverage.Coverage() 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

709 with cov.collect(): 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

710 threads = [threading.Thread(target=random_load) for _ in range(10)] 1ysUztVAuWBvXpoShgPdaIebJfcKqrT

711 should_run[0] = True 1ysUztVAuWBvXpoShgPdaIebJfcKqrT

712 for t in threads: 1ysUztVAuWBvXpoShgPdaIebJfcKqrT

713 t.start() 1ysUztVAuWBvXpoShgPdaIebJfcKqrT

714 

715 time.sleep(duration) 1ysUztVAuWBvXpoShgPdaIebJfcKqrT

716 

717 # The following call used to crash with running background threads. 

718 cov.get_data() 1LyEsUMzFtVNAGu9WOBHv!XDpCo6SxhwgZPldiaImejbQJnfkcRKqrT

719 

720 # Stop the threads 

721 should_run[0] = False 1LyEsUMzFtVNAGu9WOBHv!XDpCo6SxhwgZPldiaImejbQJnfkcRKqrT

722 for t in threads: 1LyEsUMzFtVNAGu9WOBHv!XDpCo6SxhwgZPldiaImejbQJnfkcRKqrT

723 t.join() 1LyEsUMzFtVNAGu9WOBHv!XDpCo6SxhwgZPldiaImejbQJnfkcRKqrT

724 

725 if (not imported) and duration < 10: # pragma: only failure 1LyEsUMzFtVNAGu9WOBHv!XDpCo6SxhwgZPldiaImejbQJnfkcRKqrT

726 duration *= 2 

727 

728 finally: 

729 os.chdir(old_dir) 1LyEsUMzFtVNAGu9WOBHv!XDpCo6SxhwgZPldiaImejbQJnfkcRKqrT

730 should_run[0] = False 1LyEsUMzFtVNAGu9WOBHv!XDpCo6SxhwgZPldiaImejbQJnfkcRKqrT

731 

732 

733@pytest.mark.skipif(env.WINDOWS, reason="SIGTERM doesn't work the same on Windows") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

734@pytest.mark.flaky(max_runs=3) # Sometimes a test fails due to inherent randomness. Try more times. 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

735class SigtermTest(CoverageTest): 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

736 """Tests of our handling of SIGTERM.""" 

737 

738 @pytest.mark.parametrize("sigterm", [False, True]) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

739 def test_sigterm_multiprocessing_saves_data(self, sigterm: bool) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

740 # A terminated process should save its coverage data. 

741 self.make_file( 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

742 "clobbered.py", 

743 """\ 

744 import multiprocessing 

745 import time 

746 

747 def subproc(x): 

748 if x.value == 3: 

749 print("THREE", flush=True) # line 6, missed 

750 else: 

751 print("NOT THREE", flush=True) 

752 x.value = 0 

753 time.sleep(60) 

754 

755 if __name__ == "__main__": 

756 print("START", flush=True) 

757 x = multiprocessing.Value("L", 1) 

758 proc = multiprocessing.Process(target=subproc, args=(x,)) 

759 proc.start() 

760 while x.value != 0: 

761 time.sleep(.05) 

762 proc.terminate() 

763 print("END", flush=True) 

764 """, 

765 ) 

766 self.make_file( 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

767 ".coveragerc", 

768 """\ 

769 [run] 

770 parallel = True 

771 concurrency = multiprocessing 

772 """ 

773 + ("sigterm = true" if sigterm else ""), 

774 ) 

775 out = self.run_command("coverage run clobbered.py") 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

776 # Under Linux, things go wrong. Does that matter? 

777 if env.LINUX and "assert self._collectors" in out: 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

778 lines = out.splitlines(True) 1EsFtGuHvCoqr

779 out = "".join(lines[:3]) 1EsFtGuHvCoqr

780 assert out == "START\nNOT THREE\nEND\n" 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

781 self.run_command("coverage combine") 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

782 out = self.run_command("coverage report -m") 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

783 if sigterm: 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

784 expected = "clobbered.py 17 1 94% 6" 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

785 else: 

786 expected = "clobbered.py 17 5 71% 5-10" 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

787 assert self.squeezed_lines(out)[2] == expected 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

788 

789 def test_sigterm_threading_saves_data(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

790 # A terminated process should save its coverage data. 

791 self.make_file( 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

792 "handler.py", 

793 """\ 

794 import os, signal 

795 

796 print("START", flush=True) 

797 print("SIGTERM", flush=True) 

798 os.kill(os.getpid(), signal.SIGTERM) 

799 print("NOT HERE", flush=True) 

800 """, 

801 ) 

802 self.make_file( 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

803 ".coveragerc", 

804 """\ 

805 [run] 

806 # The default concurrency option. 

807 concurrency = thread 

808 sigterm = true 

809 """, 

810 ) 

811 status, out = self.run_command_status("coverage run handler.py") 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

812 assert status != 0 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

813 out_lines = out.splitlines() 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

814 assert len(out_lines) in [2, 3] 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

815 assert out_lines[:2] == ["START", "SIGTERM"] 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

816 if len(out_lines) == 3: 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

817 assert out_lines[2] == "Terminated" 1EsFtGuHvCowg7ia0jb1kc2qr

818 out = self.run_command("coverage report -m") 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

819 expected = "handler.py 5 1 80% 6" 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

820 assert self.squeezed_lines(out)[2] == expected 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

821 

822 def test_sigterm_still_runs(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT

823 # A terminated process still runs its own SIGTERM handler. 

824 self.make_file( 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

825 "handler.py", 

826 """\ 

827 import multiprocessing 

828 import signal 

829 import time 

830 

831 def subproc(x): 

832 print("START", flush=True) 

833 def on_sigterm(signum, frame): 

834 print("SIGTERM", flush=True) 

835 

836 signal.signal(signal.SIGTERM, on_sigterm) 

837 x.value = 0 

838 try: 

839 time.sleep(.1) 

840 except OSError: # This happens on PyPy3.11 on Mac 

841 pass 

842 print("END", flush=True) 

843 

844 if __name__ == "__main__": 

845 x = multiprocessing.Value("L", 1) 

846 proc = multiprocessing.Process(target=subproc, args=(x,)) 

847 proc.start() 

848 while x.value != 0: 

849 time.sleep(.02) 

850 proc.terminate() 

851 """, 

852 ) 

853 self.make_file( 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

854 ".coveragerc", 

855 """\ 

856 [run] 

857 parallel = True 

858 concurrency = multiprocessing 

859 sigterm = True 

860 """, 

861 ) 

862 out = self.run_command("coverage run handler.py") 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

863 assert out == "START\nSIGTERM\nEND\n" 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr