Spring & RabbitMQ - 在运行时注册队列

Spring & RabbitMQ - register queue at runtime

如何在 运行 时间内创建绑定到扇出交换的新队列并 运行 它?到目前为止我有这个:

Map<String, Object> arguments = new HashMap<>();
    arguments.put("x-message-ttl", 600000L);

    GenericBeanDefinition runtimeQueueBean = new GenericBeanDefinition();
    runtimeQueueBean.setBeanClass(Queue.class);
    runtimeQueueBean.setLazyInit(false);
    runtimeQueueBean.setAbstract(false);
    runtimeQueueBean.setAutowireCandidate(true);
    ConstructorArgumentValues queueConstrArgs = new ConstructorArgumentValues();
    queueConstrArgs.addIndexedArgumentValue(0, queueName);
    queueConstrArgs.addIndexedArgumentValue(1, true);
    queueConstrArgs.addIndexedArgumentValue(2, false);
    queueConstrArgs.addIndexedArgumentValue(3, false);
    queueConstrArgs.addIndexedArgumentValue(4, arguments);
    runtimeQueueBean.setConstructorArgumentValues(queueConstrArgs);
    this.context.registerBeanDefinition("nejm", runtimeQueueBean);


    GenericBeanDefinition runtimeFanoutExchange = new GenericBeanDefinition();
    runtimeFanoutExchange.setBeanClass(FanoutExchange.class);
    runtimeFanoutExchange.setLazyInit(false);
    runtimeFanoutExchange.setAbstract(false);
    runtimeFanoutExchange.setAutowireCandidate(true);
    ConstructorArgumentValues constructorArgumentValues = new ConstructorArgumentValues();
    constructorArgumentValues.addIndexedArgumentValue(0, "staticCache");
    runtimeFanoutExchange.setConstructorArgumentValues(constructorArgumentValues);
    this.context.registerBeanDefinition("staticCache", runtimeFanoutExchange);


    GenericBeanDefinition runtimeBinding = new GenericBeanDefinition();
    runtimeBinding.setBeanClass(Binding.class);
    runtimeBinding.setLazyInit(false);
    runtimeBinding.setAbstract(false);
    runtimeBinding.setAutowireCandidate(true);
    constructorArgumentValues = new ConstructorArgumentValues();
    constructorArgumentValues.addIndexedArgumentValue(0, queueName);
    constructorArgumentValues.addIndexedArgumentValue(1, Binding.DestinationType.QUEUE);
    constructorArgumentValues.addIndexedArgumentValue(2, "staticCache");
    constructorArgumentValues.addIndexedArgumentValue(3, "");
    runtimeBinding.setConstructorArgumentValues(constructorArgumentValues);
    this.context.registerBeanDefinition("bajnding", runtimeBinding);


    GenericBeanDefinition runtimeMessageListenerAdapter = new GenericBeanDefinition();
    runtimeMessageListenerAdapter.setBeanClass(MessageListenerAdapter.class);
    runtimeMessageListenerAdapter.setLazyInit(false);
    runtimeMessageListenerAdapter.setAbstract(false);
    runtimeMessageListenerAdapter.setAutowireCandidate(true);
    constructorArgumentValues = new ConstructorArgumentValues();
    constructorArgumentValues.addIndexedArgumentValue(0, this);
    constructorArgumentValues.addIndexedArgumentValue(1, new RuntimeBeanReference("jackson2JsonMessageConverter"));
    runtimeMessageListenerAdapter.setConstructorArgumentValues(constructorArgumentValues);
    this.context.registerBeanDefinition("mla2", runtimeMessageListenerAdapter);



    GenericBeanDefinition runtimeContainerExchange = new GenericBeanDefinition();
    runtimeContainerExchange.setBeanClass(SimpleMessageListenerContainer.class);
    runtimeContainerExchange.setLazyInit(false);
    runtimeContainerExchange.setAbstract(false);
    runtimeContainerExchange.setAutowireCandidate(true);
    MutablePropertyValues propertyValues = new MutablePropertyValues();
    propertyValues.addPropertyValue("connectionFactory", new RuntimeBeanReference("connectionFactory"));
    propertyValues.addPropertyValue("queues", new RuntimeBeanReference("nejm"));
    propertyValues.addPropertyValue("messageListener", new RuntimeBeanReference("mla2"));
    runtimeContainerExchange.setPropertyValues(propertyValues);
    this.context.registerBeanDefinition("defqueue", runtimeContainerExchange);

问题是 queue/exchange 没有在 运行 时创建,我必须手动启动侦听器(除非我调用 this.context.start() - 但我没有不知道这是否是正确的做法)。

我的问题 - 是否有某种方法可以在 运行 时间内神奇地启动所有生成的 bean(类似于 this.context.refresh() - 这存在但不起作用或类似)?

更新:

我目前就是这样做的(这种方法有效,但不知道是否正确)

    Map<String, Object> arguments = new HashMap<>();
    arguments.put("x-message-ttl", 600000L);
    Queue queue = new Queue(queueName, true, false, false, arguments);

    FanoutExchange exchange = new FanoutExchange("staticCache");

    Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, "staticCache", "", null);

    rabbitAdmin.declareQueue(queue);
    rabbitAdmin.declareExchange(exchange);
    rabbitAdmin.declareBinding(binding);

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(this.connectionFactory);
    container.setQueues(queue);
    container.setMessageListener(new MessageListenerAdapter(this, this.converter));

    container.start();

你不能那样做。 BeanDefinitionthis.context.registerBeanDefinition 用于应用程序上下文生命周期的解析阶段。

如果您的应用程序已经存在,应用程序上下文将不会接受任何 BeanDefinition

是的,您可以在运行时手动向交易所声明 Queue 及其 Binding。而且您甚至可以手动创建 SimpleMessageListenerContainer 并使其工作。

而对你有什么好处,你只需要手动使用他们的 类 来实例化。只需要提供容器环境(例如将 this.applicationContext 注入 listenerContainer 对象)。

对于 Broker 的声明,您必须使用 applicationContext 中的 RabbitAdmin bean。

从另一方面看,没有理由手动启动新的 listenerContainer。现有的可以在运行时随新 Queue 一起提供。