joblib.Parallel 用于嵌套列表理解

joblib.Parallel for nested list comprehension

我有一个嵌套列表理解,看起来像这样:

>>> nested = [[1, 2], [3, 4, 5]]
>>> [[sqrt(i) for i in j] for j in nested]
[[1.0, 1.4142135623730951], [1.7320508075688772, 2.0, 2.23606797749979]]

是否可以使用 standard joblib approach for embarrassingly parallel for loops 对其进行并行化?如果是这样,delayed 的正确语法是什么?

据我所知,文档没有提及或给出嵌套输入的任何示例。我尝试了一些天真的实现,但无济于事:

>>> #this syntax fails:
>>> Parallel(n_jobs = 2) (delayed(sqrt)(i for i in j) for j in nested)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\Python27\lib\site-packages\joblib\parallel.py", line 660, in __call__
    self.retrieve()
  File "C:\Python27\lib\site-packages\joblib\parallel.py", line 512, in retrieve
    self._output.append(job.get())
  File "C:\Python27\lib\multiprocessing\pool.py", line 558, in get
    raise self._value
pickle.PicklingError: Can't pickle <type 'generator'>: it's not found as __builtin__.generator
>>> #this syntax doesn't fail, but gives the wrong output:
>>> Parallel(n_jobs = 2) (delayed(sqrt)(i) for i in j for j in nested)
[1.7320508075688772, 1.7320508075688772, 2.0, 2.0, 2.23606797749979, 2.23606797749979]

如果这是不可能的,我显然可以在将列表传递给 Parallel 之前和之后重组列表。但是,我的实际列表很长,每一项都很大,所以这样做并不理想。

据我了解,您不能在该位置输入表达式(如 (i for i in j))- 您将函数的参数放在那里。您可以通过编写一个对列表进行解包的函数来实现您想要的,例如这里:

def sqrt_n(j):
   return [i**i for i in j]

Parallel(n_jobs = 2) (delayed(sqrt_n)(j) for j in nested)

我不太确定你第二次尝试时发生了什么,但我很清楚第一次尝试: sqrt(i for i in j) 后面括号中的表达式生成一个 "generator" 对象,该对象被传递到并行处理管道。不幸的是,生成器的输出可能依赖于之前的调用。在您的情况下,它会在每次调用时提供 j 的下一个元素,但它也可能会进行一些内部计算,这意味着不同的过程相互依赖,并且您的结果可能会依赖于顺序其中执行并行进程。因此,multiprocessing 图书馆拒绝继续。

正如我所说,我不太确定第二个示例中发生了什么,但可能只是您不小心欺骗了 multiprocessing,使其完全按照第一个示例中试图避免的方式进行操作。

可能的解决方案:

1:分离迭代层次

...例如,正如 j_n 所建议的那样,通过定义一个将在低级列表上迭代的函数。这很容易实现,但可能无法从并行化中获得太多好处,具体取决于各个列表的长度。 对外部循环使用非并行列表理解但对内部循环或什至两者并行化也可能是一种选择——这是否有用在很大程度上取决于数据的结构。

2:迭代嵌套列表的线性化版本

这样,每次执行都是并行完成的,但这意味着您需要先 "flatten" 列表,然后再重组它。

如果您的嵌套列表结构规则(即,如果它包含 n 个列表,每个列表有 m 个元素:

,这很容易

从嵌套列表中创建一个 numpy array,如下所示:

import numpy as np
# convert to array -- only works well if you have a regular structure!
nested_arr = np.array(nested)
# the shape of the array, for later
shape = nested_arr.shape

# generate an (n*m) linear array from an (n, m) 2D one
linear = nested_arr.ravel()

# run the parallel calculation
results_lin = Parallel(n_jobs = 2) (delayed(sqrt)(e) for e in linear)

# get everything back into shape:
results = results_lin.reshape(shape)

实际上,这可能更简单,因为 np.nditer() 在多维数组上按元素迭代。不过,我不确定它是否会与 joblibmultiprocessing 合作。 如果您有常规数据(并且您真的只想做比求平方根更复杂的事情),您还应该考虑简单地使用 np.sqrt(nested_arr) - 这比迭代数字列表和平方要快得多它们分别按数量级排列!

如果您的嵌套列表是不规则的,线性化会变得更加复杂:

# store lengths of the sub-lists
structure = [len(e) for e in nested]

# make one linear list
linlist = []
for l in nested:
    linlist.extend(l)

# finally run the parallel computation:
results_lin = Parallel(n_jobs = 2) (delayed(sqrt)(e) for e in linlist)

# ...and bring it all back into shape:
results = []
i = 0

for n in structure:
    results.append(results_lin[i:i+n])

这一切是否再次有意义取决于您处理的数据量以及列表的复杂性。对于您的简单示例,显然排序将比计算平方根花费更长的时间。

你真的需要并行化吗?

如果您所做的只是对大量数字进行简单的数学运算,请考虑使用 np.array。您可以将数组放入大多数方程式中,就好像它们是数字一样,并且计算速度 运行 快得多:

In [14]: time resl = [sqrt(e) for e in range(1000000)]
CPU times: user 2.1 s, sys: 194 ms, total: 2.29 s
Wall time: 2.19 s

In [15]: time res = np.sqrt(np.arange(1000000))
CPU times: user 10.4 ms, sys: 0 ns, total: 10.4 ms
Wall time: 10.1 ms

比列表上的操作可以加速到的速度 ,甚至 运行 在 24 核上并行。 (事实上​​ ,您需要大约 216 个并行进程才能跟上 numpy,而且我确信 mutliprocessing 将负载分配给那么多进程的计算工作无论如何都会使尝试失败。