@KafkaListener 在 RecordInterceptor 更改消费者记录后使用 Acknowledgment.nack() 时跳过消息
@KafkaListener skipping messages when using Acknowledgment.nack() after RecordInterceptor changed consumer record
让我们假设以下 RecordInterceptor 只是 return 收到的消费者记录的副本。
class CustomRecordInterceptor : RecordInterceptor<Any, Any> {
override fun intercept(record: ConsumerRecord<Any, Any>): ConsumerRecord<Any, Any>? {
return with(record) {
ConsumerRecord(
topic(),
partition(),
offset(),
timestamp(),
timestampType(),
checksum(),
serializedKeySize(),
serializedValueSize(),
key(),
value(),
headers(),
leaderEpoch())
}
}
}
有了这样的拦截器,我们会遇到以下 Kafka 侦听器丢失记录的情况。
注意:record
是拦截器return的结果。
@KafkaListener(topics = ["topic"])
fun listenToEvents(
record: ConsumerRecord<SpecificRecordBase, SpecificRecordBase?>,
ack: Acknowledgment
) {
if (shouldNegativelyAcknowledge()) {
ack.nack(2_000L)
return
}
processRecord(record)
ack.acknowledge()
}
每当 shouldNegativelyAcknowledge()
为真时,我们希望监听器在 > 2 秒后重新处理该记录。我们正在使用 ackMode = MANUAL
.
然而,我们看到的是,过了一段时间,跳过的记录没有被侦听器重新处理:processRecord
从未为该记录调用。一段时间后,消费组的滞后为0。
调试时,我们在KafkaMessageListenerContainer.ListenerConsumer#handleNack
中发现了这个代码块:
if (next.equals(record) || list.size() > 0) {
list.add(next);
}
next
是拦截器处理后的记录(所以是原记录的副本)
record
是拦截器处理之前的记录
请注意 next
和 record
永远不会相等,因为 ConsumerRecord
不会覆盖 equals
。
这可能是意外跳过记录的原因,甚至可能是错误?
或者是记录拦截器误用return一个不同的ConsumerRecord对象,不等于原来的?
这是一个错误,它确实解释了为什么剩余的记录没有发送给侦听器 - 请在 GitHub
上打开一个问题
让我们假设以下 RecordInterceptor 只是 return 收到的消费者记录的副本。
class CustomRecordInterceptor : RecordInterceptor<Any, Any> {
override fun intercept(record: ConsumerRecord<Any, Any>): ConsumerRecord<Any, Any>? {
return with(record) {
ConsumerRecord(
topic(),
partition(),
offset(),
timestamp(),
timestampType(),
checksum(),
serializedKeySize(),
serializedValueSize(),
key(),
value(),
headers(),
leaderEpoch())
}
}
}
有了这样的拦截器,我们会遇到以下 Kafka 侦听器丢失记录的情况。
注意:record
是拦截器return的结果。
@KafkaListener(topics = ["topic"])
fun listenToEvents(
record: ConsumerRecord<SpecificRecordBase, SpecificRecordBase?>,
ack: Acknowledgment
) {
if (shouldNegativelyAcknowledge()) {
ack.nack(2_000L)
return
}
processRecord(record)
ack.acknowledge()
}
每当 shouldNegativelyAcknowledge()
为真时,我们希望监听器在 > 2 秒后重新处理该记录。我们正在使用 ackMode = MANUAL
.
然而,我们看到的是,过了一段时间,跳过的记录没有被侦听器重新处理:processRecord
从未为该记录调用。一段时间后,消费组的滞后为0。
调试时,我们在KafkaMessageListenerContainer.ListenerConsumer#handleNack
中发现了这个代码块:
if (next.equals(record) || list.size() > 0) {
list.add(next);
}
next
是拦截器处理后的记录(所以是原记录的副本)record
是拦截器处理之前的记录
请注意 next
和 record
永远不会相等,因为 ConsumerRecord
不会覆盖 equals
。
这可能是意外跳过记录的原因,甚至可能是错误?
或者是记录拦截器误用return一个不同的ConsumerRecord对象,不等于原来的?
这是一个错误,它确实解释了为什么剩余的记录没有发送给侦听器 - 请在 GitHub
上打开一个问题