代理中断后不会重新创建自动删除队列
Auto-delete queue not recreated after broker outage
我有一个关于通过 XML 连接和在 java 代码中创建各种对象来使用 spring-amqp 的问题。
通过 XML 和 Java 代码执行此操作非常简单,并且在大多数情况下对我来说效果很好。但是,在代理中断后(通过关闭并重新启动我的代理来模拟),我得到了两个不同的结果。
当使用 XML 连接创建 spring-amqp 对象时,一切正常。重新建立连接,重新创建队列并恢复接收消息。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:ctx="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">
<rabbit:connection-factory id="connectionFactory"
addresses="192.168.1.10:5672"
username="guest"
password="guest"
/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue name="testQueue" id="testQueue" auto-declare="true" auto-delete="true" exclusive="false"/>
<rabbit:fanout-exchange name="testExchange" id="testExchange" >
<rabbit:bindings>
<rabbit:binding queue="testQueue" />
</rabbit:bindings>
</rabbit:fanout-exchange>
<bean class="TestConsumer" id="testConsumer" />
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="testQueue" ref="testConsumer" method="inbound" />
</rabbit:listener-container>
</beans>
但是,当使用 Java 创建 spring-amqp 对象时,我 运行 遇到了问题。重新建立连接,但随后我立即遇到异常,报告未找到队列。管理员不会尝试重新创建队列,并且在出现几次异常后,侦听器容器将停止。
public static void main(String[] args) {
CachingConnectionFactory cf = new CachingConnectionFactory("192.168.1.10", 5672);
RabbitAdmin admin = new RabbitAdmin(cf);
FanoutExchange testExchange = new FanoutExchange("testExchange", true, false);
admin.declareExchange(testExchange);
Queue testQueue = new Queue("testQueue", true, false, true);
admin.declareQueue(testQueue);
admin.declareBinding(BindingBuilder.bind(testQueue).to(testExchange));
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(cf);
container.setRabbitAdmin(admin);
container.setQueues(testQueue);
container.setMessageListener(new MessageListenerAdapter() {
public void handleMessage(String text) {
System.out.println("Received : " + text);
}
});
container.afterPropertiesSet();
container.start();
try {
Thread.sleep(600000L);
} catch(Exception e) {
e.printStackTrace();
}
container.stop();
container.destroy();
System.out.println("Exiting");
}
这是我在重新建立连接之后和侦听器容器退出之前看到的异常(三四次):
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'testQueue' in vhost '/', class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:552)
我假设 XML 接线盒和纯 Java 接线盒应该等效,但显然情况并非如此。我不确定是否
我的 Java 案例中缺少某些东西,或者这是否不受支持。
我当然可以检测到侦听器容器的故障并使用管理员重新声明队列,但是由于 XML 有线示例不需要这样做,我想知道我可能会怎么做不见了。
使用org.springframework.amqp.spring-rabbit 1.4.5.RELEASE
管理员需要一个 Spring 应用程序上下文来自动声明元素(队列等)。
它向连接工厂注册一个连接侦听器,当建立新连接时,它会在应用程序上下文中查找所有队列等并注册它们。
由于您正在使用 "pure" java(没有 Spring 应用程序上下文),此机制无法工作。
您可以使用 Spring Java 配置来替换您的 XML(使用 @Bean
定义),或者您必须注册自己的 class 与连接工厂一起执行声明。
查看管理源代码 for how he registers the listener and the initialize method 了解他是如何进行声明的。
您只需要做您的
FanoutExchange testExchange = new FanoutExchange("testExchange", true, false);
admin.declareExchange(testExchange);
Queue testQueue = new Queue("testQueue", true, false, true);
admin.declareQueue(testQueue);
在听众中。
我有一个关于通过 XML 连接和在 java 代码中创建各种对象来使用 spring-amqp 的问题。
通过 XML 和 Java 代码执行此操作非常简单,并且在大多数情况下对我来说效果很好。但是,在代理中断后(通过关闭并重新启动我的代理来模拟),我得到了两个不同的结果。
当使用 XML 连接创建 spring-amqp 对象时,一切正常。重新建立连接,重新创建队列并恢复接收消息。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:ctx="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">
<rabbit:connection-factory id="connectionFactory"
addresses="192.168.1.10:5672"
username="guest"
password="guest"
/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue name="testQueue" id="testQueue" auto-declare="true" auto-delete="true" exclusive="false"/>
<rabbit:fanout-exchange name="testExchange" id="testExchange" >
<rabbit:bindings>
<rabbit:binding queue="testQueue" />
</rabbit:bindings>
</rabbit:fanout-exchange>
<bean class="TestConsumer" id="testConsumer" />
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="testQueue" ref="testConsumer" method="inbound" />
</rabbit:listener-container>
</beans>
但是,当使用 Java 创建 spring-amqp 对象时,我 运行 遇到了问题。重新建立连接,但随后我立即遇到异常,报告未找到队列。管理员不会尝试重新创建队列,并且在出现几次异常后,侦听器容器将停止。
public static void main(String[] args) {
CachingConnectionFactory cf = new CachingConnectionFactory("192.168.1.10", 5672);
RabbitAdmin admin = new RabbitAdmin(cf);
FanoutExchange testExchange = new FanoutExchange("testExchange", true, false);
admin.declareExchange(testExchange);
Queue testQueue = new Queue("testQueue", true, false, true);
admin.declareQueue(testQueue);
admin.declareBinding(BindingBuilder.bind(testQueue).to(testExchange));
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(cf);
container.setRabbitAdmin(admin);
container.setQueues(testQueue);
container.setMessageListener(new MessageListenerAdapter() {
public void handleMessage(String text) {
System.out.println("Received : " + text);
}
});
container.afterPropertiesSet();
container.start();
try {
Thread.sleep(600000L);
} catch(Exception e) {
e.printStackTrace();
}
container.stop();
container.destroy();
System.out.println("Exiting");
}
这是我在重新建立连接之后和侦听器容器退出之前看到的异常(三四次):
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'testQueue' in vhost '/', class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:552)
我假设 XML 接线盒和纯 Java 接线盒应该等效,但显然情况并非如此。我不确定是否 我的 Java 案例中缺少某些东西,或者这是否不受支持。
我当然可以检测到侦听器容器的故障并使用管理员重新声明队列,但是由于 XML 有线示例不需要这样做,我想知道我可能会怎么做不见了。
使用org.springframework.amqp.spring-rabbit 1.4.5.RELEASE
管理员需要一个 Spring 应用程序上下文来自动声明元素(队列等)。
它向连接工厂注册一个连接侦听器,当建立新连接时,它会在应用程序上下文中查找所有队列等并注册它们。
由于您正在使用 "pure" java(没有 Spring 应用程序上下文),此机制无法工作。
您可以使用 Spring Java 配置来替换您的 XML(使用 @Bean
定义),或者您必须注册自己的 class 与连接工厂一起执行声明。
查看管理源代码 for how he registers the listener and the initialize method 了解他是如何进行声明的。
您只需要做您的
FanoutExchange testExchange = new FanoutExchange("testExchange", true, false);
admin.declareExchange(testExchange);
Queue testQueue = new Queue("testQueue", true, false, true);
admin.declareQueue(testQueue);
在听众中。