如何在 Spring 引导中从 ActiveMQ 队列中读取未决消息

How to read pending messages from an ActiveMQ queue in Spring Boot

我喜欢使用 Spring 引导读取 ActiveMQ 队列中的未决(未确认)消息。怎么做?

到目前为止,我可以在消息发送到队列时读取消息:

@JmsListener(destination = "LOCAL.TEST", 
  containerFactory = "myJmsListenerContainerFactory")
public void receiveMessage(final Message jsonMessage) throws JMSException {
    String messageData = null;
    // jsonMessage.acknowledge(); // dont consume message (for testing)
    LOGGER.info("=== Received message {}", jsonMessage);
}

为 mq 连接使用标准配置:

@Bean
public ActiveMQConnectionFactory getActiveMQConnectionFactory() {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    activeMQConnectionFactory.setBrokerURL(BROKER_URL + ":" + BROKER_PORT);
    return activeMQConnectionFactory;
}

和一个标准的 ListenerContainerFactory:

@Bean
public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {
  DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
  factory.setConnectionFactory(getActiveMQConnectionFactory());
  factory.setConcurrency("1-1");
  return factory;
}

但是如果我使用

手动发送一条消息,这只会记录一条消息
@Autowired
private JmsTemplate jmsTemplate;

public void send(String destination, String message) {
    LOGGER.info("sending message='{}' to destination='{}'", message, destination);
    jmsTemplate.convertAndSend(destination, message);
}

使用标准模板

@Bean
public JmsTemplate jmsTemplate() {
  JmsTemplate template = new JmsTemplate();
  template.setConnectionFactory(getActiveMQConnectionFactory());
  return template;
}

我无法阅读之前发送的仍在队列中的消息(因为我没有 .acknowledge() 它们)...

未确认的消息不会被重新发送。在会话关闭或连接丢失之前,它们不会返回到队列中,例如通过停止(并重新启动)工厂创建的侦听器容器。

您可以使用 JmsListenerEndpointRegistry bean(或 stop/start 整个注册表,它将 stop/start 它的所有容器)访问容器。

JMS supports "browsing" messages 这似乎是您想要的功能。因此,您应该更改 Spring 应用程序以使用 QueueBrowser 而不是实际使用消息。

要阅读所有待处理的消息,您可以这样做

    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
    Connection connection = connectionFactory.createConnection("admin", "admin");
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Destination destination = session.createQueue("listenerQueue");
    MessageConsumer consumer = session.createConsumer(destination);

    QueueBrowser browser = session.createBrowser((Queue) destination);
    Enumeration elems = browser.getEnumeration();
    while (elems.hasMoreElements()) {
        Message message = (Message) consumer.receive();

        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("Incoming Message: '" + textMessage.getText() + "'");
            message.acknowledge();
        }
    }
    connection.close();

逐步执行 Spring 引导 ActiveMQ。让我们写一些代码让它更清楚。这将有助于仅读取当前会话中的所有待处理消息。

  1. 在 pom.xml 文件中添加这些依赖项。
<!-- Dependencies to setup JMS and active mq environment -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-broker</artifactId>
        </dependency>
  1. 将@EnableJms 添加到您的 main() 方法所在的主控制器中。
  2. 仅通过在应用程序控制器中添加这 2 个方法来创建连接工厂。
@Bean
    public JmsListenerContainerFactory<?> myFactory(
        ConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
      DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
      logger.info("configuring jms connection factory....");
      // anonymous class
      factory.setErrorHandler(
              new ErrorHandler() {
                  @Override
                  public void handleError(Throwable t) {
                      logger.error("An error has occurred in the transaction", t);
                  }
              });
      // lambda function
      factory.setErrorHandler(t -> logger.info("An error has occurred in the transaction"));
      configurer.configure(factory, connectionFactory);

      return factory;
    }

    // Serialize message content to json using TextMessage
    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
      MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
      converter.setTargetType(MessageType.TEXT);
      converter.setTypeIdPropertyName("_type");
      return converter;
    } 
  1. 在 application.yml 文件中将凭据提及为
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.broker-url=tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1

  1. 在任何 spring bean class.
  2. 中自动装配 jmsTemplate
@Autowired
private JmsTemplate jmsTemplate;
  1. 现在是时候将消息发送到队列了。
jmsTemplate.convertAndSend("anyQueueName", "value1");
jmsTemplate.convertAndSend("anyQueueName", "value2");
...
  1. 添加一个 jmslistener。当任何消息被推送到队列时,JMS 将自动调用此方法。
@JmsListener(destination ="anyQueueName", containerFactory = "myFactory")
    public void receiveMessage(String user) {
        System.out.println("Received <" + user + ">");
    }
  1. 您可以手动阅读队列中可用的消息:-
import javax.jms.TextMessage;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;

public void readMessageFromQueue(){
jmsTemplate.browse("anyQueueName", new BrowserCallback<TextMessage>() {
            @Override
            public TextMessage doInJms(Session session, QueueBrowser browser) throws JMSException {
                Enumeration<TextMessage> messages = browser.getEnumeration();
                while (messages.hasMoreElements()) {
                    System.out.println("message found : -"+ messages.nextElement().getText());
                }
            }
        });
}

输出:-
找到消息:- value1

找到消息:- value2

-快乐编码