Compare two DataFrames and check for changes
I have two similar Spark DataFrames, df1 and df2, and I want to compare them for changes:
- df1 and df2 share the same columns
- df2 can have more rows than df1, but any extra rows in df2 that are not in df1 can be ignored during the comparison
- the comparison key columns are PROGRAM_NAME and ACTION
df1 = spark.createDataFrame([
["PROG1","ACTION1","10","NEW"],
["PROG2","ACTION2","12","NEW"],
["PROG3","ACTION1","14","NEW"],
["PROG4","ACTION4","16","NEW"]
],["PROGRAM_NAME", "ACTION", "VALUE1", "STATUS"])
df2 = spark.createDataFrame([
["PROG1","ACTION1","11","IN PROGRESS"],
["PROG2","ACTION2","12","NEW"],
["PROG3","ACTION1","20","FINISHED"],
["PROG4","ACTION4","14","IN PROGRESS"],
["PROG5","ACTION1","20","NEW"]
],["PROGRAM_NAME", "ACTION", "VALUE1", "STATUS"])
Below, df1, df2, and the expected result after comparing the two DataFrames are shown in order.
Here is a solution in Spark:
import pyspark.sql.types as T
import pyspark.sql.functions as F
df1 = spark.createDataFrame(
[
('PROG1', 'ACTION1', 10, 'NEW'),
('PROG2', 'ACTION2', 12, 'NEW'),
('PROG3', 'ACTION1', 14, 'NEW'),
('PROG4', 'ACTION4', 16, 'NEW'),
],
['PROGRAM_NAME', 'ACTION', 'Value1', 'Status']
)
df2 = spark.createDataFrame(
[
('PROG1', 'ACTION1', 11, 'IN PROGRESS'),
('PROG2', 'ACTION2', 12, 'NEW'),
('PROG3', 'ACTION1', 20, 'FINISHED'),
('PROG4', 'ACTION4', 14, 'IN PROGRESS'),
('PROG5', 'ACTION1', 20, 'NEW'),
],
['PROGRAM_NAME', 'ACTION', 'Value1', 'Status']
)
df1 = df1.alias('df1')
df2 = df2.alias('df2')
df = df1.join(df2, on=['PROGRAM_NAME', 'ACTION'], how='inner')
df = df.filter(F.col('df1.Status') != F.col('df2.Status'))
df.select(
F.col('PROGRAM_NAME'),
F.col('ACTION'),
*[F.col(f'df2.{col}') for col in df2.columns[2:]]
)
You can get a result like this:
import pandas as pd
dict1 = {"PROGRAM_NAME":["PROG1","PROG2","PROG3","PROG4"],
"ACTION":["ACTION1","ACTION2","ACTION1","ACTION4"],
"Value1":[10,12,14,16],
"Status":["NEW","NEW","NEW","NEW"]}
dict2 = {"PROGRAM_NAME":["PROG1","PROG2","PROG3","PROG4","PROG5"],
"ACTION":["ACTION1","ACTION2","ACTION1","ACTION4","ACTION1"],
"Value1":[11,12,20,14,20],
"Status":["IN PROGRES","NEW","FINISHED","IN PROGRES","NEW"]}
DF1 = pd.DataFrame(dict1)
DF2 = pd.DataFrame(dict2)
DF3 = DF2.copy()
DF3 = DF3[DF3["PROGRAM_NAME"].isin(DF1["PROGRAM_NAME"])]
Output:
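The isin filter above only drops DF2's extra rows; it does not yet isolate the rows that actually changed. A hedged sketch of one way to finish the job, reusing the sample data (with the Status typo corrected) and pandas' merge indicator:

```python
import pandas as pd

DF1 = pd.DataFrame({
    "PROGRAM_NAME": ["PROG1", "PROG2", "PROG3", "PROG4"],
    "ACTION": ["ACTION1", "ACTION2", "ACTION1", "ACTION4"],
    "Value1": [10, 12, 14, 16],
    "Status": ["NEW", "NEW", "NEW", "NEW"]})
DF2 = pd.DataFrame({
    "PROGRAM_NAME": ["PROG1", "PROG2", "PROG3", "PROG4", "PROG5"],
    "ACTION": ["ACTION1", "ACTION2", "ACTION1", "ACTION4", "ACTION1"],
    "Value1": [11, 12, 20, 14, 20],
    "Status": ["IN PROGRESS", "NEW", "FINISHED", "IN PROGRESS", "NEW"]})

# Drop DF2 rows whose program is not in DF1, as in the answer above ...
DF3 = DF2[DF2["PROGRAM_NAME"].isin(DF1["PROGRAM_NAME"])]

# ... then keep only rows that have no identical counterpart in DF1.
# With no `on=` argument, merge joins on all shared columns, so unchanged
# rows match fully and get _merge == "both".
changed = (DF3.merge(DF1, how="left", indicator=True)
              .query("_merge == 'left_only'")
              .drop(columns="_merge"))
```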
You can merge df1 and df2 and keep only df2's VALUE1 and STATUS: give df1's columns the suffix _x, leave df2's columns unsuffixed, and then keep only df2's columns:
df1.merge(df2, on=['PROGRAM_NAME', 'ACTION'], suffixes=('_x', ''))[df2.columns]
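The one-liner above keeps every matching row, whether or not its values changed. A self-contained sketch, using the question's sample data, that additionally filters down to only the rows where df2 differs from df1:

```python
import pandas as pd

cols = ["PROGRAM_NAME", "ACTION", "VALUE1", "STATUS"]
df1 = pd.DataFrame([["PROG1", "ACTION1", "10", "NEW"],
                    ["PROG2", "ACTION2", "12", "NEW"],
                    ["PROG3", "ACTION1", "14", "NEW"],
                    ["PROG4", "ACTION4", "16", "NEW"]], columns=cols)
df2 = pd.DataFrame([["PROG1", "ACTION1", "11", "IN PROGRESS"],
                    ["PROG2", "ACTION2", "12", "NEW"],
                    ["PROG3", "ACTION1", "20", "FINISHED"],
                    ["PROG4", "ACTION4", "14", "IN PROGRESS"],
                    ["PROG5", "ACTION1", "20", "NEW"]], columns=cols)

# Inner merge on the key columns: df1's values get the _x suffix,
# df2's keep their original names.
merged = df1.merge(df2, on=["PROGRAM_NAME", "ACTION"], suffixes=("_x", ""))

# Keep only rows where df2's VALUE1 or STATUS differs from df1's,
# then project back onto df2's column layout.
changed = merged[(merged["VALUE1_x"] != merged["VALUE1"])
                 | (merged["STATUS_x"] != merged["STATUS"])][df2.columns]
```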
Similar questions have been asked on SO many times.
Use a simple join to get the rows present in both df1 and df2, then filter for rows where the other two columns have different values:
from pyspark.sql.functions import col
df_final = df2.alias("new").join(
df1.alias("old"),
(col("new.PROGRAM_NAME") == col("old.PROGRAM_NAME")) & (col("new.ACTION") == col("old.ACTION"))
).filter(
(col("new.VALUE1") != col("old.VALUE1")) | (col("new.STATUS") != col("old.STATUS"))
).select("new.*")
df_final.show()
#+------------+-------+------+-----------+
#|PROGRAM_NAME| ACTION|VALUE1| STATUS|
#+------------+-------+------+-----------+
#| PROG3|ACTION1| 20| FINISHED|
#| PROG4|ACTION4| 14|IN PROGRESS|
#| PROG1|ACTION1| 11|IN PROGRESS|
#+------------+-------+------+-----------+
You can also put the filter conditions directly into the join condition.