检查两个 pyspark 数据帧之间是否存在重复的列值元组

Check for existence of duplicate column value tuples between two pyspark dataframes

我有两个数据框,我想使用多列将一个数据框与另一个数据框进行比较,这样如果一个数据框中的列值元组存在于另一个数据框中,则在第一个数据框中放置一个指标(例如, tupleExistsInDf2 = True)。示例代码:

import pandas as pd
from datetime import date
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df1 = pd.DataFrame({
    "pk": [1, 1, 1, 1, 2, 2, 2, 2, 3, 4],
    "date": [
        date("2022-05-06"),
        date("2022-05-13"),
        date("2022-05-06"),
        date("2022-05-06"),
        date("2022-05-14"),
        date("2022-05-15"),
        date("2022-05-05"),
        date("2022-05-05"),
        date("2022-05-11"),
        date("2022-05-12")
    ],
    "variable": [A, B, C, D, A, A, E, F, A, G]
})

df1 = spark.createDataFrame(df1)

df2 = pd.DataFrame({
    "pk": [1, 1, 2, 2, 2, 3, 4, 5, 6, 6],
    "date": [
        date("2022-05-06"),
        date("2022-05-13"),
        date("2022-05-14"),
        date("2022-05-15"),
        date("2022-05-05"),
        date("2022-05-11"),
        date("2022-05-08"), 
        date("2022-05-03"),
        date("2022-05-07"),
        date("2022-05-08")
    ],
    "variable": [A, B, A, A, E, A, A, H, A, A]

})

df2 = spark.createDataFrame(df2)

这会产生两个 pyspark 数据帧:

df1.show()

#+-----+-----------+--------+
#|pk   |       date|variable|
#+-----+-----------+--------+
#|    1| 2022-05-06|       A|
#|    1| 2022-05-13|       B|
#|    1| 2022-05-06|       C|
#|    1| 2022-05-06|       D|
#|    2| 2022-05-14|       A|
#|    2| 2022-05-15|       A|
#|    2| 2022-05-05|       E|
#|    2| 2022-05-05|       F|
#|    3| 2022-05-11|       A|
#|    4| 2022-05-12|       G|
#+-----+-----------+--------+

df2.show()

#+-----+-----------+--------+
#|pk   |       date|variable|
#+-----+-----------+--------+
#|    1| 2022-05-06|       A|
#|    1| 2022-05-13|       B|
#|    2| 2022-05-14|       A|
#|    2| 2022-05-15|       A|
#|    2| 2022-05-05|       E|
#|    3| 2022-05-11|       A|
#|    4| 2022-05-08|       A|
#|    5| 2022-05-03|       H|
#|    6| 2022-05-07|       A|
#|    6| 2022-05-08|       A|
#+-----+-----------+--------+

我想做的是在df1中指出每个元组(pk, date, variable)是否存在于df2中。此示例的结果为:

#+-----+-----------+--------+----------------+
#|pk   |       date|variable|tupleExistsInDf2|
#+-----+-----------+--------+----------------+
#|    1| 2022-05-06|       A|            True|
#|    1| 2022-05-13|       B|            True|
#|    1| 2022-05-06|       C|           False|
#|    1| 2022-05-06|       D|           False|
#|    2| 2022-05-14|       A|            True|
#|    2| 2022-05-15|       A|           False|
#|    2| 2022-05-05|       E|            True|
#|    2| 2022-05-05|       F|           False|
#|    3| 2022-05-11|       A|            True|
#|    4| 2022-05-12|       G|           False|
#+-----+-----------+--------+----------------+

使用intersection大概就够了,因为它比False/True的一列有用。但如果您需要它,这里是您所要求的 sudo 代码:

  1. 走十字路口
  2. 创建加入条件
  3. Left outer Join 和 select 只有需要的列
    1. 使用 Coalesce 填充空值。 naFill 也可以。

.

import pyspark.sql.functions as f
intersection = df1.intersect(df2).withColumn("tupleExistsInDf2", f.lit(True) ) # collect columns that are in both tables.
cond = [df1.pk==intersection.pk, df1.date==intersection.date, df1.variable==intersection.variable] # create join condition
df1.join( intersection , cond,  "leftouter")\
  .select( df1.pk,df1.date,df1.variable, f.coalesce(intersection.tupleExistsInDf2, f.lit(False) ) )