如何使用 spring 集成 DSL 聚合来自队列通道的消息?

How to aggrate messages from a queue Channel with using spring integration DSL?

我定义一个队列通道

@Bean("mail-action-laundry-list-channel")
public MessageChannel mailRecipientActionMessageChannel() {
    return new QueueChannel(20);
    }

下面的流程,我将从队列通道聚合消息,我试过这个:

@Bean
public IntegrationFlow mailRecipientActionLaundryListMessageFlow(@Qualifier("laundryListMessageHandler") MessageHandler laundryListMessageHandler) {
    return IntegrationFlows.from("mail-action-laundry-list-channel")
            .log("--> laundry list messages::")
            .aggregate(aggregatorSpec -> aggregatorSpec
                    .correlationExpression("#this.payload.email")
                    .releaseExpression("#this.size() == 5")
                    .messageStore(new SimpleMessageStore(100))
                    .groupTimeout(2000))
            .transform(laundryListMessageToItemProcessDtoTransformer())
            .handle(laundryListMessageHandler)
            .get();
}

但为什么它总是聚合来自频道的前 5 条消息,而不再聚合其他消息

您需要在聚合器上配置 expireGroupsUponCompletion(true)

When set to true (default false), completed groups are removed from the message store, allowing subsequent messages with the same correlation to form a new group. The default behavior is to send messages with the same correlation as a completed group to the discard-channel.

看起来您来自队列的后续消息具有相同的 email 属性。因此,聚合器无法为相同的关联键形成新组。

https://docs.spring.io/spring-integration/docs/5.0.3.RELEASE/reference/html/messaging-routing-chapter.html#aggregator-config