堆叠从 Xarray 生成的 Dask 数组的有效方法
Efficient way to stack Dask Arrays generated from Xarray
所以我正在尝试读取大量包含水文数据的相对较大的 netCDF 文件。 NetCDF 文件都是这样的:
<xarray.Dataset>
Dimensions: (feature_id: 2729077, reference_time: 1, time: 1)
Coordinates:
* time (time) datetime64[ns] 1993-01-11T21:00:00
* reference_time (reference_time) datetime64[ns] 1993-01-01
* feature_id (feature_id) int32 101 179 181 183 185 843 845 847 849 ...
Data variables:
streamflow (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
q_lateral (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
velocity (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
qSfcLatRunoff (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
qBucket (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
qBtmVertRunoff (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
Attributes:
featureType: timeSeries
proj4: +proj=longlat +datum=NAD83 +no_defs
model_initialization_time: 1993-01-01_00:00:00
station_dimension: feature_id
model_output_valid_time: 1993-01-11_21:00:00
stream_order_output: 1
cdm_datatype: Station
esri_pe_string: GEOGCS[GCS_North_American_1983,DATUM[D_North_...
Conventions: CF-1.6
model_version: NWM 1.2
dev_OVRTSWCRT: 1
dev_NOAH_TIMESTEP: 3600
dev_channel_only: 0
dev_channelBucket_only: 0
dev: dev_ prefix indicates development/internal me...
我有 25 年的这个数据,并且每小时记录一次。所以总共有大约 4 TB 的数据。
现在我只是想获取流量值的季节性平均值(每日和每月)。所以我创建了以下脚本。
import xarray as xr
import dask.array as da
from dask.distributed import Client
import os
workdir = '/path/to/directory/of/files'
files = [os.path.join(workdir, i) for i in os.listdir(workdir)]
client = Client(processes=False, threads_per_worker=4, n_workers=4, memory_limit='750MB')
big_array = []
for i, file in enumerate(files):
ds = xr.open_dataset(file, chunks={"feature_id": 50000})
if i == 0:
print(ds)
print(ds.streamflow)
big_array.append(ds.streamflow)
ds.close()
if i == 5:
break
dask_big_array = da.stack(big_array, axis=0)
print(dask_big_array)
ds.streamflow 对象在打印时看起来像这样,据我了解它只是一个 Dask 数组:
<xarray.DataArray 'streamflow' (feature_id: 2729077)>
dask.array<shape=(2729077,), dtype=float64, chunksize=(50000,)>
Coordinates:
* feature_id (feature_id) int32 101 179 181 183 185 843 845 847 849 851 ...
Attributes:
long_name: River Flow
units: m3 s-1
coordinates: latitude longitude
valid_range: [ 0 50000000]
奇怪的是,当我堆叠数组时,它们似乎丢失了我之前应用到它们的分块。当我打印出 big_array 对象时,我得到这个:
dask.array<stack, shape=(6, 2729077), dtype=float64, chunksize=(1, 2729077)>
我 运行 遇到的问题是当我尝试 运行 此代码时收到此警告,然后我认为内存过载所以我必须终止进程。
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk...
所以我想我有几个问题:
- 为什么 dask 数组在堆叠时会丢失分块?
- 是否有更有效的方法来堆叠所有这些数组以并行化此过程?
根据评论,big array
是这样的:
[<xarray.DataArray 'streamflow' (feature_id: 2729077)>
dask.array<shape=(2729077,), dtype=float64, chunksize=(50000,)>
Coordinates:
* feature_id (feature_id) int32 101 179 181 183 185 843 845 847 849 851 ...
Attributes:
long_name: River Flow
units: m3 s-1
coordinates: latitude longitude
valid_range: [ 0 50000000], <xarray.DataArray 'streamflow' (feature_id: 2729077)>
dask.array<shape=(2729077,), dtype=float64, chunksize=(50000,)>
Coordinates:
* feature_id (feature_id) int32 101 179 181 183 185 843 845 847 849 851 ...
Attributes:
long_name: River Flow
units: m3 s-1
coordinates: latitude longitude
valid_range: [ 0 50000000], <xarray.DataArray 'streamflow' (feature_id: 2729077)>
dask.array<shape=(2729077,), dtype=float64, chunksize=(50000,)>
Coordinates:
* feature_id (feature_id) int32 101 179 181 183 185 843 845 847 849 851 ...
Attributes:
long_name: River Flow
units: m3 s-1
coordinates: latitude longitude
valid_range: [ 0 50000000], <xarray.DataArray 'streamflow' (feature_id: 2729077)>
dask.array<shape=(2729077,), dtype=float64, chunksize=(50000,)>
Coordinates:
* feature_id (feature_id) int32 101 179 181 183 185 843 845 847 849 851 ...
Attributes:
long_name: River Flow
units: m3 s-1
coordinates: latitude longitude
valid_range: [ 0 50000000], <xarray.DataArray 'streamflow' (feature_id: 2729077)>
dask.array<shape=(2729077,), dtype=float64, chunksize=(50000,)>
Coordinates:
* feature_id (feature_id) int32 101 179 181 183 185 843 845 847 849 851 ...
Attributes:
long_name: River Flow
units: m3 s-1
coordinates: latitude longitude
valid_range: [ 0 50000000], <xarray.DataArray 'streamflow' (feature_id: 2729077)>
dask.array<shape=(2729077,), dtype=float64, chunksize=(50000,)>
Coordinates:
* feature_id (feature_id) int32 101 179 181 183 185 843 845 847 849 851 ...
Attributes:
long_name: River Flow
units: m3 s-1
coordinates: latitude longitude
valid_range: [ 0 50000000]]
这里的问题是 dask.array.stack()
无法将 xarray.DataArray
对象识别为保存 dask 数组,因此它将它们全部转换为 NumPy 数组。这就是你最终耗尽记忆力的方式。
您可以通过几种不同的可能方式解决此问题:
- 在 dask 数组列表上调用
dask.array.stack()
,例如,将 big_array.append(ds.streamflow)
切换为 big_array.append(ds.streamflow.data)
。
- 使用
xarray.concat()
代替dask.array.stack()
,例如写dask_big_array = xarray.concat(big_array, dim='time')
.
- 使用
xarray.open_mfdataset()
结合了打开许多文件并将它们堆叠在一起的过程,例如,将此处的所有逻辑替换为 xarray.open_mfdataset('/path/to/directory/of/files/*')
。
所以我正在尝试读取大量包含水文数据的相对较大的 netCDF 文件。 NetCDF 文件都是这样的:
<xarray.Dataset>
Dimensions: (feature_id: 2729077, reference_time: 1, time: 1)
Coordinates:
* time (time) datetime64[ns] 1993-01-11T21:00:00
* reference_time (reference_time) datetime64[ns] 1993-01-01
* feature_id (feature_id) int32 101 179 181 183 185 843 845 847 849 ...
Data variables:
streamflow (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
q_lateral (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
velocity (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
qSfcLatRunoff (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
qBucket (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
qBtmVertRunoff (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
Attributes:
featureType: timeSeries
proj4: +proj=longlat +datum=NAD83 +no_defs
model_initialization_time: 1993-01-01_00:00:00
station_dimension: feature_id
model_output_valid_time: 1993-01-11_21:00:00
stream_order_output: 1
cdm_datatype: Station
esri_pe_string: GEOGCS[GCS_North_American_1983,DATUM[D_North_...
Conventions: CF-1.6
model_version: NWM 1.2
dev_OVRTSWCRT: 1
dev_NOAH_TIMESTEP: 3600
dev_channel_only: 0
dev_channelBucket_only: 0
dev: dev_ prefix indicates development/internal me...
我有 25 年的这个数据,并且每小时记录一次。所以总共有大约 4 TB 的数据。
现在我只是想获取流量值的季节性平均值(每日和每月)。所以我创建了以下脚本。
import xarray as xr
import dask.array as da
from dask.distributed import Client
import os
workdir = '/path/to/directory/of/files'
files = [os.path.join(workdir, i) for i in os.listdir(workdir)]
client = Client(processes=False, threads_per_worker=4, n_workers=4, memory_limit='750MB')
big_array = []
for i, file in enumerate(files):
ds = xr.open_dataset(file, chunks={"feature_id": 50000})
if i == 0:
print(ds)
print(ds.streamflow)
big_array.append(ds.streamflow)
ds.close()
if i == 5:
break
dask_big_array = da.stack(big_array, axis=0)
print(dask_big_array)
ds.streamflow 对象在打印时看起来像这样,据我了解它只是一个 Dask 数组:
<xarray.DataArray 'streamflow' (feature_id: 2729077)>
dask.array<shape=(2729077,), dtype=float64, chunksize=(50000,)>
Coordinates:
* feature_id (feature_id) int32 101 179 181 183 185 843 845 847 849 851 ...
Attributes:
long_name: River Flow
units: m3 s-1
coordinates: latitude longitude
valid_range: [ 0 50000000]
奇怪的是,当我堆叠数组时,它们似乎丢失了我之前应用到它们的分块。当我打印出 big_array 对象时,我得到这个:
dask.array<stack, shape=(6, 2729077), dtype=float64, chunksize=(1, 2729077)>
我 运行 遇到的问题是当我尝试 运行 此代码时收到此警告,然后我认为内存过载所以我必须终止进程。
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk...
所以我想我有几个问题:
- 为什么 dask 数组在堆叠时会丢失分块?
- 是否有更有效的方法来堆叠所有这些数组以并行化此过程?
根据评论,big array
是这样的:
[<xarray.DataArray 'streamflow' (feature_id: 2729077)>
dask.array<shape=(2729077,), dtype=float64, chunksize=(50000,)>
Coordinates:
* feature_id (feature_id) int32 101 179 181 183 185 843 845 847 849 851 ...
Attributes:
long_name: River Flow
units: m3 s-1
coordinates: latitude longitude
valid_range: [ 0 50000000], <xarray.DataArray 'streamflow' (feature_id: 2729077)>
dask.array<shape=(2729077,), dtype=float64, chunksize=(50000,)>
Coordinates:
* feature_id (feature_id) int32 101 179 181 183 185 843 845 847 849 851 ...
Attributes:
long_name: River Flow
units: m3 s-1
coordinates: latitude longitude
valid_range: [ 0 50000000], <xarray.DataArray 'streamflow' (feature_id: 2729077)>
dask.array<shape=(2729077,), dtype=float64, chunksize=(50000,)>
Coordinates:
* feature_id (feature_id) int32 101 179 181 183 185 843 845 847 849 851 ...
Attributes:
long_name: River Flow
units: m3 s-1
coordinates: latitude longitude
valid_range: [ 0 50000000], <xarray.DataArray 'streamflow' (feature_id: 2729077)>
dask.array<shape=(2729077,), dtype=float64, chunksize=(50000,)>
Coordinates:
* feature_id (feature_id) int32 101 179 181 183 185 843 845 847 849 851 ...
Attributes:
long_name: River Flow
units: m3 s-1
coordinates: latitude longitude
valid_range: [ 0 50000000], <xarray.DataArray 'streamflow' (feature_id: 2729077)>
dask.array<shape=(2729077,), dtype=float64, chunksize=(50000,)>
Coordinates:
* feature_id (feature_id) int32 101 179 181 183 185 843 845 847 849 851 ...
Attributes:
long_name: River Flow
units: m3 s-1
coordinates: latitude longitude
valid_range: [ 0 50000000], <xarray.DataArray 'streamflow' (feature_id: 2729077)>
dask.array<shape=(2729077,), dtype=float64, chunksize=(50000,)>
Coordinates:
* feature_id (feature_id) int32 101 179 181 183 185 843 845 847 849 851 ...
Attributes:
long_name: River Flow
units: m3 s-1
coordinates: latitude longitude
valid_range: [ 0 50000000]]
这里的问题是 dask.array.stack()
无法将 xarray.DataArray
对象识别为保存 dask 数组,因此它将它们全部转换为 NumPy 数组。这就是你最终耗尽记忆力的方式。
您可以通过几种不同的可能方式解决此问题:
- 在 dask 数组列表上调用
dask.array.stack()
,例如,将big_array.append(ds.streamflow)
切换为big_array.append(ds.streamflow.data)
。 - 使用
xarray.concat()
代替dask.array.stack()
,例如写dask_big_array = xarray.concat(big_array, dim='time')
. - 使用
xarray.open_mfdataset()
结合了打开许多文件并将它们堆叠在一起的过程,例如,将此处的所有逻辑替换为xarray.open_mfdataset('/path/to/directory/of/files/*')
。