使用优化的 DSL 为 spark 中的每一行生成 2 行
generate 2 rows for each row in spark using optimized DSL
我有这样的数据:
id,ts_start,ts_end,foo_start,foo_end
1,1,2,f_s,f_e
2,3,4,foo,bar
3,3,6,foo,f_e
即聚合了所有开始和结束信息的单个记录。
使用平面地图,这些可以转换为
id,ts,foo
1,1,f_s
1,2,f_e
如何使用优化的 SQL DSL 和 explode
或 pivot
来做同样的事情?
编辑
显然,我不想读入数据两次并合并结果。
或者如果我不想使用 flatmap + serde + 自定义代码,这是唯一的选择吗?
给定:
val df = Seq(
(1,1,2,"f_s","f_e"),
(2,3,4,"foo","bar"),
(3,3,6,"foo","f_e")
).toDF("id","ts_start","ts_end","foo_start","foo_end")
你可以做到:
df
.select($"id",
explode(
array(
struct($"ts_start".as("ts"),$"foo_start".as("foo")),
struct($"ts_end".as("ts"),$"foo_end".as("foo"))
)
).as("tmp")
)
.select(
$"id",
$"tmp.*"
)
.show()
给出:
+---+---+---+
| id| ts|foo|
+---+---+---+
| 1| 1|f_s|
| 1| 2|f_e|
| 2| 3|foo|
| 2| 4|bar|
| 3| 3|foo|
| 3| 6|f_e|
+---+---+---+
我有这样的数据:
id,ts_start,ts_end,foo_start,foo_end
1,1,2,f_s,f_e
2,3,4,foo,bar
3,3,6,foo,f_e
即聚合了所有开始和结束信息的单个记录。 使用平面地图,这些可以转换为
id,ts,foo
1,1,f_s
1,2,f_e
如何使用优化的 SQL DSL 和 explode
或 pivot
来做同样的事情?
编辑
显然,我不想读入数据两次并合并结果。
或者如果我不想使用 flatmap + serde + 自定义代码,这是唯一的选择吗?
给定:
val df = Seq(
(1,1,2,"f_s","f_e"),
(2,3,4,"foo","bar"),
(3,3,6,"foo","f_e")
).toDF("id","ts_start","ts_end","foo_start","foo_end")
你可以做到:
df
.select($"id",
explode(
array(
struct($"ts_start".as("ts"),$"foo_start".as("foo")),
struct($"ts_end".as("ts"),$"foo_end".as("foo"))
)
).as("tmp")
)
.select(
$"id",
$"tmp.*"
)
.show()
给出:
+---+---+---+
| id| ts|foo|
+---+---+---+
| 1| 1|f_s|
| 1| 2|f_e|
| 2| 3|foo|
| 2| 4|bar|
| 3| 3|foo|
| 3| 6|f_e|
+---+---+---+