Spark Scala 通过方法调用更新数据框

Spark Scala update a dataframe via a method call

我需要重命名 spark 数据框中的列列表。所以我写了下面的逻辑,当 i 运行 in spark shell 时它工作正常。

但是当我添加到静态对象并通过方法调用时它不起作用。

val policy1 = ("10931375", "TEMP", "US")
val policy2 = ("1328904", "TEAM", "US")
var policy = Seq(policy1, policy2).toDF("ID", "Source", "Country")

policy.show()

+--------+------+-------+
|      ID|Source|Country|
+--------+------+-------+
|10931375|  TEMP|     US|
| 1328904|  TEAM|     US|
+--------+------+-------+    

object Rules {
  val colMapping = Map("ID" -> "NEW_ID",
      "Source" -> "NEW_Source")


  def renameColumns(ds: Dataset[Row]): DataFrame = {
    colMapping foreach { x => ds.withColumnRenamed(x._1, x._2) }
    ds
  }
}

import Rules._
renameColumns(policy).show()
+--------+------+-------+
|      ID|Source|Country|
+--------+------+-------+
|10931375|  TEMP|     US|
| 1328904|  TEAM|     US|
+--------+------+-------+   

但是当我 运行 在 spark shell 中工作时

colMapping foreach { x => policy = policy.withColumnRenamed(x._1, x._2) }
policy.show()

+--------+----------+-------+
|  NEW_ID|NEW_Source|Country|
+--------+----------+-------+
|10931375|      TEMP|     US|
| 1328904|      TEAM|     US|
+--------+----------+-------+

这些片段不等同,工作方式也不相同:

  • 在第一种情况下,您将更改应用于 Dataset,每次调用都会创建一个新的独立 Dataset 并立即丢弃结果,而 return 原始Datset.

  • 在第二种情况下,您修改了对 Dataset 的可变引用。

您正在舍弃结果。不要使用 foreach

试一试

 def renameColumns(ds: Dataset[Row]): DataFrame = {
    colMapping.foldLeft(ds){
      case (d, (oldName, newName)) => {
        d.withColumnRenamed(oldName, newName)
      }
    }
  }