Spring 集成任务执行器在测试期间大约 1000 毫秒后在没有警告的情况下死机

Spring Integration Task Executor Dies Without Warning After Approx 1000ms during Testing

我正在使用 Spring 集成构建以下流程: 输入通道 -> 分离器 -> 转换器 -> 服务激活器 -> 聚合器

Transformer 和 Service Activator 使用任务执行器链接和执行。在应用程序执行期间,没有任何问题。但是当我尝试 运行 单元测试时,如果有一个很长的 运行ning 任务,正在执行 Service Activator 的执行线程会神秘地退出。为了证明这一点,我创建了一个具有以下配置的示例项目:

<task:executor id="executor" pool-size="20" keep-alive="120" queue-capacity="100"/>
<jms:message-driven-channel-adapter id="helloWorldJMSAdapater" destination="helloWorldJMSQueue"
    channel="helloWorldChannel"/>    
<int:channel id="helloWorldChannel"/>
<int:splitter id="splitter" input-channel="helloWorldChannel" output-channel="execChannel">
    <bean id="stringSplitter" class="hello.Splitter"></bean>
</int:splitter>
<int:channel id="execChannel">
    <int:dispatcher task-executor="executor"></int:dispatcher>
</int:channel>

<int:chain input-channel="execChannel" output-channel="aggregatorChannel">
    <int:transformer>
        <bean id="stringTransformer" class="hello.Transformer"></bean>
    </int:transformer>
    <int:service-activator id="helloWorldServiceActivator" ref="helloWorldAmqService" method="processMsg"/>
</int:chain>
<int:aggregator input-channel="aggregatorChannel" output-channel="errorChannel">
    <bean class="hello.ResponseAggregator"/>
</int:aggregator>

这是拆分器class:

public class Splitter {

public List<String> splitMessage(Message message)  {
    String msg = message.getPayload().toString();
    return Arrays.asList(msg.split(","));
}

}

这是变形金刚class:

public class Transformer {

public String transform(Message message)  {
    String msg = message.getPayload().toString();
    return msg+"t";
}

}

这是服务激活器class:


@Service
public class HelloWorldAmqService {

    public Message processMsg(String msg) throws InterruptedException {
        DateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        Date date = new Date();
        //Simulate long running process
        if(msg.equalsIgnoreCase("1t")){
            Thread.sleep(500);
            System.out.println("After first sleep");
            Thread.sleep(800);
        }
        System.out.println("*************"+ msg + " as of "+sdf.format(date)+" ***********  " );
        return MessageBuilder.withPayload(msg).build();

    }
}

为了模拟长时间的 运行ning 任务,我在 processMsg 方法中添加了一个 Thread.sleep()

这是 ResponseAggregator class:

@Component
public class ResponseAggregator extends AbstractAggregatingMessageGroupProcessor {

    @Override
    protected Message aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders) {
        StringBuilder builder = new StringBuilder();
        for (Message message : group.getMessages()) {
            builder.append(message.getPayload());
        }
        System.out.println(builder.toString());
        return MessageBuilder.withPayload(builder.toString()).build();
    }
}

我编写了一个单元测试来向频道发送示例消息并测试行为。但是,只要服务激活器线程的处理时间超过大约 1000 毫秒,线程就会在没有任何警告的情况下退出。这是单元测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class JmsFlowTests {

    @Autowired
    private MessageChannel helloWorldChannel;

    @Autowired
    private HelloWorldAmqService hello;

    @Autowired
    @Qualifier("jmsConnectionFactory")
    ConnectionFactory jmsConnectionFactory;

    @Test
    public void test() {

        helloWorldChannel.send(MessageBuilder.withPayload("1,2,3,4,6").build());
        assertThat(true);
    }

}

人们通常希望输出具有汇总结果,即 1t2t3t4t6t(顺序可能不同)。但是应用程序根本没有到达聚合器。负责处理“1t”的线程退出,聚合器根本不会被触发,因为默认行为是等待所有消息到达。

这是我让线程休眠更长时间时的响应,即 Thread.sleep(1000)

*************2t as of 2019/10/14 17:29:58 ***********  
*************3t as of 2019/10/14 17:29:58 ***********  
*************4t as of 2019/10/14 17:29:58 ***********  
*************6t as of 2019/10/14 17:29:58 ***********  


After first sleep

如果我让线程休眠更短的时间,即 Thread.sleep(200)

,这是响应
*************2t as of 2019/10/14 17:31:53 ***********  
*************4t as of 2019/10/14 17:31:53 ***********  
*************6t as of 2019/10/14 17:31:53 ***********  
*************3t as of 2019/10/14 17:31:53 ***********  


After first sleep
*************1t as of 2019/10/14 17:31:53 ***********  
2t3t6t4t1t

您的问题是您遗漏了一个事实,即您的应用程序是 async,但您的测试与多线程解决方案无关。

您在测试方法中发送一条消息,但不执行任何操作以等待输出消息。 因此,启动测试执行的主线程就存在,而其他线程中的所有执行都被抛在后面。

您的想法是发送到 helloWorldChannel 而不是处理 JMS 目的地是一个不错的选择。唯一的问题是您在聚合后不等待流结果。

将端点输出到 errorChannel 中也很奇怪,但您可以在生成消息之前从测试用例中订阅它:

@Autowired
private SubscribableChannel errorChannel;


@Test
public void test() {
    SettableListenableFuture<Message<?>> messageFuture = new SettableListenableFuture<>();
    this.errorChannel.subscribe((message) -> messageFuture.set(message));
    helloWorldChannel.send(MessageBuilder.withPayload("1,2,3,4,6").build());
    Message<?> messageToAssert = messageFuture.get(10, TimeUnit.SECONDS);
    ...
}

这种方式独立于流行为,您的主 JUnit 线程将等待结果 Future