将 dask 集合异步存储到 files/CSV
Storing dask collection to files/CSV asynchronously
我正在使用 dask.distributed 实现各种数据处理管道。通常原始数据从 S3 读取,最后处理的(大)集合也将写入 S3 上的 CSV。
我可以 运行 异步处理并监控进度,但我注意到所有将集合存储到文件的 to_xxx() 方法似乎都是同步调用。它的一个缺点是调用可能会阻塞很长时间。第二,我不能轻易构造一个完整的图来稍后执行。
有没有办法 运行 例如to_csv() 异步获取未来对象而不是阻塞?
PS:我很确定我可以自己实现异步存储,例如通过将集合转换为 delayed() 并存储每个分区。但这似乎是一个常见的情况 - 除非我错过了现有的功能,否则在框架中包含这样的东西会很好。
大多数 to_*
函数都有一个 compute=True
关键字参数,可以用 compute=False
替换。在这些情况下,它将 return 一系列延迟值,然后您可以异步计算这些延迟值
values = df.to_csv('s3://...', compute=False)
futures = client.compute(values)
我正在使用 dask.distributed 实现各种数据处理管道。通常原始数据从 S3 读取,最后处理的(大)集合也将写入 S3 上的 CSV。
我可以 运行 异步处理并监控进度,但我注意到所有将集合存储到文件的 to_xxx() 方法似乎都是同步调用。它的一个缺点是调用可能会阻塞很长时间。第二,我不能轻易构造一个完整的图来稍后执行。
有没有办法 运行 例如to_csv() 异步获取未来对象而不是阻塞?
PS:我很确定我可以自己实现异步存储,例如通过将集合转换为 delayed() 并存储每个分区。但这似乎是一个常见的情况 - 除非我错过了现有的功能,否则在框架中包含这样的东西会很好。
大多数 to_*
函数都有一个 compute=True
关键字参数,可以用 compute=False
替换。在这些情况下,它将 return 一系列延迟值,然后您可以异步计算这些延迟值
values = df.to_csv('s3://...', compute=False)
futures = client.compute(values)