Fast/efficient 从多个大型 NetCDF 文件中提取数据的方法
Fast/efficient way to extract data from multiple large NetCDF files
我只需要从全局网格中提取一组特定节点的数据,这些节点由 lat/lon 坐标给出(按 5000-10000 的顺序)。数据是水力参数的时间序列,例如波高。
全球数据集很大所以分成了很多NetCDF文件。每个 NetCDF 文件大约 5GB,包含整个全球网格的数据,但仅包含一个变量(例如波高)和一年(例如 2020 年)的数据。假设我想在某个位置提取 6 个变量的完整时间序列(42 年),我需要从 6x42 = 252 个 NC 文件中提取数据,每个文件大小为 5GB。
我目前的方法是通过年份、变量和节点的三重循环。我使用 Xarray 打开每个 NC 文件,提取所有所需节点的数据并将其存储在字典中。提取字典中的所有数据后,我为每个位置创建一个 pd.dataframe,并将其存储为泡菜文件。有 6 个变量和 42 年,这导致每个位置的泡菜文件大约 7-9 MB(所以实际上不是很大)。
如果我的位置数量很少,我的方法非常有效,但一旦增加到几百个,这种方法就会花费很长时间。我的直觉是这是一个内存问题(因为所有提取的数据首先存储在一个字典中,直到每年和变量都被提取)。但是我的一位同事说Xarray实际上效率很低,这可能会导致持续时间过长。
这里有没有人遇到过类似问题或知道从大量 NC 文件中提取数据的有效方法?我把我目前使用的代码放在下面。感谢您的帮助!
# set conditions
vars = {...dictionary which contains variables}
years = np.arange(y0, y1 + 1) # year range
ndata = {} # dictionary which will contain all data
# loop through all the desired variables
for v in vars.keys():
ndata[v] = {}
# For each variable, loop through each year, open the nc file and extract the data
for y in years:
# Open file with xarray
fname = 'xxx.nc'
data = xr.open_dataset(fname)
# loop through the locations and load the data for each node as temp
for n in range(len(nodes)):
node = nodes.node_id.iloc[n]
lon = nodes.lon.iloc[n]
lat = nodes.lat.iloc[n]
temp = data.sel(longitude=lon, latitude=lat)
# For the first year, store the data into the ndata dict
if y == years[0]:
ndata[v][node] = temp
# For subsequent years, concatenate the existing array in ndata
else:
ndata[v][node] = xr.concat([ndata[v][node],temp], dim='time')
# merge the variables for the current location into one dataset
for n in range(len(nodes)):
node = nodes.node_id.iloc[n]
dset = xr.merge(ndata[v][node] for v in variables.keys())
df = dset.to_dataframe()
# save dataframe as pickle file, named by the node id
df.to_pickle('%s.xz'%(node)))
这是一个非常常见的工作流程,因此我将提供一些建议。一些建议的更改,首先是最重要的更改
使用xarray的advanced indexing一次性select所有点
您似乎在使用 pandas DataFrame nodes
列 'lat', 'lon', and 'node_id'
。与 python 中的几乎所有内容一样,尽可能删除内部 for 循环,利用用 C 编写的 array-based 操作。在这种情况下:
# create an xr.Dataset indexed by node_id with arrays `lat` and `lon
node_indexer = nodes.set_index('node_id')[['lat', 'lon']].to_xarray()
# select all points from each file simultaneously, reshaping to be
# indexed by `node_id`
node_data = data.sel(lat=node_indexer.lat, lon=node_indexer.lon)
# dump this reshaped data to pandas, with each variable becoming a column
node_df = node_data.to_dataframe()
只重塑数组一次
在你的代码中,你循环了很多年,之后的每一年
第一个你正在分配一个有足够内存的新数组
保存的年限与您目前存储的年限一样长。
# For the first year, store the data into the ndata dict
if y == years[0]:
ndata[v][node] = temp
# For subsequent years, concatenate the existing array in ndata
else:
ndata[v][node] = xr.concat([ndata[v][node],temp], dim='time')
相反,只需收集所有年份的数据并串联起来
他们在最后。这只会为所有数据分配一次所需的数组。
使用dask, e.g. with xr.open_mfdataset
to leverage multiple cores. If you do this, you may want to consider using a format that supports multithreaded writes, e.g. zarr
总的来说,这可能看起来像这样:
# build nested filepaths
filepaths = [
['xxx.nc'.format(year=y, variable=v) for y in years
for v in variables
]
# build node indexer
node_indexer = nodes.set_index('node_id')[['lat', 'lon']].to_xarray()
# I'm not sure if you have conflicting variable names - you'll need to
# tailor this line to your data setup. It may be that you want to just
# concatenate along years and then use `xr.merge` to combine the
# variables, or just handle one variable at a time
ds = xr.open_mfdataset(
filepaths,
combine='nested',
concat_dim=['variable', 'year'],
parallel=True,
)
# this will only schedule the operation - no work is done until the next line
ds_nodes = ds.sel(lat=node_indexer.lat, lon=node_indexer.lon)
# this triggers the operation using a dask LocalCluster, leveraging
# multiple threads on your machine (or a distributed Client if you have
# one set up)
ds_nodes.to_netcdf('all_the_data.zarr')
# alternatively, you could still dump to pandas:
df = ds_nodes.to_dataframe()
我只需要从全局网格中提取一组特定节点的数据,这些节点由 lat/lon 坐标给出(按 5000-10000 的顺序)。数据是水力参数的时间序列,例如波高。
全球数据集很大所以分成了很多NetCDF文件。每个 NetCDF 文件大约 5GB,包含整个全球网格的数据,但仅包含一个变量(例如波高)和一年(例如 2020 年)的数据。假设我想在某个位置提取 6 个变量的完整时间序列(42 年),我需要从 6x42 = 252 个 NC 文件中提取数据,每个文件大小为 5GB。
我目前的方法是通过年份、变量和节点的三重循环。我使用 Xarray 打开每个 NC 文件,提取所有所需节点的数据并将其存储在字典中。提取字典中的所有数据后,我为每个位置创建一个 pd.dataframe,并将其存储为泡菜文件。有 6 个变量和 42 年,这导致每个位置的泡菜文件大约 7-9 MB(所以实际上不是很大)。
如果我的位置数量很少,我的方法非常有效,但一旦增加到几百个,这种方法就会花费很长时间。我的直觉是这是一个内存问题(因为所有提取的数据首先存储在一个字典中,直到每年和变量都被提取)。但是我的一位同事说Xarray实际上效率很低,这可能会导致持续时间过长。
这里有没有人遇到过类似问题或知道从大量 NC 文件中提取数据的有效方法?我把我目前使用的代码放在下面。感谢您的帮助!
# set conditions
vars = {...dictionary which contains variables}
years = np.arange(y0, y1 + 1) # year range
ndata = {} # dictionary which will contain all data
# loop through all the desired variables
for v in vars.keys():
ndata[v] = {}
# For each variable, loop through each year, open the nc file and extract the data
for y in years:
# Open file with xarray
fname = 'xxx.nc'
data = xr.open_dataset(fname)
# loop through the locations and load the data for each node as temp
for n in range(len(nodes)):
node = nodes.node_id.iloc[n]
lon = nodes.lon.iloc[n]
lat = nodes.lat.iloc[n]
temp = data.sel(longitude=lon, latitude=lat)
# For the first year, store the data into the ndata dict
if y == years[0]:
ndata[v][node] = temp
# For subsequent years, concatenate the existing array in ndata
else:
ndata[v][node] = xr.concat([ndata[v][node],temp], dim='time')
# merge the variables for the current location into one dataset
for n in range(len(nodes)):
node = nodes.node_id.iloc[n]
dset = xr.merge(ndata[v][node] for v in variables.keys())
df = dset.to_dataframe()
# save dataframe as pickle file, named by the node id
df.to_pickle('%s.xz'%(node)))
这是一个非常常见的工作流程,因此我将提供一些建议。一些建议的更改,首先是最重要的更改
使用xarray的advanced indexing一次性select所有点
您似乎在使用 pandas DataFrame
nodes
列'lat', 'lon', and 'node_id'
。与 python 中的几乎所有内容一样,尽可能删除内部 for 循环,利用用 C 编写的 array-based 操作。在这种情况下:# create an xr.Dataset indexed by node_id with arrays `lat` and `lon node_indexer = nodes.set_index('node_id')[['lat', 'lon']].to_xarray() # select all points from each file simultaneously, reshaping to be # indexed by `node_id` node_data = data.sel(lat=node_indexer.lat, lon=node_indexer.lon) # dump this reshaped data to pandas, with each variable becoming a column node_df = node_data.to_dataframe()
只重塑数组一次
在你的代码中,你循环了很多年,之后的每一年 第一个你正在分配一个有足够内存的新数组 保存的年限与您目前存储的年限一样长。
# For the first year, store the data into the ndata dict if y == years[0]: ndata[v][node] = temp # For subsequent years, concatenate the existing array in ndata else: ndata[v][node] = xr.concat([ndata[v][node],temp], dim='time')
相反,只需收集所有年份的数据并串联起来 他们在最后。这只会为所有数据分配一次所需的数组。
使用dask, e.g. with
xr.open_mfdataset
to leverage multiple cores. If you do this, you may want to consider using a format that supports multithreaded writes, e.g.zarr
总的来说,这可能看起来像这样:
# build nested filepaths
filepaths = [
['xxx.nc'.format(year=y, variable=v) for y in years
for v in variables
]
# build node indexer
node_indexer = nodes.set_index('node_id')[['lat', 'lon']].to_xarray()
# I'm not sure if you have conflicting variable names - you'll need to
# tailor this line to your data setup. It may be that you want to just
# concatenate along years and then use `xr.merge` to combine the
# variables, or just handle one variable at a time
ds = xr.open_mfdataset(
filepaths,
combine='nested',
concat_dim=['variable', 'year'],
parallel=True,
)
# this will only schedule the operation - no work is done until the next line
ds_nodes = ds.sel(lat=node_indexer.lat, lon=node_indexer.lon)
# this triggers the operation using a dask LocalCluster, leveraging
# multiple threads on your machine (or a distributed Client if you have
# one set up)
ds_nodes.to_netcdf('all_the_data.zarr')
# alternatively, you could still dump to pandas:
df = ds_nodes.to_dataframe()