发布者发出操作进度和最终值
Publisher emitting progress of operation and final value
鉴于我有一个提供以下功能的 SDK
class SDK {
static func upload(completion: @escaping (Result<String, Error>) -> Void) {
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
completion(.success("my_value"))
}
}
}
我能够创建一个包装器以使其更实用
class CombineSDK {
func upload() -> AnyPublisher<String, Error> {
Future { promise in
SDK.upload { result in
switch result {
case .success(let key):
promise(.success(key))
case .failure(let error):
promise(.failure(error))
}
}
}.eraseToAnyPublisher()
}
}
现在我想了解如果 SDK 上传方法还提供如下进度块,我的 CombineSDK.upload 方法应该是什么样子:
class SDK {
static func upload(progress: @escaping (Double) -> Void, completion: @escaping (Result<String, Error>) -> Void) {
DispatchQueue.main.asyncAfter(deadline: .now() + 0.5) {
progress(0.5)
}
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
progress(1)
}
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
completion(.success("s3Key"))
}
}
}
这是可能的方法
extension CombineSDK {
func upload() -> AnyPublisher<(Double, String?), Error> {
let publisher = PassthroughSubject<(Double, String?), Error>()
SDK.upload(progress: { value in
publisher.send((value, nil))
}, completion: { result in
switch result {
case .success(let key):
publisher.send((1.0, key))
publisher.send(completion: .finished)
case .failure(let error):
publisher.send(completion: .failure(error))
}
})
return publisher.eraseToAnyPublisher()
}
}
我们需要您的发布商的 Output
类型来表示进度或最终值。所以我们应该使用enum
。由于 Foundation 框架已经定义了一个名为 Progress
的类型,我们将其命名为 Progressable
以避免名称冲突。我们不妨将其设为通用:
enum Progressable<Value> {
case progress(Double)
case value(Value)
}
现在我们需要考虑发布者应该如何表现。一个典型的发布者,比如 URLSession.DataTaskPublisher
, doesn't do anything until it gets a subscription, and it starts its work fresh for each subscription. The retry
运营商只有在上游发布者表现得像这样时才有效。
所以我们的发布商也应该这样做:
extension SDK {
static func uploadPublisher() -> UploadPublisher {
return UploadPublisher()
}
struct UploadPublisher: Publisher {
typealias Output = Progressable<String>
typealias Failure = Error
func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input {
<#code#>
}
}
}
创建发布者(通过调用 SDK.uploadPublisher()
)不会启动任何工作。我们将用开始上传的代码替换 <#code#>
:
extension SDK {
static func uploadPublisher() -> UploadPublisher {
return UploadPublisher()
}
struct UploadPublisher: Publisher {
typealias Output = Progressable<String>
typealias Failure = Error
func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input {
let subject = PassthroughSubject<Output, Failure>()
subject.receive(subscriber: subscriber)
upload(
progress: { subject.send(.progress([=12=])) },
completion: {
switch [=12=] {
case .success(let value):
subject.send(.value(value))
subject.send(completion: .finished)
case .failure(let error):
subject.send(completion: .failure(error))
}
}
)
}
}
}
请注意,我们在 开始上传之前调用 subject.receive(subscriber: subscriber)
。这个很重要!如果 upload
在返回之前同步调用其回调之一怎么办?通过在调用上传之前将订阅者传递给主题,我们确保即使 upload
同步调用其回调,订阅者也有机会收到通知。
Note: started writing an answer that's has a largely similar intent to @robmayoff's answer, but using Deferred
, so posting here for completeness.
Swift Combine 仅适用于值和错误 - 没有单独的进度类型。但是您可以将进度建模为输出的一部分,或者作为元组,如另一个答案中所建议的,或者作为自定义枚举,同时将进度和结果作为案例,这是我的首选方法。
class CombineSDK {
enum UploadProgress<T> {
case progress(Double)
case result(T)
}
func upload() -> AnyPublisher<UploadProgress<String>, Error> {
Deferred { () -> AnyPublisher<UploadProgress<String>, Error> in
let subject = PassthroughSubject<UploadProgress<String>, Error>()
SDK.upload(
progress: { subject.send(.progress([=10=])) },
completion: { r in
let _ = r.map(UploadProgress.result).publisher.subscribe(subject)
})
return subject.eraseToAnyPublisher()
}
.eraseToAnyPublisher()
}
}
编辑
根据@robmayoff 的评论,上述解决方案不处理在返回 subject
之前调用 subject.send
的同步情况。
解决方案在很大程度上是相同的,但它确实引入了一个小的并发症,即必须捕获这些值,以防万一。这可以通过 Record
来完成,它将为 subject
提供一个临时接收器
func upload() -> AnyPublisher<UploadProgress<String>, Error> {
Deferred { () -> AnyPublisher<UploadProgress<String>, Error> in
let subject = PassthroughSubject<UploadProgress<String>, Error>()
var recording = Record<UploadProgress<String>, Error>.Recording()
subject.sink(
receiveCompletion: { recording.receive(completion: [=11=]) },
receiveValue: { recording.receive([=11=]) })
SDK.upload(
progress: { subject.send(.progress([=11=])) },
completion: { r in
let _ = r.map(UploadProgress.result).publisher.subscribe(subject)
})
return Record(recording: recording).append(subject).eraseToAnyPublisher()
}
.eraseToAnyPublisher()
}
鉴于我有一个提供以下功能的 SDK
class SDK {
static func upload(completion: @escaping (Result<String, Error>) -> Void) {
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
completion(.success("my_value"))
}
}
}
我能够创建一个包装器以使其更实用
class CombineSDK {
func upload() -> AnyPublisher<String, Error> {
Future { promise in
SDK.upload { result in
switch result {
case .success(let key):
promise(.success(key))
case .failure(let error):
promise(.failure(error))
}
}
}.eraseToAnyPublisher()
}
}
现在我想了解如果 SDK 上传方法还提供如下进度块,我的 CombineSDK.upload 方法应该是什么样子:
class SDK {
static func upload(progress: @escaping (Double) -> Void, completion: @escaping (Result<String, Error>) -> Void) {
DispatchQueue.main.asyncAfter(deadline: .now() + 0.5) {
progress(0.5)
}
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
progress(1)
}
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
completion(.success("s3Key"))
}
}
}
这是可能的方法
extension CombineSDK {
func upload() -> AnyPublisher<(Double, String?), Error> {
let publisher = PassthroughSubject<(Double, String?), Error>()
SDK.upload(progress: { value in
publisher.send((value, nil))
}, completion: { result in
switch result {
case .success(let key):
publisher.send((1.0, key))
publisher.send(completion: .finished)
case .failure(let error):
publisher.send(completion: .failure(error))
}
})
return publisher.eraseToAnyPublisher()
}
}
我们需要您的发布商的 Output
类型来表示进度或最终值。所以我们应该使用enum
。由于 Foundation 框架已经定义了一个名为 Progress
的类型,我们将其命名为 Progressable
以避免名称冲突。我们不妨将其设为通用:
enum Progressable<Value> {
case progress(Double)
case value(Value)
}
现在我们需要考虑发布者应该如何表现。一个典型的发布者,比如 URLSession.DataTaskPublisher
, doesn't do anything until it gets a subscription, and it starts its work fresh for each subscription. The retry
运营商只有在上游发布者表现得像这样时才有效。
所以我们的发布商也应该这样做:
extension SDK {
static func uploadPublisher() -> UploadPublisher {
return UploadPublisher()
}
struct UploadPublisher: Publisher {
typealias Output = Progressable<String>
typealias Failure = Error
func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input {
<#code#>
}
}
}
创建发布者(通过调用 SDK.uploadPublisher()
)不会启动任何工作。我们将用开始上传的代码替换 <#code#>
:
extension SDK {
static func uploadPublisher() -> UploadPublisher {
return UploadPublisher()
}
struct UploadPublisher: Publisher {
typealias Output = Progressable<String>
typealias Failure = Error
func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input {
let subject = PassthroughSubject<Output, Failure>()
subject.receive(subscriber: subscriber)
upload(
progress: { subject.send(.progress([=12=])) },
completion: {
switch [=12=] {
case .success(let value):
subject.send(.value(value))
subject.send(completion: .finished)
case .failure(let error):
subject.send(completion: .failure(error))
}
}
)
}
}
}
请注意,我们在 开始上传之前调用 subject.receive(subscriber: subscriber)
。这个很重要!如果 upload
在返回之前同步调用其回调之一怎么办?通过在调用上传之前将订阅者传递给主题,我们确保即使 upload
同步调用其回调,订阅者也有机会收到通知。
Note: started writing an answer that's has a largely similar intent to @robmayoff's answer, but using
Deferred
, so posting here for completeness.
Swift Combine 仅适用于值和错误 - 没有单独的进度类型。但是您可以将进度建模为输出的一部分,或者作为元组,如另一个答案中所建议的,或者作为自定义枚举,同时将进度和结果作为案例,这是我的首选方法。
class CombineSDK {
enum UploadProgress<T> {
case progress(Double)
case result(T)
}
func upload() -> AnyPublisher<UploadProgress<String>, Error> {
Deferred { () -> AnyPublisher<UploadProgress<String>, Error> in
let subject = PassthroughSubject<UploadProgress<String>, Error>()
SDK.upload(
progress: { subject.send(.progress([=10=])) },
completion: { r in
let _ = r.map(UploadProgress.result).publisher.subscribe(subject)
})
return subject.eraseToAnyPublisher()
}
.eraseToAnyPublisher()
}
}
编辑
根据@robmayoff 的评论,上述解决方案不处理在返回 subject
之前调用 subject.send
的同步情况。
解决方案在很大程度上是相同的,但它确实引入了一个小的并发症,即必须捕获这些值,以防万一。这可以通过 Record
来完成,它将为 subject
func upload() -> AnyPublisher<UploadProgress<String>, Error> {
Deferred { () -> AnyPublisher<UploadProgress<String>, Error> in
let subject = PassthroughSubject<UploadProgress<String>, Error>()
var recording = Record<UploadProgress<String>, Error>.Recording()
subject.sink(
receiveCompletion: { recording.receive(completion: [=11=]) },
receiveValue: { recording.receive([=11=]) })
SDK.upload(
progress: { subject.send(.progress([=11=])) },
completion: { r in
let _ = r.map(UploadProgress.result).publisher.subscribe(subject)
})
return Record(recording: recording).append(subject).eraseToAnyPublisher()
}
.eraseToAnyPublisher()
}