如何在 Spring Integration Reactor 中创建响应式入站通道适配器

How to create a Reactive Inbound Channel Adapter in Spring Integration Reactor

我想了解如何为 Spring 与 Reactor 核心集成创建 反应式 通道适配器。我从其他论坛 I've read that this Mongo DB reactive adapter can be a good example 了解到,但它包含很多 Mongo 领域特定的 classes.

我已经阅读了文档的 Reactive 部分,我发现需要实现 MessageProducerSupport,但是从代码示例来看,似乎需要实现 [=21] =] 扩展 MessageProducerSpec 并调用第一个。有人可以举一个最基本用法的例子,并解释创建这样一个通道适配器的真正需求是什么吗?据我所知,我应该做的是:

public IntegrationFlow buildPipe() {
   return IntegrationFlows.from(myMessageProducerSpec)
      .handle(reactiveMongoDbStoringMessageHandler, "handleMessage")
      .handle(writeToKafka)
      .get();
}

MessageProducerSpec 适用于 Java DSL。它与通道适配器的 low-level 逻辑无关。如果你有一个MessageProducerSupport,那么这个就足够你在流定义中使用了:

/**
 * Populate the provided {@link MessageProducerSupport} object to the {@link IntegrationFlowBuilder} chain.
 * The {@link org.springframework.integration.dsl.IntegrationFlow} {@code startMessageProducer}.
 * @param messageProducer the {@link MessageProducerSupport} to populate.
 * @return new {@link IntegrationFlowBuilder}.
 */
public static IntegrationFlowBuilder from(MessageProducerSupport messageProducer) {

在文档中查看有关 Java DSL 中任意通道适配器用法的更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-protocol-adapters

但是再一次:忘掉你的频道适配器的 Java DSL API。首先实现该通道适配器。是的,反应式 MessageProducerSupport 必须在其 doStart() 实现中使用 subscribeToPublisher()。从源系统构建 Flux 的其余逻辑取决于您和您将依赖的库。

还有一个ReactiveRedisStreamMessageProducerZeroMqMessageProducer,但我不能说他们的代码比上面提到的MongoDbChangeStreamMessageProducer.

更容易理解。