动态创建队列并将其绑定到单个 Exchange
Create and bind queues dynamically to a single Exchange
我有一个交换器 (exchange1
),它可以根据路由键表达式路由到 n
个不同的队列。
- 所有图片消息都应该进入 queue1
- 所有文档消息都应转到队列 2
- 所有视频消息都应进入默认队列 3
并且在未来,可以增加队列数量(所有视频和 mp4 扩展名应该转到 queue4
)
我们如何创建队列并将其动态绑定到一个特定的交换器,并且应该只使用一个流监听器?
无法通过属性直接使用 Spring Cloud Stream 完成。
您必须使用所需的路由键声明 Exchange
、Queue
和 Binding
@Bean
,然后将消费者绑定配置为不声明队列和绑定)然后设置 s.c.s.consumer.multiplex=true
和 s.c.s.destination=queue1,queue2,queue3
.
请参阅 Using Existing Queues/Exhanges 了解如何禁用活页夹配置。
在 Spring 引导中使用 Spring-AMQP 插件可以轻松完成。我在下面附上了如何实现它的片段。 (我相信代码比理论词更能说明问题):
package com.savk.workout.spring.rabbitmqconversendreceivefanoutproducer;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class DynamicBindingDemo {
@Autowired
private RabbitAdmin rabbitAdmin;
@Autowired
private RabbitTemplate rabbitTemplate;
private boolean switchedOn = true;
//Toggle binding programatically
@Scheduled(fixedRate = 10000L)
public void manipulateBinding() {
Exchange exchange = ExchangeBuilder.directExchange("exchange").autoDelete().build();
Queue queue = QueueBuilder.nonDurable("queue").build();
Binding binding = BindingBuilder.bind(queue).to(exchange).with("routingkey").noargs();
if(switchedOn) {
rabbitAdmin.declareBinding(binding);
} else {
rabbitAdmin.removeBinding(binding);
}
}
@Bean
public RabbitAdmin rabbitAdmin() {
return new RabbitAdmin(rabbitTemplate.getConnectionFactory());
}
}
确保你也有相应的 RabbitListener :
@RabbitListener(queues = "queuename")
@RabbitHandler
public String handle(String msg) {
System.out.println("RCVD :: " + msg);
String response = "PONG { " + msg + " } from INSTANCE " + INSTANCE;
return response;
}
你看一切都可以动态完成。您只需要使用绑定进行操作即可。
我有一个交换器 (exchange1
),它可以根据路由键表达式路由到 n
个不同的队列。
- 所有图片消息都应该进入 queue1
- 所有文档消息都应转到队列 2
- 所有视频消息都应进入默认队列 3
并且在未来,可以增加队列数量(所有视频和 mp4 扩展名应该转到 queue4
)
我们如何创建队列并将其动态绑定到一个特定的交换器,并且应该只使用一个流监听器?
无法通过属性直接使用 Spring Cloud Stream 完成。
您必须使用所需的路由键声明 Exchange
、Queue
和 Binding
@Bean
,然后将消费者绑定配置为不声明队列和绑定)然后设置 s.c.s.consumer.multiplex=true
和 s.c.s.destination=queue1,queue2,queue3
.
请参阅 Using Existing Queues/Exhanges 了解如何禁用活页夹配置。
在 Spring 引导中使用 Spring-AMQP 插件可以轻松完成。我在下面附上了如何实现它的片段。 (我相信代码比理论词更能说明问题):
package com.savk.workout.spring.rabbitmqconversendreceivefanoutproducer;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class DynamicBindingDemo {
@Autowired
private RabbitAdmin rabbitAdmin;
@Autowired
private RabbitTemplate rabbitTemplate;
private boolean switchedOn = true;
//Toggle binding programatically
@Scheduled(fixedRate = 10000L)
public void manipulateBinding() {
Exchange exchange = ExchangeBuilder.directExchange("exchange").autoDelete().build();
Queue queue = QueueBuilder.nonDurable("queue").build();
Binding binding = BindingBuilder.bind(queue).to(exchange).with("routingkey").noargs();
if(switchedOn) {
rabbitAdmin.declareBinding(binding);
} else {
rabbitAdmin.removeBinding(binding);
}
}
@Bean
public RabbitAdmin rabbitAdmin() {
return new RabbitAdmin(rabbitTemplate.getConnectionFactory());
}
}
确保你也有相应的 RabbitListener :
@RabbitListener(queues = "queuename")
@RabbitHandler
public String handle(String msg) {
System.out.println("RCVD :: " + msg);
String response = "PONG { " + msg + " } from INSTANCE " + INSTANCE;
return response;
}
你看一切都可以动态完成。您只需要使用绑定进行操作即可。