将 dask 集合异步存储到 files/CSV

Storing dask collection to files/CSV asynchronously

我正在使用 dask.distributed 实现各种数据处理管道。通常原始数据从 S3 读取,最后处理的(大)集合也将写入 S3 上的 CSV。

我可以 运行 异步处理并监控进度,但我注意到所有将集合存储到文件的 to_xxx() 方法似乎都是同步调用。它的一个缺点是调用可能会阻塞很长时间。第二,我不能轻易构造一个完整的图来稍后执行。

有没有办法 运行 例如to_csv() 异步获取未来对象而不是阻塞?

PS:我很确定我可以自己实现异步存储,例如通过将集合转换为 delayed() 并存储每个分区。但这似乎是一个常见的情况 - 除非我错过了现有的功能,否则在框架中包含这样的东西会很好。

大多数 to_* 函数都有一个 compute=True 关键字参数,可以用 compute=False 替换。在这些情况下,它将 return 一系列延迟值,然后您可以异步计算这些延迟值

values = df.to_csv('s3://...', compute=False)
futures = client.compute(values)