使用 RX 进行调用,然后根据第一次调用的结果进行多个并行调用

Making a call with RX and then making multiple parallel calls from the result of the first call

我必须调用一个 API,其中 return 是一个项目列表。对于此列表中的每个项目,我必须调用另一个 API(如果列表 returns 8 个项目,我将不得不进行 8 个并行调用)。

我最终必须 return 一个列表,我将使用这 8 个并行调用中的每一个的结果创建一个列表。

我如何使用 RxJava 做到这一点?我认为我必须使用 flatMap 将第一次调用的结果转换为 Observable 列表,然后我必须使用 zip 运算符进行并行调用,但我不确定。

请注意,我使用的是 RxJava2,并且没有使用 lambda 表达式。

谢谢!

你可以这样做,例如, defer() 让您仅在订阅时获取数据,然后创建 Observable 发出项目列表中的所有项目(一个接一个)。
然后 flatMap() 将创建 Observable 来获取每个项目的数据,现在您将拥有 Observable 来发出数据对象。 为了收集它,您可以使用 toList() 来发出单个对象(List),该对象将包含每个 Observable 获取的所有 Data

注意,为了并行执行,重要的是 fetchDataFromItem() 将订阅 Schedulers.io(),即使所有流都订阅了 io.

Observable.defer(new Callable<ObservableSource<Item>>() {
        @Override
        public ObservableSource<Item> call() throws Exception {
            List<Item> items = getItems();
            return Observable.fromIterable(items);
        }
    })
            .flatMap(new Function<Item, ObservableSource<Data>>() {
                @Override
                public ObservableSource<Data> apply(@NonNull Item item) throws Exception {
                    return fetchDataFromItem(item);
                }
            })
            .toList()
            .subscribe(new Consumer<List<Data>>() {
                @Override
                public void accept(@NonNull List<Data> objects) throws Exception {
                //do something with the list of all fetched data
                }
            });

更新:

如果获取的项目已经是 Observable,则 defer() 可以替换为 flatMapIterable(),它采用单个项目列表并将其转换为多个项目的 Observable:

getItemsObservable()
        .flatMapIterable(new Function<List<Item>, Iterable<Item>>() {
            @Override
            public Iterable<Item> apply(@NonNull List<Item> items) throws Exception {
                return items;
            }
        })
        .flatMap(new Function<Item, ObservableSource<Data>>() {
            @Override
            public ObservableSource<Data> apply(@NonNull Item item) throws Exception {
                return fetchDataFromItem(item);
            }
        })
        .toList()
        .subscribe(new Consumer<List<Data>>() {
            @Override
            public void accept(@NonNull List<Data> objects) throws Exception {
                //do something with the list of all fetched data
            }
        });