如何从 kafka Stream 获取最新值

How to get the latest value from a kafka Stream

我对 Kafka 相当陌生,streaming.I 有一个要求,就像每次我 运行 kafka 生产者和消费者我应该得到生产者产生的唯一消息。

下面是生产者和消费者的基本代码

制作人

 val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    val record = new ProducerRecord[String, String]("test", "key", jsonstring)
    producer.send(record)
    producer.close()

消费者

val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "earliest")
    props.put("group.id", "13")
    val consumer: KafkaConsumer[String, Map[String,Any]] = new KafkaConsumer[String, Map[String,Any]](props)
    consumer.subscribe(util.Arrays.asList("test"))
    while (true) {
      val record = consumer.poll(1000).asScala
      for (data <- record.iterator){
        println(data.value())

      }

我使用的输入Json如下

{

"id":1,

"Name":"foo"

}

现在我面临的问题是每次我 运行 程序我得到重复的 values.For 示例如果我 运行 代码两次消费者输出看起来像这样

{

"id":1,

"Name":"foo"

}

{

"id":1,

"Name":"foo"

}

我想要的输出就像我 运行 程序一样,生产者处理的唯一消息应该被消费并且应该被打印。

我尝试了一些方法,比如将消费者属性的偏移量更改为最新

props.put("auto.offset.reset", "latest")

我也试过下面提到的东西,但它对我不起作用

你能推荐一些替代方案吗??

Consumer 按顺序从主题分区读取消息。 如果你调用 poll(),它 returns 记录写入 Kafka,我们组中的消费者还没有读过。 Kafka 跟踪它们在每个分区上的消费偏移量,以了解在重启时从哪里开始消费。 消费者使用 commit.

在主题 __consumer_offsets 中维护他们的分区偏移量

Commit is the action of updating the current position in __consumer_offsets.

如果消费者重新启动,为了知道从哪里开始消费,消费者将读取每个分区的最新提交的偏移量并从那里继续。

您可以通过两种方式控制提交,或者将自动提交设置为 true 和提交间隔

1.By enable.auto.commit 真

props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");

2.Manual 提交

consumer.commitAsync();//asyn commit
or  
consumer.commitSync();//sync commit

如果您提交失败,它将从上次提交的位置重新开始,如下图所示

auto.offset.reset:

消费者第一次重启后,它使用auto.offset.reset来确定每个分配分区的初始位置。请注意,当组首次使用唯一组 ID 创建时,在使用任何消息之前,位置将根据可配置的偏移量重置策略设置 (auto.offset.reset)。之后,它将继续增量消费消息并使用提交(如上所述)来跟踪最新的消费消息

Note: If the consumer crashes before any offset has been committed, then the consumer which takes over its partitions will use the reset policy.

所以在你的情况下

  1. 使用手动偏移量提交或 enable.auto.commit true 自动提交。
  2. 如果您更改组,如果它将处理不同的消费者并使用 auto.offset.reset 分配偏移量,请始终使用相同的组 ID。

参考:https://www.confluent.io/resources/kafka-the-definitive-guide/