Pyspark 在广播加入后删除重复的列

Pyspark removing duplicate columns after broadcast join

我有两个数据框,我想加入它们然后保存为镶木地板 table。执行连接后,我的结果 table 有重复的列,阻止我保存数据集。

这是我的加入代码

join_conditions = [
        df1.colX == df2.colY,
        df1.col1 == df2.col1,
        df1.col2 == df2.col2,
        df1.col3 == df2.col3,
    ]

dfj= df1.alias("1").join(F.broadcast(df2.alias("2")), join_conditions, "inner"
).drop("1.col1", "1.col2", "1.col3")

dfj.write.format("parquet").mode("overwrite").saveAsTable("table")

我原以为删除会删除重复的列,但是当我尝试保存 table 时抛出一个异常,说它们仍然存在。如果列不存在,drop() 不会抛出异常,这意味着别名可能是错误的/没有像我预期的那样工作?

我不能将连接条件作为字符串列表来执行,因为当并非连接条件中的所有列在每个 DataFrame 上的调用都相同时,这似乎会导致错误:

join_conditions = [
        df1.colX == df2.colY,
        "col1",
        "col2",
        "col3"
    ]

例如不起作用。

此联接有效,但仍会产生重复的列

join_conditions = [
        df1.X == df2.colY,
        F.col("1.col1") == F.col("2.col1"),
        F.col("1.col2") == F.col("2.col2"),
        F.col("1.col3") == F.col("2.col3"),
    ]

也没有用。所有这些方法仍然会导致连接的数据框具有重复的列 col1、col2 和 col3。我做错了什么/没有正确理解?使用 pyspark 示例代码的答案将不胜感激。

我不确定为什么它不起作用,它真的很奇怪。
这不是很漂亮,但它有效


from pyspark.sql import functions as F

data = [{'colX': "hello", 'col1': 1, 'col2': 2, 'col3': 3}]
data2 = [{'colY': "hello", 'col1': 1, 'col2': 2, 'col3': 3}]
df1 = spark.createDataFrame(data)
df2 = spark.createDataFrame(data2)

join_cond = [df1.colX==df2.colY, 
df1.col1==df2.col1, 
df1.col2==df2.col2, 
df1.col3==df2.col3]

df1.join(F.broadcast(df2), join_cond, 'inner').drop(df1.col1).drop(df1.col2).drop(df1.col3).printSchema()
root
 |-- colX: string (nullable = true)
 |-- col1: long (nullable = true)
 |-- col2: long (nullable = true)
 |-- col3: long (nullable = true)
 |-- colY: string (nullable = true)