Calculate MAPE and apply to a PySpark grouped DataFrame [@pandas_udf]
Goal:
Calculate the mean_absolute_percentage_error (MAPE) for each unique ID.
y - actual values
yhat - predicted values
Sample PySpark DataFrame: join_df
+----------+----------+-------+---------+----------+----------+
| ID| ds| y| yhat|yhat_upper|yhat_lower|
+----------+----------+-------+---------+----------+----------+
| Ax849b|2021-07-01|1165.59| 1298.809| 1939.1261| 687.48206|
| Ax849b|2021-07-02|1120.69| 1295.552| 1892.4929| 693.786|
| Ax849b|2021-07-03|1120.69| 1294.079| 1923.0253| 664.1514|
| Ax849b|2021-07-04|1120.69|1295.0399| 1947.6392| 639.4879|
| Bz383J|2021-07-03|1108.71|1159.4934| 1917.6515| 652.76624|
| Bz383J|2021-07-04|1062.77|1191.2385| 1891.9268| 665.9529|
+----------+----------+-------+---------+----------+----------+
from pyspark.sql.types import StructType, StructField, DateType, IntegerType, FloatType

final_schema = StructType([
    StructField('ds', DateType()),
    StructField('ID', IntegerType()),
    StructField('y', FloatType()),
    StructField('yhat', FloatType()),
    StructField('yhat_upper', FloatType()),
    StructField('yhat_lower', FloatType()),
    StructField('mape', FloatType())
])
I tried to create a UDF and apply it to each ID group with the apply function.
from sklearn.metrics import mean_absolute_percentage_error
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(final_schema, PandasUDFType.GROUPED_MAP)
def gr_mape_val(join_df):
    mape = mean_absolute_percentage_error(join_df["y"], join_df["yhat"])
    join_df['mape'] = mape
    return join_df

df_apply = join_df.groupby('ID').applyInPandas(gr_mape_val, final_schema)
df_apply.show()
However, I get the error message:
PythonException: 'TypeError: Return type of the user-defined function should be pandas.DataFrame, but is <class 'numpy.float32'>'
I understand that I am getting the MAPE back as a numpy value when it should be a DataFrame. But I am not sure what exactly I need to do differently to get the MAPE per ID.
With PandasUDFType.GROUPED_MAP you need to return a DataFrame; since you are returning a numpy value, you see the exception.
You also need to modify the schema to match the DataFrame your function finally returns.
You should also use applyInPandas; I have added its usage as well.
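For context, here is a minimal sketch of the failure mode (an assumption about the original code, since the snippet in the question already returns join_df): if the UDF returns the metric itself, Spark receives a numpy scalar instead of a pandas DataFrame and raises exactly this exception.

def gr_mape_val_broken(join_df):
    # Hypothetical illustration, not the fix: mean_absolute_percentage_error
    # returns a numpy scalar, but GROUPED_MAP must return a pandas.DataFrame.
    return mean_absolute_percentage_error(join_df["y"], join_df["yhat"])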
Data preparation
s= StringIO("""
ID,ds,y,yhat,yhat_upper,yhat_lower
Ax849b,2021-07-01,1165.59, 1298.809, 1939.1261, 687.48206
Ax849b,2021-07-02,1120.69, 1295.552, 1892.4929, 693.786
Ax849b,2021-07-03,1120.69, 1294.079, 1923.0253, 664.1514
Ax849b,2021-07-04,1120.69,1295.0399, 1947.6392, 639.4879
Bz383J,2021-07-03,1108.71,1159.4934, 1917.6515, 652.76624
Bz383J,2021-07-04,1062.77,1191.2385, 1891.9268, 665.9529
""")
df = pd.read_csv(s,delimiter=',')
sparkDF = sql.createDataFrame(df)
sparkDF.show()
+------+----------+-------+---------+----------+----------+
| ID| ds| y| yhat|yhat_upper|yhat_lower|
+------+----------+-------+---------+----------+----------+
|Ax849b|2021-07-01|1165.59| 1298.809| 1939.1261| 687.48206|
|Ax849b|2021-07-02|1120.69| 1295.552| 1892.4929| 693.786|
|Ax849b|2021-07-03|1120.69| 1294.079| 1923.0253| 664.1514|
|Ax849b|2021-07-04|1120.69|1295.0399| 1947.6392| 639.4879|
|Bz383J|2021-07-03|1108.71|1159.4934| 1917.6515| 652.76624|
|Bz383J|2021-07-04|1062.77|1191.2385| 1891.9268| 665.9529|
+------+----------+-------+---------+----------+----------+
Pandas UDF - Usage
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, FloatType

final_schema = StructType([
    StructField('ID', StringType()),
    StructField('ds', StringType()),
    StructField('y', FloatType()),
    StructField('yhat', FloatType()),
    StructField('yhat_lower', FloatType()),
    StructField('yhat_upper', FloatType()),
    StructField('mape', FloatType())
])

@F.pandas_udf(final_schema, PandasUDFType.GROUPED_MAP)
def gr_mape_val(join_df):
    mape = mean_absolute_percentage_error(join_df["y"], join_df["yhat"])
    join_df['mape'] = mape
    return join_df

sparkDF.groupby('ID').apply(gr_mape_val).show()
+------+----------+-------+---------+----------+----------+-----------+
| ID| ds| y| yhat|yhat_lower|yhat_upper| mape|
+------+----------+-------+---------+----------+----------+-----------+
|Ax849b|2021-07-01|1165.59| 1298.809| 687.48206| 1939.1261| 0.14515346|
|Ax849b|2021-07-02|1120.69| 1295.552| 693.786| 1892.4929| 0.14515346|
|Ax849b|2021-07-03|1120.69| 1294.079| 664.1514| 1923.0253| 0.14515346|
|Ax849b|2021-07-04|1120.69|1295.0399| 639.4879| 1947.6392| 0.14515346|
|Bz383J|2021-07-03|1108.71|1159.4934| 652.76624| 1917.6515|0.083342426|
|Bz383J|2021-07-04|1062.77|1191.2385| 665.9529| 1891.9268|0.083342426|
+------+----------+-------+---------+----------+----------+-----------+
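Note that the decorator form with PandasUDFType.GROUPED_MAP is deprecated as of Spark 3.0; groupby(...).applyInPandas(...), shown next, is the recommended replacement and takes a plain Python function together with the output schema.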
applyInPandas
final_schema = StructType([
    StructField('ID', StringType()),
    StructField('ds', StringType()),
    StructField('y', FloatType()),
    StructField('yhat', FloatType()),
    StructField('yhat_lower', FloatType()),
    StructField('yhat_upper', FloatType()),
    StructField('mape', FloatType())
])

def gr_mape_val(join_df):
    mape = mean_absolute_percentage_error(join_df["y"], join_df["yhat"])
    join_df['mape'] = mape
    return join_df

sparkDF.groupby('ID').applyInPandas(gr_mape_val, final_schema).show()
+------+----------+-------+---------+----------+----------+-----------+
| ID| ds| y| yhat|yhat_lower|yhat_upper| mape|
+------+----------+-------+---------+----------+----------+-----------+
|Ax849b|2021-07-01|1165.59| 1298.809| 687.48206| 1939.1261| 0.14515346|
|Ax849b|2021-07-02|1120.69| 1295.552| 693.786| 1892.4929| 0.14515346|
|Ax849b|2021-07-03|1120.69| 1294.079| 664.1514| 1923.0253| 0.14515346|
|Ax849b|2021-07-04|1120.69|1295.0399| 639.4879| 1947.6392| 0.14515346|
|Bz383J|2021-07-03|1108.71|1159.4934| 652.76624| 1917.6515|0.083342426|
|Bz383J|2021-07-04|1062.77|1191.2385| 665.9529| 1891.9268|0.083342426|
+------+----------+-------+---------+----------+----------+-----------+
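If you only need a single MAPE row per ID instead of the metric repeated on every row, here is a minimal sketch under the same assumptions (sparkDF built as above, sklearn available on the workers); agg_schema and gr_mape_agg are illustrative names:

agg_schema = StructType([
    StructField('ID', StringType()),
    StructField('mape', FloatType())
])

def gr_mape_agg(pdf):
    # Reduce each group to one row: the group's ID and its MAPE.
    return pd.DataFrame({
        'ID': [pdf['ID'].iloc[0]],
        'mape': [mean_absolute_percentage_error(pdf['y'], pdf['yhat'])]
    })

sparkDF.groupby('ID').applyInPandas(gr_mape_agg, agg_schema).show()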