Spark (Scala): groupBy and aggregate lists of values into one list based on index
I have the following DataFrame:
root
|-- visitor: string (nullable = true)
|-- asset: array (nullable = true)
| |-- element: string (containsNull = true)
I am trying to group the lists of values that share the same index (visitor) into a single list of the original list type (array).
Example:
import org.apache.spark.sql.functions._   // collect_list, explode, ...

val rawData1 = Seq(("visitor1",Array("item1","item2","item3","item4")),("visitor2",Array("item1","item2","item3")))
val rawData2 = Seq(("visitor1",Array("item1","item2","item5")),("visitor2",Array("item4","item7")))
val df1 = spark.createDataFrame(rawData1).toDF("visitor","asset")
val df2 = spark.createDataFrame(rawData2).toDF("visitor","asset")
val dfJoined = df1.union(df2)
// collect_list over an array column yields array<array<string>>: one sublist per input row
dfJoined.groupBy("visitor").agg(collect_list("asset"))
What I get is:
visitor collect_list(asset)
visitor2 [WrappedArray(item1, item2, item3), WrappedArray(item4, item7)]
visitor1 [WrappedArray(item1, item2, item3, item4), WrappedArray(item1, item2, item5)]
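But I don't want two sublists in the asset column; I want all the values from both lists grouped into a single list of the original type (array).
Thanks!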
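To pin down the desired result, here is a minimal sketch of the same grouping on plain Scala collections (no Spark). It assumes `Seq` in place of `Array` so the lists compare by value, and the object name `DesiredResult` is hypothetical: rows are grouped by visitor and their asset lists are concatenated into one flat list.

```scala
object DesiredResult {
  // Same sample data as the question, using Seq instead of Array (assumption, for value equality)
  val rawData1 = Seq(
    ("visitor1", Seq("item1", "item2", "item3", "item4")),
    ("visitor2", Seq("item1", "item2", "item3"))
  )
  val rawData2 = Seq(
    ("visitor1", Seq("item1", "item2", "item5")),
    ("visitor2", Seq("item4", "item7"))
  )

  // Group rows by visitor, then flatten each group's asset lists into a single list
  val merged: Map[String, Seq[String]] =
    (rawData1 ++ rawData2)
      .groupBy(_._1)
      .map { case (visitor, rows) => visitor -> rows.flatMap(_._2) }
}
```

Here `groupBy` keeps the original relative order of rows within each group, so `visitor1` maps to all seven items from both lists, duplicates included.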
One option is to flatten df1 and df2 with explode before the union, and then aggregate:
// explode turns each array into one row per element; union the rows, then re-collect per visitor
(df1.withColumn("asset", explode($"asset"))
  .union(df2.withColumn("asset", explode($"asset")))
  .groupBy("visitor")
  .agg(collect_list("asset"))
).show(false)
+--------+-------------------------------------------------+
|visitor |collect_list(asset)                              |
+--------+-------------------------------------------------+
|visitor2|[item1, item2, item3, item4, item7]              |
|visitor1|[item1, item2, item3, item4, item1, item2, item5]|
+--------+-------------------------------------------------+
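A sketch of an alternative, assuming Spark 2.4 or later where `org.apache.spark.sql.functions.flatten` exists: keep the original `collect_list` aggregation on the unioned DataFrame and flatten the resulting `array<array<string>>` into `array<string>`, which avoids the explode/union round trip. The object name `FlattenExample` and the local `SparkSession` setup are illustrative only; in spark-shell the existing `spark` would be used instead.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_list, flatten}

object FlattenExample {
  // Hypothetical local session for illustration; spark-shell already provides `spark`
  lazy val spark: SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("flatten-collect-list")
    .getOrCreate()

  def run(): Map[String, Seq[String]] = {
    import spark.implicits._

    val df1 = Seq(
      ("visitor1", Array("item1", "item2", "item3", "item4")),
      ("visitor2", Array("item1", "item2", "item3"))
    ).toDF("visitor", "asset")
    val df2 = Seq(
      ("visitor1", Array("item1", "item2", "item5")),
      ("visitor2", Array("item4", "item7"))
    ).toDF("visitor", "asset")

    // collect_list yields array<array<string>>; flatten (Spark 2.4+) merges it into array<string>
    val merged = df1.union(df2)
      .groupBy("visitor")
      .agg(flatten(collect_list(col("asset"))).as("asset"))

    // Bring the small result back to the driver as visitor -> items
    merged.collect()
      .map(r => r.getString(0) -> r.getSeq[String](1))
      .toMap
  }
}
```

As the Spark documentation notes for `collect_list`, the order of collected values depends on row order, which may be non-deterministic after a shuffle, so neither this version nor the explode version should be relied on for a particular item order.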