如何在 RabbitMQ 中使用 ConnectionListner and/or ChannelListner 记录 failure/success 消息传递
How to use ConnectionListner and/or ChannelListner for logging failure/success of message delivery in RabbitMQ
我正在尝试记录在 RabbitMQ 中发送消息期间发生的任何信息或异常,为此我尝试在现有连接工厂上添加 ConnectionListener。
kRabbitTemplate.getConnectionFactory().addConnectionListener(new ConnectionListener() {
@Override
public void onCreate(Connection connection) {
System.out.println("Connection Created");
}
@Override
public void onShutDown(ShutdownSignalException signal) {
System.out.println("Connection Shutdown "+signal.getMessage());
}
});
kRabbitTemplate.convertAndSend(exchange, routingkey, empDTO);
为了测试异常情况,我从 RabbitMQ 控制台取消绑定甚至删除了队列。但是我没有得到任何异常或任何关闭方法调用。
虽然,当我停止 RabbitMQ 服务时,我得到了
Exception in thread "Thread-5" org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
但是这个异常不是我添加的监听器。
我想知道
- 为什么我没有收到任何异常或来自关闭方法的调用
- 如何使用 ConnectionListner and/or ChannelListner 来记录 failure/success 消息传递。
- 我们可以使用 AMQP 附加程序吗?如果可以,我们该怎么做? (任何示例/教程)
- 确保消息发送的其他方法是什么?
注意:我不想使用发布确认的方式。
Connection Refused
不是 ShutdownSignalException
- 从未建立连接,因为 server/port.
上不存在代理
您不能使用侦听器来确认传送或 return 个别消息;为此使用发布者确认和 returns。
https://docs.spring.io/spring-amqp/docs/current/reference/html/#publishing-is-async
有关如何使用附加程序的信息,请参阅文档。
https://docs.spring.io/spring-amqp/docs/current/reference/html/#logging
编辑
要获得连接失败的通知,您目前需要使用其他技术,具体取决于您是发送还是接收。
这是一个说明如何操作的示例:
@SpringBootApplication
public class So66882099Application {
private static final Logger log = LoggerFactory.getLogger(So66882099Application.class);
public static void main(String[] args) {
SpringApplication.run(So66882099Application.class, args);
}
@RabbitListener(queues = "foo")
void listen(String in) {
}
// consumer side listeners for no connection
@EventListener
void consumerFailed(ListenerContainerConsumerFailedEvent event) {
log.error(event + " via event listener");
if (event.getThrowable() instanceof AmqpConnectException) {
log.error("Broker down?");
}
}
// or
@Bean
ApplicationListener<ListenerContainerConsumerFailedEvent> eventListener() {
return event -> log.error(event + " via application listener");
}
// producer side - use a RetryListener
@Bean
RabbitTemplate template(ConnectionFactory cf) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cf);
RetryTemplate retry = new RetryTemplate();
// configure retries here as needed
retry.registerListener(new RetryListener() {
@Override
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
return true;
}
@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
log.error("Send failed " + throwable.getMessage());
}
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
}
});
rabbitTemplate.setRetryTemplate(retry);
return rabbitTemplate;
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
try {
template.convertAndSend("foo", "bar");
}
catch (Exception e) {
e.printStackTrace();
}
};
}
}
我正在尝试记录在 RabbitMQ 中发送消息期间发生的任何信息或异常,为此我尝试在现有连接工厂上添加 ConnectionListener。
kRabbitTemplate.getConnectionFactory().addConnectionListener(new ConnectionListener() {
@Override
public void onCreate(Connection connection) {
System.out.println("Connection Created");
}
@Override
public void onShutDown(ShutdownSignalException signal) {
System.out.println("Connection Shutdown "+signal.getMessage());
}
});
kRabbitTemplate.convertAndSend(exchange, routingkey, empDTO);
为了测试异常情况,我从 RabbitMQ 控制台取消绑定甚至删除了队列。但是我没有得到任何异常或任何关闭方法调用。
虽然,当我停止 RabbitMQ 服务时,我得到了
Exception in thread "Thread-5" org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
但是这个异常不是我添加的监听器。
我想知道
- 为什么我没有收到任何异常或来自关闭方法的调用
- 如何使用 ConnectionListner and/or ChannelListner 来记录 failure/success 消息传递。
- 我们可以使用 AMQP 附加程序吗?如果可以,我们该怎么做? (任何示例/教程)
- 确保消息发送的其他方法是什么?
注意:我不想使用发布确认的方式。
Connection Refused
不是 ShutdownSignalException
- 从未建立连接,因为 server/port.
您不能使用侦听器来确认传送或 return 个别消息;为此使用发布者确认和 returns。
https://docs.spring.io/spring-amqp/docs/current/reference/html/#publishing-is-async
有关如何使用附加程序的信息,请参阅文档。
https://docs.spring.io/spring-amqp/docs/current/reference/html/#logging
编辑
要获得连接失败的通知,您目前需要使用其他技术,具体取决于您是发送还是接收。
这是一个说明如何操作的示例:
@SpringBootApplication
public class So66882099Application {
private static final Logger log = LoggerFactory.getLogger(So66882099Application.class);
public static void main(String[] args) {
SpringApplication.run(So66882099Application.class, args);
}
@RabbitListener(queues = "foo")
void listen(String in) {
}
// consumer side listeners for no connection
@EventListener
void consumerFailed(ListenerContainerConsumerFailedEvent event) {
log.error(event + " via event listener");
if (event.getThrowable() instanceof AmqpConnectException) {
log.error("Broker down?");
}
}
// or
@Bean
ApplicationListener<ListenerContainerConsumerFailedEvent> eventListener() {
return event -> log.error(event + " via application listener");
}
// producer side - use a RetryListener
@Bean
RabbitTemplate template(ConnectionFactory cf) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cf);
RetryTemplate retry = new RetryTemplate();
// configure retries here as needed
retry.registerListener(new RetryListener() {
@Override
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
return true;
}
@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
log.error("Send failed " + throwable.getMessage());
}
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
}
});
rabbitTemplate.setRetryTemplate(retry);
return rabbitTemplate;
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
try {
template.convertAndSend("foo", "bar");
}
catch (Exception e) {
e.printStackTrace();
}
};
}
}