RxJava2 Flowable 在不使用创建的情况下发出多个网络调用的结果?

RxJava2 Flowable that emits results of multiple network calls without using create?

我有一个通用屏幕,它订阅了一个 return 列表的 RxJava2 flowable。然后它会显示列表中的内容。

我现在有一个用例,虽然我需要从多个端点收集数据,并在一些完成后发出数据,然后在其余端点完成后再次发出数据。

我正在使用 Flowable.create() 执行此操作,但我看到很多帖子说通常有比使用 create 更好更安全的方法来执行此操作?我似乎相信是这种情况,因为我需要订阅可观察对象中的可观察对象,而理想情况下我不想这样做?

因为我在其中订阅,所以我知道当其他网络调用正在完成时,发射器可能会在可观察对象中被取消,所以我添加了检查以确保它在处理后不会抛出错误(至少在测试中...) [我还记得我有代码可以处理内部订阅,如果我保持这样,当外部订阅被处理时]

前 2 个调用可能非常快(或即时),这就是为什么我想立即发出第一个结果,然后依赖该数据的以下 4 个网络调用可能需要时间来处理。

现在大致是这样的...

return Flowable.create<List<Object>>({ activeEmitter ->
        Single.zip(
                single1(),
                single2(),
                BiFunction { single1Result: Object, single2result: Object ->

                    if (single1result.something || single2Result.somethingElse) {
                        activeEmitter.onNext(function(single1result, single2result) //returns list
                    }

                    Single.zip(
                            single3(single1result),
                            single4(single2result),
                            single5(single1result),
                            single6(single2result),
                            Function4 { single3Result: Object,
                                        single4Result: Object,
                                        single5Result: Object,
                                        single6Result: Object ->
                                ObjectHolder(single1Result, single2Result, single3Result, single4Result, single5Result, single6Result)
                            }
                    )
                }
        ).flatMap { objectHolder ->
            objects.flatMap { objectHolder ->
                Single.just(parseObjects(objectHolder))
            }
        }.subscribeBy(
                onError = { error ->
                    if (!activeEmitter.isCancelled) {
                        activeEmitter.onError(error)
                    }
                },
                onSuccess = { results ->
                    if (!activeEmitter.isCancelled) {
                        activeEmitter.onNext(results)
                        activeEmitter.onComplete()
                    }
                }
        )
    }, BackpressureStrategy.BUFFER)

我想不出另一种方法来 return 一个 Flowable 发出多个不同网络调用的结果而不像这样?

有没有different/better我找不到的方法?


根据 ctranxuan 的回复,我解决了这个问题。发帖让他 tweak/optimize 然后我接受他的回答

 return Single.zip(single1(), single2(),
            BiFunction { single1result: Object, single2result: Object ->
                Pair(single1result, single2result)
            }
    ).toFlowable()
            .flatMap { single1AndSingle2 ->
                if (isFirstLoad) {
                    createItemOrNull(single1AndSingle2.first, single1AndSingle2.second)?.let { result ->
                        Single.just(listOf(result)).mergeWith(proceedWithFinalNetworkCalls(single1AndSingle2))
                    }.orElse {
                        proceedWithFinalNetworkCalls(single1AndSingle2).toFlowable()
                    }
                } else {
                    proceedWithFinalNetworkCalls(single1AndSingle2).toFlowable()
                }
            }.doOnComplete {
                isFirstLoad = false
            }

fun proceedWithFinalNetworkCalls(): Flowable<List> {
          return Single.zip(
                            single3(single1result),
                            single4(single2result),
                            single5(single1result),
                            single6(single2result),
                            Function4 { single3Result: Object,
                                        single4Result: Object,
                                        single5Result: Object,
                                        single6Result: Object ->
                                ObjectHolder(single1Result, single2Result, single3Result, single4Result, single5Result, single6Result)
                            }
)

抱歉,它在 Java 中,但据我了解,类似的方法可能是一个可能的解决方案?

public static void main(String[] args) {
    final Single<String> single1 = single1().cache();

    single1.map(List::of)
           .mergeWith(single1.zipWith(single2(), Map::entry)
                             .flatMap(entry -> Single.zip(
                                                      single3(entry.getKey()),
                                                      single4(entry.getValue()),
                                                      single5(entry.getKey()),
                                                      single6(entry.getValue()),
                                                      (el3, el4, el5, el6) -> objectHolder(entry.getKey(), entry.getValue(), el3, el4, el5, el6))))
           .subscribe(System.out::println,
                      System.err::println);

    Flowable.timer(1, MINUTES)  // Just to block the main thread for a while
            .blockingSubscribe();
}

private static List<String> objectHolder(final String el1,
                                         final String el2,
                                         final String el3,
                                         final String el4,
                                         final String el5,
                                         final String el6) {
    return List.of(el1, el2, el3, el4, el5, el6);
}

static Single<String> single1() {
    return Single.just("s1");
}

static Single<String> single2() {
    return Single.just("s2");
}

static Single<String> single3(String value) {
    return single("s3", value);
}

static Single<String> single4(String value) {
    return single("s4", value);
}

static Single<String> single5(String value) {
    return single("s5", value);
}

static Single<String> single6(String value) {
    return single("s6", value);
}

static Single<String> single(String value1, String value2) {
    return Single.just(value1).map(l -> l + "_" + value2);
}

这输出:

[s1]
[s1, s2, s3_s1, s4_s2, s5_s1, s6_s2]