订阅和取消订阅每个发出的项目
Subscribe and Unsubscribe on Each Emitted Item
我正在使用 reactive-location 库。
我的用例是我有一个从可观察对象发出的对象流。这些物品可能每隔几个小时就会发出一次。一旦发出一个项目,我想获得一个位置并使用 zipWith(据我所知)发出一个包含该位置的对象。
问题是:由于对象只会每隔几个小时发射一次,我无法让可观察位置保持高温,因为它会耗尽电池。
所以我需要以下内容:将对象传递到流中后,订阅可观察的位置一旦获得位置,取消订阅可观察的位置。这必须持续进行。
据我了解,这个转换器负责取消订阅
public <T> Observable.Transformer<T, T> takeNextAndUnsubscribe() {
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> tObservable) {
final BehaviorSubject subject = BehaviorSubject.create();
Observable source = tObservable.doOnNext(new Action1<T>() {
@Override
public void call(T t) {
subject.onNext(t);
}
});
return Observable
.merge(source.takeUntil(subject), subject)
.take(1);
}
};
}
但是一旦新对象被发送到流中,我该如何再次订阅?
听起来你需要的是在发射时将源项目与当前位置结合起来。这里不需要任何花哨的东西。只需在每个源项目上使用 flatMap()
将其与位置结合起来。
source.flatMap(item ->
locationProvider
.getLastKnownLocation()
.map(location -> new ItemWithLocation<>(item, location))
);
class ItemWithLocation<T> {
private final T item;
private final Location location;
public ItemWithLocation(T item, Location location) {
this.item = item;
this.location = location;
}
public T getItem() {
return item;
}
public Location getLocation() {
return location;
}
}
编辑:更新了第二个例子。以下将订阅位置更新,直到达到特定精度,然后将其与您的源项目结合起来。这里的关键是first()
的使用。只要您获得满足您需求的位置,使用它就会取消订阅位置提供商。
LocationRequest request =
LocationRequest
.create()
.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY)
.setInterval(100);
source.flatMap(item ->
locationProvider
.getUpdatedLocation(request)
.first(location -> location.getAccuracy() < 5.0f)
.map(location -> new ItemWithLocation<>(item, location))
);
我正在使用 reactive-location 库。
我的用例是我有一个从可观察对象发出的对象流。这些物品可能每隔几个小时就会发出一次。一旦发出一个项目,我想获得一个位置并使用 zipWith(据我所知)发出一个包含该位置的对象。
问题是:由于对象只会每隔几个小时发射一次,我无法让可观察位置保持高温,因为它会耗尽电池。
所以我需要以下内容:将对象传递到流中后,订阅可观察的位置一旦获得位置,取消订阅可观察的位置。这必须持续进行。
据我了解,这个转换器负责取消订阅
public <T> Observable.Transformer<T, T> takeNextAndUnsubscribe() {
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> tObservable) {
final BehaviorSubject subject = BehaviorSubject.create();
Observable source = tObservable.doOnNext(new Action1<T>() {
@Override
public void call(T t) {
subject.onNext(t);
}
});
return Observable
.merge(source.takeUntil(subject), subject)
.take(1);
}
};
}
但是一旦新对象被发送到流中,我该如何再次订阅?
听起来你需要的是在发射时将源项目与当前位置结合起来。这里不需要任何花哨的东西。只需在每个源项目上使用 flatMap()
将其与位置结合起来。
source.flatMap(item ->
locationProvider
.getLastKnownLocation()
.map(location -> new ItemWithLocation<>(item, location))
);
class ItemWithLocation<T> {
private final T item;
private final Location location;
public ItemWithLocation(T item, Location location) {
this.item = item;
this.location = location;
}
public T getItem() {
return item;
}
public Location getLocation() {
return location;
}
}
编辑:更新了第二个例子。以下将订阅位置更新,直到达到特定精度,然后将其与您的源项目结合起来。这里的关键是first()
的使用。只要您获得满足您需求的位置,使用它就会取消订阅位置提供商。
LocationRequest request =
LocationRequest
.create()
.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY)
.setInterval(100);
source.flatMap(item ->
locationProvider
.getUpdatedLocation(request)
.first(location -> location.getAccuracy() < 5.0f)
.map(location -> new ItemWithLocation<>(item, location))
);