如何使用 Scala 将 DataSet 传递给接受 DataFrame 作为 Apache Spark 参数的函数?

How to pass DataSet(s) to a function that accepts DataFrame(s) as arguments in Apache Spark using Scala?

我在 Scala 中有一个用于 Spark 的库,其中包含许多函数。 一个示例是以下函数,用于合并具有不同列的两个数据帧:

def appendDF(df2: DataFrame): DataFrame = {

  val cols1 = df.columns.toSeq
  val cols2 = df2.columns.toSeq

  def expr(sourceCols: Seq[String], targetCols: Seq[String]): Seq[Column] = {
    targetCols.map({
      case x if sourceCols.contains(x) => col(x)
      case y                           => lit(null).as(y)
    })
  }

  // both df's need to pass through `expr` to guarantee the same order, as needed for correct unions.
  df.select(expr(cols1, cols1): _*).union(df2.select(expr(cols2, cols1): _*))

}

我想将此函数(以及更多)用于 Dataset[CleanRow] 而不是 DataFrames。 CleanRow 是一个简单的 class ,它定义了列的名称和类型。 我有根据的猜测是使用 .toDF() 方法将 Dataset 转换为 Dataframe。不过,我想知道有没有更好的方法。

根据我的理解,Dataset 和 Dataframe 之间应该没有太大区别,因为 Dataset 只是 Dataframe[Row]。另外,我认为从 Spark 2.x 开始,DF 和 DS 的 API 已经统一,所以我想我可以互换使用它们中的任何一个,但事实并非如此。

如果可以更改签名:

import spark.implicits._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Dataset

def f[T](d: Dataset[T]): Dataset[T] = {d}

// You are able to pass a dataframe:
f(Seq(0,1).toDF()).show
// res1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [value: int]

// You are also able to pass a dataset:
f(spark.createDataset(Seq(0,1)))
// res2: org.apache.spark.sql.Dataset[Int] = [value: int]