Fast/efficient 从多个大型 NetCDF 文件中提取数据的方法

Fast/efficient way to extract data from multiple large NetCDF files

我只需要从全局网格中提取一组特定节点的数据,这些节点由 lat/lon 坐标给出(按 5000-10000 的顺序)。数据是水力参数的时间序列,例如波高。

全球数据集很大所以分成了很多NetCDF文件。每个 NetCDF 文件大约 5GB,包含整个全球网格的数据,但仅包含一个变量(例如波高)和一年(例如 2020 年)的数据。假设我想在某个位置提取 6 个变量的完整时间序列(42 年),我需要从 6x42 = 252 个 NC 文件中提取数据,每个文件大小为 5GB。

我目前的方法是通过年份、变量和节点的三重循环。我使用 Xarray 打开每个 NC 文件,提取所有所需节点的数据并将其存储在字典中。提取字典中的所有数据后,我为每个位置创建一个 pd.dataframe,并将其存储为泡菜文件。有 6 个变量和 42 年,这导致每个位置的泡菜文件大约 7-9 MB(所以实际上不是很大)。

如果我的位置数量很少,我的方法非常有效,但一旦增加到几百个,这种方法就会花费很长时间。我的直觉是这是一个内存问题(因为所有提取的数据首先存储在一个字典中,直到每年和变量都被提取)。但是我的一位同事说Xarray实际上效率很低,这可能会导致持续时间过长。

这里有没有人遇到过类似问题或知道从大量 NC 文件中提取数据的有效方法?我把我目前使用的代码放在下面。感谢您的帮助!

# set conditions
vars = {...dictionary which contains variables}
years = np.arange(y0, y1 + 1)   # year range
ndata = {}                      # dictionary which will contain all data

# loop through all the desired variables
for v in vars.keys():
    ndata[v] = {}

    # For each variable, loop through each year, open the nc file and extract the data
    for y in years:
        
        # Open file with xarray
        fname = 'xxx.nc'
        data = xr.open_dataset(fname)
        
        # loop through the locations and load the data for each node as temp
        for n in range(len(nodes)):
            node = nodes.node_id.iloc[n]
            lon = nodes.lon.iloc[n]
            lat = nodes.lat.iloc[n]    
            
            temp = data.sel(longitude=lon, latitude=lat)
            
            # For the first year, store the data into the ndata dict
            if y == years[0]:
                ndata[v][node] = temp
            # For subsequent years, concatenate the existing array in ndata
            else:
                ndata[v][node] = xr.concat([ndata[v][node],temp], dim='time')

# merge the variables for the current location into one dataset
for n in range(len(nodes)):
    node = nodes.node_id.iloc[n]
    
    dset = xr.merge(ndata[v][node] for v in variables.keys())
    df = dset.to_dataframe()

    # save dataframe as pickle file, named by the node id
    df.to_pickle('%s.xz'%(node)))

这是一个非常常见的工作流程,因此我将提供一些建议。一些建议的更改,首先是最重要的更改

  1. 使用xarray的advanced indexing一次性select所有点

    您似乎在使用 pandas DataFrame nodes'lat', 'lon', and 'node_id'。与 python 中的几乎所有内容一样,尽可能删除内部 for 循环,利用用 C 编写的 array-based 操作。在这种情况下:

    # create an xr.Dataset indexed by node_id with arrays `lat` and `lon
    node_indexer = nodes.set_index('node_id')[['lat', 'lon']].to_xarray()
    
    # select all points from each file simultaneously, reshaping to be
    # indexed by `node_id`
    node_data = data.sel(lat=node_indexer.lat, lon=node_indexer.lon)
    
    # dump this reshaped data to pandas, with each variable becoming a column
    node_df = node_data.to_dataframe()
    
  2. 只重塑数组一次

    在你的代码中,你循环了很多年,之后的每一年 第一个你正在分配一个有足够内存的新数组 保存的年限与您目前存储的年限一样长。

    # For the first year, store the data into the ndata dict
    if y == years[0]:
        ndata[v][node] = temp
    # For subsequent years, concatenate the existing array in ndata
    else:
        ndata[v][node] = xr.concat([ndata[v][node],temp], dim='time')
    

    相反,只需收集所有年份的数据并串联起来 他们在最后。这只会为所有数据分配一次所需的数组。

  3. 使用dask, e.g. with xr.open_mfdataset to leverage multiple cores. If you do this, you may want to consider using a format that supports multithreaded writes, e.g. zarr

总的来说,这可能看起来像这样:

# build nested filepaths
filepaths = [
    ['xxx.nc'.format(year=y, variable=v) for y in years
    for v in variables
]

# build node indexer
node_indexer = nodes.set_index('node_id')[['lat', 'lon']].to_xarray()

# I'm not sure if you have conflicting variable names - you'll need to
# tailor this line to your data setup. It may be that you want to just
# concatenate along years and then use `xr.merge` to combine the
# variables, or just handle one variable at a time
ds = xr.open_mfdataset(
    filepaths,
    combine='nested',
    concat_dim=['variable', 'year'],
    parallel=True,
)

# this will only schedule the operation - no work is done until the next line
ds_nodes = ds.sel(lat=node_indexer.lat, lon=node_indexer.lon)

# this triggers the operation using a dask LocalCluster, leveraging
# multiple threads on your machine (or a distributed Client if you have
# one set up)
ds_nodes.to_netcdf('all_the_data.zarr')

# alternatively, you could still dump to pandas:
df = ds_nodes.to_dataframe()