使用 Spring 与 Project Reactor 集成时的主要处理流程编程方法

Main processing flow programmatic approach when using Spring Integration with Project Reactor

我想定义一个流程,使用 Reactor Kafka 使用 kafka 并写入 MongoDB,只有在成功时才会将 ID 写入 Kafka。我正在使用带有 Spring Integration JavaDSL 的 Project Reactor,并且我希望有一个 FlowBuilder class 在高级别定义我的管道。我目前有以下方向:

public IntegrationFlow buildFlow() {
   return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
      .publishSubscribeChannel(c -> c
                        .subscribe(sf -> sf
                                .handle(MongoDb.reactiveOutboundChannelAdapter())) 
      .handle(writeToKafka)
      .get();
}

我在docs that there is a support for a different approach, that also works with Project Reactor看过了。此方法不包括使用 IntegrationFlows。这看起来像这样:

@MessagingGateway
public static interface TestGateway {

    @Gateway(requestChannel = "promiseChannel")
    Mono<Integer> multiply(Integer value);

    }

        ...

    @ServiceActivator(inputChannel = "promiseChannel")
    public Integer multiply(Integer value) {
            return value * 2;
    }

        ...

    Flux.just("1", "2", "3", "4", "5")
            .map(Integer::parseInt)
            .flatMap(this.testGateway::multiply)
            .collectList()
            .subscribe(integers -> ...);

我想知道在使用这两个库时更推荐的处理方式是什么。我想知道如何在第二个示例中使用 Reactive MongoDB 适配器。如果没有 IntegrationFlows 包装器,我不确定第二种方法是否可行。

@MessagingGateway 是为 high-level end-user API 设计的,以尽可能隐藏下面的消息。因此,当您开发目标服务的逻辑时,目标服务不受任何消息传递抽象的影响。

可以使用 IntegrationFlow 中的此类接口适配器,您应该将其视为常规服务激活器,因此它看起来像这样:

.handle("testGateway", "multiply", e -> e.async(true))

async(true) 使此服务激活器订阅 returned Mono。你可以省略它然后你自己订阅它下游因为这个 Mono 将成为流中下一条消息的 payload

如果你想要相反的东西:从 Flux 调用 IntegrationFlow,比如 flatMap(),然后考虑使用流程中的 toReactivePublisher() 运算符定义为 return a Publisher<?> 并将其声明为 bean。在这种情况下,最好不要使用 MongoDb.reactiveOutboundChannelAdapter(),而只使用 ReactiveMongoDbStoringMessageHandler 让其 returned Mono 传播到 Publisher

另一方面,如果您想要 @MessagingGatewayMono return,但仍从中调用 ReactiveMongoDbStoringMessageHandler,则将其声明为bean 并用 @ServiceActivator.

标记它

我们还有一个 ExpressionEvaluatingRequestHandlerAdvice 来捕获特定端点上的错误(或成功)并分别处理它们:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#expression-advice

我想你要找的是这样的:

public IntegrationFlow buildFlow() {
   return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
      .handle(reactiveMongoDbStoringMessageHandler, "handleMessage")
      .handle(writeToKafka)
      .get();
}

注意 .handle(reactiveMongoDbStoringMessageHandler) - 它与 MongoDb.reactiveOutboundChannelAdapter() 无关。因为这个把一个ReactiveMessageHandler包装成一个ReactiveMessageHandlerAdapter来实现自动订阅。您需要的是让 Mono<Void> return 看起来更像您自己的控件,因此您可以将其用作 writeToKafka 服务的输入并自己订阅并按照您的解释处理成功或错误。关键是使用 Reactive Stream 我们不能提供命令式的错误处理。该方法与任何异步 API 用法相同。因此,我们也将错误发送到 Reactive Streams 的 errorChannel

我们可能可以通过 returnMono(true/false) 之类的东西改进 MongoDb.reactiveOutboundChannelAdapter(),让像您这样的 use-case 可用 out-of-the-box。