Flink Missing Events With Windowed Processor(Event Time Windows) and Kafka Source

Flink Missing Events With Windowed Processor(Event Time Windows) and Kafka Source

我们有一个流式作业,它有 20 个独立的管道,每个管道都有 one/many 个 Kafka 主题源,一些管道有窗口处理器,其他管道是非窗口处理器。

我们注意到当作业停止并需要一些时间 recover/when 作业需要重新启动时,窗口处理器管道的数据丢失。

public class KafkaEventTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<Event> implements Serializable{

    public KafkaEventTimestampExtractor(Time maxOutOfOrderness) {
        super(maxOutOfOrderness);
    }

    @Override
    public long extractTimestamp(Event element) {
        try {
            log.info("event to be processed, event:{}", new ObjectMapper().writeValueAsString(element));
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        Long ts = null;
        ts = Double.valueOf(Double.parseDouble(element.getTs())).longValue();
        ts = ts.toString().length() < 13 ? ts * 1000 : ts;
        return ts;
    }
}

管道配置看起来像这样。

SourceUtil
  .getEventDataStream(env, kafkaSourceSet)
  .process(new S3EventProcessor()).uid(“…..**)
  .addSink();
SourceUtil
  .getEventDataStream(env, kafkaSourceSet)
  .assignTimestampsAndWatermarks(
    new KafkaEventTimestampExtractor(Time.seconds(4)))
  .windowAll(TumblingEventTimeWindows.of(
    Time.milliseconds(kafkaSourceSet.bufferWindowSize))
  .process(new S3EventProcessor()).uid(“…..**)
  .addSink();

我能想到的唯一可以解释这一点的情况是事件时间戳是否受到中断的影响。然后 30 分钟的中断会导致时间戳出现 30 分钟的差距,并且使用 out-or-order 摄取,4 秒 bounded-out-of-orderness 策略将产生一些延迟事件,这些事件将被 window.

这是由于我的管道中的错误而发生的,而不是在 flinkKafkaConsumer 中使用时间戳分配器,而是将其添加到从 flinkKafkaConsumer 生成的数据流中。

此更改已解决我这边的自动恢复问题,但如果手动重启 post 对管道进行任何更改,最后 window 时仍然会丢失一些数据作业已停止。

注意:-- 我们正在使用检查点进行手动恢复。 根据文档,检查点是作业失败时自动恢复的理想选择。

如果我们需要创建一个保存点,以防我们需要对管道进行一些更改并手动重新启动它,或者我们可以使用检查点进行完全恢复,那么对此的任何说明都会有所帮助。

我们在使用保存点的情况下唯一关心的是对可能发生的相同事件的重新处理,这在少数情况下对我们来说并不理想。