RxJava: observeOn, subscribeOn, and doFinally, switching between the IO and UI threads
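I've run into a problem: my Observable is subscribed on the IO thread and observed on the Android main (UI) thread, but the doFinally operator runs on the IO thread, and it needs to run on the UI thread.
The use case is almost exactly the same as in this Medium article.
Basically, I want to show a ProgressBar when the Observable is subscribed to and hide the ProgressBar when the Observable terminates or completes.
The error I get is: java.lang.IllegalStateException: The current thread must have a looper!
Can anyone help me move the doFinally action back onto the UI thread, which has a looper? Or am I missing some other piece of information?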
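EDIT
The use-case workflow is:
-> Launch the Activity
-> Initialize
-> Execute the Observable stream
-> Start a new Activity and finish the current Activity
-> New Activity
-> Start the original Activity and finish
-> Repeat initialization
Many thanks.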
Details:
- RxJava 2.0.7
- RxAndroid 2.0.1
- Android SDK min 14 and target 25
Sample code:
listUseCase.execute(null)
        .doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(@NonNull Disposable disposable) throws Exception {
                getView().showLoading(true);
            }
        })
        .doFinally(new Action() {
            @Override
            public void run() throws Exception {
                getView().showLoading(false);
            }
        })
        .subscribeOn(schedulerProvider.io())
        .observeOn(schedulerProvider.main())
        .subscribe(
                new Consumer<List<AccountEntity>>() {
                    @Override
                    public void accept(@NonNull List<AccountEntity> accountEntities) throws Exception {
                        getView().setAccounts(accountEntities);
                    }
                },
                new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        if (isViewAttached()) {
                            getView().showError(throwable.getMessage());
                        }
                    }
                }
        );
Stack trace:
FATAL EXCEPTION: RxCachedThreadScheduler-1
Process: com.example.android.demo.customerfirst.alpha, PID: 16685
java.lang.IllegalStateException: The current thread must have a looper!
at android.view.Choreographer.initialValue(Choreographer.java:96)
at android.view.Choreographer.initialValue(Choreographer.java:91)
at java.lang.ThreadLocal$Values.getAfterMiss(ThreadLocal.java:430)
at java.lang.ThreadLocal.get(ThreadLocal.java:65)
at android.view.Choreographer.getInstance(Choreographer.java:192)
at android.animation.ValueAnimator$AnimationHandler.<init>(ValueAnimator.java:600)
at android.animation.ValueAnimator$AnimationHandler.<init>(ValueAnimator.java:575)
at android.animation.ValueAnimator.getOrCreateAnimationHandler(ValueAnimator.java:1366)
at android.animation.ValueAnimator.end(ValueAnimator.java:998)
at android.graphics.drawable.AnimatedVectorDrawable.stop(AnimatedVectorDrawable.java:439)
at android.widget.ProgressBar.stopAnimation(ProgressBar.java:1523)
at android.widget.ProgressBar.onVisibilityChanged(ProgressBar.java:1583)
at android.view.View.dispatchVisibilityChanged(View.java:8643)
at android.view.View.setFlags(View.java:9686)
at android.view.View.setVisibility(View.java:6663)
at android.widget.ProgressBar.setVisibility(ProgressBar.java:1563)
at com.example.android.demo.customerfirst.featuresstore.list.ProductListActivity.showLoading(ProductListActivity.java:121)
at com.example.android.demo.customerfirst.featuresstore.list.ProductListPresenterMediator.run(ProductListPresenterMediator.java:56)
at io.reactivex.internal.operators.observable.ObservableDoFinally$DoFinallyObserver.runFinally(ObservableDoFinally.java:144)
at io.reactivex.internal.operators.observable.ObservableDoFinally$DoFinallyObserver.onComplete(ObservableDoFinally.java:94)
at io.reactivex.internal.observers.DisposableLambdaObserver.onComplete(DisposableLambdaObserver.java:73)
at io.reactivex.internal.observers.DeferredScalarDisposable.complete(DeferredScalarDisposable.java:84)
at io.reactivex.internal.operators.observable.ObservableFromCallable.subscribeActual(ObservableFromCallable.java:52)
at io.reactivex.Observable.subscribe(Observable.java:10700)
at io.reactivex.internal.operators.observable.ObservableDoOnLifecycle.subscribeActual(ObservableDoOnLifecycle.java:33)
at io.reactivex.Observable.subscribe(Observable.java:10700)
at io.reactivex.internal.operators.observable.ObservableDoFinally.subscribeActual(ObservableDoFinally.java:45)
at io.reactivex.Observable.subscribe(Observable.java:10700)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn.run(ObservableSubscribeOn.java:39)
at io.reactivex.Scheduler.run(Scheduler.java:138)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:152)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:265)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
at java.lang.Thread.run(Thread.java:818)
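All you need to do is move observeOn up the chain. observeOn changes the thread on which onNext, onError, and onComplete are delivered to everything downstream of it, which is how operators and side effects work internally (via lift). Operators placed after observeOn, such as doFinally, therefore react to those signals on that scheduler.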
listUseCase.execute(null)
        .subscribeOn(schedulerProvider.io()) // Move subscribe on here
        .observeOn(schedulerProvider.main()) // Change threads here
        .doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(@NonNull Disposable disposable) throws Exception {
                getView().showLoading(true); // This should be on the main thread also
            }
        })
        .doFinally(new Action() {
            @Override
            public void run() throws Exception {
                getView().showLoading(false);
            }
        })
        .subscribe(
                new Consumer<List<AccountEntity>>() {
                    @Override
                    public void accept(@NonNull List<AccountEntity> accountEntities) throws Exception {
                        getView().setAccounts(accountEntities);
                    }
                },
                new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {
                        if (isViewAttached()) {
                            getView().showError(throwable.getMessage());
                        }
                    }
                }
        );
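To see the effect of operator position outside Android, here is a minimal sketch that records which thread each doFinally runs on. It is an illustration rather than the asker's code: the class name DoFinallyThreadDemo, the single-thread executor named "fake-main" standing in for the Android main thread, and the latch used to wait for both callbacks are all assumptions made for the demo.

```java
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class DoFinallyThreadDemo {

    /** Returns { thread of the doFinally above observeOn, thread of the doFinally below it }. */
    public static String[] run() throws InterruptedException {
        // Hypothetical stand-in for the Android main thread.
        ExecutorService mainExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-main"));
        Scheduler main = Schedulers.from(mainExecutor);

        AtomicReference<String> above = new AtomicReference<>();
        AtomicReference<String> below = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(2); // wait for both doFinally actions

        Observable.fromCallable(() -> "accounts")
                // Above observeOn: sees the terminal event on the subscribeOn (IO) thread.
                .doFinally(() -> {
                    above.set(Thread.currentThread().getName());
                    done.countDown();
                })
                .subscribeOn(Schedulers.io())
                .observeOn(main)
                // Below observeOn: sees the terminal event on the "fake-main" thread.
                .doFinally(() -> {
                    below.set(Thread.currentThread().getName());
                    done.countDown();
                })
                .subscribe(item -> { }, error -> { });

        done.await(5, TimeUnit.SECONDS);
        mainExecutor.shutdown();
        return new String[] { above.get(), below.get() };
    }
}
```

In this sketch the first name belongs to an RxCachedThreadScheduler thread, as in the stack trace above, while the second is the stand-in main thread.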
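The problem occurred because I was not disposing of the subscription when the Activity was finished/destroyed.
Now each Activity/view tells the presenter when it is stopped or destroyed, and the presenter disposes of the subscription.
This seems to have solved my problem.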
@Override
public void initialize() {
    if (!isViewAttached()) {
        throw new ViewNotAttachedException();
    }
    disposable = listUseCase.execute(null)
            .subscribeOn(schedulerProvider.io()) // Move subscribe on here
            .observeOn(schedulerProvider.main()) // Change threads here
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(@NonNull Disposable disposable) throws Exception {
                    getView().showLoading(true); // This should be on the main thread also
                }
            })
            .doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    getView().showLoading(false);
                }
            })
            .subscribe(
                    new Consumer<List<AccountEntity>>() {
                        @Override
                        public void accept(@NonNull List<AccountEntity> accountEntities) throws Exception {
                            getView().setAccounts(accountEntities);
                        }
                    },
                    new Consumer<Throwable>() {
                        @Override
                        public void accept(@NonNull Throwable throwable) throws Exception {
                            if (isViewAttached()) {
                                getView().showError(throwable.getMessage());
                            }
                        }
                    }
            );
}

@Override
public void dispose() {
    if (disposable != null) {
        disposable.dispose();
    }
}
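Maybe it's too late, but take a look at the documentation for doFinally here. It explicitly says:
"Calls the specified action after this Observable signals onError or onCompleted or gets disposed by the downstream."
This means doFinally is called no matter what, and there is no guarantee that you still have a valid context.
I don't use doFinally. I always set loading(false) in onNext or onError. Not pretty, but it works.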
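As a rough illustration of the "gets disposed by the downstream" part of that quote, the sketch below disposes a never-ending stream and records where doFinally ran. The class name DoFinallyOnDisposeDemo and the use of Observable.never() are assumptions for the demo. In this sketch the action runs synchronously on whichever thread calls dispose(), so a view callback inside doFinally may run at a point where the view is no longer usable.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;

import java.util.concurrent.atomic.AtomicReference;

public class DoFinallyOnDisposeDemo {

    /** Returns the name of the thread on which doFinally ran, or "not called". */
    public static String run() {
        AtomicReference<String> finallyThread = new AtomicReference<>("not called");

        // A stream that never emits and never terminates, so only dispose() can end it.
        Disposable disposable = Observable.never()
                .doFinally(() -> finallyThread.set(Thread.currentThread().getName()))
                .subscribe();

        // Disposing triggers the doFinally action right here, on the calling thread.
        disposable.dispose();

        return finallyThread.get();
    }
}
```

Guarding the callback, for example with a check like isViewAttached() as the error handler in the question does, is one way to keep such a late call harmless.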