在 spark 中使用 Pandas udf 与 facebook prophet 进行预测
Forecasting with facebook prophet using Pandas udf in spark
我正在尝试在 Zeppelin 环境中的 Spark 上使用 Facebook Prophet,并严格按照 https://github.com/facebook/prophet/issues/517 中的步骤进行操作,但是收到了如下错误。我不确定这里需要修正什么,也不知道该如何调试。
我的数据包含一个名为 ds 的日期时间特征、我想预测的数量 y 以及 segment,我正在尝试为每个 segment(细分)分别构建一个模型。
File"/hadoop14/yarn/nm/usercache/khasbab/appcache/application_1588090646020_2412/container_e168_1588090646020_2412_01_000001/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o3737.showString.
%livycd.pyspark
from pyspark.sql.types import StructType,StructField,StringType,TimestampType,ArrayType,DoubleType
from pyspark.sql.functions import current_date
from pyspark.sql.functions import pandas_udf, PandasUDFType
from fbprophet import Prophet
from datetime import datetime
import pandas as pd
# Schema of the frame returned by the grouped-map UDF: the segment label, a
# timestamp column, and one array-of-doubles column per forecast component.
result_schema = StructType(
    [
        StructField('segment', StringType(), True),
        StructField('ds', TimestampType(), True),
    ]
    + [
        StructField(column, ArrayType(DoubleType()), True)
        for column in (
            'trend',
            'trend_upper',
            'trend_lower',
            'yearly',
            'yearly_upper',
            'yearly_lower',
            'yhat',
            'yhat_upper',
            'yhat_lower',
            'multiplicative_terms',
            'multiplicative_terms_upper',
            'multiplicative_terms_lower',
            'additive_terms',
            'additive_terms_upper',
            'additive_terms_lower',
        )
    ]
)
@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def forecast_loans(history_pd):
    """Fit a Prophet model on one segment and return its forecast as one row.

    Args:
        history_pd: pandas DataFrame holding the rows of a single group; the
            ``ds``, ``y`` and ``segment`` columns are read.

    Returns:
        A one-row pandas DataFrame containing the segment label and, for
        every forecast column, the full list of predicted values.
    """
    # Instantiate the model and configure its parameters.
    model = Prophet(
        interval_width=0.95,
        growth='linear',
        daily_seasonality=False,
        weekly_seasonality=False,
        yearly_seasonality=True,
        seasonality_mode='multiplicative',
    )

    # Fit the model on the date/value columns only.
    model.fit(history_pd.loc[:, ['ds', 'y']])

    # Extend the frame by 20 weekly periods and predict over it.
    future_pd = model.make_future_dataframe(periods=20, freq='W')
    results_pd = model.predict(future_pd)

    forecast_columns = (
        'trend',
        'trend_upper',
        'trend_lower',
        'yearly',
        'yearly_upper',
        'yearly_lower',
        'yhat',
        'yhat_upper',
        'yhat_lower',
        'multiplicative_terms',
        'multiplicative_terms_upper',
        'multiplicative_terms_lower',
        'additive_terms',
        'additive_terms_upper',
        'additive_terms_lower',
    )

    row = {
        # Every row of the group shares one segment, so the first is enough.
        'segment': history_pd['segment'].values[0],
        # NOTE(review): a list is emitted here while result_schema declares
        # 'ds' as a scalar TimestampType -- confirm the intended schema.
        'ds': [results_pd.loc[:, 'ds'].values.tolist()],
    }
    # Each output column is read from the prediction column of the same name
    # ('trend' was previously filled from 'ds' by mistake).
    row.update(
        (name, [results_pd.loc[:, name].values.tolist()])
        for name in forecast_columns
    )
    return pd.DataFrame(row)
# Apply the grouped-map UDF once per segment and display the result.
results = (
    df3
    .groupBy('segment')
    .apply(forecast_loans)
)
results.show()
我已将我的代码调整为以下内容,并按照此处的建议将 pyarrow 降级到 0.14,现在一切正常!根据 Stack Overflow 上的评论,我相信将 pyarrow 降级到 0.14 是在 Spark 2.x 版本上解决问题的关键。
评论如下"The issue is not with pyarrow's new release, it is spark which has to upgrade and become compatible with pyarrow.(i am afraid we have to wait for spark 3.0 to use the latest pyarrow)"
%livycd.pyspark
from pyspark.sql.types import StructType,StructField,StringType,TimestampType,ArrayType,DoubleType
from pyspark.sql.functions import current_date
from pyspark.sql.functions import pandas_udf, PandasUDFType
from fbprophet import Prophet
from datetime import datetime
import pandas as pd
# Schema of the frame returned by the grouped-map UDF: the segment label, a
# timestamp column, and one double column per forecast component.
result_schema = StructType(
    [
        StructField('segment', StringType(), True),
        StructField('ds', TimestampType(), True),
    ]
    + [
        StructField(column, DoubleType(), True)
        for column in (
            'trend',
            'trend_upper',
            'trend_lower',
            'yearly',
            'yearly_upper',
            'yearly_lower',
            'yhat',
            'yhat_upper',
            'yhat_lower',
            'multiplicative_terms',
            'multiplicative_terms_upper',
            'multiplicative_terms_lower',
            'additive_terms',
            'additive_terms_upper',
            'additive_terms_lower',
        )
    ]
)
@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def forecast_loans(df):
    """Fit a Prophet model on one segment and return its forecast rows.

    Args:
        df: pandas DataFrame holding the rows of a single group; the ``ds``,
            ``y`` and ``segment`` columns are read.

    Returns:
        A pandas DataFrame with one row per predicted date, restricted to
        the columns named in ``result_schema``.
    """

    def prophet_model(df, test_start_date):
        """Train on rows before ``test_start_date`` and forecast the rest."""
        # Work on a copy so the caller's frame is not modified in place.
        df = df.assign(ds=pd.to_datetime(df['ds']))

        # Training rows: everything strictly before the cut-off date.
        ts_train = df.query('ds < @test_start_date').sort_values('ds')
        # Only the number of held-out rows is needed, as the forecast horizon.
        n_test_periods = len(df.query('ds >= @test_start_date'))

        # Instantiate the model and configure its parameters.
        model = Prophet(
            interval_width=0.95,
            growth='linear',
            daily_seasonality=False,
            weekly_seasonality=False,
            yearly_seasonality=True,
            seasonality_mode='multiplicative',
        )
        model.fit(ts_train.loc[:, ['ds', 'y']])

        # Extend the frame by one weekly period per held-out row and predict.
        future_pd = model.make_future_dataframe(
            periods=n_test_periods,
            freq='W',
        )
        results_pd = model.predict(future_pd)

        # Every row of the group shares one segment; broadcasting the scalar
        # avoids the index alignment of pd.concat, which leaves NaN segments
        # or extra all-NaN rows when the forecast and input lengths differ.
        results_pd['segment'] = df['segment'].iloc[0]
        return pd.DataFrame(results_pd, columns=result_schema.fieldNames())

    return prophet_model(df, test_start_date='2019-03-31')
# Apply the grouped-map UDF once per segment.
results = (
    df3
    .groupBy('segment')
    .apply(forecast_loans)
)
假设您使用的是 Spark 2.3.x 或 2.4.x 并且 PyArrow >= 0.15.0,则 PyArrow 与 Spark 之间存在一个已知的兼容性问题。
最简单的解决方案是设置环境变量 ARROW_PRE_0_15_IPC_FORMAT=1。Spark 文档建议在 conf/spark-env.sh 中设置它,但您也可以在 Linux shell 中设置,或者在 Python 脚本或 shell 中创建 spark_session 之前设置:
import os

# Ask PyArrow >= 0.15.0 for the pre-0.15 Arrow IPC format that Spark
# 2.3.x/2.4.x expects. This must be set before the Spark session is created.
# The mapping is ``os.environ`` (lowercase); ``os.ENVIRON`` does not exist.
os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
spark_session = ...
或者,如果您愿意,您可以降级 PyArrow,如其他答案中所述。
我正在尝试在 Zeppelin 环境中的 Spark 上使用 Facebook Prophet,并严格按照 https://github.com/facebook/prophet/issues/517 中的步骤进行操作,但是收到了如下错误。我不确定这里需要修正什么,也不知道该如何调试。
我的数据包含一个名为 ds 的日期时间特征、我想预测的数量 y 以及 segment,我正在尝试为每个 segment(细分)分别构建一个模型。
File"/hadoop14/yarn/nm/usercache/khasbab/appcache/application_1588090646020_2412/container_e168_1588090646020_2412_01_000001/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o3737.showString.
%livycd.pyspark
from pyspark.sql.types import StructType,StructField,StringType,TimestampType,ArrayType,DoubleType
from pyspark.sql.functions import current_date
from pyspark.sql.functions import pandas_udf, PandasUDFType
from fbprophet import Prophet
from datetime import datetime
import pandas as pd
# Schema of the frame returned by the grouped-map UDF: the segment label, a
# timestamp column, and one array-of-doubles column per forecast component.
result_schema = StructType(
    [
        StructField('segment', StringType(), True),
        StructField('ds', TimestampType(), True),
    ]
    + [
        StructField(column, ArrayType(DoubleType()), True)
        for column in (
            'trend',
            'trend_upper',
            'trend_lower',
            'yearly',
            'yearly_upper',
            'yearly_lower',
            'yhat',
            'yhat_upper',
            'yhat_lower',
            'multiplicative_terms',
            'multiplicative_terms_upper',
            'multiplicative_terms_lower',
            'additive_terms',
            'additive_terms_upper',
            'additive_terms_lower',
        )
    ]
)
@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def forecast_loans(history_pd):
    """Fit a Prophet model on one segment and return its forecast as one row.

    Args:
        history_pd: pandas DataFrame holding the rows of a single group; the
            ``ds``, ``y`` and ``segment`` columns are read.

    Returns:
        A one-row pandas DataFrame containing the segment label and, for
        every forecast column, the full list of predicted values.
    """
    # Instantiate the model and configure its parameters.
    model = Prophet(
        interval_width=0.95,
        growth='linear',
        daily_seasonality=False,
        weekly_seasonality=False,
        yearly_seasonality=True,
        seasonality_mode='multiplicative',
    )

    # Fit the model on the date/value columns only.
    model.fit(history_pd.loc[:, ['ds', 'y']])

    # Extend the frame by 20 weekly periods and predict over it.
    future_pd = model.make_future_dataframe(periods=20, freq='W')
    results_pd = model.predict(future_pd)

    forecast_columns = (
        'trend',
        'trend_upper',
        'trend_lower',
        'yearly',
        'yearly_upper',
        'yearly_lower',
        'yhat',
        'yhat_upper',
        'yhat_lower',
        'multiplicative_terms',
        'multiplicative_terms_upper',
        'multiplicative_terms_lower',
        'additive_terms',
        'additive_terms_upper',
        'additive_terms_lower',
    )

    row = {
        # Every row of the group shares one segment, so the first is enough.
        'segment': history_pd['segment'].values[0],
        # NOTE(review): a list is emitted here while result_schema declares
        # 'ds' as a scalar TimestampType -- confirm the intended schema.
        'ds': [results_pd.loc[:, 'ds'].values.tolist()],
    }
    # Each output column is read from the prediction column of the same name
    # ('trend' was previously filled from 'ds' by mistake).
    row.update(
        (name, [results_pd.loc[:, name].values.tolist()])
        for name in forecast_columns
    )
    return pd.DataFrame(row)
# Apply the grouped-map UDF once per segment and display the result.
results = (
    df3
    .groupBy('segment')
    .apply(forecast_loans)
)
results.show()
我已将我的代码调整为以下内容,并按照此处的建议将 pyarrow 降级到 0.14,现在一切正常!根据 Stack Overflow 上的评论,我相信将 pyarrow 降级到 0.14 是在 Spark 2.x 版本上解决问题的关键。
评论如下"The issue is not with pyarrow's new release, it is spark which has to upgrade and become compatible with pyarrow.(i am afraid we have to wait for spark 3.0 to use the latest pyarrow)"
%livycd.pyspark
from pyspark.sql.types import StructType,StructField,StringType,TimestampType,ArrayType,DoubleType
from pyspark.sql.functions import current_date
from pyspark.sql.functions import pandas_udf, PandasUDFType
from fbprophet import Prophet
from datetime import datetime
import pandas as pd
# Schema of the frame returned by the grouped-map UDF: the segment label, a
# timestamp column, and one double column per forecast component.
result_schema = StructType(
    [
        StructField('segment', StringType(), True),
        StructField('ds', TimestampType(), True),
    ]
    + [
        StructField(column, DoubleType(), True)
        for column in (
            'trend',
            'trend_upper',
            'trend_lower',
            'yearly',
            'yearly_upper',
            'yearly_lower',
            'yhat',
            'yhat_upper',
            'yhat_lower',
            'multiplicative_terms',
            'multiplicative_terms_upper',
            'multiplicative_terms_lower',
            'additive_terms',
            'additive_terms_upper',
            'additive_terms_lower',
        )
    ]
)
@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def forecast_loans(df):
    """Fit a Prophet model on one segment and return its forecast rows.

    Args:
        df: pandas DataFrame holding the rows of a single group; the ``ds``,
            ``y`` and ``segment`` columns are read.

    Returns:
        A pandas DataFrame with one row per predicted date, restricted to
        the columns named in ``result_schema``.
    """

    def prophet_model(df, test_start_date):
        """Train on rows before ``test_start_date`` and forecast the rest."""
        # Work on a copy so the caller's frame is not modified in place.
        df = df.assign(ds=pd.to_datetime(df['ds']))

        # Training rows: everything strictly before the cut-off date.
        ts_train = df.query('ds < @test_start_date').sort_values('ds')
        # Only the number of held-out rows is needed, as the forecast horizon.
        n_test_periods = len(df.query('ds >= @test_start_date'))

        # Instantiate the model and configure its parameters.
        model = Prophet(
            interval_width=0.95,
            growth='linear',
            daily_seasonality=False,
            weekly_seasonality=False,
            yearly_seasonality=True,
            seasonality_mode='multiplicative',
        )
        model.fit(ts_train.loc[:, ['ds', 'y']])

        # Extend the frame by one weekly period per held-out row and predict.
        future_pd = model.make_future_dataframe(
            periods=n_test_periods,
            freq='W',
        )
        results_pd = model.predict(future_pd)

        # Every row of the group shares one segment; broadcasting the scalar
        # avoids the index alignment of pd.concat, which leaves NaN segments
        # or extra all-NaN rows when the forecast and input lengths differ.
        results_pd['segment'] = df['segment'].iloc[0]
        return pd.DataFrame(results_pd, columns=result_schema.fieldNames())

    return prophet_model(df, test_start_date='2019-03-31')
# Apply the grouped-map UDF once per segment.
results = (
    df3
    .groupBy('segment')
    .apply(forecast_loans)
)
假设您使用的是 Spark 2.3.x 或 2.4.x 并且 PyArrow >= 0.15.0,则 PyArrow 与 Spark 之间存在一个已知的兼容性问题。
最简单的解决方案是设置环境变量 ARROW_PRE_0_15_IPC_FORMAT=1。Spark 文档建议在 conf/spark-env.sh 中设置它,但您也可以在 Linux shell 中设置,或者在 Python 脚本或 shell 中创建 spark_session 之前设置:
import os

# Ask PyArrow >= 0.15.0 for the pre-0.15 Arrow IPC format that Spark
# 2.3.x/2.4.x expects. This must be set before the Spark session is created.
# The mapping is ``os.environ`` (lowercase); ``os.ENVIRON`` does not exist.
os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
spark_session = ...
或者,如果您愿意,您可以降级 PyArrow,如其他答案中所述。