Spring 集成任务执行器在测试期间大约 1000 毫秒后在没有警告的情况下死机
Spring Integration Task Executor Dies Without Warning After Approx 1000ms during Testing
我正在使用 Spring 集成构建以下流程:
输入通道 -> 分离器 -> 转换器 -> 服务激活器 -> 聚合器
Transformer 和 Service Activator 使用任务执行器链接和执行。在应用程序执行期间,没有任何问题。但是当我尝试 运行 单元测试时,如果有一个很长的 运行ning 任务,正在执行 Service Activator 的执行线程会神秘地退出。为了证明这一点,我创建了一个具有以下配置的示例项目:
<task:executor id="executor" pool-size="20" keep-alive="120" queue-capacity="100"/>
<jms:message-driven-channel-adapter id="helloWorldJMSAdapater" destination="helloWorldJMSQueue"
channel="helloWorldChannel"/>
<int:channel id="helloWorldChannel"/>
<int:splitter id="splitter" input-channel="helloWorldChannel" output-channel="execChannel">
<bean id="stringSplitter" class="hello.Splitter"></bean>
</int:splitter>
<int:channel id="execChannel">
<int:dispatcher task-executor="executor"></int:dispatcher>
</int:channel>
<int:chain input-channel="execChannel" output-channel="aggregatorChannel">
<int:transformer>
<bean id="stringTransformer" class="hello.Transformer"></bean>
</int:transformer>
<int:service-activator id="helloWorldServiceActivator" ref="helloWorldAmqService" method="processMsg"/>
</int:chain>
<int:aggregator input-channel="aggregatorChannel" output-channel="errorChannel">
<bean class="hello.ResponseAggregator"/>
</int:aggregator>
这是拆分器class:
public class Splitter {
public List<String> splitMessage(Message message) {
String msg = message.getPayload().toString();
return Arrays.asList(msg.split(","));
}
}
这是变形金刚class:
public class Transformer {
public String transform(Message message) {
String msg = message.getPayload().toString();
return msg+"t";
}
}
这是服务激活器class:
@Service
public class HelloWorldAmqService {
public Message processMsg(String msg) throws InterruptedException {
DateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
Date date = new Date();
//Simulate long running process
if(msg.equalsIgnoreCase("1t")){
Thread.sleep(500);
System.out.println("After first sleep");
Thread.sleep(800);
}
System.out.println("*************"+ msg + " as of "+sdf.format(date)+" *********** " );
return MessageBuilder.withPayload(msg).build();
}
}
为了模拟长时间的 运行ning 任务,我在 processMsg 方法中添加了一个 Thread.sleep()
。
这是 ResponseAggregator class:
@Component
public class ResponseAggregator extends AbstractAggregatingMessageGroupProcessor {
@Override
protected Message aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders) {
StringBuilder builder = new StringBuilder();
for (Message message : group.getMessages()) {
builder.append(message.getPayload());
}
System.out.println(builder.toString());
return MessageBuilder.withPayload(builder.toString()).build();
}
}
我编写了一个单元测试来向频道发送示例消息并测试行为。但是,只要服务激活器线程的处理时间超过大约 1000 毫秒,线程就会在没有任何警告的情况下退出。这是单元测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class JmsFlowTests {
@Autowired
private MessageChannel helloWorldChannel;
@Autowired
private HelloWorldAmqService hello;
@Autowired
@Qualifier("jmsConnectionFactory")
ConnectionFactory jmsConnectionFactory;
@Test
public void test() {
helloWorldChannel.send(MessageBuilder.withPayload("1,2,3,4,6").build());
assertThat(true);
}
}
人们通常希望输出具有汇总结果,即
1t2t3t4t6t(顺序可能不同)。但是应用程序根本没有到达聚合器。负责处理“1t”的线程退出,聚合器根本不会被触发,因为默认行为是等待所有消息到达。
这是我让线程休眠更长时间时的响应,即 Thread.sleep(1000)
*************2t as of 2019/10/14 17:29:58 ***********
*************3t as of 2019/10/14 17:29:58 ***********
*************4t as of 2019/10/14 17:29:58 ***********
*************6t as of 2019/10/14 17:29:58 ***********
After first sleep
如果我让线程休眠更短的时间,即 Thread.sleep(200)
,这是响应
*************2t as of 2019/10/14 17:31:53 ***********
*************4t as of 2019/10/14 17:31:53 ***********
*************6t as of 2019/10/14 17:31:53 ***********
*************3t as of 2019/10/14 17:31:53 ***********
After first sleep
*************1t as of 2019/10/14 17:31:53 ***********
2t3t6t4t1t
您的问题是您遗漏了一个事实,即您的应用程序是 async
,但您的测试与多线程解决方案无关。
您在测试方法中发送一条消息,但不执行任何操作以等待输出消息。
因此,启动测试执行的主线程就存在,而其他线程中的所有执行都被抛在后面。
您的想法是发送到 helloWorldChannel
而不是处理 JMS 目的地是一个不错的选择。唯一的问题是您在聚合后不等待流结果。
将端点输出到 errorChannel
中也很奇怪,但您可以在生成消息之前从测试用例中订阅它:
@Autowired
private SubscribableChannel errorChannel;
@Test
public void test() {
SettableListenableFuture<Message<?>> messageFuture = new SettableListenableFuture<>();
this.errorChannel.subscribe((message) -> messageFuture.set(message));
helloWorldChannel.send(MessageBuilder.withPayload("1,2,3,4,6").build());
Message<?> messageToAssert = messageFuture.get(10, TimeUnit.SECONDS);
...
}
这种方式独立于流行为,您的主 JUnit 线程将等待结果 Future
。
我正在使用 Spring 集成构建以下流程: 输入通道 -> 分离器 -> 转换器 -> 服务激活器 -> 聚合器
Transformer 和 Service Activator 使用任务执行器链接和执行。在应用程序执行期间,没有任何问题。但是当我尝试 运行 单元测试时,如果有一个很长的 运行ning 任务,正在执行 Service Activator 的执行线程会神秘地退出。为了证明这一点,我创建了一个具有以下配置的示例项目:
<task:executor id="executor" pool-size="20" keep-alive="120" queue-capacity="100"/>
<jms:message-driven-channel-adapter id="helloWorldJMSAdapater" destination="helloWorldJMSQueue"
channel="helloWorldChannel"/>
<int:channel id="helloWorldChannel"/>
<int:splitter id="splitter" input-channel="helloWorldChannel" output-channel="execChannel">
<bean id="stringSplitter" class="hello.Splitter"></bean>
</int:splitter>
<int:channel id="execChannel">
<int:dispatcher task-executor="executor"></int:dispatcher>
</int:channel>
<int:chain input-channel="execChannel" output-channel="aggregatorChannel">
<int:transformer>
<bean id="stringTransformer" class="hello.Transformer"></bean>
</int:transformer>
<int:service-activator id="helloWorldServiceActivator" ref="helloWorldAmqService" method="processMsg"/>
</int:chain>
<int:aggregator input-channel="aggregatorChannel" output-channel="errorChannel">
<bean class="hello.ResponseAggregator"/>
</int:aggregator>
这是拆分器class:
public class Splitter {
public List<String> splitMessage(Message message) {
String msg = message.getPayload().toString();
return Arrays.asList(msg.split(","));
}
}
这是变形金刚class:
public class Transformer {
public String transform(Message message) {
String msg = message.getPayload().toString();
return msg+"t";
}
}
这是服务激活器class:
@Service
public class HelloWorldAmqService {
public Message processMsg(String msg) throws InterruptedException {
DateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
Date date = new Date();
//Simulate long running process
if(msg.equalsIgnoreCase("1t")){
Thread.sleep(500);
System.out.println("After first sleep");
Thread.sleep(800);
}
System.out.println("*************"+ msg + " as of "+sdf.format(date)+" *********** " );
return MessageBuilder.withPayload(msg).build();
}
}
为了模拟长时间的 运行ning 任务,我在 processMsg 方法中添加了一个 Thread.sleep()
。
这是 ResponseAggregator class:
@Component
public class ResponseAggregator extends AbstractAggregatingMessageGroupProcessor {
@Override
protected Message aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders) {
StringBuilder builder = new StringBuilder();
for (Message message : group.getMessages()) {
builder.append(message.getPayload());
}
System.out.println(builder.toString());
return MessageBuilder.withPayload(builder.toString()).build();
}
}
我编写了一个单元测试来向频道发送示例消息并测试行为。但是,只要服务激活器线程的处理时间超过大约 1000 毫秒,线程就会在没有任何警告的情况下退出。这是单元测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class JmsFlowTests {
@Autowired
private MessageChannel helloWorldChannel;
@Autowired
private HelloWorldAmqService hello;
@Autowired
@Qualifier("jmsConnectionFactory")
ConnectionFactory jmsConnectionFactory;
@Test
public void test() {
helloWorldChannel.send(MessageBuilder.withPayload("1,2,3,4,6").build());
assertThat(true);
}
}
人们通常希望输出具有汇总结果,即 1t2t3t4t6t(顺序可能不同)。但是应用程序根本没有到达聚合器。负责处理“1t”的线程退出,聚合器根本不会被触发,因为默认行为是等待所有消息到达。
这是我让线程休眠更长时间时的响应,即 Thread.sleep(1000)
*************2t as of 2019/10/14 17:29:58 ***********
*************3t as of 2019/10/14 17:29:58 ***********
*************4t as of 2019/10/14 17:29:58 ***********
*************6t as of 2019/10/14 17:29:58 ***********
After first sleep
如果我让线程休眠更短的时间,即 Thread.sleep(200)
*************2t as of 2019/10/14 17:31:53 ***********
*************4t as of 2019/10/14 17:31:53 ***********
*************6t as of 2019/10/14 17:31:53 ***********
*************3t as of 2019/10/14 17:31:53 ***********
After first sleep
*************1t as of 2019/10/14 17:31:53 ***********
2t3t6t4t1t
您的问题是您遗漏了一个事实,即您的应用程序是 async
,但您的测试与多线程解决方案无关。
您在测试方法中发送一条消息,但不执行任何操作以等待输出消息。 因此,启动测试执行的主线程就存在,而其他线程中的所有执行都被抛在后面。
您的想法是发送到 helloWorldChannel
而不是处理 JMS 目的地是一个不错的选择。唯一的问题是您在聚合后不等待流结果。
将端点输出到 errorChannel
中也很奇怪,但您可以在生成消息之前从测试用例中订阅它:
@Autowired
private SubscribableChannel errorChannel;
@Test
public void test() {
SettableListenableFuture<Message<?>> messageFuture = new SettableListenableFuture<>();
this.errorChannel.subscribe((message) -> messageFuture.set(message));
helloWorldChannel.send(MessageBuilder.withPayload("1,2,3,4,6").build());
Message<?> messageToAssert = messageFuture.get(10, TimeUnit.SECONDS);
...
}
这种方式独立于流行为,您的主 JUnit 线程将等待结果 Future
。