使用 Spark 优化多个 JDBC 查询

Optimize multiple JDBC queries with Spark

我正在尝试使用 Spark 从 Greenplum 数据库中提取增量数据。我们有每个 table 的增量数据,带有一个名为 transactionId 的键。 每个 transactionId 可以包含一行或多行的数据。所有这些都存储在元数据 table: incKeyTable 中。 我们还有另一个元数据 table 中每个 table 的最后移动 transactionIDincKeyLoads。此 table 包含每个 table 的一个条目,这是最后更新 transactionId 进入生产 table。 为了找出每个 table 的增量 transactionid,我想出了以下逻辑。

val spark = SparkSession.builder().master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
import spark.implicits._
Class.forName("org.postgresql.Driver").newInstance()
val tableStatus = s"select tablename, last_update_transaction_id from prod.incKeyLoads where source_system='DB2' and tablename='table1' and final_stage='PROD' and load='Successfull'"
val tableMetaDF = spark.read.format("jdbc").option("url", "url").option("dbtable", s"(${tableStatus}) as LoadedData").option("user", "user").option("password", "pwd").load()
val lutransIdTableMap   = tableMetaDF.map(r => (r.getString(0),r.getLong(1))).collect().toMap

现在我在 scala Map 中有了我最近更新的事务 ID,如下所示:

lutransIdTableMap.foreach(println) =
(table1 -> 123)
(table2 -> 113)
(table3 -> 122)
...
(tableN -> 098)

为了找出最新的transactionId(增量数据)即将到来的greenplum,我编写了以下逻辑来查询元数据table:incKeyTable

Class.forName("com.pivotal.jdbc.GreenplumDriver").newInstance()
def sortLogIds(incTransIds:DataFrame, lastMovedTransId:Long, tablename: String):String = {
    val returnMsg = "Full loads on this table"
    val count = incTransIds.where($"load_type" === "FULLLOAD").count
    if(count == 0) {
      incTransIds.createOrReplaceTempView("incTransID")
      val execQuery  = s"SELECT transactionId from incTransID order by transactionId desc"
      val incLogIdDf = spark.sql(execQuery)
      incLogIdDf.show
      val pushTransIds = "select * from schema.tablename where transactionID in(" + "'" + incLogIdDf.select($"transactionId").collect().map(_.getInt(0).toString).mkString("','") + "')"
      pushLogIds
    } else {
      println("Full load count is greater than zero..")
      returnMsg
    }
}

var incTransIdMap = Map[String, String]()
lutransIdTableMap.keys.foreach(keyTable => if(lutransIdTableMap(keyTable) !=0) {
    val tablename = keyTable.split("\.")   // Tablename = schema.tablename
    val cdf = spark.read.format("jdbc").option("url", "url").option("dbtable", s"(select transactionId, load_type, source_system, tablename from schema.incKeyTable where source_system='DB2' and target_table='${tablename(1)}' and transactionId > ${lutransIdTableMap(keyTable)}) as controlTableDF").option("user", "user").option("password", "pwd").load()
    incTransIdMap += (keyTable -> sortLogIds(cdf, lutransIdTableMap(keyTable), tablename(1)))
    }
)

此方法有效,但它花费了很长时间,以至于我可以在搜索完成之前从 table 级别的 greenplum 中提取整个数据,因为数据框 cdf 是一个巨大的数据框。我尝试缓存数据框:cdf,但它包含近 500 万行,建议不要将如此大的 table 缓存到缓存中。 我想不出其他可以加快搜索速度的方法。任何人都可以让我知道使这个过程高效的想法吗?

问题中的代码不可能是您实际 运行ning 的内容,因为您在 sortLogIds 中返回 pushLogIds,它从未被定义,并且您正在从 schema.tablename 而不是来自 s"schema.$tablename"。这使得很难确定发生了什么......

也就是说,从大数据处理模式的角度来看,您的方法存在几个潜在问题:

  1. 迭代而不是 UNION 转换。 其他条件相同,而不是发出许多单独的查询然后在驱动程序上组装结果,最好是想办法发出单个查询。这就是优化器有机会提供帮助的方式。在您的情况下,考虑创建一个 Greenplum 视图,该视图组合了 lutransIdTableMap.

  2. 中的所有表
  3. 操作而不是连接转换。sortLogIds 中,您正在执行 count 操作只是为了决定是否要 运行 额外查询。在其他条件相同的情况下,最好通过连接转换来表达这一点,以便延迟 运行ning 操作。稍后您发出 show,在幕后相当于 take(n)。这个动作真的有必要吗?稍后您使用 collect 来生成 SQL 表达式以在 IN 运算符中使用。这是您应该改用联接的另一个示例。总而言之,您正在执行由 incTransId 表示的同一个 Greenplum 基础查询三次。如果你坚持这种处理,你绝对应该以某种方式坚持incTransId

  4. SQL 使用汇编而不是 DSL。 通常,如果您通过编程语言而不是通过 SparkSQL,您应该使用 DSL 而不是将 SQL 表达式组装为字符串。这样,您就不需要重新定义视图等

如果没有完整的代码并且不知道确切的 Greenplum 模式 + 分布策略 + 索引(如果有的话)和涉及的数据大小,这里有太多需要修复的地方。然而,以上应该给你一个起点。

这是一个如何从使用迭代切换到联合的示例。

val allData = Map("table1" -> 101, "table2" -> 212)
  .map { case (tableName, id) =>
    spark.table(tableName).withColumn("id", lit(id))
  }
  .reduceLeft(_ union _)

这里是一个如何使用连接而不是 collect + IN 的例子。

val allIds = spark.range(100)
val myIds = spark.createDataset(Seq(11, 33, 55, 77, 99)).toDF("id")
allIds.where('id.isin(myIds.as[Int].collect: _*)) // premature action
allIds.join(myIds, Seq("id")) // inner join delays action

以上示例还展示了如何使用 collect 的数据集,例如将 .collect().map(_.getInt(0).toString) 替换为 .as[String].collect,这样更简单、更安全、更快速。

希望对您有所帮助!