在 Android 应用中挂起时跳过 BehaviousSubject 的中间元素

Skip middle elements of a BehaviousSubject while pending in an Android app

我有 Android 使用 Rx-Java 的应用程序。 我有一个屏幕显示数据库中的模型列表 我有一个过滤器,用户在其中输入文本并过滤列表的结果。 为此,我有:

过滤器的行为主题

BehaviorSubject<String> filterEmmiter = BehaviorSubject.create();

// When users inputs a text
filterEmmiter.onNext(newText);

// To get items
Flowable<String> filter = filterEmmiter.toFlowable(BackpressureStrategy.LATEST);
filter.flatMap((currentFilter)-> vacaDao.fullDataGroupedWhere(state,  stage, currentFilter));

问题是查询比用户输入文本慢,因此我希望在按下新键时,如果有查询尚未开始则丢弃它们。那是 BackpressureStrategy.LATEST 的想法,但它不起作用(调用所有查询)。

我在这个 中读到问题是观察器上有一个缓冲区,但是我不喜欢发布的解决方案。

有办法解决吗?

谢谢,

独立样本

我只想打印以下测试: D/TestTime: 值 processed:1 D/TestTime:值processed:10

    @Test
    public void someTest(){
        BehaviorSubject<Integer> filterEmmiter = BehaviorSubject.create();
        Flowable<Integer> filter = filterEmmiter.toFlowable(BackpressureStrategy.LATEST);
        final int maxint = 10;
        new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            for (int i = 0; i <= maxint; i++) {
                filterEmmiter.onNext(i);
            }
        }).start();

        filter.flatMap(integer -> {
            return Flowable.fromCallable(() -> {
                longOperation();
                Log.d("TestTime", "value processed:"+integer);
                return integer;
            });
        }, false, 1, 1)
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.from(Executors.newCachedThreadPool()), false, 1)
                .filter(value -> value==maxint).blockingFirst();
    }

如评论所述,当有新的过滤文本时,您可以使用 switchMap 取消先前的查询。此外,您可能希望在触发下一个查询之前等待用户暂停输入:

filterEmmiter
.debounce(500, TimeUnit.MILLISECONDS)
.switchMap(currentFilter -> vacaDao.fullDataGroupedWhere(state,  stage, currentFilter))
...

编辑

如果要将查询执行限制为一次,即使用户不断键入,也请使用 maxConcurrency overload of flatMap

filterEmmiter
.toFlowable(BackpressureStrategy.LATEST)
.flatMap(currentFilter -> 
     vacaDao.fullDataGroupedWhere(state,  stage, currentFilter), 
     false,   // <-------- delayErrors
     1        // <-------- maxConcurrency
)
...

还有第 3 方运营商 flatMapLatest 可以直接通过 Observable 完成此操作。

编辑 2

根据评论,你必须让你的 fromCallable 异步,否则它会阻塞链并且背压将不起作用:

filter.flatMap(integer -> {
            return Flowable.fromCallable(() -> {
                longOperation();
                Log.d("TestTime", "value processed:"+integer);
                return integer;
            }).subscribeOn(Schedulers.io()); // <---------------------------------
        }, false, 1, 1)
        .observeOn(Schedulers.from(Executors.newCachedThreadPool()), false, 1)
        .filter(value -> value==maxint).blockingFirst();