Coverage for tests / test_concurrency.py: 100.000%

308 statements  

« prev     ^ index     » next       coverage.py v7.12.1a0.dev1, created at 2025-11-29 20:34 +0000

1# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0 

2# For details: https://github.com/coveragepy/coveragepy/blob/main/NOTICE.txt 

3 

4"""Tests for concurrency libraries.""" 

5 

6from __future__ import annotations 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

7 

8import glob 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

9import multiprocessing 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

10import os 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

11import pathlib 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

12import random 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

13import re 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

14import sys 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

15import threading 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

16import time 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

17 

18from types import ModuleType 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

19from collections.abc import Iterable 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

20 

21import pytest 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

22 

23import coverage 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

24from coverage import env 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

25from coverage.data import line_counts 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

26from coverage.exceptions import ConfigError 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

27from coverage.files import abs_file 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

28from coverage.misc import import_local_file 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

29 

30from tests import testenv 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

31from tests.coveragetest import CoverageTest 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

32 

33 

34# These libraries aren't always available, we'll skip tests if they aren't. 

35 

36try: 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

37 import eventlet 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

38except ImportError: 1DpCo6Tld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

39 eventlet = None 1DpCo6Tld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

40 

41try: 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

42 import gevent 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

43except ImportError: 1DpCo6Tld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

44 gevent = None 1DpCo6Tld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

45 

46try: 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

47 import greenlet 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

48except ImportError: 1DpCo6Tld3ia0QI%me4jb1RJ'nf5kc2SK(

49 greenlet = None 1DpCo6Tld3ia0QI%me4jb1RJ'nf5kc2SK(

50 

51 

def measurable_line(l: str) -> bool:
    """Is this a line of code coverage will measure?

    Not blank, not a comment, and not "else".

    `l` is one line of source text; surrounding whitespace is ignored.
    Returns True when the line should be counted, False otherwise.
    """
    stripped = l.strip()
    # `str.startswith` accepts a tuple, so both excluded prefixes are checked
    # in one call.  An empty string is excluded by the truthiness test.
    return bool(stripped) and not stripped.startswith(("#", "else:"))

65 

66 

def line_count(s: str) -> int:
    """How many measurable lines are in `s`?

    `s` is split into lines and each one is judged by `measurable_line`.
    """
    # Count lazily rather than building an intermediate list just to take
    # its length.
    return sum(1 for line in s.splitlines() if measurable_line(line))

70 

71 

def print_simple_annotation(code: str, linenos: Iterable[int]) -> None:
    """Print the lines in `code` with X for each line number in `linenos`.

    `code` is the source text to show.  `linenos` holds the 1-based line
    numbers to mark; any iterable is accepted, including a one-shot iterator.
    """
    # Materialize once: membership tests against a bare iterator would consume
    # it, and against a list would rescan it for every line.
    marked = set(linenos)
    for lineno, line in enumerate(code.splitlines(), start=1):
        marker = "X" if lineno in marked else " "
        print(f" {marker} {line}")

76 

77 

class LineCountTest(CoverageTest):
    """Test the helpers here."""

    # Nothing is written to disk by this test.
    run_in_temp_dir = False

    def test_line_count(self) -> None:
        """`line_count` skips the blank, comment and `else:` lines."""
        CODE = """
            # Hey there!
            x = 1
            if x:
                print("hello")
            else:
                print("bye")

            print("done")
            """

        # x = 1, if x:, two prints in the if/else, and the final print.
        assert line_count(CODE) == 5

96 

97 

# The code common to all the concurrency models.
# Every snippet below shares the same four-space base indentation so the
# pieces can be concatenated before being written out as one program.
SUM_RANGE_Q = """
    # Above this will be imports defining queue and threading.

    class Producer(threading.Thread):
        def __init__(self, limit, q):
            threading.Thread.__init__(self)
            self.limit = limit
            self.q = q

        def run(self):
            for i in range(self.limit):
                self.q.put(i)
            self.q.put(None)

    class Consumer(threading.Thread):
        def __init__(self, q, qresult):
            threading.Thread.__init__(self)
            self.q = q
            self.qresult = qresult

        def run(self):
            sum = 0
            while "no peephole".upper():
                i = self.q.get()
                if i is None:
                    break
                sum += i
            self.qresult.put(sum)

    def sum_range(limit):
        q = queue.Queue()
        qresult = queue.Queue()
        c = Consumer(q, qresult)
        p = Producer(limit, q)
        c.start()
        p.start()

        p.join()
        c.join()
        return qresult.get()

    # Below this will be something using sum_range.
    """

# Appended after SUM_RANGE_Q; {QLIMIT} is filled in with str.format.
PRINT_SUM_RANGE = """
    print(sum_range({QLIMIT}))
    """

# Import the things to use threads.
THREAD = """
    import threading
    import queue
    """

# Import the things to use eventlet.
EVENTLET = """
    import eventlet.green.threading as threading
    import eventlet.queue as queue
    """

# Import the things to use gevent.
GEVENT = """
    from gevent import monkey
    monkey.patch_thread()
    import threading
    import gevent.queue as queue
    """

# Uncomplicated code that doesn't use any of the concurrency stuff, to test
# the simple case under each of the regimes.
SIMPLE = """
    total = 0
    for i in range({QLIMIT}):
        total += i
    print(total)
    """

175 

176 

def cant_trace_msg(concurrency: str, the_module: ModuleType | None) -> str | None:
    """What might coverage.py say about a concurrency setting and imported module?

    `concurrency` is the comma-separated concurrency setting.  `the_module`
    is the imported concurrency module, or None if it could not be imported.

    Returns the message expected in coverage's output when the combination
    can't be traced, or None when tracing is expected to work.
    """
    # In the concurrency choices, "multiprocessing" doesn't count, so remove it.
    # Filtering by exact token is a no-op when it is absent and cannot raise
    # when "multiprocessing" only appears as part of another word.
    concurrency = ",".join(
        part for part in concurrency.split(",") if part != "multiprocessing"
    )

    if testenv.SYS_MON and concurrency:
        return f"Can't use core=sysmon: it doesn't support concurrency={concurrency}"
    if the_module is None:
        # We don't even have the underlying module installed, we expect
        # coverage to alert us to this fact.
        return f"Couldn't trace with concurrency={concurrency}, the module isn't installed."
    if testenv.C_TRACER or concurrency in ("thread", ""):
        return None
    return (
        f"Can't support concurrency={concurrency} with {testenv.REQUESTED_TRACER_CLASS}, "
        + "only threads are supported."
    )

199 

200 

class ConcurrencyTest(CoverageTest):
    """Tests of the concurrency support in coverage.py."""

    # Upper bound handed to the generated programs as {QLIMIT}.
    QLIMIT = 1000

    def try_some_code(
        self,
        code: str,
        concurrency: str,
        the_module: ModuleType | None,
        expected_out: str | None = None,
    ) -> None:
        """Run some concurrency testing code and see that it was all covered.

        `code` is the Python code to execute.  `concurrency` is the name of
        the concurrency regime to test it under.  `the_module` is the imported
        module that must be available for this to work at all, or None if it
        could not be imported.  `expected_out` is the text we expect the code
        to produce; when omitted, the sum of range(QLIMIT) on one line.

        The test is skipped when coverage reports that it can't trace under
        the requested regime.
        """
        self.make_file("try_it.py", code)

        cmd = f"coverage run --concurrency={concurrency} try_it.py"
        _, out = self.run_command_status(cmd)

        expected_cant_trace = cant_trace_msg(concurrency, the_module)

        if expected_cant_trace is not None:
            assert expected_cant_trace in out
            # pytest.skip raises, so nothing below runs in this case.
            pytest.skip(f"Can't test: {expected_cant_trace}")

        # We can fully measure the code if we are using the C tracer, which
        # can support all the concurrency, or if we are using threads.
        if expected_out is None:
            expected_out = f"{sum(range(self.QLIMIT))}\n"
        print(code)
        assert out == expected_out

        # Read the coverage file and see that try_it.py has all its lines
        # executed.
        data = coverage.CoverageData(".coverage")
        data.read()

        # If the test fails, it's helpful to see this info:
        fname = abs_file("try_it.py")
        linenos = data.lines(fname)
        assert linenos is not None
        print(f"{len(linenos)}: {linenos}")
        print_simple_annotation(code, linenos)

        lines = line_count(code)
        assert line_counts(data)["try_it.py"] == lines

    @pytest.mark.skipif(
        not testenv.CAN_MEASURE_THREADS, reason="Can't measure threads with this core."
    )
    def test_threads(self) -> None:
        """The producer/consumer program is fully measured with real threads."""
        code = (THREAD + SUM_RANGE_Q + PRINT_SUM_RANGE).format(QLIMIT=self.QLIMIT)
        self.try_some_code(code, "thread", threading)

    @pytest.mark.skipif(
        not testenv.CAN_MEASURE_THREADS, reason="Can't measure threads with this core."
    )
    def test_threads_simple_code(self) -> None:
        """Plain code is fully measured with --concurrency=thread."""
        code = SIMPLE.format(QLIMIT=self.QLIMIT)
        self.try_some_code(code, "thread", threading)

    def test_eventlet(self) -> None:
        """The producer/consumer program under --concurrency=eventlet."""
        code = (EVENTLET + SUM_RANGE_Q + PRINT_SUM_RANGE).format(QLIMIT=self.QLIMIT)
        self.try_some_code(code, "eventlet", eventlet)

    def test_eventlet_simple_code(self) -> None:
        """Plain code under --concurrency=eventlet."""
        code = SIMPLE.format(QLIMIT=self.QLIMIT)
        self.try_some_code(code, "eventlet", eventlet)

    # https://github.com/coveragepy/coveragepy/issues/663
    @pytest.mark.skipif(env.WINDOWS, reason="gevent has problems on Windows: #663")
    def test_gevent(self) -> None:
        """The producer/consumer program under --concurrency=gevent."""
        code = (GEVENT + SUM_RANGE_Q + PRINT_SUM_RANGE).format(QLIMIT=self.QLIMIT)
        self.try_some_code(code, "gevent", gevent)

    def test_gevent_simple_code(self) -> None:
        """Plain code under --concurrency=gevent."""
        code = SIMPLE.format(QLIMIT=self.QLIMIT)
        self.try_some_code(code, "gevent", gevent)

    def test_greenlet(self) -> None:
        """Two greenlets switching back and forth are fully measured."""
        GREENLET = """\
            from greenlet import greenlet

            def test1(x, y):
                z = gr2.switch(x+y)
                print(z)

            def test2(u):
                print(u)
                gr1.switch(42)

            gr1 = greenlet(test1)
            gr2 = greenlet(test2)
            gr1.switch("hello", " world")
            """
        self.try_some_code(GREENLET, "greenlet", greenlet, "hello world\n42\n")

    def test_greenlet_simple_code(self) -> None:
        """Plain code under --concurrency=greenlet."""
        code = SIMPLE.format(QLIMIT=self.QLIMIT)
        self.try_some_code(code, "greenlet", greenlet)

    def test_bug_330(self) -> None:
        """The eventlet program must print 0 for its WeakKeyDictionary."""
        BUG_330 = """\
            from weakref import WeakKeyDictionary
            import eventlet

            def do():
                eventlet.sleep(.01)

            gts = WeakKeyDictionary()
            for _ in range(100):
                gts[eventlet.spawn(do)] = True
                eventlet.sleep(.005)

            eventlet.sleep(.1)
            print(len(gts))
            """
        self.try_some_code(BUG_330, "eventlet", eventlet, "0\n")

    # Sometimes a test fails due to inherent randomness. Try more times.
    @pytest.mark.skipif(
        not testenv.CAN_MEASURE_THREADS, reason="Can't measure threads with this core."
    )
    @pytest.mark.flaky(max_runs=3)
    def test_threads_with_gevent(self) -> None:
        """A gevent greenlet spawned inside a real thread is fully measured."""
        self.make_file(
            "both.py",
            """\
            import queue
            import threading

            import gevent

            def work1(q):
                q.put(1)

            def gwork(q):
                gevent.spawn(work1, q).join()
                q.put(None)
                print("done")

            q = queue.Queue()
            t = threading.Thread(target=gwork, args=(q,))
            t.start()
            t.join()

            answer = q.get()
            assert answer == 1
            """,
        )
        _, out = self.run_command_status("coverage run --concurrency=thread,gevent both.py")
        if gevent is None:
            assert "Couldn't trace with concurrency=gevent, the module isn't installed.\n" in out
            pytest.skip("Can't run test without gevent installed.")
        if not testenv.C_TRACER:
            assert testenv.PY_TRACER
            assert out == (
                "Can't support concurrency=gevent with PyTracer, only threads are supported.\n"
            )
            pytest.skip(f"Can't run gevent with {testenv.REQUESTED_TRACER_CLASS}.")

        assert out == "done\n"

        out = self.run_command("coverage report -m")
        last_line = self.squeezed_lines(out)[-1]
        assert re.search(r"TOTAL \d+ 0 100%", last_line)

    def test_bad_concurrency(self) -> None:
        """An unknown --concurrency value is a ConfigError."""
        with pytest.raises(ConfigError, match="Unknown concurrency choices: nothing"):
            self.command_line("run --concurrency=nothing prog.py")

    def test_bad_concurrency_in_config(self) -> None:
        """An unknown concurrency value in the config file is a ConfigError."""
        self.make_file(".coveragerc", "[run]\nconcurrency = nothing\n")
        with pytest.raises(ConfigError, match="Unknown concurrency choices: nothing"):
            self.command_line("run prog.py")

    def test_no_multiple_light_concurrency(self) -> None:
        """gevent and eventlet together on the command line are rejected."""
        with pytest.raises(ConfigError, match="Conflicting concurrency settings: eventlet, gevent"):
            self.command_line("run --concurrency=gevent,eventlet prog.py")

    def test_no_multiple_light_concurrency_in_config(self) -> None:
        """gevent and eventlet together in the config file are rejected."""
        self.make_file(".coveragerc", "[run]\nconcurrency = gevent, eventlet\n")
        with pytest.raises(ConfigError, match="Conflicting concurrency settings: eventlet, gevent"):
            self.command_line("run prog.py")

    def test_multiprocessing_needs_config_file(self) -> None:
        """--concurrency=multiprocessing without a config file is a ConfigError."""
        with pytest.raises(ConfigError, match="multiprocessing requires a configuration file"):
            self.command_line("run --concurrency=multiprocessing prog.py")

395 

396 

class WithoutConcurrencyModuleTest(CoverageTest):
    """Tests of what happens if the requested concurrency isn't installed."""

    @pytest.mark.parametrize("module", ["eventlet", "gevent", "greenlet"])
    def test_missing_module(self, module: str) -> None:
        """Requesting a concurrency whose module can't be imported is a ConfigError."""
        self.make_file("prog.py", "a = 1")
        # A None entry in sys.modules makes `import <module>` raise ImportError.
        # NOTE(review): nothing here puts the entry back; presumably the
        # CoverageTest machinery restores sys.modules -- confirm.
        sys.modules[module] = None  # type: ignore[assignment]
        # pytest.raises treats `match` as a regex; escape so "." is literal.
        msg = re.escape(
            f"Couldn't trace with concurrency={module}, the module isn't installed."
        )
        with pytest.raises(ConfigError, match=msg):
            self.command_line(f"run --concurrency={module} prog.py")

407 

408 

# A work() that takes a different branch for odd and even inputs.
SQUARE_OR_CUBE_WORK = """
    def work(x):
        # Use different lines in different subprocesses.
        if x % 2:
            y = x*x
        else:
            y = x*x*x
        return y
    """

# A work() built on sum_range(), which must be defined after it in the file.
SUM_RANGE_WORK = """
    def work(x):
        return sum_range((x+1)*100)
    """

# The multiprocessing driver.  {NPROCS} and {UPTO} are filled in with
# str.format, so literal braces in the program text are doubled.
MULTI_CODE = """
    # Above this will be a definition of work().
    import multiprocessing
    import os
    import time
    import sys

    def process_worker_main(args):
        # Need to pause, or the tasks go too quickly, and some processes
        # in the pool don't get any work, and then don't record data.
        ret = work(*args)
        time.sleep(0.1)
        return os.getpid(), ret

    if __name__ == "__main__":      # pragma: no branch
        # This if is on a single line so we can get 100% coverage
        # even if we have no arguments.
        if len(sys.argv) > 1: multiprocessing.set_start_method(sys.argv[1])
        pool = multiprocessing.Pool({NPROCS})
        inputs = [(x,) for x in range({UPTO})]
        outputs = pool.imap_unordered(process_worker_main, inputs)
        pids = set()
        total = 0
        for pid, sq in outputs:
            pids.add(pid)
            total += sq
        print(f"{{len(pids)}} pids, {{total = }}")
        pool.close()
        pool.join()
    """

454 

455 

@pytest.fixture(params=["fork", "spawn"], name="start_method")
def start_method_fixture(request: pytest.FixtureRequest) -> str:
    """Parameterized fixture to choose the start_method for multiprocessing.

    Yields each of "fork" and "spawn" in turn, skipping the test for any
    start method that `multiprocessing` doesn't offer on this platform.
    """
    start_method: str = request.param
    if start_method not in multiprocessing.get_all_start_methods():
        # Windows doesn't support "fork".
        pytest.skip(f"start_method={start_method} not supported here")
    return start_method

464 

465 

466# Sometimes a test fails due to inherent randomness. Try more times. 

467# @pytest.mark.flaky(max_runs=30) 

468class MultiprocessingTest(CoverageTest): 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

469 """Test support of the multiprocessing module.""" 

470 

471 def try_multiprocessing_code( 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(

472 self, 

473 code: str, 

474 expected_out: str | None, 

475 the_module: ModuleType, 

476 nprocs: int, 

477 start_method: str, 

478 concurrency: str = "multiprocessing", 

479 args: str = "", 

480 ) -> None: 

481 """Run code using multiprocessing, it should produce `expected_out`.""" 

482 self.make_file("multi.py", code) 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

483 self.make_file( 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

484 ".coveragerc", 

485 f"""\ 

486 [run] 

487 concurrency = {concurrency} 

488 source = . 

489 """, 

490 ) 

491 

492 cmd = f"coverage run {args} multi.py {start_method}" 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

493 _, out = self.run_command_status(cmd) 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

494 expected_cant_trace = cant_trace_msg(concurrency, the_module) 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

495 

496 if expected_cant_trace is not None: 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

497 assert expected_cant_trace in out 1ysVztWAuXBvYDpCo6Th8g7P)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

498 pytest.skip(f"Can't test: {expected_cant_trace}") 1ysVztWAuXBvYDpCo6Th8g7P)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

499 else: 

500 assert out.rstrip() == expected_out 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

501 assert len(glob.glob(".coverage.*")) == nprocs + 1 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

502 

503 out = self.run_command("coverage combine") 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

504 out_lines = out.splitlines() 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

505 assert len(out_lines) == nprocs + 1 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

506 assert all( 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

507 re.fullmatch( 

508 r"(Combined data file|Skipping duplicate data) \.coverage\..*\.\d+\.X\w{6}x", 

509 line, 

510 ) 

511 for line in out_lines 

512 ) 

513 assert len(glob.glob(".coverage.*")) == 0 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

514 out = self.run_command("coverage report -m") 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

515 

516 last_line = self.squeezed_lines(out)[-1] 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

517 assert re.search(r"TOTAL \d+ 0 100%", last_line) 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

518 

519 def test_multiprocessing_simple(self, start_method: str) -> None: 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

520 nprocs = 3 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

521 upto = 30 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

522 code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto) 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

523 total = sum(x * x if x % 2 else x * x * x for x in range(upto)) 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

524 expected_out = f"{nprocs} pids, {total = }" 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

525 self.try_multiprocessing_code( 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

526 code, 

527 expected_out, 

528 threading, 

529 nprocs, 

530 start_method=start_method, 

531 ) 

532 

533 def test_multiprocessing_append(self, start_method: str) -> None: 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

534 nprocs = 3 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

535 upto = 30 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

536 code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto) 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

537 total = sum(x * x if x % 2 else x * x * x for x in range(upto)) 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

538 expected_out = f"{nprocs} pids, total = {total}" 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

539 self.try_multiprocessing_code( 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

540 code, 

541 expected_out, 

542 threading, 

543 nprocs, 

544 args="--append", 

545 start_method=start_method, 

546 ) 

547 

548 def test_multiprocessing_and_gevent(self, start_method: str) -> None: 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

549 nprocs = 3 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

550 upto = 30 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

551 code = (SUM_RANGE_WORK + EVENTLET + SUM_RANGE_Q + MULTI_CODE).format( 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

552 NPROCS=nprocs, UPTO=upto 

553 ) 

554 total = sum(sum(range((x + 1) * 100)) for x in range(upto)) 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

555 expected_out = f"{nprocs} pids, total = {total}" 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

556 self.try_multiprocessing_code( 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

557 code, 

558 expected_out, 

559 eventlet, 

560 nprocs, 

561 concurrency="multiprocessing,eventlet", 

562 start_method=start_method, 

563 ) 

564 

565 @pytest.mark.skipif( 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

566 not testenv.CAN_MEASURE_BRANCHES, reason="Can't measure branches with this core" 

567 ) 

568 def test_multiprocessing_with_branching(self, start_method: str) -> None: 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

569 nprocs = 3 1xh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(

570 upto = 30 1xh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(

571 code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto) 1xh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(

572 total = sum(x * x if x % 2 else x * x * x for x in range(upto)) 1xh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(

573 expected_out = f"{nprocs} pids, total = {total}" 1xh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(

574 self.make_file("multi.py", code) 1xh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(

575 self.make_file( 1xh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(

576 "multi.rc", 

577 """\ 

578 [run] 

579 concurrency = multiprocessing 

580 branch = True 

581 omit = */site-packages/* 

582 """, 

583 ) 

584 

585 out = self.run_command(f"coverage run --rcfile=multi.rc multi.py {start_method}") 1xh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(

586 assert out.rstrip() == expected_out 1xh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(

587 

588 out = self.run_command("coverage combine -q") # sneak in a test of -q 1xh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(

589 assert out == "" 1xh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(

590 out = self.run_command("coverage report -m") 1xh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(

591 

592 last_line = self.squeezed_lines(out)[-1] 1xh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(

593 assert re.search(r"TOTAL \d+ 0 \d+ 0 100%", last_line) 1xh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(

594 

595 def test_multiprocessing_bootstrap_error_handling(self) -> None: 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

596 # An exception during bootstrapping will be reported. 

597 self.make_file( 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

598 "multi.py", 

599 """\ 

600 import multiprocessing 

601 if __name__ == "__main__": 

602 with multiprocessing.Manager(): 

603 pass 

604 """, 

605 ) 

606 self.make_file( 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

607 ".coveragerc", 

608 """\ 

609 [run] 

610 concurrency = multiprocessing 

611 _crash = _bootstrap 

612 """, 

613 ) 

614 out = self.run_command("coverage run multi.py", status=1) 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

615 assert "Exception during multiprocessing bootstrap init" in out 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

616 assert "RuntimeError: Crashing because called by _bootstrap" in out 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

617 

618 def test_bug_890(self) -> None: 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

619 # chdir in multiprocessing shouldn't keep us from finding the 

620 # .coveragerc file. 

621 self.make_file( 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

622 "multi.py", 

623 """\ 

624 import multiprocessing, os, os.path 

625 if __name__ == "__main__": 

626 if not os.path.exists("./tmp"): os.mkdir("./tmp") 

627 os.chdir("./tmp") 

628 with multiprocessing.Manager(): 

629 pass 

630 print("ok") 

631 """, 

632 ) 

633 self.make_file( 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

634 ".coveragerc", 

635 """\ 

636 [run] 

637 concurrency = multiprocessing 

638 """, 

639 ) 

640 out = self.run_command("coverage run multi.py") 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

641 assert out.splitlines()[-1] == "ok" 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

642 

643 

@pytest.mark.skipif(not testenv.SETTRACE_CORE, reason="gettrace is not supported with this core.")
def test_coverage_stop_in_threads() -> None:
    """Leaving ``cov.collect()`` removes the trace function in a running thread.

    A thread is started while coverage is collecting.  It records its ident
    once when it sees a trace function installed, then polls until the trace
    function is gone and records its ident again.  Both records must contain
    exactly the thread's ident.
    """
    has_started_coverage: list[int | None] = []
    has_stopped_coverage: list[int | None] = []

    def run_thread() -> None:  # pragma: nested
        """Check that coverage is stopping properly in threads."""
        # Measure the timeout on the monotonic clock so that a wall-clock
        # adjustment during the test cannot end the wait early or extend it.
        deadline = time.monotonic() + 5
        ident = threading.current_thread().ident
        if sys.gettrace() is not None:
            has_started_coverage.append(ident)
        while sys.gettrace() is not None:
            # Wait for coverage to stop
            time.sleep(0.01)
            if time.monotonic() > deadline:
                # Give up without recording a stop, so the final assert fails.
                return
        has_stopped_coverage.append(ident)

    cov = coverage.Coverage()
    with cov.collect():
        t = threading.Thread(target=run_thread)
        t.start()

        # Give the thread time to observe the trace function before stopping.
        time.sleep(0.1)
    t.join()

    assert has_started_coverage == [t.ident]
    assert has_stopped_coverage == [t.ident]

672 

673 

def test_thread_safe_save_data(tmp_path: pathlib.Path) -> None:
    """Reading coverage data while measured threads still run must not crash.

    Non-regression test for: https://github.com/coveragepy/coveragepy/issues/581
    """
    # Populate a directory with many trivial modules for the threads to import.
    modules_dir = tmp_path / "test_modules"
    modules_dir.mkdir()
    module_names = [f"m{num:03d}" for num in range(1000)]
    for name in module_names:
        modules_dir.joinpath(f"{name}.py").write_text("def f(): pass\n", encoding="utf-8")

    # State shared with the worker threads: a one-element run flag and the
    # list of modules the workers managed to import.
    should_run = [True]
    imported = []

    old_dir = os.getcwd()
    os.chdir(modules_dir)
    try:
        # Make sure that all dummy modules can be imported.
        for name in module_names:
            import_local_file(name)

        def random_load() -> None:  # pragma: nested
            """Import modules randomly to stress coverage."""
            while should_run[0]:
                mod = import_local_file(random.choice(module_names))
                mod.f()
                imported.append(mod)

        # Spawn some threads with coverage enabled and attempt to read the
        # results right after stopping coverage collection with the threads
        # still running.
        duration = 0.01
        for _ in range(3):
            cov = coverage.Coverage()
            with cov.collect():
                workers = [threading.Thread(target=random_load) for _ in range(10)]
                should_run[0] = True
                for worker in workers:
                    worker.start()

                time.sleep(duration)

            # The following call used to crash with running background threads.
            cov.get_data()

            # Stop the threads
            should_run[0] = False
            for worker in workers:
                worker.join()

            # Nothing imported yet: give the next round twice as long.
            if duration < 10 and not imported:  # pragma: only failure
                duration *= 2

    finally:
        os.chdir(old_dir)
        should_run[0] = False

731 

732 

733@pytest.mark.skipif(env.WINDOWS, reason="SIGTERM doesn't work the same on Windows") 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

734@pytest.mark.flaky(max_runs=3) # Sometimes a test fails due to inherent randomness. Try more times. 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

735class SigtermTest(CoverageTest): 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

736 """Tests of our handling of SIGTERM.""" 

737 

738 @pytest.mark.parametrize("sigterm", [False, True]) 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

739 def test_sigterm_multiprocessing_saves_data(self, sigterm: bool) -> None: 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

740 # A terminated process should save its coverage data. 

741 self.make_file( 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

742 "clobbered.py", 

743 """\ 

744 import multiprocessing 

745 import time 

746 

747 def subproc(x): 

748 if x.value == 3: 

749 print("THREE", flush=True) # line 6, missed 

750 else: 

751 print("NOT THREE", flush=True) 

752 x.value = 0 

753 time.sleep(60) 

754 

755 if __name__ == "__main__": 

756 print("START", flush=True) 

757 x = multiprocessing.Value("L", 1) 

758 proc = multiprocessing.Process(target=subproc, args=(x,)) 

759 proc.start() 

760 while x.value != 0: 

761 time.sleep(.05) 

762 proc.terminate() 

763 print("END", flush=True) 

764 """, 

765 ) 

766 self.make_file( 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

767 ".coveragerc", 

768 """\ 

769 [run] 

770 parallel = True 

771 concurrency = multiprocessing 

772 """ 

773 + ("sigterm = true" if sigterm else ""), 

774 ) 

775 out = self.run_command("coverage run clobbered.py") 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

776 # Under Linux, things go wrong. Does that matter? 

777 if env.LINUX and "assert self._collectors" in out: 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

778 lines = out.splitlines(True) 1EsFtGuHvCoqr

779 out = "".join(lines[:3]) 1EsFtGuHvCoqr

780 assert out == "START\nNOT THREE\nEND\n" 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

781 self.run_command("coverage combine") 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

782 out = self.run_command("coverage report -m") 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

783 if sigterm: 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

784 expected = "clobbered.py 17 1 94% 6" 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

785 else: 

786 expected = "clobbered.py 17 5 71% 5-10" 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

787 assert self.squeezed_lines(out)[2] == expected 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

788 

789 def test_sigterm_threading_saves_data(self) -> None: 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

790 # A terminated process should save its coverage data. 

791 self.make_file( 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

792 "handler.py", 

793 """\ 

794 import os, signal 

795 

796 print("START", flush=True) 

797 print("SIGTERM", flush=True) 

798 os.kill(os.getpid(), signal.SIGTERM) 

799 print("NOT HERE", flush=True) 

800 """, 

801 ) 

802 self.make_file( 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

803 ".coveragerc", 

804 """\ 

805 [run] 

806 # The default concurrency option. 

807 concurrency = thread 

808 sigterm = true 

809 """, 

810 ) 

811 status, out = self.run_command_status("coverage run handler.py") 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

812 assert status != 0 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

813 out_lines = out.splitlines() 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

814 assert len(out_lines) in [2, 3] 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

815 assert out_lines[:2] == ["START", "SIGTERM"] 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

816 if len(out_lines) == 3: 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

817 assert out_lines[2] == "Terminated" 1EsFtGuHvCowg7ia0jb1kc2qr

818 out = self.run_command("coverage report -m") 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

819 expected = "handler.py 5 1 80% 6" 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

820 assert self.squeezed_lines(out)[2] == expected 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

821 

822 def test_sigterm_still_runs(self) -> None: 1LyEs9VMzFt!WNAGu#XOBHv$YDpCo6Txh8wg7ZP)ld3ia0QI%me4jb1RJ'nf5kc2SK(qrU

823 # A terminated process still runs its own SIGTERM handler. 

824 self.make_file( 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

825 "handler.py", 

826 """\ 

827 import multiprocessing 

828 import signal 

829 import time 

830 

831 def subproc(x): 

832 print("START", flush=True) 

833 def on_sigterm(signum, frame): 

834 print("SIGTERM", flush=True) 

835 

836 signal.signal(signal.SIGTERM, on_sigterm) 

837 x.value = 0 

838 try: 

839 time.sleep(.1) 

840 except OSError: # This happens on PyPy3.11 on Mac 

841 pass 

842 print("END", flush=True) 

843 

844 if __name__ == "__main__": 

845 x = multiprocessing.Value("L", 1) 

846 proc = multiprocessing.Process(target=subproc, args=(x,)) 

847 proc.start() 

848 while x.value != 0: 

849 time.sleep(.02) 

850 proc.terminate() 

851 """, 

852 ) 

853 self.make_file( 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

854 ".coveragerc", 

855 """\ 

856 [run] 

857 parallel = True 

858 concurrency = multiprocessing 

859 sigterm = True 

860 """, 

861 ) 

862 out = self.run_command("coverage run handler.py") 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr

863 assert out == "START\nSIGTERM\nEND\n" 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr