订阅 Hot Observable w/o onComplete
Subscribing to Hot Observable w/o onComplete
我无法理解正确的语法来执行以下操作:
- 创建一个只执行一次且从不调用 onComplete 的 Hot Observable
- 让不同的 Observable 监听它并响应从热 Observable 发出的数据,而不强制热 Observable 调用 onComplete。
下面是一个简化的例子:
// Create an observable that never calls onComplete and keeps spitting out data
Observable<Foo> hotObservable = Observable.create(emitter -> {
while (true) {
Foo someData = listenToInputStream();
emitter.onNext(someData);
}
}
// Ensure observable is fired off immediately
ConnectableObservable cObs = hotObservable
.subscribeOn(Schedulers.io())
.publish();
cObs.connect();
Single<Foo> obs1 = cObs
.subscribeOn(Schedulers.io())
.doOnSubscribe(x -> triggerObs1EventToBeSentToInputStream)
.filter(someFilter)
.singleOrError();
obs1.subscribe(someConsumer);
Single<Bar> obs2 = cObs
.subscribeOn(Schedulers.io())
.doOnSubscribe(x -> triggerObs2EventToBeSentToInputStream)
.filter(someOtherFilter)
.map(fooToBar)
.singleOrError();
obs2.subscribe(someOtherConsumer);
我看到 firstOrError() 有效但 singleOrError()/lastOrError()/.takeLast(1) 无效。有没有办法在没有blocking/hanging的情况下获取符合过滤条件的最新消息?
FWIW,如果我执行 .take(1).singleOrError() 它会通过,但我认为这与 firstOrError() 相同。我正在寻找与观察者过滤器相匹配的最新数据。
我还有其他观察者监听从热观察者发出的任何 number/types 数据,这就是我为这些特定观察者调用 doOnSubscribe 而不是将输入流直接集成到观察者本身的原因。
根据 Skynets 的 Subject 想法,我认为如果我使用 PublishSubject 作为中间人,它有点管用。目前它的工作方式有点像 take() 但我想我可以对此进行扩展以使其更加灵活 return 1 到 N 项。
示例:
PublishSubject<Foo> pSubj = PublishSubject.create();
cObjs
.filter(getCorrectData)
.doOnSubscribe(x -> triggerEventToBeSentToInputStream)
.subscribe(x -> {
pSubj.onNext(x);
pSubj.complete();
});
Single<Foo> obs1 = pSubj
.subscribeOn(Schedulers.io())
//etc
我无法理解正确的语法来执行以下操作:
- 创建一个只执行一次且从不调用 onComplete 的 Hot Observable
- 让不同的 Observable 监听它并响应从热 Observable 发出的数据,而不强制热 Observable 调用 onComplete。
下面是一个简化的例子:
// Create an observable that never calls onComplete and keeps spitting out data
Observable<Foo> hotObservable = Observable.create(emitter -> {
while (true) {
Foo someData = listenToInputStream();
emitter.onNext(someData);
}
}
// Ensure observable is fired off immediately
ConnectableObservable cObs = hotObservable
.subscribeOn(Schedulers.io())
.publish();
cObs.connect();
Single<Foo> obs1 = cObs
.subscribeOn(Schedulers.io())
.doOnSubscribe(x -> triggerObs1EventToBeSentToInputStream)
.filter(someFilter)
.singleOrError();
obs1.subscribe(someConsumer);
Single<Bar> obs2 = cObs
.subscribeOn(Schedulers.io())
.doOnSubscribe(x -> triggerObs2EventToBeSentToInputStream)
.filter(someOtherFilter)
.map(fooToBar)
.singleOrError();
obs2.subscribe(someOtherConsumer);
我看到 firstOrError() 有效但 singleOrError()/lastOrError()/.takeLast(1) 无效。有没有办法在没有blocking/hanging的情况下获取符合过滤条件的最新消息?
FWIW,如果我执行 .take(1).singleOrError() 它会通过,但我认为这与 firstOrError() 相同。我正在寻找与观察者过滤器相匹配的最新数据。
我还有其他观察者监听从热观察者发出的任何 number/types 数据,这就是我为这些特定观察者调用 doOnSubscribe 而不是将输入流直接集成到观察者本身的原因。
根据 Skynets 的 Subject 想法,我认为如果我使用 PublishSubject 作为中间人,它有点管用。目前它的工作方式有点像 take() 但我想我可以对此进行扩展以使其更加灵活 return 1 到 N 项。
示例:
PublishSubject<Foo> pSubj = PublishSubject.create();
cObjs
.filter(getCorrectData)
.doOnSubscribe(x -> triggerEventToBeSentToInputStream)
.subscribe(x -> {
pSubj.onNext(x);
pSubj.complete();
});
Single<Foo> obs1 = pSubj
.subscribeOn(Schedulers.io())
//etc