Python multiprocessing.Queue.get 在第一次调用时引发 EOFError
Python multiprocessing.Queue.get raises EOFError on first call
我在 ubuntu 20.04 OS 上使用 python 3.7。我的问题陈述类似于生产者和消费者问题,其中有一对 reader 和 writer 进程。我的 reader 进程在无限循环中调用 Queue.get,(根据文档,Queue.get 会阻塞调用,直到任何数据被另一个进程放入队列中)。
进行此调用会引发 EOFError。
reader.py
import multiprocessing as mp
def reader(queue):
while True:
try:
data = queue.get()
except Exception as e:
print(f'Exception occurred: {e}')
# Do something
queue = mp.Manager().Queue()
p = mp.Process(target=reader, args=(queue,))
p.start()
# Prepare some data to send
queue.put(some_data)
# Do my own tasks
运行 这导致
Traceback (most recent call last):
File "/usr/src/app/src/processor.py", line 775, in classification_manager
slot_data = classification_queue.get()
File "<string>", line 2, in get
File "/usr/lib/python3.7/multiprocessing/managers.py", line 819, in _callmethod
kind, result = conn.recv()
File "/usr/lib/python3.7/multiprocessing/connection.py", line 250, in recv
buf = self._recv_bytes()
File "/usr/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
buf = self._recv(4)
File "/usr/lib/python3.7/multiprocessing/connection.py", line 383, in _recv
raise EOFError
EOFError
2022-02-18 20:45:01,528 classification_1 INFO Waiting for Data!
2022-02-18 20:45:01,528 classification_1 ERROR BrokenPipeError
Traceback (most recent call last):
File "/usr/src/app/src/processor.py", line 775, in classification_manager
slot_data = classification_queue.get()
File "<string>", line 2, in get
File "/usr/lib/python3.7/multiprocessing/managers.py", line 818, in _callmethod
conn.send((self._id, methodname, args, kwds))
File "/usr/lib/python3.7/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
self._send(header + buf)
File "/usr/lib/python3.7/multiprocessing/connection.py", line 368, in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
与您的代码等效的线程工作正常:
import threading as th
from queue import Queue
from time import sleep
def reader(queue):
while True:
data = queue.get()
print ("Reader saw",data)
# Do something
queue = Queue()
p = th.Thread(target=reader, args=(queue,))
p.start()
queue.put("Expect More Data")
call_count = 0
while True:
sleep(2)
call_count += 1
queue.put(call_count)
如果我只是详细说明您的代码(目前无法运行),那么就完全没有问题。
import multiprocessing as mp
def reader(queue):
while (data := queue.get()) != 'stop':
print(data)
def main():
queue = mp.Manager().Queue()
p = mp.Process(target=reader, args=(queue,))
p.start()
for some_data in ['Hello', 'world', 'stop']:
queue.put(some_data)
p.join()
if __name__ == '__main__':
main()
输出:
Hello
world
我得到了答案。在我的例子中,Linux OOM 杀手正在杀死父进程。这就是为什么子进程得到这个 EOFError 然后是 BrokenPipeError。
您可以阅读 linux OOM 杀手 here。
我在 ubuntu 20.04 OS 上使用 python 3.7。我的问题陈述类似于生产者和消费者问题,其中有一对 reader 和 writer 进程。我的 reader 进程在无限循环中调用 Queue.get,(根据文档,Queue.get 会阻塞调用,直到任何数据被另一个进程放入队列中)。
进行此调用会引发 EOFError。
reader.py
import multiprocessing as mp
def reader(queue):
while True:
try:
data = queue.get()
except Exception as e:
print(f'Exception occurred: {e}')
# Do something
queue = mp.Manager().Queue()
p = mp.Process(target=reader, args=(queue,))
p.start()
# Prepare some data to send
queue.put(some_data)
# Do my own tasks
运行 这导致
Traceback (most recent call last):
File "/usr/src/app/src/processor.py", line 775, in classification_manager
slot_data = classification_queue.get()
File "<string>", line 2, in get
File "/usr/lib/python3.7/multiprocessing/managers.py", line 819, in _callmethod
kind, result = conn.recv()
File "/usr/lib/python3.7/multiprocessing/connection.py", line 250, in recv
buf = self._recv_bytes()
File "/usr/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
buf = self._recv(4)
File "/usr/lib/python3.7/multiprocessing/connection.py", line 383, in _recv
raise EOFError
EOFError
2022-02-18 20:45:01,528 classification_1 INFO Waiting for Data!
2022-02-18 20:45:01,528 classification_1 ERROR BrokenPipeError
Traceback (most recent call last):
File "/usr/src/app/src/processor.py", line 775, in classification_manager
slot_data = classification_queue.get()
File "<string>", line 2, in get
File "/usr/lib/python3.7/multiprocessing/managers.py", line 818, in _callmethod
conn.send((self._id, methodname, args, kwds))
File "/usr/lib/python3.7/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
self._send(header + buf)
File "/usr/lib/python3.7/multiprocessing/connection.py", line 368, in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
与您的代码等效的线程工作正常:
import threading as th
from queue import Queue
from time import sleep
def reader(queue):
while True:
data = queue.get()
print ("Reader saw",data)
# Do something
queue = Queue()
p = th.Thread(target=reader, args=(queue,))
p.start()
queue.put("Expect More Data")
call_count = 0
while True:
sleep(2)
call_count += 1
queue.put(call_count)
如果我只是详细说明您的代码(目前无法运行),那么就完全没有问题。
import multiprocessing as mp
def reader(queue):
while (data := queue.get()) != 'stop':
print(data)
def main():
queue = mp.Manager().Queue()
p = mp.Process(target=reader, args=(queue,))
p.start()
for some_data in ['Hello', 'world', 'stop']:
queue.put(some_data)
p.join()
if __name__ == '__main__':
main()
输出:
Hello
world
我得到了答案。在我的例子中,Linux OOM 杀手正在杀死父进程。这就是为什么子进程得到这个 EOFError 然后是 BrokenPipeError。
您可以阅读 linux OOM 杀手 here。