Flink 作业在 10 分钟后中断
Flink job is interrupted after 10 minutes
我有一个带有全局 window 和自定义进程的 flink 作业。
该过程在大约 10 分钟后因下一个错误而失败:
java.io.InterruptedIOException
这是我的工作:
SingleOutputStreamOperator<CustomEntry> result = stream
.keyBy(r -> r.getId())
.window(GlobalWindows.create())
.trigger(new CustomTriggeringFunction())
.process(new CustomProcessingFunction());
CustomProcessingFunction 运行 很长时间(超过 10 分钟),但 10 分钟后,进程停止并在 InterruptedIOException
是否可以增加flink job的超时时间?
从 Flink 的角度来看,对于用户函数来说,运行 的时间过长是不合理的。这个 window 过程函数在做什么,需要 10 多分钟?也许您可以重组它以改用异步 i/o 运算符,这样您就不会完全阻塞管道。
也就是说,10 分钟是默认的检查点超时间隔,并且您在 运行ning 期间阻止检查点完成。所以你可以尝试增加 execution.checkpointing.timeout
。
如果作业因为检查点超时而失败,那将有所帮助。或者您可以从默认值 (0) 增加 execution.checkpointing.tolerable-failed-checkpoints
。
我有一个带有全局 window 和自定义进程的 flink 作业。
该过程在大约 10 分钟后因下一个错误而失败:
java.io.InterruptedIOException
这是我的工作:
SingleOutputStreamOperator<CustomEntry> result = stream
.keyBy(r -> r.getId())
.window(GlobalWindows.create())
.trigger(new CustomTriggeringFunction())
.process(new CustomProcessingFunction());
CustomProcessingFunction 运行 很长时间(超过 10 分钟),但 10 分钟后,进程停止并在 InterruptedIOException
是否可以增加flink job的超时时间?
从 Flink 的角度来看,对于用户函数来说,运行 的时间过长是不合理的。这个 window 过程函数在做什么,需要 10 多分钟?也许您可以重组它以改用异步 i/o 运算符,这样您就不会完全阻塞管道。
也就是说,10 分钟是默认的检查点超时间隔,并且您在 运行ning 期间阻止检查点完成。所以你可以尝试增加 execution.checkpointing.timeout
。
如果作业因为检查点超时而失败,那将有所帮助。或者您可以从默认值 (0) 增加 execution.checkpointing.tolerable-failed-checkpoints
。