Swift 将 Future 与多个值组合?

Swift Combine Future with multiple values?

我可能会以错误的方式解决这个问题,但我有一个函数,我想随着时间的推移发出多个值。但我不希望它在订阅该对象之前开始发射。我从 RxSwift 开始合并,所以我基本上是在尝试在 RxSwift 世界中复制 Observable.create() 。我发现最接近的是返回一个 Future,但 futures 只成功或失败(所以它们基本上就像 RxSwift 中的 Single。)

我在这里缺少一些基本的东西吗?我的最终目标是制作一个函数来处理视频文件并发出进度事件直到完成,然后为完成的文件发出 URL。

通常您可以使用 PassthroughSubject 来发布自定义输出。您可以在自己的 Publisher 实现中包装一个 PassthroughSubject(或多个 PassthroughSubject),以确保只有您的进程才能通过主题发送事件。

为了示例目的,让我们模拟一个 VideoFrame 类型和一些输入帧:

typealias VideoFrame = String
let inputFrames: [VideoFrame] = ["a", "b", "c"]

现在我们要编写一个同步处理这些帧的函数。我们的函数应该以某种方式报告进度,最后,它应该 return 输出帧。为了报告进度,我们的函数将采用 PassthroughSubject<Double, Never>,并将其进度(作为从 0 到 1 的分数)发送给主题:

func process(_ inputFrames: [VideoFrame], progress: PassthroughSubject<Double, Never>) -> [VideoFrame] {
    var outputFrames: [VideoFrame] = []
    for input in inputFrames {
        progress.send(Double(outputFrames.count) / Double(inputFrames.count))
        outputFrames.append("output for \(input)")
    }
    return outputFrames
}

好的,现在我们要把它变成一个发布者。发布者需要输出进度和最终结果。所以我们将使用这个 enum 作为它的输出:

public enum ProgressEvent<Value> {
    case progress(Double)
    case done(Value)
}

现在我们可以定义 Publisher 类型了。我们称它为 SyncPublisher,因为当它收到 Subscriber 时,它会立即(同步)执行其整个计算。

public struct SyncPublisher<Value>: Publisher {
    public init(_ run: @escaping (PassthroughSubject<Double, Never>) throws -> Value) {
        self.run = run
    }

    public var run: (PassthroughSubject<Double, Never>) throws -> Value

    public typealias Output = ProgressEvent<Value>
    public typealias Failure = Error

    public func receive<Downstream: Subscriber>(subscriber: Downstream) where Downstream.Input == Output, Downstream.Failure == Failure {
        let progressSubject = PassthroughSubject<Double, Never>()
        let doneSubject = PassthroughSubject<ProgressEvent<Value>, Error>()
        progressSubject
            .setFailureType(to: Error.self)
            .map { ProgressEvent<Value>.progress([=13=]) }
            .append(doneSubject)
            .subscribe(subscriber)
        do {
            let value = try run(progressSubject)
            progressSubject.send(completion: .finished)
            doneSubject.send(.done(value))
            doneSubject.send(completion: .finished)
        } catch {
            progressSubject.send(completion: .finished)
            doneSubject.send(completion: .failure(error))
        }
    }
}

现在我们可以把我们的 process(_:progress:) 函数变成一个 SyncPublisher 像这样:

let inputFrames: [VideoFrame] = ["a", "b", "c"]
let pub = SyncPublisher<[VideoFrame]> { process(inputFrames, progress: [=14=]) }

run 闭包是 { process(inputFrames, progress: [=30=]) }。请记住,这里的 [=31=] 是一个 PassthroughSubject<Double, Never>,正是 process(_:progress:) 想要的第二个参数。

当我们订阅这个pub时,它会首先创建两个主题。一个主题是进度主题并被传递到闭包。我们将使用另一个主题来发布最终结果和 .finished 完成,或者如果 run 闭包抛出错误则仅发布 .failure 完成。

我们使用两个单独主题的原因是因为它确保我们的发布者表现良好。如果 run 闭包 return 正常,则发布者发布零个或多个进度报告,然后是一个结果,然后是 .finished。如果 run 闭包抛出错误,发布者会发布零个或多个进度报告,然后是 .failedrun 闭包无法使发布者发出多个结果,或者在发出结果后发出更多进度报告。

最后我们可以订阅pub看看是否正常:

pub
    .sink(
        receiveCompletion: { print("completion: \([=15=])") },
        receiveValue: { print("output: \([=15=])") })

这是输出:

output: progress(0.0)
output: progress(0.3333333333333333)
output: progress(0.6666666666666666)
output: done(["output for a", "output for b", "output for c"])
completion: finished