How to handle transactions for Spring Integration flows (Java DSL)

How do I define a transaction for a complete flow in Spring Integration (Java DSL)?

With Spring Integration we can define a sample flow like this:

@Bean
public IntegrationFlow myMessageFromMessageAmqpInboundFlow() {
    return IntegrationFlows.from(myInboundChannel)
            .transform(aMessageTransformer)
            .transform(anotherMessageTransformer)
            .channel(anOutputChannel)
            .get();
}

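I need a transaction that spans the entire flow. Currently, when I access the database in 'aMessageTransformer', the transaction is closed as soon as that transformer has finished. But I need a transaction that is still uncommitted while 'anotherMessageTransformer' is being processed.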

I expected that I would only need to add "@Transactional" (or @Transactional(propagation = Propagation.REQUIRED, readOnly = true)):

@Bean
@Transactional
public IntegrationFlow myMessageFromMessageAmqpInboundFlow() {
    return IntegrationFlows.from(myInboundChannel)
            .transform(aMessageTransformer)
            .transform(anotherMessageTransformer)
            .channel(anOutputChannel)
            .get();
}

But this leads to a 'no session' exception in 'anotherMessageTransformer'.

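Putting @Transactional on the @Bean method does not help, because that method only runs once at startup to build the flow; it is not invoked when a message is processed. You need to follow the documentation and add this to your flow: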

.transform(aMessageTransformer, e -> e.transactional(true))

The Javadoc for .transactional() says:

/**
 * Specify a {@link TransactionInterceptor} {@link Advice} with default
 * {@code PlatformTransactionManager} and {@link DefaultTransactionAttribute} for the
 * {@link MessageHandler}.
 * @param handleMessageAdvice the flag to indicate the target {@link Advice} type:
 * {@code false} - regular {@link TransactionInterceptor}; {@code true} -
 * {@link org.springframework.integration.transaction.TransactionHandleMessageAdvice}
 * extension.
 * @return the spec.
 */
public S transactional(boolean handleMessageAdvice) {

And the Javadoc for TransactionHandleMessageAdvice states:

 * When this {@link Advice} is used from the {@code request-handler-advice-chain}, it is applied
 * to the {@link MessageHandler#handleMessage}
 * (not to the
 * {@link org.springframework.integration.handler.AbstractReplyProducingMessageHandler.RequestHandler#handleRequestMessage}),
 * therefore the entire downstream process is wrapped to the transaction.
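To see why advising handleMessage covers the downstream transformers, here is a minimal plain-Java model of the idea. It is not Spring Integration code: the Handler interface, the transactional wrapper and the log strings are all hypothetical stand-ins. It assumes each endpoint passes its result to the next one synchronously in the same thread, which is how direct channels behave. A transaction is bound to a thread, so the effect would not carry across a channel that hands the message to another thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

class DownstreamTransactionModel {

    /** Records transaction boundaries and handler calls in the order they happen. */
    static final List<String> LOG = new ArrayList<>();

    /** Hypothetical stand-in for a message handler in a direct-channel chain. */
    interface Handler {
        void handleMessage(String payload);
    }

    /** A transforming endpoint: applies fn, then calls the next handler in the same thread. */
    static Handler transformer(String name, UnaryOperator<String> fn, Handler next) {
        return payload -> {
            LOG.add(name);
            next.handleMessage(fn.apply(payload));
        };
    }

    /** Wraps the whole handleMessage call, so everything invoked from it runs between begin and commit. */
    static Handler transactional(Handler target) {
        return payload -> {
            LOG.add("begin");
            try {
                target.handleMessage(payload);
                LOG.add("commit");
            } catch (RuntimeException ex) {
                LOG.add("rollback");
                throw ex;
            }
        };
    }

    /** Builds the chain back to front and sends one message through it. */
    static List<String> run() {
        LOG.clear();
        Handler output = payload -> LOG.add("output:" + payload);
        Handler second = transformer("anotherMessageTransformer", String::toUpperCase, output);
        // Only the first endpoint is advised, as with e -> e.transactional(true).
        Handler first = transactional(transformer("aMessageTransformer", String::trim, second));
        first.handleMessage("  hello ");
        return new ArrayList<>(LOG);
    }

    public static void main(String[] args) {
        // prints [begin, aMessageTransformer, anotherMessageTransformer, output:HELLO, commit]
        System.out.println(run());
    }
}
```

In this model "commit" is logged only after the second transformer and the output step have returned. That is the behaviour the question asks for: the transaction opened around 'aMessageTransformer' is still active while 'anotherMessageTransformer' runs.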