合并发布者时如何处理错误?

How are errors handled when combining publishers?

我最近开始使用 Combine,我正在尝试创建一个 return 多个数据源组合的存储库。为此,每个数据源都以非常简单的加载方法加载自己的数据:

func loadData() -> AnyPublisher<DataObject, Error>

要合并数据源,我想使用 combineLatest,因为它会等待数据源完成加载,然后它会发布包含数据的组合集或错误指示失败了:

func loadData() -> AnyPublisher<[DataObject], Error> {
   return dataSource1.loadData()
             .combineLatest(dataSource2.loadData())
             .map { [=11=].0 + [=11=].1 }
             .eraseToAnyPublisher()
}

总的来说,它的行为似乎没问题,我可以调用 repository.loadData(),我会得到一个包含两个项目数据的数组。但是,如果任何数据源出现故障,情况就不是这样了。在这种情况下,无论其他数据源是否成功,加载方法都会 return 出错。

是否有标准或推荐的方法来收集合并发布者时的所有错误?在我的使用上下文中,我希望仅当两个发布者都失败时才能够丢弃错误,但如果只有其中一个发布者失败,我会通过并成功。

认为你是说你想要这个:

  1. 如果 dataSource1 失败并且 dataSource2 产生输出,丢弃 dataSource1 的失败并仅传递 dataSource2 的输出。

  2. 如果 dataSource1 产生输出而 dataSource2 失败,则仅传递 dataSource1 的输出并丢弃 dataSource2 的失败。

  3. 如果 dataSource1dataSource2 都产生输出,则传递组合输出。

  4. 如果 dataSource1dataSource2 都失败,请传递其中一个错误。

我假设每个数据源最多产生一个输出。这是一个测试设置:

typealias DataObject = String

struct DataSource {
    var result: Result<DataObject, Error>

    func loadData() -> AnyPublisher<DataObject, Error> {
        return result.publisher.eraseToAnyPublisher()
    }
}

我们确实想使用 combineLatest,但我们不能让 combineLatest 的任何一个输入失败,因为那会使 combineLatest 失败。如果 both 数据源失败,我们只希望 combineLatest 失败。因此,我们需要一种方法将错误作为其输入发布者之一的 output 传递给 combineLatest,而不是作为 failure它的输入出版商之一。

我们通过将每个输入发布者转换为具有 Result<DataObject, Error>OutputNeverFailure 来做到这一点。

func combine(
    _ source1: DataSource,
    _ source2: DataSource
) -> AnyPublisher<[DataObject], Error> {
    let ds1 = source1.loadData()
        .map { Result.success([=11=]) }
        .catch { Just(Result.failure([=11=])) }

    let ds2 = source2.loadData()
        .map { Result.success([=11=]) }
        .catch { Just(Result.failure([=11=])) }

    let combo = ds1.combineLatest(ds2)
        .tryMap { r1, r2 -> [DataObject] in
            switch (r1, r2) {
            case (.success(let s1), .success(let s2)): return [s1, s2]
            case (.success(let s1), .failure(_)): return [s1]
            case (.failure(_), .success(let s2)): return [s2]
            case (.failure(let f1), .failure(_)): throw f1
            }
        }

    return combo.eraseToAnyPublisher()
}

我们来测试一下:

struct MockError: Error { }

combine(.init(result: .success("hello")), .init(result: .success("world")))
    .sink(
        receiveCompletion: { print([=12=]) },
        receiveValue: { print([=12=]) })
// Output:
// ["hello", "world"]
// finished

combine(.init(result: .success("hello")), .init(result: .failure(MockError())))
    .sink(
        receiveCompletion: { print([=12=]) },
        receiveValue: { print([=12=]) })
// Output:
// ["hello"]
// finished

combine(.init(result: .failure(MockError())), .init(result: .success("world")))
    .sink(
        receiveCompletion: { print([=12=]) },
        receiveValue: { print([=12=]) })
// Output:
// ["world"]
// finished

combine(.init(result: .failure(MockError())), .init(result: .failure(MockError())))
    .sink(
        receiveCompletion: { print([=12=]) },
        receiveValue: { print([=12=]) })
// Output:
// failure(__lldb_expr_28.MockError())