Scala:动态连接数据帧

Scala: dynamically joining data frames

我将数据拆分为多个文件。我想加载并加入文件。 我想构建一个动态函数 1.将n个数据文件加入一个数据框 2. 给定文件位置和连接列的输入(例如,pk)

我认为这可以用 foldLeft 来完成,但我不太确定如何:

到目前为止,这是我的代码:

@throws
def dataJoin(path:String, fileNames:String*): DataFrame=
{
  try
  {
    val dfList:ArrayBuffer[DataFrame]=new ArrayBuffer
    for(fileName <- fileNames)
    {
      val df:DataFrame=DataFrameUtils.openFile(spark, s"$path${File.separator}$fileName")
      dfList += df
    }

    dfList.foldLeft
    {
             (df,df1) => joinDataFrames(df,df1, "UID")
    }
  }
  catch
  {
    case e:Exception => throw new Exception(e)
  }
}


def joinDataFrames(df:DataFrame,df1:DataFrame, joinColum:String): Unit =
{
  df.join(df1, Seq(joinColum))
}

foldLeft 可能确实适合这里,但它需要一个 "zero" 元素来开始折叠(除了折叠功能)。在这种情况下,"zero" 可以是第一个 DataFrame:

dfList.tail.foldLeft(dfList.head) { (df1, df2) => df1.join(df2, "UID") }

为避免错误,您可能希望在尝试访问第一项之前确保列表不为空 - 一种方法是使用模式匹配。

dfList match {
  case head :: tail => tail.foldLeft(head) { (df1, df2) => df1.join(df2, "UID") }
  case Nil => spark.emptyDataFrame
}

最后,在一个集合上 map 比迭代它并填充另一个(空的、可变的)集合更简单、更安全、更惯用:

val dfList = fileNames.map(fileName => DataFrameUtils.openFile(spark, s"$path${File.separator}$fileName"))

总计:

def dataJoin(path:String, fileNames: String*): DataFrame = {
  val dfList = fileNames
    .map(fileName => DataFrameUtils.openFile(spark, s"$path${File.separator}$fileName"))
    .toList

  dfList match {
    case head :: tail => tail.foldLeft(head) { (df1, df2) => df1.join(df2, "UID") }
    case Nil => spark.emptyDataFrame
  }
}