无法检索使用 MessageProducer.send() 创建的 JmsTemplate.receive() 消息
Unable to retrieve message with JmsTemplate.receive() that was created with MessageProducer.send()
我正在使用 Spring 引导来启动嵌入式 ActiveMQ 代理,并且我正在使用 spring-jms 在运行时为主题动态注册 JMS 端点。我让那部分工作,当我创建 JMS 消息并将其发送到主题时,它最终被我的听众使用。到目前为止,一切都很好。在侦听器内部,如果在消息中设置了 JMS header "JMSReplyTo",我想将简单的 ACK 消息发送到不同的主题。如果是,我创建一个 MessageProducer,目标设置为 header 中设置的主题,然后发送 ACK 消息。该代码似乎有效,或者至少我认为是这样,因为我没有收到任何错误。但是,在我的测试用例中,在发送初始消息后,我试图读取返回另一个主题的 ACK,并且调用超时。我正在使用 JmsTemplate 来发送原始消息以及使用 ACK。我确定我错过了什么,但我不确定是什么。这是显示 SessionAwareMessageListener 的相关 Spring Java 配置部分:
@Bean(name = "sessionAwareMessageListener")
SessionAwareMessageListener<TextMessage> createSessionAwareMessageListener() {
return new SessionAwareMessageListener<TextMessage>() {
@Override
public void onMessage(TextMessage message, Session session) throws JMSException {
log.info(" Received: {} ", message.getText());
// Prepare an ACK reply message
final ActiveMQTextMessage textMessage = new ActiveMQTextMessage();
textMessage.setText("ACK");
// Send the ACK message back to the replyTo address of the incoming
// message.
if (message.getJMSReplyTo() != null) {
log.info("Sending ACK message to {}", message.getJMSReplyTo());
// session.createTopic(message.getJMSReplyTo().toString());
MessageProducer producer = session.createProducer(message.getJMSReplyTo());
producer.send(textMessage);
}
}
};
}
在我的 Spock 集成测试中,我正在创建 MessageProducer 并通过 JmsTemplate 实例发送消息:
...other stuff here...
MessageCreator messageCreator = new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
log.info("Sending text message")
TextMessage message = session.createTextMessage("This is a test")
message.setJMSReplyTo(new ActiveMQTopic("test"))
return message
}
}
... code that registers the JMS endpoint goes here...
JmsTemplate jmsTemplate = context.getBean(JmsTemplate)
assert jmsTemplate
jmsTemplate.send("bar", messageCreator)
消息发送成功,最终由前面显示的 SessionAwareMessageListener 处理。但是当我尝试使用 JmsTemplate.receive() 来检索 ACK 消息时,它会阻塞直到调用超时,并且 returns 为空。主题上没有为 ACK 消息注册的特定侦听器或动态创建的端点,因此我认为可以使用 JmsTemplate.receive() 来使用消息。显然我在这里做错了什么,但我真的不知道是什么。这是我尝试接收 ACK 的代码片段:
jmsTemplate.setReceiveTimeout(2000L)
Message received = jmsTemplate.receive("test")
assert received instanceof ActiveMQTextMessage
log.info("Received message: ${received}")
assert ((ActiveMQTextMessage) received).text == "ACK"
如果能指出更好的方法,我将不胜感激。在这一点上,我真的只是在试验。我知道我需要动态注册端点,并可能将消息转发到其他主题。谢谢。
将主题用于 request/reply 消息传递通常不是一个好主意,尤其是在回复端。
- 回复可能会在
receive()
订阅主题之前到达;代理将丢弃此类消息,除非该订阅是持久的。
- 您通常只希望将回复发送给发起者。
使用队列而不是主题,尤其是对于回复。
考虑使用 JmsTemplate
作为回复,而不是创建您自己的制作人;如果您正在使用 DefaultMessageListenerContainer
和会话交易(您应该始终将 sessionTransacted
与 DMLC 一起使用以避免丢失消息),模板将自动使用相同的会话。
如果您希望对一条消息有多个响应,可以在请求端使用主题,但是,同样,除非订阅是持久的,否则回复的数量将是不确定的——您不能告诉生产者有多少消费者会收到它。
我正在使用 Spring 引导来启动嵌入式 ActiveMQ 代理,并且我正在使用 spring-jms 在运行时为主题动态注册 JMS 端点。我让那部分工作,当我创建 JMS 消息并将其发送到主题时,它最终被我的听众使用。到目前为止,一切都很好。在侦听器内部,如果在消息中设置了 JMS header "JMSReplyTo",我想将简单的 ACK 消息发送到不同的主题。如果是,我创建一个 MessageProducer,目标设置为 header 中设置的主题,然后发送 ACK 消息。该代码似乎有效,或者至少我认为是这样,因为我没有收到任何错误。但是,在我的测试用例中,在发送初始消息后,我试图读取返回另一个主题的 ACK,并且调用超时。我正在使用 JmsTemplate 来发送原始消息以及使用 ACK。我确定我错过了什么,但我不确定是什么。这是显示 SessionAwareMessageListener 的相关 Spring Java 配置部分:
@Bean(name = "sessionAwareMessageListener")
SessionAwareMessageListener<TextMessage> createSessionAwareMessageListener() {
return new SessionAwareMessageListener<TextMessage>() {
@Override
public void onMessage(TextMessage message, Session session) throws JMSException {
log.info(" Received: {} ", message.getText());
// Prepare an ACK reply message
final ActiveMQTextMessage textMessage = new ActiveMQTextMessage();
textMessage.setText("ACK");
// Send the ACK message back to the replyTo address of the incoming
// message.
if (message.getJMSReplyTo() != null) {
log.info("Sending ACK message to {}", message.getJMSReplyTo());
// session.createTopic(message.getJMSReplyTo().toString());
MessageProducer producer = session.createProducer(message.getJMSReplyTo());
producer.send(textMessage);
}
}
};
}
在我的 Spock 集成测试中,我正在创建 MessageProducer 并通过 JmsTemplate 实例发送消息:
...other stuff here...
MessageCreator messageCreator = new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
log.info("Sending text message")
TextMessage message = session.createTextMessage("This is a test")
message.setJMSReplyTo(new ActiveMQTopic("test"))
return message
}
}
... code that registers the JMS endpoint goes here...
JmsTemplate jmsTemplate = context.getBean(JmsTemplate)
assert jmsTemplate
jmsTemplate.send("bar", messageCreator)
消息发送成功,最终由前面显示的 SessionAwareMessageListener 处理。但是当我尝试使用 JmsTemplate.receive() 来检索 ACK 消息时,它会阻塞直到调用超时,并且 returns 为空。主题上没有为 ACK 消息注册的特定侦听器或动态创建的端点,因此我认为可以使用 JmsTemplate.receive() 来使用消息。显然我在这里做错了什么,但我真的不知道是什么。这是我尝试接收 ACK 的代码片段:
jmsTemplate.setReceiveTimeout(2000L)
Message received = jmsTemplate.receive("test")
assert received instanceof ActiveMQTextMessage
log.info("Received message: ${received}")
assert ((ActiveMQTextMessage) received).text == "ACK"
如果能指出更好的方法,我将不胜感激。在这一点上,我真的只是在试验。我知道我需要动态注册端点,并可能将消息转发到其他主题。谢谢。
将主题用于 request/reply 消息传递通常不是一个好主意,尤其是在回复端。
- 回复可能会在
receive()
订阅主题之前到达;代理将丢弃此类消息,除非该订阅是持久的。 - 您通常只希望将回复发送给发起者。
使用队列而不是主题,尤其是对于回复。
考虑使用 JmsTemplate
作为回复,而不是创建您自己的制作人;如果您正在使用 DefaultMessageListenerContainer
和会话交易(您应该始终将 sessionTransacted
与 DMLC 一起使用以避免丢失消息),模板将自动使用相同的会话。
如果您希望对一条消息有多个响应,可以在请求端使用主题,但是,同样,除非订阅是持久的,否则回复的数量将是不确定的——您不能告诉生产者有多少消费者会收到它。