检查两个 pyspark 数据帧之间是否存在重复的列值元组
Check for existence of duplicate column value tuples between two pyspark dataframes
我有两个数据框,我想使用多列将一个数据框与另一个数据框进行比较,这样如果一个数据框中的列值元组存在于另一个数据框中,则在第一个数据框中放置一个指标(例如, tupleExistsInDf2 = True
)。示例代码:
import pandas as pd
from datetime import date
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df1 = pd.DataFrame({
"pk": [1, 1, 1, 1, 2, 2, 2, 2, 3, 4],
"date": [
date("2022-05-06"),
date("2022-05-13"),
date("2022-05-06"),
date("2022-05-06"),
date("2022-05-14"),
date("2022-05-15"),
date("2022-05-05"),
date("2022-05-05"),
date("2022-05-11"),
date("2022-05-12")
],
"variable": [A, B, C, D, A, A, E, F, A, G]
})
df1 = spark.createDataFrame(df1)
df2 = pd.DataFrame({
"pk": [1, 1, 2, 2, 2, 3, 4, 5, 6, 6],
"date": [
date("2022-05-06"),
date("2022-05-13"),
date("2022-05-14"),
date("2022-05-15"),
date("2022-05-05"),
date("2022-05-11"),
date("2022-05-08"),
date("2022-05-03"),
date("2022-05-07"),
date("2022-05-08")
],
"variable": [A, B, A, A, E, A, A, H, A, A]
})
df2 = spark.createDataFrame(df2)
这会产生两个 pyspark 数据帧:
df1.show()
#+-----+-----------+--------+
#|pk | date|variable|
#+-----+-----------+--------+
#| 1| 2022-05-06| A|
#| 1| 2022-05-13| B|
#| 1| 2022-05-06| C|
#| 1| 2022-05-06| D|
#| 2| 2022-05-14| A|
#| 2| 2022-05-15| A|
#| 2| 2022-05-05| E|
#| 2| 2022-05-05| F|
#| 3| 2022-05-11| A|
#| 4| 2022-05-12| G|
#+-----+-----------+--------+
df2.show()
#+-----+-----------+--------+
#|pk | date|variable|
#+-----+-----------+--------+
#| 1| 2022-05-06| A|
#| 1| 2022-05-13| B|
#| 2| 2022-05-14| A|
#| 2| 2022-05-15| A|
#| 2| 2022-05-05| E|
#| 3| 2022-05-11| A|
#| 4| 2022-05-08| A|
#| 5| 2022-05-03| H|
#| 6| 2022-05-07| A|
#| 6| 2022-05-08| A|
#+-----+-----------+--------+
我想做的是在df1
中指出每个元组(pk, date, variable)
是否存在于df2
中。此示例的结果为:
#+-----+-----------+--------+----------------+
#|pk | date|variable|tupleExistsInDf2|
#+-----+-----------+--------+----------------+
#| 1| 2022-05-06| A| True|
#| 1| 2022-05-13| B| True|
#| 1| 2022-05-06| C| False|
#| 1| 2022-05-06| D| False|
#| 2| 2022-05-14| A| True|
#| 2| 2022-05-15| A| False|
#| 2| 2022-05-05| E| True|
#| 2| 2022-05-05| F| False|
#| 3| 2022-05-11| A| True|
#| 4| 2022-05-12| G| False|
#+-----+-----------+--------+----------------+
使用intersection大概就够了,因为它比False/True的一列有用。但如果您需要它,这里是您所要求的 sudo 代码:
- 走十字路口
- 创建加入条件
- Left outer Join 和 select 只有需要的列
- 使用 Coalesce 填充空值。 naFill 也可以。
.
import pyspark.sql.functions as f
intersection = df1.intersect(df2).withColumn("tupleExistsInDf2", f.lit(True) ) # collect columns that are in both tables.
cond = [df1.pk==intersection.pk, df1.date==intersection.date, df1.variable==intersection.variable] # create join condition
df1.join( intersection , cond, "leftouter")\
.select( df1.pk,df1.date,df1.variable, f.coalesce(intersection.tupleExistsInDf2, f.lit(False) ) )
我有两个数据框,我想使用多列将一个数据框与另一个数据框进行比较,这样如果一个数据框中的列值元组存在于另一个数据框中,则在第一个数据框中放置一个指标(例如, tupleExistsInDf2 = True
)。示例代码:
import pandas as pd
from datetime import date
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df1 = pd.DataFrame({
"pk": [1, 1, 1, 1, 2, 2, 2, 2, 3, 4],
"date": [
date("2022-05-06"),
date("2022-05-13"),
date("2022-05-06"),
date("2022-05-06"),
date("2022-05-14"),
date("2022-05-15"),
date("2022-05-05"),
date("2022-05-05"),
date("2022-05-11"),
date("2022-05-12")
],
"variable": [A, B, C, D, A, A, E, F, A, G]
})
df1 = spark.createDataFrame(df1)
df2 = pd.DataFrame({
"pk": [1, 1, 2, 2, 2, 3, 4, 5, 6, 6],
"date": [
date("2022-05-06"),
date("2022-05-13"),
date("2022-05-14"),
date("2022-05-15"),
date("2022-05-05"),
date("2022-05-11"),
date("2022-05-08"),
date("2022-05-03"),
date("2022-05-07"),
date("2022-05-08")
],
"variable": [A, B, A, A, E, A, A, H, A, A]
})
df2 = spark.createDataFrame(df2)
这会产生两个 pyspark 数据帧:
df1.show()
#+-----+-----------+--------+
#|pk | date|variable|
#+-----+-----------+--------+
#| 1| 2022-05-06| A|
#| 1| 2022-05-13| B|
#| 1| 2022-05-06| C|
#| 1| 2022-05-06| D|
#| 2| 2022-05-14| A|
#| 2| 2022-05-15| A|
#| 2| 2022-05-05| E|
#| 2| 2022-05-05| F|
#| 3| 2022-05-11| A|
#| 4| 2022-05-12| G|
#+-----+-----------+--------+
df2.show()
#+-----+-----------+--------+
#|pk | date|variable|
#+-----+-----------+--------+
#| 1| 2022-05-06| A|
#| 1| 2022-05-13| B|
#| 2| 2022-05-14| A|
#| 2| 2022-05-15| A|
#| 2| 2022-05-05| E|
#| 3| 2022-05-11| A|
#| 4| 2022-05-08| A|
#| 5| 2022-05-03| H|
#| 6| 2022-05-07| A|
#| 6| 2022-05-08| A|
#+-----+-----------+--------+
我想做的是在df1
中指出每个元组(pk, date, variable)
是否存在于df2
中。此示例的结果为:
#+-----+-----------+--------+----------------+
#|pk | date|variable|tupleExistsInDf2|
#+-----+-----------+--------+----------------+
#| 1| 2022-05-06| A| True|
#| 1| 2022-05-13| B| True|
#| 1| 2022-05-06| C| False|
#| 1| 2022-05-06| D| False|
#| 2| 2022-05-14| A| True|
#| 2| 2022-05-15| A| False|
#| 2| 2022-05-05| E| True|
#| 2| 2022-05-05| F| False|
#| 3| 2022-05-11| A| True|
#| 4| 2022-05-12| G| False|
#+-----+-----------+--------+----------------+
使用intersection大概就够了,因为它比False/True的一列有用。但如果您需要它,这里是您所要求的 sudo 代码:
- 走十字路口
- 创建加入条件
- Left outer Join 和 select 只有需要的列
- 使用 Coalesce 填充空值。 naFill 也可以。
.
import pyspark.sql.functions as f
intersection = df1.intersect(df2).withColumn("tupleExistsInDf2", f.lit(True) ) # collect columns that are in both tables.
cond = [df1.pk==intersection.pk, df1.date==intersection.date, df1.variable==intersection.variable] # create join condition
df1.join( intersection , cond, "leftouter")\
.select( df1.pk,df1.date,df1.variable, f.coalesce(intersection.tupleExistsInDf2, f.lit(False) ) )