FLINK - 将 SQL window 定期刷新元素以进行处理
FLINK - will SQL window flush the element on regular interval for processing
如果 TUMBLE window 会定期计算并发出要处理的元素,我感到很困惑。示例 我有一个查询,预计将在 10 秒的间隔内工作。
select id, key from eventTable GROUP BY TUMBLE(rowTime, INTERVAL '10' SECOND), id, key ;
现在假设:应用程序接收事件
- E1 @10:00:00
- E2 @10:00:05
- E3 @12:00:10
如您所见,E1 和 E2 在 5 秒内到达,E3 在 @12:00:15 到达。
- 当 E1 和 E2 发出以进行处理时,你能帮我吗?会是@10:00:11 吗?或者 E3 什么时候来然后查询将评估 window 并发出?
- 如果是在 E3 之后,那么有什么方法可以确保每 10 秒执行一次查询吗?
感谢您对此的帮助。
如果您正在使用事件时间处理,那么当水印通过 10:00:10 时,将发出结束于 10:00:10 的 window。如果以通常的 bounded-out-of-orderness 方式添加水印,并且没有其他事件,则在处理 E3 之前水印不会前进。
如果您需要考虑闲置的水印策略,我相信您唯一的选择是使用 DataStream API 创建流和 apply watermarking that deals with idle sources, and then convert the DataStream to a Table.
请注意,.withIdleness(...)
所做的是将流标记为空闲,这可以防止该流阻止水印。这解决了一个空闲流阻碍整个作业的问题如果有其他活动流。如果你想让水印在什么都没有发生的时候继续,你需要做一些更激烈的事情。
理想的解决方案是使用来自同一源的保持活动消息,这样您就知道空闲是真实的,而不是中断。如果做不到这一点,请参阅 ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor 的示例,了解如何使用计时器检测空闲并根据时间的流逝而不是新事件的到来来推进水印。 (请注意,此示例尚未更新为使用新的 WatermarkStrategy
界面。)
您可以配置 tableEnv 让 table 提前发出:
TableConfig config = bbTableEnv.getConfig();
config.getConfiguration().setBoolean("table.exec.emit.early-fire.enabled", true);
config.getConfiguration().setString("table.exec.emit.early-fire.delay", "1s");
如果 TUMBLE window 会定期计算并发出要处理的元素,我感到很困惑。示例 我有一个查询,预计将在 10 秒的间隔内工作。
select id, key from eventTable GROUP BY TUMBLE(rowTime, INTERVAL '10' SECOND), id, key ;
现在假设:应用程序接收事件
- E1 @10:00:00
- E2 @10:00:05
- E3 @12:00:10
如您所见,E1 和 E2 在 5 秒内到达,E3 在 @12:00:15 到达。
- 当 E1 和 E2 发出以进行处理时,你能帮我吗?会是@10:00:11 吗?或者 E3 什么时候来然后查询将评估 window 并发出?
- 如果是在 E3 之后,那么有什么方法可以确保每 10 秒执行一次查询吗?
感谢您对此的帮助。
如果您正在使用事件时间处理,那么当水印通过 10:00:10 时,将发出结束于 10:00:10 的 window。如果以通常的 bounded-out-of-orderness 方式添加水印,并且没有其他事件,则在处理 E3 之前水印不会前进。
如果您需要考虑闲置的水印策略,我相信您唯一的选择是使用 DataStream API 创建流和 apply watermarking that deals with idle sources, and then convert the DataStream to a Table.
请注意,.withIdleness(...)
所做的是将流标记为空闲,这可以防止该流阻止水印。这解决了一个空闲流阻碍整个作业的问题如果有其他活动流。如果你想让水印在什么都没有发生的时候继续,你需要做一些更激烈的事情。
理想的解决方案是使用来自同一源的保持活动消息,这样您就知道空闲是真实的,而不是中断。如果做不到这一点,请参阅 ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor 的示例,了解如何使用计时器检测空闲并根据时间的流逝而不是新事件的到来来推进水印。 (请注意,此示例尚未更新为使用新的 WatermarkStrategy
界面。)
您可以配置 tableEnv 让 table 提前发出:
TableConfig config = bbTableEnv.getConfig();
config.getConfiguration().setBoolean("table.exec.emit.early-fire.enabled", true);
config.getConfiguration().setString("table.exec.emit.early-fire.delay", "1s");