Forecasting with Facebook Prophet using Pandas UDFs in Spark

I'm trying to use Facebook Prophet in Spark in a Zeppelin environment, and I've tried to follow the exact steps in https://github.com/facebook/prophet/issues/517; however, I get the error below. I'm just not sure what I need to correct here or how to debug it.

My data contains a datetime feature named ds, the volume y that I want to forecast, and a segment, and I am trying to build one model per segment.
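
For reference, a minimal sketch of what the input DataFrame df3 is assumed to look like, based on the description above (the values and the creation code are illustrative, not from the original post):

import pandas as pd

# Hypothetical layout of df3: one row per (segment, week), with a datetime
# column 'ds' and the target volume 'y'.
sample_pd = pd.DataFrame({
    'segment': ['A', 'A', 'B', 'B'],
    'ds': pd.to_datetime(['2019-01-06', '2019-01-13', '2019-01-06', '2019-01-13']),
    'y': [120.0, 135.5, 80.0, 77.25],
})
df3 = spark.createDataFrame(sample_pd)  # assumes an active SparkSession named `spark`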

File"/hadoop14/yarn/nm/usercache/khasbab/appcache/application_1588090646020_2412/container_e168_1588090646020_2412_01_000001/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling o3737.showString.

%livycd.pyspark

from pyspark.sql.types import StructType, StructField, StringType, TimestampType, ArrayType, DoubleType
from pyspark.sql.functions import current_date
from pyspark.sql.functions import pandas_udf, PandasUDFType
from fbprophet import Prophet
from datetime import datetime
import pandas as pd


result_schema = StructType([

    StructField('segment', StringType(), True),
    StructField('ds', TimestampType(), True),
    StructField('trend', ArrayType(DoubleType()), True),
    StructField('trend_upper', ArrayType(DoubleType()), True),
    StructField('trend_lower', ArrayType(DoubleType()), True),
    StructField('yearly', ArrayType(DoubleType()), True),
    StructField('yearly_upper', ArrayType(DoubleType()), True),
    StructField('yearly_lower', ArrayType(DoubleType()), True),
    StructField('yhat', ArrayType(DoubleType()), True),
    StructField('yhat_upper', ArrayType(DoubleType()), True),
    StructField('yhat_lower', ArrayType(DoubleType()), True),
    StructField('multiplicative_terms', ArrayType(DoubleType()), True),
    StructField('multiplicative_terms_upper', ArrayType(DoubleType()), True),
    StructField('multiplicative_terms_lower', ArrayType(DoubleType()), True),
    StructField('additive_terms', ArrayType(DoubleType()), True),
    StructField('additive_terms_upper', ArrayType(DoubleType()), True),
    StructField('additive_terms_lower', ArrayType(DoubleType()), True),

    ])

@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def forecast_loans(history_pd):

    # instantiate the model, configure the parameters
    model = Prophet(
        interval_width=0.95,
        growth='linear',
        daily_seasonality=False,
        weekly_seasonality=False,
        yearly_seasonality=True,
        seasonality_mode='multiplicative'
    )

    #history_pd['ds'] = pd.to_datetime(history_pd['ds'], errors = 'coerce', format = '%Y-%m-%d')
    #.apply(lambda x: datetime.strptime(x,'%Y-%m-%d')) 

    # fit the model
    model.fit(history_pd.loc[:,['ds','y']])

    # configure predictions
    future_pd = model.make_future_dataframe(
        periods=20,
        freq='W')

    # make predictions
    results_pd = model.predict(future_pd)

    # return predictions
    return pd.DataFrame({

        'segment':history_pd['segment'].values[0],
        'ds': [results_pd.loc[:,'ds'].values.tolist()],
        'trend': [results_pd.loc[:,'trend'].values.tolist()],
        'trend_upper': [results_pd.loc[:,'trend_upper'].values.tolist()],
        'trend_lower': [results_pd.loc[:,'trend_lower'].values.tolist()],
        'yearly': [results_pd.loc[:,'yearly'].values.tolist()],
        'yearly_upper': [results_pd.loc[:,'yearly_upper'].values.tolist()],
        'yearly_lower': [results_pd.loc[:,'yearly_lower'].values.tolist()],
        'yhat': [results_pd.loc[:,'yhat'].values.tolist()],
        'yhat_upper': [results_pd.loc[:,'yhat_upper'].values.tolist()],
        'yhat_lower': [results_pd.loc[:,'yhat_lower'].values.tolist()],
        'multiplicative_terms': [results_pd.loc[:,'multiplicative_terms'].values.tolist()],
        'multiplicative_terms_upper': [results_pd.loc[:,'multiplicative_terms_upper'].values.tolist()],
        'multiplicative_terms_lower': [results_pd.loc[:,'multiplicative_terms_lower'].values.tolist()],
        'additive_terms': [results_pd.loc[:,'additive_terms'].values.tolist()],
        'additive_terms_upper': [results_pd.loc[:,'additive_terms_upper'].values.tolist()],
        'additive_terms_lower': [results_pd.loc[:,'additive_terms_lower'].values.tolist()]

    })
    #return pd.concat([pd.DataFrame(results_pd),pd.DataFrame(history_pd[['segment']].values[0])], axis = 1)




results = df3.groupBy('segment').apply(forecast_loans)


results.show()

I have adjusted my code to the following and downgraded to pyarrow 0.14 as suggested here, and everything works! I believe downgrading pyarrow to 0.14 was the key for Spark 2.x, as the comment on Stack Overflow noted.

The comment reads: "The issue is not with pyarrow's new release, it is spark which has to upgrade and become compatible with pyarrow. (i am afraid we have to wait for spark 3.0 to use the latest pyarrow)"
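
To confirm the downgrade actually took effect on both the driver and the executors, a quick sanity check along these lines can help (assuming an active SparkSession named spark; this snippet is illustrative, not from the original post):

import pyarrow
print(pyarrow.__version__)  # driver-side version; should report 0.14.x after the downgrade

# Executor-side check: run a trivial job that reports the pyarrow version the workers see.
worker_versions = (spark.sparkContext
                   .parallelize(range(2), 2)
                   .map(lambda _: __import__('pyarrow').__version__)
                   .distinct()
                   .collect())
print(worker_versions)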

%livycd.pyspark

from pyspark.sql.types import StructType, StructField, StringType, TimestampType, ArrayType, DoubleType
from pyspark.sql.functions import current_date
from pyspark.sql.functions import pandas_udf, PandasUDFType
from fbprophet import Prophet
from datetime import datetime
import pandas as pd


result_schema = StructType([

    StructField('segment', StringType(), True),
    StructField('ds', TimestampType(), True),
    StructField('trend', DoubleType(), True),
    StructField('trend_upper', DoubleType(), True),
    StructField('trend_lower', DoubleType(), True),
    StructField('yearly', DoubleType(), True),
    StructField('yearly_upper', DoubleType(), True),
    StructField('yearly_lower', DoubleType(), True),
    StructField('yhat', DoubleType(), True),
    StructField('yhat_upper', DoubleType(), True),
    StructField('yhat_lower', DoubleType(), True),
    StructField('multiplicative_terms', DoubleType(), True),
    StructField('multiplicative_terms_upper', DoubleType(), True),
    StructField('multiplicative_terms_lower', DoubleType(), True),
    StructField('additive_terms', DoubleType(), True),
    StructField('additive_terms_upper', DoubleType(), True),
    StructField('additive_terms_lower', DoubleType(), True),

    ])


@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def forecast_loans(df):

    def prophet_model(df,test_start_date):

        df['ds'] = pd.to_datetime(df['ds'])

        # train
        ts_train = (df
                    .query('ds < @test_start_date')
                    .sort_values('ds')
                    )
        # test
        ts_test = (df
                   .query('ds >= @test_start_date')
                   .sort_values('ds')
                   .drop('y', axis=1)
                   )

        print(ts_test.columns)

        # instantiate the model, configure the parameters
        model = Prophet(
            interval_width=0.95,
            growth='linear',
            daily_seasonality=False,
            weekly_seasonality=False,
            yearly_seasonality=True,
            seasonality_mode='multiplicative'
        )

        # fit the model

        model.fit(ts_train.loc[:,['ds','y']])

        # configure predictions
        future_pd = model.make_future_dataframe(
            periods=len(ts_test),
            freq='W')

        # make predictions
        results_pd = model.predict(future_pd)
        results_pd = pd.concat([results_pd, df['segment']], axis=1)

        return pd.DataFrame(results_pd, columns=result_schema.fieldNames())

    # return predictions
    return prophet_model(df, test_start_date= '2019-03-31')




results = df3.groupBy('segment').apply(forecast_loans)
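
As with the first version, the grouped result can then be inspected or persisted; a minimal follow-up (the output path below is hypothetical):

results.show()
# results.write.mode('overwrite').parquet('/tmp/prophet_forecasts')  # hypothetical path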

Assuming you are using Spark 2.3.x or 2.4.x with PyArrow >= 0.15.0, there is a known compatibility issue between PyArrow and Spark.

The simplest solution is to set the environment variable ARROW_PRE_0_15_IPC_FORMAT=1. The Spark documentation recommends setting it in conf/spark-env.sh, but you can also set it in your Linux shell, or in your Python script or shell before the spark_session is created.

import os

# Must be set before the SparkSession is created so that executors inherit it.
os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
spark_session = ...

Alternatively, if you prefer, you can downgrade PyArrow as mentioned in the other answer.