RxJava - 如何使用内部 "toBlocking" 调用测试函数

RxJava - How to test a function with inner "toBlocking" call

我正在尝试向以下代码添加一些单元测试:

public List<Stuff> extractStuff(Scheduler scheduler, List<Token> tokens) {
    List<Observable<List<Stuff>>> observables = tokens
            .stream()
            .map(token-> Observable.just(getStuffByToken(token)).subscribeOn(scheduler))
            .collect(Collectors.toList());

    List<Stuff> result = new ArrayList<>();
    for (List<Stuff> stuff: Observable.merge(observables).toBlocking().toIterable()) {
        result.addAll(stuff);
    }

    return result;
}

我希望 Stuff 对象以并行方式获取,但我需要在继续之前收集所有对象(否则下一个过程没有任何意义)。

代码按预期工作,但我在单元测试方面遇到困难:

@Test
public void extractStuff() {
    // GIVEN
    TestScheduler scheduler = new TestScheduler();
    List<Token> tokens = buildTokens();
    ...
    // WHEN
    List<Stuff> result = this.instance.extractStuff(scheduler, tokens);
    // Execution never comes to this point...
    // THEN
    ...
}

使用调试器,我可以看到 Observable.just(...) 看起来不错(我的可观察对象列表不为空,我可以在里面看到我的自定义模拟对象)。

问题: 执行似乎卡在了 Observable.merge(observables).toBlocking() 表达式处。 result.addAll 行从未被调用。

我已经用 TestScheduler 对象尝试了几件事,但我无法让它工作。我在 Internet 上找到的大多数示例都在处理 returns Observable 对象的函数,因此关联的单元测试可以 运行 scheduler.advanceTimeBy(...);。在我的情况下,我无法应用这种方法,因为 Observable 对象没有直接返回。

非常感谢您的帮助!

我不确定发布的代码是否按您预期的方式运行。您的 getStuffByToken(...) 方法实际上是在调用线程上调用的,而不是由 Scheduler.

为简单起见,让我将 Token 替换为 Integer,将 Stuff 替换为 String

我的 getStuffByToken(...) 将 return IntegerString 表示,还将包括当前 Thread 的姓名:

private List<String> getStuffByToken( Integer token )
{
    return Arrays.asList( token.toString(), Thread.currentThread().getName() );
}

我可能使用不同版本的 RxJava,我没有 toBlocking(...) 方法但有 blockingIterable() - 我希望这是等价的:

public List<String> extractStuff(Scheduler scheduler, List<Integer> tokens) {
    List<Observable<List<String>>> observables = tokens
            .stream()
            .map(token-> Observable.just(getStuffByToken(token)).subscribeOn(scheduler))
            .collect(Collectors.toList());

    List<String> result = new ArrayList<>();
    for (List<String> stuff: Observable.merge(observables).blockingIterable()) {
        result.addAll(stuff);
    }

    return result;
}

如果我们测试以上:

@Test
public void testExtractStuff()
{
    List<Integer> tokens = Arrays.asList( 1, 2, 3, 4, 5 );
    List<String> result = extractStuff( Schedulers.computation(), tokens );
    System.out.println( result );
}

我们回来了:

[1, main, 3, main, 4, main, 5, main, 2, main]

如您所知,所有 getStuffByToken(...) 调用都是在 main Thread.

上执行的

接下来,您的测试方法未被调用的原因是因为 TestScheduler 需要调用 TestScheduler.advanceTimeBy(...) 来模拟导致处理 Rx 管道的时间流逝。由于您的方法是阻塞方法,因此使用 TestScheduler.

进行测试并不方便

根据以上两条信息,我建议您按照以下思路做一些事情:

public Single<List<String>> extractStuff( Scheduler scheduler, List<Integer> tokens )
{
    return Observable.fromIterable( tokens )
        .flatMap( token -> Observable.just( token )
                .subscribeOn( scheduler )
                .map( this::getStuffByToken )
                .flatMap( Observable::fromIterable ))
        .toList();
}

您的生产代码可以调用 extractStuff(...).blockingGet() 来解析 List

并且,您可以进行如下测试:

@Test
public void testExtractStuff()
{
    TestScheduler scheduler = new TestScheduler();
    List<Integer> tokens = Arrays.asList( 1, 2, 3, 4, 5 );
    TestObserver<List<String>> test = extractStuff( scheduler, tokens ).test();

    scheduler.advanceTimeBy( 1, TimeUnit.SECONDS );
    test.assertValueCount( 1 );
    test.assertValue( list -> list.size() == 10 );
    test.assertComplete();
}