Calculate MAPE and apply to PySpark grouped Dataframe [@pandas_udf]

Goal: calculate the mean_absolute_percentage_error (MAPE) for each unique ID.
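
For reference, scikit-learn computes MAPE as the mean absolute relative error, with a small epsilon guarding against division by zero:

$$\mathrm{MAPE}(y, \hat{y}) = \frac{1}{n}\sum_{i=1}^{n}\frac{|y_i - \hat{y}_i|}{\max(\epsilon, |y_i|)}$$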

Sample PySpark DataFrame: join_df

+----------+----------+-------+---------+----------+----------+
|        ID|        ds|      y|     yhat|yhat_upper|yhat_lower|
+----------+----------+-------+---------+----------+----------+
|    Ax849b|2021-07-01|1165.59| 1298.809| 1939.1261| 687.48206|
|    Ax849b|2021-07-02|1120.69| 1295.552| 1892.4929|   693.786|
|    Ax849b|2021-07-03|1120.69| 1294.079| 1923.0253|  664.1514|
|    Ax849b|2021-07-04|1120.69|1295.0399| 1947.6392|  639.4879|
|    Bz383J|2021-07-03|1108.71|1159.4934| 1917.6515| 652.76624|
|    Bz383J|2021-07-04|1062.77|1191.2385| 1891.9268|  665.9529|
+----------+----------+-------+---------+----------+----------+

final_schema = StructType([
    StructField('ds', DateType()),
    StructField('ID', IntegerType()),
    StructField('y', FloatType()),
    StructField('yhat', FloatType()),
    StructField('yhat_upper', FloatType()),
    StructField('yhat_lower', FloatType()),
    StructField('mape', FloatType())
])

I tried to create a UDF and apply it to each ID group with the apply function.

from sklearn.metrics import mean_absolute_percentage_error
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(final_schema, PandasUDFType.GROUPED_MAP)
def gr_mape_val(join_df):
  
  mape = mean_absolute_percentage_error(join_df["y"], join_df["yhat"]) 
  join_df['mape'] = mape
  
  return join_df

df_apply = join_df.groupby('ID').applyInPandas(gr_mape_val, final_schema)
df_apply.show()

However, I get the error message:

PythonException: 'TypeError: Return type of the user-defined function should be pandas.DataFrame, but is <class 'numpy.float32'>'

I understand that I'm getting the MAPE back as a numpy output when it should be a DataFrame. But I'm not sure what exactly I need to do differently to get the MAPE for each ID.

With PandasUDFType.GROUPED_MAP you need to return a DataFrame; since you are returning a numpy value, you see that exception.
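
The root cause is easy to see in isolation: sklearn's metric returns a single scalar for the whole input, not a DataFrame. A minimal check, run outside Spark:

from sklearn.metrics import mean_absolute_percentage_error

# returns one scalar for the whole input, not a pandas.DataFrame
m = mean_absolute_percentage_error([1108.71, 1062.77], [1159.4934, 1191.2385])
print(type(m), m)  # e.g. <class 'numpy.float64'> ≈ 0.0833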

You also need to modify the schema so that it matches the DataFrame your function finally returns.

You should also use applyInPandas, since the GROUPED_MAP pandas UDF style is deprecated in Spark 3.x; I have added its usage as well.

Data preparation

s= StringIO("""
ID,ds,y,yhat,yhat_upper,yhat_lower
Ax849b,2021-07-01,1165.59, 1298.809, 1939.1261, 687.48206
Ax849b,2021-07-02,1120.69, 1295.552, 1892.4929,   693.786
Ax849b,2021-07-03,1120.69, 1294.079, 1923.0253,  664.1514
Ax849b,2021-07-04,1120.69,1295.0399, 1947.6392,  639.4879
Bz383J,2021-07-03,1108.71,1159.4934, 1917.6515, 652.76624
Bz383J,2021-07-04,1062.77,1191.2385, 1891.9268,  665.9529
""")

df = pd.read_csv(s,delimiter=',')

sparkDF = sql.createDataFrame(df)

sparkDF.show()

+------+----------+-------+---------+----------+----------+
|    ID|        ds|      y|     yhat|yhat_upper|yhat_lower|
+------+----------+-------+---------+----------+----------+
|Ax849b|2021-07-01|1165.59| 1298.809| 1939.1261| 687.48206|
|Ax849b|2021-07-02|1120.69| 1295.552| 1892.4929|   693.786|
|Ax849b|2021-07-03|1120.69| 1294.079| 1923.0253|  664.1514|
|Ax849b|2021-07-04|1120.69|1295.0399| 1947.6392|  639.4879|
|Bz383J|2021-07-03|1108.71|1159.4934| 1917.6515| 652.76624|
|Bz383J|2021-07-04|1062.77|1191.2385| 1891.9268|  665.9529|
+------+----------+-------+---------+----------+----------+

Pandas UDF - usage

from sklearn.metrics import mean_absolute_percentage_error
from pyspark.sql import functions as F
from pyspark.sql.functions import PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, FloatType

final_schema = StructType([
    StructField('ID', StringType()),
    StructField('ds', StringType()),
    StructField('y', FloatType()),
    StructField('yhat', FloatType()),
    StructField('yhat_lower', FloatType()),
    StructField('yhat_upper', FloatType()),
    StructField('mape', FloatType())
])

@F.pandas_udf(final_schema, PandasUDFType.GROUPED_MAP)
def gr_mape_val(join_df):
    # one scalar MAPE per group, broadcast to every row of that group
    mape = mean_absolute_percentage_error(join_df["y"], join_df["yhat"])
    join_df['mape'] = mape
    return join_df


sparkDF.groupby('ID').apply(gr_mape_val).show()

+------+----------+-------+---------+----------+----------+-----------+
|    ID|        ds|      y|     yhat|yhat_lower|yhat_upper|       mape|
+------+----------+-------+---------+----------+----------+-----------+
|Ax849b|2021-07-01|1165.59| 1298.809| 687.48206| 1939.1261| 0.14515346|
|Ax849b|2021-07-02|1120.69| 1295.552|   693.786| 1892.4929| 0.14515346|
|Ax849b|2021-07-03|1120.69| 1294.079|  664.1514| 1923.0253| 0.14515346|
|Ax849b|2021-07-04|1120.69|1295.0399|  639.4879| 1947.6392| 0.14515346|
|Bz383J|2021-07-03|1108.71|1159.4934| 652.76624| 1917.6515|0.083342426|
|Bz383J|2021-07-04|1062.77|1191.2385|  665.9529| 1891.9268|0.083342426|
+------+----------+-------+---------+----------+----------+-----------+
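
As a sanity check, the Bz383J value can be reproduced by hand from that group's two rows:

# |1108.71 - 1159.4934| / 1108.71 ≈ 0.045804
# |1062.77 - 1191.2385| / 1062.77 ≈ 0.120881
# mean ≈ 0.083342, matching the 'mape' column above
print((abs(1108.71 - 1159.4934) / 1108.71
       + abs(1062.77 - 1191.2385) / 1062.77) / 2)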

applyInPandas

final_schema = StructType([
    StructField('ID', StringType()),
    StructField('ds', StringType()),
    StructField('y', FloatType()),
    StructField('yhat', FloatType()),
    StructField('yhat_lower', FloatType()),
    StructField('yhat_upper', FloatType()),
    StructField('mape', FloatType())
])


def gr_mape_val(join_df):
    # plain function: applyInPandas receives the output schema as an argument,
    # so no pandas_udf decorator is required
    mape = mean_absolute_percentage_error(join_df["y"], join_df["yhat"])
    join_df['mape'] = mape
    return join_df


sparkDF.groupby('ID').applyInPandas(gr_mape_val, final_schema).show()

+------+----------+-------+---------+----------+----------+-----------+
|    ID|        ds|      y|     yhat|yhat_lower|yhat_upper|       mape|
+------+----------+-------+---------+----------+----------+-----------+
|Ax849b|2021-07-01|1165.59| 1298.809| 687.48206| 1939.1261| 0.14515346|
|Ax849b|2021-07-02|1120.69| 1295.552|   693.786| 1892.4929| 0.14515346|
|Ax849b|2021-07-03|1120.69| 1294.079|  664.1514| 1923.0253| 0.14515346|
|Ax849b|2021-07-04|1120.69|1295.0399|  639.4879| 1947.6392| 0.14515346|
|Bz383J|2021-07-03|1108.71|1159.4934| 652.76624| 1917.6515|0.083342426|
|Bz383J|2021-07-04|1062.77|1191.2385|  665.9529| 1891.9268|0.083342426|
+------+----------+-------+---------+----------+----------+-----------+
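
As an aside, because this MAPE is a single aggregate per group, the same result can be obtained with built-in Spark expressions and a join, skipping the pandas round-trip. A minimal sketch (note it divides by y directly, without sklearn's epsilon guard):

from pyspark.sql import functions as F

# per-ID MAPE as a plain aggregation, then joined back onto every row
mape_df = sparkDF.groupBy('ID').agg(
    F.avg(F.abs((F.col('y') - F.col('yhat')) / F.col('y'))).alias('mape')
)

sparkDF.join(mape_df, on='ID').show()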