我可以在进程内使用 `multiprocessing.Queue` 进行通信吗?
Can I use a `multiprocessing.Queue` for communication within a process?
我正在使用队列进行线程间通信。我使用 multiprocessing.Queue()
而不是 queue.Queue()
因为 multiprocessing
版本公开了一个底层文件描述符,可以用 select.select
等待 - 这意味着我可以阻止等待对象在队列中 或 一个数据包从同一线程到达网络接口。
但是当我尝试从队列中获取一个对象时,我得到了这个:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
obj = _ForkingPickler.dumps(obj)
File "/usr/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects
有办法吗?或者我是否坚持使用 queue.Queue()
并在套接字上有一个单独的线程 select.select()
并将结果放入队列?
编辑:我认为这是最小的可重现示例:
import multiprocessing
import threading
queue = multiprocessing.Queue()
class Msg():
def __init__(self):
self.lock = threading.Lock()
def source():
queue.put(Msg())
def sink():
obj = queue.get()
print("Got")
threading.Thread(target=sink).start()
source()
问题是我放入队列的对象有一个 threading.Lock
对象作为字段(在多个组合层次上)。
TL;DR: threading.Lock
实例根本无法被 pickle 并且 pickle 用于序列化一个对象 put
到 multiprocessing.Queue
实例。但是通过 multiprocessing.Queue
将对象传递给另一个线程的价值很小,因为线程检索成为该对象的新实例的内容,除非创建该对象的副本是您目标的一部分。因此,如果您确实通过队列传递对象,那么锁就不能成为对象状态的一部分,您需要另一种方法(见下文)。
(很多)更长的答案
First, as your error message states
threading.Lock` 实例不能用 pickle 序列化。这也可以很容易地证明:
>>> import pickle
>>> import threading
>>> lock = threading.Lock()
>>> serialized_lock = pickle.dumps(lock)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: cannot pickle '_thread.lock' object
其次,当您将对象放入 threading.Queue
实例时,该对象会使用 pickle 进行序列化,因此会出现上述异常。
但是,尽管您的帖子构成了一个最小的完整示例,但它并不代表可以执行任何有用操作的实际程序。你到底想完成什么?假设您能够序列化一个锁,并因此通过队列传递 Msg
的一个实例。大概锁是为了序列化一些更新对象状态的代码。但是,由于这是 Msg
的实例,而不是队列中 put
的实例,因此如果此 sink
线程创建了其他线程,则此锁的唯一有意义的用途是在其上运行这个例子。所以我们假设有一个属性,x
需要在多个线程中递增。这将需要锁定,因为 +=
运算符不是原子的。由于如果通过队列传递所需的锁不能成为对象状态的一部分,那么您必须单独创建锁。这只是众多可能方法中的一种:
import multiprocessing
import threading
queue = multiprocessing.Queue()
class Msg():
def __init__(self):
self.x = 0
def set_lock(self, lock):
self.lock = lock
def compute(self):
with self.lock:
self.x += 1
def source():
queue.put(Msg())
def sink():
msg = queue.get()
msg.set_lock(threading.Lock())
t = threading.Thread(target=msg.compute)
t.start()
msg.compute()
t.join()
print(msg.x)
threading.Thread(target=sink).start()
source()
打印:
2
如果您不使用队列进行对象传递,那么将锁作为对象初始状态的一部分没有问题:
import queue
import socket
import os
import select
import threading
class PollableQueue(queue.Queue):
def __init__(self):
super().__init__()
# Create a pair of connected sockets
if os.name == 'posix':
self._putsocket, self._getsocket = socket.socketpair()
else:
# Compatibility on non-POSIX systems
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('127.0.0.1', 0))
server.listen(1)
self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._putsocket.connect(server.getsockname())
self._getsocket, _ = server.accept()
server.close()
def fileno(self):
return self._getsocket.fileno()
def put(self, item):
super().put(item)
self._putsocket.send(b'x')
def get(self):
self._getsocket.recv(1)
return super().get()
class Msg:
def __init__(self, q, socket):
# An instance of this class could be passed via a multithreading.Queue
# A multiprocessing.Lock could also be used but is not
# necessary if we are doing threading:
self.lock = threading.Lock() # to be used by some method not shown
self.q = q
self.socket = socket
def consume(self):
while True:
can_read, _, _ = select.select([q, read_socket], [], [])
for r in can_read:
item = r.get() if isinstance(r, queue.Queue) else r.recv(3).decode()
print('Got:', item, 'from', type(r))
# Example code that performs polling:
if __name__ == '__main__':
import threading
import time
q = PollableQueue()
write_socket, read_socket = socket.socketpair()
msg = Msg(q, read_socket)
t = threading.Thread(target=msg.consume, daemon=True)
t.start()
# Feed data to the queues
q.put('abc')
write_socket.send(b'cde')
write_socket.send(b'fgh')
q.put('ijk')
# Give consumer time to get all the items:
time.sleep(1)
打印:
Got: abc from <class '__main__.PollableQueue'>
Got: ijk from <class '__main__.PollableQueue'>
Got: cde from <class 'socket.socket'>
Got: fgh from <class 'socket.socket'>
我正在使用队列进行线程间通信。我使用 multiprocessing.Queue()
而不是 queue.Queue()
因为 multiprocessing
版本公开了一个底层文件描述符,可以用 select.select
等待 - 这意味着我可以阻止等待对象在队列中 或 一个数据包从同一线程到达网络接口。
但是当我尝试从队列中获取一个对象时,我得到了这个:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
obj = _ForkingPickler.dumps(obj)
File "/usr/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects
有办法吗?或者我是否坚持使用 queue.Queue()
并在套接字上有一个单独的线程 select.select()
并将结果放入队列?
编辑:我认为这是最小的可重现示例:
import multiprocessing
import threading
queue = multiprocessing.Queue()
class Msg():
def __init__(self):
self.lock = threading.Lock()
def source():
queue.put(Msg())
def sink():
obj = queue.get()
print("Got")
threading.Thread(target=sink).start()
source()
问题是我放入队列的对象有一个 threading.Lock
对象作为字段(在多个组合层次上)。
TL;DR: threading.Lock
实例根本无法被 pickle 并且 pickle 用于序列化一个对象 put
到 multiprocessing.Queue
实例。但是通过 multiprocessing.Queue
将对象传递给另一个线程的价值很小,因为线程检索成为该对象的新实例的内容,除非创建该对象的副本是您目标的一部分。因此,如果您确实通过队列传递对象,那么锁就不能成为对象状态的一部分,您需要另一种方法(见下文)。
(很多)更长的答案
First, as your error message states
threading.Lock` 实例不能用 pickle 序列化。这也可以很容易地证明:
>>> import pickle
>>> import threading
>>> lock = threading.Lock()
>>> serialized_lock = pickle.dumps(lock)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: cannot pickle '_thread.lock' object
其次,当您将对象放入 threading.Queue
实例时,该对象会使用 pickle 进行序列化,因此会出现上述异常。
但是,尽管您的帖子构成了一个最小的完整示例,但它并不代表可以执行任何有用操作的实际程序。你到底想完成什么?假设您能够序列化一个锁,并因此通过队列传递 Msg
的一个实例。大概锁是为了序列化一些更新对象状态的代码。但是,由于这是 Msg
的实例,而不是队列中 put
的实例,因此如果此 sink
线程创建了其他线程,则此锁的唯一有意义的用途是在其上运行这个例子。所以我们假设有一个属性,x
需要在多个线程中递增。这将需要锁定,因为 +=
运算符不是原子的。由于如果通过队列传递所需的锁不能成为对象状态的一部分,那么您必须单独创建锁。这只是众多可能方法中的一种:
import multiprocessing
import threading
queue = multiprocessing.Queue()
class Msg():
def __init__(self):
self.x = 0
def set_lock(self, lock):
self.lock = lock
def compute(self):
with self.lock:
self.x += 1
def source():
queue.put(Msg())
def sink():
msg = queue.get()
msg.set_lock(threading.Lock())
t = threading.Thread(target=msg.compute)
t.start()
msg.compute()
t.join()
print(msg.x)
threading.Thread(target=sink).start()
source()
打印:
2
如果您不使用队列进行对象传递,那么将锁作为对象初始状态的一部分没有问题:
import queue
import socket
import os
import select
import threading
class PollableQueue(queue.Queue):
def __init__(self):
super().__init__()
# Create a pair of connected sockets
if os.name == 'posix':
self._putsocket, self._getsocket = socket.socketpair()
else:
# Compatibility on non-POSIX systems
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('127.0.0.1', 0))
server.listen(1)
self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._putsocket.connect(server.getsockname())
self._getsocket, _ = server.accept()
server.close()
def fileno(self):
return self._getsocket.fileno()
def put(self, item):
super().put(item)
self._putsocket.send(b'x')
def get(self):
self._getsocket.recv(1)
return super().get()
class Msg:
def __init__(self, q, socket):
# An instance of this class could be passed via a multithreading.Queue
# A multiprocessing.Lock could also be used but is not
# necessary if we are doing threading:
self.lock = threading.Lock() # to be used by some method not shown
self.q = q
self.socket = socket
def consume(self):
while True:
can_read, _, _ = select.select([q, read_socket], [], [])
for r in can_read:
item = r.get() if isinstance(r, queue.Queue) else r.recv(3).decode()
print('Got:', item, 'from', type(r))
# Example code that performs polling:
if __name__ == '__main__':
import threading
import time
q = PollableQueue()
write_socket, read_socket = socket.socketpair()
msg = Msg(q, read_socket)
t = threading.Thread(target=msg.consume, daemon=True)
t.start()
# Feed data to the queues
q.put('abc')
write_socket.send(b'cde')
write_socket.send(b'fgh')
q.put('ijk')
# Give consumer time to get all the items:
time.sleep(1)
打印:
Got: abc from <class '__main__.PollableQueue'>
Got: ijk from <class '__main__.PollableQueue'>
Got: cde from <class 'socket.socket'>
Got: fgh from <class 'socket.socket'>