Spring Boot 中的 @JmsListener 被监听其他目的地的另一个 @JmsListener 阻塞
Spring Boot JMSListener blocked by other JMSListener for different destination
我在一个以 ActiveMQ 5 作为代理(broker)、通过 JMS 通信的 Spring Boot 应用程序中遇到了以下情况。
一个带 @JmsListener 注解的方法处理消息,并将响应消息发送到另一个目的地。该目的地上还有另一个 @JmsListener,但它并不会在响应发送到代理后立即被调用,而是要等到原始侦听器的处理全部完成之后才被调用。如果原始侦听器(即发送响应的那个)额外标注了 @Async,则响应会如预期那样在发送后立即被收到。
原项目太大,所以我准备了下面的最小示例。
它包含一个 Spring Boot 应用程序 TestApp 和一个 @JmsListener (1):该侦听器立即把消息从目的地 in 转发到 out,然后休眠 3 秒。
应用程序在测试中启动,该测试向 in 发送一条消息并等待 2 秒以等待 out 上的响应。
仅当 @Async 出现在 (1) 时,测试才成功。
进一步观察:
- 如果测试改用变体 (2),即通过 JmsTemplate 而不是 @JmsListener 来接收响应,行为也相同。
- 在任何情况下都可以看到消息在发送后立即出现在代理中。
问题:为什么在这种情况下,接收自己发出的消息会被阻塞?如何在不使用 @Async 的情况下立即接收到已发出的消息?
Update/Solution:正如 Gary 所说,确实存在一个事务,但似乎不是 Spring Boot 的事务,而是由包含的 activemq-lib 创建的事务。
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
/**
 * Integration test that reproduces the delayed delivery described in the question.
 *
 * <p>It starts an ActiveMQ broker in a container, sends one message to destination
 * {@code in} and expects the forwarded copy to arrive on destination {@code out}
 * within two seconds.
 */
@SpringBootTest(classes = TestApp.class, webEnvironment = NONE)
@Testcontainers
public class JmsTest
{
    private static final Logger LOG = LoggerFactory.getLogger(JmsTest.class);

    /**
     * ActiveMQ broker container. Ports 8161 and 61616 are exposed, startup is
     * considered complete once the "started" log line appears (at most 60 seconds),
     * and the container log is forwarded to {@link #LOG}.
     */
    @Container
    public static final GenericContainer<?> ACTIVEMQ =
        new GenericContainer<>(DockerImageName.parse("rmohr/activemq"))
            .withExposedPorts(8161, 61616)
            .waitingFor(new LogMessageWaitStrategy().withRegEx(".*Apache ActiveMQ .* started.*"))
            .withStartupTimeout(Duration.ofSeconds(60))
            .withLogConsumer(new Slf4jLogConsumer(LOG));

    /**
     * Points {@code spring.activemq.broker-url} at the container's host and the
     * host port mapped to 61616.
     *
     * @param registry registry that receives the dynamic property
     */
    @DynamicPropertySource
    private static void ports(DynamicPropertyRegistry registry)
    {
        registry.add("spring.activemq.broker-url", () -> "tcp://" + ACTIVEMQ.getHost() + ":" + ACTIVEMQ.getMappedPort(61616));
    }

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * Messages received on {@code out}.
     *
     * <p>The list is written by {@link #onOut(String)} on a listener thread and read
     * by the test thread, so a thread-safe implementation is used instead of a plain
     * {@code LinkedList}. The fully qualified name avoids touching the file's imports.
     */
    private final List<String> messages = new java.util.concurrent.CopyOnWriteArrayList<>();

    /**
     * Records every message that arrives on destination {@code out}.
     *
     * @param message payload of the received message
     */
    @Async
    @JmsListener(destination = "out")
    public void onOut(String message)
    {
        LOG.warn("Received message from out: {}", message);
        messages.add(message);
    }

    /**
     * Sends a random UUID to {@code in}, waits two seconds and asserts that exactly
     * one message has been recorded from {@code out}.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Test
    public void foo() throws InterruptedException
    {
        LOG.warn("Sending request");
        // Send a message to destination 'in'; the listener in TestApp receives and answers it.
        jmsTemplate.convertAndSend("in", UUID.randomUUID().toString());
        LOG.warn("Waiting for repsonse");
        // (2) Alternative: receive the response from 'out' directly via JmsTemplate.
        // jmsTemplate.setReceiveTimeout(2_000);
        // Message response = jmsTemplate.receive("out");
        // assertThat(response, notNullValue());
        Thread.sleep(2_000);
        assertThat(messages, hasSize(1));
    }
}
/**
 * Minimal Spring Boot application used by the reproduction.
 *
 * <p>Its single JMS listener forwards each message from destination {@code in} to
 * destination {@code out} and then keeps the listener method busy for three seconds.
 */
@SpringBootApplication
@EnableJms
@EnableAsync
class TestApp
{
    private static final Logger LOG = LoggerFactory.getLogger(TestApp.class);

    /** Destination the listener consumes from. */
    private static final String REQUEST_DESTINATION = "in";

    /** Destination the listener forwards to. */
    private static final String RESPONSE_DESTINATION = "out";

    /** How long the listener stays busy after forwarding, in milliseconds. */
    private static final long BUSY_MILLIS = 3_000;

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * Starts the application.
     *
     * @param args command-line arguments passed on to Spring Boot
     */
    public static void main(String[] args)
    {
        SpringApplication.run(TestApp.class, args);
    }

    /**
     * Forwards the incoming message to {@code out}, then sleeps before returning.
     *
     * @param message payload received from {@code in}
     * @throws InterruptedException if the sleep is interrupted
     */
    // (1)
    // @Async
    @JmsListener(destination = REQUEST_DESTINATION)
    public void onIn(String message) throws InterruptedException
    {
        LOG.warn("Received message from in: {}", message);
        forward(message);
        stayBusy();
        LOG.warn("Finished");
    }

    /** Sends the payload to the response destination and logs that it was sent. */
    private void forward(String payload)
    {
        jmsTemplate.convertAndSend(RESPONSE_DESTINATION, payload);
        LOG.warn("Sent Response");
    }

    /** Keeps the calling listener thread occupied for {@link #BUSY_MILLIS}. */
    private void stayBusy() throws InterruptedException
    {
        LOG.warn("Sleeping ...");
        Thread.sleep(BUSY_MILLIS);
    }
}
这里是pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates. There is no parent to inherit a version from, so it is declared here. -->
    <groupId>foo</groupId>
    <artifactId>jmstest</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
        <!-- The test code uses a lambda, so the compiler level must be at least Java 8. -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <!-- Import the Spring Boot BOM so Boot-managed dependencies below need no explicit version. -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.5.3</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <!-- JMS support with the ActiveMQ client. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

        <!-- Test-only dependencies. The Testcontainers versions are given explicitly. -->
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>testcontainers</artifactId>
            <version>1.15.3</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>1.15.3</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
看起来您正在使用事务;在 @JmsListener 方法退出之前事务不会提交,因此在此之前其他消费者看不到这条消息。
您不能将事务用于此用例。
@Async 之所以有效,是因为这样发送操作会在另一个事务中执行。
我在一个以 ActiveMQ 5 作为代理(broker)、通过 JMS 通信的 Spring Boot 应用程序中遇到了以下情况。
一个带 @JmsListener 注解的方法处理消息,并将响应消息发送到另一个目的地。该目的地上还有另一个 @JmsListener,但它并不会在响应发送到代理后立即被调用,而是要等到原始侦听器的处理全部完成之后才被调用。如果原始侦听器(即发送响应的那个)额外标注了 @Async,则响应会如预期那样在发送后立即被收到。
原项目太大,所以我准备了下面的最小示例。它包含一个 Spring Boot 应用程序 TestApp 和一个 @JmsListener (1):该侦听器立即把消息从目的地 in 转发到 out,然后休眠 3 秒。
应用程序在测试中启动,该测试向 in 发送一条消息并等待 2 秒以等待 out 上的响应。
仅当 @Async 出现在 (1) 时,测试才成功。
进一步观察:
- 如果测试改用变体 (2),即通过 JmsTemplate 而不是 @JmsListener 来接收响应,行为也相同。
- 在任何情况下都可以看到消息在发送后立即出现在代理中。
问题:为什么在这种情况下,接收自己发出的消息会被阻塞?如何在不使用 @Async 的情况下立即接收到已发出的消息?
Update/Solution:正如 Gary 所说,确实存在一个事务,但似乎不是 Spring Boot 的事务,而是由包含的 activemq-lib 创建的事务。
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
/**
 * Integration test that reproduces the delayed delivery described in the question.
 *
 * <p>It starts an ActiveMQ broker in a container, sends one message to destination
 * {@code in} and expects the forwarded copy to arrive on destination {@code out}
 * within two seconds.
 */
@SpringBootTest(classes = TestApp.class, webEnvironment = NONE)
@Testcontainers
public class JmsTest
{
    private static final Logger LOG = LoggerFactory.getLogger(JmsTest.class);

    /**
     * ActiveMQ broker container. Ports 8161 and 61616 are exposed, startup is
     * considered complete once the "started" log line appears (at most 60 seconds),
     * and the container log is forwarded to {@link #LOG}.
     */
    @Container
    public static final GenericContainer<?> ACTIVEMQ =
        new GenericContainer<>(DockerImageName.parse("rmohr/activemq"))
            .withExposedPorts(8161, 61616)
            .waitingFor(new LogMessageWaitStrategy().withRegEx(".*Apache ActiveMQ .* started.*"))
            .withStartupTimeout(Duration.ofSeconds(60))
            .withLogConsumer(new Slf4jLogConsumer(LOG));

    /**
     * Points {@code spring.activemq.broker-url} at the container's host and the
     * host port mapped to 61616.
     *
     * @param registry registry that receives the dynamic property
     */
    @DynamicPropertySource
    private static void ports(DynamicPropertyRegistry registry)
    {
        registry.add("spring.activemq.broker-url", () -> "tcp://" + ACTIVEMQ.getHost() + ":" + ACTIVEMQ.getMappedPort(61616));
    }

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * Messages received on {@code out}.
     *
     * <p>The list is written by {@link #onOut(String)} on a listener thread and read
     * by the test thread, so a thread-safe implementation is used instead of a plain
     * {@code LinkedList}. The fully qualified name avoids touching the file's imports.
     */
    private final List<String> messages = new java.util.concurrent.CopyOnWriteArrayList<>();

    /**
     * Records every message that arrives on destination {@code out}.
     *
     * @param message payload of the received message
     */
    @Async
    @JmsListener(destination = "out")
    public void onOut(String message)
    {
        LOG.warn("Received message from out: {}", message);
        messages.add(message);
    }

    /**
     * Sends a random UUID to {@code in}, waits two seconds and asserts that exactly
     * one message has been recorded from {@code out}.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Test
    public void foo() throws InterruptedException
    {
        LOG.warn("Sending request");
        // Send a message to destination 'in'; the listener in TestApp receives and answers it.
        jmsTemplate.convertAndSend("in", UUID.randomUUID().toString());
        LOG.warn("Waiting for repsonse");
        // (2) Alternative: receive the response from 'out' directly via JmsTemplate.
        // jmsTemplate.setReceiveTimeout(2_000);
        // Message response = jmsTemplate.receive("out");
        // assertThat(response, notNullValue());
        Thread.sleep(2_000);
        assertThat(messages, hasSize(1));
    }
}
/**
 * Minimal Spring Boot application used by the reproduction.
 *
 * <p>Its single JMS listener forwards each message from destination {@code in} to
 * destination {@code out} and then keeps the listener method busy for three seconds.
 */
@SpringBootApplication
@EnableJms
@EnableAsync
class TestApp
{
    private static final Logger LOG = LoggerFactory.getLogger(TestApp.class);

    /** Destination the listener consumes from. */
    private static final String REQUEST_DESTINATION = "in";

    /** Destination the listener forwards to. */
    private static final String RESPONSE_DESTINATION = "out";

    /** How long the listener stays busy after forwarding, in milliseconds. */
    private static final long BUSY_MILLIS = 3_000;

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * Starts the application.
     *
     * @param args command-line arguments passed on to Spring Boot
     */
    public static void main(String[] args)
    {
        SpringApplication.run(TestApp.class, args);
    }

    /**
     * Forwards the incoming message to {@code out}, then sleeps before returning.
     *
     * @param message payload received from {@code in}
     * @throws InterruptedException if the sleep is interrupted
     */
    // (1)
    // @Async
    @JmsListener(destination = REQUEST_DESTINATION)
    public void onIn(String message) throws InterruptedException
    {
        LOG.warn("Received message from in: {}", message);
        forward(message);
        stayBusy();
        LOG.warn("Finished");
    }

    /** Sends the payload to the response destination and logs that it was sent. */
    private void forward(String payload)
    {
        jmsTemplate.convertAndSend(RESPONSE_DESTINATION, payload);
        LOG.warn("Sent Response");
    }

    /** Keeps the calling listener thread occupied for {@link #BUSY_MILLIS}. */
    private void stayBusy() throws InterruptedException
    {
        LOG.warn("Sleeping ...");
        Thread.sleep(BUSY_MILLIS);
    }
}
这里是pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates. There is no parent to inherit a version from, so it is declared here. -->
    <groupId>foo</groupId>
    <artifactId>jmstest</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
        <!-- The test code uses a lambda, so the compiler level must be at least Java 8. -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <!-- Import the Spring Boot BOM so Boot-managed dependencies below need no explicit version. -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.5.3</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <!-- JMS support with the ActiveMQ client. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

        <!-- Test-only dependencies. The Testcontainers versions are given explicitly. -->
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>testcontainers</artifactId>
            <version>1.15.3</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>1.15.3</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
看起来您正在使用事务;在 @JmsListener 方法退出之前事务不会提交,因此在此之前其他消费者看不到这条消息。
您不能将事务用于此用例。
@Async 之所以有效,是因为这样发送操作会在另一个事务中执行。