使用 dask 访问幻影块

Accessing ghosted chunks with dask

我想使用 dask 将图像数组分解成重叠的图块,执行计算(同时对所有图块),然后将结果拼接回图像。

以下工作正常,但感觉笨拙:

from dask import array as da
from dask.array import ghost

import numpy as np


test_data = np.random.random((50, 50))
x = da.from_array(test_data, chunks=(10, 10))

depth = {0: 1, 1: 1}
g = ghost.ghost(x, depth=depth, boundary='reflect')

# Calculate the shape of the array in terms of chunks
chunk_shape = [len(c) for c in g.chunks]
chunk_nr = np.prod(chunk_shape)

# Allocate a list for results (as many entries as there are chunks)
blocks = [None,] * chunk_nr

def pack_block(block, block_id):
    """Store `block` at the correct position in `blocks`,
    according to its `block_id`.

    E.g., with ``block_id == (0, 3)``, the block will be stored at
    ``blocks[3]`.
    """
    idx = np.ravel_multi_index(block_id, chunk_shape)
    blocks[idx] = block

    # We don't really need to return anything, but this will do
    return block

g.map_blocks(pack_block).compute()

# Do some operation on the blocks; this is an over-simplified example.
# Typically, I want to do an operation that considers *all*
# blocks simultaneously, hence the need to first unpack into a list.
blocks = [b**2 for b in blocks]

def retrieve_block(_, block_id):
    """Fetch the correct block from the results set, `blocks`.
    """
    idx = np.ravel_multi_index(block_id, chunk_shape)
    return blocks[idx]

result = g.map_blocks(retrieve_block)

# Slice off excess from each computed chunk
result = ghost.trim_internal(result, depth)
result = result.compute()

是否有更简洁的方法来达到相同的最终结果?

面向用户的api是map_overlap方法

>>> x = np.array([1, 1, 2, 3, 3, 3, 2, 1, 1])
>>> x = da.from_array(x, chunks=5)
>>> def derivative(x):
...     return x - np.roll(x, 1)

>>> y = x.map_overlap(derivative, depth=1, boundary=0)
>>> y.compute()
array([ 1,  0,  1,  1,  0,  0, -1, -1,  0])

关于您的用例的两个附加说明

  1. 通过向 from_array 提供 name=False 来避免散列成本。假设您周围没有任何花哨的哈希库,这将为您节省大约 400MB/s。

    x = da.from_array(x, name=False)
    
  2. 小心就地计算。如果用户函数就地改变数据,Dask 不保证正确的行为。在这种特殊情况下,它可能没问题,因为无论如何我们都是为了重影而复制,但这是需要注意的事情。

第二个回答

鉴于@stefan-van-der-walt 的评论,我们将尝试另一种解决方案。

考虑使用 .to_delayed() method to get an array of chunks as dask.delayed 对象

depth = {0: 1, 1: 1}
g = ghost.ghost(x, depth=depth, boundary='reflect')
blocks = g.todelayed()

这为您提供了一个 dask.delayed 对象的 numpy 数组,每个对象都指向一个块。您现在可以对这些块执行任意并行计算。如果我希望它们都达到相同的功能,那么我可能会调用以下内容:

result = dask.delayed(f)(blocks.tolist())

然后函数f会得到一个numpy数组列表的列表,每个数组对应dask.array g.

中的一个块