RxJava Flowable 缓存到单一死锁
RxJava Flowable cache to Single deadlock
下面是我的代码片段。
我知道你不应该像这样阻止 cachedFlowable
,但这只是一个例子。
它卡在了 blockingGet
行。
如果我用singleElement
替换singleOrError
,代码仍然会卡住。如果我将 singleOrError
替换为 firstElement
,代码将不再卡住。
有人可以向我解释为什么会这样吗?
public static void main(String[] args) {
final Flowable<Integer> cachedFlowable = Flowable.just(1).cache();
cachedFlowable
.doOnNext(i -> {
System.out.println("doOnNext " + i);
final Integer j = cachedFlowable.singleOrError().blockingGet();
System.out.println("after blockingGet " + j);
})
.blockingSubscribe();
}
它与 singleX
运算符死锁的原因是此类运算符等待可能的第二项发射,但由于您阻止了它们,因此无法执行任何第二项或来自主源的完成。使用 firstX
,他们只关心第一个项目,因此几乎立即解锁,允许源完成。
所以是的,你不应该在这样的流程中使用阻塞方法,而是使用 flatMap
或 concatMap
来执行每个项目的子流程:
var cache = Flowable.just(1).cache();
cache
.doOnNext(i -> System.out.println("doOnNext " + i))
.concatMapSingle(item -> cache.firstOrError())
.doOnNext(j -> System.out.println("after " + j))
.blockingSubscribe();
下面是我的代码片段。
我知道你不应该像这样阻止 cachedFlowable
,但这只是一个例子。
它卡在了 blockingGet
行。
如果我用singleElement
替换singleOrError
,代码仍然会卡住。如果我将 singleOrError
替换为 firstElement
,代码将不再卡住。
有人可以向我解释为什么会这样吗?
public static void main(String[] args) {
final Flowable<Integer> cachedFlowable = Flowable.just(1).cache();
cachedFlowable
.doOnNext(i -> {
System.out.println("doOnNext " + i);
final Integer j = cachedFlowable.singleOrError().blockingGet();
System.out.println("after blockingGet " + j);
})
.blockingSubscribe();
}
它与 singleX
运算符死锁的原因是此类运算符等待可能的第二项发射,但由于您阻止了它们,因此无法执行任何第二项或来自主源的完成。使用 firstX
,他们只关心第一个项目,因此几乎立即解锁,允许源完成。
所以是的,你不应该在这样的流程中使用阻塞方法,而是使用 flatMap
或 concatMap
来执行每个项目的子流程:
var cache = Flowable.just(1).cache();
cache
.doOnNext(i -> System.out.println("doOnNext " + i))
.concatMapSingle(item -> cache.firstOrError())
.doOnNext(j -> System.out.println("after " + j))
.blockingSubscribe();