Spring AMQP - anonymous queue declaration
I am trying to declare an anonymous queue using Spring AMQP annotations, but it does not seem to work.
@RabbitListener(
    id = "b1",
    bindings =
        @QueueBinding(
            value =
                @Queue(
                    name = "",
                    durable = "false",
                    exclusive = "true",
                    autoDelete = "true",
                    admins = "amqpAdmin1"),
            exchange =
                @Exchange(
                    value = "${my.rabbitmq[0].exchange}", declare = "false",
                    ignoreDeclarationExceptions = "true",
                    type = ExchangeTypes.TOPIC,
                    admins = "amqpAdmin1"),
            key = "*.*.*.*.*.*.*.-",
            admins = "amqpAdmin1"),
    admin = "amqpAdmin1",
    containerFactory = "lf1",
    autoStartup = "true")
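I read that I can use an empty name parameter and that it should create an anonymous queue. The problem is that I need a server-generated name (I do not have permission to name my own queues). Is this currently possible with the Spring AMQP library?
I see that the code currently does this: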
private String declareQueue(org.springframework.amqp.rabbit.annotation.Queue bindingQueue,
        Collection<Declarable> declarables) {
    String queueName = (String) resolveExpression(bindingQueue.value());
    boolean isAnonymous = false;
    if (!StringUtils.hasText(queueName)) {
        queueName = Base64UrlNamingStrategy.DEFAULT.generateName(); //<---- HERE
        // default exclusive/autodelete and non-durable when anonymous
        isAnonymous = true;
    }
But instead of Base64UrlNamingStrategy.DEFAULT.generateName() here, I need Spring to go to the RabbitMQ broker, get the name that RabbitMQ generates, and use that one.
Update
@Bean
public Queue q2(){
    Queue q = QueueBuilder.nonDurable().exclusive().autoDelete().build();
    //Queue q = new Queue("", false, true, true); --> same
    q.setAdminsThatShouldDeclare(amqpAdmin2());
    return q;
}

@Bean
public Exchange e2(@Value("${my.rabbitmq[1].exchange}") String name){
    Exchange e = ExchangeBuilder.topicExchange(name).ignoreDeclarationExceptions().build();
    e.setAdminsThatShouldDeclare(amqpAdmin2());
    return e;
}

@Bean
public Binding bin2(Queue q2, Exchange e2){
    Binding b = BindingBuilder.bind(q2).to(e2).with("*.*.*.*.*.*.*.-").noargs();
    b.setAdminsThatShouldDeclare(amqpAdmin2());
    return b;
}

@RabbitListener(id = "b2", queues = "#{q2}",
        admin = "amqpAdmin2",
        containerFactory = "lf2", autoStartup = "true")
public void listen2(GenericMessage<?> msg) {
    comparator.recordForComparison(props.getRabbitmq().get(1).getId(), msg.getPayload());
}
This is what the log shows:
Auto-declaring a non-durable, auto-delete, or exclusive Queue () durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2022-02-10 09:22:43.660 INFO 30076 --- [ main] o.s.a.r.l.DirectMessageListenerContainer : Container initialized for queues: [spring.gen-ZAVana-pSSW2WaanrkVZXw_awaiting_declaration]
2022-02-10 09:22:43.661 INFO 30076 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: my.server.com:5671
2022-02-10 09:22:43.765 ERROR 30076 --- [ b2-1] o.s.a.r.l.DirectMessageListenerContainer : Queue not present, scheduling consumer SimpleConsumer [queue=spring.gen-ZAVana-pSSW2WaanrkVZXw_awaiting_declaration, index=0, consumerTag=null identity=3f844994] for restart
This is what I added after removing @RabbitListener. With this it now works:
@Bean
public MessageListenerContainer c2(DirectRabbitListenerContainerFactory lf2, Queue q2){
    DirectMessageListenerContainer lc = lf2.createListenerContainer();
    lc.setQueues(q2);
    lc.setAutoStartup(true);
    MessageListenerAdapter a = new MessageListenerAdapter();
    a.setMessageConverter(converter());
    a.setDefaultListenerMethod("receive");
    a.setDelegate(new MessageReceiver(props.getRabbitmq().get(1).getId()));
    lc.setupMessageListener(a);
    lc.afterPropertiesSet();
    return lc;
}

class MessageReceiver{
    private final int id;
    public MessageReceiver(int id){
        this.id= id;
    }
    void receive(Object o){
        comparator.recordForComparison(this.id, o);
    }
}
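I think this has to be treated as a bug, a part we missed when we implemented https://jira.spring.io/browse/AMQP-816. With a regular Queue bean definition you can set the name to an empty string, and actualName is set after the queue has been declared on the broker via RabbitAdmin.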
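Consider using that as a workaround for your use case. You have to move this @QueueBinding into Binding, Queue and Exchange bean definitions. Then you just use queues = "#{myQueueBean}". See the docs for more info: https://docs.spring.io/spring-amqp/docs/current/reference/html/#builder-api.
Please raise a GH issue for this bug and we will fix it ASAP.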
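To make the actualName behaviour described above concrete, here is a small plain-Java model of a server-named queue declaration. It is only a sketch and does not use Spring AMQP or a real broker: QueueDef, FakeBroker, declare and the placeholder string are hypothetical stand-ins, and only the amq.gen- prefix mirrors what RabbitMQ uses for server-named queues.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

public class ServerNamedQueueModel {

    /** Hypothetical stand-in for a queue definition; NOT the Spring AMQP Queue class. */
    static final class QueueDef {
        private final String name;
        private String actualName;

        QueueDef(String name) {
            this.name = name;
            // With an empty name, only a placeholder is known until the broker answers.
            this.actualName = name.isEmpty() ? "pending_awaiting_declaration" : name;
        }

        String getName() { return name; }
        String getActualName() { return actualName; }
        void setActualName(String actualName) { this.actualName = actualName; }
    }

    /** Hypothetical in-memory broker that names queues declared with an empty name. */
    static final class FakeBroker {
        private final Set<String> queues = new HashSet<>();

        String declare(String requestedName) {
            String assigned = requestedName.isEmpty()
                    ? "amq.gen-" + UUID.randomUUID()
                    : requestedName;
            queues.add(assigned);
            return assigned;
        }

        boolean exists(String queueName) { return queues.contains(queueName); }
    }

    /** Declares the queue and copies the broker-assigned name back into the definition. */
    static String declare(FakeBroker broker, QueueDef queue) {
        String assigned = broker.declare(queue.getName());
        queue.setActualName(assigned);
        return assigned;
    }

    public static void main(String[] args) {
        FakeBroker broker = new FakeBroker();
        QueueDef queue = new QueueDef("");
        System.out.println("before declare: " + queue.getActualName());
        declare(broker, queue);
        System.out.println("after declare:  " + queue.getActualName());
    }
}
```

The point of the model is that the usable name exists only after the declaration round trip, so anything that reads the name earlier sees a placeholder that does not exist on the broker.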
I found the problem: the @RabbitListener bean post processor grabs the name before the queue has been declared.
Here is a workaround (manually inject the queue):
@SpringBootApplication
public class So71033589Application {

    public static void main(String[] args) {
        SpringApplication.run(So71033589Application.class, args);
    }

    @RabbitListener(id = "foo", autoStartup = "false")
    public void listen(String in) {
        System.out.println(in);
    }

}

@Configuration
class Config{

    @Bean
    Queue q() {
        return new Queue("", false, true, true);
    }

    @Bean
    ApplicationRunner runner(RabbitListenerEndpointRegistry registry, Queue q) {
        return args -> {
            MessageListenerContainer container = registry.getListenerContainer("foo");
            ((AbstractMessageListenerContainer) container).setQueues(q);
            container.start();
        };
    }

}
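The ordering problem described in this answer (the name is read before the queue is declared) can be illustrated with a plain-Java model. This is a sketch under assumptions, not Spring AMQP code: QueueDef, Container, markDeclared and both start methods are hypothetical names, and the placeholder string only imitates the _awaiting_declaration suffix visible in the log from the question.

```java
import java.util.function.Supplier;

public class NameCaptureOrderModel {

    /** Hypothetical queue definition whose real name arrives only after declaration. */
    static final class QueueDef {
        private String actualName = "pending_awaiting_declaration";

        String getActualName() { return actualName; }
        void markDeclared(String brokerName) { this.actualName = brokerName; }
    }

    /** Hypothetical container that resolves its queue name when start() is called. */
    static final class Container {
        private final Supplier<String> queueName;
        private String consumingFrom;

        Container(Supplier<String> queueName) { this.queueName = queueName; }

        void start() { this.consumingFrom = queueName.get(); }
        String consumingFrom() { return consumingFrom; }
    }

    /** Eager capture: the name is copied into a String before the queue is declared. */
    static String startWithEagerCapture(QueueDef queue, String brokerName) {
        String captured = queue.getActualName(); // copied too early
        Container container = new Container(() -> captured);
        queue.markDeclared(brokerName);          // declaration happens afterwards
        container.start();
        return container.consumingFrom();
    }

    /** Deferred capture: the container keeps the queue object and reads the name at start. */
    static String startWithDeferredCapture(QueueDef queue, String brokerName) {
        Container container = new Container(queue::getActualName);
        queue.markDeclared(brokerName);
        container.start();
        return container.consumingFrom();
    }

    public static void main(String[] args) {
        System.out.println(startWithEagerCapture(new QueueDef(), "amq.gen-example"));
        System.out.println(startWithDeferredCapture(new QueueDef(), "amq.gen-example"));
    }
}
```

In this model the eager variant ends up consuming from the placeholder name, while the deferred variant sees the broker-assigned one. That is presumably why handing the Queue object to the container with setQueues(q) and starting it afterwards works where a name resolved during post-processing does not.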