Spring-AMQP Transactional publish without Exception

Spring-AMQP Transactionnal publish without Exception

我正在尝试将 Transactional RabbitMQ 通道与 Spring-AMQP 一起使用,但我想实际吞下异常以记录它们并能够恢复它们。

使用 channelTransacted=true 强制 Channel 也加入当前的 transactionManager(在我的例子中是 Hibernate),这导致提交异常被重新抛出 @Transactionnal 边界,导致上层失败而无法捕捉并记录它。

我还尝试手动将发布附加到事务,以便它仅在提交成功后执行:

public void publishFailSafeAfterSuccessfulTransaction(final String routingKey, final String message) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            try {
                rabbitTemplate.convertAndSend(routingKey, message);
            } catch (Exception exception) {
                logger.error("Error while publishing message to RabbitMQ ");
            }
        }
});

以这种方式使用:

Entity entity = save(entity);
publishFailSafeAfterSuccessfulTransaction("routingkey", "Entity was updated");

但在那种情况下我不能使用 channelTransacted=true 因为它会将 registeringSynchronization 嵌套在另一个 registeringSynchronization 中并且根本无法调用...

有什么办法可以实现吗?

更新:理想情况下,我想覆盖 ConnectionFactoryUtils class 中使用的 RabbitResourceSynchronization,但它是私有的 class,没有使用

实例化的工厂
TransactionSynchronizationManager.registerSynchronization(new RabbitResourceSynchronization(resourceHolder, connectionFactory, synched));

我实施的解决方案是在主事务提交后在新事务中进行发布。

第一次通话:

Entity entity = save(entity);
publishFailSafeAfterSuccessfulTransaction("routingkey", "Entity was updated");

此方法在主事务提交后注册进行发布。

public void publishFailSafeAfterSuccessfulTransaction(final String routingKey, final String event) {

    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            try {
                publishFailSafe(routingKey, event);
            } catch (Exception exception) {
                //Do some recovering
            }
        }
    });
}

主事务提交后,此事务将进行发布。当通道被处理时,它会在提交新事务时提交消息,并且只有那个失败,并且错误将在以前的方法中被捕获。

@Transactional(propagation = Propagation.REQUIRES_NEW)
public void publishFailSafe(String routingKey, String event) {
    try {
        rabbitTemplate.convertAndSend(routingKey.getRoutingKey(), event);
    } catch (Exception exception) {
        //Do some recovering
    }
}