如何在 Spring 集成中的反应流中使用事务?
How do I use a Transaction in a Reactive Flow in Spring Integration?
我正在使用 R2DBC 和 Spring 集成在数据库中查询项目。我想稍微扩展事务边界以包含一个处理程序——如果处理程序失败,我想回滚数据库操作。但我什至在我的集成流程中明确建立事务性也有困难。流量定义为
@Bean
IntegrationFlow flow(R2dbcEntityTemplate r2dbcEntityTemplate) {
return IntegrationFlows
.from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
"SELECT * FROM events LIMIT 1")
.expectSingleResult(true)
.payloadType(Event.class),
e - > e.poller(Pollers.cron("30/2 * * * * *")
.transactional(transactionManager)))
.channel(MessageChannels.flux())
.handle(Mono.class, (payload, headers) - > doSomethingUsingSameTransaction(payload), e - > e.async(true))
.channel(MessageChannels.queue("queue"))
.get();
}
其中事务管理器是这样获取的:
@Bean
ReactiveTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new R2dbcTransactionManager(connectionFactory);
}
尝试此操作时,出现异常:
reactor.core.Exceptions$ErrorCallbackNotImplemented:
org.springframework.messaging.MessagingException:
nested exception is java.lang.IllegalStateException:
Cannot apply reactive transaction to non-reactive return type: class java.lang.Object
查看 Spring 集成代码揭示了问题。
将 TransactionInterceptor 作为建议添加到 AbstractPollingEndpoint#createPollingTask
中创建的轮询任务。该任务是 Callable<Message<?>>
.
很快,轮询器(一种反应式实现)触发并调用 AbstractPollingEndpoint#pollForMessage
- 导致调用建议 TransactionInterceptor#invoke
。它调用 TransactionAspectSupport#invokeWithinTransaction
,它确定事务管理器是反应式的,并且轮询任务不是 return 反应式类型(它是从 Callable return 编辑的对象)- 和抛出异常。
故障发生在切换到频道之前或之后的任何事情。
这是有道理的,因为 R2DBC 端点 return 是 Message<Flux<Event>>
。
所以,我想知道如何访问反应流的事务语义 -
换句话说,我期望事务边界将从轮询开始并在其 return 结束 - 并且事务适用于反应上下文(订阅)的生命周期。实际应用程序确实利用了 FluxChannels,并且处理程序是这些通道之一的消费者。
我意识到我想让这些边界保持苗条,但在我的用例中,我需要对 table 行执行锁定操作,执行外部任务并在交易范围。
我的选择似乎是:
- 在流程开始时启动事务 - 这可能会遇到我现在遇到的相同问题,因为没有迹象表明这是一个反应性流程:
@Transactional
@Bean
IntegrationFlow flow(...
试试像
.transactional(new TransactionInterceptorBuilder().build())
这将在需要时创建一个事务 - 但我不确定它是否会知道反应式事务管理器及其状态...
使用一个补偿动作——调用行中数据的先前状态
使用我自己的处理程序来封装数据库操作和其他操作,将所有操作包装在一起。
自己管理交易
根据此论坛的帮助以不同方式配置流程。
有没有办法在定义流程时扩展事务边界?或者什么是完成任务的有效方法?
非常感谢您。
这个完整的测试说明了问题:
@Slf4j
@SpringJUnitConfig
@DirtiesContext
public class R2dbcTransactionalTest {
@Autowired
DatabaseClient client;
R2dbcEntityTemplate entityTemplate;
@Qualifier("queue")
@Autowired
QueueChannel validationChannel;
@BeforeEach
public void setup() {
this.entityTemplate = new R2dbcEntityTemplate(this.client, H2Dialect.INSTANCE);
List < String > statements =
Arrays.asList(
"DROP TABLE IF EXISTS events;",
"CREATE TABLE events (id INT AUTO_INCREMENT NOT NULL, details VARCHAR2 NOT NULL, timestamp TIMESTAMP NOT NULL);");
statements.forEach(it - > this.client.sql(it)
.fetch()
.rowsUpdated()
.as(StepVerifier::create)
.expectNextCount(1)
.verifyComplete());
}
@Test
public void validateSuccessfulIntegrationFlow() {
this.entityTemplate.insert(new Event(Instant.now(), "Event details"))
.then()
.as(StepVerifier::create)
.expectComplete()
.verify(Duration.ofSeconds(1));
// Validate string
final Message << ? > message = validationChannel.receive();
assertThat(message.getPayload()).isEqualTo("Event details");
assertThat(message.getHeaders()).containsKey("foo");
}
@Import(R2dbcDatabaseConfiguration.class)
@Configuration
@EnableIntegration
static class SpringIntegrationConfiguration {
@Autowired
R2dbcEntityTemplate r2dbcEntityTemplate;
@Autowired
ReactiveTransactionManager transactionManager;
@Bean
IntegrationFlow flow(R2dbcEntityTemplate r2dbcEntityTemplate) {
return IntegrationFlows
.from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
"SELECT * FROM events LIMIT 1")
.expectSingleResult(true)
.payloadType(Event.class),
e - > e.poller(Pollers.cron("30/2 * * * * *")
.transactional(transactionManager)))
.channel(MessageChannels.flux())
.handle(Mono.class, (payload, headers) - > doSomethingUsingSameTransaction(payload), e - > e.async(true))
.channel(MessageChannels.queue("queue"))
.get();
}
public Mono < Message < String >> doSomethingUsingSameTransaction(Mono < Event > eventMono) {
return eventMono.map(event - >
MessageBuilder
.withPayload(event.getDetails())
.setHeader("foo", "baz")
.build());
}
}
@Configuration
@EnableR2dbcRepositories(basePackages = "org.springframework.integration.r2dbc.repository")
static class R2dbcDatabaseConfiguration extends AbstractR2dbcConfiguration {
@Bean
@Override
public ConnectionFactory connectionFactory() {
return createConnectionFactory();
}
public ConnectionFactory createConnectionFactory() {
return new H2ConnectionFactory(H2ConnectionConfiguration.builder()
.inMemory("r2dbc")
.username("sa")
.password("")
.option("DB_CLOSE_DELAY=-1").build());
}
@Bean
public DatabaseClient databaseClient(ConnectionFactory connectionFactory) {
return DatabaseClient.create(connectionFactory);
}
@Bean
public R2dbcEntityTemplate r2dbcEntityTemplate(DatabaseClient databaseClient) {
return new R2dbcEntityTemplate(databaseClient, H2Dialect.INSTANCE);
}
@Bean
ReactiveTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new R2dbcTransactionManager(connectionFactory);
}
}
@Table("events")
@Getter
@Setter
@RequiredArgsConstructor
static class Event {
@Id
private Integer id;
@NonNull
public Instant timestamp;
@NonNull
public String details;
}
}
好吧,声明式的方式确实是不可能的,因为我们没有钩子来注入到那个级别中间的反应类型。
尝试从 Java DSL 的 fluxTransform()
中查看 TransactionalOperator
及其用法:
/**
* Populate a {@link FluxMessageChannel} to start a reactive processing for upstream data,
* wrap it to a {@link Flux}, apply provided {@link Function} via {@link Flux#transform(Function)}
* and emit the result to one more {@link FluxMessageChannel}, subscribed in the downstream flow.
* @param fluxFunction the {@link Function} to process data reactive manner.
* @param <I> the input payload type.
* @param <O> the output type.
* @return the current {@link BaseIntegrationFlowDefinition}.
*/
@SuppressWarnings(UNCHECKED)
public <I, O> B fluxTransform(Function<? super Flux<Message<I>>, ? extends Publisher<O>> fluxFunction) {
另请参阅 Spring 框架文档:https://docs.spring.io/spring-framework/docs/current/reference/html/data-access.html#tx-prog-operator。
我会考虑您在问题中提到的级别的声明式反应式交易...
我正在使用 R2DBC 和 Spring 集成在数据库中查询项目。我想稍微扩展事务边界以包含一个处理程序——如果处理程序失败,我想回滚数据库操作。但我什至在我的集成流程中明确建立事务性也有困难。流量定义为
@Bean
IntegrationFlow flow(R2dbcEntityTemplate r2dbcEntityTemplate) {
return IntegrationFlows
.from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
"SELECT * FROM events LIMIT 1")
.expectSingleResult(true)
.payloadType(Event.class),
e - > e.poller(Pollers.cron("30/2 * * * * *")
.transactional(transactionManager)))
.channel(MessageChannels.flux())
.handle(Mono.class, (payload, headers) - > doSomethingUsingSameTransaction(payload), e - > e.async(true))
.channel(MessageChannels.queue("queue"))
.get();
}
其中事务管理器是这样获取的:
@Bean
ReactiveTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new R2dbcTransactionManager(connectionFactory);
}
尝试此操作时,出现异常:
reactor.core.Exceptions$ErrorCallbackNotImplemented:
org.springframework.messaging.MessagingException:
nested exception is java.lang.IllegalStateException:
Cannot apply reactive transaction to non-reactive return type: class java.lang.Object
查看 Spring 集成代码揭示了问题。
将 TransactionInterceptor 作为建议添加到 AbstractPollingEndpoint#createPollingTask
中创建的轮询任务。该任务是 Callable<Message<?>>
.
很快,轮询器(一种反应式实现)触发并调用 AbstractPollingEndpoint#pollForMessage
- 导致调用建议 TransactionInterceptor#invoke
。它调用 TransactionAspectSupport#invokeWithinTransaction
,它确定事务管理器是反应式的,并且轮询任务不是 return 反应式类型(它是从 Callable return 编辑的对象)- 和抛出异常。
故障发生在切换到频道之前或之后的任何事情。
这是有道理的,因为 R2DBC 端点 return 是 Message<Flux<Event>>
。
所以,我想知道如何访问反应流的事务语义 -
换句话说,我期望事务边界将从轮询开始并在其 return 结束 - 并且事务适用于反应上下文(订阅)的生命周期。实际应用程序确实利用了 FluxChannels,并且处理程序是这些通道之一的消费者。
我意识到我想让这些边界保持苗条,但在我的用例中,我需要对 table 行执行锁定操作,执行外部任务并在交易范围。
我的选择似乎是:
- 在流程开始时启动事务 - 这可能会遇到我现在遇到的相同问题,因为没有迹象表明这是一个反应性流程:
@Transactional
@Bean
IntegrationFlow flow(...
试试像
.transactional(new TransactionInterceptorBuilder().build())
这将在需要时创建一个事务 - 但我不确定它是否会知道反应式事务管理器及其状态...
使用一个补偿动作——调用行中数据的先前状态
使用我自己的处理程序来封装数据库操作和其他操作,将所有操作包装在一起。
自己管理交易
根据此论坛的帮助以不同方式配置流程。
有没有办法在定义流程时扩展事务边界?或者什么是完成任务的有效方法?
非常感谢您。
这个完整的测试说明了问题:
@Slf4j
@SpringJUnitConfig
@DirtiesContext
public class R2dbcTransactionalTest {
@Autowired
DatabaseClient client;
R2dbcEntityTemplate entityTemplate;
@Qualifier("queue")
@Autowired
QueueChannel validationChannel;
@BeforeEach
public void setup() {
this.entityTemplate = new R2dbcEntityTemplate(this.client, H2Dialect.INSTANCE);
List < String > statements =
Arrays.asList(
"DROP TABLE IF EXISTS events;",
"CREATE TABLE events (id INT AUTO_INCREMENT NOT NULL, details VARCHAR2 NOT NULL, timestamp TIMESTAMP NOT NULL);");
statements.forEach(it - > this.client.sql(it)
.fetch()
.rowsUpdated()
.as(StepVerifier::create)
.expectNextCount(1)
.verifyComplete());
}
@Test
public void validateSuccessfulIntegrationFlow() {
this.entityTemplate.insert(new Event(Instant.now(), "Event details"))
.then()
.as(StepVerifier::create)
.expectComplete()
.verify(Duration.ofSeconds(1));
// Validate string
final Message << ? > message = validationChannel.receive();
assertThat(message.getPayload()).isEqualTo("Event details");
assertThat(message.getHeaders()).containsKey("foo");
}
@Import(R2dbcDatabaseConfiguration.class)
@Configuration
@EnableIntegration
static class SpringIntegrationConfiguration {
@Autowired
R2dbcEntityTemplate r2dbcEntityTemplate;
@Autowired
ReactiveTransactionManager transactionManager;
@Bean
IntegrationFlow flow(R2dbcEntityTemplate r2dbcEntityTemplate) {
return IntegrationFlows
.from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
"SELECT * FROM events LIMIT 1")
.expectSingleResult(true)
.payloadType(Event.class),
e - > e.poller(Pollers.cron("30/2 * * * * *")
.transactional(transactionManager)))
.channel(MessageChannels.flux())
.handle(Mono.class, (payload, headers) - > doSomethingUsingSameTransaction(payload), e - > e.async(true))
.channel(MessageChannels.queue("queue"))
.get();
}
public Mono < Message < String >> doSomethingUsingSameTransaction(Mono < Event > eventMono) {
return eventMono.map(event - >
MessageBuilder
.withPayload(event.getDetails())
.setHeader("foo", "baz")
.build());
}
}
@Configuration
@EnableR2dbcRepositories(basePackages = "org.springframework.integration.r2dbc.repository")
static class R2dbcDatabaseConfiguration extends AbstractR2dbcConfiguration {
@Bean
@Override
public ConnectionFactory connectionFactory() {
return createConnectionFactory();
}
public ConnectionFactory createConnectionFactory() {
return new H2ConnectionFactory(H2ConnectionConfiguration.builder()
.inMemory("r2dbc")
.username("sa")
.password("")
.option("DB_CLOSE_DELAY=-1").build());
}
@Bean
public DatabaseClient databaseClient(ConnectionFactory connectionFactory) {
return DatabaseClient.create(connectionFactory);
}
@Bean
public R2dbcEntityTemplate r2dbcEntityTemplate(DatabaseClient databaseClient) {
return new R2dbcEntityTemplate(databaseClient, H2Dialect.INSTANCE);
}
@Bean
ReactiveTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new R2dbcTransactionManager(connectionFactory);
}
}
@Table("events")
@Getter
@Setter
@RequiredArgsConstructor
static class Event {
@Id
private Integer id;
@NonNull
public Instant timestamp;
@NonNull
public String details;
}
}
好吧,声明式的方式确实是不可能的,因为我们没有钩子来注入到那个级别中间的反应类型。
尝试从 Java DSL 的 fluxTransform()
中查看 TransactionalOperator
及其用法:
/**
* Populate a {@link FluxMessageChannel} to start a reactive processing for upstream data,
* wrap it to a {@link Flux}, apply provided {@link Function} via {@link Flux#transform(Function)}
* and emit the result to one more {@link FluxMessageChannel}, subscribed in the downstream flow.
* @param fluxFunction the {@link Function} to process data reactive manner.
* @param <I> the input payload type.
* @param <O> the output type.
* @return the current {@link BaseIntegrationFlowDefinition}.
*/
@SuppressWarnings(UNCHECKED)
public <I, O> B fluxTransform(Function<? super Flux<Message<I>>, ? extends Publisher<O>> fluxFunction) {
另请参阅 Spring 框架文档:https://docs.spring.io/spring-framework/docs/current/reference/html/data-access.html#tx-prog-operator。
我会考虑您在问题中提到的级别的声明式反应式交易...