flink 是否保存带有水印的关闭事件时间 windows 的历史记录?
Does flink hold history of closed event time windows with watermark?
我有一个 flink 作业,它使用带有事件时间和水印的键控滚动 windows 聚合数据。
我的问题是flink是否持有他已经关闭的状态windows?
否则,我没有其他解释为什么属于以前从未打开过的 window 的事件会打开 window 而不是立即将其删除。
假设我们的 windows 持续 1 小时,forBoundedOutOfOrderness 持续 10 分钟
让我们举个例子:
event1 = ("2022-01-01T08:25:00Z") => window 解雇
event2 = ("2022-01-01T09:25:00Z") => window 已创建但未按预期触发
event3 = ("2022-01-01T05:25:00Z") => 将与事件 4 聚合而不是丢弃(为什么?)
event4 = ("2022-01-01T05:40:00Z") => 将与事件 3 聚合而不是丢弃(为什么?)
val stream = env
.fromSource(
kafkaSource,
WatermarkStrategy
.forBoundedOutOfOrderness[(String, EnrichedProcess, KafkaHeaders)](Duration.ofMinutes(outOfOrdernessMinutes)) //Watermark max time for late events
.withIdleness(Duration.ofSeconds(idleness))
.withTimestampAssigner(new SerializableTimestampAssigner[(String, EnrichedProcess, KafkaHeaders)] {
override def extractTimestamp(element: (String, EnrichedProcess, KafkaHeaders), recordTimestamp: Long)
: Long = {
logger.info(
LogMessage(
element._3.orgId,
s"Received incoming EnrichedProcess update_time: ${element._3.updateTime}, process time: ${recordTimestamp.asDate}",
element._3.flowId
)
)
element._3.updateTime.asEpoch
}
}),
s"Source - $kConsumeTopic"
)
stream
.keyBy(element => (element._2.orgId -> element._2.procUid))
.window(TumblingEventTimeWindows.of(Time.hours(tumblingWindowHours), Time.minutes(windowStartingOffsetMinutes)))
.reduce(new ReduceFunc)
.name("Aggregated EnrichedProcess")
.sinkTo(kafkaConnector.createKafkaSink(kProducerServers, kProduceTopic))
.name(s"Sink -> $kProduceTopic")
编辑:
我对此进行测试的方法是使用 docker 撰写的集成测试。我正在向 Kafka 生成一个事件 => 由 Flink 作业消耗并发送到 Kafka => 检查 kafka 的内容。
当我在发送 event3 和 event4 之间放置 30 秒的睡眠时,它们会被删除。这是我所期待的行为。
val producer = new Producer(producerTopic)
val consumer = new Consumer(consumerTopic, groupId)
producer.send(event1)
producer.send(event2)
Thread.sleep(30000)
producer.send(event3)
Thread.sleep(30000)
producer.send(event4)
val received: Iterable[(String, EnrichedProcess)] = consumer.receive[EnrichedProcess]()
但现在更奇怪的是,为什么当我将睡眠时间设置为 10 秒而不是 30 秒时,我只收到第一种情况(水印应该已经更新(周期性水印生成器的默认值为 200 毫秒)
执行摘要:
Flink 的 Non-determinism event-time-based 逻辑来自混合处理时间和事件时间——就像周期性水印生成器和空闲检测一样。只有当您永远不会有延迟事件或闲置资源时,您才能确定确定性结果。
更多详情:
如你所料
event3 = ("2022-01-01T05:25:00Z")
要迟到,只有足够大的水印成功先到,才是真正的迟到。使用 forBoundedOutOfOrderness
策略无法保证这一点——这是一个 periodic 水印生成器,每 200 毫秒生成一次水印。所以可能是在event3和event4之间产生了一个基于event2时间戳的水印。
这是一种可能的解释;根据具体情况,可能还有其他人。例如,随着所有这些睡眠的进行,水印生成器的一个并行实例空闲至少一分钟,这可能与产生所观察到的行为有关(取决于 idleness[ 的值=43=], 等等).
更多背景:
在并行度 > 1 的情况下,有多个独立的水印策略实例,每个实例都根据它们处理的事件做自己的事情。
具有多个输入通道的运算符,例如键控 window,将通过将输入水印(来自所有 non-idle 个通道)中的最小值作为它们自己的水印来组合这些水印。
回答原问题:
Flink 是否保留已经关闭的 windows 的状态? 否。一旦允许的延迟(如果有)过期,事件的状态时间 window 被清除。
我有一个 flink 作业,它使用带有事件时间和水印的键控滚动 windows 聚合数据。
我的问题是flink是否持有他已经关闭的状态windows? 否则,我没有其他解释为什么属于以前从未打开过的 window 的事件会打开 window 而不是立即将其删除。
假设我们的 windows 持续 1 小时,forBoundedOutOfOrderness 持续 10 分钟
让我们举个例子:
event1 = ("2022-01-01T08:25:00Z") => window 解雇
event2 = ("2022-01-01T09:25:00Z") => window 已创建但未按预期触发
event3 = ("2022-01-01T05:25:00Z") => 将与事件 4 聚合而不是丢弃(为什么?)
event4 = ("2022-01-01T05:40:00Z") => 将与事件 3 聚合而不是丢弃(为什么?)
val stream = env
.fromSource(
kafkaSource,
WatermarkStrategy
.forBoundedOutOfOrderness[(String, EnrichedProcess, KafkaHeaders)](Duration.ofMinutes(outOfOrdernessMinutes)) //Watermark max time for late events
.withIdleness(Duration.ofSeconds(idleness))
.withTimestampAssigner(new SerializableTimestampAssigner[(String, EnrichedProcess, KafkaHeaders)] {
override def extractTimestamp(element: (String, EnrichedProcess, KafkaHeaders), recordTimestamp: Long)
: Long = {
logger.info(
LogMessage(
element._3.orgId,
s"Received incoming EnrichedProcess update_time: ${element._3.updateTime}, process time: ${recordTimestamp.asDate}",
element._3.flowId
)
)
element._3.updateTime.asEpoch
}
}),
s"Source - $kConsumeTopic"
)
stream
.keyBy(element => (element._2.orgId -> element._2.procUid))
.window(TumblingEventTimeWindows.of(Time.hours(tumblingWindowHours), Time.minutes(windowStartingOffsetMinutes)))
.reduce(new ReduceFunc)
.name("Aggregated EnrichedProcess")
.sinkTo(kafkaConnector.createKafkaSink(kProducerServers, kProduceTopic))
.name(s"Sink -> $kProduceTopic")
编辑: 我对此进行测试的方法是使用 docker 撰写的集成测试。我正在向 Kafka 生成一个事件 => 由 Flink 作业消耗并发送到 Kafka => 检查 kafka 的内容。
当我在发送 event3 和 event4 之间放置 30 秒的睡眠时,它们会被删除。这是我所期待的行为。
val producer = new Producer(producerTopic)
val consumer = new Consumer(consumerTopic, groupId)
producer.send(event1)
producer.send(event2)
Thread.sleep(30000)
producer.send(event3)
Thread.sleep(30000)
producer.send(event4)
val received: Iterable[(String, EnrichedProcess)] = consumer.receive[EnrichedProcess]()
但现在更奇怪的是,为什么当我将睡眠时间设置为 10 秒而不是 30 秒时,我只收到第一种情况(水印应该已经更新(周期性水印生成器的默认值为 200 毫秒)
执行摘要:
Flink 的Non-determinism event-time-based 逻辑来自混合处理时间和事件时间——就像周期性水印生成器和空闲检测一样。只有当您永远不会有延迟事件或闲置资源时,您才能确定确定性结果。
更多详情:
如你所料
event3 = ("2022-01-01T05:25:00Z")
要迟到,只有足够大的水印成功先到,才是真正的迟到。使用 forBoundedOutOfOrderness
策略无法保证这一点——这是一个 periodic 水印生成器,每 200 毫秒生成一次水印。所以可能是在event3和event4之间产生了一个基于event2时间戳的水印。
这是一种可能的解释;根据具体情况,可能还有其他人。例如,随着所有这些睡眠的进行,水印生成器的一个并行实例空闲至少一分钟,这可能与产生所观察到的行为有关(取决于 idleness[ 的值=43=], 等等).
更多背景:
在并行度 > 1 的情况下,有多个独立的水印策略实例,每个实例都根据它们处理的事件做自己的事情。
具有多个输入通道的运算符,例如键控 window,将通过将输入水印(来自所有 non-idle 个通道)中的最小值作为它们自己的水印来组合这些水印。
回答原问题:
Flink 是否保留已经关闭的 windows 的状态? 否。一旦允许的延迟(如果有)过期,事件的状态时间 window 被清除。