How to fix ListenerExecutionFailedException: Listener threw exception amqp.AmqpRejectAndDontRequeueException: Reply received after timeout

I set up RabbitMQ in my Java Spring Boot application and it seems to work fine, but after running for a while it throws the following exception, at what look like regular intervals.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1646) ~[spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1550) ~[spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1473) ~[spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1461) ~[spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1456) ~[spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1405) ~[spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
        at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:995) [spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
        at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:955) [spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
        at com.rabbitmq.client.impl.ConsumerDispatcher.run(ConsumerDispatcher.java:149) [amqp-client-5.4.3.jar!/:5.4.3]
        at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) [amqp-client-5.4.3.jar!/:5.4.3]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_201]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_201]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_201]
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
        at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:2523) ~[spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
        at org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.lambda$setMessageListener(DirectReplyToMessageListenerContainer.java:115) ~[spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1547) ~[spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
        ... 11 common frames omitted 

Below is the RabbitMQ configuration on the consumer side:

    @Bean
    public DirectExchange exchange() {
        return new DirectExchange("rpc");
    }


    @Bean
    @Qualifier("Consumer")
    public Queue queue() {
        return new Queue(RoutingEngine.class.getSimpleName()+"_"+config.getDatasetName());
    }

    @Bean
    public Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(Consumer.class.getSimpleName()+"_"+config.getDatasetName());
    }


    @Bean
    @Qualifier("ConsumerExport")
    public AmqpInvokerServiceExporter exporter(RabbitTemplate template, Consumer service) {
        AmqpInvokerServiceExporter exporter = new AmqpInvokerServiceExporter();
        exporter.setAmqpTemplate(template);
        exporter.setService(service);
        exporter.setServiceInterface(Consumer.class);
        return exporter;
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,@Qualifier("consumer") Queue queue,
                                                    @Qualifier("RoutingEngineExport") AmqpInvokerServiceExporter exporter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setPrefetchCount(5);
        container.setQueues(queue);
        container.setMessageListener(exporter);
        logger.info("initialize rabbitmq with {} Consumers",config.getCount());
        container.setConcurrency(1+"-"+config.getCount());
        return container;
    }




    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("event");
    }

    @Bean
    @Qualifier("reinitialize")
    public Queue reInitQueue() {
        return new Queue("bus."+config.getConsumerName(),false,true,true);
    }

    @Bean
    public Binding topicBinding(@Qualifier("reinitialize") Queue queue, FanoutExchange fanoutExchangee) {
        return BindingBuilder
                .bind(queue)
                .to(fanoutExchangee);
    }

    @Bean
    public MessageListener<Consumer> messageListener(RabbitTemplate rabbitTemplate,Consumer target){
        return new MessageListener<>(rabbitTemplate, target, "engine", config.getConsumerName());
    }

And here is the configuration on the producer side:

    @Bean
    public AmqpProxyFactoryBean rerouteProxy(RabbitTemplate template) {
        AmqpProxyFactoryBean proxy = new AmqpProxyFactoryBean();
        proxy.setAmqpTemplate(template);
        proxy.setServiceInterface(ConsumerService.class);
        proxy.setRoutingKey(ConsumerService.class.getSimpleName());
        return proxy;
    }

    @Bean
    public Map<String,Consumer> consumerEngines( RabbitTemplate template){
        Map<String,Consumer> ret= new ConcurrentHashMap<>();
        //FIXme read from config
        List<String> lst = Arrays.asList(config.getEngines());
        lst.parallelStream().forEach(k->{
                AmqpProxyFactoryBean proxy = new AmqpProxyFactoryBean();
                template.setReceiveTimeout(400);
                template.setReplyTimeout(400);
                proxy.setAmqpTemplate(template);
                proxy.setServiceInterface(Consumer.class);
                proxy.setRoutingKey(Consumer.class.getSimpleName() + "_" + k);
                proxy.afterPropertiesSet();
                ret.put(k, (Consumer) proxy.getObject());
        });
        return ret;
    }

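What causes this problem, and how can I fix it?

Note 1: I have 3 producers and 3 consumers on different servers, and RabbitMQ runs on yet another server.

Note 2: The consumers are very fast; their response time is under 100 ms.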

Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout

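This is caused by one of two things:

  • The reply took too long to arrive (in which case the send-and-receive operation would already have returned null).

  • The consumer sent more than one reply for the same request.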
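
To make the two cases above concrete, here is a minimal plain-Java sketch of how a request/reply correlator can end up with a reply that nobody is waiting for. It is only a simplified model and not Spring AMQP's actual implementation; the class name ReplyCorrelator and its methods are hypothetical and exist only for this illustration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Simplified model of request/reply correlation (hypothetical, not Spring AMQP code). */
public class ReplyCorrelator {

    // Requests that are still waiting for a reply, keyed by correlation id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a request; call this before the request is sent. */
    public void register(String correlationId) {
        pending.put(correlationId, new CompletableFuture<>());
    }

    /** Waits up to timeoutMillis for the reply; returns null if none arrives in time. */
    public String awaitReply(String correlationId, long timeoutMillis) {
        CompletableFuture<String> future = pending.get(correlationId);
        if (future == null) {
            return null; // request was never registered
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return null; // the caller gives up, like a send-and-receive returning null
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            // After this point a reply for this id has no one to go to.
            pending.remove(correlationId);
        }
    }

    /** Called when a reply arrives; returns false if it is late or a duplicate. */
    public boolean onReply(String correlationId, String body) {
        CompletableFuture<String> future = pending.get(correlationId);
        if (future == null) {
            return false; // late reply: the requester already timed out
        }
        return future.complete(body); // false if a reply was already delivered
    }

    public static void main(String[] args) {
        ReplyCorrelator correlator = new ReplyCorrelator();

        // Case 1: the reply arrives after the requester has timed out.
        correlator.register("req-1");
        System.out.println("reply = " + correlator.awaitReply("req-1", 50));
        System.out.println("late reply accepted = " + correlator.onReply("req-1", "late"));

        // Case 2: the consumer sends two replies for the same request.
        correlator.register("req-2");
        System.out.println("first reply accepted = " + correlator.onReply("req-2", "ok"));
        System.out.println("duplicate accepted = " + correlator.onReply("req-2", "ok again"));
        System.out.println("reply = " + correlator.awaitReply("req-2", 50));
    }
}
```

In this model the rejected replies in both cases correspond to the "Reply received after timeout" exception. If the first case applies, one option worth trying is a larger value for RabbitTemplate.setReplyTimeout than the 400 ms set in the question, since the timeout has to cover the network round trips through the broker and any time the request spends queued, not only the consumer's processing time.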