spring boot jms - 在@JmsListner 中发送和接收消息

spring boot jms - send and receive message inside @JmsListner

QUEUE_INI 使用 IBM MQ9、最新的 spring 引导和 IBM 的 spring-boot-starter。

我想使用@JmsListner 接收一条消息,然后不提交,将另一条消息发送到另一个队列,接收响应,然后全部提交。

到目前为止我有下一个:

    @JmsListener(destination = QUEUE_IN, selector = "type='com.example.MainRequest")
    public void receiveMessage(Message message) throws JMSException {
        LOG.info("Received {} <{}>", QUEUE_IN, message);
        jmsTemplate2.convertAndSend(QUEUE_OUT, "SUB Request", requestMsg -> {
            requestMsg.setStringProperty("type", "com.example.SubRequest");
            return requestMsg;
        });
        LOG.info("Message sent");
        Message reply = jmsTemplate2.receiveSelected(QUEUE_IN, "type='com.example.SubResponse'");
        LOG.info("Received reply from {}: <{}>", QUEUE_IN, reply);
    }

我卡在 'Message sent' 上了。看起来子请求还没有真正发送。我在 MQ UI 中看到队列深度为 1,但里面没有消息,我的子请求侦听器也没有看到任何消息。

我也试过使用sendAndReceive方法:

Message reply = jmsTemplate.sendAndReceive(QUEUE_OUT, session -> {
    Message msg = session.createTextMessage();
    msg.setStringProperty("type", "com.example.SubRequest");
    LOG.info("Sending msg: <{}> to {}", msg, QUEUE_OUT);
    return msg;
});

但是我没有访问模型队列的权限。

有什么方法可以实现吗?

更新: 在你们所有人的共同帮助下,我完成了这项工作。我最终得到了单独的服务,只发送带有 @Transactional(propagation = Propagation.REQUIRES_NEW) 的子请求。所有其他逻辑都保留在主侦听器中。

打开事务 start/end 日志也很有帮助:

logging:
  level:
    org.springframework.transaction.interceptor: trace

您的出站消息发件人与消息接收者在同一交易中注册。因此,在事务提交之前,出站消息的接收者将看不到该消息。我认为你需要启动一个新事务来执行内部过程。

==更新==

所以已经有一段时间了,我没有为此设置开发环境,但我建议这样做。本质上,您将 Listener 和 Sender/Receiver 拆分为两个单独的 类。 Sender/Receiver 应该被注入到您的侦听器中,以便 @Transactional 注释得到尊重。

public class MyListener {
    private final MySender sender;

    public MyListener(MySender sender) {
        this.sender = sender;
    }

    @JmsListener(destination = QUEUE_IN, selector = "type='com.example.MainRequest")
    public void receiveMessage(Message message) throws JMSException {
        LOG.info("Received {} <{}>", QUEUE_IN, message);
        Message reply = sender.sendAndReceive()
        LOG.info("Received reply from {}: <{}>", QUEUE_IN, reply);
    }
}


public class MySender {
    private final JmsTemplate jmsTemplate2;
    private final Destination QUEUE_OUT;

    public MySender(JmsTemplate jmsTemplate2, Destination QUEUE_OUT) {
        this.jmsTemplate2 = jmsTemplate2;
        this.QUEUE_OUT = QUEUE_OUT;
    }

    @Transactional(propagation=Propagation.NESTED)  // Or REQUIRES_NEW, edepending on usage
    public Message sendAndReceive() throws JMSException {
        jmsTemplate2.convertAndSend(QUEUE_OUT, "SUB Request", requestMsg -> {
            requestMsg.setStringProperty("type", "com.example.SubRequest");
            return requestMsg;
        });
        LOG.info("Message sent");
        Message reply = jmsTemplate2.receiveSelected(QUEUE_IN, "type='com.example.SubResponse'");
        LOG.info("Received reply from {}: <{}>", QUEUE_IN, reply);
        return reply;
    }
}

@Transactional(propagation=Propagation.NESTED) 将开始一个新的事务,如果一个已经是 运行(并且 commit/rollback 当方法退出时),所以你应该能够send/receive 消息 return 给听众。

很遗憾,我没有足够的权限,无法对之前的答案添加评论。

正如 @Nicholas 所说,您的消息接收与消息发送在同一个事务中。在 receiveMessage 方法完成之前,初始发送不会提交到队列以供接收方使用。我假设您可能已将接收配置为 RECEIVE_TIMEOUT_INDEFINITE_WAIT 之类的内容,这将阻止该方法的完成。

使用此配置,您已经有一个绑定到 QUEUE_IN 的消息侦听器。传递到该队列的消息将只传递给一个消费者。我看到您正在使用选择器来避免这个问题。

一个选项可能是为 type = com.example.SubResponse 引入另一个消息侦听器,并从第一个中删除阻塞接收。