How to handle transactions for Spring Integration flows (Java DSL)
How can I define a transaction for a complete flow in Spring Integration (Java DSL)?
With Spring Integration we can define an example flow:
@Bean
public IntegrationFlow myMessageFromMessageAmqpInboundFlow() {
    return IntegrationFlows.from(myInboundChannel)
            .transform(aMessageTransformer)
            .transform(anotherMessageTransformer)
            .channel(anOutputChannel)
            .get();
}
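I need a transaction that spans the complete flow. Currently, when I access a database in 'aMessageTransformer', the transaction is closed as soon as this message transformer has been processed.
But I need a transaction that is still uncommitted while 'anotherMessageTransformer' is being processed.
I expected that I only had to add "@Transactional" (or @Transactional(propagation = Propagation.REQUIRED, readOnly = true)):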
@Bean
@Transactional
public IntegrationFlow myMessageFromMessageAmqpInboundFlow() {
    return IntegrationFlows.from(myInboundChannel)
            .transform(aMessageTransformer)
            .transform(anotherMessageTransformer)
            .channel(anOutputChannel)
            .get();
}
But this leads to a 'no session exception' in 'anotherMessageTransformer'.
You need to follow the documentation and therefore add this to your flow:
    .transform(aMessageTransformer, e -> e.transactional(true))
where that .transactional() is about:
/**
* Specify a {@link TransactionInterceptor} {@link Advice} with default
* {@code PlatformTransactionManager} and {@link DefaultTransactionAttribute} for the
* {@link MessageHandler}.
* @param handleMessageAdvice the flag to indicate the target {@link Advice} type:
* {@code false} - regular {@link TransactionInterceptor}; {@code true} -
* {@link org.springframework.integration.transaction.TransactionHandleMessageAdvice}
* extension.
* @return the spec.
*/
public S transactional(boolean handleMessageAdvice) {
The TransactionHandleMessageAdvice means:
* When this {@link Advice} is used from the {@code request-handler-advice-chain}, it is applied
* to the {@link MessageHandler#handleMessage}
* (not to the
* {@link org.springframework.integration.handler.AbstractReplyProducingMessageHandler.RequestHandler#handleRequestMessage}),
* therefore the entire downstream process is wrapped to the transaction.
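To make the difference between the two advice types more concrete, here is a minimal plain-Java sketch of the idea described in the Javadoc above. It does not use Spring Integration at all: the class TransactionScopeSketch, the run and inTransaction methods, and the "begin"/"commit" log entries are hypothetical names made up for illustration, and the "transaction" is only a recorded event, not a real one. The sketch just shows where the transaction boundary falls when the wrapper is placed around the transformation alone (the handleRequestMessage analogue) versus around the transformation plus the downstream call (the handleMessage analogue).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical model only: this is NOT Spring Integration code.
public class TransactionScopeSketch {

    // Wraps a call in a pretend transaction and records its boundaries.
    static UnaryOperator<String> inTransaction(List<String> events,
                                               UnaryOperator<String> target) {
        return payload -> {
            events.add("begin");
            try {
                String result = target.apply(payload);
                events.add("commit");
                return result;
            } catch (RuntimeException ex) {
                events.add("rollback");
                throw ex;
            }
        };
    }

    // Runs a two-step flow and returns the ordered event log.
    // handleMessageAdvice = true models the transactional(true) case.
    static List<String> run(boolean handleMessageAdvice) {
        List<String> events = new ArrayList<>();

        // Stand-ins for aMessageTransformer and anotherMessageTransformer.
        UnaryOperator<String> first = p -> { events.add("first"); return p + "-a"; };
        UnaryOperator<String> second = p -> { events.add("second"); return p + "-b"; };

        UnaryOperator<String> flow;
        if (handleMessageAdvice) {
            // Wrapper around "transform, then send downstream":
            // the second step runs before the commit.
            flow = inTransaction(events, p -> second.apply(first.apply(p)));
        } else {
            // Wrapper around the transformation only:
            // the commit happens before the second step runs.
            UnaryOperator<String> advisedFirst = inTransaction(events, first);
            flow = p -> second.apply(advisedFirst.apply(p));
        }

        flow.apply("msg");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [begin, first, commit, second]
        System.out.println(run(true));  // prints [begin, first, second, commit]
    }
}
```

In the first printed line the second step runs after the commit, which matches the behaviour described in the question. In the second line both steps sit inside the same begin/commit pair, which is the effect the answer attributes to transactional(true). Note that this only holds in the model because the downstream step is invoked on the same thread inside the wrapped call.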