如何在最后一个订阅者退订后延迟拆除共享的、无限的 Observables
How to tear down shared, infinite Observables with a delay after the last subscriber unsubscribed
我们在 Android 应用程序中使用多项服务。这些服务以无限 Observables
形式提供其数据,这些数据通常是通过组合其他服务的 Observables
来构建的。
这些 Observables
的构建成本很高。此外,这些服务通常在多个地方使用,因此 Observable
应该在订阅者之间共享。
示例:
LocationService
,提供无限 Observable<Location>
,发出当前位置
ReminderService
,提供一个无限的Observable<List<Reminder>>
,它在数据集 中的每次更改后发出所有存储提醒的列表
LocationAwareReminderService
,通过 Observable.combineLatest
前两个服务的 Observables
提供无限 Observable<List<Reminders>>
的附近提醒
第一种方法:内部 BehaviorSubjects 作为缓存
每个服务都合并了消耗的 Observables
并订阅它的内部 BehaviorSubject
生成的提要。然后消费者可以订阅this BehaviorSubject
。
LocationAwareReminderService
例如:
public class LocationAwareReminderService {
Observable<List<Reminder>> feed;
public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
BehaviorSubject<List<Reminder>> cache = BehaviorSubject.create();
Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(), new Func2<List<Reminder>, Location, List<Reminder>>() {
@Override
public List<Reminder> call(List<Reminder> reminders, Location location) {
return calculateNearbyReminders(reminders, location);
}
}).subscribe(cache);
feed = cache.asObservable();
}
public Observable<List<Reminder>> getFeed() {
return feed;
}
}
缺点:
- 由于 Behavior 主题,reminderService 和 locatoinService 的提要永远不会被拆除。即使没有消费者
- 如果它们依赖于不断发布新项目的服务(如 LocationService),这尤其有问题
- 由于构造函数中的订阅(缓存),即使没有订阅者,服务也会开始计算附近的提醒
优势:
- 生成的提要由所有订阅者共享
- 因为源永远不会被拆除,没有订阅者的短时间不会崩溃整个管道
第二种方法:replay(1).refCount().
public class LocationAwareReminderService {
Observable<List<Reminder>> feed;
public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
feed = Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(), new Func2<List<Reminder>, Location, List<Reminder>>() {
@Override
public List<Reminder> call(List<Reminder> reminders, Location location) {
return calculateNearbyReminders(reminders, location);
}
}).replay(1).refCount();
}
public Observable<List<Reminder>> getFeed() {
return feed;
}
}
缺点:
- 没有
Subscriber
的短时间会使整个管道崩溃。在下一次订阅期间,需要重建整个管道。
- A 从
Activity
A 过渡到 Activity
B,两者都订阅了 LocationAwareReminderService.getFeed()
,导致管道的完全去化和重建
优势:
- 在最后一个
Subscriber
退订后,LocationAwareReminderService
也会退订LocationService.getFeed()
和reminderService.getFeed()
Observables
。
LocationAwareReminderService
仅在第一个 Subscriber
订阅后才开始提供 nearbyReminders
- 生成的提要由所有
Subscriber
s 共享
第三种方法:使用超时取消订阅 refCount
因此,我构建了一个 Transformer
,在最后一次 Subscriber
取消订阅后的一段定义的时间内保持订阅有效
public class RxPublishTimeoutCache<T> implements Observable.Transformer<T, T> {
private long keepAlive;
private TimeUnit timeUnit;
public RxPublishTimeoutCache(long keepAlive, TimeUnit timeUnit) {
this.keepAlive = keepAlive;
this.timeUnit = timeUnit;
}
@Override
public Observable<T> call(Observable<T> upstream) {
final Observable<T> sharedUpstream = upstream.replay(1).refCount();
return Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
if (subscriber.isUnsubscribed())
return;
// subscribe an empty Subscriber that keeps the subsription of refCount() alive
final Subscription keepAliveSubscription = sharedUpstream.subscribe(new NopSubscriber<T>());
// listen to unsubscribe from the subscriber
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
// the subscriber unsubscribed
Observable.timer(keepAlive, timeUnit).subscribe(new Action1<Long>() {
@Override
public void call(Long _) {
// unsubscribe the keep alive subscription
keepAliveSubscription.unsubscribe();
}
});
}
}));
sharedUpstream.subscribe(subscriber);
}
});
}
public class NopSubscriber<T> extends Subscriber<T> {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(T o) {}
}
}
LocationAwareReminderService
利用RxPublishTimeoutCache
public class LocationAwareReminderService {
Observable<List<Reminder>> feed;
public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
feed = Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(), new Func2<List<Reminder>, Location, List<Reminder>>() {
@Override
public List<Reminder> call(List<Reminder> reminders, Location location) {
return calculateNearbyReminders(reminders, location);
}
}).compose(new RxPublishTimeoutCache<List<Reminder>>(10, TimeUnit.SECONDS));
}
public Observable<List<Reminder>> getFeed() {
return feed;
}
}
优势:
LocationAwareReminderService
仅在第一个 Subscriber
订阅后才开始提供 nearbyReminders
- 生成的提要由所有订阅者共享
- 没有订阅者的短时间不会崩溃整个管道
- 在定义的时间段内没有订阅后,整个管道将被拆除
缺点:
- 可能有一些普遍的缺陷?
问题:
- 在 RxJava 中是否已经有其他方法可以实现这一点?
RxPublishTimeoutCache
中是否存在一些一般设计缺陷?
- 使用 RxJava 组合此类服务的整体策略是否存在缺陷?
我认为这是一个有趣的问题,而且似乎是一个有用的运算符,所以我在 rxjava-extras 中创建了 Transformers.delayFinalUnsubscribe
:
observable
.publish()
.refCount()
.compose(Transformers
.delayFinalUnsubscribe(1, TimeUnit.MINUTES));
Maven Central 从 0.7.9.1 开始在 rxjava-extras 中可用。喜欢的话试一试,看看有没有问题。
现在有一个需要超时的 refCount 重载,正是这样做的
我们在 Android 应用程序中使用多项服务。这些服务以无限 Observables
形式提供其数据,这些数据通常是通过组合其他服务的 Observables
来构建的。
这些 Observables
的构建成本很高。此外,这些服务通常在多个地方使用,因此 Observable
应该在订阅者之间共享。
示例:
LocationService
,提供无限Observable<Location>
,发出当前位置ReminderService
,提供一个无限的Observable<List<Reminder>>
,它在数据集 中的每次更改后发出所有存储提醒的列表
LocationAwareReminderService
,通过Observable.combineLatest
前两个服务的Observables
提供无限
Observable<List<Reminders>>
的附近提醒
第一种方法:内部 BehaviorSubjects 作为缓存
每个服务都合并了消耗的 Observables
并订阅它的内部 BehaviorSubject
生成的提要。然后消费者可以订阅this BehaviorSubject
。
LocationAwareReminderService
例如:
public class LocationAwareReminderService {
Observable<List<Reminder>> feed;
public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
BehaviorSubject<List<Reminder>> cache = BehaviorSubject.create();
Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(), new Func2<List<Reminder>, Location, List<Reminder>>() {
@Override
public List<Reminder> call(List<Reminder> reminders, Location location) {
return calculateNearbyReminders(reminders, location);
}
}).subscribe(cache);
feed = cache.asObservable();
}
public Observable<List<Reminder>> getFeed() {
return feed;
}
}
缺点:
- 由于 Behavior 主题,reminderService 和 locatoinService 的提要永远不会被拆除。即使没有消费者
- 如果它们依赖于不断发布新项目的服务(如 LocationService),这尤其有问题
- 由于构造函数中的订阅(缓存),即使没有订阅者,服务也会开始计算附近的提醒
优势:
- 生成的提要由所有订阅者共享
- 因为源永远不会被拆除,没有订阅者的短时间不会崩溃整个管道
第二种方法:replay(1).refCount().
public class LocationAwareReminderService {
Observable<List<Reminder>> feed;
public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
feed = Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(), new Func2<List<Reminder>, Location, List<Reminder>>() {
@Override
public List<Reminder> call(List<Reminder> reminders, Location location) {
return calculateNearbyReminders(reminders, location);
}
}).replay(1).refCount();
}
public Observable<List<Reminder>> getFeed() {
return feed;
}
}
缺点:
- 没有
Subscriber
的短时间会使整个管道崩溃。在下一次订阅期间,需要重建整个管道。 - A 从
Activity
A 过渡到Activity
B,两者都订阅了LocationAwareReminderService.getFeed()
,导致管道的完全去化和重建
优势:
- 在最后一个
Subscriber
退订后,LocationAwareReminderService
也会退订LocationService.getFeed()
和reminderService.getFeed()
Observables
。 LocationAwareReminderService
仅在第一个Subscriber
订阅后才开始提供 nearbyReminders- 生成的提要由所有
Subscriber
s 共享
第三种方法:使用超时取消订阅 refCount
因此,我构建了一个 Transformer
,在最后一次 Subscriber
取消订阅后的一段定义的时间内保持订阅有效
public class RxPublishTimeoutCache<T> implements Observable.Transformer<T, T> {
private long keepAlive;
private TimeUnit timeUnit;
public RxPublishTimeoutCache(long keepAlive, TimeUnit timeUnit) {
this.keepAlive = keepAlive;
this.timeUnit = timeUnit;
}
@Override
public Observable<T> call(Observable<T> upstream) {
final Observable<T> sharedUpstream = upstream.replay(1).refCount();
return Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
if (subscriber.isUnsubscribed())
return;
// subscribe an empty Subscriber that keeps the subsription of refCount() alive
final Subscription keepAliveSubscription = sharedUpstream.subscribe(new NopSubscriber<T>());
// listen to unsubscribe from the subscriber
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
// the subscriber unsubscribed
Observable.timer(keepAlive, timeUnit).subscribe(new Action1<Long>() {
@Override
public void call(Long _) {
// unsubscribe the keep alive subscription
keepAliveSubscription.unsubscribe();
}
});
}
}));
sharedUpstream.subscribe(subscriber);
}
});
}
public class NopSubscriber<T> extends Subscriber<T> {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(T o) {}
}
}
LocationAwareReminderService
利用RxPublishTimeoutCache
public class LocationAwareReminderService {
Observable<List<Reminder>> feed;
public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
feed = Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(), new Func2<List<Reminder>, Location, List<Reminder>>() {
@Override
public List<Reminder> call(List<Reminder> reminders, Location location) {
return calculateNearbyReminders(reminders, location);
}
}).compose(new RxPublishTimeoutCache<List<Reminder>>(10, TimeUnit.SECONDS));
}
public Observable<List<Reminder>> getFeed() {
return feed;
}
}
优势:
LocationAwareReminderService
仅在第一个Subscriber
订阅后才开始提供 nearbyReminders- 生成的提要由所有订阅者共享
- 没有订阅者的短时间不会崩溃整个管道
- 在定义的时间段内没有订阅后,整个管道将被拆除
缺点:
- 可能有一些普遍的缺陷?
问题:
- 在 RxJava 中是否已经有其他方法可以实现这一点?
RxPublishTimeoutCache
中是否存在一些一般设计缺陷?- 使用 RxJava 组合此类服务的整体策略是否存在缺陷?
我认为这是一个有趣的问题,而且似乎是一个有用的运算符,所以我在 rxjava-extras 中创建了 Transformers.delayFinalUnsubscribe
:
observable
.publish()
.refCount()
.compose(Transformers
.delayFinalUnsubscribe(1, TimeUnit.MINUTES));
Maven Central 从 0.7.9.1 开始在 rxjava-extras 中可用。喜欢的话试一试,看看有没有问题。
现在有一个需要超时的 refCount 重载,正是这样做的