"Join" RxJava 中的 Observable 列表

"Join" a list of Observables in RxJava

我一遍又一遍地阅读文档,但找不到适合我的情况的运算符:

 return Observable.fromIterable(formIds)
                            .flatMap(new Function<String, Observable<List<Transmission>>>() {
                                @Override
                                public Observable<List<Transmission>> apply(String formId) {
                                    return getFormTransmissions(formId);
                                }
                            })
                            .toList()
                            .toObservable()
                            .map(new Function<List<List<Transmission>>, List<Transmission>>() {
                                @Override
                                public List<Transmission> apply(List<List<Transmission>> lists) {
                                    List<Transmission> transmissions = new ArrayList<>();
                                    for (List<Transmission> transmissionList : lists) {
                                        if (transmissionList != null) {
                                            transmissions.addAll(transmissionList);
                                        }
                                    }
                                    return transmissions;
                                }
                            });

如您所见,最后一个 map() 我获取了所有可观察对象并组合了它们的内容。我不喜欢这部分,因为我觉得必须有一些 rxjava 函数可以做到这一点,但我找不到。

我将尝试解释代码的作用: 来自 id 列表 对于它们中的每一个,获取每个 id 的传输列表 一旦所有这些都完成,将所有单独的传输列表合并为一个。有什么想法吗?

所以基本上你需要一个简单的 map 可以 flatten 结果。您可以使用 flatMapIterable

return Observable.fromIterable(formIds)
        .flatMap(new Function<String, Observable<List<Transmission>>>() {
            @Override
            public Observable<List<Transmission>> apply(String formId) {
                return getFormTransmissions(formId);
            }
        }) // Observable<List<Transmission>>
        .flatMapIterable(new Function<List<Transmission>, List<Transmission>>() {
                @Override
                public List<Transmission> apply(List<Transmission> list) {
                    return list;
                }
            }) // Observable<Transmission>
        .toList() // Single<List<Transmission>>
        .toObservable(); // Observable<List<Transmission>>

使用 Java 8 个功能更简单:

return Observable.fromIterable(formIds)
        .flatMap(formId -> getFormTransmissions(formId))  // Observable<List<Transmission>>
        .flatMapIterable(list -> list) // Observable<Transmission>
        .toList() // Single<List<Transmission>>
        .toObservable(); // Observable<List<Transmission>>

或者你可以在 Java 流中使用 flatMap:

return Observable.fromIterable(formIds)
        .flatMap(formId -> getFormTransmissions(formId))  // Observable<List<Transmission>>
        .toList() // Single<List<List<Transmission>>>
        .toObservable() // Observable<List<List<Transmission>>>
        .map(listOfList -> listOfList.stream().flatMap(Collection::stream)
                .collect(Collectors.toList()));  // Observable<List<Transmission>>