使用 RX Android 进行并行 API 调用

make parallel API calls using RX Android

我有两个不同的端点用于为用户获取数据。我正在对 RX 适配器工厂进行改造。如果我依次调用单个方法中的两个端点,它是否被视为在两个不同线程上执行的并行调用。如果不是,我如何使用 RX 使这些 API 调用并行?或一种在并行获取数据的同时获得响应的方法。例如,第一个端点可能需要 5 秒,而第二个端点需要 7 秒,但结束响应将在 7 秒后可用。

fun fetchData() {
    api.getData()
        .subscribeOn(Schedulers.io())
        .subscribe(
            { profileResponse ->
                //ProfileResponse Object
                Timber.d("profileResponse: $profileResponse")
                //store response for later use
                Cache.save("key", profileResponse.toString())
            },
            {
                Timber.e("error")
            }
        )


    api2.getData()
        .subscribeOn(Schedulers.io())
        .subscribe(
            { profileDetails ->
                //profileDetails Object
                Timber.d("profileDetails: $profileDetails")
            },
            {
                Timber.e("error")
            }
        )
}

If I make a call to both the endpoints inside a single method sequentially is it considered to be a parallel call executing on two different threads. If not how could I make these API calls parallel using RX?

它们是平行的。您正在订阅 IO 调度程序上的可观察对象,而不是阻塞等待响应。

or a way to get the response at the same time while fetching the data in parallel. for example, the first endpoint could take 5 seconds while the second takes 7 seconds but the end response would be available after 7 seconds.

一种方法是使用 zip() 将您的 observable 组合成一个单一的 observable,在所有源都发出时发出。

首先,您正在为每个可观察对象使用 subscribeOn(),因此它已经在并行执行。

Is there a way to get the response at the same time while fetching the data in parallel. for example, the first endpoint could take 5 seconds while the second takes 7 seconds but the end response would be available after 7 seconds.

为此,您可以像下面这样使用 Observable.zip,其中所需的时间是 最多两次调用:

val disposable = Observable.zip(
        firstNetworkCall().subscribeOn(Schedulers.io()),
        secondNetworkCall().subscribeOn(Schedulers.io()),
        BiFunction{
            firstResonse: ResponseOneType,
            secondResponse: ResponseTwoType ->
            combineResult(firstResponse, secondResponse) }))
.observeOn(AndroidSchedulers.mainThread())
        .subscribe { it -> doSomethingWithIndividualResponse(it) }

这个 article 可能有助于形象化地了解它是如何工作的。