RxSwift:使用运算符组合下载照片并保存在本地

RxSwift: use a combination of operators to download a photo and save it locally

我正在尝试实现一种反应式方式来执行一些操作:

  1. 请求下载照片
  2. next个事件中获取下载进度
  3. 完成后将照片保存到本地

所以我开始使用 RxSwift 并像这样实现它

photoController.downloadPhoto(photoItem.photo)
.doOnNext { downloadTaskInfo in
    photoItem.viewState = .NetworkProgress(task: downloadTaskInfo.task, progress: downloadTaskInfo.progress)
}
.flatMapLatest { downloadTaskInfo in
    return PHPhotoLibrary.savePhoto(downloadTaskInfo.buffer)
}
.observeOn(MainScheduler.instance)
.subscribe(
    onError: { error in
        photoItem.viewState = .NetworkFailed
    },
    onCompleted: {
        photoItem.viewState = .Default
    }
)
.addDisposableTo(disposeBag)

但是 flatMapLatest 没有达到我的预期。我以为 flatMapLatest 可以让我抓取最新的事件并进行另一个操作。

因此,我决定将其替换为 reduce 以实现我的想法,但我认为这不是正确的运算符,因为我不想将所有下载进度加入一个变量中。我想要的是可以等待下载完成然后获取最新版本以继续其他操作的东西,例如在本地保存照片。 使用 concat 我无法收到第一个 Observable 的结果。

我需要

// 
.waitUntilDownloadFinishesAndContinueWith { downloadTaskInfo in
    return PHPhotoLibrary.savePhoto(downloadTaskInfo.buffer)
}

谁能告诉我设计这个的正确方法?

更新

我决定使用 withLatestFrom,但即便如此我还是遇到了一些问题。 downloadPhotoObservable 处理得太早了。

let downloadPhotoObservable = photoController.downloadPhoto(photoItem.photo)
    .doOnNext { downloadTaskInfo in
        photoItem.viewState = .NetworkProgress(task: downloadTaskInfo.task, progress: downloadTaskInfo.progress)
    }

Observable.just(photoItem)
    .withLatestFrom(downloadPhotoObservable)
    .map { downloadTaskInfo in
        PHPhotoLibrary.savePhoto(downloadTaskInfo.buffer)
    }
    .observeOn(MainScheduler.instance)
    .subscribe(
        onError: { error in
            photoItem.viewState = .NetworkFailed
        },
        onCompleted: {
            photoItem.viewState = .Default
        }
    )
    .addDisposableTo(disposeBag)

我肯定做错了。

所以,我找到了一种方法来实现我想要做的事情。我决定 过滤 所有结果并比较最终的 buffer 长度。 buffer 是照片持久化的下一部分。

photoController.downloadPhoto(photoItem.photo)
    .downloadProgress()
    // Receive the download progress
    .doOnNext { downloadTaskInfo in
        photoItem.viewState = .NetworkProgress(task: downloadTaskInfo.task, progress: downloadTaskInfo.progress)
    }
    // Wait for the complete buffer
    .filter { downloadTaskInfo in
        downloadTaskInfo.contentLength == Int64(downloadTaskInfo.buffer.length)
    }
    // Save it locally
    .flatMap { downloadTaskInfo in
        PHPhotoLibrary.savePhoto(downloadTaskInfo.buffer)
    }
    .observeOn(MainScheduler.instance)
    .subscribe(
        onError: { error in
            photoItem.viewState = .NetworkFailed
        },
        onCompleted: {
            photoItem.viewState = .Default
        }
    )
    .addDisposableTo(disposeBag)

顺便说一句,我正在使用 scan 运算符来调用进度信息。我使用名为 downloadProgress:

的自定义运算符创建了一个快捷方式
extension ObservableType where E == NetworkDataTaskInfo {
    func downloadProgress() -> Observable<NetworkDownloadTaskInfo> {
        let seed = NetworkDownloadTaskInfo(task: NopNetworkTask(), buffer: NSMutableData(), progress: 0, contentLength: 0)
        return scan(seed, accumulator: { latestDownloadTaskInfo, currentDataTaskInfo in
            var downloadedProgress: Float = 0
            var contentLength: Int64 = 0

            if let response = currentDataTaskInfo.response {
                // Start
                contentLength = response.expectedContentLength
            }
            else if let data = currentDataTaskInfo.data {
                // Accumulate
                contentLength = latestDownloadTaskInfo.contentLength
                latestDownloadTaskInfo.buffer.appendData(data)
                downloadedProgress = Float(latestDownloadTaskInfo.buffer.length) / Float(contentLength)
            }

            if contentLength <= 0 {
                throw NSURLError.ZeroByteResource
            }

            // Accumulated info
            return NetworkDownloadTaskInfo(
                task: currentDataTaskInfo.task,
                buffer: latestDownloadTaskInfo.buffer,
                progress: downloadedProgress,
                contentLength: contentLength
            )
        })
    }
}