RxAndroid:创建简单的 Hot Observable
RxAndroid: Create Simple Hot Observable
我正在创建一个在订阅时发出整数的 Observable。我现在的实现已经设置好,因此订阅它的行为会从一开始就触发生成,如下所示:
private Observable createObservable() {
return Observable.create (
new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> sub) {
for (int i = 1; i < MAX_PROGRESS + 1; i++) {
sub.onNext(i);
SystemClock.sleep(1000);
}
sub.onCompleted();
}
}
);
}
我的理解是这是一个冷的 Observable。我希望生成的序列与任何订阅者无关,并且当订阅者订阅时,希望他们接收在订阅时恰好是最新的值。 IOW,把它变成一个热 Observable。我宁愿不将 Observable 子类化,因为它将它绑定到一个具体的 Integer 中,而实际上实际类型会有所不同。
在您的示例中,您每次调用函数时都使用 "Observable.create"。 "Hot" 订阅从 Observable 保存一些实例。您还需要使用一些 Rx 方法(缓存(),重试())在代码中它看起来像:
public Observable<Bitmap> mObservable;
Subscriber<Bitmap> mSubscriber;
Subscription mSubscription;
Bitmap loadedBitmap;
....
@Override
public View onCreateView(LayoutInflater inflater, ViewGroup container,
Bundle savedInstanceState) {
if (savedInstanceState != null) {
loadedBitmap = savedInstanceState.getParcelable("LoadedBitmap");
imageView.setImageBitmap(loadedBitmap);
}
else {
runNewObservable();
}
runSubscribe();
return mainView;
}
....
private void runNewObservable () {
mObservable =
Observable.create(new Observable.OnSubscribe<Bitmap>() {
@Override
public void call(Subscriber<? super Bitmap> subscriber) {
subscriber.onNext(new LoadingImage().loadImageFrom(imageURL));
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.cache();
}
private void runNewSubscribe () {
mSubscriber = new Subscriber<Bitmap>() {
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) { }
@Override
public void onNext(Bitmap bitmap) {
loadedBitmap = bitmap;
imageView.setImageBitmap(bitmap);
}
};
}
@Override
public void onResume() {
super.onResume();
mSubscription = mObservable.subscribe(mSubscriber);
}
@Override
public void onSaveInstanceState(Bundle outState) {
super.onSaveInstanceState(outState);
outState.putParcelable("LoadedBitmap", loadedBitmap);
}
@Override
public void onStop() {
super.onStop();
mSubscription.unsubscribe();
}
....
如你所见。我仅使用 mSubscription.subscribe() 和 mSubscription.unsubscribe() 并仅在 saveInstanceState 为 null 时创建 Observable。
查看 rx.subjects.BehaviorSubject<T>
。如果您不熟悉 rx.subjects.Subject
s,我能想到的最通用的描述方式是它们打破了 A 点和 B 点之间订阅的连续性。如何同时成为 Observer<T>
;可以接受来自多个来源的 onNext()
s(警告:需要外部线程安全)。另一方面,一个主题也是一个 Observable<T>
,因此多个 Observer<T>
可以订阅,并且 onNext()
进入将被多播到每个下游 Observer<T>
.
如果你的代码看起来像
Observable<T> src = ...;
Subscriber<T> dst;
src.subscribe(dst);
使用BehaviorSubject的方法是
Observable<T> src = ...;
BehaviorSubject<T> subject = BehaviorSubject.create(defaultValue);
src.subscribe(subject);
立即订阅源,主题将以最快的速度呈现。 BehaviorSubject 只保留最近的值并删除 defaultValue 和所有以前的值。
// safe to do multiple times.
Subscriber<T> dst;
subject.subscribe(dst);
订阅 dst
在订阅后立即从 src
(或 defaultValue
)接收最新值,然后是所有后续值,直到 dst
取消订阅。
警告:受试者有过度使用的倾向,所以确保你需要一个。
我正在创建一个在订阅时发出整数的 Observable。我现在的实现已经设置好,因此订阅它的行为会从一开始就触发生成,如下所示:
private Observable createObservable() {
return Observable.create (
new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> sub) {
for (int i = 1; i < MAX_PROGRESS + 1; i++) {
sub.onNext(i);
SystemClock.sleep(1000);
}
sub.onCompleted();
}
}
);
}
我的理解是这是一个冷的 Observable。我希望生成的序列与任何订阅者无关,并且当订阅者订阅时,希望他们接收在订阅时恰好是最新的值。 IOW,把它变成一个热 Observable。我宁愿不将 Observable 子类化,因为它将它绑定到一个具体的 Integer 中,而实际上实际类型会有所不同。
在您的示例中,您每次调用函数时都使用 "Observable.create"。 "Hot" 订阅从 Observable 保存一些实例。您还需要使用一些 Rx 方法(缓存(),重试())在代码中它看起来像:
public Observable<Bitmap> mObservable;
Subscriber<Bitmap> mSubscriber;
Subscription mSubscription;
Bitmap loadedBitmap;
....
@Override
public View onCreateView(LayoutInflater inflater, ViewGroup container,
Bundle savedInstanceState) {
if (savedInstanceState != null) {
loadedBitmap = savedInstanceState.getParcelable("LoadedBitmap");
imageView.setImageBitmap(loadedBitmap);
}
else {
runNewObservable();
}
runSubscribe();
return mainView;
}
....
private void runNewObservable () {
mObservable =
Observable.create(new Observable.OnSubscribe<Bitmap>() {
@Override
public void call(Subscriber<? super Bitmap> subscriber) {
subscriber.onNext(new LoadingImage().loadImageFrom(imageURL));
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.cache();
}
private void runNewSubscribe () {
mSubscriber = new Subscriber<Bitmap>() {
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) { }
@Override
public void onNext(Bitmap bitmap) {
loadedBitmap = bitmap;
imageView.setImageBitmap(bitmap);
}
};
}
@Override
public void onResume() {
super.onResume();
mSubscription = mObservable.subscribe(mSubscriber);
}
@Override
public void onSaveInstanceState(Bundle outState) {
super.onSaveInstanceState(outState);
outState.putParcelable("LoadedBitmap", loadedBitmap);
}
@Override
public void onStop() {
super.onStop();
mSubscription.unsubscribe();
}
....
如你所见。我仅使用 mSubscription.subscribe() 和 mSubscription.unsubscribe() 并仅在 saveInstanceState 为 null 时创建 Observable。
查看 rx.subjects.BehaviorSubject<T>
。如果您不熟悉 rx.subjects.Subject
s,我能想到的最通用的描述方式是它们打破了 A 点和 B 点之间订阅的连续性。如何同时成为 Observer<T>
;可以接受来自多个来源的 onNext()
s(警告:需要外部线程安全)。另一方面,一个主题也是一个 Observable<T>
,因此多个 Observer<T>
可以订阅,并且 onNext()
进入将被多播到每个下游 Observer<T>
.
如果你的代码看起来像
Observable<T> src = ...;
Subscriber<T> dst;
src.subscribe(dst);
使用BehaviorSubject的方法是
Observable<T> src = ...;
BehaviorSubject<T> subject = BehaviorSubject.create(defaultValue);
src.subscribe(subject);
立即订阅源,主题将以最快的速度呈现。 BehaviorSubject 只保留最近的值并删除 defaultValue 和所有以前的值。
// safe to do multiple times.
Subscriber<T> dst;
subject.subscribe(dst);
订阅 dst
在订阅后立即从 src
(或 defaultValue
)接收最新值,然后是所有后续值,直到 dst
取消订阅。
警告:受试者有过度使用的倾向,所以确保你需要一个。