Kafka spring 当主题名称不正确或不存在时,集成不会抛出异常

Kakfa spring Integration not throws exception when topic name is incorrect or doesn't exists

我正在使用 Spring 集成向 Kakfa 发送消息 channel.When 服务器名称不正确,它会抛出正常且符合预期行为的异常。但是当我给出错误的主题名称时,它会默默地失败并且不会抛出任何异常。这是我正在使用的配置

                                        kafka-template="kafkaTemplate"
                                        auto-startup="true"
                                        topic="topicName"   -- if i give incorrect topic name here
                                        sync="false" >
        <int-kafka:request-handler-advice-chain>
            <ref bean="requestHandlerAdvice"/>
            <ref bean="retryAdvice"/>
        </int-kafka:request-handler-advice-chain>
    </int-kafka:outbound-channel-adapter>

而kafkaTemplate配置为

        <constructor-arg>
            <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
                <constructor-arg>
                    <map>
                        <entry key="bootstrap.servers" value="${kafkaCF_hostName}:${kafkaCF_port}" />
                        <entry key="key.serializer" value="serializer"/>
                        <entry key="value.serializer" value="value.serializer}"/>
                        <entry key="security.protocol" value="${security.protocol}"/>
                        <entry key="ssl.truststore.location" value="${ssl.truststore.location}"/>
                        <entry key="ssl.truststore.password" value="${ssl.truststore.password}"/>
                        <entry key="ssl.keystore.location" value="${ssl.keystore.location}"/>
                        <entry key="ssl.keystore.password" value="${ssl.keystore.password}"/>
                        <entry key="ssl.key.password" value="${ssl.key.password}"/>
                    </map>
                </constructor-arg>
            </bean>
        </constructor-arg>
    </bean>

我的要求是在数据库中记录 success/failure 条目。如果我使用错误的主题名称进行测试,那么如果认为成功并传递给 requestHandlerAdvice,它会进一步配置成功/失败通道。

任何建议如何在 Kafka 中使用 spring 集成处理这种情况?

Kafka 客户端在生产者端分两个阶段工作:它与 Kafka 代理建立连接并检查元数据。当您在 KafkaTemplate 上调用 send() 时,它会立即完成。其余逻辑以异步方式完成,并推迟到 KafkaProducer.

内的某个缓冲区和事件循环

所以,显然那个错误的话题后来也解决了,如果你想用那个 requestHandlerAdvice 抓住它,你应该考虑这样 sync="true"。或者查看 sendFailureChannelKafkaProducerMessageHandler:

/**
 * Set the failure channel. After a send failure, an
 * {@link org.springframework.messaging.support.ErrorMessage} will be sent
 * to this channel with a payload of a {@link KafkaSendFailureException} with the
 * failed message and cause.
 * @param sendFailureChannel the failure channel.
 */
public void setSendFailureChannel(MessageChannel sendFailureChannel) {

XML 配置的 send-failure-channel

更新

问题就在这里auto.create.topics.enable=false。默认为 true。有关详细信息,请参阅本文:https://blog.softwaremill.com/7-mistakes-when-using-apache-kafka-44358cd9cd6