动态队列和侦听器,未发送消息?
Dynamic queues and listeners, messages not being sent?
兔子配置:
package com.rabbitMQ;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.URI;
import java.net.URISyntaxException;
@EnableRabbit
@Configuration
public class RabbitMqConfig {
private static final Logger logger = LoggerFactory.getLogger(RabbitMqConfig.class);
@Value("${spring.rabbitmq.addresses}")
private String addressURL;
@Bean
public ConnectionFactory connectionFactory() throws URISyntaxException {
return new CachingConnectionFactory(new URI(addressURL));
}
/**
* Required for executing adminstration functions against an AMQP Broker
*/
@Bean
public AmqpAdmin amqpAdmin() throws URISyntaxException {
return new RabbitAdmin(connectionFactory());
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public AmqpTemplate rabbitTemplate() throws URISyntaxException {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(jsonMessageConverter());
return rabbitTemplate;
}
}
应用程序概述:每当 gitRepository 连接到我们的应用程序时,存储库名称成为交换名称,在这种情况下 ForceCI
,然后该存储库的每个分支都会创建自己的队列,这里有两个队列 develop
和 master
。现在,每次在 develop 分支中创建拉取请求时,我都需要将信息传递给 develop 队列,并且应该由特定的监听器监听,该监听器应该仅为 develop 注册。我看到了动态队列的例子,但我似乎找不到任何关于如何创建动态监听器的例子,这些监听器将在不同的线程中执行,我该如何实现呢?
此外,我正在尝试将一些消息作为测试发送到队列,但我无法在控制台中看到它们。 (下面的代码)
@RequestMapping(value = "/createExchange", method = RequestMethod.GET)
public void createExchange(ServletResponse response, ServletRequest
request) throws URISyntaxException {
rabbitMqConfig.amqpAdmin().declareExchange(new DirectExchange("ForceCI"));
}
@RequestMapping(value = "/createDynamicQueues", method = RequestMethod.GET)
public void createDynamicQueues(@RequestParam String branchName, ServletResponse response, ServletRequest
request) throws URISyntaxException {
Properties develop = rabbitMqConfig.amqpAdmin().getQueueProperties(branchName);
System.out.println("develop -> "+develop);
if(develop != null && develop.stringPropertyNames() != null && !develop.stringPropertyNames().isEmpty()) {
for (String stringPropertyName : develop.stringPropertyNames()) {
String property = develop.getProperty(stringPropertyName);
System.out.println("property Value -> " + property + " ---- " + "property key -> " + stringPropertyName);
}
} else {
Queue queue = new Queue(branchName, true);
String develop1 = rabbitMqConfig.amqpAdmin().declareQueue(new Queue(branchName, true));
rabbitMqConfig.amqpAdmin().declareBinding(BindingBuilder.bind(queue).to(new DirectExchange("ForceCI")).withQueueName());
System.out.println(develop1);
}
}
@RequestMapping(value = "/sendMessageToQueuesDevelop", method = RequestMethod.GET)
public void sendMessageToQueuesDevelop(ServletResponse response, ServletRequest
request) throws URISyntaxException {
Properties develop = rabbitMqConfig.amqpAdmin().getQueueProperties("develop");
String queue_name = develop.getProperty("QUEUE_NAME");
rabbitTemplate.convertAndSend("ForceCI", queue_name, "TestMessage");
}
@RequestMapping(value = "/sendMessageToQueuesMaster", method = RequestMethod.GET)
public void sendMessageToQueuesMaster(ServletResponse response, ServletRequest
request) throws URISyntaxException {
Properties develop = rabbitMqConfig.amqpAdmin().getQueueProperties("master");
String queue_name = develop.getProperty("QUEUE_NAME");
rabbitTemplate.convertAndSend("ForceCI", queue_name, "TestMessage1");
}
更新
缺少绑定,当我在代码中按上面所示进行绑定时,消息开始进入,但我仍然无法弄清楚如何在不同的侦听器中侦听这些消息并在不同的线程中处理它们?
最简单的方法是使用 DirectMessageListenerContainer
并根据需要向其添加队列。但是,您不会为每个队列获得一个新线程;对于直接容器,侦听器是在 amqp-client
线程池中的线程上调用的。
direct container加队列效率高;如果需要,您可以从零队列开始。有关详细信息,请参阅 Choosing a container。
如果每个队列都必须有一个新线程,则必须为每个队列手动创建(和管理)一个 SimpleMessageListenerContainer
。
兔子配置:
package com.rabbitMQ;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.URI;
import java.net.URISyntaxException;
@EnableRabbit
@Configuration
public class RabbitMqConfig {
private static final Logger logger = LoggerFactory.getLogger(RabbitMqConfig.class);
@Value("${spring.rabbitmq.addresses}")
private String addressURL;
@Bean
public ConnectionFactory connectionFactory() throws URISyntaxException {
return new CachingConnectionFactory(new URI(addressURL));
}
/**
* Required for executing adminstration functions against an AMQP Broker
*/
@Bean
public AmqpAdmin amqpAdmin() throws URISyntaxException {
return new RabbitAdmin(connectionFactory());
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public AmqpTemplate rabbitTemplate() throws URISyntaxException {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(jsonMessageConverter());
return rabbitTemplate;
}
}
应用程序概述:每当 gitRepository 连接到我们的应用程序时,存储库名称成为交换名称,在这种情况下 ForceCI
,然后该存储库的每个分支都会创建自己的队列,这里有两个队列 develop
和 master
。现在,每次在 develop 分支中创建拉取请求时,我都需要将信息传递给 develop 队列,并且应该由特定的监听器监听,该监听器应该仅为 develop 注册。我看到了动态队列的例子,但我似乎找不到任何关于如何创建动态监听器的例子,这些监听器将在不同的线程中执行,我该如何实现呢?
此外,我正在尝试将一些消息作为测试发送到队列,但我无法在控制台中看到它们。 (下面的代码)
@RequestMapping(value = "/createExchange", method = RequestMethod.GET)
public void createExchange(ServletResponse response, ServletRequest
request) throws URISyntaxException {
rabbitMqConfig.amqpAdmin().declareExchange(new DirectExchange("ForceCI"));
}
@RequestMapping(value = "/createDynamicQueues", method = RequestMethod.GET)
public void createDynamicQueues(@RequestParam String branchName, ServletResponse response, ServletRequest
request) throws URISyntaxException {
Properties develop = rabbitMqConfig.amqpAdmin().getQueueProperties(branchName);
System.out.println("develop -> "+develop);
if(develop != null && develop.stringPropertyNames() != null && !develop.stringPropertyNames().isEmpty()) {
for (String stringPropertyName : develop.stringPropertyNames()) {
String property = develop.getProperty(stringPropertyName);
System.out.println("property Value -> " + property + " ---- " + "property key -> " + stringPropertyName);
}
} else {
Queue queue = new Queue(branchName, true);
String develop1 = rabbitMqConfig.amqpAdmin().declareQueue(new Queue(branchName, true));
rabbitMqConfig.amqpAdmin().declareBinding(BindingBuilder.bind(queue).to(new DirectExchange("ForceCI")).withQueueName());
System.out.println(develop1);
}
}
@RequestMapping(value = "/sendMessageToQueuesDevelop", method = RequestMethod.GET)
public void sendMessageToQueuesDevelop(ServletResponse response, ServletRequest
request) throws URISyntaxException {
Properties develop = rabbitMqConfig.amqpAdmin().getQueueProperties("develop");
String queue_name = develop.getProperty("QUEUE_NAME");
rabbitTemplate.convertAndSend("ForceCI", queue_name, "TestMessage");
}
@RequestMapping(value = "/sendMessageToQueuesMaster", method = RequestMethod.GET)
public void sendMessageToQueuesMaster(ServletResponse response, ServletRequest
request) throws URISyntaxException {
Properties develop = rabbitMqConfig.amqpAdmin().getQueueProperties("master");
String queue_name = develop.getProperty("QUEUE_NAME");
rabbitTemplate.convertAndSend("ForceCI", queue_name, "TestMessage1");
}
更新
缺少绑定,当我在代码中按上面所示进行绑定时,消息开始进入,但我仍然无法弄清楚如何在不同的侦听器中侦听这些消息并在不同的线程中处理它们?
最简单的方法是使用 DirectMessageListenerContainer
并根据需要向其添加队列。但是,您不会为每个队列获得一个新线程;对于直接容器,侦听器是在 amqp-client
线程池中的线程上调用的。
direct container加队列效率高;如果需要,您可以从零队列开始。有关详细信息,请参阅 Choosing a container。
如果每个队列都必须有一个新线程,则必须为每个队列手动创建(和管理)一个 SimpleMessageListenerContainer
。