单元测试 Flux.doOnNext()

Unit Testing Flux.doOnNext()

我有一个将 Flux 作为参数的方法。然后在该方法内部配置 Flux,另一个方法订阅它。

public class StringProcessor {

  private final stringParsingService stringParsingService;

  public void subscribeStringFlux(Flux<String> stringFlux) {
    fluxConfiguration(stringFlux)
        .subscribe();
  }

  Flux<String> fluxConfiguration(Flux<String> stringFlux) {
    return stringFlux
        .filter(stringValidatorr::isValidString)
        .doOnNext(itemString -> {
          List<String> values = stringParsingService.parseValues(itemString);
        })
        .onErrorContinue((e,object) -> log.error(e.getClass().toString()+" "+e.getMessage()));
  }
}

我正在尝试测试 doOnNext 中的代码是否已执行。我尝试使用 StepVerifier(使用 Mockito 提供服务),但是当我在 stringParsingService.parseValue() 中放置断点时,它似乎从未在测试期间输入代码。但是,代码确实 运行 并按预期执行,尽管在使用真实数据的测试中不是 运行。

我的问题是,您如何编写涵盖 Flux.doOnNext() 中执行的操作的测试?有没有办法使用 StepVerifier 让它在 doOnNext() 中执行代码?我已经搜索了好几天,并尝试了多种方法,到目前为止 none 已经奏效了。

到目前为止我发现的唯一接近的方法是执行以下操作(但这当然不计入代码覆盖率):

    Flux<String> testStringFlux = Flux.just("a_test_string");

    StepVerifier.create(testStringFlux)
        .consumeNextWith(itemString -> {
           List<String> values = stringParsingService.parseValues(itemString);
        })
        .verifyComplete();

如果您按如下方式配置通量,它将在 parseValues 函数的断点处停止。但我不确定这是否是你要求的...

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flux_test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.4.8</version>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
            <version>3.2.3.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

</project>

StringProcessor.java

import org.apache.log4j.Logger;
import reactor.core.publisher.Flux;

import java.util.Arrays;
import java.util.List;

public class StringProcessor {

    private final static Logger log = Logger.getLogger(StringProcessor.class.getName());

    private class StringParsingService {
        List<String> parseValues(String str) {
            return Arrays.asList(str.split(","));
        }
    }

    private static class StringValidator {
        static boolean isValidString(String str) {
            return str.length() > 1;
        }
    }

    private final StringParsingService stringParsingService = new StringParsingService();

    public void subscribeStringFlux(Flux<String> stringFlux) {
        fluxConfiguration(stringFlux)
                .subscribe();
    }

    Flux<String> fluxConfiguration(Flux<String> stringFlux) {
        return stringFlux
                .filter(StringValidator::isValidString)
                .doOnNext(itemString -> {
                    List<String> values = stringParsingService.parseValues(itemString);
                })
                .onErrorContinue((e, object) -> log.error(e.getClass().toString() + " " + e.getMessage()));
    }

}

TestFlux.java

import org.junit.Test;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;

public class TestFlux {

    final TestPublisher<String> testPublisher = TestPublisher.create();

    @Test
    public void testFlux() {
        StepVerifier.create(new StringProcessor().fluxConfiguration(testPublisher.flux()))
                .then(() -> testPublisher.emit("aa,bb", "cc,dd"))
                .expectNext("aa,bb")
                .expectNext("cc,dd")
                .verifyComplete();
    }

}

doOnNext 是一个副作用运算符,这意味着它执行的工作从主要反应序列(Flux<T>)的角度来看并不是真正可见的。因此,用于测试 Flux 的工具中的 none 可以真正看到并测试副作用,除非您明确地使其可测试。

一种可能的方法是使测试中使用的 StringProcessor.stringParsingService 成为模拟的,或记录已解析字符串的特定于测试的实例,然后在 Flux 的末尾断言顺序。

请注意,您的 doOnNext 计算出 List,但此后不会使用该列表。 someFlux.doOnNext(function) 发出的元素与 someFlux 发出的元素完全相同,与 function 内部所做的无关(除非 function 抛出,但这是不同的故事)。