代理中断后不会重新创建自动删除队列

Auto-delete queue not recreated after broker outage

我有一个关于通过 XML 连接和在 java 代码中创建各种对象来使用 spring-amqp 的问题。

通过 XML 和 Java 代码执行此操作非常简单,并且在大多数情况下对我来说效果很好。但是,在代理中断后(通过关闭并重新启动我的代理来模拟),我得到了两个不同的结果。

当使用 XML 连接创建 spring-amqp 对象时,一切正常。重新建立连接,重新创建队列并恢复接收消息。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:ctx="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
                        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
                        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">

    <rabbit:connection-factory id="connectionFactory" 
        addresses="192.168.1.10:5672" 
        username="guest" 
        password="guest" 
    />

    <rabbit:admin connection-factory="connectionFactory"/> 

    <rabbit:queue name="testQueue" id="testQueue" auto-declare="true" auto-delete="true" exclusive="false"/>

    <rabbit:fanout-exchange name="testExchange" id="testExchange" >
        <rabbit:bindings>
            <rabbit:binding queue="testQueue" />
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <bean class="TestConsumer" id="testConsumer" />

    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="testQueue" ref="testConsumer" method="inbound" />
    </rabbit:listener-container>

</beans>

但是,当使用 Java 创建 spring-amqp 对象时,我 运行 遇到了问题。重新建立连接,但随后我立即遇到异常,报告未找到队列。管理员不会尝试重新创建队列,并且在出现几次异常后,侦听器容器将停止。

public static void main(String[] args) {

    CachingConnectionFactory cf = new CachingConnectionFactory("192.168.1.10", 5672);

    RabbitAdmin admin = new RabbitAdmin(cf);

    FanoutExchange testExchange = new FanoutExchange("testExchange", true, false);
    admin.declareExchange(testExchange);

    Queue testQueue = new Queue("testQueue", true, false, true);
    admin.declareQueue(testQueue);

    admin.declareBinding(BindingBuilder.bind(testQueue).to(testExchange));

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(cf);
    container.setRabbitAdmin(admin);

    container.setQueues(testQueue);
    container.setMessageListener(new MessageListenerAdapter() {
        public void handleMessage(String text) {
            System.out.println("Received : " + text);
        }
    });
    container.afterPropertiesSet();

    container.start();

    try {
        Thread.sleep(600000L);
    } catch(Exception e) {
        e.printStackTrace();
    }

    container.stop();
    container.destroy();

    System.out.println("Exiting");
}

这是我在重新建立连接之后和侦听器容器退出之前看到的异常(三四次):

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'testQueue' in vhost '/', class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:552)

我假设 XML 接线盒和纯 Java 接线盒应该等效,但显然情况并非如此。我不确定是否 我的 Java 案例中缺少某些东西,或者这是否不受支持。

我当然可以检测到侦听器容器的故障并使用管理员重新声明队列,但是由于 XML 有线示例不需要这样做,我想知道我可能会怎么做不见了。

使用org.springframework.amqp.spring-rabbit 1.4.5.RELEASE

管理员需要一个 Spring 应用程序上下文来自动声明元素(队列等)。

它向连接工厂注册一个连接侦听器,当建立新连接时,它会在应用程序上下文中查找所有队列等并注册它们。

由于您正在使用 "pure" java(没有 Spring 应用程序上下文),此机制无法工作。

您可以使用 Spring Java 配置来替换您的 XML(使用 @Bean 定义),或者您必须注册自己的 class 与连接工厂一起执行声明。

查看管理源代码 for how he registers the listener and the initialize method 了解他是如何进行声明的。

您只需要做您的

FanoutExchange testExchange = new FanoutExchange("testExchange", true, false);
admin.declareExchange(testExchange);

Queue testQueue = new Queue("testQueue", true, false, true);
admin.declareQueue(testQueue);

在听众中。