How to tear down shared, infinite Observables with a delay after the last subscriber has unsubscribed

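We use several services in our Android app. These services provide their data as infinite Observables, which are often built by combining the Observables of other services. These Observables are expensive to construct. In addition, the services are typically used in several places, so each Observable should be shared among its subscribers.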

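Example:

First approach: internal BehaviorSubjects as a cache

Each service combines the Observables it consumes and subscribes its internal BehaviorSubject to the resulting feed. Consumers can then subscribe to this BehaviorSubject. LocationAwareReminderService, for example: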

public class LocationAwareReminderService {

    Observable<List<Reminder>> feed;

    public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
        BehaviorSubject<List<Reminder>> cache = BehaviorSubject.create();
        Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(), new Func2<List<Reminder>, Location, List<Reminder>>() {
            @Override
            public List<Reminder> call(List<Reminder> reminders, Location location) {
                return calculateNearbyReminders(reminders, location);
            }
        }).subscribe(cache);

        feed = cache.asObservable();
    }

    public Observable<List<Reminder>> getFeed() {
        return feed;
    }
}

Disadvantages:

Advantages:

Second approach: replay(1).refCount()

public class LocationAwareReminderService {

    Observable<List<Reminder>> feed;

    public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
        feed = Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(), new Func2<List<Reminder>, Location, List<Reminder>>() {
            @Override
            public List<Reminder> call(List<Reminder> reminders, Location location) {
                return calculateNearbyReminders(reminders, location);
            }
        }).replay(1).refCount();
    }

    public Observable<List<Reminder>> getFeed() {
        return feed;
    }
}

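Disadvantages:

Advantages:

Third approach: refCount with an unsubscribe timeout

So I built a Transformer that keeps the upstream subscription alive for a defined period after the last Subscriber has unsubscribed: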

public class RxPublishTimeoutCache<T> implements Observable.Transformer<T, T> {

    private long keepAlive;
    private TimeUnit timeUnit;

    public RxPublishTimeoutCache(long keepAlive, TimeUnit timeUnit) {
        this.keepAlive = keepAlive;
        this.timeUnit = timeUnit;
    }

    @Override
    public Observable<T> call(Observable<T> upstream) {

        final Observable<T> sharedUpstream = upstream.replay(1).refCount();

        return Observable.create(new Observable.OnSubscribe<T>() {
            @Override
            public void call(Subscriber<? super T> subscriber) {
                if (subscriber.isUnsubscribed())
                    return;
                // subscribe an empty Subscriber that keeps the subscription of refCount() alive
                final Subscription keepAliveSubscription = sharedUpstream.subscribe(new NopSubscriber<T>());
                // listen to unsubscribe from the subscriber
                subscriber.add(Subscriptions.create(new Action0() {
                    @Override
                    public void call() {
                        // the subscriber unsubscribed
                        Observable.timer(keepAlive, timeUnit).subscribe(new Action1<Long>() {
                            @Override
                            public void call(Long ignored) {
                                // unsubscribe the keep alive subscription
                                keepAliveSubscription.unsubscribe();
                            }
                        });
                    }
                }));
                sharedUpstream.subscribe(subscriber);
            }
        });
    }

    public class NopSubscriber<T> extends Subscriber<T> {
        @Override
        public void onCompleted() {}
        @Override
        public void onError(Throwable e) {}
        @Override
        public void onNext(T o) {}
    }
}

LocationAwareReminderService using RxPublishTimeoutCache:

public class LocationAwareReminderService {

    Observable<List<Reminder>> feed;

    public LocationAwareReminderService(ReminderService reminderService, LocationService locationService) {
        feed = Observable.combineLatest(reminderService.getFeed(), locationService.getFeed(), new Func2<List<Reminder>, Location, List<Reminder>>() {
            @Override
            public List<Reminder> call(List<Reminder> reminders, Location location) {
                return calculateNearbyReminders(reminders, location);
            }
        }).compose(new RxPublishTimeoutCache<List<Reminder>>(10, TimeUnit.SECONDS));
    }

    public Observable<List<Reminder>> getFeed() {
        return feed;
    }
}

Advantages:

Disadvantages:

Questions:

I thought this was an interesting problem, and it seemed like a useful operator, so I created Transformers.delayFinalUnsubscribe in rxjava-extras:

observable
  .publish()
  .refCount()
  .compose(Transformers
      .delayFinalUnsubscribe(1, TimeUnit.MINUTES));

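It is available in rxjava-extras on Maven Central as of version 0.7.9.1. Give it a try if you like and see whether you run into any problems.

There is now a refCount overload that takes a timeout and does exactly this.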
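
For readers who want to see the mechanism without any Rx machinery, here is a minimal sketch in plain Java of reference counting with a delayed teardown. It is not how RxJava or rxjava-extras implement it. The class DelayedRefCount and its methods acquire, release and isConnected are hypothetical names invented for this illustration, and the connect and disconnect callbacks stand in for starting and stopping the expensive upstream.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: counts subscribers and tears down only after a grace period. */
final class DelayedRefCount {
    private final Runnable connect;     // assumed to start the expensive upstream
    private final Runnable disconnect;  // assumed to stop it again
    private final long keepAlive;
    private final TimeUnit unit;
    private final ScheduledExecutorService scheduler;

    private int subscribers = 0;
    private boolean connected = false;
    private long generation = 0;        // bumped on every acquire, invalidates stale teardowns
    private ScheduledFuture<?> pendingTeardown;

    DelayedRefCount(Runnable connect, Runnable disconnect,
                    long keepAlive, TimeUnit unit, ScheduledExecutorService scheduler) {
        this.connect = connect;
        this.disconnect = disconnect;
        this.keepAlive = keepAlive;
        this.unit = unit;
        this.scheduler = scheduler;
    }

    /** Called when a subscriber arrives; connects on first use, cancels a pending teardown. */
    synchronized void acquire() {
        generation++;
        if (pendingTeardown != null) {
            pendingTeardown.cancel(false);
            pendingTeardown = null;
        }
        subscribers++;
        if (!connected) {
            connected = true;
            connect.run();
        }
    }

    /** Called when a subscriber leaves; the last one schedules the teardown instead of running it. */
    synchronized void release() {
        if (subscribers == 0) {
            throw new IllegalStateException("release() without matching acquire()");
        }
        if (--subscribers == 0) {
            final long scheduledAt = generation;
            pendingTeardown = scheduler.schedule(
                    () -> teardownIfStillIdle(scheduledAt), keepAlive, unit);
        }
    }

    private synchronized void teardownIfStillIdle(long scheduledAt) {
        // Skip if anyone subscribed after this teardown was scheduled.
        if (connected && subscribers == 0 && generation == scheduledAt) {
            connected = false;
            pendingTeardown = null;
            disconnect.run();
        }
    }

    synchronized boolean isConnected() {
        return connected;
    }
}
```

With a keep-alive of, say, 10 seconds, a consumer that unsubscribes and resubscribes within that window (an Android configuration change is the typical case) reuses the existing upstream instead of rebuilding it. The generation check is there so that a teardown task that was already dequeued when it got cancelled cannot disconnect early.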