当我重新运行 Flink 消费者时,Kafka 再次消费最新的消息
Kafka consuming the latest message again when I rerun the Flink consumer
我在用 Scala 编写的 Apache Flink API 中创建了一个 Kafka 消费者。每当我从一个主题传递一些消息时,它都会及时接收它们。但是,当我重新启动消费者时,它没有接收到新的或未使用的消息,而是使用了发送到该主题的最新消息。
这是我正在做的事情:
运行制作人:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic corr2
运行消费者:
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
val env = StreamExecutionEnvironment.getExecutionEnvironment
val st = env
.addSource(new FlinkKafkaConsumer09[String]("corr2", new SimpleStringSchema(), properties))
env.enableCheckpointing(5000)
st.print()
env.execute()
传递一些消息
- 停止消费者
- 运行 消费者再次打印我发送的最后一条消息。我希望它只打印新消息。
您是 运行 一个检查点间隔为 5 秒的 Kafka 消费者。
因此,每 5 秒,Flink 就会创建一个操作符状态(偏移量)的副本以供恢复。
检查点完成后,它会让操作员知道检查点已完成。根据该通知,Kafka 消费者将偏移量提交给 Zookeeper。所以大约每 5 秒,我们将最后一个检查点的偏移量写入 ZK。
当你再次启动 Flink 作业时,它会在 ZK 中找到偏移量并从那里继续。根据时间的不同,所有提交到 ZK 后收到的消息都会重新发送。
您无法避免此行为,因为 .print()
"operator" 不是检查点的一部分。它的意思是调试实用程序。
然而,参与检查点的数据接收器(例如滚动文件接收器)将确保没有重复写入文件系统。
我在用 Scala 编写的 Apache Flink API 中创建了一个 Kafka 消费者。每当我从一个主题传递一些消息时,它都会及时接收它们。但是,当我重新启动消费者时,它没有接收到新的或未使用的消息,而是使用了发送到该主题的最新消息。
这是我正在做的事情:
运行制作人:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic corr2
运行消费者:
val properties = new Properties() properties.setProperty("bootstrap.servers", "localhost:9092") properties.setProperty("zookeeper.connect", "localhost:2181") properties.setProperty("group.id", "test") val env = StreamExecutionEnvironment.getExecutionEnvironment val st = env .addSource(new FlinkKafkaConsumer09[String]("corr2", new SimpleStringSchema(), properties)) env.enableCheckpointing(5000) st.print() env.execute()
传递一些消息
- 停止消费者
- 运行 消费者再次打印我发送的最后一条消息。我希望它只打印新消息。
您是 运行 一个检查点间隔为 5 秒的 Kafka 消费者。 因此,每 5 秒,Flink 就会创建一个操作符状态(偏移量)的副本以供恢复。
检查点完成后,它会让操作员知道检查点已完成。根据该通知,Kafka 消费者将偏移量提交给 Zookeeper。所以大约每 5 秒,我们将最后一个检查点的偏移量写入 ZK。
当你再次启动 Flink 作业时,它会在 ZK 中找到偏移量并从那里继续。根据时间的不同,所有提交到 ZK 后收到的消息都会重新发送。
您无法避免此行为,因为 .print()
"operator" 不是检查点的一部分。它的意思是调试实用程序。
然而,参与检查点的数据接收器(例如滚动文件接收器)将确保没有重复写入文件系统。