delayed() 函数有什么作用(在 Python 中与 joblib 一起使用时)

What does the delayed() function do (when used with joblib in Python)

我已通读 documentation,但我不明白以下内容的含义: The delayed function is a simple trick to be able to create a tuple (function, args, kwargs) with a function-call syntax.

我正在使用它来迭代我要操作的列表 (allImages),如下所示:

def joblib_loop():
    Parallel(n_jobs=8)(delayed(getHog)(i) for i in allImages)

这个 returns 我的 HOG 功能,就像我想要的那样(并且使用我所有的 8 个内核提高了速度),但我只是不确定它实际上在做什么。

我的 Python 知识充其量不过,很可能我遗漏了一些基本知识。任何正确方向的指示将不胜感激

参考资料https://wiki.python.org/moin/ParallelProcessing Parallel 对象创建一个多处理池,它在多个进程中派生 Python 解释器来执行列表中的每一项。延迟函数是一个简单的技巧,可以使用函数调用语法创建元组 (function, args, kwargs)。

我想建议的另一件事是,我们可以像这样概括而不是明确定义核心数:

import multiprocessing
num_core=multiprocessing.cpu_count()

因此,您希望能够做的是堆积一组函数调用及其参数,以便您可以将它们有效地传递给 scheduler/executor。 Delayed 是一个 decorator,它接受一个函数及其参数,并将它们包装到一个对象中,该对象可以放入列表中并根据需要弹出。 Dask 有同样的东西,它部分使用它来提供给它的图形调度程序。

如果我们看看如果我们简单地写下会发生什么,事情可能会变得更清楚

Parallel(n_jobs=8)(getHog(i) for i in allImages)

在这种情况下,可以更自然地表达为:

  1. 使用 n_jobs=8
  2. 创建一个 Parallel 实例
  3. 创建列表[getHog(i) for i in allImages]
  4. 将该列表传递给 Parallel 实例

有什么问题吗?当列表传递给 Parallel 对象时,所有 getHog(i) 调用都已经返回 - 所以没有什么可以并行执行的了!所有工作都已在主线程中按顺序完成。

我们实际上想要的是告诉Python我们想用什么参数调用什么函数,而不实际调用它们 - 换句话说,我们想要延迟执行。

这就是 delayed 允许我们使用清晰的语法方便地做的事情。如果我们想告诉 Python 我们想稍后调用 foo(2, g=3),我们可以简单地写成 delayed(foo)(2, g=3)。返回的是元组(foo, [2], {g: 3}),包含:

  • 对我们要调用的函数的引用,例如foo
  • 所有arguments(简称“args”)没有关键字,e.g.t 2
  • 所有关键字参数(简称“kwargs”),例如g=3

所以,通过写 Parallel(n_jobs=8)(delayed(getHog)(i) for i in allImages),而不是上面的序列,现在会发生以下情况:

  1. 创建了 Parallel 个具有 n_jobs=8 的实例

  2. 榜单

     [delayed(getHog)(i) for i in allImages]
    

    已创建,评估为

     [(getHog, [img1], {}), (getHog, [img2], {}), ... ]
    
  3. 该列表传递给 Parallel 实例

  4. Parallel实例创建8个线程并将列表中的元组分配给它们

  5. 最后,这些线程中的每一个都开始执行元组,即,它们调用第一个元素,将第二个和第三个元素解压为参数 tup[0](*tup[1], **tup[2]),将元组转回我们实际打算做的调用,getHog(img2).

我们需要一个循环来测试不同模型配置的列表。这是驱动网格搜索过程的主要函数,将为每个模型配置调用 score_model() 函数。我们可以通过并行评估模型配置来显着加快网格搜索过程。一种方法是使用 Joblib 库。我们可以使用要使用的内核数定义一个 Parallel 对象,并将其设置为在您的硬件中检测到的分数数。

定义执行器

executor = Parallel(n_jobs=cpu_count(), backend= 'multiprocessing' )

然后创建一个要并行执行的任务列表,这将是对我们拥有的每个模型配置的评分 model() 函数的一次调用。

假设def score_model(data, n_test, cfg): ........................

定义任务列表

tasks = (delayed(score_model)(data, n_test, cfg) for cfg in cfg_list)

我们可以使用 Parallel 对象并行执行任务列表。

scores = executor(tasks)