使用 ReactiveX 管理多个上传(在 iOS 上使用 Swift 和 Alamofire)
Managing multiple uploads with ReactiveX (on iOS with Swift and Alamofire)
我正在尝试使用 ReactiveX (RxSwift) 将多张照片上传到服务器,收集每个请求的响应,然后发出最后一个请求以完成提交。
在我尝试 reduce
所有响应之前,一切似乎都运行良好。最后的 subscribeNext
永远不会被调用。 (也许我误解了 flatMap
或 reduce
的工作原理?)
具体来说,这就是我尝试执行此过程的方式。
准备一个 observable 来编码每张照片(self.imageMgr
是 PHCachingImageManager()
的一个实例)
func getPhotoDataObservable(asset: PHAsset) -> Observable<NSData> {
return create { observer in
self.imageMgr.requestImageForAsset(asset,
targetSize: PHImageManagerMaximumSize,
contentMode: .AspectFit,
options: nil,
resultHandler: { (myImage, myInfo) -> Void in
let data = UIImageJPEGRepresentation(myImage!, 1.0)!
NSLog("Encoded photo")
observer.onNext(data)
self.converts += 1
if self.converts == self.userReview.photos.count {
NSLog("Completed encoding photos")
observer.onCompleted()
}
})
return NopDisposable.instance
}
}
准备一个 observable 来上传每张照片(使用 Alamofire 和 RxAlamofire)
func getPostPhotoObservable(photoData: NSData) -> Observable<ReviewPhotoObject> {
return create { observer in
NSLog("Uploading Photo")
upload(.POST,
urlRequest.URLString,
headers: nil,
multipartFormData: { mfd in
mfd.appendBodyPart(data: photoData, name: "image", fileName: "image", mimeType: "image/jpeg")
},
encodingMemoryThreshold: Manager.MultipartFormDataEncodingMemoryThreshold,
encodingCompletion: { encodingResult in
switch encodingResult {
case .Success(let upload, _, _):
upload.responseJSON(completionHandler: { (myResponse) -> Void in
if let photoResponse = myResponse.result.value {
let photoObject = photoResponse.objectForKey("photo")!
let photo = ReviewPhotoObject()
photo.photoID = photoObject.objectForKey("id")! as! NSNumber
NSLog("Uploaded Photo")
observer.onNext(photo)
}
self.uploads += 1
if self.uploads == self.userReview.photos.count {
NSLog("Completed uploading photos")
observer.onCompleted()
}
})
case .Failure(let encodingError):
observer.onError(encodingError)
print(encodingError)
}
})
return NopDisposable.instance
}
}
最后,把它们放在一起
func postReview(review: MyReview) {
self.userReview = review
_ = review.photos.toObservable().flatMap { photos in
return self.getPhotoDataObservable(photos)
}.flatMap { photoData in
return self.getPostPhotoObservable(photoData)
}.reduce([], { var accumulator, photo: ReviewPhotoObject) -> [Int] in
accumulator.append(Int(photo.photoID))
return accumulator
}).subscribeNext({ (photoIds) -> Void in
print(photoIds) // Never called
})
}
当运行时(以2张照片为例),这是输出:
Encoded photo
Uploading photo
Encoded photo
Uploading photo
Completed encoding photos
Uploaded photo
Uploaded photo
Completed uploading photos
但是 subscribeNext
从未被调用过。由于 RxSwift 的具体文档仍然有点薄,我希望这里的人能给我一些我误解的线索。
这里的想法是,一旦一个 observable 发送完它要发送的所有元素,它就应该完成。您正在为每个 PHAsset 创建一个 observable,并且该 observable 只发送一个元素,因此它应该在此之后完成。按照你的代码方式,只有最后一个会完成,所以 reduce
运算符只是坐在那里等待其余代码完成,然后才能完成它的工作。
这是我编写第一个函数的方式(在 Swift 3 而不是 2 中。)
extension PHImageManager {
func requestMaximumSizeImage(for asset: PHAsset) -> Observable<UIImage> {
return .create { observer in
let request = self.requestImage(for: asset, targetSize: PHImageManagerMaximumSize, contentMode: .aspectFit, options: nil, resultHandler: { image, info in
if let image = image {
observer.onNext(image)
observer.onCompleted()
}
else if let info = info, let error = info[PHImageErrorKey] as? Error {
observer.onError(error)
}
})
return Disposables.create { self.cancelImageRequest(request) }
}
}
}
你会看到我会把它作为 PHImageManager 的扩展而不是免费功能,但这只是风格上的差异。功能上的区别是,如果底层请求出错,我的代码将发出错误,如果订阅者在请求完成之前全部退出,我的代码将取消请求。此外,它不进行 JPEG 转换。保持这些操作小,并在地图内进行 JPEG 转换,如下所示:
let imagesData = review.photos.toObservable().flatMap {
self.imageMgr.requestMaximumSizeImage(for: [=11=])
}.map {
UIImageJPEGRepresentation([=11=], 1.0)
}.filter { [=11=] != nil }.map { [=11=]! }
以上代码向管理器请求图像,然后将它们转换为 JPEG 数据,过滤掉转换失败的任何图像。 imagesData
是 Observable<Data>
类型。
你的 getPostPhotoObservable
很好,除了已完成的问题,以及它不处理一次性取消的事实。此外,您可以将 post 函数 return 设为 Observable,而不是将结果包装在 ReviewPhotoObject 中。
其他警告:
您将它们放在一起的方式并不能确保 ReviewPhotoObject
与照片的顺序相同(因为您无法保证顺序上传将完成。)要解决此问题,如有必要,您需要使用 concat
而不是 flatMap
。
如果任何上传失败,整个管道将断开连接并中止任何后续上传。您可能应该设置一些东西来捕获错误并做一些适当的事情。 catchErrorJustReturn
或 catchError
取决于您的要求。
我正在尝试使用 ReactiveX (RxSwift) 将多张照片上传到服务器,收集每个请求的响应,然后发出最后一个请求以完成提交。
在我尝试 reduce
所有响应之前,一切似乎都运行良好。最后的 subscribeNext
永远不会被调用。 (也许我误解了 flatMap
或 reduce
的工作原理?)
具体来说,这就是我尝试执行此过程的方式。
准备一个 observable 来编码每张照片(
self.imageMgr
是PHCachingImageManager()
的一个实例)func getPhotoDataObservable(asset: PHAsset) -> Observable<NSData> { return create { observer in self.imageMgr.requestImageForAsset(asset, targetSize: PHImageManagerMaximumSize, contentMode: .AspectFit, options: nil, resultHandler: { (myImage, myInfo) -> Void in let data = UIImageJPEGRepresentation(myImage!, 1.0)! NSLog("Encoded photo") observer.onNext(data) self.converts += 1 if self.converts == self.userReview.photos.count { NSLog("Completed encoding photos") observer.onCompleted() } }) return NopDisposable.instance } }
准备一个 observable 来上传每张照片(使用 Alamofire 和 RxAlamofire)
func getPostPhotoObservable(photoData: NSData) -> Observable<ReviewPhotoObject> { return create { observer in NSLog("Uploading Photo") upload(.POST, urlRequest.URLString, headers: nil, multipartFormData: { mfd in mfd.appendBodyPart(data: photoData, name: "image", fileName: "image", mimeType: "image/jpeg") }, encodingMemoryThreshold: Manager.MultipartFormDataEncodingMemoryThreshold, encodingCompletion: { encodingResult in switch encodingResult { case .Success(let upload, _, _): upload.responseJSON(completionHandler: { (myResponse) -> Void in if let photoResponse = myResponse.result.value { let photoObject = photoResponse.objectForKey("photo")! let photo = ReviewPhotoObject() photo.photoID = photoObject.objectForKey("id")! as! NSNumber NSLog("Uploaded Photo") observer.onNext(photo) } self.uploads += 1 if self.uploads == self.userReview.photos.count { NSLog("Completed uploading photos") observer.onCompleted() } }) case .Failure(let encodingError): observer.onError(encodingError) print(encodingError) } }) return NopDisposable.instance } }
最后,把它们放在一起
func postReview(review: MyReview) { self.userReview = review _ = review.photos.toObservable().flatMap { photos in return self.getPhotoDataObservable(photos) }.flatMap { photoData in return self.getPostPhotoObservable(photoData) }.reduce([], { var accumulator, photo: ReviewPhotoObject) -> [Int] in accumulator.append(Int(photo.photoID)) return accumulator }).subscribeNext({ (photoIds) -> Void in print(photoIds) // Never called }) }
当运行时(以2张照片为例),这是输出:
Encoded photo
Uploading photo
Encoded photo
Uploading photo
Completed encoding photos
Uploaded photo
Uploaded photo
Completed uploading photos
但是 subscribeNext
从未被调用过。由于 RxSwift 的具体文档仍然有点薄,我希望这里的人能给我一些我误解的线索。
这里的想法是,一旦一个 observable 发送完它要发送的所有元素,它就应该完成。您正在为每个 PHAsset 创建一个 observable,并且该 observable 只发送一个元素,因此它应该在此之后完成。按照你的代码方式,只有最后一个会完成,所以 reduce
运算符只是坐在那里等待其余代码完成,然后才能完成它的工作。
这是我编写第一个函数的方式(在 Swift 3 而不是 2 中。)
extension PHImageManager {
func requestMaximumSizeImage(for asset: PHAsset) -> Observable<UIImage> {
return .create { observer in
let request = self.requestImage(for: asset, targetSize: PHImageManagerMaximumSize, contentMode: .aspectFit, options: nil, resultHandler: { image, info in
if let image = image {
observer.onNext(image)
observer.onCompleted()
}
else if let info = info, let error = info[PHImageErrorKey] as? Error {
observer.onError(error)
}
})
return Disposables.create { self.cancelImageRequest(request) }
}
}
}
你会看到我会把它作为 PHImageManager 的扩展而不是免费功能,但这只是风格上的差异。功能上的区别是,如果底层请求出错,我的代码将发出错误,如果订阅者在请求完成之前全部退出,我的代码将取消请求。此外,它不进行 JPEG 转换。保持这些操作小,并在地图内进行 JPEG 转换,如下所示:
let imagesData = review.photos.toObservable().flatMap {
self.imageMgr.requestMaximumSizeImage(for: [=11=])
}.map {
UIImageJPEGRepresentation([=11=], 1.0)
}.filter { [=11=] != nil }.map { [=11=]! }
以上代码向管理器请求图像,然后将它们转换为 JPEG 数据,过滤掉转换失败的任何图像。 imagesData
是 Observable<Data>
类型。
你的 getPostPhotoObservable
很好,除了已完成的问题,以及它不处理一次性取消的事实。此外,您可以将 post 函数 return 设为 Observable,而不是将结果包装在 ReviewPhotoObject 中。
其他警告:
您将它们放在一起的方式并不能确保
ReviewPhotoObject
与照片的顺序相同(因为您无法保证顺序上传将完成。)要解决此问题,如有必要,您需要使用concat
而不是flatMap
。如果任何上传失败,整个管道将断开连接并中止任何后续上传。您可能应该设置一些东西来捕获错误并做一些适当的事情。
catchErrorJustReturn
或catchError
取决于您的要求。