Spring 未从句柄调用集成流

Spring Integration flow not invoked from handle

我有以下 2 个组件,它们应该先从 Mongo 中删除一个文档,然后再从 Elastic 中删除。

主要流程:

@Component
public class DeleteDocumentFlow {

    @Autowired
    private StoreInMongoFlow storeInMongoFlow;

    @Bean
    public IntegrationFlow deleteDocument() {
        return IntegrationFlows.from(Channels.METADATA_DELETE_STATUS.name())
                .handle(storeInMongoFlow.deleteDocumentInMongo())
                .channel("deleteDocumentInES.input")
                .get();
    }
}

服务:

@Component
public class StoreInMongoFlow {
    @Bean
    public IntegrationFlow deleteDocumentInMongo() {
        return flow -> flow.
                <Metadata>handle((p, h) -> {
                    DBObject obj = BasicDBObjectBuilder.start("i", p.getId()).get();
                    DeleteResult documentEntry = this.mongoTemplate.remove(obj, "docs");
                    return documentEntry.getDeletedCount();
                })
                .log(LoggingHandler.Level.INFO, m -> "Number of documents deleted: " + m.getPayload());
    }
}

不幸的是,deleteDocumentInMongo 从未被调用。正如我在日志中看到的那样,该 bean 已正确注册。

我是不是做错了什么,或者您需要更多调试信息吗?如果我窃听句柄,则会执行 deleteDocumentInES.input 但会忽略 mongo 流。

你肯定做错了根本性的事情。您尝试将 IntegrationFlow 视为从 handle() 调用的服务。这与 IntegrationFlow 的设计无关。有关详细信息,请参阅文档:https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl

The DSL provides an IntegrationFlow component to define a composition of channels and endpoints between them, but now IntegrationFlow plays only the configuration role to populate real beans in the application context and is not used at runtime.

如果您将逻辑声明为单独的 IntegrationFlow,则无需担心 handle() - 只需使用 channel("deleteDocumentInMongo.input") 从主流点发送消息到那个 MongoDB 子流的第一个通道。

如果你想用 Elastic 做同样的操作,你应该考虑有一个 PublishSubscribeChannel 来发送消息和从这个通道开始的两个流。

由于您以 log() 结束 deleteDocumentInMongo 流,您无法收到任何回复,并且您的 .channel("deleteDocumentInES.input") 将无法访问。

请阅读更多文档以了解什么是发布-订阅、请求-回复、服务激活器和流程本身。