为什么不会这个泡菜?
Why wont this pickle?
我正在尝试 运行 将其作为来自多处理的进程,但是当启动线程时,pickler 崩溃了,我无法弄清楚是什么阻止了它进行 pickling。我已经尝试注释掉套接字代码和消息 obj 代码,但仍然无法正常工作 - 我做错了什么?
class TransmitThread(Process):
def __init__(self, send_queue, reply_queue, control_pipe, recv_timeout=2, buffer_size=4096):
"""
This init function is called when the thread is created. Function simply calls the Process class init
function, and stores the class vars.
"""
# Call process class init
Process.__init__(self)
# Store class vars
self.send_queue = send_queue
self.reply_queue = reply_queue
self.control_pipe = control_pipe
self._recv_timeout = recv_timeout
self._buffer_size = buffer_size
def run(self):
"""
This is the main function that is called when the thread is started.
The function loops forever, waiting for a send message in the queue, and processes the message to send
and fetches the response. The thread loops forever until it's terminated or the KILL THREAD command is
passed through the control pipe.
"""
# Start our forever running loop
while True:
# Check if there is anything in the pipe
if self.control_pipe.poll():
# Check if we received the kill thread command
if self.control_pipe.recv() == KILL_THREAD_COMMAND:
# Kill the while loop and end the thread
break
# Check if there is anything in message queue
if not self.send_queue.empty():
# Fetch message from the queue to send, and unpickle
message_obj, message_pickle = self.send_queue.get()
# Open socket and set timeout
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(self._recv_timeout)
# Connect socket to the recipient
sock.connect( message_obj.recipient_address )
# Push the pickled message down the socket
sock.sendall(message_pickle)
# Check if the message we send is a request (should get a response)
if str(message_obj.message_type) == str(Message.REQUEST):
print "fetching reply"
# Lets fetch the response, and push the pickled message onto the queue
self.reply_queue.put( sock.recv(self._buffer_size) )
print "got a reply"
# All done, close the socket
sock.close()
# Add small delay to stop this thread consuming too much CPU time
sleep(0.1)
错误信息是:
File "C:/Users/oliver/OneDrive/GIT/pyke/pyke.py", line 127, in __init__
self.thread.start()
File "C:\Python27\lib\multiprocessing\process.py", line 130, in start
self._popen = Popen(self)
File "C:\Python27\lib\multiprocessing\forking.py", line 277, in __init__
dump(process_obj, to_child, HIGHEST_PROTOCOL)
File "C:\Python27\lib\multiprocessing\forking.py", line 199, in dump
ForkingPickler(file, protocol).dump(obj)
File "C:\Python27\lib\pickle.py", line 224, in dump
self.save(obj)
File "C:\Python27\lib\pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "C:\Python27\lib\pickle.py", line 419, in save_reduce
save(state)
File "C:\Python27\lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Python27\lib\pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "C:\Python27\lib\pickle.py", line 681, in _batch_setitems
save(v)
File "C:\Python27\lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Python27\lib\pickle.py", line 725, in save_inst
save(stuff)
File "C:\Python27\lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Python27\lib\pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "C:\Python27\lib\pickle.py", line 681, in _batch_setitems
save(v)
File "C:\Python27\lib\pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "C:\Python27\lib\pickle.py", line 419, in save_reduce
save(state)
File "C:\Python27\lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Python27\lib\pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "C:\Python27\lib\pickle.py", line 681, in _batch_setitems
save(v)
File "C:\Python27\lib\pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "C:\Python27\lib\pickle.py", line 396, in save_reduce
save(cls)
File "C:\Python27\lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Python27\lib\pickle.py", line 748, in save_global
(obj, module, name))
pickle.PicklingError: Can't pickle <type 'thread.lock'>: it's not found as thread.lock
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Python27\lib\multiprocessing\forking.py", line 381, in main
self = load(from_parent)
File "C:\Python27\lib\pickle.py", line 1378, in load
return Unpickler(file).load()
File "C:\Python27\lib\pickle.py", line 858, in load
dispatch[key](self)
File "C:\Python27\lib\pickle.py", line 880, in load_eof
raise EOFError
EOFError
pickler 错误来自 multiprocessing.Process
试图在内部将自身 pickle 到子进程。我很确定您的实例变量之一没有正确地腌制到子进程。你的问题哪一个不清楚
# Store class vars
self.send_queue = send_queue
self.reply_queue = reply_queue
self.control_pipe = control_pipe
self._recv_timeout = recv_timeout
self._buffer_size = buffer_size
[根据 OP 的评论进行编辑]:
问题是 send_queue
和 reply_queue
是 Queue.Queue
而不是 multiprocessing.Queue
。当分叉子工作者时,Process
尝试将自身和任何实例变量序列化给子工作者。但是 Queue.Queue
s 是不可序列化的本地对象,因此会出现错误。
与问题相关的另一个事实是,multiprocessing.Queue
re-uses the exceptions from Queue
without re-exporting these. This is actually documented,尽管有些隐藏在混乱之下:
Note: multiprocessing
uses the usual Queue.Empty
and Queue.Full
exceptions to signal a timeout. They are not available in the multiprocessing
namespace so you need to import them from Queue
.
我正在尝试 运行 将其作为来自多处理的进程,但是当启动线程时,pickler 崩溃了,我无法弄清楚是什么阻止了它进行 pickling。我已经尝试注释掉套接字代码和消息 obj 代码,但仍然无法正常工作 - 我做错了什么?
class TransmitThread(Process):
def __init__(self, send_queue, reply_queue, control_pipe, recv_timeout=2, buffer_size=4096):
"""
This init function is called when the thread is created. Function simply calls the Process class init
function, and stores the class vars.
"""
# Call process class init
Process.__init__(self)
# Store class vars
self.send_queue = send_queue
self.reply_queue = reply_queue
self.control_pipe = control_pipe
self._recv_timeout = recv_timeout
self._buffer_size = buffer_size
def run(self):
"""
This is the main function that is called when the thread is started.
The function loops forever, waiting for a send message in the queue, and processes the message to send
and fetches the response. The thread loops forever until it's terminated or the KILL THREAD command is
passed through the control pipe.
"""
# Start our forever running loop
while True:
# Check if there is anything in the pipe
if self.control_pipe.poll():
# Check if we received the kill thread command
if self.control_pipe.recv() == KILL_THREAD_COMMAND:
# Kill the while loop and end the thread
break
# Check if there is anything in message queue
if not self.send_queue.empty():
# Fetch message from the queue to send, and unpickle
message_obj, message_pickle = self.send_queue.get()
# Open socket and set timeout
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(self._recv_timeout)
# Connect socket to the recipient
sock.connect( message_obj.recipient_address )
# Push the pickled message down the socket
sock.sendall(message_pickle)
# Check if the message we send is a request (should get a response)
if str(message_obj.message_type) == str(Message.REQUEST):
print "fetching reply"
# Lets fetch the response, and push the pickled message onto the queue
self.reply_queue.put( sock.recv(self._buffer_size) )
print "got a reply"
# All done, close the socket
sock.close()
# Add small delay to stop this thread consuming too much CPU time
sleep(0.1)
错误信息是:
File "C:/Users/oliver/OneDrive/GIT/pyke/pyke.py", line 127, in __init__
self.thread.start()
File "C:\Python27\lib\multiprocessing\process.py", line 130, in start
self._popen = Popen(self)
File "C:\Python27\lib\multiprocessing\forking.py", line 277, in __init__
dump(process_obj, to_child, HIGHEST_PROTOCOL)
File "C:\Python27\lib\multiprocessing\forking.py", line 199, in dump
ForkingPickler(file, protocol).dump(obj)
File "C:\Python27\lib\pickle.py", line 224, in dump
self.save(obj)
File "C:\Python27\lib\pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "C:\Python27\lib\pickle.py", line 419, in save_reduce
save(state)
File "C:\Python27\lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Python27\lib\pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "C:\Python27\lib\pickle.py", line 681, in _batch_setitems
save(v)
File "C:\Python27\lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Python27\lib\pickle.py", line 725, in save_inst
save(stuff)
File "C:\Python27\lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Python27\lib\pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "C:\Python27\lib\pickle.py", line 681, in _batch_setitems
save(v)
File "C:\Python27\lib\pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "C:\Python27\lib\pickle.py", line 419, in save_reduce
save(state)
File "C:\Python27\lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Python27\lib\pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "C:\Python27\lib\pickle.py", line 681, in _batch_setitems
save(v)
File "C:\Python27\lib\pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "C:\Python27\lib\pickle.py", line 396, in save_reduce
save(cls)
File "C:\Python27\lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Python27\lib\pickle.py", line 748, in save_global
(obj, module, name))
pickle.PicklingError: Can't pickle <type 'thread.lock'>: it's not found as thread.lock
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Python27\lib\multiprocessing\forking.py", line 381, in main
self = load(from_parent)
File "C:\Python27\lib\pickle.py", line 1378, in load
return Unpickler(file).load()
File "C:\Python27\lib\pickle.py", line 858, in load
dispatch[key](self)
File "C:\Python27\lib\pickle.py", line 880, in load_eof
raise EOFError
EOFError
pickler 错误来自 multiprocessing.Process
试图在内部将自身 pickle 到子进程。我很确定您的实例变量之一没有正确地腌制到子进程。你的问题哪一个不清楚
# Store class vars
self.send_queue = send_queue
self.reply_queue = reply_queue
self.control_pipe = control_pipe
self._recv_timeout = recv_timeout
self._buffer_size = buffer_size
[根据 OP 的评论进行编辑]:
问题是 send_queue
和 reply_queue
是 Queue.Queue
而不是 multiprocessing.Queue
。当分叉子工作者时,Process
尝试将自身和任何实例变量序列化给子工作者。但是 Queue.Queue
s 是不可序列化的本地对象,因此会出现错误。
与问题相关的另一个事实是,multiprocessing.Queue
re-uses the exceptions from Queue
without re-exporting these. This is actually documented,尽管有些隐藏在混乱之下:
Note:
multiprocessing
uses the usualQueue.Empty
andQueue.Full
exceptions to signal a timeout. They are not available in themultiprocessing
namespace so you need to import them fromQueue
.