Spring聚合组
Spring Aggregation Group
我确实创建了一个聚合服务如下
@EnableBinding(Processor.class)
class Configuration {
@Autowired
Processor processor;
@ServiceActivator(inputChannel = Processor.INPUT)
@Bean
public MessageHandler aggregator() {
AggregatingMessageHandler aggregatingMessageHandler =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
new SimpleMessageStore(10));
//AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
//aggregatorFactoryBean.setMessageStore();
aggregatingMessageHandler.setOutputChannel(processor.output());
//aggregatorFactoryBean.setDiscardChannel(processor.output());
aggregatingMessageHandler.setSendPartialResultOnExpiry(true);
aggregatingMessageHandler.setSendTimeout(1000L);
aggregatingMessageHandler.setCorrelationStrategy(new ExpressionEvaluatingCorrelationStrategy("requestType"));
aggregatingMessageHandler.setReleaseStrategy(new MessageCountReleaseStrategy(3)); //ExpressionEvaluatingReleaseStrategy("size() == 5")
aggregatingMessageHandler.setExpireGroupsUponCompletion(true);
aggregatingMessageHandler.setGroupTimeoutExpression(new ValueExpression<>(3000L)); //size() ge 2 ? 5000 : -1
aggregatingMessageHandler.setExpireGroupsUponTimeout(true);
return aggregatingMessageHandler;
}
}
现在我想一创建新群就释放群,所以我一次只有一个群。
更具体地说,我确实收到了两种类型的请求 'PUT' 和 'DEL'。我想按照上述规则继续聚合,但是一旦我收到请求类型而不是我正在聚合的类型,我想释放当前组并开始聚合新类型。
我想这样做的原因是因为这些请求被发送到不支持同时具有 PUT 和 DEL 请求的另一方,我不能延迟任何 DEL 请求作为 PUT 和 DEL 之间的顺序很重要。
我知道我需要创建一个自定义版本 Pojo,但我可以检查当前组吗?
例如
如果我收到如下 6 条消息
PUT PUT PUT DEL DEL PUT
它们应该汇总如下
3PUT
2 DEL
1 PUT
好的。感谢您分享更多信息。
是的,您自定义 ReleaseStrategy
可以检查该消息类型和 return true
以导致组完成功能。
只要你只有静态correlationKey
,所以店里只有一组。当您的消息进入 ReleaseStrategy
时,仅仅检查当前组的完成信号并没有什么魔力。由于商店中没有任何其他组,因此不需要任何复杂的发布逻辑。
您应该添加expireGroupsUponCompletion = true
以让该组在完成后被删除,下一条消息将形成一个新组相同的correlationKey
。
更新
感谢您提供更多信息!
所以,是的,您的原始 PoC 很好。甚至静态 correlationKey
也可以,因为您只是要将传入消息分批收集。
您的自定义 ReleaseStrategy
应该分析 MessageGroup
以查找具有不同 key
和 return true
的消息。
自定义 MessageGroupProcessor
应该使用与输出 List
不同的键过滤一条消息,并将该消息发送回聚合器,以便为其 [=19] 的序列形成一个新组=].
我最终实施了以下 ReleaseStrategy
,因为我发现它比删除消息并再次排队更简单。
class MessageCountAndOnlyOneGroupReleaseStrategy implements org.springframework.integration.aggregator.ReleaseStrategy {
private final int threshold;
private final MessageGroupProcessor messageGroupProcessor;
public MessageCountAndOnlyOneGroupReleaseStrategy(int threshold,MessageGroupProcessor messageGroupProcessor) {
super();
this.threshold = threshold;
this.messageGroupProcessor = messageGroupProcessor;
}
private MessageGroup currentGroup;
@Override
public boolean canRelease(MessageGroup group) {
if(currentGroup == null)
currentGroup = group;
if(!group.getGroupId().equals(currentGroup.getGroupId())) {
messageGroupProcessor.processMessageGroup(currentGroup);
currentGroup = group;
return false;
}
return group.size() >= this.threshold;
}
}
请注意,对于 CollorationStrategy
,我确实使用了 new HeaderAttributeCorrelationStrategy("request_type")
而不是 FOO
我确实创建了一个聚合服务如下
@EnableBinding(Processor.class)
class Configuration {
@Autowired
Processor processor;
@ServiceActivator(inputChannel = Processor.INPUT)
@Bean
public MessageHandler aggregator() {
AggregatingMessageHandler aggregatingMessageHandler =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
new SimpleMessageStore(10));
//AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
//aggregatorFactoryBean.setMessageStore();
aggregatingMessageHandler.setOutputChannel(processor.output());
//aggregatorFactoryBean.setDiscardChannel(processor.output());
aggregatingMessageHandler.setSendPartialResultOnExpiry(true);
aggregatingMessageHandler.setSendTimeout(1000L);
aggregatingMessageHandler.setCorrelationStrategy(new ExpressionEvaluatingCorrelationStrategy("requestType"));
aggregatingMessageHandler.setReleaseStrategy(new MessageCountReleaseStrategy(3)); //ExpressionEvaluatingReleaseStrategy("size() == 5")
aggregatingMessageHandler.setExpireGroupsUponCompletion(true);
aggregatingMessageHandler.setGroupTimeoutExpression(new ValueExpression<>(3000L)); //size() ge 2 ? 5000 : -1
aggregatingMessageHandler.setExpireGroupsUponTimeout(true);
return aggregatingMessageHandler;
}
}
现在我想一创建新群就释放群,所以我一次只有一个群。
更具体地说,我确实收到了两种类型的请求 'PUT' 和 'DEL'。我想按照上述规则继续聚合,但是一旦我收到请求类型而不是我正在聚合的类型,我想释放当前组并开始聚合新类型。
我想这样做的原因是因为这些请求被发送到不支持同时具有 PUT 和 DEL 请求的另一方,我不能延迟任何 DEL 请求作为 PUT 和 DEL 之间的顺序很重要。
我知道我需要创建一个自定义版本 Pojo,但我可以检查当前组吗?
例如
如果我收到如下 6 条消息
PUT PUT PUT DEL DEL PUT
它们应该汇总如下
3PUT
2 DEL
1 PUT
好的。感谢您分享更多信息。
是的,您自定义 ReleaseStrategy
可以检查该消息类型和 return true
以导致组完成功能。
只要你只有静态correlationKey
,所以店里只有一组。当您的消息进入 ReleaseStrategy
时,仅仅检查当前组的完成信号并没有什么魔力。由于商店中没有任何其他组,因此不需要任何复杂的发布逻辑。
您应该添加expireGroupsUponCompletion = true
以让该组在完成后被删除,下一条消息将形成一个新组相同的correlationKey
。
更新
感谢您提供更多信息!
所以,是的,您的原始 PoC 很好。甚至静态 correlationKey
也可以,因为您只是要将传入消息分批收集。
您的自定义 ReleaseStrategy
应该分析 MessageGroup
以查找具有不同 key
和 return true
的消息。
自定义 MessageGroupProcessor
应该使用与输出 List
不同的键过滤一条消息,并将该消息发送回聚合器,以便为其 [=19] 的序列形成一个新组=].
我最终实施了以下 ReleaseStrategy
,因为我发现它比删除消息并再次排队更简单。
class MessageCountAndOnlyOneGroupReleaseStrategy implements org.springframework.integration.aggregator.ReleaseStrategy {
private final int threshold;
private final MessageGroupProcessor messageGroupProcessor;
public MessageCountAndOnlyOneGroupReleaseStrategy(int threshold,MessageGroupProcessor messageGroupProcessor) {
super();
this.threshold = threshold;
this.messageGroupProcessor = messageGroupProcessor;
}
private MessageGroup currentGroup;
@Override
public boolean canRelease(MessageGroup group) {
if(currentGroup == null)
currentGroup = group;
if(!group.getGroupId().equals(currentGroup.getGroupId())) {
messageGroupProcessor.processMessageGroup(currentGroup);
currentGroup = group;
return false;
}
return group.size() >= this.threshold;
}
}
请注意,对于 CollorationStrategy
new HeaderAttributeCorrelationStrategy("request_type")
而不是 FOO