分组火花数据帧上的最大聚合 returns 错误值
max aggregation on grouped spark dataframe returns wrong value
我有一个包含 2 列(CPID
和 PluginDuration
)的 spark 数据框。我需要为数据帧中的每个 CPID
找到最大值 pluginDuration
和平均值 pluginDuration
。
为 CPID
AN04773
数据框返回的行在以下行中返回:
df.filter('CPID = "AN04773"').show(10)
Result:
+-------+--------------+
| CPID|PluginDuration|
+-------+--------------+
|AN04773| 1.933333333|
|AN04773| 13.03444444|
|AN04773| 9.2875|
|AN04773| 20.50027778|
+-------+--------------+
当我在数据框的 PID
列上执行 groupBy
以查找 max
和 avg
插件持续时间时,我发现某些 PID 返回的最大值不如预期。例如,对于 PID
AN04773
(与我用来显示原始 df 中的行相同的 PID
)。最大值 pluginDuration
应该是 20.50027778
但是从下面代码的结果来看,最大值是 9.2875
这是不对的。
from pyspark.sql import functions as F
fdf = df.groupBy('CPID').agg(F.max('PluginDuration').alias('max_duration'),F.avg('PluginDuration').alias('avg_duration'))
fdf.filter('CPID = "AN04773"').show()
Result:
+-------+------------+--------------+
| CPID|max_duration| avg_duration|
+-------+------------+--------------+
|AN04773| 9.2875|11.18888888825|
+-------+------------+--------------+
想知道为什么它没有按预期运行。
计算错误是因为 PluginDuration
没有定义为数字数据类型,而是定义为字符串列。您所要做的就是将 PluginDuration
列转换为数字类型(double、float 等)。
这是您的问题(转载于 scala
但在 PySpark
中同样有效):
val data = Seq(("AN04773", "1.933333333"), ("AN04773", "13.03444444"), ("AN04773", "9.2875"), ("AN04773", "20.50027778")).toDF("id", "value")
data.groupBy("id").agg(functions.max("value"), avg("value")).show
// output:
+-------+----------+--------------+
| id|max(value)| avg(value)|
+-------+----------+--------------+
|AN04773| 9.2875|11.18888888825|
+-------+----------+--------------+
但是在将 value
列转换为 Double
数据类型后,我们得到了正确的计算值:
data.withColumn("value",col("value").cast("double")).groupBy("id").agg(functions.max("value"), avg("value")).show
// output:
+-------+-----------+--------------+
| id| max(value)| avg(value)|
+-------+-----------+--------------+
|AN04773|20.50027778|11.18888888825|
+-------+-----------+--------------+
由于列中没有定义数据类型,因此 Scala 将其视为字符串并且字符串 9 大于 2,因此最大值为 9.285
注意:如果将 pyspark 中的数据类型转换为字符串,您将获得与在 Scala 中相同的结果。
我有一个包含 2 列(CPID
和 PluginDuration
)的 spark 数据框。我需要为数据帧中的每个 CPID
找到最大值 pluginDuration
和平均值 pluginDuration
。
为 CPID
AN04773
数据框返回的行在以下行中返回:
df.filter('CPID = "AN04773"').show(10)
Result:
+-------+--------------+
| CPID|PluginDuration|
+-------+--------------+
|AN04773| 1.933333333|
|AN04773| 13.03444444|
|AN04773| 9.2875|
|AN04773| 20.50027778|
+-------+--------------+
当我在数据框的 PID
列上执行 groupBy
以查找 max
和 avg
插件持续时间时,我发现某些 PID 返回的最大值不如预期。例如,对于 PID
AN04773
(与我用来显示原始 df 中的行相同的 PID
)。最大值 pluginDuration
应该是 20.50027778
但是从下面代码的结果来看,最大值是 9.2875
这是不对的。
from pyspark.sql import functions as F
fdf = df.groupBy('CPID').agg(F.max('PluginDuration').alias('max_duration'),F.avg('PluginDuration').alias('avg_duration'))
fdf.filter('CPID = "AN04773"').show()
Result:
+-------+------------+--------------+
| CPID|max_duration| avg_duration|
+-------+------------+--------------+
|AN04773| 9.2875|11.18888888825|
+-------+------------+--------------+
想知道为什么它没有按预期运行。
计算错误是因为 PluginDuration
没有定义为数字数据类型,而是定义为字符串列。您所要做的就是将 PluginDuration
列转换为数字类型(double、float 等)。
这是您的问题(转载于 scala
但在 PySpark
中同样有效):
val data = Seq(("AN04773", "1.933333333"), ("AN04773", "13.03444444"), ("AN04773", "9.2875"), ("AN04773", "20.50027778")).toDF("id", "value")
data.groupBy("id").agg(functions.max("value"), avg("value")).show
// output:
+-------+----------+--------------+
| id|max(value)| avg(value)|
+-------+----------+--------------+
|AN04773| 9.2875|11.18888888825|
+-------+----------+--------------+
但是在将 value
列转换为 Double
数据类型后,我们得到了正确的计算值:
data.withColumn("value",col("value").cast("double")).groupBy("id").agg(functions.max("value"), avg("value")).show
// output:
+-------+-----------+--------------+
| id| max(value)| avg(value)|
+-------+-----------+--------------+
|AN04773|20.50027778|11.18888888825|
+-------+-----------+--------------+
由于列中没有定义数据类型,因此 Scala 将其视为字符串并且字符串 9 大于 2,因此最大值为 9.285
注意:如果将 pyspark 中的数据类型转换为字符串,您将获得与在 Scala 中相同的结果。