Spark:数据集上的地图组
Spark: Mapgroups on a Dataset
我正在下面的数据集上尝试这个 mapgroups 函数
并且不确定为什么我在 "Total Value" 列中得到 0。
我在这里错过了什么吗???请指教
Spark 版本 - 2.0
Scala 版本 - 2.11
case class Record(Hour: Int, Category: String,TotalComm: Double, TotalValue: Int)
val ss = (SparkSession)
import ss.implicits._
val df: DataFrame = ss.sparkContext.parallelize(Seq(
(0, "cat26", 30.9, 200), (0, "cat26", 22.1, 100), (0, "cat95", 19.6, 300), (1, "cat4", 1.3, 100),
(1, "cat23", 28.5, 100), (1, "cat4", 26.8, 400), (1, "cat13", 12.6, 250), (1, "cat23", 5.3, 300),
(0, "cat26", 39.6, 30), (2, "cat40", 29.7, 500), (1, "cat4", 27.9, 600), (2, "cat68", 9.8, 100),
(1, "cat23", 35.6, 500))).toDF("Hour", "Category","TotalComm", "TotalValue")
val resultSum = df.as[Record].map(row => ((row.Hour,row.Category),(row.TotalComm,row.TotalValue)))
.groupByKey(_._1).mapGroups{case(k,iter) => (k._1,k._2,iter.map(x => x._2._1).sum,iter.map(y => y._2._2).sum)}
.toDF("KeyHour","KeyCategory","TotalComm","TotalValue").orderBy(asc("KeyHour"))
resultSum.show()
+-------+-----------+---------+----------+
|KeyHour|KeyCategory|TotalComm|TotalValue|
+-------+-----------+---------+----------+
| 0| cat26| 92.6| 0|
| 0| cat95| 19.6| 0|
| 1| cat13| 12.6| 0|
| 1| cat23| 69.4| 0|
| 1| cat4| 56.0| 0|
| 2| cat40| 29.7| 0|
| 2| cat68| 9.8| 0|
+-------+-----------+---------+----------+
iter
在mapGroups
里面是一个缓冲区并且计算只能执行一次。因此,当您求和为 iter.map(x => x._2._1).sum
时, iter 缓冲区 中没有任何剩余,因此 iter.map(y => y._2._2).sum
操作产生 0。所以你必须找到一种机制来计算同一次迭代中两者的总和
for 循环与 ListBuffers
为了简单起见,我使用了 for
循环和 ListBuffer
来同时求和
val resultSum = df.as[Record].map(row => ((row.Hour,row.Category),(row.TotalComm,row.TotalValue)))
.groupByKey(_._1).mapGroups{case(k,iter) => {
val listBuffer1 = new ListBuffer[Double]
val listBuffer2 = new ListBuffer[Int]
for(a <- iter){
listBuffer1 += a._2._1
listBuffer2 += a._2._2
}
(k._1, k._2, listBuffer1.sum, listBuffer2.sum)
}}
.toDF("KeyHour","KeyCategory","TotalComm","TotalValue").orderBy($"KeyHour".asc)
这应该会给你正确的结果
+-------+-----------+---------+----------+
|KeyHour|KeyCategory|TotalComm|TotalValue|
+-------+-----------+---------+----------+
| 0| cat26| 92.6| 330|
| 0| cat95| 19.6| 300|
| 1| cat23| 69.4| 900|
| 1| cat13| 12.6| 250|
| 1| cat4| 56.0| 1100|
| 2| cat68| 9.8| 100|
| 2| cat40| 29.7| 500|
+-------+-----------+---------+----------+
希望回答对你有帮助
正如 Ramesh Maharjan 指出的那样,问题在于两次使用迭代器,这将导致 TotalValue
列为 0。但是,甚至没有必要使用 groupByKey
和mapGroups
从头开始。可以使用 groupBy
和 agg
来完成相同的操作,这将使代码更加清晰易读。作为一个加号,它也避免使用慢 groupByKey
。
以下内容同样有效:
val resultSum = df.groupBy($"Hour", $"Category")
.agg(sum($"TotalComm").as("TotalComm"), sum($"TotalValue").as("TotalValue"))
.orderBy(asc("Hour"))
结果:
+----+--------+---------+----------+
|Hour|Category|TotalComm|TotalValue|
+----+--------+---------+----------+
| 0| cat95| 19.6| 300|
| 0| cat26| 92.6| 330|
| 1| cat23| 69.4| 900|
| 1| cat13| 12.6| 250|
| 1| cat4| 56.0| 1100|
| 2| cat68| 9.8| 100|
| 2| cat40| 29.7| 500|
+----+--------+---------+----------+
如果您仍想更改小时和类别列的名称,只需将 groupBy
更改为
即可轻松完成
groupBy($"Hour".as("KeyHour"), $"Category".as("KeyCategory"))
我正在下面的数据集上尝试这个 mapgroups 函数 并且不确定为什么我在 "Total Value" 列中得到 0。 我在这里错过了什么吗???请指教
Spark 版本 - 2.0 Scala 版本 - 2.11
case class Record(Hour: Int, Category: String,TotalComm: Double, TotalValue: Int)
val ss = (SparkSession)
import ss.implicits._
val df: DataFrame = ss.sparkContext.parallelize(Seq(
(0, "cat26", 30.9, 200), (0, "cat26", 22.1, 100), (0, "cat95", 19.6, 300), (1, "cat4", 1.3, 100),
(1, "cat23", 28.5, 100), (1, "cat4", 26.8, 400), (1, "cat13", 12.6, 250), (1, "cat23", 5.3, 300),
(0, "cat26", 39.6, 30), (2, "cat40", 29.7, 500), (1, "cat4", 27.9, 600), (2, "cat68", 9.8, 100),
(1, "cat23", 35.6, 500))).toDF("Hour", "Category","TotalComm", "TotalValue")
val resultSum = df.as[Record].map(row => ((row.Hour,row.Category),(row.TotalComm,row.TotalValue)))
.groupByKey(_._1).mapGroups{case(k,iter) => (k._1,k._2,iter.map(x => x._2._1).sum,iter.map(y => y._2._2).sum)}
.toDF("KeyHour","KeyCategory","TotalComm","TotalValue").orderBy(asc("KeyHour"))
resultSum.show()
+-------+-----------+---------+----------+
|KeyHour|KeyCategory|TotalComm|TotalValue|
+-------+-----------+---------+----------+
| 0| cat26| 92.6| 0|
| 0| cat95| 19.6| 0|
| 1| cat13| 12.6| 0|
| 1| cat23| 69.4| 0|
| 1| cat4| 56.0| 0|
| 2| cat40| 29.7| 0|
| 2| cat68| 9.8| 0|
+-------+-----------+---------+----------+
iter
在mapGroups
里面是一个缓冲区并且计算只能执行一次。因此,当您求和为 iter.map(x => x._2._1).sum
时, iter 缓冲区 中没有任何剩余,因此 iter.map(y => y._2._2).sum
操作产生 0。所以你必须找到一种机制来计算同一次迭代中两者的总和
for 循环与 ListBuffers
为了简单起见,我使用了 for
循环和 ListBuffer
来同时求和
val resultSum = df.as[Record].map(row => ((row.Hour,row.Category),(row.TotalComm,row.TotalValue)))
.groupByKey(_._1).mapGroups{case(k,iter) => {
val listBuffer1 = new ListBuffer[Double]
val listBuffer2 = new ListBuffer[Int]
for(a <- iter){
listBuffer1 += a._2._1
listBuffer2 += a._2._2
}
(k._1, k._2, listBuffer1.sum, listBuffer2.sum)
}}
.toDF("KeyHour","KeyCategory","TotalComm","TotalValue").orderBy($"KeyHour".asc)
这应该会给你正确的结果
+-------+-----------+---------+----------+
|KeyHour|KeyCategory|TotalComm|TotalValue|
+-------+-----------+---------+----------+
| 0| cat26| 92.6| 330|
| 0| cat95| 19.6| 300|
| 1| cat23| 69.4| 900|
| 1| cat13| 12.6| 250|
| 1| cat4| 56.0| 1100|
| 2| cat68| 9.8| 100|
| 2| cat40| 29.7| 500|
+-------+-----------+---------+----------+
希望回答对你有帮助
正如 Ramesh Maharjan 指出的那样,问题在于两次使用迭代器,这将导致 TotalValue
列为 0。但是,甚至没有必要使用 groupByKey
和mapGroups
从头开始。可以使用 groupBy
和 agg
来完成相同的操作,这将使代码更加清晰易读。作为一个加号,它也避免使用慢 groupByKey
。
以下内容同样有效:
val resultSum = df.groupBy($"Hour", $"Category")
.agg(sum($"TotalComm").as("TotalComm"), sum($"TotalValue").as("TotalValue"))
.orderBy(asc("Hour"))
结果:
+----+--------+---------+----------+
|Hour|Category|TotalComm|TotalValue|
+----+--------+---------+----------+
| 0| cat95| 19.6| 300|
| 0| cat26| 92.6| 330|
| 1| cat23| 69.4| 900|
| 1| cat13| 12.6| 250|
| 1| cat4| 56.0| 1100|
| 2| cat68| 9.8| 100|
| 2| cat40| 29.7| 500|
+----+--------+---------+----------+
如果您仍想更改小时和类别列的名称,只需将 groupBy
更改为
groupBy($"Hour".as("KeyHour"), $"Category".as("KeyCategory"))