How do I transform the result of R2dbcMessageSource into a new Message derived from the query result?

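I am using R2dbcMessageSource to query a table for an item, and I want to use one of its columns to create a message to send to a message handler.

The result of the query is a Message<Mono<Event>>, which makes sense. I want to take event.getDetails and create a message from it.

Using a DirectChannel and a Transformer, I tried something like this: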

  @Bean
  @Transformer(inputChannel = "fromR2dbcChannel", outputChannel = "fromTransformer")
  public GenericTransformer<Message<Mono<Event>>, Message<String>> monoToString() {
    return message -> {
      Mono<Event> payload = message.getPayload();
      // block() yields the Event itself; the String comes from getDetails()
      final Event event = payload.block();
      if (event == null) {
        return null;
      }
      return MessageBuilder
        .withPayload(event.getDetails())
        .setHeader("foo", "baz")
        .build();
    };
  }

Of course, I would need to put this on its own executor to avoid tying up a thread, but it doesn't work anyway: it throws a class cast exception.

error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@1651b34e]; nested exception is java.lang.ClassCastException: reactor.core.publisher.MonoOnAssembly cannot be cast to org.springframework.messaging.Message, failedMessage=GenericMessage [payload=Mono.flatMap

This seems to be because the Transformer is evaluated at assembly time.
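
One plausible contributor to this ClassCastException (an assumption, not something the truncated stack trace confirms) is that a lambda's generic parameter type is erased at runtime: a caller that cannot recover the declared input type may hand the bare payload to a lambda written to receive the whole message. The sketch below reproduces that mechanism in plain Java. `Envelope`, `UNWRAP`, and `invokeWithBarePayload` are hypothetical stand-ins, not Spring Integration API.

```java
import java.util.function.Function;

public class ErasureDemo {

    // Hypothetical stand-in for org.springframework.messaging.Message.
    public static final class Envelope<T> {
        private final T payload;
        public Envelope(T payload) { this.payload = payload; }
        public T getPayload() { return payload; }
    }

    // A lambda typed to receive the whole envelope, like the transformer in the question.
    public static final Function<Envelope<String>, String> UNWRAP =
            envelope -> envelope.getPayload();

    // Models a caller that cannot see the lambda's generic input type at runtime
    // and therefore passes the bare payload instead of the envelope.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static String invokeWithBarePayload(Function<Envelope<String>, String> fn, Object payload) {
        try {
            return (String) ((Function) fn).apply(payload);
        } catch (ClassCastException e) {
            // The lambda body expects an Envelope, so the implicit cast fails.
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // Called with the declared type, the lambda works as written.
        System.out.println(UNWRAP.apply(new Envelope<>("details")));
        // Called with the payload only, the cast inside the lambda fails.
        System.out.println(invokeWithBarePayload(UNWRAP, "details"));
    }
}
```

If this is what is happening, declaring the input type on a regular method signature rather than inside a lambda's generics gives the caller something it can inspect at runtime.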

Using a FluxMessageChannel seems odd, because the source itself is not reactive, but it does allow me to transform the payload using a service activator.

    @Bean
    @ServiceActivator(inputChannel = "fromR2dbcChannel", reactive = @Reactive("publishMessageWithString"))
    public MessageHandler toValidator() {
      return message -> validationChannel().send(message);
    }

    @Bean
    public Function<Flux<Message<Mono<Event>>>, Flux<Message<String>>> publishMessageWithString() {
      return flux -> flux
        .as(toMessageWithString());
    }

    private Function<Flux<Message<Mono<Event>>>, Flux<Message<String>>> toMessageWithString() {
      return messageFlux -> messageFlux.map(message -> {
        final Mono<Event> payload = message.getPayload();
        if (payload != null) {
          final Event event = payload.block();
          if (event != null) {
            return MessageBuilder
              .withPayload(event.getDetails())
              .setHeader("foo", "baz")
              .build();
          }
        }
        return null;
      });
    }

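In real life, the message handler attached to the service activator does not itself understand Flux. It is a KinesisMessageHandler.

While this seems to work, it feels awkward. Is there a better way to do this transformation that is Reactor-friendly? Or an adapter that subscribes and invokes the handler? Passing the Message<Mono<Event>> end to end seems reasonable, so the latter might not be appropriate. Thanks!

Update

Thanks to Artem, a working test is below: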

import java.time.Instant;
import java.util.Arrays;
import java.util.List;

import io.r2dbc.h2.H2ConnectionConfiguration;
import io.r2dbc.h2.H2ConnectionFactory;
import io.r2dbc.spi.ConnectionFactory;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.data.annotation.Id;
import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration;
import org.springframework.data.r2dbc.core.R2dbcEntityTemplate;
import org.springframework.data.r2dbc.dialect.H2Dialect;
import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories;
import org.springframework.data.relational.core.mapping.Table;
import org.springframework.integration.annotation.BridgeTo;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.r2dbc.inbound.R2dbcMessageSource;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.r2dbc.core.DatabaseClient;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import static org.assertj.core.api.Assertions.assertThat;

@Slf4j
@SpringJUnitConfig
@DirtiesContext
public class R2dbcTest {

  @Autowired
  DatabaseClient client;

  R2dbcEntityTemplate entityTemplate;

  @Autowired
  QueueChannel validationChannel;

  @Autowired
  FluxMessageChannel fluxChannel;

  @BeforeEach
  public void setup() {
    this.entityTemplate = new R2dbcEntityTemplate(this.client, H2Dialect.INSTANCE);

    List<String> statements =
      Arrays.asList(
        "DROP TABLE IF EXISTS events;",
        "CREATE TABLE events (id INT AUTO_INCREMENT NOT NULL, details VARCHAR2 NOT NULL, timestamp TIMESTAMP NOT NULL);");

    statements.forEach(it -> this.client.sql(it)
      .fetch()
      .rowsUpdated()
      .as(StepVerifier::create)
      .expectNextCount(1)
      .verifyComplete());
  }

  @Test
  public void validateSuccessfulIntegrationFlow() throws InterruptedException {
    this.entityTemplate.insert(new Event(Instant.now(), "Event details"))
      .then()
      .as(StepVerifier::create)
      .verifyComplete();

    // Validate string
    final Message<?> message = validationChannel.receive();
    assertThat(message.getPayload()).isEqualTo("Event details");
    assertThat(message.getHeaders()).containsKey("foo");
  }

  @Import(R2dbcDatabaseConfiguration.class)
  @Configuration
  @EnableIntegration
  static class SpringIntegrationConfiguration {

    @Autowired
    R2dbcEntityTemplate r2dbcEntityTemplate;

    @Bean
    FluxMessageChannel fromR2dbcChannel() {
      return new FluxMessageChannel();
    }

    @BridgeTo(value = "validationChannel")
    @Bean
    FluxMessageChannel fluxChannel() {
      return new FluxMessageChannel();
    }

    @Bean
    QueueChannel validationChannel() {
      return new QueueChannel();
    }

    @ServiceActivator(inputChannel = "fromR2dbcChannel", outputChannel = "fluxChannel", async = "true")
    public Mono<Message<String>> transformEvent(Mono<Event> eventMono) {
      return eventMono.map(event ->
        MessageBuilder
          .withPayload(event.getDetails())
          .setHeader("foo", "baz")
          .build());
    }

    // Cron expression is only here because Spring environment is fully initialized before test
    // creates table, so wait for the test to start.
    @Bean
    @InboundChannelAdapter(value = "fromR2dbcChannel", poller = @Poller(cron = "30/2 * * * * *"))
    public R2dbcMessageSource r2dbcMessageSourceSelectOne(R2dbcEntityTemplate r2dbcEntityTemplate) {
      R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(r2dbcEntityTemplate,
        "SELECT * FROM events LIMIT 1");
      r2dbcMessageSource.setPayloadType(Event.class);
      r2dbcMessageSource.setExpectSingleResult(true);
      return r2dbcMessageSource;
    }

  }

  @Configuration
  @EnableR2dbcRepositories(basePackages = "org.springframework.integration.r2dbc.repository")
  static class R2dbcDatabaseConfiguration extends AbstractR2dbcConfiguration {

    @Bean
    @Override
    public ConnectionFactory connectionFactory() {
      return createConnectionFactory();
    }

    public ConnectionFactory createConnectionFactory() {
      return new H2ConnectionFactory(H2ConnectionConfiguration.builder()
        .inMemory("r2dbc")
        .username("sa")
        .password("")
        .option("DB_CLOSE_DELAY=-1").build());
    }

    @Bean
    public DatabaseClient databaseClient(ConnectionFactory connectionFactory) {
      return DatabaseClient.create(connectionFactory);
    }

    @Bean
    public R2dbcEntityTemplate r2dbcEntityTemplate(DatabaseClient databaseClient) {
      return new R2dbcEntityTemplate(databaseClient, H2Dialect.INSTANCE);
    }

  }

  @Table("events")
  @Getter
  @Setter
  @RequiredArgsConstructor
  static class Event {
    @Id
    private Integer id;

    @NonNull
    public Instant timestamp;
    @NonNull
    public String details;

  }

}

I suggest rethinking how you approach reactive streams and starting to avoid calling .block() manually.
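
To illustrate the difference between blocking on a deferred value and composing on it, here is a minimal sketch that uses the JDK's `CompletableFuture` as a stand-in for `Mono`, so that it runs without Reactor on the classpath. The analogy is loose: `join()` plays the role of `block()` and `thenApply(...)` plays the role of `map(...)`, but a `CompletableFuture` is eager while a `Mono` does nothing until it is subscribed to. The `Event` class and method names here are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

public class NonBlockingMapDemo {

    // Hypothetical stand-in for the Event entity in the question.
    public static final class Event {
        private final String details;
        public Event(String details) { this.details = details; }
        public String getDetails() { return details; }
    }

    // Blocking style: waits for the value on the calling thread.
    public static String detailsBlocking(CompletableFuture<Event> eventFuture) {
        return eventFuture.join().getDetails();
    }

    // Composed style: returns a new deferred value without waiting for the event.
    public static CompletableFuture<String> detailsComposed(CompletableFuture<Event> eventFuture) {
        return eventFuture.thenApply(Event::getDetails);
    }

    public static void main(String[] args) {
        CompletableFuture<Event> pending = new CompletableFuture<>();
        // Returns immediately even though no event has arrived yet.
        CompletableFuture<String> details = detailsComposed(pending);
        System.out.println(details.isDone());
        // Once the event arrives, the derived value becomes available.
        pending.complete(new Event("Event details"));
        System.out.println(details.join());
    }
}
```

The composed version never parks a thread waiting for the query, which is the property the advice above is after.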

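Looking at your whole flow, it really is better to make fromR2dbcChannel a FluxMessageChannel, so that the Mono from R2DBC is subscribed to and processed smoothly by the framework internally whenever there is data.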

Your @Transformer(inputChannel = "fromR2dbcChannel", outputChannel = "fromTransformer") can then simply take a plain Event as its input parameter. Your KinesisMessageHandler can then handle whatever you send to its input channel as the payload derived from event.getDetails().

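Update

So, my bad. Regardless of the channel used for the @InboundChannelAdapter, it still produces a Mono in the payload of the message. From there, the channel type really doesn't matter. At the same time, though, you can make validationChannel a FluxMessageChannel, and then your transformer must be changed to a service activator: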

    @ServiceActivator(inputChannel = "fromR2dbcChannel", outputChannel = "validationChannel", async = "true")
    public Mono<Message<String>> transformEvent(Mono<Event> eventMono) {
      // map() needs a function of the event; build the outgoing message from its details
      return eventMono.map(event ->
        MessageBuilder
          .withPayload(event.getDetails())
          .setHeader("foo", "baz")
          .build());
    }

This way the Mono will be subscribed to by the FluxMessageChannel, and the rest of the "blocking" flow should remain the same.

The problem with the transformer is that it returns a Message whenever the result of the POJO method is not already a Message.
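
The transformer behavior mentioned above can be sketched as a simplified model in plain Java. This is not the framework's code: `Envelope` and `toReply` are hypothetical stand-ins for `Message` and for the step that turns a method result into a reply, and `CompletableFuture` stands in for `Mono`. The point it shows is that a deferred result is not a message, so it gets wrapped as a payload rather than unwrapped.

```java
import java.util.concurrent.CompletableFuture;

public class TransformerWrapDemo {

    // Hypothetical stand-in for org.springframework.messaging.Message.
    public static final class Envelope {
        private final Object payload;
        public Envelope(Object payload) { this.payload = payload; }
        public Object getPayload() { return payload; }
    }

    // Simplified model of the contract described above: a result that is
    // already a message passes through, anything else becomes a payload.
    public static Envelope toReply(Object methodResult) {
        if (methodResult instanceof Envelope) {
            return (Envelope) methodResult;
        }
        return new Envelope(methodResult);
    }

    public static void main(String[] args) {
        Envelope ready = new Envelope("Event details");
        // A ready-made message is returned unchanged.
        System.out.println(toReply(ready) == ready);

        // A deferred message is not itself a message, so it is wrapped:
        // the reply's payload is still the deferred value, not the String.
        Object deferred = CompletableFuture.completedFuture(ready);
        System.out.println(toReply(deferred).getPayload() == deferred);
    }
}
```

Under this model, a transformer method returning a deferred message would send downstream a message that still carries the deferred value, which is why the service activator with async = "true" is the suggested fit here.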