当我重新运行 Flink 消费者时,Kafka 再次消费最新的消息

Kafka consuming the latest message again when I rerun the Flink consumer

我在用 Scala 编写的 Apache Flink API 中创建了一个 Kafka 消费者。每当我从一个主题传递一些消息时,它都会及时接收它们。但是,当我重新启动消费者时,它没有接收到新的或未使用的消息,而是使用了发送到该主题的最新消息。

这是我正在做的事情:

  1. 运行制作人:

    $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic corr2
    
  2. 运行消费者:

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val st = env
        .addSource(new FlinkKafkaConsumer09[String]("corr2", new SimpleStringSchema(), properties))
    env.enableCheckpointing(5000)
    st.print()
    env.execute()
    
  3. 传递一些消息

  4. 停止消费者
  5. 运行 消费者再次打印我发送的最后一条消息。我希望它只打印新消息。

您是 运行 一个检查点间隔为 5 秒的 Kafka 消费者。 因此,每 5 秒,Flink 就会创建一个操作符状态(偏移量)的副本以供恢复。

检查点完成后,它会让操作员知道检查点已完成。根据该通知,Kafka 消费者将偏移量提交给 Zookeeper。所以大约每 5 秒,我们将最后一个检查点的偏移量写入 ZK。

当你再次启动 Flink 作业时,它会在 ZK 中找到偏移量并从那里继续。根据时间的不同,所有提交到 ZK 后收到的消息都会重新发送。

您无法避免此行为,因为 .print() "operator" 不是检查点的一部分。它的意思是调试实用程序。 然而,参与检查点的数据接收器(例如滚动文件接收器)将确保没有重复写入文件系统。