Spring 集成处理器

Spring Integration Processor

我想定义一个写入 MongoDB 的流程,只有在成功时才会将 ID 写入 Kafka。我正在使用 JavaDSL,我希望有一个 FlowBuilder class 来定义我的高级管道。我正在寻找使我能够编写流程的功能,例如:

public IntegrationFlow buildFlow() {
   return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
      .process(writeToMongo) // <-- Searching for this kind of function
      .handle(writeToKafka)
      .get();
}

我看过Apache Camel works exactly like this,我想知道Spring集成是否也有一个简单而好的解决这个基本问题的方法。

您正在寻找的是 publishSubscribeChannel() 具有多个订阅者的能力。默认情况下,如果没有在通道上配置执行程序,下一个订阅者只会在前一个订阅者之后被调用,并且只有当这个订阅者成功时才会被调用。

它可能看起来与您表达的相似 process():

public IntegrationFlow buildFlow() {
   return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
      .publishSubscribeChannel(c -> c
                        .subscribe(sf -> sf
                                .handle(MongoDb.reactiveOutboundChannelAdapter())) 
      .handle(writeToKafka)
      .get();
}

另一个选项是 gateway(),但您需要从那里 return 做一些事情才能继续。在 Spring 集成中,如果没有回复,流程就停止了。如果没有 out.

,它没有为 out 重用 in 的概念