Spring Kafka:非事务性生产者回调处理
Spring Kafka: Non-transactional producer callback handling
我正在使用非事务性生产者并试图了解如何为 success/failure 场景处理回调。
对于成功发送,我看到回调由 kafka-producer-network-thread 线程("Sent ok" 消息)执行。
发送消息成功-kafka-producer-network-thread 00:59:17.522
00:59:16.850 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.3.1
00:59:16.858 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.kafka.core.KafkaTemplate - Sending: ProducerRecord(
00:59:16.863 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.k.c.DefaultKafkaProducerFactory - CloseSafeProducer
00:59:17.326 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.kafka.core.KafkaTemplate - Sent
:::
00:59:17.522 [kafka-producer-network-thread | producer-1] TRACE o.s.kafka.core.KafkaTemplate - Sent ok
然后我通过向 kafka 模板提供一个不存在的主题名称来模拟失败,这次回调似乎在 Listener 容器线程("Failed to send" 消息)中执行,然后是 "sent" 消息监听器容器线程!
容器线程如何记录 "failed to send" 消息 - 它是回调的一部分!记录失败消息后,它会记录发送的消息(Kafka 模板中 doSend 方法的一部分)。容器线程是否阻塞 post 发送?
发送消息失败 - KafkaListenerEndpointContainer 00:27:33.975,已发送消息 00:27:33.982
00:27:33.773 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.kafka.core.KafkaTemplate - Sending: ProducerRecord(
00:27:33.779 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.k.c.DefaultKafkaProducerFactory - CloseSafeProducer
00:27:33.957 [kafka-producer-network-thread | producer-1] WARN o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {test1=TOPIC_AUTHORIZATION_FAILED}
00:27:33.957 [kafka-producer-network-thread | producer-1] ERROR org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Topic authorization failed for topics [test1]
00:27:33.958 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: BgLUXrqZSLKvOw2Kn0nhVQ
00:27:33.973 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.s.LoggingProducerListener - Exception thrown when sending a message
:::
00:27:33.975 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.s.kafka.core.KafkaTemplate - Failed to send
00:27:33.977 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE
o.s.k.c.DefaultKafkaProducerFactory - CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@57033d15] close(PT5S)
00:27:33.982 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.kafka.core.KafkaTemplate - Sent: ProducerRecord
"sent ok"和"failed to send"消息都记录在KafkaTemplate的buildCallback方法中。
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
producer.send(producerRecord, buildCallback(producerRecord, producer, future));
if (this.autoFlush) {
flush();
}
this.logger.trace(() -> "Sent: " + producerRecord);
return future;
}
private Callback buildCallback
return (metadata, exception) -> {
try {
if (exception == null) {
:::
KafkaTemplate.this.logger.trace(() -> "Sent ok: " + producerRecord + ", metadata: " + metadata);
}
else {
:::
KafkaTemplate.this.logger.debug(exception, () -> "Failed to send: " + producerRecord);
}
回调不应该总是由生产者网络线程执行吗?
I then simulate a failure by...
这是你的问题 - 为了发送请求,需要主题的元数据。
Error while fetching metadata
调用线程阻塞,直到元数据可用。
在这种情况下,生产者网络线程报告获取元数据失败,而不是发送失败,因此未来由调用线程异常完成。
您需要一个真正的发送失败。有几件事可以尝试:
- 发送的记录太大,经纪人无法处理
- 当没有足够的同步副本可用时发送记录
第一个可能被生产者代码检测到,我不记得了。
我正在使用非事务性生产者并试图了解如何为 success/failure 场景处理回调。
对于成功发送,我看到回调由 kafka-producer-network-thread 线程("Sent ok" 消息)执行。
发送消息成功-kafka-producer-network-thread 00:59:17.522
00:59:16.850 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.kafka.common.utils.AppInfoParser - Kafka version: 2.3.1
00:59:16.858 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.kafka.core.KafkaTemplate - Sending: ProducerRecord(
00:59:16.863 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.k.c.DefaultKafkaProducerFactory - CloseSafeProducer
00:59:17.326 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.kafka.core.KafkaTemplate - Sent
:::
00:59:17.522 [kafka-producer-network-thread | producer-1] TRACE o.s.kafka.core.KafkaTemplate - Sent ok
然后我通过向 kafka 模板提供一个不存在的主题名称来模拟失败,这次回调似乎在 Listener 容器线程("Failed to send" 消息)中执行,然后是 "sent" 消息监听器容器线程!
容器线程如何记录 "failed to send" 消息 - 它是回调的一部分!记录失败消息后,它会记录发送的消息(Kafka 模板中 doSend 方法的一部分)。容器线程是否阻塞 post 发送?
发送消息失败 - KafkaListenerEndpointContainer 00:27:33.975,已发送消息 00:27:33.982
00:27:33.773 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.kafka.core.KafkaTemplate - Sending: ProducerRecord(
00:27:33.779 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.k.c.DefaultKafkaProducerFactory - CloseSafeProducer
00:27:33.957 [kafka-producer-network-thread | producer-1] WARN o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {test1=TOPIC_AUTHORIZATION_FAILED}
00:27:33.957 [kafka-producer-network-thread | producer-1] ERROR org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Topic authorization failed for topics [test1]
00:27:33.958 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: BgLUXrqZSLKvOw2Kn0nhVQ
00:27:33.973 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.s.LoggingProducerListener - Exception thrown when sending a message
:::
00:27:33.975 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.s.kafka.core.KafkaTemplate - Failed to send
00:27:33.977 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE
o.s.k.c.DefaultKafkaProducerFactory - CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@57033d15] close(PT5S)
00:27:33.982 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] TRACE o.s.kafka.core.KafkaTemplate - Sent: ProducerRecord
"sent ok"和"failed to send"消息都记录在KafkaTemplate的buildCallback方法中。
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
producer.send(producerRecord, buildCallback(producerRecord, producer, future));
if (this.autoFlush) {
flush();
}
this.logger.trace(() -> "Sent: " + producerRecord);
return future;
}
private Callback buildCallback
return (metadata, exception) -> {
try {
if (exception == null) {
:::
KafkaTemplate.this.logger.trace(() -> "Sent ok: " + producerRecord + ", metadata: " + metadata);
}
else {
:::
KafkaTemplate.this.logger.debug(exception, () -> "Failed to send: " + producerRecord);
}
回调不应该总是由生产者网络线程执行吗?
I then simulate a failure by...
这是你的问题 - 为了发送请求,需要主题的元数据。
Error while fetching metadata
调用线程阻塞,直到元数据可用。
在这种情况下,生产者网络线程报告获取元数据失败,而不是发送失败,因此未来由调用线程异常完成。
您需要一个真正的发送失败。有几件事可以尝试:
- 发送的记录太大,经纪人无法处理
- 当没有足够的同步副本可用时发送记录
第一个可能被生产者代码检测到,我不记得了。