如何中断或取消 Spring 集成 Java DSL 流程?
How to Interrupt or Cancel a Spring Integration Java DSL flow?
考虑实现一项功能,其中 JdbcRepositoryHandler
(实现 MessageHandler
)可能会监听外部事件(例如,CancelRunEvent
)。
我想我会使用 Spring' ApplicationEvent
支持通过 REST 控制器端点发布事件。而且我想我应该让上述处理程序实现 ApplicationListener
来侦听特定事件?
问题是:如果处理程序充满了它需要处理的消息,我将如何发出信号终止所有可能已从上游发出的后续消息,例如来自 FileSplitter
?
虽然我可以在调用负责的方法之前轻松构建要检查的条件,例如,对于持久性操作(基于从 CancelRunEvent
接收到的某些状态),我怎么能中断流量完全?
出于说明目的,想象一个流程如下:
@Bean
protected IntegrationFlow s3ChannelFlow() {
// @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading
// @formatter:off
return IntegrationFlows
.from(s3Channel())
.enrichHeaders(h -> h.headerFunction(RunStats.FILE_TOKEN, f -> UUID.randomUUID().toString()))
.channel(runStatsChannel())
.transform(new FileToInputStreamTransformer())
.split(new FileSplitter())
.transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))
.publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
.get();
// @formatter:on
}
@Bean
protected IntegrationFlow persistenceSubFlow() {
// @formatter:off
return f -> f
// @see http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
.aggregate(a -> a
.correlationStrategy(new HeaderAttributeCorrelationStrategy(RunStats.FILE_TOKEN))
.releaseStrategy(new MessageCountReleaseStrategy(persistenceBatchSize))
.sendPartialResultOnExpiry(true)
.expireGroupsUponCompletion(true)
.groupTimeoutExpression(persistenceBatchReleaseTimeoutMillis)
)
.handle(new JdbcRepositoryHandler(typeSupport, metricRegistry, runStatsRepository));
// @formatter:on
}
不完全清楚你的意思或为什么你需要 JdbcRepositoryHandler
来管理这个而不是其他 ApplicationListener
。
您的流程是 运行 在 s3Channel()
上游的某个线程上。根据那是什么,您可以 stop()
消息源并且在当前消息(或多线程消息)之后不会发出新消息。
但是,您可能(很可能会)在内存中保留部分聚合,直到组超时。
考虑实现一项功能,其中 JdbcRepositoryHandler
(实现 MessageHandler
)可能会监听外部事件(例如,CancelRunEvent
)。
我想我会使用 Spring' ApplicationEvent
支持通过 REST 控制器端点发布事件。而且我想我应该让上述处理程序实现 ApplicationListener
来侦听特定事件?
问题是:如果处理程序充满了它需要处理的消息,我将如何发出信号终止所有可能已从上游发出的后续消息,例如来自 FileSplitter
?
虽然我可以在调用负责的方法之前轻松构建要检查的条件,例如,对于持久性操作(基于从 CancelRunEvent
接收到的某些状态),我怎么能中断流量完全?
出于说明目的,想象一个流程如下:
@Bean
protected IntegrationFlow s3ChannelFlow() {
// @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading
// @formatter:off
return IntegrationFlows
.from(s3Channel())
.enrichHeaders(h -> h.headerFunction(RunStats.FILE_TOKEN, f -> UUID.randomUUID().toString()))
.channel(runStatsChannel())
.transform(new FileToInputStreamTransformer())
.split(new FileSplitter())
.transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))
.publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
.get();
// @formatter:on
}
@Bean
protected IntegrationFlow persistenceSubFlow() {
// @formatter:off
return f -> f
// @see http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
.aggregate(a -> a
.correlationStrategy(new HeaderAttributeCorrelationStrategy(RunStats.FILE_TOKEN))
.releaseStrategy(new MessageCountReleaseStrategy(persistenceBatchSize))
.sendPartialResultOnExpiry(true)
.expireGroupsUponCompletion(true)
.groupTimeoutExpression(persistenceBatchReleaseTimeoutMillis)
)
.handle(new JdbcRepositoryHandler(typeSupport, metricRegistry, runStatsRepository));
// @formatter:on
}
不完全清楚你的意思或为什么你需要 JdbcRepositoryHandler
来管理这个而不是其他 ApplicationListener
。
您的流程是 运行 在 s3Channel()
上游的某个线程上。根据那是什么,您可以 stop()
消息源并且在当前消息(或多线程消息)之后不会发出新消息。
但是,您可能(很可能会)在内存中保留部分聚合,直到组超时。