UDAF 合并 Spark 中第一个 orderdby 的行 DataSet/Dataframe
UDAF merge rows where are first orderdby in a Spark DataSet/Dataframe
假设我们在 Spark 中有一个 dataset
/dataframe
,其中有 3 列
ID
、Word
、Timestamp
我想编写一个 UDAF
函数,我可以在其中执行类似的操作
df.show()
ID | Word | Timestamp
1 | I | "2017-1-1 00:01"
1 | am | "2017-1-1 00:02"
1 | Chris | "2017-1-1 00:03"
2 | I | "2017-1-1 00:01"
2 | am | "2017-1-1 00:02"
2 | Jessica | "2017-1-1 00:03"
val df_merged = df.groupBy("ID")
.sort("ID", "Timestamp")
.agg(custom_agg("ID", "Word", "Timestamp")
df_merged.show
ID | Words | StartTime | EndTime |
1 | "I am Chris" | "2017-1-1 00:01" | "2017-1-1 00:03" |
1 | "I am Jessica" | "2017-1-1 00:01" | "2017-1-1 00:03" |
问题是如何确保 Words
列在我的 UDAF
中以正确的顺序合并?
抱歉,我不会使用 Scala,希望您能阅读。
Window
函数可以做你想做的事:
df = df.withColumn('Words', f.collect_list(df['Word']).over(
Window().partitionBy(df['ID']).orderBy('Timestamp').rowsBetween(start=Window.unboundedPreceding,
end=Window.unboundedFollowing)))
输出:
+---+-------+-----------------+----------------+
| ID| Word| Timestamp| Words|
+---+-------+-----------------+----------------+
| 1| I|2017-1-1 00:01:00| [I, am, Chris]|
| 1| am|2017-1-1 00:02:00| [I, am, Chris]|
| 1| Chris|2017-1-1 00:03:00| [I, am, Chris]|
| 2| I|2017-1-1 00:01:00|[I, am, Jessica]|
| 2| am|2017-1-1 00:02:00|[I, am, Jessica]|
| 2|Jessica|2017-1-1 00:03:00|[I, am, Jessica]|
+---+-------+-----------------+----------------+
然后groupBy
以上数据:
df = df.groupBy(df['ID'], df['Words']).agg(
f.min(df['Timestamp']).alias('StartTime'), f.max(df['Timestamp']).alias('EndTime'))
df = df.withColumn('Words', f.concat_ws(' ', df['Words']))
输出:
+---+------------+-----------------+-----------------+
| ID| Words| StartTime| EndTime|
+---+------------+-----------------+-----------------+
| 1| I am Chris|2017-1-1 00:01:00|2017-1-1 00:03:00|
| 2|I am Jessica|2017-1-1 00:01:00|2017-1-1 00:03:00|
+---+------------+-----------------+-----------------+
这是 Spark 2 的 groupByKey
的解决方案(与未类型化的 Dataset
一起使用)。groupByKey 的优点是您可以访问该组(您获得 Iterator[Row]
在 mapGroups
):
df.groupByKey(r => r.getAs[Int]("ID"))
.mapGroups{case(id,rows) => {
val sorted = rows
.toVector
.map(r => (r.getAs[String]("Word"),r.getAs[java.sql.Timestamp]("Timestamp")))
.sortBy(_._2.getTime)
(id,
sorted.map(_._1).mkString(" "),
sorted.map(_._2).head,
sorted.map(_._2).last
)
}
}.toDF("ID","Words","StartTime","EndTime")
假设我们在 Spark 中有一个 dataset
/dataframe
,其中有 3 列
ID
、Word
、Timestamp
我想编写一个 UDAF
函数,我可以在其中执行类似的操作
df.show()
ID | Word | Timestamp
1 | I | "2017-1-1 00:01"
1 | am | "2017-1-1 00:02"
1 | Chris | "2017-1-1 00:03"
2 | I | "2017-1-1 00:01"
2 | am | "2017-1-1 00:02"
2 | Jessica | "2017-1-1 00:03"
val df_merged = df.groupBy("ID")
.sort("ID", "Timestamp")
.agg(custom_agg("ID", "Word", "Timestamp")
df_merged.show
ID | Words | StartTime | EndTime |
1 | "I am Chris" | "2017-1-1 00:01" | "2017-1-1 00:03" |
1 | "I am Jessica" | "2017-1-1 00:01" | "2017-1-1 00:03" |
问题是如何确保 Words
列在我的 UDAF
中以正确的顺序合并?
抱歉,我不会使用 Scala,希望您能阅读。
Window
函数可以做你想做的事:
df = df.withColumn('Words', f.collect_list(df['Word']).over(
Window().partitionBy(df['ID']).orderBy('Timestamp').rowsBetween(start=Window.unboundedPreceding,
end=Window.unboundedFollowing)))
输出:
+---+-------+-----------------+----------------+
| ID| Word| Timestamp| Words|
+---+-------+-----------------+----------------+
| 1| I|2017-1-1 00:01:00| [I, am, Chris]|
| 1| am|2017-1-1 00:02:00| [I, am, Chris]|
| 1| Chris|2017-1-1 00:03:00| [I, am, Chris]|
| 2| I|2017-1-1 00:01:00|[I, am, Jessica]|
| 2| am|2017-1-1 00:02:00|[I, am, Jessica]|
| 2|Jessica|2017-1-1 00:03:00|[I, am, Jessica]|
+---+-------+-----------------+----------------+
然后groupBy
以上数据:
df = df.groupBy(df['ID'], df['Words']).agg(
f.min(df['Timestamp']).alias('StartTime'), f.max(df['Timestamp']).alias('EndTime'))
df = df.withColumn('Words', f.concat_ws(' ', df['Words']))
输出:
+---+------------+-----------------+-----------------+
| ID| Words| StartTime| EndTime|
+---+------------+-----------------+-----------------+
| 1| I am Chris|2017-1-1 00:01:00|2017-1-1 00:03:00|
| 2|I am Jessica|2017-1-1 00:01:00|2017-1-1 00:03:00|
+---+------------+-----------------+-----------------+
这是 Spark 2 的 groupByKey
的解决方案(与未类型化的 Dataset
一起使用)。groupByKey 的优点是您可以访问该组(您获得 Iterator[Row]
在 mapGroups
):
df.groupByKey(r => r.getAs[Int]("ID"))
.mapGroups{case(id,rows) => {
val sorted = rows
.toVector
.map(r => (r.getAs[String]("Word"),r.getAs[java.sql.Timestamp]("Timestamp")))
.sortBy(_._2.getTime)
(id,
sorted.map(_._1).mkString(" "),
sorted.map(_._2).head,
sorted.map(_._2).last
)
}
}.toDF("ID","Words","StartTime","EndTime")