使用优化的 DSL 为 spark 中的每一行生成 2 行

generate 2 rows for each row in spark using optimized DSL

我有这样的数据:

id,ts_start,ts_end,foo_start,foo_end
1,1,2,f_s,f_e
2,3,4,foo,bar
3,3,6,foo,f_e

即聚合了所有开始和结束信息的单个记录。 使用平面地图,这些可以转换为

id,ts,foo
1,1,f_s
1,2,f_e

如何使用优化的 SQL DSL 和 explodepivot 来做同样的事情?

编辑

显然,我不想读入数据两次并合并结果。

或者如果我不想使用 flatmap + serde + 自定义代码,这是唯一的选择吗?

给定:

val df = Seq(
  (1,1,2,"f_s","f_e"),
  (2,3,4,"foo","bar"),
  (3,3,6,"foo","f_e")
).toDF("id","ts_start","ts_end","foo_start","foo_end")

你可以做到:

df
  .select($"id",
    explode(
      array(
       struct($"ts_start".as("ts"),$"foo_start".as("foo")),
       struct($"ts_end".as("ts"),$"foo_end".as("foo"))
     )
    ).as("tmp")
  )
  .select(
    $"id",
    $"tmp.*"
  )
  .show()

给出:

+---+---+---+
| id| ts|foo|
+---+---+---+
|  1|  1|f_s|
|  1|  2|f_e|
|  2|  3|foo|
|  2|  4|bar|
|  3|  3|foo|
|  3|  6|f_e|
+---+---+---+