连接 pyspark 中多列的通用合并
Generic coalesce of multiple columns in join pyspark
我必须合并许多 spark DataFrame。合并后,我想在具有相同名称的多个列之间执行合并。
我能够按照这个 创建一个最小的例子。
但是,我需要一段更通用的代码来支持:一组要合并的变量(在示例 set_vars = set(('var1','var2'))
中)和多个连接键(在示例 join_keys = set(('id'))
中)。
在 pyspark
中是否有更简洁(更通用)的方法来获得此结果?
df1 = spark.createDataFrame([
( 1, None , "aa"),
( 2 , "a", None ),
( 3 , "b", None),
( 4 , "h", None),],
"id int, var1 string, var2 string",
)
df2 = spark.createDataFrame([
( 1, "f" , "Ba"),
( 2 , "a", "bb" ),
( 3 , "b", None),],
"id int, var1 string, var2 string",
)
df1 = df1.alias("df1")
df2 = df2.alias("df2")
df3 = df1.join(df2, df1.id == df2.id, how='left').withColumn("var1_", coalesce("df1.var1", "df2.var1")).drop("var1").withColumnRenamed("var1_", "var1").withColumn("var2_", coalesce("df1.var2", "df2.var2")).drop("var2").withColumnRenamed("var2_", "var2")
我们可以通过将列作为列表传递给连接方法而不是编写连接条件来避免重复列,请参考此link。
但是这里有一些公共列不需要连接条件。我们可以使用 for 循环来概括您的代码。
spark = SparkSession.builder.master("local[*]").getOrCreate()
df1 = spark.createDataFrame([
( 1, None , "aa"),
( 2 , "a", None ),
( 3 , "b", None),
( 4 , "h", None),],
"id int, var1 string, var2 string",
)
df2 = spark.createDataFrame([
( 1, "f" , "Ba"),
( 2 , "a", "bb" ),
( 3 , "b", None),],
"id int, var1 string, var2 string",
)
df1 = df1.alias("df1")
df2 = df2.alias("df2")
key_columns = ["id"]
# Get common columns between 2 dataframes excluding columns-
# -which are being used in joining conditions
other_common_columns = set(df1.columns).intersection(set(df2.columns))\
.difference(set(key_columns))
outputDF = df1.join(df2, key_columns, how='left')
for i in other_common_columns:
outputDF = outputDF.withColumn(f"{i}_", coalesce(f"df1.{i}", f"df2.{i}"))\
.drop(i).withColumnRenamed(f"{i}_", i)
outputDF.show()
+---+----+----+
| id|var2|var1|
+---+----+----+
| 1| aa| f|
| 3|null| b|
| 4|null| h|
| 2| bb| a|
+---+----+----+
我必须合并许多 spark DataFrame。合并后,我想在具有相同名称的多个列之间执行合并。
我能够按照这个
但是,我需要一段更通用的代码来支持:一组要合并的变量(在示例 set_vars = set(('var1','var2'))
中)和多个连接键(在示例 join_keys = set(('id'))
中)。
在 pyspark
中是否有更简洁(更通用)的方法来获得此结果?
df1 = spark.createDataFrame([
( 1, None , "aa"),
( 2 , "a", None ),
( 3 , "b", None),
( 4 , "h", None),],
"id int, var1 string, var2 string",
)
df2 = spark.createDataFrame([
( 1, "f" , "Ba"),
( 2 , "a", "bb" ),
( 3 , "b", None),],
"id int, var1 string, var2 string",
)
df1 = df1.alias("df1")
df2 = df2.alias("df2")
df3 = df1.join(df2, df1.id == df2.id, how='left').withColumn("var1_", coalesce("df1.var1", "df2.var1")).drop("var1").withColumnRenamed("var1_", "var1").withColumn("var2_", coalesce("df1.var2", "df2.var2")).drop("var2").withColumnRenamed("var2_", "var2")
我们可以通过将列作为列表传递给连接方法而不是编写连接条件来避免重复列,请参考此link。 但是这里有一些公共列不需要连接条件。我们可以使用 for 循环来概括您的代码。
spark = SparkSession.builder.master("local[*]").getOrCreate()
df1 = spark.createDataFrame([
( 1, None , "aa"),
( 2 , "a", None ),
( 3 , "b", None),
( 4 , "h", None),],
"id int, var1 string, var2 string",
)
df2 = spark.createDataFrame([
( 1, "f" , "Ba"),
( 2 , "a", "bb" ),
( 3 , "b", None),],
"id int, var1 string, var2 string",
)
df1 = df1.alias("df1")
df2 = df2.alias("df2")
key_columns = ["id"]
# Get common columns between 2 dataframes excluding columns-
# -which are being used in joining conditions
other_common_columns = set(df1.columns).intersection(set(df2.columns))\
.difference(set(key_columns))
outputDF = df1.join(df2, key_columns, how='left')
for i in other_common_columns:
outputDF = outputDF.withColumn(f"{i}_", coalesce(f"df1.{i}", f"df2.{i}"))\
.drop(i).withColumnRenamed(f"{i}_", i)
outputDF.show()
+---+----+----+
| id|var2|var1|
+---+----+----+
| 1| aa| f|
| 3|null| b|
| 4|null| h|
| 2| bb| a|
+---+----+----+