同时将 xarray 数据集写入 zarr - 如何使用 dask 分布式进行有效扩展
Concurrently write xarray datasets to zarr - how to efficiently scale with dask distributed
TLDR:
我如何有效地使用 dask-distributed
将大量 dask
支持的 xarray
数据集写入 AWS S3 上的 zarr
存储?
详情:
我有一个工作流,它获取 S3 上的栅格数据集列表并生成一个支持 dask 数组的 xarray 数据集。
我需要遍历多个组,对于每个组,工作流获取属于该组的栅格数据集并生成相应的 xarray 数据集。
现在我想将数据集中的数据写入 S3 上的一个 zarr 存储(同一个存储,只是使用 group
参数)。
顺序处理的伪代码如下所示:
client = Client(...) # using a distributed cluster
zarr_store = fsspec.get_mapper("s3://bucket/key.zarr")
for group_select in groups:
xr_dataset = get_dataset_for_group(group_select)
# totally unnecessary, just to illustrate that this is a lazy dataset, nothing has been loaded yet
assert dask.is_dask_collection(xr_dataset)
xr_dataset.to_zarr(zarr_store, group=group_select)
这非常有效,一旦执行 to_zarr
,数据就会加载并存储在 S3 上,任务 运行ning 并行。
现在我想 运行 使用 dask.distribuited
并行执行此操作。这是我尝试过的和遇到的问题:
1.使用.to_zarr(..., compute=False)
收集延迟任务列表
这在原则上是可行的,但速度很慢。创建一个任务大约需要 3-4 秒,我需要 运行 这 100 多次,需要 4-5 分钟才能真正开始任何计算。
2. 包装成 dask.delayed
这极大地加快了任务的创建速度,但是写入 zarr 存储的操作并没有在工作人员之间分配,而是处理任务的工作人员在加载任务完成后收集所有数据并将其写入 zarr .
3. 将 to_zarr
包装在自定义函数中并将其传递给 client.submit
这看起来是最有希望的选择。我刚刚将 to_zarr
调用包装在自定义函数中,可以从工作人员调用:
def dump(ds, target, group=None):
with worker_client() as client:
ds.to_zarr(store=target, group=group)
return True
使用 worker_client
执行此操作会将写入任务返回给调度程序并解决我在上面使用 dask.delayed
.
遇到的问题
然而,当我按照
的方式重复提交这个函数(我需要这样做100+次)
futures = [client.submit(dump, x, target, g) for x,g in zip(datasets, groups)]
我很快就用要处理的任务压垮了调度程序。
我能想到的唯一明显的解决方案是分批拆分数据集,只有在前一个数据集完成后才开始一个新的数据集。但是有没有更优雅的解决方案呢?还是 dask(分布式)中有内置功能?
在我的 experience/environment 中,很容易让调度器被太多的任务压垮(还有太多的工作人员无法协调),所以将事情分成批次通常是可行的。
要创建一个移动的工作队列,您可以在每次完成另一个任务时使用 as_completed
、submitting/adding 个任务。查看这些相关答案: and .
TLDR:
我如何有效地使用 dask-distributed
将大量 dask
支持的 xarray
数据集写入 AWS S3 上的 zarr
存储?
详情:
我有一个工作流,它获取 S3 上的栅格数据集列表并生成一个支持 dask 数组的 xarray 数据集。
我需要遍历多个组,对于每个组,工作流获取属于该组的栅格数据集并生成相应的 xarray 数据集。
现在我想将数据集中的数据写入 S3 上的一个 zarr 存储(同一个存储,只是使用 group
参数)。
顺序处理的伪代码如下所示:
client = Client(...) # using a distributed cluster
zarr_store = fsspec.get_mapper("s3://bucket/key.zarr")
for group_select in groups:
xr_dataset = get_dataset_for_group(group_select)
# totally unnecessary, just to illustrate that this is a lazy dataset, nothing has been loaded yet
assert dask.is_dask_collection(xr_dataset)
xr_dataset.to_zarr(zarr_store, group=group_select)
这非常有效,一旦执行 to_zarr
,数据就会加载并存储在 S3 上,任务 运行ning 并行。
现在我想 运行 使用 dask.distribuited
并行执行此操作。这是我尝试过的和遇到的问题:
1.使用.to_zarr(..., compute=False)
收集延迟任务列表
这在原则上是可行的,但速度很慢。创建一个任务大约需要 3-4 秒,我需要 运行 这 100 多次,需要 4-5 分钟才能真正开始任何计算。
2. 包装成 dask.delayed
这极大地加快了任务的创建速度,但是写入 zarr 存储的操作并没有在工作人员之间分配,而是处理任务的工作人员在加载任务完成后收集所有数据并将其写入 zarr .
3. 将 to_zarr
包装在自定义函数中并将其传递给 client.submit
这看起来是最有希望的选择。我刚刚将 to_zarr
调用包装在自定义函数中,可以从工作人员调用:
def dump(ds, target, group=None):
with worker_client() as client:
ds.to_zarr(store=target, group=group)
return True
使用 worker_client
执行此操作会将写入任务返回给调度程序并解决我在上面使用 dask.delayed
.
然而,当我按照
的方式重复提交这个函数(我需要这样做100+次)futures = [client.submit(dump, x, target, g) for x,g in zip(datasets, groups)]
我很快就用要处理的任务压垮了调度程序。
我能想到的唯一明显的解决方案是分批拆分数据集,只有在前一个数据集完成后才开始一个新的数据集。但是有没有更优雅的解决方案呢?还是 dask(分布式)中有内置功能?
在我的 experience/environment 中,很容易让调度器被太多的任务压垮(还有太多的工作人员无法协调),所以将事情分成批次通常是可行的。
要创建一个移动的工作队列,您可以在每次完成另一个任务时使用 as_completed
、submitting/adding 个任务。查看这些相关答案: