我可以在进程内使用 `multiprocessing.Queue` 进行通信吗?

Can I use a `multiprocessing.Queue` for communication within a process?

我正在使用队列进行线程间通信。我使用 multiprocessing.Queue() 而不是 queue.Queue() 因为 multiprocessing 版本公开了一个底层文件描述符,可以用 select.select 等待 - 这意味着我可以阻止等待对象在队列中 一个数据包从同一线程到达网络接口。

但是当我尝试从队列中获取一个对象时,我得到了这个:

Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects

有办法吗?或者我是否坚持使用 queue.Queue() 并在套接字上有一个单独的线程 select.select() 并将结果放入队列?

编辑:我认为这是最小的可重现示例:

import multiprocessing
import threading

queue = multiprocessing.Queue()

class Msg():
    def __init__(self):
        self.lock = threading.Lock()

def source():
    queue.put(Msg())

def sink():
    obj = queue.get()
    print("Got")

threading.Thread(target=sink).start()
source()

问题是我放入队列的对象有一个 threading.Lock 对象作为字段(在多个组合层次上)。

TL;DR: threading.Lock 实例根本无法被 pickle 并且 pickle 用于序列化一个对象 putmultiprocessing.Queue 实例。但是通过 multiprocessing.Queue 将对象传递给另一个线程的价值很小,因为线程检索成为该对象的新实例的内容,除非创建该对象的副本是您目标的一部分。因此,如果您确实通过队列传递对象,那么锁就不能成为对象状态的一部分,您需要另一种方法(见下文)。

(很多)更长的答案

First, as your error message states threading.Lock` 实例不能用 pickle 序列化。这也可以很容易地证明:

>>> import pickle
>>> import threading
>>> lock = threading.Lock()
>>> serialized_lock = pickle.dumps(lock)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: cannot pickle '_thread.lock' object

其次,当您将对象放入 threading.Queue 实例时,该对象会使用 pickle 进行序列化,因此会出现上述异常。

但是,尽管您的帖子构成了一个最小的完整示例,但它并不代表可以执行任何有用操作的实际程序。你到底想完成什么?假设您能够序列化一个锁,并因此通过队列传递 Msg 的一个实例。大概锁是为了序列化一些更新对象状态的代码。但是,由于这是 Msg 的实例,而不是队列中 put 的实例,因此如果此 sink 线程创建了其他线程,则此锁的唯一有意义的用途是在其上运行这个例子。所以我们假设有一个属性,x需要在多个线程中递增。这将需要锁定,因为 += 运算符不是原子的。由于如果通过队列传递所需的锁不能成为对象状态的一部分,那么您必须单独创建锁。这只是众多可能方法中的一种:

import multiprocessing
import threading

queue = multiprocessing.Queue()

class Msg():
    def __init__(self):
        self.x = 0

    def set_lock(self, lock):
        self.lock = lock

    def compute(self):
        with self.lock:
            self.x += 1

def source():
    queue.put(Msg())

def sink():
    msg = queue.get()
    msg.set_lock(threading.Lock())
    t = threading.Thread(target=msg.compute)
    t.start()
    msg.compute()
    t.join()
    print(msg.x)


threading.Thread(target=sink).start()
source()

打印:

2

如果您不使用队列进行对象传递,那么将锁作为对象初始状态的一部分没有问题:

import queue
import socket
import os
import select
import threading

class PollableQueue(queue.Queue):
    def __init__(self):
        super().__init__()
        # Create a pair of connected sockets
        if os.name == 'posix':
            self._putsocket, self._getsocket = socket.socketpair()
        else:
            # Compatibility on non-POSIX systems
            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            server.bind(('127.0.0.1', 0))
            server.listen(1)
            self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self._putsocket.connect(server.getsockname())
            self._getsocket, _ = server.accept()
            server.close()

    def fileno(self):
        return self._getsocket.fileno()

    def put(self, item):
        super().put(item)
        self._putsocket.send(b'x')

    def get(self):
        self._getsocket.recv(1)
        return super().get()

class Msg:
    def __init__(self, q, socket):
        # An instance of this class could be passed via a multithreading.Queue
        # A multiprocessing.Lock could also be used but is not
        # necessary if we are doing threading:
        self.lock = threading.Lock() # to be used by some method not shown
        self.q = q
        self.socket = socket

    def consume(self):
        while True:
            can_read, _, _ = select.select([q, read_socket], [], [])
            for r in can_read:
                item = r.get() if isinstance(r, queue.Queue) else r.recv(3).decode()
                print('Got:', item, 'from', type(r))

# Example code that performs polling:

if __name__ == '__main__':
    import threading
    import time

    q = PollableQueue()
    write_socket, read_socket = socket.socketpair()
    msg = Msg(q, read_socket)
    t = threading.Thread(target=msg.consume, daemon=True)
    t.start()

    # Feed data to the queues
    q.put('abc')
    write_socket.send(b'cde')
    write_socket.send(b'fgh')
    q.put('ijk')

    # Give consumer time to get all the items:
    time.sleep(1)

打印:

Got: abc from <class '__main__.PollableQueue'>
Got: ijk from <class '__main__.PollableQueue'>
Got: cde from <class 'socket.socket'>
Got: fgh from <class 'socket.socket'>