onCompleted 在处理 RxJava concatMap 之前提前调用

onCompleted called early before processing RxJava concatMap

我有两个可观察对象并使用 concatDelayError 进行顺序处理。

我的问题是在处理之前提前调用了 onNext 和 onCompleted。 concatDelayError 我怎么知道所有的处理都已经完成了?

伪代码:

public Observable<Integer> concat(){
     int x = 10;
     int y = 20;


        Observable obx = Observable.create(emitter -> {
                    try {
                        int x = doSomeThing();
                        emitter.onNext(x);
                        emitter.onCompleted();
                    } catch (SQLiteException e) {
                        emitter.onError(e);
                    }
                }, Emitter.BackpressureMode.BUFFER);   
        Observable oby = Observable.create(emitter -> {
                    try {
                        int y = doSomeThing();
                        emitter.onNext(y);
                        emitter.onCompleted();
                    } catch (SQLiteException e) {
                        emitter.onError(e);
                    }
                }, Emitter.BackpressureMode.BUFFER);       
        Observable concated =  Observable.concatDelayError(ob1,ob2)
                 .compose(applySchedulers())
                 .replay().autoConnect();  

  }

    //somewhere else

    concat().subscribe(mReplaySubject);


    //somewhere else

     mReplaySubject.subscribe(new Observer<Integer>() {
                        @Override
                        public void onCompleted() {
                            launchActivity(SplashActivity.this, HomeActivity.class);
                            SplashActivity.this.finish();
                        }    
                        @Override
                        public void onError(Throwable e) {
                            e.printStackTrace();
                        }    
                        @Override
                        public void onNext(Integer value) {

                        }
                    });

我正在使用 reply() 和 autoConnect() 并通过 ReplySubject 进行订阅,因为我需要共享单个订阅。

您可以在 replay().autoConnect() 之前使用 doOnSubscribe,但您还需要一种方法来确保返回的 Observable 最多设置一次:

final class OnceObservable {

    final AtomicReference<Observable<Integer>> ref = new AtomicReference<>();

    int x;
    int y;

    Observable<Integer> concat() {
        Observable<Integer> obs = ref.get();
        if (obs != null) {
            return;
        }

        obs = Observable.concatDelayError(
            Observable.fromCallable(() -> doSomething()),
            Observable.fromCallable(() -> doSomethingElse())
        )
        .doOnSubscribe(() -> {
            x = 10;
            y = 20;
        })
        .replay().autoConnect();

        if (!ref.compareAndSet(null, obs)) {
            obs = ref.get();
        }
        return obs;
    }
}