每隔一段时间从 rabbitMQ 队列读取消息不起作用

Reading messages from rabbitMQ queue at an interval is not working

我想要实现的是每 15 分钟从 RabbitMQ 队列读取消息。从文档中,我可以看到我可以使用“receiveTimeout”方法来设置时间间隔。

Polling Consumer

The AmqpTemplate itself can be used for polled Message reception. By default, if no message is available, null is returned immediately. There is no blocking. Starting with version 1.5, you can set a receiveTimeout, in milliseconds, and the receive methods block for up to that long, waiting for a message.

但是我尝试通过 sprint 集成来实现它,receiveTimeout 没有像我预期的那样工作。

我的测试代码如下。

@Bean
    Queue createMessageQueue() {
        return new Queue(RetryQueue, false);
    }

    @Bean
    public SimpleMessageListenerContainer QueueMessageListenerContainer(ConnectionFactory connectionFactory) {
        final SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(
                connectionFactory);
        messageListenerContainer.setQueueNames(RetryQueue);
        messageListenerContainer.setReceiveTimeout(900000);
        return messageListenerContainer;
    }

    @Bean
    public AmqpInboundChannelAdapter inboundQueueChannelAdapter(
            @Qualifier("QueueMessageListenerContainer") AbstractMessageListenerContainer messageListenerContainer) {
        final AmqpInboundChannelAdapter amqpInboundChannelAdapter = new AmqpInboundChannelAdapter(
                messageListenerContainer);
        amqpInboundChannelAdapter.setOutputChannelName("channelRequestFromQueue");
        return amqpInboundChannelAdapter;
    }

    @ServiceActivator(inputChannel = "channelRequestFromQueue")
    public void activatorRequestFromQueue(Message<String> message) {
        System.out.println("Message: " + message.getPayload() + ", recieved at: " + LocalDateTime.now());
    }

我正在以近乎实时的方式将负载记录在控制台中。 谁能帮忙?消费者启动后会活跃多少时间?

更新

IntegrationFlow 我曾经每隔一段时间从队列中检索消息,

@Bean
    public IntegrationFlow inboundIntegrationFlowPaymentRetry() {
        return IntegrationFlows
                .from(Amqp.inboundPolledAdapter(connectionFactory, RetryQueue),
                        e -> e.poller(Pollers.fixedDelay(20_000).maxMessagesPerPoll(-1)).autoStartup(true))
                .handle(message -> {
                    channelRequestFromQueue()
                            .send(MessageBuilder.withPayload(message.getPayload()).copyHeaders(message.getHeaders())
                                    .setHeader(IntegrationConstants.QUEUED_MESSAGE, message).build());
                }).get();
    }

Polling Consumer 文档来自关于 `RabbitTemplate 的 Spring AMQP 文档,与侦听器容器或 Spring 集成无关。

https://docs.spring.io/spring-amqp/docs/current/reference/html/#polling-consumer

Spring 集成的适配器是消息驱动的,只要消息可用,您就会收到消息。

要按需获取消息,您需要在任意时间间隔调用 RabbitTemplate