Multiprocessing.Queue 大量数据导致 _wait_for_tstate_lock
Multiprocessing.Queue with hugh data causes _wait_for_tstate_lock
当我通过 multiprocessing.Queue
.
在 Process
和 Thread
之间传输大量数据时,在 threading._wait_for_tstate_lock
中出现异常
首先,我的最小工作示例看起来有点复杂 - 抱歉。我会解释。原始应用程序将许多(不是那么重要的)文件加载到 RAM 中。这是在一个单独的过程中完成的,以节省资源。主 gui 线程不应该冻结。
GUI 启动一个单独的 Thread
以防止 gui 事件循环冻结。
这个单独的 Thread
然后启动一个 Process
应该完成工作。
a) 这个 Thread
实例化了一个 multiprocess.Queue
(注意这是一个 multiprocessing
而不是 threading
!)
b) 这是给 Process
的,用于将数据从 Process
共享回 Thread
。
Process
做了一些工作(3 个步骤),.put()
将结果放入 multiprocessing.Queue
。
当Process
结束时,Thread
再次接管并从Queue
收集数据,将其存储到自己的属性MyThread.result
.
Thread
告诉 GUI main loop/thread 如果有时间调用回调函数。
回调函数(MyWindow::callback_thread_finished()
)从MyWindow.thread.result
.
获取结果
问题是如果放入 Queue
的数据太大了,我不明白会发生什么事 - MyThread
永远不会结束。我必须通过Strg+C取消申请。
我从文档中得到了一些提示。但我的问题是我没有完全理解文档。但我觉得我的问题的关键可以在那里找到。
请参阅“Pipes and Queues”(Python 3.5 文档)中的两个红色框。
那就是完整的输出
MyWindow::do_start()
Running MyThread...
Running MyProcess...
MyProcess stoppd.
^CProcess MyProcess-1:
Exception ignored in: <module 'threading' from '/usr/lib/python3.5/threading.py'>
Traceback (most recent call last):
File "/usr/lib/python3.5/threading.py", line 1288, in _shutdown
t.join()
File "/usr/lib/python3.5/threading.py", line 1054, in join
self._wait_for_tstate_lock()
File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
Traceback (most recent call last):
File "/usr/lib/python3.5/multiprocessing/process.py", line 252, in _bootstrap
util._exit_function()
File "/usr/lib/python3.5/multiprocessing/util.py", line 314, in _exit_function
_run_finalizers()
File "/usr/lib/python3.5/multiprocessing/util.py", line 254, in _run_finalizers
finalizer()
File "/usr/lib/python3.5/multiprocessing/util.py", line 186, in __call__
res = self._callback(*self._args, **self._kwargs)
File "/usr/lib/python3.5/multiprocessing/queues.py", line 198, in _finalize_join
thread.join()
File "/usr/lib/python3.5/threading.py", line 1054, in join
self._wait_for_tstate_lock()
File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
这是最小的工作示例
#!/usr/bin/env python3
import multiprocessing
import threading
import time
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from gi.repository import GLib
class MyThread (threading.Thread):
"""This thread just starts the process."""
def __init__(self, callback):
threading.Thread.__init__(self)
self._callback = callback
def run(self):
print('Running MyThread...')
self.result = []
queue = multiprocessing.Queue()
process = MyProcess(queue)
process.start()
process.join()
while not queue.empty():
process_result = queue.get()
self.result.append(process_result)
print('MyThread stoppd.')
GLib.idle_add(self._callback)
class MyProcess (multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
print('Running MyProcess...')
for i in range(3):
self.queue.put((i, 'x'*102048))
print('MyProcess stoppd.')
class MyWindow (Gtk.Window):
def __init__(self):
Gtk.Window.__init__(self)
self.connect('destroy', Gtk.main_quit)
GLib.timeout_add(2000, self.do_start)
def do_start(self):
print('MyWindow::do_start()')
# The process need to be started from a separate thread
# to prevent the main thread (which is the gui main loop)
# from freezing while waiting for the process result.
self.thread = MyThread(self.callback_thread_finished)
self.thread.start()
def callback_thread_finished(self):
result = self.thread.result
for r in result:
print('{} {}...'.format(r[0], r[1][:10]))
if __name__ == '__main__':
win = MyWindow()
win.show_all()
Gtk.main()
可能重复但完全不同,IMO 没有针对我的情况的答案:Thread._wait_for_tstate_lock() never returns。
解决方法
通过将第 22 行修改为 queue = multiprocessing.Manager().Queue()
使用 Manager 解决问题。但我不知道为什么。我提出这个问题的目的是了解背后的东西,而不仅仅是让我的代码工作。即使我真的不知道 Manager()
是什么以及它是否有其他(导致问题的)影响。
根据您链接到的文档中的第二个警告框,当您在处理队列中的所有项目之前加入进程时可能会出现死锁。因此,启动流程并立即加入流程并然后 处理队列中的项目是错误的步骤顺序。您必须启动流程,然后接收项目,然后只有在接收到所有项目后才能调用 join 方法。定义一些哨兵值来表示进程已完成通过队列发送数据。 None
例如,如果这不是您期望从流程中获得的常规值。
class MyThread(threading.Thread):
"""This thread just starts the process."""
def __init__(self, callback):
threading.Thread.__init__(self)
self._callback = callback
self.result = []
def run(self):
print('Running MyThread...')
queue = multiprocessing.Queue()
process = MyProcess(queue)
process.start()
while True:
process_result = queue.get()
if process_result is None:
break
self.result.append(process_result)
process.join()
print('MyThread stoppd.')
GLib.idle_add(self._callback)
class MyProcess(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
print('Running MyProcess...')
for i in range(3):
self.queue.put((i, 'x' * 102048))
self.queue.put(None)
print('MyProcess stoppd.')
当我通过 multiprocessing.Queue
.
Process
和 Thread
之间传输大量数据时,在 threading._wait_for_tstate_lock
中出现异常
首先,我的最小工作示例看起来有点复杂 - 抱歉。我会解释。原始应用程序将许多(不是那么重要的)文件加载到 RAM 中。这是在一个单独的过程中完成的,以节省资源。主 gui 线程不应该冻结。
GUI 启动一个单独的
Thread
以防止 gui 事件循环冻结。这个单独的
Thread
然后启动一个Process
应该完成工作。
a) 这个 Thread
实例化了一个 multiprocess.Queue
(注意这是一个 multiprocessing
而不是 threading
!)
b) 这是给 Process
的,用于将数据从 Process
共享回 Thread
。
Process
做了一些工作(3 个步骤),.put()
将结果放入multiprocessing.Queue
。当
Process
结束时,Thread
再次接管并从Queue
收集数据,将其存储到自己的属性MyThread.result
.Thread
告诉 GUI main loop/thread 如果有时间调用回调函数。回调函数(
获取结果MyWindow::callback_thread_finished()
)从MyWindow.thread.result
.
问题是如果放入 Queue
的数据太大了,我不明白会发生什么事 - MyThread
永远不会结束。我必须通过Strg+C取消申请。
我从文档中得到了一些提示。但我的问题是我没有完全理解文档。但我觉得我的问题的关键可以在那里找到。 请参阅“Pipes and Queues”(Python 3.5 文档)中的两个红色框。 那就是完整的输出
MyWindow::do_start()
Running MyThread...
Running MyProcess...
MyProcess stoppd.
^CProcess MyProcess-1:
Exception ignored in: <module 'threading' from '/usr/lib/python3.5/threading.py'>
Traceback (most recent call last):
File "/usr/lib/python3.5/threading.py", line 1288, in _shutdown
t.join()
File "/usr/lib/python3.5/threading.py", line 1054, in join
self._wait_for_tstate_lock()
File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
Traceback (most recent call last):
File "/usr/lib/python3.5/multiprocessing/process.py", line 252, in _bootstrap
util._exit_function()
File "/usr/lib/python3.5/multiprocessing/util.py", line 314, in _exit_function
_run_finalizers()
File "/usr/lib/python3.5/multiprocessing/util.py", line 254, in _run_finalizers
finalizer()
File "/usr/lib/python3.5/multiprocessing/util.py", line 186, in __call__
res = self._callback(*self._args, **self._kwargs)
File "/usr/lib/python3.5/multiprocessing/queues.py", line 198, in _finalize_join
thread.join()
File "/usr/lib/python3.5/threading.py", line 1054, in join
self._wait_for_tstate_lock()
File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
这是最小的工作示例
#!/usr/bin/env python3
import multiprocessing
import threading
import time
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from gi.repository import GLib
class MyThread (threading.Thread):
"""This thread just starts the process."""
def __init__(self, callback):
threading.Thread.__init__(self)
self._callback = callback
def run(self):
print('Running MyThread...')
self.result = []
queue = multiprocessing.Queue()
process = MyProcess(queue)
process.start()
process.join()
while not queue.empty():
process_result = queue.get()
self.result.append(process_result)
print('MyThread stoppd.')
GLib.idle_add(self._callback)
class MyProcess (multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
print('Running MyProcess...')
for i in range(3):
self.queue.put((i, 'x'*102048))
print('MyProcess stoppd.')
class MyWindow (Gtk.Window):
def __init__(self):
Gtk.Window.__init__(self)
self.connect('destroy', Gtk.main_quit)
GLib.timeout_add(2000, self.do_start)
def do_start(self):
print('MyWindow::do_start()')
# The process need to be started from a separate thread
# to prevent the main thread (which is the gui main loop)
# from freezing while waiting for the process result.
self.thread = MyThread(self.callback_thread_finished)
self.thread.start()
def callback_thread_finished(self):
result = self.thread.result
for r in result:
print('{} {}...'.format(r[0], r[1][:10]))
if __name__ == '__main__':
win = MyWindow()
win.show_all()
Gtk.main()
可能重复但完全不同,IMO 没有针对我的情况的答案:Thread._wait_for_tstate_lock() never returns。
解决方法
通过将第 22 行修改为 queue = multiprocessing.Manager().Queue()
使用 Manager 解决问题。但我不知道为什么。我提出这个问题的目的是了解背后的东西,而不仅仅是让我的代码工作。即使我真的不知道 Manager()
是什么以及它是否有其他(导致问题的)影响。
根据您链接到的文档中的第二个警告框,当您在处理队列中的所有项目之前加入进程时可能会出现死锁。因此,启动流程并立即加入流程并然后 处理队列中的项目是错误的步骤顺序。您必须启动流程,然后接收项目,然后只有在接收到所有项目后才能调用 join 方法。定义一些哨兵值来表示进程已完成通过队列发送数据。 None
例如,如果这不是您期望从流程中获得的常规值。
class MyThread(threading.Thread):
"""This thread just starts the process."""
def __init__(self, callback):
threading.Thread.__init__(self)
self._callback = callback
self.result = []
def run(self):
print('Running MyThread...')
queue = multiprocessing.Queue()
process = MyProcess(queue)
process.start()
while True:
process_result = queue.get()
if process_result is None:
break
self.result.append(process_result)
process.join()
print('MyThread stoppd.')
GLib.idle_add(self._callback)
class MyProcess(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
print('Running MyProcess...')
for i in range(3):
self.queue.put((i, 'x' * 102048))
self.queue.put(None)
print('MyProcess stoppd.')