Spark DataFrame 联合递归
Spark DataFrame Union Recursion
我正在尝试对给定的原始 DataFrame 执行 substring(column, numOne, numTwo)，并把这些 substring 操作生成的所有 DataFrame 通过 UNION 合并成一个新的 DataFrame。
下面是我写的代码：
def main(args: Array[String]): Unit = {
//To Log only ERRORS
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession
.builder()
.appName("PopularMoviesDS")
.config("spark.sql.warehouse.dir", "file:///C:/temp")
.master("local[*]")
.getOrCreate()
var swing = 2
val dataframeInt = spark.createDataFrame(Seq(
(1, "Chandler", "Pasadena", "US")
)).toDF("id", "name", "city", "country")
var returnDf:DataFrame = spark.emptyDataFrame.withColumn("name",functions.lit(null))
/** Loop version: UNIONs one substring projection of `df`'s `name` column for each key
  * `swing, swing-1, ..., 1` of the bounds map.
  *
  * Reads and decrements the enclosing `var swing`, so after one call `swing` is 0 and a
  * second call would add no projections (it would return only the empty starting frame).
  *
  * @param df input frame with a `name` column
  * @return the union, starting from a row-less frame that has a single `name` column
  */
def dataFrameCreatorOrg(df:DataFrame): DataFrame ={
// key -> (start position, length) passed to Spark SQL substring(str, pos, len)
val map:Map[Int, Seq[String]] = Map(1 -> Seq("1","4"), 2 -> Seq("2","5"))
// Local accumulator (shadows the outer returnDf): no rows, one null-typed `name` column
var returnDf:DataFrame = spark.emptyDataFrame.withColumn("name",functions.lit(null))
while(swing>0){
returnDf = returnDf.union(df.selectExpr(s"substring(name,${map(swing)(0)},${map(swing)(1)})"))
// Mutates the enclosing variable, which is what moves the loop to the next key
swing -= 1
}
returnDf
}
dataFrameCreator(dataframeInt).show()
+-----+
| name|
+-----+
|handl|
| Chan|
+-----+
上面的代码能按预期工作，但我想把它改写成尾递归的形式。代码如下：
// Setup for the recursive attempt. Nothing in this snippet ever changes swing (stays 2).
var swing = 2
val dataframeInt = spark.createDataFrame(Seq(
(1, "Chandler", "Pasadena", "US")
)).toDF("id", "name", "city", "country")
// Shared accumulator; dataFrameCreator reassigns it on every call.
var returnDf:DataFrame = spark.emptyDataFrame.withColumn("name",functions.lit(null))
/** Appends one substring projection of `df` to the outer `returnDf` and returns the result.
  *
  * The bounds are always looked up with the outer `swing` (2 here), so every call appends
  * `substring(name,2,5)`, i.e. "handl", regardless of how often it is called.
  */
def dataFrameCreator(df:DataFrame): DataFrame ={
val map:Map[Int, Seq[String]] = Map(1 -> Seq("1","4"), 2 -> Seq("2","5"))
returnDf = returnDf.union(df.selectExpr(s"substring(name,${map(swing)(0)},${map(swing)(1)})"))
returnDf
}
/** Tail-recursive attempt; this is where the behaviour diverges from the loop.
  *
  * The `df` argument is only returned once recursion stops; each step calls
  * `dataFrameCreator(dataframeInt)`, which depends on the outer `swing`, not on `num`.
  * Recursion stops only when `num < 0`, so starting from `num = 2` it runs for
  * num = 2, 1, 0, i.e. three appends of "handl".
  */
@tailrec
def bigUnionHelper(num: Int, df: DataFrame): DataFrame = {
if (num<0) df
else bigUnionHelper(num-1, dataFrameCreator(dataframeInt))
}
bigUnionHelper(swing, dataframeInt).show()
//Result:
+-----+
| name|
+-----+
|handl|
|handl|
|handl|
+-----+
我知道代码还有优化空间，但我想不明白为什么尾递归版本的 bigUnionHelper 不能正常工作，得不到与第一个函数相同的结果。
在此先感谢您的帮助。
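我觉得应该这样写。原代码的问题在于：递归过程中 swing 始终为 2，而 dataFrameCreator 只根据 swing 选取子串范围，所以每次都执行 substring(name,2,5)，得到 handl；同时终止条件是 num<0，从 num=2 开始会依次执行 num=2、1、0 共 3 次，因此结果是 3 行 handl。下面的写法把每一步生成的 DataFrame 收集到列表中，最后统一做 union：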
val swing = 2
val dataframeInt = spark.createDataFrame(Seq(
  (1, "Chandler", "Pasadena", "US")
)).toDF("id", "name", "city", "country")

/** UNIONs one `substring(name, start, length)` projection of `df` for each key `1..num`.
  *
  * Key `k` of the bounds map below supplies both the start position and the length, so for
  * `num = 2` the result holds `substring(name,1,4)` ("Chan") and `substring(name,2,5)` ("handl").
  *
  * @param df  input frame; must have a `name` column
  * @param num highest bounds key to use; every key in `1..num` must exist in the map
  *            (a missing key throws `NoSuchElementException`)
  * @return the union of the projections (its column is named after the first projection), or
  *         an empty frame with only the `name` column when `num < 1`
  */
def bigUnionHelper(df: DataFrame, num: Int): DataFrame = {
  // key -> (start position, length) arguments for Spark SQL substring(str, pos, len)
  val substringBounds: Map[Int, Seq[String]] = Map(1 -> Seq("1", "4"), 2 -> Seq("2", "5"))

  // Collects the projections for keys n, n-1, ..., 1; prepending makes the list ascending by key.
  @tailrec
  def dataFrameCreator(src: DataFrame, n: Int, acc: List[DataFrame] = List()): List[DataFrame] = {
    if (n < 1) acc
    else {
      // Start AND length must both come from the current key `n`; reading the length from a
      // fixed key gives every projection the same length (e.g. "Chand" instead of "Chan").
      val bounds = substringBounds(n)
      val tempDf = src.selectExpr(s"substring(name,${bounds.head},${bounds(1)})")
      dataFrameCreator(src, n - 1, tempDf :: acc)
    }
  }

  // reduce would throw on an empty list (num < 1); return an empty frame instead.
  dataFrameCreator(df, num)
    .reduceOption(_ union _)
    .getOrElse(df.selectExpr("name").limit(0))
}

bigUnionHelper(dataframeInt, swing).show()
我正在尝试对给定的原始 DataFrame 执行 substring(column, numOne, numTwo)，并把这些 substring 操作生成的所有 DataFrame 通过 UNION 合并成一个新的 DataFrame。下面是我写的代码：
def main(args: Array[String]): Unit = {
//To Log only ERRORS
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession
.builder()
.appName("PopularMoviesDS")
.config("spark.sql.warehouse.dir", "file:///C:/temp")
.master("local[*]")
.getOrCreate()
var swing = 2
val dataframeInt = spark.createDataFrame(Seq(
(1, "Chandler", "Pasadena", "US")
)).toDF("id", "name", "city", "country")
var returnDf:DataFrame = spark.emptyDataFrame.withColumn("name",functions.lit(null))
/** Loop version: UNIONs one substring projection of `df`'s `name` column for each key
  * `swing, swing-1, ..., 1` of the bounds map.
  *
  * Reads and decrements the enclosing `var swing`, so after one call `swing` is 0 and a
  * second call would add no projections (it would return only the empty starting frame).
  *
  * @param df input frame with a `name` column
  * @return the union, starting from a row-less frame that has a single `name` column
  */
def dataFrameCreatorOrg(df:DataFrame): DataFrame ={
// key -> (start position, length) passed to Spark SQL substring(str, pos, len)
val map:Map[Int, Seq[String]] = Map(1 -> Seq("1","4"), 2 -> Seq("2","5"))
// Local accumulator (shadows the outer returnDf): no rows, one null-typed `name` column
var returnDf:DataFrame = spark.emptyDataFrame.withColumn("name",functions.lit(null))
while(swing>0){
returnDf = returnDf.union(df.selectExpr(s"substring(name,${map(swing)(0)},${map(swing)(1)})"))
// Mutates the enclosing variable, which is what moves the loop to the next key
swing -= 1
}
returnDf
}
dataFrameCreator(dataframeInt).show()
+-----+
| name|
+-----+
|handl|
| Chan|
+-----+
上面的代码能按预期工作，但我想把它改写成尾递归的形式。代码如下：
// Setup for the recursive attempt. Nothing in this snippet ever changes swing (stays 2).
var swing = 2
val dataframeInt = spark.createDataFrame(Seq(
(1, "Chandler", "Pasadena", "US")
)).toDF("id", "name", "city", "country")
// Shared accumulator; dataFrameCreator reassigns it on every call.
var returnDf:DataFrame = spark.emptyDataFrame.withColumn("name",functions.lit(null))
/** Appends one substring projection of `df` to the outer `returnDf` and returns the result.
  *
  * The bounds are always looked up with the outer `swing` (2 here), so every call appends
  * `substring(name,2,5)`, i.e. "handl", regardless of how often it is called.
  */
def dataFrameCreator(df:DataFrame): DataFrame ={
val map:Map[Int, Seq[String]] = Map(1 -> Seq("1","4"), 2 -> Seq("2","5"))
returnDf = returnDf.union(df.selectExpr(s"substring(name,${map(swing)(0)},${map(swing)(1)})"))
returnDf
}
/** Tail-recursive attempt; this is where the behaviour diverges from the loop.
  *
  * The `df` argument is only returned once recursion stops; each step calls
  * `dataFrameCreator(dataframeInt)`, which depends on the outer `swing`, not on `num`.
  * Recursion stops only when `num < 0`, so starting from `num = 2` it runs for
  * num = 2, 1, 0, i.e. three appends of "handl".
  */
@tailrec
def bigUnionHelper(num: Int, df: DataFrame): DataFrame = {
if (num<0) df
else bigUnionHelper(num-1, dataFrameCreator(dataframeInt))
}
bigUnionHelper(swing, dataframeInt).show()
//Result:
+-----+
| name|
+-----+
|handl|
|handl|
|handl|
+-----+
我知道代码还有优化空间，但我想不明白为什么尾递归版本的 bigUnionHelper 不能正常工作，得不到与第一个函数相同的结果。在此先感谢您的帮助。
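我觉得应该这样写。原代码的问题在于：递归过程中 swing 始终为 2，而 dataFrameCreator 只根据 swing 选取子串范围，所以每次都执行 substring(name,2,5)，得到 handl；同时终止条件是 num<0，从 num=2 开始会依次执行 num=2、1、0 共 3 次，因此结果是 3 行 handl。下面的写法把每一步生成的 DataFrame 收集到列表中，最后统一做 union：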
val swing = 2
val dataframeInt = spark.createDataFrame(Seq(
  (1, "Chandler", "Pasadena", "US")
)).toDF("id", "name", "city", "country")

/** UNIONs one `substring(name, start, length)` projection of `df` for each key `1..num`.
  *
  * Key `k` of the bounds map below supplies both the start position and the length, so for
  * `num = 2` the result holds `substring(name,1,4)` ("Chan") and `substring(name,2,5)` ("handl").
  *
  * @param df  input frame; must have a `name` column
  * @param num highest bounds key to use; every key in `1..num` must exist in the map
  *            (a missing key throws `NoSuchElementException`)
  * @return the union of the projections (its column is named after the first projection), or
  *         an empty frame with only the `name` column when `num < 1`
  */
def bigUnionHelper(df: DataFrame, num: Int): DataFrame = {
  // key -> (start position, length) arguments for Spark SQL substring(str, pos, len)
  val substringBounds: Map[Int, Seq[String]] = Map(1 -> Seq("1", "4"), 2 -> Seq("2", "5"))

  // Collects the projections for keys n, n-1, ..., 1; prepending makes the list ascending by key.
  @tailrec
  def dataFrameCreator(src: DataFrame, n: Int, acc: List[DataFrame] = List()): List[DataFrame] = {
    if (n < 1) acc
    else {
      // Start AND length must both come from the current key `n`; reading the length from a
      // fixed key gives every projection the same length (e.g. "Chand" instead of "Chan").
      val bounds = substringBounds(n)
      val tempDf = src.selectExpr(s"substring(name,${bounds.head},${bounds(1)})")
      dataFrameCreator(src, n - 1, tempDf :: acc)
    }
  }

  // reduce would throw on an empty list (num < 1); return an empty frame instead.
  dataFrameCreator(df, num)
    .reduceOption(_ union _)
    .getOrElse(df.selectExpr("name").limit(0))
}

bigUnionHelper(dataframeInt, swing).show()