并行执行的 Observable

Observables executed in parallel

我正在用 reactiveX Zip 做一些实验,我注意到我在 zip 中定义的可观察对象一个接一个地顺序执行。我认为 zip 的好处是 zip 中定义的每个单独的可观察对象都由一个线程执行,所以它们都是并行执行的。有什么办法可以实现我想要的吗? 这是我的 zip 示例

         @Test
public void testZip() {
    Observable.zip(obString(), obString1(), obString2(), (s, s2, s3) -> s.concat(s2)
                                                                     .concat(s3))
              .subscribe(System.out::println);
}

public Observable<String> obString() {
    System.out.println(Thread.currentThread().getId());
    return Observable.just("hello");
}

public Observable<String> obString1() {
    System.out.println(Thread.currentThread().getId());
    return Observable.just(" world");
}

public Observable<String> obString2() {
    System.out.println(Thread.currentThread().getId());
    return Observable.just("!");
}

你看错了。

obString*都是在同一个线程上执行的,因为在testZip.

中调用它们的时候就执行了

您想要查看的是 可观察对象中发生的事情,仅使用 just 是不可能的,您需要一个自定义可观察对象并查看在 onSubscribe.

正文中的当前线程

此外,您可能想使用 scheduleOn 为您的 Observable 提供一个专门的新线程或线程池。