In Spring Boot, how and when does @JmsListener get called?

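I am new to Spring Boot. I am trying to build a simple non-web process in which I listen to an MQ queue and process the messages received. I have tried various ways of achieving this in Spring Boot, but unfortunately I cannot get the @JmsListener method to be invoked. There are no errors either; the process just waits.

All the MQ queue details are in application.properties.

I did verify that there are messages in the queue, and I can retrieve them using the old MQ receiver approach.

I would like to know how and when a method annotated with @JmsListener gets called. I did try creating a JmsListenerContainerFactory and including it in the annotation parameters, but it made no difference.

There are a few similar examples around, and it looks straightforward, but I just cannot get it to work. Any suggestions are appreciated. Thanks.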

Main Spring Boot class

@SpringBootApplication
@EnableJms
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

MQListener class

@Component
public class MQListener {
    @JmsListener(destination = "${mq.queueName}")
    public void receiveMessage(final Message message) throws JMSException{
        System.out.println("...Message Received...");
        String messageData = null;
        if(message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage)message;
            messageData = textMessage.getText();
        }
    }
}

MQ configuration class

@Configuration
public class MQConfiguration {
    @Value("${mq.host}")
    private String host;
    @Value("${mq.port}")
    private Integer port;
    @Value("${mq.queue-manager}")
    private String queueManager;
    @Value("${mq.channel}")
    private String channel;
    @Value("${mq.queueName}")
    private String queueName;
    @Value("${mq.receive-timeout}")
    private long receiveTimeout;
    
    @Bean
    public MQQueueConnectionFactory mqQueueConnectionFactory() {
        MQQueueConnectionFactory mqQueueConnectionFactory = new MQQueueConnectionFactory();
        mqQueueConnectionFactory.setHostName(host);
        try {
            mqQueueConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
            mqQueueConnectionFactory.setChannel(channel);
            mqQueueConnectionFactory.setPort(port);
            mqQueueConnectionFactory.setQueueManager(queueManager);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return mqQueueConnectionFactory;
    }
}

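As you are using the default settings for the MQ connection factory, you do not actually need to define one. Instead, you can use the default one that Spring Boot creates for you. You also only want the text of the message, so you can let Spring do the marshalling. In that case, all you need is a message consumer derived from this sample - https://github.com/ibm-messaging/mq-dev-patterns/tree/master/Spring-JMS/src/main/java/com/ibm/mq/samples/jms/spring/level101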

package ...

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class MessageConsumer101 {
    protected final Log logger = LogFactory.getLog(getClass());

    @JmsListener(destination = "${mq.queueName}")
    public void receive(String message) {
        logger.info("");
        logger.info( this.getClass().getSimpleName());
        logger.info("Received message is: " + message);
    }
}

You will be letting Spring Boot create the MQ listener container, so you need to supply the connection settings in application.properties in the form of -

# MQ Connection settings
ibm.mq.queueManager=QM1
ibm.mq.channel=DEV.APP.SVRCONN
ibm.mq.connName=localhost(1414)


# Change the following lines as necessary. Set the ibm.mq.user
# property to an empty string to send no authentication request.
ibm.mq.user=app
ibm.mq.password=passw0rd
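
The configuration in the question keeps the host and port in separate mq.host and mq.port properties, whereas the settings above combine them in ibm.mq.connName as host(port). A minimal plain-Java sketch of that mapping follows. The ConnNames class and its methods are hypothetical helpers invented for illustration; they are not part of the IBM starter, which reads the property itself.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: converts between separate host/port values and the
// "host(port)" form used by the ibm.mq.connName property shown above.
final class ConnNames {
    private static final Pattern CONN_NAME = Pattern.compile("^([^()\\s]+)\\((\\d+)\\)$");

    private ConnNames() {}

    // Builds "host(port)" from the separate values.
    static String of(String host, int port) {
        return host + "(" + port + ")";
    }

    // Extracts the host from a single "host(port)" entry.
    static String hostOf(String connName) {
        return match(connName).group(1);
    }

    // Extracts the port from a single "host(port)" entry.
    static int portOf(String connName) {
        return Integer.parseInt(match(connName).group(2));
    }

    private static Matcher match(String connName) {
        // Surrounding whitespace is trimmed before matching.
        Matcher m = CONN_NAME.matcher(connName.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Expected host(port) but got: " + connName);
        }
        return m;
    }

    public static void main(String[] args) {
        System.out.println(ConnNames.of("localhost", 1414));      // localhost(1414)
        System.out.println(ConnNames.hostOf("localhost(1414)"));  // localhost
        System.out.println(ConnNames.portOf("localhost(1414)"));  // 1414
    }
}
```

So the question's mq.host=localhost and mq.port=1414 would become ibm.mq.connName=localhost(1414), and mq.queue-manager and mq.channel would map to ibm.mq.queueManager and ibm.mq.channel.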

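You are more likely to need to customise the listener than the connection factory, but if you do want a ConnectionFactory configured differently from the defaults, use a configuration and a message consumer derived from this sample - https://github.com/ibm-messaging/mq-dev-patterns/tree/master/Spring-JMS/src/main/java/com/ibm/mq/samples/jms/spring/level114

In the configuration, you only need to set the properties that deviate from the defaults.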

package ...

import com.ibm.mq.jms.MQConnectionFactory;
import com.ibm.mq.samples.jms.spring.globals.handlers.OurDestinationResolver;
import com.ibm.mq.samples.jms.spring.globals.handlers.OurMessageConverter;
import com.ibm.mq.spring.boot.MQConfigurationProperties;
import com.ibm.mq.spring.boot.MQConnectionFactoryFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.QosSettings;


import javax.jms.DeliveryMode;
import javax.jms.JMSException;

@Configuration
public class MQConfiguration114 {
    protected final Log logger = LogFactory.getLog(getClass());

    @Bean
    public MQConnectionFactory mqConnectionFactory() throws JMSException {
        MQConfigurationProperties properties = new MQConfigurationProperties();
        // Properties will be a mix of defaults, and those found in application.properties
        // under ibm.mq
        // Here we can override any of the properties should we need to
        MQConnectionFactoryFactory mqcff = new MQConnectionFactoryFactory(properties,null);
        MQConnectionFactory mqcf = mqcff.createConnectionFactory(MQConnectionFactory.class);
        return mqcf;
    }

    @Bean
    public JmsListenerContainerFactory<?> myContainerFactory114() throws JMSException {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(mqConnectionFactory());
        factory.setPubSubDomain(false);

        factory.setMessageConverter(new OurMessageConverter());
        factory.setDestinationResolver(new OurDestinationResolver());

        // reply Qos
        QosSettings rQos = new QosSettings();
        rQos.setPriority(2);
        rQos.setTimeToLive(10000);
        rQos.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        factory.setReplyQosSettings(rQos);

        return factory;
    }

    @Bean("myNonJmsTemplate114")
    public JmsTemplate myNonJmsTemplate114() throws JMSException {
        JmsTemplate jmsTemplate = new JmsTemplate(mqConnectionFactory());
        jmsTemplate.setDestinationResolver(new OurDestinationResolver());
        jmsTemplate.setMessageConverter(new OurMessageConverter());

        return jmsTemplate;
    }
}

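Note: the listener container factory is built on the customised connection factory. You need this step. Your message consumer would then look like: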

package ...

import com.ibm.mq.samples.jms.spring.globals.data.OurData;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class MessageConsumer114 {
    protected final Log logger = LogFactory.getLog(getClass());

    @JmsListener(destination = "${mq.queueName}", containerFactory = "myContainerFactory114")
    public void receiveRequest(OurData message) {
        logger.info("");
        logger.info( this.getClass().getSimpleName());
        logger.info("Received message of type: " + message.getClass().getSimpleName());
        logger.info("Received message :" + message);
    }
}

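If you need to do your own marshalling from the JMS Message object, then use a message consumer derived from this sample (you only need the one consumer, nothing else) - https://github.com/ibm-messaging/mq-dev-patterns/tree/master/Spring-JMS/src/main/java/com/ibm/mq/samples/jms/spring/level105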

package ...

import javax.jms.*;

import com.ibm.mq.samples.jms.spring.globals.Constants;
import com.ibm.mq.samples.jms.spring.globals.data.OurData;
import com.ibm.mq.samples.jms.spring.globals.data.OurOtherData;
import com.ibm.mq.samples.jms.spring.globals.utils.MessageUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jms.annotation.JmsListener;

import org.springframework.stereotype.Component;

import java.io.Serializable;


@Component
public class MessageConsumer105 {
    protected final Log logger = LogFactory.getLog(getClass());

    @JmsListener(destination = "${app.l105.queue.name2}")
    public void receiveData(Message message) {
        logger.info("");
        logger.info( this.getClass().getSimpleName());
        logger.info("Received message of type: " + message.getClass().getSimpleName());
        if (null != message) {
            MessageUtils.checkMessageType(message);
        }
    }
}

where MessageUtils is

package ...

import com.ibm.mq.samples.jms.spring.globals.Constants;
import com.ibm.mq.samples.jms.spring.globals.data.OurData;
import com.ibm.mq.samples.jms.spring.globals.data.OurOtherData;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import javax.jms.*;
import java.io.Serializable;
import java.util.Map;

public class MessageUtils {
    protected static final Log logger = LogFactory.getLog(MessageUtils.class);

    private MessageUtils () {}

    public static void checkMessageType(Message message) {
        try {
            if (message instanceof TextMessage) {
                logger.info("Message matches TextMessage");
                logger.info("message payload is " + ((TextMessage) message).getText());
            } else if (message instanceof BytesMessage) {
                logger.info("Message matches BytesMessage");
            } else if (message instanceof MapMessage) {
                logger.info("Message matches MapMessage");
            } else if (message instanceof StreamMessage) {
                logger.info("Message matches StreamMessage");
            } else if (message instanceof ObjectMessage) {
                checkForObject((ObjectMessage) message);
            }
        } catch (JMSException e) {
            logger.warn("Unable to process JMS message");
        }
    }

    public static void logHeaders(Map<String, Object> msgHeaders) {
        if (! msgHeaders.isEmpty() ) {
            logger.info("");
            logger.info("Headers found");
            msgHeaders.forEach((k, v) -> {
                logger.info(k + ": is of type " + v.getClass());
            });
        }
    }

    private static void checkForObject(ObjectMessage message) {
        try {
            int typeValue = message.getIntProperty(Constants.DATATYPE);
            if (Constants.DataTypes.OURDATATYPE.getValue() == typeValue) {
                logger.info("It is one of our objects");
                Serializable serObj = message.getObject();
                OurData data = (OurData) serObj;
                logger.info(data);
            } else if (Constants.DataTypes.OUROTHERDATATYPE.getValue() == typeValue) {
                logger.info("It is one of our other objects");
                Serializable serObj = message.getObject();
                OurOtherData data = (OurOtherData) serObj;
                logger.info(data);
            } else {
                logger.warn("It is not one of our objects");
            }
        } catch (JMSException e) {
            logger.warn("Unable to retrieve message data");
        } catch (ClassCastException e2) {
            logger.warn("Not the object we were expecting");
        }
    }

}

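If your consumer and configuration classes carry the @Component and @Configuration annotations and are in the same package family as the application class, then they will be found. If not, you need to add some annotations to your application class so that Spring picks them up. For example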

package ...

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.jms.annotation.EnableJms;


@SpringBootApplication
@EnableJms
@ComponentScan
@EnableAutoConfiguration
public class MQApplication {
    public static void main(String[] args) {
        SpringApplication.run(MQApplication.class, args);
    }
}
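
"The same package family" refers to the default scanning rule: component scanning starts at the package of the class that carries @SpringBootApplication (or @ComponentScan) and descends into its sub-packages. A small plain-Java sketch of that rule follows. ScanRule.isCovered is a hypothetical illustration rather than a Spring API, and the package names are made up.

```java
// Hypothetical illustration of the default component-scan rule:
// a class is picked up when its package is the base package or below it.
final class ScanRule {
    private ScanRule() {}

    // True when candidatePackage is basePackage itself or one of its sub-packages.
    static boolean isCovered(String basePackage, String candidatePackage) {
        return candidatePackage.equals(basePackage)
                || candidatePackage.startsWith(basePackage + ".");
    }

    public static void main(String[] args) {
        System.out.println(isCovered("com.example.app", "com.example.app.mq"));      // true
        System.out.println(isCovered("com.example.app", "com.example.other"));       // false
        System.out.println(isCovered("com.example.app", "com.example.application")); // false
    }
}
```

When a listener lives outside that tree, one option is to name its package explicitly, for instance through the scanBasePackages attribute of @SpringBootApplication.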

Unless you have

spring.jms.listener.auto-startup=false

in your application.properties file, all listeners are started automatically when the application starts.
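
To address the "how and when" part of the question more directly: at startup Spring looks for @JmsListener methods on its beans, sets up a message listener container for each one, and starts those containers once the application context is ready. Each container then receives from its destination on its own thread and invokes the annotated method once per message. The following is a toy simulation of that flow in plain Java, with no Spring or JMS involved. The @Listener annotation, DemoConsumer, MiniContainer and the in-memory queue are all invented for illustration and do not reflect how Spring is implemented internally.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for @JmsListener.
@Retention(RetentionPolicy.RUNTIME)
@interface Listener {
    String destination();
}

// Stand-in for a listener bean such as MessageConsumer101.
class DemoConsumer {
    final List<String> received = new CopyOnWriteArrayList<>();
    final CountDownLatch latch;

    DemoConsumer(int expected) {
        this.latch = new CountDownLatch(expected);
    }

    @Listener(destination = "DEV.QUEUE.1")
    public void receive(String message) {
        received.add(message);
        latch.countDown();
    }
}

// Stand-in for a listener container: one polling thread per annotated method.
class MiniContainer {
    private final Thread worker;
    private volatile boolean running = true;

    MiniContainer(Object bean, Method method, BlockingQueue<String> queue) {
        method.setAccessible(true);
        worker = new Thread(() -> {
            while (running) {
                try {
                    // Comparable to a receive with a timeout: wait briefly, then loop.
                    String msg = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        method.invoke(bean, msg); // the listener method is called here
                    }
                } catch (InterruptedException e) {
                    return; // stop() was called
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
            }
        });
        worker.setDaemon(true);
    }

    void start() { worker.start(); }

    void stop() {
        running = false;
        worker.interrupt();
    }
}

class ListenerDemo {
    // Finds @Listener methods on the bean, starts a container for each,
    // waits for delivery, and returns what the bean received.
    static List<String> run(List<String> messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(messages);
        DemoConsumer bean = new DemoConsumer(messages.size());
        List<MiniContainer> containers = new ArrayList<>();
        for (Method m : DemoConsumer.class.getDeclaredMethods()) {
            if (m.isAnnotationPresent(Listener.class)) {
                MiniContainer c = new MiniContainer(bean, m, queue);
                containers.add(c);
                c.start(); // the "auto-startup" moment
            }
        }
        try {
            bean.latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            containers.forEach(MiniContainer::stop);
        }
        return List.copyOf(bean.received);
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("first", "second")));
    }
}
```

Nothing in your own code calls the listener method. If it is never invoked and no error is logged, it is worth checking whether a container was started at all and whether it is connected to the queue manager and queue you expect.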

You need to declare mq-jms-spring-boot-starter as a dependency. For example, if using Maven:

        <dependency>
            <groupId>com.ibm.mq</groupId>
            <artifactId>mq-jms-spring-boot-starter</artifactId>
            <version>2.4.1</version>
        </dependency>

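If this is the only JMS implementation on the classpath, then Spring Boot will be able to work out that all your listeners are using IBM MQ. If you have listed other JMS providers as dependencies, then Spring needs to be told explicitly which connection factory to use. There is a sample pom.xml at https://github.com/ibm-messaging/mq-dev-patterns/blob/master/Spring-JMS/pom.xml which contains only the dependencies that the samples need.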

Try the 101 sample at https://github.com/ibm-messaging/mq-dev-patterns/tree/master/Spring-JMS. It is Maven based and lists only the required dependencies in its pom.xml. All you need to do is update the application.properties in https://github.com/ibm-messaging/mq-dev-patterns/tree/master/Spring-JMS/src/main/resources to point to your MQ server.