使用 xarray 在 netCDF 文件上使用 Prophet
using Prophet on netCDF file using xarray
我有一个 'netCDF' 文件,我用 xarray 读取了它,我想用它为文件中的每个像素生成预测。
import xarray as xr
from fbprophet import Prophet
import time
with xr.open_dataset('avi.nc',
chunks={'y': 2, 'x':2}) as avi:
print(avi)
<xarray.Dataset>
Dimensions: (ds: 104, lat: 213, lon: 177)
Coordinates:
* lat (lat) float64 -2.711e+06 -2.711e+06 -2.711e+06 -2.711e+06 ...
* lon (lon) float64 1.923e+06 1.924e+06 1.924e+06 1.924e+06 1.924e+06 ...
* ds (ds) object '1999-07-16T23:46:04.500000000' ...
Data variables:
y (ds, lat, lon) float64 dask.array<shape=(104, 213, 177),
chunksize=(104, 2, 2)>
我为每个像素创建模型的方式是:
* 遍历数组中的每个像素 (for i in range(dataset.sizes['lat']):
),
* 创建模型(m1
),
* 将模型输出发送到 pandas DataFrame (output
)
我试过 'chunking' netCDF 文件,但我看不出效率有什么不同。
下面是我目前使用的代码。
columns = ('Year','lat', 'lon')
dates = list(range(1996, 1999))
output = pd.DataFrame(columns=columns)
forecast2 = pd.DataFrame()
def GAM2 (dataset):
for i in range(dataset.sizes['lat']):
for k in range(dataset.sizes['lon']):
count +=1
df1 = dataset.y.isel(lat=slice(px_lat, (px_lat+1)), lon=slice(px_lon, (px_lon+1))).to_dataframe()
df1['ds'] = pd.to_datetime(df1.index.get_level_values(0), dayfirst=True)
df1['doy'] = df1['ds'].dt.dayofyear
m1 = Prophet(weekly_seasonality=False).fit(df1)
future1 = m1.make_future_dataframe()
output _data = {
'Year': year,
'lat': dataset.lat[px_lat].values,
'lon': dataset.lon[px_lon].values}
output = output .append(output , ignore_index=True)
if px_lon < (dataset.sizes['lon'] - 1):
px_lon += 1
else:
px_lon = 0
if px_lat < dataset.sizes['lat']:
px_lat += 1
else:
px_lat = 0
return output
问题:
- 我手动循环遍历数组(即
for i in range(dataset.sizes['lat']): ...
。
- 输出当前转到 pandas 数据框,我需要它转到与坐标相同(
lat
、lon
)的 DataArray
DataSet
用于进一步分析和可视化。
问题:
dataset.apply()
可以使用这些函数吗?例如:
def GAM2 (dataset, index_name, site_name):
m1 = Prophet(weekly_seasonality=False).fit(df1)
future1 = m1.make_future_dataframe()
output _data = {
'Year': year,
'lat': dataset.lat[px_lat].values,
'lon': dataset.lon[px_lon].values}
return output
ds.apply(GAM2)
- 我可以将输出作为变量直接存储到
DataArray
中吗?还是我必须继续使用 pandas DatraFrame
然后尝试将其转换为 DataArray
?
我相信我有你要找的答案。
与其对 xarray DataArray 的每个坐标点进行双重循环,不如使用允许并行计算的 xarray 的向量化 u_function。
如果将 FProphet 应用到 u_function,则可以生成针对每个坐标点的预测输出。
这是一个可重现的例子:
import pandas as pd
pd.set_option('display.width', 50000)
pd.set_option('display.max_rows', 50000)
pd.set_option('display.max_columns', 5000)
import numpy as np
import xarray as xr
from dask.diagnostics import ProgressBar
from fbprophet import Prophet
#
#https://gist.github.com/scottyhq/8daa7290298c9edf2ef1eb05dc3b6c60
ds = xr.tutorial.open_dataset('rasm').load()
def parse_datetime(time):
return pd.to_datetime([str(x) for x in time])
ds.coords['time'] = parse_datetime(ds.coords['time'].values)
ds = ds.isel({'x':slice(175,180), 'y':slice(160,170)})
ds.isel({'time':0}).Tair.plot()
ds = ds.chunk({'x':40, 'y':40})
def fillna_in_array(x):
y = np.where(np.abs(x)==np.inf, 0, x)
y = np.where(np.isnan(y), 0, y)
if np.all(y) == 0:
y = np.arange(len(y))
return y
def xarray_Prophet(y, time, periods=30, freq='D'):
'''
This is a vectorized u_function of the Prophet prediction module.
It returns an array of values containing original and predicted values
according to the provided temporal sequence.
Parameters:
y (array): an array containing the y past values that will be
used for the prediction.
time (array): an array containing the time intervals of each respective
entrance of the sampled y
periods (positive int): the number of times it will be used for prediction
freq (str): the frequency that will be used in the prediction:
(i.e.: 'D', 'M', 'Y', 'm', 'H'...)
Returns:
array of predicted values of y (yhat)
'''
# Here, we ensure that all data is filled. Since Xarray has some Issues with
# sparse matrices, It is a good solution for all NaN, inf, or 0 values for
# sampled y data
with ProgressBar():
y = fillna_in_array(y)
# here the processing really starts:
forecast = pd.DataFrame()
forecast['ds'] = pd.to_datetime(time)
forecast['y'] = y
m1 = Prophet(weekly_seasonality=True,
daily_seasonality=False).fit(forecast)
forecast = m1.make_future_dataframe(periods=periods, freq=freq)
# In here, the u_function should return a simple 1-D array,
# or a pandas series.
# Therefore, we select the attribute 'yat' from the
# FProphet prediction dataframe to return solely a 1D data.
return m1.predict(forecast)['yhat']
def predict_y(ds,
dim=['time'],
dask='allowed',
new_dim_name=['predicted'],
periods=30, freq='D'):
'''
Function Description:
This function is a vectorized parallelized wrapper of
the "xarray_Prophet".
It returns a new Xarray object (dataarray or Dataset) with the new
dimension attached.
Parameters:
ds (xarray - DataSet/DataArray)
dim (list of strings): a list of the dimension that will be used in the
reduction (temporal prediction)
dask (str): allowed
new_dim_name (list of strings): it contains the name that will be used
in the reduction operation.
periods (positive int): the number of steps to be predicted based
on the parameter "freq".
freq (str): the frequency that will be used in the prediction:
(i.e.: 'D', 'M', 'Y', 'm', 'H'...)
Returns:
Xarray object (Dataset or DataArray): the type is solely dependent on
the ds object's type.
'''
with ProgressBar():
ds = ds.sortby('time', False)
time = np.unique(ds['time'].values)
kwargs = {'time':time,
'periods': periods,
'freq':freq}
filtered = xr.apply_ufunc(xarray_Prophet,
ds,
dask=dask,
vectorize=True,
input_core_dims=[dim],
#exclude_dims = dim, # This must not be setted.
output_core_dims=[new_dim_name],
kwargs=kwargs,
output_dtypes=[float],
join='outer',
dataset_fill_value=np.nan,
).compute()
return filtered
da_binned = predict_y( ds = ds['Tair'],
dim = ['time'],
dask='allowed',
new_dim_name=['predicted'],
periods=30).rename({'predicted':'time'})
print(da_binned)
我有一个 'netCDF' 文件,我用 xarray 读取了它,我想用它为文件中的每个像素生成预测。
import xarray as xr
from fbprophet import Prophet
import time
with xr.open_dataset('avi.nc',
chunks={'y': 2, 'x':2}) as avi:
print(avi)
<xarray.Dataset>
Dimensions: (ds: 104, lat: 213, lon: 177)
Coordinates:
* lat (lat) float64 -2.711e+06 -2.711e+06 -2.711e+06 -2.711e+06 ...
* lon (lon) float64 1.923e+06 1.924e+06 1.924e+06 1.924e+06 1.924e+06 ...
* ds (ds) object '1999-07-16T23:46:04.500000000' ...
Data variables:
y (ds, lat, lon) float64 dask.array<shape=(104, 213, 177),
chunksize=(104, 2, 2)>
我为每个像素创建模型的方式是:
* 遍历数组中的每个像素 (for i in range(dataset.sizes['lat']):
),
* 创建模型(m1
),
* 将模型输出发送到 pandas DataFrame (output
)
我试过 'chunking' netCDF 文件,但我看不出效率有什么不同。 下面是我目前使用的代码。
columns = ('Year','lat', 'lon')
dates = list(range(1996, 1999))
output = pd.DataFrame(columns=columns)
forecast2 = pd.DataFrame()
def GAM2 (dataset):
for i in range(dataset.sizes['lat']):
for k in range(dataset.sizes['lon']):
count +=1
df1 = dataset.y.isel(lat=slice(px_lat, (px_lat+1)), lon=slice(px_lon, (px_lon+1))).to_dataframe()
df1['ds'] = pd.to_datetime(df1.index.get_level_values(0), dayfirst=True)
df1['doy'] = df1['ds'].dt.dayofyear
m1 = Prophet(weekly_seasonality=False).fit(df1)
future1 = m1.make_future_dataframe()
output _data = {
'Year': year,
'lat': dataset.lat[px_lat].values,
'lon': dataset.lon[px_lon].values}
output = output .append(output , ignore_index=True)
if px_lon < (dataset.sizes['lon'] - 1):
px_lon += 1
else:
px_lon = 0
if px_lat < dataset.sizes['lat']:
px_lat += 1
else:
px_lat = 0
return output
问题:
- 我手动循环遍历数组(即
for i in range(dataset.sizes['lat']): ...
。 - 输出当前转到 pandas 数据框,我需要它转到与坐标相同(
lat
、lon
)的DataArray
DataSet
用于进一步分析和可视化。
问题:
dataset.apply()
可以使用这些函数吗?例如:
def GAM2 (dataset, index_name, site_name):
m1 = Prophet(weekly_seasonality=False).fit(df1)
future1 = m1.make_future_dataframe()
output _data = {
'Year': year,
'lat': dataset.lat[px_lat].values,
'lon': dataset.lon[px_lon].values}
return output
ds.apply(GAM2)
- 我可以将输出作为变量直接存储到
DataArray
中吗?还是我必须继续使用 pandasDatraFrame
然后尝试将其转换为DataArray
?
我相信我有你要找的答案。
与其对 xarray DataArray 的每个坐标点进行双重循环,不如使用允许并行计算的 xarray 的向量化 u_function。
如果将 FProphet 应用到 u_function,则可以生成针对每个坐标点的预测输出。
这是一个可重现的例子:
import pandas as pd
pd.set_option('display.width', 50000)
pd.set_option('display.max_rows', 50000)
pd.set_option('display.max_columns', 5000)
import numpy as np
import xarray as xr
from dask.diagnostics import ProgressBar
from fbprophet import Prophet
#
#https://gist.github.com/scottyhq/8daa7290298c9edf2ef1eb05dc3b6c60
ds = xr.tutorial.open_dataset('rasm').load()
def parse_datetime(time):
return pd.to_datetime([str(x) for x in time])
ds.coords['time'] = parse_datetime(ds.coords['time'].values)
ds = ds.isel({'x':slice(175,180), 'y':slice(160,170)})
ds.isel({'time':0}).Tair.plot()
ds = ds.chunk({'x':40, 'y':40})
def fillna_in_array(x):
y = np.where(np.abs(x)==np.inf, 0, x)
y = np.where(np.isnan(y), 0, y)
if np.all(y) == 0:
y = np.arange(len(y))
return y
def xarray_Prophet(y, time, periods=30, freq='D'):
'''
This is a vectorized u_function of the Prophet prediction module.
It returns an array of values containing original and predicted values
according to the provided temporal sequence.
Parameters:
y (array): an array containing the y past values that will be
used for the prediction.
time (array): an array containing the time intervals of each respective
entrance of the sampled y
periods (positive int): the number of times it will be used for prediction
freq (str): the frequency that will be used in the prediction:
(i.e.: 'D', 'M', 'Y', 'm', 'H'...)
Returns:
array of predicted values of y (yhat)
'''
# Here, we ensure that all data is filled. Since Xarray has some Issues with
# sparse matrices, It is a good solution for all NaN, inf, or 0 values for
# sampled y data
with ProgressBar():
y = fillna_in_array(y)
# here the processing really starts:
forecast = pd.DataFrame()
forecast['ds'] = pd.to_datetime(time)
forecast['y'] = y
m1 = Prophet(weekly_seasonality=True,
daily_seasonality=False).fit(forecast)
forecast = m1.make_future_dataframe(periods=periods, freq=freq)
# In here, the u_function should return a simple 1-D array,
# or a pandas series.
# Therefore, we select the attribute 'yat' from the
# FProphet prediction dataframe to return solely a 1D data.
return m1.predict(forecast)['yhat']
def predict_y(ds,
dim=['time'],
dask='allowed',
new_dim_name=['predicted'],
periods=30, freq='D'):
'''
Function Description:
This function is a vectorized parallelized wrapper of
the "xarray_Prophet".
It returns a new Xarray object (dataarray or Dataset) with the new
dimension attached.
Parameters:
ds (xarray - DataSet/DataArray)
dim (list of strings): a list of the dimension that will be used in the
reduction (temporal prediction)
dask (str): allowed
new_dim_name (list of strings): it contains the name that will be used
in the reduction operation.
periods (positive int): the number of steps to be predicted based
on the parameter "freq".
freq (str): the frequency that will be used in the prediction:
(i.e.: 'D', 'M', 'Y', 'm', 'H'...)
Returns:
Xarray object (Dataset or DataArray): the type is solely dependent on
the ds object's type.
'''
with ProgressBar():
ds = ds.sortby('time', False)
time = np.unique(ds['time'].values)
kwargs = {'time':time,
'periods': periods,
'freq':freq}
filtered = xr.apply_ufunc(xarray_Prophet,
ds,
dask=dask,
vectorize=True,
input_core_dims=[dim],
#exclude_dims = dim, # This must not be setted.
output_core_dims=[new_dim_name],
kwargs=kwargs,
output_dtypes=[float],
join='outer',
dataset_fill_value=np.nan,
).compute()
return filtered
da_binned = predict_y( ds = ds['Tair'],
dim = ['time'],
dask='allowed',
new_dim_name=['predicted'],
periods=30).rename({'predicted':'time'})
print(da_binned)