Spring Kafka:非事务性生产者回调处理

Spring Kafka: Non-transactional producer callback handling

我正在使用非事务性生产者并试图了解如何为 success/failure 场景处理回调。

对于成功发送,我看到回调由 kafka-producer-network-thread 线程("Sent ok" 消息)执行。

发送消息成功-kafka-producer-network-thread 00:59:17.522

00:59:16.850 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.3.1
00:59:16.858 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.kafka.core.KafkaTemplate - Sending: ProducerRecord(
00:59:16.863 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.k.c.DefaultKafkaProducerFactory - CloseSafeProducer 
00:59:17.326 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.kafka.core.KafkaTemplate - Sent
:::
00:59:17.522 [kafka-producer-network-thread | producer-1] TRACE o.s.kafka.core.KafkaTemplate - Sent ok

然后我通过向 kafka 模板提供一个不存在的主题名称来模拟失败,这次回调似乎在 Listener 容器线程("Failed to send" 消息)中执行,然后是 "sent" 消息监听器容器线程!

容器线程如何记录 "failed to send" 消息 - 它是回调的一部分!记录失败消息后,它会记录发送的消息(Kafka 模板中 doSend 方法的一部分)。容器线程是否阻塞 post 发送?

发送消息失败 - KafkaListenerEndpointContainer 00:27:33.975,已发送消息 00:27:33.982

00:27:33.773 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.kafka.core.KafkaTemplate - Sending: ProducerRecord(
00:27:33.779 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.k.c.DefaultKafkaProducerFactory - CloseSafeProducer 
00:27:33.957 [kafka-producer-network-thread | producer-1] WARN  o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {test1=TOPIC_AUTHORIZATION_FAILED}
00:27:33.957 [kafka-producer-network-thread | producer-1] ERROR org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Topic authorization failed for topics [test1]
00:27:33.958 [kafka-producer-network-thread | producer-1] INFO  org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: BgLUXrqZSLKvOw2Kn0nhVQ
00:27:33.973 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.s.LoggingProducerListener - Exception thrown when sending a message
:::
00:27:33.975 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.s.kafka.core.KafkaTemplate - Failed to send

00:27:33.977 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE 
o.s.k.c.DefaultKafkaProducerFactory - CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@57033d15] close(PT5S)

00:27:33.982 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.kafka.core.KafkaTemplate - Sent: ProducerRecord

"sent ok"和"failed to send"消息都记录在KafkaTemplate的buildCallback方法中。

protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {

     producer.send(producerRecord, buildCallback(producerRecord, producer, future));
            if (this.autoFlush) {
                flush();
            }
            this.logger.trace(() -> "Sent: " + producerRecord);
            return future;
}

private Callback buildCallback
    return (metadata, exception) -> {
            try {
                if (exception == null) {
:::
                    KafkaTemplate.this.logger.trace(() -> "Sent ok: " + producerRecord + ", metadata: " + metadata);
                }
                else {
:::
                    KafkaTemplate.this.logger.debug(exception, () -> "Failed to send: " + producerRecord);
                }

回调不应该总是由生产者网络线程执行吗?

I then simulate a failure by...

这是你的问题 - 为了发送请求,需要主题的元数据。

Error while fetching metadata

调用线程阻塞,直到元数据可用。

在这种情况下,生产者网络线程报告获取元数据失败,而不是发送失败,因此未来由调用线程异常完成。

您需要一个真正的发送失败。有几件事可以尝试:

  • 发送的记录太大,经纪人无法处理
  • 当没有足够的同步副本可用时发送记录

第一个可能被生产者代码检测到,我不记得了。