如何使用 rxJava 实现一系列连续操作?
How to implement a sequence of consecutive operations using rxJava?
我的下载过程由 3 个连续操作组成:preProcess
、downloading
、postProcess
。每个操作都具有异步性质(preProcess
调用 API、downloading
等待文件下载等)。 UI 必须显示正在执行的操作(例如 "preparing..."、"downloading..."、"unpacking...")。
我将整个过程视为 Observable
,它发出整个操作的当前状态。每个操作也是一个可观察的,在执行开始时发出状态并在执行后完成。
Observable.OnSubscribe<DownloadStatus>() {
@Override
public void call(Subscriber<? super DownloadStatus> subscriber) {
subscriber.onNext(DownloadStatus.PRE_PROCESS);
doPreProcess()
.subscribe(new Action1<File>() {
@Override
public void call(File file) {
subscriber.onCompleted();
}
});
}
});
Observable<DownloadStatus> mDonwloadingOperation = Observable.create(new Observable.OnSubscribe<DownloadStatus>() {
@Override
public void call(final Subscriber<? super DownloadStatus> subscriber) {
subscriber.onNext(DownloadStatus.DOWNLOADING);
doDownloading()
.subscribe(new Action1<File>() {
@Override
public void call(File file) {
subscriber.onCompleted();
}
});
}
});
Observable<DownloadStatus> mPosProcessOperation = Observable.create(new Observable.OnSubscribe<DownloadStatus>() {
@Override
public void call(Subscriber<? super DownloadStatus> subscriber) {
subscriber.onNext(DownloadStatus.POST_PROCESS);
doPostProcess()
.subscribe(new Action1<File>() {
@Override
public void call(File file) {
subscriber.onCompleted();
}
});
}
});
一方面,每个操作都应该等到前面的操作完成。另一方面,订阅者需要接收每个发出的状态(例如 PRE_PROCESS -> DOWNLOADING -> POST_PROCESS -> onComplete)
我不能使用 merge
因为每个操作都应该取决于前一个操作的完成。
我不能使用 flatMap
因为我不知道如何传播发射状态。我认为 Subject
可能是解决方案,但我也不知道如何传播发射状态。
如何使用 rxJava 解决此类问题?感谢任何 ideas/clues.
我的下载过程由 3 个连续操作组成:preProcess
、downloading
、postProcess
。每个操作都具有异步性质(preProcess
调用 API、downloading
等待文件下载等)。 UI 必须显示正在执行的操作(例如 "preparing..."、"downloading..."、"unpacking...")。
我将整个过程视为 Observable
,它发出整个操作的当前状态。每个操作也是一个可观察的,在执行开始时发出状态并在执行后完成。
Observable.OnSubscribe<DownloadStatus>() {
@Override
public void call(Subscriber<? super DownloadStatus> subscriber) {
subscriber.onNext(DownloadStatus.PRE_PROCESS);
doPreProcess()
.subscribe(new Action1<File>() {
@Override
public void call(File file) {
subscriber.onCompleted();
}
});
}
});
Observable<DownloadStatus> mDonwloadingOperation = Observable.create(new Observable.OnSubscribe<DownloadStatus>() {
@Override
public void call(final Subscriber<? super DownloadStatus> subscriber) {
subscriber.onNext(DownloadStatus.DOWNLOADING);
doDownloading()
.subscribe(new Action1<File>() {
@Override
public void call(File file) {
subscriber.onCompleted();
}
});
}
});
Observable<DownloadStatus> mPosProcessOperation = Observable.create(new Observable.OnSubscribe<DownloadStatus>() {
@Override
public void call(Subscriber<? super DownloadStatus> subscriber) {
subscriber.onNext(DownloadStatus.POST_PROCESS);
doPostProcess()
.subscribe(new Action1<File>() {
@Override
public void call(File file) {
subscriber.onCompleted();
}
});
}
});
一方面,每个操作都应该等到前面的操作完成。另一方面,订阅者需要接收每个发出的状态(例如 PRE_PROCESS -> DOWNLOADING -> POST_PROCESS -> onComplete)
我不能使用 merge
因为每个操作都应该取决于前一个操作的完成。
我不能使用 flatMap
因为我不知道如何传播发射状态。我认为 Subject
可能是解决方案,但我也不知道如何传播发射状态。
如何使用 rxJava 解决此类问题?感谢任何 ideas/clues.