如何使用 rxJava 实现一系列连续操作?

How to implement a sequence of consecutive operations using rxJava?

我的下载过程由 3 个连续操作组成:preProcessdownloadingpostProcess。每个操作都具有异步性质(preProcess 调用 API、downloading 等待文件下载等)。 UI 必须显示正在执行的操作(例如 "preparing..."、"downloading..."、"unpacking...")。 我将整个过程视为 Observable,它发出整个操作的当前状态。每个操作也是一个可观察的,在执行开始时发出状态并在执行后完成。

    Observable.OnSubscribe<DownloadStatus>() {
        @Override
        public void call(Subscriber<? super DownloadStatus> subscriber) {
            subscriber.onNext(DownloadStatus.PRE_PROCESS);
            doPreProcess()
                    .subscribe(new Action1<File>() {
                        @Override
                        public void call(File file) {
                            subscriber.onCompleted();
                        }
                    });
        }
    });

    Observable<DownloadStatus> mDonwloadingOperation = Observable.create(new Observable.OnSubscribe<DownloadStatus>() {
        @Override
        public void call(final Subscriber<? super DownloadStatus> subscriber) {
            subscriber.onNext(DownloadStatus.DOWNLOADING);
            doDownloading()
                    .subscribe(new Action1<File>() {
                        @Override
                        public void call(File file) {
                            subscriber.onCompleted();
                        }
                    });
        }
    });

    Observable<DownloadStatus> mPosProcessOperation = Observable.create(new Observable.OnSubscribe<DownloadStatus>() {
        @Override
        public void call(Subscriber<? super DownloadStatus> subscriber) {
            subscriber.onNext(DownloadStatus.POST_PROCESS);
            doPostProcess()
                    .subscribe(new Action1<File>() {
                        @Override
                        public void call(File file) {
                            subscriber.onCompleted();
                        }
                    });
        }
    });

一方面,每个操作都应该等到前面的操作完成。另一方面,订阅者需要接收每个发出的状态(例如 PRE_PROCESS -> DOWNLOADING -> POST_PROCESS -> onComplete)

我不能使用 merge 因为每个操作都应该取决于前一个操作的完成。 我不能使用 flatMap 因为我不知道如何传播发射状态。我认为 Subject 可能是解决方案,但我也不知道如何传播发射状态。

如何使用 rxJava 解决此类问题?感谢任何 ideas/clues.

concat 就是您所需要的。一旦前一个完成,这就会订阅串联的可观察对象。

concatMap 也像 flatMap 一样工作,但连接了扁平化的投影。关于这两者之间的区别,有一个很好的图表 here