spring 与 java DSL 集成中的免费队列通道

Free queue channel in spring integration with java DSL

我有一个存储消息的频道。当新消息到达时,如果服务器尚未处理所有消息(仍在队列中),我需要清除队列(例如,通过将所有数据重新路由到另一个通道)。为此,我使用了路由器。但问题是当新消息到达时,不仅旧消息而且新消息都会重新路由到另一个通道。新消息必须保留在队列中。我怎么解决这个问题? 这是我的代码:

    @Bean
    public IntegrationFlow integerFlow() {
        return IntegrationFlows.from("input")
                .bridge(e -> e.poller(Pollers.fixedDelay(500, TimeUnit.MILLISECONDS, 1000).maxMessagesPerPoll(1)))
                .route(r -> {
                    if (flag) {
                        return "mainChannel";
                    } else {
                        return "garbageChannel";
                    }
                })
                .get();
    }

    @Bean
    public IntegrationFlow outFlow() {
        return IntegrationFlows.from("mainChannel")
                .handle(m -> {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(m.getPayload() + "\tmainFlow");
                })
                .get();
    }

    @Bean
    public IntegrationFlow outGarbage() {
        return IntegrationFlows.from("garbageChannel")
                .handle(m -> System.out.println(m.getPayload() + "\tgarbage"))
                .get();
    }

通过按“q”和“e”键通过@GateWay 更改标志值。

我建议您查看 QueueChannelpurge API:

/**
 * Remove any {@link Message Messages} that are not accepted by the provided selector.
 * @param selector The message selector.
 * @return The list of messages that were purged.
 */
List<Message<?>> purge(@Nullable MessageSelector selector);

通过自定义 MessageSelector,您将能够从 queue 中删除旧消息。请参阅 timestamp 消息 header 进行咨询。使用此方法后,您可以对旧消息做任何需要做的事情。