Spring Integration AMQP listener transacted
What is the difference between AcknowledgeMode.AUTO and AcknowledgeMode.MANUAL when channelTransacted = true on the message listener container?
I know that when I use AcknowledgeMode.MANUAL with channelTransacted = false, the listener must acknowledge the message by calling Channel.basicAck(). When the listener container has channelTransacted = true, is AcknowledgeMode.MANUAL unnecessary? I ask because I get an exception when I combine AcknowledgeMode.MANUAL and channelTransacted = true with a call to Channel.basicAck().
@Bean(name = "amqpInboundEsignRequest")
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory, PlatformTransactionManager transactionManager) {
    return IntegrationFlows.from(
            Amqp.inboundAdapter(connectionFactory, EsignContstants.QUEUE)
                .acknowledgeMode(AcknowledgeMode.MANUAL)
                .messageConverter(new Jackson2JsonMessageConverter())
                .autoStartup(false)
                .channelTransacted(true)
                .transactionManager(transactionManager)
                .txSize(1)
        )
        .enrichHeaders(s -> s.header(MessageHeaders.REPLY_CHANNEL, "basicAck.flow", true))
        .log("amqpInbound.start-process")
        .channel("insertAndStartProcess.input")
        .get();
}
Exception:
2018-02-19 09:57:27.478 ERROR 4664 --- [rContainer#0-45] o.s.t.s.TransactionSynchronizationUtils : TransactionSynchronization.afterCompletion threw exception
org.springframework.amqp.AmqpException: failed to commit RabbitMQ transaction
at org.springframework.amqp.rabbit.connection.RabbitResourceHolder.commitAll(RabbitResourceHolder.java:168) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$RabbitResourceSynchronization.afterCompletion(ConnectionFactoryUtils.java:264) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
at org.springframework.transaction.support.TransactionSynchronizationUtils.invokeAfterCompletion(TransactionSynchronizationUtils.java:168) ~[spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.transaction.support.AbstractPlatformTransactionManager.invokeAfterCompletion(AbstractPlatformTransactionManager.java:1002) [spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.transaction.support.AbstractPlatformTransactionManager.triggerAfterCompletion(AbstractPlatformTransactionManager.java:977) [spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:806) [spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:730) [spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:150) [spring-tx-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1198) [spring-rabbit-1.7.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:102) [spring-rabbit-1.7.4.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1470) [spring-rabbit-1.7.4.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
Caused by: java.io.IOException: null
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:105) ~[amqp-client-4.0.3.jar:4.0.3]
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:101) ~[amqp-client-4.0.3.jar:4.0.3]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:123) ~[amqp-client-4.0.3.jar:4.0.3]
at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:1318) ~[amqp-client-4.0.3.jar:4.0.3]
at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:50) ~[amqp-client-4.0.3.jar:4.0.3]
at sun.reflect.GeneratedMethodAccessor151.invoke(Unknown Source) ~[na:na]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_131]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_131]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:980) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
at com.sun.proxy.$Proxy191.txCommit(Unknown Source) ~[na:na]
at org.springframework.amqp.rabbit.connection.RabbitResourceHolder.commitAll(RabbitResourceHolder.java:164) ~[spring-rabbit-1.7.4.RELEASE.jar:na]
... 11 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-4.0.3.jar:4.0.3]
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32) ~[amqp-client-4.0.3.jar:4.0.3]
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366) ~[amqp-client-4.0.3.jar:4.0.3]
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:229) ~[amqp-client-4.0.3.jar:4.0.3]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:117) ~[amqp-client-4.0.3.jar:4.0.3]
... 19 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:505) ~[amqp-client-4.0.3.jar:4.0.3]
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:336) ~[amqp-client-4.0.3.jar:4.0.3]
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:143) ~[amqp-client-4.0.3.jar:4.0.3]
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:90) ~[amqp-client-4.0.3.jar:4.0.3]
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:634) ~[amqp-client-4.0.3.jar:4.0.3]
at com.rabbitmq.client.impl.AMQConnection.access0(AMQConnection.java:47) ~[amqp-client-4.0.3.jar:4.0.3]
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572) ~[amqp-client-4.0.3.jar:4.0.3]
... 1 common frames omitted
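Using transactions should make no difference. That error implies the channel was closed for some reason between the time you received the message and the time you sent the basicAck; with transactions, we won't detect it until the transaction commits.
We recently added a fix to detect that the channel was closed and reject the basicAck in that situation.
The fix is in 2.0.2.RELEASE.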
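To make the "unknown delivery tag 1" message more concrete, here is a minimal toy model in plain Java. It relies on one fact about AMQP 0-9-1: delivery tags are scoped to the channel that delivered the message. Everything else is an assumption for illustration: ToyChannel, DeliveryTagDemo and their methods are hypothetical and are not the RabbitMQ client or Spring AMQP API, and the "replacement channel" scenario is only one plausible way the channel could have changed between delivery and ack. The second method sketches the kind of guard the answer describes (refusing the ack once the delivering channel is closed), not the actual framework code.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model only: hypothetical classes, NOT the RabbitMQ or Spring AMQP API.
final class DeliveryTagDemo {

    /** Minimal stand-in for a channel that tracks its own unacknowledged delivery tags. */
    static final class ToyChannel {
        private final Set<Long> unacked = new HashSet<>();
        private long nextTag = 1;
        private boolean open = true;

        /** Simulates a delivery; tags are numbered independently on each channel. */
        long deliver() {
            long tag = nextTag++;
            unacked.add(tag);
            return tag;
        }

        /** Closing the channel discards its outstanding delivery tags. */
        void close() {
            open = false;
            unacked.clear();
        }

        boolean isOpen() {
            return open;
        }

        /** Acknowledges a tag; fails if this channel never delivered it. */
        void ack(long tag) {
            if (!unacked.remove(tag)) {
                throw new IllegalStateException("PRECONDITION_FAILED - unknown delivery tag " + tag);
            }
        }
    }

    /** The delivering channel closes, and the ack is sent on a different channel. */
    static String ackOnReplacementChannel() {
        ToyChannel original = new ToyChannel();
        long tag = original.deliver();          // tag 1 belongs to 'original'
        original.close();                       // channel lost before the listener acks
        ToyChannel replacement = new ToyChannel();
        try {
            replacement.ack(tag);               // the new channel has never seen tag 1
            return "acked";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    /** Guarded variant: refuse to ack once the delivering channel is closed. */
    static String guardedAck() {
        ToyChannel original = new ToyChannel();
        long tag = original.deliver();
        original.close();
        if (!original.isOpen()) {
            return "rejected ack for tag " + tag + ": delivering channel is closed";
        }
        original.ack(tag);
        return "acked";
    }

    public static void main(String[] args) {
        System.out.println(ackOnReplacementChannel());
        System.out.println(guardedAck());
    }
}
```

In the toy model the failure is raised immediately. In the real stack trace above it only surfaces at txCommit, which matches the answer's point that with a transacted channel the problem is not detected until the transaction commits.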