在 dask.array 和 gil 锁内循环

Loop within dask.array and gil lock

GIL 锁是否会显着降低以下代码的性能?

每个块的函数使用 python 循环而不是 numpy 函数。由于外部库,我必须使用 python 循环。

测试代码:

import numpy as np
import dask.array as da
import dask.sharedict as sharedict
from itertools import product


def block_func(block):
    for i in range(len(block)):  # <--- the python loop ...
        block[i] += 1
    return block


def darr_func(x, name='test'):
    dsk = {}
    for idx in product(*map(range, x.numblocks)):
        dsk[(name,) + idx] = (block_func, (x.name,) + idx)
    dsk2 = sharedict.merge((name, dsk), x.dask)
    return da.Array(dsk2, name, x.chunks, x.dtype)


def main():
    n = 1000
    chunks = 100
    arr = np.arange(n*n).reshape(n, n)
    darr = da.from_array(arr, chunks=chunks)
    result = darr_func(darr)
    print(result.compute())


main()

如果是这样,设置调度程序的上下文是否有帮助? 如何通过 dask 数组为函数设置上下文?我想使用默认的 dask 调度程序对 dask 数组进行其他操作。

从 wiki 上,我看到了为计算而不是函数设置调度程序的方法:

# As a context manager
>>> with dask.set_options(get=dask.multiprocessing.get):
...     x.sum().compute()

# Set globally
>>> dask.set_options(get=dask.multiprocessing.get)
>>> x.sum().compute()

Python for 循环不释放 GIL,因此很难与线程并行化。在这种情况下,您有几个选择

  1. 使用 Numba 或 Cython 等项目编写释放 GIL 的 for 循环代码
  2. 使用将计算拆分到多个进程的调度程序。我个人的建议是在本地使用 dask.distributed 调度器,这可以通过 运行 下面两行来完成:

    from dask.distributed import Client
    client = Client()
    

然而,您应该一如既往地分析您的代码并尝试一些事情。上面给出的建议取决于 许多 因素。例如,如果循环体释放 GIL,Python for 循环可能不是问题。