How to create an uninitialized DataFrame variable in Scala, so that the same variable can be initialized in an if/else condition

I need to create an uninitialized DataFrame variable so that, once its value has been initialized, I can add it to a Seq.

var df: org.apache.spark.sql.DataFrame = spark.emptyDataFrame
var dataframes: Seq[(String, org.apache.spark.sql.DataFrame)] = Seq()
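
Note that spark.emptyDataFrame is not truly uninitialized: it is a valid DataFrame with zero rows and zero columns, so it satisfies the compiler but will silently flow onward if no branch assigns df. A minimal alternative sketch that makes the "not yet set" state explicit is to use Option:

import org.apache.spark.sql.DataFrame

// Option models "not initialized yet" without a dummy value.
var dfOpt: Option[DataFrame] = None

// A branch later fills it in, e.g.:
// dfOpt = Some(jdbcConn.option("query", query).load())

// Fail fast if no branch ever assigned it:
// val df = dfOpt.getOrElse(sys.error("no matching source type"))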

queries.foreach(q=>{
    var view_name = q._1
    var sourceType = q._2
    var query = q._3
    var df: org.apache.spark.sql.DataFrame = spark.emptyDataFrame

    if(sourceType == "sqlserver"){
        df = jdbcConn.option("query", query).load()
    }else if(sourceType == "mongodb"){
        var connectionString = connectionInt.setCollection(view_name);
        df = spark.read.format("com.mongodb.spark.sql.DefaultSource").
                                option("spark.mongodb.input.partitioner", "MongoSinglePartitioner").
                                option("uri", connectionString).    
                                load();
    }

    df.createOrReplaceTempView(view_name)
    var tup: Tuple2[String, org.apache.spark.sql.DataFrame] = dataframes :+ (view_name, df)
    dataframes = dataframes :+ tup
});

I am getting the following error:

input.scala:102: error: type mismatch;
 found   : Seq[(String, org.apache.spark.sql.DataFrame)]
    (which expands to)  Seq[(String, org.apache.spark.sql.Dataset[org.apache.spark.sql.Row])]
 required: (String, org.apache.spark.sql.DataFrame)
    (which expands to)  (String, org.apache.spark.sql.Dataset[org.apache.spark.sql.Row])
    var tup: Tuple2[String, org.apache.spark.sql.DataFrame] = dataframes :+ (view_name, df)
                                                                          ^
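
The problem is that :+ returns the whole sequence: dataframes :+ (view_name, df) evaluates to a Seq[(String, DataFrame)], which cannot be assigned to a Tuple2 variable. A minimal fix that keeps the foreach loop is to drop tup and reassign the appended Seq directly (the double parentheses pass the pair as a single element):

dataframes = dataframes :+ ((view_name, df))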

That said, using "map" instead of "foreach" looks better, since it builds the whole Seq of pairs without any mutable state:

val dataframes = queries.map { case (view_name, sourceType, query) =>
  val df: org.apache.spark.sql.DataFrame =
    if (sourceType == "sqlserver") {
      jdbcConn.option("query", query).load()
    } else if (sourceType == "mongodb") {
      val connectionString = connectionInt.setCollection(view_name)
      spark.read.format("com.mongodb.spark.sql.DefaultSource")
        .option("spark.mongodb.input.partitioner", "MongoSinglePartitioner")
        .option("uri", connectionString)
        .load()
    } else {
      spark.emptyDataFrame
    }

  df.createOrReplaceTempView(view_name)
  (view_name, df)
}
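
With map, dataframes comes out as an immutable Seq[(String, DataFrame)] and every temp view is registered along the way. A short usage sketch (the query triples below are hypothetical placeholders, not from the question):

// Hypothetical input: (view name, source type, query) triples.
val queries = Seq(
  ("orders", "sqlserver", "SELECT * FROM dbo.orders"),
  ("events", "mongodb", "")
)

// Look up a resulting DataFrame by view name.
val byName = dataframes.toMap
byName("orders").show()

This also removes the need for an uninitialized variable altogether: the if/else is an expression, so df is assigned exactly once.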