Spring 集成处理器
Spring Integration Processor
我想定义一个写入 MongoDB 的流程,只有在成功时才会将 ID 写入 Kafka。我正在使用 JavaDSL,我希望有一个 FlowBuilder
class 来定义我的高级管道。我正在寻找使我能够编写流程的功能,例如:
public IntegrationFlow buildFlow() {
return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
.process(writeToMongo) // <-- Searching for this kind of function
.handle(writeToKafka)
.get();
}
我看过Apache Camel works exactly like this,我想知道Spring集成是否也有一个简单而好的解决这个基本问题的方法。
您正在寻找的是 publishSubscribeChannel()
具有多个订阅者的能力。默认情况下,如果没有在通道上配置执行程序,下一个订阅者只会在前一个订阅者之后被调用,并且只有当这个订阅者成功时才会被调用。
它可能看起来与您表达的相似 process()
:
public IntegrationFlow buildFlow() {
return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
.publishSubscribeChannel(c -> c
.subscribe(sf -> sf
.handle(MongoDb.reactiveOutboundChannelAdapter()))
.handle(writeToKafka)
.get();
}
另一个选项是 gateway()
,但您需要从那里 return 做一些事情才能继续。在 Spring 集成中,如果没有回复,流程就停止了。如果没有 out
.
,它没有为 out
重用 in
的概念
我想定义一个写入 MongoDB 的流程,只有在成功时才会将 ID 写入 Kafka。我正在使用 JavaDSL,我希望有一个 FlowBuilder
class 来定义我的高级管道。我正在寻找使我能够编写流程的功能,例如:
public IntegrationFlow buildFlow() {
return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
.process(writeToMongo) // <-- Searching for this kind of function
.handle(writeToKafka)
.get();
}
我看过Apache Camel works exactly like this,我想知道Spring集成是否也有一个简单而好的解决这个基本问题的方法。
您正在寻找的是 publishSubscribeChannel()
具有多个订阅者的能力。默认情况下,如果没有在通道上配置执行程序,下一个订阅者只会在前一个订阅者之后被调用,并且只有当这个订阅者成功时才会被调用。
它可能看起来与您表达的相似 process()
:
public IntegrationFlow buildFlow() {
return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
.publishSubscribeChannel(c -> c
.subscribe(sf -> sf
.handle(MongoDb.reactiveOutboundChannelAdapter()))
.handle(writeToKafka)
.get();
}
另一个选项是 gateway()
,但您需要从那里 return 做一些事情才能继续。在 Spring 集成中,如果没有回复,流程就停止了。如果没有 out
.
out
重用 in
的概念