Spring Kafka: onFailure callback of KafkaTemplate not fired on connection error

I'm currently experimenting a lot with Apache Kafka in a Spring Boot application.

My current goal is to write a REST endpoint that receives a message payload and uses a KafkaTemplate to send the data to my local Kafka instance running on port 9092.

This is my producer configuration:

@Bean
public Map<String,Object> producerConfig() {

    // config settings for creating producers
    Map<String,Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,this.bootstrapServers);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
    configProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,5000);
    configProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,4000);
    configProps.put(ProducerConfig.RETRIES_CONFIG,0);

    return configProps;

}

@Bean
public ProducerFactory<String,String> producerFactory() {
    // creates a kafka producer
    return new DefaultKafkaProducerFactory<>(producerConfig());
}

@Bean("kafkaTemplate")
public KafkaTemplate<String,String> kafkaTemplate(){
    // template which abstracts sending data to kafka
    return new KafkaTemplate<>(producerFactory());
}

My REST endpoint forwards to a service, which looks like this:

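import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

@Service
public class KafkaSenderService {

    private static final Logger logger = LoggerFactory.getLogger(KafkaSenderService.class);

    private final KafkaTemplate<String,String> kafkaTemplate;

    // @Qualifier only takes effect on the injection point (the constructor parameter)
    @Autowired
    public KafkaSenderService(@Qualifier("kafkaTemplate") KafkaTemplate<String,String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessageWithCallback(String message, String topicName) {

        // callbacks define what happens in the success and error cases (Spring Kafka 2.x API)
        ListenableFuture<SendResult<String,String>> future = kafkaTemplate.send(topicName, message);

        future.addCallback(new KafkaSendCallback<String, String>() {

            @Override
            public void onFailure(KafkaProducerException ex) {
                logger.warn("Message could not be delivered. " + ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                logger.info("Your message was delivered with following offset: " + result.getRecordMetadata().offset());
            }
        });
    }
}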

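Here is the situation: I expect the onFailure() method to be called when a message cannot be sent, but that doesn't seem to work. When I change the bootstrapServers variable in the producer configuration to localhost:9091 (the wrong port, so no connection should be possible), the producer tries to connect to the broker. It makes several connection attempts, and after 5 seconds a TimeoutException occurs. However, onFailure() is never called. Is there a way to have onFailure() called when no connection can be established?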

By the way, I set retries to zero, but the producer still makes a second connection attempt after the first one. This is the log output:

Edit: When the broker is unavailable, the Kafka producer/KafkaTemplate seems to go into an endless loop. Is this really the expected behavior?

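KafkaTemplate really does nothing special when it comes to connecting and publishing. Everything is delegated to the KafkaProducer. What you describe here would happen in exactly the same way even if you used only the plain Kafka client.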

See the KafkaProducer.send() JavaDocs:

 * @throws TimeoutException If the record could not be appended to the send buffer due to memory unavailable
 *                          or missing metadata within {@code max.block.ms}.

This is what happens in the blocking logic of that producer:

/**
 * Wait for cluster metadata including partitions for the given topic to be available.
 * @param topic The topic we want metadata for
 * @param partition A specific partition expected to exist in metadata, or null if there's no preference
 * @param nowMs The current time in ms
 * @param maxWaitMs The maximum time in ms for waiting on the metadata
 * @return The cluster containing topic metadata and the amount of time we waited in ms
 * @throws TimeoutException if metadata could not be refreshed within {@code max.block.ms}
 * @throws KafkaException for all Kafka-related exceptions, including the case where this method is called after producer close
 */
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) throws InterruptedException {

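Unfortunately, this isn't explained in the send() JavaDocs, which claim the method is fully asynchronous, but evidently it isn't: at least this metadata must be available before the record can be queued for publishing.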
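The two-phase behavior described above can be illustrated with a toy model in plain Java. It is not Kafka's implementation, and `ToyProducer` is a hypothetical class: `send()` first waits synchronously for "metadata", bounded by a deadline that plays the role of `max.block.ms`, and only then hands the record to a background thread and returns a future. A timeout in the first phase is thrown by `send()` itself, so no future exists yet on which a callback could be registered. (The model uses `java.util.concurrent.TimeoutException`; Kafka has its own `org.apache.kafka.common.errors.TimeoutException`.)

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Toy model only, not Kafka code: a synchronous, bounded wait for "metadata"
// followed by an asynchronous hand-off to a background thread.
final class ToyProducer implements AutoCloseable {
    private final CountDownLatch metadataReady;
    private final long maxBlockMs; // plays the role of max.block.ms
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor();

    ToyProducer(CountDownLatch metadataReady, long maxBlockMs) {
        this.metadataReady = metadataReady;
        this.maxBlockMs = maxBlockMs;
    }

    CompletableFuture<String> send(String record) throws InterruptedException, TimeoutException {
        // Phase 1 (blocking): no future exists yet, so a failure is thrown to the caller.
        if (!metadataReady.await(maxBlockMs, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("metadata not available after " + maxBlockMs + " ms");
        }
        // Phase 2 (asynchronous): only now is there a future to attach callbacks to.
        return CompletableFuture.supplyAsync(() -> "sent:" + record, ioThread);
    }

    @Override
    public void close() {
        ioThread.shutdown();
    }
}
```

In the model, a caller that only registers callbacks on the returned future never observes the phase-1 timeout, which matches the symptom in the question.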

This is out of our control, and it is not reflected in the returned Future:

        try {
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
        } catch (KafkaException e) {
            if (metadata.isClosed())
                throw new KafkaException("Producer closed while send in progress", e);
            throw e;
        }
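Because the exception can escape from `send()` before any future is returned, one possible way to handle both failure paths in one place is to wrap the call and turn a synchronous exception into an already-failed future. This is a minimal sketch in plain Java; `SafeSend` and `sendSafely` are hypothetical names, not part of Spring Kafka or the Kafka client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical helper: normalizes "send() threw" and "future failed" into one future.
final class SafeSend {
    private SafeSend() {}

    static <T> CompletableFuture<T> sendSafely(Supplier<CompletableFuture<T>> send) {
        try {
            // Normal path: failures after this point arrive through the returned future.
            return send.get();
        } catch (RuntimeException ex) {
            // Synchronous path (e.g. metadata not available within max.block.ms):
            // expose the exception as a failed future so callbacks still see it.
            CompletableFuture<T> failed = new CompletableFuture<>();
            failed.completeExceptionally(ex);
            return failed;
        }
    }
}
```

With Spring Kafka 3.x, where `KafkaTemplate.send()` returns a `CompletableFuture`, this could be used as `SafeSend.sendSafely(() -> kafkaTemplate.send(topicName, message)).whenComplete((result, ex) -> ...)`. With 2.x, the returned `ListenableFuture` can first be converted with its `completable()` method.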

See the Apache Kafka documentation for more on how to tune the KafkaProducer for this: https://kafka.apache.org/documentation/#theproducer
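Regarding the retries question: `retries` only controls how often a record whose send failed is re-sent; it does not limit how often the client tries to (re)connect to the bootstrap servers, which is paced by `reconnect.backoff.ms` and `reconnect.backoff.max.ms`. The sketch below collects the timeout-related settings in one place. The numbers are illustrative assumptions, and `ProducerTimeouts` is a hypothetical class; the string keys are the standard producer config names that the `ProducerConfig.*_CONFIG` constants hold. It also checks the rule the producer enforces when `delivery.timeout.ms` is set explicitly: it must be at least `linger.ms + request.timeout.ms`.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative values only; keys are standard Kafka producer config names.
final class ProducerTimeouts {
    private ProducerTimeouts() {}

    static Map<String, Object> timeoutSettings() {
        Map<String, Object> props = new HashMap<>();
        props.put("max.block.ms", 5000);              // max time send() may block, e.g. waiting for metadata
        props.put("request.timeout.ms", 4000);        // max wait for a single broker response
        props.put("linger.ms", 0);                    // no batching delay
        props.put("delivery.timeout.ms", 10000);      // overall deadline for an unacknowledged record
        props.put("retries", 0);                      // re-sends of failed records, not reconnects
        props.put("reconnect.backoff.ms", 1000);      // initial pause between connection attempts
        props.put("reconnect.backoff.max.ms", 10000); // cap for the growing reconnect pause
        return props;
    }

    // delivery.timeout.ms >= linger.ms + request.timeout.ms
    static boolean deliveryTimeoutIsValid(Map<String, Object> props) {
        long delivery = ((Number) props.get("delivery.timeout.ms")).longValue();
        long linger = ((Number) props.get("linger.ms")).longValue();
        long request = ((Number) props.get("request.timeout.ms")).longValue();
        return delivery >= linger + request;
    }
}
```

Entries like these could be merged into the map returned by `producerConfig()` in the question.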

Someone else who ran into this problem got an answer in the discussion at https://github.com/spring-projects/spring-kafka/discussions/2250#. In short, calling kafkaTemplate.getProducerFactory().reset(); can solve the problem.
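A minimal sketch of how that workaround might be wired in, assuming the reset should happen whenever a send fails (the linked discussion is the authority on when it is actually needed). `ResetOnFailure` is a hypothetical name. The reset hook would be something like `() -> kafkaTemplate.getProducerFactory().reset()`, and the supplier would wrap `kafkaTemplate.send(...)` (adding `.completable()` on Spring Kafka 2.x).

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical wrapper: runs a reset hook (e.g. the producer factory's reset())
// whether the send fails synchronously or later through its future.
final class ResetOnFailure {
    private ResetOnFailure() {}

    static <T> CompletableFuture<T> send(Supplier<CompletableFuture<T>> send, Runnable resetHook) {
        CompletableFuture<T> future;
        try {
            future = send.get();
        } catch (RuntimeException ex) {
            resetHook.run(); // failed before any future was returned
            CompletableFuture<T> failed = new CompletableFuture<>();
            failed.completeExceptionally(ex);
            return failed;
        }
        // Asynchronous failure: reset once the future completes exceptionally.
        return future.whenComplete((result, ex) -> {
            if (ex != null) {
                resetHook.run();
            }
        });
    }
}
```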