Pyspark 在广播加入后删除重复的列
Pyspark removing duplicate columns after broadcast join
我有两个数据框,我想加入它们然后保存为镶木地板 table。执行连接后,我的结果 table 有重复的列,阻止我保存数据集。
这是我的加入代码
join_conditions = [
df1.colX == df2.colY,
df1.col1 == df2.col1,
df1.col2 == df2.col2,
df1.col3 == df2.col3,
]
dfj= df1.alias("1").join(F.broadcast(df2.alias("2")), join_conditions, "inner"
).drop("1.col1", "1.col2", "1.col3")
dfj.write.format("parquet").mode("overwrite").saveAsTable("table")
我原以为删除会删除重复的列,但是当我尝试保存 table 时抛出一个异常,说它们仍然存在。如果列不存在,drop() 不会抛出异常,这意味着别名可能是错误的/没有像我预期的那样工作?
我不能将连接条件作为字符串列表来执行,因为当并非连接条件中的所有列在每个 DataFrame 上的调用都相同时,这似乎会导致错误:
join_conditions = [
df1.colX == df2.colY,
"col1",
"col2",
"col3"
]
例如不起作用。
此联接有效,但仍会产生重复的列
join_conditions = [
df1.X == df2.colY,
F.col("1.col1") == F.col("2.col1"),
F.col("1.col2") == F.col("2.col2"),
F.col("1.col3") == F.col("2.col3"),
]
也没有用。所有这些方法仍然会导致连接的数据框具有重复的列 col1、col2 和 col3。我做错了什么/没有正确理解?使用 pyspark 示例代码的答案将不胜感激。
我不确定为什么它不起作用,它真的很奇怪。
这不是很漂亮,但它有效
from pyspark.sql import functions as F
data = [{'colX': "hello", 'col1': 1, 'col2': 2, 'col3': 3}]
data2 = [{'colY': "hello", 'col1': 1, 'col2': 2, 'col3': 3}]
df1 = spark.createDataFrame(data)
df2 = spark.createDataFrame(data2)
join_cond = [df1.colX==df2.colY,
df1.col1==df2.col1,
df1.col2==df2.col2,
df1.col3==df2.col3]
df1.join(F.broadcast(df2), join_cond, 'inner').drop(df1.col1).drop(df1.col2).drop(df1.col3).printSchema()
root
|-- colX: string (nullable = true)
|-- col1: long (nullable = true)
|-- col2: long (nullable = true)
|-- col3: long (nullable = true)
|-- colY: string (nullable = true)
我有两个数据框,我想加入它们然后保存为镶木地板 table。执行连接后,我的结果 table 有重复的列,阻止我保存数据集。
这是我的加入代码
join_conditions = [
df1.colX == df2.colY,
df1.col1 == df2.col1,
df1.col2 == df2.col2,
df1.col3 == df2.col3,
]
dfj= df1.alias("1").join(F.broadcast(df2.alias("2")), join_conditions, "inner"
).drop("1.col1", "1.col2", "1.col3")
dfj.write.format("parquet").mode("overwrite").saveAsTable("table")
我原以为删除会删除重复的列,但是当我尝试保存 table 时抛出一个异常,说它们仍然存在。如果列不存在,drop() 不会抛出异常,这意味着别名可能是错误的/没有像我预期的那样工作?
我不能将连接条件作为字符串列表来执行,因为当并非连接条件中的所有列在每个 DataFrame 上的调用都相同时,这似乎会导致错误:
join_conditions = [
df1.colX == df2.colY,
"col1",
"col2",
"col3"
]
例如不起作用。
此联接有效,但仍会产生重复的列
join_conditions = [
df1.X == df2.colY,
F.col("1.col1") == F.col("2.col1"),
F.col("1.col2") == F.col("2.col2"),
F.col("1.col3") == F.col("2.col3"),
]
也没有用。所有这些方法仍然会导致连接的数据框具有重复的列 col1、col2 和 col3。我做错了什么/没有正确理解?使用 pyspark 示例代码的答案将不胜感激。
我不确定为什么它不起作用,它真的很奇怪。
这不是很漂亮,但它有效
from pyspark.sql import functions as F
data = [{'colX': "hello", 'col1': 1, 'col2': 2, 'col3': 3}]
data2 = [{'colY': "hello", 'col1': 1, 'col2': 2, 'col3': 3}]
df1 = spark.createDataFrame(data)
df2 = spark.createDataFrame(data2)
join_cond = [df1.colX==df2.colY,
df1.col1==df2.col1,
df1.col2==df2.col2,
df1.col3==df2.col3]
df1.join(F.broadcast(df2), join_cond, 'inner').drop(df1.col1).drop(df1.col2).drop(df1.col3).printSchema()
root
|-- colX: string (nullable = true)
|-- col1: long (nullable = true)
|-- col2: long (nullable = true)
|-- col3: long (nullable = true)
|-- colY: string (nullable = true)