如何在 Spring Integration Reactor 中创建响应式入站通道适配器
How to create a Reactive Inbound Channel Adapter in Spring Integration Reactor
我想了解如何为 Spring 与 Reactor 核心集成创建 反应式 通道适配器。我从其他论坛 I've read that this Mongo DB reactive adapter can be a good example 了解到,但它包含很多 Mongo 领域特定的 classes.
我已经阅读了文档的 Reactive 部分,我发现需要实现 MessageProducerSupport
,但是从代码示例来看,似乎需要实现 [=21] =] 扩展 MessageProducerSpec
并调用第一个。有人可以举一个最基本用法的例子,并解释创建这样一个通道适配器的真正需求是什么吗?据我所知,我应该做的是:
public IntegrationFlow buildPipe() {
return IntegrationFlows.from(myMessageProducerSpec)
.handle(reactiveMongoDbStoringMessageHandler, "handleMessage")
.handle(writeToKafka)
.get();
}
MessageProducerSpec
适用于 Java DSL。它与通道适配器的 low-level 逻辑无关。如果你有一个MessageProducerSupport
,那么这个就足够你在流定义中使用了:
/**
* Populate the provided {@link MessageProducerSupport} object to the {@link IntegrationFlowBuilder} chain.
* The {@link org.springframework.integration.dsl.IntegrationFlow} {@code startMessageProducer}.
* @param messageProducer the {@link MessageProducerSupport} to populate.
* @return new {@link IntegrationFlowBuilder}.
*/
public static IntegrationFlowBuilder from(MessageProducerSupport messageProducer) {
在文档中查看有关 Java DSL 中任意通道适配器用法的更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-protocol-adapters
但是再一次:忘掉你的频道适配器的 Java DSL API。首先实现该通道适配器。是的,反应式 MessageProducerSupport
必须在其 doStart()
实现中使用 subscribeToPublisher()
。从源系统构建 Flux
的其余逻辑取决于您和您将依赖的库。
还有一个ReactiveRedisStreamMessageProducer
和ZeroMqMessageProducer
,但我不能说他们的代码比上面提到的MongoDbChangeStreamMessageProducer
.
更容易理解。
我想了解如何为 Spring 与 Reactor 核心集成创建 反应式 通道适配器。我从其他论坛 I've read that this Mongo DB reactive adapter can be a good example 了解到,但它包含很多 Mongo 领域特定的 classes.
我已经阅读了文档的 Reactive 部分,我发现需要实现 MessageProducerSupport
,但是从代码示例来看,似乎需要实现 [=21] =] 扩展 MessageProducerSpec
并调用第一个。有人可以举一个最基本用法的例子,并解释创建这样一个通道适配器的真正需求是什么吗?据我所知,我应该做的是:
public IntegrationFlow buildPipe() {
return IntegrationFlows.from(myMessageProducerSpec)
.handle(reactiveMongoDbStoringMessageHandler, "handleMessage")
.handle(writeToKafka)
.get();
}
MessageProducerSpec
适用于 Java DSL。它与通道适配器的 low-level 逻辑无关。如果你有一个MessageProducerSupport
,那么这个就足够你在流定义中使用了:
/**
* Populate the provided {@link MessageProducerSupport} object to the {@link IntegrationFlowBuilder} chain.
* The {@link org.springframework.integration.dsl.IntegrationFlow} {@code startMessageProducer}.
* @param messageProducer the {@link MessageProducerSupport} to populate.
* @return new {@link IntegrationFlowBuilder}.
*/
public static IntegrationFlowBuilder from(MessageProducerSupport messageProducer) {
在文档中查看有关 Java DSL 中任意通道适配器用法的更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-protocol-adapters
但是再一次:忘掉你的频道适配器的 Java DSL API。首先实现该通道适配器。是的,反应式 MessageProducerSupport
必须在其 doStart()
实现中使用 subscribeToPublisher()
。从源系统构建 Flux
的其余逻辑取决于您和您将依赖的库。
还有一个ReactiveRedisStreamMessageProducer
和ZeroMqMessageProducer
,但我不能说他们的代码比上面提到的MongoDbChangeStreamMessageProducer
.