等待 x 秒后重试 SQL UPDATE 查询

Retry SQL UPDATE query after waiting x seconds

我正在使用 RichSinkFunction 对现有记录执行 SQL UPDATE 查询。 此函数假定记录已存在于数据库中。但是,在某些情况下,现有记录会延迟。

为了克服记录延迟的问题,我添加了一个 Thread.sleep() 让函数等待并重试数据库更新。

下面提供了示例代码以供参考。

class RichSinkFact extends RichSinkFunction[FulfillmentUsagesOutput]{

private def updateFactUpcoming(
    r: FulfillmentUsagesOutput,
    schemaName: String
  ): Unit = {

    var updateStmt: PreparedStatement = null
    val sqlStatement =
      s"""
         |UPDATE $schemaName.$factUpcomingTableName
         |SET unit_id = ?
         |WHERE pledge_id = ?
         |;
         |
      """.stripMargin

    try {
      updateStmt = connection.prepareStatement(sqlStatement)
      updateStmt.setLong(1, r.unit_id)
      updateStmt.setString(2, r.pledge_id)
      val rows = updateStmt.executeUpdate()

      if(rows == 0) {
        logger.warn(s"Retrying update for ${r}")
        //retry update
        Thread.sleep(retrySleepTime)
        val rows = updateStmt.executeUpdate()
        if(rows == 0){
          //raise error
          logger.error(s"Unable to update row: ${r}")
        }
      }

    } finally {
      if (updateStmt != null) {
        updateStmt.close()
      }
    }
  }
}

问题 : 由于Flink已经实现了其他定时器并使用了内部时间处理函数,那么重试DB更新的方式是否正确?

谢谢

如您所料,在 Flink 用户函数中休眠会导致问题,应该避免。在这种情况下,有一个更好的解决方案:看看 Sink.ProcessingTimeService。这将允许您注册计时器,这些计时器将在它们触发时调用您注册的回调。

感谢 David 提出了这种方法背后的最初想法。 Sink.ProcessingTimeService is only present from Flink 1.12 onwards. So, for anyone on a previous version of Flink looking to implement a similar solution, ProcessingTimeCallback 可用于在 Sink 应用程序中实现计时器。

我在这里包含了一个示例方法 https://gist.github.com/soumoks/f73694c64169c8b3494ba1842fa61f1b