等待 x 秒后重试 SQL UPDATE 查询
Retry SQL UPDATE query after waiting x seconds
我正在使用 RichSinkFunction 对现有记录执行 SQL UPDATE 查询。
此函数假定记录已存在于数据库中。但是,在某些情况下,现有记录会延迟。
为了克服记录延迟的问题,我添加了一个 Thread.sleep()
让函数等待并重试数据库更新。
下面提供了示例代码以供参考。
class RichSinkFact extends RichSinkFunction[FulfillmentUsagesOutput]{
private def updateFactUpcoming(
r: FulfillmentUsagesOutput,
schemaName: String
): Unit = {
var updateStmt: PreparedStatement = null
val sqlStatement =
s"""
|UPDATE $schemaName.$factUpcomingTableName
|SET unit_id = ?
|WHERE pledge_id = ?
|;
|
""".stripMargin
try {
updateStmt = connection.prepareStatement(sqlStatement)
updateStmt.setLong(1, r.unit_id)
updateStmt.setString(2, r.pledge_id)
val rows = updateStmt.executeUpdate()
if(rows == 0) {
logger.warn(s"Retrying update for ${r}")
//retry update
Thread.sleep(retrySleepTime)
val rows = updateStmt.executeUpdate()
if(rows == 0){
//raise error
logger.error(s"Unable to update row: ${r}")
}
}
} finally {
if (updateStmt != null) {
updateStmt.close()
}
}
}
}
问题 : 由于Flink已经实现了其他定时器并使用了内部时间处理函数,那么重试DB更新的方式是否正确?
谢谢
如您所料,在 Flink 用户函数中休眠会导致问题,应该避免。在这种情况下,有一个更好的解决方案:看看 Sink.ProcessingTimeService。这将允许您注册计时器,这些计时器将在它们触发时调用您注册的回调。
感谢 David 提出了这种方法背后的最初想法。
Sink.ProcessingTimeService is only present from Flink 1.12 onwards. So, for anyone on a previous version of Flink looking to implement a similar solution, ProcessingTimeCallback 可用于在 Sink 应用程序中实现计时器。
我在这里包含了一个示例方法
https://gist.github.com/soumoks/f73694c64169c8b3494ba1842fa61f1b
我正在使用 RichSinkFunction 对现有记录执行 SQL UPDATE 查询。 此函数假定记录已存在于数据库中。但是,在某些情况下,现有记录会延迟。
为了克服记录延迟的问题,我添加了一个 Thread.sleep()
让函数等待并重试数据库更新。
下面提供了示例代码以供参考。
class RichSinkFact extends RichSinkFunction[FulfillmentUsagesOutput]{
private def updateFactUpcoming(
r: FulfillmentUsagesOutput,
schemaName: String
): Unit = {
var updateStmt: PreparedStatement = null
val sqlStatement =
s"""
|UPDATE $schemaName.$factUpcomingTableName
|SET unit_id = ?
|WHERE pledge_id = ?
|;
|
""".stripMargin
try {
updateStmt = connection.prepareStatement(sqlStatement)
updateStmt.setLong(1, r.unit_id)
updateStmt.setString(2, r.pledge_id)
val rows = updateStmt.executeUpdate()
if(rows == 0) {
logger.warn(s"Retrying update for ${r}")
//retry update
Thread.sleep(retrySleepTime)
val rows = updateStmt.executeUpdate()
if(rows == 0){
//raise error
logger.error(s"Unable to update row: ${r}")
}
}
} finally {
if (updateStmt != null) {
updateStmt.close()
}
}
}
}
问题 : 由于Flink已经实现了其他定时器并使用了内部时间处理函数,那么重试DB更新的方式是否正确?
谢谢
如您所料,在 Flink 用户函数中休眠会导致问题,应该避免。在这种情况下,有一个更好的解决方案:看看 Sink.ProcessingTimeService。这将允许您注册计时器,这些计时器将在它们触发时调用您注册的回调。
感谢 David 提出了这种方法背后的最初想法。 Sink.ProcessingTimeService is only present from Flink 1.12 onwards. So, for anyone on a previous version of Flink looking to implement a similar solution, ProcessingTimeCallback 可用于在 Sink 应用程序中实现计时器。
我在这里包含了一个示例方法 https://gist.github.com/soumoks/f73694c64169c8b3494ba1842fa61f1b