检查 table 是否存在 Spark jdbc
Check table exists Spark jdbc
我正在使用 Spark JDBC 从 Microsoft SQL 服务器读取一些数据到数据框中。当 table 不存在时(例如,它被意外删除)我得到一个异常:com.microsoft.sqlserver.jdbc.SQLServerException: 对象名称无效 'TestAllData'.
我想创建一些机制来首先检查 table 是否存在,然后才读取数据。有没有办法使用 Spark JDBC 来做到这一点?
因为我尝试过使用 MS SQL Server 的 if exists 构造,但在通过 Spark 执行查询时它不起作用。
目前,我读取数据的代码如下所示:
/** Builds the JDBC pushdown query for the source table.
  *
  * Spark's JDBC `dbtable` option accepts a parenthesised subquery with an
  * alias, which is exactly what this returns.
  *
  * @return a subquery string usable as the `dbtable` JDBC option
  */
def getDataQuery(): String = {
  // Plain literal: the `s` interpolator was unnecessary (no substitutions).
  "(select * from TestData) as subq"
}
/** Assembles the option map for Spark's JDBC data source.
  *
  * Connection settings are read from `config` (keys under `sqlserver.db.*`).
  * BUG FIX: the original required all five parameters, yet call sites in
  * this same snippet invoke `jdbcOptions(getDataQuery())` with a single
  * argument, which does not compile. The partitioning parameters now
  * default to empty and are only emitted when a partition column is
  * supplied, so both partitioned and unpartitioned reads work.
  *
  * @param dataQuery       subquery (or table name) for the `dbtable` option
  * @param partitionColumn column used to split the read; "" disables partitioning
  * @param lowerBound      lower bound of the partition column (as a string)
  * @param upperBound      upper bound of the partition column (as a string)
  * @param numPartitions   number of parallel read partitions (as a string)
  * @return option map suitable for `DataFrameReader.options`
  */
def jdbcOptions(dataQuery: String,
                partitionColumn: String = "",
                lowerBound: String = "",
                upperBound: String = "",
                numPartitions: String = ""): Map[String, String] = {
  val base = Map[String, String](
    "driver" -> config.getString("sqlserver.db.driver"),
    "url" -> config.getString("sqlserver.db.url"),
    "user" -> config.getString("sqlserver.db.user"),
    "password" -> config.getString("sqlserver.db.password"),
    "customSchema" -> config.getString("sqlserver.db.custom_schema"),
    "dbtable" -> dataQuery
  )
  // Spark rejects partitioning options that are present but blank, so only
  // add them when a partition column was actually requested.
  if (partitionColumn.isEmpty) base
  else base ++ Map(
    "partitionColumn" -> partitionColumn,
    "lowerBound" -> lowerBound,
    "upperBound" -> upperBound,
    "numPartitions" -> numPartitions
  )
}
// Read the source table into a DataFrame through the Spark JDBC source.
val dataDF =
  sparkSession.read
    .format("jdbc")
    .options(jdbcOptions(getDataQuery()))
    .load()
您可以使用如下查询来进行检查:
/** Statement used to list the tables in the `default` database.
  *
  * NOTE(review): `show tables in default` is Spark/Hive SQL. It is pushed
  * down verbatim through the JDBC `dbtable` option, so it only works if
  * the remote server understands it — verify against MS SQL Server.
  *
  * @return the catalog-listing statement
  */
def tableExist(): String = {
  // Plain literal: the `s` interpolator was unnecessary (no substitutions).
  "show tables in default"
}
// Run the table-listing statement through the JDBC source.
val existDF =
  sparkSession.read
    .format("jdbc")
    .options(jdbcOptions(tableExist()))
    .load()
// Only read the data when the catalog listing contains our table.
// BUG FIX: the original `if` had no `else`, so the expression's type
// widened to Any (Unit when the table is missing). Fall back to an empty
// DataFrame so dataDF is always a DataFrame.
val dataDF =
  if (existDF.select("tableName").collect().map(_(0)).contains("TestData"))
    sparkSession.read
      .format("jdbc")
      .options(jdbcOptions(getDataQuery()))
      .load()
  else
    sparkSession.emptyDataFrame
与 Pablo López Gallego 所写的概念相同,但针对 Postgres
/** Postgres variant of the existence check: query information_schema
  * through the Spark JDBC source and count the matching rows.
  */
object JdbcLoader extends App {

  // Connection settings; in real code these would come from configuration.
  val finalUrl = s"jdbc:postgresql://localhost:5432/my_db?ApplicationName=test"
  val user = "user"
  val password = "pass"

  val sparkConf = new SparkConf()
  sparkConf.setMaster(s"local[2]")
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()

  /** Loads `tableName` (a table name or aliased subquery) via the JDBC source. */
  def loadTable(tableName: String): DataFrame = {
    val opts: Map[String, String] = Map(
      "url" -> finalUrl,
      "user" -> user,
      "password" -> password,
      "dbtable" -> tableName
    )
    spark.sqlContext.read.format("jdbc").options(opts).load
  }

  /** Returns true when `tableName` exists as a base table.
    *
    * Accepts either "table" (schema defaults to "public") or
    * "schema.table"; with extra dots the first part is the schema and
    * the last is the table, matching the original head/last behaviour.
    */
  def checkIfTableExists(tableName: String): Boolean = {
    // BUG FIX: the original split("\.") is an invalid string escape and
    // does not compile — the regex dot must be written "\\.".
    val (schema, table) =
      if (tableName.contains(".")) {
        val parts = tableName.split("\\.")
        (parts.head, parts.last)
      } else {
        ("public", tableName)
      }
    // NOTE(review): schema/table are interpolated directly into SQL —
    // injectable if tableName ever comes from untrusted input; prefer
    // validation or parameterized access in production code.
    val tableExistQ = s"(SELECT table_name FROM information_schema.tables WHERE table_schema='${schema}'" +
      s" AND table_type='BASE TABLE' and table_name = '${table}') as FOO"
    val df = loadTable(tableExistQ)
    df.count() > 0
  }

  println(checkIfTableExists("my_schema.users"))
}
我正在使用 Spark JDBC 从 Microsoft SQL 服务器读取一些数据到数据框中。当 table 不存在时(例如,它被意外删除)我得到一个异常:com.microsoft.sqlserver.jdbc.SQLServerException: 对象名称无效 'TestAllData'.
我想创建一些机制来首先检查 table 是否存在,然后才读取数据。有没有办法使用 Spark JDBC 来做到这一点?因为我尝试过使用 MS SQL Server 的 if exists 构造,但在通过 Spark 执行查询时它不起作用。
目前,我读取数据的代码如下所示:
/** Subquery string for Spark's JDBC `dbtable` option. */
def getDataQuery(): String = "(select * from TestData) as subq"
/** Assembles the option map for Spark's JDBC data source.
  *
  * Connection settings are read from `config` (keys under `sqlserver.db.*`).
  * BUG FIX: the original required all five parameters, yet call sites in
  * this same snippet invoke `jdbcOptions(getDataQuery())` with a single
  * argument, which does not compile. The partitioning parameters now
  * default to empty and are only emitted when a partition column is
  * supplied, so both partitioned and unpartitioned reads work.
  *
  * @param dataQuery       subquery (or table name) for the `dbtable` option
  * @param partitionColumn column used to split the read; "" disables partitioning
  * @param lowerBound      lower bound of the partition column (as a string)
  * @param upperBound      upper bound of the partition column (as a string)
  * @param numPartitions   number of parallel read partitions (as a string)
  * @return option map suitable for `DataFrameReader.options`
  */
def jdbcOptions(dataQuery: String,
                partitionColumn: String = "",
                lowerBound: String = "",
                upperBound: String = "",
                numPartitions: String = ""): Map[String, String] = {
  val base = Map[String, String](
    "driver" -> config.getString("sqlserver.db.driver"),
    "url" -> config.getString("sqlserver.db.url"),
    "user" -> config.getString("sqlserver.db.user"),
    "password" -> config.getString("sqlserver.db.password"),
    "customSchema" -> config.getString("sqlserver.db.custom_schema"),
    "dbtable" -> dataQuery
  )
  // Spark rejects partitioning options that are present but blank, so only
  // add them when a partition column was actually requested.
  if (partitionColumn.isEmpty) base
  else base ++ Map(
    "partitionColumn" -> partitionColumn,
    "lowerBound" -> lowerBound,
    "upperBound" -> upperBound,
    "numPartitions" -> numPartitions
  )
}
// Read the source table into a DataFrame through the Spark JDBC source.
val dataDF =
  sparkSession.read
    .format("jdbc")
    .options(jdbcOptions(getDataQuery()))
    .load()
您可以使用如下查询来进行检查:
/** Catalog-listing statement (Spark/Hive SQL — presumably meant to be
  * understood by the remote end; verify against the actual server). */
def tableExist(): String = "show tables in default"
// Run the table-listing statement through the JDBC source.
val existDF =
  sparkSession.read
    .format("jdbc")
    .options(jdbcOptions(tableExist()))
    .load()
// Only read the data when the catalog listing contains our table.
// BUG FIX: the original `if` had no `else`, so the expression's type
// widened to Any (Unit when the table is missing). Fall back to an empty
// DataFrame so dataDF is always a DataFrame.
val dataDF =
  if (existDF.select("tableName").collect().map(_(0)).contains("TestData"))
    sparkSession.read
      .format("jdbc")
      .options(jdbcOptions(getDataQuery()))
      .load()
  else
    sparkSession.emptyDataFrame
与 Pablo López Gallego 所写的概念相同,但针对 Postgres
/** Postgres variant of the existence check: query information_schema
  * through the Spark JDBC source and count the matching rows.
  */
object JdbcLoader extends App {

  // Connection settings; in real code these would come from configuration.
  val finalUrl = s"jdbc:postgresql://localhost:5432/my_db?ApplicationName=test"
  val user = "user"
  val password = "pass"

  val sparkConf = new SparkConf()
  sparkConf.setMaster(s"local[2]")
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()

  /** Loads `tableName` (a table name or aliased subquery) via the JDBC source. */
  def loadTable(tableName: String): DataFrame = {
    val opts: Map[String, String] = Map(
      "url" -> finalUrl,
      "user" -> user,
      "password" -> password,
      "dbtable" -> tableName
    )
    spark.sqlContext.read.format("jdbc").options(opts).load
  }

  /** Returns true when `tableName` exists as a base table.
    *
    * Accepts either "table" (schema defaults to "public") or
    * "schema.table"; with extra dots the first part is the schema and
    * the last is the table, matching the original head/last behaviour.
    */
  def checkIfTableExists(tableName: String): Boolean = {
    // BUG FIX: the original split("\.") is an invalid string escape and
    // does not compile — the regex dot must be written "\\.".
    val (schema, table) =
      if (tableName.contains(".")) {
        val parts = tableName.split("\\.")
        (parts.head, parts.last)
      } else {
        ("public", tableName)
      }
    // NOTE(review): schema/table are interpolated directly into SQL —
    // injectable if tableName ever comes from untrusted input; prefer
    // validation or parameterized access in production code.
    val tableExistQ = s"(SELECT table_name FROM information_schema.tables WHERE table_schema='${schema}'" +
      s" AND table_type='BASE TABLE' and table_name = '${table}') as FOO"
    val df = loadTable(tableExistQ)
    df.count() > 0
  }

  println(checkIfTableExists("my_schema.users"))
}