如何自定义我自己的 Observable?

How can I customize my own Observable?

我正在使用 retrofit2、rxjava2 和 adapter-rxjava 来实现我的 http api 调用。

//API definition
Observable<String> queryProducts(@Body Query query);

//API implementation.
serviceApi.queryProducts(query)
                .subscribeOn(new Scheduler().ioThread())
                .observeOn(new Scheduler().mainThread())
                .subscribe(new Observer());

如果我有很多api需要实现,每一个api实现都需要加上这两行:

.subscribeOn(new Scheduler().ioThread())
.observeOn(new Scheduler().mainThread())

我不想在每个 api 实现中都添加它们。我想使用 MyObservable 作为我的 api 定义的结果类型。

我的想法如下所示:

//API definition
 MyObservable<String> queryProducts(@Body Query query);

//MyObservable definition
    public class MyObservable<T> extends Observable<T> {
        /**
         * Creates an Observable with a Function to execute when it is subscribed to.
         * <p>
         * <em>Note:</em> Use {@link #create(OnSubscribe)} to create an Observable, instead of this constructor,
         * unless you specifically have a need for inheritance.
         *
         * @param f {@link OnSubscribe} to be executed when {@link #subscribe(Subscriber)} is called
         */
        protected MyObservable(OnSubscribe<T> f) {
            super(f);
            this.subscribeOn(new Scheduler().ioThread());
            this.observeOn(new Scheduler().mainThread());
        }
    }

当我 运行 它时,我遇到了以下异常:

java.lang.IllegalArgumentException: 无法创建呼叫适配器 MyObservable.

我跟踪了 RxJavaCallAdapterFactory.java 代码 https://github.com/square/retrofit/blob/master/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/RxJavaCallAdapterFactory.java。我在第 100 行找到了 RxJavaCallAdapterFactory,它似乎只让 Observable class 通过这个检查点。我无法扩展和覆盖此方法,因为此 class 是最终的 class。

if (rawType != Observable.class && !isSingle && !isCompletable) {
      return null;
}

有没有办法在超级 class 中添加这两行,我不想在每个 api 实现中添加它们?非常感谢。

虽然在 RxJava2 中你可以 ,但它不太可能适合这种你想重用公共代码而不是复制它的情况(但是从头开始创建 Observable,通常用于包装外部异步回调代码)。
相反,您可以使用 compose() 运算符,用您的自定义代码转换 Observable,它是向 Observable.
添加通用逻辑的经典方法 您可以按照 Dan Lew's article 获取完全符合您需要的示例(添加 Schedulers)。

正在对改装适配器进行重新升级,因为它正在创建带有反射的服务,所以它不支持自定义 类 但会生成现有的 RxJava 类.
顺便说一句,你looking/using用RxJava2改造了RxJava1适配器,你需要切换到RxJava2适配器,用RxJava2适配器你可以看到实际上改造使用它自己的自定义Observable 类.

如果使用 compose() 对您来说还不够(因为您仍然需要将它添加到每个 API),请使用官方方法创建您自己的 CallAdapter.Factory 并实施通过包装 RxJava2CallAdapterFactory 委派适应它来改进 CallAdapter,然后用您的自定义 code/operators/schdeulers 包装 return Observable。看到这个 tutorial. or example with RxJava2 (pretty the same) at some library I'm working on.