Spring 集成:如何对轮询器建议进行单元测试

Spring Integration: how to unit test a poller advice

我正在尝试对轮询器的建议进行单元测试,该建议会阻止 mongo 通道适配器的执行,直到满足特定条件(=处理来自该批次的所有消息)。

流程如下所示:

IntegrationFlows.from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory,
            new Query().with(Sort.by(Sort.Direction.DESC, "modifiedDate")).limit(1))
                    .collectionName("metadata")
                    .entityClass(Metadata.class)
                    .expectSingleResult(true),
            e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(pollingIntervalSeconds))
                    .advice(this.advices.waitUntilCompletedAdvice())))
            .handle((p, h) -> {
                this.advices.waitUntilCompletedAdvice().setWait(true);
                return p;
            })
            .handle(doSomething())
            .channel(Channels.DOCUMENT_HEADER.name())
            .get();

以及以下建议 bean:

@Bean
public WaitUntilCompletedAdvice waitUntilCompletedAdvice() {
    DynamicPeriodicTrigger trigger = new DynamicPeriodicTrigger(Duration.ofSeconds(1));
    return new WaitUntilCompletedAdvice(trigger);
}

以及建议本身:

public class WaitUntilCompletedAdvice extends SimpleActiveIdleMessageSourceAdvice {

    AtomicBoolean wait = new AtomicBoolean(false);

    public WaitUntilCompletedAdvice(DynamicPeriodicTrigger trigger) {
        super(trigger);
    }

    @Override
    public boolean beforeReceive(MessageSource<?> source) {
        if (getWait())
            return false;
        return true;
    }

    public boolean getWait() {
        return wait.get();
    }

    public void setWait(boolean newWait) {
        if (getWait() == newWait)
            return;

        while (true) {
            if (wait.compareAndSet(!newWait, newWait)) {
                return;
            }
        }
    }
}

我正在使用以下测试来测试流程:

    @Test
    public void testClaimPoollingAdapterFlow() throws Exception {
        // given
        ArgumentCaptor<Message<?>> captor = messageArgumentCaptor();
        CountDownLatch receiveLatch = new CountDownLatch(1);
        MessageHandler mockMessageHandler = mockMessageHandler(captor).handleNext(m -> receiveLatch.countDown());
        this.mockIntegrationContext.substituteMessageHandlerFor("retrieveDocumentHeader", mockMessageHandler);
        LocalDateTime modifiedDate = LocalDateTime.now();
        ProcessingMetadata data = Metadata.builder()
                .modifiedDate(modifiedDate)
                .build();
        assert !this.advices.waitUntilCompletedAdvice().getWait();

        // when
        itf.getInputChannel().send(new GenericMessage<>(Mono.just(data)));

        // then
        assertThat(receiveLatch.await(10, TimeUnit.SECONDS)).isTrue();
        verify(mockMessageHandler).handleMessage(any());
        assertThat(captor.getValue().getPayload()).isEqualTo(modifiedDate);
        assert this.advices.waitUntilCompletedAdvice().getWait();
    }

效果很好,但是当我向输入通道发送另一条消息时,它仍然在不考虑建议的情况下处理消息。

这是有意为之的行为吗?如果是这样,我如何使用单元测试验证轮询器是否真的在等待这个建议?

itf.getInputChannel().send(new GenericMessage<>(Mono.just(data)));

绕过轮询器直接发送消息。

您可以通过从测试中调用 beforeReceive() 来对已配置的建议进行单元测试

或者您可以使用相同的建议创建一个虚拟测试流程

IntegationFlows.from(() -> "foo", e -> e.poller(...))
       ...

并确认只发送了一条消息。

示例实现:

@Test
public void testWaitingActivate() {
    // given
    this.advices.waitUntilCompletedAdvice().setWait(true);

    // when
    Message<ProcessingMetadata> receive = (Message<ProcessingMetadata>) testChannel.receive(3000);

    // then
    assertThat(receive).isNull();
}

@Test
public void testWaitingInactive() {
    // given
    this.advices.waitUntilCompletedAdvice().setWait(false);

    // when
    Message<ProcessingMetadata> receive = (Message<ProcessingMetadata>) testChannel.receive(3000);

    // then
    assertThat(receive).isNotNull();
}

@Configuration
@EnableIntegration
public static class Config {

    @Autowired
    private Advices advices;

    @Bean
    public PollableChannel testChannel() {
        return new QueueChannel();
    }

    @Bean
    public IntegrationFlow fakeFlow() {
        this.advices.waitUntilCompletedAdvice().setWait(true);
        return IntegrationFlows.from(() -> "foo", e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(1))
                .advice(this.advices.waitUntilCompletedAdvice()))).channel("testChannel").get();
    }
}