如何在 Spring 集成中的反应流中使用事务?

How do I use a Transaction in a Reactive Flow in Spring Integration?

我正在使用 R2DBC 和 Spring 集成在数据库中查询项目。我想稍微扩展事务边界以包含一个处理程序——如果处理程序失败,我想回滚数据库操作。但我什至在我的集成流程中明确建立事务性也有困难。流量定义为

@Bean
IntegrationFlow flow(R2dbcEntityTemplate r2dbcEntityTemplate) {
    return IntegrationFlows
        .from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
                "SELECT * FROM events LIMIT 1")
            .expectSingleResult(true)
            .payloadType(Event.class),
            e - > e.poller(Pollers.cron("30/2 * * * * *")
                .transactional(transactionManager)))
        .channel(MessageChannels.flux())
        .handle(Mono.class, (payload, headers) - > doSomethingUsingSameTransaction(payload), e - > e.async(true))
        .channel(MessageChannels.queue("queue"))
        .get();
}

其中事务管理器是这样获取的:

@Bean
ReactiveTransactionManager transactionManager(ConnectionFactory connectionFactory) {
    return new R2dbcTransactionManager(connectionFactory);
}

尝试此操作时,出现异常:

reactor.core.Exceptions$ErrorCallbackNotImplemented: 
org.springframework.messaging.MessagingException: 
    nested exception is java.lang.IllegalStateException: 
    Cannot apply reactive transaction to non-reactive return type: class java.lang.Object

查看 Spring 集成代码揭示了问题。

将 TransactionInterceptor 作为建议添加到 AbstractPollingEndpoint#createPollingTask 中创建的轮询任务。该任务是 Callable<Message<?>>.

很快,轮询器(一种反应式实现)触发并调用 AbstractPollingEndpoint#pollForMessage - 导致调用建议 TransactionInterceptor#invoke。它调用 TransactionAspectSupport#invokeWithinTransaction,它确定事务管理器是反应式的,并且轮询任务不是 return 反应式类型(它是从 Callable return 编辑的对象)- 和抛出异常。

故障发生在切换到频道之前或之后的任何事情。

这是有道理的,因为 R2DBC 端点 return 是 Message<Flux<Event>>

所以,我想知道如何访问反应流的事务语义 -

换句话说,我期望事务边界将从轮询开始并在其 return 结束 - 并且事务适用于反应上下文(订阅)的生命周期。实际应用程序确实利用了 FluxChannels,并且处理程序是这些通道之一的消费者。

我意识到我想让这些边界保持苗条,但在我的用例中,我需要对 table 行执行锁定操作,执行外部任务并在交易范围。

我的选择似乎是:

  1. 在流程开始时启动事务 - 这可能会遇到我现在遇到的相同问题,因为没有迹象表明这是一个反应性流程:
    @Transactional
    @Bean
    IntegrationFlow flow(...
  1. 试试像 .transactional(new TransactionInterceptorBuilder().build())

    这将在需要时创建一个事务 - 但我不确定它是否会知道反应式事务管理器及其状态...

  2. 使用一个补偿动作——调用行中数据的先前状态

  3. 使用我自己的处理程序来封装数据库操作和其他操作,将所有操作包装在一起。

  4. 自己管理交易

  5. 根据此论坛的帮助以不同方式配置流程。

有没有办法在定义流程时扩展事务边界?或者什么是完成任务的有效方法?

非常感谢您。

这个完整的测试说明了问题:

@Slf4j
@SpringJUnitConfig
@DirtiesContext
public class R2dbcTransactionalTest {

    @Autowired
    DatabaseClient client;

    R2dbcEntityTemplate entityTemplate;

    @Qualifier("queue")
    @Autowired
    QueueChannel validationChannel;

    @BeforeEach
    public void setup() {
        this.entityTemplate = new R2dbcEntityTemplate(this.client, H2Dialect.INSTANCE);

        List < String > statements =
            Arrays.asList(
                "DROP TABLE IF EXISTS events;",
                "CREATE TABLE events (id INT AUTO_INCREMENT NOT NULL, details VARCHAR2 NOT NULL, timestamp TIMESTAMP NOT NULL);");

        statements.forEach(it - > this.client.sql(it)
            .fetch()
            .rowsUpdated()
            .as(StepVerifier::create)
            .expectNextCount(1)
            .verifyComplete());
    }

    @Test
    public void validateSuccessfulIntegrationFlow() {
        this.entityTemplate.insert(new Event(Instant.now(), "Event details"))
            .then()
            .as(StepVerifier::create)
            .expectComplete()
            .verify(Duration.ofSeconds(1));

        // Validate string
        final Message << ? > message = validationChannel.receive();
        assertThat(message.getPayload()).isEqualTo("Event details");
        assertThat(message.getHeaders()).containsKey("foo");
    }

    @Import(R2dbcDatabaseConfiguration.class)
    @Configuration
    @EnableIntegration
    static class SpringIntegrationConfiguration {

        @Autowired
        R2dbcEntityTemplate r2dbcEntityTemplate;

        @Autowired
        ReactiveTransactionManager transactionManager;

        @Bean
        IntegrationFlow flow(R2dbcEntityTemplate r2dbcEntityTemplate) {
            return IntegrationFlows
                .from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
                        "SELECT * FROM events LIMIT 1")
                    .expectSingleResult(true)
                    .payloadType(Event.class),
                    e - > e.poller(Pollers.cron("30/2 * * * * *")
                        .transactional(transactionManager)))
                .channel(MessageChannels.flux())
                .handle(Mono.class, (payload, headers) - > doSomethingUsingSameTransaction(payload), e - > e.async(true))
                .channel(MessageChannels.queue("queue"))
                .get();
        }

        public Mono < Message < String >> doSomethingUsingSameTransaction(Mono < Event > eventMono) {
            return eventMono.map(event - >
                MessageBuilder
                .withPayload(event.getDetails())
                .setHeader("foo", "baz")
                .build());
        }
    }

    @Configuration
    @EnableR2dbcRepositories(basePackages = "org.springframework.integration.r2dbc.repository")
    static class R2dbcDatabaseConfiguration extends AbstractR2dbcConfiguration {

        @Bean
        @Override
        public ConnectionFactory connectionFactory() {
            return createConnectionFactory();
        }

        public ConnectionFactory createConnectionFactory() {
            return new H2ConnectionFactory(H2ConnectionConfiguration.builder()
                .inMemory("r2dbc")
                .username("sa")
                .password("")
                .option("DB_CLOSE_DELAY=-1").build());
        }

        @Bean
        public DatabaseClient databaseClient(ConnectionFactory connectionFactory) {
            return DatabaseClient.create(connectionFactory);
        }

        @Bean
        public R2dbcEntityTemplate r2dbcEntityTemplate(DatabaseClient databaseClient) {
            return new R2dbcEntityTemplate(databaseClient, H2Dialect.INSTANCE);
        }

        @Bean
        ReactiveTransactionManager transactionManager(ConnectionFactory connectionFactory) {
            return new R2dbcTransactionManager(connectionFactory);
        }

    }

    @Table("events")
    @Getter
    @Setter
    @RequiredArgsConstructor
    static class Event {
        @Id
        private Integer id;

        @NonNull
        public Instant timestamp;
        @NonNull
        public String details;

    }

}

好吧,声明式的方式确实是不可能的,因为我们没有钩子来注入到那个级别中间的反应类型。

尝试从 Java DSL 的 fluxTransform() 中查看 TransactionalOperator 及其用法:

/**
 * Populate a {@link FluxMessageChannel} to start a reactive processing for upstream data,
 * wrap it to a {@link Flux}, apply provided {@link Function} via {@link Flux#transform(Function)}
 * and emit the result to one more {@link FluxMessageChannel}, subscribed in the downstream flow.
 * @param fluxFunction the {@link Function} to process data reactive manner.
 * @param <I> the input payload type.
 * @param <O> the output type.
 * @return the current {@link BaseIntegrationFlowDefinition}.
 */
@SuppressWarnings(UNCHECKED)
public <I, O> B fluxTransform(Function<? super Flux<Message<I>>, ? extends Publisher<O>> fluxFunction) {

另请参阅 Spring 框架文档:https://docs.spring.io/spring-framework/docs/current/reference/html/data-access.html#tx-prog-operator

我会考虑您在问题中提到的级别的声明式反应式交易...