SimpleMessageListenerContainer keeps consuming after TransactionSystemException
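I do not want SimpleMessageListenerContainer to keep consuming messages when my transaction is marked rollback-only.
I have not added a listener-container to my configuration file, and I tried rethrowing AmqpRejectAndDontRequeueException. It does not work.
Here is my code: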
@Transactional
public class MySecondService {

    @Resource
    private MySecondRepository mySecondRepository;

    @Transactional(propagation = Propagation.REQUIRED)
    @ServiceActivator(inputChannel = "my-first-servie-output-channel",
            outputChannel = "my-Second-servie-output-channel")
    public String saveEntity(final MyTestEntity myTestEntity)
            throws AmqpRejectAndDontRequeueException {
        try {
            mySecondRepository.save(myTestEntity);
        } catch (Exception e) {
            throw new AmqpRejectAndDontRequeueException("exception");
        }
    }
}
spring-integration-context.xml
<int:chain input-channel="transaction-inbound-channel" output-channel="my-first-servie-input-channel">
</int:chain>

<int-amqp:inbound-channel-adapter
    channel="transaction-inbound-channel"
    queue-names="sample.queue"
    concurrent-consumers="5"
    error-channel="failed-channel"
    connection-factory="rabbitConnectionFactory"
    mapped-request-headers="*"
    transaction-manager="transactionManager" />

<rabbit:connection-factory
    id="rabbitConnectionFactory"
    connection-factory="rcf"
    host="${spring.rabbitmq.host}"
    port="${spring.rabbitmq.port}"
    username="${spring.rabbitmq.username}"
    password="${spring.rabbitmq.password}"
/>

<bean id="rcf" class="com.rabbitmq.client.ConnectionFactory">
    <property name="host" value="${spring.rabbitmq.host}"/>
    <property name="requestedHeartbeat" value="10" />
</bean>
Here is the stack trace:
WARN [] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Consumer raised exception, processing can restart if the connection factory supports it
org.springframework.transaction.TransactionSystemException: Could not commit JPA transaction; nested exception is javax.persistence.RollbackException: Transaction marked as rollbackOnly
at org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:526) ~[spring-orm-4.2.4.RELEASE.jar:4.2.4.RELEASE]
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:757) ~[spring-tx-4.1.9.RELEASE.jar:4.1.9.RELEASE]
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:726) ~[spring-tx-4.1.9.RELEASE.jar:4.1.9.RELEASE]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:150) ~[spring-tx-4.1.9.RELEASE.jar:4.1.9.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1062) ~[spring-rabbit-1.5.3.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:93) ~[spring-rabbit-1.5.3.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1203) ~[spring-rabbit-1.5.3.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
Caused by: javax.persistence.RollbackException: Transaction marked as rollbackOnly
at org.hibernate.jpa.internal.TransactionImpl.commit(TransactionImpl.java:74) ~[hibernate-entitymanager-4.3.11.Final.jar:4.3.11.Final]
at org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:517) ~[spring-orm-4.2.4.RELEASE.jar:4.2.4.RELEASE]
... 7 common frames omitted
2016-10-19 00:52:50,673 [SimpleAsyncTaskExecutor-1] INFO [] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Restarting Consumer: tags=[{amq.ctag-QBWGIY7co5vUpXwGDXDDBg=sample.queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://test@999.99.99.999:9999/,2), acknowledgeMode=AUTO local queue size=0
I do not want this to happen: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Restarting Consumer
Can someone help me understand what else I need to add in order to rethrow AmqpRejectAndDontRequeueException? Thanks in advance.
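You don't show your flow.
If you are only invoking a single service, don't add the transaction manager to the inbound channel adapter.
The transaction will start when the service is invoked; as long as your error flow on failed-channel doesn't throw an exception (or throws AmqpRejectAndDontRequeueException), the message will not be requeued.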
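As a minimal sketch of that suggestion, here is the adapter from the question with only the transaction-manager attribute dropped. Every other attribute is copied unchanged, and it assumes the @Transactional service is the only place a transaction needs to be started.

```xml
<!-- Sketch: same adapter as in the question, without transaction-manager. -->
<!-- The container no longer wraps delivery in its own transaction, so the -->
<!-- transaction begins and ends at the @Transactional service method.     -->
<int-amqp:inbound-channel-adapter
    channel="transaction-inbound-channel"
    queue-names="sample.queue"
    concurrent-consumers="5"
    error-channel="failed-channel"
    connection-factory="rabbitConnectionFactory"
    mapped-request-headers="*" />
```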
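If you have multiple services that you want to participate in a single transaction, you can make transaction-inbound-channel transactional using AOP advice, and the downstream flow will all run in that transaction.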
Alternatively, if you never want the message requeued, set defaultRequeueRejected to false on the inbound adapter's container (you have to wire it up as a <bean/>).
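A rough sketch of that last option follows. The bean id amqpListenerContainer is a hypothetical name chosen for illustration; the property values mirror the attributes in the question. When the adapter references a container through listener-container, the container-related attributes (queue-names, concurrent-consumers, connection-factory, transaction-manager) move from the adapter onto the container bean.

```xml
<!-- Hypothetical bean id; values mirror the adapter attributes in the question. -->
<bean id="amqpListenerContainer"
      class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="rabbitConnectionFactory" />
    <property name="queueNames" value="sample.queue" />
    <property name="concurrentConsumers" value="5" />
    <!-- Rejected deliveries are not put back on the queue. -->
    <property name="defaultRequeueRejected" value="false" />
</bean>

<!-- The adapter now refers to the container instead of configuring one itself. -->
<int-amqp:inbound-channel-adapter
    channel="transaction-inbound-channel"
    listener-container="amqpListenerContainer"
    error-channel="failed-channel"
    mapped-request-headers="*" />
```

With this setting a failed message is dropped by the broker, or routed to a dead-letter exchange if the queue has one configured, so it is only appropriate when redelivery is never wanted.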