订阅 Hot Observable w/o onComplete

Subscribing to Hot Observable w/o onComplete

我无法理解正确的语法来执行以下操作:

  1. 创建一个只执行一次且从不调用 onComplete 的 Hot Observable
  2. 让不同的 Observable 监听它并响应从热 Observable 发出的数据,而不强制热 Observable 调用 onComplete。

下面是一个简化的例子:

// Create an observable that never calls onComplete and keeps spitting out data
Observable<Foo> hotObservable = Observable.create(emitter -> {
   while (true) {
     Foo someData = listenToInputStream();
     emitter.onNext(someData);
   }
}

// Ensure observable is fired off immediately
ConnectableObservable cObs = hotObservable
  .subscribeOn(Schedulers.io())
  .publish();
cObs.connect();

Single<Foo> obs1 = cObs
 .subscribeOn(Schedulers.io())
 .doOnSubscribe(x -> triggerObs1EventToBeSentToInputStream)
 .filter(someFilter)
 .singleOrError();
obs1.subscribe(someConsumer);

Single<Bar> obs2 = cObs
 .subscribeOn(Schedulers.io())
 .doOnSubscribe(x -> triggerObs2EventToBeSentToInputStream)
 .filter(someOtherFilter)
 .map(fooToBar)
 .singleOrError();
obs2.subscribe(someOtherConsumer);

我看到 firstOrError() 有效但 singleOrError()/lastOrError()/.takeLast(1) 无效。有没有办法在没有blocking/hanging的情况下获取符合过滤条件的最新消息?

FWIW,如果我执行 .take(1).singleOrError() 它会通过,但我认为这与 firstOrError() 相同。我正在寻找与观察者过滤器相匹配的最新数据。

我还有其他观察者监听从热观察者发出的任何 number/types 数据,这就是我为这些特定观察者调用 doOnSubscribe 而不是将输入流直接集成到观察者本身的原因。

根据 Skynets 的 Subject 想法,我认为如果我使用 PublishSubject 作为中间人,它有点管用。目前它的工作方式有点像 take() 但我想我可以对此进行扩展以使其更加灵活 return 1 到 N 项。

示例:

PublishSubject<Foo> pSubj = PublishSubject.create();
cObjs
.filter(getCorrectData)
.doOnSubscribe(x -> triggerEventToBeSentToInputStream)
.subscribe(x -> {
  pSubj.onNext(x);
  pSubj.complete();
});

Single<Foo> obs1 = pSubj
 .subscribeOn(Schedulers.io())
 //etc