使用 Spark 优化多个 JDBC 查询
Optimize multiple JDBC queries with Spark
我正在尝试使用 Spark 从 Greenplum 数据库中提取增量数据。我们有每个 table 的增量数据,带有一个名为 transactionId
的键。
每个 transactionId
可以包含一行或多行的数据。所有这些都存储在元数据 table: incKeyTable
中。
我们还有另一个元数据 table 中每个 table 的最后移动 transactionID
:incKeyLoads
。此 table 包含每个 table 的一个条目,这是最后更新 transactionId
进入生产 table。
为了找出每个 table 的增量 transactionid
,我想出了以下逻辑。
val spark = SparkSession.builder().master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
import spark.implicits._
Class.forName("org.postgresql.Driver").newInstance()
val tableStatus = s"select tablename, last_update_transaction_id from prod.incKeyLoads where source_system='DB2' and tablename='table1' and final_stage='PROD' and load='Successfull'"
val tableMetaDF = spark.read.format("jdbc").option("url", "url").option("dbtable", s"(${tableStatus}) as LoadedData").option("user", "user").option("password", "pwd").load()
val lutransIdTableMap = tableMetaDF.map(r => (r.getString(0),r.getLong(1))).collect().toMap
现在我在 scala Map 中有了我最近更新的事务 ID,如下所示:
lutransIdTableMap.foreach(println) =
(table1 -> 123)
(table2 -> 113)
(table3 -> 122)
...
(tableN -> 098)
为了找出最新的transactionId
(增量数据)即将到来的greenplum,我编写了以下逻辑来查询元数据table:incKeyTable
Class.forName("com.pivotal.jdbc.GreenplumDriver").newInstance()
def sortLogIds(incTransIds:DataFrame, lastMovedTransId:Long, tablename: String):String = {
val returnMsg = "Full loads on this table"
val count = incTransIds.where($"load_type" === "FULLLOAD").count
if(count == 0) {
incTransIds.createOrReplaceTempView("incTransID")
val execQuery = s"SELECT transactionId from incTransID order by transactionId desc"
val incLogIdDf = spark.sql(execQuery)
incLogIdDf.show
val pushTransIds = "select * from schema.tablename where transactionID in(" + "'" + incLogIdDf.select($"transactionId").collect().map(_.getInt(0).toString).mkString("','") + "')"
pushLogIds
} else {
println("Full load count is greater than zero..")
returnMsg
}
}
var incTransIdMap = Map[String, String]()
lutransIdTableMap.keys.foreach(keyTable => if(lutransIdTableMap(keyTable) !=0) {
val tablename = keyTable.split("\.") // Tablename = schema.tablename
val cdf = spark.read.format("jdbc").option("url", "url").option("dbtable", s"(select transactionId, load_type, source_system, tablename from schema.incKeyTable where source_system='DB2' and target_table='${tablename(1)}' and transactionId > ${lutransIdTableMap(keyTable)}) as controlTableDF").option("user", "user").option("password", "pwd").load()
incTransIdMap += (keyTable -> sortLogIds(cdf, lutransIdTableMap(keyTable), tablename(1)))
}
)
此方法有效,但它花费了很长时间,以至于我可以在搜索完成之前从 table 级别的 greenplum 中提取整个数据,因为数据框 cdf 是一个巨大的数据框。我尝试缓存数据框:cdf,但它包含近 500 万行,建议不要将如此大的 table 缓存到缓存中。
我想不出其他可以加快搜索速度的方法。任何人都可以让我知道使这个过程高效的想法吗?
问题中的代码不可能是您实际 运行ning 的内容,因为您在 sortLogIds
中返回 pushLogIds
,它从未被定义,并且您正在从 schema.tablename
而不是来自 s"schema.$tablename"
。这使得很难确定发生了什么......
也就是说,从大数据处理模式的角度来看,您的方法存在几个潜在问题:
迭代而不是 UNION 转换。 其他条件相同,而不是发出许多单独的查询然后在驱动程序上组装结果,最好是想办法发出单个查询。这就是优化器有机会提供帮助的方式。在您的情况下,考虑创建一个 Greenplum 视图,该视图组合了 lutransIdTableMap
.
中的所有表
操作而不是连接转换。 在 sortLogIds
中,您正在执行 count
操作只是为了决定是否要 运行 额外查询。在其他条件相同的情况下,最好通过连接转换来表达这一点,以便延迟 运行ning 操作。稍后您发出 show
,在幕后相当于 take(n)
。这个动作真的有必要吗?稍后您使用 collect
来生成 SQL 表达式以在 IN
运算符中使用。这是您应该改用联接的另一个示例。总而言之,您正在执行由 incTransId
表示的同一个 Greenplum 基础查询三次。如果你坚持这种处理,你绝对应该以某种方式坚持incTransId
。
SQL 使用汇编而不是 DSL。 通常,如果您通过编程语言而不是通过 SparkSQL,您应该使用 DSL 而不是将 SQL 表达式组装为字符串。这样,您就不需要重新定义视图等
如果没有完整的代码并且不知道确切的 Greenplum 模式 + 分布策略 + 索引(如果有的话)和涉及的数据大小,这里有太多需要修复的地方。然而,以上应该给你一个起点。
这是一个如何从使用迭代切换到联合的示例。
val allData = Map("table1" -> 101, "table2" -> 212)
.map { case (tableName, id) =>
spark.table(tableName).withColumn("id", lit(id))
}
.reduceLeft(_ union _)
这里是一个如何使用连接而不是 collect
+ IN
的例子。
val allIds = spark.range(100)
val myIds = spark.createDataset(Seq(11, 33, 55, 77, 99)).toDF("id")
allIds.where('id.isin(myIds.as[Int].collect: _*)) // premature action
allIds.join(myIds, Seq("id")) // inner join delays action
以上示例还展示了如何使用 collect
的数据集,例如将 .collect().map(_.getInt(0).toString)
替换为 .as[String].collect
,这样更简单、更安全、更快速。
希望对您有所帮助!
我正在尝试使用 Spark 从 Greenplum 数据库中提取增量数据。我们有每个 table 的增量数据,带有一个名为 transactionId
的键。
每个 transactionId
可以包含一行或多行的数据。所有这些都存储在元数据 table: incKeyTable
中。
我们还有另一个元数据 table 中每个 table 的最后移动 transactionID
:incKeyLoads
。此 table 包含每个 table 的一个条目,这是最后更新 transactionId
进入生产 table。
为了找出每个 table 的增量 transactionid
,我想出了以下逻辑。
val spark = SparkSession.builder().master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
import spark.implicits._
Class.forName("org.postgresql.Driver").newInstance()
val tableStatus = s"select tablename, last_update_transaction_id from prod.incKeyLoads where source_system='DB2' and tablename='table1' and final_stage='PROD' and load='Successfull'"
val tableMetaDF = spark.read.format("jdbc").option("url", "url").option("dbtable", s"(${tableStatus}) as LoadedData").option("user", "user").option("password", "pwd").load()
val lutransIdTableMap = tableMetaDF.map(r => (r.getString(0),r.getLong(1))).collect().toMap
现在我在 scala Map 中有了我最近更新的事务 ID,如下所示:
lutransIdTableMap.foreach(println) =
(table1 -> 123)
(table2 -> 113)
(table3 -> 122)
...
(tableN -> 098)
为了找出最新的transactionId
(增量数据)即将到来的greenplum,我编写了以下逻辑来查询元数据table:incKeyTable
Class.forName("com.pivotal.jdbc.GreenplumDriver").newInstance()
def sortLogIds(incTransIds:DataFrame, lastMovedTransId:Long, tablename: String):String = {
val returnMsg = "Full loads on this table"
val count = incTransIds.where($"load_type" === "FULLLOAD").count
if(count == 0) {
incTransIds.createOrReplaceTempView("incTransID")
val execQuery = s"SELECT transactionId from incTransID order by transactionId desc"
val incLogIdDf = spark.sql(execQuery)
incLogIdDf.show
val pushTransIds = "select * from schema.tablename where transactionID in(" + "'" + incLogIdDf.select($"transactionId").collect().map(_.getInt(0).toString).mkString("','") + "')"
pushLogIds
} else {
println("Full load count is greater than zero..")
returnMsg
}
}
var incTransIdMap = Map[String, String]()
lutransIdTableMap.keys.foreach(keyTable => if(lutransIdTableMap(keyTable) !=0) {
val tablename = keyTable.split("\.") // Tablename = schema.tablename
val cdf = spark.read.format("jdbc").option("url", "url").option("dbtable", s"(select transactionId, load_type, source_system, tablename from schema.incKeyTable where source_system='DB2' and target_table='${tablename(1)}' and transactionId > ${lutransIdTableMap(keyTable)}) as controlTableDF").option("user", "user").option("password", "pwd").load()
incTransIdMap += (keyTable -> sortLogIds(cdf, lutransIdTableMap(keyTable), tablename(1)))
}
)
此方法有效,但它花费了很长时间,以至于我可以在搜索完成之前从 table 级别的 greenplum 中提取整个数据,因为数据框 cdf 是一个巨大的数据框。我尝试缓存数据框:cdf,但它包含近 500 万行,建议不要将如此大的 table 缓存到缓存中。 我想不出其他可以加快搜索速度的方法。任何人都可以让我知道使这个过程高效的想法吗?
问题中的代码不可能是您实际 运行ning 的内容,因为您在 sortLogIds
中返回 pushLogIds
,它从未被定义,并且您正在从 schema.tablename
而不是来自 s"schema.$tablename"
。这使得很难确定发生了什么......
也就是说,从大数据处理模式的角度来看,您的方法存在几个潜在问题:
迭代而不是 UNION 转换。 其他条件相同,而不是发出许多单独的查询然后在驱动程序上组装结果,最好是想办法发出单个查询。这就是优化器有机会提供帮助的方式。在您的情况下,考虑创建一个 Greenplum 视图,该视图组合了
lutransIdTableMap
. 中的所有表
操作而不是连接转换。 在
sortLogIds
中,您正在执行count
操作只是为了决定是否要 运行 额外查询。在其他条件相同的情况下,最好通过连接转换来表达这一点,以便延迟 运行ning 操作。稍后您发出show
,在幕后相当于take(n)
。这个动作真的有必要吗?稍后您使用collect
来生成 SQL 表达式以在IN
运算符中使用。这是您应该改用联接的另一个示例。总而言之,您正在执行由incTransId
表示的同一个 Greenplum 基础查询三次。如果你坚持这种处理,你绝对应该以某种方式坚持incTransId
。SQL 使用汇编而不是 DSL。 通常,如果您通过编程语言而不是通过 SparkSQL,您应该使用 DSL 而不是将 SQL 表达式组装为字符串。这样,您就不需要重新定义视图等
如果没有完整的代码并且不知道确切的 Greenplum 模式 + 分布策略 + 索引(如果有的话)和涉及的数据大小,这里有太多需要修复的地方。然而,以上应该给你一个起点。
这是一个如何从使用迭代切换到联合的示例。
val allData = Map("table1" -> 101, "table2" -> 212)
.map { case (tableName, id) =>
spark.table(tableName).withColumn("id", lit(id))
}
.reduceLeft(_ union _)
这里是一个如何使用连接而不是 collect
+ IN
的例子。
val allIds = spark.range(100)
val myIds = spark.createDataset(Seq(11, 33, 55, 77, 99)).toDF("id")
allIds.where('id.isin(myIds.as[Int].collect: _*)) // premature action
allIds.join(myIds, Seq("id")) // inner join delays action
以上示例还展示了如何使用 collect
的数据集,例如将 .collect().map(_.getInt(0).toString)
替换为 .as[String].collect
,这样更简单、更安全、更快速。
希望对您有所帮助!