是否存在仅传播错误而不终止自身的可观察对象?

Is there an observable that just propagates the error without terminating itself?

我在 class 中使用 PublishSubject 负责同步。同步完成后,将通知所有订阅者。发生错误时也会发生同样的情况。 我注意到,下次我在发生错误后订阅时,它会立即 return 到订阅者。

所以 class 可能看起来像这样:

public class Synchronizer {
private final PublishSubject<Result> mSyncHeadObservable = PublishSubject.create();
    private final ThreadPoolExecutor mExecutor = new ThreadPoolExecutor(1, 1,
            10, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(true),
            new ThreadPoolExecutor.DiscardPolicy());


    public Observable<Result> syncHead(final int chunkSize) {
              mExecutor.execute(new Runnable() {
      @Override
      public void run() {
          try {
              //Do some work which either returns a result or throws an error
              //...

              mSyncHeadObservable.onNext(Notification.createOnNext(/*some result*/));
          } catch (Throwable error) {
              mSyncHeadObservable.onError(Notification.<Result>createOnError(error));
          }
      }
  });

有没有可以作为代理的observable?可能是其他 Rx 方法?

更新: 我遵循了@akarnokd 的方法并发出了包装到 RxJava Notification 中的事件。然后通过 flatMap() 打开它们。所以 Synchronizer class 的客户不需要这样做。

    //...
private PublishSubject<Notification<Result>> mSyncHeadObservable = PublishSubject.create();

      public Observable<Result> syncHead(final int chunkSize) {

          return mSyncHeadObservable.flatMap(new Func1<Notification<Result>, Observable<Result>>() {
            @Override
            public Observable<Result> call(Notification<Result> result) {
                if (result.isOnError()) {
                    return Observable.error(result.getThrowable());
                }

                return Observable.just(result.getValue());
            }
        }).doOnSubscribe(
            new Action0() {
                @Override
                public void call() {
                    startHeadSync(chunkSize);
                }
            });
      }

      private void startHeadSync(final int chunkSize) {
          mExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    //Do some work which either returns a result or throws an error
                    //...

                    mSyncHeadObservable.onNext(Notification.createOnNext(/*some result*/));
                } catch (Throwable error) {
                    mSyncHeadObservable.onError(Notification.<Result>createOnError(error));
                }
            }
          });
       }
  //...

发生错误时,可观察对象达到终止状态。

如果你想继续观察它,你应该重新订阅你的observable with retry operator 或者使用另一个错误handling operators

我不确定你想用这个设置实现什么,但一般来说,为了避免 PublishSubject 的终止条件,你应该将你的值和错误包装到一个通用结构中并始终发出那些,从来没有 onErroronCompleted。一种选择是使用 RxJava 自己的事件包装器,Notification,你的 Subscribers 应该解包值。