Python multiprocessing.Queue.get raises EOFError on first call

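I am using Python 3.7 on Ubuntu 20.04. My problem is similar to the producer-consumer problem: there is a pair of reader and writer processes. My reader process calls Queue.get in an infinite loop (according to the documentation, Queue.get blocks until another process puts data on the queue).

Making this call raises EOFError.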

reader.py

import multiprocessing as mp

def reader(queue):
    while True:
        try:
            data = queue.get()
        except Exception as e:
            print(f'Exception occurred: {e}')
        # Do something

queue = mp.Manager().Queue()
p = mp.Process(target=reader, args=(queue,))
p.start()
# Prepare some data to send
queue.put(some_data)
# Do my own tasks

Running this results in:

Traceback (most recent call last):
  File "/usr/src/app/src/processor.py", line 775, in classification_manager
    slot_data = classification_queue.get()
  File "<string>", line 2, in get
  File "/usr/lib/python3.7/multiprocessing/managers.py", line 819, in _callmethod
    kind, result = conn.recv()
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 383, in _recv
    raise EOFError
EOFError
2022-02-18 20:45:01,528 classification_1 INFO     Waiting for Data!
2022-02-18 20:45:01,528 classification_1 ERROR    BrokenPipeError
Traceback (most recent call last):
  File "/usr/src/app/src/processor.py", line 775, in classification_manager
    slot_data = classification_queue.get()
  File "<string>", line 2, in get
  File "/usr/lib/python3.7/multiprocessing/managers.py", line 818, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe

The threading equivalent of your code works fine:

import threading as th
from queue import Queue
from time import sleep

def reader(queue):
    while True:
        data = queue.get()
        print("Reader saw", data)
        # Do something

queue = Queue()
p = th.Thread(target=reader, args=(queue,))
p.start()

queue.put("Expect More Data")

call_count = 0
while True:
    sleep(2)
    call_count += 1
    queue.put(call_count)

If I simply flesh out your code (which does not run as posted), there is no problem at all.

import multiprocessing as mp

def reader(queue):
    while (data := queue.get()) != 'stop':
        print(data)

def main():
    queue = mp.Manager().Queue()
    p = mp.Process(target=reader, args=(queue,))
    p.start()
    for some_data in ['Hello', 'world', 'stop']:
        queue.put(some_data)
    p.join()


if __name__ == '__main__':
    main()

Output:

Hello
world

I found the answer. In my case, the Linux OOM killer was killing the parent process. That is why the child process got this EOFError, followed by BrokenPipeError.

You can read about the Linux OOM killer here