spring boot jms - 在@JmsListner 中发送和接收消息
spring boot jms - send and receive message inside @JmsListner
QUEUE_INI 使用 IBM MQ9、最新的 spring 引导和 IBM 的 spring-boot-starter。
我想使用@JmsListner 接收一条消息,然后不提交,将另一条消息发送到另一个队列,接收响应,然后全部提交。
到目前为止我有下一个:
@JmsListener(destination = QUEUE_IN, selector = "type='com.example.MainRequest")
public void receiveMessage(Message message) throws JMSException {
LOG.info("Received {} <{}>", QUEUE_IN, message);
jmsTemplate2.convertAndSend(QUEUE_OUT, "SUB Request", requestMsg -> {
requestMsg.setStringProperty("type", "com.example.SubRequest");
return requestMsg;
});
LOG.info("Message sent");
Message reply = jmsTemplate2.receiveSelected(QUEUE_IN, "type='com.example.SubResponse'");
LOG.info("Received reply from {}: <{}>", QUEUE_IN, reply);
}
我卡在 'Message sent' 上了。看起来子请求还没有真正发送。我在 MQ UI 中看到队列深度为 1,但里面没有消息,我的子请求侦听器也没有看到任何消息。
我也试过使用sendAndReceive
方法:
Message reply = jmsTemplate.sendAndReceive(QUEUE_OUT, session -> {
Message msg = session.createTextMessage();
msg.setStringProperty("type", "com.example.SubRequest");
LOG.info("Sending msg: <{}> to {}", msg, QUEUE_OUT);
return msg;
});
但是我没有访问模型队列的权限。
有什么方法可以实现吗?
更新:
在你们所有人的共同帮助下,我完成了这项工作。我最终得到了单独的服务,只发送带有 @Transactional(propagation = Propagation.REQUIRES_NEW)
的子请求。所有其他逻辑都保留在主侦听器中。
打开事务 start/end 日志也很有帮助:
logging:
level:
org.springframework.transaction.interceptor: trace
您的出站消息发件人与消息接收者在同一交易中注册。因此,在事务提交之前,出站消息的接收者将看不到该消息。我认为你需要启动一个新事务来执行内部过程。
==更新==
所以已经有一段时间了,我没有为此设置开发环境,但我建议这样做。本质上,您将 Listener 和 Sender/Receiver 拆分为两个单独的 类。 Sender/Receiver 应该被注入到您的侦听器中,以便 @Transactional 注释得到尊重。
public class MyListener {
private final MySender sender;
public MyListener(MySender sender) {
this.sender = sender;
}
@JmsListener(destination = QUEUE_IN, selector = "type='com.example.MainRequest")
public void receiveMessage(Message message) throws JMSException {
LOG.info("Received {} <{}>", QUEUE_IN, message);
Message reply = sender.sendAndReceive()
LOG.info("Received reply from {}: <{}>", QUEUE_IN, reply);
}
}
public class MySender {
private final JmsTemplate jmsTemplate2;
private final Destination QUEUE_OUT;
public MySender(JmsTemplate jmsTemplate2, Destination QUEUE_OUT) {
this.jmsTemplate2 = jmsTemplate2;
this.QUEUE_OUT = QUEUE_OUT;
}
@Transactional(propagation=Propagation.NESTED) // Or REQUIRES_NEW, edepending on usage
public Message sendAndReceive() throws JMSException {
jmsTemplate2.convertAndSend(QUEUE_OUT, "SUB Request", requestMsg -> {
requestMsg.setStringProperty("type", "com.example.SubRequest");
return requestMsg;
});
LOG.info("Message sent");
Message reply = jmsTemplate2.receiveSelected(QUEUE_IN, "type='com.example.SubResponse'");
LOG.info("Received reply from {}: <{}>", QUEUE_IN, reply);
return reply;
}
}
@Transactional(propagation=Propagation.NESTED) 将开始一个新的事务,如果一个已经是 运行(并且 commit/rollback 当方法退出时),所以你应该能够send/receive 消息 return 给听众。
很遗憾,我没有足够的权限,无法对之前的答案添加评论。
正如 @Nicholas 所说,您的消息接收与消息发送在同一个事务中。在 receiveMessage
方法完成之前,初始发送不会提交到队列以供接收方使用。我假设您可能已将接收配置为 RECEIVE_TIMEOUT_INDEFINITE_WAIT
之类的内容,这将阻止该方法的完成。
使用此配置,您已经有一个绑定到 QUEUE_IN
的消息侦听器。传递到该队列的消息将只传递给一个消费者。我看到您正在使用选择器来避免这个问题。
一个选项可能是为 type = com.example.SubResponse
引入另一个消息侦听器,并从第一个中删除阻塞接收。
QUEUE_INI 使用 IBM MQ9、最新的 spring 引导和 IBM 的 spring-boot-starter。
我想使用@JmsListner 接收一条消息,然后不提交,将另一条消息发送到另一个队列,接收响应,然后全部提交。
到目前为止我有下一个:
@JmsListener(destination = QUEUE_IN, selector = "type='com.example.MainRequest")
public void receiveMessage(Message message) throws JMSException {
LOG.info("Received {} <{}>", QUEUE_IN, message);
jmsTemplate2.convertAndSend(QUEUE_OUT, "SUB Request", requestMsg -> {
requestMsg.setStringProperty("type", "com.example.SubRequest");
return requestMsg;
});
LOG.info("Message sent");
Message reply = jmsTemplate2.receiveSelected(QUEUE_IN, "type='com.example.SubResponse'");
LOG.info("Received reply from {}: <{}>", QUEUE_IN, reply);
}
我卡在 'Message sent' 上了。看起来子请求还没有真正发送。我在 MQ UI 中看到队列深度为 1,但里面没有消息,我的子请求侦听器也没有看到任何消息。
我也试过使用sendAndReceive
方法:
Message reply = jmsTemplate.sendAndReceive(QUEUE_OUT, session -> {
Message msg = session.createTextMessage();
msg.setStringProperty("type", "com.example.SubRequest");
LOG.info("Sending msg: <{}> to {}", msg, QUEUE_OUT);
return msg;
});
但是我没有访问模型队列的权限。
有什么方法可以实现吗?
更新:
在你们所有人的共同帮助下,我完成了这项工作。我最终得到了单独的服务,只发送带有 @Transactional(propagation = Propagation.REQUIRES_NEW)
的子请求。所有其他逻辑都保留在主侦听器中。
打开事务 start/end 日志也很有帮助:
logging:
level:
org.springframework.transaction.interceptor: trace
您的出站消息发件人与消息接收者在同一交易中注册。因此,在事务提交之前,出站消息的接收者将看不到该消息。我认为你需要启动一个新事务来执行内部过程。
==更新==
所以已经有一段时间了,我没有为此设置开发环境,但我建议这样做。本质上,您将 Listener 和 Sender/Receiver 拆分为两个单独的 类。 Sender/Receiver 应该被注入到您的侦听器中,以便 @Transactional 注释得到尊重。
public class MyListener {
private final MySender sender;
public MyListener(MySender sender) {
this.sender = sender;
}
@JmsListener(destination = QUEUE_IN, selector = "type='com.example.MainRequest")
public void receiveMessage(Message message) throws JMSException {
LOG.info("Received {} <{}>", QUEUE_IN, message);
Message reply = sender.sendAndReceive()
LOG.info("Received reply from {}: <{}>", QUEUE_IN, reply);
}
}
public class MySender {
private final JmsTemplate jmsTemplate2;
private final Destination QUEUE_OUT;
public MySender(JmsTemplate jmsTemplate2, Destination QUEUE_OUT) {
this.jmsTemplate2 = jmsTemplate2;
this.QUEUE_OUT = QUEUE_OUT;
}
@Transactional(propagation=Propagation.NESTED) // Or REQUIRES_NEW, edepending on usage
public Message sendAndReceive() throws JMSException {
jmsTemplate2.convertAndSend(QUEUE_OUT, "SUB Request", requestMsg -> {
requestMsg.setStringProperty("type", "com.example.SubRequest");
return requestMsg;
});
LOG.info("Message sent");
Message reply = jmsTemplate2.receiveSelected(QUEUE_IN, "type='com.example.SubResponse'");
LOG.info("Received reply from {}: <{}>", QUEUE_IN, reply);
return reply;
}
}
@Transactional(propagation=Propagation.NESTED) 将开始一个新的事务,如果一个已经是 运行(并且 commit/rollback 当方法退出时),所以你应该能够send/receive 消息 return 给听众。
很遗憾,我没有足够的权限,无法对之前的答案添加评论。
正如 @Nicholas 所说,您的消息接收与消息发送在同一个事务中。在 receiveMessage
方法完成之前,初始发送不会提交到队列以供接收方使用。我假设您可能已将接收配置为 RECEIVE_TIMEOUT_INDEFINITE_WAIT
之类的内容,这将阻止该方法的完成。
使用此配置,您已经有一个绑定到 QUEUE_IN
的消息侦听器。传递到该队列的消息将只传递给一个消费者。我看到您正在使用选择器来避免这个问题。
一个选项可能是为 type = com.example.SubResponse
引入另一个消息侦听器,并从第一个中删除阻塞接收。