在 doOnSubscribe 中调用 subject.onNext()
Calling subject.onNext() inside doOnSubscribe
为什么在 doOnSubscribe
中调用 subject.onNext(o)
没有任何效果,但是调用 subject.onComplete()
会导致流终止!?
final PublishSubject<Integer> subject = PublishSubject.create();
final Observable<Integer> observable = subject.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
System.out.println("disposable = [" + disposable + "]");
subject.onNext(1);
//or
Observable.just(2, 3).subscribe(subject);
}
});
observable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("d = [" + d.isDisposed() + "]");
}
@Override
public void onNext(Integer integer) {
System.out.println("item = [" + integer + "]");
}
@Override
public void onError(Throwable e) {
System.out.println("e = [" + e + "]");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});/*
expected:
disposable = [false]
d = [false]
item = 1
item = 2
item = 3
onComplete
but received :
disposable = [false]
d = [false]
onComplete
*/
当订阅 2.x 中的 Subject
时,表示连接的 Disposable
在特定 Observer
变得可见之前遍历 onSubscribe()
链一个 onNext
。如果您从 onSubscribe
调用 hasObservers
,您可以看到这一点,它会 return 错误直到 onSubscribe
实际上 return。
这是 Observable 协议所要求的,因为它不允许同时 运行 onSubscribe
和 onNext
,并且 onSubscribe
必须在 onNext
之前发生。如果不遵守此规则,则对 Subject.onNext
的并发调用将 运行 在 Observer.onSubscribe
调用之前甚至同时调用,并找到可能没有准备好的消费者。
由于 PublishSubject
不保留任何 onNext
调用,未观察到的 onNext
项将被丢弃。根据用例,您应该改用 BehaviorSubject
或 yourSubject.startWith(initialValue).subscribe()
从 Subject
.
中获取任何其他 onNext
之前的值
为什么在 doOnSubscribe
中调用 subject.onNext(o)
没有任何效果,但是调用 subject.onComplete()
会导致流终止!?
final PublishSubject<Integer> subject = PublishSubject.create();
final Observable<Integer> observable = subject.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
System.out.println("disposable = [" + disposable + "]");
subject.onNext(1);
//or
Observable.just(2, 3).subscribe(subject);
}
});
observable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("d = [" + d.isDisposed() + "]");
}
@Override
public void onNext(Integer integer) {
System.out.println("item = [" + integer + "]");
}
@Override
public void onError(Throwable e) {
System.out.println("e = [" + e + "]");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});/*
expected:
disposable = [false]
d = [false]
item = 1
item = 2
item = 3
onComplete
but received :
disposable = [false]
d = [false]
onComplete
*/
当订阅 2.x 中的 Subject
时,表示连接的 Disposable
在特定 Observer
变得可见之前遍历 onSubscribe()
链一个 onNext
。如果您从 onSubscribe
调用 hasObservers
,您可以看到这一点,它会 return 错误直到 onSubscribe
实际上 return。
这是 Observable 协议所要求的,因为它不允许同时 运行 onSubscribe
和 onNext
,并且 onSubscribe
必须在 onNext
之前发生。如果不遵守此规则,则对 Subject.onNext
的并发调用将 运行 在 Observer.onSubscribe
调用之前甚至同时调用,并找到可能没有准备好的消费者。
由于 PublishSubject
不保留任何 onNext
调用,未观察到的 onNext
项将被丢弃。根据用例,您应该改用 BehaviorSubject
或 yourSubject.startWith(initialValue).subscribe()
从 Subject
.
onNext
之前的值