Spring 事务同步不工作(TransactionalEventListener)

Spring transaction synchronisation not working (TransactionalEventListener)

我知道这个问题在这个网站上被问到的格式略有不同,但按照这些帖子中给出的建议,我一无所获。我已经在这上面花了将近两天了,但我没有想法。

我们有一个 spring 启动微服务,它只是侦听进入 IBM MQ 队列的消息,做一些转换并将其转发到 Kafka 主题。我们希望这是事务性的,这样就不会丢失消息(对我们的业务至关重要)。我们还希望能够对事务提交和回滚事件做出反应,以达到监控和支持的目的。

我只是关注了互联网上的一些“操作方法”,我可以使用如下所示的 @Transactional 注释以声明方式轻松实现事务行为:

@Transactional(transactionManager = "chainedTransactionManager", rollbackFor = Throwable.class)
@JmsListener(destination = "DEV.QUEUE.1", containerFactory = "mqListenerContainerFactory", concurrency = "10")
public void receiveMessage(@Headers Map<String, Object> jmsHeaders, String message) {
    // Some work here including forward to Kafka topic:
    // ...
    // ...

    // Then publish an event which is supposed to be acted on:
    applicationEventPublisher.publishEvent(new MqConsumedEvent("JMS Correlation ID", "Message Payload"));

    // Uncommented exception below to create a rollback scenario
    // or comment it out to have the processing completed
    throw new RuntimeException("No good Pal!");
}

正如预期的那样,当播放消息时出现异常,由于事务管理器一次又一次回滚,处理将永远旋转。这对我们有好处。

现在我们希望在我们的侦听器方法中发布的 MqConsumedEvent 被下面的 onRollback 方法拦截:

@Component
@Slf4j
public class MqConsumedEventListener {
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, classes = MqConsumedEvent.class)
    public void onCommit(MqConsumedEvent event) {
        log.info("MQ message with correlation id {} committed to Kafka", event.getCorrelationId());
    }

    @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK, classes = MqConsumedEvent.class)
    public void onRollback(MqConsumedEvent event) {
        log.info("Failed to commit MQ message with correlation id {} to Kafka", event.getCorrelationId());
    }
}

这不会发生。类似的注释掉监听器中抛出的Exception使得我们的MQ消息被传递给Kafka。但是 onCommit 方法没有执行。

从进一步的研究和 spring 调试来看,我认为这没有执行,因为 spring 认为发布事件时没有活动事务,因此我的事件只是被忽略了。评估 TransactionSynchronizationManager.isActualTransactionActive() 并将其打印在日志中显示 false 这很难解释,因为正如我所说,当故意抛出异常时事务会按预期回滚。

提前感谢您的意见。

更新:

我设置的断点让我执行了这个 ApplicationListenerMethodTransactionalAdapter class:

@Override
public void onApplicationEvent(ApplicationEvent event) {
    if (TransactionSynchronizationManager.isSynchronizationActive() &&
            TransactionSynchronizationManager.isActualTransactionActive()) {
        TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
        TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
    }
    else if (this.annotation.fallbackExecution()) {
        if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
            logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");
        }
        processEvent(event);
    }
    else {
        // No transactional event execution at all
        if (logger.isDebugEnabled()) {
            logger.debug("No transaction is active - skipping " + event);
        }
    }
}

出于某种原因,我不理解第一个条件是否为假。然后回退执行是 false 因为我没有在我的 @TransactionalEventListener 用法中设置它 true 它将最终在 else 分支上并跳过事件。

我遇到了同样的问题。就我而言,事实证明我在我的项目中定义了一个 ApplicationEventMulticaster

    @Bean
    public ApplicationEventMulticaster applicationEventMulticaster() {
        var eventMulticaster = new SimpleApplicationEventMulticaster();
        eventMulticaster.setTaskExecutor(new SimpleAsyncTaskExecutor());
        return eventMulticaster;
    }

这使得 ApplicationListenerMethodTransactionalAdapter 在不同的线程中执行(不是发布事件的线程)。这就是为什么 TransactionSynchronizationManager.isActualTransactionActive() 最终为假并且事件没有被执行的原因。

删除 ApplicationEventMulticaster 的定义对我来说效果很好。