使用 Spring 与 Project Reactor 集成时的主要处理流程编程方法
Main processing flow programmatic approach when using Spring Integration with Project Reactor
我想定义一个流程,使用 Reactor Kafka 使用 kafka 并写入 MongoDB,只有在成功时才会将 ID 写入 Kafka。我正在使用带有 Spring Integration JavaDSL 的 Project Reactor,并且我希望有一个 FlowBuilder
class 在高级别定义我的管道。我目前有以下方向:
public IntegrationFlow buildFlow() {
return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
.publishSubscribeChannel(c -> c
.subscribe(sf -> sf
.handle(MongoDb.reactiveOutboundChannelAdapter()))
.handle(writeToKafka)
.get();
}
我在docs that there is a support for a different approach, that also works with Project Reactor看过了。此方法不包括使用 IntegrationFlows
。这看起来像这样:
@MessagingGateway
public static interface TestGateway {
@Gateway(requestChannel = "promiseChannel")
Mono<Integer> multiply(Integer value);
}
...
@ServiceActivator(inputChannel = "promiseChannel")
public Integer multiply(Integer value) {
return value * 2;
}
...
Flux.just("1", "2", "3", "4", "5")
.map(Integer::parseInt)
.flatMap(this.testGateway::multiply)
.collectList()
.subscribe(integers -> ...);
我想知道在使用这两个库时更推荐的处理方式是什么。我想知道如何在第二个示例中使用 Reactive MongoDB 适配器。如果没有 IntegrationFlows
包装器,我不确定第二种方法是否可行。
@MessagingGateway
是为 high-level end-user API 设计的,以尽可能隐藏下面的消息。因此,当您开发目标服务的逻辑时,目标服务不受任何消息传递抽象的影响。
可以使用 IntegrationFlow
中的此类接口适配器,您应该将其视为常规服务激活器,因此它看起来像这样:
.handle("testGateway", "multiply", e -> e.async(true))
async(true)
使此服务激活器订阅 returned Mono
。你可以省略它然后你自己订阅它下游因为这个 Mono
将成为流中下一条消息的 payload
。
如果你想要相反的东西:从 Flux
调用 IntegrationFlow
,比如 flatMap()
,然后考虑使用流程中的 toReactivePublisher()
运算符定义为 return a Publisher<?>
并将其声明为 bean。在这种情况下,最好不要使用 MongoDb.reactiveOutboundChannelAdapter()
,而只使用 ReactiveMongoDbStoringMessageHandler
让其 returned Mono
传播到 Publisher
。
另一方面,如果您想要 @MessagingGateway
与 Mono
return,但仍从中调用 ReactiveMongoDbStoringMessageHandler
,则将其声明为bean 并用 @ServiceActivator
.
标记它
我们还有一个 ExpressionEvaluatingRequestHandlerAdvice
来捕获特定端点上的错误(或成功)并分别处理它们:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#expression-advice
我想你要找的是这样的:
public IntegrationFlow buildFlow() {
return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
.handle(reactiveMongoDbStoringMessageHandler, "handleMessage")
.handle(writeToKafka)
.get();
}
注意 .handle(reactiveMongoDbStoringMessageHandler)
- 它与 MongoDb.reactiveOutboundChannelAdapter()
无关。因为这个把一个ReactiveMessageHandler
包装成一个ReactiveMessageHandlerAdapter
来实现自动订阅。您需要的是让 Mono<Void>
return 看起来更像您自己的控件,因此您可以将其用作 writeToKafka
服务的输入并自己订阅并按照您的解释处理成功或错误。关键是使用 Reactive Stream 我们不能提供命令式的错误处理。该方法与任何异步 API 用法相同。因此,我们也将错误发送到 Reactive Streams 的 errorChannel
。
我们可能可以通过 returnMono(true/false)
之类的东西改进 MongoDb.reactiveOutboundChannelAdapter()
,让像您这样的 use-case 可用 out-of-the-box。
我想定义一个流程,使用 Reactor Kafka 使用 kafka 并写入 MongoDB,只有在成功时才会将 ID 写入 Kafka。我正在使用带有 Spring Integration JavaDSL 的 Project Reactor,并且我希望有一个 FlowBuilder
class 在高级别定义我的管道。我目前有以下方向:
public IntegrationFlow buildFlow() {
return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
.publishSubscribeChannel(c -> c
.subscribe(sf -> sf
.handle(MongoDb.reactiveOutboundChannelAdapter()))
.handle(writeToKafka)
.get();
}
我在docs that there is a support for a different approach, that also works with Project Reactor看过了。此方法不包括使用 IntegrationFlows
。这看起来像这样:
@MessagingGateway
public static interface TestGateway {
@Gateway(requestChannel = "promiseChannel")
Mono<Integer> multiply(Integer value);
}
...
@ServiceActivator(inputChannel = "promiseChannel")
public Integer multiply(Integer value) {
return value * 2;
}
...
Flux.just("1", "2", "3", "4", "5")
.map(Integer::parseInt)
.flatMap(this.testGateway::multiply)
.collectList()
.subscribe(integers -> ...);
我想知道在使用这两个库时更推荐的处理方式是什么。我想知道如何在第二个示例中使用 Reactive MongoDB 适配器。如果没有 IntegrationFlows
包装器,我不确定第二种方法是否可行。
@MessagingGateway
是为 high-level end-user API 设计的,以尽可能隐藏下面的消息。因此,当您开发目标服务的逻辑时,目标服务不受任何消息传递抽象的影响。
可以使用 IntegrationFlow
中的此类接口适配器,您应该将其视为常规服务激活器,因此它看起来像这样:
.handle("testGateway", "multiply", e -> e.async(true))
async(true)
使此服务激活器订阅 returned Mono
。你可以省略它然后你自己订阅它下游因为这个 Mono
将成为流中下一条消息的 payload
。
如果你想要相反的东西:从 Flux
调用 IntegrationFlow
,比如 flatMap()
,然后考虑使用流程中的 toReactivePublisher()
运算符定义为 return a Publisher<?>
并将其声明为 bean。在这种情况下,最好不要使用 MongoDb.reactiveOutboundChannelAdapter()
,而只使用 ReactiveMongoDbStoringMessageHandler
让其 returned Mono
传播到 Publisher
。
另一方面,如果您想要 @MessagingGateway
与 Mono
return,但仍从中调用 ReactiveMongoDbStoringMessageHandler
,则将其声明为bean 并用 @ServiceActivator
.
我们还有一个 ExpressionEvaluatingRequestHandlerAdvice
来捕获特定端点上的错误(或成功)并分别处理它们:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#expression-advice
我想你要找的是这样的:
public IntegrationFlow buildFlow() {
return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
.handle(reactiveMongoDbStoringMessageHandler, "handleMessage")
.handle(writeToKafka)
.get();
}
注意 .handle(reactiveMongoDbStoringMessageHandler)
- 它与 MongoDb.reactiveOutboundChannelAdapter()
无关。因为这个把一个ReactiveMessageHandler
包装成一个ReactiveMessageHandlerAdapter
来实现自动订阅。您需要的是让 Mono<Void>
return 看起来更像您自己的控件,因此您可以将其用作 writeToKafka
服务的输入并自己订阅并按照您的解释处理成功或错误。关键是使用 Reactive Stream 我们不能提供命令式的错误处理。该方法与任何异步 API 用法相同。因此,我们也将错误发送到 Reactive Streams 的 errorChannel
。
我们可能可以通过 returnMono(true/false)
之类的东西改进 MongoDb.reactiveOutboundChannelAdapter()
,让像您这样的 use-case 可用 out-of-the-box。