在 Android 应用中挂起时跳过 BehaviousSubject 的中间元素
Skip middle elements of a BehaviousSubject while pending in an Android app
我有 Android 使用 Rx-Java 的应用程序。
我有一个屏幕显示数据库中的模型列表
我有一个过滤器,用户在其中输入文本并过滤列表的结果。
为此,我有:
过滤器的行为主题
BehaviorSubject<String> filterEmmiter = BehaviorSubject.create();
// When users inputs a text
filterEmmiter.onNext(newText);
// To get items
Flowable<String> filter = filterEmmiter.toFlowable(BackpressureStrategy.LATEST);
filter.flatMap((currentFilter)-> vacaDao.fullDataGroupedWhere(state, stage, currentFilter));
问题是查询比用户输入文本慢,因此我希望在按下新键时,如果有查询尚未开始则丢弃它们。那是 BackpressureStrategy.LATEST 的想法,但它不起作用(调用所有查询)。
我在这个 中读到问题是观察器上有一个缓冲区,但是我不喜欢发布的解决方案。
有办法解决吗?
谢谢,
独立样本
我只想打印以下测试:
D/TestTime: 值 processed:1
D/TestTime:值processed:10
@Test
public void someTest(){
BehaviorSubject<Integer> filterEmmiter = BehaviorSubject.create();
Flowable<Integer> filter = filterEmmiter.toFlowable(BackpressureStrategy.LATEST);
final int maxint = 10;
new Thread(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i <= maxint; i++) {
filterEmmiter.onNext(i);
}
}).start();
filter.flatMap(integer -> {
return Flowable.fromCallable(() -> {
longOperation();
Log.d("TestTime", "value processed:"+integer);
return integer;
});
}, false, 1, 1)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.from(Executors.newCachedThreadPool()), false, 1)
.filter(value -> value==maxint).blockingFirst();
}
如评论所述,当有新的过滤文本时,您可以使用 switchMap
取消先前的查询。此外,您可能希望在触发下一个查询之前等待用户暂停输入:
filterEmmiter
.debounce(500, TimeUnit.MILLISECONDS)
.switchMap(currentFilter -> vacaDao.fullDataGroupedWhere(state, stage, currentFilter))
...
编辑
如果要将查询执行限制为一次,即使用户不断键入,也请使用 maxConcurrency overload of flatMap
filterEmmiter
.toFlowable(BackpressureStrategy.LATEST)
.flatMap(currentFilter ->
vacaDao.fullDataGroupedWhere(state, stage, currentFilter),
false, // <-------- delayErrors
1 // <-------- maxConcurrency
)
...
还有第 3 方运营商 flatMapLatest
可以直接通过 Observable
完成此操作。
编辑 2
根据评论,你必须让你的 fromCallable
异步,否则它会阻塞链并且背压将不起作用:
filter.flatMap(integer -> {
return Flowable.fromCallable(() -> {
longOperation();
Log.d("TestTime", "value processed:"+integer);
return integer;
}).subscribeOn(Schedulers.io()); // <---------------------------------
}, false, 1, 1)
.observeOn(Schedulers.from(Executors.newCachedThreadPool()), false, 1)
.filter(value -> value==maxint).blockingFirst();
我有 Android 使用 Rx-Java 的应用程序。 我有一个屏幕显示数据库中的模型列表 我有一个过滤器,用户在其中输入文本并过滤列表的结果。 为此,我有:
过滤器的行为主题
BehaviorSubject<String> filterEmmiter = BehaviorSubject.create();
// When users inputs a text
filterEmmiter.onNext(newText);
// To get items
Flowable<String> filter = filterEmmiter.toFlowable(BackpressureStrategy.LATEST);
filter.flatMap((currentFilter)-> vacaDao.fullDataGroupedWhere(state, stage, currentFilter));
问题是查询比用户输入文本慢,因此我希望在按下新键时,如果有查询尚未开始则丢弃它们。那是 BackpressureStrategy.LATEST 的想法,但它不起作用(调用所有查询)。
我在这个
有办法解决吗?
谢谢,
独立样本
我只想打印以下测试: D/TestTime: 值 processed:1 D/TestTime:值processed:10
@Test
public void someTest(){
BehaviorSubject<Integer> filterEmmiter = BehaviorSubject.create();
Flowable<Integer> filter = filterEmmiter.toFlowable(BackpressureStrategy.LATEST);
final int maxint = 10;
new Thread(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i <= maxint; i++) {
filterEmmiter.onNext(i);
}
}).start();
filter.flatMap(integer -> {
return Flowable.fromCallable(() -> {
longOperation();
Log.d("TestTime", "value processed:"+integer);
return integer;
});
}, false, 1, 1)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.from(Executors.newCachedThreadPool()), false, 1)
.filter(value -> value==maxint).blockingFirst();
}
如评论所述,当有新的过滤文本时,您可以使用 switchMap
取消先前的查询。此外,您可能希望在触发下一个查询之前等待用户暂停输入:
filterEmmiter
.debounce(500, TimeUnit.MILLISECONDS)
.switchMap(currentFilter -> vacaDao.fullDataGroupedWhere(state, stage, currentFilter))
...
编辑
如果要将查询执行限制为一次,即使用户不断键入,也请使用 maxConcurrency overload of flatMap
filterEmmiter
.toFlowable(BackpressureStrategy.LATEST)
.flatMap(currentFilter ->
vacaDao.fullDataGroupedWhere(state, stage, currentFilter),
false, // <-------- delayErrors
1 // <-------- maxConcurrency
)
...
还有第 3 方运营商 flatMapLatest
可以直接通过 Observable
完成此操作。
编辑 2
根据评论,你必须让你的 fromCallable
异步,否则它会阻塞链并且背压将不起作用:
filter.flatMap(integer -> {
return Flowable.fromCallable(() -> {
longOperation();
Log.d("TestTime", "value processed:"+integer);
return integer;
}).subscribeOn(Schedulers.io()); // <---------------------------------
}, false, 1, 1)
.observeOn(Schedulers.from(Executors.newCachedThreadPool()), false, 1)
.filter(value -> value==maxint).blockingFirst();