在 Spring-Boot-RabbitMQ 中处理连接
Handling Connections in Spring-Boot-RabbitMQ
您好,我正在开发 Spring-boot-RabbitMQ 版本 1。6.I 在开发应用程序时遇到的问题很少。阅读文档并浏览了其他堆栈溢出问题,但我无法弄清楚一些事情(可能是因为我记性不好)。
如果有人回答我的问题就太好了。
1) 目前我有 4 个生产者和 4-Consumers.Producer 可能会产生数百万条消息或事件,因此对生产者和消费者使用单一连接将阻止消费者消费 messages.So我想的是为生产者和消费者创建单独的连接,这样两者都不会阻塞并提供一些性能 improvement.Am 我对这种方法更正?
2) 我正在使用 CachingConnectionFactory 以便使用 SimpleRabbitListenerContainerFactory 创建连接。在调用该工厂时它是否会 return 我们的新连接?所以如果我们使用 CachingConnectionFactory 我们真的需要为 Producer 和 consumer.Please 编写一个单独的连接工厂吗?在下面找到我的
1)配置class
@Configuration
@EnableRabbit
public class RabbitMqConfiguration{
@Autowired
private CachingConnectionFactory cachingConnectionFactory;
@Value("${concurrent.consumers}")
public int concurrent_consumers;
@Value("${max.concurrent.consumers}")
public int max_concurrent_consumers;
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cachingConnectionFactory);
factory.setConcurrentConsumers(concurrent_consumers);
factory.setMaxConcurrentConsumers(max_concurrent_consumers);
factory.setMessageConverter(jsonMessageConverter());
return factory;
}
@Bean
public MessageConverter jsonMessageConverter()
{
final Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
return converter;
}
}
2)制作人Class
@Configuration
public class TaskProducerConfiguration extends RabbitMqConfiguration {
@Value("${queue1}")
public String queue1;
@Value("${queue2}")
public String queue2;
@Value("${queue3}")
public String queue1;
@Value("${queue4}")
public String queue2;
@Value("${spring.rabbit.exchange}")
public String exchange;
@Autowired
private CachingConnectionFactory cachingConnectionFactory;
@Primary
@Bean
public RabbitTemplate getQueue1Template()
{
RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
template.setRoutingKey(this.queue1);
template.setMessageConverter(jsonMessageConverter());
return template;
}
@Bean
public RabbitTemplate getQueue2Template()
{
RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
template.setRoutingKey(this.queue2);
template.setMessageConverter(jsonMessageConverter());
return template;
}
@Bean
public RabbitTemplate getQueue3Template()
{
RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
template.setRoutingKey(this.queue3);
template.setMessageConverter(jsonMessageConverter());
return template;
}
@Bean
public RabbitTemplate getQueue4Template()
{
RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
template.setRoutingKey(this.queue4);
template.setMessageConverter(jsonMessageConverter());
return template;
}
@Bean(name="queue1Bean")
public Queue queue1()
{
return new Queue(this.queue1);
}
@Bean(name="queue2Bean")
public Queue queue2()
{
return new Queue(this.queue2);
}
@Bean(name="queue3Bean")
public Queue queue3()
{
return new Queue(this.queue3);
}
@Bean(name="queue4Bean")
public Queue queue4()
{
return new Queue(this.queue4);
}
@Bean
TopicExchange exchange() {
return new TopicExchange(exchange);
}
@Bean
List<Binding> bindings(Queue queue1Bean,Queue queue2Bean,Queue queue3Bean,Queue queue4Bean, TopicExchange exchange) {
List<Binding> bindingList = new ArrayList<Binding>();
bindingList.add(BindingBuilder.bind(queue1Bean).to(exchange).with(this.queue1));
bindingList.add(BindingBuilder.bind(queue2Bean).to(exchange).with(this.queue2));
bindingList.add(BindingBuilder.bind(queue3Bean).to(exchange).with(this.queue3));
bindingList.add(BindingBuilder.bind(queue4Bean).to(exchange).with(this.queue4));
return bindingList;
}
}
3) Receiver Class(Just Shared one receiver class receiver of the 3-receiver classes are a and the same except queue name & routing键).
@Component
public class Queue1Receiver {
@Autowired
private TaskProducer taskProducer;
@Value("${queue1}")
public String queue1;
@RabbitListener(id="queue1",containerFactory="rabbitListenerContainerFactory",queues = "#{queue1Bean}")
public void handleQueue1Message(TaskMessage taskMessage,@Header(AmqpHeaders.CONSUMER_QUEUE) String queue)
{
System.out.println("Queue::"+queue);
System.out.println("CustomerId: " + taskMessage.getCustomerID());
if(taskMessage.isHasQueue2()){
taskProducer.sendQueue2Message(taskMessage);
}
if(taskMessage.isHasQueue3()){
taskProducer.sendQueue3Message(taskMessage);
}
if(taskMessage.isHasQueue4()){
taskProducer.sendQueue4Message(taskMessage);
}
}
@Bean
public Queue queue1Bean() {
// This queue has the following properties:
// name: my_durable,durable: true,exclusive: false,auto_delete: false
return new Queue(queue1, true, false, false);
}
}
你的帮助应该是可观的。
注意:反对投票者请在反对投票前注册您的评论,以便以后我可以避免错误。
根据 Gary Russell 的评论编辑:
1)RabbitMq配置
@Configuration
@EnableRabbit
public class RabbitMqConfiguration{
@Value("${concurrent.consumers}")
public int concurrent_consumers;
@Value("${max.concurrent.consumers}")
public int max_concurrent_consumers;
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(concurrent_consumers);
factory.setMaxConcurrentConsumers(max_concurrent_consumers);
factory.setMessageConverter(jsonMessageConverter());
return factory;
}
@Bean
public CachingConnectionFactory connectionFactory()
{
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setCacheMode(CacheMode.CONNECTION);
return connectionFactory;
}
@Bean
public MessageConverter jsonMessageConverter()
{
final Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
return converter;
}
}
using a single connection for both producer & consumer will block consumer to consume the messages`
是什么让您相信这一点?单个连接通常会很好。如果您真的想要单独的连接,请将连接工厂 cacheMode
更改为 CONNECTION
。
您可以在同一情况下使用连接池,保持适当的池大小可能会解决上述答案中建议的 problem.As 生产者和消费者都使用相同的连接,因此池可能会帮助您解决问题。
您好,我正在开发 Spring-boot-RabbitMQ 版本 1。6.I 在开发应用程序时遇到的问题很少。阅读文档并浏览了其他堆栈溢出问题,但我无法弄清楚一些事情(可能是因为我记性不好)。 如果有人回答我的问题就太好了。
1) 目前我有 4 个生产者和 4-Consumers.Producer 可能会产生数百万条消息或事件,因此对生产者和消费者使用单一连接将阻止消费者消费 messages.So我想的是为生产者和消费者创建单独的连接,这样两者都不会阻塞并提供一些性能 improvement.Am 我对这种方法更正?
2) 我正在使用 CachingConnectionFactory 以便使用 SimpleRabbitListenerContainerFactory 创建连接。在调用该工厂时它是否会 return 我们的新连接?所以如果我们使用 CachingConnectionFactory 我们真的需要为 Producer 和 consumer.Please 编写一个单独的连接工厂吗?在下面找到我的
1)配置class
@Configuration
@EnableRabbit
public class RabbitMqConfiguration{
@Autowired
private CachingConnectionFactory cachingConnectionFactory;
@Value("${concurrent.consumers}")
public int concurrent_consumers;
@Value("${max.concurrent.consumers}")
public int max_concurrent_consumers;
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cachingConnectionFactory);
factory.setConcurrentConsumers(concurrent_consumers);
factory.setMaxConcurrentConsumers(max_concurrent_consumers);
factory.setMessageConverter(jsonMessageConverter());
return factory;
}
@Bean
public MessageConverter jsonMessageConverter()
{
final Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
return converter;
}
}
2)制作人Class
@Configuration
public class TaskProducerConfiguration extends RabbitMqConfiguration {
@Value("${queue1}")
public String queue1;
@Value("${queue2}")
public String queue2;
@Value("${queue3}")
public String queue1;
@Value("${queue4}")
public String queue2;
@Value("${spring.rabbit.exchange}")
public String exchange;
@Autowired
private CachingConnectionFactory cachingConnectionFactory;
@Primary
@Bean
public RabbitTemplate getQueue1Template()
{
RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
template.setRoutingKey(this.queue1);
template.setMessageConverter(jsonMessageConverter());
return template;
}
@Bean
public RabbitTemplate getQueue2Template()
{
RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
template.setRoutingKey(this.queue2);
template.setMessageConverter(jsonMessageConverter());
return template;
}
@Bean
public RabbitTemplate getQueue3Template()
{
RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
template.setRoutingKey(this.queue3);
template.setMessageConverter(jsonMessageConverter());
return template;
}
@Bean
public RabbitTemplate getQueue4Template()
{
RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
template.setRoutingKey(this.queue4);
template.setMessageConverter(jsonMessageConverter());
return template;
}
@Bean(name="queue1Bean")
public Queue queue1()
{
return new Queue(this.queue1);
}
@Bean(name="queue2Bean")
public Queue queue2()
{
return new Queue(this.queue2);
}
@Bean(name="queue3Bean")
public Queue queue3()
{
return new Queue(this.queue3);
}
@Bean(name="queue4Bean")
public Queue queue4()
{
return new Queue(this.queue4);
}
@Bean
TopicExchange exchange() {
return new TopicExchange(exchange);
}
@Bean
List<Binding> bindings(Queue queue1Bean,Queue queue2Bean,Queue queue3Bean,Queue queue4Bean, TopicExchange exchange) {
List<Binding> bindingList = new ArrayList<Binding>();
bindingList.add(BindingBuilder.bind(queue1Bean).to(exchange).with(this.queue1));
bindingList.add(BindingBuilder.bind(queue2Bean).to(exchange).with(this.queue2));
bindingList.add(BindingBuilder.bind(queue3Bean).to(exchange).with(this.queue3));
bindingList.add(BindingBuilder.bind(queue4Bean).to(exchange).with(this.queue4));
return bindingList;
}
}
3) Receiver Class(Just Shared one receiver class receiver of the 3-receiver classes are a and the same except queue name & routing键).
@Component
public class Queue1Receiver {
@Autowired
private TaskProducer taskProducer;
@Value("${queue1}")
public String queue1;
@RabbitListener(id="queue1",containerFactory="rabbitListenerContainerFactory",queues = "#{queue1Bean}")
public void handleQueue1Message(TaskMessage taskMessage,@Header(AmqpHeaders.CONSUMER_QUEUE) String queue)
{
System.out.println("Queue::"+queue);
System.out.println("CustomerId: " + taskMessage.getCustomerID());
if(taskMessage.isHasQueue2()){
taskProducer.sendQueue2Message(taskMessage);
}
if(taskMessage.isHasQueue3()){
taskProducer.sendQueue3Message(taskMessage);
}
if(taskMessage.isHasQueue4()){
taskProducer.sendQueue4Message(taskMessage);
}
}
@Bean
public Queue queue1Bean() {
// This queue has the following properties:
// name: my_durable,durable: true,exclusive: false,auto_delete: false
return new Queue(queue1, true, false, false);
}
}
你的帮助应该是可观的。
注意:反对投票者请在反对投票前注册您的评论,以便以后我可以避免错误。
根据 Gary Russell 的评论编辑: 1)RabbitMq配置
@Configuration
@EnableRabbit
public class RabbitMqConfiguration{
@Value("${concurrent.consumers}")
public int concurrent_consumers;
@Value("${max.concurrent.consumers}")
public int max_concurrent_consumers;
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(concurrent_consumers);
factory.setMaxConcurrentConsumers(max_concurrent_consumers);
factory.setMessageConverter(jsonMessageConverter());
return factory;
}
@Bean
public CachingConnectionFactory connectionFactory()
{
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setCacheMode(CacheMode.CONNECTION);
return connectionFactory;
}
@Bean
public MessageConverter jsonMessageConverter()
{
final Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
return converter;
}
}
using a single connection for both producer & consumer will block consumer to consume the messages`
是什么让您相信这一点?单个连接通常会很好。如果您真的想要单独的连接,请将连接工厂 cacheMode
更改为 CONNECTION
。
您可以在同一情况下使用连接池,保持适当的池大小可能会解决上述答案中建议的 problem.As 生产者和消费者都使用相同的连接,因此池可能会帮助您解决问题。