Call to Python's multiprocessing.Process.join() fails
When I run the code below, it occasionally fails:
time_start = time.time()
job = multiprocessing.Process(target=load_cpu, args=(deadline, ))
job.start()  # This is line 37 in the source code linked below
# timeout=None in the call to join() solves the problem
job.join(deadline)
elapsed = time.time() - time_start
if elapsed < deadline and job.is_alive():
    # I am getting here from time to time
    logger.error(f"#{job_counter}: job.join() returned while process {job.pid} is still alive elapsed={elapsed} deadline={deadline}")
A Python 3.7 container (Docker) that reproduces the problem is here: https://github.com/larytet-py/multiprocess
If I run the code for a few minutes on a 4-core Ubuntu 18.04 host, I get:
Traceback (most recent call last):
  File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "main.py", line 37, in spawn_job
    job.start()
  File "/usr/lib/python3.7/multiprocessing/process.py", line 111, in start
    _cleanup()
  File "/usr/lib/python3.7/multiprocessing/process.py", line 56, in _cleanup
    if p._popen.poll() is not None:
AttributeError: 'NoneType' object has no attribute 'poll'
What am I doing wrong?
My workaround is to replace the call to job.join() with polling and a check of is_alive(). Unfortunately, this approach hurts latency. Is there a better option?
def join_process(job, timeout):
    time_start = time.time()
    # Typical processing time is 100ms; I want to reduce the latency impact.
    # 10ms looks ok.
    # TODO: I can end up in a tight loop here.
    polling_time = min(0.1*timeout, 0.010)
    while time.time() - time_start < timeout and job.is_alive():
        time.sleep(polling_time)
Update. I tried multiprocessing.Event() instead of Process.join(); the code fails with the same exception.
Update 2. I have reproduced the problem in code that does not call Process.join() at all. It takes more time and more load, but eventually Process.start() crashes.
Update 3. Has https://bugs.python.org/issue40860 been accepted? I am still looking for a workaround.
Synchronizing the calls to Process.start() helps. It is a fair workaround. There are no other answers, so I am accepting my own.
diff --git a/main.py b/main.py
index d09dc53..49d68f0 100644
--- a/main.py
+++ b/main.py
@@ -26,17 +26,24 @@ def load_cpu(deadline):
     while time.time() - start < 0.2*deadline:
         math.pow(random.randint(0, 1), random.randint(0, 1))
+def join_process(job, timeout):
+    time_start = time.time()
+    while time.time()-time_start < timeout and job.is_alive():
+        time.sleep(0.1 * timeout)
+        continue
+
 job_counter = 0
+lock = threading.Lock()
 def spawn_job(deadline):
     '''
     Creat a new Process, call join(), process errors
     '''
     global job_counter
     time_start = time.time()
-    job = multiprocessing.Process(target=load_cpu, args=(deadline, ))
-    job.start()
-    # timeout=None in the call to join() solves the problem
-    job.join(deadline)
+    with lock:
+        job = multiprocessing.Process(target=load_cpu, args=(deadline, ))
+        job.start()
+    join_process(job, deadline)
My final version uses os.fork(). I gave up on multiprocessing entirely. multiprocessing is not thread-safe (I am not joking): https://gist.github.com/larytet/3ca9f9a32b1dc089a24cb7011455141f
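For completeness, a rough sketch of the fork-based pattern (my own minimal version, not the gist's exact code; POSIX-only, since os.fork() does not exist on Windows):

```python
import os
import time

def run_with_deadline(target, args, timeout):
    """Fork a child, run target in it, and wait up to timeout seconds.

    Returns the child's exit code, or None if it had to be killed.
    """
    pid = os.fork()
    if pid == 0:
        # Child: run the job, then exit without parent-side cleanup.
        target(*args)
        os._exit(0)
    # Parent: reap with WNOHANG until the child exits or the deadline passes.
    time_start = time.time()
    while time.time() - time_start < timeout:
        done, status = os.waitpid(pid, os.WNOHANG)
        if done == pid:
            return os.WEXITSTATUS(status)
        time.sleep(0.01)
    os.kill(pid, 9)      # deadline exceeded: SIGKILL and reap
    os.waitpid(pid, 0)
    return None
```

The parent keeps full control over the child's lifetime, at the cost of re-implementing what multiprocessing normally does for you.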