Spring 引导 JMSListener 被不同目的地的其他 JMSListener 阻止

Spring Boot JMSListener blocked by other JMSListener for different destination

我在使用 JMS over ActiveMQ 5 作为代理的 Spring 启动应用程序中遇到了以下情况。

一个 @JMSListener 注释方法处理消息并将响应消息发送到不同的目的地。此目标还有一个 @JMSListener,当响应已发送到代理时不会调用它,只有当原始侦听器的处理完全完成时才调用。如果此侦听器额外使用 @Async 注释,则按预期发送后立即收到响应。

原项目太大,所以我准备了下面的最小示例。 它包含一个 Spring 引导应用程序 TestApp 和一个 @JmsListener (1) 其中立即将消息从目标 in 转发到 out 并且 afterwards 休眠 3 秒。

应用程序在测试中启动,该测试向 in 发送一条消息并等待 2 秒以等待 out 上的响应。

仅当 @Async 出现在 (1) 时,测试才成功。

进一步观察:

问题:为什么在这种情况下接收自发消息被屏蔽了?如何在不使用@Async 的情况下立即接收传出消息?

Update/Solution:正如 Gary 所说,确实存在一个事务,但似乎不是 Spring Boot 的事务,而是由包含的 activemq-lib 创建的事务。

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE;

import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@SpringBootTest(classes = TestApp.class, webEnvironment = NONE)
@Testcontainers
public class JmsTest
{
    private static final Logger LOG = LoggerFactory.getLogger(JmsTest.class);

    @Container
    public static final GenericContainer<?> ACTIVEMQ =
        new GenericContainer<>(DockerImageName.parse("rmohr/activemq"))
            .withExposedPorts(8161, 61616)
            .waitingFor(new LogMessageWaitStrategy().withRegEx(".*Apache ActiveMQ .* started.*"))
            .withStartupTimeout(Duration.ofSeconds(60))
            .withLogConsumer(new Slf4jLogConsumer(LOG));

    @DynamicPropertySource
    private static void ports(DynamicPropertyRegistry registry)
    {
        registry.add("spring.activemq.broker-url", () -> "tcp://" + ACTIVEMQ.getHost() + ":" + ACTIVEMQ.getMappedPort(61616));
    }

    @Autowired
    private JmsTemplate jmsTemplate;

    private List<String> messages = new LinkedList<>();

    @Async
    @JmsListener(destination = "out")
    public void onOut(String message)
    {
        LOG.warn("Received message from out: {}", message);
        messages.add(message);
    }

    @Test
    public void foo() throws InterruptedException
    {
        LOG.warn("Sending request");
        // Sending some message on destination 'in' to be received and answered by the listener below
        jmsTemplate.convertAndSend("in", UUID.randomUUID().toString());

        LOG.warn("Waiting for repsonse");

        // (2)    // Try to receive response from 'out'
        //        jmsTemplate.setReceiveTimeout(2_000);
        //        Message response = jmsTemplate.receive("out");
        //        assertThat(response, notNullValue());
        
        Thread.sleep(2_000);
        assertThat(messages, hasSize(1));

    }
}

@SpringBootApplication
@EnableJms
@EnableAsync
class TestApp
{
    private static final Logger LOG = LoggerFactory.getLogger(TestApp.class);

    public static void main(String[] args)
    {
        SpringApplication.run(TestApp.class, args);
    }

    @Autowired
    private JmsTemplate jmsTemplate;

    // (1)
    // @Async
    @JmsListener(destination = "in")
    public void onIn(String message) throws InterruptedException
    {
        LOG.warn("Received message from in: {}", message);

        jmsTemplate.convertAndSend("out", message);
        LOG.warn("Sent Response");

        LOG.warn("Sleeping ...");
        Thread.sleep(3_000);

        LOG.warn("Finished");
    }
}

这里是pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>foo</groupId>
    <artifactId>jmstest</artifactId>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.5.3</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>


        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>testcontainers</artifactId>
            <version>1.15.3</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>1.15.3</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>
</project>

看起来您正在使用事务,在 @JmsListener 方法退出之前事务不会提交,因此其他消费者不会看到它。

您不能将事务用于此用例。

因此 @Async 有效,因为发送将在不同的事务中执行。