有没有更简单的方法将 100 多个 PySpark 数据框与不同的列组合在一起(不是合并,而是追加)

is there any easier way to combine 100+ PySpark dataframe with different columns together (not merge, but append)

假设我有很多数据框,结构相似,但列不同。我想将它们全部组合在一起,如何以更简单的方式进行?

例如df1df2df3如下:

df1

   id   base1 base2 col1 col2 col3 col4
   1    1     100   30    1    2    3
   2    2     200   40    2    3    4
   3    3     300   20    4    4    5

df2

   id   base1 base2 col1
   5    4     100   15
   6    1     99    18
   7    2     89    9

df3

   id   base1 base2 col1 col2
   9    2     77    12    3
   10   1     89    16    5
   11   2     88    10    7

成为:

   id   base1 base2 col1 col2 col3 col4
   1    1     100   30    1    2    3
   2    2     200   40    2    3    4
   3    3     300   20    4    4    5
   5    4     100   15   NaN  NaN  NaN 
   6    1     99    18   NaN  NaN  NaN 
   7    2     89    9    NaN  NaN  NaN 
   9    2     77    12    3   NaN  NaN
   10   1     89    16    5   NaN  NaN
   11   2     88    10    7   NaN  NaN

目前我使用这个代码:

from pyspark.sql import SparkSession, HiveContext
from pyspark.sql.functions import lit
from pyspark.sql import Row

def customUnion(df1, df2):
    cols1 = df1.columns
    cols2 = df2.columns
    total_cols = sorted(cols1 + list(set(cols2) - set(cols1)))
    def expr(mycols, allcols):
        def processCols(colname):
            if colname in mycols:
                return colname
            else:
                return lit(None).alias(colname)
        cols = map(processCols, allcols)
        return list(cols)
    appended = df1.select(expr(cols1, total_cols)).union(df2.select(expr(cols2, total_cols)))
    return appended

df_comb1=customUnion(df1,df2)
df_comb2=customUnion(df_comb1,df3)

但是,如果我继续创建新的数据框,如 df4df5 等。 (100+) 我的代码变得混乱。

有没有更简单的编码方式?

提前致谢

您可以使用数据框列表和函数来管理它,而不必为每个数据框静态命名...

dataframes = [df1,df2,df3] # load data frames

计算所有可能列的集合:

all_cols = {i for lst in [df.columns for df in dataframes] for i in lst}
#{'base1', 'base2', 'col1', 'col2', 'col3', 'col4', 'id'}

向 DF 添加缺失列的函数:

def add_missing_cols(df, cols):
    v = df
    for col in [c for c in cols if (not c in df.columns)]:
        v = v.withColumn(col, f.lit(None))
    return v

completed_dfs = [add_missing_cols(df, all_cols) for df in dataframes]

res = completed_dfs[0]
for df in completed_dfs[1:]:
    res = res.unionAll(df)

res.show()
+---+-----+-----+----+----+----+----+
| id|base1|base2|col1|col2|col3|col4|
+---+-----+-----+----+----+----+----+
|  1|    1|  100|  30|   1|   2|   3|
|  2|    2|  200|  40|   2|   3|   4|
|  3|    3|  300|  20|   4|   4|   5|
|  5|    4|  100|  15|null|null|null|
|  6|    1|   99|  18|null|null|null|
|  7|    2|   89|   9|null|null|null|
|  9|    2|   77|  12|   3|null|null|
| 10|    1|   89|  16|   5|null|null|
| 11|    2|   88|  10|   7|null|null|
+---+-----+-----+----+----+----+----+