Pyspark 如何从两个数据框中识别不匹配的行值

Pyspark how can identify unmatched row value from two data frame

我有以下两个数据框,我试图从中识别数据框二中不匹配的行值。这是迁移的一部分,我想在源数据 migrated/moved 到不同目的地后看到差异。

source_df
+---+-----+-----+
|key|val11|val12|
+---+-----+-----+
|abc|  1.1|  1.2|
|def|  3.0|  3.4|
+---+-----+-----+

dest_df
+---+-----+-----+
|key|val11|val12|
+---+-----+-----+
|abc|  2.1|  2.2|
|def|  3.0|  3.4|
+---+-----+-----+

我想看到类似下面的输出

key: abc,

col:          val11                  val12

difference:  [src-1.1,dst:2.1]       [src:1.2,dst:2.2]

有什么解决办法吗?

source_df  = spark.createDataFrame(
  [
('abc','1.1','1.2'),
('def','3.0','3.4'),
  ], ['key','val11','val12']
)

dest_df  = spark.createDataFrame(
  [
('abc','2.1','2.2'),
('def','3.0','3.4'),
  ], ['key','val11','val12']
)

report = source_df\
    .join(dest_df, 'key', 'full')\
    .filter((source_df.val11 != dest_df.val11) | (source_df.val12 != dest_df.val12))\
    .withColumn('difference_val11', F.concat(F.lit('[src:'), source_df.val11, F.lit(',dst:'),dest_df.val11,F.lit(']')))\
    .withColumn('difference_val12', F.concat(F.lit('[src:'), source_df.val12, F.lit(',dst:'),dest_df.val12,F.lit(']')))\
    .select('key', 'difference_val11', 'difference_val12')

report.show()

+---+-----------------+-----------------+
|key| difference_val11| difference_val12|
+---+-----------------+-----------------+
|abc|[src:1.1,dst:2.1]|[src:1.1,dst:2.1]|
+---+-----------------+-----------------+

或者,如果您想要完全采用那种格式:

for x in report.select('key', 'difference_val11', 'difference_val12').collect():
    print("key: " + str(x[0]) + ",\n\n" +\
          "col:          val11                 val12\n\n" +\
         "difference:   " + str(x[1]) + "     " + str(x[2]))

输出:

key: abc,

col:          val11                 val12

difference:   [src:1.1,dst:2.1]     [src:1.2,dst:2.2]