Coverage for tests / test_concurrency.py: 100.000%
308 statements
« prev ^ index » next coverage.py v7.12.1a0.dev1, created at 2025-11-30 17:57 +0000
« prev ^ index » next coverage.py v7.12.1a0.dev1, created at 2025-11-30 17:57 +0000
1# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
2# For details: https://github.com/coveragepy/coveragepy/blob/main/NOTICE.txt
4"""Tests for concurrency libraries."""
6from __future__ import annotations 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
8import glob 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
9import multiprocessing 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
10import os 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
11import pathlib 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
12import random 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
13import re 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
14import sys 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
15import threading 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
16import time 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
18from types import ModuleType 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
19from collections.abc import Iterable 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
21import pytest 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
23import coverage 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
24from coverage import env 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
25from coverage.data import line_counts 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
26from coverage.exceptions import ConfigError 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
27from coverage.files import abs_file 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
28from coverage.misc import import_local_file 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
30from tests import testenv 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
31from tests.coveragetest import CoverageTest 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
34# These libraries aren't always available, we'll skip tests if they aren't.
36try: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
37 import eventlet 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
38except ImportError: 1DpCo6Sld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
39 eventlet = None 1DpCo6Sld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
41try: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
42 import gevent 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
43except ImportError: 1DpCo6Sld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
44 gevent = None 1DpCo6Sld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
46try: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
47 import greenlet 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
48except ImportError: 1DpCo6Sld3ia0YI%me4jb1QJ'nf5kc2RK(
49 greenlet = None 1DpCo6Sld3ia0YI%me4jb1QJ'nf5kc2RK(
def measurable_line(l: str) -> bool:
    """Is this a line of code coverage will measure?

    Not blank, not a comment, and not "else".

    `l` is a single line of source text; leading and trailing whitespace
    is ignored.  Returns False for an empty line, a line starting with "#",
    or a line starting with "else:", and True for anything else.
    """
    stripped = l.strip()
    if not stripped:
        return False
    # str.startswith accepts a tuple, so one call covers both skipped prefixes.
    return not stripped.startswith(("#", "else:"))
def line_count(s: str) -> int:
    """How many measurable lines are in `s`?

    `s` is a block of source text.  It is split into lines and each line is
    judged by `measurable_line`; the number of lines accepted is returned.
    """
    # Count lazily instead of building a list only to take its length.
    return sum(1 for line in s.splitlines() if measurable_line(line))
def print_simple_annotation(code: str, linenos: Iterable[int]) -> None:
    """Print the lines in `code` with X for each line number in `linenos`.

    `code` is source text; its lines are numbered from 1.  `linenos` is any
    iterable of line numbers to flag.  Each line is printed prefixed with
    "X" if its number is in `linenos`, or a blank otherwise.
    """
    # Materialize once: an `in` test against a one-shot iterator would consume
    # it (so later lines would never be marked), and against a list it would
    # be a linear scan for every printed line.
    marked = set(linenos)
    for lineno, line in enumerate(code.splitlines(), start=1):
        mark = "X" if lineno in marked else " "
        print(f" {mark} {line}")
78class LineCountTest(CoverageTest): 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
79 """Test the helpers here."""
81 run_in_temp_dir = False 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
83 def test_line_count(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
84 CODE = """ 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
85 # Hey there!
86 x = 1
87 if x:
88 print("hello")
89 else:
90 print("bye")
92 print("done")
93 """
95 assert line_count(CODE) == 5 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
98# The code common to all the concurrency models.
99SUM_RANGE_Q = """ 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
100 # Above this will be imports defining queue and threading.
102 class Producer(threading.Thread):
103 def __init__(self, limit, q):
104 threading.Thread.__init__(self)
105 self.limit = limit
106 self.q = q
108 def run(self):
109 for i in range(self.limit):
110 self.q.put(i)
111 self.q.put(None)
113 class Consumer(threading.Thread):
114 def __init__(self, q, qresult):
115 threading.Thread.__init__(self)
116 self.q = q
117 self.qresult = qresult
119 def run(self):
120 sum = 0
121 while "no peephole".upper():
122 i = self.q.get()
123 if i is None:
124 break
125 sum += i
126 self.qresult.put(sum)
128 def sum_range(limit):
129 q = queue.Queue()
130 qresult = queue.Queue()
131 c = Consumer(q, qresult)
132 p = Producer(limit, q)
133 c.start()
134 p.start()
136 p.join()
137 c.join()
138 return qresult.get()
140 # Below this will be something using sum_range.
141 """
143PRINT_SUM_RANGE = """ 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
144 print(sum_range({QLIMIT}))
145 """
147# Import the things to use threads.
148THREAD = """ 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
149 import threading
150 import queue
151 """
153# Import the things to use eventlet.
154EVENTLET = """ 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
155 import eventlet.green.threading as threading
156 import eventlet.queue as queue
157 """
159# Import the things to use gevent.
160GEVENT = """ 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
161 from gevent import monkey
162 monkey.patch_thread()
163 import threading
164 import gevent.queue as queue
165 """
167# Uncomplicated code that doesn't use any of the concurrency stuff, to test
168# the simple case under each of the regimes.
169SIMPLE = """ 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
170 total = 0
171 for i in range({QLIMIT}):
172 total += i
173 print(total)
174 """
def cant_trace_msg(concurrency: str, the_module: ModuleType | None) -> str | None:
    """What might coverage.py say about a concurrency setting and imported module?

    `concurrency` is the comma-separated concurrency setting being tested.
    `the_module` is the imported module that setting relies on, or None if
    it could not be imported.

    Returns the message the tests expect to find in coverage's output when
    the combination can't be traced, or None when no such message is expected.
    """
    # In the concurrency choices, "multiprocessing" doesn't count, so remove it.
    if "multiprocessing" in concurrency:
        # Filter rather than list.remove(): remove() raises ValueError when no
        # token is exactly "multiprocessing" (for example when a token carries
        # whitespace after the comma).
        names = [name.strip() for name in concurrency.split(",")]
        concurrency = ",".join(name for name in names if name != "multiprocessing")

    # NOTE(review): SYS_MON / C_TRACER / REQUESTED_TRACER_CLASS are read from
    # tests.testenv; their exact meaning is defined there, not in this file.
    if testenv.SYS_MON and concurrency:
        return f"Can't use core=sysmon: it doesn't support concurrency={concurrency}"
    if the_module is None:
        # We don't even have the underlying module installed, we expect
        # coverage to alert us to this fact.
        return f"Couldn't trace with concurrency={concurrency}, the module isn't installed."
    if testenv.C_TRACER or concurrency in ("thread", ""):
        return None
    return (
        f"Can't support concurrency={concurrency} with {testenv.REQUESTED_TRACER_CLASS}, "
        + "only threads are supported."
    )
201class ConcurrencyTest(CoverageTest): 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
202 """Tests of the concurrency support in coverage.py."""
204 QLIMIT = 1000 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
206 def try_some_code( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(
207 self,
208 code: str,
209 concurrency: str,
210 the_module: ModuleType,
211 expected_out: str | None = None,
212 ) -> None:
213 """Run some concurrency testing code and see that it was all covered.
215 `code` is the Python code to execute. `concurrency` is the name of
216 the concurrency regime to test it under. `the_module` is the imported
217 module that must be available for this to work at all. `expected_out`
218 is the text we expect the code to produce.
220 """
221 self.make_file("try_it.py", code) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
223 cmd = f"coverage run --concurrency={concurrency} try_it.py" 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
224 _, out = self.run_command_status(cmd) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
226 expected_cant_trace = cant_trace_msg(concurrency, the_module) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
228 if expected_cant_trace is not None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
229 assert expected_cant_trace in out 1ysUztVAuWBvXDpCo6Sh8g7P)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
230 pytest.skip(f"Can't test: {expected_cant_trace}") 1ysUztVAuWBvXDpCo6Sh8g7P)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
231 else:
232 # We can fully measure the code if we are using the C tracer, which
233 # can support all the concurrency, or if we are using threads.
234 if expected_out is None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT
235 expected_out = "%d\n" % (sum(range(self.QLIMIT))) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT
236 print(code) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT
237 assert out == expected_out 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT
239 # Read the coverage file and see that try_it.py has all its lines
240 # executed.
241 data = coverage.CoverageData(".coverage") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT
242 data.read() 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT
244 # If the test fails, it's helpful to see this info:
245 fname = abs_file("try_it.py") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT
246 linenos = data.lines(fname) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT
247 assert linenos is not None 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT
248 print(f"{len(linenos)}: {linenos}") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT
249 print_simple_annotation(code, linenos) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT
251 lines = line_count(code) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT
252 assert line_counts(data)["try_it.py"] == lines 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT
254 @pytest.mark.skipif( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
255 not testenv.CAN_MEASURE_THREADS, reason="Can't measure threads with this core."
256 )
257 def test_threads(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
258 code = (THREAD + SUM_RANGE_Q + PRINT_SUM_RANGE).format(QLIMIT=self.QLIMIT) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT
259 self.try_some_code(code, "thread", threading) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT
261 @pytest.mark.skipif( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
262 not testenv.CAN_MEASURE_THREADS, reason="Can't measure threads with this core."
263 )
264 def test_threads_simple_code(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
265 code = SIMPLE.format(QLIMIT=self.QLIMIT) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT
266 self.try_some_code(code, "thread", threading) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT
268 def test_eventlet(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
269 code = (EVENTLET + SUM_RANGE_Q + PRINT_SUM_RANGE).format(QLIMIT=self.QLIMIT) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
270 self.try_some_code(code, "eventlet", eventlet) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
272 def test_eventlet_simple_code(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
273 code = SIMPLE.format(QLIMIT=self.QLIMIT) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
274 self.try_some_code(code, "eventlet", eventlet) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
276 # https://github.com/coveragepy/coveragepy/issues/663
277 @pytest.mark.skipif(env.WINDOWS, reason="gevent has problems on Windows: #663") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
278 def test_gevent(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
279 code = (GEVENT + SUM_RANGE_Q + PRINT_SUM_RANGE).format(QLIMIT=self.QLIMIT) 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr
280 self.try_some_code(code, "gevent", gevent) 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr
282 def test_gevent_simple_code(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
283 code = SIMPLE.format(QLIMIT=self.QLIMIT) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
284 self.try_some_code(code, "gevent", gevent) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
286 def test_greenlet(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
287 GREENLET = """\ 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
288 from greenlet import greenlet
290 def test1(x, y):
291 z = gr2.switch(x+y)
292 print(z)
294 def test2(u):
295 print(u)
296 gr1.switch(42)
298 gr1 = greenlet(test1)
299 gr2 = greenlet(test2)
300 gr1.switch("hello", " world")
301 """
302 self.try_some_code(GREENLET, "greenlet", greenlet, "hello world\n42\n") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
304 def test_greenlet_simple_code(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
305 code = SIMPLE.format(QLIMIT=self.QLIMIT) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
306 self.try_some_code(code, "greenlet", greenlet) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
308 def test_bug_330(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
309 BUG_330 = """\ 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
310 from weakref import WeakKeyDictionary
311 import eventlet
313 def do():
314 eventlet.sleep(.01)
316 gts = WeakKeyDictionary()
317 for _ in range(100):
318 gts[eventlet.spawn(do)] = True
319 eventlet.sleep(.005)
321 eventlet.sleep(.1)
322 print(len(gts))
323 """
324 self.try_some_code(BUG_330, "eventlet", eventlet, "0\n") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
326 # Sometimes a test fails due to inherent randomness. Try more times.
327 @pytest.mark.skipif( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
328 not testenv.CAN_MEASURE_THREADS, reason="Can't measure threads with this core."
329 )
330 @pytest.mark.flaky(max_runs=3) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
331 def test_threads_with_gevent(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
332 self.make_file( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT
333 "both.py",
334 """\
335 import queue
336 import threading
338 import gevent
340 def work1(q):
341 q.put(1)
343 def gwork(q):
344 gevent.spawn(work1, q).join()
345 q.put(None)
346 print("done")
348 q = queue.Queue()
349 t = threading.Thread(target=gwork, args=(q,))
350 t.start()
351 t.join()
353 answer = q.get()
354 assert answer == 1
355 """,
356 )
357 _, out = self.run_command_status("coverage run --concurrency=thread,gevent both.py") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT
358 if gevent is None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6SxhwgZPldiaYImejbQJnfkcRKqrT
359 assert "Couldn't trace with concurrency=gevent, the module isn't installed.\n" in out 1DpCo6SldiaYImejbQJnfkcRKqrT
360 pytest.skip("Can't run test without gevent installed.") 1DpCo6SldiaYImejbQJnfkcRKqrT
361 if not testenv.C_TRACER: 1LyEs#UMzFt$VNAGu9WOBHv!XxhwgZP
362 assert testenv.PY_TRACER 1ysUztVAuWBvXhgP
363 assert out == ( 1ysUztVAuWBvXhgP
364 "Can't support concurrency=gevent with PyTracer, only threads are supported.\n"
365 )
366 pytest.skip(f"Can't run gevent with {testenv.REQUESTED_TRACER_CLASS}.") 1ysUztVAuWBvXhgP
368 assert out == "done\n" 1LE#MF$NG9OH!xwZ
370 out = self.run_command("coverage report -m") 1LE#MF$NG9OH!xwZ
371 last_line = self.squeezed_lines(out)[-1] 1LE#MF$NG9OH!xwZ
372 assert re.search(r"TOTAL \d+ 0 100%", last_line) 1LE#MF$NG9OH!xwZ
374 def test_bad_concurrency(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
375 with pytest.raises(ConfigError, match="Unknown concurrency choices: nothing"): 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
376 self.command_line("run --concurrency=nothing prog.py") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
378 def test_bad_concurrency_in_config(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
379 self.make_file(".coveragerc", "[run]\nconcurrency = nothing\n") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
380 with pytest.raises(ConfigError, match="Unknown concurrency choices: nothing"): 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
381 self.command_line("run prog.py") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
383 def test_no_multiple_light_concurrency(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
384 with pytest.raises(ConfigError, match="Conflicting concurrency settings: eventlet, gevent"): 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
385 self.command_line("run --concurrency=gevent,eventlet prog.py") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
387 def test_no_multiple_light_concurrency_in_config(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
388 self.make_file(".coveragerc", "[run]\nconcurrency = gevent, eventlet\n") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
389 with pytest.raises(ConfigError, match="Conflicting concurrency settings: eventlet, gevent"): 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
390 self.command_line("run prog.py") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
392 def test_multiprocessing_needs_config_file(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
393 with pytest.raises(ConfigError, match="multiprocessing requires a configuration file"): 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
394 self.command_line("run --concurrency=multiprocessing prog.py") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
class WithoutConcurrencyModuleTest(CoverageTest):
    """Tests of what happens if the requested concurrency isn't installed."""

    @pytest.mark.parametrize("module", ["eventlet", "gevent", "greenlet"])
    def test_missing_module(self, module: str) -> None:
        """Requesting a concurrency whose module can't be imported is a ConfigError."""
        self.make_file("prog.py", "a = 1")
        # A None entry in sys.modules makes any later import of that name fail,
        # which simulates the library not being installed.
        # NOTE(review): nothing in this block restores sys.modules afterwards;
        # presumably the CoverageTest base class takes care of that -- confirm.
        sys.modules[module] = None  # type: ignore[assignment]
        msg = rf"Couldn't trace with concurrency={module}, the module isn't installed."
        with pytest.raises(ConfigError, match=msg):
            self.command_line(f"run --concurrency={module} prog.py")
409SQUARE_OR_CUBE_WORK = """ 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
410 def work(x):
411 # Use different lines in different subprocesses.
412 if x % 2:
413 y = x*x
414 else:
415 y = x*x*x
416 return y
417 """
419SUM_RANGE_WORK = """ 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
420 def work(x):
421 return sum_range((x+1)*100)
422 """
424MULTI_CODE = """ 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
425 # Above this will be a definition of work().
426 import multiprocessing
427 import os
428 import time
429 import sys
431 def process_worker_main(args):
432 # Need to pause, or the tasks go too quickly, and some processes
433 # in the pool don't get any work, and then don't record data.
434 ret = work(*args)
435 time.sleep(0.1)
436 return os.getpid(), ret
438 if __name__ == "__main__": # pragma: no branch
439 # This if is on a single line so we can get 100% coverage
440 # even if we have no arguments.
441 if len(sys.argv) > 1: multiprocessing.set_start_method(sys.argv[1])
442 pool = multiprocessing.Pool({NPROCS})
443 inputs = [(x,) for x in range({UPTO})]
444 outputs = pool.imap_unordered(process_worker_main, inputs)
445 pids = set()
446 total = 0
447 for pid, sq in outputs:
448 pids.add(pid)
449 total += sq
450 print(f"{{len(pids)}} pids, {{total = }}")
451 pool.close()
452 pool.join()
453 """
@pytest.fixture(params=["fork", "spawn"], name="start_method")
def start_method_fixture(request: pytest.FixtureRequest) -> str:
    """Parameterized fixture to choose the start_method for multiprocessing.

    Yields each of "fork" and "spawn" in turn under the fixture name
    `start_method`.  If the current platform's multiprocessing does not list
    the requested method, the requesting test is skipped instead.
    """
    start_method: str = request.param
    if start_method not in multiprocessing.get_all_start_methods():
        # Windows doesn't support "fork".
        pytest.skip(f"start_method={start_method} not supported here")
    return start_method
466# Sometimes a test fails due to inherent randomness. Try more times.
467# @pytest.mark.flaky(max_runs=30)
468class MultiprocessingTest(CoverageTest): 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
469 """Test support of the multiprocessing module."""
471 def try_multiprocessing_code( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(
472 self,
473 code: str,
474 expected_out: str | None,
475 the_module: ModuleType,
476 nprocs: int,
477 start_method: str,
478 concurrency: str = "multiprocessing",
479 args: str = "",
480 ) -> None:
481 """Run code using multiprocessing, it should produce `expected_out`."""
482 self.make_file("multi.py", code) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
483 self.make_file( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
484 ".coveragerc",
485 f"""\
486 [run]
487 concurrency = {concurrency}
488 source = .
489 """,
490 )
492 cmd = f"coverage run {args} multi.py {start_method}" 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
493 _, out = self.run_command_status(cmd) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
494 expected_cant_trace = cant_trace_msg(concurrency, the_module) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
496 if expected_cant_trace is not None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
497 assert expected_cant_trace in out 1ysUztVAuWBvXDpCo6Sh8g7P)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
498 pytest.skip(f"Can't test: {expected_cant_trace}") 1ysUztVAuWBvXDpCo6Sh8g7P)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
499 else:
500 assert out.rstrip() == expected_out 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
501 assert len(glob.glob(".coverage.*")) == nprocs + 1 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
503 out = self.run_command("coverage combine") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
504 out_lines = out.splitlines() 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
505 assert len(out_lines) == nprocs + 1 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
506 assert all( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
507 re.fullmatch(
508 r"(Combined data file|Skipping duplicate data) \.coverage\..*\.\d+\.X\w{6}x",
509 line,
510 )
511 for line in out_lines
512 )
513 assert len(glob.glob(".coverage.*")) == 0 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
514 out = self.run_command("coverage report -m") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
516 last_line = self.squeezed_lines(out)[-1] 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
517 assert re.search(r"TOTAL \d+ 0 100%", last_line) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
519 def test_multiprocessing_simple(self, start_method: str) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
520 nprocs = 3 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
521 upto = 30 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
522 code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
523 total = sum(x * x if x % 2 else x * x * x for x in range(upto)) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
524 expected_out = f"{nprocs} pids, {total = }" 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
525 self.try_multiprocessing_code( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
526 code,
527 expected_out,
528 threading,
529 nprocs,
530 start_method=start_method,
531 )
533 def test_multiprocessing_append(self, start_method: str) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
534 nprocs = 3 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
535 upto = 30 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
536 code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
537 total = sum(x * x if x % 2 else x * x * x for x in range(upto)) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
538 expected_out = f"{nprocs} pids, total = {total}" 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
539 self.try_multiprocessing_code( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
540 code,
541 expected_out,
542 threading,
543 nprocs,
544 args="--append",
545 start_method=start_method,
546 )
548 def test_multiprocessing_and_gevent(self, start_method: str) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
549 nprocs = 3 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
550 upto = 30 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
551 code = (SUM_RANGE_WORK + EVENTLET + SUM_RANGE_Q + MULTI_CODE).format( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
552 NPROCS=nprocs, UPTO=upto
553 )
554 total = sum(sum(range((x + 1) * 100)) for x in range(upto)) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
555 expected_out = f"{nprocs} pids, total = {total}" 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
556 self.try_multiprocessing_code( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
557 code,
558 expected_out,
559 eventlet,
560 nprocs,
561 concurrency="multiprocessing,eventlet",
562 start_method=start_method,
563 )
565 @pytest.mark.skipif( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
566 not testenv.CAN_MEASURE_BRANCHES, reason="Can't measure branches with this core"
567 )
568 def test_multiprocessing_with_branching(self, start_method: str) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
569 nprocs = 3 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(
570 upto = 30 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(
571 code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto) 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(
572 total = sum(x * x if x % 2 else x * x * x for x in range(upto)) 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(
573 expected_out = f"{nprocs} pids, total = {total}" 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(
574 self.make_file("multi.py", code) 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(
575 self.make_file( 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(
576 "multi.rc",
577 """\
578 [run]
579 concurrency = multiprocessing
580 branch = True
581 omit = */site-packages/*
582 """,
583 )
585 out = self.run_command(f"coverage run --rcfile=multi.rc multi.py {start_method}") 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(
586 assert out.rstrip() == expected_out 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(
588 out = self.run_command("coverage combine -q") # sneak in a test of -q 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(
589 assert out == "" 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(
590 out = self.run_command("coverage report -m") 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(
592 last_line = self.squeezed_lines(out)[-1] 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(
593 assert re.search(r"TOTAL \d+ 0 \d+ 0 100%", last_line) 1xh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(
595 def test_multiprocessing_bootstrap_error_handling(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
596 # An exception during bootstrapping will be reported.
597 self.make_file( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
598 "multi.py",
599 """\
600 import multiprocessing
601 if __name__ == "__main__":
602 with multiprocessing.Manager():
603 pass
604 """,
605 )
606 self.make_file( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
607 ".coveragerc",
608 """\
609 [run]
610 concurrency = multiprocessing
611 _crash = _bootstrap
612 """,
613 )
614 out = self.run_command("coverage run multi.py", status=1) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
615 assert "Exception during multiprocessing bootstrap init" in out 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
616 assert "RuntimeError: Crashing because called by _bootstrap" in out 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
618 def test_bug_890(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
619 # chdir in multiprocessing shouldn't keep us from finding the
620 # .coveragerc file.
621 self.make_file( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
622 "multi.py",
623 """\
624 import multiprocessing, os, os.path
625 if __name__ == "__main__":
626 if not os.path.exists("./tmp"): os.mkdir("./tmp")
627 os.chdir("./tmp")
628 with multiprocessing.Manager():
629 pass
630 print("ok")
631 """,
632 )
633 self.make_file( 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
634 ".coveragerc",
635 """\
636 [run]
637 concurrency = multiprocessing
638 """,
639 )
640 out = self.run_command("coverage run multi.py") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
641 assert out.splitlines()[-1] == "ok" 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
@pytest.mark.skipif(not testenv.SETTRACE_CORE, reason="gettrace is not supported with this core.")
def test_coverage_stop_in_threads() -> None:
    # A thread started while coverage is collecting should see a trace
    # function installed, and should see it removed again once collection
    # has ended.
    has_started_coverage = []
    has_stopped_coverage = []

    def run_thread() -> None:  # pragma: nested
        """Check that coverage is stopping properly in threads."""
        # Use the monotonic clock for the timeout so that a wall-clock
        # adjustment during the test can neither cut the wait short nor
        # stretch it.
        deadline = time.monotonic() + 5
        ident = threading.current_thread().ident
        if sys.gettrace() is not None:
            has_started_coverage.append(ident)
        while sys.gettrace() is not None:
            # Wait for coverage to stop
            time.sleep(0.01)
            if time.monotonic() > deadline:
                # Give up without recording a stop; the final assertion
                # will then fail.
                return
        has_stopped_coverage.append(ident)

    cov = coverage.Coverage()
    with cov.collect():
        t = threading.Thread(target=run_thread)
        t.start()

        time.sleep(0.1)
    # Join only after collection has ended, so the thread can observe the stop.
    t.join()

    assert has_started_coverage == [t.ident]
    assert has_stopped_coverage == [t.ident]
def test_thread_safe_save_data(tmp_path: pathlib.Path) -> None:
    # Non-regression test for: https://github.com/coveragepy/coveragepy/issues/581

    # Write a thousand one-function modules into a scratch directory.
    modules_dir = tmp_path / "test_modules"
    modules_dir.mkdir()
    module_names = [f"m{i:03d}" for i in range(1000)]
    for name in module_names:
        modules_dir.joinpath(name + ".py").write_text("def f(): pass\n", encoding="utf-8")

    # State shared with the worker threads: a one-item list used as a
    # mutable flag, and the list of modules the workers have loaded.
    keep_running = [True]
    loaded = []

    start_dir = os.getcwd()
    os.chdir(modules_dir)
    try:
        # Make sure that all dummy modules can be imported.
        for name in module_names:
            import_local_file(name)

        def random_load() -> None:  # pragma: nested
            """Import modules randomly to stress coverage."""
            while keep_running[0]:
                mod = import_local_file(random.choice(module_names))
                mod.f()
                loaded.append(mod)

        # Spawn some threads with coverage enabled and attempt to read the
        # results right after stopping coverage collection with the threads
        # still running.
        pause = 0.01
        for _ in range(3):
            cov = coverage.Coverage()
            with cov.collect():
                workers = [threading.Thread(target=random_load) for _ in range(10)]
                keep_running[0] = True
                for worker in workers:
                    worker.start()

                time.sleep(pause)

            # The following call used to crash with running background threads.
            cov.get_data()

            # Stop the threads
            keep_running[0] = False
            for worker in workers:
                worker.join()

            # Nothing was loaded in this round: wait longer in the next one.
            if not loaded and pause < 10:  # pragma: only failure
                pause *= 2

    finally:
        os.chdir(start_dir)
        keep_running[0] = False
733@pytest.mark.skipif(env.WINDOWS, reason="SIGTERM doesn't work the same on Windows") 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
734@pytest.mark.flaky(max_runs=3) # Sometimes a test fails due to inherent randomness. Try more times. 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
735class SigtermTest(CoverageTest): 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
736 """Tests of our handling of SIGTERM."""
738 @pytest.mark.parametrize("sigterm", [False, True]) 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
739 def test_sigterm_multiprocessing_saves_data(self, sigterm: bool) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
740 # A terminated process should save its coverage data.
741 self.make_file( 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr
742 "clobbered.py",
743 """\
744 import multiprocessing
745 import time
747 def subproc(x):
748 if x.value == 3:
749 print("THREE", flush=True) # line 6, missed
750 else:
751 print("NOT THREE", flush=True)
752 x.value = 0
753 time.sleep(60)
755 if __name__ == "__main__":
756 print("START", flush=True)
757 x = multiprocessing.Value("L", 1)
758 proc = multiprocessing.Process(target=subproc, args=(x,))
759 proc.start()
760 while x.value != 0:
761 time.sleep(.05)
762 proc.terminate()
763 print("END", flush=True)
764 """,
765 )
766 self.make_file( 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr
767 ".coveragerc",
768 """\
769 [run]
770 parallel = True
771 concurrency = multiprocessing
772 """
773 + ("sigterm = true" if sigterm else ""),
774 )
775 out = self.run_command("coverage run clobbered.py") 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr
776 # Under Linux, things go wrong. Does that matter?
777 if env.LINUX and "assert self._collectors" in out: 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr
778 lines = out.splitlines(True) 1EsFtGuHvCoqr
779 out = "".join(lines[:3]) 1EsFtGuHvCoqr
780 assert out == "START\nNOT THREE\nEND\n" 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr
781 self.run_command("coverage combine") 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr
782 out = self.run_command("coverage report -m") 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr
783 if sigterm: 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr
784 expected = "clobbered.py 17 1 94% 6" 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr
785 else:
786 expected = "clobbered.py 17 5 71% 5-10" 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr
787 assert self.squeezed_lines(out)[2] == expected 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr
789 def test_sigterm_threading_saves_data(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
790 # A terminated process should save its coverage data.
791 self.make_file( 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr
792 "handler.py",
793 """\
794 import os, signal
796 print("START", flush=True)
797 print("SIGTERM", flush=True)
798 os.kill(os.getpid(), signal.SIGTERM)
799 print("NOT HERE", flush=True)
800 """,
801 )
802 self.make_file( 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr
803 ".coveragerc",
804 """\
805 [run]
806 # The default concurrency option.
807 concurrency = thread
808 sigterm = true
809 """,
810 )
811 status, out = self.run_command_status("coverage run handler.py") 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr
812 assert status != 0 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr
813 out_lines = out.splitlines() 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr
814 assert len(out_lines) in [2, 3] 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr
815 assert out_lines[:2] == ["START", "SIGTERM"] 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr
816 if len(out_lines) == 3: 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr
817 assert out_lines[2] == "Terminated" 1EsFtGuHvCowg7ia0jb1kc2qr
818 out = self.run_command("coverage report -m") 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr
819 expected = "handler.py 5 1 80% 6" 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr
820 assert self.squeezed_lines(out)[2] == expected 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr
822 def test_sigterm_still_runs(self) -> None: 1LyEs#UMzFt$VNAGu9WOBHv!XDpCo6Sxh8wg7ZP)ld3ia0YI%me4jb1QJ'nf5kc2RK(qrT
823 # A terminated process still runs its own SIGTERM handler.
824 self.make_file( 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr
825 "handler.py",
826 """\
827 import multiprocessing
828 import signal
829 import time
831 def subproc(x):
832 print("START", flush=True)
833 def on_sigterm(signum, frame):
834 print("SIGTERM", flush=True)
836 signal.signal(signal.SIGTERM, on_sigterm)
837 x.value = 0
838 try:
839 time.sleep(.1)
840 except OSError: # This happens on PyPy3.11 on Mac
841 pass
842 print("END", flush=True)
844 if __name__ == "__main__":
845 x = multiprocessing.Value("L", 1)
846 proc = multiprocessing.Process(target=subproc, args=(x,))
847 proc.start()
848 while x.value != 0:
849 time.sleep(.02)
850 proc.terminate()
851 """,
852 )
853 self.make_file( 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr
854 ".coveragerc",
855 """\
856 [run]
857 parallel = True
858 concurrency = multiprocessing
859 sigterm = True
860 """,
861 )
862 out = self.run_command("coverage run handler.py") 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr
863 assert out == "START\nSIGTERM\nEND\n" 1LyEsMzFtNAGuOBHvDpCoxh8wg7ld3ia0me4jb1nf5kc2qr