如何使用 autowired Spring Boot 监听多个队列?
How to listen to multiple queues with autowired Spring Boot?
我是 Spring 引导的新手,正在试用它。目前我已经构建了一些我希望能够通过队列相互通信的应用程序。
我目前有一个 Listener 对象,可以从特定队列接收消息。
@Configuration
public class Listener {
final static String queueName = "myqueue";
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
Receiver receiver() {
return new Receiver();
}
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
}
这行得通。但是,现在我希望能够收听另一个队列。所以我想我会复制上面的对象并更改队列名称。不幸的是,这没有用,因为 Spring boot 只为其中之一创建连接。
关于如何让我的 Spring 引导应用程序监听多个队列的任何想法?
好的,我想出了如何让它监听多个队列。认为与我的其他解决方案相比可能存在一些缺点,主要是如果列出的队列不存在,它就不起作用。
我最终使用了一种完全不同的方法,使用 @RabbitListener
@Component
public class EventListener {
private static Logger LOG = LoggerFactory.getLogger(EventListener.class);
private CountDownLatch latch = new CountDownLatch(1);
@RabbitListener(queues = "myqueue")
public void processPaymentMessage(Object message) {
LOG.info("Message is of type: " + message.getClass().getName());
if(!(message instanceof byte[])) message = ((Message) message).getBody();
String content = new String((byte[])message, StandardCharsets.UTF_8);
LOG.info("Received on myqueue: " + content);
latch.countDown();
}
@RabbitListener(queues = "myotherqueue")
public void processOrderMessage(Object message) {
LOG.info("Message is of type: " + message.getClass().getName());
if(!(message instanceof byte[])) message = ((Message) message).getBody();
String content = new String((byte[])message, StandardCharsets.UTF_8);
LOG.info("Received on myotherqueue: " + content);
latch.countDown();
}
}
对 byte[] 的整个检查都在那里,因为从命令行发送的消息看起来像这样。否则它是 org.springframework.amqp.core.Message.
你可以试试这个
在application.properties
rabbitmq.queue.names= com.queue1,com.queue2
在 Java 文件中
@RabbitListener(queues = "#{'${rabbitmq.queue.names}'.split(',')}")
public void receiveMessage(Message message) {
try {
if (processmessage(message));
}
} catch (Exception ex) {
LOGGER.error("Exception while processing the Message", ex);
}
}
以下是 groovy 对我有用的方法:
@Component
@EnableRabbit
@Slf4j
class StatusListener {
Library library
int messageCounter
@Autowired
StatusListener(Library library) {
this.library = library
}
@RabbitListener(queues = '#{library.allStatusQueues.split(",")}')
void receiveMessage(Message message) {
messageCounter++
log.info("Rabbit Listener received message <" + new String(message.body) + "> (" + messageCounter + ")")
}
}
其中 Library 是一个配置 bean:
@Component
@ConfigurationProperties
@RefreshScope
class Library {
String allStatusQueues
}
属性 本身,在 application.properties 或类似的配置文件中看起来像:
all-status-queues=queue1,queue2,queue3,queue4
我是 Spring 引导的新手,正在试用它。目前我已经构建了一些我希望能够通过队列相互通信的应用程序。 我目前有一个 Listener 对象,可以从特定队列接收消息。
@Configuration
public class Listener {
final static String queueName = "myqueue";
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
Receiver receiver() {
return new Receiver();
}
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
}
这行得通。但是,现在我希望能够收听另一个队列。所以我想我会复制上面的对象并更改队列名称。不幸的是,这没有用,因为 Spring boot 只为其中之一创建连接。 关于如何让我的 Spring 引导应用程序监听多个队列的任何想法?
好的,我想出了如何让它监听多个队列。认为与我的其他解决方案相比可能存在一些缺点,主要是如果列出的队列不存在,它就不起作用。 我最终使用了一种完全不同的方法,使用 @RabbitListener
@Component
public class EventListener {
private static Logger LOG = LoggerFactory.getLogger(EventListener.class);
private CountDownLatch latch = new CountDownLatch(1);
@RabbitListener(queues = "myqueue")
public void processPaymentMessage(Object message) {
LOG.info("Message is of type: " + message.getClass().getName());
if(!(message instanceof byte[])) message = ((Message) message).getBody();
String content = new String((byte[])message, StandardCharsets.UTF_8);
LOG.info("Received on myqueue: " + content);
latch.countDown();
}
@RabbitListener(queues = "myotherqueue")
public void processOrderMessage(Object message) {
LOG.info("Message is of type: " + message.getClass().getName());
if(!(message instanceof byte[])) message = ((Message) message).getBody();
String content = new String((byte[])message, StandardCharsets.UTF_8);
LOG.info("Received on myotherqueue: " + content);
latch.countDown();
}
}
对 byte[] 的整个检查都在那里,因为从命令行发送的消息看起来像这样。否则它是 org.springframework.amqp.core.Message.
你可以试试这个
在application.properties
rabbitmq.queue.names= com.queue1,com.queue2
在 Java 文件中
@RabbitListener(queues = "#{'${rabbitmq.queue.names}'.split(',')}")
public void receiveMessage(Message message) {
try {
if (processmessage(message));
}
} catch (Exception ex) {
LOGGER.error("Exception while processing the Message", ex);
}
}
以下是 groovy 对我有用的方法:
@Component
@EnableRabbit
@Slf4j
class StatusListener {
Library library
int messageCounter
@Autowired
StatusListener(Library library) {
this.library = library
}
@RabbitListener(queues = '#{library.allStatusQueues.split(",")}')
void receiveMessage(Message message) {
messageCounter++
log.info("Rabbit Listener received message <" + new String(message.body) + "> (" + messageCounter + ")")
}
}
其中 Library 是一个配置 bean:
@Component
@ConfigurationProperties
@RefreshScope
class Library {
String allStatusQueues
}
属性 本身,在 application.properties 或类似的配置文件中看起来像:
all-status-queues=queue1,queue2,queue3,queue4