os.scandir 和多处理 - ThreadPool 工作,但多进程池不工作
os.scandir and multiprocessing - ThreadPool works, but multi-process Pool doesn't
我在 Python 脚本中有一个任务,以前主要是 IO 绑定,所以我使用了 ThreadPools 并且一切正常。现在我的任务变得更加 CPU-bound,所以我想切换到具有多个进程的池。
我认为这两个接口的行为几乎相同,所以我只是切换了导入,我应该可以开始了。但是,突然之间我的辅助函数不再在池中执行了。
在尝试了几件事之后,这似乎与我将 DirEntry 从 os.scandir() 传递到我的辅助函数这一事实相对应。将 'entry' 替换为硬编码字符串,执行我的辅助函数。用条目替换它,它停止工作。用ThreadPool替换import,又可以了。
# This works.
from multiprocessing.pool import ThreadPool as Pool
import os
pool_size = 3
def worker(entry):
print("Did some useful stuff!")
pool = Pool(pool_size)
for entry in os.scandir("Samples/"):
if entry.is_file():
pool.apply_async(worker, (entry,))
pool.close()
pool.join()
print("Finished multiprocessing task.")
输出:
Did some useful stuff! (~150x)
Finished multiprocessing task.
将from multiprocessing.pool import ThreadPool as Pool
替换为from multiprocessing import Pool
,我现在得到的唯一输出是:
Finished multiprocessing task.
现在,如果我插入一个随机字符串而不是循环中的条目到 pool.apply_async(worker, (entry,))
,那么例如pool.apply_async(worker, ("Why does this work?",))
,worker 函数工作并且 returns 与 ThreadPools 相同的输出,但显然我不想在我的实际脚本中使用参数。
这里发生了什么?
问题是,传递给子进程的东西正在被腌制,这不适用于 scandir
导致的 DirEntry
。不幸的是 apply_async
你看不到相应的失败。你会用简单的 apply
这就是我追踪的方式,一旦你看到发生了什么,它实际上是有道理的:
TypeError: can't pickle posix.DirEntry objects
根据您的需要,您可以传递 entry.path
或其他属性(可以腌制,所以真的,也 name
否则您必须使用 return 其方法的值) DirEntry
到您的工作程序中,您的代码应该可以正常工作。
至于学习失败,或者,您可以编写一个小函数,例如:
def print_failed(caught):
traceback.print_exc(file=sys.stderr)
并通过添加 error_callback=print_failed
.
在您的 apply_async
调用中注册它
我在 Python 脚本中有一个任务,以前主要是 IO 绑定,所以我使用了 ThreadPools 并且一切正常。现在我的任务变得更加 CPU-bound,所以我想切换到具有多个进程的池。
我认为这两个接口的行为几乎相同,所以我只是切换了导入,我应该可以开始了。但是,突然之间我的辅助函数不再在池中执行了。
在尝试了几件事之后,这似乎与我将 DirEntry 从 os.scandir() 传递到我的辅助函数这一事实相对应。将 'entry' 替换为硬编码字符串,执行我的辅助函数。用条目替换它,它停止工作。用ThreadPool替换import,又可以了。
# This works.
from multiprocessing.pool import ThreadPool as Pool
import os
pool_size = 3
def worker(entry):
print("Did some useful stuff!")
pool = Pool(pool_size)
for entry in os.scandir("Samples/"):
if entry.is_file():
pool.apply_async(worker, (entry,))
pool.close()
pool.join()
print("Finished multiprocessing task.")
输出:
Did some useful stuff! (~150x)
Finished multiprocessing task.
将from multiprocessing.pool import ThreadPool as Pool
替换为from multiprocessing import Pool
,我现在得到的唯一输出是:
Finished multiprocessing task.
现在,如果我插入一个随机字符串而不是循环中的条目到 pool.apply_async(worker, (entry,))
,那么例如pool.apply_async(worker, ("Why does this work?",))
,worker 函数工作并且 returns 与 ThreadPools 相同的输出,但显然我不想在我的实际脚本中使用参数。
这里发生了什么?
问题是,传递给子进程的东西正在被腌制,这不适用于 scandir
导致的 DirEntry
。不幸的是 apply_async
你看不到相应的失败。你会用简单的 apply
这就是我追踪的方式,一旦你看到发生了什么,它实际上是有道理的:
TypeError: can't pickle posix.DirEntry objects
根据您的需要,您可以传递 entry.path
或其他属性(可以腌制,所以真的,也 name
否则您必须使用 return 其方法的值) DirEntry
到您的工作程序中,您的代码应该可以正常工作。
至于学习失败,或者,您可以编写一个小函数,例如:
def print_failed(caught):
traceback.print_exc(file=sys.stderr)
并通过添加 error_callback=print_failed
.
apply_async
调用中注册它