delayed() 函数有什么作用(在 Python 中与 joblib 一起使用时)
What does the delayed() function do (when used with joblib in Python)
我已通读 documentation,但我不明白以下内容的含义:
The delayed function is a simple trick to be able to create a tuple (function, args, kwargs) with a function-call syntax.
我正在使用它来迭代我要操作的列表 (allImages),如下所示:
def joblib_loop():
Parallel(n_jobs=8)(delayed(getHog)(i) for i in allImages)
这个 returns 我的 HOG 功能,就像我想要的那样(并且使用我所有的 8 个内核提高了速度),但我只是不确定它实际上在做什么。
我的 Python 知识充其量不过,很可能我遗漏了一些基本知识。任何正确方向的指示将不胜感激
参考资料https://wiki.python.org/moin/ParallelProcessing
Parallel 对象创建一个多处理池,它在多个进程中派生 Python 解释器来执行列表中的每一项。延迟函数是一个简单的技巧,可以使用函数调用语法创建元组 (function, args, kwargs)。
我想建议的另一件事是,我们可以像这样概括而不是明确定义核心数:
import multiprocessing
num_core=multiprocessing.cpu_count()
因此,您希望能够做的是堆积一组函数调用及其参数,以便您可以将它们有效地传递给 scheduler/executor。 Delayed 是一个 decorator,它接受一个函数及其参数,并将它们包装到一个对象中,该对象可以放入列表中并根据需要弹出。 Dask 有同样的东西,它部分使用它来提供给它的图形调度程序。
如果我们看看如果我们简单地写下会发生什么,事情可能会变得更清楚
Parallel(n_jobs=8)(getHog(i) for i in allImages)
在这种情况下,可以更自然地表达为:
- 使用
n_jobs=8
创建一个 Parallel
实例
- 创建列表
[getHog(i) for i in allImages]
- 将该列表传递给
Parallel
实例
有什么问题吗?当列表传递给 Parallel
对象时,所有 getHog(i)
调用都已经返回 - 所以没有什么可以并行执行的了!所有工作都已在主线程中按顺序完成。
我们实际上想要的是告诉Python我们想用什么参数调用什么函数,而不实际调用它们 - 换句话说,我们想要延迟执行。
这就是 delayed
允许我们使用清晰的语法方便地做的事情。如果我们想告诉 Python 我们想稍后调用 foo(2, g=3)
,我们可以简单地写成 delayed(foo)(2, g=3)
。返回的是元组(foo, [2], {g: 3})
,包含:
- 对我们要调用的函数的引用,例如
foo
- 所有arguments(简称“args”)没有关键字,e.g.t
2
- 所有关键字参数(简称“kwargs”),例如
g=3
所以,通过写 Parallel(n_jobs=8)(delayed(getHog)(i) for i in allImages)
,而不是上面的序列,现在会发生以下情况:
创建了 Parallel
个具有 n_jobs=8
的实例
榜单
[delayed(getHog)(i) for i in allImages]
已创建,评估为
[(getHog, [img1], {}), (getHog, [img2], {}), ... ]
该列表传递给 Parallel
实例
Parallel
实例创建8个线程并将列表中的元组分配给它们
最后,这些线程中的每一个都开始执行元组,即,它们调用第一个元素,将第二个和第三个元素解压为参数 tup[0](*tup[1], **tup[2])
,将元组转回我们实际打算做的调用,getHog(img2)
.
我们需要一个循环来测试不同模型配置的列表。这是驱动网格搜索过程的主要函数,将为每个模型配置调用 score_model() 函数。我们可以通过并行评估模型配置来显着加快网格搜索过程。一种方法是使用 Joblib 库。我们可以使用要使用的内核数定义一个 Parallel 对象,并将其设置为在您的硬件中检测到的分数数。
定义执行器
executor = Parallel(n_jobs=cpu_count(), backend= 'multiprocessing' )
然后创建一个要并行执行的任务列表,这将是对我们拥有的每个模型配置的评分 model() 函数的一次调用。
假设def score_model(data, n_test, cfg):
........................
定义任务列表
tasks = (delayed(score_model)(data, n_test, cfg) for cfg in cfg_list)
我们可以使用 Parallel 对象并行执行任务列表。
scores = executor(tasks)
我已通读 documentation,但我不明白以下内容的含义:
The delayed function is a simple trick to be able to create a tuple (function, args, kwargs) with a function-call syntax.
我正在使用它来迭代我要操作的列表 (allImages),如下所示:
def joblib_loop():
Parallel(n_jobs=8)(delayed(getHog)(i) for i in allImages)
这个 returns 我的 HOG 功能,就像我想要的那样(并且使用我所有的 8 个内核提高了速度),但我只是不确定它实际上在做什么。
我的 Python 知识充其量不过,很可能我遗漏了一些基本知识。任何正确方向的指示将不胜感激
参考资料https://wiki.python.org/moin/ParallelProcessing Parallel 对象创建一个多处理池,它在多个进程中派生 Python 解释器来执行列表中的每一项。延迟函数是一个简单的技巧,可以使用函数调用语法创建元组 (function, args, kwargs)。
我想建议的另一件事是,我们可以像这样概括而不是明确定义核心数:
import multiprocessing
num_core=multiprocessing.cpu_count()
因此,您希望能够做的是堆积一组函数调用及其参数,以便您可以将它们有效地传递给 scheduler/executor。 Delayed 是一个 decorator,它接受一个函数及其参数,并将它们包装到一个对象中,该对象可以放入列表中并根据需要弹出。 Dask 有同样的东西,它部分使用它来提供给它的图形调度程序。
如果我们看看如果我们简单地写下会发生什么,事情可能会变得更清楚
Parallel(n_jobs=8)(getHog(i) for i in allImages)
在这种情况下,可以更自然地表达为:
- 使用
n_jobs=8
创建一个 - 创建列表
[getHog(i) for i in allImages]
- 将该列表传递给
Parallel
实例
Parallel
实例
有什么问题吗?当列表传递给 Parallel
对象时,所有 getHog(i)
调用都已经返回 - 所以没有什么可以并行执行的了!所有工作都已在主线程中按顺序完成。
我们实际上想要的是告诉Python我们想用什么参数调用什么函数,而不实际调用它们 - 换句话说,我们想要延迟执行。
这就是 delayed
允许我们使用清晰的语法方便地做的事情。如果我们想告诉 Python 我们想稍后调用 foo(2, g=3)
,我们可以简单地写成 delayed(foo)(2, g=3)
。返回的是元组(foo, [2], {g: 3})
,包含:
- 对我们要调用的函数的引用,例如
foo
- 所有arguments(简称“args”)没有关键字,e.g.t
2
- 所有关键字参数(简称“kwargs”),例如
g=3
所以,通过写 Parallel(n_jobs=8)(delayed(getHog)(i) for i in allImages)
,而不是上面的序列,现在会发生以下情况:
创建了
Parallel
个具有n_jobs=8
的实例榜单
[delayed(getHog)(i) for i in allImages]
已创建,评估为
[(getHog, [img1], {}), (getHog, [img2], {}), ... ]
该列表传递给
Parallel
实例Parallel
实例创建8个线程并将列表中的元组分配给它们最后,这些线程中的每一个都开始执行元组,即,它们调用第一个元素,将第二个和第三个元素解压为参数
tup[0](*tup[1], **tup[2])
,将元组转回我们实际打算做的调用,getHog(img2)
.
我们需要一个循环来测试不同模型配置的列表。这是驱动网格搜索过程的主要函数,将为每个模型配置调用 score_model() 函数。我们可以通过并行评估模型配置来显着加快网格搜索过程。一种方法是使用 Joblib 库。我们可以使用要使用的内核数定义一个 Parallel 对象,并将其设置为在您的硬件中检测到的分数数。
定义执行器
executor = Parallel(n_jobs=cpu_count(), backend= 'multiprocessing' )
然后创建一个要并行执行的任务列表,这将是对我们拥有的每个模型配置的评分 model() 函数的一次调用。
假设def score_model(data, n_test, cfg):
........................
定义任务列表
tasks = (delayed(score_model)(data, n_test, cfg) for cfg in cfg_list)
我们可以使用 Parallel 对象并行执行任务列表。
scores = executor(tasks)