如何自定义我自己的 Observable?
How can I customize my own Observable?
我正在使用 retrofit2、rxjava2 和 adapter-rxjava 来实现我的 http api 调用。
//API definition
Observable<String> queryProducts(@Body Query query);
//API implementation.
serviceApi.queryProducts(query)
.subscribeOn(new Scheduler().ioThread())
.observeOn(new Scheduler().mainThread())
.subscribe(new Observer());
如果我有很多api需要实现,每一个api实现都需要加上这两行:
.subscribeOn(new Scheduler().ioThread())
.observeOn(new Scheduler().mainThread())
我不想在每个 api 实现中都添加它们。我想使用 MyObservable 作为我的 api 定义的结果类型。
我的想法如下所示:
//API definition
MyObservable<String> queryProducts(@Body Query query);
//MyObservable definition
public class MyObservable<T> extends Observable<T> {
/**
* Creates an Observable with a Function to execute when it is subscribed to.
* <p>
* <em>Note:</em> Use {@link #create(OnSubscribe)} to create an Observable, instead of this constructor,
* unless you specifically have a need for inheritance.
*
* @param f {@link OnSubscribe} to be executed when {@link #subscribe(Subscriber)} is called
*/
protected MyObservable(OnSubscribe<T> f) {
super(f);
this.subscribeOn(new Scheduler().ioThread());
this.observeOn(new Scheduler().mainThread());
}
}
当我 运行 它时,我遇到了以下异常:
java.lang.IllegalArgumentException: 无法创建呼叫适配器
MyObservable.
我跟踪了 RxJavaCallAdapterFactory.java 代码
https://github.com/square/retrofit/blob/master/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/RxJavaCallAdapterFactory.java。我在第 100 行找到了 RxJavaCallAdapterFactory,它似乎只让 Observable class 通过这个检查点。我无法扩展和覆盖此方法,因为此 class 是最终的 class。
if (rawType != Observable.class && !isSingle && !isCompletable) {
return null;
}
有没有办法在超级 class 中添加这两行,我不想在每个 api 实现中添加它们?非常感谢。
虽然在 RxJava2 中你可以 ,但它不太可能适合这种你想重用公共代码而不是复制它的情况(但是从头开始创建 Observable,通常用于包装外部异步回调代码)。
相反,您可以使用 compose()
运算符,用您的自定义代码转换 Observable,它是向 Observable
.
添加通用逻辑的经典方法
您可以按照 Dan Lew's article 获取完全符合您需要的示例(添加 Schedulers
)。
正在对改装适配器进行重新升级,因为它正在创建带有反射的服务,所以它不支持自定义 类 但会生成现有的 RxJava 类.
顺便说一句,你looking/using用RxJava2改造了RxJava1适配器,你需要切换到RxJava2适配器,用RxJava2适配器你可以看到实际上改造使用它自己的自定义Observable
类.
如果使用 compose()
对您来说还不够(因为您仍然需要将它添加到每个 API),请使用官方方法创建您自己的 CallAdapter.Factory
并实施通过包装 RxJava2CallAdapterFactory
委派适应它来改进 CallAdapter
,然后用您的自定义 code/operators/schdeulers 包装 return Observable
。看到这个 tutorial. or example with RxJava2 (pretty the same) at some library I'm working on.
我正在使用 retrofit2、rxjava2 和 adapter-rxjava 来实现我的 http api 调用。
//API definition
Observable<String> queryProducts(@Body Query query);
//API implementation.
serviceApi.queryProducts(query)
.subscribeOn(new Scheduler().ioThread())
.observeOn(new Scheduler().mainThread())
.subscribe(new Observer());
如果我有很多api需要实现,每一个api实现都需要加上这两行:
.subscribeOn(new Scheduler().ioThread())
.observeOn(new Scheduler().mainThread())
我不想在每个 api 实现中都添加它们。我想使用 MyObservable 作为我的 api 定义的结果类型。
我的想法如下所示:
//API definition
MyObservable<String> queryProducts(@Body Query query);
//MyObservable definition
public class MyObservable<T> extends Observable<T> {
/**
* Creates an Observable with a Function to execute when it is subscribed to.
* <p>
* <em>Note:</em> Use {@link #create(OnSubscribe)} to create an Observable, instead of this constructor,
* unless you specifically have a need for inheritance.
*
* @param f {@link OnSubscribe} to be executed when {@link #subscribe(Subscriber)} is called
*/
protected MyObservable(OnSubscribe<T> f) {
super(f);
this.subscribeOn(new Scheduler().ioThread());
this.observeOn(new Scheduler().mainThread());
}
}
当我 运行 它时,我遇到了以下异常:
java.lang.IllegalArgumentException: 无法创建呼叫适配器 MyObservable.
我跟踪了 RxJavaCallAdapterFactory.java 代码 https://github.com/square/retrofit/blob/master/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/RxJavaCallAdapterFactory.java。我在第 100 行找到了 RxJavaCallAdapterFactory,它似乎只让 Observable class 通过这个检查点。我无法扩展和覆盖此方法,因为此 class 是最终的 class。
if (rawType != Observable.class && !isSingle && !isCompletable) {
return null;
}
有没有办法在超级 class 中添加这两行,我不想在每个 api 实现中添加它们?非常感谢。
虽然在 RxJava2 中你可以
相反,您可以使用 compose()
运算符,用您的自定义代码转换 Observable,它是向 Observable
.
添加通用逻辑的经典方法
您可以按照 Dan Lew's article 获取完全符合您需要的示例(添加 Schedulers
)。
正在对改装适配器进行重新升级,因为它正在创建带有反射的服务,所以它不支持自定义 类 但会生成现有的 RxJava 类.
顺便说一句,你looking/using用RxJava2改造了RxJava1适配器,你需要切换到RxJava2适配器,用RxJava2适配器你可以看到实际上改造使用它自己的自定义Observable
类.
如果使用 compose()
对您来说还不够(因为您仍然需要将它添加到每个 API),请使用官方方法创建您自己的 CallAdapter.Factory
并实施通过包装 RxJava2CallAdapterFactory
委派适应它来改进 CallAdapter
,然后用您的自定义 code/operators/schdeulers 包装 return Observable
。看到这个 tutorial. or example with RxJava2 (pretty the same) at some library I'm working on.