在 Spark 数据集上应用张量流概率模型
apply tensorflow probability model on spark dataset
我正在使用来自 tensorflow 概率的 sts 生成预测,它在我使用的数据样本上表现良好,但我现在想在更广泛的范围内尝试它,所以我想在 PySpark 中实现sts 模型。
我有一个看起来像这样的数据集:
Id
Date
value
1
01/01/2021
10
1
01/02/2021
15
1
01/03/2021
11
2
01/01/2021
100
2
01/02/2021
120
2
01/03/2021
90
...
...
...
我想找到一种方法来为每个 ID(具有相同数量的条目)创建预测,并且需要将我之前构建的模型应用于每个 ID,最好的方法是什么?
我在 pyspark 中使用 pandas_udf 解决了这个问题:
# Create the pandas udf:
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def Forecast(pdf):
# From a pandas dataframe create a serie with timestamp as index
pdf = pdf.sort_values(by=['Date'])
PTS = pdf.drop(columns=['Id'])
PTS.set_index("date", inplace=True)
PTS.index = utils.add_freq(PTS.index, 'MS')
PTS.loc[:, 'value'] = PTS.loc[:, 'value'].astype(float)
_train = PTS['value'][PTS.index < Split_Date]
train = _train.to_numpy().reshape(-1, 1)
forecast_distribution = utils.myForecast(train)
fcst_mu = forecast_distribution.mean().numpy()[..., 0]
list_date = pdf.loc[pdf['Date'] >= Split_Date, 'Date'].tolist()
for i in range(len(list_date)):
pdf.loc[pdf['Date'] == list_date[i], 'QTY'] = fcst_mu[i]
return pdf
# Apply the function per group of Id:
df= df.groupby('Id').apply(Forecast)
其中 myForecast
是在另一个文件中创建的 STS 模型,add_freq
是添加句点的函数。
我正在使用来自 tensorflow 概率的 sts 生成预测,它在我使用的数据样本上表现良好,但我现在想在更广泛的范围内尝试它,所以我想在 PySpark 中实现sts 模型。
我有一个看起来像这样的数据集:
Id | Date | value |
---|---|---|
1 | 01/01/2021 | 10 |
1 | 01/02/2021 | 15 |
1 | 01/03/2021 | 11 |
2 | 01/01/2021 | 100 |
2 | 01/02/2021 | 120 |
2 | 01/03/2021 | 90 |
... | ... | ... |
我想找到一种方法来为每个 ID(具有相同数量的条目)创建预测,并且需要将我之前构建的模型应用于每个 ID,最好的方法是什么?
我在 pyspark 中使用 pandas_udf 解决了这个问题:
# Create the pandas udf:
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def Forecast(pdf):
# From a pandas dataframe create a serie with timestamp as index
pdf = pdf.sort_values(by=['Date'])
PTS = pdf.drop(columns=['Id'])
PTS.set_index("date", inplace=True)
PTS.index = utils.add_freq(PTS.index, 'MS')
PTS.loc[:, 'value'] = PTS.loc[:, 'value'].astype(float)
_train = PTS['value'][PTS.index < Split_Date]
train = _train.to_numpy().reshape(-1, 1)
forecast_distribution = utils.myForecast(train)
fcst_mu = forecast_distribution.mean().numpy()[..., 0]
list_date = pdf.loc[pdf['Date'] >= Split_Date, 'Date'].tolist()
for i in range(len(list_date)):
pdf.loc[pdf['Date'] == list_date[i], 'QTY'] = fcst_mu[i]
return pdf
# Apply the function per group of Id:
df= df.groupby('Id').apply(Forecast)
其中 myForecast
是在另一个文件中创建的 STS 模型,add_freq
是添加句点的函数。