订阅和取消订阅每个发出的项目

Subscribe and Unsubscribe on Each Emitted Item

我正在使用 reactive-location 库。

我的用例是我有一个从可观察对象发出的对象流。这些物品可能每隔几个小时就会发出一次。一旦发出一个项目,我想获得一个位置并使用 zipWith(据我所知)发出一个包含该位置的对象。

问题是:由于对象只会每隔几个小时发射一次,我无法让可观察位置保持高温,因为它会耗尽电池。

所以我需要以下内容:将对象传递到流中后,订阅可观察的位置一旦获得位置,取消订阅可观察的位置。这必须持续进行。

据我了解,这个转换器负责取消订阅

public <T> Observable.Transformer<T, T> takeNextAndUnsubscribe() {
    return new Observable.Transformer<T, T>() {

        @Override
        public Observable<T> call(Observable<T> tObservable) {
            final BehaviorSubject subject = BehaviorSubject.create();
            Observable source = tObservable.doOnNext(new Action1<T>() {
                @Override
                public void call(T t) {
                    subject.onNext(t);
                }
            });
            return Observable
                    .merge(source.takeUntil(subject), subject)
                    .take(1);
        }

    };
}

但是一旦新对象被发送到流中,我该如何再次订阅?

听起来你需要的是在发射时将源项目与当前位置结合起来。这里不需要任何花哨的东西。只需在每个源项目上使用 flatMap() 将其与位置结合起来。

source.flatMap(item ->
        locationProvider
                .getLastKnownLocation()
                .map(location -> new ItemWithLocation<>(item, location))
);

class ItemWithLocation<T> {
    private final T item;
    private final Location location;

    public ItemWithLocation(T item, Location location) {
        this.item = item;
        this.location = location;
    }

    public T getItem() {
        return item;
    }

    public Location getLocation() {
        return location;
    }
}

编辑:更新了第二个例子。以下将订阅位置更新,直到达到特定精度,然后将其与您的源项目结合起来。这里的关键是first()的使用。只要您获得满足您需求的位置,使用它就会取消订阅位置提供商。

LocationRequest request = 
        LocationRequest
            .create()
            .setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY)
            .setInterval(100);

source.flatMap(item ->
        locationProvider
                .getUpdatedLocation(request)
                .first(location -> location.getAccuracy() < 5.0f)
                .map(location -> new ItemWithLocation<>(item, location))
);