RabbitMQ CachingConnectionFactory and publisherReturns configuration

This is a continuation of an earlier question.

I register a MessageListenerContainer for each of several queues.

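Where and how should I configure the channel returnListener? I think the way I have done it is wrong: I put the CachingConnectionFactory configuration into createQueueBMessageListener(...), the method responsible for creating one of several MessageListeners.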

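1. How should CachingConnectionFactory be additionally configured, the Spring and Rabbit way? Currently I do not configure it in Java (only through application.properties and by the admins in the K8S environment). I only inject a ConnectionFactory and set it as the connectionFactory on the SimpleMessageListenerContainer (as in createQueueAMessageListener(...)); I did not even know it was a CachingConnectionFactory.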

Is there something like a CachingConnectionFactoryConfigurer?

2. Why is ReturnListener.handleReturn(..) not executed? ChannelListener.onCreate(...) is executed.

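3. Checking for the missing-exchange exception in cachingConnectionFactory.setCloseExceptionLogger and calling System.exit(1) seems wrong to me, doesn't it? But that is all I have managed so far. I want the application not to start when the exchange is missing during binding creation. When I throw an exception, the application still starts. ReturnListener.handleReturn seems like a better place, but it is not executed when configured as below.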

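4. How can I gracefully stop the Spring application context instead of calling System.exit(1)? Throwing an exception does not stop the application context. How do I make RabbitMQ fail startup in this situation, that is, when creating a @Bean binding fails during Spring application context startup?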

@Bean
MessageListenerContainer createQueueAMessageListener(SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory,
                                                   ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = rabbitListenerContainerFactory.createListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("queueA");
    MessageConverter jsonMessageConverter = null;
    container.setMessageListener(new MessageListenerAdapter(new Object(), jsonMessageConverter));

    return container;
}

@Bean
MessageListenerContainer createQueueBMessageListener(SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory,
                                                     ConnectionFactory connectionFactory,
                                                     CachingConnectionFactory cachingConnectionFactory) {

    // I think configuring CachingConnectionFactory here is a lame, isn't it? Of course connectionFactory is redundant now, I left it to show how was it done earlier.
    // Where and how should I add listeners to CachingConnectionFactory?

    cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
    cachingConnectionFactory.setPublisherReturns(true);
    cachingConnectionFactory.addChannelListener(new ChannelListener() {
        @Override
        public void onCreate(final Channel channel, final boolean transactional) {

            log.info("channelListener onCreate - this is executed");

            channel.addReturnListener(new ReturnListener() {
                @Override
                public void handleReturn(final int replyCode, final String replyText, final String exchange, final String routingKey,
                                         final AMQP.BasicProperties properties,
                                         final byte[] body) throws IOException
                {
                    log.info("!!! Why is this not executed ?!!! handleReturn replyCode: " + replyCode + " replyText: " + replyText);
                }
            });
        }
    });
    cachingConnectionFactory.addConnectionListener(new ConnectionListener() {
        @Override
        public void onCreate(final Connection connection) {
            log.info("connectionListener onCreate - this is executed" + connection);
        }
    });

    cachingConnectionFactory.setCloseExceptionLogger(new ConditionalExceptionLogger() {
        @Override
        public void log(final Log logger, final String message, final Throwable t) {
            try {
                logger.error(message + ": " + t.getMessage());
                if (t.getMessage().contains("reply-code=404, reply-text=NOT_FOUND")) {
                    // throw new RuntimeException(); it doesn't stop Spring ApplicationContext from starting
                    log.error("Executing System.exit(1) command.");
                    // System.exit(1);
                }
            } catch (Exception e) {
                log.error("err in listener ", e);
            }
           
        }
    });


    SimpleMessageListenerContainer container = rabbitListenerContainerFactory.createListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("queueB");
    MessageConverter jsonMessageConverter = null;
    container.setMessageListener(new MessageListenerAdapter(new Object(), jsonMessageConverter));

    return container;
}

@Bean
MessageListenerContainer createQueueCMessageListener(SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory,
                                                     ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = rabbitListenerContainerFactory.createListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("queueC");
    MessageConverter jsonMessageConverter = null;
    container.setMessageListener(new MessageListenerAdapter(new Object(), jsonMessageConverter));

    return container;
}

// I think configuring CachingConnectionFactory here is a lame, isn't it?

It is not "lame"; it is the normal way to configure beans with additional properties that Boot does not expose directly.

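  1. It should be called; have you tried debugging?

  2. Why not do what I suggested here? It is much simpler.

  3. close() it. However, since you are using Spring Boot, it will do that for you: it registers a JVM shutdown hook that closes the context.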
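
A point that may help with the first two items: as far as AMQP goes, the broker returns a message only when it was published with the mandatory flag and could not be routed to any queue. A bind against a missing exchange closes the channel with a 404 instead (as the log further down shows), so a return listener would not be expected to fire for that case. Below is a minimal sketch of receiving returns through RabbitTemplate rather than a raw ReturnListener, assuming Spring AMQP 2.3 or later (for setReturnsCallback). The class name ReturnsConfig is made up for illustration, and this is not necessarily what the linked advice describes.

```java
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
class ReturnsConfig { // hypothetical name, not from the original post

    @Bean
    RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        // Returns have to be enabled on the factory that creates the channels.
        connectionFactory.setPublisherReturns(true);

        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // Without the mandatory flag the broker drops unroutable messages silently.
        template.setMandatory(true);
        // Called for each message the broker hands back as unroutable.
        template.setReturnsCallback((ReturnedMessage returned) ->
                System.err.printf("Returned: replyCode=%d replyText=%s exchange=%s routingKey=%s%n",
                        returned.getReplyCode(), returned.getReplyText(),
                        returned.getExchange(), returned.getRoutingKey()));
        return template;
    }
}
```

With this in place, a convertAndSend to an existing exchange with a routing key that matches no queue should reach the callback; a publish or bind against an exchange that does not exist still ends in a channel close.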

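EDIT

Binding to a non-existent exchange will fail; you just need to force that to happen before the application is fully initialized, e.g. in an ApplicationRunner.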

@SpringBootApplication
public class So70212347Application {

    public static void main(String[] args) {
        SpringApplication.run(So70212347Application.class, args);
    }

    @Bean
    Binding binding() {
        return new Binding("foo", DestinationType.QUEUE, "doesn't exist", "foo", null);
    }

    @Bean
    Queue queue() {
        return new Queue("foo");
    }


    @Bean
    ApplicationRunner runner(ConnectionFactory cf) {
        return args -> {
            cf.createConnection().close();
        };
    }

}

Created new connection: rabbitConnectionFactory#6a0cbc6f:0/SimpleConnection@6cd164a6 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 62884]
Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'doesn't exist' in vhost '/', class-id=50, method-id=20)
Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'doesn't exist' in vhost '/', class-id=50, method-id=20)
Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'doesn't exist' in vhost '/', class-id=50, method-id=20)
Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'doesn't exist' in vhost '/', class-id=50, method-id=20)
Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'doesn't exist' in vhost '/', class-id=50, method-id=20)


Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
Application run failed
java.lang.IllegalStateException: Failed to execute ApplicationRunner
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:761)
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:748)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:309)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1290)
    at com.example.demo.So70212347Application.main(So70212347Application.java:16)
Caused by: org.springframework.amqp.AmqpIOException: java.io.IOException
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:70)
    at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:113)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2192)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2138)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2118)
    at org.springframework.amqp.rabbit.core.RabbitAdmin.initialize(RabbitAdmin.java:691)
    at org.springframework.amqp.rabbit.core.RabbitAdmin.lambda$null(RabbitAdmin.java:619)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:209)
    at org.springframework.amqp.rabbit.core.RabbitAdmin.lambda$afterPropertiesSet(RabbitAdmin.java:618)
    at org.springframework.amqp.rabbit.connection.CompositeConnectionListener.lambda$onCreate$0(CompositeConnectionListener.java:38)
    at java.base/java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:807)
    at org.springframework.amqp.rabbit.connection.CompositeConnectionListener.onCreate(CompositeConnectionListener.java:38)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:730)
    at com.example.demo.So70212347Application.lambda$runner$0(So70212347Application.java:33)
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:758)
    ... 5 common frames omitted
Caused by: java.io.IOException: null
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
    at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:1077)
    at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:46)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1157)
    at com.sun.proxy.$Proxy47.queueBind(Unknown Source)
    at org.springframework.amqp.rabbit.core.RabbitAdmin.declareBindings(RabbitAdmin.java:870)
    at org.springframework.amqp.rabbit.core.RabbitAdmin.lambda$initialize(RabbitAdmin.java:694)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.invokeAction(RabbitTemplate.java:2227)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2186)
    ... 18 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'doesn't exist' in vhost '/', class-id=50, method-id=20)
...
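
The trace above shows the broker's 404 buried three causes deep, under an IOException whose message is null, so a check like t.getMessage().contains(...) on the outer exception can miss it or throw a NullPointerException. Here is a small sketch, using only the JDK, that walks the cause chain looking for the reply code. The class and method names (BrokerErrors, isNotFound) are hypothetical and not part of Spring AMQP.

```java
import java.io.IOException;

final class BrokerErrors { // hypothetical helper, not a Spring AMQP class

    private BrokerErrors() {
    }

    /**
     * Returns true if the throwable, or any of its causes, carries a
     * message containing the broker's 404 reply code.
     */
    static boolean isNotFound(Throwable t) {
        for (Throwable current = t; current != null; current = current.getCause()) {
            String message = current.getMessage();
            // Messages may be null (see "java.io.IOException: null" in the trace).
            if (message != null && message.contains("reply-code=404")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Rebuild the shape of the chain from the trace with plain JDK exceptions.
        Throwable broker = new RuntimeException(
                "channel error; protocol method: #method<channel.close>"
                        + "(reply-code=404, reply-text=NOT_FOUND - no exchange)");
        Throwable io = new IOException((String) null, broker);
        Throwable top = new IllegalStateException("Failed to execute ApplicationRunner", io);

        System.out.println(isNotFound(top)); // prints: true
    }
}
```

Inside the ApplicationRunner, the call to cf.createConnection() could be wrapped in a try/catch that rethrows with a clearer message when isNotFound(e) is true; rethrowing keeps the startup failure shown above.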