Why does a "Broken pipe" error occur only when accessing a shared list in one specific multiprocessing scenario?

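Before I begin, let me mention that I already know the multiprocessing code below is broken. It contains TOCTOU bugs. The code is for my own learning, so that I can understand more about exactly how it is broken. My question is about one particular aspect of the broken code. First, let me show the code.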

For now, you can ignore worker_b completely, because it is not used anywhere yet. We will come back to it later.

import Queue
import multiprocessing
import time

lock = multiprocessing.Lock()

def pprint(s):
    lock.acquire()
    print(s)
    lock.release()

def worker_a(i, stack):
    if stack:
        data = stack.pop()
        pprint('worker %d got %d' % (i, data))
        time.sleep(2)
        pprint('worker %d exiting ...' % i)
    else:
        pprint('worker %d has nothing to do!' % i)

def worker_b(i, stack):
    if stack:
        data = stack.pop()
        pprint('worker %d got %d (stack length: %d)' % (i, data, len(stack)))
        time.sleep(2)
        pprint('worker %d exiting ... (stack length: %d)' % (i, len(stack)))
    else:
        pprint('worker %d has nothing to do!' % i)

manager = multiprocessing.Manager()
stack = manager.list()

def master():
    for i in range(5):
        stack.append(i)
        pprint('master put %d' % i)

    i = 0
    while stack:
        t = multiprocessing.Process(target=worker_a, args=(i, stack))
        t.start()
        time.sleep(1)
        i += 1

    pprint('master returning ...')

master()

pprint('master returned!')

The broken code above appears to work fine.

$ python mplifo.py 
master put 0
master put 1
master put 2
master put 3
master put 4
worker 0 got 4
worker 1 got 3
worker 0 exiting ...
worker 2 got 2
worker 1 exiting ...
worker 3 got 1
worker 2 exiting ...
worker 4 got 0
worker 3 exiting ...
master returning ...
master returned!
worker 4 exiting ...

However, if I call worker_b instead of worker_a, i.e. change

        t = multiprocessing.Process(target=worker_a, args=(i, stack))

to

        t = multiprocessing.Process(target=worker_b, args=(i, stack))

the following error occurs.

$ python mplifo.py
master put 0
master put 1
master put 2
master put 3
master put 4
worker 0 got 4 (stack length: 4)
worker 1 got 3 (stack length: 3)
worker 0 exiting ... (stack length: 3)
worker 2 got 2 (stack length: 2)
worker 1 exiting ... (stack length: 2)
worker 3 got 1 (stack length: 1)
worker 2 exiting ... (stack length: 1)
worker 4 got 0 (stack length: 0)
worker 3 exiting ... (stack length: 0)
master returning ...
master returned!
Process Process-6:
Traceback (most recent call last):
  File "/usr/local/Cellar/python/2.7.13/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python/2.7.13/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "mplifo.py", line 27, in worker_b
    pprint('worker %d exiting ... (stack length: %d)' % (i, len(stack)))
  File "<string>", line 2, in __len__
  File "/usr/local/Cellar/python/2.7.13/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 758, in _callmethod
    conn.send((self._id, methodname, args, kwds))
IOError: [Errno 32] Broken pipe

This part of the traceback gives you a hint:

  File "mplifo.py", line 27, in worker_b
    pprint('worker %d exiting ... (stack length: %d)' % (i, len(stack)))
  File "<string>", line 2, in __len__
  File "/usr/local/Cellar/python/2.7.13/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 758, in _callmethod
    conn.send((self._id, methodname, args, kwds))

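In the worker processes, stack is not a Python list. It is a proxy created by multiprocessing.Manager. The real list lives in the manager's server process, which the main process starts and which is shut down when the main process exits. Every operation on the proxy, including len(stack), is sent as a request over a connection to that server. When the last worker_b is about to exit, it evaluates len(stack), so the proxy has to send such a request. By that time master has already returned and the main script has finished, so the other end of the connection is gone and the send fails with "Broken pipe".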
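To see the proxy behaviour in isolation, here is a minimal sketch that needs no worker processes. It shuts the manager down by hand to stand in for the main process exiting. The function name probe_proxy is made up for this illustration, and the exact exception raised after shutdown depends on the Python version and platform, so the sketch only records its type name.

```python
import multiprocessing


def probe_proxy():
    """Return (proxy type name, length while alive, error name after shutdown)."""
    manager = multiprocessing.Manager()   # starts the manager's server process
    stack = manager.list([1, 2, 3])       # a proxy object, not a real list
    kind = type(stack).__name__
    alive_len = len(stack)                # forwarded to the server process
    manager.shutdown()                    # stand-in for the main process exiting
    try:
        len(stack)                        # nobody is listening any more
        error = None
    except Exception as exc:              # exact type varies by version/platform
        error = type(exc).__name__
    return kind, alive_len, error


if __name__ == '__main__':
    print(probe_proxy())
```

The first len(stack) succeeds because the server is still running. The second one fails because the proxy's request has nowhere to go, which is the same situation the last worker_b ends up in.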

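This does not happen with worker_a, because it never touches stack again after the pop, so it makes no request to the manager just before exiting.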
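One way to avoid the error, sketched here under simplifying assumptions, is to keep the manager alive until every worker has finished by joining the workers before master returns. This is not the code from the question: the names worker and results and the fixed count of three workers are invented for the example, and it does nothing about the TOCTOU bugs mentioned in the question.

```python
import multiprocessing
import time


def worker(i, stack, results):
    # Each proxy call below is a round trip to the manager's server process.
    data = stack.pop()
    time.sleep(0.2)
    # Safe: master has not returned yet, so the manager is still running.
    results.append((i, data, len(stack)))


def master():
    manager = multiprocessing.Manager()
    try:
        stack = manager.list([0, 1, 2])
        results = manager.list()
        workers = []
        for i in range(3):
            p = multiprocessing.Process(target=worker, args=(i, stack, results))
            p.start()
            workers.append(p)
        for p in workers:
            p.join()                      # wait for every worker before leaving
        return results[:]                 # copy into a plain list before shutdown
    finally:
        manager.shutdown()


if __name__ == '__main__':
    for i, data, remaining in sorted(master()):
        print('worker %d got %d (stack length: %d)' % (i, data, remaining))
```

The stack length each worker reports depends on timing, so it can differ from run to run. What the joins guarantee is only that the manager outlives every call the workers make on the proxies.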