在处理后,在 Flink 中将元素传回输入流?
Passing elements back to the input stream, after processing, in Flink?
场景:
我有一个来自传感器的事件流。事件可以是 T 型 或 J 型。
- T 型事件有事件发生时间戳。
- J 类事件有开始和结束时间戳。
根据J-Type事件的起止时间戳,对时间范围内的所有T-type事件应用聚合逻辑,并将结果写入DB。
为此,我创建了一个自定义触发器,它会在收到 J-Type 事件时触发。在我的自定义 ProcessWindowFunction 中,我正在执行聚合逻辑和时间检查。
但是,可能存在一种情况,T 类事件不在当前 J 类事件的时间范围内。
在那种情况下,T型事件应该被推到下一个window,然后再清除当前的window。
解决思路:
在自定义 window 处理函数中将未处理的 T 型事件推送到 Kinesis 流(源)中。 (最坏情况的解决方案)
而不是 FIRE_AND_PURGE,使用 FIRE,在整个运行时保持状态。使用元素迭代器删除处理过的元素。 (不推荐,保持无限window)
想知道是否有任何方法可以将未处理的事件直接推回输入流(无需运动)。 (重新排队)
或
有没有办法在 keyBy 上下文中维护状态,以便我们对这些未处理的数据(之前或)与 window 元素一起执行计算。
这里有两种解决方法。它们的基本行为或多或少是相同的,但您可能会发现其中一个更容易理解、维护或测试。
至于你的问题,不,没有办法循环回(重新排队)未消费的事件而不将它们推回 Kinesis。但是只要坚持到需要它们就可以了。
解决方案 1:使用 RichFlatMapFunction
当 T 类事件到达时,将它们附加到 ListState
对象。当J型事件到来时,从列表中收集所有匹配的T型事件到输出,并更新列表以仅保留那些将属于以后的J型事件的T型事件。
解决方案 2:将 GlobalWindows 与自定义触发器和驱逐器一起使用
除了您已经完成的工作之外,实施一个 Evictor
,它(在 window 被触发后)仅从中删除 J 类型事件和所有匹配的 T 类型事件window.
更新:清除陈旧密钥/失效传感器的状态
对于解决方案 1,您可以使用 state TTL 来安排清除与死键关联的任何非活动状态。或者您可以使用 KeyedProcessFunction
而不是 RichFlatMapFunction
,并使用计时器来完成同样的事情。
使用 window API 管理陈旧密钥的状态可能不那么简单,但对于解决方案 2,我相信您可以扩展自定义触发器以包含一个超时,该超时将清除 window.如果您在 ProcessWindowFunction
中使用了全局状态,则需要依靠状态 TTL 来清理它。
场景:
我有一个来自传感器的事件流。事件可以是 T 型 或 J 型。
- T 型事件有事件发生时间戳。
- J 类事件有开始和结束时间戳。
根据J-Type事件的起止时间戳,对时间范围内的所有T-type事件应用聚合逻辑,并将结果写入DB。
为此,我创建了一个自定义触发器,它会在收到 J-Type 事件时触发。在我的自定义 ProcessWindowFunction 中,我正在执行聚合逻辑和时间检查。
但是,可能存在一种情况,T 类事件不在当前 J 类事件的时间范围内。 在那种情况下,T型事件应该被推到下一个window,然后再清除当前的window。
解决思路:
在自定义 window 处理函数中将未处理的 T 型事件推送到 Kinesis 流(源)中。 (最坏情况的解决方案)
而不是 FIRE_AND_PURGE,使用 FIRE,在整个运行时保持状态。使用元素迭代器删除处理过的元素。 (不推荐,保持无限window)
想知道是否有任何方法可以将未处理的事件直接推回输入流(无需运动)。 (重新排队)
或
有没有办法在 keyBy 上下文中维护状态,以便我们对这些未处理的数据(之前或)与 window 元素一起执行计算。
这里有两种解决方法。它们的基本行为或多或少是相同的,但您可能会发现其中一个更容易理解、维护或测试。
至于你的问题,不,没有办法循环回(重新排队)未消费的事件而不将它们推回 Kinesis。但是只要坚持到需要它们就可以了。
解决方案 1:使用 RichFlatMapFunction
当 T 类事件到达时,将它们附加到 ListState
对象。当J型事件到来时,从列表中收集所有匹配的T型事件到输出,并更新列表以仅保留那些将属于以后的J型事件的T型事件。
解决方案 2:将 GlobalWindows 与自定义触发器和驱逐器一起使用
除了您已经完成的工作之外,实施一个 Evictor
,它(在 window 被触发后)仅从中删除 J 类型事件和所有匹配的 T 类型事件window.
更新:清除陈旧密钥/失效传感器的状态
对于解决方案 1,您可以使用 state TTL 来安排清除与死键关联的任何非活动状态。或者您可以使用 KeyedProcessFunction
而不是 RichFlatMapFunction
,并使用计时器来完成同样的事情。
使用 window API 管理陈旧密钥的状态可能不那么简单,但对于解决方案 2,我相信您可以扩展自定义触发器以包含一个超时,该超时将清除 window.如果您在 ProcessWindowFunction
中使用了全局状态,则需要依靠状态 TTL 来清理它。