如何比较pyspark中数据帧之间的差异

How to compare differences between dataframes in pyspark

我有两个基本相同的数据帧,但来自两个不同的来源。在我的第一个数据框中,我有 p_user_iddate_of_birth 字段,它们是 longType,一个是 dateType,其余字段是 stringType。在我的第二个数据框中,所有内容都是 stringType。我首先根据 p_user_id(这是我的唯一标识符)检查两个数据帧的行数。 DF1:

+--------------+                                                                
|test1_racounts|
+--------------+
|        418895|
+--------------+

DF2:

+---------+                                                                     
|d_tst_rac|
+---------+
|   418915|
+---------+

然后,如果行数不同,我 运行 检查哪些 p_user_id 值在一个数据帧中,而不在另一个数据帧中。

p_user_tst_rac.subtract(rac_p_user_df).show(100, truncate=0)

给我这个结果:

+---------+                                                                     
|p_user_id|
+---------+
|661520   |
|661513   |
|661505   |
|661461   |
|661501   |
|661476   |
|661478   |
|661468   |
|661479   |
|661464   |
|661467   |
|661474   |
|661484   |
|661495   |
|661499   |
|661486   |
|661502   |
|661506   |
|661517   |
+---------+

当我试图提取其余相应字段的差异时,我的问题就出现了。我想拉出其余字段,以便我可以在数据库和应用程序中进行手动搜索,看看是否有遗漏的地方。当我添加其余列时,我的结果会因差异而超过 20 行。 运行 匹配并获取相应数据的更好方法是什么:

完整代码范围:

#racs in mysql
my_rac = spark.read.parquet("/Users/mysql.parquet")
my_rac.printSchema()
my_rac.createOrReplaceTempView('my_rac')
d_rac = spark.sql('''select distinct * from my_rac''')
d_rac.createOrReplaceTempView('d_rac')
spark.sql('''select count(*) as test1_racounts_ from d_rac''').show()
rac_p_user_df = spark.sql('''select 
cast(p_user_id as string) as p_user_id
, record_id
, contact_last_name
, contact_first_name
 from d_rac''')

#mssql_rac
sql_rac = spark.read.csv("/Users/mzn293/Downloads/kavi-20211116.csv")
#sql_rac.printSchema()
hav_rac.createOrReplaceTempView('sql_rac')
d_sql_rac = spark.sql('''select distinct 
_c0 as p_user_id
, _c1 as record_id
, _c4 as contact_last_name
, _c5 as contact_first_name
 from sql_rac''')
d_sql_rac.createOrReplaceTempView('d_sql_rac')
spark.sql('''select count(*) as d_aws_rac from d_sql_rac''').show()
dist_aws_rac = spark.sql('''select * from d_aws_rac''')
dist_sql_rac.subtract(rac_p_user_df).show(100, truncate=0)

有了这个,我得到了超过 20 个计数的差异。此外,我觉得有更好的方法来获得我的结果。但是我不确定我缺少什么来获取这 20 行的数据而不是 100 多行。

在这种情况下,最简单的方法是使用反连接。

df_diff = df1.join(df2, df1.p_user_id == df2.p_user_id, "leftanti")

这将为您提供 df1 中存在的所有记录行,但在 df2 中没有匹配记录。