Merge many dataframes into one in PySpark [non-pandas df]

A process generates dataframes for me one at a time, and I need to merge them into a single dataframe.

+--------+----------+
|    Name|Age       |
+--------+----------+
|Alex    |        30|
+--------+----------+


+--------+----------+
|    Name|Age       |
+--------+----------+
|Earl    |        32|
+--------+----------+


+--------+----------+
|    Name|Age       |
+--------+----------+
|Jane    |        15|
+--------+----------+

Final result:

+--------+----------+
|    Name|Age       |
+--------+----------+
|Alex    |        30|
+--------+----------+
|Earl    |        32|
+--------+----------+
|Jane    |        15|
+--------+----------+

I tried many options such as concat, merge, and append, but I guess those all belong to the pandas library, and I am not using pandas. I am on Python 2.7 and Spark 2.2.

Edited to cover the final scenario with foreachPartition:

l = [('Alex', 30)]
k = [('Earl', 32)]

ldf = spark.createDataFrame(l, ('Name', 'Age'))
ldf = spark.createDataFrame(k, ('Name', 'Age'))  # note: reassigns ldf, so only the 'Earl' row remains
# option 1:
union_df(ldf).show()
# option 2:
uxdf = union_df(ldf)
uxdf.show()

Output in both cases:

+-------+---+
|   Name|Age|
+-------+---+
|Earl   | 32|
+-------+---+

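You can use unionAll() on the dataframes. Since Spark 2.0, unionAll is an alias for DataFrame.union, which is what the helper below reduces over: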

from functools import reduce  # required on Python 3.x; also works on Python 2.6+
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.union, dfs)

df1 = sqlContext.createDataFrame([(1, "foo1"), (2, "bar1")], ("k", "v"))
df2 = sqlContext.createDataFrame([(3, "foo2"), (4, "bar2")], ("k", "v"))
df3 = sqlContext.createDataFrame([(5, "foo3"), (6, "bar3")], ("k", "v"))

unionAll(df1, df2, df3).show()

## +---+----+
## |  k|   v|
## +---+----+
## |  1|foo1|
## |  2|bar1|
## |  3|foo2|
## |  4|bar2|
## |  5|foo3|
## |  6|bar3|
## +---+----+
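
One caveat worth noting: DataFrame.union matches columns by position, not by name, so frames whose columns arrive in a different order would be merged incorrectly. Below is a minimal sketch of a guard, assuming every frame has the same column names. union_aligned is a hypothetical helper name, not part of PySpark. (DataFrame.unionByName does this natively, but only from Spark 2.3 onward.)

```python
from functools import reduce


def union_aligned(*dfs):
    """Union DataFrames after reordering each one to the first frame's column order.

    Hypothetical helper: assumes all frames share the same column names.
    """
    if not dfs:
        raise ValueError("need at least one DataFrame")
    cols = dfs[0].columns
    # select() picks columns by name, so the positional union lines up afterwards.
    aligned = [df.select(*cols) for df in dfs]
    return reduce(lambda left, right: left.union(right), aligned)
```

For example, union_aligned(df1, df2.select("v", "k")) should give the same rows as unionAll(df1, df2), because df2 is reordered back to ("k", "v") before the union.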

Edit:

You can create an empty dataframe and keep unioning each new dataframe onto it:

# Create first dataframe
ldf = spark.createDataFrame(l, ["Name", "Age"])
ldf.show()
# Save its schema
schema = ldf.schema
# Create an empty DF with the same schema (a schema is required to create an empty dataframe)
empty_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)
empty_df.show()
# Union the first DF with the empty df
empty_df = empty_df.union(ldf)
empty_df.show()
# New dataframe after some operations
ldf = spark.createDataFrame(k, schema)
# Union with the empty_df again
empty_df = empty_df.union(ldf)
empty_df.show()

# First DF ldf
+----+---+
|Name|Age|
+----+---+
|Alex| 30|
+----+---+

# Empty dataframe empty_df
+----+---+
|Name|Age|
+----+---+
+----+---+

# After first union empty_df.union(ldf)
+----+---+
|Name|Age|
+----+---+
|Alex| 30|
+----+---+

# After second union with new ldf
+----+---+
|Name|Age|
+----+---+
|Alex| 30|
|Earl| 32|
+----+---+
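
If the frames are produced one at a time, an alternative to seeding with an empty dataframe is to collect them in a Python list and union once at the end. This is only a sketch under assumptions: merge_frames is a hypothetical name, and it assumes all frames share one schema and are created on the driver.

```python
from functools import reduce


def merge_frames(frames):
    """Union an iterable of same-schema DataFrames; return None if it is empty."""
    frames = list(frames)
    if not frames:
        return None
    # a.union(b) appends b's rows to a's; duplicate rows are kept.
    return reduce(lambda acc, df: acc.union(df), frames)
```

With the question's data, merge_frames(spark.createDataFrame(rows, ("Name", "Age")) for rows in (l, k)).show() should print both the Alex and the Earl rows, with no schema bookkeeping for an empty seed frame.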