如何在 JUnit 中等待 @JMSListener 注释方法完成

How to wait for @JMSListener annotated method to complete in JUnit

所以我正在尝试对 JMS 处理进行一些集成测试,Spring (v4.1.6) 基于代码。

这是一个非常标准的 Spring 设置,带有 @JmsListener 注释方法和 DefaultMessageListenerContainer concurrency 设置为 1,因此只允许 1 个侦听线程.

现在,我利用 ActiveMQ 的嵌入式代理而不依赖任何外部 jms 代理来随时随地 运行 进行测试(我应该从事营销工作)。

所以一切正常,然后我进行 JUnit 测试:

@Test
public void test() {
    sendSomeMessage();
    //how to wait here for the @JMSListener method to complete
    verify();
}

我发送了消息,但随后我需要以某种方式等待 @JMSListener 注释方法完成。我该怎么做?

好吧,我希望我能以某种方式连接到消息驱动的 Pojos 生命周期来做到这一点,但是通过研究其他关于异步代码的 SO 问题,我想出了一个基于 CountDownLatch [=16= 的解决方案]

  1. 所有工作完成后,@JMSListener 注释方法应在 CountDownLatch 上调用 countDown()

    @JmsListener(destination = "dest", containerFactory = "cf")
    public void processMessage(TextMessage message) throws JMSException {
        //do the actual processing
        actualProcessing(message);
        //if there's countDownLatch call the countdown.
        if(countDownLatch != null) {
            countDownLatch.countDown();
        }
    }
    
  2. 在测试方法中

    @Test
    public void test() throws InterruptedException {
        //initialize the countDownLatch and set in on the processing class
        CountDownLatch countDownLatch = new CountDownLatch(1);
        messageProcessor.setCountDownLatch(countDownLatch);
        //sendthemessage
        sendSomeMessage();
        //wait for the processing method to call countdown()
        countDownLatch.await();
        verify();
    }
    

此解决方案的缺点是您必须实际更改 @JMSListener 注释方法,专门用于集成测试

为了避免必须更改实际的@JmsListener 方法,您可以尝试在测试中使用 AOP...

首先像这样创建一个方面class:

@Aspect
public static class JmsListenerInterceptor {
    @org.aspectj.lang.annotation.After("@annotation(org.springframework.jms.annotation.JmsListener)")
    public void afterOnMessage(JoinPoint jp) {
        // Do countdown latch stuff...
    }
}

然后将其添加到您用于测试的应用程序上下文配置中,如下所示:

<aop:aspectj-autoproxy/>
<bean id="jmsListenerInterceptor" class="path.to.your.Test$JmsListenerInterceptor" />

如果一切按计划进行,JmsListenerInterceptor 将倒计时,您无需更改实际代码。

重要提示: 我刚刚发现使用 AOP 和 Mockito 来验证是否调用了 @JmsListener 中的某些方法是一个糟糕的组合。原因似乎是 CGLib classes 的额外包装导致调用 wrong/actual 目标实例而不是 Mockito 代理。

在我的测试中,我有一个@Autowired、@InjectMocks Listener 对象和一个我想要的@Mock Facade 对象验证是否调用了某个方法。

使用 AOP:

  • 测试线程:
    • [JmsListenerTest] 2279812 - class Listener$$EnhancerBySpringCGLIB$$6587f46b (由 [=107= 包装] AOP)
    • [JmsListenerTest] 30960534 - class Facade$$EnhancerByMockitoWithCGLIB$$69fe8952 (由 Mockito 包装)
  • 听众线程:
    • [Listener] 1151375 - class Listener (AOP 包装的目标实例 class)
    • [Listener] 4007155 - class FacadeImpl (不是我们预期的实际实例)

没有 AOP:

  • 测试线程:
    • [JmsListenerTest] 10692528 - class Listener (实例)
    • [JmsListenerTest] 823767 - class Facade$$EnhancerByMockitoWithCGLIB$$773538e8 (由 Mockito 包装)
  • 听众线程:
    • [Listener] 10692528 - class Listener (还是实例)
    • [听众] 823767 - class 门面 $$EnhancerByMockitoWithCGLIB$$773538e8 (仍然是我们模拟的实例)

这表明您需要注意按照我尝试的方式使用 AOP,因为您最终可能会在两个线程中使用不同的实例...

如果要将日志记录添加到 @JmsListener 注释方法,则可以在测试中执行类似的操作 class

@Rule
public OutputCapture outputCapture = new OutputCapture();

@Test
public void test() {
    sendSomeMessage();
    //how to wait here for the @JMSListener method to complete
    Assertions.assertThat(outputCapture.toString()).contains("Message received.");
}

我使用 spring 配置文件并且在测试和生产代码中有不同的 Processor。在我的测试代码中,我在处理后写入 BlockingQueue 可以在测试中等待

例如:

@Configuration
public class MyConfiguration {
   @Bean @Profile("!test")
   public Processor productionProcessor() {
      return new ProductionProcessor();
   }
   @Bean @Profile("test")
   public Processor testProcessor() {
      return new TestProcessor();
   }
   @Bean
   public MyListener myListener(Processor processor) {
      return new MyListener(processor);
   }
}
public class MyListener {
   private final Processor processor;
   // constructor
   @JmsListener(destination = "dest", containerFactory = "cf")
   public void processMessage(TextMessage message) throws JMSException {
      processor.process(message);
   }
}
public class TestProcessor extends ProductionProcessor {
   private final BlockingQueue<TextMessage> queue = new LinkedBlockingQueue<>();
   public void process(Textmessage message) {
      super.process(message);
      queue.add(message);
   }
   public BlockingQueue getQueue() { return queue; }
}
@SpringBootTest
@ActiveProfiles("test")
public class MyListenerTest {
   @Autowired
   private TestProcessor processor;

   @Test
   public void test() {
      sendTestMessageOverMq();
      TextMessage processedMessage = processor.getQueue().poll(10, TimeUnit.SECONDS);
      assertAllOk(processedMessage);
   }

}