Apache Flink:检查一个在延迟后分派元素的运算符
Apache Flink: checkpointing an operator that dispatches elements after a delay
我想在一定延迟后中继来自流的传入事件。这是执行此操作的 Flink 运算符的代码
class MessageDelayFunction[T](schedulingDelay: Int, timeUnit: TimeUnit)
extends RichAsyncFunction[T, T] {
// use guava's executor for periodic execution
private var scheduledThreadPoolExecutor: ScheduledThreadPoolExecutor = _
private var executorService: ListeningScheduledExecutorService = _
override def open(parameters: Configuration): Unit = {
scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1)
executorService = MoreExecutors.listeningDecorator(scheduledThreadPoolExecutor)
}
override def asyncInvoke(input: T,
resultFuture: ResultFuture[T]): Unit =
// dispatch the input after the specified delay
executorService.schedule(
new Runnable {
override def run(): Unit = resultFuture.complete(Collections.singletonList(input))
},
schedulingDelay,
timeUnit)
override def timeout(input: T, resultFuture: ResultFuture[T]): Unit =
// if we incur a timeout, ignore the message
resultFuture.complete(Collections.emptyList())
}
我 运行 这个有 schedulingDelay = 2
和 timeUnit = timeUnit.HOURS
。
但是,Flink 的检查点使用此运算符失败(我验证了它在没有它的情况下工作正常)。检查点每 2 分钟发生一次,超时为 10 分钟。每个检查点都会超时,整个节点似乎只是“卡在”这个操作符上。有没有更好的方法来实现我在 Flink 中尝试做的事情?或者有什么方法可以解决检查点问题?
您可以使用 flink 提供的定时器功能,这些也是检查点。
我想在一定延迟后中继来自流的传入事件。这是执行此操作的 Flink 运算符的代码
class MessageDelayFunction[T](schedulingDelay: Int, timeUnit: TimeUnit)
extends RichAsyncFunction[T, T] {
// use guava's executor for periodic execution
private var scheduledThreadPoolExecutor: ScheduledThreadPoolExecutor = _
private var executorService: ListeningScheduledExecutorService = _
override def open(parameters: Configuration): Unit = {
scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1)
executorService = MoreExecutors.listeningDecorator(scheduledThreadPoolExecutor)
}
override def asyncInvoke(input: T,
resultFuture: ResultFuture[T]): Unit =
// dispatch the input after the specified delay
executorService.schedule(
new Runnable {
override def run(): Unit = resultFuture.complete(Collections.singletonList(input))
},
schedulingDelay,
timeUnit)
override def timeout(input: T, resultFuture: ResultFuture[T]): Unit =
// if we incur a timeout, ignore the message
resultFuture.complete(Collections.emptyList())
}
我 运行 这个有 schedulingDelay = 2
和 timeUnit = timeUnit.HOURS
。
但是,Flink 的检查点使用此运算符失败(我验证了它在没有它的情况下工作正常)。检查点每 2 分钟发生一次,超时为 10 分钟。每个检查点都会超时,整个节点似乎只是“卡在”这个操作符上。有没有更好的方法来实现我在 Flink 中尝试做的事情?或者有什么方法可以解决检查点问题?
您可以使用 flink 提供的定时器功能,这些也是检查点。