Call to Python's multiprocessing.Process.join() fails

When I run the code below, it occasionally fails:

time_start = time.time()
job = multiprocessing.Process(target=load_cpu, args=(deadline, ))
job.start() # This is line 37 in the source code linked below
# timeout=None in the call to join() solves the problem
job.join(deadline)
elapsed = time.time()-time_start
if elapsed < deadline and job.is_alive():
    # I am getting here from time to time
    logger.error(f"#{job_counter}: job.join() returned while process {job.pid} is still alive elapsed={elapsed} deadline={deadline}")

A Python 3.7 container (Docker) demonstrating the problem is here: https://github.com/larytet-py/multiprocess. If I run the code for a few minutes on a 4-core Ubuntu 18.04 host, I get:

Traceback (most recent call last):
  File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "main.py", line 37, in spawn_job
    job.start()
  File "/usr/lib/python3.7/multiprocessing/process.py", line 111, in start
    _cleanup()
  File "/usr/lib/python3.7/multiprocessing/process.py", line 56, in _cleanup
    if p._popen.poll() is not None:
AttributeError: 'NoneType' object has no attribute 'poll'

What am I doing wrong? My workaround is to replace the call to job.join() with polling on is_alive(). Unfortunately, this approach impacts latency. Is there a better option?

def join_process(job, timeout):
    time_start = time.time()
    # Typical processing time is 100ms; I want to reduce the latency impact.
    # A 10ms polling interval looks ok.
    # TODO: I can end up in a tight loop here.
    polling_time = min(0.1 * timeout, 0.010)
    while time.time() - time_start < timeout and job.is_alive():
        time.sleep(polling_time)
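An alternative to sleep-polling that avoids the latency trade-off is to wait on the process sentinel with multiprocessing.connection.wait(), which blocks until the child exits or the timeout elapses. This is a sketch, not the code from the linked repository; load_cpu_briefly is a simplified stand-in for the original worker:

```python
import multiprocessing
import time
from multiprocessing.connection import wait

def load_cpu_briefly(duration):
    # Simplified stand-in for the load_cpu() worker in the question
    end = time.time() + duration
    while time.time() < end:
        pass

def wait_for_process(job, timeout):
    # Block until the process exits or the timeout elapses, without
    # calling Process.join(); Process.sentinel becomes ready when
    # the child terminates.
    ready = wait([job.sentinel], timeout=timeout)
    return len(ready) > 0  # True if the process finished in time

if __name__ == "__main__":
    job = multiprocessing.Process(target=load_cpu_briefly, args=(0.1,))
    job.start()
    finished = wait_for_process(job, 5.0)
    job.join()  # reap the child; returns immediately once it has exited
    print(finished)
```

Note that this only replaces the waiting side; it does not address the AttributeError raised inside Process.start(), which comes from a different race.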

Update. I tried multiprocessing.Event() instead of Process.join(); the code fails with the same exception.

Update 2. I have reproduced the problem in code that does not call Process.join() at all. It takes more time and more load, but eventually Process.start() crashes.

Update 3. Has https://bugs.python.org/issue40860 been accepted? I am still looking for a workaround.

Synchronizing the calls to Process.start() helps. It is a fair workaround. There were no other answers, so I am accepting my own.

diff --git a/main.py b/main.py
index d09dc53..49d68f0 100644
--- a/main.py
+++ b/main.py
@@ -26,17 +26,24 @@ def load_cpu(deadline):
     while time.time() - start < 0.2*deadline:
         math.pow(random.randint(0, 1), random.randint(0, 1))
 
+def join_process(job, timeout):
+    time_start = time.time()
+    while time.time()-time_start < timeout and job.is_alive():
+        time.sleep(0.1 * timeout)
+        continue
+
 job_counter = 0
+lock = threading.Lock()
 def spawn_job(deadline):
     '''
     Create a new Process, call join(), process errors
     '''
     global job_counter
     time_start = time.time()
-    job = multiprocessing.Process(target=load_cpu, args=(deadline, ))
-    job.start()
-    # timeout=None in the call to join() solves the problem
-    job.join(deadline)
+    with lock:
+        job = multiprocessing.Process(target=load_cpu, args=(deadline, ))
+        job.start()
+    join_process(job, deadline)
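Applied to a self-contained script, the lock-based workaround from the diff looks roughly like this (a sketch: load_cpu is simplified to a busy loop, and the error handling from the original spawn_job is omitted):

```python
import multiprocessing
import threading
import time

lock = threading.Lock()

def load_cpu(deadline):
    # Simplified stand-in for the original load_cpu()
    start = time.time()
    while time.time() - start < 0.2 * deadline:
        pass

def join_process(job, timeout):
    # Poll is_alive() instead of relying on Process.join(timeout)
    time_start = time.time()
    while time.time() - time_start < timeout and job.is_alive():
        time.sleep(0.1 * timeout)

def spawn_job(deadline):
    # Serialize Process creation and start() across threads; this
    # avoids concurrent calls into multiprocessing's _cleanup()
    with lock:
        job = multiprocessing.Process(target=load_cpu, args=(deadline,))
        job.start()
    join_process(job, deadline)
    return job

if __name__ == "__main__":
    threads = [threading.Thread(target=spawn_job, args=(0.5,))
               for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print("done")
```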

My final version uses os.fork(). I dropped multiprocessing entirely. multiprocessing is not thread-safe (I am not joking): https://gist.github.com/larytet/3ca9f9a32b1dc089a24cb7011455141f
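For reference, a minimal sketch of the fork-based approach (the names here are illustrative and not taken from the gist): fork a child, run the work in it, and reap it with a non-blocking waitpid() loop bounded by the timeout. This works on Unix only.

```python
import os
import time

def run_in_child(target, args, timeout):
    # Fork a child, run target(*args) in it, then reap the child with
    # non-blocking waitpid() until it exits or the timeout elapses.
    pid = os.fork()
    if pid == 0:
        # Child process: run the work and exit without unwinding
        # the parent's stack.
        try:
            target(*args)
        finally:
            os._exit(0)
    # Parent: poll waitpid(WNOHANG) instead of Process.join()
    deadline = time.time() + timeout
    while time.time() < deadline:
        done_pid, status = os.waitpid(pid, os.WNOHANG)
        if done_pid == pid:
            return os.WEXITSTATUS(status) if os.WIFEXITED(status) else -1
        time.sleep(0.01)
    return None  # still running after timeout; caller must reap or kill it

if __name__ == "__main__":
    print(run_in_child(time.sleep, (0.05,), 2.0))
```

Because os.fork() bypasses the multiprocessing module entirely, there is no shared _children bookkeeping for concurrent threads to corrupt; the caller is responsible for reaping (or killing) a child that outlives the timeout.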