如何使用 Scala 将 DataSet 传递给接受 DataFrame 作为 Apache Spark 参数的函数?
How to pass DataSet(s) to a function that accepts DataFrame(s) as arguments in Apache Spark using Scala?
我在 Scala 中有一个用于 Spark 的库,其中包含许多函数。
一个示例是以下函数,用于合并具有不同列的两个数据帧:
def appendDF(df2: DataFrame): DataFrame = {
val cols1 = df.columns.toSeq
val cols2 = df2.columns.toSeq
def expr(sourceCols: Seq[String], targetCols: Seq[String]): Seq[Column] = {
targetCols.map({
case x if sourceCols.contains(x) => col(x)
case y => lit(null).as(y)
})
}
// both df's need to pass through `expr` to guarantee the same order, as needed for correct unions.
df.select(expr(cols1, cols1): _*).union(df2.select(expr(cols2, cols1): _*))
}
我想将此函数(以及更多)用于 Dataset[CleanRow]
而不是 DataFrames。 CleanRow
是一个简单的 class ,它定义了列的名称和类型。
我有根据的猜测是使用 .toDF()
方法将 Dataset 转换为 Dataframe。不过,我想知道有没有更好的方法。
根据我的理解,Dataset 和 Dataframe 之间应该没有太大区别,因为 Dataset 只是 Dataframe[Row]。另外,我认为从 Spark 2.x 开始,DF 和 DS 的 API 已经统一,所以我想我可以互换使用它们中的任何一个,但事实并非如此。
如果可以更改签名:
import spark.implicits._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Dataset
def f[T](d: Dataset[T]): Dataset[T] = {d}
// You are able to pass a dataframe:
f(Seq(0,1).toDF()).show
// res1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [value: int]
// You are also able to pass a dataset:
f(spark.createDataset(Seq(0,1)))
// res2: org.apache.spark.sql.Dataset[Int] = [value: int]
我在 Scala 中有一个用于 Spark 的库,其中包含许多函数。 一个示例是以下函数,用于合并具有不同列的两个数据帧:
def appendDF(df2: DataFrame): DataFrame = {
val cols1 = df.columns.toSeq
val cols2 = df2.columns.toSeq
def expr(sourceCols: Seq[String], targetCols: Seq[String]): Seq[Column] = {
targetCols.map({
case x if sourceCols.contains(x) => col(x)
case y => lit(null).as(y)
})
}
// both df's need to pass through `expr` to guarantee the same order, as needed for correct unions.
df.select(expr(cols1, cols1): _*).union(df2.select(expr(cols2, cols1): _*))
}
我想将此函数(以及更多)用于 Dataset[CleanRow]
而不是 DataFrames。 CleanRow
是一个简单的 class ,它定义了列的名称和类型。
我有根据的猜测是使用 .toDF()
方法将 Dataset 转换为 Dataframe。不过,我想知道有没有更好的方法。
根据我的理解,Dataset 和 Dataframe 之间应该没有太大区别,因为 Dataset 只是 Dataframe[Row]。另外,我认为从 Spark 2.x 开始,DF 和 DS 的 API 已经统一,所以我想我可以互换使用它们中的任何一个,但事实并非如此。
如果可以更改签名:
import spark.implicits._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Dataset
def f[T](d: Dataset[T]): Dataset[T] = {d}
// You are able to pass a dataframe:
f(Seq(0,1).toDF()).show
// res1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [value: int]
// You are also able to pass a dataset:
f(spark.createDataset(Seq(0,1)))
// res2: org.apache.spark.sql.Dataset[Int] = [value: int]