Mule AMQP connector fails trying to requeue message

I am using AMQP endpoints and have them working elsewhere in my Mule application (with one caveat, mentioned in "Possible bug with Mule AMQP transport 3.6.2 community").

I am trying to build a reliable SMTP queue, but I have run into a problem.

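My assumption is that if the SMTP connector fails, it throws an exception. So by setting message acknowledgement to MANUAL, catching the SMTP exception, and rejecting (and requeuing) the message in the exception strategy, the system should be able to cope with SMTP outages.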

My code (slightly simplified) looks like this:

<amqp:connector name="connector.amqp.mule.default" ackMode="MANUAL" doc:name="AMQP Connector" validateConnections="true" host="${amiab-rabbitmq-hostname}" port="${amiab-rabbitmq-portnumber}" password="${amiab-rabbitmq-password}" username="${amiab-rabbitmq-username}"/>
<smtp:connector name="smtpConnector" contentType="text/html" doc:name="SMTPConnector"/>
<flow name="utility-smtpFlow">
    <amqp:inbound-endpoint exchangeName="AMQP.DEFAULT.EXCHANGE" queueName="utilitySMTP" exchangeType="direct" responseTimeout="10000" doc:name="AMQP-0-9" queueDurable="true"/>
    <logger message="Recevied SMTP Message from Queue" level="INFO" doc:name="Logger"/>
    <smtp:outbound-endpoint host="${smtp-hostname}" user="${smtp-username}" password="${smtp-password}" to="#[flowVars.smtpTo]" subject="Testing Message" responseTimeout="10000" doc:name="SMTP"/>
    <amqp:acknowledge-message doc:name="AMQP-0-9 Acknowledge Message"/>
    <catch-exception-strategy doc:name="Catch Exception Strategy">
        <logger message="SMTP Error, Requeuing" level="INFO" doc:name="Logger"/>
        <amqp:reject-message requeue="true" doc:name="AMQP-0-9 Reject Message"/>
    </catch-exception-strategy>
</flow>

When my SMTP connector throws an exception, the exception strategy kicks in correctly, but AMQP then throws the following error:

INFO  2015-06-18 16:32:22,383 [HTTP_Listener_AMIAB.worker.01] org.mule.transport.service.DefaultTransportServiceDescriptor: Loading default outbound transformer: org.mule.transport.amqp.internal.transformer.ObjectToAmqpMessage
INFO  2015-06-18 16:32:22,383 [HTTP_Listener_AMIAB.worker.01] org.mule.transport.service.DefaultTransportServiceDescriptor: Loading default response transformer: org.mule.transport.amqp.internal.transformer.ObjectToAmqpMessage
INFO  2015-06-18 16:32:22,387 [HTTP_Listener_AMIAB.worker.01] org.mule.lifecycle.AbstractLifecycleManager: Initialising: 'connector.amqp.mule.default.dispatcher.936721322'. Object is: Dispatcher
INFO  2015-06-18 16:32:22,389 [HTTP_Listener_AMIAB.worker.01] org.mule.lifecycle.AbstractLifecycleManager: Starting: 'connector.amqp.mule.default.dispatcher.936721322'. Object is: Dispatcher
INFO  2015-06-18 16:32:22,446 [[amiab-esb-utility].utility-smtpFlow.stage1.02] org.mule.api.processor.LoggerMessageProcessor: Recevied SMTP Message from Queue
INFO  2015-06-18 16:32:22,457 [[amiab-esb-utility].smtpConnector.dispatcher.01] org.mule.transport.service.DefaultTransportServiceDescriptor: Loading default outbound transformer: org.mule.transport.email.transformers.ObjectToMimeMessage
INFO  2015-06-18 16:32:22,481 [[amiab-esb-utility].smtpConnector.dispatcher.01] org.mule.lifecycle.AbstractLifecycleManager: Initialising: 'smtpConnector.dispatcher.404056326'. Object is: SmtpMessageDispatcher
ERROR 2015-06-18 16:32:22,568 [[amiab-esb-utility].smtpConnector.dispatcher.01] org.mule.exception.CatchMessagingExceptionStrategy: 
********************************************************************************
Message               : Unable to connect to mail transport.
Code                  : MULE_ERROR--2
--------------------------------------------------------------------------------
Exception stack is:
1. mail.lyrical.co.uk (java.net.UnknownHostException)
  java.net.AbstractPlainSocketImpl:178 (null)
2. Unknown SMTP host: mail.lyrical.co.uk (javax.mail.MessagingException)
  com.sun.mail.smtp.SMTPTransport:1704 (http://java.sun.com/j2ee/sdk_1.3/techdocs/api/javax/mail/MessagingException.html)
3. Unable to connect to mail transport. (org.mule.api.endpoint.EndpointException)
  org.mule.transport.email.SmtpMessageDispatcher:67 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/endpoint/EndpointException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
java.net.UnknownHostException: mail.lyrical.co.uk
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:178)
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    + 3 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************

INFO  2015-06-17 15:25:47,190 [[amiab-esb-utility].smtpConnector.dispatcher.01] org.mule.api.processor.LoggerMessageProcessor: SMTP Error, Requeuing
WARN  2015-06-17 15:25:47,193 [amqpReceiver.01] org.mule.transport.amqp.internal.endpoint.receiver.MessageReceiverConsumer: Received shutdown signal for consumer tag: amq.ctag-NZ96mnj_oscnpJKAPy16ng, the message receiver will try to restart.
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=90)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478) ~[?:?]
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315) ~[?:?]
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144) ~[?:?]
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) ~[?:?]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550) ~[?:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.7.0_75]
INFO  2015-06-17 15:25:47,195 [amqpReceiver.01] org.mule.lifecycle.AbstractLifecycleManager: Stopping: 'null'. Object is: MultiChannelMessageSubReceiver
INFO  2015-06-17 15:25:47,195 [amqpReceiver.01] org.mule.transport.amqp.internal.endpoint.receiver.MultiChannelMessageSubReceiver: Connecting clusterizable message receiver
INFO  2015-06-17 15:25:47,195 [amqpReceiver.01] org.mule.lifecycle.AbstractLifecycleManager: Starting: 'null'. Object is: MultiChannelMessageSubReceiver
INFO  2015-06-17 15:25:47,196 [amqpReceiver.01] org.mule.transport.amqp.internal.endpoint.receiver.MultiChannelMessageSubReceiver: Starting clusterizable message receiver
INFO  2015-06-17 15:25:47,203 [amqpReceiver.01] org.mule.transport.amqp.internal.endpoint.receiver.MultiChannelMessageSubReceiver: Started subscription: amq.ctag-sCCRCnOWKyREyip26pSYDA on channel: AMQChannel(amqp://guest@127.0.0.1:5672/,3)

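Am I missing something obvious, or have I misunderstood how this is supposed to be used? If anyone can point me in the right direction, I would be very grateful.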

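As you pointed out in the comments, smtp:outbound-endpoint is a one-way endpoint whose dispatch operation is performed on another thread, so it happens in parallel with the execution of the flow. As a result, amqp:acknowledge-message is called regardless of whether the smtp:outbound-endpoint operation succeeds or fails. That would also explain the "PRECONDITION_FAILED - unknown delivery tag 1" error: by the time the exception strategy tries to reject the message, it has presumably already been acknowledged.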

You can fix this by setting processingStrategy="synchronous" on the flow: this forces all endpoint interactions to be executed on the inbound thread.
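
For illustration, here is roughly how the flow from the question might look with that change applied. Treat it as a sketch rather than a tested configuration: it is a fragment that assumes the surrounding mule element, its namespace declarations, and the two connectors from the question are already in place, and it assumes the password placeholder was meant to be ${smtp-password}. The doc:name attributes are left out for brevity.

```xml
<!-- Sketch only: the flow from the question, with processingStrategy="synchronous" added. -->
<flow name="utility-smtpFlow" processingStrategy="synchronous">
    <amqp:inbound-endpoint exchangeName="AMQP.DEFAULT.EXCHANGE" queueName="utilitySMTP" exchangeType="direct" responseTimeout="10000" queueDurable="true"/>
    <logger message="Recevied SMTP Message from Queue" level="INFO"/>
    <!-- With a synchronous flow, the SMTP dispatch runs on the inbound thread,
         so a failure here interrupts the flow before the acknowledgement below.
         Assumption: the password placeholder is meant to be ${smtp-password}. -->
    <smtp:outbound-endpoint host="${smtp-hostname}" user="${smtp-username}" password="${smtp-password}" to="#[flowVars.smtpTo]" subject="Testing Message" responseTimeout="10000"/>
    <!-- Reached only if the SMTP dispatch did not throw. -->
    <amqp:acknowledge-message/>
    <catch-exception-strategy>
        <logger message="SMTP Error, Requeuing" level="INFO"/>
        <!-- The message should still be unacknowledged at this point,
             so the reject is expected to find a valid delivery tag. -->
        <amqp:reject-message requeue="true"/>
    </catch-exception-strategy>
</flow>
```

The only functional difference from the question's flow is the processingStrategy attribute; everything else is kept as posted so the change is easy to spot.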

Shameless plug: this is discussed in detail in chapter 11.2 of Mule in Action, Second Edition.