rxjava 2.2.2,偶尔死锁
rxjava 2.2.2, Occasionally deadlock
rxjava 2.2.2,偶尔会在转储线程中死锁
代码如下,有问题吗?
Observable.fromCallable(() -> {
Observable
.intervalRange(1, num, 20, 100, TimeUnit.MILLISECONDS)
.flatMap(i -> Observable
.fromCallable(() -> service.getType(type))
.subscribeOn(Schedulers.io()),false, 5)
.observeOn(Schedulers.io())
.buffer(3)
.blockingSubscribe(types -> {
// ...
});
return Result.success();
}
).retryWhen(new RetryWithDelay(5, 2000))
.observeOn(Schedulers.io(), true)
.subscribeOn(Schedulers.io())
.subscribe(result -> {
}, throwable -> {
});
线程转储如下
"RxCachedThreadScheduler-5234" #10684 daemon prio=5 os_prio=0 tid=0x00007f690c02f800 nid=0x29f8 waiting on condition [0x00007f68008ce000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000c4b73b28> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at io.reactivex.internal.operators.observable.ObservableBlockingSubscribe.subscribe(ObservableBlockingSubscribe.java:56)
at io.reactivex.internal.operators.observable.ObservableBlockingSubscribe.subscribe(ObservableBlockingSubscribe.java:103)
at io.reactivex.Observable.blockingSubscribe(Observable.java:5414)
at com.test.Test.lambda$test(Test.java:221)
at com.test.Test$$Lambda/1184820793.call(Unknown Source)
at io.reactivex.internal.operators.observable.ObservableFromCallable.subscribeActual(ObservableFromCallable.java:43)
at io.reactivex.Observable.subscribe(Observable.java:12090)
at io.reactivex.internal.operators.observable.ObservableRetryWhen$RepeatWhenObserver.subscribeNext(ObservableRetryWhen.java:150)
at io.reactivex.internal.operators.observable.ObservableRetryWhen.subscribeActual(ObservableRetryWhen.java:60)
希望得到解答,非常感谢
如果您在 RxJava 中使用 blockingX
方法,这些方法可能会导致死锁。鉴于您的示例,根本没有理由使用 blockingSubscribe
。试试这个:
Observable.defer(() -> {
return Observable
.intervalRange(1, num, 20, 100, TimeUnit.MILLISECONDS)
.flatMap(i -> Observable.fromCallable(() -> service.getType(type))
.subscribeOn(Schedulers.io()),
false, 5
)
.observeOn(Schedulers.io())
.buffer(3)
.doOnNext(types -> {
// ...
})
.ignoreElements()
.andThen(Observable.just(Result.success()));
}
).retryWhen(new RetryWithDelay(5, 2000))
.observeOn(Schedulers.io(), true)
.subscribeOn(Schedulers.io())
.subscribe(result -> {
}, throwable -> {
});
rxjava 2.2.2,偶尔会在转储线程中死锁
代码如下,有问题吗?
Observable.fromCallable(() -> {
Observable
.intervalRange(1, num, 20, 100, TimeUnit.MILLISECONDS)
.flatMap(i -> Observable
.fromCallable(() -> service.getType(type))
.subscribeOn(Schedulers.io()),false, 5)
.observeOn(Schedulers.io())
.buffer(3)
.blockingSubscribe(types -> {
// ...
});
return Result.success();
}
).retryWhen(new RetryWithDelay(5, 2000))
.observeOn(Schedulers.io(), true)
.subscribeOn(Schedulers.io())
.subscribe(result -> {
}, throwable -> {
});
线程转储如下
"RxCachedThreadScheduler-5234" #10684 daemon prio=5 os_prio=0 tid=0x00007f690c02f800 nid=0x29f8 waiting on condition [0x00007f68008ce000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000c4b73b28> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at io.reactivex.internal.operators.observable.ObservableBlockingSubscribe.subscribe(ObservableBlockingSubscribe.java:56)
at io.reactivex.internal.operators.observable.ObservableBlockingSubscribe.subscribe(ObservableBlockingSubscribe.java:103)
at io.reactivex.Observable.blockingSubscribe(Observable.java:5414)
at com.test.Test.lambda$test(Test.java:221)
at com.test.Test$$Lambda/1184820793.call(Unknown Source)
at io.reactivex.internal.operators.observable.ObservableFromCallable.subscribeActual(ObservableFromCallable.java:43)
at io.reactivex.Observable.subscribe(Observable.java:12090)
at io.reactivex.internal.operators.observable.ObservableRetryWhen$RepeatWhenObserver.subscribeNext(ObservableRetryWhen.java:150)
at io.reactivex.internal.operators.observable.ObservableRetryWhen.subscribeActual(ObservableRetryWhen.java:60)
希望得到解答,非常感谢
如果您在 RxJava 中使用 blockingX
方法,这些方法可能会导致死锁。鉴于您的示例,根本没有理由使用 blockingSubscribe
。试试这个:
Observable.defer(() -> {
return Observable
.intervalRange(1, num, 20, 100, TimeUnit.MILLISECONDS)
.flatMap(i -> Observable.fromCallable(() -> service.getType(type))
.subscribeOn(Schedulers.io()),
false, 5
)
.observeOn(Schedulers.io())
.buffer(3)
.doOnNext(types -> {
// ...
})
.ignoreElements()
.andThen(Observable.just(Result.success()));
}
).retryWhen(new RetryWithDelay(5, 2000))
.observeOn(Schedulers.io(), true)
.subscribeOn(Schedulers.io())
.subscribe(result -> {
}, throwable -> {
});