RxJava2 - 订阅 PublishSubject

RxJava2 - Subscribing PublishSubject

 private val searchSubject =  PublishSubject.create<Boolean>()
private val compositeDisposable = CompositeDisposable()

fun textChange(){
    searSubject.onNext(true)
}

fun getSubject(){
    compositeDisposable += searchSubject
        .doOnNext {
            if (it) showLoading()
        }
        .switchMap { searchGithubReposObservable() }
        .subscribeWith(object : DisposableObserver<List<GithubRepo>>() {
            override fun onNext(t: List<GithubRepo>) {
                hideLoading()
                adapter.items = t
            }

            override fun onComplete() {
            }

            override fun onError(e: Throwable) {
                hideLoading()
            }
        })
}

为了学习RxJava,我在github中搜索了示例代码。 但是上面的代码我看不懂

我知道要从 PublishSubject 接收数据,我需要订阅它。

在上面的代码中,我认为 subscribeWith 订阅了 searchGithubReposObservable() 的 return Observable ,但是我可以在调用 textchange() 时从 PublishSubject 获取数据。

为什么可能?

是的,您是否可以在调用 textchange() 方法时获取数据我在键入文本时实现了此类功能api在 textchange 上被调用并且我收到了代码下方的数据。

已写请查收

 autocompletetextview.debounce(500L, TimeUnit.MILLISECONDS)
                    .distinctUntilChanged()
                    .filter { it.trim().isNotEmpty() || it.isEmpty() }
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .flatMap {
                        Observable.just(callapi here )
                    }
                    .subscribe({
                        it.subscribe({ serviceResponse ->
                            if (serviceResponse.meta.status == KeyUtils.HTTP_SUCCESS ||
                                    serviceResponse.meta.status == KeyUtils.STATUS_META_ERROR) {
                                setSuccessResponse(serviceResponse, true)
                            } else {
                                setSuccessResponse(serviceResponse, false)
                            }
                        }, { throwable ->
                            setErrorResponse(throwable)
                        }).collect()

您正在收听发布主题的 RX 链的开始。

compositeDisposable += searchSubject
    .doOnNext {
        if (it) showLoading()
    }

每次调用方法 textChange() 时,都会推送到 searchSubject,它会再次触发 RX 链,从而触发开关映射。