Flink - 如何结合处理时间和计数触发器?
Flink - How to combine process time and count trigger?
我有一个 Flink 流作业,我正在其中创建 windows 基于一些键并将数据点添加到一个数据点。
.window(SlidingProcessingTimeWindows.of(Time.days(1), Time.minutes(5)))
.trigger(CountTrigger.of(5))
.window(<ProcessWindowFunction>)
我正在使用上面的代码段创建大小为 1 天的幻灯片 window,幻灯片为 5 分钟。此外,计数触发器是在累积 5 个数据点后触发处理函数。
除此之外,我想为每个 slide
发生的事件触发处理函数。这意味着,在累积 1 天的数据点 (window size
) 之前,CountTrigger
将触发 process
函数,并且一旦创建 1 天的价值点并且每个 window 幻灯片5 分钟,我想为每个数据点触发处理函数,而不是等待 CountTrigger
累积 10 个数据点。有人可以帮我解决这个问题吗?
扩展 org.apache.flink.streaming.api.windowing.triggers.CountTrigger
并覆盖 onProcessingTime
方法。在那里实施您的处理时间逻辑。然后使用此触发器而不是普通的 CountTrigger
.
请注意,这会非常痛苦。每个事件将分配给总共 288 windows(24 小时/5 分钟)。这意味着每个事件都会触发对您的 ProcessWindowFunction 的 288 次调用。
如果您发现需要对此进行优化,您可能可以通过精心实现的 KeyedProcessFunction 获得更好的性能。
我有一个 Flink 流作业,我正在其中创建 windows 基于一些键并将数据点添加到一个数据点。
.window(SlidingProcessingTimeWindows.of(Time.days(1), Time.minutes(5)))
.trigger(CountTrigger.of(5))
.window(<ProcessWindowFunction>)
我正在使用上面的代码段创建大小为 1 天的幻灯片 window,幻灯片为 5 分钟。此外,计数触发器是在累积 5 个数据点后触发处理函数。
除此之外,我想为每个 slide
发生的事件触发处理函数。这意味着,在累积 1 天的数据点 (window size
) 之前,CountTrigger
将触发 process
函数,并且一旦创建 1 天的价值点并且每个 window 幻灯片5 分钟,我想为每个数据点触发处理函数,而不是等待 CountTrigger
累积 10 个数据点。有人可以帮我解决这个问题吗?
扩展 org.apache.flink.streaming.api.windowing.triggers.CountTrigger
并覆盖 onProcessingTime
方法。在那里实施您的处理时间逻辑。然后使用此触发器而不是普通的 CountTrigger
.
请注意,这会非常痛苦。每个事件将分配给总共 288 windows(24 小时/5 分钟)。这意味着每个事件都会触发对您的 ProcessWindowFunction 的 288 次调用。
如果您发现需要对此进行优化,您可能可以通过精心实现的 KeyedProcessFunction 获得更好的性能。