RxJava2 subscribe 在一段时间后停止观察,但在 flowable 完成时继续观察
RxJava2 subscribe stops observing after a while but continues when flowable completes
我很难理解以下代码示例的行为;
Flowable<String> f = Flowable.just(1)
.flatMap(it -> Flowable.create(e -> {
for(int i = 1; i < 1001; ++i) {
log.info("Emitting: " + i);
if(i % 10 == 0) {
Thread.sleep(1000);
}
e.onNext(i);
}
e.onComplete();
}, BackpressureStrategy.BUFFER))
.map(String::valueOf)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread());
f.subscribe(val -> {
Thread.sleep(100);
log.info("Observing: " + val);
});
Thread.sleep(1000000);
代码工作正常,直到 subscribe
调用观察到 128 个项目。发射和观察是并行的。但在那之后,Flowable 继续发射项目(显然在某处排队)但直到发射所有 1000 个项目才观察到任何项目。发出所有 1000 个项目后,立即观察其余项目(> 128)。
这看起来与 backpressure bufferSize 为 128 有关,但我仍然希望发射和观察对于整个 1000 个项目是并行的,因为观察者显然不比发射器慢。我在这里缺少什么吗?我应该怎么做才能修复代码?
这是由于创建和 subscribeOn 之间存在相同的池死锁:
If there is a create(FlowableOnSubscribe, BackpressureStrategy)
type source up in the chain, it is recommended to use subscribeOn(scheduler, false)
instead to avoid same-pool deadlock because requests may pile up behind a eager/blocking emitter.
//...
.subscribeOn(Schedulers.io(), false)
//...
编辑:
I tried the original example (plus your suggested fix) by replacing Flowable.create with a Flowable.range but I didn't encounter a problem. Can you give an example when problems may occur?
Flowable.range(1, 10)
.subscribeOn(Schedulers.io(), false)
.doOnNext(v -> System.out.println(Thread.currentThread().getName()))
.observeOn(Schedulers.single(), false, 1)
.blockingSubscribe();
这最初打印 RxCachedThreadScheduler-1
然后 RxSingleScheduler-1
9 次,因为 observeOn
的补充请求将 运行 在单个调度程序上而不是路由回 io 调度程序。用 subscribeOn true 试试这个。
我很难理解以下代码示例的行为;
Flowable<String> f = Flowable.just(1)
.flatMap(it -> Flowable.create(e -> {
for(int i = 1; i < 1001; ++i) {
log.info("Emitting: " + i);
if(i % 10 == 0) {
Thread.sleep(1000);
}
e.onNext(i);
}
e.onComplete();
}, BackpressureStrategy.BUFFER))
.map(String::valueOf)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread());
f.subscribe(val -> {
Thread.sleep(100);
log.info("Observing: " + val);
});
Thread.sleep(1000000);
代码工作正常,直到 subscribe
调用观察到 128 个项目。发射和观察是并行的。但在那之后,Flowable 继续发射项目(显然在某处排队)但直到发射所有 1000 个项目才观察到任何项目。发出所有 1000 个项目后,立即观察其余项目(> 128)。
这看起来与 backpressure bufferSize 为 128 有关,但我仍然希望发射和观察对于整个 1000 个项目是并行的,因为观察者显然不比发射器慢。我在这里缺少什么吗?我应该怎么做才能修复代码?
这是由于创建和 subscribeOn 之间存在相同的池死锁:
If there is a
create(FlowableOnSubscribe, BackpressureStrategy)
type source up in the chain, it is recommended to usesubscribeOn(scheduler, false)
instead to avoid same-pool deadlock because requests may pile up behind a eager/blocking emitter.
//...
.subscribeOn(Schedulers.io(), false)
//...
编辑:
I tried the original example (plus your suggested fix) by replacing Flowable.create with a Flowable.range but I didn't encounter a problem. Can you give an example when problems may occur?
Flowable.range(1, 10)
.subscribeOn(Schedulers.io(), false)
.doOnNext(v -> System.out.println(Thread.currentThread().getName()))
.observeOn(Schedulers.single(), false, 1)
.blockingSubscribe();
这最初打印 RxCachedThreadScheduler-1
然后 RxSingleScheduler-1
9 次,因为 observeOn
的补充请求将 运行 在单个调度程序上而不是路由回 io 调度程序。用 subscribeOn true 试试这个。