发布者发出操作进度和最终值

Publisher emitting progress of operation and final value

鉴于我有一个提供以下功能的 SDK

class SDK {
    static func upload(completion: @escaping (Result<String, Error>) -> Void) {
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            completion(.success("my_value"))
        }
    }
}

我能够创建一个包装器以使其更实用

class CombineSDK {
    func upload() -> AnyPublisher<String, Error> {
        Future { promise in
            SDK.upload { result in
                switch result {
                case .success(let key):
                    promise(.success(key))
                case .failure(let error):
                    promise(.failure(error))
                }
            }
        }.eraseToAnyPublisher()
    }
}

现在我想了解如果 SDK 上传方法还提供如下进度块,我的 CombineSDK.upload 方法应该是什么样子:

class SDK {
    static func upload(progress: @escaping (Double) -> Void, completion: @escaping (Result<String, Error>) -> Void) {
        DispatchQueue.main.asyncAfter(deadline: .now() + 0.5) {
            progress(0.5)
        }

        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            progress(1)
        }

        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            completion(.success("s3Key"))
        }
    }
}

这是可能的方法

extension CombineSDK {
    func upload() -> AnyPublisher<(Double, String?), Error> {
        let publisher = PassthroughSubject<(Double, String?), Error>()
        SDK.upload(progress: { value in
            publisher.send((value, nil))
        }, completion: { result in
            switch result {
            case .success(let key):
                publisher.send((1.0, key))
                publisher.send(completion: .finished)
            case .failure(let error):
                publisher.send(completion: .failure(error))
            }
        })
        return publisher.eraseToAnyPublisher()
    }
}

我们需要您的发布商的 Output 类型来表示进度或最终值。所以我们应该使用enum。由于 Foundation 框架已经定义了一个名为 Progress 的类型,我们将其命名为 Progressable 以避免名称冲突。我们不妨将其设为通用:

enum Progressable<Value> {
    case progress(Double)
    case value(Value)
}

现在我们需要考虑发布者应该如何表现。一个典型的发布者,比如 URLSession.DataTaskPublisher, doesn't do anything until it gets a subscription, and it starts its work fresh for each subscription. The retry 运营商只有在上游发布者表现得像这样时才有效。

所以我们的发布商也应该这样做:

extension SDK {
    static func uploadPublisher() -> UploadPublisher {
        return UploadPublisher()
    }

    struct UploadPublisher: Publisher {
        typealias Output = Progressable<String>
        typealias Failure = Error

        func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input {
            <#code#>
        }
    }
}

创建发布者(通过调用 SDK.uploadPublisher())不会启动任何工作。我们将用开始上传的代码替换 <#code#>

extension SDK {
    static func uploadPublisher() -> UploadPublisher {
        return UploadPublisher()
    }

    struct UploadPublisher: Publisher {
        typealias Output = Progressable<String>
        typealias Failure = Error

        func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input {
            let subject = PassthroughSubject<Output, Failure>()
            subject.receive(subscriber: subscriber)

           upload(
                progress: { subject.send(.progress([=12=])) },
                completion: {
                    switch [=12=] {
                    case .success(let value):
                        subject.send(.value(value))
                        subject.send(completion: .finished)
                    case .failure(let error):
                        subject.send(completion: .failure(error))
                    }
                }
            )
        }
    }
}

请注意,我们在 开始上传之前调用 subject.receive(subscriber: subscriber) 。这个很重要!如果 upload 在返回之前同步调用其回调之一怎么办?通过在调用上传之前将订阅者传递给主题,我们确保即使 upload 同步调用其回调,订阅者也有机会收到通知。

Note: started writing an answer that's has a largely similar intent to @robmayoff's answer, but using Deferred, so posting here for completeness.

Swift Combine 仅适用于值和错误 - 没有单独的进度类型。但是您可以将进度建模为输出的一部分,或者作为元组,如另一个答案中所建议的,或者作为自定义枚举,同时将进度和结果作为案例,这是我的首选方法。

class CombineSDK {
   enum UploadProgress<T> {
      case progress(Double)
      case result(T)
   }
    
   func upload() -> AnyPublisher<UploadProgress<String>, Error> {

      Deferred { () -> AnyPublisher<UploadProgress<String>, Error> in
         let subject = PassthroughSubject<UploadProgress<String>, Error>()

         SDK.upload(
            progress: { subject.send(.progress([=10=])) },
            completion: { r in
               let _ = r.map(UploadProgress.result).publisher.subscribe(subject)
            })

         return subject.eraseToAnyPublisher()
      }
      .eraseToAnyPublisher()
   }
}

编辑

根据@robmayoff 的评论,上述解决方案不处理在返回 subject 之前调用 subject.send 的同步情况。

解决方案在很大程度上是相同的,但它确实引入了一个小的并发症,即必须捕获这些值,以防万一。这可以通过 Record 来完成,它将为 subject

提供一个临时接收器
func upload() -> AnyPublisher<UploadProgress<String>, Error> {

   Deferred { () -> AnyPublisher<UploadProgress<String>, Error> in
      
      let subject = PassthroughSubject<UploadProgress<String>, Error>()
      
      var recording = Record<UploadProgress<String>, Error>.Recording()  
      subject.sink(
         receiveCompletion: { recording.receive(completion: [=11=]) }, 
         receiveValue: { recording.receive([=11=]) })

      SDK.upload(
         progress: { subject.send(.progress([=11=])) },
         completion: { r in
            let _ = r.map(UploadProgress.result).publisher.subscribe(subject)
         })
        
      return Record(recording: recording).append(subject).eraseToAnyPublisher()
   }
   .eraseToAnyPublisher()
}