在 Spring-Boot 中停止 RabbitMQ-Connection

Stop RabbitMQ-Connection in Spring-Boot

我有一个 spring-boot 应用程序,它从 RabbitMQ 队列中提取所有消息,然后终止。我使用包 spring-boot-starter-amqp(版本 2.4.0)中的 rabbitTemplate,即 receiveAndConvert()。不知何故,我无法让我的应用程序再次启动和停止。当 rabbitConnectionFactory 创建时,它永远不会停止。 根据 Google 和其他 Whosebug 问题,在 rabbitTemplate 上调用 stop()destroy() 应该可以完成这项工作,但这不起作用。 rabbitTemplate 被注入到构造函数中。

这是一些代码:

rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
Object msg = getMessage();
while (msg != null) {
    try {
        String name = ((LinkedHashMap) msg).get(propertyName).toString();
        //business logic
        logger.debug("added_" + name);
    } catch (Exception e) {
        logger.error("" + e.getMessage());
    }
    msg = getMessage();
}
rabbitTemplate.stop();
private Object getMessage() {
    try {
        return rabbitTemplate.receiveAndConvert(queueName);
    } catch (Exception e) {
        logger.error("" + e.getMessage());
        return null;
    }
}

那么,如何正确终止与 RabbitMQ 的连接?

感谢您的咨询。

如果我要这样做,我会使用 @RabbitListener 接收消息并使用 RabbitListenerEndpointRegistry 启动和停止侦听器。示例代码如下

@EnableScheduling
@SpringBootApplication
public class Application implements ApplicationRunner {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    public static final String queueName = "Hello";

    @Bean
    public Queue hello() {
        return new Queue(queueName);
    }

    @Autowired
    private RabbitTemplate template;
    
    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void send() {
        String message = "Hello World!";
        this.template.convertAndSend(queueName, message);
        System.out.println(" [x] Sent '" + message + "'");
    }
    
    @Autowired
    RabbitListenerEndpointRegistry registry;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        registry.getListenerContainer( Application.queueName).start();
        Thread.sleep(10000L);
        registry.getListenerContainer( Application.queueName).stop();
    }
}

@Component
class Receiver {
    @RabbitListener(id= Application.queueName,queues = Application.queueName)
    public void receive(String in) {
        System.out.println(" [x] Received '" + in + "'");
    }
}

您可以在 CachingConnectionFactory 上调用 resetConnection() 来关闭连接。

close()应用上下文。