Accessing ghosted chunks with dask
I want to use dask to break an image array up into overlapping tiles, perform a computation (on all of the tiles simultaneously), and then stitch the results back together into the image.
The following works, but feels clumsy:
from dask import array as da
from dask.array import ghost
import numpy as np
test_data = np.random.random((50, 50))
x = da.from_array(test_data, chunks=(10, 10))
depth = {0: 1, 1: 1}
g = ghost.ghost(x, depth=depth, boundary='reflect')
# Calculate the shape of the array in terms of chunks
chunk_shape = [len(c) for c in g.chunks]
chunk_nr = np.prod(chunk_shape)
# Allocate a list for results (as many entries as there are chunks)
blocks = [None,] * chunk_nr
def pack_block(block, block_id):
    """Store `block` at the correct position in `blocks`,
    according to its `block_id`.

    E.g., with ``block_id == (0, 3)``, the block will be stored at
    ``blocks[3]``.
    """
    idx = np.ravel_multi_index(block_id, chunk_shape)
    blocks[idx] = block
    # We don't really need to return anything, but this will do
    return block
g.map_blocks(pack_block).compute()
# Do some operation on the blocks; this is an over-simplified example.
# Typically, I want to do an operation that considers *all*
# blocks simultaneously, hence the need to first unpack into a list.
blocks = [b**2 for b in blocks]
def retrieve_block(_, block_id):
    """Fetch the correct block from the results set, `blocks`.
    """
    idx = np.ravel_multi_index(block_id, chunk_shape)
    return blocks[idx]
result = g.map_blocks(retrieve_block)
# Slice off excess from each computed chunk
result = ghost.trim_internal(result, depth)
result = result.compute()
Is there a cleaner way to achieve the same end result?
The user-facing API for this is the map_overlap method:
>>> x = np.array([1, 1, 2, 3, 3, 3, 2, 1, 1])
>>> x = da.from_array(x, chunks=5)
>>> def derivative(x):
...     return x - np.roll(x, 1)
>>> y = x.map_overlap(derivative, depth=1, boundary=0)
>>> y.compute()
array([ 1, 0, 1, 1, 0, 0, -1, -1, 0])
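For the two-dimensional case in the question, the same over-simplified per-block operation can be written as a single map_overlap call. This is only a sketch: the squaring stands in for the real per-block work, and it does not yet cover the "all blocks at once" requirement, which the second answer addresses.

import numpy as np
from dask import array as da

test_data = np.random.random((50, 50))
x = da.from_array(test_data, chunks=(10, 10))

def square(block):
    # receives one ghosted (overlapping) block; the overlap is trimmed automatically
    return block ** 2

result = x.map_overlap(square, depth={0: 1, 1: 1}, boundary='reflect').compute()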
Two additional notes relevant to your use case:
Avoid hashing costs by passing name=False to from_array. Assuming you don't have any fancy hashing libraries around, this will save you about 400MB/s.
x = da.from_array(x, name=False)
Be careful about computing in place. Dask doesn't guarantee correct behavior if user functions mutate data in place. In this particular case it's probably fine, since we're copying for the ghosting anyway, but it's something to be aware of.
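If the real per-block function does need in-place operations, a defensive copy at the top keeps Dask's inputs untouched. A minimal sketch:

def safe_block_op(block):
    block = block.copy()   # work on a private copy, never on the array Dask hands in
    block -= block.mean()  # in-place update is now safe
    return block

y = x.map_overlap(safe_block_op, depth=1, boundary='reflect')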
Second answer
Given @stefan-van-der-walt's comment, we'll try another solution.
Consider using the .to_delayed() method to get an array of chunks as dask.delayed objects:
depth = {0: 1, 1: 1}
g = ghost.ghost(x, depth=depth, boundary='reflect')
blocks = g.to_delayed()
This gives you a numpy array of dask.delayed objects, each of which points to one block. You can now do arbitrary parallel computations on these blocks. If I wanted them all to arrive at the same function, I might call the following:
result = dask.delayed(f)(blocks.tolist())
The function f would then get a list of lists of numpy arrays, each corresponding to one block in the dask.array g.
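To make that concrete, here is a minimal end-to-end sketch of the delayed route. The per-block squaring again stands in for the real all-blocks-at-once computation, and the trimming and reassembly assume a depth of 1 on both axes (np.block needs NumPy 1.13 or later):

import dask
import numpy as np
from dask import array as da
from dask.array import ghost

x = da.from_array(np.random.random((50, 50)), chunks=(10, 10))
depth = {0: 1, 1: 1}
g = ghost.ghost(x, depth=depth, boundary='reflect')
blocks = g.to_delayed()            # numpy array of dask.delayed objects, one per chunk

def f(block_rows):
    # Receives a list of lists of numpy arrays: every ghosted block at once.
    return [[b ** 2 for b in row] for row in block_rows]

processed = dask.delayed(f)(blocks.tolist()).compute()

# Trim the one-cell ghost border off each block and stitch back to 50x50.
stitched = np.block([[b[1:-1, 1:-1] for b in row] for row in processed])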