Flink 检查点不重播 savepoint/checkpoint 期间正在处理的 kafka 事件
Flink checkpoint not replaying the kafka events which were in process during the savepoint/checkpoint
我想在 flink 中测试一次端到端处理。我的工作是:
Kafka-source -> mapper1 -> mapper-2 -> kafka-sink
我在 mapper1 中放了一个 Thread.sleep(100000)
然后 运行 作业。我在停止作业时获取了保存点,然后从 mapper1 中删除了 Thread.sleep(100000)
,我希望事件应该重播,因为它没有下沉。但这并没有发生,作业正在等待新事件。
我的 Kafka 来源:
KafkaSource.<String>builder()
.setBootstrapServers(consumerConfig.getBrokers())
.setTopics(consumerConfig.getTopic())
.setGroupId(consumerConfig.getGroupId())
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.setProperty("commit.offsets.on.checkpoint", "true")
.build();
我的卡夫卡接收器:
KafkaSink.<String>builder()
.setBootstrapServers(producerConfig.getBootstrapServers())
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(producerConfig.getTopic())
.setValueSerializationSchema(new SimpleStringSchema()).build())
.build();
我的 flink 作业环境设置:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.enableCheckpointing(2000);
environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
environment.getCheckpointConfig().setMinPauseBetweenCheckpoints(100);
environment.getCheckpointConfig().setCheckpointTimeout(60000);
environment.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
environment.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
environment.getCheckpointConfig().setCheckpointTimeout(1000);
environment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
environment.getCheckpointConfig().enableUnalignedCheckpoints();
environment.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
Configuration configuration = new Configuration();
configuration.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
environment.configure(configuration);
我在这里做错了什么?
我希望作业 cancellation/stop 期间正在处理的任何事件都应该重新启动。
编辑 1:
我观察到我的 kafka 对我的 flink 的 kafka-source 消费者组显示偏移滞后。我假设这意味着我的检查点运行正常,对吗?
我还观察到,当我从检查点重新启动作业时,它并没有开始使用剩余的偏移量,而我将消费者偏移量设置为 EARLIEST。我不得不发送更多事件来触发 kafka-source 端的消费,然后它消费了所有事件。
对于 exactly-once,您必须针对同一个 Kafka 集群提供在所有应用程序 运行 中唯一的 TransactionalIdPrefix
(与传统 FlinkKafkaConsumer
相比,这是一个变化) :
KafkaSink<T> sink =
KafkaSink.<T>builder()
.setBootstrapServers(...)
.setKafkaProducerConfig(...)
.setRecordSerializer(...)
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("unique-id-for-your-app")
.build();
从检查点恢复时,Flink 始终使用存储在检查点中的偏移量,而不是代码中配置或存储在代理中的偏移量。
我想在 flink 中测试一次端到端处理。我的工作是:
Kafka-source -> mapper1 -> mapper-2 -> kafka-sink
我在 mapper1 中放了一个 Thread.sleep(100000)
然后 运行 作业。我在停止作业时获取了保存点,然后从 mapper1 中删除了 Thread.sleep(100000)
,我希望事件应该重播,因为它没有下沉。但这并没有发生,作业正在等待新事件。
我的 Kafka 来源:
KafkaSource.<String>builder()
.setBootstrapServers(consumerConfig.getBrokers())
.setTopics(consumerConfig.getTopic())
.setGroupId(consumerConfig.getGroupId())
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.setProperty("commit.offsets.on.checkpoint", "true")
.build();
我的卡夫卡接收器:
KafkaSink.<String>builder()
.setBootstrapServers(producerConfig.getBootstrapServers())
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(producerConfig.getTopic())
.setValueSerializationSchema(new SimpleStringSchema()).build())
.build();
我的 flink 作业环境设置:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.enableCheckpointing(2000);
environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
environment.getCheckpointConfig().setMinPauseBetweenCheckpoints(100);
environment.getCheckpointConfig().setCheckpointTimeout(60000);
environment.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
environment.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
environment.getCheckpointConfig().setCheckpointTimeout(1000);
environment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
environment.getCheckpointConfig().enableUnalignedCheckpoints();
environment.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
Configuration configuration = new Configuration();
configuration.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
environment.configure(configuration);
我在这里做错了什么? 我希望作业 cancellation/stop 期间正在处理的任何事件都应该重新启动。
编辑 1: 我观察到我的 kafka 对我的 flink 的 kafka-source 消费者组显示偏移滞后。我假设这意味着我的检查点运行正常,对吗?
我还观察到,当我从检查点重新启动作业时,它并没有开始使用剩余的偏移量,而我将消费者偏移量设置为 EARLIEST。我不得不发送更多事件来触发 kafka-source 端的消费,然后它消费了所有事件。
对于 exactly-once,您必须针对同一个 Kafka 集群提供在所有应用程序 运行 中唯一的 TransactionalIdPrefix
(与传统 FlinkKafkaConsumer
相比,这是一个变化) :
KafkaSink<T> sink =
KafkaSink.<T>builder()
.setBootstrapServers(...)
.setKafkaProducerConfig(...)
.setRecordSerializer(...)
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("unique-id-for-your-app")
.build();
从检查点恢复时,Flink 始终使用存储在检查点中的偏移量,而不是代码中配置或存储在代理中的偏移量。