os.scandir 和多处理 - ThreadPool 工作,但多进程池不工作

os.scandir and multiprocessing - ThreadPool works, but multi-process Pool doesn't

我在 Python 脚本中有一个任务,以前主要是 IO 绑定,所以我使用了 ThreadPools 并且一切正常。现在我的任务变得更加 CPU-bound,所以我想切换到具有多个进程的池。

我认为这两个接口的行为几乎相同,所以我只是切换了导入,我应该可以开始了。但是,突然之间我的辅助函数不再在池中执行了。

在尝试了几件事之后,这似乎与我将 DirEntry 从 os.scandir() 传递到我的辅助函数这一事实相对应。将 'entry' 替换为硬编码字符串,执行我的辅助函数。用条目替换它,它停止工作。用ThreadPool替换import,又可以了。

# This works.
from multiprocessing.pool import ThreadPool as Pool
import os

pool_size = 3

def worker(entry):
    print("Did some useful stuff!")

pool = Pool(pool_size)

for entry in os.scandir("Samples/"):
    if entry.is_file():
        pool.apply_async(worker, (entry,))

pool.close()
pool.join()

print("Finished multiprocessing task.")

输出:

Did some useful stuff! (~150x)
Finished multiprocessing task.

from multiprocessing.pool import ThreadPool as Pool替换为from multiprocessing import Pool,我现在得到的唯一输出是:

Finished multiprocessing task.

现在,如果我插入一个随机字符串而不是循环中的条目到 pool.apply_async(worker, (entry,)),那么例如pool.apply_async(worker, ("Why does this work?",)),worker 函数工作并且 returns 与 ThreadPools 相同的输出,但显然我不想在我的实际脚本中使用参数。

这里发生了什么?

问题是,传递给子进程的东西正在被腌制,这不适用于 scandir 导致的 DirEntry。不幸的是 apply_async 你看不到相应的失败。你会用简单的 apply 这就是我追踪的方式,一旦你看到发生了什么,它实际上是有道理的:

TypeError: can't pickle posix.DirEntry objects

根据您的需要,您可以传递 entry.path 或其他属性(可以腌制,所以真的,也 name 否则您必须使用 return 其方法的值) DirEntry 到您的工作程序中,您的代码应该可以正常工作。


至于学习失败,或者,您可以编写一个小函数,例如:

def print_failed(caught):
    traceback.print_exc(file=sys.stderr)

并通过添加 error_callback=print_failed.

在您的 apply_async 调用中注册它