如何比较pyspark中数据帧之间的差异
How to compare differences between dataframes in pyspark
我有两个基本相同的数据帧,但来自两个不同的来源。在我的第一个数据框中,我有 p_user_id
和 date_of_birth
字段,它们是 longType
,一个是 dateType
,其余字段是 stringType
。在我的第二个数据框中,所有内容都是 stringType
。我首先根据 p_user_id
(这是我的唯一标识符)检查两个数据帧的行数。
DF1:
+--------------+
|test1_racounts|
+--------------+
| 418895|
+--------------+
DF2:
+---------+
|d_tst_rac|
+---------+
| 418915|
+---------+
然后,如果行数不同,我 运行 检查哪些 p_user_id
值在一个数据帧中,而不在另一个数据帧中。
p_user_tst_rac.subtract(rac_p_user_df).show(100, truncate=0)
给我这个结果:
+---------+
|p_user_id|
+---------+
|661520 |
|661513 |
|661505 |
|661461 |
|661501 |
|661476 |
|661478 |
|661468 |
|661479 |
|661464 |
|661467 |
|661474 |
|661484 |
|661495 |
|661499 |
|661486 |
|661502 |
|661506 |
|661517 |
+---------+
当我试图提取其余相应字段的差异时,我的问题就出现了。我想拉出其余字段,以便我可以在数据库和应用程序中进行手动搜索,看看是否有遗漏的地方。当我添加其余列时,我的结果会因差异而超过 20 行。 运行 匹配并获取相应数据的更好方法是什么:
完整代码范围:
#racs in mysql
my_rac = spark.read.parquet("/Users/mysql.parquet")
my_rac.printSchema()
my_rac.createOrReplaceTempView('my_rac')
d_rac = spark.sql('''select distinct * from my_rac''')
d_rac.createOrReplaceTempView('d_rac')
spark.sql('''select count(*) as test1_racounts_ from d_rac''').show()
rac_p_user_df = spark.sql('''select
cast(p_user_id as string) as p_user_id
, record_id
, contact_last_name
, contact_first_name
from d_rac''')
#mssql_rac
sql_rac = spark.read.csv("/Users/mzn293/Downloads/kavi-20211116.csv")
#sql_rac.printSchema()
hav_rac.createOrReplaceTempView('sql_rac')
d_sql_rac = spark.sql('''select distinct
_c0 as p_user_id
, _c1 as record_id
, _c4 as contact_last_name
, _c5 as contact_first_name
from sql_rac''')
d_sql_rac.createOrReplaceTempView('d_sql_rac')
spark.sql('''select count(*) as d_aws_rac from d_sql_rac''').show()
dist_aws_rac = spark.sql('''select * from d_aws_rac''')
dist_sql_rac.subtract(rac_p_user_df).show(100, truncate=0)
有了这个,我得到了超过 20 个计数的差异。此外,我觉得有更好的方法来获得我的结果。但是我不确定我缺少什么来获取这 20 行的数据而不是 100 多行。
在这种情况下,最简单的方法是使用反连接。
df_diff = df1.join(df2, df1.p_user_id == df2.p_user_id, "leftanti")
这将为您提供 df1
中存在的所有记录行,但在 df2
中没有匹配记录。
我有两个基本相同的数据帧,但来自两个不同的来源。在我的第一个数据框中,我有 p_user_id
和 date_of_birth
字段,它们是 longType
,一个是 dateType
,其余字段是 stringType
。在我的第二个数据框中,所有内容都是 stringType
。我首先根据 p_user_id
(这是我的唯一标识符)检查两个数据帧的行数。
DF1:
+--------------+
|test1_racounts|
+--------------+
| 418895|
+--------------+
DF2:
+---------+
|d_tst_rac|
+---------+
| 418915|
+---------+
然后,如果行数不同,我 运行 检查哪些 p_user_id
值在一个数据帧中,而不在另一个数据帧中。
p_user_tst_rac.subtract(rac_p_user_df).show(100, truncate=0)
给我这个结果:
+---------+
|p_user_id|
+---------+
|661520 |
|661513 |
|661505 |
|661461 |
|661501 |
|661476 |
|661478 |
|661468 |
|661479 |
|661464 |
|661467 |
|661474 |
|661484 |
|661495 |
|661499 |
|661486 |
|661502 |
|661506 |
|661517 |
+---------+
当我试图提取其余相应字段的差异时,我的问题就出现了。我想拉出其余字段,以便我可以在数据库和应用程序中进行手动搜索,看看是否有遗漏的地方。当我添加其余列时,我的结果会因差异而超过 20 行。 运行 匹配并获取相应数据的更好方法是什么:
完整代码范围:
#racs in mysql
my_rac = spark.read.parquet("/Users/mysql.parquet")
my_rac.printSchema()
my_rac.createOrReplaceTempView('my_rac')
d_rac = spark.sql('''select distinct * from my_rac''')
d_rac.createOrReplaceTempView('d_rac')
spark.sql('''select count(*) as test1_racounts_ from d_rac''').show()
rac_p_user_df = spark.sql('''select
cast(p_user_id as string) as p_user_id
, record_id
, contact_last_name
, contact_first_name
from d_rac''')
#mssql_rac
sql_rac = spark.read.csv("/Users/mzn293/Downloads/kavi-20211116.csv")
#sql_rac.printSchema()
hav_rac.createOrReplaceTempView('sql_rac')
d_sql_rac = spark.sql('''select distinct
_c0 as p_user_id
, _c1 as record_id
, _c4 as contact_last_name
, _c5 as contact_first_name
from sql_rac''')
d_sql_rac.createOrReplaceTempView('d_sql_rac')
spark.sql('''select count(*) as d_aws_rac from d_sql_rac''').show()
dist_aws_rac = spark.sql('''select * from d_aws_rac''')
dist_sql_rac.subtract(rac_p_user_df).show(100, truncate=0)
有了这个,我得到了超过 20 个计数的差异。此外,我觉得有更好的方法来获得我的结果。但是我不确定我缺少什么来获取这 20 行的数据而不是 100 多行。
在这种情况下,最简单的方法是使用反连接。
df_diff = df1.join(df2, df1.p_user_id == df2.p_user_id, "leftanti")
这将为您提供 df1
中存在的所有记录行,但在 df2
中没有匹配记录。