PySpark: how to identify unmatched row values between two data frames
I have the following two data frames, and I am trying to identify the rows in the second data frame whose values do not match the first. This is part of a migration: after the source data has been migrated/moved to a different destination, I want to see the differences.
source_df
+---+-----+-----+
|key|val11|val12|
+---+-----+-----+
|abc| 1.1| 1.2|
|def| 3.0| 3.4|
+---+-----+-----+
dest_df
+---+-----+-----+
|key|val11|val12|
+---+-----+-----+
|abc| 2.1| 2.2|
|def| 3.0| 3.4|
+---+-----+-----+
I want to see output similar to the following:
key: abc,
col: val11 val12
difference: [src:1.1,dst:2.1] [src:1.2,dst:2.2]
Is there any solution for this?
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

source_df = spark.createDataFrame(
    [
        ('abc', '1.1', '1.2'),
        ('def', '3.0', '3.4'),
    ], ['key', 'val11', 'val12']
)
dest_df = spark.createDataFrame(
    [
        ('abc', '2.1', '2.2'),
        ('def', '3.0', '3.4'),
    ], ['key', 'val11', 'val12']
)
# Full outer join on the key, keep only the rows where at least one value
# differs, then format each difference as [src:<source>,dst:<dest>].
report = source_df\
    .join(dest_df, 'key', 'full')\
    .filter((source_df.val11 != dest_df.val11) | (source_df.val12 != dest_df.val12))\
    .withColumn('difference_val11', F.concat(F.lit('[src:'), source_df.val11, F.lit(',dst:'), dest_df.val11, F.lit(']')))\
    .withColumn('difference_val12', F.concat(F.lit('[src:'), source_df.val12, F.lit(',dst:'), dest_df.val12, F.lit(']')))\
    .select('key', 'difference_val11', 'difference_val12')
report.show()
+---+-----------------+-----------------+
|key| difference_val11| difference_val12|
+---+-----------------+-----------------+
|abc|[src:1.1,dst:2.1]|[src:1.2,dst:2.2]|
+---+-----------------+-----------------+
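Note that the plain != comparison silently drops any key that exists on only one side of the full outer join (the comparison evaluates to NULL there), and the two value columns are hard-coded. Below is a minimal generalized sketch (my own addition, assuming key is the only join column): it compares every non-key column with eqNullSafe and builds one difference column per value column.

from functools import reduce
import pyspark.sql.functions as F

value_cols = [c for c in source_df.columns if c != 'key']

joined = source_df.alias('src').join(dest_df.alias('dst'), 'key', 'full')

# OR together one "this column differs" predicate per value column;
# eqNullSafe treats NULL == NULL as True, so one-sided keys survive
mismatch = reduce(
    lambda acc, c: acc | c,
    [~F.col('src.' + c).eqNullSafe(F.col('dst.' + c)) for c in value_cols]
)

report_all = joined.filter(mismatch).select(
    'key',
    *[F.concat(F.lit('[src:'), F.coalesce(F.col('src.' + c), F.lit('null')),
               F.lit(',dst:'), F.coalesce(F.col('dst.' + c), F.lit('null')),
               F.lit(']')).alias('difference_' + c)
      for c in value_cols]
)
report_all.show(truncate=False)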
Or, if you want exactly that format:
for x in report.select('key', 'difference_val11', 'difference_val12').collect():
    print("key: " + str(x[0]) + ",\n" +
          "col: val11 val12\n" +
          "difference: " + str(x[1]) + " " + str(x[2]))
Output:
key: abc,
col: val11 val12
difference: [src:1.1,dst:2.1] [src:1.2,dst:2.2]
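If you only need the mismatched rows themselves rather than a formatted report, another option (assuming both frames have identical schemas; exceptAll requires Spark 2.4+) is to keep the rows of one frame that have no exact match in the other:

only_in_source = source_df.exceptAll(dest_df)   # rows changed or removed
only_in_dest = dest_df.exceptAll(source_df)     # rows changed or added
only_in_source.show()
# +---+-----+-----+
# |key|val11|val12|
# +---+-----+-----+
# |abc|  1.1|  1.2|
# +---+-----+-----+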