状态生存时间。它如何与 Apache Flink CEP 模式一起工作?
State Time-to-Live. How it work with Apache Flink CEP Pattern?
我阅读了关于状态生存时间的 Apache Flink 文档 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
还有两个瞬间不明白
1)
StateTTLConfig ttl = StateTtlConfig
.newBuilder(Time.minutes(60))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
//And use in my Process Function
valueStateDescriptor.enableTimeToLive(ttl);
如果我将 15:00 中的某些元素放入 ValueState,然后使用保存点停止我的工作,并且仅在 17:00 我将从最后一个保存点开始我的工作。
价值状态会很清楚,对吗?
2)如果我使用 Apache Flink CEP 模式:
.begin("a")
.where(simpleConditionA)
.followedBy("b")
.where(simpleConditionB)
.within(Time.minutes(60));
如果我将在 15:00 中获取 A 元素,然后使用保存点停止我的工作,并且仅在 17:00 中我将从上一个保存点开始我的工作。并获取 B 元素,模式不匹配,对吗?
它 (ttl) 如何与 Apache Flink CEP 模式一起工作?
谢谢。
我了解CEP,我真的在使用摄取时间。我将尝试解释:我将 Process Function 与 ValueState 和 timerTime 一起使用,并在 onTimer 方法中清除状态。我在状态 (keyedstate) 中放入一些元素,将计时器设置为 1 小时并执行一些逻辑。基本上 value state + timer 用作输出限制器(1 小时内输出 1 条消息)。在我的公司,我们需要在集群上停止 运行 作业(带保存点),然后几个小时后我们需要从上一个保存点重新启动作业。现在我没有使用 TTL,重启后,我的 ValueState.value 不为空。我希望在不到一个小时内重新启动后 ValueState.value 不为空(如果我在停止前输入状态),但超过一小时值状态始终为空。
P.s 我使用 RrocksDb 状态后端,间隔为 1s 的增量检查点。它完美地工作。))
If I will put in ValueState some element in 15:00 and then stop my job with savepoint and only in 17:00 i will start my job from last savepoint.
Value State will be clear, am i right?
(1) 这个 ValueState 实际上会消失,但我不确定它是否真的会消失。如果您的状态 TTL 配置包含 cleanupFullSnapshot()
,那么如果您在 16:00 之后获取保存点,则可以保证保存点不会包含相关状态 。但在这种情况下,似乎这些都不是真的,所以状态在快照中。我不知道在快照还原期间或在下一个 clean-up 期间是否删除了已过期的状态。但是因为你指定了NeverReturnExpired
,所以不会影响结果。
How [does] it (ttl) work with Apache Flink CEP Pattern?
(2) CEP 不使用状态 TTL。只要它可能影响模式匹配,CEP 就会保持状态,并在不再需要状态时明确清除状态。从您对这个问题的措辞来看,我假设您使用的是处理时间,而不是事件时间。在这种情况下,模式将不会在 60 分钟内匹配。但是如果你要使用事件时间,那么水印将用于确定已经过去了多少时间,并且停机时间对模式匹配没有影响。
更新:
我现在看到您正在使用摄取时间,并依靠计时器来清除状态。对于摄取时间,您可以选择使用事件时间或处理时间计时器。如果您使用处理时间计时器,那么在作业未 运行 时应该触发的任何计时器将在作业重新启动后立即触发。使用事件时间计时器,它们将在水印达到这些计时器中的时间时立即触发。由于水印未保存在保存点中,因此在创建任何水印之前,某些事件必须流动和处理(并且对于周期性水印,auto-watermark 间隔必须过去)。
我阅读了关于状态生存时间的 Apache Flink 文档 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
还有两个瞬间不明白
1)
StateTTLConfig ttl = StateTtlConfig
.newBuilder(Time.minutes(60))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
//And use in my Process Function
valueStateDescriptor.enableTimeToLive(ttl);
如果我将 15:00 中的某些元素放入 ValueState,然后使用保存点停止我的工作,并且仅在 17:00 我将从最后一个保存点开始我的工作。
价值状态会很清楚,对吗?
2)如果我使用 Apache Flink CEP 模式:
.begin("a")
.where(simpleConditionA)
.followedBy("b")
.where(simpleConditionB)
.within(Time.minutes(60));
如果我将在 15:00 中获取 A 元素,然后使用保存点停止我的工作,并且仅在 17:00 中我将从上一个保存点开始我的工作。并获取 B 元素,模式不匹配,对吗?
它 (ttl) 如何与 Apache Flink CEP 模式一起工作?
谢谢。
我了解CEP,我真的在使用摄取时间。我将尝试解释:我将 Process Function 与 ValueState 和 timerTime 一起使用,并在 onTimer 方法中清除状态。我在状态 (keyedstate) 中放入一些元素,将计时器设置为 1 小时并执行一些逻辑。基本上 value state + timer 用作输出限制器(1 小时内输出 1 条消息)。在我的公司,我们需要在集群上停止 运行 作业(带保存点),然后几个小时后我们需要从上一个保存点重新启动作业。现在我没有使用 TTL,重启后,我的 ValueState.value 不为空。我希望在不到一个小时内重新启动后 ValueState.value 不为空(如果我在停止前输入状态),但超过一小时值状态始终为空。
P.s 我使用 RrocksDb 状态后端,间隔为 1s 的增量检查点。它完美地工作。))
If I will put in ValueState some element in 15:00 and then stop my job with savepoint and only in 17:00 i will start my job from last savepoint. Value State will be clear, am i right?
(1) 这个 ValueState 实际上会消失,但我不确定它是否真的会消失。如果您的状态 TTL 配置包含 cleanupFullSnapshot()
,那么如果您在 16:00 之后获取保存点,则可以保证保存点不会包含相关状态 。但在这种情况下,似乎这些都不是真的,所以状态在快照中。我不知道在快照还原期间或在下一个 clean-up 期间是否删除了已过期的状态。但是因为你指定了NeverReturnExpired
,所以不会影响结果。
How [does] it (ttl) work with Apache Flink CEP Pattern?
(2) CEP 不使用状态 TTL。只要它可能影响模式匹配,CEP 就会保持状态,并在不再需要状态时明确清除状态。从您对这个问题的措辞来看,我假设您使用的是处理时间,而不是事件时间。在这种情况下,模式将不会在 60 分钟内匹配。但是如果你要使用事件时间,那么水印将用于确定已经过去了多少时间,并且停机时间对模式匹配没有影响。
更新:
我现在看到您正在使用摄取时间,并依靠计时器来清除状态。对于摄取时间,您可以选择使用事件时间或处理时间计时器。如果您使用处理时间计时器,那么在作业未 运行 时应该触发的任何计时器将在作业重新启动后立即触发。使用事件时间计时器,它们将在水印达到这些计时器中的时间时立即触发。由于水印未保存在保存点中,因此在创建任何水印之前,某些事件必须流动和处理(并且对于周期性水印,auto-watermark 间隔必须过去)。