Multiprocessing.Queue 大量数据导致 _wait_for_tstate_lock

Multiprocessing.Queue with hugh data causes _wait_for_tstate_lock

当我通过 multiprocessing.Queue.

ProcessThread 之间传输大量数据时,在 threading._wait_for_tstate_lock 中出现异常

首先,我的最小工作示例看起来有点复杂 - 抱歉。我会解释。原始应用程序将许多(不是那么重要的)文件加载到 RAM 中。这是在一个单独的过程中完成的,以节省资源。主 gui 线程不应该冻结。

  1. GUI 启动一个单独的 Thread 以防止 gui 事件循环冻结。

  2. 这个单独的 Thread 然后启动一个 Process 应该完成工作。

a) 这个 Thread 实例化了一个 multiprocess.Queue(注意这是一个 multiprocessing 而不是 threading!)

b) 这是给 Process 的,用于将数据从 Process 共享回 Thread

  1. Process 做了一些工作(3 个步骤),.put() 将结果放入 multiprocessing.Queue

  2. Process结束时,Thread再次接管并从Queue收集数据,将其存储到自己的属性MyThread.result .

  3. Thread 告诉 GUI main loop/thread 如果有时间调用回调函数。

  4. 回调函数(MyWindow::callback_thread_finished())从MyWindow.thread.result.

    获取结果

问题是如果放入 Queue 的数据太大了,我不明白会发生什么事 - MyThread 永远不会结束。我必须通过Strg+C取消申请。

我从文档中得到了一些提示。但我的问题是我没有完全理解文档。但我觉得我的问题的关键可以在那里找到。 请参阅“Pipes and Queues”(Python 3.5 文档)中的两个红色框。 那就是完整的输出

MyWindow::do_start()
Running MyThread...
Running MyProcess...
MyProcess stoppd.
^CProcess MyProcess-1:
Exception ignored in: <module 'threading' from '/usr/lib/python3.5/threading.py'>
Traceback (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 1288, in _shutdown
    t.join()
  File "/usr/lib/python3.5/threading.py", line 1054, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 252, in _bootstrap
    util._exit_function()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 314, in _exit_function
    _run_finalizers()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 254, in _run_finalizers
    finalizer()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 186, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 198, in _finalize_join
    thread.join()
  File "/usr/lib/python3.5/threading.py", line 1054, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

这是最小的工作示例

#!/usr/bin/env python3

import multiprocessing
import threading
import time
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from gi.repository import GLib


class MyThread (threading.Thread):
    """This thread just starts the process."""
    def __init__(self, callback):
        threading.Thread.__init__(self)
        self._callback = callback

    def run(self):
        print('Running MyThread...')
        self.result = []

        queue = multiprocessing.Queue()
        process = MyProcess(queue)
        process.start()
        process.join()

        while not queue.empty():
            process_result = queue.get()
            self.result.append(process_result)
        print('MyThread stoppd.')
        GLib.idle_add(self._callback)


class MyProcess (multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        print('Running MyProcess...')
        for i in range(3):
            self.queue.put((i, 'x'*102048))
        print('MyProcess stoppd.')

class MyWindow (Gtk.Window):
    def __init__(self):
        Gtk.Window.__init__(self)
        self.connect('destroy', Gtk.main_quit)
        GLib.timeout_add(2000, self.do_start)

    def do_start(self):
        print('MyWindow::do_start()')
        # The process need to be started from a separate thread
        # to prevent the main thread (which is the gui main loop)
        # from freezing while waiting for the process result.
        self.thread = MyThread(self.callback_thread_finished)
        self.thread.start()

    def callback_thread_finished(self):
        result = self.thread.result
        for r in result:
            print('{} {}...'.format(r[0], r[1][:10]))

if __name__ == '__main__':
    win = MyWindow()
    win.show_all()
    Gtk.main()

可能重复但完全不同,IMO 没有针对我的情况的答案:Thread._wait_for_tstate_lock() never returns

解决方法

通过将第 22 行修改为 queue = multiprocessing.Manager().Queue() 使用 Manager 解决问题。但我不知道为什么。我提出这个问题的目的是了解背后的东西,而不仅仅是让我的代码工作。即使我真的不知道 Manager() 是什么以及它是否有其他(导致问题的)影响。

根据您链接到的文档中的第二个警告框,当您在处理队列中的所有项目之前加入进程时可能会出现死锁。因此,启动流程并立即加入流程并然后 处理队列中的项目是错误的步骤顺序。您必须启动流程,然后接收项目,然后只有在接收到所有项目后才能调用 join 方法。定义一些哨兵值来表示进程已完成通过队列发送数据。 None 例如,如果这不是您期望从流程中获得的常规值。

class MyThread(threading.Thread):
    """This thread just starts the process."""

    def __init__(self, callback):
        threading.Thread.__init__(self)
        self._callback = callback
        self.result = []

    def run(self):
        print('Running MyThread...')
        queue = multiprocessing.Queue()
        process = MyProcess(queue)
        process.start()
        while True:
            process_result = queue.get()
            if process_result is None:
                break
            self.result.append(process_result)
        process.join()
        print('MyThread stoppd.')
        GLib.idle_add(self._callback)


class MyProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        print('Running MyProcess...')
        for i in range(3):
            self.queue.put((i, 'x' * 102048))
        self.queue.put(None)
        print('MyProcess stoppd.')