Mule AMQP 连接器尝试重新排队消息失败
Mule AMQP connector fails trying to requeue message
我正在使用 AMQP 端点并在我的 Mule 应用程序的其他地方让它工作(这里提到一个警告 [Possible bug with Mule AMQP transport 3.6.2 community)。
我正在尝试构建可靠的 SMTP 队列,但遇到了问题。
我假设如果 SMTP 连接器无法执行,它会抛出异常,因此通过将消息确认设置为 MANUAL,并捕获 SMTP 异常并拒绝(并重新排队)异常策略中的消息系统应处理 SMTP 中断。
我的代码(稍微简化)如下所示:
<amqp:connector name="connector.amqp.mule.default" ackMode="MANUAL" doc:name="AMQP Connector" validateConnections="true" host="${amiab-rabbitmq-hostname}" port="${amiab-rabbitmq-portnumber}" password="${amiab-rabbitmq-password}" username="${amiab-rabbitmq-username}"/>
<smtp:connector name="smtpConnector" contentType="text/html" doc:name="SMTPConnector"/>
<flow name="utility-smtpFlow">
<amqp:inbound-endpoint exchangeName="AMQP.DEFAULT.EXCHANGE" queueName="utilitySMTP" exchangeType="direct" responseTimeout="10000" doc:name="AMQP-0-9" queueDurable="true"/>
<logger message="Recevied SMTP Message from Queue" level="INFO" doc:name="Logger"/>
<smtp:outbound-endpoint host="${smtp-hostname}" user="${smtp-username}" password="{smtp-password}" to="#[flowVars.smtpTo]" subject="Testing Message" responseTimeout="10000" doc:name="SMTP"/>
<amqp:acknowledge-message doc:name="AMQP-0-9 Acknowledge Message"/>
<catch-exception-strategy doc:name="Catch Exception Strategy">
<logger message="SMTP Error, Requeuing" level="INFO" doc:name="Logger"/>
<amqp:reject-message requeue="true" doc:name="AMQP-0-9 Reject Message"/>
</catch-exception-strategy>
当我的 SMTP 连接器抛出异常时,异常策略已正确启动,但随后 AMQP 抛出错误,如下所示:
INFO 2015-06-18 16:32:22,383 [HTTP_Listener_AMIAB.worker.01] org.mule.transport.service.DefaultTransportServiceDescriptor: Loading default outbound transformer: org.mule.transport.amqp.internal.transformer.ObjectToAmqpMessage
INFO 2015-06-18 16:32:22,383 [HTTP_Listener_AMIAB.worker.01] org.mule.transport.service.DefaultTransportServiceDescriptor: Loading default response transformer: org.mule.transport.amqp.internal.transformer.ObjectToAmqpMessage
INFO 2015-06-18 16:32:22,387 [HTTP_Listener_AMIAB.worker.01] org.mule.lifecycle.AbstractLifecycleManager: Initialising: 'connector.amqp.mule.default.dispatcher.936721322'. Object is: Dispatcher
INFO 2015-06-18 16:32:22,389 [HTTP_Listener_AMIAB.worker.01] org.mule.lifecycle.AbstractLifecycleManager: Starting: 'connector.amqp.mule.default.dispatcher.936721322'. Object is: Dispatcher
INFO 2015-06-18 16:32:22,446 [[amiab-esb-utility].utility-smtpFlow.stage1.02] org.mule.api.processor.LoggerMessageProcessor: Recevied SMTP Message from Queue
INFO 2015-06-18 16:32:22,457 [[amiab-esb-utility].smtpConnector.dispatcher.01] org.mule.transport.service.DefaultTransportServiceDescriptor: Loading default outbound transformer: org.mule.transport.email.transformers.ObjectToMimeMessage
INFO 2015-06-18 16:32:22,481 [[amiab-esb-utility].smtpConnector.dispatcher.01] org.mule.lifecycle.AbstractLifecycleManager: Initialising: 'smtpConnector.dispatcher.404056326'. Object is: SmtpMessageDispatcher
ERROR 2015-06-18 16:32:22,568 [[amiab-esb-utility].smtpConnector.dispatcher.01] org.mule.exception.CatchMessagingExceptionStrategy:
********************************************************************************
Message : Unable to connect to mail transport.
Code : MULE_ERROR--2
--------------------------------------------------------------------------------
Exception stack is:
1. mail.lyrical.co.uk (java.net.UnknownHostException)
java.net.AbstractPlainSocketImpl:178 (null)
2. Unknown SMTP host: mail.lyrical.co.uk (javax.mail.MessagingException)
com.sun.mail.smtp.SMTPTransport:1704 (http://java.sun.com/j2ee/sdk_1.3/techdocs/api/javax/mail/MessagingException.html)
3. Unable to connect to mail transport. (org.mule.api.endpoint.EndpointException)
org.mule.transport.email.SmtpMessageDispatcher:67 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/endpoint/EndpointException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
java.net.UnknownHostException: mail.lyrical.co.uk
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:178)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
+ 3 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************
INFO 2015-06-17 15:25:47,190 [[amiab-esb-utility].smtpConnector.dispatcher.01] org.mule.api.processor.LoggerMessageProcessor: SMTP Error, Requeuing
WARN 2015-06-17 15:25:47,193 [amqpReceiver.01] org.mule.transport.amqp.internal.endpoint.receiver.MessageReceiverConsumer: Received shutdown signal for consumer tag: amq.ctag-NZ96mnj_oscnpJKAPy16ng, the message receiver will try to restart.
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=90)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478) ~[?:?]
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315) ~[?:?]
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144) ~[?:?]
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) ~[?:?]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550) ~[?:?]
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_75]
INFO 2015-06-17 15:25:47,195 [amqpReceiver.01] org.mule.lifecycle.AbstractLifecycleManager: Stopping: 'null'. Object is: MultiChannelMessageSubReceiver
INFO 2015-06-17 15:25:47,195 [amqpReceiver.01] org.mule.transport.amqp.internal.endpoint.receiver.MultiChannelMessageSubReceiver: Connecting clusterizable message receiver
INFO 2015-06-17 15:25:47,195 [amqpReceiver.01] org.mule.lifecycle.AbstractLifecycleManager: Starting: 'null'. Object is: MultiChannelMessageSubReceiver
INFO 2015-06-17 15:25:47,196 [amqpReceiver.01] org.mule.transport.amqp.internal.endpoint.receiver.MultiChannelMessageSubReceiver: Starting clusterizable message receiver
INFO 2015-06-17 15:25:47,203 [amqpReceiver.01] org.mule.transport.amqp.internal.endpoint.receiver.MultiChannelMessageSubReceiver: Started subscription: amq.ctag-sCCRCnOWKyREyip26pSYDA on channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3)
我是否遗漏了一些明显的东西,或者我误解了应该如何使用它?如果有人能指出正确的方向,非常感谢。
正如您在评论中指出的那样,smtp:outbound-endpoint
是一个 one-way
端点,其调度操作在另一个线程中执行,因此它与流程的执行并行发生。因此,无论如何都会调用 amqp:acknowledge-message
,与 smtp:outbound-endpoint
操作的成功或失败无关。
您可以通过在流程上设置 processingStrategy="synchronous"
来解决这个问题:这将强制所有端点交互在入站线程上执行。
Shameless plug: this is discussed in details in chapter 11.2 of Mule in Action, Second Edition.
我正在使用 AMQP 端点并在我的 Mule 应用程序的其他地方让它工作(这里提到一个警告 [Possible bug with Mule AMQP transport 3.6.2 community)。
我正在尝试构建可靠的 SMTP 队列,但遇到了问题。
我假设如果 SMTP 连接器无法执行,它会抛出异常,因此通过将消息确认设置为 MANUAL,并捕获 SMTP 异常并拒绝(并重新排队)异常策略中的消息系统应处理 SMTP 中断。
我的代码(稍微简化)如下所示:
<amqp:connector name="connector.amqp.mule.default" ackMode="MANUAL" doc:name="AMQP Connector" validateConnections="true" host="${amiab-rabbitmq-hostname}" port="${amiab-rabbitmq-portnumber}" password="${amiab-rabbitmq-password}" username="${amiab-rabbitmq-username}"/>
<smtp:connector name="smtpConnector" contentType="text/html" doc:name="SMTPConnector"/>
<flow name="utility-smtpFlow">
<amqp:inbound-endpoint exchangeName="AMQP.DEFAULT.EXCHANGE" queueName="utilitySMTP" exchangeType="direct" responseTimeout="10000" doc:name="AMQP-0-9" queueDurable="true"/>
<logger message="Recevied SMTP Message from Queue" level="INFO" doc:name="Logger"/>
<smtp:outbound-endpoint host="${smtp-hostname}" user="${smtp-username}" password="{smtp-password}" to="#[flowVars.smtpTo]" subject="Testing Message" responseTimeout="10000" doc:name="SMTP"/>
<amqp:acknowledge-message doc:name="AMQP-0-9 Acknowledge Message"/>
<catch-exception-strategy doc:name="Catch Exception Strategy">
<logger message="SMTP Error, Requeuing" level="INFO" doc:name="Logger"/>
<amqp:reject-message requeue="true" doc:name="AMQP-0-9 Reject Message"/>
</catch-exception-strategy>
当我的 SMTP 连接器抛出异常时,异常策略已正确启动,但随后 AMQP 抛出错误,如下所示:
INFO 2015-06-18 16:32:22,383 [HTTP_Listener_AMIAB.worker.01] org.mule.transport.service.DefaultTransportServiceDescriptor: Loading default outbound transformer: org.mule.transport.amqp.internal.transformer.ObjectToAmqpMessage
INFO 2015-06-18 16:32:22,383 [HTTP_Listener_AMIAB.worker.01] org.mule.transport.service.DefaultTransportServiceDescriptor: Loading default response transformer: org.mule.transport.amqp.internal.transformer.ObjectToAmqpMessage
INFO 2015-06-18 16:32:22,387 [HTTP_Listener_AMIAB.worker.01] org.mule.lifecycle.AbstractLifecycleManager: Initialising: 'connector.amqp.mule.default.dispatcher.936721322'. Object is: Dispatcher
INFO 2015-06-18 16:32:22,389 [HTTP_Listener_AMIAB.worker.01] org.mule.lifecycle.AbstractLifecycleManager: Starting: 'connector.amqp.mule.default.dispatcher.936721322'. Object is: Dispatcher
INFO 2015-06-18 16:32:22,446 [[amiab-esb-utility].utility-smtpFlow.stage1.02] org.mule.api.processor.LoggerMessageProcessor: Recevied SMTP Message from Queue
INFO 2015-06-18 16:32:22,457 [[amiab-esb-utility].smtpConnector.dispatcher.01] org.mule.transport.service.DefaultTransportServiceDescriptor: Loading default outbound transformer: org.mule.transport.email.transformers.ObjectToMimeMessage
INFO 2015-06-18 16:32:22,481 [[amiab-esb-utility].smtpConnector.dispatcher.01] org.mule.lifecycle.AbstractLifecycleManager: Initialising: 'smtpConnector.dispatcher.404056326'. Object is: SmtpMessageDispatcher
ERROR 2015-06-18 16:32:22,568 [[amiab-esb-utility].smtpConnector.dispatcher.01] org.mule.exception.CatchMessagingExceptionStrategy:
********************************************************************************
Message : Unable to connect to mail transport.
Code : MULE_ERROR--2
--------------------------------------------------------------------------------
Exception stack is:
1. mail.lyrical.co.uk (java.net.UnknownHostException)
java.net.AbstractPlainSocketImpl:178 (null)
2. Unknown SMTP host: mail.lyrical.co.uk (javax.mail.MessagingException)
com.sun.mail.smtp.SMTPTransport:1704 (http://java.sun.com/j2ee/sdk_1.3/techdocs/api/javax/mail/MessagingException.html)
3. Unable to connect to mail transport. (org.mule.api.endpoint.EndpointException)
org.mule.transport.email.SmtpMessageDispatcher:67 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/endpoint/EndpointException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
java.net.UnknownHostException: mail.lyrical.co.uk
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:178)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
+ 3 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************
INFO 2015-06-17 15:25:47,190 [[amiab-esb-utility].smtpConnector.dispatcher.01] org.mule.api.processor.LoggerMessageProcessor: SMTP Error, Requeuing
WARN 2015-06-17 15:25:47,193 [amqpReceiver.01] org.mule.transport.amqp.internal.endpoint.receiver.MessageReceiverConsumer: Received shutdown signal for consumer tag: amq.ctag-NZ96mnj_oscnpJKAPy16ng, the message receiver will try to restart.
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=90)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478) ~[?:?]
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315) ~[?:?]
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144) ~[?:?]
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) ~[?:?]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550) ~[?:?]
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_75]
INFO 2015-06-17 15:25:47,195 [amqpReceiver.01] org.mule.lifecycle.AbstractLifecycleManager: Stopping: 'null'. Object is: MultiChannelMessageSubReceiver
INFO 2015-06-17 15:25:47,195 [amqpReceiver.01] org.mule.transport.amqp.internal.endpoint.receiver.MultiChannelMessageSubReceiver: Connecting clusterizable message receiver
INFO 2015-06-17 15:25:47,195 [amqpReceiver.01] org.mule.lifecycle.AbstractLifecycleManager: Starting: 'null'. Object is: MultiChannelMessageSubReceiver
INFO 2015-06-17 15:25:47,196 [amqpReceiver.01] org.mule.transport.amqp.internal.endpoint.receiver.MultiChannelMessageSubReceiver: Starting clusterizable message receiver
INFO 2015-06-17 15:25:47,203 [amqpReceiver.01] org.mule.transport.amqp.internal.endpoint.receiver.MultiChannelMessageSubReceiver: Started subscription: amq.ctag-sCCRCnOWKyREyip26pSYDA on channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3)
我是否遗漏了一些明显的东西,或者我误解了应该如何使用它?如果有人能指出正确的方向,非常感谢。
正如您在评论中指出的那样,smtp:outbound-endpoint
是一个 one-way
端点,其调度操作在另一个线程中执行,因此它与流程的执行并行发生。因此,无论如何都会调用 amqp:acknowledge-message
,与 smtp:outbound-endpoint
操作的成功或失败无关。
您可以通过在流程上设置 processingStrategy="synchronous"
来解决这个问题:这将强制所有端点交互在入站线程上执行。
Shameless plug: this is discussed in details in chapter 11.2 of Mule in Action, Second Edition.