Python multiprocessing pool some process in deadlock when forked but runs when spawned

I'm experimenting with a service that downloads and resizes images (using threads to download the images and processes to resize them). I start the download threads (with a manager thread supervising them), and as soon as an image is saved locally I put its path on a queue. Once all images have been downloaded, the manager thread puts a poison pill on the queue.

Meanwhile, the main thread watches the queue, gets paths from it as they arrive, and starts a new async task in the pool to resize the image.

Finally, when I try to join the pool, it sometimes hangs in what looks like a deadlock. It doesn't happen every time, but the more URLs there are in the IMG_URLS list, the more often it occurs. When this deadlock happens, the logs show that some processes were either never started correctly or deadlocked immediately, because the "resizing {file}" message never appears for them.

import logging
import multiprocessing as mp
import time
from queue import Queue
from threading import Thread


def resize_image(file):
    logging.info(f"resizing {file}")
    time.sleep(0.1)
    logging.info(f"done resizing {file}")


class Service(object):
    def __init__(self):
        self.img_queue = Queue()

    def download_image(self, url) -> None:
        logging.info(f"downloading image from URL {url}")
        time.sleep(1)
        file = f"local-{url}"
        self.img_queue.put(file)
        logging.info(f"image saved to {file}")

    def download_images(self, img_url_list: list):
        logging.info("beginning image downloads")

        threads = []
        for url in img_url_list:
            t = Thread(target=self.download_image, args=(url,))
            t.start()
            threads.append(t)

        for t in threads:
            t.join()
        logging.info("all images downloaded")
        self.img_queue.put(None)

    def resize_images(self):
        logging.info("beginning image resizing")
        with mp.Pool() as p:
            while True:
                file = self.img_queue.get()
                if file is None:
                    logging.info("got SENTINEL")
                    break
                logging.info(f"got {file}")
                p.apply_async(func=resize_image, args=(file,))
            p.close()
            p.join()
        logging.info("all images resized")

    def run(self, img_url_list):
        logging.info("START service")

        dl_manager_thread = Thread(target=self.download_images, args=(img_url_list,))
        dl_manager_thread.start()
        self.resize_images()

        logging.info(f"END service")


if __name__ == "__main__":
    FORMAT = "[%(threadName)s, %(asctime)s, %(levelname)s] %(message)s"
    logging.basicConfig(level=logging.DEBUG, format=FORMAT)

    IMG_URLS = list(range(8))

    service = Service()
    service.run(IMG_URLS)

When running this with Python 3.8.5 (Ubuntu 20.04, Ryzen 2600), I get the following output:

[MainThread, 2020-11-30 19:58:01,257, INFO] START service
[Thread-1, 2020-11-30 19:58:01,257, INFO] beginning image downloads
[MainThread, 2020-11-30 19:58:01,257, INFO] beginning image resizing
[Thread-2, 2020-11-30 19:58:01,258, INFO] downloading image from URL 0
[Thread-3, 2020-11-30 19:58:01,258, INFO] downloading image from URL 1
[Thread-4, 2020-11-30 19:58:01,258, INFO] downloading image from URL 2
[Thread-5, 2020-11-30 19:58:01,259, INFO] downloading image from URL 3
[Thread-6, 2020-11-30 19:58:01,260, INFO] downloading image from URL 4
[Thread-7, 2020-11-30 19:58:01,260, INFO] downloading image from URL 5
[Thread-8, 2020-11-30 19:58:01,261, INFO] downloading image from URL 6
[Thread-9, 2020-11-30 19:58:01,262, INFO] downloading image from URL 7
[Thread-2, 2020-11-30 19:58:02,259, INFO] image saved to local-0
[MainThread, 2020-11-30 19:58:02,260, INFO] got local-0
[Thread-3, 2020-11-30 19:58:02,260, INFO] image saved to local-1
[Thread-4, 2020-11-30 19:58:02,260, INFO] image saved to local-2
[MainThread, 2020-11-30 19:58:02,261, INFO] got local-1
[MainThread, 2020-11-30 19:58:02,261, INFO] resizing local-0
[Thread-5, 2020-11-30 19:58:02,261, INFO] image saved to local-3
[Thread-6, 2020-11-30 19:58:02,261, INFO] image saved to local-4
[MainThread, 2020-11-30 19:58:02,261, INFO] got local-2
[MainThread, 2020-11-30 19:58:02,262, INFO] got local-3
[MainThread, 2020-11-30 19:58:02,262, INFO] resizing local-1
[Thread-7, 2020-11-30 19:58:02,262, INFO] image saved to local-5
[MainThread, 2020-11-30 19:58:02,262, INFO] got local-4
[MainThread, 2020-11-30 19:58:02,263, INFO] got local-5
[MainThread, 2020-11-30 19:58:02,263, INFO] resizing local-3
[Thread-8, 2020-11-30 19:58:02,263, INFO] image saved to local-6
[MainThread, 2020-11-30 19:58:02,263, INFO] resizing local-4
[MainThread, 2020-11-30 19:58:02,263, INFO] resizing local-5
[MainThread, 2020-11-30 19:58:02,263, INFO] got local-6
[MainThread, 2020-11-30 19:58:02,264, INFO] resizing local-6
[Thread-9, 2020-11-30 19:58:02,264, INFO] image saved to local-7
[MainThread, 2020-11-30 19:58:02,265, INFO] got local-7
[Thread-1, 2020-11-30 19:58:02,265, INFO] all images downloaded
[MainThread, 2020-11-30 19:58:02,265, INFO] got SENTINEL
[MainThread, 2020-11-30 19:58:02,265, INFO] resizing local-7
[MainThread, 2020-11-30 19:58:02,362, INFO] done resizing local-0
[MainThread, 2020-11-30 19:58:02,363, INFO] done resizing local-1
[MainThread, 2020-11-30 19:58:02,363, INFO] done resizing local-3
[MainThread, 2020-11-30 19:58:02,364, INFO] done resizing local-4
[MainThread, 2020-11-30 19:58:02,364, INFO] done resizing local-5
[MainThread, 2020-11-30 19:58:02,364, INFO] done resizing local-6
[MainThread, 2020-11-30 19:58:02,366, INFO] done resizing local-7

Sometimes it starts hanging at this point. Note that the "resizing local-2" log line is missing, so that task either never started or is stuck waiting.

If I change the pool to use spawn instead of fork, it works fine. My guess is that fork copies some lock in its acquired state and that's the problem, but I can't figure out where or why.

with mp.get_context("spawn").Pool() as p:

Any ideas?

Sometimes (when you're unlucky) one of the pool's child processes gets forked while one of your download threads is in the middle of writing a message to the logging module. The logging module passes messages through a queue protected by a lock, so when the fork happens, that lock can be copied in its locked state. The download thread then finishes writing its message, but only the lock in the main process gets released, leaving a child process waiting on its own copy of the lock in order to write to logging. That lock will never be released, because the download thread was not copied into the child (fork does not copy threads). That's the deadlock you're seeing. Bugs of this class can be patched in certain ways, but this is one of the reasons "spawn" exists.
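The mechanism can be sketched in a few lines. This is a minimal, hypothetical demonstration for POSIX systems; it uses a plain `threading.Lock` instead of logging's internal handler lock, but the copy-on-fork behavior is identical:

```python
import os
import threading
import time

lock = threading.Lock()

def holder():
    # Hold the lock long enough for the fork to happen while it is acquired.
    with lock:
        time.sleep(1)

t = threading.Thread(target=holder)
t.start()
time.sleep(0.1)  # make sure the holder thread has acquired the lock

pid = os.fork()
if pid == 0:
    # Child: memory was copied with the lock in its acquired state, but the
    # holder thread was NOT copied, so nothing in this process can ever
    # release it. Any attempt to acquire it here would block forever.
    os._exit(1 if lock.locked() else 0)

_, status = os.waitpid(pid, 0)
child_saw_locked = os.WEXITSTATUS(status) == 1  # True: child's copy is stuck locked

t.join()
parent_lock_released = not lock.locked()  # True: the holder released it here
```

In your program the stuck lock belongs to a logging handler, and the forked pool worker blocks on it the first time it calls `logging.info`, which is why the "resizing {file}" line never appears for that task.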

Also, "spawn" is the only start method supported on every platform. It's far too easy to use a library that happens to be multithreaded under the hood without realizing it, and "fork" just isn't thread-friendly. I don't know much about "forkserver", in case you really need the reduced overhead that "fork" provides, but in theory it is thread-safe.

fork

The parent process uses os.fork() to fork the Python interpreter. The child process, when it begins, is effectively identical to the parent process. All resources of the parent are inherited by the child process. Note that safely forking a multithreaded process is problematic.

Here's a more in-depth discussion of this issue, with some references to the question I used as my main resource.

Just some extra information to extend Aaron's excellent answer.

This Python bug/enhancement report appears to describe exactly the same thing: https://bugs.python.org/issue6721

I found it in another question asking about the same problem: Deadlock with logging multiprocess/multithread python script