如何避免任务图中的大对象
How to avoid large objects in task graph
我正在 运行 使用 dask.distributed 进行模拟。我的模型是在一个延迟函数中定义的,我堆叠了几个实现。
此代码片段中给出了我所做操作的简化版本:
import numpy as np
import xarray as xr
import dask.array as da
import dask
from dask.distributed import Client
from itertools import repeat
@dask.delayed
def run_model(n_time,a,b):
result = np.array([a*np.random.randn(n_time)+b])
return result
client = Client()
# Parameters
n_sims = 10000
n_time = 100
a_vals = np.random.randn(n_sims)
b_vals = np.random.randn(n_sims)
output_file = 'out.nc'
# Run simulations
out = da.stack([da.from_delayed(run_model(n_time,a,b),(1,n_time,),np.float64) for a,b in zip(a_vals, b_vals)])
# Store output in a dataframe
ds = xr.Dataset({'var1': (['realization', 'time'], out[:,0,:])},
coords={'realization': np.arange(n_sims),
'time': np.arange(n_time)*.1})
# Save to a netcdf file -> at this point, computations will be carried out
ds.to_netcdf(output_file)
如果我想 运行 大量模拟,我会收到以下警告:
/home/user/miniconda3/lib/python3.6/site-packages/distributed/worker.py:840: UserWarning: Large object of size 2.73 MB detected in task graph:
("('getitem-32103d4a23823ad4f97dcb3faed7cf07', 0, ... cd39>]), False)
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and keep data on workers
future = client.submit(func, big_data) # bad
big_future = client.scatter(big_data) # good
future = client.submit(func, big_future) # good
% (format_bytes(len(b)), s))
据我了解(来自 and 问题),警告提出的方法有助于将大数据导入函数。但是,我的输入都是标量值,所以它们应该不会占用将近 3MB 的内存。即使函数 run_model()
根本不接受任何参数(因此没有传递任何参数),我也会收到相同的警告。
我还查看了任务图,看是否有一些步骤需要加载大量数据。对于三个实现,它看起来像这样:
所以在我看来,每个实现都是单独处理的,这应该使要处理的数据量保持在较低水平。
我想了解生成大对象的实际步骤是什么,以及我需要做什么才能将其分解成更小的部分。
在这种情况下,该消息具有轻微的误导性。该问题由以下内容证明:
> len(out[:, 0, :].dask)
40000
> out[:, 0, :].npartitions
10000
并且该图的 pickled 大小(其头部是消息中的 getitem
键)是 ~3MB。通过为计算的每个元素创建一个 dask-array,你最终得到一个堆叠数组,其中包含与元素一样多的分区,并且模型 运行 操作和项目选择,以及存储操作被应用于每个单个并存储在图中。是的,它们是独立的,很可能整个计算都会完成,但这一切都是非常浪费的,除非模型制作函数 运行s 在每个输入标量上花费相当长的时间。
在您的实际情况下,内部数组实际上可能比您提供的单元素版本更大,但在对数组进行 numpy 操作的一般情况下,创建数组是正常的工作人员(具有随机或某些加载功能)并在大小 >100MB 的分区上运行。
我正在 运行 使用 dask.distributed 进行模拟。我的模型是在一个延迟函数中定义的,我堆叠了几个实现。 此代码片段中给出了我所做操作的简化版本:
import numpy as np
import xarray as xr
import dask.array as da
import dask
from dask.distributed import Client
from itertools import repeat
@dask.delayed
def run_model(n_time,a,b):
result = np.array([a*np.random.randn(n_time)+b])
return result
client = Client()
# Parameters
n_sims = 10000
n_time = 100
a_vals = np.random.randn(n_sims)
b_vals = np.random.randn(n_sims)
output_file = 'out.nc'
# Run simulations
out = da.stack([da.from_delayed(run_model(n_time,a,b),(1,n_time,),np.float64) for a,b in zip(a_vals, b_vals)])
# Store output in a dataframe
ds = xr.Dataset({'var1': (['realization', 'time'], out[:,0,:])},
coords={'realization': np.arange(n_sims),
'time': np.arange(n_time)*.1})
# Save to a netcdf file -> at this point, computations will be carried out
ds.to_netcdf(output_file)
如果我想 运行 大量模拟,我会收到以下警告:
/home/user/miniconda3/lib/python3.6/site-packages/distributed/worker.py:840: UserWarning: Large object of size 2.73 MB detected in task graph:
("('getitem-32103d4a23823ad4f97dcb3faed7cf07', 0, ... cd39>]), False)
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and keep data on workers
future = client.submit(func, big_data) # bad
big_future = client.scatter(big_data) # good
future = client.submit(func, big_future) # good
% (format_bytes(len(b)), s))
据我了解(来自run_model()
根本不接受任何参数(因此没有传递任何参数),我也会收到相同的警告。
我还查看了任务图,看是否有一些步骤需要加载大量数据。对于三个实现,它看起来像这样:
所以在我看来,每个实现都是单独处理的,这应该使要处理的数据量保持在较低水平。
我想了解生成大对象的实际步骤是什么,以及我需要做什么才能将其分解成更小的部分。
在这种情况下,该消息具有轻微的误导性。该问题由以下内容证明:
> len(out[:, 0, :].dask)
40000
> out[:, 0, :].npartitions
10000
并且该图的 pickled 大小(其头部是消息中的 getitem
键)是 ~3MB。通过为计算的每个元素创建一个 dask-array,你最终得到一个堆叠数组,其中包含与元素一样多的分区,并且模型 运行 操作和项目选择,以及存储操作被应用于每个单个并存储在图中。是的,它们是独立的,很可能整个计算都会完成,但这一切都是非常浪费的,除非模型制作函数 运行s 在每个输入标量上花费相当长的时间。
在您的实际情况下,内部数组实际上可能比您提供的单元素版本更大,但在对数组进行 numpy 操作的一般情况下,创建数组是正常的工作人员(具有随机或某些加载功能)并在大小 >100MB 的分区上运行。