Spring Integration resequencer 不释放最后一组消息

Spring Integration resequencer does not release the last group of messages

我有以下配置:

    @Bean
    public IntegrationFlow messageFlow(JdbcMessageStore groupMessageStore, TransactionSynchronizationFactory syncFactory, TaskExecutor te, ThreadPoolTaskScheduler ts, RealTimeProcessor processor) {
        return IntegrationFlows
                .from("inputChannel")
                .handle(processor, "handleInputMessage", consumer -> consumer
                        .taskScheduler(ts)
                        .poller(poller -> poller
                                .fixedDelay(pollerFixedDelay)
                                .receiveTimeout(pollerReceiveTimeout)
                                .maxMessagesPerPoll(pollerMaxMessagesPerPoll)
                                .taskExecutor(te)
                                .transactional()
                                .transactionSynchronizationFactory(syncFactory)))
                .resequence(s -> s.messageStore(groupMessageStore)
                        .releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(50, 30000)))
                .channel("sendingChannel")
                .handle(processor, "sendMessage")
                .get();
    }

如果我发送一批,例如发送到 inputChannel 的 100 条消息它按预期工作,直到 inputChannel 中没有消息。 inputChannel 变空后,它还会停止处理等待排序的消息。因此,即使在设置的发布超时后,groupMessageStore 中始终会留下一些消息。

我猜这是因为轮询器仅针对 inputChannel 配置,如果其中没有消息,它将永远不会到达音序器(因此永远不会调用 canRelease发布策略)。 但是如果我尝试为重新排序器添加一个单独的轮询器,我会收到以下错误 A poller should not be specified for endpoint since channel x is a SubscribableChannel (not pollable).

是否有不同的配置方式,以便始终释放最后一组消息?

释放策略是被动的,需要一些东西来触发它被调用。

添加.groupTimeout(...)在指定时间结束后释放部分序列。

编辑

@SpringBootApplication
public class So67993972Application {

    private static final Logger log = LoggerFactory.getLogger(So67993972Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So67993972Application.class, args);
    }

    @Bean
    IntegrationFlow flow(MessageGroupStore mgs) {
        return IntegrationFlows.from(MessageChannels.direct("input"))
                .resequence(e -> e.messageStore(mgs)
                                    .groupTimeout(5_000)
                                    .sendPartialResultOnExpiry(true)
                                    .releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(50, 2000)))
                .channel(MessageChannels.queue("output"))
                .get();
    }

    @Bean
    MessageGroupStore mgs() {
        return new SimpleMessageStore();
    }

    @Bean
    public ApplicationRunner runner(MessageChannel input, QueueChannel output, MessageGroupStore mgs) {
        return args -> {
            MessagingTemplate template = new MessagingTemplate(input);
            log.info("Sending");
            template.send(MessageBuilder.withPayload("foo")
                    .setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "bar")
                    .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 2)
                    .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2)
                    .build());
            log.info(output.receive(10_000).toString());
            Thread.sleep(1000);
            log.info(mgs.getMessagesForGroup("bar").toString());
        };
    }

}