Spark DataFrame 联合递归

Spark DataFrame Union Recursion

我正在尝试对给定的原始 DataFrame 进行 substring(column,numOne,numTwo),并通过对通过执行 substring(column,numOne,numTwo) 创建的 DataFrame 的所有子集执行 UNION 来创建新的 DataFrame。 下面是我想出的一些代码

  def main(args: Array[String]): Unit = {
    //To Log only ERRORS
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder()
      .appName("PopularMoviesDS")
      .config("spark.sql.warehouse.dir", "file:///C:/temp")
      .master("local[*]")
      .getOrCreate()

    var swing = 2
    val dataframeInt = spark.createDataFrame(Seq(
      (1, "Chandler", "Pasadena", "US")
    )).toDF("id", "name", "city", "country")
    var returnDf:DataFrame = spark.emptyDataFrame.withColumn("name",functions.lit(null))
    def dataFrameCreatorOrg(df:DataFrame): DataFrame ={
      val map:Map[Int, Seq[String]] = Map(1 -> Seq("1","4"), 2 -> Seq("2","5"))
        var returnDf:DataFrame = spark.emptyDataFrame.withColumn("name",functions.lit(null))
      while(swing>0){
        returnDf =  returnDf.union(df.selectExpr(s"substring(name,${map(swing)(0)},${map(swing)(1)})"))
        swing -= 1
      }
      returnDf
    }
dataFrameCreator(dataframeInt).show()
    +-----+
    | name|
    +-----+
    |handl|
    | Chan|
    +-----+

上面的代码按我的预期工作,但我想运行上面使用的尾递归。下面的代码,


    var swing = 2
    val dataframeInt = spark.createDataFrame(Seq(
      (1, "Chandler", "Pasadena", "US")
    )).toDF("id", "name", "city", "country")
    var returnDf:DataFrame = spark.emptyDataFrame.withColumn("name",functions.lit(null))

    def dataFrameCreator(df:DataFrame): DataFrame ={
      val map:Map[Int, Seq[String]] = Map(1 -> Seq("1","4"), 2 -> Seq("2","5"))
      returnDf = returnDf.union(df.selectExpr(s"substring(name,${map(swing)(0)},${map(swing)(1)})"))
      returnDf
    }

    @tailrec
    def bigUnionHelper(num: Int, df: DataFrame): DataFrame = {
      if (num<0) df
      else bigUnionHelper(num-1, dataFrameCreator(dataframeInt))
    }
    bigUnionHelper(swing, dataframeInt).show()
//Result:
+-----+
| name|
+-----+
|handl|
|handl|
|handl|
+-----+

我完全明白还有优化的空间,但我无法弄清楚为什么 tailRecursive - bigUnionHelper 不工作并且没有给出与第一个函数相同的结果。 感谢您的帮助,在此先感谢您。

我觉得应该是这样的。

val swing = 2
val dataframeInt = spark.createDataFrame(Seq(
    (1, "Chandler", "Pasadena", "US")
)).toDF("id", "name", "city", "country")

def bigUnionHelper(df: DataFrame, num: Int): DataFrame = {
    @tailrec
    def dataFrameCreator(df: DataFrame, num:Int, acc:List[DataFrame] = List()): List[DataFrame] = {
        if (num < 1) acc
        else {
            val map: Map[Int, Seq[String]] = Map(1 -> Seq("1", "4"), 2 -> Seq("2", "5"))
            val tempDf = df.selectExpr(s"substring(name,${map(num).head},${map(swing)(1)})")
            dataFrameCreator(df, num -1, tempDf :: acc)
        }
    }
    dataFrameCreator(df, num).reduce(_ union _)
}

bigUnionHelper(dataframeInt, swing).show()