Pyspark - Calculate RMSE between actuals and predictions for a groupby - AssertionError: all exprs should be Column
I have a function that calculates the RMSE between the predictions and actuals across an entire dataframe:
def calculate_rmse(df, actual_column, prediction_column):
    RMSE = F.udf(lambda x, y: ((x - y) ** 2))
    df = df.withColumn(
        "RMSE", RMSE(F.col(actual_column), F.col(prediction_column))
    )
    rmse = df.select(F.avg("RMSE") ** 0.5).collect()
    rmse = rmse[0]["POWER(avg(RMSE), 0.5)"]
    return rmse
test = calculate_rmse(my_df, 'actuals', 'preds')
3690.4535
I want to apply this to a groupby statement, but when I do, I get the following:
df_gb = my_df.groupby('start_month', 'start_week').agg(calculate_rmse(my_df, 'actuals', 'preds'))
all exprs should be Column
Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/group.py", line 113, in agg
assert all(isinstance(c, Column) for c in exprs), "all exprs should be Column"
AssertionError: all exprs should be Column
Can someone point me in the right direction? I'm new to Pyspark.
If you want to compute the RMSE by group, here is a slight adaptation of the solution proposed in the other answer:
import pyspark.sql.functions as psf

def compute_RMSE(expected_col, actual_col):
    # Per-row squared error, then group-wise mean and square root
    rmse = (old_df
            .withColumn("squarederror",
                        psf.pow(psf.col(actual_col) - psf.col(expected_col),
                                psf.lit(2)))
            .groupby('start_month', 'start_week')
            .agg(psf.avg(psf.col("squarederror")).alias("mse"))
            .withColumn("rmse", psf.sqrt(psf.col("mse"))))
    return rmse

compute_RMSE("col1", "col2")
I don't think you need a UDF for this - you should be able to take the difference of the two columns (df.withColumn('difference', col('true') - col('pred'))), then the square of that column (df.withColumn('squared_difference', pow(col('difference'), lit(2).astype(IntegerType())))), and then the average of that column (df.withColumn('rmse', avg('squared_difference'))). Putting it all together with an example:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
spark = SparkSession.builder.getOrCreate()
sql_context = SQLContext(spark.sparkContext)
df = sql_context.createDataFrame([(0.0, 1.0),
                                  (1.0, 2.0),
                                  (3.0, 5.0),
                                  (1.0, 8.0)], schema=['true', 'predicted'])
df = df.withColumn('difference', F.col('true') - F.col('predicted'))
df = df.withColumn('squared_difference', F.pow(F.col('difference'), F.lit(2).astype(IntegerType())))
rmse = df.select(F.avg(F.col('squared_difference')).alias('rmse'))
df.show()
rmse.show()
Output:
+----+---------+----------+------------------+
|true|predicted|difference|squared_difference|
+----+---------+----------+------------------+
| 0.0| 1.0| -1.0| 1.0|
| 1.0| 2.0| -1.0| 1.0|
| 3.0| 5.0| -2.0| 4.0|
| 1.0| 8.0| -7.0| 49.0|
+----+---------+----------+------------------+
+-----+
| rmse|
+-----+
|13.75|
+-----+
Hope this helps!
EDIT
Sorry, I had forgotten to take the square root of the result - the last line becomes:
rmse = df.select(F.sqrt(F.avg(F.col('squared_difference'))).alias('rmse'))
And the output becomes:
+------------------+
| rmse|
+------------------+
|3.7080992435478315|
+------------------+
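Tying this back to the groupby in the original question, here is a minimal sketch of the same no-UDF approach computed per group; the grouping column start_month and the sample data below are assumptions for illustration only:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()

# Made-up sample data with a grouping column, for illustration only
df = spark.createDataFrame([(1, 0.0, 1.0),
                            (1, 1.0, 2.0),
                            (2, 3.0, 5.0),
                            (2, 1.0, 8.0)],
                           schema=['start_month', 'true', 'predicted'])

df = df.withColumn('squared_difference',
                   F.pow(F.col('true') - F.col('predicted'), F.lit(2).astype(IntegerType())))

# Group-wise RMSE: mean of the squared errors within each group, then the square root
rmse_per_group = (df.groupby('start_month')
                    .agg(F.sqrt(F.avg('squared_difference')).alias('rmse')))
rmse_per_group.show()

This keeps everything as built-in Column expressions, so the aggregation works inside agg without any UDF.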