Spring 集成测试 Files.inboundAdapter 流程

Spring Integration testing a Files.inboundAdapter flow

我正在尝试测试此流程,但没有按预期运行。流程本身运作良好,但测试似乎有点棘手。 这是我的流程:

@Configuration
@RequiredArgsConstructor
public class FileInboundFlow {
    private final ThreadPoolTaskExecutor threadPoolTaskExecutor;
    private String filePath;
    
    @Bean
    public IntegrationFlow fileReaderFlow() {
      
        return IntegrationFlows.from(Files.inboundAdapter(new File(this.filePath))
                                .filterFunction(...)
                                .preventDuplicates(false),
                        endpointConfigurer -> endpointConfigurer.poller(
                             Pollers.fixedDelay(500)
                                .taskExecutor(this.threadPoolTaskExecutor)
                                .maxMessagesPerPoll(15)))
                .transform(new UnZipTransformer())
                .enrichHeaders(this::headersEnricher)
                .transform(Message.class, this::modifyMessagePayload)
                .route(Map.class, this::channelsRouter)
                .get();
    }

   private String channelsRouter(Map<String, File> payload) {
          boolean isZip = payload.values()
                .stream()
                .anyMatch(file -> isZipFile(file));

        return isZip ? ZIP_CHANNEL : XML_CHANNEL; // ZIP_CHANNEL and XML_CHANNEL are PublishSubscribeChannel
    }
  

    @Bean
    public SubscribableChannel xmlChannel() {
        var channel = new PublishSubscribeChannel(this.threadPoolTaskExecutor);
        channel.setBeanName(XML_CHANNEL);
        return channel;
    }
    
    @Bean
    public SubscribableChannel zipChannel() {
        var channel = new PublishSubscribeChannel(this.threadPoolTaskExecutor);
        channel.setBeanName(ZIP_CHANNEL);
        return channel;
    }

   //There is a @ServiceActivator on each channel

    @ServiceActivator(inputChannel = XML_CHANNEL)
    public void handleXml(Message<Map<String, File>> message) {
        ...
    }

    @ServiceActivator(inputChannel = ZIP_CHANNEL)
    public void handleZip(Message<Map<String, File>> message) {
        ...
    }

   //Plus an @Transformer on the XML_CHANNEL

    @Transformer(inputChannel = XML_CHANNEL, outputChannel = BUS_CHANNEL)
    private List<BusData> xmlFileToIngestionMessagePayload(Map<String, File> xmlFilesByName) {
        return xmlFilesByName.values()
                .stream()
                .map(...)
                .collect(Collectors.toList());
    }
}

我想测试多个案例,第一个是检查 fileReaderFlow 结束后每个频道上发布的消息负载。 所以我定义了这个测试类:

@SpringBootTest
@SpringIntegrationTest
@ExtendWith(SpringExtension.class)
class FileInboundFlowTest {
    
    @Autowired
    private MockIntegrationContext mockIntegrationContext;

    @TempDir
    static Path localWorkDir;

    @BeforeEach
    void setUp() {
        copyFileToTheFlowDir(); // here I copy a file to trigger the flow
    }

   
    @Test
    void checkXmlChannelPayloadTest() throws InterruptedException {
       
       Thread.sleep(1000); //waiting for the flow execution

        PublishSubscribeChannel xmlChannel = this.getBean(XML_CHANNEL, PublishSubscribeChannel.class); // I extract the channel to listen to the message sent to it.

        xmlChannel.subscribe(message -> {
            assertThat(message.getPayload()).isInstanceOf(Map.class); // This is never executed
        });
    }
}

正如预期的那样,测试不起作用,因为 assertThat(message.getPayload()).isInstanceOf(Map.class); 从未执行过。 阅读 documentation 后,我没有找到任何提示来帮助我解决该问题。任何帮助,将不胜感激!非常感谢

首先,channel.setBeanName(XML_CHANNEL); 不会影响目标 bean。您在 bean 创建阶段执行此操作,依赖注入容器对此设置一无所知:它只是不咨询它。如果你真的想为 bean 名称指定一个 XML_CHANNEL,你最好查看 @Bean(name) 属性。

测试中的问题是您忽略了流的异步逻辑这一事实。如果线程完全不同并在测试方法之外发出消息,则 Files.inboundAdapter() 有效。因此,即使您可以在向其发送任何消息之前及时订阅该频道,也不意味着您的测试将正常运行:assertThat() 将在不同的线程上执行。因此,您的测试方法上下文没有真正的 JUnit 报告。

所以,我建议做的是:

  1. Files.inboundAdapter() 在测试开始时停止,然后再进行任何您想在测试中进行的设置。或者至少不要将文件放入那个 filePath,这样通道适配器就不会发出消息。
  2. 从应用程序上下文中获取频道,如果您希望订阅或使用 ChannelInterceptor
  3. 有一个异步屏障,例如CountDownLatch 传递给该订阅者。
  4. 启动通道适配器或将文件放入扫描目录。
  5. 在验证某些值或状态之前等待异步屏障。