Apache Flink 中是否有任何触发器每分钟触发一次延迟数据?
Is there any trigger in Apache Flink which fires every minutes on late data?
我的 Flink 作业中有很多延迟事件,因此将 allowedLateness() 设置为 10 分钟(使用 TumblingEventTimeWindows 和复杂的 AggregateFunction 在每个 window 上运行)
似乎聚合发生在每个迟到的事件上,但我想减少触发频率。
- 有没有每分钟触发一次的触发器?
- 触发器对延迟事件有影响吗?
- 是否有仅对延迟事件有效的触发器?
澄清一下,我在下面提到的延迟事件,是那些仍在您设置的允许延迟范围内的延迟事件。
Is there any trigger which fires only in every minute ?
没有。但是您可以自定义您自己的触发器,尝试使用事件计时器服务来实现。
Do the triggers affect to late events ?
是的。延迟事件将通过调用onElement函数在触发器中引用。
Are there any triggers which effect only to the late events ?
您可以像这样在自定义触发器中过滤延迟事件:
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
return TriggerResult.FIRE;
您可以使用您想要的任何行为来实现自定义 Trigger。
如果查看 EventTimeTrigger
的实现,翻滚事件时间 windows、
的默认触发器
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
您会看到,只要在流的水印达到或超过 window 的末尾后将事件分配给 window,就会触发 returns FIRE。这就是为什么每个迟到的事件都会导致另一次触发。
另一种方法是不允许迟到,而是将迟到的事件收集到它们自己的流中 (using a side output),然后独立处理迟到的事件。
我认为最好的方法是实施 processes function with custom watermark.
我的 Flink 作业中有很多延迟事件,因此将 allowedLateness() 设置为 10 分钟(使用 TumblingEventTimeWindows 和复杂的 AggregateFunction 在每个 window 上运行)
似乎聚合发生在每个迟到的事件上,但我想减少触发频率。
- 有没有每分钟触发一次的触发器?
- 触发器对延迟事件有影响吗?
- 是否有仅对延迟事件有效的触发器?
澄清一下,我在下面提到的延迟事件,是那些仍在您设置的允许延迟范围内的延迟事件。
Is there any trigger which fires only in every minute ?
没有。但是您可以自定义您自己的触发器,尝试使用事件计时器服务来实现。
Do the triggers affect to late events ?
是的。延迟事件将通过调用onElement函数在触发器中引用。
Are there any triggers which effect only to the late events ?
您可以像这样在自定义触发器中过滤延迟事件:
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
return TriggerResult.FIRE;
您可以使用您想要的任何行为来实现自定义 Trigger。
如果查看 EventTimeTrigger
的实现,翻滚事件时间 windows、
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
您会看到,只要在流的水印达到或超过 window 的末尾后将事件分配给 window,就会触发 returns FIRE。这就是为什么每个迟到的事件都会导致另一次触发。
另一种方法是不允许迟到,而是将迟到的事件收集到它们自己的流中 (using a side output),然后独立处理迟到的事件。
我认为最好的方法是实施 processes function with custom watermark.