如何在 RabbitMQ 中使用 ConnectionListner and/or ChannelListner 记录 failure/success 消息传递

How to use ConnectionListner and/or ChannelListner for logging failure/success of message delivery in RabbitMQ

我正在尝试记录在 RabbitMQ 中发送消息期间发生的任何信息或异常,为此我尝试在现有连接工厂上添加 ConnectionListener。

    kRabbitTemplate.getConnectionFactory().addConnectionListener(new ConnectionListener() {

        @Override
        public void onCreate(Connection connection) {
            System.out.println("Connection Created");
        }

        @Override
        public void onShutDown(ShutdownSignalException signal) {
            System.out.println("Connection Shutdown "+signal.getMessage());
        }

    });
    kRabbitTemplate.convertAndSend(exchange, routingkey, empDTO);       
    

为了测试异常情况,我从 RabbitMQ 控制台取消绑定甚至删除了队列。但是我没有得到任何异常或任何关闭方法调用。

虽然,当我停止 RabbitMQ 服务时,我得到了

Exception in thread "Thread-5" org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect

但是这个异常不是我添加的监听器。

我想知道

  1. 为什么我没有收到任何异常或来自关闭方法的调用
  2. 如何使用 ConnectionListner and/or ChannelListner 来记录 failure/success 消息传递。
  3. 我们可以使用 AMQP 附加程序吗?如果可以,我们该怎么做? (任何示例/教程)
  4. 确保消息发送的其他方法是什么?

注意:我不想使用发布确认的方式。

Connection Refused 不是 ShutdownSignalException - 从未建立连接,因为 server/port.

上不存在代理

您不能使用侦听器来确认传送或 return 个别消息;为此使用发布者确认和 returns。

https://docs.spring.io/spring-amqp/docs/current/reference/html/#publishing-is-async

有关如何使用附加程序的信息,请参阅文档。

https://docs.spring.io/spring-amqp/docs/current/reference/html/#logging

编辑

要获得连接失败的通知,您目前需要使用其他技术,具体取决于您是发送还是接收。

这是一个说明如何操作的示例:

@SpringBootApplication
public class So66882099Application {

    private static final Logger log = LoggerFactory.getLogger(So66882099Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So66882099Application.class, args);
    }

    @RabbitListener(queues = "foo")
    void listen(String in) {

    }

    // consumer side listeners for no connection

    @EventListener
    void consumerFailed(ListenerContainerConsumerFailedEvent event) {
        log.error(event + " via event listener");
        if (event.getThrowable() instanceof AmqpConnectException) {
            log.error("Broker down?");
        }
    }

    // or

    @Bean
    ApplicationListener<ListenerContainerConsumerFailedEvent> eventListener() {
        return event -> log.error(event + " via application listener");
    }

    // producer side - use a RetryListener

    @Bean
    RabbitTemplate template(ConnectionFactory cf) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(cf);
        RetryTemplate retry = new RetryTemplate();
        // configure retries here as needed
        retry.registerListener(new RetryListener() {

            @Override
            public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
                return true;
            }

            @Override
            public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
                    Throwable throwable) {

                log.error("Send failed " + throwable.getMessage());
            }

            @Override
            public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
                    Throwable throwable) {
            }

        });
        rabbitTemplate.setRetryTemplate(retry);
        return rabbitTemplate;
    }


    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            try {
                template.convertAndSend("foo", "bar");
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        };
    }

}