@KafkaListener 在 RecordInterceptor 更改消费者记录后使用 Acknowledgment.nack() 时跳过消息

@KafkaListener skipping messages when using Acknowledgment.nack() after RecordInterceptor changed consumer record

让我们假设以下 RecordInterceptor 只是 return 收到的消费者记录的副本。

class CustomRecordInterceptor : RecordInterceptor<Any, Any> {

  override fun intercept(record: ConsumerRecord<Any, Any>): ConsumerRecord<Any, Any>? {
    return with(record) {
      ConsumerRecord(
          topic(),
          partition(),
          offset(),
          timestamp(),
          timestampType(),
          checksum(),
          serializedKeySize(),
          serializedValueSize(),
          key(),
          value(),
          headers(),
          leaderEpoch())
    }
  }
}

有了这样的拦截器,我们会遇到以下 Kafka 侦听器丢失记录的情况。

注意:record是拦截器return的结果。

@KafkaListener(topics = ["topic"])
fun listenToEvents(
    record: ConsumerRecord<SpecificRecordBase, SpecificRecordBase?>,
    ack: Acknowledgment
) {
  if (shouldNegativelyAcknowledge()) {
    ack.nack(2_000L)
    return
  }
  processRecord(record)
  ack.acknowledge()
}

每当 shouldNegativelyAcknowledge() 为真时,我们希望监听器在 > 2 秒后重新处理该记录。我们正在使用 ackMode = MANUAL.

然而,我们看到的是,过了一段时间,跳过的记录没有被侦听器重新处理:processRecord 从未为该记录调用。一段时间后,消费组的滞后为0。

调试时,我们在KafkaMessageListenerContainer.ListenerConsumer#handleNack中发现了这个代码块:

if (next.equals(record) || list.size() > 0) {
  list.add(next);
}

请注意 nextrecord 永远不会相等,因为 ConsumerRecord 不会覆盖 equals

这可能是意外跳过记录的原因,甚至可能是错误?

或者是记录拦截器误用return一个不同的ConsumerRecord对象,不等于原来的?

这是一个错误,它确实解释了为什么剩余的记录没有发送给侦听器 - 请在 GitHub

上打开一个问题

https://github.com/spring-projects/spring-kafka/issues