在 Spark 数据集上应用张量流概率模型

apply tensorflow probability model on spark dataset

我正在使用来自 tensorflow 概率的 sts 生成预测,它在我使用的数据样本上表现良好,但我现在想在更广泛的范围内尝试它,所以我想在 PySpark 中实现sts 模型。

我有一个看起来像这样的数据集:

Id Date value
1 01/01/2021 10
1 01/02/2021 15
1 01/03/2021 11
2 01/01/2021 100
2 01/02/2021 120
2 01/03/2021 90
... ... ...

我想找到一种方法来为每个 ID(具有相同数量的条目)创建预测,并且需要将我之前构建的模型应用于每个 ID,最好的方法是什么?

我在 pyspark 中使用 pandas_udf 解决了这个问题:

# Create the pandas udf:
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def Forecast(pdf):
        # From a pandas dataframe create a serie with timestamp as index
        pdf = pdf.sort_values(by=['Date'])

        PTS = pdf.drop(columns=['Id'])

        PTS.set_index("date", inplace=True)
        PTS.index = utils.add_freq(PTS.index, 'MS')
        PTS.loc[:, 'value'] = PTS.loc[:, 'value'].astype(float)

        _train = PTS['value'][PTS.index < Split_Date]
        train = _train.to_numpy().reshape(-1, 1)

        forecast_distribution = utils.myForecast(train)

        fcst_mu = forecast_distribution.mean().numpy()[..., 0]

        list_date = pdf.loc[pdf['Date'] >= Split_Date, 'Date'].tolist()
        for i in range(len(list_date)):
            pdf.loc[pdf['Date'] == list_date[i], 'QTY'] = fcst_mu[i]

        return pdf

# Apply the function per group of Id:
df= df.groupby('Id').apply(Forecast)

其中 myForecast 是在另一个文件中创建的 STS 模型,add_freq 是添加句点的函数。