RxJava 2.2.2: occasional deadlock

RxJava 2.2.2 occasionally deadlocks, as seen in a thread dump.

The code is below. Is there anything wrong with it?

Observable.fromCallable(() -> {
    Observable
        .intervalRange(1, num, 20, 100, TimeUnit.MILLISECONDS)
        .flatMap(i -> Observable
                .fromCallable(() -> service.getType(type))
                .subscribeOn(Schedulers.io()),
            false, 5)
        .observeOn(Schedulers.io())
        .buffer(3)
        .blockingSubscribe(types -> {
            // ...
        });
    return Result.success();
})
    .retryWhen(new RetryWithDelay(5, 2000))
    .observeOn(Schedulers.io(), true)
    .subscribeOn(Schedulers.io())
    .subscribe(result -> {
        
    }, throwable -> {
        
    });

The thread dump is as follows:

"RxCachedThreadScheduler-5234" #10684 daemon prio=5 os_prio=0 tid=0x00007f690c02f800 nid=0x29f8 waiting on condition [0x00007f68008ce000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000c4b73b28> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at io.reactivex.internal.operators.observable.ObservableBlockingSubscribe.subscribe(ObservableBlockingSubscribe.java:56)
        at io.reactivex.internal.operators.observable.ObservableBlockingSubscribe.subscribe(ObservableBlockingSubscribe.java:103)
        at io.reactivex.Observable.blockingSubscribe(Observable.java:5414)
        at com.test.Test.lambda$test(Test.java:221)
        at com.test.Test$$Lambda/1184820793.call(Unknown Source)
        at io.reactivex.internal.operators.observable.ObservableFromCallable.subscribeActual(ObservableFromCallable.java:43)
        at io.reactivex.Observable.subscribe(Observable.java:12090)
        at io.reactivex.internal.operators.observable.ObservableRetryWhen$RepeatWhenObserver.subscribeNext(ObservableRetryWhen.java:150)
        at io.reactivex.internal.operators.observable.ObservableRetryWhen.subscribeActual(ObservableRetryWhen.java:60)

Any help would be appreciated. Thank you very much.

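If you use the blockingX methods in RxJava, they can lead to deadlocks. Given your example, there is no reason to use blockingSubscribe at all. Try this instead, which uses defer to return the inner flow rather than block on it, and emits Result.success() only once that flow completes: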

Observable.defer(() -> {
    return Observable
        .intervalRange(1, num, 20, 100, TimeUnit.MILLISECONDS)
        .flatMap(i -> Observable
                .fromCallable(() -> service.getType(type))
                .subscribeOn(Schedulers.io()),
            false, 5)
        .observeOn(Schedulers.io())
        .buffer(3)
        .doOnNext(types -> {
            // ...
        })
        .ignoreElements()
        .andThen(Observable.just(Result.success()));
})
    .retryWhen(new RetryWithDelay(5, 2000))
    .observeOn(Schedulers.io(), true)
    .subscribeOn(Schedulers.io())
    .subscribe(result -> {
        
    }, throwable -> {
        
    });
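
The stack trace in the question shows the calling thread parked in LinkedBlockingQueue.take() inside blockingSubscribe, that is, waiting for the next item or a terminal signal. The sketch below is only a plain java.util.concurrent analogy of that handoff, not RxJava's actual implementation; the class name BlockingDrainSketch, the COMPLETE sentinel, and the 500 ms timeout are all hypothetical. It uses a bounded poll so that a missing terminal signal shows up as a timeout, where an unbounded take() would simply stay parked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BlockingDrainSketch {
    // Hypothetical sentinel standing in for a terminal (onComplete) signal.
    static final Object COMPLETE = new Object();

    /**
     * Drains the queue into sink until COMPLETE arrives.
     * Returns true if COMPLETE was seen, false if nothing arrived within
     * timeoutMs (the situation in which an unbounded take() would park forever).
     */
    static boolean drain(BlockingQueue<Object> queue, List<Object> sink, long timeoutMs)
            throws InterruptedException {
        while (true) {
            Object v = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (v == null) {
                return false; // timed out waiting for the next signal
            }
            if (v == COMPLETE) {
                return true;  // terminal signal received
            }
            sink.add(v);
        }
    }

    /** Runs a producer that emits two items and, optionally, the terminal signal. */
    static boolean run(boolean sendComplete) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        List<Object> sink = new ArrayList<>();
        Thread producer = new Thread(() -> {
            queue.offer(1);
            queue.offer(2);
            if (sendComplete) {
                queue.offer(COMPLETE);
            }
        });
        producer.setDaemon(true);
        producer.start();
        try {
            return drain(queue, sink, 500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // terminal signal arrives
        System.out.println(run(false)); // terminal signal never arrives
    }
}
```

Here run(true) returns true, while run(false) returns false after the timeout. This says nothing about why the terminal signal went missing in the original code; it only illustrates why a thread blocked this way cannot recover on its own, which is one reason to prefer the non-blocking chain above.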