DefaultMessageListenerContainer 不读取来自 IBM MQ 的消息
DefaultMessageListenerContainer Not Reading Messages from IBM MQ
我正在尝试创建一些 Spring 使用 DefaultMessageListenerContainer 来侦听来自 IBM MQ 的消息的启动代码。
我可以创建 MQQueueConnectionFactory 并使用 JmsTemplate 发送和接收消息,但这是为了实现高吞吐量并希望使用侦听器而不是轮询。
我已将大部分代码整合到一个组件中,所以我希望我拥有所有相关的东西。
如果我安排 receiveMessage 方法,它会选择排队的消息,所以我知道 sendMessage 方法正在排队消息。
@Component
class AllInOneTest {
private MessagingConfiguration.QueueConfig config;
private MQQueueConnectionFactory connectionFactory;
private JmsTemplate jmsTemplate;
private DefaultMessageListenerContainer listenerContainer;
private final Logger logger = LoggerFactory.getLogger(getClass());
public AllInOneTest(MessagingManager manager) throws JMSException {
String detailsName = "default";
config = manager.getMessagingDetails(detailsName).getConfig();
logger.debug("AllInOneTest Initializing Connection Factory: {}", detailsName);
connectionFactory = new MQQueueConnectionFactory();
connectionFactory.setHostName(config.getHost());
connectionFactory.setPort(config.getPort());
connectionFactory.setTransportType(config.getTransportType());
connectionFactory.setQueueManager(config.getQueueManager());
connectionFactory.setChannel(config.getChannel());
logger.debug("AllInOneTest Initializing Message Listener: {}", detailsName);
DefaultMessageListenerContainer defaultListener = new DefaultMessageListenerContainer();
defaultListener.setConnectionFactory(connectionFactory);
defaultListener.setExceptionListener((ee) -> {
logger.warn(String.format("AllInOneTest Message Listener Error: %s", detailsName), ee);
});
defaultListener.setDestinationResolver((session, name, pubSub) -> {
Destination ret = session.createQueue(name);
logger.debug("AllInOneTest Created Listener Destination: {}", ret);
return ret;
});
defaultListener.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
logger.info("AllInOneTest Listening For Message: {}", message);
}
});
// TODO Configure subscription.
// defaultListener.setSubscriptionDurable(true);
// defaultListener.setSubscriptionName("masher-service");
// TODO Configure concurrency.
// defaultListener.setConcurrency(config.getConcurrency());
// TODO Configure transaction.
// defaultListener.setSessionTransacted(config.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE);
listenerContainer = defaultListener;
logger.debug("AllInOneTest Initializing JMS Template: {}", detailsName);
jmsTemplate = new JmsTemplate(connectionFactory);
jmsTemplate.setMessageConverter(new SpringToJMSMessageConverter());
jmsTemplate.setReceiveTimeout(1000L);
jmsTemplate.setDefaultDestinationName(config.getOutputQueue());
jmsTemplate.setDestinationResolver((session, name, pubSub) -> {
Destination ret = session.createQueue(name);
logger.debug("AllInOneTest Created JMS Template Destination: {}", ret);
return ret;
});
listenerContainer.setDestinationName(config.getOutputQueue());
logger.debug("AllInOneTest Starting Message Listener: {} on {}", detailsName, config.getOutputQueue());
listenerContainer.start();
}
// @Scheduled(fixedRate = 500L)
public void receiveMessage() {
Object message = jmsTemplate.receiveAndConvert();
if (message != null) {
logger.info("AllInOneTest Received: {}", message);
}
}
@Scheduled(fixedRate = 1500L)
public void sendMessage() {
int count = counter.incrementAndGet();
org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(String.format("JMS Masher Message %d %s %s", count,
new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()), UUID.randomUUID().toString())).build();
logger.info("AllInOneTest Sending: {} [{}]", message.getPayload(), message.getHeaders());
jmsTemplate.convertAndSend(config.getInputQueue(), message);
}
}
我正在调用 DefaultMessageListenerContainer.start(),但我感觉它不是 "starting",我一定是遗漏了什么。
DestinationResolver 是为 JmsTemplate 而不是 DefaultMessageListenerContainer 调用的。
我在控制台中没有看到任何异常。
感谢您的帮助,
韦斯.
DefaultMessageListenerContainer defaultListener = new DefaultMessageListenerContainer();
当您以编程方式创建容器时,而不是让 Spring 将其作为 @Bean
进行管理,您必须调用 afterPropertiesSet()
(在设置所有属性之后,之前你start()
它)。
许多 Spring 组件都是如此。通常让 Spring 管理它们会更好。
我正在尝试创建一些 Spring 使用 DefaultMessageListenerContainer 来侦听来自 IBM MQ 的消息的启动代码。
我可以创建 MQQueueConnectionFactory 并使用 JmsTemplate 发送和接收消息,但这是为了实现高吞吐量并希望使用侦听器而不是轮询。
我已将大部分代码整合到一个组件中,所以我希望我拥有所有相关的东西。
如果我安排 receiveMessage 方法,它会选择排队的消息,所以我知道 sendMessage 方法正在排队消息。
@Component
class AllInOneTest {
private MessagingConfiguration.QueueConfig config;
private MQQueueConnectionFactory connectionFactory;
private JmsTemplate jmsTemplate;
private DefaultMessageListenerContainer listenerContainer;
private final Logger logger = LoggerFactory.getLogger(getClass());
public AllInOneTest(MessagingManager manager) throws JMSException {
String detailsName = "default";
config = manager.getMessagingDetails(detailsName).getConfig();
logger.debug("AllInOneTest Initializing Connection Factory: {}", detailsName);
connectionFactory = new MQQueueConnectionFactory();
connectionFactory.setHostName(config.getHost());
connectionFactory.setPort(config.getPort());
connectionFactory.setTransportType(config.getTransportType());
connectionFactory.setQueueManager(config.getQueueManager());
connectionFactory.setChannel(config.getChannel());
logger.debug("AllInOneTest Initializing Message Listener: {}", detailsName);
DefaultMessageListenerContainer defaultListener = new DefaultMessageListenerContainer();
defaultListener.setConnectionFactory(connectionFactory);
defaultListener.setExceptionListener((ee) -> {
logger.warn(String.format("AllInOneTest Message Listener Error: %s", detailsName), ee);
});
defaultListener.setDestinationResolver((session, name, pubSub) -> {
Destination ret = session.createQueue(name);
logger.debug("AllInOneTest Created Listener Destination: {}", ret);
return ret;
});
defaultListener.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
logger.info("AllInOneTest Listening For Message: {}", message);
}
});
// TODO Configure subscription.
// defaultListener.setSubscriptionDurable(true);
// defaultListener.setSubscriptionName("masher-service");
// TODO Configure concurrency.
// defaultListener.setConcurrency(config.getConcurrency());
// TODO Configure transaction.
// defaultListener.setSessionTransacted(config.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE);
listenerContainer = defaultListener;
logger.debug("AllInOneTest Initializing JMS Template: {}", detailsName);
jmsTemplate = new JmsTemplate(connectionFactory);
jmsTemplate.setMessageConverter(new SpringToJMSMessageConverter());
jmsTemplate.setReceiveTimeout(1000L);
jmsTemplate.setDefaultDestinationName(config.getOutputQueue());
jmsTemplate.setDestinationResolver((session, name, pubSub) -> {
Destination ret = session.createQueue(name);
logger.debug("AllInOneTest Created JMS Template Destination: {}", ret);
return ret;
});
listenerContainer.setDestinationName(config.getOutputQueue());
logger.debug("AllInOneTest Starting Message Listener: {} on {}", detailsName, config.getOutputQueue());
listenerContainer.start();
}
// @Scheduled(fixedRate = 500L)
public void receiveMessage() {
Object message = jmsTemplate.receiveAndConvert();
if (message != null) {
logger.info("AllInOneTest Received: {}", message);
}
}
@Scheduled(fixedRate = 1500L)
public void sendMessage() {
int count = counter.incrementAndGet();
org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(String.format("JMS Masher Message %d %s %s", count,
new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()), UUID.randomUUID().toString())).build();
logger.info("AllInOneTest Sending: {} [{}]", message.getPayload(), message.getHeaders());
jmsTemplate.convertAndSend(config.getInputQueue(), message);
}
}
我正在调用 DefaultMessageListenerContainer.start(),但我感觉它不是 "starting",我一定是遗漏了什么。
DestinationResolver 是为 JmsTemplate 而不是 DefaultMessageListenerContainer 调用的。
我在控制台中没有看到任何异常。
感谢您的帮助, 韦斯.
DefaultMessageListenerContainer defaultListener = new DefaultMessageListenerContainer();
当您以编程方式创建容器时,而不是让 Spring 将其作为 @Bean
进行管理,您必须调用 afterPropertiesSet()
(在设置所有属性之后,之前你start()
它)。
许多 Spring 组件都是如此。通常让 Spring 管理它们会更好。