如何在 RxJava 中子类化 Observable?
How to subclass Observable in RxJava?
我知道你应该不惜一切代价避免这种情况,但是如果我在 RxJava 中有一个 subclassed Observable 的有效用例怎么办?可能吗?我该怎么做?
在这种特定情况下,我有一个 "repository" class 当前 returns 请求:
class Request<T> {
public abstract Object key();
public abstract Observable<T> asObservable();
[...]
public Request<T> transform(Func1<Request<T>, Observable<T>> transformation) {
Request<T> self = this;
return new Request<T>() {
@Override public Object key() { return self.key; }
@Override public Observable<T> asObservable() { return transformation.call(self); }
}
}
}
然后我在需要请求键(如缓存)的上下文中使用转换方法修改响应可观察对象 (asObservable):
service.getItemList() // <- returns a Request<List<Item>>
.transform(r -> r.asObservable()
// The activity is the current Activity in Android
.compose(Operators.ensureThereIsAnAccount(activity))
// The cache comes last because we don't need auth for cached responses
.compose(cache.cacheTransformation(r.key())))
.asObservable()
[... your common RxJava code ...]
现在,如果我的请求 class 是一个 Observable subclass,那将非常方便,因为我可以消除所有 .asObservable() 调用,而客户端甚至不需要了解我的请求 class.
可以subclass Observable(我们为Subject
s和ConnectableObservable
s做这个),但是需要额外考虑,因为你需要传入一个OnSubscribe
回调来处理你传入的 Subscriber
s。我不清楚如果有人订阅你的 Request 应该做什么,所以我会给你两个扩展 Observable 的例子:
无需共享可变状态即可观察
如果您没有要在订阅者之间共享的可变状态,您可以扩展 Observable 并将您的操作传递给 super
public final class MyObservable extends Observable<Long> {
public MyObservable() {
super(new OnSubscribe<Long>() {
@Override public void call(Subscriber<? super Long> child) {
child.onNext(System.currentTimeMillis());
child.onCompleted();
}
});
}
}
可观察共享可变状态
这个通常比较棘手,因为您需要从 OnSubscribe
方法和您的 Observable 方法访问共享状态,但是 Java 不会让您从 OnSubscribe
内部 class 在 super
完成之前。解决方案是从构造函数中提取出这样的共享状态和 OnSubscribe
并使用静态工厂方法来设置两者:
public final class MySharedObservable extends Observable<Long> {
public static MySharedObservable create() {
final AtomicLong counter = new AtomicLong();
OnSubscribe<Long> onSubscribe = new OnSubscribe<Long>() {
@Override
public void call(Subscriber<? super Long> t1) {
t1.onNext(counter.incrementAndGet());
t1.onCompleted();
}
};
return new MySharedObservable(onSubscribe, counter);
}
private AtomicLong counter;
private MySharedObservable(OnSubscribe<Long> onSubscribe, AtomicLong counter) {
super(onSubscribe);
this.counter = counter;
}
public long getCounter() {
return counter.get();
}
}
我知道你应该不惜一切代价避免这种情况,但是如果我在 RxJava 中有一个 subclassed Observable 的有效用例怎么办?可能吗?我该怎么做?
在这种特定情况下,我有一个 "repository" class 当前 returns 请求:
class Request<T> {
public abstract Object key();
public abstract Observable<T> asObservable();
[...]
public Request<T> transform(Func1<Request<T>, Observable<T>> transformation) {
Request<T> self = this;
return new Request<T>() {
@Override public Object key() { return self.key; }
@Override public Observable<T> asObservable() { return transformation.call(self); }
}
}
}
然后我在需要请求键(如缓存)的上下文中使用转换方法修改响应可观察对象 (asObservable):
service.getItemList() // <- returns a Request<List<Item>>
.transform(r -> r.asObservable()
// The activity is the current Activity in Android
.compose(Operators.ensureThereIsAnAccount(activity))
// The cache comes last because we don't need auth for cached responses
.compose(cache.cacheTransformation(r.key())))
.asObservable()
[... your common RxJava code ...]
现在,如果我的请求 class 是一个 Observable subclass,那将非常方便,因为我可以消除所有 .asObservable() 调用,而客户端甚至不需要了解我的请求 class.
可以subclass Observable(我们为Subject
s和ConnectableObservable
s做这个),但是需要额外考虑,因为你需要传入一个OnSubscribe
回调来处理你传入的 Subscriber
s。我不清楚如果有人订阅你的 Request 应该做什么,所以我会给你两个扩展 Observable 的例子:
无需共享可变状态即可观察
如果您没有要在订阅者之间共享的可变状态,您可以扩展 Observable 并将您的操作传递给 super
public final class MyObservable extends Observable<Long> {
public MyObservable() {
super(new OnSubscribe<Long>() {
@Override public void call(Subscriber<? super Long> child) {
child.onNext(System.currentTimeMillis());
child.onCompleted();
}
});
}
}
可观察共享可变状态
这个通常比较棘手,因为您需要从 OnSubscribe
方法和您的 Observable 方法访问共享状态,但是 Java 不会让您从 OnSubscribe
内部 class 在 super
完成之前。解决方案是从构造函数中提取出这样的共享状态和 OnSubscribe
并使用静态工厂方法来设置两者:
public final class MySharedObservable extends Observable<Long> {
public static MySharedObservable create() {
final AtomicLong counter = new AtomicLong();
OnSubscribe<Long> onSubscribe = new OnSubscribe<Long>() {
@Override
public void call(Subscriber<? super Long> t1) {
t1.onNext(counter.incrementAndGet());
t1.onCompleted();
}
};
return new MySharedObservable(onSubscribe, counter);
}
private AtomicLong counter;
private MySharedObservable(OnSubscribe<Long> onSubscribe, AtomicLong counter) {
super(onSubscribe);
this.counter = counter;
}
public long getCounter() {
return counter.get();
}
}