在 doOnSubscribe 中调用 subject.onNext()

Calling subject.onNext() inside doOnSubscribe

为什么在 doOnSubscribe 中调用 subject.onNext(o) 没有任何效果,但是调用 subject.onComplete() 会导致流终止!?

final PublishSubject<Integer> subject = PublishSubject.create();

    final Observable<Integer> observable = subject.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(@NonNull Disposable disposable) throws Exception {
            System.out.println("disposable = [" + disposable + "]");
            subject.onNext(1);
            //or
            Observable.just(2, 3).subscribe(subject);
        }
    });

    observable.subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("d = [" + d.isDisposed() + "]");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("item = [" + integer + "]");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("e = [" + e + "]");
        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    });/*
    expected:
    disposable = [false]
    d = [false]
    item = 1
    item = 2
    item = 3
    onComplete
    but received :
    disposable = [false]
    d = [false]
    onComplete
    */

当订阅 2.x 中的 Subject 时,表示连接的 Disposable 在特定 Observer 变得可见之前遍历 onSubscribe() 链一个 onNext。如果您从 onSubscribe 调用 hasObservers,您可以看到这一点,它会 return 错误直到 onSubscribe 实际上 return。

这是 Observable 协议所要求的,因为它不允许同时 运行 onSubscribeonNext,并且 onSubscribe 必须在 onNext 之前发生。如果不遵守此规则,则对 Subject.onNext 的并发调用将 运行 在 Observer.onSubscribe 调用之前甚至同时调用,并找到可能没有准备好的消费者。

由于 PublishSubject 不保留任何 onNext 调用,未观察到的 onNext 项将被丢弃。根据用例,您应该改用 BehaviorSubjectyourSubject.startWith(initialValue).subscribe()Subject.

中获取任何其他 onNext 之前的值