KafkaProducer sendOffsetsToTransaction 需要 offset+1 才能成功提交当前偏移量
KafkaProducer sendOffsetsToTransaction need offset+1 to successfully commit current offset
我正在尝试在 Kafka 中实现一个事务 Processor
以确保我不会重复处理同一条消息。给定一条消息 (A),我需要创建一个消息列表,这些消息将在事务中的另一个主题上生成,并且我想在同一事务中提交原始消息 (A)。从文档中我找到了 Producer
方法 sendOffsetsToTransaction
,它似乎只有在成功时才能在事务中提交偏移量。这是我的 Processor
:
的 process()
方法中的代码
producer.beginTransaction()
val topicPartition = new TopicPartition(this.context().topic(), this.context().partition())
val offsetAndMetadata = new OffsetAndMetadata(this.context().offset())
val map = Map(topicPartition -> offsetAndMetadata).asJava
producer.sendOffsetsToTransaction(map, "consumer-group-id")
items.foreach(x => producer.send(new ProducerRecord("items_topic", x.key, x.value)))
producer.commitTransaction()
throw new RuntimeException("expected exception")
不幸的是,对于这段代码(显然每次执行都会失败),每次我在异常后重新启动应用程序时都会重新处理已处理的消息 (A)。
我设法让它工作,将 +1
添加到 this.context().offset()
返回的偏移量并以这种方式重新定义 val offsetAndMetadata
:
val offsetAndMetadata = new OffsetAndMetadata(this.context().offset() + 1)
这是正常行为还是我做错了什么?
谢谢:)
您的代码是正确的。
您提交的偏移量是您接下来要阅读的消息的偏移量(不是您上次阅读的消息的偏移量)。
您可以使用
而不是将偏移量加 1
long newOffset = consumer.position(topicPartition);
这将return 给出下一条记录的偏移量。它将比消费者在该分区中看到的最高偏移量大 1
我正在尝试在 Kafka 中实现一个事务 Processor
以确保我不会重复处理同一条消息。给定一条消息 (A),我需要创建一个消息列表,这些消息将在事务中的另一个主题上生成,并且我想在同一事务中提交原始消息 (A)。从文档中我找到了 Producer
方法 sendOffsetsToTransaction
,它似乎只有在成功时才能在事务中提交偏移量。这是我的 Processor
:
process()
方法中的代码
producer.beginTransaction()
val topicPartition = new TopicPartition(this.context().topic(), this.context().partition())
val offsetAndMetadata = new OffsetAndMetadata(this.context().offset())
val map = Map(topicPartition -> offsetAndMetadata).asJava
producer.sendOffsetsToTransaction(map, "consumer-group-id")
items.foreach(x => producer.send(new ProducerRecord("items_topic", x.key, x.value)))
producer.commitTransaction()
throw new RuntimeException("expected exception")
不幸的是,对于这段代码(显然每次执行都会失败),每次我在异常后重新启动应用程序时都会重新处理已处理的消息 (A)。
我设法让它工作,将 +1
添加到 this.context().offset()
返回的偏移量并以这种方式重新定义 val offsetAndMetadata
:
val offsetAndMetadata = new OffsetAndMetadata(this.context().offset() + 1)
这是正常行为还是我做错了什么?
谢谢:)
您的代码是正确的。
您提交的偏移量是您接下来要阅读的消息的偏移量(不是您上次阅读的消息的偏移量)。
您可以使用
而不是将偏移量加 1 long newOffset = consumer.position(topicPartition);
这将return 给出下一条记录的偏移量。它将比消费者在该分区中看到的最高偏移量大 1