关于 Spring 消息驱动通道适配器的查询
Query regarding Spring message-driven-channel-adapter
我正在使用 Spring 的消息驱动通道适配器。
我的组件正在使用来自 Tibco 主题的消息并发布到 RabbitMQ 主题
所以消息流程如下:
Tibco->(订阅)组件(发布到)-> RabbitMQ
服务激活器如下图所示:如我们所见,有一个输入通道和一个输出通道。 bean storeAndForwardActivator 将具有业务逻辑(在方法 createIssueOfInterestOratorRecord 中)
<int:service-activator input-channel="inboundOratorIssueOfInterestJmsInputChannel"
ref="storeAndForwardActivator" method="createIssueOfInterestOratorRecord"
output-channel="outboundIssueOfInterestRabbitmqOratorJmsOutputChannel" />
我还有一个 message=driven-channel-adapter。该适配器将在调用服务适配器之前被调用。
<int-jms:message-driven-channel-adapter
id="oratorIssueOfInterestInboundChannel" channel="inboundOratorIssueOfInterestJmsInputChannel"
container="oratorIssueOfInterestmessageListenerContainer" />
即具体来说,容器(如下所示)将保存要使用的主题名称——这是 DefaultMessageListenerContainer
<bean id="oratorIssueOfInterestmessageListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="oratorIssueOfInterestTibcoConnectionFactory" />
<property name="destination" ref="oratorTibcojmsDestination" />
<property name="sessionTransacted" value="true" />
<property name="maxConcurrentConsumers" value="1" />
<property name="concurrentConsumers" value="1" />
<property name="receiveTimeout" value="5000" />
<property name="recoveryInterval" value="60000" />
<property name="autoStartup" value="true" />
<property name="exposeListenerSession" value="false" />
<property name="subscriptionDurable" value="true" />
<property name="durableSubscriptionName" value="${topic.orator.durable-subscription-name}" />
<property name="messageSelector" value="${topic.orator.selector}" />
</bean>
这个设置工作得很好。但是在某些情况下,我的 consumer/component 会收到一条 'rogue' 消息。即一个空的负载或 HashMap 的消息类型(而不是普通的 TextMessage)——当我们得到这个时——我观察到的是——在 DefaultMessageListener 级别捕获了一个异常(即我没有达到我的业务 bean,即 storeAndForwardActivator ),因此,我的组件不会发回 ACK - 因为这是一个持久的主题 - 在主题中有一个消息构建 - 这是不可取的。
有没有办法让我立即确认消息,而不管天气如何,在 DefaultMessageListener 级别捕获异常?
或者我应该在 DefaultMessageListener 中引入错误处理程序吗?
处理此问题的最佳方法是什么,有什么建议吗?
问候
D
更新:
我尝试将 errorHandler 添加到 org.springframework.jms.listener.DefaultMessageListenerContainer
如下图
<bean id="oratorIssueOfInterestmessageListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="oratorIssueOfInterestTibcoConnectionFactory" />
<property name="destination" ref="oratorTibcojmsDestination" />
<property name="sessionTransacted" value="true" />
<property name="maxConcurrentConsumers" value="1" />
<property name="concurrentConsumers" value="1" />
<property name="receiveTimeout" value="5000" />
<property name="recoveryInterval" value="60000" />
<property name="autoStartup" value="true" />
<property name="exposeListenerSession" value="false" />
<property name="subscriptionDurable" value="true" />
<property name="durableSubscriptionName" value="${topic.orator.durable-subscription-name}" />
<property name="messageSelector" value="${topic.orator.selector}" />
<property name="errorHandler" ref="myErrorHandler"/>
</bean>
myErrorHandler 是一个如下所示的 bean
<bean id="myErrorHandler"
class="com.igate.firds.icmf.activators.concentrator.MyErrorHandler" />
MyErroHandler 实现 ErrorHandler
@Service
public class MyErrorHandler implements ErrorHandler{
private static Log log = LogFactory.getLog(MyErrorHandler.class);
@Override
public void handleError(Throwable t) {
if (t instanceof MessageHandlingException) {
MessageHandlingException exception = (MessageHandlingException) t;
if (exception != null) {
org.springframework.messaging.Message<?> message = exception.getFailedMessage();
Object payloadObject = message.getPayload();
if (null != payloadObject) {
log.info("Payload is not null, type is: " + payloadObject.getClass());
}
}
} else {
log.info("Exception is not of type: MessageHandlingException ");
}
}
}
我注意到异常被捕获(当订阅者消费恶意消息时)。我一直循环查看此日志
Exception is not of type: MessageHandlingException
Exception is not of type: MessageHandlingException
Exception is not of type: MessageHandlingException
即由于事务未提交 - 来自持久主题的相同消息被一次又一次地使用。我的目标是在使用消息后将 ACK 发送回代理(无论是否捕获异常)。
明天我试试错误频道
问候
D
向消息驱动的适配器添加一个error-channel
; ErrorMessage
将包含一个具有两个字段的 MessagingException
有效负载; cause
(异常)和 failedMessage
.
如果您使用默认值 error-channel="errorChannel"
,则会记录异常。
如果您想做更多的事情,您可以配置自己的错误通道并向其添加一些流。
编辑:
根据您在下方的评论...
payload must not be null
不是堆栈跟踪;这是一条消息。
也就是说,payload must not be null
看起来像 Spring 集成消息;它可能在消息转换期间被抛出在消息侦听器适配器中,这是在我们到达失败可以转到 error-channel
的地步之前;这样的异常会被抛回给容器
打开 DEBUG 日志记录并查找此日志条目:
logger.debug("converted JMS Message [" + jmsMessage + "] to integration Message payload [" + result + "]");
此外,提供完整的堆栈跟踪。
编辑#2
因此,我通过在自定义 MessageConverter
.
中强制转换后的有效负载为 null 来重现您的问题
事务回滚后容器调用 DMLC 错误处理程序,因此无法停止回滚。
我们可以向适配器添加一个选项来以不同方式处理此类错误,但这需要一些工作。
与此同时,一个解决方法是编写一个自定义 MessageConverter
;类似于 this Gist.
中的那个
然后,您的服务将不得不处理 "Bad Message Received" 负载。
然后您像这样提供自定义转换器...
<jms:message-driven-channel-adapter id="jmsIn"
destination="requestQueue" acknowledge="transacted"
message-converter="converter"
channel="jmsInChannel" />
<beans:bean id="converter" class="foo.MyMessageConverter" />
我正在使用 Spring 的消息驱动通道适配器。 我的组件正在使用来自 Tibco 主题的消息并发布到 RabbitMQ 主题
所以消息流程如下: Tibco->(订阅)组件(发布到)-> RabbitMQ
服务激活器如下图所示:如我们所见,有一个输入通道和一个输出通道。 bean storeAndForwardActivator 将具有业务逻辑(在方法 createIssueOfInterestOratorRecord 中)
<int:service-activator input-channel="inboundOratorIssueOfInterestJmsInputChannel"
ref="storeAndForwardActivator" method="createIssueOfInterestOratorRecord"
output-channel="outboundIssueOfInterestRabbitmqOratorJmsOutputChannel" />
我还有一个 message=driven-channel-adapter。该适配器将在调用服务适配器之前被调用。
<int-jms:message-driven-channel-adapter
id="oratorIssueOfInterestInboundChannel" channel="inboundOratorIssueOfInterestJmsInputChannel"
container="oratorIssueOfInterestmessageListenerContainer" />
即具体来说,容器(如下所示)将保存要使用的主题名称——这是 DefaultMessageListenerContainer
<bean id="oratorIssueOfInterestmessageListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="oratorIssueOfInterestTibcoConnectionFactory" />
<property name="destination" ref="oratorTibcojmsDestination" />
<property name="sessionTransacted" value="true" />
<property name="maxConcurrentConsumers" value="1" />
<property name="concurrentConsumers" value="1" />
<property name="receiveTimeout" value="5000" />
<property name="recoveryInterval" value="60000" />
<property name="autoStartup" value="true" />
<property name="exposeListenerSession" value="false" />
<property name="subscriptionDurable" value="true" />
<property name="durableSubscriptionName" value="${topic.orator.durable-subscription-name}" />
<property name="messageSelector" value="${topic.orator.selector}" />
</bean>
这个设置工作得很好。但是在某些情况下,我的 consumer/component 会收到一条 'rogue' 消息。即一个空的负载或 HashMap 的消息类型(而不是普通的 TextMessage)——当我们得到这个时——我观察到的是——在 DefaultMessageListener 级别捕获了一个异常(即我没有达到我的业务 bean,即 storeAndForwardActivator ),因此,我的组件不会发回 ACK - 因为这是一个持久的主题 - 在主题中有一个消息构建 - 这是不可取的。 有没有办法让我立即确认消息,而不管天气如何,在 DefaultMessageListener 级别捕获异常?
或者我应该在 DefaultMessageListener 中引入错误处理程序吗? 处理此问题的最佳方法是什么,有什么建议吗?
问候 D
更新:
我尝试将 errorHandler 添加到 org.springframework.jms.listener.DefaultMessageListenerContainer 如下图
<bean id="oratorIssueOfInterestmessageListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="oratorIssueOfInterestTibcoConnectionFactory" />
<property name="destination" ref="oratorTibcojmsDestination" />
<property name="sessionTransacted" value="true" />
<property name="maxConcurrentConsumers" value="1" />
<property name="concurrentConsumers" value="1" />
<property name="receiveTimeout" value="5000" />
<property name="recoveryInterval" value="60000" />
<property name="autoStartup" value="true" />
<property name="exposeListenerSession" value="false" />
<property name="subscriptionDurable" value="true" />
<property name="durableSubscriptionName" value="${topic.orator.durable-subscription-name}" />
<property name="messageSelector" value="${topic.orator.selector}" />
<property name="errorHandler" ref="myErrorHandler"/>
</bean>
myErrorHandler 是一个如下所示的 bean
<bean id="myErrorHandler"
class="com.igate.firds.icmf.activators.concentrator.MyErrorHandler" />
MyErroHandler 实现 ErrorHandler
@Service
public class MyErrorHandler implements ErrorHandler{
private static Log log = LogFactory.getLog(MyErrorHandler.class);
@Override
public void handleError(Throwable t) {
if (t instanceof MessageHandlingException) {
MessageHandlingException exception = (MessageHandlingException) t;
if (exception != null) {
org.springframework.messaging.Message<?> message = exception.getFailedMessage();
Object payloadObject = message.getPayload();
if (null != payloadObject) {
log.info("Payload is not null, type is: " + payloadObject.getClass());
}
}
} else {
log.info("Exception is not of type: MessageHandlingException ");
}
}
}
我注意到异常被捕获(当订阅者消费恶意消息时)。我一直循环查看此日志
Exception is not of type: MessageHandlingException
Exception is not of type: MessageHandlingException
Exception is not of type: MessageHandlingException
即由于事务未提交 - 来自持久主题的相同消息被一次又一次地使用。我的目标是在使用消息后将 ACK 发送回代理(无论是否捕获异常)。
明天我试试错误频道
问候 D
向消息驱动的适配器添加一个error-channel
; ErrorMessage
将包含一个具有两个字段的 MessagingException
有效负载; cause
(异常)和 failedMessage
.
如果您使用默认值 error-channel="errorChannel"
,则会记录异常。
如果您想做更多的事情,您可以配置自己的错误通道并向其添加一些流。
编辑:
根据您在下方的评论...
payload must not be null
不是堆栈跟踪;这是一条消息。
也就是说,payload must not be null
看起来像 Spring 集成消息;它可能在消息转换期间被抛出在消息侦听器适配器中,这是在我们到达失败可以转到 error-channel
的地步之前;这样的异常会被抛回给容器
打开 DEBUG 日志记录并查找此日志条目:
logger.debug("converted JMS Message [" + jmsMessage + "] to integration Message payload [" + result + "]");
此外,提供完整的堆栈跟踪。
编辑#2
因此,我通过在自定义 MessageConverter
.
事务回滚后容器调用 DMLC 错误处理程序,因此无法停止回滚。
我们可以向适配器添加一个选项来以不同方式处理此类错误,但这需要一些工作。
与此同时,一个解决方法是编写一个自定义 MessageConverter
;类似于 this Gist.
然后,您的服务将不得不处理 "Bad Message Received" 负载。
然后您像这样提供自定义转换器...
<jms:message-driven-channel-adapter id="jmsIn"
destination="requestQueue" acknowledge="transacted"
message-converter="converter"
channel="jmsInChannel" />
<beans:bean id="converter" class="foo.MyMessageConverter" />