spring 将 redis 轮询器与事务集成
spring integration redis poller with transaction
我使用spring整数redis,从redis轮询消息,像这样:
@Bean
public PseudoTransactionManager transactionManager() {
final PseudoTransactionManager pseudoTransactionManager = new PseudoTransactionManager();
return pseudoTransactionManager;
}
@Bean
public TransactionSynchronizationFactory transactionSynchronizationFactory() {
ExpressionEvaluatingTransactionSynchronizationProcessor transactionSynchronizationProcessor = new ExpressionEvaluatingTransactionSynchronizationProcessor();
transactionSynchronizationProcessor.setAfterCommitExpression(this.PARSER.parseExpression("#store.rename('commit')"));
transactionSynchronizationProcessor.setAfterRollbackExpression(this.PARSER.parseExpression("#store.rename('roll')"));
DefaultTransactionSynchronizationFactory transactionSynchronizationFactory = new DefaultTransactionSynchronizationFactory(transactionSynchronizationProcessor);
return transactionSynchronizationFactory;
}
@Bean
public SourcePollingChannelAdapterFactoryBean sourcePollingChannelAdapter(RedisStoreMessageSource redisStoreMessageSource, TransactionSynchronizationFactory transactionSynchronizationFactory) {
SourcePollingChannelAdapterFactoryBean sourcePollingChannelAdapterFactoryBean = new SourcePollingChannelAdapterFactoryBean();
sourcePollingChannelAdapterFactoryBean.setAutoStartup(true);
sourcePollingChannelAdapterFactoryBean.setOutputChannelName("mail-delivery-status-route-channel");
sourcePollingChannelAdapterFactoryBean.setSource(redisStoreMessageSource);
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(10);
pollerMetadata.setTransactionSynchronizationFactory(transactionSynchronizationFactory);
PeriodicTrigger periodicTrigger = new PeriodicTrigger(2000);
pollerMetadata.setTrigger(periodicTrigger);
sourcePollingChannelAdapterFactoryBean.setPollerMetadata(pollerMetadata);
return sourcePollingChannelAdapterFactoryBean;
}
@Bean
public TestHandler testHandler() {
return new TestHandler();
}
@Bean
public IntegrationFlow trans() {
return flow -> flow.channel("mail-delivery-status-route-channel").handle(testHandler());
}
正常情况下,处理完成后,会进行afterCommit #store.rename('commit')
操作,但现在不做,会继续轮询,调试,发现:AbstractPollingEndpoint#bindResourceHolderIfNecessary
TransactionSynchronizationManager.isActualTransactionActive()
总是错误的。
我该如何改进程序。
pollerMetadata.setTransactionSynchronizationFactory(transactionSynchronizationFactory);
还不够。您缺少将 adviceChain
添加到 PollerMetadata
中,其中一个应该是 TransactionInterceptor
。为方便起见,请参阅 TransactionInterceptorBuilder
。
尽管完全不清楚如果项目中已经有 Java DSL 并且 IntegrationFlow
可以为您处理所有样板代码,为什么还要手动使用 SourcePollingChannelAdapterFactoryBean
。我的意思是你需要调查:
/**
* Populate the provided {@link MessageSource} object to the {@link IntegrationFlowBuilder} chain.
* The {@link org.springframework.integration.dsl.IntegrationFlow} {@code startMessageSource}.
* In addition use {@link SourcePollingChannelAdapterSpec} to provide options for the underlying
* {@link org.springframework.integration.endpoint.SourcePollingChannelAdapter} endpoint.
* @param messageSource the {@link MessageSource} to populate.
* @param endpointConfigurer the {@link Consumer} to provide more options for the
* {@link org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean}.
* @return new {@link IntegrationFlowBuilder}.
* @see MessageSource
* @see SourcePollingChannelAdapterSpec
*/
public static IntegrationFlowBuilder from(MessageSource<?> messageSource,
Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer) {
并在 PollerSpec
上配置 .transactional()
和 transactionSynchronizationFactory()
。
我使用spring整数redis,从redis轮询消息,像这样:
@Bean
public PseudoTransactionManager transactionManager() {
final PseudoTransactionManager pseudoTransactionManager = new PseudoTransactionManager();
return pseudoTransactionManager;
}
@Bean
public TransactionSynchronizationFactory transactionSynchronizationFactory() {
ExpressionEvaluatingTransactionSynchronizationProcessor transactionSynchronizationProcessor = new ExpressionEvaluatingTransactionSynchronizationProcessor();
transactionSynchronizationProcessor.setAfterCommitExpression(this.PARSER.parseExpression("#store.rename('commit')"));
transactionSynchronizationProcessor.setAfterRollbackExpression(this.PARSER.parseExpression("#store.rename('roll')"));
DefaultTransactionSynchronizationFactory transactionSynchronizationFactory = new DefaultTransactionSynchronizationFactory(transactionSynchronizationProcessor);
return transactionSynchronizationFactory;
}
@Bean
public SourcePollingChannelAdapterFactoryBean sourcePollingChannelAdapter(RedisStoreMessageSource redisStoreMessageSource, TransactionSynchronizationFactory transactionSynchronizationFactory) {
SourcePollingChannelAdapterFactoryBean sourcePollingChannelAdapterFactoryBean = new SourcePollingChannelAdapterFactoryBean();
sourcePollingChannelAdapterFactoryBean.setAutoStartup(true);
sourcePollingChannelAdapterFactoryBean.setOutputChannelName("mail-delivery-status-route-channel");
sourcePollingChannelAdapterFactoryBean.setSource(redisStoreMessageSource);
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(10);
pollerMetadata.setTransactionSynchronizationFactory(transactionSynchronizationFactory);
PeriodicTrigger periodicTrigger = new PeriodicTrigger(2000);
pollerMetadata.setTrigger(periodicTrigger);
sourcePollingChannelAdapterFactoryBean.setPollerMetadata(pollerMetadata);
return sourcePollingChannelAdapterFactoryBean;
}
@Bean
public TestHandler testHandler() {
return new TestHandler();
}
@Bean
public IntegrationFlow trans() {
return flow -> flow.channel("mail-delivery-status-route-channel").handle(testHandler());
}
正常情况下,处理完成后,会进行afterCommit #store.rename('commit')
操作,但现在不做,会继续轮询,调试,发现:AbstractPollingEndpoint#bindResourceHolderIfNecessary
TransactionSynchronizationManager.isActualTransactionActive()
总是错误的。
我该如何改进程序。
pollerMetadata.setTransactionSynchronizationFactory(transactionSynchronizationFactory);
还不够。您缺少将 adviceChain
添加到 PollerMetadata
中,其中一个应该是 TransactionInterceptor
。为方便起见,请参阅 TransactionInterceptorBuilder
。
尽管完全不清楚如果项目中已经有 Java DSL 并且 IntegrationFlow
可以为您处理所有样板代码,为什么还要手动使用 SourcePollingChannelAdapterFactoryBean
。我的意思是你需要调查:
/**
* Populate the provided {@link MessageSource} object to the {@link IntegrationFlowBuilder} chain.
* The {@link org.springframework.integration.dsl.IntegrationFlow} {@code startMessageSource}.
* In addition use {@link SourcePollingChannelAdapterSpec} to provide options for the underlying
* {@link org.springframework.integration.endpoint.SourcePollingChannelAdapter} endpoint.
* @param messageSource the {@link MessageSource} to populate.
* @param endpointConfigurer the {@link Consumer} to provide more options for the
* {@link org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean}.
* @return new {@link IntegrationFlowBuilder}.
* @see MessageSource
* @see SourcePollingChannelAdapterSpec
*/
public static IntegrationFlowBuilder from(MessageSource<?> messageSource,
Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer) {
并在 PollerSpec
上配置 .transactional()
和 transactionSynchronizationFactory()
。