我怎样才能同步订阅一个可观察对象,这样我就不会错过该可观察对象的发射?

How can I synchronously subscribe to an observable so that I don't miss out on emissions from that observable?

我有一个采用 MVVM 架构的 android 应用程序。

视图层(片段)订阅了 onStart() 中 ViewModel 公开的可观察对象。在我对该 observable 调用 subscribe() 之后,我直接调用 ViewModel 来启动它。通过这种直接调用,会发生两件事。首先,被订阅的可观察对象发出一个事件来表示应用程序处于加载状态。接下来,ViewModel 获取一些数据,然后发出该数据。

问题是,我没有收到第一次发射。但是,如果我将我的调用移到生命周期链更远的地方,例如在 onCreate() 中(并将我的调用留在 onStart() 中),我确实会收到发射。显然,对 subscribe() 的调用是异步的,我如何确保在开始发出之前可以订阅一个可观察对象?

这里是第一次发射没有收到的情况

//The fragment
 override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        viewModel = ViewModelProviders.of(this).get(OverviewFragmentViewModel::class.java)
    }

override fun onStart() {
    super.onStart()
    allSubscriptions.add(viewModel.uiStateChanged
        .subscribeOn(AndroidSchedulers.mainThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe({ uiState ->
            when (uiState) {
                is UiState.Loading -> showLoadingView()
                is UiState.ListReady -> showList(uiState)
                is UiState.Error -> showErrorView()
            }
        }, { error ->
            Log.e(TAG, error.message, error)
        })
    )
    viewModel.loadMovies()
}
}


//The ViewModel

class OverviewFragmentViewModel : ViewModel(){

    val uiStateChanged = PublishSubject.create<UiState>()
    val model = OverviewFragmentRepo()

    companion object {
        val TAG = OverviewFragmentViewModel::class.java.simpleName
    }

    override fun onCleared() {
        super.onCleared()
    }

    fun loadMovies(){
        //This is the emission that happens to fast for the fragment to receive it!
        uiStateChanged.onNext(UiState.Loading())
        model.getMovies()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({response ->
                uiStateChanged.onNext(UiState.ListReady(response.results))
            }, { error ->
                uiStateChanged.onNext(UiState.Error())
                Log.e(TAG, error.message, error)
            })
    }
}

现在,如果我只是将订阅上移,就会收到发射。但是,我不想希望事情按时完成,我想确定这一点,这就是为什么我希望能够保证在直接调用 [= 之前​​我已经订阅了17=]。这是同样的事情,订阅增加了,发射也收到了。

 //The fragment
     override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            viewModel = ViewModelProviders.of(this).get(OverviewFragmentViewModel::class.java)

allSubscriptions.add(viewModel.uiStateChanged
            .subscribeOn(AndroidSchedulers.mainThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({ uiState ->
                when (uiState) {
                    is UiState.Loading -> showLoadingView()
                    is UiState.ListReady -> showList(uiState)
                    is UiState.Error -> showErrorView()
                }
            }, { error ->
                Log.e(TAG, error.message, error)
            })
        )}
        }

    override fun onStart() {
        super.onStart()
        viewModel.loadMovies()
    }


    //The ViewModel

    class OverviewFragmentViewModel : ViewModel(){

        val uiStateChanged = PublishSubject.create<UiState>()
        val model = OverviewFragmentRepo()

        companion object {
            val TAG = OverviewFragmentViewModel::class.java.simpleName
        }

        override fun onCleared() {
            super.onCleared()
        }

        fun loadMovies(){
            //This is the emission that happens to fast for the fragment to receive it!
            uiStateChanged.onNext(UiState.Loading())
            model.getMovies()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({response ->
                    uiStateChanged.onNext(UiState.ListReady(response.results))
                }, { error ->
                    uiStateChanged.onNext(UiState.Error())
                    Log.e(TAG, error.message, error)
                })
        }
    }

从我的角度来看,您可以在 ViewModel 中创建可观察对象,而不是创建 PublishSubject 并调用 loadMovies(),例如:

val uiStateChanged = model.getMovies()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())    
    .compose(ResponseOrError.toResponseOrErrorObservable())
    .map { if (it.isData) UiState.ListReady(it.data().results) else UiState.Error() }
    .startWith(UiState.Loading())

然后你在 Fragment 中订阅这个 Observable 并且你可以移除 viewModel.loadMovies()

read more about ResponseOrError