Flink 检查点不重播 savepoint/checkpoint 期间正在处理的 kafka 事件

Flink checkpoint not replaying the kafka events which were in process during the savepoint/checkpoint

我想在 flink 中测试一次端到端处理。我的工作是:

Kafka-source -> mapper1 -> mapper-2 -> kafka-sink

我在 mapper1 中放了一个 Thread.sleep(100000) 然后 运行 作业。我在停止作业时获取了保存点,然后从 mapper1 中删除了 Thread.sleep(100000),我希望事件应该重播,因为它没有下沉。但这并没有发生,作业正在等待新事件。

我的 Kafka 来源:

KafkaSource.<String>builder()
                .setBootstrapServers(consumerConfig.getBrokers())
                .setTopics(consumerConfig.getTopic())
                .setGroupId(consumerConfig.getGroupId())
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperty("commit.offsets.on.checkpoint", "true")
                .build();

我的卡夫卡接收器:

KafkaSink.<String>builder()
                .setBootstrapServers(producerConfig.getBootstrapServers())
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(producerConfig.getTopic())
                        .setValueSerializationSchema(new SimpleStringSchema()).build())
                .build();

我的 flink 作业环境设置:

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        environment.enableCheckpointing(2000);
        environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        environment.getCheckpointConfig().setMinPauseBetweenCheckpoints(100);
        environment.getCheckpointConfig().setCheckpointTimeout(60000);
        environment.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
        environment.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        environment.getCheckpointConfig().setCheckpointTimeout(1000);
        environment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        environment.getCheckpointConfig().enableUnalignedCheckpoints();
        environment.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
        Configuration configuration = new Configuration();
        configuration.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
        environment.configure(configuration);

我在这里做错了什么? 我希望作业 cancellation/stop 期间正在处理的任何事件都应该重新启动。

编辑 1: 我观察到我的 kafka 对我的 flink 的 kafka-source 消费者组显示偏移滞后。我假设这意味着我的检查点运行正常,对吗?

我还观察到,当我从检查点重新启动作业时,它并没有开始使用剩余的偏移量,而我将消费者偏移量设置为 EARLIEST。我不得不发送更多事件来触发 kafka-source 端的消费,然后它消费了所有事件。

对于 exactly-once,您必须针对同一个 Kafka 集群提供在所有应用程序 运行 中唯一的 TransactionalIdPrefix(与传统 FlinkKafkaConsumer 相比,这是一个变化) :

KafkaSink<T> sink =
        KafkaSink.<T>builder()
                .setBootstrapServers(...)
                .setKafkaProducerConfig(...)
                .setRecordSerializer(...)
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("unique-id-for-your-app")
                .build();

从检查点恢复时,Flink 始终使用存储在检查点中的偏移量,而不是代码中配置或存储在代理中的偏移量。