地图中的 RxSwift 多个可观察对象
RxSwift multiple observable in map
我 运行 我会获取一个 API 的情况,它将生成 json 注册用户的数据。然后我将不得不遍历每个用户并从远程 url 获取他们的头像并将其保存到磁盘。我可以在 subscribe
内执行第二个任务,但这不是最佳做法。我正在尝试使用 map
、flatMap
等
来实现它
这是我的示例代码:
self.dataManager.getUsers()
.observeOn(MainScheduler.instance)
.subscribeOn(globalScheduler)
.map{ [unowned self] (data) -> Users in
var users = data
// other code for manipulating users goes here
// then below I am trying to use another loop to fetch their avatars
if let cats = users.categories {
for cat in cats {
if let profiles = cat.profiles {
for profile in profiles {
if let thumbnail = profile.thumbnail,
let url = URL(string: thumbnail) {
URLSession.shared.rx.response(request: URLRequest(url: url))
.subscribeOn(MainScheduler.instance)
.subscribe(onNext: { response in
// Update Image
if let img = UIImage(data: response.data) {
try? Disk.save(img, to: .caches, as: url.lastPathComponent)
}
}, onError: { (error) in
}).disposed(by: self.disposeBag)
}
}
}
}
}
return users
}
.subscribe(onSuccess: { [weak self] (users) in
}).disposed(by: disposeBag)
此代码中有 2 个问题。首先是 URLSession
上的 rx
,它在另一个线程的后台执行任务,并且当此操作完成时无法确认主要的 subscribe
。其次是循环和 rx,它效率不高,因为它应该生成多个 observable,然后对其进行处理。
欢迎任何改进此逻辑的想法。
这是一个有趣的谜题。
解决问题的"special sauce"在这一行:
.flatMap {
Observable.combineLatest([=10=].map {
Observable.combineLatest(
Observable.just([=10=].0),
URLSession.shared.rx.data(request: [=10=].1)
.materialize()
)
})
}
该行之前的 map
创建一个 Observable<[(URL, URLRequest)]>
并且有问题的行将其转换为 Observable<[(URL, Event<Data>)]>
.
该行通过以下方式执行此操作:
- 设置网络调用以创建
Observable<Data>
- 实现它以创建一个
Observable<Event<Data>>
(这样做是为了让一次下载中的错误不会关闭整个流。)
- 将 URL 提升回 Observable,这给了我们一个
Observable<URL>
- 将第 2 步和第 3 步中的可观察结果结合起来生成
Observable<(URL, Event<Data>)>
。
- 映射每个数组元素以生成
[Observable<(URL, Event<Data>)>]
- 合并该数组中的可观察值最终产生
Observable<[(URL, Event<Data>)]>
这是代码
// manipulatedUsers is for the code you commented out.
// users: Observable<Users>
let users = self.dataManager.getUsers()
.map(manipulatedUsers) // manipulatedUsers(_ users: Users) -> Users
.asObservable()
.share(replay: 1)
// this chain is for handling the users object. You left it blank in your code so I did too.
users
.observeOn(MainScheduler.instance)
.subscribe(onNext: { users in
})
.disposed(by: disposeBag)
// This navigates through the users structure and downloads the images.
// images: Observable<(URL, Event<Data>)>
let images = users.map { [=11=].categories ?? [] }
.map { [=11=].flatMap { [=11=].profiles ?? [] } }
.map { [=11=].compactMap { [=11=].thumbnail } }
.map { [=11=].compactMap { URL(string: [=11=]) } }
.map { [=11=].map { ([=11=], URLRequest(url: [=11=])) } }
.flatMap {
Observable.combineLatest([=11=].map {
Observable.combineLatest(
Observable.just([=11=].0),
URLSession.shared.rx.data(request: [=11=].1)
.materialize()
)
})
}
.flatMap { Observable.from([=11=]) }
.share(replay: 1)
// this chain filters out the errors and saves the successful downloads.
images
.filter { [=11=].1.element != nil }
.map { ([=11=].0, [=11=].1.element!) }
.map { ([=11=].0, UIImage(data: [=11=].1)!) }
.observeOn(MainScheduler.instance)
.bind(onNext: { url, image in
try? Disk.save(image, to: .caches, as: url.lastPathComponent)
return // need two lines here because this needs to return Void, not Void?
})
.disposed(by: disposeBag)
// this chain handles the download errors if you want to.
images
.filter { [=11=].1.error != nil }
.bind(onNext: { url, error in
print("failed to download \(url) because of \(error)")
})
.disposed(by: disposeBag)
我 运行 我会获取一个 API 的情况,它将生成 json 注册用户的数据。然后我将不得不遍历每个用户并从远程 url 获取他们的头像并将其保存到磁盘。我可以在 subscribe
内执行第二个任务,但这不是最佳做法。我正在尝试使用 map
、flatMap
等
这是我的示例代码:
self.dataManager.getUsers()
.observeOn(MainScheduler.instance)
.subscribeOn(globalScheduler)
.map{ [unowned self] (data) -> Users in
var users = data
// other code for manipulating users goes here
// then below I am trying to use another loop to fetch their avatars
if let cats = users.categories {
for cat in cats {
if let profiles = cat.profiles {
for profile in profiles {
if let thumbnail = profile.thumbnail,
let url = URL(string: thumbnail) {
URLSession.shared.rx.response(request: URLRequest(url: url))
.subscribeOn(MainScheduler.instance)
.subscribe(onNext: { response in
// Update Image
if let img = UIImage(data: response.data) {
try? Disk.save(img, to: .caches, as: url.lastPathComponent)
}
}, onError: { (error) in
}).disposed(by: self.disposeBag)
}
}
}
}
}
return users
}
.subscribe(onSuccess: { [weak self] (users) in
}).disposed(by: disposeBag)
此代码中有 2 个问题。首先是 URLSession
上的 rx
,它在另一个线程的后台执行任务,并且当此操作完成时无法确认主要的 subscribe
。其次是循环和 rx,它效率不高,因为它应该生成多个 observable,然后对其进行处理。
欢迎任何改进此逻辑的想法。
这是一个有趣的谜题。
解决问题的"special sauce"在这一行:
.flatMap {
Observable.combineLatest([=10=].map {
Observable.combineLatest(
Observable.just([=10=].0),
URLSession.shared.rx.data(request: [=10=].1)
.materialize()
)
})
}
该行之前的 map
创建一个 Observable<[(URL, URLRequest)]>
并且有问题的行将其转换为 Observable<[(URL, Event<Data>)]>
.
该行通过以下方式执行此操作:
- 设置网络调用以创建
Observable<Data>
- 实现它以创建一个
Observable<Event<Data>>
(这样做是为了让一次下载中的错误不会关闭整个流。) - 将 URL 提升回 Observable,这给了我们一个
Observable<URL>
- 将第 2 步和第 3 步中的可观察结果结合起来生成
Observable<(URL, Event<Data>)>
。 - 映射每个数组元素以生成
[Observable<(URL, Event<Data>)>]
- 合并该数组中的可观察值最终产生
Observable<[(URL, Event<Data>)]>
这是代码
// manipulatedUsers is for the code you commented out.
// users: Observable<Users>
let users = self.dataManager.getUsers()
.map(manipulatedUsers) // manipulatedUsers(_ users: Users) -> Users
.asObservable()
.share(replay: 1)
// this chain is for handling the users object. You left it blank in your code so I did too.
users
.observeOn(MainScheduler.instance)
.subscribe(onNext: { users in
})
.disposed(by: disposeBag)
// This navigates through the users structure and downloads the images.
// images: Observable<(URL, Event<Data>)>
let images = users.map { [=11=].categories ?? [] }
.map { [=11=].flatMap { [=11=].profiles ?? [] } }
.map { [=11=].compactMap { [=11=].thumbnail } }
.map { [=11=].compactMap { URL(string: [=11=]) } }
.map { [=11=].map { ([=11=], URLRequest(url: [=11=])) } }
.flatMap {
Observable.combineLatest([=11=].map {
Observable.combineLatest(
Observable.just([=11=].0),
URLSession.shared.rx.data(request: [=11=].1)
.materialize()
)
})
}
.flatMap { Observable.from([=11=]) }
.share(replay: 1)
// this chain filters out the errors and saves the successful downloads.
images
.filter { [=11=].1.element != nil }
.map { ([=11=].0, [=11=].1.element!) }
.map { ([=11=].0, UIImage(data: [=11=].1)!) }
.observeOn(MainScheduler.instance)
.bind(onNext: { url, image in
try? Disk.save(image, to: .caches, as: url.lastPathComponent)
return // need two lines here because this needs to return Void, not Void?
})
.disposed(by: disposeBag)
// this chain handles the download errors if you want to.
images
.filter { [=11=].1.error != nil }
.bind(onNext: { url, error in
print("failed to download \(url) because of \(error)")
})
.disposed(by: disposeBag)