测试 Spring 集成流程
Testing Spring Integration Flow
实际上,我正在为 运行 Spring 与 Kubernetes 的集成创建一个 POC,为此我创建了一个集成流程,该流程读取 XML 文件并将其移动到 Processed Dir 如果这是一个有效的 xml 文件,否则将其移动到错误目录
package com.Whosebug.questions.config;
import static java.util.Arrays.asList;
import com.Whosebug.questions.dto.WriteResult;
import com.Whosebug.questions.handler.FileReaderHandler;
import com.Whosebug.questions.handler.StudentErrorHandler;
import com.Whosebug.questions.handler.StudentWriterHandler;
import com.Whosebug.questions.service.DirectoryManagerService;
import com.Whosebug.questions.transformer.FileToStudentTransformer;
import java.io.File;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.DirectoryScanner;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.RecursiveDirectoryScanner;
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
import org.springframework.integration.file.filters.CompositeFileListFilter;
import org.springframework.integration.file.filters.RegexPatternFileListFilter;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.messaging.MessageChannel;
@Configuration
@EnableIntegration
@RequiredArgsConstructor
public class MainIntegrationFlow {
@Value("${regex.filename.pattern}")
private String regexFileNamePattern;
@Value("${root.file.dir}")
private String rootFileDir;
@Value("${default.polling.rate}")
private Long defaultPollingRate;
private final DirectoryManagerService directoryManagerService;
private final StudentErrorHandler studentErrorHandler;
private final FileReaderHandler fileReaderHandler;
private final StudentWriterHandler studentWriterHandler;
private final FileToStudentTransformer fileToStudentTransformer;
@Bean("mainStudentIntegrationFlow")
public IntegrationFlow mainStudentIntegrationFlow(
@Qualifier("mainFileReadingSourceMessage") MessageSource<File> mainFileReadingSourceMessage,
@Qualifier("fileReaderChannel") MessageChannel fileReaderChannel) {
return IntegrationFlows.from(mainFileReadingSourceMessage)
.channel(fileReaderChannel)
.handle(fileReaderHandler)
.transform(fileToStudentTransformer)
.handle(studentWriterHandler)
.<WriteResult, Boolean>route(WriteResult::isWriten,
mapping -> mapping
.subFlowMapping(true, moveToProcessedDirFlow())
.subFlowMapping(false, moveToErrorDirFlow()))
.get();
}
public IntegrationFlow moveToProcessedDirFlow() {
return flow -> flow.handle(message ->
directoryManagerService
.moveToProcessedDir(((WriteResult) message.getPayload()).getFilename()));
}
public IntegrationFlow moveToErrorDirFlow() {
return flow -> flow.channel("studentErrorChannel")
.handle(message ->
directoryManagerService
.moveToErrorDir(((WriteResult) message.getPayload()).getFilename()));
}
@Bean(name = "errorHandlerMainFlow")
public IntegrationFlow errorHandlerMainFlow() {
return IntegrationFlows.from("errorChannel")
.handle(studentErrorHandler)
.get();
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata mainPollerMetadata() {
return Pollers.fixedRate(defaultPollingRate, TimeUnit.SECONDS)
.maxMessagesPerPoll(0)
.get();
}
@Bean(name = "fileReaderChannel")
public MessageChannel fileReaderChannel() {
return MessageChannels.queue("fileReaderChannel").get();
}
@Bean("mainDirectoryScanner")
public DirectoryScanner mainDirectoryScanner() {
DirectoryScanner recursiveDirectoryScanner = new RecursiveDirectoryScanner();
CompositeFileListFilter<File> compositeFileListFilter = new CompositeFileListFilter<>(
asList(new AcceptOnceFileListFilter<>(),
new RegexPatternFileListFilter(regexFileNamePattern)));
recursiveDirectoryScanner.setFilter(compositeFileListFilter);
return recursiveDirectoryScanner;
}
@Bean("mainFileReadingSourceMessage")
public MessageSource<File> mainFileReadingSourceMessage(
@Qualifier("mainDirectoryScanner") DirectoryScanner mainDirectoryScanner) {
FileReadingMessageSource fileReadingMessageSource = new FileReadingMessageSource();
fileReadingMessageSource.setDirectory(new File(rootFileDir));
fileReadingMessageSource.setScanner(mainDirectoryScanner);
return fileReadingMessageSource;
}
}
我正在尝试测试整个流程,为此我创建了一个测试 class:
@SpringBootTest
@SpringIntegrationTest(noAutoStartup = "fileReadingEndpoint")
public class MainFlowIntegratoinTests {
@Autowired
private MockIntegrationContext mockIntegrationContext;
@Autowired
private SourcePollingChannelAdapter fileReadingEndpoint;
@Test
public void readingInvalidFileAndMoveItToErrorDir() throws IOException {
File file = new ClassPathResource("valid01-student-01.xml").getFile();
MessageSource<File> mockInvalidStudentFile = () -> MessageBuilder.withPayload(file).build();
mockIntegrationContext.substituteMessageSourceFor("fileReadingEndpoint", mockInvalidStudentFile);
// start the file adapter manually
fileReadingEndpoint.start();
}
}
我正在其中测试我的集成流程,但不知何故测试没有到达编写器端点,我可以看到来自 reader 和转换器端点但不是来自编写器的日志。
我试图阅读文档 - https://docs.spring.io/spring-integration/reference/html/testing.html - 但我无法理解。
能否请您提供示例或更多有关如何测试整个集成流程的详细信息。
工作测试:
package com.Whosebug.questions;
import static org.apache.commons.io.FileUtils.forceDelete;
import static org.awaitility.Awaitility.await;
import static org.springframework.test.annotation.DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD;
import com.Whosebug.questions.service.DirectoryManagerService;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.io.ClassPathResource;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.util.ReflectionTestUtils;
@SpringBootTest
@DirtiesContext(classMode = BEFORE_EACH_TEST_METHOD)
public class MainFlowIntegrationTests {
private static final String MOCK_FILE_DIR = "intFiles/";
private static final String VALID_XML_MOCK_FILE = "valid01-student-01.xml";
private static final String INVALID_XML_MOCK_FILE = "invalid02-student-02.xml";
@Autowired
private MessageChannel fileReaderChannel;
@Autowired
private DirectoryManagerService directoryManagerService;
private File queueDir;
private File processed;
private File error;
@BeforeEach
public void setup() throws IOException {
createRequiredDirectories();
moveFilesToQueueDir();
injectProperties();
}
@AfterEach
public void tearDown() throws IOException {
deleteRequiredDirectories();
}
@Test
public void readingFileAndMoveItToProcessedDir() throws IOException, InterruptedException {
// When: the fileReaderChannel receives a valid XML file
fileReaderChannel
.send(MessageBuilder.withPayload(new File(queueDir, VALID_XML_MOCK_FILE)).build());
// Then: the valid XML file should be sent to the processedDir
await().until(() -> processed.list().length == 1);
}
@Test
public void readingInvalidFileAndMoveItToErrorDir() throws IOException, InterruptedException {
// When: the fileReaderChannel receives a invalid XML file
fileReaderChannel
.send(MessageBuilder.withPayload(new File(queueDir, INVALID_XML_MOCK_FILE)).build());
// Then: the invalid XML file should be sent to the errorDir
await().until(() -> error.list().length == 1);
}
private void injectProperties() {
ReflectionTestUtils
.setField(directoryManagerService, "errorDir", error.getAbsolutePath().concat("/"));
ReflectionTestUtils
.setField(directoryManagerService, "processedDir", processed.getAbsolutePath().concat("/"));
}
private void moveFilesToQueueDir() throws IOException {
File intFiles = new ClassPathResource(MOCK_FILE_DIR).getFile();
for (String filename : intFiles.list()) {
FileUtils.copyFile(new File(intFiles, filename), new File(queueDir, filename));
}
}
private void createRequiredDirectories() throws IOException {
queueDir = Files.createTempDirectory("queueDir").toFile();
processed = Files.createTempDirectory("processedDir").toFile();
error = Files.createTempDirectory("errorDir").toFile();
}
private void deleteRequiredDirectories() throws IOException {
forceDelete(queueDir);
forceDelete(processed);
forceDelete(error);
}
}
看起来您根本没有测试或验证 readingInvalidFileAndMoveItToErrorDir()
中的任何内容。 fileReadingEndpoint.start();
是测试方法的最后一行。
请考虑调查什么是 JUnit 以及我们应该如何编写测试方法以及我们应该如何真正验证异步解决方案,例如类似于您的集成流程:https://junit.org/junit5/docs/current/user-guide/
到目前为止,我看到的情况并不好。消息来源是关于一段时间的轮询。您的 mockInvalidStudentFile
一直在生成相同的文件。你的解决方案真的需要它吗?您可能只考虑将文件直接发送到消息源配置的通道。
您不需要 fileReadingEndpoint.start();
,因为 substituteMessageSourceFor()
默认会自动启动。
将文件作为有效载荷发送到通道后,您的消息将通过流传输。您可能应该了解如何在测试中验证逻辑的正确性。由于您说您将文件放在流程末尾的某个目录中,因此您可能应该在发送后检查该目录。甚至可能使用一些循环或 Awaitility:https://github.com/awaitility/awaitility.
另一种方法是使用 substituteMessageHandlerFor()
,这样您就可以放置 MockMessageHandler
而不是文件编写器来验证文件。注意:如果您将原始消息直接发送到频道,而不是作为模拟 MessageSource
来源,并且您的流程中没有线程转移,这将起作用。因此,所有流程都将在与您的测试方法相同的线程中执行。如果你在流程中有一些异步切换,你的 MockMessageHandler
应该做一些 CountDonwLatch
来阻止测试方法。
说所有这些我的意思是,随着实践和经验而来的测试理解有很多细微差别。可能无法为您做一些可能对其他人有用的示例,因为解决方案可能不同并且需要其他测试方法。
因此我的建议是:尽你所能,你知道如何测试你自己的解决方案。
实际上,我正在为 运行 Spring 与 Kubernetes 的集成创建一个 POC,为此我创建了一个集成流程,该流程读取 XML 文件并将其移动到 Processed Dir 如果这是一个有效的 xml 文件,否则将其移动到错误目录
package com.Whosebug.questions.config;
import static java.util.Arrays.asList;
import com.Whosebug.questions.dto.WriteResult;
import com.Whosebug.questions.handler.FileReaderHandler;
import com.Whosebug.questions.handler.StudentErrorHandler;
import com.Whosebug.questions.handler.StudentWriterHandler;
import com.Whosebug.questions.service.DirectoryManagerService;
import com.Whosebug.questions.transformer.FileToStudentTransformer;
import java.io.File;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.DirectoryScanner;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.RecursiveDirectoryScanner;
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
import org.springframework.integration.file.filters.CompositeFileListFilter;
import org.springframework.integration.file.filters.RegexPatternFileListFilter;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.messaging.MessageChannel;
@Configuration
@EnableIntegration
@RequiredArgsConstructor
public class MainIntegrationFlow {
@Value("${regex.filename.pattern}")
private String regexFileNamePattern;
@Value("${root.file.dir}")
private String rootFileDir;
@Value("${default.polling.rate}")
private Long defaultPollingRate;
private final DirectoryManagerService directoryManagerService;
private final StudentErrorHandler studentErrorHandler;
private final FileReaderHandler fileReaderHandler;
private final StudentWriterHandler studentWriterHandler;
private final FileToStudentTransformer fileToStudentTransformer;
@Bean("mainStudentIntegrationFlow")
public IntegrationFlow mainStudentIntegrationFlow(
@Qualifier("mainFileReadingSourceMessage") MessageSource<File> mainFileReadingSourceMessage,
@Qualifier("fileReaderChannel") MessageChannel fileReaderChannel) {
return IntegrationFlows.from(mainFileReadingSourceMessage)
.channel(fileReaderChannel)
.handle(fileReaderHandler)
.transform(fileToStudentTransformer)
.handle(studentWriterHandler)
.<WriteResult, Boolean>route(WriteResult::isWriten,
mapping -> mapping
.subFlowMapping(true, moveToProcessedDirFlow())
.subFlowMapping(false, moveToErrorDirFlow()))
.get();
}
public IntegrationFlow moveToProcessedDirFlow() {
return flow -> flow.handle(message ->
directoryManagerService
.moveToProcessedDir(((WriteResult) message.getPayload()).getFilename()));
}
public IntegrationFlow moveToErrorDirFlow() {
return flow -> flow.channel("studentErrorChannel")
.handle(message ->
directoryManagerService
.moveToErrorDir(((WriteResult) message.getPayload()).getFilename()));
}
@Bean(name = "errorHandlerMainFlow")
public IntegrationFlow errorHandlerMainFlow() {
return IntegrationFlows.from("errorChannel")
.handle(studentErrorHandler)
.get();
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata mainPollerMetadata() {
return Pollers.fixedRate(defaultPollingRate, TimeUnit.SECONDS)
.maxMessagesPerPoll(0)
.get();
}
@Bean(name = "fileReaderChannel")
public MessageChannel fileReaderChannel() {
return MessageChannels.queue("fileReaderChannel").get();
}
@Bean("mainDirectoryScanner")
public DirectoryScanner mainDirectoryScanner() {
DirectoryScanner recursiveDirectoryScanner = new RecursiveDirectoryScanner();
CompositeFileListFilter<File> compositeFileListFilter = new CompositeFileListFilter<>(
asList(new AcceptOnceFileListFilter<>(),
new RegexPatternFileListFilter(regexFileNamePattern)));
recursiveDirectoryScanner.setFilter(compositeFileListFilter);
return recursiveDirectoryScanner;
}
@Bean("mainFileReadingSourceMessage")
public MessageSource<File> mainFileReadingSourceMessage(
@Qualifier("mainDirectoryScanner") DirectoryScanner mainDirectoryScanner) {
FileReadingMessageSource fileReadingMessageSource = new FileReadingMessageSource();
fileReadingMessageSource.setDirectory(new File(rootFileDir));
fileReadingMessageSource.setScanner(mainDirectoryScanner);
return fileReadingMessageSource;
}
}
我正在尝试测试整个流程,为此我创建了一个测试 class:
@SpringBootTest
@SpringIntegrationTest(noAutoStartup = "fileReadingEndpoint")
public class MainFlowIntegratoinTests {
@Autowired
private MockIntegrationContext mockIntegrationContext;
@Autowired
private SourcePollingChannelAdapter fileReadingEndpoint;
@Test
public void readingInvalidFileAndMoveItToErrorDir() throws IOException {
File file = new ClassPathResource("valid01-student-01.xml").getFile();
MessageSource<File> mockInvalidStudentFile = () -> MessageBuilder.withPayload(file).build();
mockIntegrationContext.substituteMessageSourceFor("fileReadingEndpoint", mockInvalidStudentFile);
// start the file adapter manually
fileReadingEndpoint.start();
}
}
我正在其中测试我的集成流程,但不知何故测试没有到达编写器端点,我可以看到来自 reader 和转换器端点但不是来自编写器的日志。
我试图阅读文档 - https://docs.spring.io/spring-integration/reference/html/testing.html - 但我无法理解。
能否请您提供示例或更多有关如何测试整个集成流程的详细信息。
工作测试:
package com.Whosebug.questions;
import static org.apache.commons.io.FileUtils.forceDelete;
import static org.awaitility.Awaitility.await;
import static org.springframework.test.annotation.DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD;
import com.Whosebug.questions.service.DirectoryManagerService;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.io.ClassPathResource;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.util.ReflectionTestUtils;
@SpringBootTest
@DirtiesContext(classMode = BEFORE_EACH_TEST_METHOD)
public class MainFlowIntegrationTests {
private static final String MOCK_FILE_DIR = "intFiles/";
private static final String VALID_XML_MOCK_FILE = "valid01-student-01.xml";
private static final String INVALID_XML_MOCK_FILE = "invalid02-student-02.xml";
@Autowired
private MessageChannel fileReaderChannel;
@Autowired
private DirectoryManagerService directoryManagerService;
private File queueDir;
private File processed;
private File error;
@BeforeEach
public void setup() throws IOException {
createRequiredDirectories();
moveFilesToQueueDir();
injectProperties();
}
@AfterEach
public void tearDown() throws IOException {
deleteRequiredDirectories();
}
@Test
public void readingFileAndMoveItToProcessedDir() throws IOException, InterruptedException {
// When: the fileReaderChannel receives a valid XML file
fileReaderChannel
.send(MessageBuilder.withPayload(new File(queueDir, VALID_XML_MOCK_FILE)).build());
// Then: the valid XML file should be sent to the processedDir
await().until(() -> processed.list().length == 1);
}
@Test
public void readingInvalidFileAndMoveItToErrorDir() throws IOException, InterruptedException {
// When: the fileReaderChannel receives a invalid XML file
fileReaderChannel
.send(MessageBuilder.withPayload(new File(queueDir, INVALID_XML_MOCK_FILE)).build());
// Then: the invalid XML file should be sent to the errorDir
await().until(() -> error.list().length == 1);
}
private void injectProperties() {
ReflectionTestUtils
.setField(directoryManagerService, "errorDir", error.getAbsolutePath().concat("/"));
ReflectionTestUtils
.setField(directoryManagerService, "processedDir", processed.getAbsolutePath().concat("/"));
}
private void moveFilesToQueueDir() throws IOException {
File intFiles = new ClassPathResource(MOCK_FILE_DIR).getFile();
for (String filename : intFiles.list()) {
FileUtils.copyFile(new File(intFiles, filename), new File(queueDir, filename));
}
}
private void createRequiredDirectories() throws IOException {
queueDir = Files.createTempDirectory("queueDir").toFile();
processed = Files.createTempDirectory("processedDir").toFile();
error = Files.createTempDirectory("errorDir").toFile();
}
private void deleteRequiredDirectories() throws IOException {
forceDelete(queueDir);
forceDelete(processed);
forceDelete(error);
}
}
看起来您根本没有测试或验证 readingInvalidFileAndMoveItToErrorDir()
中的任何内容。 fileReadingEndpoint.start();
是测试方法的最后一行。
请考虑调查什么是 JUnit 以及我们应该如何编写测试方法以及我们应该如何真正验证异步解决方案,例如类似于您的集成流程:https://junit.org/junit5/docs/current/user-guide/
到目前为止,我看到的情况并不好。消息来源是关于一段时间的轮询。您的 mockInvalidStudentFile
一直在生成相同的文件。你的解决方案真的需要它吗?您可能只考虑将文件直接发送到消息源配置的通道。
您不需要 fileReadingEndpoint.start();
,因为 substituteMessageSourceFor()
默认会自动启动。
将文件作为有效载荷发送到通道后,您的消息将通过流传输。您可能应该了解如何在测试中验证逻辑的正确性。由于您说您将文件放在流程末尾的某个目录中,因此您可能应该在发送后检查该目录。甚至可能使用一些循环或 Awaitility:https://github.com/awaitility/awaitility.
另一种方法是使用 substituteMessageHandlerFor()
,这样您就可以放置 MockMessageHandler
而不是文件编写器来验证文件。注意:如果您将原始消息直接发送到频道,而不是作为模拟 MessageSource
来源,并且您的流程中没有线程转移,这将起作用。因此,所有流程都将在与您的测试方法相同的线程中执行。如果你在流程中有一些异步切换,你的 MockMessageHandler
应该做一些 CountDonwLatch
来阻止测试方法。
说所有这些我的意思是,随着实践和经验而来的测试理解有很多细微差别。可能无法为您做一些可能对其他人有用的示例,因为解决方案可能不同并且需要其他测试方法。
因此我的建议是:尽你所能,你知道如何测试你自己的解决方案。