Apache flink 了解水印空闲以及与有界持续时间和 window 持续时间的关系
Apache flink understanding of watermark idleness and relation to Bounded duration and window duration
我有一个配置了 Kafka 连接器的 Flink 管道。
我已将水印生成频率设置为 2 秒,使用:
env.getConfig().setAutoWatermarkInterval(2000);
现在我的翻滚 window 是流 window 的 60 秒,我们在其中进行一些聚合,并且我们根据我们的数据字段之一的时间戳进行基于事件时间的处理。
我没有为我的水印策略或流配置 allowedLateness。
final ConnectorConfig topicConfig = config.forTopic("mytopic");
final FlinkKafkaConsumer<MyPojo> myEvents = new FlinkKafkaConsumer<>(
topicConfig.name(),
AvroDeserializationSchema.forSpecific(MyPojo.class),
topicConfig.forConsumer()
);
myEvents.setStartFromLatest();
myEvents.assignTimestampsAndWatermarks(
WatermarkStrategy
.<MyPojo>forBoundedOutOfOrderness(
Duration.ofSeconds(30))
.withIdleness(Duration.ofSeconds(120))
.withTimestampAssigner((evt, timestamp) -> evt.event_timestamp_field));
Q.1 根据我正在阅读的内容,我的时间 0-60 的 window 将在 90 秒后计算,30-90 将在 120 秒后计算,依此类推。然而,由于我们正在进行翻滚 window,即没有重叠,我的猜测是没有 30-90 window,0-60 之后的下一个 window 是 60-120,在 150 时触发第二个标记,我说的对吗?
Q.2 如果没有 allowedLateness,所有迟到的数据都将被丢弃,例如。时间戳为 45 的事件在 90 秒后到达被认为是乱序的,并且将在第一个 window 即 0-60.For window 60-120 之外,事件时间戳不会匹配,因此它将被丢弃并且不包含在 window 在 150 秒标记处触发,对吗?
问题 3。对于源空闲持续时间,我选择 120,表示如果主题的任何 Kakfa 分区没有数据,则在 2 分钟后将其标记为空闲,然后发送其他活动分区的水印。我的问题是关于这个数字的选择,即 2 分钟,以及它是否与 window 持续时间(60 秒)或乱序(30 秒)有关。如果是这样,我应该记住什么来进行适当的选择,这样我就不会因为空闲分区导致的非推进水印而延迟数据滞留?
或者 120 的等待时间是否太长以至于我可能会丢失数据,因此我应该将其设置为远小于 OutOfOrderness 持续时间的值以确保 0 数据丢失?
编辑:添加了更多代码
Q1:是的,没错。
Q2:是的,这也是正确的。
Q3:此处的细节取决于您是否让 Kafka 源应用 WatermarkStrategy,在这种情况下它将执行每个分区水印,或者 WatermarkStrategy 是否作为单独的运算符部署在某个地方(通常立即链接)之后)源运算符。
在第一种情况下(每个分区的水印由 FlinkKafkaConsumer
完成)你将做这样的事情:
FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>(...);
kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy ...);
DataStream<MyType> stream = env.addSource(kafkaSource);
而在源之后单独添加水印,看起来像这样:
DataStream<MyType> events = env.addSource(...);
DataStream<MyType> timestampedEvents = events
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<MyType>forBoundedOutOfOrderness(Duration ...)
.withTimestampAssigner((event, timestamp) -> event.timestamp));
当基于每个分区完成水印时,单个空闲分区将阻止处理该分区的 consumer/source 实例的水印——直到空闲超时开始(120 秒你的例子)。相比之下,如果水印是在链接到源的单独运算符中完成的,那么只有当分配给该源实例(具有空闲分区的分区)的所有分区都空闲时,水印才会被保留(同样,对于 120秒)。
但不管这些细节如何,都希望不会丢失数据。会有一段时间 windows 不会被触发(因为水印没有推进),但事件将继续被处理并分配给它们适当的 windows。一旦水印恢复,那些 windows 将关闭并提供他们的结果。
发生数据丢失的情况是分区空闲,因为上游的某些故障导致中断,最终产生一堆延迟事件。在空闲超时到期后,水印将前进,如果源空闲是因为上游发生了某些事情(而不是因为根本没有事件),那么最终到达的那些事件将会延迟(除非你的边界外 -序延迟足够大以容纳它们)。如果您选择忽略迟到的事件,那么这些事件将会丢失。
我有一个配置了 Kafka 连接器的 Flink 管道。
我已将水印生成频率设置为 2 秒,使用:
env.getConfig().setAutoWatermarkInterval(2000);
现在我的翻滚 window 是流 window 的 60 秒,我们在其中进行一些聚合,并且我们根据我们的数据字段之一的时间戳进行基于事件时间的处理。
我没有为我的水印策略或流配置 allowedLateness。
final ConnectorConfig topicConfig = config.forTopic("mytopic");
final FlinkKafkaConsumer<MyPojo> myEvents = new FlinkKafkaConsumer<>(
topicConfig.name(),
AvroDeserializationSchema.forSpecific(MyPojo.class),
topicConfig.forConsumer()
);
myEvents.setStartFromLatest();
myEvents.assignTimestampsAndWatermarks(
WatermarkStrategy
.<MyPojo>forBoundedOutOfOrderness(
Duration.ofSeconds(30))
.withIdleness(Duration.ofSeconds(120))
.withTimestampAssigner((evt, timestamp) -> evt.event_timestamp_field));
Q.1 根据我正在阅读的内容,我的时间 0-60 的 window 将在 90 秒后计算,30-90 将在 120 秒后计算,依此类推。然而,由于我们正在进行翻滚 window,即没有重叠,我的猜测是没有 30-90 window,0-60 之后的下一个 window 是 60-120,在 150 时触发第二个标记,我说的对吗?
Q.2 如果没有 allowedLateness,所有迟到的数据都将被丢弃,例如。时间戳为 45 的事件在 90 秒后到达被认为是乱序的,并且将在第一个 window 即 0-60.For window 60-120 之外,事件时间戳不会匹配,因此它将被丢弃并且不包含在 window 在 150 秒标记处触发,对吗?
问题 3。对于源空闲持续时间,我选择 120,表示如果主题的任何 Kakfa 分区没有数据,则在 2 分钟后将其标记为空闲,然后发送其他活动分区的水印。我的问题是关于这个数字的选择,即 2 分钟,以及它是否与 window 持续时间(60 秒)或乱序(30 秒)有关。如果是这样,我应该记住什么来进行适当的选择,这样我就不会因为空闲分区导致的非推进水印而延迟数据滞留?
或者 120 的等待时间是否太长以至于我可能会丢失数据,因此我应该将其设置为远小于 OutOfOrderness 持续时间的值以确保 0 数据丢失?
编辑:添加了更多代码
Q1:是的,没错。
Q2:是的,这也是正确的。
Q3:此处的细节取决于您是否让 Kafka 源应用 WatermarkStrategy,在这种情况下它将执行每个分区水印,或者 WatermarkStrategy 是否作为单独的运算符部署在某个地方(通常立即链接)之后)源运算符。
在第一种情况下(每个分区的水印由 FlinkKafkaConsumer
完成)你将做这样的事情:
FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>(...);
kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy ...);
DataStream<MyType> stream = env.addSource(kafkaSource);
而在源之后单独添加水印,看起来像这样:
DataStream<MyType> events = env.addSource(...);
DataStream<MyType> timestampedEvents = events
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<MyType>forBoundedOutOfOrderness(Duration ...)
.withTimestampAssigner((event, timestamp) -> event.timestamp));
当基于每个分区完成水印时,单个空闲分区将阻止处理该分区的 consumer/source 实例的水印——直到空闲超时开始(120 秒你的例子)。相比之下,如果水印是在链接到源的单独运算符中完成的,那么只有当分配给该源实例(具有空闲分区的分区)的所有分区都空闲时,水印才会被保留(同样,对于 120秒)。
但不管这些细节如何,都希望不会丢失数据。会有一段时间 windows 不会被触发(因为水印没有推进),但事件将继续被处理并分配给它们适当的 windows。一旦水印恢复,那些 windows 将关闭并提供他们的结果。
发生数据丢失的情况是分区空闲,因为上游的某些故障导致中断,最终产生一堆延迟事件。在空闲超时到期后,水印将前进,如果源空闲是因为上游发生了某些事情(而不是因为根本没有事件),那么最终到达的那些事件将会延迟(除非你的边界外 -序延迟足够大以容纳它们)。如果您选择忽略迟到的事件,那么这些事件将会丢失。