Spring 集成中的发布-订阅限时聚合
Time-limited aggregation with publish-subscribe in Spring Integration
我正在尝试使用 Spring 与 DSL 和 lambda 的集成来实现以下内容:
给定一条消息,将其发送给 N
消费者(通过 publish-subscribe
)。等待有限的时间和 return 在此间隔期间从消费者 (<= N
) 收到的所有结果。
这是我目前的示例配置:
@Configuration
@EnableIntegration
@IntegrationComponentScan
@ComponentScan
public class ExampleConfiguration {
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
return Pollers.fixedRate(1000).maxMessagesPerPoll(1).get();
}
@Bean
public MessageChannel publishSubscribeChannel() {
return MessageChannels.publishSubscribe(splitterExecutorService()).applySequence(true).get();
}
@Bean
public ThreadPoolTaskExecutor splitterExecutorService() {
final ThreadPoolTaskExecutor executorService = new ThreadPoolTaskExecutor();
executorService.setCorePoolSize(3);
executorService.setMaxPoolSize(10);
return executorService;
}
@Bean
public DirectChannel errorChannel() {
return new DirectChannel();
}
@Bean
public DirectChannel requestChannel() {
return new DirectChannel();
}
@Bean
public DirectChannel channel1() {
return new DirectChannel();
}
@Bean
public DirectChannel channel2() {
return new DirectChannel();
}
@Bean
public DirectChannel collectorChannel() {
return new DirectChannel();
}
@Bean
public TransformerChannel1 transformerChannel1() {
return new TransformerChannel1();
}
@Bean
public TransformerChannel2 transformerChannel2() {
return new TransformerChannel2();
}
@Bean
public IntegrationFlow errorFlow() {
return IntegrationFlows.from(errorChannel())
.handle(m -> System.err.println("[" + Thread.currentThread().getName() + "] " + m.getPayload()))
.get();
}
@Bean
public IntegrationFlow channel1Flow() {
return IntegrationFlows.from(publishSubscribeChannel())
.transform("1: "::concat)
.transform(transformerChannel1())
.channel(collectorChannel())
.get();
}
@Bean
public IntegrationFlow channel2Flow() {
return IntegrationFlows.from(publishSubscribeChannel())
.transform("2: "::concat)
.transform(transformerChannel2())
.channel(collectorChannel())
.get();
}
@Bean
public IntegrationFlow splitterFlow() {
return IntegrationFlows.from(requestChannel())
.channel(publishSubscribeChannel())
.get();
}
@Bean
public IntegrationFlow collectorFlow() {
return IntegrationFlows.from(collectorChannel())
.resequence(r -> r.releasePartialSequences(true),
null)
.aggregate(a ->
a.sendPartialResultOnExpiry(true)
.groupTimeout(500)
, null)
.get();
}
}
TransformerChannel1
和 TransformerChannel2
是示例消费者,仅通过睡眠来模拟延迟。
消息流为:
splitterFlow -> channel1Flow \
-> channel2Flow / -> collectorFlow
一切似乎都按预期工作,但我看到如下警告:
Reply message received but the receiving thread has already received a reply
这是意料之中的,因为部分结果是 returned。
问题:
- 总的来说,这是一个好方法吗?
- 优雅地服务或丢弃那些延迟消息的正确方法是什么?
- 如何处理异常?理想情况下,我想将它们发送到
errorChannel
,但不确定在哪里指定。
是的,解决方案看起来不错。我想它适合 Scatter-Gather
模式。该实现自版本 4.1
.
起提供
从另一方面来看,aggregator
自该版本以来也有更多选项 - expire-groups-upon-timeout
,默认情况下聚合器为 true
。使用此选项作为 false
,您将能够实现丢弃所有这些迟到消息的要求。不幸的是 DSL 不支持它 yet。因此,即使您将项目升级为使用 Spring Integration 4.1.
也无济于事
那些 "Reply message received but the receiving thread has already received a reply" 的另一个选项是 spring.integraton.messagingTemplate.throwExceptionOnLateReply = true
选项,使用 jar 之一的 META-INF
中的 spring.integration.properties
文件。
无论如何,我认为 Scatter-Gather
是您用例的最佳解决方案。
您可以找到 here 如何从 JavaConfig 配置它。
更新
What about exceptions and error channel?
由于您已经与 throwExceptionOnLateReply
达成交易,我猜您是通过 @MessagingGateway
向 requestChannel
发送了一条消息。最后一个有 errorChannel
选项。另一方面,PublishSubscribeChannel
有 errorHandler
选项,您可以使用 MessagePublishingErrorHandler
和 errorChannel
作为默认选项。
顺便说一句,不要忘记 Framework 为 LoggingHandler
提供了 errorChannel
bean 和上面的 endpoint
。所以,请想想,如果你真的需要覆盖那些东西。默认 errorChannel
是 PublishSubscribeChannel
,因此您可以简单地添加自己的订阅者。
我正在尝试使用 Spring 与 DSL 和 lambda 的集成来实现以下内容:
给定一条消息,将其发送给 N
消费者(通过 publish-subscribe
)。等待有限的时间和 return 在此间隔期间从消费者 (<= N
) 收到的所有结果。
这是我目前的示例配置:
@Configuration
@EnableIntegration
@IntegrationComponentScan
@ComponentScan
public class ExampleConfiguration {
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
return Pollers.fixedRate(1000).maxMessagesPerPoll(1).get();
}
@Bean
public MessageChannel publishSubscribeChannel() {
return MessageChannels.publishSubscribe(splitterExecutorService()).applySequence(true).get();
}
@Bean
public ThreadPoolTaskExecutor splitterExecutorService() {
final ThreadPoolTaskExecutor executorService = new ThreadPoolTaskExecutor();
executorService.setCorePoolSize(3);
executorService.setMaxPoolSize(10);
return executorService;
}
@Bean
public DirectChannel errorChannel() {
return new DirectChannel();
}
@Bean
public DirectChannel requestChannel() {
return new DirectChannel();
}
@Bean
public DirectChannel channel1() {
return new DirectChannel();
}
@Bean
public DirectChannel channel2() {
return new DirectChannel();
}
@Bean
public DirectChannel collectorChannel() {
return new DirectChannel();
}
@Bean
public TransformerChannel1 transformerChannel1() {
return new TransformerChannel1();
}
@Bean
public TransformerChannel2 transformerChannel2() {
return new TransformerChannel2();
}
@Bean
public IntegrationFlow errorFlow() {
return IntegrationFlows.from(errorChannel())
.handle(m -> System.err.println("[" + Thread.currentThread().getName() + "] " + m.getPayload()))
.get();
}
@Bean
public IntegrationFlow channel1Flow() {
return IntegrationFlows.from(publishSubscribeChannel())
.transform("1: "::concat)
.transform(transformerChannel1())
.channel(collectorChannel())
.get();
}
@Bean
public IntegrationFlow channel2Flow() {
return IntegrationFlows.from(publishSubscribeChannel())
.transform("2: "::concat)
.transform(transformerChannel2())
.channel(collectorChannel())
.get();
}
@Bean
public IntegrationFlow splitterFlow() {
return IntegrationFlows.from(requestChannel())
.channel(publishSubscribeChannel())
.get();
}
@Bean
public IntegrationFlow collectorFlow() {
return IntegrationFlows.from(collectorChannel())
.resequence(r -> r.releasePartialSequences(true),
null)
.aggregate(a ->
a.sendPartialResultOnExpiry(true)
.groupTimeout(500)
, null)
.get();
}
}
TransformerChannel1
和 TransformerChannel2
是示例消费者,仅通过睡眠来模拟延迟。
消息流为:
splitterFlow -> channel1Flow \
-> channel2Flow / -> collectorFlow
一切似乎都按预期工作,但我看到如下警告:
Reply message received but the receiving thread has already received a reply
这是意料之中的,因为部分结果是 returned。
问题:
- 总的来说,这是一个好方法吗?
- 优雅地服务或丢弃那些延迟消息的正确方法是什么?
- 如何处理异常?理想情况下,我想将它们发送到
errorChannel
,但不确定在哪里指定。
是的,解决方案看起来不错。我想它适合 Scatter-Gather
模式。该实现自版本 4.1
.
从另一方面来看,aggregator
自该版本以来也有更多选项 - expire-groups-upon-timeout
,默认情况下聚合器为 true
。使用此选项作为 false
,您将能够实现丢弃所有这些迟到消息的要求。不幸的是 DSL 不支持它 yet。因此,即使您将项目升级为使用 Spring Integration 4.1.
那些 "Reply message received but the receiving thread has already received a reply" 的另一个选项是 spring.integraton.messagingTemplate.throwExceptionOnLateReply = true
选项,使用 jar 之一的 META-INF
中的 spring.integration.properties
文件。
无论如何,我认为 Scatter-Gather
是您用例的最佳解决方案。
您可以找到 here 如何从 JavaConfig 配置它。
更新
What about exceptions and error channel?
由于您已经与 throwExceptionOnLateReply
达成交易,我猜您是通过 @MessagingGateway
向 requestChannel
发送了一条消息。最后一个有 errorChannel
选项。另一方面,PublishSubscribeChannel
有 errorHandler
选项,您可以使用 MessagePublishingErrorHandler
和 errorChannel
作为默认选项。
顺便说一句,不要忘记 Framework 为 LoggingHandler
提供了 errorChannel
bean 和上面的 endpoint
。所以,请想想,如果你真的需要覆盖那些东西。默认 errorChannel
是 PublishSubscribeChannel
,因此您可以简单地添加自己的订阅者。