如何中断或取消 Spring 集成 Java DSL 流程?

How to Interrupt or Cancel a Spring Integration Java DSL flow?

考虑实现一项功能,其中 JdbcRepositoryHandler(实现 MessageHandler)可能会监听外部事件(例如,CancelRunEvent)。

我想我会使用 Spring' ApplicationEvent 支持通过 REST 控制器端点发布事件。而且我想我应该让上述处理程序实现 ApplicationListener 来侦听特定事件?

问题是:如果处理程序充满了它需要处理的消息,我将如何发出信号终止所有可能已从上游发出的后续消息,例如来自 FileSplitter?

虽然我可以在调用负责的方法之前轻松构建要检查的条件,例如,对于持久性操作(基于从 CancelRunEvent 接收到的某些状态),我怎么能中断流量完全?

出于说明目的,想象一个流程如下:

@Bean
protected IntegrationFlow s3ChannelFlow() {
    // @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading
    // @formatter:off
    return IntegrationFlows
        .from(s3Channel())
        .enrichHeaders(h -> h.headerFunction(RunStats.FILE_TOKEN, f -> UUID.randomUUID().toString()))
        .channel(runStatsChannel())
        .transform(new FileToInputStreamTransformer())
        .split(new FileSplitter())
        .transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))
        .publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
        .get();
    // @formatter:on
}

@Bean
protected IntegrationFlow persistenceSubFlow() {
    // @formatter:off
    return f -> f
            // @see http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
            .aggregate(a -> a
                    .correlationStrategy(new HeaderAttributeCorrelationStrategy(RunStats.FILE_TOKEN))
                    .releaseStrategy(new MessageCountReleaseStrategy(persistenceBatchSize))
                    .sendPartialResultOnExpiry(true)
                    .expireGroupsUponCompletion(true)
                    .groupTimeoutExpression(persistenceBatchReleaseTimeoutMillis)
            )
            .handle(new JdbcRepositoryHandler(typeSupport, metricRegistry, runStatsRepository));
    // @formatter:on
}

不完全清楚你的意思或为什么你需要 JdbcRepositoryHandler 来管理这个而不是其他 ApplicationListener

您的流程是 运行 在 s3Channel() 上游的某个线程上。根据那是什么,您可以 stop() 消息源并且在当前消息(或多线程消息)之后不会发出新消息。

但是,您可能(很可能会)在内存中保留部分聚合,直到组超时。