Spring 集成测试 Files.inboundAdapter 流程
Spring Integration testing a Files.inboundAdapter flow
我正在尝试测试此流程,但没有按预期运行。流程本身运作良好,但测试似乎有点棘手。
这是我的流程:
@Configuration
@RequiredArgsConstructor
public class FileInboundFlow {
private final ThreadPoolTaskExecutor threadPoolTaskExecutor;
private String filePath;
@Bean
public IntegrationFlow fileReaderFlow() {
return IntegrationFlows.from(Files.inboundAdapter(new File(this.filePath))
.filterFunction(...)
.preventDuplicates(false),
endpointConfigurer -> endpointConfigurer.poller(
Pollers.fixedDelay(500)
.taskExecutor(this.threadPoolTaskExecutor)
.maxMessagesPerPoll(15)))
.transform(new UnZipTransformer())
.enrichHeaders(this::headersEnricher)
.transform(Message.class, this::modifyMessagePayload)
.route(Map.class, this::channelsRouter)
.get();
}
private String channelsRouter(Map<String, File> payload) {
boolean isZip = payload.values()
.stream()
.anyMatch(file -> isZipFile(file));
return isZip ? ZIP_CHANNEL : XML_CHANNEL; // ZIP_CHANNEL and XML_CHANNEL are PublishSubscribeChannel
}
@Bean
public SubscribableChannel xmlChannel() {
var channel = new PublishSubscribeChannel(this.threadPoolTaskExecutor);
channel.setBeanName(XML_CHANNEL);
return channel;
}
@Bean
public SubscribableChannel zipChannel() {
var channel = new PublishSubscribeChannel(this.threadPoolTaskExecutor);
channel.setBeanName(ZIP_CHANNEL);
return channel;
}
//There is a @ServiceActivator on each channel
@ServiceActivator(inputChannel = XML_CHANNEL)
public void handleXml(Message<Map<String, File>> message) {
...
}
@ServiceActivator(inputChannel = ZIP_CHANNEL)
public void handleZip(Message<Map<String, File>> message) {
...
}
//Plus an @Transformer on the XML_CHANNEL
@Transformer(inputChannel = XML_CHANNEL, outputChannel = BUS_CHANNEL)
private List<BusData> xmlFileToIngestionMessagePayload(Map<String, File> xmlFilesByName) {
return xmlFilesByName.values()
.stream()
.map(...)
.collect(Collectors.toList());
}
}
我想测试多个案例,第一个是检查 fileReaderFlow
结束后每个频道上发布的消息负载。
所以我定义了这个测试类:
@SpringBootTest
@SpringIntegrationTest
@ExtendWith(SpringExtension.class)
class FileInboundFlowTest {
@Autowired
private MockIntegrationContext mockIntegrationContext;
@TempDir
static Path localWorkDir;
@BeforeEach
void setUp() {
copyFileToTheFlowDir(); // here I copy a file to trigger the flow
}
@Test
void checkXmlChannelPayloadTest() throws InterruptedException {
Thread.sleep(1000); //waiting for the flow execution
PublishSubscribeChannel xmlChannel = this.getBean(XML_CHANNEL, PublishSubscribeChannel.class); // I extract the channel to listen to the message sent to it.
xmlChannel.subscribe(message -> {
assertThat(message.getPayload()).isInstanceOf(Map.class); // This is never executed
});
}
}
正如预期的那样,测试不起作用,因为 assertThat(message.getPayload()).isInstanceOf(Map.class);
从未执行过。
阅读 documentation 后,我没有找到任何提示来帮助我解决该问题。任何帮助,将不胜感激!非常感谢
首先,channel.setBeanName(XML_CHANNEL);
不会影响目标 bean。您在 bean 创建阶段执行此操作,依赖注入容器对此设置一无所知:它只是不咨询它。如果你真的想为 bean 名称指定一个 XML_CHANNEL
,你最好查看 @Bean(name)
属性。
测试中的问题是您忽略了流的异步逻辑这一事实。如果线程完全不同并在测试方法之外发出消息,则 Files.inboundAdapter()
有效。因此,即使您可以在向其发送任何消息之前及时订阅该频道,也不意味着您的测试将正常运行:assertThat()
将在不同的线程上执行。因此,您的测试方法上下文没有真正的 JUnit 报告。
所以,我建议做的是:
Files.inboundAdapter()
在测试开始时停止,然后再进行任何您想在测试中进行的设置。或者至少不要将文件放入那个 filePath
,这样通道适配器就不会发出消息。
- 从应用程序上下文中获取频道,如果您希望订阅或使用
ChannelInterceptor
。
- 有一个异步屏障,例如
CountDownLatch
传递给该订阅者。
- 启动通道适配器或将文件放入扫描目录。
- 在验证某些值或状态之前等待异步屏障。
我正在尝试测试此流程,但没有按预期运行。流程本身运作良好,但测试似乎有点棘手。 这是我的流程:
@Configuration
@RequiredArgsConstructor
public class FileInboundFlow {
private final ThreadPoolTaskExecutor threadPoolTaskExecutor;
private String filePath;
@Bean
public IntegrationFlow fileReaderFlow() {
return IntegrationFlows.from(Files.inboundAdapter(new File(this.filePath))
.filterFunction(...)
.preventDuplicates(false),
endpointConfigurer -> endpointConfigurer.poller(
Pollers.fixedDelay(500)
.taskExecutor(this.threadPoolTaskExecutor)
.maxMessagesPerPoll(15)))
.transform(new UnZipTransformer())
.enrichHeaders(this::headersEnricher)
.transform(Message.class, this::modifyMessagePayload)
.route(Map.class, this::channelsRouter)
.get();
}
private String channelsRouter(Map<String, File> payload) {
boolean isZip = payload.values()
.stream()
.anyMatch(file -> isZipFile(file));
return isZip ? ZIP_CHANNEL : XML_CHANNEL; // ZIP_CHANNEL and XML_CHANNEL are PublishSubscribeChannel
}
@Bean
public SubscribableChannel xmlChannel() {
var channel = new PublishSubscribeChannel(this.threadPoolTaskExecutor);
channel.setBeanName(XML_CHANNEL);
return channel;
}
@Bean
public SubscribableChannel zipChannel() {
var channel = new PublishSubscribeChannel(this.threadPoolTaskExecutor);
channel.setBeanName(ZIP_CHANNEL);
return channel;
}
//There is a @ServiceActivator on each channel
@ServiceActivator(inputChannel = XML_CHANNEL)
public void handleXml(Message<Map<String, File>> message) {
...
}
@ServiceActivator(inputChannel = ZIP_CHANNEL)
public void handleZip(Message<Map<String, File>> message) {
...
}
//Plus an @Transformer on the XML_CHANNEL
@Transformer(inputChannel = XML_CHANNEL, outputChannel = BUS_CHANNEL)
private List<BusData> xmlFileToIngestionMessagePayload(Map<String, File> xmlFilesByName) {
return xmlFilesByName.values()
.stream()
.map(...)
.collect(Collectors.toList());
}
}
我想测试多个案例,第一个是检查 fileReaderFlow
结束后每个频道上发布的消息负载。
所以我定义了这个测试类:
@SpringBootTest
@SpringIntegrationTest
@ExtendWith(SpringExtension.class)
class FileInboundFlowTest {
@Autowired
private MockIntegrationContext mockIntegrationContext;
@TempDir
static Path localWorkDir;
@BeforeEach
void setUp() {
copyFileToTheFlowDir(); // here I copy a file to trigger the flow
}
@Test
void checkXmlChannelPayloadTest() throws InterruptedException {
Thread.sleep(1000); //waiting for the flow execution
PublishSubscribeChannel xmlChannel = this.getBean(XML_CHANNEL, PublishSubscribeChannel.class); // I extract the channel to listen to the message sent to it.
xmlChannel.subscribe(message -> {
assertThat(message.getPayload()).isInstanceOf(Map.class); // This is never executed
});
}
}
正如预期的那样,测试不起作用,因为 assertThat(message.getPayload()).isInstanceOf(Map.class);
从未执行过。
阅读 documentation 后,我没有找到任何提示来帮助我解决该问题。任何帮助,将不胜感激!非常感谢
首先,channel.setBeanName(XML_CHANNEL);
不会影响目标 bean。您在 bean 创建阶段执行此操作,依赖注入容器对此设置一无所知:它只是不咨询它。如果你真的想为 bean 名称指定一个 XML_CHANNEL
,你最好查看 @Bean(name)
属性。
测试中的问题是您忽略了流的异步逻辑这一事实。如果线程完全不同并在测试方法之外发出消息,则 Files.inboundAdapter()
有效。因此,即使您可以在向其发送任何消息之前及时订阅该频道,也不意味着您的测试将正常运行:assertThat()
将在不同的线程上执行。因此,您的测试方法上下文没有真正的 JUnit 报告。
所以,我建议做的是:
Files.inboundAdapter()
在测试开始时停止,然后再进行任何您想在测试中进行的设置。或者至少不要将文件放入那个filePath
,这样通道适配器就不会发出消息。- 从应用程序上下文中获取频道,如果您希望订阅或使用
ChannelInterceptor
。 - 有一个异步屏障,例如
CountDownLatch
传递给该订阅者。 - 启动通道适配器或将文件放入扫描目录。
- 在验证某些值或状态之前等待异步屏障。