AnyPublishers 的链接数组

Chaining array of AnyPublishers

我想知道是否有一种方法可以链接 一系列发布者 类似于我们使用常规 flatMap

链接发布者的方式

假设我有三个发布者:publisher1、publisher2、publisher3它们都具有相同的输出、失败类型。例如,它们中的每一个都是 AnyPublisher<String, Never> 并发出单个 String 值。每个发布者的唯一作用是获取自己的值并发出与自己的值相结合的先前值。

我正在寻找与以下伪代码相同的效果:

let pipe = publisher1(value: "")
      .flatMap { publisher2(value: [=10=]) }
      .flatMap { publisher3(value: [=10=]) }

执行流程:

publisher1 (fetches "A") -> publisher2 (fetches "B") -> publisher3 (fetches "C") -> "ABC"

我想为发布者数量未知的数组重现相同的流程 n ([AnyPublisher<String, Never>])

1 -> 2 -> 3 -> ... -> n

如果有任何提示,我将不胜感激,谢谢! :)

如果我理解正确的话,你应该可以在你的发布商上使用append

let pub1: AnyPublisher<String, Never> = ["A1", "B1", "C1"].publisher.eraseToAnyPublisher()
let pub2: AnyPublisher<String, Never> = ["A2", "B2", "C2"].publisher.eraseToAnyPublisher()
let pub3: AnyPublisher<String, Never> = ["A3", "B3", "C3"].publisher.eraseToAnyPublisher()


_ = pub1.append(pub2.append(pub3))
    .sink(receiveValue: { value in
        print(value)
    })

首先,让我们澄清一下你的问题。根据您想要创建 flatMap-ed 发布者链的描述,您必须拥有的是一系列闭包 - 而不是发布者 - 每个返回一个 AnyPublisher<String, Never> 发布者给定 String 参数.

let pubs: [(String) -> AnyPublisher<String, Never>] = [
    publisher1, publisher2, publisher3 
]

要链接它们,您可以使用数组的 reduce 方法,方法是从 Just 发布者开始发出初始参数:

let pipe = pubs.reduce(Just("").eraseToAnyPublisher(), { acc, next in
   acc.flatMap { next([=11=]) }.eraseToAnyPublisher()
}

另一种方法是zip发布者在一起,他们结合最新的值:

let publisher1 = ["A"].publisher
let publisher2 = ["B"].publisher
let publisher3 = ["C"].publisher

_ = publisher1.zip(publisher2, publisher3)
    .map { [=10=]++ }
    .sink(receiveValue: { print("Combined: \([=10=])") })

/// prints ABC

或者,如果您的发布商数量可变,您可以使用 MergeManyreduce

// same result: ABC
_ = Publishers.MergeMany([publisher1, publisher2, publisher3])
    .reduce("") { [=11=] +  }
    .sink(receiveValue: { print("Combined: \([=11=])") })

如果您认为您将在多个地方需要此功能,您可以走得更远,编写您自己的发布者:

extension Publishers {
    /// works also with arrays, or any other range replaceable collection
    struct ConcatenateOutputs<Upstream: Publisher> : Publisher where Upstream.Output: RangeReplaceableCollection {
        typealias Output = Upstream.Output
        typealias Failure = Upstream.Failure
        
        private let reducer: AnyPublisher<Upstream.Output, Failure>
                
        init(_ publishers: Upstream...) {
            self.init(publishers)
        }
        
        init<S: Swift.Sequence>(_ publishers: S) where S.Element == Upstream {
            reducer = MergeMany(publishers)
                .reduce(Output.init()) { [=12=] +  }
                .eraseToAnyPublisher()
        }
        
        func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input {
            reducer.receive(subscriber: subscriber)
        }
    }
}

extension Sequence where Element: Publisher, Element.Output: RangeReplaceableCollection {
    var concatenateOutputs: Publishers.ConcatenateOutputs<Element> { .init(self) }
}

// same output
_ = Publishers.ConcatenateOutputs([publisher1, publisher2, publisher3])
    .sink(receiveValue: { print("Combined: \([=12=])") })

// the variadic initializer works the same
_ = Publishers.ConcatenateOutputs(publisher1, publisher2, publisher3)
    .sink(receiveValue: { print("Combined: \([=12=])") })

// the less verbose construct
_ = [publisher1, publisher2, publisher3].concatenateOutputs
    .sink(receiveValue: { print("Combined: \([=12=])") })