Spring & RabbitMQ - 在运行时注册队列
Spring & RabbitMQ - register queue at runtime
如何在 运行 时间内创建绑定到扇出交换的新队列并 运行 它?到目前为止我有这个:
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 600000L);
GenericBeanDefinition runtimeQueueBean = new GenericBeanDefinition();
runtimeQueueBean.setBeanClass(Queue.class);
runtimeQueueBean.setLazyInit(false);
runtimeQueueBean.setAbstract(false);
runtimeQueueBean.setAutowireCandidate(true);
ConstructorArgumentValues queueConstrArgs = new ConstructorArgumentValues();
queueConstrArgs.addIndexedArgumentValue(0, queueName);
queueConstrArgs.addIndexedArgumentValue(1, true);
queueConstrArgs.addIndexedArgumentValue(2, false);
queueConstrArgs.addIndexedArgumentValue(3, false);
queueConstrArgs.addIndexedArgumentValue(4, arguments);
runtimeQueueBean.setConstructorArgumentValues(queueConstrArgs);
this.context.registerBeanDefinition("nejm", runtimeQueueBean);
GenericBeanDefinition runtimeFanoutExchange = new GenericBeanDefinition();
runtimeFanoutExchange.setBeanClass(FanoutExchange.class);
runtimeFanoutExchange.setLazyInit(false);
runtimeFanoutExchange.setAbstract(false);
runtimeFanoutExchange.setAutowireCandidate(true);
ConstructorArgumentValues constructorArgumentValues = new ConstructorArgumentValues();
constructorArgumentValues.addIndexedArgumentValue(0, "staticCache");
runtimeFanoutExchange.setConstructorArgumentValues(constructorArgumentValues);
this.context.registerBeanDefinition("staticCache", runtimeFanoutExchange);
GenericBeanDefinition runtimeBinding = new GenericBeanDefinition();
runtimeBinding.setBeanClass(Binding.class);
runtimeBinding.setLazyInit(false);
runtimeBinding.setAbstract(false);
runtimeBinding.setAutowireCandidate(true);
constructorArgumentValues = new ConstructorArgumentValues();
constructorArgumentValues.addIndexedArgumentValue(0, queueName);
constructorArgumentValues.addIndexedArgumentValue(1, Binding.DestinationType.QUEUE);
constructorArgumentValues.addIndexedArgumentValue(2, "staticCache");
constructorArgumentValues.addIndexedArgumentValue(3, "");
runtimeBinding.setConstructorArgumentValues(constructorArgumentValues);
this.context.registerBeanDefinition("bajnding", runtimeBinding);
GenericBeanDefinition runtimeMessageListenerAdapter = new GenericBeanDefinition();
runtimeMessageListenerAdapter.setBeanClass(MessageListenerAdapter.class);
runtimeMessageListenerAdapter.setLazyInit(false);
runtimeMessageListenerAdapter.setAbstract(false);
runtimeMessageListenerAdapter.setAutowireCandidate(true);
constructorArgumentValues = new ConstructorArgumentValues();
constructorArgumentValues.addIndexedArgumentValue(0, this);
constructorArgumentValues.addIndexedArgumentValue(1, new RuntimeBeanReference("jackson2JsonMessageConverter"));
runtimeMessageListenerAdapter.setConstructorArgumentValues(constructorArgumentValues);
this.context.registerBeanDefinition("mla2", runtimeMessageListenerAdapter);
GenericBeanDefinition runtimeContainerExchange = new GenericBeanDefinition();
runtimeContainerExchange.setBeanClass(SimpleMessageListenerContainer.class);
runtimeContainerExchange.setLazyInit(false);
runtimeContainerExchange.setAbstract(false);
runtimeContainerExchange.setAutowireCandidate(true);
MutablePropertyValues propertyValues = new MutablePropertyValues();
propertyValues.addPropertyValue("connectionFactory", new RuntimeBeanReference("connectionFactory"));
propertyValues.addPropertyValue("queues", new RuntimeBeanReference("nejm"));
propertyValues.addPropertyValue("messageListener", new RuntimeBeanReference("mla2"));
runtimeContainerExchange.setPropertyValues(propertyValues);
this.context.registerBeanDefinition("defqueue", runtimeContainerExchange);
问题是 queue/exchange 没有在 运行 时创建,我必须手动启动侦听器(除非我调用 this.context.start() - 但我没有不知道这是否是正确的做法)。
我的问题 - 是否有某种方法可以在 运行 时间内神奇地启动所有生成的 bean(类似于 this.context.refresh() - 这存在但不起作用或类似)?
更新:
我目前就是这样做的(这种方法有效,但不知道是否正确)
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 600000L);
Queue queue = new Queue(queueName, true, false, false, arguments);
FanoutExchange exchange = new FanoutExchange("staticCache");
Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, "staticCache", "", null);
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareExchange(exchange);
rabbitAdmin.declareBinding(binding);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(this.connectionFactory);
container.setQueues(queue);
container.setMessageListener(new MessageListenerAdapter(this, this.converter));
container.start();
你不能那样做。 BeanDefinition
和 this.context.registerBeanDefinition
用于应用程序上下文生命周期的解析阶段。
如果您的应用程序已经存在,应用程序上下文将不会接受任何 BeanDefinition
。
是的,您可以在运行时手动向交易所声明 Queue
及其 Binding
。而且您甚至可以手动创建 SimpleMessageListenerContainer
并使其工作。
而对你有什么好处,你只需要手动使用他们的 类 来实例化。只需要提供容器环境(例如将 this.applicationContext
注入 listenerContainer
对象)。
对于 Broker 的声明,您必须使用 applicationContext
中的 RabbitAdmin
bean。
从另一方面看,没有理由手动启动新的 listenerContainer
。现有的可以在运行时随新 Queue
一起提供。
如何在 运行 时间内创建绑定到扇出交换的新队列并 运行 它?到目前为止我有这个:
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 600000L);
GenericBeanDefinition runtimeQueueBean = new GenericBeanDefinition();
runtimeQueueBean.setBeanClass(Queue.class);
runtimeQueueBean.setLazyInit(false);
runtimeQueueBean.setAbstract(false);
runtimeQueueBean.setAutowireCandidate(true);
ConstructorArgumentValues queueConstrArgs = new ConstructorArgumentValues();
queueConstrArgs.addIndexedArgumentValue(0, queueName);
queueConstrArgs.addIndexedArgumentValue(1, true);
queueConstrArgs.addIndexedArgumentValue(2, false);
queueConstrArgs.addIndexedArgumentValue(3, false);
queueConstrArgs.addIndexedArgumentValue(4, arguments);
runtimeQueueBean.setConstructorArgumentValues(queueConstrArgs);
this.context.registerBeanDefinition("nejm", runtimeQueueBean);
GenericBeanDefinition runtimeFanoutExchange = new GenericBeanDefinition();
runtimeFanoutExchange.setBeanClass(FanoutExchange.class);
runtimeFanoutExchange.setLazyInit(false);
runtimeFanoutExchange.setAbstract(false);
runtimeFanoutExchange.setAutowireCandidate(true);
ConstructorArgumentValues constructorArgumentValues = new ConstructorArgumentValues();
constructorArgumentValues.addIndexedArgumentValue(0, "staticCache");
runtimeFanoutExchange.setConstructorArgumentValues(constructorArgumentValues);
this.context.registerBeanDefinition("staticCache", runtimeFanoutExchange);
GenericBeanDefinition runtimeBinding = new GenericBeanDefinition();
runtimeBinding.setBeanClass(Binding.class);
runtimeBinding.setLazyInit(false);
runtimeBinding.setAbstract(false);
runtimeBinding.setAutowireCandidate(true);
constructorArgumentValues = new ConstructorArgumentValues();
constructorArgumentValues.addIndexedArgumentValue(0, queueName);
constructorArgumentValues.addIndexedArgumentValue(1, Binding.DestinationType.QUEUE);
constructorArgumentValues.addIndexedArgumentValue(2, "staticCache");
constructorArgumentValues.addIndexedArgumentValue(3, "");
runtimeBinding.setConstructorArgumentValues(constructorArgumentValues);
this.context.registerBeanDefinition("bajnding", runtimeBinding);
GenericBeanDefinition runtimeMessageListenerAdapter = new GenericBeanDefinition();
runtimeMessageListenerAdapter.setBeanClass(MessageListenerAdapter.class);
runtimeMessageListenerAdapter.setLazyInit(false);
runtimeMessageListenerAdapter.setAbstract(false);
runtimeMessageListenerAdapter.setAutowireCandidate(true);
constructorArgumentValues = new ConstructorArgumentValues();
constructorArgumentValues.addIndexedArgumentValue(0, this);
constructorArgumentValues.addIndexedArgumentValue(1, new RuntimeBeanReference("jackson2JsonMessageConverter"));
runtimeMessageListenerAdapter.setConstructorArgumentValues(constructorArgumentValues);
this.context.registerBeanDefinition("mla2", runtimeMessageListenerAdapter);
GenericBeanDefinition runtimeContainerExchange = new GenericBeanDefinition();
runtimeContainerExchange.setBeanClass(SimpleMessageListenerContainer.class);
runtimeContainerExchange.setLazyInit(false);
runtimeContainerExchange.setAbstract(false);
runtimeContainerExchange.setAutowireCandidate(true);
MutablePropertyValues propertyValues = new MutablePropertyValues();
propertyValues.addPropertyValue("connectionFactory", new RuntimeBeanReference("connectionFactory"));
propertyValues.addPropertyValue("queues", new RuntimeBeanReference("nejm"));
propertyValues.addPropertyValue("messageListener", new RuntimeBeanReference("mla2"));
runtimeContainerExchange.setPropertyValues(propertyValues);
this.context.registerBeanDefinition("defqueue", runtimeContainerExchange);
问题是 queue/exchange 没有在 运行 时创建,我必须手动启动侦听器(除非我调用 this.context.start() - 但我没有不知道这是否是正确的做法)。
我的问题 - 是否有某种方法可以在 运行 时间内神奇地启动所有生成的 bean(类似于 this.context.refresh() - 这存在但不起作用或类似)?
更新:
我目前就是这样做的(这种方法有效,但不知道是否正确)
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 600000L);
Queue queue = new Queue(queueName, true, false, false, arguments);
FanoutExchange exchange = new FanoutExchange("staticCache");
Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, "staticCache", "", null);
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareExchange(exchange);
rabbitAdmin.declareBinding(binding);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(this.connectionFactory);
container.setQueues(queue);
container.setMessageListener(new MessageListenerAdapter(this, this.converter));
container.start();
你不能那样做。 BeanDefinition
和 this.context.registerBeanDefinition
用于应用程序上下文生命周期的解析阶段。
如果您的应用程序已经存在,应用程序上下文将不会接受任何 BeanDefinition
。
是的,您可以在运行时手动向交易所声明 Queue
及其 Binding
。而且您甚至可以手动创建 SimpleMessageListenerContainer
并使其工作。
而对你有什么好处,你只需要手动使用他们的 类 来实例化。只需要提供容器环境(例如将 this.applicationContext
注入 listenerContainer
对象)。
对于 Broker 的声明,您必须使用 applicationContext
中的 RabbitAdmin
bean。
从另一方面看,没有理由手动启动新的 listenerContainer
。现有的可以在运行时随新 Queue
一起提供。