Multiprocessing deadlocks during large computation using Pool().apply_async

I am running into an issue in Python 3.7.3 where my multiprocessing operation (using Queue, Pool, and apply_async) deadlocks when handling large computational tasks.

For small computations, this multiprocessing task works just fine. However, when handling larger computations, the multiprocessing task stops, or deadlocks, without exiting the process at all! I read that this happens if you "grow your queue without bounds, and you are joining up to a subprocess that is waiting for room in the queue [...] your main process is stalled waiting for that one to complete, and it never will."

I have not been able to translate this concept into code. I would greatly appreciate guidance on refactoring the code I have written below:

import multiprocessing as mp

def listener(q, d):  # task to queue information into a manager dictionary
    while True:
        item_to_write = q.get()
        if item_to_write == 'kill':
            break
        foo = d['region']
        foo.add(item_to_write) 
        d['region'] = foo  # add items and set to manager dictionary


def main():
    manager = mp.Manager()
    q = manager.Queue()
    d = manager.dict()
    d['region'] = set()

    pool = mp.Pool(mp.cpu_count() + 2)
    watcher = pool.apply_async(listener, (q, d))
    jobs = []
    for i in range(24):
        job = pool.apply_async(execute_search, (q, d))  # task for multiprocessing
        jobs.append(job)
    for job in jobs:
        job.get()  # wait for each task to finish
    q.put('kill')  # stop the listener (see listener function)
    pool.close()
    pool.join()

    print('process complete')


if __name__ == '__main__':
    main()

Ultimately, I would like to prevent deadlocking altogether, to facilitate a multiprocessing task that can run indefinitely until complete.


Below is the traceback upon exiting the deadlock in BASH:
^CTraceback (most recent call last):
  File "multithread_search_cl_gamma.py", line 260, in <module>
    main(GEOTAG)
  File "multithread_search_cl_gamma.py", line 248, in main
    job.get()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 651, in get
Process ForkPoolWorker-28:
Process ForkPoolWorker-31:
Process ForkPoolWorker-30:
Process ForkPoolWorker-27:
Process ForkPoolWorker-29:
Process ForkPoolWorker-26:
    self.wait(timeout)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 648, in wait
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/queues.py", line 351, in get
     self._event.wait(timeout)
  File "/Users/Ira/anaconda3/lib/python3.7/threading.py", line 552, in wait
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/queues.py", line 352, in get
    res = self._reader.recv_bytes()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
KeyboardInterrupt
    signaled = self._cond.wait(timeout)
  File "/Users/Ira/anaconda3/lib/python3.7/threading.py", line 296, in wait
    waiter.acquire()
KeyboardInterrupt
   with self._rlock:
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
Traceback (most recent call last):
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/queues.py", line 351, in get
    with self._rlock:
  File "/Users/Ira/anaconda3/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt

Below is the updated script:

import multiprocessing as mp
import queue
import time

def listener(q, d, stop_event):
    while not stop_event.is_set():
        try:
            while True:
                item_to_write = q.get(False)
                if item_to_write == 'kill':
                    break
                foo = d['region']
                foo.add(item_to_write)
                d['region'] = foo
        except queue.Empty:
            pass

        time.sleep(0.5)
        if not q.empty():
            continue


def main():
    manager = mp.Manager()
    stop_event = manager.Event()
    q = manager.Queue()
    d = manager.dict()
    d['region'] = set()
    pool = mp.get_context("spawn").Pool(mp.cpu_count() + 2)
    watcher = pool.apply_async(listener, (q, d, stop_event))
    jobs = []
    for i in range(24):
        job = pool.apply_async(execute_search, (q, d))
        jobs.append(job)
    for job in jobs:
        job.get()
    q.put('kill')
    pool.close()
    pool.join()
    print('process complete')


if __name__ == '__main__':
    main()

Update:

execute_command carries out several processes required for the search, so I have put the code in for where q.put() sits.

On its own, the script takes > 72 hours to finish. No single process ever completes the whole task; instead, they work individually and reference manager.dict() to avoid duplicating work. These tasks run until every tuple in manager.dict() has been processed.

def area(self, tup, housing_dict, q):
    state, reg, sub_reg = tup[0], tup[1], tup[2]
    for cat in housing_dict:
        """
        computationally expensive, takes > 72 hours
        for a list of 512 tup(s)
        """
        result = self.search_geotag(
            state, reg, cat, area=sub_reg
            )
    q.put(tup)

Finally, q.put(tup) hands the tup to the listener function, which adds it to manager.dict().

Since listener and execute_search share the same queue object, there is a possible race in which execute_search gets the 'kill' from the queue before listener does; in that case, listener will block on get() forever, since there are no more new items.

For that case, you can use an Event object to signal all the processes to stop:

import multiprocessing as mp
import queue

def listener(q, d, stop_event):
    while not stop_event.is_set():
        try:
            item_to_write = q.get(timeout=0.1)
            foo = d['region']
            foo.add(item_to_write)
            d['region'] = foo
        except queue.Empty:
            pass
    print("Listener process stopped")

def main():
    manager = mp.Manager()
    stop_event = manager.Event()
    q = manager.Queue()
    d = manager.dict()
    d['region'] = set()
    pool = mp.get_context("spawn").Pool(mp.cpu_count() + 2)
    watcher = pool.apply_async(listener, (q, d, stop_event))
    jobs = []
    for i in range(24):
        job = pool.apply_async(execute_search, (q, d))
        jobs.append(job)
    try:
        for job in jobs:
            job.get(300)  # get the result, or raise a TimeoutError after 300 seconds
    except mp.TimeoutError:
        pool.terminate()
    stop_event.set()  # stop listener process
    print('process complete')


if __name__ == '__main__':
    main()