使用 RxJava + Retrofit 对列表中的每个项目发出 API 请求

Using RxJava + Retrofit to make API requests for each item in a list

我正在尝试通过将多个改造 api 调用链接在一起来创建一个可观察对象。步骤是:

  1. 使用 api 调用
  2. 获取 json 个对象的列表
  3. 对于列表中的每个对象,再进行一次 api 调用以获取有关该项目的更多详细信息
  4. 将从这个新的详细对象中获取的数据写入磁盘上的文件(列表中的每个项目都会发生这种情况)
  5. 最后 return 一个单独对象的可观察对象,需要为之前的每个对象创建一个文件

这是我目前拥有的:

public static Observable<DownloadedFiles> downloadFiles() {
    DownloadedFiles downloadedFiles = new DownloadedFiles();
    Observable.create(subscriber -> {
        return getRestService().getObjectList()
        .flatMapIterable(objects -> objects)
        .flatMap(objectLimited -> getRestService().getObject(objectLimited.getPath()))
        .doOnNext(objectFull -> {

            try {
                File file = new File();
                // Extract data from objectFull and write new file to disk
                // ...
                } catch (IOException e) {
                subscriber.onError(e);
            }

            downloadedFiles.putFile(file);
        })
        .toList()
        .map(objects -> downloadedFiles)
        .finallyDo(() -> {
            subscriber.onNext(downloadedFiles);
            subscriber.onCompleted();
        });
    });
}

@GET("/api/...")
Observable<List<Object>> getObjectList();

@GET("/api/.../{path}")
Observable<Object> getObject(@Path("path") String path);

有人可以确认我使用了正确的运算符吗?谢谢.

我认为像这样的一些应该适合你。您可以只使用来自 REST 服务的 Observable 来映射到每个下载文件,而不是拉入主题以将下载文件发送到其中:

public static Observable<DownloadedFile> downloadFiles() {
    final Observable<Observable<FullObject>> observable =  getRestService().getObjectList()
            .flatMapIterable(objects -> objects)
            .map(objectLimited -> getRestService().getObject(objectLimited.getPath()));

    return  Observable.mergeDelayError(observable)
            .map(fullObject -> {
                try {
                    File file = new File("path");
                    // Extract data from objectFull and write new file to disk
                    // ...

                    return new DownloadedFile();
                } catch (IOException e) {
                    throw OnErrorThrowable.from(OnErrorThrowable.addValueAsLastCause(e, fullObject));
                }
            });
}

如果您想在传播任何错误之前发出成功保存的文件,您可能需要考虑使用 mergeDelayError(map()) 而不是 flatMap。

编辑:删除了 Observable.create,改造已经为您创建了一个可观察对象,您只需要对其进行转换。

编辑 2:您也不需要对 subscriber.onError 执行任何操作,如果抛出错误,它将自行调用 subscriber.onError。

很好,不知道为什么要使用 flatmap observable。我宁愿对 Observable::from 做 flatmap,也值得添加 collect。基本上,我会将 1 事物映射到多事物,然后执行一些操作,将许多事物收集回一个事物,然后在我收集完所有发出的项目后订阅那个事物。

public static Observable<DownloadedFiles> downloadFiles() {        
        return getRestService().getObjectList()
        .flatMap(Observable::from)
        .flatMap(objectLimited -> getRestService().getObject(objectLimited.getPath()))
        .doOnNext(objectFull -> {
            try {
                File file = new File();
                // Extract data from objectFull and write new file to disk
                // ...
            } catch (IOException e) {
                new IOException(e);
            }})
      .collect(() -> new DownloadFiles<>(), (files, object) -> {  files.add(object});