如何使用 Dask 编写模板

How to programm a stencil with Dask

在很多情况下,科学家们使用 Stencil 模拟系统的动力学,这是在网格上卷积数学运算符。通常,此操作会消耗大量计算资源。 Here 很好地解释了这个想法。

在 numpy 中,编写 2D 5 点模板的规范方法如下:

for i in range(rows):
    for j in range(cols):
        grid[i, j] = ( grid[i,j] + grid[i-1,j] + grid[i+1,j] + grid[i,j-1] + grid[i,j+1]) / 5

或者更有效地使用切片:

grid[1:-1,1:-1] = ( grid[1:-1,1:-1] + grid[0:-2,1:-1] + grid[2:,1:-1] + grid[1:-1,0:-2] + grid[1:-1,2:] ) / 5

但是,如果你的网格很大,它不会固定在你的内存中,或者如果卷积操作真的很复杂,它会花费很长时间,并行编程技术可以解决这个问题或者简单地更快地得到结果。 Dask 等工具允许科学家以并行几乎透明的方式自行编写此模拟程序。目前,Dask 不支持项目分配,因此,我如何使用 Dask 编写模板。

问得好。 dask.array do 提供并行计算是正确的,但 don't 不支持项目分配。我们可以通过创建一个函数来一次对一块 numpy 数据进行操作,然后将该函数映射到我们的数组中,边界稍微重叠,从而解决模板计算问题。

纯函数

您应该创建一个函数,该函数接受一个 numpy 数组和 returns 一个应用了模板的新 numpy 数组。这应该不会修改原始数组。

def apply_stencil(x):
    out = np.empty_like(x)
    ...  # do arbitrary computations on out    
    return out

映射具有重叠区域的函数

Dask 数组通过将数组分解为不相交的较小数组块来并行运行。模板计算等操作需要相邻块之间有一些重叠。幸运的是,这可以通过 dask.array.ghost module, and the dask.array.map_overlap 方法来处理。

实际上,map_overlap 文档字符串中的示例是一维正向有限差分计算

>>> x = np.array([1, 1, 2, 3, 3, 3, 2, 1, 1])
>>> x = from_array(x, chunks=5)
>>> def derivative(x):
...     return x - np.roll(x, 1)
>>> y = x.map_overlap(derivative, depth=1, boundary=0)
>>> y.compute()
array([ 1,  0,  1,  1,  0,  0, -1, -1,  0])

Dask 在内部将数组划分为更小的 numpy 数组,当您使用 dask.array 创建数组时,您必须提供一些有关如何将其划分为 的信息,例如这个:

grid = dask.array.zeros((100,100), chunks=(50,50))

请求一个 100 x 100 的数组,分为 4 个块。现在,要对新创建的数组进行卷积运算,必须共享块的边界信息。 Dask ghost cells,处理这样的情况。

一个常见的工作流程意味着:

  1. 正在创建数组(如果之前不存在)
  2. 命令创建幽灵细胞
  3. 映射计算
  4. 修剪边框

例如,

import dask.array as da
grid = da.zeros((100,100), chunks=(50,50))
g = da.ghost.ghost(grid, depth={0:1,1:1}, boundary={0:0,1:1})
g2 = g.map_blocks( some_function ) 
s = da.ghost.trim_internals(g2, {0:1,1:1})
s.compute()

请记住,Dask 创建了一个字典来表示任务图,真正的计算是由 s.compute() 触发的。正如 MRocklin 所指出的,映射函数必须 return 一个 numpy 数组。

关于调度程序的说明

默认情况下,dask.array使用dask.theated调度器来提升性能,但是一旦共享了信息,类似于stencil的问题就很难并行了,这意味着没有资源和信息必须共享,并且计算可以映射到不同的核心甚至不同的计算机。为此,可以指示 dask 使用不同的调度程序,例如 dask.multiprocessing:

import dask.multiprocessing
import dask

dask.set_options(get=dask.multiprocessing.get)

compute() 被触发时,Dask 将创建多个 python 实例,如果您的应用程序足够大以支付创建这个新实例的开销,也许 dask.multiprocessing 会交付更好的表现。 有关 Dask 调度程序的更多信息,请参见 here