Flink 中的早期触发 - 如何使用触发器将早期 window 结果发送到不同的 DataStream

Early firing in Flink - how to emit early window results to a different DataStream with a trigger

我正在使用使用一天翻滚 window 的代码,并且希望每小时将早期结果发送到不同的 DataStream。 我知道触发器是一种到达这里的方式,但我真的不知道它是如何工作的。

当前代码如下:

myStream
     .keyBy(..)
     .window(TumblingEventTimeWindows.of(Time.days(1)))
     .aggregate(new MyAggregateFunction(), new MyProcessWindowFunction())

根据我的理解,我应该注册一个触发器,然后在它的 onEventTime 方法上获取一个 TriggerContext,然后我可以从那里将数据发送到标记的输出。但是如何从那里获取 MyAggregateFunction 的当前状态?或者我需要在 onEventTime() 内部进行自己的计算吗?

此外,文档指出 "By specifying a trigger using trigger() you are overwriting the default trigger of a WindowAssigner."。我的某一天 window 仍然会正确触发,还是我需要以不同的方式触发它?

另一种方法是创建两个不同的运算符 - 一个 windows 1 小时,另一个 windows 1 天。触发器会是首选方法吗?

这里有不少问题。我会试着问问他们所有人。首先,如果您使用 trigger() 指定您自己的触发器,这意味着您将有效地覆盖默认触发器,因此 window 可能不会像默认情况下那样工作。因此,例如,如果您创建了 1 天的事件时间翻滚 window,但覆盖了一个触发器,使其每 20 个元素触发一次,它永远不会根据事件时间触发 [=21] =].

现在,在您的自定义触发器触发后,MyAggregateFunction 的输出将传递给 MyProcessWindowFunction,因此它的工作方式与默认触发器相同,您不需要访问触发器内部的 MyAggregateFunction

最后,虽然在技术上可能实现每小时触发部分结果的触发器,但我个人的意见是您可能应该使用两个单独的 windows。虽然此解决方案可能会产生稍大的开销并可能导致更大的状态,但它应该更清晰、更容易实现,并且最终更能抵抗错误。

通常当我遇到像这样更复杂的行为时,我会使用 KeyedProcessFunction。您可以聚合(并保存在状态中)每小时和每天的结果,根据需要设置计时器,并使用辅助输出来获取每小时结果,而不是使用常规输出来获取每日结果。

与其使用自定义 Trigger,不如使用两层 windowing 更简单,其中每小时的结果进一步汇总到每天的结果中。像这样:

hourlyResults = myStream
    .keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .aggregate(new MyAggregateFunction(), new MyProcessWindowFunction())

dailyResults = hourlyResults
    .keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.days(1)))
    .aggregate(new MyAggregateFunction(), new MyProcessWindowFunction())

hourlyResults.addSink(...)
dailyResults.addSink(...)

请注意,window 的结果不是 KeyedStream,因此您需要再次使用 keyBy,除非您可以安排利用 reinterpretAsKeyedStreamdocs).