迭代数据框中的列

Iterating on columns in dataframe

我有以下数据框 df1

+----------+----+----+----+-----+
|      WEEK|DIM1|DIM2|  T1|   T2|
+----------+----+----+----+-----+
|2016-04-02|  14|NULL|9874|  880|
|2016-04-30|  14|  FR|9875|   13|
|2017-06-10|  15| PQR|9867|57721|
+----------+----+----+----+-----+

df2

+----------+----+----+----+-----+
|      WEEK|DIM1|DIM2|  T1|   T2|
+----------+----+----+----+-----+
|2016-04-02|  14|NULL|9879|  820|
|2016-04-30|  14|  FR|9785|    9|
|2017-06-10|  15| XYZ|9967|57771|
+----------+----+----+----+-----+

我需要生成如下输出 -

+----------+----+----+----+-----+----+-----+-------+-------+----------+------------+
|      WEEK|DIM1|DIM2|  T1|   T2|  T1|   T2|t1_diff|t2_diff|pr_primary|pr_reference|
+----------+----+----+----+-----+----+-----+-------+-------+----------+------------+
|2016-04-02|  14|NULL|9874|  880|9879|  820|     -5|     60|         Y|           Y|
|2017-06-10|  15| PQR|9867|57721|null| null|   null|   null|         Y|           N|
|2017-06-10|  15| XYZ|null| null|9967|57771|   null|   null|         N|           Y|
|2016-04-30|  14|  FR|9875|   13|9785|    9|     90|      4|         Y|           Y|
+----------+----+----+----+-----+----+-----+-------+-------+----------+------------+

这里,t1_diff 是左 T1 和右 T1 之间的差异,t2_diff 是左 T2 和右 T2 之间的差异,pr_primary 如果 df1 中存在行而不是行,则为 Y在 df2 中,对于 pr_reference 也是如此。 我用下面的代码

生成了上面的内容
val df1 = Seq(
  ("2016-04-02", "14", "NULL", 9874, 880), ("2016-04-30", "14", "FR", 9875, 13), ("2017-06-10", "15", "PQR", 9867, 57721)
).toDF("WEEK", "DIM1", "DIM2","T1","T2")

val df2 = Seq(
  ("2016-04-02", "14", "NULL", 9879, 820), ("2016-04-30", "14", "FR", 9785, 9), ("2017-06-10", "15", "XYZ", 9967, 57771)
).toDF("WEEK", "DIM1", "DIM2","T1","T2")

import org.apache.spark.sql.functions._

val joined = df1.as("l").join(df2.as("r"), Seq("WEEK", "DIM1", "DIM2"), "fullouter")

val j1 = joined.withColumn("t1_diff",col(s"l.T1") - col(s"r.T1")).withColumn("t2_diff",col(s"l.T2") - col(s"r.T2"))
val isPresentSubstitution = udf( (x: String, y: String) => if (x == null && y == null) "N" else "Y")
j1.withColumn("pr_primary",isPresentSubstitution(col(s"l.T1"), col(s"l.T2"))).withColumn("pr_reference",isPresentSubstitution(col(s"r.T1"), col(s"r.T2"))).show

我想让它泛化到任意数量的列,而不仅仅是 T1 和 T2。有人可以建议我更好的方法吗?我是运行这个火花。

您可以通过向每个数据帧添加序列号,然后根据序列号连接这两个数据帧来实现。

    val df3 = df1.withColumn("SeqNum", monotonicallyIncreasingId)
    val df4 = df2.withColumn("SeqNum", monotonicallyIncreasingId)

    df3.as("l").join(df4.as("r"),"SeqNum").withColumn("t1_diff",col("l.T1") - col("r.T1")).withColumn("t2_diff",col("l.T2") - col("r.T2")).drop("SeqNum").show()

为了能够设置任意数量的列,如 t1_diff 和任何表达式计算它们的值,我们需要进行一些重构,允许以更通用的方式使用 withColumn

首先,我们需要收集目标值:目标列的名称和计算其内容的表达式。这可以通过一系列元组来完成:

val diffColumns = Seq(
  ("t1_diff", col("l.T1") - col("r.T1")),
  ("t2_diff", col("l.T2") - col("r.T2"))
)
// or, to make it more readable, create a dedicated "case class DiffColumn(colName: String, expression: Column)"

现在我们可以使用折叠从 joined 和上面的序列生成连接的 DataFrame:

val joinedWithDiffCols = 
  diffColumns.foldLeft(joined) { case(df, diffTuple) =>
    df.withColumn(diffTuple._1, diffTuple._2)
  }

joinedWithDiffCols 包含与问题中 j1 相同的数据。

要追加新列,您现在只需修改 diffColumns 序列。您甚至可以将 pr_primarypr_reference 的计算放在这个序列中(但在这种情况下,将 ref 重命名为 appendedColumns,更准确地说)。

更新

为了方便创建diffCollumns的元组,也可以泛化,例如:

// when both column names are same:
def generateDiff(column: String): (String, Column) = generateDiff(column, column)

// when left and right column names are different:
def generateDiff(leftCol: String, rightCol: String): (String, Column) =
  (s"${leftCol}_diff", col("l." + leftCol) - col("r." + rightCol))

val diffColumns = Seq("T1", "T2").map(generateDiff)

更新结束

假设列在 df1df2 中的名称相同,您可以这样做:

val diffCols = df1.columns
                  .filter(_.matches("T\d+"))
                  .map(c => col(s"l.$c") - col(s"r.$c") as (s"${c.toLowerCase}_diff") )

然后与 joined 一起使用,例如:

joined.select( ( col("*") :+ diffCols ) :_*).show(false)
//+----------+----+----+----+-----+----+-----+-------+-------+
//|WEEK      |DIM1|DIM2|T1  |T2   |T1  |T2   |t1_diff|t2_diff|
//+----------+----+----+----+-----+----+-----+-------+-------+
//|2016-04-02|14  |NULL|9874|880  |9879|820  |-5     |60     |
//|2017-06-10|15  |PQR |9867|57721|null|null |null   |null   |
//|2017-06-10|15  |XYZ |null|null |9967|57771|null   |null   |
//|2016-04-30|14  |FR  |9875|13   |9785|9    |90     |4      |
//+----------+----+----+----+-----+----+-----+-------+-------+