RX Java 2 顺序处理合约

RX Java 2 contract on sequential processing

TL; DR - 是否保证默认情况下,在观察 Observable 发出的事件时,在任何给定时间内只使用一个线程?

在我看来,RxJava2 通常是顺序的,除非通过 parallel() 之类的东西另外表示。即使使用 observeOn/subscribeOn,我也看到有例如对于 doOnNext():

,永远不会同时有两个线程 运行
AtomicInteger counter = new AtomicInteger();

PublishSubject<Integer> testSubject = PublishSubject.create();

testSubject
.observeOn(Schedulers.io())
.doOnNext(val -> {
  if(counter.incrementAndGet() > 1)
    System.out.println("Whoa!!!!"); // <- never happens

  Thread.sleep(20);

  counter.decrementAndGet();
})
.subscribe();

for (int i = 0; i < 10000; i++) {
  Thread.sleep(10);
  testSubject.onNext(i);
}

无论我如何更改此示例 - 除非我使用 .toFlowable(...).parallel().runOn(...) 进行硬核处理,否则我看不到不同的 doOnNext 运行同时线程。

我想依赖这个特性,这样我就可以忽略我的操作员中的同步问题,但是我从来没有在 RxJava2、RxJava1 甚至一般的 RX 的文档中看到它明确指定。也许我只是错过了它,谁能告诉我合同的这一部分在哪里描述?

谢谢!

  1. 去掉Thread.sleep(10);炮击主题
  2. 在 worker 中睡更长的时间以模拟更长的 运行ning 任务。
  3. 在循环下添加Thread.sleep(10000),这样你的main线程就会等待后台线程完成它们的工作
  4. 如果您想强制每个工作人员在新线程上 运行,请更改您的线程池 .subscribeOn(Schedulers.newThread())

来自http://reactivex.io/documentation/contract.html

Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.

在您的示例中,您没有违反此可观察的合同。但是如果你实现 Observable 错误,两个线程将同时 运行:

AtomicInteger counter = new AtomicInteger();

    Observable.create(emitter -> {
        new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                try {
                    Thread.sleep(1);
                    emitter.onNext(i);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }).start();
        for (int i = 0; i < 10000; i++) {
            Thread.sleep(1);
            emitter.onNext(i);

        }
    }).doOnNext(integer -> {
        if (counter.incrementAndGet() > 1)
            System.out.println("Whoaa!");
        counter.decrementAndGet();
        Thread.sleep(1);

    }).subscribe();

看来您可以使用 observeOn https://github.com/ReactiveX/RxJava/issues/5550#issuecomment-325114185

来解决此问题