Rxjava2 方法类似于 BiFunction 的背压
Rxjava2 approach similar to backpressure for BiFunction
我用sqlbrite监听了tablea和b的变化。并使用 combineLatest 运算符组合 sqlbrite 生成的可观察值。在 BiFunction 过程中,observableA 和 observableB 的发射项。
private CompositeDisposable mSubscriptions = new CompositeDisposable();
private void initialize(){
QueryObservable observableA = mDb.createQuery("table_a", "select * from table_a", null);
QueryObservable observableB = mDb.createQuery("table_b", "select * from table_b", null);
ResourceSubscriber subscriber = Flowable.combineLatest(
RxJavaInterop.toV2Observable(observableA
.mapToList(mTableAMapperFunction)).toFlowable(BackpressureStrategy.LATEST)
,
RxJavaInterop.toV2Observable(observableB
.mapToList(mTableBMapperFunction)).toFlowable(BackpressureStrategy.LATEST)
, new BiFunction<List<ItemA>, List<ItemB>, List<ResultItem>>() {
@Override
public List<ResultItem> apply(@io.reactivex.annotations.NonNull List<ItemA> aItems, @io.reactivex.annotations.NonNull List<ItemB> bItems) throws Exception {
List<ResultItem> resultItems = convertToResultItems(aItems, bItems); // long process here, convert aItems and bItems to resultItems
return resultItems;
}
}
)
.onBackpressureLatest()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new ResourceSubscriber<List<ResultItem>>() {
@Override
public void onNext(List<ResultItem> resultItems) {
adapter.addData(resultItems);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
mSubscriptions.add(subscriber);
}
问题:如果BiFunction 运行太长(例如10秒),即比observables触发间隔长(例如observables每1秒触发一次),那将导致 BiFunction 做不必要的工作,因为我只需要最新发出的项目,但 BiFunction 是一个接一个地处理发出的项目,所以 BiFunction 将处理旧的发出的项目,我不需要处理它。
我希望 BiFunction 在 BiFunction 中每次完成 apply() 时跳过旧的发出项并处理最新发出的项,以减少资源浪费并节省时间。 rxjava 是否有类似 BiFunction 背压的方法或其他方法来解决这个问题?
图中显示了当前和预期的 BiFunction 时间线。 figure link
我找到了两种方法来解决这个问题,但都存在缺陷。
方法一:将"aItems"和"bItems"配对,然后将引用传递给switchMap并处理作业。
缺陷:switchMap 只向订阅者发送最新的项目,但仍然做不必要的工作。
方法 2: 也结合 "aItems" 和 "bItems" 然后将引用传递给 onNext 并处理作业。
缺陷:阻塞了 UI 线程。
您可以只传递 combineLatest
的组合器函数中的一对值,并使用 observeOn
将计算置于原始源线程之外:
.combineLatest(srcA, srcB, (a, b) -> Pair.of(a, b))
.onBackpressureLatest()
.observeOn(Schedulers.computation(), false, 1)
.map(pair -> compute(pair))
.observeOn(AndroidSchedulers.mainThread())
...
我用sqlbrite监听了tablea和b的变化。并使用 combineLatest 运算符组合 sqlbrite 生成的可观察值。在 BiFunction 过程中,observableA 和 observableB 的发射项。
private CompositeDisposable mSubscriptions = new CompositeDisposable();
private void initialize(){
QueryObservable observableA = mDb.createQuery("table_a", "select * from table_a", null);
QueryObservable observableB = mDb.createQuery("table_b", "select * from table_b", null);
ResourceSubscriber subscriber = Flowable.combineLatest(
RxJavaInterop.toV2Observable(observableA
.mapToList(mTableAMapperFunction)).toFlowable(BackpressureStrategy.LATEST)
,
RxJavaInterop.toV2Observable(observableB
.mapToList(mTableBMapperFunction)).toFlowable(BackpressureStrategy.LATEST)
, new BiFunction<List<ItemA>, List<ItemB>, List<ResultItem>>() {
@Override
public List<ResultItem> apply(@io.reactivex.annotations.NonNull List<ItemA> aItems, @io.reactivex.annotations.NonNull List<ItemB> bItems) throws Exception {
List<ResultItem> resultItems = convertToResultItems(aItems, bItems); // long process here, convert aItems and bItems to resultItems
return resultItems;
}
}
)
.onBackpressureLatest()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new ResourceSubscriber<List<ResultItem>>() {
@Override
public void onNext(List<ResultItem> resultItems) {
adapter.addData(resultItems);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
mSubscriptions.add(subscriber);
}
问题:如果BiFunction 运行太长(例如10秒),即比observables触发间隔长(例如observables每1秒触发一次),那将导致 BiFunction 做不必要的工作,因为我只需要最新发出的项目,但 BiFunction 是一个接一个地处理发出的项目,所以 BiFunction 将处理旧的发出的项目,我不需要处理它。 我希望 BiFunction 在 BiFunction 中每次完成 apply() 时跳过旧的发出项并处理最新发出的项,以减少资源浪费并节省时间。 rxjava 是否有类似 BiFunction 背压的方法或其他方法来解决这个问题?
图中显示了当前和预期的 BiFunction 时间线。 figure link
我找到了两种方法来解决这个问题,但都存在缺陷。
方法一:将"aItems"和"bItems"配对,然后将引用传递给switchMap并处理作业。
缺陷:switchMap 只向订阅者发送最新的项目,但仍然做不必要的工作。
方法 2: 也结合 "aItems" 和 "bItems" 然后将引用传递给 onNext 并处理作业。
缺陷:阻塞了 UI 线程。
您可以只传递 combineLatest
的组合器函数中的一对值,并使用 observeOn
将计算置于原始源线程之外:
.combineLatest(srcA, srcB, (a, b) -> Pair.of(a, b))
.onBackpressureLatest()
.observeOn(Schedulers.computation(), false, 1)
.map(pair -> compute(pair))
.observeOn(AndroidSchedulers.mainThread())
...