Skip to content

Commit d22276d

Browse files
miss-islingtongraingertblurb-it[bot]gpshead
authored
[3.13] gh-131788: make resource_tracker re-entrant safe (GH-131787) (#137738)
gh-131788: make resource_tracker re-entrant safe (GH-131787) (cherry picked from commit f24a012) Co-authored-by: Thomas Grainger <[email protected]> Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com> Co-authored-by: Gregory P. Smith <[email protected]>
1 parent 9417ea5 commit d22276d

File tree

2 files changed

+94
-70
lines changed

2 files changed

+94
-70
lines changed

Lib/multiprocessing/resource_tracker.py

Lines changed: 93 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import sys
2121
import threading
2222
import warnings
23+
from collections import deque
2324

2425
from . import spawn
2526
from . import util
@@ -66,6 +67,7 @@ def __init__(self):
6667
self._fd = None
6768
self._pid = None
6869
self._exitcode = None
70+
self._reentrant_messages = deque()
6971

7072
def _reentrant_call_error(self):
7173
# gh-109629: this happens if an explicit call to the ResourceTracker
@@ -102,7 +104,7 @@ def _stop_locked(
102104
# This shouldn't happen (it might when called by a finalizer)
103105
# so we check for it anyway.
104106
if self._lock._recursion_count() > 1:
105-
return self._reentrant_call_error()
107+
raise self._reentrant_call_error()
106108
if self._fd is None:
107109
# not running
108110
return
@@ -132,69 +134,99 @@ def ensure_running(self):
132134
133135
This can be run from any process. Usually a child process will use
134136
the resource created by its parent.'''
137+
return self._ensure_running_and_write()
138+
139+
def _teardown_dead_process(self):
140+
os.close(self._fd)
141+
142+
# Clean-up to avoid dangling processes.
143+
try:
144+
# _pid can be None if this process is a child from another
145+
# python process, which has started the resource_tracker.
146+
if self._pid is not None:
147+
os.waitpid(self._pid, 0)
148+
except ChildProcessError:
149+
# The resource_tracker has already been terminated.
150+
pass
151+
self._fd = None
152+
self._pid = None
153+
self._exitcode = None
154+
155+
warnings.warn('resource_tracker: process died unexpectedly, '
156+
'relaunching. Some resources might leak.')
157+
158+
def _launch(self):
159+
fds_to_pass = []
160+
try:
161+
fds_to_pass.append(sys.stderr.fileno())
162+
except Exception:
163+
pass
164+
r, w = os.pipe()
165+
try:
166+
fds_to_pass.append(r)
167+
# process will out live us, so no need to wait on pid
168+
exe = spawn.get_executable()
169+
args = [
170+
exe,
171+
*util._args_from_interpreter_flags(),
172+
'-c',
173+
f'from multiprocessing.resource_tracker import main;main({r})',
174+
]
175+
# bpo-33613: Register a signal mask that will block the signals.
176+
# This signal mask will be inherited by the child that is going
177+
# to be spawned and will protect the child from a race condition
178+
# that can make the child die before it registers signal handlers
179+
# for SIGINT and SIGTERM. The mask is unregistered after spawning
180+
# the child.
181+
prev_sigmask = None
182+
try:
183+
if _HAVE_SIGMASK:
184+
prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
185+
pid = util.spawnv_passfds(exe, args, fds_to_pass)
186+
finally:
187+
if prev_sigmask is not None:
188+
signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask)
189+
except:
190+
os.close(w)
191+
raise
192+
else:
193+
self._fd = w
194+
self._pid = pid
195+
finally:
196+
os.close(r)
197+
198+
def _ensure_running_and_write(self, msg=None):
135199
with self._lock:
136200
if self._lock._recursion_count() > 1:
137201
# The code below is certainly not reentrant-safe, so bail out
138-
return self._reentrant_call_error()
202+
if msg is None:
203+
raise self._reentrant_call_error()
204+
return self._reentrant_messages.append(msg)
205+
139206
if self._fd is not None:
140207
# resource tracker was launched before, is it still running?
141-
if self._check_alive():
142-
# => still alive
143-
return
144-
# => dead, launch it again
145-
os.close(self._fd)
146-
147-
# Clean-up to avoid dangling processes.
208+
if msg is None:
209+
to_send = b'PROBE:0:noop\n'
210+
else:
211+
to_send = msg
148212
try:
149-
# _pid can be None if this process is a child from another
150-
# python process, which has started the resource_tracker.
151-
if self._pid is not None:
152-
os.waitpid(self._pid, 0)
153-
except ChildProcessError:
154-
# The resource_tracker has already been terminated.
155-
pass
156-
self._fd = None
157-
self._pid = None
158-
self._exitcode = None
213+
self._write(to_send)
214+
except OSError:
215+
self._teardown_dead_process()
216+
self._launch()
159217

160-
warnings.warn('resource_tracker: process died unexpectedly, '
161-
'relaunching. Some resources might leak.')
218+
msg = None # message was sent in probe
219+
else:
220+
self._launch()
162221

163-
fds_to_pass = []
222+
while True:
164223
try:
165-
fds_to_pass.append(sys.stderr.fileno())
166-
except Exception:
167-
pass
168-
cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
169-
r, w = os.pipe()
170-
try:
171-
fds_to_pass.append(r)
172-
# process will out live us, so no need to wait on pid
173-
exe = spawn.get_executable()
174-
args = [exe] + util._args_from_interpreter_flags()
175-
args += ['-c', cmd % r]
176-
# bpo-33613: Register a signal mask that will block the signals.
177-
# This signal mask will be inherited by the child that is going
178-
# to be spawned and will protect the child from a race condition
179-
# that can make the child die before it registers signal handlers
180-
# for SIGINT and SIGTERM. The mask is unregistered after spawning
181-
# the child.
182-
prev_sigmask = None
183-
try:
184-
if _HAVE_SIGMASK:
185-
prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
186-
pid = util.spawnv_passfds(exe, args, fds_to_pass)
187-
finally:
188-
if prev_sigmask is not None:
189-
signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask)
190-
except:
191-
os.close(w)
192-
raise
193-
else:
194-
self._fd = w
195-
self._pid = pid
196-
finally:
197-
os.close(r)
224+
reentrant_msg = self._reentrant_messages.popleft()
225+
except IndexError:
226+
break
227+
self._write(reentrant_msg)
228+
if msg is not None:
229+
self._write(msg)
198230

199231
def _check_alive(self):
200232
'''Check that the pipe has not been closed by sending a probe.'''
@@ -215,27 +247,18 @@ def unregister(self, name, rtype):
215247
'''Unregister name of resource with resource tracker.'''
216248
self._send('UNREGISTER', name, rtype)
217249

250+
def _write(self, msg):
251+
nbytes = os.write(self._fd, msg)
252+
assert nbytes == len(msg), f"{nbytes=} != {len(msg)=}"
253+
218254
def _send(self, cmd, name, rtype):
219-
try:
220-
self.ensure_running()
221-
except ReentrantCallError:
222-
# The code below might or might not work, depending on whether
223-
# the resource tracker was already running and still alive.
224-
# Better warn the user.
225-
# (XXX is warnings.warn itself reentrant-safe? :-)
226-
warnings.warn(
227-
f"ResourceTracker called reentrantly for resource cleanup, "
228-
f"which is unsupported. "
229-
f"The {rtype} object {name!r} might leak.")
230-
msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
255+
msg = f"{cmd}:{name}:{rtype}\n".encode("ascii")
231256
if len(msg) > 512:
232257
# posix guarantees that writes to a pipe of less than PIPE_BUF
233258
# bytes are atomic, and that PIPE_BUF >= 512
234259
raise ValueError('msg too long')
235-
nbytes = os.write(self._fd, msg)
236-
assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
237-
nbytes, len(msg))
238260

261+
self._ensure_running_and_write(msg)
239262

240263
_resource_tracker = ResourceTracker()
241264
ensure_running = _resource_tracker.ensure_running
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Make ``ResourceTracker.send`` from :mod:`multiprocessing` re-entrant safe

0 commit comments

Comments
 (0)