Kafka spring 当主题名称不正确或不存在时,集成不会抛出异常
Kakfa spring Integration not throws exception when topic name is incorrect or doesn't exists
我正在使用 Spring 集成向 Kakfa 发送消息 channel.When 服务器名称不正确,它会抛出正常且符合预期行为的异常。但是当我给出错误的主题名称时,它会默默地失败并且不会抛出任何异常。这是我正在使用的配置
kafka-template="kafkaTemplate"
auto-startup="true"
topic="topicName" -- if i give incorrect topic name here
sync="false" >
<int-kafka:request-handler-advice-chain>
<ref bean="requestHandlerAdvice"/>
<ref bean="retryAdvice"/>
</int-kafka:request-handler-advice-chain>
</int-kafka:outbound-channel-adapter>
而kafkaTemplate配置为
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${kafkaCF_hostName}:${kafkaCF_port}" />
<entry key="key.serializer" value="serializer"/>
<entry key="value.serializer" value="value.serializer}"/>
<entry key="security.protocol" value="${security.protocol}"/>
<entry key="ssl.truststore.location" value="${ssl.truststore.location}"/>
<entry key="ssl.truststore.password" value="${ssl.truststore.password}"/>
<entry key="ssl.keystore.location" value="${ssl.keystore.location}"/>
<entry key="ssl.keystore.password" value="${ssl.keystore.password}"/>
<entry key="ssl.key.password" value="${ssl.key.password}"/>
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
我的要求是在数据库中记录 success/failure 条目。如果我使用错误的主题名称进行测试,那么如果认为成功并传递给 requestHandlerAdvice,它会进一步配置成功/失败通道。
任何建议如何在 Kafka 中使用 spring 集成处理这种情况?
Kafka 客户端在生产者端分两个阶段工作:它与 Kafka 代理建立连接并检查元数据。当您在 KafkaTemplate
上调用 send()
时,它会立即完成。其余逻辑以异步方式完成,并推迟到 KafkaProducer
.
内的某个缓冲区和事件循环
所以,显然那个错误的话题后来也解决了,如果你想用那个 requestHandlerAdvice
抓住它,你应该考虑这样 sync="true"
。或者查看 sendFailureChannel
的 KafkaProducerMessageHandler
:
/**
* Set the failure channel. After a send failure, an
* {@link org.springframework.messaging.support.ErrorMessage} will be sent
* to this channel with a payload of a {@link KafkaSendFailureException} with the
* failed message and cause.
* @param sendFailureChannel the failure channel.
*/
public void setSendFailureChannel(MessageChannel sendFailureChannel) {
XML 配置的 send-failure-channel
。
更新
问题就在这里auto.create.topics.enable=false
。默认为 true
。有关详细信息,请参阅本文:https://blog.softwaremill.com/7-mistakes-when-using-apache-kafka-44358cd9cd6
我正在使用 Spring 集成向 Kakfa 发送消息 channel.When 服务器名称不正确,它会抛出正常且符合预期行为的异常。但是当我给出错误的主题名称时,它会默默地失败并且不会抛出任何异常。这是我正在使用的配置
kafka-template="kafkaTemplate"
auto-startup="true"
topic="topicName" -- if i give incorrect topic name here
sync="false" >
<int-kafka:request-handler-advice-chain>
<ref bean="requestHandlerAdvice"/>
<ref bean="retryAdvice"/>
</int-kafka:request-handler-advice-chain>
</int-kafka:outbound-channel-adapter>
而kafkaTemplate配置为
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${kafkaCF_hostName}:${kafkaCF_port}" />
<entry key="key.serializer" value="serializer"/>
<entry key="value.serializer" value="value.serializer}"/>
<entry key="security.protocol" value="${security.protocol}"/>
<entry key="ssl.truststore.location" value="${ssl.truststore.location}"/>
<entry key="ssl.truststore.password" value="${ssl.truststore.password}"/>
<entry key="ssl.keystore.location" value="${ssl.keystore.location}"/>
<entry key="ssl.keystore.password" value="${ssl.keystore.password}"/>
<entry key="ssl.key.password" value="${ssl.key.password}"/>
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
我的要求是在数据库中记录 success/failure 条目。如果我使用错误的主题名称进行测试,那么如果认为成功并传递给 requestHandlerAdvice,它会进一步配置成功/失败通道。
任何建议如何在 Kafka 中使用 spring 集成处理这种情况?
Kafka 客户端在生产者端分两个阶段工作:它与 Kafka 代理建立连接并检查元数据。当您在 KafkaTemplate
上调用 send()
时,它会立即完成。其余逻辑以异步方式完成,并推迟到 KafkaProducer
.
所以,显然那个错误的话题后来也解决了,如果你想用那个 requestHandlerAdvice
抓住它,你应该考虑这样 sync="true"
。或者查看 sendFailureChannel
的 KafkaProducerMessageHandler
:
/**
* Set the failure channel. After a send failure, an
* {@link org.springframework.messaging.support.ErrorMessage} will be sent
* to this channel with a payload of a {@link KafkaSendFailureException} with the
* failed message and cause.
* @param sendFailureChannel the failure channel.
*/
public void setSendFailureChannel(MessageChannel sendFailureChannel) {
XML 配置的 send-failure-channel
。
更新
问题就在这里auto.create.topics.enable=false
。默认为 true
。有关详细信息,请参阅本文:https://blog.softwaremill.com/7-mistakes-when-using-apache-kafka-44358cd9cd6