使用 flatMap / reduce:处理包含行列表的行

Using flatMap / reduce: dealing with rows containing a list of rows

我有一个数据框,每行包含一个行数组

我想将所有内部行聚合到一个数据框中

以下是我所取得/取得的成就:

这个

df.select('*').take(1)

给我这个:

[
   Row(
       body=[
               Row(a=1, b=1), 
               Row(a=2, b=2)
            ]
      )
]

这样做:

df.rdd.flatMap(lambda x: x).collect()

我明白了:

[[
   Row(a=1, b=1)
   Row(a=2, b=2)
]]

所以我被迫这样做:

df.rdd.flatMap(lambda x: x).flatMap(lambda x: x)

所以我可以实现以下目标:

[
  Row(a=1, b=1) 
  Row(a=2, b=2)
]

使用上面的结果,我终于可以将它转换为数据帧并保存在某个地方。这就是我想要的。但是调用 flatMap 两次看起来不对。

我尝试使用 Reduce 来达到同样的效果,就像下面的代码:

flatRdd = df.rdd.flatMap(lambda x: x)        
dfMerged = reduce(DataFrame.unionByName, [flatRdd])

reduce的第二个参数必须是可迭代的,所以我被迫添加[flatRdd]。可悲的是,它给了我这个:

[[
   Row(a=1, b=1)
   Row(a=2, b=2)
]]

肯定有更好的方法来实现我想要的。

IIUC,您可以 explode,然后使用 .* 语法将生成的 Row 展平。

假设您从以下 DataFrame 开始:

df.show()
#+----------------+
#|            body|
#+----------------+
#|[[1, 1], [2, 2]]|
#+----------------+

架构:

df.printSchema()
#root
# |-- body: array (nullable = true)
# |    |-- element: struct (containsNull = true)
# |    |    |-- a: long (nullable = true)
# |    |    |-- b: long (nullable = true)

您可以先 explode body 列:

from pyspark.sql.functions import explode
df = df.select(explode("body").alias("exploded"))
df.show()
#+--------+
#|exploded|
#+--------+
#|  [1, 1]|
#|  [2, 2]|
#+--------+

现在:

df = df.select("exploded.*")
df.show()
#+---+---+
#|  a|  b|
#+---+---+
#|  1|  1|
#|  2|  2|
#+---+---+

现在,如果您调用 collect,您将获得所需的输出:

print(df.collect())
#[Row(a=1, b=1), Row(a=2, b=2)]

另请参阅:

你不需要运行 Row对象上的flatMap(),直接用键引用它:

>>> df.rdd.flatMap(lambda x: x.body).collect()
[Row(a=1, b=1), Row(a=2, b=2)]