地图中的 RxSwift 多个可观察对象

RxSwift multiple observable in map

我 运行 我会获取一个 API 的情况,它将生成 json 注册用户的数据。然后我将不得不遍历每个用户并从远程 url 获取他们的头像并将其保存到磁盘。我可以在 subscribe 内执行第二个任务,但这不是最佳做法。我正在尝试使用 mapflatMap

来实现它

这是我的示例代码:

self.dataManager.getUsers()
            .observeOn(MainScheduler.instance)
            .subscribeOn(globalScheduler)
            .map{ [unowned self] (data) -> Users in
                var users = data
// other code for manipulating users goes here
// then below I am trying to use another loop to fetch their avatars

                if let cats = users.categories {
                    for cat in cats  {
                        if let profiles = cat.profiles {
                            for profile in profiles {
                                if let thumbnail = profile.thumbnail,
                                    let url = URL(string: thumbnail) {
                                    URLSession.shared.rx.response(request: URLRequest(url: url))
                                        .subscribeOn(MainScheduler.instance)
                                        .subscribe(onNext: { response in
                                            // Update Image
                                            if let img = UIImage(data: response.data) {
                                                try? Disk.save(img, to: .caches, as: url.lastPathComponent)
                                            }
                                        }, onError: { (error) in

                                        }).disposed(by: self.disposeBag)
                                }
                            }
                        }
                    }
                }

                return users
            }
            .subscribe(onSuccess: { [weak self] (users) in

            }).disposed(by: disposeBag)

此代码中有 2 个问题。首先是 URLSession 上的 rx,它在另一个线程的后台执行任务,并且当此操作完成时无法确认主要的 subscribe。其次是循环和 rx,它效率不高,因为它应该生成多个 observable,然后对其进行处理。

欢迎任何改进此逻辑的想法。

这是一个有趣的谜题。

解决问题的"special sauce"在这一行:

.flatMap { 
    Observable.combineLatest([=10=].map { 
        Observable.combineLatest(
            Observable.just([=10=].0), 
            URLSession.shared.rx.data(request: [=10=].1)
                .materialize()
        ) 
    }) 
}

该行之前的 map 创建一个 Observable<[(URL, URLRequest)]> 并且有问题的行将其转换为 Observable<[(URL, Event<Data>)]>.

该行通过以下方式执行此操作:

  1. 设置网络调用以创建 Observable<Data>
  2. 实现它以创建一个 Observable<Event<Data>>(这样做是为了让一次下载中的错误不会关闭整个流。)
  3. 将 URL 提升回 Observable,这给了我们一个 Observable<URL>
  4. 将第 2 步和第 3 步中的可观察结果结合起来生成 Observable<(URL, Event<Data>)>
  5. 映射每个数组元素以生成 [Observable<(URL, Event<Data>)>]
  6. 合并该数组中的可观察值最终产生 Observable<[(URL, Event<Data>)]>

这是代码

// manipulatedUsers is for the code you commented out.
// users: Observable<Users>
let users = self.dataManager.getUsers()
    .map(manipulatedUsers) // manipulatedUsers(_ users: Users) -> Users
    .asObservable()
    .share(replay: 1)

// this chain is for handling the users object. You left it blank in your code so I did too.
users
    .observeOn(MainScheduler.instance)
    .subscribe(onNext: { users in

    })
    .disposed(by: disposeBag)

// This navigates through the users structure and downloads the images.
// images: Observable<(URL, Event<Data>)>
let images = users.map { [=11=].categories ?? [] }
    .map { [=11=].flatMap { [=11=].profiles ?? [] } }
    .map { [=11=].compactMap { [=11=].thumbnail } }
    .map { [=11=].compactMap { URL(string: [=11=]) } }
    .map { [=11=].map { ([=11=], URLRequest(url: [=11=])) } }
    .flatMap { 
        Observable.combineLatest([=11=].map { 
            Observable.combineLatest(
                Observable.just([=11=].0), 
                URLSession.shared.rx.data(request: [=11=].1)
                    .materialize()
            ) 
        }) 
    }
    .flatMap { Observable.from([=11=]) }
    .share(replay: 1)

// this chain filters out the errors and saves the successful downloads.
images
    .filter { [=11=].1.element != nil }
    .map { ([=11=].0, [=11=].1.element!) }
    .map { ([=11=].0, UIImage(data: [=11=].1)!) }
    .observeOn(MainScheduler.instance)
    .bind(onNext: { url, image in
        try? Disk.save(image, to: .caches, as: url.lastPathComponent)
        return // need two lines here because this needs to return Void, not Void?
    })
    .disposed(by: disposeBag)

// this chain handles the download errors if you want to.
images
    .filter { [=11=].1.error != nil }
    .bind(onNext: { url, error in
        print("failed to download \(url) because of \(error)")
    })
    .disposed(by: disposeBag)