全部完成后 ReactiveSwift 管道计数失败

ReactiveSwift pipeline count failures after all complete

我在 ReactiveSwift 中有一个上传管道。我想确保即使其中一个上传失败,其余的也不会中断。

在所有成功或失败完成后,我应该 return 从 performUploads 方法成功,或者如果有任何失败,我应该 return 一个错误,所以下一步,下载部分不会开始。即使有错误,所有的上传都应该有机会上传,不应该停止。

有没有办法判断上传完成后是否有错误? 请参阅此处的方法:

let pendingUploadItemsArray: [Items] = ...
func performUploads() -> SignalProducer<(), MyError> {
    return upload(pendingUploadItemsArray)
        .then(doAnything())
}

private func upload(_ items: [Items]) -> SignalProducer<Signal<(), MyError>.Event, Never> {
    let producers = items
        .filter { item in
            return item.readyForUpload
        }
        .map { self.upload([=10=]).materialize() }
    
    return SignalProducer.merge(producers)
}

private func upload(_ item: Item) -> SignalProducer<(), MyError> {
    return internalUploader.upload(item)
        .on(failed: failedToUpload(item),
            value: successfullyUploaded(item))
        .ignoreValues()
}

其中 internalUploader 上传方法是:

func upload(_ item: Item) -> SignalProducer<Any, MyError>

然后在另一个 class 中,您可以调用此上传器:

let sync = self.uploader.performUploads()
        .then(startDownloads())

如果所有上传都成功完成,startDownloads 应该只 运行。 感谢您的任何见解。

这可能是应该以完全不同的方式完成的事情。

我不确切知道 successfullyUploadedfailedToUpload 在您的代码中做了什么,但大概您正在跟踪成功和失败以提供某种实时进度 UI。这就是我的结构:

struct UploadResult {
    let item: Item
    let error: Error? // nil if the upload succeeded

    var succeeded: Bool { error == nil }
    var failed: Bool { !succeeded }
}

...

static func upload(_ items: [Item]) -> SignalProducer<[UploadResult], Never> {
    SignalProducer(items)
        .filter(\.readyForUpload)
        .flatMap(.merge) { item in
            Self.internalUploader(item)
                .map { UploadResult(item: item, error: nil) }
                .flatMapError { error in
                    SignalProducer(value: UploadResult(item: item, error: error))
                }
        }
        .scan(into: [UploadResult]()) { ( results: inout [UploadResult], nextResult) in
            results.append(nextResult)
        }
}
  1. 我创建了一个 UploadResult 结构来表示上传成功或失败的项目。
  2. upload 函数中,我没有创建生产者数组然后合并它们,而是将项目数组转换为具有 SignalProducer(items) 的项目信号生产者,然后使用 flatMap(.merge) 将上传合并到一个信号生成器中。
  3. 我没有使用 materialize,而是使用 map 将成功的上传转换为 UploadResult,我使用 flatMapError 将失败的上传转换为 UploadResult.
  4. 我使用 scan 在每次上传完成时累积结果。每次上传完成(成功或错误)时,scan 将发送更新的上传结果数组,可用于更新 UI.

那么你可以这样使用它:

Uploader.upload(someItems)
    .on(value: { resultsSoFar in
        // Update UI here
    })
    .take(last: 1)
    .attempt { results in
        if !results.allSatisfy(\.succeeded) {
            // At least one of the uploads failed, so send an error
            throw MyError()
        }
    }
    .then(startDownloads)
  1. 我使用 on(value:) 运算符根据当前结果更新 UI。每次下载成功或失败时,都会使用更新后的结果调用此闭包。
  2. 我用take(last: 1)过滤掉所有中间结果;它只会在所有上传完成后发送最终结果。
  3. 我使用 attempt 检查是否有任何上传失败,如果失败则抛出错误。这确保只有在所有上传都成功后才会开始下载。

希望这可以处理您的用例,但如果我遗漏了更广泛的上下文,请在评论中告诉我!

编辑

如果您只关心一次处理一个结果而不是作为一个 运行 数组,您可以去掉 scan 然后用 [= 替换 take(last: 1) 31=]:

static func upload(_ items: [Item]) -> SignalProducer<UploadResult, Never> {
    SignalProducer(items)
        .filter(\.readyForUpload)
        .flatMap(.merge) { item in
            Self.internalUploader(item)
                .map { UploadResult(item: item, error: nil) }
                .flatMapError { error in
                    SignalProducer(value: UploadResult(item: item, error: error))
                }
        }
}

...

Uploader.upload(someItems)
    .on(value: { latestResult in
        // Do something with the latest result
    })
    .collect()
    .attempt { results in
        if !results.allSatisfy(\.succeeded) {
            // At least one of the uploads failed, so send an error
            throw MyError()
        }
    }
    .then(startDownloads)