如何使用 MockIntegrationContext.substituteMessageHandlerFor 模拟 WebFluxRequestExecutingMessageHandler

How to mock WebFluxRequestExecutingMessageHandler with MockIntegrationContext.substituteMessageHandlerFor

我已经实现了一个 IntegrationFlow 我想在其中执行以下任务:

  1. 轮询目录中的文件
  2. 将文件内容转换为字符串
  3. 通过 WebFluxRequestExecutingMessageHandler 将字符串发送到 REST 端点并使用 AdviceChain 处理成功和错误响应

实施

@Configuration
@Slf4j
public class JsonToRestIntegration {

    @Autowired
    private LoadBalancerExchangeFilterFunction lbFunction;

    @Value("${json_folder}")
    private String jsonPath;

    @Value("${json_success_folder}")
    private String jsonSuccessPath;

    @Value("${json_error_folder}")
    private String jsonErrorPath;

    @Value("${rest-service-url}")
    private String restServiceUrl;

    @Bean
    public DirectChannel httpResponseChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel successChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel failureChannel() {
        return new DirectChannel();
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return Pollers.fixedDelay(1000).get();
    }

   @Bean
public IntegrationFlow jsonFileToRestFlow() {
    return IntegrationFlows
            .from(fileReadingMessageSource(), e -> e.id("fileReadingEndpoint"))
            .transform(org.springframework.integration.file.dsl.Files.toStringTransformer())
            .enrichHeaders(s -> s.header("Content-Type", "application/json; charset=utf8"))
            .handle(reactiveOutbound())
            .log()
            .channel(httpResponseChannel())
            .get();
}

    @Bean
    public FileReadingMessageSource fileReadingMessageSource() {
        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(new File(jsonPath));
        source.setFilter(new SimplePatternFileListFilter("*.json"));
        source.setUseWatchService(true);
        source.setWatchEvents(FileReadingMessageSource.WatchEventType.CREATE);

        return source;
    }

    @Bean
    public MessageHandler reactiveOutbound() {
        WebClient webClient = WebClient.builder()
                .baseUrl("http://jsonservice")
                .filter(lbFunction)
                .build();

        WebFluxRequestExecutingMessageHandler handler = new WebFluxRequestExecutingMessageHandler(restServiceUrl, webClient);

        handler.setHttpMethod(HttpMethod.POST);
        handler.setCharset(StandardCharsets.UTF_8.displayName());
        handler.setOutputChannel(httpResponseChannel());
        handler.setExpectedResponseType(String.class);
        handler.setAdviceChain(singletonList(expressionAdvice()));

        return handler;
    }

    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();

        advice.setTrapException(true);
        advice.setSuccessChannel(successChannel());
        advice.setOnSuccessExpressionString("payload + ' war erfolgreich'");
        advice.setFailureChannel(failureChannel());
        advice.setOnFailureExpressionString("payload + ' war nicht erfolgreich'");

        return advice;
    }

    @Bean
    public IntegrationFlow loggingFlow() {
        return IntegrationFlows.from(httpResponseChannel())
                .handle(message -> {
                    String originalFileName = (String) message.getHeaders().get(FileHeaders.FILENAME);
                    log.info("some log");
                })
                .get();
    }

    @Bean
    public IntegrationFlow successFlow() {
        return IntegrationFlows.from(successChannel())
                .handle(message -> {
                    MessageHeaders messageHeaders = ((AdviceMessage) message).getInputMessage().getHeaders();

                    File originalFile = (File) messageHeaders.get(ORIGINAL_FILE);
                    String originalFileName = (String) messageHeaders.get(FILENAME);

                    if (originalFile != null && originalFileName != null) {

                        File jsonSuccessFolder = new File(jsonSuccessPath);
                        File jsonSuccessFile = new File(jsonSuccessFolder, originalFileName);

                        try {
                            Files.move(originalFile.toPath(), jsonSuccessFile.toPath());
                        } catch (IOException e) {
                            log.error("some log", e);
                        }
                    }
                })
                .get();
    }

    @Bean
    public IntegrationFlow failureFlow() {
        return IntegrationFlows.from(failureChannel())
                .handle(message -> {
                    Message<?> failedMessage = ((MessagingException) message.getPayload()).getFailedMessage();

                    if (failedMessage != null) {

                        File originalFile = (File) failedMessage.getHeaders().get(FileHeaders.ORIGINAL_FILE);
                        String originalFileName = (String) failedMessage.getHeaders().get(FileHeaders.FILENAME);

                        if (originalFile != null && originalFileName != null) {

                            File jsonErrorFolder = new File(tonisJsonErrorPath);
                            File jsonErrorFile = new File(jsonErrorFolder, originalFileName);

                            try {
                                Files.move(originalFile.toPath(), jsonErrorFile.toPath());
                            } catch (IOException e) {
                                log.error("some log", e);
                            }
                        }
                    }
                })
                .get();
    }
}

到目前为止它似乎在生产中工作。在测试中我想做以下步骤:

  1. 复制JSON-文件到输入目录
  2. 开始 json 个文件的轮询
  3. 对来自 WebFluxRequestExecutingMessageHandler 的 HTTP 响应进行断言,这些响应通过我的建议链

但是我在测试中遇到了以下任务:

  1. MockIntegrationContext.substituteMessageHandlerFor() 方法模拟 WebFluxRequestExecutingMessageHandler
  2. 手动启动 json 个文件的轮询

测试

@RunWith(SpringRunner.class)
@SpringIntegrationTest()
@Import({JsonToRestIntegration.class})
@JsonTest
public class JsonToRestIntegrationTest {

    @Autowired
    public DirectChannel httpResponseChannel;

    @Value("${json_folder}")
    private String jsonPath;

    @Value("${json_success_folder}")
    private String jsonSuccessPath;

    @Value("${json_error_folder}")
    private String jsonErrorPath;

    @Autowired
    private MockIntegrationContext mockIntegrationContext;

    @Autowired
    private MessageHandler reactiveOutbound;

    @Before
    public void setUp() throws Exception {
        Files.createDirectories(Paths.get(jsonPath));
        Files.createDirectories(Paths.get(jsonSuccessPath));
        Files.createDirectories(Paths.get(jsonErrorPath));
    }

    @After
    public void tearDown() throws Exception {
        FileUtils.deleteDirectory(new File(jsonPath));
        FileUtils.deleteDirectory(new File(jsonSuccessPath));
        FileUtils.deleteDirectory(new File(jsonErrorPath));
    }

    @Test
    public void shouldSendJsonToRestEndpointAndReceiveOK() throws Exception {
        File jsonFile = new ClassPathResource("/test.json").getFile();
        Path targetFilePath = Paths.get(jsonPath + "/" + jsonFile.getName());
        Files.copy(jsonFile.toPath(), targetFilePath);

        httpResponseChannel.subscribe(httpResponseHandler());

        this.mockIntegrationContext.substituteMessageHandlerFor("", reactiveOutbound);
    }

    private MessageHandler httpResponseHandler() {
        return message -> Assert.assertThat(message.getPayload(), is(notNullValue()));
    }

    @Configuration
    @Import({JsonToRestIntegration.class})
    public static class JsonToRestIntegrationTest {

        @Autowired
        public MessageChannel httpResponseChannel;

        @Bean
        public MessageHandler reactiveOutbound() {
            ArgumentCaptor<Message<?>> messageArgumentCaptor = ArgumentCaptor.forClass(Message.class);

            MockMessageHandler mockMessageHandler = mockMessageHandler(messageArgumentCaptor).handleNextAndReply(m -> m);
            mockMessageHandler.setOutputChannel(httpResponseChannel);
            return mockMessageHandler;
        }

    }

}

使用模拟的 WebFluX Web 客户端更新了工作示例:

实施

public class JsonToRestIntegration {

    private final LoadBalancerExchangeFilterFunction lbFunction;

    private final BatchConfigurationProperties batchConfigurationProperties;

    @Bean
    public DirectChannel httpResponseChannel() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel errorChannel() {
        return new DirectChannel();
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return Pollers.fixedDelay(100, TimeUnit.MILLISECONDS).get();
    }

    @Bean
    public IntegrationFlow jsonFileToRestFlow() {
        return IntegrationFlows
                .from(fileReadingMessageSource(),  e -> e.id("fileReadingEndpoint"))
                .transform(org.springframework.integration.file.dsl.Files.toStringTransformer("UTF-8"))
                .enrichHeaders(s -> s.header("Content-Type", "application/json; charset=utf8"))
                .handle(reactiveOutbound())
                .channel(httpResponseChannel())
                .get();
    }

    @Bean
    public FileReadingMessageSource fileReadingMessageSource() {
        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(new File(batchConfigurationProperties.getJsonImportFolder()));
        source.setFilter(new SimplePatternFileListFilter("*.json"));
        source.setUseWatchService(true);
        source.setWatchEvents(FileReadingMessageSource.WatchEventType.CREATE);

        return source;
    }

    @Bean
    public WebFluxRequestExecutingMessageHandler reactiveOutbound() {
        WebClient webClient = WebClient.builder()
                .baseUrl("http://service")
                .filter(lbFunction)
                .build();

        WebFluxRequestExecutingMessageHandler handler = new WebFluxRequestExecutingMessageHandler(batchConfigurationProperties.getServiceUrl(), webClient);

        handler.setHttpMethod(HttpMethod.POST);
        handler.setCharset(StandardCharsets.UTF_8.displayName());
        handler.setOutputChannel(httpResponseChannel());
        handler.setExpectedResponseType(String.class);
        handler.setAdviceChain(singletonList(expressionAdvice()));

        return handler;
    }

    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();

        advice.setTrapException(true);
        advice.setFailureChannel(errorChannel());

        return advice;
    }

    @Bean
    public IntegrationFlow responseFlow() {
        return IntegrationFlows.from(httpResponseChannel())
                .handle(message -> {
                    MessageHeaders messageHeaders = message.getHeaders();
                    File originalFile = (File) messageHeaders.get(ORIGINAL_FILE);
                    String originalFileName = (String) messageHeaders.get(FILENAME);                        

                    if (originalFile != null && originalFileName != null) {

                        File jsonSuccessFolder = new File(batchConfigurationProperties.getJsonSuccessFolder());
                        File jsonSuccessFile = new File(jsonSuccessFolder, originalFileName);

                        try {
                            Files.move(originalFile.toPath(), jsonSuccessFile.toPath());
                        } catch (IOException e) {
                            log.error("Could not move file", e);
                        }
                    }
                })
                .get();
    }

    @Bean
    public IntegrationFlow failureFlow() {
        return IntegrationFlows.from(errorChannel())
                .handle(message -> {
                    Message<?> failedMessage = ((MessagingException) message.getPayload()).getFailedMessage();

                    if (failedMessage != null) {

                        File originalFile = (File) failedMessage.getHeaders().get(ORIGINAL_FILE);
                        String originalFileName = (String) failedMessage.getHeaders().get(FILENAME);                            

                        if (originalFile != null && originalFileName != null) {

                            File jsonErrorFolder = new File(batchConfigurationProperties.getJsonErrorFolder());
                            File jsonErrorFile = new File(jsonErrorFolder, originalFileName);

                            try {
                                Files.move(originalFile.toPath(), jsonErrorFile.toPath());
                            } catch (IOException e) {
                                log.error("Could not move file", originalFileName, e);
                            }
                        }
                    }
                })
                .get();
    }
}

测试

@RunWith(SpringRunner.class)
@SpringIntegrationTest(noAutoStartup = "fileReadingEndpoint")
@Import({JsonToRestIntegration.class, BatchConfigurationProperties.class})
@JsonTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class JsonToRestIntegrationIT {

    private static final FilenameFilter JSON_FILENAME_FILTER = (dir, name) -> name.endsWith(".json");

    @Autowired
    private BatchConfigurationProperties batchConfigurationProperties;

    @Autowired
    private ObjectMapper om;

    @Autowired
    private MessageHandler reactiveOutbound;

    @Autowired
    private DirectChannel httpResponseChannel;

    @Autowired
    private DirectChannel errorChannel;

    @Autowired
    private FileReadingMessageSource fileReadingMessageSource;

    @Autowired
    private SourcePollingChannelAdapter fileReadingEndpoint;

    @MockBean
    private LoadBalancerExchangeFilterFunction lbFunction;

    private String jsonImportPath;
    private String jsonSuccessPath;
    private String jsonErrorPath;

    @Before
    public void setUp() throws Exception {
        jsonImportPath = batchConfigurationProperties.getJsonImportFolder();
        jsonSuccessPath = batchConfigurationProperties.getJsonSuccessFolder();
        jsonErrorPath = batchConfigurationProperties.getJsonErrorFolder();

        Files.createDirectories(Paths.get(jsonImportPath));
        Files.createDirectories(Paths.get(jsonSuccessPath));
        Files.createDirectories(Paths.get(jsonErrorPath));
    }

    @After
    public void tearDown() throws Exception {
        FileUtils.deleteDirectory(new File(jsonImportPath));
        FileUtils.deleteDirectory(new File(jsonSuccessPath));
        FileUtils.deleteDirectory(new File(jsonErrorPath));
    }

    @Test
    public void shouldMoveJsonFileToSuccessFolderWhenHttpResponseIsOk() throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);

        httpResponseChannel.addInterceptor(new ChannelInterceptorAdapter() {
            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                latch.countDown();
                super.postSend(message, channel, sent);
            }
        });
        errorChannel.addInterceptor(new ChannelInterceptorAdapter() {
            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                fail();
            }
        });

        ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
            response.setStatusCode(HttpStatus.OK);
            response.getHeaders().setContentType(MediaType.APPLICATION_JSON_UTF8);

            DataBufferFactory bufferFactory = response.bufferFactory();

            String valueAsString = null;
            try {
                valueAsString = om.writeValueAsString(new ResponseDto("1"));
            } catch (JsonProcessingException e) {
                fail();
            }
            return response.writeWith(Mono.just(bufferFactory.wrap(valueAsString.getBytes())))
                    .then(Mono.defer(response::setComplete));

        });

        WebClient webClient = WebClient.builder()
                .clientConnector(httpConnector)
                .build();

        new DirectFieldAccessor(this.reactiveOutbound)
                .setPropertyValue("webClient", webClient);

        File jsonFile = new ClassPathResource("/test.json").getFile();
        Path targetFilePath = Paths.get(jsonImportPath + "/" + jsonFile.getName());
        Files.copy(jsonFile.toPath(), targetFilePath);

        fileReadingEndpoint.start();

        assertThat(latch.await(12, TimeUnit.SECONDS), is(true));

        File[] jsonImportFolder = new File(jsonImportPath).listFiles(JSON_FILENAME_FILTER);
        assertThat(filesInJsonImportFolder, is(notNullValue()));
        assertThat(filesInJsonImportFolder.length, is(0));

        File[] filesInJsonSuccessFolder = new File(jsonSuccessPath).listFiles(JSON_FILENAME_FILTER);
        assertThat(filesInJsonSuccessFolder, is(notNullValue()));
        assertThat(filesInJsonSuccessFolder.length, is(1));

        File[] filesInJsonErrorFolder = new File(jsonErrorPath).listFiles(JSON_FILENAME_FILTER);
        assertThat(filesInJsonErrorFolder, is(notNullValue()));
        assertThat(filesInJsonErrorFolder.length, is(0));
    }

    @Test
    public void shouldMoveJsonFileToErrorFolderWhenHttpResponseIsNotOk() throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);

        errorChannel.addInterceptor(new ChannelInterceptorAdapter() {
            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                latch.countDown();
                super.postSend(message, channel, sent);
            }
        });
        httpResponseChannel.addInterceptor(new ChannelInterceptorAdapter() {
            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                fail();
            }
        });

        ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
            response.setStatusCode(HttpStatus.BAD_REQUEST);
            response.getHeaders().setContentType(MediaType.APPLICATION_JSON_UTF8);

            DataBufferFactory bufferFactory = response.bufferFactory();

            return response.writeWith(Mono.just(bufferFactory.wrap("SOME BAD REQUEST".getBytes())))
                    .then(Mono.defer(response::setComplete));

        });

        WebClient webClient = WebClient.builder()
                .clientConnector(httpConnector)
                .build();

        new DirectFieldAccessor(this.reactiveOutbound)
                .setPropertyValue("webClient", webClient);

        File jsonFile = new ClassPathResource("/error.json").getFile();
        Path targetFilePath = Paths.get(jsonImportPath + "/" + jsonFile.getName());
        Files.copy(jsonFile.toPath(), targetFilePath);

        fileReadingEndpoint.start();

        assertThat(latch.await(11, TimeUnit.SECONDS), is(true));

        File[] filesInJsonImportFolder = new File(jsonImportPath).listFiles(JSON_FILENAME_FILTER);
        assertThat(filesInJsonImportFolder, is(notNullValue()));
        assertThat(filesInJsonImportFolder.length, is(0));

        File[] filesInJsonSuccessFolder = new File(jsonSuccessPath).listFiles(JSON_FILENAME_FILTER);
        assertThat(filesInJsonSuccessFolder, is(notNullValue()));
        assertThat(filesInJsonSuccessFolder.length, is(0));

        File[] filesInJsonErrorFolder = new File(jsonErrorPath).listFiles(JSON_FILENAME_FILTER);
        assertThat(filesInJsonErrorFolder, is(notNullValue()));
        assertThat(filesInJsonErrorFolder.length, is(1));
    }
}
  this.mockIntegrationContext.substituteMessageHandlerFor("", reactiveOutbound);

这个方法的第一个参数是一个endpoint id。 (我想我们只是缺少有关这些方法的 Javadocs...)。

所以,你需要的是这样的:

.handle(reactiveOutbound(), e -> e.id("webFluxEndpoint"))

然后在那个测试用例中你做:

 this.mockIntegrationContext.substituteMessageHandlerFor("webFluxEndpoint", reactiveOutbound);

您不需要在测试 class 配置中覆盖 bean。 MockMessageHandler 可以只用在测试方法体中。

您通过 .from(fileReadingMessageSource()) 轮询文件。要进行手动控制,您需要在开始时将其停止。为此,您再次添加 endpoint id

.from(fileReadingMessageSource(), e -> e.id("fileReadingEndpoint"))

然后在测试配置中执行此操作:

@SpringIntegrationTest(noAutoStartup = "fileReadingEndpoint")

WebFlux 的另一种方法是通过自定义 WebClient 模拟对服务器的请求。例如:

ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
        response.setStatusCode(HttpStatus.OK);
        response.getHeaders().setContentType(MediaType.TEXT_PLAIN);

        DataBufferFactory bufferFactory = response.bufferFactory();
        return response.writeWith(Mono.just(bufferFactory.wrap("FOO\nBAR\n".getBytes())))
                .then(Mono.defer(response::setComplete));
    });

    WebClient webClient = WebClient.builder()
            .clientConnector(httpConnector)
            .build();

    new DirectFieldAccessor(this.reactiveOutbound)
            .setPropertyValue("webClient", webClient);