RxJava 主题和错误处理

RxJava subjects and error handling

我正在尝试实现类似于事件总线的行为。根据我的要求,PublishSubject 似乎合适。

主题发出表示某些全局操作结果的项目,这些项目可能会成功解决或在出现异常时失败。我不能将 onNext() 用于成功事件,而将 onError()Throwable 一起使用以防出现错误,因为一旦 onError() 被调用,主题就会终止,任何未来的订阅者都将获得除了 onError() 之外没有排放。

现在我看到它的方式我必须创建一个 class 代表事件,并在出现错误时可选地引用 Throwable。然而,这似乎是不明智的,因为必须在 onNext().

中处理错误

你会怎么做?

创建通用 class 包装事件是一种可行的方法。假设我们称它为 ResponseOrError class,它应该基本上包含两个字段

private T data;
private Throwable error;

和两个简单​​的工厂方法:

public static <T> ResponseOrError<T> fromError(Throwable throwable) {
    return new ResponseOrError<>(throwable);
}

public static <T> ResponseOrError<T> fromData(T data) {
    return new ResponseOrError<>(data);
}

要删除一些样板代码,您可以提供 Transformer 来制作 ResponseOrError 类型的 Observable。

public static <T> Observable.Transformer<T, ResponseOrError<T>> toResponseOrErrorObservable() {
    return new Observable.Transformer<T, ResponseOrError<T>>() {

        @Override
        public Observable<ResponseOrError<T>> call(final Observable<T> observable) {
            return observable
                    .map(new Func1<T, ResponseOrError<T>>() {
                        @Override
                        public ResponseOrError<T> call(final T t) {
                            return ResponseOrError.fromData(t);
                        }
                    })
                    .onErrorResumeNext(new Func1<Throwable, Observable<? extends ResponseOrError<T>>>() {
                        @Override
                        public Observable<? extends ResponseOrError<T>> call(final Throwable throwable) {
                            return Observable.just(ResponseOrError.<T>fromError(throwable));
                        }
                    });
        }
    };
}

然后你就可以这样使用它了:

final Observable<ResponseOrError<ImportantData>> compose = mNetworkService
               .getImportantData()
               .compose(ResponseOrError.<ImportantData>toResponseOrErrorObservable());

现在您可以根据成功或失败轻松映射结果,甚至提供另一个 Transformer 返回映射的 Observable< T> 而不是Observable>