Creating and binding an exclusive, auto-delete RabbitMQ queue with a defined expiry time at runtime fails

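I have a use case where I need to create new queues at runtime and create consumers for these newly created queues. The queues created at runtime should be exclusive and should be deleted automatically after an expiry time. I followed the pattern suggested here. If I declare the queues as both exclusive and auto-delete, without any x-expires argument, it works. However, if I set x-expires, I see an error message in the console whenever the application tries to create a new queue at runtime. It looks as if the argument name is wrong, or perhaps is not the name Spring expects internally. I am just trying to find out how to set the expiry time. Below are my classes: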

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.Header;

import java.util.HashMap;
import java.util.Map;

import static org.springframework.amqp.support.AmqpHeaders.CONSUMER_QUEUE;

@Configuration
@EnableRabbit
@Slf4j
@RequiredArgsConstructor
public class PrintJobListenerConfiguration
{

  @RabbitListener(queues = "${queue.name}")
  public void listen(@Header(CONSUMER_QUEUE) String queue) {
    log.info("Message read from the queue : {}", queue);
  }

  @Bean
  public Queue queue(@Value("${queue.name}") String name) {
    Map arg = new HashMap<>();
    arg.put("x-expires","20000");
    return new Queue(name, true, true, true, arg);
  }

  @Bean
  DirectExchange exchange(@Value("${exchange.name}") String exchange) {
    return new DirectExchange(exchange);
  }

  @Bean
  Binding binding(
      @Value("${routing.key}") String routingkey, Queue queue, DirectExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(routingkey);
  }

  @Bean
  public RabbitAdmin admin(ConnectionFactory cf) {
    return new RabbitAdmin(cf);
  }
}

=========================================================================================

import com.mm.alchemy.dynamicqueue.PrintJobListenerConfiguration;
import com.mm.alchemy.print.properties.ApplicationProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.PropertiesPropertySource;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

@Slf4j
@Service
@RequiredArgsConstructor
public class DynamicQueueListenerService {
  private final Map<String, ConfigurableApplicationContext> children = new HashMap<>();
  private final ApplicationContext context;
  private final ApplicationProperties applicationProperties;

  public void addNewDynamicQueueAndListener(String queue) {
    children.put(queue, addNewListener(queue));
  }

  private ConfigurableApplicationContext addNewListener(String queue) {
    AnnotationConfigApplicationContext child = new AnnotationConfigApplicationContext();
    child.setParent(context);
    ConfigurableEnvironment environment = child.getEnvironment();
    Properties properties = new Properties();
    properties.setProperty("queue.name", queue);
    properties.setProperty(
        "exchange.name", applicationProperties.getConsumer().getPrintJobRequestExchangeName());
    properties.setProperty("routing.key", queue);
    PropertiesPropertySource pps = new PropertiesPropertySource("props", properties);
    environment.getPropertySources().addLast(pps);
    child.register(PrintJobListenerConfiguration.class);
    child.refresh();
    return child;
  }
}


Log output and stack trace of the failure when trying to create the queue at runtime:

2021-05-05 20:28:32.096  INFO 96868 --- [  restartedMain] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable, auto-delete, or exclusive Queue (STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414) durable:true, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2021-05-05 20:28:32.119 ERROR 96868 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - invalid arg 'x-expires' for queue 'STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414' in vhost '/': {unacceptable_type,longstr}, class-id=50, method-id=10)
2021-05-05 20:28:32.139  INFO 96868 --- [  restartedMain] o.s.b.d.a.OptionalLiveReloadServer       : LiveReload server is running on port 35729
2021-05-05 20:28:33.143  INFO 96868 --- [  restartedMain] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable, auto-delete, or exclusive Queue (STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414) durable:true, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2021-05-05 20:28:33.150 ERROR 96868 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - invalid arg 'x-expires' for queue 'STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414' in vhost '/': {unacceptable_type,longstr}, class-id=50, method-id=10)
2021-05-05 20:28:35.157  INFO 96868 --- [  restartedMain] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable, auto-delete, or exclusive Queue (STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414) durable:true, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2021-05-05 20:28:35.165 ERROR 96868 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - invalid arg 'x-expires' for queue 'STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414' in vhost '/': {unacceptable_type,longstr}, class-id=50, method-id=10)
2021-05-05 20:28:39.171  INFO 96868 --- [  restartedMain] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable, auto-delete, or exclusive Queue (STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414) durable:true, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2021-05-05 20:28:39.178 ERROR 96868 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - invalid arg 'x-expires' for queue 'STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414' in vhost '/': {unacceptable_type,longstr}, class-id=50, method-id=10)
2021-05-05 20:28:44.182  INFO 96868 --- [  restartedMain] o.s.amqp.rabbit.core.RabbitAdmin         : Auto-declaring a non-durable, auto-delete, or exclusive Queue (STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414) durable:true, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2021-05-05 20:28:44.189 ERROR 96868 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - invalid arg 'x-expires' for queue 'STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414' in vhost '/': {unacceptable_type,longstr}, class-id=50, method-id=10)
2021-05-05 20:28:44.193  INFO 96868 --- [  restartedMain] o.s.a.r.l.SimpleMessageListenerContainer : Broker not available; cannot force queue declarations during start: java.io.IOException
2021-05-05 20:28:44.205  WARN 96868 --- [ntContainer#0-1] o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue: STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414
2021-05-05 20:28:44.209  WARN 96868 --- [ntContainer#0-1] o.s.a.r.listener.BlockingQueueConsumer   : Queue declaration failed; retries left=3

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:733) ~[spring-rabbit-2.3.6.jar:2.3.6]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:608) ~[spring-rabbit-2.3.6.jar:2.3.6]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:595) ~[spring-rabbit-2.3.6.jar:2.3.6]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1347) ~[spring-rabbit-2.3.6.jar:2.3.6]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1192) ~[spring-rabbit-2.3.6.jar:2.3.6]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: java.io.IOException: null
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:1012) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:46) ~[amqp-client-5.10.0.jar:5.10.0]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1157) ~[spring-rabbit-2.3.6.jar:2.3.6]
    at com.sun.proxy.$Proxy83.queueDeclarePassive(Unknown Source) ~[na:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:711) ~[spring-rabbit-2.3.6.jar:2.3.6]
    ... 5 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141) ~[amqp-client-5.10.0.jar:5.10.0]
    ... 14 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'STAQ-ENTR-00000000-0000-1000-8000-F4A997AA4414' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:517) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:341) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:739) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQConnection.access0(AMQConnection.java:47) ~[amqp-client-5.10.0.jar:5.10.0]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666) ~[amqp-client-5.10.0.jar:5.10.0]
    ... 1 common frames omitted

arg.put("x-expires","20000");

reply-text=PRECONDITION_FAILED - invalid arg 'x-expires'

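x-expires is an integer argument, not a string. The value "20000" is sent as a string, which is why the broker rejects it with {unacceptable_type,longstr}.

I suggest using QueueBuilder, which has type-safe methods for this...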

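// Requires org.springframework.amqp.core.Queue and org.springframework.amqp.core.QueueBuilder
@Bean
Queue queue() {
    return QueueBuilder.durable("queue")
            .autoDelete()
            .exclusive()
            .expires(20000) // int, in milliseconds; sent as a number rather than a string
            .build();
}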
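
If you would rather keep the Map-based Queue constructor from the question, the same fix should apply: type the map as Map<String, Object> and put a number in it instead of a String. The following is only a minimal sketch using plain Java so that it runs without Spring on the classpath. The class QueueArguments and the helper expiringQueueArguments are hypothetical names I made up for illustration, not part of Spring AMQP.

```java
import java.util.HashMap;
import java.util.Map;

class QueueArguments {

    // Hypothetical helper (not a Spring AMQP API): builds the optional
    // arguments map for a queue that should expire after the given time.
    static Map<String, Object> expiringQueueArguments(int expiresMillis) {
        Map<String, Object> arguments = new HashMap<>();
        // Store a number, not the String "20000", so the value is not
        // sent to the broker as a longstr.
        arguments.put("x-expires", expiresMillis);
        return arguments;
    }

    public static void main(String[] args) {
        Map<String, Object> arguments = expiringQueueArguments(20000);
        // Shows the runtime type of the stored value.
        System.out.println(arguments.get("x-expires").getClass().getSimpleName());
        // In the question's configuration, this map would be passed as:
        // new Queue(name, true, true, true, arguments)
    }
}
```

Using Map<String, Object> instead of the raw Map also makes it more obvious at a glance what type each argument value has.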