Pyspark - Difference between 2 dataframes - Identify inserts, updates and deletes

I have two dataframes, df1 (old) and df2 (new). I am trying to compare df2 against df1 and find the newly added rows, the deleted rows, the updated rows, and the names of the columns that were updated.

Here is the code I wrote:

from pyspark.sql.functions import col, array, when, array_remove, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

data1 = [("James","rob","Smith","36636","M",3000),
    ("Michael","Rose","jim","40288","M",4000),
    ("Robert","dunkin","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","60563","F",-1)
  ]

data2 = [("James","rob","Smith","36636","M",3000),
    ("Robert","dunkin","Williams","42114","M",2000),
    ("Maria","Anne","Jones","72712","F",3000),
    ("Yesh","Reddy","Brown","75234","M",3000),
    ("Jen","Mary","Brown","60563","F",-1)
  ]
schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True)
])
 
df1 = spark.createDataFrame(data=data1,schema=schema)
df2 = spark.createDataFrame(data=data2,schema=schema)
conditions_ = [
    when(df1[c] != df2[c], lit(c)).otherwise("")
    for c in df1.columns
    if c not in ['firstname','middlename','lastname']
]

select_expr = [
    col("firstname"), col("middlename"), col("lastname"),
    *[df2[c] for c in df2.columns if c not in ['firstname','middlename','lastname']],
    array_remove(array(*conditions_), "").alias("updated_columns")
]

df1.join(df2, ["firstname","middlename","lastname"],"inner").select(*select_expr).show()

Here is the output I'm getting:

+---------+----------+--------+-----+------+------+---------------+
|firstname|middlename|lastname|   id|gender|salary|updated_columns|
+---------+----------+--------+-----+------+------+---------------+
|    James|       rob|   Smith|36636|     M|  3000|             []|
|   Robert|    dunkin|Williams|42114|     M|  2000|       [salary]|
|    Maria|      Anne|   Jones|72712|     F|  3000|   [id, salary]|
|      Jen|      Mary|   Brown|60563|     F|    -1|             []|
+---------+----------+--------+-----+------+------+---------------+

Here is the output I'm expecting:

+---------+----------+--------+-----+------+------+---------------+-----------------+
|firstname|middlename|lastname|   id|gender|salary|updated_columns|           status|
+---------+----------+--------+-----+------+------+---------------+-----------------+
|    James|       rob|   Smith|36636|     M|  3000|             []|        unchanged|
|   Robert|    dunkin|Williams|42114|     M|  2000|       [salary]|          updated|
|  Michael|      Rose|     jim|40288|     M|  4000|             []|          deleted|
|    Maria|      Anne|   Jones|72712|     F|  3000|   [id, salary]|          updated|
|     Yesh|     Reddy|   Brown|75234|     M|  3000|             []|            added|
|      Jen|      Mary|   Brown|60563|     F|    -1|             []|        unchanged|
+---------+----------+--------+-----+------+------+---------------+-----------------+

I know I can find the added and deleted rows separately using left anti joins, since the inner join above drops any row that exists in only one of the dataframes. However, I'm looking for a way to modify the existing join so that it produces the output above.
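
(For reference, a minimal sketch of that two-anti-join approach, assuming the three name columns serve as the key:)

keys = ["firstname", "middlename", "lastname"]

# Rows present only in df2 (added) and only in df1 (deleted)
added = df2.join(df1, keys, "left_anti")
deleted = df1.join(df2, keys, "left_anti")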

I'd suggest using Ranger so you can capture what actually changed and when. But if these dataframes are all you have... you want to do an "outer" join (pull all the data from both tables into one join). You already have the updated-columns logic.

For the status: "deleted" means the row is in df1 but not df2, and "added" means it is in df2 but not df1. Otherwise, if there are any updated columns the row is "updated", else "unchanged".

An outer join will help in your case. I have modified the code you provided to do this, and have also included the status column.

Minimal working example:

from pyspark.sql.functions import col, array, when, array_remove, lit, size, coalesce
from pyspark.sql.types import *

data1 = [("James","rob","Smith","36636","M",3000),
    ("Michael","Rose","jim","40288","M",4000),
    ("Robert","dunkin","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","60563","F",-1)
  ]

data2 = [("James","rob","Smith","36636","M",3000),
    ("Robert","dunkin","Williams","42114","M",2000),
    ("Maria","Anne","Jones","72712","F",3000),
    ("Yesh","Reddy","Brown","75234","M",3000),
    ("Jen","Mary","Brown","60563","F",-1)
  ]
schema = StructType([
    StructField("firstname",StringType(),True),
    StructField("middlename",StringType(),True),
    StructField("lastname",StringType(),True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True)
  ])
 
df1 = spark.createDataFrame(data=data1,schema=schema)
df2 = spark.createDataFrame(data=data2,schema=schema)
conditions_ = [
    when(df1[c] != df2[c], lit(c)).otherwise("")
    for c in df1.columns
    if c not in ['firstname','middlename','lastname']
]

Here is the logic for the status column. select_expr is also modified to coalesce the df2 and df1 columns, preferring df2 so the rows reflect the latest data after updates:

status = (
    when(df1["id"].isNull(), lit("added"))
    .when(df2["id"].isNull(), lit("deleted"))
    .when(size(array_remove(array(*conditions_), "")) > 0, lit("updated"))
    .otherwise("unchanged")
)

select_expr = [
    col("firstname"), col("middlename"), col("lastname"),
    *[coalesce(df2[c], df1[c]).alias(c) for c in df2.columns if c not in ['firstname','middlename','lastname']],
    array_remove(array(*conditions_), "").alias("updated_columns"),
    status.alias("status")
]

Finally, apply the outer join:

df1.join(df2, ["firstname","middlename","lastname"],"outer").select(*select_expr).show()

Output:

+---------+----------+--------+-----+------+------+---------------+---------+
|firstname|middlename|lastname|   id|gender|salary|updated_columns|   status|
+---------+----------+--------+-----+------+------+---------------+---------+
|    James|       rob|   Smith|36636|     M|  3000|             []|unchanged|
|      Jen|      Mary|   Brown|60563|     F|    -1|             []|unchanged|
|    Maria|      Anne|   Jones|72712|     F|  3000|   [id, salary]|  updated|
|  Michael|      Rose|     jim|40288|     M|  4000|             []|  deleted|
|   Robert|    dunkin|Williams|42114|     M|  2000|       [salary]|  updated|
|     Yesh|     Reddy|   Brown|75234|     M|  3000|             []|    added|
+---------+----------+--------+-----+------+------+---------------+---------+
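
One caveat with the column comparison: in Spark, df1[c] != df2[c] evaluates to null when either side is null, so a value changing to null (or from null) would not land in updated_columns. If that matters for your data, a possible null-safe variant is sketched below, reusing the same assumption the status logic already makes (that id is never null) to detect rows present on both sides:

# Null-safe variant: eqNullSafe treats null == null as true, so ~eqNullSafe
# also flags null-vs-value differences. The row_in_both guard keeps added
# and deleted rows (where one side is entirely null) out of updated_columns.
row_in_both = df1["id"].isNotNull() & df2["id"].isNotNull()
conditions_ = [
    when(row_in_both & ~df1[c].eqNullSafe(df2[c]), lit(c)).otherwise("")
    for c in df1.columns
    if c not in ['firstname','middlename','lastname']
]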