使用 flatMap / reduce:处理包含行列表的行
Using flatMap / reduce: dealing with rows containing a list of rows
我有一个数据框,每行包含一个行数组
我想将所有内部行聚合到一个数据框中
以下是我所取得/取得的成就:
这个
df.select('*').take(1)
给我这个:
[
Row(
body=[
Row(a=1, b=1),
Row(a=2, b=2)
]
)
]
这样做:
df.rdd.flatMap(lambda x: x).collect()
我明白了:
[[
Row(a=1, b=1)
Row(a=2, b=2)
]]
所以我被迫这样做:
df.rdd.flatMap(lambda x: x).flatMap(lambda x: x)
所以我可以实现以下目标:
[
Row(a=1, b=1)
Row(a=2, b=2)
]
使用上面的结果,我终于可以将它转换为数据帧并保存在某个地方。这就是我想要的。但是调用 flatMap 两次看起来不对。
我尝试使用 Reduce 来达到同样的效果,就像下面的代码:
flatRdd = df.rdd.flatMap(lambda x: x)
dfMerged = reduce(DataFrame.unionByName, [flatRdd])
reduce的第二个参数必须是可迭代的,所以我被迫添加[flatRdd]。可悲的是,它给了我这个:
[[
Row(a=1, b=1)
Row(a=2, b=2)
]]
肯定有更好的方法来实现我想要的。
IIUC,您可以 explode
,然后使用 .*
语法将生成的 Row
展平。
假设您从以下 DataFrame 开始:
df.show()
#+----------------+
#| body|
#+----------------+
#|[[1, 1], [2, 2]]|
#+----------------+
架构:
df.printSchema()
#root
# |-- body: array (nullable = true)
# | |-- element: struct (containsNull = true)
# | | |-- a: long (nullable = true)
# | | |-- b: long (nullable = true)
您可以先 explode
body
列:
from pyspark.sql.functions import explode
df = df.select(explode("body").alias("exploded"))
df.show()
#+--------+
#|exploded|
#+--------+
#| [1, 1]|
#| [2, 2]|
#+--------+
现在:
df = df.select("exploded.*")
df.show()
#+---+---+
#| a| b|
#+---+---+
#| 1| 1|
#| 2| 2|
#+---+---+
现在,如果您调用 collect
,您将获得所需的输出:
print(df.collect())
#[Row(a=1, b=1), Row(a=2, b=2)]
另请参阅:
你不需要运行 Row对象上的flatMap(),直接用键引用它:
>>> df.rdd.flatMap(lambda x: x.body).collect()
[Row(a=1, b=1), Row(a=2, b=2)]
我有一个数据框,每行包含一个行数组
我想将所有内部行聚合到一个数据框中
以下是我所取得/取得的成就:
这个
df.select('*').take(1)
给我这个:
[
Row(
body=[
Row(a=1, b=1),
Row(a=2, b=2)
]
)
]
这样做:
df.rdd.flatMap(lambda x: x).collect()
我明白了:
[[
Row(a=1, b=1)
Row(a=2, b=2)
]]
所以我被迫这样做:
df.rdd.flatMap(lambda x: x).flatMap(lambda x: x)
所以我可以实现以下目标:
[
Row(a=1, b=1)
Row(a=2, b=2)
]
使用上面的结果,我终于可以将它转换为数据帧并保存在某个地方。这就是我想要的。但是调用 flatMap 两次看起来不对。
我尝试使用 Reduce 来达到同样的效果,就像下面的代码:
flatRdd = df.rdd.flatMap(lambda x: x)
dfMerged = reduce(DataFrame.unionByName, [flatRdd])
reduce的第二个参数必须是可迭代的,所以我被迫添加[flatRdd]。可悲的是,它给了我这个:
[[
Row(a=1, b=1)
Row(a=2, b=2)
]]
肯定有更好的方法来实现我想要的。
IIUC,您可以 explode
,然后使用 .*
语法将生成的 Row
展平。
假设您从以下 DataFrame 开始:
df.show()
#+----------------+
#| body|
#+----------------+
#|[[1, 1], [2, 2]]|
#+----------------+
架构:
df.printSchema()
#root
# |-- body: array (nullable = true)
# | |-- element: struct (containsNull = true)
# | | |-- a: long (nullable = true)
# | | |-- b: long (nullable = true)
您可以先 explode
body
列:
from pyspark.sql.functions import explode
df = df.select(explode("body").alias("exploded"))
df.show()
#+--------+
#|exploded|
#+--------+
#| [1, 1]|
#| [2, 2]|
#+--------+
现在
df = df.select("exploded.*")
df.show()
#+---+---+
#| a| b|
#+---+---+
#| 1| 1|
#| 2| 2|
#+---+---+
现在,如果您调用 collect
,您将获得所需的输出:
print(df.collect())
#[Row(a=1, b=1), Row(a=2, b=2)]
另请参阅:
你不需要运行 Row对象上的flatMap(),直接用键引用它:
>>> df.rdd.flatMap(lambda x: x.body).collect()
[Row(a=1, b=1), Row(a=2, b=2)]