Spring Integration resequencer 不释放最后一组消息
Spring Integration resequencer does not release the last group of messages
我有以下配置:
@Bean
public IntegrationFlow messageFlow(JdbcMessageStore groupMessageStore, TransactionSynchronizationFactory syncFactory, TaskExecutor te, ThreadPoolTaskScheduler ts, RealTimeProcessor processor) {
return IntegrationFlows
.from("inputChannel")
.handle(processor, "handleInputMessage", consumer -> consumer
.taskScheduler(ts)
.poller(poller -> poller
.fixedDelay(pollerFixedDelay)
.receiveTimeout(pollerReceiveTimeout)
.maxMessagesPerPoll(pollerMaxMessagesPerPoll)
.taskExecutor(te)
.transactional()
.transactionSynchronizationFactory(syncFactory)))
.resequence(s -> s.messageStore(groupMessageStore)
.releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(50, 30000)))
.channel("sendingChannel")
.handle(processor, "sendMessage")
.get();
}
如果我发送一批,例如发送到 inputChannel
的 100 条消息它按预期工作,直到 inputChannel
中没有消息。 inputChannel
变空后,它还会停止处理等待排序的消息。因此,即使在设置的发布超时后,groupMessageStore
中始终会留下一些消息。
我猜这是因为轮询器仅针对 inputChannel
配置,如果其中没有消息,它将永远不会到达音序器(因此永远不会调用 canRelease
发布策略)。
但是如果我尝试为重新排序器添加一个单独的轮询器,我会收到以下错误 A poller should not be specified for endpoint since channel x is a SubscribableChannel (not pollable).
是否有不同的配置方式,以便始终释放最后一组消息?
释放策略是被动的,需要一些东西来触发它被调用。
添加.groupTimeout(...)
在指定时间结束后释放部分序列。
编辑
@SpringBootApplication
public class So67993972Application {
private static final Logger log = LoggerFactory.getLogger(So67993972Application.class);
public static void main(String[] args) {
SpringApplication.run(So67993972Application.class, args);
}
@Bean
IntegrationFlow flow(MessageGroupStore mgs) {
return IntegrationFlows.from(MessageChannels.direct("input"))
.resequence(e -> e.messageStore(mgs)
.groupTimeout(5_000)
.sendPartialResultOnExpiry(true)
.releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(50, 2000)))
.channel(MessageChannels.queue("output"))
.get();
}
@Bean
MessageGroupStore mgs() {
return new SimpleMessageStore();
}
@Bean
public ApplicationRunner runner(MessageChannel input, QueueChannel output, MessageGroupStore mgs) {
return args -> {
MessagingTemplate template = new MessagingTemplate(input);
log.info("Sending");
template.send(MessageBuilder.withPayload("foo")
.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "bar")
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 2)
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2)
.build());
log.info(output.receive(10_000).toString());
Thread.sleep(1000);
log.info(mgs.getMessagesForGroup("bar").toString());
};
}
}
我有以下配置:
@Bean
public IntegrationFlow messageFlow(JdbcMessageStore groupMessageStore, TransactionSynchronizationFactory syncFactory, TaskExecutor te, ThreadPoolTaskScheduler ts, RealTimeProcessor processor) {
return IntegrationFlows
.from("inputChannel")
.handle(processor, "handleInputMessage", consumer -> consumer
.taskScheduler(ts)
.poller(poller -> poller
.fixedDelay(pollerFixedDelay)
.receiveTimeout(pollerReceiveTimeout)
.maxMessagesPerPoll(pollerMaxMessagesPerPoll)
.taskExecutor(te)
.transactional()
.transactionSynchronizationFactory(syncFactory)))
.resequence(s -> s.messageStore(groupMessageStore)
.releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(50, 30000)))
.channel("sendingChannel")
.handle(processor, "sendMessage")
.get();
}
如果我发送一批,例如发送到 inputChannel
的 100 条消息它按预期工作,直到 inputChannel
中没有消息。 inputChannel
变空后,它还会停止处理等待排序的消息。因此,即使在设置的发布超时后,groupMessageStore
中始终会留下一些消息。
我猜这是因为轮询器仅针对 inputChannel
配置,如果其中没有消息,它将永远不会到达音序器(因此永远不会调用 canRelease
发布策略)。
但是如果我尝试为重新排序器添加一个单独的轮询器,我会收到以下错误 A poller should not be specified for endpoint since channel x is a SubscribableChannel (not pollable).
是否有不同的配置方式,以便始终释放最后一组消息?
释放策略是被动的,需要一些东西来触发它被调用。
添加.groupTimeout(...)
在指定时间结束后释放部分序列。
编辑
@SpringBootApplication
public class So67993972Application {
private static final Logger log = LoggerFactory.getLogger(So67993972Application.class);
public static void main(String[] args) {
SpringApplication.run(So67993972Application.class, args);
}
@Bean
IntegrationFlow flow(MessageGroupStore mgs) {
return IntegrationFlows.from(MessageChannels.direct("input"))
.resequence(e -> e.messageStore(mgs)
.groupTimeout(5_000)
.sendPartialResultOnExpiry(true)
.releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(50, 2000)))
.channel(MessageChannels.queue("output"))
.get();
}
@Bean
MessageGroupStore mgs() {
return new SimpleMessageStore();
}
@Bean
public ApplicationRunner runner(MessageChannel input, QueueChannel output, MessageGroupStore mgs) {
return args -> {
MessagingTemplate template = new MessagingTemplate(input);
log.info("Sending");
template.send(MessageBuilder.withPayload("foo")
.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "bar")
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 2)
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2)
.build());
log.info(output.receive(10_000).toString());
Thread.sleep(1000);
log.info(mgs.getMessagesForGroup("bar").toString());
};
}
}