How to create an uninitialized DataFrame variable in Scala, so that the same variable can be initialized in an if/else condition
I need to create an uninitialized DataFrame variable so that, after its value has been initialized, I can add it to a Seq.
var df: org.apache.spark.sql.DataFrame = spark.emptyDataFrame
queries.foreach(q => {
  var view_name = q._1
  var sourceType = q._2
  var query = q._3
  var df: org.apache.spark.sql.DataFrame = spark.emptyDataFrame
  if (sourceType == "sqlserver") {
    df = jdbcConn.option("query", query).load()
  } else if (sourceType == "mongodb") {
    var connectionString = connectionInt.setCollection(view_name);
    df = spark.read.format("com.mongodb.spark.sql.DefaultSource").
      option("spark.mongodb.input.partitioner", "MongoSinglePartitioner").
      option("uri", connectionString).
      load();
  }
  df.createOrReplaceTempView(view_name)
  var tup: Tuple2[String, org.apache.spark.sql.DataFrame] = dataframes :+ (view_name, df)
  dataframes = dataframes :+ tup
});
I am getting the following error:
input.scala:102: error: type mismatch;
found : Seq[(String, org.apache.spark.sql.DataFrame)]
(which expands to) Seq[(String, org.apache.spark.sql.Dataset[org.apache.spark.sql.Row])]
required: (String, org.apache.spark.sql.DataFrame)
(which expands to) (String, org.apache.spark.sql.Dataset[org.apache.spark.sql.Row])
var tup: Tuple2[String, org.apache.spark.sql.DataFrame] = dataframes :+ (view_name, df)
^
The mismatch comes from assigning the result of dataframes :+ (view_name, df), which is already the whole Seq, to tup, which is declared as a single tuple. You don't need the intermediate tup at all; using "map" instead of "foreach" looks better:
val dataframes = queries.map { case (view_name, sourceType, query) =>
  // The if/else is an expression, so df can be a val and is always initialized
  val df: org.apache.spark.sql.DataFrame =
    if (sourceType == "sqlserver") {
      jdbcConn.option("query", query).load()
    } else if (sourceType == "mongodb") {
      val connectionString = connectionInt.setCollection(view_name)
      spark.read.format("com.mongodb.spark.sql.DefaultSource").
        option("spark.mongodb.input.partitioner", "MongoSinglePartitioner").
        option("uri", connectionString).
        load()
    } else {
      spark.emptyDataFrame
    }
  df.createOrReplaceTempView(view_name)
  (view_name, df)
}
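For reference, here is a minimal, self-contained sketch of the same pattern, with plain strings standing in for the DataFrames (jdbcConn and connectionInt are not shown in the question, so the loads are stubbed out, and the queries values are made up). It illustrates the point: map produces the whole Seq of tuples directly, so there is no mutable dataframes variable and no intermediate tup to get wrong:

// Hypothetical stand-in for the real (view_name, sourceType, query) triples
val queries: Seq[(String, String, String)] = Seq(
  ("orders", "sqlserver", "SELECT * FROM orders"),
  ("users",  "mongodb",   ""),
  ("misc",   "csv",       "")
)

// The if/else is an expression, so each value is initialized exactly once,
// and map collects the (name, value) pairs into a Seq in one step
val views: Seq[(String, String)] = queries.map { case (viewName, sourceType, query) =>
  val payload =
    if (sourceType == "sqlserver") s"jdbc result of: $query"
    else if (sourceType == "mongodb") s"mongo collection: $viewName"
    else "empty"
  (viewName, payload)
}

views.foreach(println)
// prints:
// (orders,jdbc result of: SELECT * FROM orders)
// (users,mongo collection: users)
// (misc,empty)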