Compare two DataFrames and check for changes

I have two similar Spark DataFrames, df1 and df2, and I would like to compare them for changes:

df1 = spark.createDataFrame([
              ["PROG1","ACTION1","10","NEW"],
              ["PROG2","ACTION2","12","NEW"],
              ["PROG3","ACTION1","14","NEW"],
              ["PROG4","ACTION4","16","NEW"]
    ],["PROGRAM_NAME", "ACTION", "VALUE1", "STATUS"])

df2 = spark.createDataFrame([
          ["PROG1","ACTION1","11","IN PROGRESS"],
          ["PROG2","ACTION2","12","NEW"],
          ["PROG3","ACTION1","20","FINISHED"],
          ["PROG4","ACTION4","14","IN PROGRESS"],
          ["PROG5","ACTION1","20","NEW"]
],["PROGRAM_NAME", "ACTION", "VALUE1", "STATUS"])

Below, df1 and df2 are shown in order, along with the expected result I want after comparing the two DataFrames.

Here is a solution in Spark:

import pyspark.sql.functions as F

df1 = spark.createDataFrame(
    [
        ('PROG1', 'ACTION1', 10, 'NEW'),
        ('PROG2', 'ACTION2', 12, 'NEW'),
        ('PROG3', 'ACTION1', 14, 'NEW'),
        ('PROG4', 'ACTION4', 16, 'NEW'),
    ],
    ['PROGRAM_NAME', 'ACTION', 'Value1', 'Status']
)

df2 = spark.createDataFrame(
    [
        ('PROG1', 'ACTION1', 11, 'IN PROGRESS'),
        ('PROG2', 'ACTION2', 12, 'NEW'),
        ('PROG3', 'ACTION1', 20, 'FINISHED'),
        ('PROG4', 'ACTION4', 14, 'IN PROGRESS'),
        ('PROG5', 'ACTION1', 20, 'NEW'),
    ],
    ['PROGRAM_NAME', 'ACTION', 'Value1', 'Status']
)

df1 = df1.alias('df1')
df2 = df2.alias('df2')
df = df1.join(df2, on=['PROGRAM_NAME', 'ACTION'], how='inner')
# Keep only rows where Value1 or Status changed
df = df.filter(
    (F.col('df1.Value1') != F.col('df2.Value1'))
    | (F.col('df1.Status') != F.col('df2.Status'))
)
df = df.select(
    F.col('PROGRAM_NAME'),
    F.col('ACTION'),
    *[F.col(f'df2.{col}') for col in df2.columns[2:]]
)
df.show()

You can get the changed rows like this:

import pandas as pd

dict1 = {"PROGRAM_NAME": ["PROG1", "PROG2", "PROG3", "PROG4"],
         "ACTION": ["ACTION1", "ACTION2", "ACTION1", "ACTION4"],
         "Value1": [10, 12, 14, 16],
         "Status": ["NEW", "NEW", "NEW", "NEW"]}
dict2 = {"PROGRAM_NAME": ["PROG1", "PROG2", "PROG3", "PROG4", "PROG5"],
         "ACTION": ["ACTION1", "ACTION2", "ACTION1", "ACTION4", "ACTION1"],
         "Value1": [11, 12, 20, 14, 20],
         "Status": ["IN PROGRESS", "NEW", "FINISHED", "IN PROGRESS", "NEW"]}

DF1 = pd.DataFrame(dict1)
DF2 = pd.DataFrame(dict2)

# Keep only programs that also exist in DF1...
DF3 = DF2[DF2["PROGRAM_NAME"].isin(DF1["PROGRAM_NAME"])]
# ...then drop rows that are identical in DF1, leaving only the changed ones
DF3 = (DF3.merge(DF1, how="left", indicator=True)
          .query('_merge == "left_only"')
          .drop(columns="_merge"))

Output:
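As an alternative pandas sketch (assuming pandas ≥ 1.1, where `DataFrame.compare` is available), you can index both frames by the key columns and diff only the rows they share; the frames below mirror the question's sample data:

```python
import pandas as pd

old = pd.DataFrame({
    "PROGRAM_NAME": ["PROG1", "PROG2", "PROG3", "PROG4"],
    "ACTION": ["ACTION1", "ACTION2", "ACTION1", "ACTION4"],
    "Value1": [10, 12, 14, 16],
    "Status": ["NEW", "NEW", "NEW", "NEW"],
})
new = pd.DataFrame({
    "PROGRAM_NAME": ["PROG1", "PROG2", "PROG3", "PROG4", "PROG5"],
    "ACTION": ["ACTION1", "ACTION2", "ACTION1", "ACTION4", "ACTION1"],
    "Value1": [11, 12, 20, 14, 20],
    "Status": ["IN PROGRESS", "NEW", "FINISHED", "IN PROGRESS", "NEW"],
})

# Index by the key columns so rows align by key, not by position
a = old.set_index(["PROGRAM_NAME", "ACTION"])
b = new.set_index(["PROGRAM_NAME", "ACTION"])

# Compare only keys present in both frames; compare() keeps just the
# rows and cells that differ, as (self, other) column pairs
common = a.index.intersection(b.index)
diff = a.loc[common].compare(b.loc[common])
print(diff)
```

Unlike the join-based approaches, `compare` also shows the old value next to the new one, which can be handy for auditing the change itself.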

You can merge df1 and df2 and keep only df2's `VALUE1` and `STATUS`: suffix df1's columns with `_x`, leave df2's columns unsuffixed, and then select only df2's columns:

df1.merge(df2, on=['PROGRAM_NAME', 'ACTION'], suffixes=('_x', ''))[df2.columns]

If you want only the rows that actually changed, filter on the `_x` columns against their df2 counterparts before dropping them.
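As a quick sanity check, here is a merge of this shape run end-to-end on the question's sample data. The follow-up filter on the `_x` columns is an assumption about the desired output (keep only rows of df2 whose VALUE1 or STATUS differs from df1):

```python
import pandas as pd

df1 = pd.DataFrame({
    "PROGRAM_NAME": ["PROG1", "PROG2", "PROG3", "PROG4"],
    "ACTION": ["ACTION1", "ACTION2", "ACTION1", "ACTION4"],
    "VALUE1": [10, 12, 14, 16],
    "STATUS": ["NEW", "NEW", "NEW", "NEW"],
})
df2 = pd.DataFrame({
    "PROGRAM_NAME": ["PROG1", "PROG2", "PROG3", "PROG4", "PROG5"],
    "ACTION": ["ACTION1", "ACTION2", "ACTION1", "ACTION4", "ACTION1"],
    "VALUE1": [11, 12, 20, 14, 20],
    "STATUS": ["IN PROGRESS", "NEW", "FINISHED", "IN PROGRESS", "NEW"],
})

# Inner merge on the key columns; df1's value columns get the _x suffix,
# df2's keep their original names
merged = df1.merge(df2, on=["PROGRAM_NAME", "ACTION"], suffixes=("_x", ""))

# Keep only rows where either value column actually changed,
# then project back onto df2's column layout
changed = merged[
    (merged["VALUE1_x"] != merged["VALUE1"])
    | (merged["STATUS_x"] != merged["STATUS"])
][df2.columns]
print(changed)
```

PROG2 is dropped because neither VALUE1 nor STATUS changed, and PROG5 never appears because the inner merge only keeps programs present in both frames.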

Similar questions have been asked on SO many times.

Use a simple join to get the rows present in both df1 and df2, then filter for rows where the other two columns have different values:

from pyspark.sql.functions import col

df_final = df2.alias("new").join(
        df1.alias("old"),
        (col("new.PROGRAM_NAME") == col("old.PROGRAM_NAME")) & (col("new.ACTION") == col("old.ACTION"))
    ).filter(
        (col("new.VALUE1") != col("old.VALUE1")) | (col("new.STATUS") != col("old.STATUS"))
    ).select("new.*")

df_final.show()

#+------------+-------+------+-----------+
#|PROGRAM_NAME| ACTION|VALUE1|     STATUS|
#+------------+-------+------+-----------+
#|       PROG3|ACTION1|    20|   FINISHED|
#|       PROG4|ACTION4|    14|IN PROGRESS|
#|       PROG1|ACTION1|    11|IN PROGRESS|
#+------------+-------+------+-----------+

You can also put the filter conditions directly into the join condition.