从 dask 数组产生矢量输出
produce vector output from a dask array
我有一个大的 dask 数组 (labeled_arr
),它实际上是一个带标签的光栅图像(dtype 是 int64)。我想使用 rasterio 将标记区域转换为多边形并将它们组合成一个多边形列表(或只有一个几何列的地理系列)。这是单个数组上的一项简单任务,但我无法弄清楚如何告诉 dask 我希望它对每个块和 return 不是数组的东西执行此操作。
应用于每个块的函数:
def get_polys(labeled_blocks):
polys = list(poly[0]['coordinates'][0] for poly in rasterio.features.shapes(
labeled_blocks.astype('int32'), transform=trans))[:-1]
# Note: rasterio.features.shapes returns an iterator, hence the conversion to a list here
return polys
试图让 dask 执行此操作的代码行:
test_polygons = da.blockwise(get_polys, '', labeled_arr, 'ij')
test_polygons.compute()
其中 labeled_arr
是输入分块 dask 数组。
运行 return 是一个错误,说我必须为 da.blockwise
指定数据类型。指定 dtype returns 一个 AttributeError 因为输出列表类型没有 dtype 属性。我发现了 meta
关键字,但仍然无法获得将我的输出转换为系列或列表的正确语法。
我不喜欢上述方法,但我的总体目标是:采用带标签的、分块的 dask 数据阵列(它并不完全适合内存),根据每个块的计算提取一个列表,并生成一个串联列表(或 pandas 数据对象),其中包含我原始分块数组中所有块的输出。
这可能有效:
import dask
import dask.array as da
# we expect to see 4 blocks here
test_array = da.random.random((4, 4), chunks=(2, 2))
@dask.delayed
def my_func(block):
# do something fancy
return list(block)
results = dask.compute([my_func(x) for x in test_array.to_delayed().ravel()])
如您所述,问题是 list
没有 dtype
。一种解决方法是将 list
转换为 np.array
,但我不确定这是否适用于所有 geometry
对象(对于 Points
,但由于长度不同,多边形可能会出现问题)。由于您对将这些几何图形强制放入数组不感兴趣,因此最好将单个块视为 delayed
对象,一次将它们一个一个地送入您的函数(但按比例缩放 workers/processes)。
这是我最初得到的解决方案,但考虑到 concatenate=True kwarg,它仍然需要大量 RAM。
poss_list = []
def get_polys(labeled_blocks):
polys = list(poly[0]['coordinates'][0] for poly in rasterio.features.shapes(
labeled_blocks.astype('int32'), transform=trans))[:-1]
poss_list.append(polys)
da.blockwise(get_bergs, '', labeled_arr, 'ij',
meta=pd.DataFrame({'c':[]}), concatenate=True).compute()
如果我的解释正确,这不会将块输入到我的函数 workers/processes 中(看来我现在可以逃脱)。
更新 - 使用 dask.delayed 改进了答案,建立在@SultanOrazbayev
接受的答案的基础上
import dask
# onedem = original_xarray_dataarray
poss_list = []
@dask.delayed
def get_bergs(labeled_blocks, pointer, chunk0, chunk1):
# Note: I'm using this in a CRS (polar stereo) with negative y coordinates - it hasn't been tested for other CRSs
def getpx(chunkid, chunksz):
amin = chunkid[0] * chunksz[0][0]
amax = amin + chunksz[0][0]
bmin = chunkid[1] * chunksz[1][0]
bmax = bmin + chunksz[1][0]
return (amin, amax, bmin, bmax)
# order of all inputs (and outputs) should be y, x when axis order is used
chunksz = (onedem.chunks['y'], onedem.chunks['x'])
ymini, ymaxi, xmini, xmaxi = getpx((chunk0, chunk1), chunksz)
# use rasterio Windows and rioxarray to construct transform
# https://rasterio.readthedocs.io/en/latest/topics/windowed-rw.html#window-transforms
chwindow = rasterio.windows.Window(xmini, ymini, xmaxi-xmini, ymaxi-ymini) #.from_slices[ymini, ymaxi],[xmini, xmaxi])
trans = onedem.rio.isel_window(chwindow).rio.transform(recalc=True)
return list(poly[0]['coordinates'][0] for poly in rasterio.features.shapes(labeled_blocks.astype('int32'), transform=trans))[:-1]
for __, obj in enumerate(labeled_arr.to_delayed()):
for bl in obj:
piece = dask.delayed(get_bergs)(bl, *bl.key)
poss_list.append(piece)
poss_list = dask.compute(*poss_list)
# unnest the list of polygons returned by using dask to polygonize
concat_list = [item for sublist in poss_list for item in sublist if len(item)!=0]
我有一个大的 dask 数组 (labeled_arr
),它实际上是一个带标签的光栅图像(dtype 是 int64)。我想使用 rasterio 将标记区域转换为多边形并将它们组合成一个多边形列表(或只有一个几何列的地理系列)。这是单个数组上的一项简单任务,但我无法弄清楚如何告诉 dask 我希望它对每个块和 return 不是数组的东西执行此操作。
应用于每个块的函数:
def get_polys(labeled_blocks):
polys = list(poly[0]['coordinates'][0] for poly in rasterio.features.shapes(
labeled_blocks.astype('int32'), transform=trans))[:-1]
# Note: rasterio.features.shapes returns an iterator, hence the conversion to a list here
return polys
试图让 dask 执行此操作的代码行:
test_polygons = da.blockwise(get_polys, '', labeled_arr, 'ij')
test_polygons.compute()
其中 labeled_arr
是输入分块 dask 数组。
运行 return 是一个错误,说我必须为 da.blockwise
指定数据类型。指定 dtype returns 一个 AttributeError 因为输出列表类型没有 dtype 属性。我发现了 meta
关键字,但仍然无法获得将我的输出转换为系列或列表的正确语法。
我不喜欢上述方法,但我的总体目标是:采用带标签的、分块的 dask 数据阵列(它并不完全适合内存),根据每个块的计算提取一个列表,并生成一个串联列表(或 pandas 数据对象),其中包含我原始分块数组中所有块的输出。
这可能有效:
import dask
import dask.array as da
# we expect to see 4 blocks here
test_array = da.random.random((4, 4), chunks=(2, 2))
@dask.delayed
def my_func(block):
# do something fancy
return list(block)
results = dask.compute([my_func(x) for x in test_array.to_delayed().ravel()])
如您所述,问题是 list
没有 dtype
。一种解决方法是将 list
转换为 np.array
,但我不确定这是否适用于所有 geometry
对象(对于 Points
,但由于长度不同,多边形可能会出现问题)。由于您对将这些几何图形强制放入数组不感兴趣,因此最好将单个块视为 delayed
对象,一次将它们一个一个地送入您的函数(但按比例缩放 workers/processes)。
这是我最初得到的解决方案,但考虑到 concatenate=True kwarg,它仍然需要大量 RAM。
poss_list = []
def get_polys(labeled_blocks):
polys = list(poly[0]['coordinates'][0] for poly in rasterio.features.shapes(
labeled_blocks.astype('int32'), transform=trans))[:-1]
poss_list.append(polys)
da.blockwise(get_bergs, '', labeled_arr, 'ij',
meta=pd.DataFrame({'c':[]}), concatenate=True).compute()
如果我的解释正确,这不会将块输入到我的函数 workers/processes 中(看来我现在可以逃脱)。
更新 - 使用 dask.delayed 改进了答案,建立在@SultanOrazbayev
接受的答案的基础上import dask
# onedem = original_xarray_dataarray
poss_list = []
@dask.delayed
def get_bergs(labeled_blocks, pointer, chunk0, chunk1):
# Note: I'm using this in a CRS (polar stereo) with negative y coordinates - it hasn't been tested for other CRSs
def getpx(chunkid, chunksz):
amin = chunkid[0] * chunksz[0][0]
amax = amin + chunksz[0][0]
bmin = chunkid[1] * chunksz[1][0]
bmax = bmin + chunksz[1][0]
return (amin, amax, bmin, bmax)
# order of all inputs (and outputs) should be y, x when axis order is used
chunksz = (onedem.chunks['y'], onedem.chunks['x'])
ymini, ymaxi, xmini, xmaxi = getpx((chunk0, chunk1), chunksz)
# use rasterio Windows and rioxarray to construct transform
# https://rasterio.readthedocs.io/en/latest/topics/windowed-rw.html#window-transforms
chwindow = rasterio.windows.Window(xmini, ymini, xmaxi-xmini, ymaxi-ymini) #.from_slices[ymini, ymaxi],[xmini, xmaxi])
trans = onedem.rio.isel_window(chwindow).rio.transform(recalc=True)
return list(poly[0]['coordinates'][0] for poly in rasterio.features.shapes(labeled_blocks.astype('int32'), transform=trans))[:-1]
for __, obj in enumerate(labeled_arr.to_delayed()):
for bl in obj:
piece = dask.delayed(get_bergs)(bl, *bl.key)
poss_list.append(piece)
poss_list = dask.compute(*poss_list)
# unnest the list of polygons returned by using dask to polygonize
concat_list = [item for sublist in poss_list for item in sublist if len(item)!=0]