Compare 2 dataframes and create an output dataframe containing the name of the columns that contain differences and their values

Using Spark and Scala, I have df1 and df2 as follows:

df1
+---+----+----+------+
| ID|colA|colB|  colC|
+---+----+----+------+
|  1|   0|  10|APPLES|
|  2|   0|  20|APPLES|
|  3|   0|  30| PEARS|
+---+----+----+------+

df2
+---+----+----+------+
| ID|colA|colB|  colC|
+---+----+----+------+
|  1|   0|  10|APPLES|
|  2|   0|  20| PEARS|
|  3|   0|  10|APPLES|
+---+----+----+------+

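I need to compare these 2 dataframes and extract the differences into df3, which has 4 columns: the name of the column containing the difference, the value from df1, the value from df2, and the ID. How can I do this without using the column names? The only name I can hardcode is ID.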

+-----------+--------------+--------------+---+
|Column Name|Value from df1|Value from df2| ID|
+-----------+--------------+--------------+---+
|       colB|            30|            10|  3|
|       colC|        APPLES|         PEARS|  2|
|       colC|         PEARS|        APPLES|  3|
+-----------+--------------+--------------+---+

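What I have done so far is extract the names of the columns that contain differences, but I am stuck on how to get the values.

import org.apache.spark.sql.functions._

val columns = df1.columns
val df_join = df1.alias("d1").join(df2.alias("d2"), col("d1.id") === col("d2.id"), "left")

// For each column, add a "<name>_temp" column holding the column name when the two sides differ
val test = columns.foldLeft(df_join) { (df_join, name) =>
  df_join.withColumn(name + "_temp", when(col("d1." + name) =!= col("d2." + name), lit(name)))
}.withColumn("Col Name", concat_ws(",", columns.map(name => col(name + "_temp")): _*))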

You can try it this way:

// Consider the below dataframes

df1.show()

+---+----+----+------+
| ID|colA|colB|  colC|
+---+----+----+------+
|  1|   0|  10|APPLES|
|  2|   0|  20|APPLES|
|  3|   0|  30| PEARS|
+---+----+----+------+

df2.show()

+---+----+----+------+
| ID|colA|colB|  colC|
+---+----+----+------+
|  1|   0|  10|APPLES|
|  2|   0|  20| PEARS|
|  3|   0|  10|APPLES|
+---+----+----+------+

// The ID column name is known in advance, so exclude it and keep only the remaining columns

val df1_columns = df1.columns.filter(_ != "ID")
val df2_columns = df2.columns.filter(_ != "ID")

// obtain the number of columns to use it in the stack function later

val df1_columns_count = df1_columns.length
val df2_columns_count = df2_columns.length

// build the stack function's arguments dynamically from the column names

var df1_stack_str = ""
var df2_stack_str = ""

// Typecasting columns to string type to avoid conflicts

df1_columns.foreach { column =>
    df1_stack_str += s"'$column',cast($column as string),"
}
df1_stack_str = df1_stack_str.substring(0,df1_stack_str.lastIndexOf(","))

// Typecasting columns to string type to avoid conflicts

df2_columns.foreach { column =>
    df2_stack_str += s"'$column',cast($column as string),"
}
df2_stack_str = df2_stack_str.substring(0,df2_stack_str.lastIndexOf(","))
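
The same expression can also be built without mutable strings. The following is only a sketch in plain Scala (no Spark needed to run it); `StackExpr` and `build` are hypothetical names introduced for illustration, not part of any Spark API. It uses `map` and `mkString`, which avoids trimming a trailing comma and fails early on an empty column list.

```scala
object StackExpr {
  // Builds "stack(n,'c1',cast(c1 as string),...)" for the given value columns.
  // Each column contributes a (name literal, value cast to string) pair.
  def build(columns: Seq[String]): String = {
    require(columns.nonEmpty, "stack needs at least one column")
    val pairs = columns.map(c => s"'$c',cast($c as string)").mkString(",")
    s"stack(${columns.length},$pairs)"
  }

  def main(args: Array[String]): Unit =
    // Prints the expression for the three value columns used in this example
    println(build(Seq("colA", "colB", "colC")))
}
```

The resulting string could then be passed to selectExpr in the same way as the hand-built one, for example s"${StackExpr.build(df1_columns)} as (column_name,value_from_df1)".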

/*
In this case the stack function implementation would look like this

val df11 = df1.selectExpr("id","stack(3,'colA',cast(colA as string),'colB',cast(colB as string),'colC',cast(colC as string)) as (column_name,value_from_df1)")
val df21 = df2.selectExpr("id id_","stack(3,'colA',cast(colA as string),'colB',cast(colB as string),'colC',cast(colC as string)) as (column_name_,value_from_df2)")
*/

val df11 = df1.selectExpr("id",s"stack($df1_columns_count,$df1_stack_str) as (column_name,value_from_df1)")
val df21 = df2.selectExpr("id id_",s"stack($df2_columns_count,$df2_stack_str) as (column_name_,value_from_df2)")

// use inner join to get value_from_df1 and value_from_df2 in one dataframe and apply the filter

df11.as("df11").join(df21.as("df21"),expr("df11.id=df21.id_ and df11.column_name=df21.column_name_"))
    .drop("id_","column_name_")
    .filter("value_from_df1!=value_from_df2")
    .show
    
// Final output

+---+-----------+--------------+--------------+
| id|column_name|value_from_df1|value_from_df2|
+---+-----------+--------------+--------------+
|  2|       colC|        APPLES|         PEARS|
|  3|       colB|            30|            10|
|  3|       colC|         PEARS|        APPLES|
+---+-----------+--------------+--------------+
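
One caveat: the filter "value_from_df1!=value_from_df2" evaluates to null when either value is null, so a row where one side is null and the other is not would be dropped. Below is a minimal sketch of the same approach wrapped in reusable functions, using the null-safe equality operator <=> instead. `DataFrameDiff`, `unpivot`, `diff`, `idCol` and `valueName` are hypothetical names chosen for illustration, and the local SparkSession in `main` is only an assumption for trying it out.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, not}

object DataFrameDiff {
  // Unpivot every non-ID column of `df` into (id, column_name, <valueName>) rows.
  // Values are cast to string so columns of different types can share one column.
  def unpivot(df: DataFrame, idCol: String, valueName: String): DataFrame = {
    val cols = df.columns.filter(_ != idCol)
    val pairs = cols.map(c => s"'$c',cast(`$c` as string)").mkString(",")
    df.selectExpr(
      s"`$idCol` as id",
      s"stack(${cols.length},$pairs) as (column_name,$valueName)")
  }

  // Inner join on (id, column_name), then keep rows whose values differ.
  // not(a <=> b) is null-safe: it is true when exactly one side is null.
  def diff(df1: DataFrame, df2: DataFrame, idCol: String): DataFrame =
    unpivot(df1, idCol, "value_from_df1")
      .join(unpivot(df2, idCol, "value_from_df2"), Seq("id", "column_name"))
      .filter(not(col("value_from_df1") <=> col("value_from_df2")))

  def main(args: Array[String]): Unit = {
    // Assumption: a local session, only for trying the sketch out
    val spark = SparkSession.builder().master("local[*]").appName("diff-sketch").getOrCreate()
    import spark.implicits._

    val df1 = Seq((1, 0, 10, "APPLES"), (2, 0, 20, "APPLES"), (3, 0, 30, "PEARS"))
      .toDF("ID", "colA", "colB", "colC")
    val df2 = Seq((1, 0, 10, "APPLES"), (2, 0, 20, "PEARS"), (3, 0, 10, "APPLES"))
      .toDF("ID", "colA", "colB", "colC")

    diff(df1, df2, "ID").orderBy("id", "column_name").show()
    spark.stop()
  }
}
```

Because this is an inner join, IDs that exist in only one of the two dataframes do not appear in the result; a full outer join would be needed to report those as well.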