Spring聚合组

Spring Aggregation Group

我确实创建了一个聚合服务如下

@EnableBinding(Processor.class)
class Configuration {

@Autowired
Processor processor;


@ServiceActivator(inputChannel = Processor.INPUT)
@Bean
public MessageHandler aggregator() {

    AggregatingMessageHandler aggregatingMessageHandler =
            new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
                    new SimpleMessageStore(10));

    //AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
    //aggregatorFactoryBean.setMessageStore();
    aggregatingMessageHandler.setOutputChannel(processor.output());
    //aggregatorFactoryBean.setDiscardChannel(processor.output());
    aggregatingMessageHandler.setSendPartialResultOnExpiry(true);
    aggregatingMessageHandler.setSendTimeout(1000L);
    aggregatingMessageHandler.setCorrelationStrategy(new  ExpressionEvaluatingCorrelationStrategy("requestType"));
    aggregatingMessageHandler.setReleaseStrategy(new MessageCountReleaseStrategy(3)); //ExpressionEvaluatingReleaseStrategy("size() == 5")
    aggregatingMessageHandler.setExpireGroupsUponCompletion(true);
    aggregatingMessageHandler.setGroupTimeoutExpression(new ValueExpression<>(3000L)); //size() ge 2 ? 5000 : -1
    aggregatingMessageHandler.setExpireGroupsUponTimeout(true);
    return aggregatingMessageHandler;
    }
}

现在我想一创建新群就释放群,所以我一次只有一个群。

更具体地说,我确实收到了两种类型的请求 'PUT' 和 'DEL'。我想按照上述规则继续聚合,但是一旦我收到请求类型而不是我正在聚合的类型,我想释放当前组并开始聚合新类型。

我想这样做的原因是因为这些请求被发送到不支持同时具有 PUT 和 DEL 请求的另一方,我不能延迟任何 DEL 请求作为 PUT 和 DEL 之间的顺序很重要。

我知道我需要创建一个自定义版本 Pojo,但我可以检查当前组吗?

例如

如果我收到如下 6 条消息

PUT PUT PUT DEL DEL PUT

它们应该汇总如下

3PUT

2 DEL

1 PUT

好的。感谢您分享更多信息。

是的,您自定义 ReleaseStrategy 可以检查该消息类型和 return true 以导致组完成功能。

只要你只有静态correlationKey,所以店里只有一组。当您的消息进入 ReleaseStrategy 时,仅仅检查当前组的完成信号并没有什么魔力。由于商店中没有任何其他组,因此不需要任何复杂的发布逻辑。

您应该添加expireGroupsUponCompletion = true以让该组在完成后被删除,下一条消息将形成一个新组相同的correlationKey

更新

感谢您提供更多信息!

所以,是的,您的原始 PoC 很好。甚至静态 correlationKey 也可以,因为您只是要将传入消息分批收集。

您的自定义 ReleaseStrategy 应该分析 MessageGroup 以查找具有不同 key 和 return true 的消息。

自定义 MessageGroupProcessor 应该使用与输出 List 不同的键过滤一条消息,并将该消息发送回聚合器,以便为其 [=19] 的序列形成一个新组=].

我最终实施了以下 ReleaseStrategy,因为我发现它比删除消息并再次排队更简单。

class MessageCountAndOnlyOneGroupReleaseStrategy implements org.springframework.integration.aggregator.ReleaseStrategy {

    private final int threshold;

    private final MessageGroupProcessor messageGroupProcessor;


    public MessageCountAndOnlyOneGroupReleaseStrategy(int threshold,MessageGroupProcessor messageGroupProcessor) {
        super();
        this.threshold = threshold;
        this.messageGroupProcessor = messageGroupProcessor;
    }

    private MessageGroup currentGroup;

    @Override
    public boolean canRelease(MessageGroup group) {
        if(currentGroup == null)
            currentGroup = group;

        if(!group.getGroupId().equals(currentGroup.getGroupId())) {
            messageGroupProcessor.processMessageGroup(currentGroup);
            currentGroup = group;
            return false;
        }


        return group.size() >= this.threshold;
    }

}

请注意,对于 CollorationStrategy

,我确实使用了 new HeaderAttributeCorrelationStrategy("request_type") 而不是 FOO