Observable超时逻辑:迁移到RxJava2
Observable timeout logic: migration to RxJava2
我正在慢慢地将我的应用程序从 RxJava 1 迁移到 RxJava 2。
更新所有代码后一切正常,除了一个用例,我现在有点不知所措,我想我需要返回文档以正确获取它。
应用程序从网络加载 Asset
的集合,并且此操作花费的时间超过 x 毫秒,它会显示加载动画。然后在获取数据的时候,动画是stopped/removed,数据显示。
这是我在 RxJava 1 中所拥有的并且正在运行:
getAssetsSubscription = new GetAssetsUseCase().execute()
.publish(new Func1<Observable<List<Asset>>, Observable<List<Asset>>>() {
@Override
public Observable<List<Asset>> call(Observable<List<Asset>> o) {
return o.timeout(LOADING_VIEW_THRESHOLD_MS, TimeUnit.MILLISECONDS,
Observable.fromCallable(new Callable<List<Asset>>() {
@Override
public List<Asset> call() throws Exception {
if (isAdded()) {
getActivity().runOnUiThread(new Runnable() {
@Override
public void run() {
setLoadingViewVisibility(true);
}
});
}
return null;
}
}
)
).ignoreElements().mergeWith(o);
}
})
.subscribe(new Subscriber<List<Asset>>() {
@Override
public void onCompleted() {
// Do things...
}
@Override
public void onError(Throwable e) {
// Do things...
}
@Override
public void onNext(List<Asset> assets) {
// Do things...
}
});
这是我对 RxJava 2 的 "translation":有了这个,数据永远不会显示,onComplete
总是被调用,但 onNext
永远不会。在没有触发超时的情况下也是如此。
disposables.add(new GetAssetsUseCase().execute().publish(new Function<Observable<List<Asset>>,
ObservableSource<List<Asset>>>() {
@Override
public ObservableSource<List<Asset>> apply(Observable<List<Asset>> listObservable) throws
Exception {
return listObservable.timeout(LOADING_VIEW_THRESHOLD_MS, TimeUnit.MILLISECONDS,
Observable.fromCallable(new Callable<List<Asset>>() {
@Override
public List<Asset> call() throws Exception {
if (isAdded()) {
getActivity().runOnUiThread(new Runnable() {
@Override
public void run() {
setLoadingViewVisibility(true);
}
});
}
return null;
}
})
).ignoreElements().mergeWith(Completable.fromObservable(listObservable)).toObservable();
}
})
.subscribeWith(new DisposableObserver<List<Asset>>() {
@Override
public void onComplete() {
// Do things...
}
@Override
public void onError(Throwable e) {
// Do things...
}
@Override
public void onNext(List<Asset> assets) {
// Do things...
}
}));
那是因为你的可调用 returns null
,这意味着它是一个终端事件。
@Override
public List<Asset> call() throws Exception {
if(isAdded()) {
getActivity().runOnUiThread(new Runnable() {
@Override
public void run() {
setLoadingViewVisibility(true);
}
});
}
return null;
}
应该是
@Override
public List<Asset> call() throws Exception {
if(isAdded()) {
getActivity().runOnUiThread(new Runnable() { // TODO: move after `observeOn(AndroidSchedulers.mainThread())`
@Override
public void run() {
setLoadingViewVisibility(true);
}
});
}
return Collections.emptyList();
}
原码使用Observable#mergeWith(Observable)
。
由于 RxJava2 在适当的地方缩小了类型,这在您修改后的代码中更改为 Completable.mergeWith(Completable)
.
要获得与旧代码相同的行为,您需要更改操作顺序:
- 来自
.ignoreElements().mergeWith(Completable.fromObservable(listObservable)).toObservable()
- 到
.ignoreElements().<List<Asset>>toObservable().mergeWith(listObservable)
因为 Completable.fromObservable(...)
基本上等同于 Observable#ignoreElements()
.
此外,return null;
可能会导致 RxJava2 出现问题,因为合同规定事件流中不能有 null
值。考虑将 Observable.fromCallable(...)
替换为 Completable.fromRunnable(...).toObservable()
我正在慢慢地将我的应用程序从 RxJava 1 迁移到 RxJava 2。 更新所有代码后一切正常,除了一个用例,我现在有点不知所措,我想我需要返回文档以正确获取它。
应用程序从网络加载 Asset
的集合,并且此操作花费的时间超过 x 毫秒,它会显示加载动画。然后在获取数据的时候,动画是stopped/removed,数据显示。
这是我在 RxJava 1 中所拥有的并且正在运行:
getAssetsSubscription = new GetAssetsUseCase().execute()
.publish(new Func1<Observable<List<Asset>>, Observable<List<Asset>>>() {
@Override
public Observable<List<Asset>> call(Observable<List<Asset>> o) {
return o.timeout(LOADING_VIEW_THRESHOLD_MS, TimeUnit.MILLISECONDS,
Observable.fromCallable(new Callable<List<Asset>>() {
@Override
public List<Asset> call() throws Exception {
if (isAdded()) {
getActivity().runOnUiThread(new Runnable() {
@Override
public void run() {
setLoadingViewVisibility(true);
}
});
}
return null;
}
}
)
).ignoreElements().mergeWith(o);
}
})
.subscribe(new Subscriber<List<Asset>>() {
@Override
public void onCompleted() {
// Do things...
}
@Override
public void onError(Throwable e) {
// Do things...
}
@Override
public void onNext(List<Asset> assets) {
// Do things...
}
});
这是我对 RxJava 2 的 "translation":有了这个,数据永远不会显示,onComplete
总是被调用,但 onNext
永远不会。在没有触发超时的情况下也是如此。
disposables.add(new GetAssetsUseCase().execute().publish(new Function<Observable<List<Asset>>,
ObservableSource<List<Asset>>>() {
@Override
public ObservableSource<List<Asset>> apply(Observable<List<Asset>> listObservable) throws
Exception {
return listObservable.timeout(LOADING_VIEW_THRESHOLD_MS, TimeUnit.MILLISECONDS,
Observable.fromCallable(new Callable<List<Asset>>() {
@Override
public List<Asset> call() throws Exception {
if (isAdded()) {
getActivity().runOnUiThread(new Runnable() {
@Override
public void run() {
setLoadingViewVisibility(true);
}
});
}
return null;
}
})
).ignoreElements().mergeWith(Completable.fromObservable(listObservable)).toObservable();
}
})
.subscribeWith(new DisposableObserver<List<Asset>>() {
@Override
public void onComplete() {
// Do things...
}
@Override
public void onError(Throwable e) {
// Do things...
}
@Override
public void onNext(List<Asset> assets) {
// Do things...
}
}));
那是因为你的可调用 returns null
,这意味着它是一个终端事件。
@Override
public List<Asset> call() throws Exception {
if(isAdded()) {
getActivity().runOnUiThread(new Runnable() {
@Override
public void run() {
setLoadingViewVisibility(true);
}
});
}
return null;
}
应该是
@Override
public List<Asset> call() throws Exception {
if(isAdded()) {
getActivity().runOnUiThread(new Runnable() { // TODO: move after `observeOn(AndroidSchedulers.mainThread())`
@Override
public void run() {
setLoadingViewVisibility(true);
}
});
}
return Collections.emptyList();
}
原码使用Observable#mergeWith(Observable)
。
由于 RxJava2 在适当的地方缩小了类型,这在您修改后的代码中更改为 Completable.mergeWith(Completable)
.
要获得与旧代码相同的行为,您需要更改操作顺序:
- 来自
.ignoreElements().mergeWith(Completable.fromObservable(listObservable)).toObservable()
- 到
.ignoreElements().<List<Asset>>toObservable().mergeWith(listObservable)
因为 Completable.fromObservable(...)
基本上等同于 Observable#ignoreElements()
.
此外,return null;
可能会导致 RxJava2 出现问题,因为合同规定事件流中不能有 null
值。考虑将 Observable.fromCallable(...)
替换为 Completable.fromRunnable(...).toObservable()