RxJava - 两个 Observable 源,仅结合特定值的输出

RxJava - two Observable sources, combine output only on certain values

问题:

我应该使用哪个运算符或转换以最少的代码量和最有效的方式组合两个数据流?

ViewPager、RxBinding、事件流

使用 Android ViewPager,我正在观察事件(使用 RxBinding)

1) OnPageSelected(页面当前可见)

Observable<Integer> pageSelectObs = RxViewPager.pageSelections(mPlaceImageViewPager);

2) OnPageScrollStateChanged(滑动开始 = 1,运动 = 2,完成 = 0)

Observable<Integer> scrollStateObs = RxViewPager.pageScrollStateChanges(mPlaceImageViewPager);

整数流如下所示:

I/System.out: Page: 0 ScrollState: 1
I/System.out: Page: 0 ScrollState: 2
I/System.out: Page: 1 ScrollState: 2
I/System.out: Page: 1 ScrollState: 0
I/System.out: Page: 1 ScrollState: 1
I/System.out: Page: 1 ScrollState: 2
I/System.out: Page: 2 ScrollState: 2
I/System.out: Page: 2 ScrollState: 0

我只在以下情况感兴趣:

当前代码

这是我目前观察的方式:

Disposable d = ObservableCombineLatest.combineLatest(pageSelectObs, scrollStateObs, new BiFunction<Integer, Integer, Integer>() {
    @Override
    public Integer apply(@NonNull Integer pageSelected, @NonNull Integer scrollState) throws Exception {
        AUtils.logSystem(TAG, "Page: %s ScrollState: %s", pageSelected, scrollState);

        if (adapter.isLastVisibleItem(pageSelected) && adapter.hasHiddenItemsRight() && scrollState == 0) {
            return 1;
        }

        if (adapter.isFirstVisibleItem(pageSelected) && adapter.hasHiddenItemsLeft() && scrollState == 0) {
            return -1;
        }
        return 0;
    }
}).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(@NonNull Integer doAction) throws Exception {
        if (doAction == -1) {
            AUtils.logSystem(TAG, "shift LEFT");
            adapter.shiftLeft();
        }
        if (doAction == 1) {
            AUtils.logSystem(TAG, "shift RIGHT");
            adapter.shiftRight();
        }
    }
});

是否有更简单的方法来完成上述操作?

好吧'the most efficient way'取决于您的要求以及您如何定义最有效。是时间,还是资源?

我采用了您的代码并添加了 50 毫秒的速率限制-window,突发事件不会过于频繁地调用 onNext。

在 MAP-opreator 中,您将向枚举添加一些匹配项,因为 -1 和 1 不是代表值。

@Test
void name() throws Exception {
    Observable<Integer> pageSelectObs = Observable.just(0, 0, 1, 1, 1, 1, 2, 2);
    Observable<Integer> scrollStateObs = Observable.just(1, 2, 2, 0, 1, 2, 2, 0);

    // Test-Obs
    Observable<ShiftOperation> filter = Observable.combineLatest(pageSelectObs, scrollStateObs, Combined::new)
            .window(50, TimeUnit.MILLISECONDS)
            .flatMap(combinedObservable -> combinedObservable.filter(combined -> combined.scrollState == 0)
                    .takeLast(1))
            .map(combined -> {
                // do mapping here

                return ShiftOperation.SHIFT_LEFT; // do your adapter... check here and decide which operation you want to return.
            });

    filter.test()
            .await()
            .assertValueCount(1);
}

数据结构:

class Combined {
    final int pageState;
    final int scrollState;

    Combined(int pageState, int scrollState) {
        this.pageState = pageState;
        this.scrollState = scrollState;
    }
}

enum ShiftOperation {
    SHIFT_LEFT,
    SHIFT_RIGHT
}

由于您的条件非常简单,您可以使用简单的 filter() 运算符来表达它们。

Observable<Integer> scrollStateObs = RxViewPager.pageScrollStateChanges(mPlaceImageViewPager)
        .filter(scrollState -> scrollState == ViewPager.SCROLL_STATE_IDLE);

为了仅对 scrollState 变化做出反应,您可以使用 withLatestFrom() 运算符

Disposable d = pageSelectObs.withLatestFrom(scrollStateObs, (pageSelected, scrollState) -> pageSelected)
        .filter(pageSelected -> adapter.isLastVisibleItem(pageSelected));
        .subscribe(pageSelected -> {
            if (adapter.hasHiddenItemsRight()) {
                adapter.shiftRight();
            } else if (adapter.hasHiddenItemsLeft()) {
                adapter.shiftRight();
            }
        });