Apache Flink:检查一个在延迟后分派元素的运算符

Apache Flink: checkpointing an operator that dispatches elements after a delay

我想在一定延迟后中继来自流的传入事件。这是执行此操作的 Flink 运算符的代码

class MessageDelayFunction[T](schedulingDelay: Int, timeUnit: TimeUnit)
  extends RichAsyncFunction[T, T] {

  // use guava's executor for periodic execution
  private var scheduledThreadPoolExecutor: ScheduledThreadPoolExecutor = _
  private var executorService: ListeningScheduledExecutorService = _


  override def open(parameters: Configuration): Unit = {
    scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1)
    executorService = MoreExecutors.listeningDecorator(scheduledThreadPoolExecutor)
  }

  override def asyncInvoke(input: T,
                           resultFuture: ResultFuture[T]): Unit =
    // dispatch the input after the specified delay
    executorService.schedule(
      new Runnable {
        override def run(): Unit = resultFuture.complete(Collections.singletonList(input))
      },
      schedulingDelay,
      timeUnit)


  override def timeout(input: T, resultFuture: ResultFuture[T]): Unit =
    // if we incur a timeout, ignore the message
    resultFuture.complete(Collections.emptyList())
}

我 运行 这个有 schedulingDelay = 2timeUnit = timeUnit.HOURS

但是,Flink 的检查点使用此运算符失败(我验证了它在没有它的情况下工作正常)。检查点每 2 分钟发生一次,超时为 10 分钟。每个检查点都会超时,整个节点似乎只是“卡在”这个操作符上。有没有更好的方法来实现我在 Flink 中尝试做的事情?或者有什么方法可以解决检查点问题?

您可以使用 flink 提供的定时器功能,这些也是检查点。