Spark 数据集:return 具有相同键值的 HashMap
Spark dataset: return a HashMap of values having same key
+------+-----+
|userID|entID|
+------+-----+
| 0| 5|
| 0| 15|
| 1| 7|
| 1| 3|
| 2| 3|
| 2| 4|
| 2| 5|
| 2| 9|
| 3| 25|
+------+-----+
我希望结果为 {0->(5,15), 1->(7,3),..}
如有任何帮助,我们将不胜感激。
一种方法是将数据集转换为 RDD 并执行 groupByKey
。要获得 Map
的结果,如果数据集不是太大,您需要 collect
提供的分组 RDD:
val ds = Seq(
(0, 5), (0, 15), (1, 7), (1, 3),
(2, 3), (2, 4), (2, 5), (2, 9), (3, 25)
).toDF("userID", "entID").as[(Int, Int)]
// ds: org.apache.spark.sql.Dataset[(Int, Int)] =[userID: int, entID: int]
val map = ds.rdd.groupByKey.collectAsMap
// map: scala.collection.Map[Int,Iterable[Int]] = Map(
// 2 -> CompactBuffer(3, 4, 5, 9), 1 -> CompactBuffer(7, 3),
// 3 -> CompactBuffer(25), 0 -> CompactBuffer(5, 15)
// )
又是你的table:
val df = Seq(
(0, 5),
(0, 15),
(1, 7),
(1, 3),
(2, 3),
(2, 4),
(2, 5),
(2, 9),
(3, 25)
).toDF("userId", "entId")
df.show()
输出:
+------+-----+
|userId|entId|
+------+-----+
| 0| 5|
| 0| 15|
| 1| 7|
| 1| 3|
| 2| 3|
| 2| 4|
| 2| 5|
| 2| 9|
| 3| 25|
+------+-----+
现在您可以按 userId
分组,然后将 endId
收集到列表中,将结果列与列表别名为 entIds
:
import org.apache.spark.sql.functions._
val entIdsForUserId = df.
groupBy($"userId").
agg(collect_list($"entId").alias("entIds"))
entIdsForUserId.show()
输出:
+------+------------+
|userId| entIds|
+------+------------+
| 1| [7, 3]|
| 3| [25]|
| 2|[3, 4, 5, 9]|
| 0| [5, 15]|
+------+------------+
未指定groupBy
后的顺序。根据你想用它做什么,你可以另外对它进行排序。
您可以将其收集到主节点上的单个地图中:
val m = entIdsForUserId.
map(r => (r.getAs[Int](0), r.getAs[Seq[Int]](1))).
collect.toMap
这会给你:
Map(1 -> List(7, 3), 3 -> List(25), 2 -> List(3, 4, 5, 9), 0 -> List(5, 15))
+------+-----+
|userID|entID|
+------+-----+
| 0| 5|
| 0| 15|
| 1| 7|
| 1| 3|
| 2| 3|
| 2| 4|
| 2| 5|
| 2| 9|
| 3| 25|
+------+-----+
我希望结果为 {0->(5,15), 1->(7,3),..}
如有任何帮助,我们将不胜感激。
一种方法是将数据集转换为 RDD 并执行 groupByKey
。要获得 Map
的结果,如果数据集不是太大,您需要 collect
提供的分组 RDD:
val ds = Seq(
(0, 5), (0, 15), (1, 7), (1, 3),
(2, 3), (2, 4), (2, 5), (2, 9), (3, 25)
).toDF("userID", "entID").as[(Int, Int)]
// ds: org.apache.spark.sql.Dataset[(Int, Int)] =[userID: int, entID: int]
val map = ds.rdd.groupByKey.collectAsMap
// map: scala.collection.Map[Int,Iterable[Int]] = Map(
// 2 -> CompactBuffer(3, 4, 5, 9), 1 -> CompactBuffer(7, 3),
// 3 -> CompactBuffer(25), 0 -> CompactBuffer(5, 15)
// )
又是你的table:
val df = Seq(
(0, 5),
(0, 15),
(1, 7),
(1, 3),
(2, 3),
(2, 4),
(2, 5),
(2, 9),
(3, 25)
).toDF("userId", "entId")
df.show()
输出:
+------+-----+
|userId|entId|
+------+-----+
| 0| 5|
| 0| 15|
| 1| 7|
| 1| 3|
| 2| 3|
| 2| 4|
| 2| 5|
| 2| 9|
| 3| 25|
+------+-----+
现在您可以按 userId
分组,然后将 endId
收集到列表中,将结果列与列表别名为 entIds
:
import org.apache.spark.sql.functions._
val entIdsForUserId = df.
groupBy($"userId").
agg(collect_list($"entId").alias("entIds"))
entIdsForUserId.show()
输出:
+------+------------+
|userId| entIds|
+------+------------+
| 1| [7, 3]|
| 3| [25]|
| 2|[3, 4, 5, 9]|
| 0| [5, 15]|
+------+------------+
未指定groupBy
后的顺序。根据你想用它做什么,你可以另外对它进行排序。
您可以将其收集到主节点上的单个地图中:
val m = entIdsForUserId.
map(r => (r.getAs[Int](0), r.getAs[Seq[Int]](1))).
collect.toMap
这会给你:
Map(1 -> List(7, 3), 3 -> List(25), 2 -> List(3, 4, 5, 9), 0 -> List(5, 15))