从 Future 创建惰性 xarray 对象

Create lazy xarray object from Future

我有一个 dask.delayed 函数,它接受一个 xarray.Dataarray 和 returns 作为参数。

我正在创建一些延迟任务并使用 dask.distributed 将它们传递给 client.compute。每次调用计算 returns 一个 distributed.client.Future 表示将返回的数据数组。

我的问题是:

有没有办法在不从 worker 加载实际数据的情况下从未来再次构建一个“惰性”数据数组?我的意图是根据第一次计算的输出构建第二个任务图。

我见过client.gather但这似乎是将所有数据拉回到客户端,这不是我想要的。

这是一个小例子:

import dask
from distributed import Client
import xarray as xr

# load example data
x = xr.tutorial.open_dataset("air_temperature")

# use first timestep
x_t0 = x.isel(time=0)

# delayed 'processing' function
@dask.delayed
def fun(x):
    return x*2

# init client
client = Client()

# compute on worker
future = client.compute(fun(x_t0))

# when done
print(future)
# <Future: finished, type: xarray.Dataset, key: fun-96cd56f4-4ed3-4eac-ade9-fe3f17e4b8c6>

## now how to get back to lazy xarray from future?

不知道你到底想达到什么目的。可能有比从未来创建新数组更好的方法来做到这一点。也就是说,这将从您的数据创建一个新的数据数组: 您必须不调用计算以保持惰性。

(如果你想要一个 dask 数组而不是 xarray 数组删除 xr.DataArray)

import dask
from distributed import Client
import xarray as xr

# load example data
x = xr.tutorial.open_dataset("air_temperature")

# use first timestep
x_t0 = x.isel(time=0)

# delayed 'processing' function
@dask.delayed
def fun(x):
    return x*2

# init client
client = Client()

# Create lazy xarray object from future:
import dask.array as da

new_ds = xr.DataArray(da.from_delayed(client.persist(fun(x_t0)), shape=x_t0.air.shape, meta='f8'), coords=x.coords)

编辑: 添加 client.persist 以在客户端留下数据

输出: