RxJava - 如何在发出新项目时停止 运行 任务

RxJava - How to stop a running task when new item emitted

我正在使用 RxJava 为我的 Android 应用构建搜索功能。当用户 type/change 查询 EditText 中的字符时,发射器将发出新的查询文本,我搜索我的数据库以获取与查询匹配的结果。

getEditTextObservable()
                .subscribeOn(Schedulers.computation())
                .debounce(500, TimeUnit.MILLISECONDS)
                .filter {
                    !it.isNullOrEmpty()
                }
                .map {
                    //start searching
                    getResultsInDatabase(it) //this function takes a long time to complete
                }.observeOn(AndroidSchedulers.mainThread())
                .subscribe{
                    //render results on screen
                }

方法getResultsInDatabase(string:String)需要很长时间才能完成,当查询文本发生变化时,我想停止方法getResultsInDatabase(string:String)(以防它是运行之前发出的查询) 运行 再次使用新查询。我该怎么做才能实现这一目标?我将不胜感激你的帮助。感谢您阅读我的问题。

您可以使用 valveRxJava 中暂停流。根据它的值,流继续发出或将保持暂停状态。

您需要 switchMap.

getEditTextObservable()
    .subscribeOn(Schedulers.computation())
    .debounce(500, TimeUnit.MILLISECONDS)
    .filter { !it.isNullOrEmpty() } //Be careful with this. Should clearing text clear results too?
    .switchMap { 
        Observable.fromCallable { getResultsInDatabase(it) }.subscribeOn(Schedulers.io()) 
    }.observeOn(AndroidSchedulers.mainThread())
    .subscribe{
        //render results on screen
    }