仅当元素想要完成而不发布任何值时才将元素附加到发布者

Append element to publisher only if it wants to finish without publishing any value

我有两个发布者,它们都可以发布一个值或完成而不发布任何值。我合并了两个发布者并比较它们的值并对值进行一些 post 处理并将它们保存到我的本地 CoreData。简化代码如下:

let p1 = ["1"]
    .publisher
    .map { Int([=10=]) }

let p2 = ["2"]
    .publisher
    .map { Int([=10=]) }

let p1p2 = p1.combineLatest(p2)
    .map { [=10=] ==  }
    .sink { print([=10=]) }

但是如果其中一个发布者没有发布任何值,我仍然需要将剩余的值保存到我的本地数据库中。问题在于,如果其中一个发布者在没有触发值的情况下结束,则 combineLatest 运算符不会触发任何事件。我试过前置和追加运算符,但它们不能与任何条件结合使用,比如如果其中一个发布者完成而没有发布任何值。

例如:

let p1String: [String] = []
let p1 = p1String.publisher
    .map { Int([=11=]) }
    .if(completionWithoutPublishing, perform: { prepend(nil) })

如有任何想法,我们将不胜感激。

您可以创建执行此操作的自定义运算符。您只需要实施 receive(subscriber:)。您可以通过将自定义逻辑添加到上游发布者(在您的情况下为 map),然后将订阅者参数附加到它来实现。

struct IfEmpty<Upstream: Publisher>: Publisher {
    let upstream: Upstream
    let output: Output
    let handler: (() -> Void)?
    
    func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input {
        var hasOutput = false
        upstream.handleEvents(receiveOutput: { (_) in
            hasOutput = true
        }, receiveCompletion: { (_) in
            if !hasOutput {
                subscriber.receive(output)
                handler?()
            }
        }).receive(subscriber: subscriber)

    }
    
    typealias Output = Upstream.Output
    
    typealias Failure = Upstream.Failure
}

extension Publisher {
    func ifEmpty(publish output: Output, andDo handler: (() -> Void)? = nil) -> IfEmpty<Self> {
        IfEmpty(upstream: self, output: output, handler: handler)
    }
}

示例:

let p1String: [String] = []
let p1 = p1String.publisher
    .map { Int([=11=]) }
    .ifEmpty(publish: nil)
    .print()
    .sink(receiveValue: { _ in })
/*
receive subscription: (HandleEvents)
request unlimited
receive value: (nil)
receive finished
*/
let p1String: [String] = ["1", "2", "3"]
let p1 = p1String.publisher
    .map { Int([=12=]) }
    .ifEmpty(publish: nil)
    .print()
    .sink(receiveValue: { _ in })
receive subscription: (HandleEvents)
request unlimited
receive value: (Optional(1))
receive value: (Optional(2))
receive value: (Optional(3))
receive finished
*/

您正在寻找 replaceEmpty 运算符:

https://developer.apple.com/documentation/combine/publishers/zip4/replaceempty(with:)

https://www.apeth.com/UnderstandingCombine/operators/operatorsTransformersBlockers/operatorsreplaceempty.html

它的作用正是您所描述的:仅当上游完成但尚未发布时,它才会发出其特殊值。