Flink Missing Events With Windowed Processor(Event Time Windows) and Kafka Source
Flink Missing Events With Windowed Processor(Event Time Windows) and Kafka Source
我们有一个流式作业,它有 20 个独立的管道,每个管道都有 one/many 个 Kafka 主题源,一些管道有窗口处理器,其他管道是非窗口处理器。
我们注意到当作业停止并需要一些时间 recover/when 作业需要重新启动时,窗口处理器管道的数据丢失。
我已经为所有操作员设置了 UID,我可以在日志中看到偏移量正在从 Kafka 消费者操作员的保存点恢复
我们正在使用 BoundedOutOfOrdernessTimestampExtractor 根据事件时间分配水印。
public class KafkaEventTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<Event> implements Serializable{
public KafkaEventTimestampExtractor(Time maxOutOfOrderness) {
super(maxOutOfOrderness);
}
@Override
public long extractTimestamp(Event element) {
try {
log.info("event to be processed, event:{}", new ObjectMapper().writeValueAsString(element));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
Long ts = null;
ts = Double.valueOf(Double.parseDouble(element.getTs())).longValue();
ts = ts.toString().length() < 13 ? ts * 1000 : ts;
return ts;
}
}
管道配置看起来像这样。
- 非窗口
SourceUtil
.getEventDataStream(env, kafkaSourceSet)
.process(new S3EventProcessor()).uid(“…..**)
.addSink();
- 开窗
SourceUtil
.getEventDataStream(env, kafkaSourceSet)
.assignTimestampsAndWatermarks(
new KafkaEventTimestampExtractor(Time.seconds(4)))
.windowAll(TumblingEventTimeWindows.of(
Time.milliseconds(kafkaSourceSet.bufferWindowSize))
.process(new S3EventProcessor()).uid(“…..**)
.addSink();
假设作业停止了 30 分钟,在这种情况下,我们不使用 window 处理器的管道不会丢失任何数据,但 window 中会丢失部分数据为这 30 分钟编辑处理器
当我们增加 TimeWinows 中的乱序事件延迟时,即 - 我们将它从 4 秒增加到 30 分钟,那么如果应用程序在 [=56 内启动,事件就不会丢失=] 离解决方案还差得很远,因为超过 1 分钟的延迟对我们来说是不可行的,而且会有太多的实时 windows,这对我们来说意味着巨大的基础设施变化。
我能想到的唯一可以解释这一点的情况是事件时间戳是否受到中断的影响。然后 30 分钟的中断会导致时间戳出现 30 分钟的差距,并且使用 out-or-order 摄取,4 秒 bounded-out-of-orderness 策略将产生一些延迟事件,这些事件将被 window.
这是由于我的管道中的错误而发生的,而不是在 flinkKafkaConsumer 中使用时间戳分配器,而是将其添加到从 flinkKafkaConsumer 生成的数据流中。
此更改已解决我这边的自动恢复问题,但如果手动重启 post 对管道进行任何更改,最后 window 时仍然会丢失一些数据作业已停止。
注意:-- 我们正在使用检查点进行手动恢复。
根据文档,检查点是作业失败时自动恢复的理想选择。
如果我们需要创建一个保存点,以防我们需要对管道进行一些更改并手动重新启动它,或者我们可以使用检查点进行完全恢复,那么对此的任何说明都会有所帮助。
我们在使用保存点的情况下唯一关心的是对可能发生的相同事件的重新处理,这在少数情况下对我们来说并不理想。
我们有一个流式作业,它有 20 个独立的管道,每个管道都有 one/many 个 Kafka 主题源,一些管道有窗口处理器,其他管道是非窗口处理器。
我们注意到当作业停止并需要一些时间 recover/when 作业需要重新启动时,窗口处理器管道的数据丢失。
我已经为所有操作员设置了 UID,我可以在日志中看到偏移量正在从 Kafka 消费者操作员的保存点恢复
我们正在使用 BoundedOutOfOrdernessTimestampExtractor 根据事件时间分配水印。
public class KafkaEventTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<Event> implements Serializable{
public KafkaEventTimestampExtractor(Time maxOutOfOrderness) {
super(maxOutOfOrderness);
}
@Override
public long extractTimestamp(Event element) {
try {
log.info("event to be processed, event:{}", new ObjectMapper().writeValueAsString(element));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
Long ts = null;
ts = Double.valueOf(Double.parseDouble(element.getTs())).longValue();
ts = ts.toString().length() < 13 ? ts * 1000 : ts;
return ts;
}
}
管道配置看起来像这样。
- 非窗口
SourceUtil
.getEventDataStream(env, kafkaSourceSet)
.process(new S3EventProcessor()).uid(“…..**)
.addSink();
- 开窗
SourceUtil
.getEventDataStream(env, kafkaSourceSet)
.assignTimestampsAndWatermarks(
new KafkaEventTimestampExtractor(Time.seconds(4)))
.windowAll(TumblingEventTimeWindows.of(
Time.milliseconds(kafkaSourceSet.bufferWindowSize))
.process(new S3EventProcessor()).uid(“…..**)
.addSink();
假设作业停止了 30 分钟,在这种情况下,我们不使用 window 处理器的管道不会丢失任何数据,但 window 中会丢失部分数据为这 30 分钟编辑处理器
当我们增加 TimeWinows 中的乱序事件延迟时,即 - 我们将它从 4 秒增加到 30 分钟,那么如果应用程序在 [=56 内启动,事件就不会丢失=] 离解决方案还差得很远,因为超过 1 分钟的延迟对我们来说是不可行的,而且会有太多的实时 windows,这对我们来说意味着巨大的基础设施变化。
我能想到的唯一可以解释这一点的情况是事件时间戳是否受到中断的影响。然后 30 分钟的中断会导致时间戳出现 30 分钟的差距,并且使用 out-or-order 摄取,4 秒 bounded-out-of-orderness 策略将产生一些延迟事件,这些事件将被 window.
这是由于我的管道中的错误而发生的,而不是在 flinkKafkaConsumer 中使用时间戳分配器,而是将其添加到从 flinkKafkaConsumer 生成的数据流中。
此更改已解决我这边的自动恢复问题,但如果手动重启 post 对管道进行任何更改,最后 window 时仍然会丢失一些数据作业已停止。
注意:-- 我们正在使用检查点进行手动恢复。 根据文档,检查点是作业失败时自动恢复的理想选择。
如果我们需要创建一个保存点,以防我们需要对管道进行一些更改并手动重新启动它,或者我们可以使用检查点进行完全恢复,那么对此的任何说明都会有所帮助。
我们在使用保存点的情况下唯一关心的是对可能发生的相同事件的重新处理,这在少数情况下对我们来说并不理想。