从 dask 数组产生矢量输出

produce vector output from a dask array

我有一个大的 dask 数组 (labeled_arr),它实际上是一个带标签的光栅图像(dtype 是 int64)。我想使用 rasterio 将标记区域转换为多边形并将它们组合成一个多边形列表(或只有一个几何列的地理系列)。这是单个数组上的一项简单任务,但我无法弄清楚如何告诉 dask 我希望它对每个块和 return 不是数组的东西执行此操作。

应用于每个块的函数:

def get_polys(labeled_blocks):
    polys = list(poly[0]['coordinates'][0] for poly in rasterio.features.shapes(
                                labeled_blocks.astype('int32'), transform=trans))[:-1]
    # Note: rasterio.features.shapes returns an iterator, hence the conversion to a list here
    return polys

试图让 dask 执行此操作的代码行:

test_polygons = da.blockwise(get_polys, '', labeled_arr, 'ij')
test_polygons.compute()

其中 labeled_arr 是输入分块 dask 数组。

运行 return 是一个错误,说我必须为 da.blockwise 指定数据类型。指定 dtype returns 一个 AttributeError 因为输出列表类型没有 dtype 属性。我发现了 meta 关键字,但仍然无法获得将我的输出转换为系列或列表的正确语法。

我不喜欢上述方法,但我的总体目标是:采用带标签的、分块的 dask 数据阵列(它并不完全适合内存),根据每个块的计算提取一个列表,并生成一个串联列表(或 pandas 数据对象),其中包含我原始分块数组中所有块的输出。

这可能有效:

import dask
import dask.array as da

# we expect to see 4 blocks here
test_array = da.random.random((4, 4), chunks=(2, 2))

@dask.delayed
def my_func(block):
    # do something fancy
    return list(block)

results = dask.compute([my_func(x) for x in test_array.to_delayed().ravel()])

如您所述,问题是 list 没有 dtype。一种解决方法是将 list 转换为 np.array,但我不确定这是否适用于所有 geometry 对象(对于 Points,但由于长度不同,多边形可能会出现问题)。由于您对将这些几何图形强制放入数组不感兴趣,因此最好将单个块视为 delayed 对象,一次将它们一个一个地送入您的函数(但按比例缩放 workers/processes)。

这是我最初得到的解决方案,但考虑到 concatenate=True kwarg,它仍然需要大量 RAM。

poss_list = []
def get_polys(labeled_blocks):
    polys = list(poly[0]['coordinates'][0] for poly in rasterio.features.shapes(
                        labeled_blocks.astype('int32'), transform=trans))[:-1]
    poss_list.append(polys)
        
da.blockwise(get_bergs, '', labeled_arr, 'ij', 
                meta=pd.DataFrame({'c':[]}), concatenate=True).compute()

如果我的解释正确,这不会将块输入到我的函数 workers/processes 中(看来我现在可以逃脱)。

更新 - 使用 dask.delayed 改进了答案,建立在@SultanOrazbayev

接受的答案的基础上
import dask
# onedem = original_xarray_dataarray
poss_list = []

@dask.delayed
def get_bergs(labeled_blocks, pointer, chunk0, chunk1):
            
    # Note: I'm using this in a CRS (polar stereo) with negative y coordinates - it hasn't been tested for other CRSs
    def getpx(chunkid, chunksz):
       amin = chunkid[0] * chunksz[0][0]
       amax = amin + chunksz[0][0]
       bmin = chunkid[1] * chunksz[1][0]
       bmax = bmin + chunksz[1][0]
       return (amin, amax, bmin, bmax)

    # order of all inputs (and outputs) should be y, x when axis order is used
    chunksz = (onedem.chunks['y'], onedem.chunks['x'])
    ymini, ymaxi, xmini, xmaxi = getpx((chunk0, chunk1), chunksz) 

    # use rasterio Windows and rioxarray to construct transform
    # https://rasterio.readthedocs.io/en/latest/topics/windowed-rw.html#window-transforms
    chwindow = rasterio.windows.Window(xmini, ymini, xmaxi-xmini, ymaxi-ymini) #.from_slices[ymini, ymaxi],[xmini, xmaxi])
    trans = onedem.rio.isel_window(chwindow).rio.transform(recalc=True)

    return list(poly[0]['coordinates'][0] for poly in rasterio.features.shapes(labeled_blocks.astype('int32'), transform=trans))[:-1]

        
for __, obj in enumerate(labeled_arr.to_delayed()):
   for bl in obj:
      piece = dask.delayed(get_bergs)(bl, *bl.key)
      poss_list.append(piece)
        
poss_list = dask.compute(*poss_list)
        
# unnest the list of polygons returned by using dask to polygonize
concat_list = [item for sublist in poss_list for item in sublist if len(item)!=0]