RX Java 2 顺序处理合约
RX Java 2 contract on sequential processing
TL; DR - 是否保证默认情况下,在观察 Observable 发出的事件时,在任何给定时间内只使用一个线程?
在我看来,RxJava2 通常是顺序的,除非通过 parallel() 之类的东西另外表示。即使使用 observeOn/subscribeOn,我也看到有例如对于 doOnNext():
,永远不会同时有两个线程 运行
AtomicInteger counter = new AtomicInteger();
PublishSubject<Integer> testSubject = PublishSubject.create();
testSubject
.observeOn(Schedulers.io())
.doOnNext(val -> {
if(counter.incrementAndGet() > 1)
System.out.println("Whoa!!!!"); // <- never happens
Thread.sleep(20);
counter.decrementAndGet();
})
.subscribe();
for (int i = 0; i < 10000; i++) {
Thread.sleep(10);
testSubject.onNext(i);
}
无论我如何更改此示例 - 除非我使用 .toFlowable(...).parallel().runOn(...) 进行硬核处理,否则我看不到不同的 doOnNext 运行同时线程。
我想依赖这个特性,这样我就可以忽略我的操作员中的同步问题,但是我从来没有在 RxJava2、RxJava1 甚至一般的 RX 的文档中看到它明确指定。也许我只是错过了它,谁能告诉我合同的这一部分在哪里描述?
谢谢!
- 去掉
Thread.sleep(10);
炮击主题
- 在 worker 中睡更长的时间以模拟更长的 运行ning 任务。
- 在循环下添加
Thread.sleep(10000)
,这样你的main线程就会等待后台线程完成它们的工作
- 如果您想强制每个工作人员在新线程上 运行,请更改您的线程池
.subscribeOn(Schedulers.newThread())
。
来自http://reactivex.io/documentation/contract.html
Observables must issue notifications to observers serially (not in
parallel). They may issue these notifications from different threads,
but there must be a formal happens-before relationship between the
notifications.
在您的示例中,您没有违反此可观察的合同。但是如果你实现 Observable
错误,两个线程将同时 运行:
AtomicInteger counter = new AtomicInteger();
Observable.create(emitter -> {
new Thread(() -> {
for (int i = 0; i < 10000; i++) {
try {
Thread.sleep(1);
emitter.onNext(i);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}).start();
for (int i = 0; i < 10000; i++) {
Thread.sleep(1);
emitter.onNext(i);
}
}).doOnNext(integer -> {
if (counter.incrementAndGet() > 1)
System.out.println("Whoaa!");
counter.decrementAndGet();
Thread.sleep(1);
}).subscribe();
看来您可以使用 observeOn
https://github.com/ReactiveX/RxJava/issues/5550#issuecomment-325114185
来解决此问题
TL; DR - 是否保证默认情况下,在观察 Observable 发出的事件时,在任何给定时间内只使用一个线程?
在我看来,RxJava2 通常是顺序的,除非通过 parallel() 之类的东西另外表示。即使使用 observeOn/subscribeOn,我也看到有例如对于 doOnNext():
,永远不会同时有两个线程 运行AtomicInteger counter = new AtomicInteger();
PublishSubject<Integer> testSubject = PublishSubject.create();
testSubject
.observeOn(Schedulers.io())
.doOnNext(val -> {
if(counter.incrementAndGet() > 1)
System.out.println("Whoa!!!!"); // <- never happens
Thread.sleep(20);
counter.decrementAndGet();
})
.subscribe();
for (int i = 0; i < 10000; i++) {
Thread.sleep(10);
testSubject.onNext(i);
}
无论我如何更改此示例 - 除非我使用 .toFlowable(...).parallel().runOn(...) 进行硬核处理,否则我看不到不同的 doOnNext 运行同时线程。
我想依赖这个特性,这样我就可以忽略我的操作员中的同步问题,但是我从来没有在 RxJava2、RxJava1 甚至一般的 RX 的文档中看到它明确指定。也许我只是错过了它,谁能告诉我合同的这一部分在哪里描述?
谢谢!
- 去掉
Thread.sleep(10);
炮击主题 - 在 worker 中睡更长的时间以模拟更长的 运行ning 任务。
- 在循环下添加
Thread.sleep(10000)
,这样你的main线程就会等待后台线程完成它们的工作 - 如果您想强制每个工作人员在新线程上 运行,请更改您的线程池
.subscribeOn(Schedulers.newThread())
。
来自http://reactivex.io/documentation/contract.html
Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.
在您的示例中,您没有违反此可观察的合同。但是如果你实现 Observable
错误,两个线程将同时 运行:
AtomicInteger counter = new AtomicInteger();
Observable.create(emitter -> {
new Thread(() -> {
for (int i = 0; i < 10000; i++) {
try {
Thread.sleep(1);
emitter.onNext(i);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}).start();
for (int i = 0; i < 10000; i++) {
Thread.sleep(1);
emitter.onNext(i);
}
}).doOnNext(integer -> {
if (counter.incrementAndGet() > 1)
System.out.println("Whoaa!");
counter.decrementAndGet();
Thread.sleep(1);
}).subscribe();
看来您可以使用 observeOn
https://github.com/ReactiveX/RxJava/issues/5550#issuecomment-325114185