在 Spring-Boot 中停止 RabbitMQ-Connection
Stop RabbitMQ-Connection in Spring-Boot
我有一个 spring-boot 应用程序,它从 RabbitMQ 队列中提取所有消息,然后终止。我使用包 spring-boot-starter-amqp
(版本 2.4.0)中的 rabbitTemplate
,即 receiveAndConvert()
。不知何故,我无法让我的应用程序再次启动和停止。当 rabbitConnectionFactory 创建时,它永远不会停止。
根据 Google 和其他 Whosebug 问题,在 rabbitTemplate 上调用 stop()
或 destroy()
应该可以完成这项工作,但这不起作用。
rabbitTemplate 被注入到构造函数中。
这是一些代码:
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
Object msg = getMessage();
while (msg != null) {
try {
String name = ((LinkedHashMap) msg).get(propertyName).toString();
//business logic
logger.debug("added_" + name);
} catch (Exception e) {
logger.error("" + e.getMessage());
}
msg = getMessage();
}
rabbitTemplate.stop();
private Object getMessage() {
try {
return rabbitTemplate.receiveAndConvert(queueName);
} catch (Exception e) {
logger.error("" + e.getMessage());
return null;
}
}
那么,如何正确终止与 RabbitMQ 的连接?
感谢您的咨询。
如果我要这样做,我会使用 @RabbitListener
接收消息并使用 RabbitListenerEndpointRegistry
启动和停止侦听器。示例代码如下
@EnableScheduling
@SpringBootApplication
public class Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
public static final String queueName = "Hello";
@Bean
public Queue hello() {
return new Queue(queueName);
}
@Autowired
private RabbitTemplate template;
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void send() {
String message = "Hello World!";
this.template.convertAndSend(queueName, message);
System.out.println(" [x] Sent '" + message + "'");
}
@Autowired
RabbitListenerEndpointRegistry registry;
@Override
public void run(ApplicationArguments args) throws Exception {
registry.getListenerContainer( Application.queueName).start();
Thread.sleep(10000L);
registry.getListenerContainer( Application.queueName).stop();
}
}
@Component
class Receiver {
@RabbitListener(id= Application.queueName,queues = Application.queueName)
public void receive(String in) {
System.out.println(" [x] Received '" + in + "'");
}
}
您可以在 CachingConnectionFactory
上调用 resetConnection()
来关闭连接。
或close()
应用上下文。
我有一个 spring-boot 应用程序,它从 RabbitMQ 队列中提取所有消息,然后终止。我使用包 spring-boot-starter-amqp
(版本 2.4.0)中的 rabbitTemplate
,即 receiveAndConvert()
。不知何故,我无法让我的应用程序再次启动和停止。当 rabbitConnectionFactory 创建时,它永远不会停止。
根据 Google 和其他 Whosebug 问题,在 rabbitTemplate 上调用 stop()
或 destroy()
应该可以完成这项工作,但这不起作用。
rabbitTemplate 被注入到构造函数中。
这是一些代码:
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
Object msg = getMessage();
while (msg != null) {
try {
String name = ((LinkedHashMap) msg).get(propertyName).toString();
//business logic
logger.debug("added_" + name);
} catch (Exception e) {
logger.error("" + e.getMessage());
}
msg = getMessage();
}
rabbitTemplate.stop();
private Object getMessage() {
try {
return rabbitTemplate.receiveAndConvert(queueName);
} catch (Exception e) {
logger.error("" + e.getMessage());
return null;
}
}
那么,如何正确终止与 RabbitMQ 的连接?
感谢您的咨询。
如果我要这样做,我会使用 @RabbitListener
接收消息并使用 RabbitListenerEndpointRegistry
启动和停止侦听器。示例代码如下
@EnableScheduling
@SpringBootApplication
public class Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
public static final String queueName = "Hello";
@Bean
public Queue hello() {
return new Queue(queueName);
}
@Autowired
private RabbitTemplate template;
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void send() {
String message = "Hello World!";
this.template.convertAndSend(queueName, message);
System.out.println(" [x] Sent '" + message + "'");
}
@Autowired
RabbitListenerEndpointRegistry registry;
@Override
public void run(ApplicationArguments args) throws Exception {
registry.getListenerContainer( Application.queueName).start();
Thread.sleep(10000L);
registry.getListenerContainer( Application.queueName).stop();
}
}
@Component
class Receiver {
@RabbitListener(id= Application.queueName,queues = Application.queueName)
public void receive(String in) {
System.out.println(" [x] Received '" + in + "'");
}
}
您可以在 CachingConnectionFactory
上调用 resetConnection()
来关闭连接。
或close()
应用上下文。