使用 RxAndroid 的智能轮询
Smart Polling using RxAndroid
我想达到的目标:
我想通过每 5 分钟轮询一次来从网络上轮询一些资源,但前提是有观察者订阅。我使用 BehaviorSubject 和 interval observable 进行池化。我设法实现了它,但我是 Rx 的新手,我认为它可以做得更好。
我是这样做的:
private BehaviorSubject<String> observable;
private Subscription> subscription;
public Subscription subscribe(final Action1<String> action) {
if (observable == null) {
observable = BehaviorSubject.create();
}
if (subscription == null) {
Observable<String> source = Observable.interval(5, TimeUnit.MINUTES).map(new Func1<Long, String>() {
@Override
public String call(Long aLong) {
return getDataFromServer();
}
}).observeOn(AndroidSchedulers.mainThread());
subscription = source.subscribe(new Action1<String>() {
@Override
public void call(String s) {
if (observable.hasObservers()) {
observable.onNext(s);
} else {
subscription.unsubscribe();
subscription = null;
}
}
});
}
return observable.subscribe(action);
}
想法:
- 我有一个用于轮询的可观察源和另一个客户端可以订阅的可观察源(使用 BehaviourSubject 实现 - 所以他们总是获得最新数据)。当源 observable 发出一些东西时,如果 behaviorsubject 有订阅者,它就会被传递,否则什么也不会传递,我从源取消订阅,这样它就会停止。
怎么样:
Observable<String> observable = Observable.interval(0, 5, TimeUnit.SECONDS)
.doOnNext(new LoggingAction1<Long>("doOnNext"))
.flatMap(new Func1<Long, Observable<String>>() {
@Override
public Observable<String> call(Long pulse) {
return Observable.just(String.format("Request %d", pulse));
}
})
.replay(1)
.refCount();
我认为它可以满足您的设置要求:
- 只要没有
Subscribers
它就什么都不做。
- 当第一个
Subscriber
订阅时,interval
启动并立即发出一个值,然后每 5 秒发出一个值。
- 新的
Subscriber
将立即获取最后一项,然后获取所有后续项。
- 只会启动一个
interval
- 因此每 5 秒只会执行一个网络请求 - 无论有多少 Subscribers
。
- 当所有
Subscribers
取消订阅后,interval
将停止发送项目。
我想达到的目标: 我想通过每 5 分钟轮询一次来从网络上轮询一些资源,但前提是有观察者订阅。我使用 BehaviorSubject 和 interval observable 进行池化。我设法实现了它,但我是 Rx 的新手,我认为它可以做得更好。
我是这样做的:
private BehaviorSubject<String> observable;
private Subscription> subscription;
public Subscription subscribe(final Action1<String> action) {
if (observable == null) {
observable = BehaviorSubject.create();
}
if (subscription == null) {
Observable<String> source = Observable.interval(5, TimeUnit.MINUTES).map(new Func1<Long, String>() {
@Override
public String call(Long aLong) {
return getDataFromServer();
}
}).observeOn(AndroidSchedulers.mainThread());
subscription = source.subscribe(new Action1<String>() {
@Override
public void call(String s) {
if (observable.hasObservers()) {
observable.onNext(s);
} else {
subscription.unsubscribe();
subscription = null;
}
}
});
}
return observable.subscribe(action);
}
想法: - 我有一个用于轮询的可观察源和另一个客户端可以订阅的可观察源(使用 BehaviourSubject 实现 - 所以他们总是获得最新数据)。当源 observable 发出一些东西时,如果 behaviorsubject 有订阅者,它就会被传递,否则什么也不会传递,我从源取消订阅,这样它就会停止。
怎么样:
Observable<String> observable = Observable.interval(0, 5, TimeUnit.SECONDS)
.doOnNext(new LoggingAction1<Long>("doOnNext"))
.flatMap(new Func1<Long, Observable<String>>() {
@Override
public Observable<String> call(Long pulse) {
return Observable.just(String.format("Request %d", pulse));
}
})
.replay(1)
.refCount();
我认为它可以满足您的设置要求:
- 只要没有
Subscribers
它就什么都不做。 - 当第一个
Subscriber
订阅时,interval
启动并立即发出一个值,然后每 5 秒发出一个值。 - 新的
Subscriber
将立即获取最后一项,然后获取所有后续项。 - 只会启动一个
interval
- 因此每 5 秒只会执行一个网络请求 - 无论有多少Subscribers
。 - 当所有
Subscribers
取消订阅后,interval
将停止发送项目。